AI摘要
文章以订单支付为例,演示如何用 RocketMQ 事务消息替代同步调用,解决“用户已扣款但订单状态未更新”的数据不一致问题:先发送 Half 消息并记录支付流水,再经本地事务提交或回查决定消息是否生效;消费端幂等更新订单,配合重试、死信、补偿、降级、监控等手段,实现高可靠最终一致性。
我以为引入RocketMQ就能轻松解决分布式事务问题,直到那次深夜告警——用户付了钱但订单状态未更新,我才意识到分布式场景下的数据一致性远比想象中复杂。
一、问题现场:从数据不一致到业务损失
1.1 最初的同步调用架构
先看看我们最初的问题架构:
// 支付服务 - 最初的同步调用实现
@Service
@Slf4j
public class PaymentService {
@Autowired
private PaymentMapper paymentMapper;
@Autowired
private RestTemplate restTemplate;
/**
* Handles the payment-success callback (the article's original, problematic version).
* Problem: the order service is called synchronously inside the local transaction, so when
* that call fails this method throws and the payment writes made above are rolled back.
*
* @param callback the payment callback payload
*/
@Transactional
public void handlePaymentCallback(PaymentCallbackDTO callback) {
try {
// 1. Record the payment flow entry
PaymentRecord payment = createPaymentRecord(callback);
paymentMapper.insert(payment);
// 2. Update the payment order status
updatePaymentOrderStatus(callback.getOrderId(), "PAID");
// 3. Synchronously call the order service to update the order status
String orderServiceUrl = "http://order-service/orders/" +
callback.getOrderId() + "/pay-success";
ResponseEntity<String> response = restTemplate.postForEntity(
orderServiceUrl,
callback,
String.class
);
// Any non-2xx answer is turned into an exception, which ends up in the catch below
if (!response.getStatusCode().is2xxSuccessful()) {
throw new RuntimeException("订单状态更新失败");
}
log.info("支付成功处理完成,订单号:{}", callback.getOrderId());
} catch (Exception e) {
log.error("支付回调处理失败", e);
// Rethrowing here rolls back the whole transaction
throw new RuntimeException("支付处理失败", e);
}
}
}
// 订单服务
@RestController
@RequestMapping("/orders")
public class OrderController {
@PostMapping("/{orderId}/pay-success")
public ResponseEntity<?> handlePaySuccess(@PathVariable String orderId,
                                          @RequestBody PaymentCallbackDTO callback) {
    // The status update may fail for various reasons; a failure is reported as HTTP 500.
    boolean updated = orderService.updateOrderStatus(orderId, "PAID", callback.getAmount());
    if (updated) {
        // Kick off the follow-up business: inventory deduction, coupon grants, etc.
        asyncProcessAfterPayment(orderId);
        return ResponseEntity.ok().build();
    }
    return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}
1.2 问题的根源分析

核心问题:第三方支付已经扣款成功,但由于订单服务调用失败,导致支付服务事务回滚,支付记录丢失。用户付了钱,但系统里没有任何记录。
二、RocketMQ事务消息:分布式事务的优雅解决方案
2.1 事务消息的基本原理
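RocketMQ的事务消息机制分为两个阶段:第一阶段,生产者先向 Broker 发送对消费者不可见的 Half(半)消息,发送成功后执行本地事务;第二阶段,生产者根据本地事务的执行结果向 Broker 提交(Commit)或回滚(Rollback)该消息,如果 Broker 迟迟收不到确认,则会主动回查生产者的本地事务状态。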
2.2 事务消息的关键类解析
// Core callback interface of RocketMQ transactional messages
public interface TransactionListener {
/**
* Executes the local transaction.
* @param msg the message
* @param arg the business argument
* @return the result of the local transaction
*/
LocalTransactionState executeLocalTransaction(Message msg, Object arg);
/**
* Checks back the state of the local transaction.
* @param msg the message
* @return the current state of the local transaction
*/
LocalTransactionState checkLocalTransaction(MessageExt msg);
}
// 本地事务状态枚举
public enum LocalTransactionState {
COMMIT_MESSAGE, // 提交消息,允许消费
ROLLBACK_MESSAGE, // 回滚消息,删除消息
UNKNOWN // 未知状态,需要回查
}
三、实战:基于RocketMQ重构支付流程
3.1 支付服务改造:发送事务消息
@Service
@Slf4j
public class PaymentServiceV2 {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Autowired
private PaymentMapper paymentMapper;
// 支付结果回调Topic
private static final String PAYMENT_RESULT_TOPIC = "payment-result-topic";
/**
 * Handles the payment-success callback by publishing a transactional message.
 * <p>
 * The callback itself is passed as the {@code arg} of the transactional send. Exceptions are
 * logged and handed to {@code recordException}; nothing is rethrown to the caller. A local
 * transaction that ended in ROLLBACK_MESSAGE is treated the same way instead of being logged
 * as an ordinary result.
 *
 * @param callback the payment callback payload
 */
public void handlePaymentCallback(PaymentCallbackDTO callback) {
    String transactionId = UUID.randomUUID().toString();
    try {
        // 1. Validate the callback (guards against repeated processing)
        if (!validateCallback(callback)) {
            log.warn("支付回调验证失败,订单号:{}", callback.getOrderId());
            return;
        }
        // 2. Build the message
        // NOTE(review): the listener in this file reads the "rocketmq_TRANSACTION_ID" header;
        // confirm it carries the UUID set here rather than a broker/client generated id.
        Message<PaymentResultMessage> message = MessageBuilder
            .withPayload(buildPaymentResultMessage(callback))
            .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
            .setHeader("order_id", callback.getOrderId())
            .setHeader("callback_timestamp", System.currentTimeMillis())
            .build();
        // 3. Send the transactional message; the last argument is passed as arg to
        //    executeLocalTransaction
        TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(
            PAYMENT_RESULT_TOPIC + ":PAY_SUCCESS", // Topic:Tag
            message,
            callback
        );
        if (sendResult.getLocalTransactionState()
                == org.apache.rocketmq.client.producer.LocalTransactionState.ROLLBACK_MESSAGE) {
            // The local transaction did not take effect and the message will never be
            // delivered: surface it instead of hiding it in an INFO line.
            log.warn("事务消息已回滚,本地事务未生效,订单号:{}", callback.getOrderId());
            recordException(callback, transactionId, "local transaction rolled back");
        } else {
            log.info("事务消息发送结果:{}, 订单号:{}",
                sendResult.getLocalTransactionState(),
                callback.getOrderId());
        }
    } catch (Exception e) {
        log.error("支付回调处理异常,订单号:{}", callback.getOrderId(), e);
        // Record into the exception table for later manual handling
        recordException(callback, transactionId, e.getMessage());
    }
}
/**
 * Copies the callback fields into a new payment result message.
 *
 * @param callback the payment callback payload
 * @return the message body to publish
 */
private PaymentResultMessage buildPaymentResultMessage(PaymentCallbackDTO callback) {
    PaymentResultMessage result = new PaymentResultMessage();
    result.setOrderId(callback.getOrderId());
    result.setPaymentId(callback.getPaymentId());
    result.setAmount(callback.getAmount());
    result.setPayTime(callback.getPayTime());
    result.setUserId(callback.getUserId());
    result.setPaymentMethod(callback.getPaymentMethod());
    // Unique business id used for idempotency checks: "<orderId>_<payTime in millis>"
    String bizId = callback.getOrderId() + "_" + callback.getPayTime().getTime();
    result.setBizId(bizId);
    return result;
}
}
// 支付结果消息体
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PaymentResultMessage implements Serializable {
private String orderId;
private String paymentId;
private BigDecimal amount;
private Date payTime;
private Long userId;
private String paymentMethod;
private String bizId; // 业务唯一标识
}
3.2 事务监听器实现:本地事务和回查机制
@Component
@RocketMQTransactionListener(txProducerGroup = "payment-transaction-group")
@Slf4j
public class PaymentTransactionListenerImpl implements RocketMQLocalTransactionListener {
@Autowired
private PaymentMapper paymentMapper;
@Autowired
private DeduplicationService deduplicationService;
/**
 * Executes the local transaction that belongs to a half message.
 * <p>
 * The database work here runs in the Spring transaction opened by {@code @Transactional}.
 * Because exceptions are caught and converted into a ROLLBACK result, that transaction is
 * explicitly marked rollback-only in the catch branch; otherwise the writes made before the
 * failure would be committed while the message is rolled back.
 *
 * @param msg the half message
 * @param arg the business argument given to the transactional send; cast to
 *            {@code PaymentCallbackDTO}
 * @return COMMIT when the payment record is (or already was) stored, ROLLBACK on any exception
 */
@Override
@Transactional(rollbackFor = Exception.class)
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    PaymentCallbackDTO callback = (PaymentCallbackDTO) arg;
    String orderId = callback.getOrderId();
    String transactionId = msg.getHeaders().get("rocketmq_TRANSACTION_ID", String.class);
    log.info("开始执行本地事务,订单号:{}, 事务ID:{}", orderId, transactionId);
    try {
        // 1. Idempotency check: avoid processing the same callback twice
        if (deduplicationService.isProcessed(orderId, "payment_callback")) {
            log.warn("支付回调已处理,直接返回成功,订单号:{}", orderId);
            return RocketMQLocalTransactionState.COMMIT;
        }
        // 2. Business-level uniqueness check on the payment record
        PaymentRecord existingRecord = paymentMapper.selectByOrderId(orderId);
        if (existingRecord != null && "SUCCESS".equals(existingRecord.getStatus())) {
            log.warn("支付记录已存在且成功,直接提交消息,订单号:{}", orderId);
            return RocketMQLocalTransactionState.COMMIT;
        }
        // 3. Create the payment record in the PROCESSING state
        PaymentRecord payment = new PaymentRecord();
        payment.setOrderId(orderId);
        payment.setPaymentId(callback.getPaymentId());
        payment.setAmount(callback.getAmount());
        payment.setStatus("PROCESSING");
        payment.setTransactionId(transactionId);
        payment.setCreateTime(new Date());
        payment.setUpdateTime(new Date());
        paymentMapper.insert(payment);
        log.info("支付记录创建成功,订单号:{}", orderId);
        // 4. Store the idempotency marker
        deduplicationService.recordProcess(orderId, "payment_callback");
        // 5. Further local transactional work could be added here
        log.info("本地事务执行成功,提交消息,订单号:{}", orderId);
        return RocketMQLocalTransactionState.COMMIT;
    } catch (Exception e) {
        log.error("本地事务执行异常,回滚消息,订单号:{}, 异常:{}", orderId, e.getMessage(), e);
        // The exception is swallowed below, so the transaction must be told to roll back.
        markCurrentTransactionRollbackOnly(orderId);
        // NOTE(review): if recordTransactionException ever persists through the same
        // transaction, that write is rolled back too; it would need its own transaction.
        recordTransactionException(transactionId, orderId, e);
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}

/** Marks the surrounding Spring transaction rollback-only; only logs when none is active. */
private void markCurrentTransactionRollbackOnly(String orderId) {
    try {
        org.springframework.transaction.interceptor.TransactionAspectSupport
            .currentTransactionStatus()
            .setRollbackOnly();
    } catch (org.springframework.transaction.NoTransactionException ignored) {
        // Called without a proxy-managed transaction: there is nothing to roll back.
        log.warn("当前没有活动的Spring事务,无法标记回滚,订单号:{}", orderId);
    }
}
/**
 * Transaction check-back callback: reports the local transaction state of a pending message.
 * The answer is derived from the payment record found via the message's "order_id" header.
 *
 * @param msg the message being checked
 * @return COMMIT for SUCCESS/PROCESSING records, ROLLBACK for FAILED records, UNKNOWN when
 *         the record is missing, has another status, or the lookup throws
 */
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
    String orderId = msg.getHeaders().get("order_id", String.class);
    String transactionId = msg.getHeaders().get("rocketmq_TRANSACTION_ID", String.class);
    log.info("开始回查本地事务状态,订单号:{}, 事务ID:{}", orderId, transactionId);
    try {
        // 1. Look up the payment record
        PaymentRecord payment = paymentMapper.selectByOrderId(orderId);
        if (payment == null) {
            log.warn("支付记录不存在,状态未知,订单号:{}", orderId);
            return RocketMQLocalTransactionState.UNKNOWN;
        }
        // 2. Map the record status onto a message state
        String status = payment.getStatus();
        boolean stored = "SUCCESS".equals(status) || "PROCESSING".equals(status);
        if (stored) {
            log.info("支付记录状态为{},提交消息,订单号:{}", status, orderId);
            return RocketMQLocalTransactionState.COMMIT;
        }
        if ("FAILED".equals(status)) {
            log.warn("支付记录状态为失败,回滚消息,订单号:{}", orderId);
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        log.warn("支付记录状态未知:{},需要继续回查,订单号:{}", status, orderId);
        return RocketMQLocalTransactionState.UNKNOWN;
    } catch (Exception e) {
        // A failed check is answered with UNKNOWN so the check-back is attempted again
        log.error("回查本地事务状态异常,订单号:{}", orderId, e);
        return RocketMQLocalTransactionState.UNKNOWN;
    }
}
/**
 * Builds a record describing a failed local transaction.
 * The record is only assembled here; the mapper insert is still commented out, so nothing
 * is persisted yet.
 */
private void recordTransactionException(String transactionId, String orderId, Exception e) {
    TransactionExceptionRecord failure = new TransactionExceptionRecord();
    failure.setTransactionId(transactionId);
    failure.setOrderId(orderId);
    failure.setExceptionMessage(e.getMessage());
    failure.setExceptionStack(ExceptionUtils.getStackTrace(e));
    failure.setCreateTime(new Date());
    // transactionExceptionMapper.insert(failure);
}
}
3.3 订单服务:可靠的消息消费者
@Service
@Slf4j
@RocketMQMessageListener(
topic = "payment-result-topic",
consumerGroup = "order-payment-consumer-group",
selectorExpression = "PAY_SUCCESS", // 只消费支付成功的消息
consumeMode = ConsumeMode.CONCURRENTLY,
messageModel = MessageModel.CLUSTERING,
consumeThreadMax = 10 // 根据业务量调整
)
public class OrderPaymentConsumer implements RocketMQListener<PaymentResultMessage> {
@Autowired
private OrderService orderService;
@Autowired
private DeduplicationService deduplicationService;
@Autowired
private MetricsService metricsService;
/**
 * Consumes a payment-success message: idempotency check, order update, metrics.
 * Returning normally acknowledges the message; throwing asks RocketMQ to redeliver it.
 *
 * @param message the payment result published by the payment service
 */
@Override
public void onMessage(PaymentResultMessage message) {
String orderId = message.getOrderId();
String bizId = message.getBizId();
long startTime = System.currentTimeMillis();
try {
log.info("开始处理支付成功消息,订单号:{}, bizId:{}", orderId, bizId);
// 1. Idempotency check (keyed by bizId)
if (deduplicationService.isProcessed(bizId, "order_payment")) {
log.warn("消息已处理,直接确认,bizId:{}", bizId);
return;
}
// 2. Run the business logic
// NOTE(review): this is a direct call on `this`; with proxy-based Spring AOP the
// @Transactional on processOrderPayment is presumably not applied here - confirm.
boolean success = processOrderPayment(message);
if (success) {
// 3. Mark the message as processed
deduplicationService.recordProcess(bizId, "order_payment");
log.info("支付成功消息处理完成,订单号:{}", orderId);
// 4. Record the success metric
metricsService.recordSuccess(orderId, "order_payment");
} else {
log.error("订单支付处理失败,订单号:{}", orderId);
// Throw so that RocketMQ retries; note this exception is first caught by the catch
// block below and then rethrown from there via shouldRetry
throw new RuntimeException("订单支付处理失败");
}
} catch (Exception e) {
long costTime = System.currentTimeMillis() - startTime;
log.error("处理支付成功消息异常,订单号:{}, 耗时:{}ms", orderId, costTime, e);
// Record the failure metric
metricsService.recordError(orderId, "order_payment", e.getMessage());
// Decide by exception type whether to retry
if (shouldRetry(e)) {
throw new RuntimeException("需要重试的异常", e);
} else {
// Business exception: no retry, hand over to the dead-letter path.
// NOTE(review): sendToDeadLetterQueue currently only builds the object (its send call
// is commented out), so on this path the message is acknowledged and dropped.
sendToDeadLetterQueue(message, e);
log.warn("业务异常,不重试,消息进入死信队列,订单号:{}", orderId);
}
} finally {
// Processing time is recorded for every outcome
long costTime = System.currentTimeMillis() - startTime;
metricsService.recordProcessTime("order_payment", costTime);
}
}
/**
 * Applies a successful payment to the order.
 *
 * @param message the payment result message
 * @return {@code true} when the order was updated or is already PAID; {@code false} when the
 *         order is missing, is in any other state, or the update failed
 */
@Transactional(rollbackFor = Exception.class)
public boolean processOrderPayment(PaymentResultMessage message) {
    String orderId = message.getOrderId();
    // 1. Load the order
    Order order = orderService.getOrderById(orderId);
    if (order == null) {
        log.error("订单不存在,订单号:{}", orderId);
        return false;
    }
    // 2. Only UNPAID orders are updated. An order that is already PAID is treated as a
    //    duplicate message (success); every other state is treated as abnormal (failure).
    if (!"UNPAID".equals(order.getStatus())) {
        log.warn("订单当前状态为{},无需处理支付,订单号:{}", order.getStatus(), orderId);
        return "PAID".equals(order.getStatus());
    }
    // 3. Update the order status
    order.setStatus("PAID");
    order.setPayTime(message.getPayTime());
    order.setPaymentId(message.getPaymentId());
    order.setUpdateTime(new Date());
    if (!orderService.updateOrder(order)) {
        log.error("订单状态更新失败,订单号:{}", orderId);
        return false;
    }
    // 4. Store the order payment flow record
    OrderPaymentRecord flow = new OrderPaymentRecord();
    flow.setOrderId(orderId);
    flow.setPaymentId(message.getPaymentId());
    flow.setAmount(message.getAmount());
    flow.setPaymentMethod(message.getPaymentMethod());
    flow.setCreateTime(new Date());
    orderService.savePaymentRecord(flow);
    // 5. Trigger the follow-up business flow (asynchronous)
    triggerPostPaymentProcess(orderId);
    return true;
}
/**
 * Starts the post-payment steps asynchronously. Failures are only logged so that they do
 * not affect the main payment flow.
 */
private void triggerPostPaymentProcess(String orderId) {
    Runnable followUps = () -> {
        // 5.1 Deduct inventory
        inventoryService.deductInventory(orderId);
        // 5.2 Grant coupons
        couponService.grantCouponAfterPayment(orderId);
        // 5.3 Send the notification
        notificationService.sendPaymentSuccessNotification(orderId);
        // 5.4 Update statistics
        statisticsService.updatePaymentStatistics(orderId);
    };
    try {
        CompletableFuture.runAsync(followUps).exceptionally(e -> {
            // Recorded only; the main flow is not affected
            log.error("支付后处理流程异常,订单号:{}", orderId, e);
            return null;
        });
    } catch (Exception e) {
        // Log only, never throw: scheduling problems must not break the payment flow
        log.error("触发支付后处理流程异常", e);
    }
}
/**
 * Decides whether a failed consumption should be retried.
 *
 * @param e the exception raised while consuming
 * @return {@code true} for timeout/connection/SQL errors and for any exception that is not a
 *         {@code BusinessException}; {@code false} for a {@code BusinessException}
 */
private boolean shouldRetry(Exception e) {
    // Network and database problems are worth retrying
    boolean infrastructureFailure = e instanceof TimeoutException
        || e instanceof ConnectException
        || e instanceof SQLException;
    if (infrastructureFailure) {
        return true;
    }
    // Business exceptions are not retried; everything else is retried by default
    return !(e instanceof BusinessException);
}
/**
 * Prepares a dead-letter entry for a message that will not be retried, intended for later
 * manual handling or automatic compensation. The actual send is still commented out, so
 * the entry is only built here.
 */
private void sendToDeadLetterQueue(PaymentResultMessage message, Exception e) {
    DeadLetterMessage entry = new DeadLetterMessage();
    entry.setOriginalMessage(message);
    entry.setExceptionInfo(e.getMessage());
    entry.setFailTime(new Date());
    // deadLetterService.send(entry);
}
}
3.4 幂等性服务实现
@Service
@Slf4j
public class DeduplicationService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private DeduplicationMapper deduplicationMapper;
// Redis key前缀
private static final String REDIS_KEY_PREFIX = "dedup:";
// Redis过期时间:24小时
private static final long REDIS_EXPIRE_SECONDS = 24 * 60 * 60;
/**
 * Tells whether the given business id was already processed for the given type.
 * Redis is consulted first; the database is the fallback, and a database hit is written
 * back to Redis.
 *
 * @param bizId   business identifier
 * @param bizType business type
 * @return {@code true} when a processing record exists in Redis or in the database
 */
public boolean isProcessed(String bizId, String bizType) {
    String cacheKey = buildKey(bizId, bizType);
    // 1. Fast path: Redis
    if (Boolean.TRUE.equals(redisTemplate.hasKey(cacheKey))) {
        log.debug("Redis中已存在处理记录,bizId:{}, bizType:{}", bizId, bizType);
        return true;
    }
    // 2. Fallback: database
    DeduplicationRecord persisted = deduplicationMapper.selectByBizIdAndType(bizId, bizType);
    if (persisted == null) {
        return false;
    }
    // Refresh the Redis marker from the database hit
    redisTemplate.opsForValue().set(cacheKey, "1", REDIS_EXPIRE_SECONDS, TimeUnit.SECONDS);
    return true;
}
/**
 * Records that the given business id has been processed.
 * <p>
 * The database row is written inside the current transaction. The Redis marker is written
 * only after that transaction commits: written earlier, a later rollback would leave a
 * stale "processed" marker in Redis and {@code isProcessed} would keep answering
 * {@code true} although nothing was persisted.
 *
 * @param bizId   business identifier
 * @param bizType business type
 */
@Transactional
public void recordProcess(String bizId, String bizType) {
    final String key = buildKey(bizId, bizType);
    // 1. Write the database row
    DeduplicationRecord record = new DeduplicationRecord();
    record.setBizId(bizId);
    record.setBizType(bizType);
    record.setCreateTime(new Date());
    try {
        deduplicationMapper.insert(record);
    } catch (DuplicateKeyException e) {
        // Unique key conflict: already recorded earlier
        log.warn("重复记录,bizId:{}, bizType:{}", bizId, bizType);
    }
    // 2. Write the Redis marker once the database state is final
    if (org.springframework.transaction.support.TransactionSynchronizationManager
            .isSynchronizationActive()) {
        org.springframework.transaction.support.TransactionSynchronizationManager
            .registerSynchronization(
                new org.springframework.transaction.support.TransactionSynchronization() {
                    @Override
                    public void afterCommit() {
                        markProcessedInRedis(key);
                    }
                });
    } else {
        markProcessedInRedis(key);
    }
}

/** Writes the Redis marker; a Redis failure is only logged because isProcessed falls back to the database. */
private void markProcessedInRedis(String key) {
    try {
        redisTemplate.opsForValue().set(key, "1", REDIS_EXPIRE_SECONDS, TimeUnit.SECONDS);
    } catch (RuntimeException e) {
        log.warn("幂等记录写入Redis失败,key:{}", key, e);
    }
}
/**
 * Builds the Redis key: prefix, business type, a colon, then the business id.
 */
private String buildKey(String bizId, String bizType) {
    StringBuilder key = new StringBuilder(REDIS_KEY_PREFIX);
    key.append(bizType).append(":").append(bizId);
    return key.toString();
}
/**
 * Scheduled cleanup of idempotency records older than 30 days.
 */
@Scheduled(cron = "0 0 2 * * ?") // every day at 02:00
public void cleanExpiredRecords() {
    Date cutoff = DateUtils.addDays(new Date(), -30);
    int removed = deduplicationMapper.deleteBeforeDate(cutoff);
    log.info("清理过期幂等记录,数量:{}", removed);
}
}
四、消息可靠性保障:从生产到消费的全链路保证
4.1 生产者可靠性保障
@Component
@Slf4j
public class ReliableMessageProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 消息发送重试次数
private static final int MAX_RETRY_TIMES = 3;
// 重试间隔(毫秒)
private static final long RETRY_INTERVAL = 1000;
/**
 * Sends a message synchronously, retrying up to {@code MAX_RETRY_TIMES} times on failure.
 * <p>
 * The pause before retry {@code k} is {@code RETRY_INTERVAL * k} (linear backoff). If the
 * thread is interrupted while waiting, the interrupt flag is restored and retrying stops
 * instead of running through the remaining attempts.
 *
 * @param topic   destination topic
 * @param message message to send
 * @return the send result of the first successful attempt
 * @throws MessageSendException when no attempt succeeded; the cause is the last send failure
 */
public SendResult sendReliably(String topic, Message<?> message) {
    Exception lastException = null;
    for (int attempt = 0; attempt <= MAX_RETRY_TIMES; attempt++) {
        try {
            SendResult sendResult = rocketMQTemplate.syncSend(topic, message);
            log.info("消息发送成功,消息ID:{}", sendResult.getMsgId());
            return sendResult;
        } catch (Exception e) {
            lastException = e;
            int retryNumber = attempt + 1;
            if (retryNumber > MAX_RETRY_TIMES) {
                break;
            }
            log.warn("消息发送失败,第{}次重试,topic:{}", retryNumber, topic, e);
            try {
                Thread.sleep(RETRY_INTERVAL * retryNumber); // linear backoff
            } catch (InterruptedException ie) {
                // Restore the flag and give up: further sleeps would fail immediately anyway
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
    log.error("消息发送失败,达到最大重试次数,topic:{}", topic, lastException);
    throw new MessageSendException("消息发送失败", lastException);
}
/**
* Sends a transactional message and records the attempt before and after the send.
*
* @param topic destination passed to the transactional send
* @param message message to send
* @param arg business argument passed through to the transactional send
* @return the transactional send result
*/
public TransactionSendResult sendTransactionReliably(
String topic,
Message<?> message,
Object arg) {
// Record the message before sending, under a locally generated id
String messageId = UUID.randomUUID().toString();
recordMessageBeforeSend(messageId, topic, message, arg);
try {
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(topic, message, arg);
// Record the send result (a failure of this call is also handled by the catch below)
recordMessageSendResult(messageId, result);
return result;
} catch (Exception e) {
log.error("事务消息发送异常,messageId:{}", messageId, e);
// Record the failure, then let the caller see the original exception
recordMessageSendFailure(messageId, e);
throw e;
}
}
/**
* Records a message before it is sent (intended for failure recovery).
* Currently an empty placeholder: nothing is stored.
*/
private void recordMessageBeforeSend(String messageId, String topic,
Message<?> message, Object arg) {
// The message could be stored in the database here, so that a failed send can be
// handled manually or compensated automatically.
// In a real production environment this is an important reliability measure.
}
}
4.2 消费者可靠性保障
@Component
@Slf4j
public class MessageConsumeRecorder {
@Autowired
private MessageConsumeRecordMapper recordMapper;
/**
 * Stores a PROCESSING record for a message whose consumption is starting.
 * <p>
 * The body is decoded explicitly as UTF-8 rather than with the platform default charset,
 * so the stored text does not depend on the JVM's locale settings.
 *
 * @param message the message being consumed
 * @return the generated id of the new consume record
 */
public String recordConsumeStart(MessageExt message) {
    String recordId = UUID.randomUUID().toString();
    byte[] body = message.getBody();
    MessageConsumeRecord record = new MessageConsumeRecord();
    record.setId(recordId);
    record.setMsgId(message.getMsgId());
    record.setTopic(message.getTopic());
    record.setTags(message.getTags());
    record.setKeys(message.getKeys());
    // NOTE(review): assumes producers encode the body as UTF-8 - confirm.
    record.setBody(body == null ? null : new String(body, java.nio.charset.StandardCharsets.UTF_8));
    record.setStatus("PROCESSING");
    record.setConsumeTimes(message.getReconsumeTimes() + 1); // reconsume count plus this attempt
    record.setCreateTime(new Date());
    recordMapper.insert(record);
    return recordId;
}
/**
 * Marks the consume record with the given id as SUCCESS.
 */
public void recordConsumeSuccess(String recordId) {
    MessageConsumeRecord update = new MessageConsumeRecord();
    update.setUpdateTime(new Date());
    update.setStatus("SUCCESS");
    update.setId(recordId);
    recordMapper.updateStatus(update);
}
/**
 * Marks the consume record with the given id as FAILED and stores the error message.
 */
public void recordConsumeFailure(String recordId, String errorMsg) {
    MessageConsumeRecord update = new MessageConsumeRecord();
    update.setUpdateTime(new Date());
    update.setErrorMsg(errorMsg);
    update.setStatus("FAILED");
    update.setId(recordId);
    recordMapper.updateStatus(update);
}
/**
 * Tells whether a consume record with status SUCCESS exists for the message id
 * (used for idempotency).
 */
public boolean isMessageConsumed(String msgId) {
    MessageConsumeRecord found = recordMapper.selectByMsgId(msgId);
    if (found == null) {
        return false;
    }
    return "SUCCESS".equals(found.getStatus());
}
}
五、监控与运维:构建可观测的消息系统
5.1 消息轨迹追踪
@Component
@Aspect
@Slf4j
public class MessageTraceAspect {
@Autowired
private MetricsService metricsService;
/**
 * Traces message sending on {@code RocketMQTemplate}: records duration and outcome metrics.
 * <p>
 * The pointcut covers {@code send*} as well as {@code syncSend*} and {@code asyncSend*};
 * the name pattern {@code send*} alone does not match the latter two, which are the methods
 * the producers in this file actually call.
 *
 * @return whatever the intercepted send method returned
 * @throws Throwable whatever the intercepted send method threw
 */
@Around("execution(* org.apache.rocketmq.spring.core.RocketMQTemplate.send*(..))"
    + " || execution(* org.apache.rocketmq.spring.core.RocketMQTemplate.syncSend*(..))"
    + " || execution(* org.apache.rocketmq.spring.core.RocketMQTemplate.asyncSend*(..))")
public Object traceSendMessage(ProceedingJoinPoint joinPoint) throws Throwable {
    long startTime = System.currentTimeMillis();
    String methodName = joinPoint.getSignature().getName();
    try {
        Object result = joinPoint.proceed();
        long costTime = System.currentTimeMillis() - startTime;
        // Success metric
        metricsService.recordMessageSendSuccess(methodName, costTime);
        if (result instanceof SendResult) {
            SendResult sendResult = (SendResult) result;
            log.info("消息发送成功,消息ID:{},耗时:{}ms",
                sendResult.getMsgId(), costTime);
        }
        return result;
    } catch (Exception e) {
        long costTime = System.currentTimeMillis() - startTime;
        // Failure metric; the exception is rethrown unchanged
        metricsService.recordMessageSendFailure(methodName, costTime, e.getMessage());
        log.error("消息发送失败,方法:{},耗时:{}ms", methodName, costTime, e);
        throw e;
    }
}
/**
 * Traces message consumption: records duration and outcome metrics for listener calls.
 * <p>
 * {@code @RocketMQMessageListener} is placed on listener classes (as on the consumer in
 * this file), so the pointcut uses {@code @within} restricted to {@code onMessage};
 * {@code @annotation} only matches annotations on the executing method itself.
 * <p>
 * Exceptions for which {@code shouldRetryConsume} answers {@code false} are swallowed and
 * {@code null} is returned, which means the listener call ends normally.
 *
 * @return the listener's return value, or {@code null} when a non-retryable exception was swallowed
 * @throws Throwable retryable exceptions thrown by the listener
 */
@Around("@within(org.apache.rocketmq.spring.annotation.RocketMQMessageListener)"
    + " && execution(* onMessage(..))")
public Object traceConsumeMessage(ProceedingJoinPoint joinPoint) throws Throwable {
    // Pick the raw message from the arguments, if the listener receives one
    MessageExt message = null;
    for (Object arg : joinPoint.getArgs()) {
        if (arg instanceof MessageExt) {
            message = (MessageExt) arg;
            break;
        }
    }
    long startTime = System.currentTimeMillis();
    // Listeners that receive a converted payload have no MessageExt argument
    String msgId = message != null ? message.getMsgId() : "unknown";
    try {
        Object result = joinPoint.proceed();
        long costTime = System.currentTimeMillis() - startTime;
        // Success metric
        metricsService.recordMessageConsumeSuccess(msgId, costTime);
        log.info("消息消费成功,消息ID:{},耗时:{}ms", msgId, costTime);
        return result;
    } catch (Exception e) {
        long costTime = System.currentTimeMillis() - startTime;
        // Failure metric
        metricsService.recordMessageConsumeFailure(msgId, costTime, e.getMessage());
        log.error("消息消费失败,消息ID:{},耗时:{}ms,重试次数:{}",
            msgId, costTime,
            message != null ? message.getReconsumeTimes() : 0, e);
        // Decide by exception type whether to retry
        if (shouldRetryConsume(e)) {
            throw e; // rethrow to trigger a retry
        } else {
            // Business exception: no retry
            log.warn("业务异常,消息不重试,消息ID:{}", msgId);
            return null;
        }
    }
}
/** Business exceptions are not retried; every other exception is. */
private boolean shouldRetryConsume(Exception e) {
    if (e instanceof BusinessException) {
        return false;
    }
    return true;
}
}
5.2 关键监控指标
@Service
@Slf4j
public class MessageMetricsService {
// 使用Micrometer记录指标
private final MeterRegistry meterRegistry;
// 计数器
private final Counter messagesSentCounter;
private final Counter messagesConsumedCounter;
private final Counter messageSendErrorsCounter;
private final Counter messageConsumeErrorsCounter;
// 计时器
private final Timer messageSendTimer;
private final Timer messageConsumeTimer;
/**
 * Registers the counters and timers of this service with the given registry.
 *
 * @param meterRegistry registry that receives all meters
 */
public MessageMetricsService(MeterRegistry meterRegistry) {
    this.meterRegistry = meterRegistry;
    // Counters
    this.messagesSentCounter = newCounter(meterRegistry, "rocketmq.messages.sent", "发送的消息总数");
    this.messagesConsumedCounter = newCounter(meterRegistry, "rocketmq.messages.consumed", "消费的消息总数");
    this.messageSendErrorsCounter = newCounter(meterRegistry, "rocketmq.messages.send.errors", "消息发送错误数");
    this.messageConsumeErrorsCounter = newCounter(meterRegistry, "rocketmq.messages.consume.errors", "消息消费错误数");
    // Timers
    this.messageSendTimer = newTimer(meterRegistry, "rocketmq.messages.send.time", "消息发送耗时");
    this.messageConsumeTimer = newTimer(meterRegistry, "rocketmq.messages.consume.time", "消息消费耗时");
}

/** Registers a counter with the given name and description. */
private static Counter newCounter(MeterRegistry registry, String name, String description) {
    return Counter.builder(name).description(description).register(registry);
}

/** Registers a timer with the given name and description. */
private static Timer newTimer(MeterRegistry registry, String name, String description) {
    return Timer.builder(name).description(description).register(registry);
}
/**
 * Records one successful send: total counter, send timer, and a per-topic counter.
 * <p>
 * The per-topic counter has its own meter name. Reusing "rocketmq.messages.sent" with an
 * extra tag would put two meters with different tag keys under one name and count every
 * send twice when that name is aggregated.
 *
 * @param topic    value of the "topic" tag; {@code null} is recorded as "unknown"
 * @param costTime send duration in milliseconds
 */
public void recordMessageSendSuccess(String topic, long costTime) {
    messagesSentCounter.increment();
    messageSendTimer.record(costTime, TimeUnit.MILLISECONDS);
    // Per-topic breakdown
    Counter.builder("rocketmq.messages.sent.by.topic")
        .tag("topic", topic == null ? "unknown" : topic)
        .register(meterRegistry)
        .increment();
}
/**
 * Records the delay between a message's born timestamp and now, and logs a warning when
 * it exceeds 30 seconds.
 *
 * @param msgId         message id, used in the warning only
 * @param bornTimestamp born timestamp in epoch milliseconds
 */
public void recordMessageConsumeDelay(String msgId, long bornTimestamp) {
    long delayMillis = System.currentTimeMillis() - bornTimestamp;
    Timer delayTimer = Timer.builder("rocketmq.messages.consume.delay").register(meterRegistry);
    delayTimer.record(delayMillis, TimeUnit.MILLISECONDS);
    // Warn above the threshold of 30 seconds; an alert could be sent from here
    if (delayMillis > 30000) {
        log.warn("消息消费延迟过高,消息ID:{},延迟:{}ms", msgId, delayMillis);
    }
}
}
六、容灾与降级:应对极端情况
6.1 消息补偿机制
@Service
@Slf4j
public class MessageCompensationService {
@Autowired
private MessageConsumeRecordMapper recordMapper;
@Autowired
private OrderService orderService;
/**
 * Scheduled compensation job: re-processes failed consume records, at most 100 per run.
 * A failure on one record is logged and does not stop the remaining records.
 */
@Scheduled(cron = "0 */5 * * * ?") // every 5 minutes
public void compensateFailedMessages() {
    log.info("开始执行消息补偿任务");
    // Failed records, selected with a reference time of ten minutes ago
    Date tenMinutesAgo = DateUtils.addMinutes(new Date(), -10);
    List<MessageConsumeRecord> failedRecords =
        recordMapper.selectFailedRecords(tenMinutesAgo, 100);
    for (MessageConsumeRecord failed : failedRecords) {
        try {
            log.info("补偿处理消息,消息ID:{},重试次数:{}",
                failed.getMsgId(), failed.getConsumeTimes());
            // Run the business logic again
            if (!reprocessMessage(failed)) {
                log.warn("消息补偿失败,消息ID:{}", failed.getMsgId());
                continue;
            }
            failed.setStatus("COMPENSATED");
            failed.setUpdateTime(new Date());
            recordMapper.updateStatus(failed);
            log.info("消息补偿成功,消息ID:{}", failed.getMsgId());
        } catch (Exception e) {
            log.error("消息补偿异常,消息ID:{}", failed.getMsgId(), e);
        }
    }
    log.info("消息补偿任务执行完成,处理数量:{}", failedRecords.size());
}
/**
 * Re-runs the business logic for a stored message.
 * Only PAY_SUCCESS messages of "payment-result-topic" are handled.
 *
 * @param record the stored consume record
 * @return the result of the order payment processing, or {@code false} for any other message
 */
private boolean reprocessMessage(MessageConsumeRecord record) {
    boolean isPaySuccess = "payment-result-topic".equals(record.getTopic())
        && "PAY_SUCCESS".equals(record.getTags());
    if (!isPaySuccess) {
        return false;
    }
    // Rebuild the payload from the stored body and process it again
    PaymentResultMessage payload = JSON.parseObject(
        record.getBody(), PaymentResultMessage.class);
    return orderService.processOrderPayment(payload);
}
}
6.2 降级策略:消息队列不可用时的处理
@Component
@Slf4j
public class MessageQueueDegradeService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Autowired
private DatabaseFallbackService databaseFallbackService;
// 降级开关
private volatile boolean degradeEnabled = false;
// 最后一次检查时间
private volatile long lastCheckTime = 0;
// 检查间隔:10秒
private static final long CHECK_INTERVAL = 10 * 1000;
/**
 * Sends a message, falling back to the database when the queue is degraded or the send
 * fails.
 *
 * @param topic   destination topic
 * @param message message to send
 * @return the real send result, or the simulated result of the database fallback
 */
public SendResult sendWithDegrade(String topic, Message<?> message) {
    if (!shouldDegrade()) {
        try {
            // Normal path
            return rocketMQTemplate.syncSend(topic, message);
        } catch (Exception e) {
            // A failed send switches degradation on; the fallback below takes over
            log.error("消息发送失败,启用降级,topic:{}", topic, e);
            enableDegrade();
        }
    } else {
        log.warn("消息队列降级中,使用数据库降级方案,topic:{}", topic);
    }
    return sendViaDatabaseFallback(topic, message);
}
/**
* Decides whether sends should currently use the fallback.
* Within CHECK_INTERVAL of the last check the stored flag is returned; otherwise the queue
* is probed by sending "ping" to "health-check-topic" and the flag is updated from the result.
*
* @return {@code true} when the fallback should be used
*/
private boolean shouldDegrade() {
long currentTime = System.currentTimeMillis();
// Within the check interval: reuse the last decision
if (currentTime - lastCheckTime < CHECK_INTERVAL) {
return degradeEnabled;
}
lastCheckTime = currentTime;
try {
// Probe the queue with a real synchronous send
// NOTE(review): the probe runs on the calling thread of a business send - confirm
// that the added latency is acceptable.
rocketMQTemplate.syncSend("health-check-topic", "ping");
degradeEnabled = false; // recovered
return false;
} catch (Exception e) {
degradeEnabled = true; // degrade
return true;
}
}
/**
* Switches degradation on, restarts the check interval and sends an alert.
*/
private void enableDegrade() {
degradeEnabled = true;
// Resetting the timestamp postpones the next probe in shouldDegrade by CHECK_INTERVAL
lastCheckTime = System.currentTimeMillis();
// Send the alert
sendDegradeAlert();
}
/**
 * Database fallback: stores the message and returns a simulated send result.
 *
 * @return a {@code SendResult} with status SEND_OK and a "DB_FALLBACK_" prefixed id
 * @throws RuntimeException when storing the message fails as well
 */
private SendResult sendViaDatabaseFallback(String topic, Message<?> message) {
    try {
        // Persist the message in the database
        databaseFallbackService.saveMessage(topic, message);
        // Build the simulated result
        SendResult simulated = new SendResult();
        simulated.setMsgId("DB_FALLBACK_" + UUID.randomUUID());
        simulated.setSendStatus(SendStatus.SEND_OK);
        return simulated;
    } catch (Exception e) {
        log.error("数据库降级方案也失败了", e);
        throw new RuntimeException("消息发送失败且降级方案失败", e);
    }
}
}
七、经验总结与最佳实践
通过这次支付系统的重构,我总结了以下几点经验:
7.1 消息设计原则
- 消息要精简:只传递必要的信息,避免大消息
- 消息要自包含:消息应该包含所有必要信息,避免消费者需要额外查询
- 消息要版本化:消息结构变化时要考虑兼容性
@Data
public class PaymentResultMessageV2 implements Serializable {
// 基础信息
private String orderId;
private String paymentId;
private BigDecimal amount;
// 版本控制
private String version = "2.0";
// 扩展信息(向后兼容)
private Map<String, Object> extensions;
/**
 * Converts this message to the V1 structure for compatibility.
 * Only orderId, paymentId and amount are copied.
 */
public PaymentResultMessageV1 toV1() {
    PaymentResultMessageV1 legacy = new PaymentResultMessageV1();
    legacy.setAmount(this.amount);
    legacy.setPaymentId(this.paymentId);
    legacy.setOrderId(this.orderId);
    return legacy;
}
}
7.2 事务消息使用建议
- 本地事务要简单:事务消息中的本地事务应该尽可能简单
- 回查逻辑要可靠:回查逻辑必须能够准确判断本地事务状态
- 超时时间要合理:事务消息的未知状态处理超时要根据业务设置
// RocketMQ配置
@Configuration
public class RocketMQConfig {
/**
 * Declares a RocketMQTemplate backed by a transactional producer of group
 * "payment-producer-group".
 * NOTE(review): setTransactionTimeout and setMaxReconsumeTimes may not exist on
 * TransactionMQProducer in the client version in use - verify against the client API.
 * NOTE(review): a hand-built template presumably bypasses whatever the starter's
 * auto-configuration would set up - confirm.
 */
@Bean
public RocketMQTemplate rocketMQTemplate() {
RocketMQTemplate template = new RocketMQTemplate();
// Transactional message configuration (anonymous subclass with an initializer block)
template.setProducer(new TransactionMQProducer("payment-producer-group") {{
// Thread pool used for transaction check-backs
setExecutorService(Executors.newFixedThreadPool(10));
// Transaction timeout: 30 seconds
setTransactionTimeout(30000);
// Maximum retry count: 3
setMaxReconsumeTimes(3);
}});
return template;
}
}
7.3 监控告警指标
必须监控的关键指标:
- 消息发送成功率/失败率
- 消息消费延迟
- 消息积压数量
- 事务消息回查次数
- 死信队列大小
总结
通过RocketMQ事务消息实现最终一致性,我们成功解决了支付系统中的数据不一致问题。这次重构让我深刻认识到:
- 没有完美的解决方案:最终一致性是以牺牲强一致性为代价的
- 幂等性是基础:在分布式系统中,任何操作都要考虑幂等性
- 监控是生命线:没有监控的系统就像在黑暗中开车
- 降级是必须的:任何中间件都可能失败,要有降级方案
最重要的收获:技术方案的选择需要权衡一致性、可用性和性能。RocketMQ事务消息方案适合需要高可靠、异步解耦的场景,但同时也带来了复杂性。在实际项目中,我们需要根据业务场景选择最合适的方案。
这次经历让我明白了分布式系统设计的复杂性,也让我养成了在设计阶段就考虑故障恢复和数据一致性的习惯。真正的系统稳定性不是靠运气,而是靠严谨的设计和充分的测试。