@Async注解看似简单,只需添加一个注解就能实现异步执行,但背后隐藏着线程池管理、异常处理、事务传播等复杂问题。本文将详细记录我在使用Spring异步编程时遇到的各种"坑",包括线程池配置不当导致的OOM、异常丢失难以排查、事务失效等实际问题,并提供完整的解决方案。1. 初识@Async:简单的开始与隐藏的陷阱
场景: 用户注册后需要发送欢迎邮件,邮件发送耗时2-3秒,不应阻塞主注册流程。
V1.0 天真的实现:
@Service
public class UserService {
@Autowired
private EmailService emailService;
public void register(User user) {
// 1. 保存用户到数据库(同步)
userMapper.insert(user);
// 2. 发送欢迎邮件(希望异步执行)
emailService.sendWelcomeEmail(user);
// 3. 立即返回响应
}
}
@Service
public class EmailService {
@Async // 添加异步注解
public void sendWelcomeEmail(User user) {
try {
// 模拟邮件发送耗时
Thread.sleep(3000);
System.out.println("邮件发送成功给:" + user.getEmail());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 启动类添加@EnableAsync
@SpringBootApplication
@EnableAsync
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}问题立即出现:
- 默认线程池问题: Spring默认使用
SimpleAsyncTaskExecutor,每次调用都创建新线程 - 异常丢失: 异步方法中的异常不会传播到调用方
- 事务上下文丢失: 异步方法中无法使用调用方的事务
2. 线程池配置:从OOM到性能优化
问题现象: 在高并发场景下,应用频繁创建线程,最终导致内存溢出。
V2.0 自定义线程池配置:
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
/**
* 自定义线程池
* 核心配置参数理解:
* - corePoolSize: 核心线程数,线程池长期维持的线程数
* - maxPoolSize: 最大线程数,当队列满时线程池可以创建的最大线程数
* - queueCapacity: 队列容量,缓冲执行任务的队列
* - keepAliveSeconds: 线程空闲时间,超过核心线程数的线程空闲多久后被回收
*/
@Bean("mailTaskExecutor")
public ThreadPoolTaskExecutor mailTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心配置
executor.setCorePoolSize(5);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(100);
executor.setKeepAliveSeconds(60);
// 线程命名,便于监控
executor.setThreadNamePrefix("mail-async-");
// 拒绝策略:当线程池和队列都满时的处理策略
// CallerRunsPolicy: 由调用线程(通常是主线程)执行该任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}
@Bean("smsTaskExecutor")
public ThreadPoolTaskExecutor smsTaskExecutor() {
// 为短信服务配置独立的线程池,避免互相影响
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(3);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("sms-async-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
@Override
public Executor getAsyncExecutor() {
return mailTaskExecutor(); // 默认执行器
}
}使用指定的线程池:
@Service
public class EmailService {
@Async("mailTaskExecutor") // 指定使用邮件线程池
public void sendWelcomeEmail(User user) {
// 异步执行逻辑
}
}
@Service
public class SmsService {
@Async("smsTaskExecutor") // 指定使用短信线程池
public void sendWelcomeSms(User user) {
// 异步执行逻辑
}
}3. 异常处理:从"静默失败"到完整监控
问题: 异步方法抛出异常时,调用方完全不知情,问题难以排查。
V3.0 完善的异常处理方案:
方案1:实现AsyncUncaughtExceptionHandler(适用于无返回值异步方法)
@Configuration
public class AsyncExceptionConfig implements AsyncConfigurer {
@Autowired
private ThreadPoolTaskExecutor mailTaskExecutor;
@Override
public Executor getAsyncExecutor() {
return mailTaskExecutor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new CustomAsyncExceptionHandler();
}
}
/**
* 自定义异步异常处理器
*/
@Component
@Slf4j
public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Autowired
private AlarmService alarmService; // 告警服务
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
// 1. 记录详细错误日志
log.error("异步方法执行异常: 方法名[{}], 参数[{}]", method.getName(), Arrays.toString(params), ex);
// 2. 发送告警通知
String errorMsg = String.format("异步任务异常: %s.%s, 错误: %s",
method.getDeclaringClass().getSimpleName(), method.getName(), ex.getMessage());
alarmService.sendAlert("ASYNC_TASK_ERROR", errorMsg);
// 3. 记录到数据库便于排查
saveAsyncErrorLog(method, params, ex);
}
private void saveAsyncErrorLog(Method method, Object[] params, Throwable ex) {
AsyncErrorLog log = new AsyncErrorLog();
log.setClassName(method.getDeclaringClass().getName());
log.setMethodName(method.getName());
log.setParameters(JSON.toJSONString(params));
log.setErrorMessage(ex.getMessage());
log.setStacktrace(ExceptionUtils.getStackTrace(ex));
log.setCreateTime(new Date());
asyncErrorLogMapper.insert(log);
}
}方案2:使用CompletableFuture(适用于有返回值场景)
@Service
public class EmailService {
@Async("mailTaskExecutor")
public CompletableFuture<Boolean> sendWelcomeEmail(User user) {
try {
// 模拟业务逻辑
Thread.sleep(1000);
// 模拟可能发生的异常
if (user.getEmail().contains("test")) {
throw new RuntimeException("测试邮箱发送失败");
}
log.info("邮件发送成功: {}", user.getEmail());
return CompletableFuture.completedFuture(true);
} catch (Exception e) {
// 异常被包装在CompletableFuture中,不会被AsyncUncaughtExceptionHandler处理
CompletableFuture<Boolean> future = new CompletableFuture<>();
future.completeExceptionally(e);
return future;
}
}
}
// 调用方处理异常
@Service
public class UserService {
public void register(User user) {
// 注册逻辑...
CompletableFuture<Boolean> emailFuture = emailService.sendWelcomeEmail(user);
// 方式1:同步等待并处理异常
try {
Boolean result = emailFuture.get(5, TimeUnit.SECONDS); // 设置超时时间
log.info("邮件发送结果: {}", result);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("邮件发送失败", e);
// 可以在这里进行补偿操作,如记录到重试表
}
// 方式2:异步回调处理
emailFuture.whenComplete((result, throwable) -> {
if (throwable != null) {
log.error("邮件发送异步回调中处理异常", throwable);
// 执行补偿逻辑
compensateEmailSending(user, throwable);
} else {
log.info("邮件发送成功,结果: {}", result);
}
});
}
}4. 事务管理:异步方法中的事务传播问题
问题: 在@Transactional方法中调用@Async方法,事务上下文不会传播到异步线程。
错误示例:
@Service
public class OrderService {
@Transactional
public void createOrder(Order order) {
// 1. 保存订单(在事务中)
orderMapper.insert(order);
// 2. 异步发送通知(新线程,不在事务中)
notificationService.asyncSendOrderCreatedMsg(order);
// 3. 如果这里抛出异常,希望回滚订单保存,但通知已经发送无法撤回
if (order.getAmount().compareTo(BigDecimal.ZERO) <= 0) {
throw new RuntimeException("订单金额必须大于0");
}
}
}解决方案1:在异步方法内部管理事务
@Service
public class NotificationService {
@Autowired
private TransactionTemplate transactionTemplate;
@Async
public void asyncSendOrderCreatedMsg(Order order) {
// 在异步方法内部开启新事务
transactionTemplate.execute(status -> {
try {
// 查询订单最新状态(在新事务中)
Order latestOrder = orderMapper.selectById(order.getId());
if ("CREATED".equals(latestOrder.getStatus())) {
// 发送通知
sendNotification(latestOrder);
// 更新通知状态
updateNotificationStatus(latestOrder.getId(), "SENT");
}
return null;
} catch (Exception e) {
status.setRollbackOnly(); // 标记回滚
log.error("发送订单通知失败", e);
throw e;
}
});
}
}解决方案2:使用事务同步器(事务提交后执行)
@Service
public class OrderService {
@Transactional
public void createOrder(Order order) {
// 1. 保存订单
orderMapper.insert(order);
// 2. 注册事务同步回调(事务提交成功后才执行)
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
// 事务提交后异步发送通知
notificationService.asyncSendOrderCreatedMsg(order);
}
@Override
public void afterCompletion(int status) {
if (status == STATUS_ROLLED_BACK) {
log.info("事务回滚,取消通知发送");
}
}
}
);
}
}5. 异步编排:复杂异步任务的处理
场景: 用户注册后需要并行执行多个任务:发送邮件、初始化用户资料、赠送优惠券。
使用CompletableFuture进行任务编排:
@Service
public class UserRegistrationService {
@Async("taskExecutor")
public CompletableFuture<Void> processUserRegistration(User user) {
// 并行执行多个任务
CompletableFuture<Void> emailFuture = CompletableFuture.runAsync(() ->
emailService.sendWelcomeEmail(user), mailTaskExecutor);
CompletableFuture<Void> profileFuture = CompletableFuture.runAsync(() ->
profileService.initUserProfile(user), profileTaskExecutor);
CompletableFuture<Void> couponFuture = CompletableFuture.runAsync(() ->
couponService.giveWelcomeCoupon(user), couponTaskExecutor);
// 等待所有任务完成
return CompletableFuture.allOf(emailFuture, profileFuture, couponFuture)
.exceptionally(throwable -> {
log.error("用户注册后处理任务执行失败", throwable);
// 这里可以执行整体补偿逻辑
compensateRegistrationTasks(user, throwable);
return null;
});
}
// 顺序执行:先A后B
public CompletableFuture<Void> sequentialTasks(User user) {
return CompletableFuture.runAsync(() -> taskA(user))
.thenRunAsync(() -> taskB(user))
.thenRunAsync(() -> taskC(user));
}
// 条件执行:A成功才执行B
public CompletableFuture<Void> conditionalTasks(User user) {
return CompletableFuture.runAsync(() -> taskA(user))
.thenCompose(result -> taskB(user)) // 前提是taskA成功
.exceptionally(throwable -> {
log.error("条件任务执行失败", throwable);
return null;
});
}
}6. 监控与运维:异步任务的可见性
问题: 异步任务执行状态不透明,难以监控和管理。
解决方案: 建立异步任务监控体系
@Aspect
@Component
@Slf4j
public class AsyncTaskMonitorAspect {
@Autowired
private AsyncTaskMetrics metrics;
@Around("@annotation(org.springframework.scheduling.annotation.Async)")
public Object monitorAsyncTask(ProceedingJoinPoint joinPoint) throws Throwable {
String taskName = joinPoint.getSignature().toShortString();
long startTime = System.currentTimeMillis();
metrics.incrementActiveCount(taskName);
try {
Object result = joinPoint.proceed();
long duration = System.currentTimeMillis() - startTime;
metrics.recordSuccess(taskName, duration);
log.info("异步任务执行成功: {}, 耗时: {}ms", taskName, duration);
return result;
} catch (Exception e) {
long duration = System.currentTimeMillis() - startTime;
metrics.recordFailure(taskName, duration);
log.error("异步任务执行失败: {}, 耗时: {}ms", taskName, duration, e);
throw e;
} finally {
metrics.decrementActiveCount(taskName);
}
}
}
@Component
public class AsyncTaskMetrics {
private final MeterRegistry meterRegistry;
private final Map<String, AtomicInteger> activeTasks = new ConcurrentHashMap<>();
public AsyncTaskMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}
public void incrementActiveCount(String taskName) {
activeTasks.computeIfAbsent(taskName, k -> new AtomicInteger()).incrementAndGet();
meterRegistry.gauge("async.task.active", Tags.of("task", taskName),
activeTasks.get(taskName), AtomicInteger::get);
}
public void recordSuccess(String taskName, long duration) {
meterRegistry.counter("async.task.success", "task", taskName).increment();
meterRegistry.timer("async.task.duration", "task", taskName).record(duration, TimeUnit.MILLISECONDS);
}
}7. 最佳实践总结
- 始终配置自定义线程池,避免使用默认实现
- 为不同业务类型配置独立线程池,避免互相影响
- 完善的异常处理机制,避免静默失败
- 注意事务边界,在异步方法内部管理事务
- 建立监控体系,保证异步任务的可见性
- 合理设置超时时间,避免任务堆积
- 考虑重试机制,对于可重试的失败任务
总结:
@Async注解虽然简单,但要真正用好却需要深入理解其背后的线程模型、异常处理、事务管理等复杂机制。两年的实践经验告诉我,异步编程不是简单的添加注解,而是需要建立完整的异步任务治理体系。从线程池配置到异常处理,从事务管理到监控运维,每一个环节都需要精心设计。