@Async注解看似简单,只需添加一个注解就能实现异步执行,但背后隐藏着线程池管理、异常处理、事务传播等复杂问题。本文将详细记录我在使用Spring异步编程时遇到的各种"坑",包括线程池配置不当导致的OOM、异常丢失难以排查、事务失效等实际问题,并提供完整的解决方案。

1. 初识@Async:简单的开始与隐藏的陷阱

​场景:​​ 用户注册后需要发送欢迎邮件,邮件发送耗时2-3秒,不应阻塞主注册流程。

V1.0 天真的实现:

@Service
public class UserService {
    
    @Autowired
    private EmailService emailService;
    
    public void register(User user) {
        // 1. 保存用户到数据库(同步)
        userMapper.insert(user);
        
        // 2. 发送欢迎邮件(希望异步执行)
        emailService.sendWelcomeEmail(user);
        
        // 3. 立即返回响应
    }
}

@Service
public class EmailService {
    
    @Async  // 添加异步注解
    public void sendWelcomeEmail(User user) {
        try {
            // 模拟邮件发送耗时
            Thread.sleep(3000);
            System.out.println("邮件发送成功给:" + user.getEmail());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

// 启动类添加@EnableAsync
@SpringBootApplication
@EnableAsync
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

问题立即出现:

  1. ​默认线程池问题:​​ Spring默认使用SimpleAsyncTaskExecutor,每次调用都创建新线程
  2. ​异常丢失:​​ 异步方法中的异常不会传播到调用方
  3. ​事务上下文丢失:​​ 异步方法中无法使用调用方的事务

2. 线程池配置:从OOM到性能优化

​问题现象:​​ 在高并发场景下,应用频繁创建线程,最终导致内存溢出。

V2.0 自定义线程池配置:

@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
    
    /**
     * 自定义线程池
     * 核心配置参数理解:
     * - corePoolSize: 核心线程数,线程池长期维持的线程数
     * - maxPoolSize: 最大线程数,当队列满时线程池可以创建的最大线程数  
     * - queueCapacity: 队列容量,缓冲执行任务的队列
     * - keepAliveSeconds: 线程空闲时间,超过核心线程数的线程空闲多久后被回收
     */
    @Bean("mailTaskExecutor")
    public ThreadPoolTaskExecutor mailTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 核心配置
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(100);
        executor.setKeepAliveSeconds(60);
        
        // 线程命名,便于监控
        executor.setThreadNamePrefix("mail-async-");
        
        // 拒绝策略:当线程池和队列都满时的处理策略
        // CallerRunsPolicy: 由调用线程(通常是主线程)执行该任务
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        
        // 等待所有任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        
        executor.initialize();
        return executor;
    }
    
    @Bean("smsTaskExecutor") 
    public ThreadPoolTaskExecutor smsTaskExecutor() {
        // 为短信服务配置独立的线程池,避免互相影响
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(3);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(50);
        executor.setThreadNamePrefix("sms-async-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
    
    @Override
    public Executor getAsyncExecutor() {
        return mailTaskExecutor(); // 默认执行器
    }
}

使用指定的线程池:

@Service
public class EmailService {
    
    @Async("mailTaskExecutor")  // 指定使用邮件线程池
    public void sendWelcomeEmail(User user) {
        // 异步执行逻辑
    }
}

@Service 
public class SmsService {
    
    @Async("smsTaskExecutor")  // 指定使用短信线程池
    public void sendWelcomeSms(User user) {
        // 异步执行逻辑
    }
}

3. 异常处理:从"静默失败"到完整监控

​问题:​​ 异步方法抛出异常时,调用方完全不知情,问题难以排查。

V3.0 完善的异常处理方案:

方案1:实现AsyncUncaughtExceptionHandler(适用于无返回值异步方法)

@Configuration
public class AsyncExceptionConfig implements AsyncConfigurer {
    
    @Autowired
    private ThreadPoolTaskExecutor mailTaskExecutor;
    
    @Override
    public Executor getAsyncExecutor() {
        return mailTaskExecutor;
    }
    
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new CustomAsyncExceptionHandler();
    }
}

/**
 * 自定义异步异常处理器
 */
@Component
@Slf4j
public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
    
    @Autowired
    private AlarmService alarmService; // 告警服务
    
    @Override
    public void handleUncaughtException(Throwable ex, Method method, Object... params) {
        // 1. 记录详细错误日志
        log.error("异步方法执行异常: 方法名[{}], 参数[{}]", method.getName(), Arrays.toString(params), ex);
        
        // 2. 发送告警通知
        String errorMsg = String.format("异步任务异常: %s.%s, 错误: %s", 
            method.getDeclaringClass().getSimpleName(), method.getName(), ex.getMessage());
        alarmService.sendAlert("ASYNC_TASK_ERROR", errorMsg);
        
        // 3. 记录到数据库便于排查
        saveAsyncErrorLog(method, params, ex);
    }
    
    private void saveAsyncErrorLog(Method method, Object[] params, Throwable ex) {
        AsyncErrorLog log = new AsyncErrorLog();
        log.setClassName(method.getDeclaringClass().getName());
        log.setMethodName(method.getName());
        log.setParameters(JSON.toJSONString(params));
        log.setErrorMessage(ex.getMessage());
        log.setStacktrace(ExceptionUtils.getStackTrace(ex));
        log.setCreateTime(new Date());
        asyncErrorLogMapper.insert(log);
    }
}

方案2:使用CompletableFuture(适用于有返回值场景)

@Service
public class EmailService {
    
