首页  > 怀旧特辑

我工作中用MQ的10种场景

前言

最近有球友问我:MQ的使用场景有哪些?工作中一定要使用MQ吗?

记得刚工作那会儿,我总是想不明白:为什么明明直接调用接口就能完成的功能,非要引入MQ这么个"中间商"?

直到经历了系统崩溃、数据丢失、性能瓶颈等一系列问题后,我才真正理解了MQ的价值。

今天我想和大家分享我在实际工作中使用消息队列(MQ)的10种典型场景,希望对你会有所帮助。

加苏三的工作内推群

一、为什么需要消息队列(MQ)?

在深入具体场景之前,我们先来思考一个基本问题:为什么要使用消息队列?

系统间的直接调用:

引入消息队列后:

接下来我们将通过10个具体场景,带大家来深入理解MQ的价值。

场景一:系统解耦

背景描述

在我早期参与的一个电商项目中,订单创建后需要通知多个系统:

// 早期的紧耦合设计

public class OrderService {

private InventoryService inventoryService;

private PointsService pointsService;

private EmailService emailService;

private AnalyticsService analyticsService;

public void createOrder(Order order) {

// 1. 保存订单

orderDao.save(order);

// 2. 调用库存服务

inventoryService.updateInventory(order);

// 3. 调用积分服务

pointsService.addPoints(order.getUserId(), order.getAmount());

// 4. 发送邮件通知

emailService.sendOrderConfirmation(order);

// 5. 记录分析数据

analyticsService.trackOrderCreated(order);

// 更多服务...

}

}

这种架构存在严重问题:

紧耦合:订单服务需要知道所有下游服务

单点故障:任何一个下游服务挂掉都会导致订单创建失败

性能瓶颈:同步调用导致响应时间慢

MQ解决方案

引入MQ后,架构变为:

代码实现:

// 订单服务 - 生产者

@Service

public class OrderService {

@Autowired

private RabbitTemplate rabbitTemplate;

public void createOrder(Order order) {

// 1. 保存订单

orderDao.save(order);

// 2. 发送消息到MQ

rabbitTemplate.convertAndSend(

"order.exchange",

"order.created",

new OrderCreatedEvent(order.getId(), order.getUserId(), order.getAmount())

);

}

}

// 库存服务 - 消费者

@Component

@RabbitListener(queues = "inventory.queue")

public class InventoryConsumer {

@Autowired

private InventoryService inventoryService;

@RabbitHandler

public void handleOrderCreated(OrderCreatedEvent event) {

inventoryService.updateInventory(event.getOrderId());

}

}

技术要点

消息协议选择:根据业务需求选择RabbitMQ、Kafka或RocketMQ

消息格式:使用JSON或Protobuf等跨语言格式

错误处理:实现重试机制和死信队列

场景二:异步处理

背景描述

用户上传视频后需要执行转码、生成缩略图、内容审核等耗时操作,如果同步处理,用户需要等待很长时间。

MQ解决方案

// 视频服务 - 生产者

@Service

public class VideoService {

@Autowired

private KafkaTemplate kafkaTemplate;

public UploadResponse uploadVideo(MultipartFile file, String userId) {

// 1. 保存原始视频

String videoId = saveOriginalVideo(file);

// 2. 发送处理消息

kafkaTemplate.send("video-processing", new VideoProcessingEvent(videoId, userId));

// 3. 立即返回响应

return new UploadResponse(videoId, "upload_success");

}

}

// 视频处理服务 - 消费者

@Service

public class VideoProcessingConsumer {

@KafkaListener(topics = "video-processing")

public void processVideo(VideoProcessingEvent event) {

// 异步执行耗时操作

videoProcessor.transcode(event.getVideoId());

videoProcessor.generateThumbnails(event.getVideoId());

contentModerationService.checkContent(event.getVideoId());

// 发送处理完成通知

notificationService.notifyUser(event.getUserId(), event.getVideoId());

}

}

