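AI summary

Using order payment as the example, this article shows how to replace a synchronous call with a RocketMQ transactional message to fix the "user charged, order status never updated" inconsistency. The payment service first sends a half message and records the payment transaction. The result of the local transaction, or a later check-back, then decides whether the message takes effect. On the consumer side, the order is updated idempotently, and retries, dead-letter handling, compensation, degradation and monitoring together deliver reliable eventual consistency.

I thought introducing RocketMQ would make distributed transactions easy. Then came a late-night alert: a user had paid, but the order status was never updated. Only then did I realize that data consistency in a distributed system is far harder than I had imagined.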

1. The Incident: From Data Inconsistency to Business Loss

1.1 The Original Synchronous-Call Architecture

Let's start with the architecture that caused the problem:

// Payment service: the original synchronous-call implementation
@Service
@Slf4j
public class PaymentService {
    
    @Autowired
    private PaymentMapper paymentMapper;
    
    @Autowired
    private RestTemplate restTemplate;
    
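    /**
     * Handles the payment-success callback.
     * Problem: the order service is called synchronously, so a network failure leaves the data inconsistent.
     */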
    @Transactional
    public void handlePaymentCallback(PaymentCallbackDTO callback) {
        try {
            // 1. Record the payment transaction
            PaymentRecord payment = createPaymentRecord(callback);
            paymentMapper.insert(payment);
            
            // 2. Update the payment order status
            updatePaymentOrderStatus(callback.getOrderId(), "PAID");
            
            // 3. Synchronously call the order service to update the order status
            String orderServiceUrl = "http://order-service/orders/" + 
                                   callback.getOrderId() + "/pay-success";
            
            ResponseEntity<String> response = restTemplate.postForEntity(
                orderServiceUrl, 
                callback, 
                String.class
            );
            
            if (!response.getStatusCode().is2xxSuccessful()) {
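                throw new RuntimeException("Failed to update order status");
            }
            
            log.info("Payment processed, order ID: {}", callback.getOrderId());
            
        } catch (Exception e) {
            log.error("Payment callback handling failed", e);
            // Rethrowing rolls back the whole transaction, including the payment record inserted above
            throw new RuntimeException("Payment processing failed", e);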
        }
    }
}

// Order service
@RestController
@RequestMapping("/orders")
public class OrderController {
    
    @Autowired
    private OrderService orderService;
    
    @PostMapping("/{orderId}/pay-success")
    public ResponseEntity<?> handlePaySuccess(@PathVariable String orderId, 
                                              @RequestBody PaymentCallbackDTO callback) {
        // This call can fail for any number of reasons
        boolean success = orderService.updateOrderStatus(orderId, "PAID", callback.getAmount());
        
        if (!success) {
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
        
        // Trigger follow-up work: deduct inventory, issue coupons, etc.
        asyncProcessAfterPayment(orderId);
        
        return ResponseEntity.ok().build();
    }
}

1.2 Root Cause Analysis


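Core problem: the third-party payment provider had already charged the user. The call to the order service then failed, so the payment service rolled back its transaction and the payment record was lost. The user paid, yet the system has no record of it.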

2. RocketMQ Transactional Messages: A Clean Solution to Distributed Transactions

2.1 How Transactional Messages Work

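RocketMQ's transactional messaging works in two phases:
Phase 1: the producer sends a half message. The broker stores it but does not deliver it to consumers yet. The producer then executes its local transaction.
Phase 2: based on the local result, the producer asks the broker to commit the message, which makes it consumable, or to roll it back, which discards it. The broker may receive no definite answer, for example because the producer crashed or reported UNKNOWN. In that case it periodically calls back the producer to check the local transaction status.
[Figure: transactional message mechanism]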
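The sketch below is a toy, in-memory model of the two phases in plain Java, meant to make their ordering concrete. It is not RocketMQ code and uses none of its API. ToyTxBroker, TxOutcome and the maxChecks limit are hypothetical names. The limit only mimics the broker discarding a half message after too many inconclusive checks.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Outcome reported for a local transaction; mirrors RocketMQ's commit / rollback / unknown. */
enum TxOutcome { COMMIT, ROLLBACK, UNKNOWN }

/**
 * Toy in-memory model of the half-message protocol. This is NOT RocketMQ code;
 * it only shows the order of: store half message, run local tx, commit/rollback, check back.
 */
class ToyTxBroker {
    private final Map<String, String> halfMessages = new LinkedHashMap<>(); // stored, invisible to consumers
    private final Map<String, Integer> checkCounts = new LinkedHashMap<>();
    private final List<String> deliverable = new ArrayList<>();             // visible to consumers
    private final int maxChecks;                                           // hypothetical give-up limit

    ToyTxBroker(int maxChecks) {
        this.maxChecks = maxChecks;
    }

    /** Phase 1: store the half message, then run the producer's local transaction. */
    void sendInTransaction(String txId, String body, Function<String, TxOutcome> localTx) {
        halfMessages.put(txId, body);
        TxOutcome outcome;
        try {
            outcome = localTx.apply(txId);
        } catch (RuntimeException e) {
            outcome = TxOutcome.UNKNOWN;   // a failure leaves the outcome undecided
        }
        resolve(txId, outcome);
    }

    /** Periodic check-back over every half message that is still undecided. */
    void runCheckBack(Function<String, TxOutcome> checker) {
        for (String txId : new ArrayList<>(halfMessages.keySet())) {
            int checks = checkCounts.merge(txId, 1, Integer::sum);
            TxOutcome outcome = checker.apply(txId);
            if (outcome == TxOutcome.UNKNOWN && checks >= maxChecks) {
                outcome = TxOutcome.ROLLBACK;   // give up and discard after too many checks
            }
            resolve(txId, outcome);
        }
    }

    /** Phase 2: apply a definite outcome; UNKNOWN keeps the half message for the next check. */
    private void resolve(String txId, TxOutcome outcome) {
        if (outcome == TxOutcome.COMMIT) {
            deliverable.add(halfMessages.remove(txId));
            checkCounts.remove(txId);
        } else if (outcome == TxOutcome.ROLLBACK) {
            halfMessages.remove(txId);
            checkCounts.remove(txId);
        }
    }

    List<String> deliverable() { return deliverable; }

    int pendingHalfMessages() { return halfMessages.size(); }
}
```

A commit in phase 2 and a commit during a later check-back end in the same place. This is why the producer's check-back must give the same answer the original local transaction would have given.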

2.2 Key Classes

// Core interface of RocketMQ transactional messages (native client, package org.apache.rocketmq.client.producer)
public interface TransactionListener {
    
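    /**
     * Executes the local transaction.
     * @param msg the message
     * @param arg business argument passed by the sender
     * @return result of the local transaction
     */
    LocalTransactionState executeLocalTransaction(Message msg, Object arg);
    
    /**
     * Checks the local transaction status when the broker calls back.
     * @param msg the message
     * @return current status of the local transaction
     */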
    LocalTransactionState checkLocalTransaction(MessageExt msg);
}

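// Local transaction state enum
// (the Spring Boot starter's RocketMQLocalTransactionState, used in section 3, names them COMMIT, ROLLBACK, UNKNOWN)
public enum LocalTransactionState {
    COMMIT_MESSAGE,     // commit: the message becomes consumable
    ROLLBACK_MESSAGE,   // roll back: the message is discarded
    UNKNOW              // (sic: the actual constant name in the client) unknown, the broker will check back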
}

3. Hands-On: Rebuilding the Payment Flow on RocketMQ

3.1 Reworking the Payment Service: Sending a Transactional Message

@Service
@Slf4j
public class PaymentServiceV2 {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    @Autowired
    private PaymentMapper paymentMapper;
    
    // Topic for payment results
    private static final String PAYMENT_RESULT_TOPIC = "payment-result-topic";
    
    /**
     * Handles the payment-success callback using a transactional message.
     */
    public void handlePaymentCallback(PaymentCallbackDTO callback) {
        String transactionId = UUID.randomUUID().toString();
        
        try {
            // 1. Validate the callback (this also guards against duplicate processing)
            if (!validateCallback(callback)) {
                log.warn("Payment callback validation failed, order ID: {}", callback.getOrderId());
                return;
            }
            
            // 2. Build the message
            Message<PaymentResultMessage> message = MessageBuilder
                .withPayload(buildPaymentResultMessage(callback))
                .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                .setHeader("order_id", callback.getOrderId())
                .setHeader("callback_timestamp", System.currentTimeMillis())
                .build();
            
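            // 3. Send the transactional message
            // `arg` is handed to executeLocalTransaction as its second parameter
            // (rocketmq-spring-boot-starter 2.1+ signature: destination, message, arg)
            TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(
                PAYMENT_RESULT_TOPIC + ":PAY_SUCCESS",  // destination in "topic:tag" form
                message,
                callback  // passed through as arg
            );
            
            log.info("Transactional message send result: {}, order ID: {}", 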
                    sendResult.getLocalTransactionState(), 
                    callback.getOrderId());
            
        } catch (Exception e) {
            log.error("Error while handling payment callback, order ID: {}", callback.getOrderId(), e);
            // Record it in an exception table for later manual handling
            recordException(callback, transactionId, e.getMessage());
        }
    }
    
    /**
     * Builds the payment result message.
     */
    private PaymentResultMessage buildPaymentResultMessage(PaymentCallbackDTO callback) {
        PaymentResultMessage message = new PaymentResultMessage();
        message.setOrderId(callback.getOrderId());
        message.setPaymentId(callback.getPaymentId());
        message.setAmount(callback.getAmount());
        message.setPayTime(callback.getPayTime());
        message.setUserId(callback.getUserId());
        message.setPaymentMethod(callback.getPaymentMethod());
        // Unique business key, used for idempotency checks
        message.setBizId(callback.getOrderId() + "_" + callback.getPayTime().getTime());
        return message;
    }
}

// Payment result message body
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PaymentResultMessage implements Serializable {
    private String orderId;
    private String paymentId;
    private BigDecimal amount;
    private Date payTime;
    private Long userId;
    private String paymentMethod;
    private String bizId;  // unique business key
}

3.2 Transaction Listener: Local Transaction and Check-Back

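@Component
// rocketmq-spring-boot-starter 2.1+: there is no txProducerGroup attribute; the listener is bound to the
// RocketMQTemplate bean named by rocketMQTemplateBeanName (default "rocketMQTemplate")
@RocketMQTransactionListener
@Slf4j
public class PaymentTransactionListenerImpl implements RocketMQLocalTransactionListener {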
    
    @Autowired
    private PaymentMapper paymentMapper;
    
    @Autowired
    private DeduplicationService deduplicationService;
    
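    /**
     * Executes the local transaction.
     * When this runs, the broker already holds the half message. @Transactional makes the
     * database writes below atomic; it does not (and cannot) include the message send itself.
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        PaymentCallbackDTO callback = (PaymentCallbackDTO) arg;
        String orderId = callback.getOrderId();
        String transactionId = msg.getHeaders().get("rocketmq_TRANSACTION_ID", String.class);
        
        log.info("Executing local transaction, order ID: {}, transaction ID: {}", orderId, transactionId);
        
        try {
            // 1. Idempotency check: guard against duplicate callbacks
            if (deduplicationService.isProcessed(orderId, "payment_callback")) {
                log.warn("Payment callback already processed, committing directly, order ID: {}", orderId);
                return RocketMQLocalTransactionState.COMMIT;
            }
            
            // 2. Business-level uniqueness check on the payment record.
            //    PROCESSING counts too: inserting a second row would duplicate the record.
            PaymentRecord existingRecord = paymentMapper.selectByOrderId(orderId);
            if (existingRecord != null
                    && ("SUCCESS".equals(existingRecord.getStatus())
                        || "PROCESSING".equals(existingRecord.getStatus()))) {
                log.warn("Payment record already exists ({}), committing message, order ID: {}",
                        existingRecord.getStatus(), orderId);
                return RocketMQLocalTransactionState.COMMIT;
            }
            
            // 3. Create the payment record
            PaymentRecord payment = new PaymentRecord();
            payment.setOrderId(orderId);
            payment.setPaymentId(callback.getPaymentId());
            payment.setAmount(callback.getAmount());
            payment.setStatus("PROCESSING");  // in-progress status
            payment.setTransactionId(transactionId);
            payment.setCreateTime(new Date());
            payment.setUpdateTime(new Date());
            
            paymentMapper.insert(payment);
            log.info("Payment record created, order ID: {}", orderId);
            
            // 4. Record the idempotency marker
            deduplicationService.recordProcess(orderId, "payment_callback");
            
            // 5. Other local transactional work can go here
            // updateLocalBusiness(orderId);
            
            // RocketMQ only sees this COMMIT after the @Transactional proxy has committed. If that
            // commit fails, an exception propagates instead and the broker resolves the message by checking back.
            log.info("Local transaction succeeded, committing message, order ID: {}", orderId);
            return RocketMQLocalTransactionState.COMMIT;
            
        } catch (Exception e) {
            log.error("Local transaction failed, rolling back message, order ID: {}, error: {}", orderId, e.getMessage(), e);
            
            // The exception is swallowed here, so mark the Spring transaction rollback-only explicitly;
            // otherwise the partial writes above would still be committed.
            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
            
            // Record the failure for troubleshooting (writes made here share the rolled-back transaction)
            recordTransactionException(transactionId, orderId, e);
            
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }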
    
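    /**
     * Checks the local transaction status.
     * The broker calls this periodically for half messages whose outcome is still undecided,
     * e.g. executeLocalTransaction returned UNKNOWN or the producer crashed before answering.
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String orderId = msg.getHeaders().get("order_id", String.class);
        String transactionId = msg.getHeaders().get("rocketmq_TRANSACTION_ID", String.class);
        
        log.info("Checking local transaction status, order ID: {}, transaction ID: {}", orderId, transactionId);
        
        try {
            // 1. Look up the payment record
            PaymentRecord payment = paymentMapper.selectByOrderId(orderId);
            
            if (payment == null) {
                // Either the local transaction has not committed yet or it was rolled back.
                // UNKNOWN keeps the broker checking; after transactionCheckMax checks
                // (15 by default) the broker discards the half message.
                log.warn("Payment record not found, status unknown, order ID: {}", orderId);
                return RocketMQLocalTransactionState.UNKNOWN;
            }
            
            // 2. Map the payment record status to a message status
            if ("SUCCESS".equals(payment.getStatus()) || "PROCESSING".equals(payment.getStatus())) {
                log.info("Payment record status is {}, committing message, order ID: {}", payment.getStatus(), orderId);
                return RocketMQLocalTransactionState.COMMIT;
            } else if ("FAILED".equals(payment.getStatus())) {
                log.warn("Payment record status is FAILED, rolling back message, order ID: {}", orderId);
                return RocketMQLocalTransactionState.ROLLBACK;
            } else {
                log.warn("Unexpected payment record status: {}, will check again, order ID: {}", payment.getStatus(), orderId);
                return RocketMQLocalTransactionState.UNKNOWN;
            }
            
        } catch (Exception e) {
            log.error("Failed to check local transaction status, order ID: {}", orderId, e);
            // The check itself failed: return UNKNOWN so the broker checks again
            return RocketMQLocalTransactionState.UNKNOWN;
        }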
    }
    
    /**
     * Records transaction exception details.
     */
    private void recordTransactionException(String transactionId, String orderId, Exception e) {
        // Persist the exception to the database for later troubleshooting
        TransactionExceptionRecord record = new TransactionExceptionRecord();
        record.setTransactionId(transactionId);
        record.setOrderId(orderId);
        record.setExceptionMessage(e.getMessage());
        record.setExceptionStack(ExceptionUtils.getStackTrace(e));
        record.setCreateTime(new Date());
        
        // transactionExceptionMapper.insert(record);
    }
}
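The check-back logic above depends only on the payment record's status, so it can be pulled out into a pure function and unit-tested without a broker. The sketch below does that with hypothetical enums (PaymentStatus, CheckResult) that mirror the status strings and RocketMQLocalTransactionState values used above. It also adds one assumption of its own: after giveUpAfter checks find no record, it answers ROLLBACK explicitly instead of waiting for the broker's limit. Where the check count comes from in a real listener is left open.

```java
/** Payment record status as written by the listener above (OTHER = any unexpected value). */
enum PaymentStatus { PROCESSING, SUCCESS, FAILED, OTHER }

/** Mirrors RocketMQLocalTransactionState's COMMIT / ROLLBACK / UNKNOWN. */
enum CheckResult { COMMIT, ROLLBACK, UNKNOWN }

final class PaymentCheckPolicy {
    private PaymentCheckPolicy() {}

    /**
     * @param status      status of the payment record, or null when no record exists
     * @param checksSoFar how many check-backs have already happened (hypothetical input)
     * @param giveUpAfter after this many checks, treat a missing record as rolled back (hypothetical policy)
     */
    static CheckResult decide(PaymentStatus status, int checksSoFar, int giveUpAfter) {
        if (status == null) {
            // Not committed yet, or rolled back: wait a few rounds, then answer ROLLBACK explicitly
            return checksSoFar >= giveUpAfter ? CheckResult.ROLLBACK : CheckResult.UNKNOWN;
        }
        switch (status) {
            case SUCCESS:
            case PROCESSING:
                return CheckResult.COMMIT;     // the payment record was committed
            case FAILED:
                return CheckResult.ROLLBACK;
            default:
                return CheckResult.UNKNOWN;    // unexpected value: keep checking
        }
    }
}
```

An explicit ROLLBACK ends the check loop sooner than the broker's own limit. The cost is that a slow but still-running local transaction may be declared failed, so giveUpAfter should cover the longest plausible commit delay.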

3.3 Order Service: A Reliable Message Consumer

@Service
@Slf4j
@RocketMQMessageListener(
    topic = "payment-result-topic",
    consumerGroup = "order-payment-consumer-group",
    selectorExpression = "PAY_SUCCESS",  // consume payment-success messages only
    consumeMode = ConsumeMode.CONCURRENTLY,
    messageModel = MessageModel.CLUSTERING,
    consumeThreadMax = 10  // tune to your traffic
)
public class OrderPaymentConsumer implements RocketMQListener<PaymentResultMessage> {
    
    @Autowired
    private OrderService orderService;
    
    @Autowired
    private DeduplicationService deduplicationService;
    
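    @Autowired
    private MetricsService metricsService;
    
    @Autowired
    private TransactionTemplate transactionTemplate;
    
    @Override
    public void onMessage(PaymentResultMessage message) {
        String orderId = message.getOrderId();
        String bizId = message.getBizId();
        
        long startTime = System.currentTimeMillis();
        
        try {
            log.info("Processing payment-success message, order ID: {}, bizId: {}", orderId, bizId);
            
            // 1. Idempotency check (keyed on bizId)
            if (deduplicationService.isProcessed(bizId, "order_payment")) {
                log.warn("Message already processed, acknowledging, bizId: {}", bizId);
                return;
            }
            
            // 2. Run the business logic in a transaction. A @Transactional annotation on
            //    processOrderPayment would be ignored here, because a call through `this`
            //    bypasses the Spring proxy; TransactionTemplate avoids that pitfall.
            boolean success = Boolean.TRUE.equals(
                    transactionTemplate.execute(status -> processOrderPayment(message)));
            
            if (success) {
                // 3. Record successful processing
                deduplicationService.recordProcess(bizId, "order_payment");
                log.info("Payment-success message processed, order ID: {}", orderId);
                
                // 4. Record metrics
                metricsService.recordSuccess(orderId, "order_payment");
            } else {
                log.error("Order payment processing failed, order ID: {}", orderId);
                // Throw so that RocketMQ redelivers the message
                throw new RuntimeException("Order payment processing failed");
            }
            
        } catch (Exception e) {
            long costTime = System.currentTimeMillis() - startTime;
            log.error("Error while processing payment-success message, order ID: {}, elapsed: {}ms", orderId, costTime, e);
            
            // Record the failure metric
            metricsService.recordError(orderId, "order_payment", e.getMessage());
            
            // Decide by exception type whether to retry
            if (shouldRetry(e)) {
                throw new RuntimeException("Retryable exception", e);
            } else {
                // Business exception: do not retry; park the message in an application-level
                // dead-letter store. (RocketMQ's own %DLQ% topic only receives messages after
                // maxReconsumeTimes failed retries, 16 by default.)
                sendToDeadLetterQueue(message, e);
                log.warn("Business exception, not retrying; message sent to dead-letter queue, order ID: {}", orderId);
            }
            
        } finally {
            long costTime = System.currentTimeMillis() - startTime;
            metricsService.recordProcessTime("order_payment", costTime);
        }
    }
    
    /**
     * Order payment logic. Callers must run it inside a transaction
     * (onMessage does so through TransactionTemplate).
     */
    public boolean processOrderPayment(PaymentResultMessage message) {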
        String orderId = message.getOrderId();
        
        // 1. Look up the order
        Order order = orderService.getOrderById(orderId);
        if (order == null) {
            log.error("Order not found, order ID: {}", orderId);
            return false;
        }
        
        // 2. Check the order status
        if (!"UNPAID".equals(order.getStatus())) {
            log.warn("Order status is {}, no payment handling needed, order ID: {}", order.getStatus(), orderId);
            
            // Already PAID: treat it as a duplicate message and report success
            if ("PAID".equals(order.getStatus())) {
                return true;
            }
            
            // Any other status is treated as an error
            return false;
        }
        
        // 3. Update the order status
        order.setStatus("PAID");
        order.setPayTime(message.getPayTime());
        order.setPaymentId(message.getPaymentId());
        order.setUpdateTime(new Date());
        
        boolean updateSuccess = orderService.updateOrder(order);
        if (!updateSuccess) {
            log.error("Failed to update order status, order ID: {}", orderId);
            return false;
        }
        
        // 4. Record the order's payment transaction
        OrderPaymentRecord paymentRecord = new OrderPaymentRecord();
        paymentRecord.setOrderId(orderId);
        paymentRecord.setPaymentId(message.getPaymentId());
        paymentRecord.setAmount(message.getAmount());
        paymentRecord.setPaymentMethod(message.getPaymentMethod());
        paymentRecord.setCreateTime(new Date());
        
        orderService.savePaymentRecord(paymentRecord);
        
        // 5. Trigger follow-up processing (asynchronously). If this method runs inside a transaction,
        //    the async tasks start before it commits and may not see the PAID status yet;
        //    registering the trigger via TransactionSynchronization.afterCommit() avoids that race.
        triggerPostPaymentProcess(orderId);
        
        return true;
    }
    
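    /**
     * Triggers post-payment processing.
     */
    private void triggerPostPaymentProcess(String orderId) {
        try {
            // Run asynchronously so the main flow is not blocked
            // (runAsync without an executor uses ForkJoinPool.commonPool() by default)
            CompletableFuture.runAsync(() -> {
                // 5.1 Deduct inventory
                inventoryService.deductInventory(orderId);
                
                // 5.2 Issue coupons
                couponService.grantCouponAfterPayment(orderId);
                
                // 5.3 Send notification
                notificationService.sendPaymentSuccessNotification(orderId);
                
                // 5.4 Update statistics
                statisticsService.updatePaymentStatistics(orderId);
                
            }).exceptionally(e -> {
                log.error("Post-payment processing failed, order ID: {}", orderId, e);
                // Record the failure here; it does not affect the main flow
                return null;
            });
            
        } catch (Exception e) {
            log.error("Failed to trigger post-payment processing", e);
            // Log only, do not rethrow, so the main payment flow is unaffected
        }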
    }
    
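    /**
     * Decides whether an exception is worth retrying.
     * Walks the cause chain, because the original error usually arrives wrapped
     * (e.g. in a RuntimeException or a Spring DataAccessException).
     */
    private boolean shouldRetry(Exception e) {
        for (Throwable t = e; t != null; t = t.getCause()) {
            // Business exceptions will not succeed on retry
            if (t instanceof BusinessException) {
                return false;
            }
            // Network and database connectivity problems are usually transient
            if (t instanceof TimeoutException ||
                t instanceof ConnectException ||
                t instanceof SQLException) {
                return true;
            }
        }
        
        // Retry by default
        return true;
    }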
    
    /**
     * Sends the message to a dead-letter store.
     */
    private void sendToDeadLetterQueue(PaymentResultMessage message, Exception e) {
        // Persist the failed message so it can be handled manually
        // or compensated automatically later
        DeadLetterMessage deadLetter = new DeadLetterMessage();
        deadLetter.setOriginalMessage(message);
        deadLetter.setExceptionInfo(e.getMessage());
        deadLetter.setFailTime(new Date());
        
        // deadLetterService.send(deadLetter);
    }
}
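processOrderPayment is idempotent because of the UNPAID-to-PAID state check, not because of the dedup table. As written, though, it reads the order and then writes it, so two concurrent deliveries could both see UNPAID. The sketch below keeps the same rules but fuses the check and the write into one atomic step. It uses ConcurrentHashMap.replace(key, expected, newValue) as an in-memory stand-in for a conditional update such as UPDATE orders SET status = 'PAID' WHERE order_id = ? AND status = 'UNPAID' (table and column names hypothetical). ToyOrderStore is not part of the article's code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical in-memory order table illustrating an atomic, idempotent status transition. */
class ToyOrderStore {
    private final ConcurrentMap<String, String> statusById = new ConcurrentHashMap<>();

    void put(String orderId, String status) { statusById.put(orderId, status); }

    String get(String orderId) { return statusById.get(orderId); }

    /**
     * Same rules as processOrderPayment: UNPAID becomes PAID; PAID is a duplicate delivery and
     * counts as success; a missing order or any other status is a failure.
     */
    boolean applyPaySuccess(String orderId) {
        // Atomic check-and-set, like UPDATE ... SET status='PAID' WHERE ... AND status='UNPAID'
        if (statusById.replace(orderId, "UNPAID", "PAID")) {
            return true;
        }
        // Lost the race or already paid: succeed only if the order is now PAID
        return "PAID".equals(statusById.get(orderId));
    }
}
```

With a database, the equivalent is to check the affected-row count of the conditional UPDATE. Zero rows means another delivery got there first, or the order is in some other state, and the code then reads the current status to decide between "duplicate" and "error".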

3.4 Idempotency Service

@Service
@Slf4j
public class DeduplicationService {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    @Autowired
    private DeduplicationMapper deduplicationMapper;
    
    // Redis key prefix
    private static final String REDIS_KEY_PREFIX = "dedup:";
    // Redis TTL: 24 hours
    private static final long REDIS_EXPIRE_SECONDS = 24 * 60 * 60;
    
    /**
     * Checks whether this bizId has already been processed.
     */
    public boolean isProcessed(String bizId, String bizType) {
        String key = buildKey(bizId, bizType);
        
        // 1. Check Redis first (fast path)
        Boolean exists = redisTemplate.hasKey(key);
        if (Boolean.TRUE.equals(exists)) {
            log.debug("Processing record found in Redis, bizId: {}, bizType: {}", bizId, bizType);
            return true;
        }
        
        // 2. Fall back to the database
        DeduplicationRecord record = deduplicationMapper.selectByBizIdAndType(bizId, bizType);
        if (record != null) {
            // Write back to Redis
            redisTemplate.opsForValue().set(key, "1", REDIS_EXPIRE_SECONDS, TimeUnit.SECONDS);
            return true;
        }
        
        return false;
    }
    
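    /**
     * Records that processing has completed.
     */
    @Transactional
    public void recordProcess(String bizId, String bizType) {
        String key = buildKey(bizId, bizType);
        
        // 1. Write to the database
        DeduplicationRecord record = new DeduplicationRecord();
        record.setBizId(bizId);
        record.setBizType(bizType);
        record.setCreateTime(new Date());
        
        try {
            deduplicationMapper.insert(record);
        } catch (DuplicateKeyException e) {
            // Unique-key conflict: already processed
            log.warn("Duplicate record, bizId: {}, bizType: {}", bizId, bizType);
        }
        
        // 2. Write to Redis only after the database transaction commits. Writing earlier would
        //    leave a "processed" marker behind if the caller's transaction rolls back.
        //    (Anonymous TransactionSynchronization needs Spring 5.3+; older versions use TransactionSynchronizationAdapter.)
        Runnable cacheWrite = () -> redisTemplate.opsForValue()
                .set(key, "1", REDIS_EXPIRE_SECONDS, TimeUnit.SECONDS);
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
                @Override
                public void afterCommit() {
                    cacheWrite.run();
                }
            });
        } else {
            cacheWrite.run();
        }
    }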
    
    /**
     * Builds the Redis key.
     */
    private String buildKey(String bizId, String bizType) {
        return REDIS_KEY_PREFIX + bizType + ":" + bizId;
    }
    
    /**
     * Cleans up expired idempotency records.
     * Meant to run on a schedule, e.g. removing records older than 30 days.
     */
    @Scheduled(cron = "0 0 2 * * ?")  // every day at 2:00 AM
    public void cleanExpiredRecords() {
        Date expireDate = DateUtils.addDays(new Date(), -30);
        int deleted = deduplicationMapper.deleteBeforeDate(expireDate);
        log.info("Cleaned up expired idempotency records, count: {}", deleted);
    }
}
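Calling isProcessed() and then, later, recordProcess() is a check-then-act sequence. Two deliveries of the same bizId that arrive together can both see "not processed" before either records it. The unique key on the dedup table catches the duplicate insert, but only after the business logic has already run twice. Claiming first, with a single atomic operation, closes that gap. The sketch below uses ConcurrentHashMap.putIfAbsent as an in-memory stand-in for Redis SET with NX or for an INSERT against a unique key. ToyDedupStore and its methods are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Hypothetical in-memory stand-in for the dedup store. claim() is a single atomic step,
 * comparable to Redis SET key value NX or an INSERT against a unique key.
 */
class ToyDedupStore {
    private final ConcurrentMap<String, Long> claims = new ConcurrentHashMap<>();

    private static String key(String bizId, String bizType) {
        return "dedup:" + bizType + ":" + bizId;   // same layout as buildKey() above
    }

    /** Returns true only for the first caller with this (bizId, bizType). */
    boolean claim(String bizId, String bizType) {
        return claims.putIfAbsent(key(bizId, bizType), System.currentTimeMillis()) == null;
    }

    /** Drops a claim after the business step failed, so a redelivery can try again. */
    void release(String bizId, String bizType) {
        claims.remove(key(bizId, bizType));
    }
}
```

In a consumer, the usual shape is: if claim(...) returns false, acknowledge and return; otherwise run the business step, and call release(...) before rethrowing if it fails. If the process dies between claim and release, the claim is never released and blocks redelivery, which is why real implementations usually give the claim a TTL or store a PROCESSING/DONE status instead of a bare flag.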

4. Message Reliability: End-to-End Guarantees from Producer to Consumer

4.1 Producer-Side Reliability

@Component
@Slf4j
public class ReliableMessageProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    // Number of application-level retries, on top of the client's own
    // retryTimesWhenSendFailed (2 by default for synchronous sends)
    private static final int MAX_RETRY_TIMES = 3;
    // Base retry interval (ms)
    private static final long RETRY_INTERVAL = 1000;
    
    /**
     * Reliable message send with retries.
     */
    public SendResult sendReliably(String topic, Message<?> message) {
        int retryCount = 0;
        SendResult sendResult = null;
        Exception lastException = null;
        
        while (retryCount <= MAX_RETRY_TIMES) {
            try {
                sendResult = rocketMQTemplate.syncSend(topic, message);
                log.info("Message sent, message ID: {}", sendResult.getMsgId());
                break;
                
            } catch (Exception e) {
                lastException = e;
                retryCount++;
                
                if (retryCount <= MAX_RETRY_TIMES) {
                    log.warn("Message send failed, retry {}, topic: {}", retryCount, topic, e);
                    try {
                        // Exponential backoff: 1s, 2s, 4s
                        Thread.sleep(RETRY_INTERVAL << (retryCount - 1));
                    } catch (InterruptedException ie) {
                        // Restore the interrupt flag and stop retrying
                        Thread.currentThread().interrupt();
                        throw new MessageSendException("Message send interrupted", ie);
                    }
                }
            }
        }
        
        if (sendResult == null) {
            log.error("Message send failed after max retries, topic: {}", topic, lastException);
            throw new MessageSendException("Message send failed", lastException);
        }
        
        return sendResult;
    }
    
    /**
     * Reliable variant for transactional messages.
     */
    public TransactionSendResult sendTransactionReliably(
            String topic, 
            Message<?> message, 
            Object arg) {
        
        // Record the message before sending (messageId is a local tracking ID, not the broker's msgId)
        String messageId = UUID.randomUUID().toString();
        recordMessageBeforeSend(messageId, topic, message, arg);
        
        try {
            TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(topic, message, arg);
            
            // Record the send result
            recordMessageSendResult(messageId, result);
            
            return result;
            
        } catch (Exception e) {
            log.error("Transactional message send failed, messageId: {}", messageId, e);
            
            // Record the send failure
            recordMessageSendFailure(messageId, e);
            
            throw e;
        }
    }
    
    /**
     * Records the message before sending (for failure recovery).
     */
    private void recordMessageBeforeSend(String messageId, String topic, 
                                        Message<?> message, Object arg) {
        // Persist the message to the database so that a failed send can be handled manually
        // or compensated automatically; in production this is an important reliability safeguard
    }
}
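The retry loop in sendReliably can be factored into a small helper that is testable without a broker. The sketch below is plain Java, and Backoff is a hypothetical name, not a RocketMQ or Spring class. Delays double from a base value up to a cap. An interrupt stops retrying instead of being swallowed, and the last failure is rethrown after the final attempt.

```java
import java.util.function.Supplier;

/** Retry with capped exponential backoff; a stdlib-only sketch, not a RocketMQ or Spring API. */
final class Backoff {
    private Backoff() {}

    /** Delay before retry number {@code attempt} (1-based): base, 2x base, 4x base, ... capped at maxMillis. */
    static long delayMillis(int attempt, long baseMillis, long maxMillis) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        int shift = Math.min(attempt - 1, 30);   // bound the shift so realistic bases cannot overflow
        return Math.min(baseMillis << shift, maxMillis);
    }

    /** Runs {@code action} up to 1 + maxRetries times and rethrows the last failure. */
    static <T> T callWithRetry(Supplier<T> action, int maxRetries, long baseMillis, long maxMillis) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (attempt > 0) {
                try {
                    Thread.sleep(delayMillis(attempt, baseMillis, maxMillis));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();   // restore the flag and stop retrying
                    throw new IllegalStateException("interrupted during backoff", ie);
                }
            }
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e;   // remember the failure and try again
            }
        }
        throw last;
    }
}
```

For the producer above, Backoff.callWithRetry(() -> rocketMQTemplate.syncSend(topic, message), 3, 1000, 10_000) would wait 1 s, 2 s and 4 s between attempts. Keep the total well below any upstream timeout, and remember that the RocketMQ client already retries internally before the exception reaches this loop.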

4.2 Consumer-Side Reliability

@Component
@Slf4j
public class MessageConsumeRecorder {
    
    @Autowired
    private MessageConsumeRecordMapper recordMapper;
    
    /**
     * Records the start of message consumption.
     */
    public String recordConsumeStart(MessageExt message) {
        String recordId = UUID.randomUUID().toString();
        MessageConsumeRecord record = new MessageConsumeRecord();
        record.setId(recordId);
        record.setMsgId(message.getMsgId());
        record.setTopic(message.getTopic());
        record.setTags(message.getTags());
        record.setKeys(message.getKeys());
        record.setBody(new String(message.getBody(), StandardCharsets.UTF_8));  // decode explicitly, not with the platform default
        record.setStatus("PROCESSING");
        record.setConsumeTimes(message.getReconsumeTimes() + 1);  // attempt number: reconsumeTimes counts earlier redeliveries
        record.setCreateTime(new Date());
        
        recordMapper.insert(record);
        return recordId;
    }
    
    /**
     * Records successful consumption.
     */
    public void recordConsumeSuccess(String recordId) {
        MessageConsumeRecord record = new MessageConsumeRecord();
        record.setId(recordId);
        record.setStatus("SUCCESS");
        record.setUpdateTime(new Date());
        
        recordMapper.updateStatus(record);
    }
    
    /**
     * Records failed consumption.
     */
    public void recordConsumeFailure(String recordId, String errorMsg) {
        MessageConsumeRecord record = new MessageConsumeRecord();
        record.setId(recordId);
        record.setStatus("FAILED");
        record.setErrorMsg(errorMsg);
        record.setUpdateTime(new Date());
        
        recordMapper.updateStatus(record);
    }
    
    /**
     * Checks whether the message has been consumed successfully (for idempotency).
     */
    public boolean isMessageConsumed(String msgId) {
        MessageConsumeRecord record = recordMapper.selectByMsgId(msgId);
        return record != null && "SUCCESS".equals(record.getStatus());
    }
}
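The consumeTimes column counts attempts, and RocketMQ's own retry schedule for failed messages is driven by the same counter. The sketch below encodes that schedule for concurrent consumers and assumes broker and consumer defaults. Those defaults are a messageDelayLevel of "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h", a retry delay level of 3 + reconsumeTimes, and at most 16 reconsumes before the message moves to the consumer group's %DLQ% topic. If your deployment overrides any of these, the numbers change. RetrySchedule is a hypothetical helper, not a RocketMQ class.

```java
/**
 * Retry delay for a concurrently consumed message, ASSUMING broker defaults:
 * messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
 * and maxReconsumeTimes = 16. Hypothetical helper, not a RocketMQ class.
 */
final class RetrySchedule {
    private RetrySchedule() {}

    // Seconds for delay levels 1..18 (index = level - 1)
    private static final long[] DELAY_LEVEL_SECONDS = {
        1, 5, 10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200
    };
    static final int MAX_RECONSUME_TIMES = 16;

    /**
     * @param reconsumeTimes MessageExt.getReconsumeTimes() of the attempt that just failed
     * @return seconds until redelivery, or -1 when the message goes to the %DLQ% topic
     */
    static long nextDelaySeconds(int reconsumeTimes) {
        if (reconsumeTimes >= MAX_RECONSUME_TIMES) {
            return -1;
        }
        int level = 3 + reconsumeTimes;   // retries start at level 3 (10s)
        return DELAY_LEVEL_SECONDS[level - 1];
    }

    /** Total waiting time across all retries before the message is dead-lettered. */
    static long totalRetryWindowSeconds() {
        long total = 0;
        for (int i = 0; i < MAX_RECONSUME_TIMES; i++) {
            total += nextDelaySeconds(i);
        }
        return total;
    }
}
```

Under these defaults the 16 retries span 17,140 seconds, about 4 h 46 min. That figure is worth keeping in mind when choosing how long a compensation job should wait before it steps in alongside the broker's own retries.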

5. Monitoring and Operations: Building an Observable Messaging System

5.1 Message Tracing

@Component
@Aspect
@Slf4j
public class MessageTraceAspect {
    
    @Autowired
    private MetricsService metricsService;
    
    /**
     * Monitors message sending.
     * "send*" alone would miss syncSend/asyncSend (they start with "sync"/"async"), hence the second pattern.
     * Only calls that go through the Spring proxy, i.e. from other beans, are intercepted.
     */
    @Around("execution(* org.apache.rocketmq.spring.core.RocketMQTemplate.send*(..)) || "
            + "execution(* org.apache.rocketmq.spring.core.RocketMQTemplate.*Send*(..))")
    public Object traceSendMessage(ProceedingJoinPoint joinPoint) throws Throwable {
        long startTime = System.currentTimeMillis();
        String methodName = joinPoint.getSignature().getName();
        
        try {
            Object result = joinPoint.proceed();
            long costTime = System.currentTimeMillis() - startTime;
            
            // Record success metrics
            metricsService.recordMessageSendSuccess(methodName, costTime);
            
            if (result instanceof SendResult) {
                SendResult sendResult = (SendResult) result;
                log.info("Message sent, message ID: {}, elapsed: {}ms", 
                        sendResult.getMsgId(), costTime);
            }
            
            return result;
            
        } catch (Exception e) {
            long costTime = System.currentTimeMillis() - startTime;
            
            // Record failure metrics
            metricsService.recordMessageSendFailure(methodName, costTime, e.getMessage());
            
            log.error("Message send failed, method: {}, elapsed: {}ms", methodName, costTime, e);
            throw e;
        }
    }
    
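    /**
     * Monitors message consumption.
     * @RocketMQMessageListener is a class-level annotation, so it is matched with @within and the
     * advice is limited to onMessage; "@annotation(...)" only matches annotated methods and never fires here.
     */
    @Around("@within(org.apache.rocketmq.spring.annotation.RocketMQMessageListener) && execution(* onMessage(..))")
    public Object traceConsumeMessage(ProceedingJoinPoint joinPoint) throws Throwable {
        Object[] args = joinPoint.getArgs();
        MessageExt message = null;
        
        // Extract the message (only possible when the listener receives MessageExt;
        // for a payload type such as PaymentResultMessage, msgId stays "unknown")
        for (Object arg : args) {
            if (arg instanceof MessageExt) {
                message = (MessageExt) arg;
                break;
            }
        }
        
        long startTime = System.currentTimeMillis();
        String msgId = message != null ? message.getMsgId() : "unknown";
        
        try {
            Object result = joinPoint.proceed();
            long costTime = System.currentTimeMillis() - startTime;
            
            // Record consume-success metrics
            metricsService.recordMessageConsumeSuccess(msgId, costTime);
            
            log.info("Message consumed, message ID: {}, elapsed: {}ms", msgId, costTime);
            
            return result;
            
        } catch (Exception e) {
            long costTime = System.currentTimeMillis() - startTime;
            
            // Record consume-failure metrics
            metricsService.recordMessageConsumeFailure(msgId, costTime, e.getMessage());
            
            log.error("Message consumption failed, message ID: {}, elapsed: {}ms, reconsume times: {}", 
                    msgId, costTime, 
                    message != null ? message.getReconsumeTimes() : 0, e);
            
            // Decide by exception type whether to retry
            if (shouldRetryConsume(e)) {
                throw e;  // rethrow to trigger redelivery
            } else {
                // Business exception: swallow it so the message is acknowledged and not retried
                log.warn("Business exception, message will not be retried, message ID: {}", msgId);
                return null;
            }
        }
    }
    
    private boolean shouldRetryConsume(Exception e) {
        // Retry system exceptions, not business exceptions
        return !(e instanceof BusinessException);
    }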
}
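shouldRetryConsume above looks only at the top-level exception type. The cause that matters often arrives wrapped, and the consumer in 3.3 itself rethrows as new RuntimeException("...", e). The sketch below classifies by walking the cause chain instead. It is plain Java, with a stand-in BusinessException (hypothetical here) and a depth limit as a guard against cause cycles.

```java
import java.net.ConnectException;
import java.sql.SQLException;
import java.util.concurrent.TimeoutException;

/** Classifies a failure by walking its cause chain (a sketch; names are hypothetical). */
final class RetryClassifier {
    private RetryClassifier() {}

    /** Stand-in for the article's BusinessException. */
    static class BusinessException extends RuntimeException {
        BusinessException(String message) { super(message); }
    }

    private static final int MAX_DEPTH = 32;   // guards against pathological cause cycles

    static boolean shouldRetry(Throwable error) {
        int depth = 0;
        for (Throwable t = error; t != null && depth < MAX_DEPTH; t = t.getCause(), depth++) {
            if (t instanceof BusinessException) {
                return false;   // a business rule failed: retrying will not help
            }
            if (t instanceof TimeoutException || t instanceof ConnectException || t instanceof SQLException) {
                return true;    // transient infrastructure problem
            }
        }
        return true;            // unknown failure: retry by default, as the article's code does
    }
}
```

The first recognised type on the way down wins. A business exception wrapped in a generic RuntimeException is therefore still treated as non-retryable, which the top-level instanceof check would miss.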

5.2 Key Monitoring Metrics

@Service
@Slf4j
public class MessageMetricsService {
    
    // Metrics are recorded with Micrometer
    private final MeterRegistry meterRegistry;
    
    // Counters
    private final Counter messagesSentCounter;
    private final Counter messagesConsumedCounter;
    private final Counter messageSendErrorsCounter;
    private final Counter messageConsumeErrorsCounter;
    
    // Timers
    private final Timer messageSendTimer;
    private final Timer messageConsumeTimer;
    
    public MessageMetricsService(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        
        // Initialize counters
        this.messagesSentCounter = Counter.builder("rocketmq.messages.sent")
            .description("Total messages sent")
            .register(meterRegistry);
        
        this.messagesConsumedCounter = Counter.builder("rocketmq.messages.consumed")
            .description("Total messages consumed")
            .register(meterRegistry);
        
        this.messageSendErrorsCounter = Counter.builder("rocketmq.messages.send.errors")
            .description("Message send errors")
            .register(meterRegistry);
        
        this.messageConsumeErrorsCounter = Counter.builder("rocketmq.messages.consume.errors")
            .description("Message consume errors")
            .register(meterRegistry);
        
        // Initialize timers
        this.messageSendTimer = Timer.builder("rocketmq.messages.send.time")
            .description("Message send latency")
            .register(meterRegistry);
        
        this.messageConsumeTimer = Timer.builder("rocketmq.messages.consume.time")
            .description("Message consume latency")
            .register(meterRegistry);
    }
    
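    /**
     * Records a successful send.
     */
    public void recordMessageSendSuccess(String topic, long costTime) {
        messagesSentCounter.increment();
        messageSendTimer.record(costTime, TimeUnit.MILLISECONDS);
        
        // Per-topic counter under its own name: registering "rocketmq.messages.sent" again with an
        // extra "topic" tag conflicts with the untagged counter above in registries such as
        // Prometheus, which require one tag-key set per meter name
        Counter.builder("rocketmq.messages.sent.by.topic")
            .tag("topic", topic)
            .register(meterRegistry)
            .increment();
    }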
    
    /**
     * Records message consumption delay (time from message creation to consumption).
     */
    public void recordMessageConsumeDelay(String msgId, long bornTimestamp) {
        long delay = System.currentTimeMillis() - bornTimestamp;
        
        Timer.builder("rocketmq.messages.consume.delay")
            .register(meterRegistry)
            .record(delay, TimeUnit.MILLISECONDS);
        
        // Warn when the delay exceeds the threshold
        if (delay > 30000) {  // 30 seconds
            log.warn("Message consumption delay too high, message ID: {}, delay: {}ms", msgId, delay);
            // An alert notification could be sent here
        }
    }
}

6. Disaster Recovery and Degradation: Handling Extreme Cases

6.1 Message Compensation

@Service
@Slf4j
public class MessageCompensationService {
    
    @Autowired
    private MessageConsumeRecordMapper recordMapper;
    
    @Autowired
    private OrderService orderService;
    
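    /**
     * Periodically picks up messages that were not consumed successfully.
     * @Scheduled fires on every application instance, so with several instances each record
     * should be claimed first (e.g. a conditional status update or a distributed lock)
     * to avoid compensating it twice.
     */
    @Scheduled(cron = "0 */5 * * * ?")  // every 5 minutes
    public void compensateFailedMessages() {
        log.info("Starting message compensation job");
        
        // Query messages that failed within the last 10 minutes
        Date tenMinutesAgo = DateUtils.addMinutes(new Date(), -10);
        List<MessageConsumeRecord> failedRecords = 
            recordMapper.selectFailedRecords(tenMinutesAgo, 100);  // at most 100 records per run
        
        for (MessageConsumeRecord record : failedRecords) {
            try {
                log.info("Compensating message, message ID: {}, attempts: {}", 
                        record.getMsgId(), record.getConsumeTimes());
                
                // Reprocess the message
                boolean success = reprocessMessage(record);
                
                if (success) {
                    record.setStatus("COMPENSATED");
                    record.setUpdateTime(new Date());
                    recordMapper.updateStatus(record);
                    
                    log.info("Message compensated, message ID: {}", record.getMsgId());
                } else {
                    log.warn("Message compensation failed, message ID: {}", record.getMsgId());
                }
                
            } catch (Exception e) {
                log.error("Error while compensating message, message ID: {}", record.getMsgId(), e);
            }
        }
        
        log.info("Message compensation job finished, records processed: {}", failedRecords.size());
    }
    
    /**
     * Reprocesses a message.
     */
    private boolean reprocessMessage(MessageConsumeRecord record) {
        // Re-run the business logic from the stored message content;
        // the details depend on the specific business
        
        // Example: order payment messages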
        if ("payment-result-topic".equals(record.getTopic()) && 
            "PAY_SUCCESS".equals(record.getTags())) {
            
            PaymentResultMessage message = JSON.parseObject(
                record.getBody(), PaymentResultMessage.class);
            
            return orderService.processOrderPayment(message);
        }
        
        return false;
    }
}
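With several application instances, @Scheduled fires on each of them, so two instances can pick up the same failed record. One common remedy is to claim each record with a conditional status update before reprocessing it. The sketch below models this with ConcurrentHashMap.replace(key, expected, newValue), which is atomic in the same way as an UPDATE ... SET status = 'COMPENSATING' WHERE id = ? AND status = 'FAILED' statement. ToyCompensationTable and the COMPENSATING status are hypothetical, not part of the article's schema.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;

/**
 * In-memory stand-in for the consume-record table. claim() mimics a conditional
 * UPDATE ... SET status='COMPENSATING' WHERE id=? AND status='FAILED':
 * only one job instance wins each record.
 */
class ToyCompensationTable {
    private final ConcurrentMap<String, String> statusById = new ConcurrentHashMap<>();

    void put(String id, String status) { statusById.put(id, status); }

    String status(String id) { return statusById.get(id); }

    /** Atomically moves FAILED to COMPENSATING; false if someone else already did. */
    boolean claim(String id) {
        return statusById.replace(id, "FAILED", "COMPENSATING");
    }

    /** One pass of the job: claim, reprocess, then mark COMPENSATED or hand back as FAILED. */
    List<String> runOnce(List<String> candidateIds, Predicate<String> reprocess) {
        List<String> compensated = new ArrayList<>();
        for (String id : candidateIds) {
            if (!claim(id)) {
                continue;                       // another instance has it, or it is not FAILED
            }
            boolean ok;
            try {
                ok = reprocess.test(id);
            } catch (RuntimeException e) {
                ok = false;                     // a failed attempt goes back to FAILED
            }
            statusById.put(id, ok ? "COMPENSATED" : "FAILED");
            if (ok) {
                compensated.add(id);
            }
        }
        return compensated;
    }
}
```

Handing a failed attempt back as FAILED lets the next run retry it. In production you would also cap the number of compensation attempts, so that a permanently broken record eventually goes to manual handling instead of cycling forever.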

6.2 Degradation Strategy: Handling an Unavailable Message Queue

@Component
@Slf4j
public class MessageQueueDegradeService {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    @Autowired
    private DatabaseFallbackService databaseFallbackService;
    
    // Degradation switch
    private volatile boolean degradeEnabled = false;
    // Time of the last health check
    private volatile long lastCheckTime = 0;
    // Check interval: 10 seconds
    private static final long CHECK_INTERVAL = 10 * 1000;
    
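    /**
     * Send a message, degrading to the database fallback when needed
     */
    public SendResult sendWithDegrade(String topic, Message<?> message) {
        // Check whether to degrade
        if (shouldDegrade()) {
            log.warn("Message queue degraded, using the database fallback, topic: {}", topic);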
            return sendViaDatabaseFallback(topic, message);
        }
        
        try {
            // Normal send
            return rocketMQTemplate.syncSend(topic, message);
            
        } catch (Exception e) {
            log.error("Message send failed, enabling degradation, topic: {}", topic, e);
            
            // Send failed: enable degradation
            enableDegrade();
            
            // Fall back to the database
            return sendViaDatabaseFallback(topic, message);
        }
    }
    
    /**
     * Check whether degradation is needed
     */
    private boolean shouldDegrade() {
        long currentTime = System.currentTimeMillis();
        
        // Within the check interval, reuse the last decision
        if (currentTime - lastCheckTime < CHECK_INTERVAL) {
            return degradeEnabled;
        }
        
        lastCheckTime = currentTime;
        
        try {
            // Probe message queue connectivity
            rocketMQTemplate.syncSend("health-check-topic", "ping");
            degradeEnabled = false;  // recovered
            return false;
            
        } catch (Exception e) {
            degradeEnabled = true;  // degrade
            return true;
        }
    }
    
    /**
     * Enable degradation
     */
    private void enableDegrade() {
        degradeEnabled = true;
        lastCheckTime = System.currentTimeMillis();
        
        // Send an alert
        sendDegradeAlert();
    }
    
    /**
     * Database fallback
     */
    private SendResult sendViaDatabaseFallback(String topic, Message<?> message) {
        try {
            // Store the message in the database
            databaseFallbackService.saveMessage(topic, message);
            
            // Return a synthetic SendResult so the caller sees a normal response
            SendResult result = new SendResult();
            result.setMsgId("DB_FALLBACK_" + UUID.randomUUID().toString());
            result.setSendStatus(SendStatus.SEND_OK);
            
            return result;
            
        } catch (Exception e) {
            log.error("Database fallback also failed", e);
            throw new RuntimeException("Message send failed and the database fallback also failed", e);
        }
    }
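    
    /**
     * Send a degradation alert.
     * Placeholder: hook this up to your own alerting channel (SMS, IM, on-call system, ...).
     */
    private void sendDegradeAlert() {
        log.error("Message queue degradation enabled; messages are being written to the database fallback");
    }
}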
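
The fallback only stores the message; nothing in the class above delivers it later, so a separate job is needed to replay parked messages once the queue is healthy again. A minimal sketch, assuming a hypothetical `ParkedMessage` row and a hypothetical `MqSender` interface standing in for the real `rocketMQTemplate.syncSend` call: pending rows are replayed oldest first, and the pass stops at the first failure so the rest stay parked for the next run.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical row written by the database fallback
class ParkedMessage {
    final long id;       // auto-increment key, used for oldest-first ordering
    final String topic;
    final String body;
    boolean delivered;   // in a real table this would be a status column

    ParkedMessage(long id, String topic, String body) {
        this.id = id;
        this.topic = topic;
        this.body = body;
    }
}

// Hypothetical abstraction over the real MQ send: returns normally on success, throws on failure
interface MqSender {
    void send(String topic, String body) throws Exception;
}

class FallbackReplayer {
    /**
     * Replays undelivered parked messages in id order.
     * Stops at the first failure so later messages are not sent ahead of earlier ones.
     *
     * @return number of messages delivered in this pass
     */
    static int replay(List<ParkedMessage> parked, MqSender sender) {
        List<ParkedMessage> pending = new ArrayList<>();
        for (ParkedMessage m : parked) {
            if (!m.delivered) {
                pending.add(m);
            }
        }
        pending.sort((a, b) -> Long.compare(a.id, b.id));

        int delivered = 0;
        for (ParkedMessage m : pending) {
            try {
                sender.send(m.topic, m.body);
            } catch (Exception e) {
                break;  // queue still unhealthy: leave the rest for the next run
            }
            m.delivered = true;  // mark only after a successful send
            delivered++;
        }
        return delivered;
    }
}
```

A send can succeed on the broker while the client still sees an error, so a replay may deliver a duplicate; this approach relies on the consumers being idempotent.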

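7. Lessons learned and best practices

Refactoring the payment system taught me the following lessons:

7.1 Message design principles

  1. Keep messages lean: carry only the information that is needed and avoid large messages
  2. Make messages self-contained: a message should carry everything the consumer needs, so the consumer does not have to query for more
  3. Version messages: consider compatibility whenever the message structure changes
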
@Data
public class PaymentResultMessageV2 implements Serializable {
    // Basic fields
    private String orderId;
    private String paymentId;
    private BigDecimal amount;
    
    // Version marker
    private String version = "2.0";
    
    // Extension fields (for backward compatibility)
    private Map<String, Object> extensions;
    
    // Down-convert for consumers that still expect V1
    public PaymentResultMessageV1 toV1() {
        PaymentResultMessageV1 v1 = new PaymentResultMessageV1();
        v1.setOrderId(this.orderId);
        v1.setPaymentId(this.paymentId);
        v1.setAmount(this.amount);
        return v1;
    }
}
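
Versioning only pays off if consumers can read both shapes. A minimal sketch of a tolerant reader, assuming the message body has already been decoded into a `Map<String, Object>` by a JSON library and that V1 payloads simply have no `version` field; `ParsedPayment` and `fromMap` are hypothetical names. Unknown keys are ignored, and V2-only data is read from `extensions` when present.

```java
import java.math.BigDecimal;
import java.util.Collections;
import java.util.Map;

// Hypothetical consumer-side view that accepts both V1 and V2 payment-result payloads
final class ParsedPayment {
    final String version;
    final String orderId;
    final String paymentId;
    final BigDecimal amount;
    final Map<String, Object> extensions;

    private ParsedPayment(String version, String orderId, String paymentId,
                          BigDecimal amount, Map<String, Object> extensions) {
        this.version = version;
        this.orderId = orderId;
        this.paymentId = paymentId;
        this.amount = amount;
        this.extensions = extensions;
    }

    /**
     * Reads only the keys it knows; other keys are ignored, so new fields do not break old readers.
     * Throws NumberFormatException if "amount" is missing or not numeric.
     */
    @SuppressWarnings("unchecked")
    static ParsedPayment fromMap(Map<String, Object> m) {
        // V1 producers never wrote a version field, so its absence means 1.0
        String version = (String) m.getOrDefault("version", "1.0");
        Object ext = m.get("extensions");
        Map<String, Object> extensions = (ext instanceof Map)
                ? (Map<String, Object>) ext
                : Collections.<String, Object>emptyMap();
        // String.valueOf accepts amounts decoded as either numbers or strings
        BigDecimal amount = new BigDecimal(String.valueOf(m.get("amount")));
        return new ParsedPayment(version, (String) m.get("orderId"),
                (String) m.get("paymentId"), amount, extensions);
    }
}
```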

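7.2 Recommendations for using transactional messages

  1. Keep the local transaction simple: the local transaction executed alongside a transactional message should be as simple as possible
  2. Make the check-back logic reliable: it must be able to determine the state of the local transaction accurately
  3. Choose timeouts sensibly: how long a transaction may stay in the unknown state before it is resolved should be set according to the business
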
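// RocketMQ configuration
import java.util.concurrent.Executors;

import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RocketMQConfig {
    
    @Bean
    public RocketMQTemplate rocketMQTemplate() {
        // Transactional producer (explicit variable instead of double-brace initialization)
        TransactionMQProducer producer = new TransactionMQProducer("payment-producer-group");
        // NameServer address: adjust to your environment
        producer.setNamesrvAddr("127.0.0.1:9876");
        // Thread pool for transaction check-back callbacks
        producer.setExecutorService(Executors.newFixedThreadPool(10));
        // The transaction listener is registered separately,
        // e.g. with rocketmq-spring's @RocketMQTransactionListener
        
        // Note: the 30 s transaction timeout and the 3-retry limit are not producer settings.
        // How long a half message waits before it is checked back is the broker's transactionTimeOut
        // (e.g. transactionTimeOut=30000 in broker.conf; check frequency and limit are
        // transactionCheckInterval and transactionCheckMax), and the retry limit belongs on the
        // consumer (e.g. DefaultMQPushConsumer#setMaxReconsumeTimes(3)).
        
        RocketMQTemplate template = new RocketMQTemplate();
        template.setProducer(producer);
        return template;
    }
}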
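
Point 2 above is where things most often go wrong. In RocketMQ the broker calls the producer's check-back (`checkLocalTransaction` on a `TransactionListener`) when a half message stays unresolved, and the answer should come from durable local state rather than in-memory flags. A minimal sketch of that decision, assuming a hypothetical `PaymentStatus` lookup keyed by transaction ID and a hypothetical `GIVE_UP_AFTER_MS` window; the `TxState` enum only mirrors RocketMQ's `LocalTransactionState` (`COMMIT_MESSAGE`, `ROLLBACK_MESSAGE`, `UNKNOW`) so the sketch compiles on its own.

```java
import java.util.Map;

// Mirrors org.apache.rocketmq.client.producer.LocalTransactionState so the sketch is self-contained
enum TxState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

// Hypothetical durable state of the local payment transaction
enum PaymentStatus { PAID, FAILED }

class PaymentTxChecker {
    // Hypothetical window after which a missing record is treated as a rolled-back transaction
    static final long GIVE_UP_AFTER_MS = 5 * 60 * 1000;

    private final Map<String, PaymentStatus> paymentsByTxId;  // stands in for a database lookup

    PaymentTxChecker(Map<String, PaymentStatus> paymentsByTxId) {
        this.paymentsByTxId = paymentsByTxId;
    }

    /** Decides the fate of a half message from durable local state. */
    TxState check(String txId, long halfMsgBornAtMs, long nowMs) {
        PaymentStatus status = paymentsByTxId.get(txId);
        if (status == PaymentStatus.PAID) {
            return TxState.COMMIT_MESSAGE;    // local transaction committed: deliver the message
        }
        if (status == PaymentStatus.FAILED) {
            return TxState.ROLLBACK_MESSAGE;  // local transaction failed: discard the message
        }
        // No record yet: the local transaction may still be running, so do not guess
        if (nowMs - halfMsgBornAtMs < GIVE_UP_AFTER_MS) {
            return TxState.UNKNOW;            // the broker will check again later
        }
        return TxState.ROLLBACK_MESSAGE;      // long past any plausible commit: treat as rolled back
    }
}
```

Returning `UNKNOW` is safe because the broker simply checks again, but it only does so a limited number of times, so the give-up window is best kept well inside the broker's total check period.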

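7.3 Monitoring and alerting metrics

Key metrics that must be monitored:

  • Message send success/failure rate
  • Message consumption latency
  • Message backlog
  • Number of transactional message check-backs
  • Dead-letter queue size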
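
Two of these metrics are simple derived values. A minimal sketch, assuming hypothetical per-queue offset snapshots and send counters collected elsewhere, with illustrative alert thresholds: backlog is the sum over queues of the broker's max offset minus the consumer group's committed offset, and send success rate is successes divided by attempts.

```java
import java.util.List;

// Hypothetical snapshot of one message queue's offsets
class QueueOffsets {
    final long brokerMaxOffset;  // next offset the broker will write
    final long consumerOffset;   // offset the consumer group has committed

    QueueOffsets(long brokerMaxOffset, long consumerOffset) {
        this.brokerMaxOffset = brokerMaxOffset;
        this.consumerOffset = consumerOffset;
    }
}

class MqHealth {
    // Illustrative thresholds; pick values that fit your traffic
    static final long BACKLOG_ALERT = 10_000;
    static final double SEND_SUCCESS_ALERT = 0.999;

    /** Backlog = sum over queues of (broker max offset - consumer offset), clamped at 0 per queue. */
    static long backlog(List<QueueOffsets> queues) {
        long total = 0;
        for (QueueOffsets q : queues) {
            total += Math.max(0L, q.brokerMaxOffset - q.consumerOffset);
        }
        return total;
    }

    /** Send success rate; 1.0 when nothing was sent in the window. */
    static double sendSuccessRate(long succeeded, long attempted) {
        return attempted == 0 ? 1.0 : (double) succeeded / attempted;
    }

    /** True when either metric crosses its threshold. */
    static boolean shouldAlert(long backlog, double successRate) {
        return backlog > BACKLOG_ALERT || successRate < SEND_SUCCESS_ALERT;
    }
}
```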

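Summary

By implementing eventual consistency with RocketMQ transactional messages, we solved the data inconsistency problem in our payment system. This refactoring taught me several things:

  1. There is no perfect solution: eventual consistency comes at the cost of giving up strong consistency
  2. Idempotency is the foundation: in a distributed system, every operation must be designed to be idempotent
  3. Monitoring is a lifeline: running a system without monitoring is like driving in the dark
  4. Degradation is mandatory: any middleware can fail, so you need a fallback plan

The most important takeaway: choosing a technical solution means trading off consistency, availability, and performance. RocketMQ transactional messages suit scenarios that need high reliability and asynchronous decoupling, but they also add complexity. In real projects, we have to pick the approach that best fits the business scenario.

This experience showed me how complex distributed system design is, and it got me into the habit of thinking about failure recovery and data consistency from the design stage onward. Real system stability does not come from luck; it comes from rigorous design and thorough testing.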

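Copyright notice
▶ Site name: Huang Lei's Blog (黄磊的博客)
▶ Title: Using RocketMQ for Eventual Consistency in Distributed Scenarios: An Order Payment Example
▶ Link: https://www.huangleicole.com/middleware/53.html
▶ Reprint terms: for commercial reprints, please contact the site owner; for non-commercial reprints, please credit the source.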