    @Async("mailTaskExecutor")
    public CompletableFuture<Boolean> sendWelcomeEmail(User user) {
        try {
            // 模拟业务逻辑
            Thread.sleep(1000);
            
            // 模拟可能发生的异常
            if (user.getEmail().contains("test")) {
                throw new RuntimeException("测试邮箱发送失败");
            }
            
            log.info("邮件发送成功: {}", user.getEmail());
            return CompletableFuture.completedFuture(true);
            
        } catch (Exception e) {
            // 异常被包装在CompletableFuture中,不会被AsyncUncaughtExceptionHandler处理
            CompletableFuture<Boolean> future = new CompletableFuture<>();
            future.completeExceptionally(e);
            return future;
        }
    }
}

// 调用方处理异常
@Service
public class UserService {
    
    public void register(User user) {
        // 注册逻辑...
        
        CompletableFuture<Boolean> emailFuture = emailService.sendWelcomeEmail(user);
        
        // 方式1:同步等待并处理异常
        try {
            Boolean result = emailFuture.get(5, TimeUnit.SECONDS); // 设置超时时间
            log.info("邮件发送结果: {}", result);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            log.error("邮件发送失败", e);
            // 可以在这里进行补偿操作,如记录到重试表
        }
        
        // 方式2:异步回调处理
        emailFuture.whenComplete((result, throwable) -> {
            if (throwable != null) {
                log.error("邮件发送异步回调中处理异常", throwable);
                // 执行补偿逻辑
                compensateEmailSending(user, throwable);
            } else {
                log.info("邮件发送成功,结果: {}", result);
            }
        });
    }
}

4. 事务管理:异步方法中的事务传播问题

​问题:​​ 在@Transactional方法中调用@Async方法,事务上下文不会传播到异步线程。

错误示例:

@Service
public class OrderService {
    
    @Transactional
    public void createOrder(Order order) {
        // 1. 保存订单(在事务中)
        orderMapper.insert(order);
        
        // 2. 异步发送通知(新线程,不在事务中)
        notificationService.asyncSendOrderCreatedMsg(order);
        
        // 3. 如果这里抛出异常,希望回滚订单保存,但通知已经发送无法撤回
        if (order.getAmount().compareTo(BigDecimal.ZERO) <= 0) {
            throw new RuntimeException("订单金额必须大于0");
        }
    }
}

解决方案1:在异步方法内部管理事务

@Service
public class NotificationService {
    
    @Autowired
    private TransactionTemplate transactionTemplate;
    
    @Async
    public void asyncSendOrderCreatedMsg(Order order) {
        // 在异步方法内部开启新事务
        transactionTemplate.execute(status -> {
            try {
                // 查询订单最新状态(在新事务中)
                Order latestOrder = orderMapper.selectById(order.getId());
                if ("CREATED".equals(latestOrder.getStatus())) {
                    // 发送通知
                    sendNotification(latestOrder);
                    // 更新通知状态
                    updateNotificationStatus(latestOrder.getId(), "SENT");
                }
                return null;
            } catch (Exception e) {
                status.setRollbackOnly(); // 标记回滚
                log.error("发送订单通知失败", e);
                throw e;
            }
        });
    }
}

解决方案2:使用事务同步器(事务提交后执行)

@Service
public class OrderService {
    
    @Transactional
    public void createOrder(Order order) {
        // 1. 保存订单
        orderMapper.insert(order);
        
        // 2. 注册事务同步回调(事务提交成功后才执行)
        TransactionSynchronizationManager.registerSynchronization(
            new TransactionSynchronizationAdapter() {
                @Override
                public void afterCommit() {
                    // 事务提交后异步发送通知
                    notificationService.asyncSendOrderCreatedMsg(order);
                }
                
                @Override
                public void afterCompletion(int status) {
                    if (status == STATUS_ROLLED_BACK) {
                        log.info("事务回滚,取消通知发送");
                    }
                }
            }
        );
    }
}

5. 异步编排:复杂异步任务的处理

​场景:​​ 用户注册后需要并行执行多个任务:发送邮件、初始化用户资料、赠送优惠券。

使用CompletableFuture进行任务编排:

@Service
public class UserRegistrationService {
    