架构优势

快速响应:用户上传后立即得到响应

弹性扩展:可以根据处理压力动态调整消费者数量

故障隔离:处理服务故障不会影响上传功能

场景三:流量削峰

背景描述

电商秒杀活动时,瞬时流量可能是平时的百倍以上,直接冲击数据库和服务。

MQ解决方案

代码实现:

// 秒杀服务

@Service

public class SecKillService {

@Autowired

private RedisTemplate redisTemplate;

@Autowired

private RabbitTemplate rabbitTemplate;

public SecKillResponse secKill(SecKillRequest request) {

// 1. 校验用户资格

if (!checkUserQualification(request.getUserId())) {

return SecKillResponse.failed("用户无资格");

}

// 2. 预减库存(Redis原子操作)

Long remaining = redisTemplate.opsForValue().decrement(

"sec_kill_stock:" + request.getItemId());

if (remaining == null || remaining < 0) {

// 库存不足,恢复库存

redisTemplate.opsForValue().increment("sec_kill_stock:" + request.getItemId());

return SecKillResponse.failed("库存不足");

}

// 3. 发送秒杀成功消息到MQ

rabbitTemplate.convertAndSend(

"sec_kill.exchange",

"sec_kill.success",

new SecKillSuccessEvent(request.getUserId(), request.getItemId())

);

return SecKillResponse.success("秒杀成功");

}

}

// 订单处理消费者

@Component

@RabbitListener(queues = "sec_kill.order.queue")

public class SecKillOrderConsumer {

@RabbitHandler

public void handleSecKillSuccess(SecKillSuccessEvent event) {

// 异步创建订单

orderService.createSecKillOrder(event.getUserId(), event.getItemId());

}

}

技术要点

库存预扣:使用Redis原子操作避免超卖

队列缓冲:MQ缓冲请求,避免直接冲击数据库

限流控制:在网关层进行限流,拒绝过多请求

场景四:数据同步

背景描述

在微服务架构中,不同服务有自己的数据库,需要保证数据一致性。

MQ解决方案

// 用户服务 - 数据变更时发送消息

@Service

public class UserService {

@Transactional

public User updateUser(User user) {

// 1. 更新数据库

userDao.update(user);

// 2. 发送消息(在事务内)

rocketMQTemplate.sendMessageInTransaction(

"user-update-topic",

MessageBuilder.withPayload(new UserUpdateEvent(user.getId(), user.getStatus()))

.build(),

null

);

return user;

}

}

// 其他服务 - 消费用户更新消息

@Service

@RocketMQMessageListener(topic = "user-update-topic", consumerGroup = "order-group")

public class UserUpdateConsumer implements RocketMQListener {

@Override

public void onMessage(UserUpdateEvent event) {

// 更新本地用户信息缓存

orderService.updateUserCache(event.getUserId(), event.getStatus());

}

}

一致性保证

本地事务表:将消息和业务数据放在同一个数据库事务中

事务消息:使用RocketMQ的事务消息机制

幂等消费:消费者实现幂等性,避免重复处理

场景五:日志收集

背景描述

分布式系统中,日志分散在各个节点,需要集中收集和分析。

MQ解决方案

代码实现:

// 日志收集组件

@Component

public class LogCollector {

@Autowired

private KafkaTemplate kafkaTemplate;

public void collectLog(String appId, String level, String message, Map context) {

LogEvent logEvent = new LogEvent(appId, level, message, context, System.currentTimeMillis());

// 发送到Kafka

kafkaTemplate.send("app-logs", appId, JsonUtils.toJson(logEvent));

}

}

// 日志消费者

@Service

public class LogConsumer {

@KafkaListener(topics = "app-logs", groupId = "log-es")

public void consumeLog(String message) {

LogEvent logEvent = JsonUtils.fromJson(message, LogEvent.class);

// 存储到Elasticsearch

elasticsearchService.indexLog(logEvent);

// 实时监控检查

if ("ERROR".equals(logEvent.getLevel())) {

alertService.checkAndAlert(logEvent);

}

}

}