    @Async("taskExecutor")
    public CompletableFuture<Void> processUserRegistration(User user) {
        // 并行执行多个任务
        CompletableFuture<Void> emailFuture = CompletableFuture.runAsync(() -> 
            emailService.sendWelcomeEmail(user), mailTaskExecutor);
            
        CompletableFuture<Void> profileFuture = CompletableFuture.runAsync(() -> 
            profileService.initUserProfile(user), profileTaskExecutor);
            
        CompletableFuture<Void> couponFuture = CompletableFuture.runAsync(() -> 
            couponService.giveWelcomeCoupon(user), couponTaskExecutor);
        
        // 等待所有任务完成
        return CompletableFuture.allOf(emailFuture, profileFuture, couponFuture)
            .exceptionally(throwable -> {
                log.error("用户注册后处理任务执行失败", throwable);
                // 这里可以执行整体补偿逻辑
                compensateRegistrationTasks(user, throwable);
                return null;
            });
    }
    
    // 顺序执行:先A后B
    public CompletableFuture<Void> sequentialTasks(User user) {
        return CompletableFuture.runAsync(() -> taskA(user))
            .thenRunAsync(() -> taskB(user))
            .thenRunAsync(() -> taskC(user));
    }
    
    // 条件执行:A成功才执行B
    public CompletableFuture<Void> conditionalTasks(User user) {
        return CompletableFuture.runAsync(() -> taskA(user))
            .thenCompose(result -> taskB(user)) // 前提是taskA成功
            .exceptionally(throwable -> {
                log.error("条件任务执行失败", throwable);
                return null;
            });
    }
}

6. 监控与运维:异步任务的可见性

​问题:​​ 异步任务执行状态不透明,难以监控和管理。

​解决方案:​​ 建立异步任务监控体系

@Aspect
@Component
@Slf4j
public class AsyncTaskMonitorAspect {
    
    @Autowired
    private AsyncTaskMetrics metrics;
    
    @Around("@annotation(org.springframework.scheduling.annotation.Async)")
    public Object monitorAsyncTask(ProceedingJoinPoint joinPoint) throws Throwable {
        String taskName = joinPoint.getSignature().toShortString();
        long startTime = System.currentTimeMillis();
        metrics.incrementActiveCount(taskName);
        
        try {
            Object result = joinPoint.proceed();
            long duration = System.currentTimeMillis() - startTime;
            metrics.recordSuccess(taskName, duration);
            log.info("异步任务执行成功: {}, 耗时: {}ms", taskName, duration);
            return result;
            
        } catch (Exception e) {
            long duration = System.currentTimeMillis() - startTime;
            metrics.recordFailure(taskName, duration);
            log.error("异步任务执行失败: {}, 耗时: {}ms", taskName, duration, e);
            throw e;
            
        } finally {
            metrics.decrementActiveCount(taskName);
        }
    }
}

@Component
public class AsyncTaskMetrics {
    private final MeterRegistry meterRegistry;
    private final Map<String, AtomicInteger> activeTasks = new ConcurrentHashMap<>();
    
    public AsyncTaskMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }
    
    public void incrementActiveCount(String taskName) {
        activeTasks.computeIfAbsent(taskName, k -> new AtomicInteger()).incrementAndGet();
        meterRegistry.gauge("async.task.active", Tags.of("task", taskName), 
            activeTasks.get(taskName), AtomicInteger::get);
    }
    
    public void recordSuccess(String taskName, long duration) {
        meterRegistry.counter("async.task.success", "task", taskName).increment();
        meterRegistry.timer("async.task.duration", "task", taskName).record(duration, TimeUnit.MILLISECONDS);
    }
}

7. 最佳实践总结

  1. 始终配置自定义线程池​​,避免使用默认实现
  2. 为不同业务类型配置独立线程池​​,避免互相影响
  3. 完善的异常处理机制​,避免静默失败
  4. 注意事务边界​,在异步方法内部管理事务
  5. 建立监控体系​,保证异步任务的可见性
  6. 合理设置超时时间​,避免任务堆积
  7. 考虑重试机制​,对于可重试的失败任务

总结:

@Async注解虽然简单,但要真正用好却需要深入理解其背后的线程模型、异常处理、事务管理等复杂机制。两年的实践经验告诉我,​异步编程不是简单的添加注解,而是需要建立完整的异步任务治理体系​。从线程池配置到异常处理,从事务管理到监控运维,每一个环节都需要精心设计。

如果觉得我的文章对你有用,请随意赞赏