技术优势

解耦:应用节点无需关心日志如何处理

缓冲:应对日志产生速率波动

多消费:同一份日志可以被多个消费者处理

场景六:消息广播

背景描述

系统配置更新后,需要通知所有服务节点更新本地配置。

MQ解决方案

// 配置服务 - 广播配置更新

@Service

public class ConfigService {

@Autowired

private RedisTemplate redisTemplate;

public void updateConfig(String configKey, String configValue) {

// 1. 更新配置存储

configDao.updateConfig(configKey, configValue);

// 2. 广播配置更新消息

redisTemplate.convertAndSend("config-update-channel",

new ConfigUpdateEvent(configKey, configValue));

}

}

// 服务节点 - 订阅配置更新

@Component

public class ConfigUpdateListener {

@Autowired

private LocalConfigCache localConfigCache;

@RedisListener(channel = "config-update-channel")

public void handleConfigUpdate(ConfigUpdateEvent event) {

// 更新本地配置缓存

localConfigCache.updateConfig(event.getKey(), event.getValue());

}

}

应用场景

功能开关:动态开启或关闭功能

参数调整:调整超时时间、限流阈值等

黑白名单:更新黑白名单配置

场景七:顺序消息

背景描述

在某些业务场景中,消息的处理顺序很重要,如订单状态变更。

MQ解决方案

// 订单状态变更服务

@Service

public class OrderStateService {

@Autowired

private RocketMQTemplate rocketMQTemplate;

public void changeOrderState(String orderId, String oldState, String newState) {

OrderStateEvent event = new OrderStateEvent(orderId, oldState, newState);

// 发送顺序消息,使用orderId作为sharding key

rocketMQTemplate.syncSendOrderly(

"order-state-topic",

event,

orderId // 保证同一订单的消息按顺序处理

);

}

}

// 订单状态消费者

@Service

@RocketMQMessageListener(

topic = "order-state-topic",

consumerGroup = "order-state-group",

consumeMode = ConsumeMode.ORDERLY // 顺序消费

)

public class OrderStateConsumer implements RocketMQListener {

@Override

public void onMessage(OrderStateEvent event) {

// 按顺序处理订单状态变更

orderService.processStateChange(event);

}

}

顺序保证机制

分区顺序:同一分区内的消息保证顺序

顺序投递:MQ保证消息按发送顺序投递

顺序处理:消费者顺序处理消息

场景八:延迟消息

背景描述

需要实现定时任务,如订单超时未支付自动取消。

MQ解决方案

// 订单服务 - 发送延迟消息

@Service

public class OrderService {

@Autowired

private RabbitTemplate rabbitTemplate;

public void createOrder(Order order) {

// 保存订单

orderDao.save(order);

// 发送延迟消息,30分钟后检查支付状态

rabbitTemplate.convertAndSend(

"order.delay.exchange",

"order.create",

new OrderCreateEvent(order.getId()),

message -> {

message.getMessageProperties().setDelay(30 * 60 * 1000); // 30分钟

return message;

}

);

}

}

// 订单超时检查消费者

@Component

@RabbitListener(queues = "order.delay.queue")

public class OrderTimeoutConsumer {

@RabbitHandler

public void checkOrderPayment(OrderCreateEvent event) {

Order order = orderDao.findById(event.getOrderId());

if ("UNPAID".equals(order.getStatus())) {

// 超时未支付,取消订单

orderService.cancelOrder(order.getId(), "超时未支付");

}

}

}

替代方案对比

方案

优点

缺点

数据库轮询

实现简单

实时性差,数据库压力大

延时队列

实时性好

实现复杂,消息堆积问题

定时任务

可控性强

分布式协调复杂

场景九:消息重试

背景描述

处理消息时可能遇到临时故障,需要重试机制保证最终处理成功。

MQ解决方案

// 消息消费者 with 重试机制

@Service

@Slf4j

public class RetryableConsumer {

@Autowired

private RabbitTemplate rabbitTemplate;

@RabbitListener(queues = "business.queue")

public void processMessage(Message message, Channel channel) {

try {

// 业务处理

businessService.process(message);

// 确认消息

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

} catch (TemporaryException e) {

// 临时异常,重试

log.warn("处理失败,准备重试", e);

// 拒绝消息,requeue=true

channel.basicNack(

message.getMessageProperties().getDeliveryTag(),

false,

true // 重新入队

);

} catch (PermanentException e) {

// 永久异常,进入死信队列

log.error("处理失败,进入死信队列", e);

channel.basicNack(

message.getMessageProperties().getDeliveryTag(),

false,

false // 不重新入队

);

}

}

}

重试策略

立即重试:临时故障立即重试

延迟重试:逐步增加重试间隔

死信队列:最终无法处理的消息进入死信队列

场景十:事务消息

背景描述

分布式系统中,需要保证多个服务的数据一致性。

MQ解决方案

// 事务消息生产者

@Service

public class TransactionalMessageService {

@Autowired

private RocketMQTemplate rocketMQTemplate;

@Transactional

public void createOrderWithTransaction(Order order) {

// 1. 保存订单(数据库事务)

orderDao.save(order);

// 2. 发送事务消息

TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(

"order-tx-topic",

MessageBuilder.withPayload(new OrderCreatedEvent(order.getId()))

.build(),

order // 事务参数

);

if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) {

throw new RuntimeException("事务消息发送失败");

}

}

}

// 事务消息监听器

@Component

@RocketMQTransactionListener

public class OrderTransactionListener implements RocketMQLocalTransactionListener {

@Autowired

private OrderDao orderDao;

@Override

public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {

try {

// 检查本地事务状态

Order order = (Order) arg;

Order existOrder = orderDao.findById(order.getId());

if (existOrder != null && "CREATED".equals(existOrder.getStatus())) {

return RocketMQLocalTransactionState.COMMIT_MESSAGE;

} else {

return RocketMQLocalTransactionState.ROLLBACK_MESSAGE;

}

} catch (Exception e) {

return RocketMQLocalTransactionState.UNKNOWN;

}

}

@Override

public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {

// 回查本地事务状态

String orderId = (String) msg.getHeaders().get("order_id");

Order order = orderDao.findById(orderId);

if (order != null && "CREATED".equals(order.getStatus())) {

return RocketMQLocalTransactionState.COMMIT_MESSAGE;

} else {

return RocketMQLocalTransactionState.ROLLBACK_MESSAGE;

}

}

}

事务消息流程

总结

通过以上10个场景,我们可以总结出MQ使用的核心原则:

适用场景

异步处理:提升系统响应速度

系统解耦:降低系统间依赖

流量削峰:应对突发流量

数据同步:保证最终一致性

分布式事务:解决数据一致性问题

技术选型建议

场景

推荐MQ

原因

高吞吐

Kafka

高吞吐量,持久化存储

事务消息

RocketMQ

完整的事务消息机制

复杂路由

RabbitMQ

灵活的路由配置

延迟消息

RabbitMQ

原生支持延迟队列

最佳实践

消息幂等性:消费者必须实现幂等处理

死信队列:处理失败的消息要有兜底方案

监控告警:完善的消息堆积监控和告警

性能优化:根据业务特点调整MQ参数

最后说一句(求关注,别白嫖我)

如果这篇文章对您有所帮助,或者有所启发的话,帮忙关注一下我的同名公众号:苏三说技术,您的支持是我坚持写作最大的动力。

求一键三连:点赞、转发、在看。

关注公众号:【苏三说技术】,在公众号中回复:进大厂,可以免费获取我最近整理的10万字的面试宝典,好多小伙伴靠这个宝典拿到了多家大厂的offer。

本文收录于我的技术网站:http://www.susan.net.cn