AI摘要

文章通过微服务上下文传递问题,系统解析ThreadLocal原理、内存泄漏风险与最佳实践,并深入演示CompletableFuture的链式、并行、异常及超时处理,结合性能测试给出IO/计算密集型线程池配置建议,总结生产环境监控与优雅关闭要点。

一、引言:一个微服务上下文传递的难题

在微服务架构中,我们经常需要在不同线程间传递用户上下文(如用户ID、请求ID等)。最初我们使用静态Map来存储,结果遇到了严重的数据错乱问题:

// ❌ 错误示例:静态Map存储上下文
public class UserContextHolder {
    private static final Map<String, Object> context = new ConcurrentHashMap<>();
    
    public static void set(String key, Object value) {
        context.put(key, value);
    }
    
    public static Object get(String key) {
        return context.get(key);
    }
}

在多线程环境下,多个请求会相互覆盖上下文数据。这让我深刻认识到:​共享可变状态是并发编程万恶之源​。

二、ThreadLocal:线程安全的局部变量

2.1 ThreadLocal的核心原理

ThreadLocal为每个线程提供独立的变量副本,避免了线程间的数据竞争。
ThreadLocal基本使用

import java.util.concurrent.TimeUnit;

public class ThreadLocalDemo {
    
    // 创建ThreadLocal实例
    private static final ThreadLocal<String> userContext = new ThreadLocal<>();
    private static final ThreadLocal<Integer> requestIdContext = ThreadLocal.withInitial(() -> 0);
    
    public static void main(String[] args) throws InterruptedException {
        // 创建并启动多个线程
        Thread thread1 = new Thread(new UserRequest("用户A", 1001), "Thread-1");
        Thread thread2 = new Thread(new UserRequest("用户B", 1002), "Thread-2");
        Thread thread3 = new Thread(new UserRequest("用户C", 1003), "Thread-3");
        
        thread1.start();
        thread2.start();
        thread3.start();
        
        // 等待所有线程执行完成
        thread1.join();
        thread2.join();
        thread3.join();
        
        System.out.println("所有线程执行完成");
    }
    
    // 用户请求处理类
    static class UserRequest implements Runnable {
        private final String userName;
        private final int requestId;
        
        public UserRequest(String userName, int requestId) {
            this.userName = userName;
            this.requestId = requestId;
        }
        
        @Override
        public void run() {
            try {
                // 设置线程局部变量
                userContext.set(userName);
                requestIdContext.set(requestId);
                
                // 模拟业务处理
                processRequest();
            } finally {
                // 重要:必须清理ThreadLocal,防止内存泄漏
                userContext.remove();
                requestIdContext.remove();
            }
        }
        
        private void processRequest() {
            // 获取当前线程的上下文数据
            String currentUser = userContext.get();
            Integer currentRequestId = requestIdContext.get();
            
            System.out.println(Thread.currentThread().getName() + 
                             " -> 用户: " + currentUser + 
                             ", 请求ID: " + currentRequestId);
            
            // 模拟嵌套方法调用,上下文依然可以获取
            nestedMethod();
        }
        
        private void nestedMethod() {
            // 嵌套方法中依然可以获取到ThreadLocal中的数据
            System.out.println(Thread.currentThread().getName() + 
                             " -> 嵌套方法获取用户: " + userContext.get());
        }
    }
}

2.2 ThreadLocal的内存泄漏问题与解决方案

import java.lang.ref.WeakReference;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadLocalMemoryLeakDemo {
    
    // 模拟大对象
    static class BigObject {
        private final byte[] data = new byte[10 * 1024 * 1024]; // 10MB
        
        @Override
        protected void finalize() throws Throwable {
            System.out.println("BigObject被GC回收");
            super.finalize();
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        // 1. 演示内存泄漏的场景
        demonstrateMemoryLeak();
        
        // 等待GC
        System.gc();
        TimeUnit.SECONDS.sleep(3);
        
        System.out.println("\n--- 分隔线 ---\n");
        
        // 2. 演示正确使用ThreadLocal
        demonstrateCorrectUsage();
        
        // 等待GC
        System.gc();
        TimeUnit.SECONDS.sleep(3);
    }
    
    private static void demonstrateMemoryLeak() {
        System.out.println("=== 演示ThreadLocal内存泄漏 ===");
        
        ThreadLocal<BigObject> threadLocal = new ThreadLocal<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        
        // 在线程池线程中设置ThreadLocal
        executor.submit(() -> {
            threadLocal.set(new BigObject());
            System.out.println("线程池线程设置了BigObject");
            // 注意:这里没有调用remove()!
        });
        
        // 等待任务完成
        executor.shutdown();
        try {
            executor.awaitTermination(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        System.out.println("线程池已关闭,但ThreadLocal中的BigObject可能还未被回收");
    }
    
    private static void demonstrateCorrectUsage() {
        System.out.println("=== 演示ThreadLocal正确用法 ===");
        
        ThreadLocal<BigObject> threadLocal = new ThreadLocal<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        
        // 在线程池线程中设置ThreadLocal,并正确清理
        executor.submit(() -> {
            try {
                threadLocal.set(new BigObject());
                System.out.println("线程池线程设置了BigObject");
                // 模拟业务处理
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // 关键:使用完后一定要remove!
                threadLocal.remove();
                System.out.println("线程池线程清理了ThreadLocal");
            }
        });
        
        // 等待任务完成
        executor.shutdown();
        try {
            executor.awaitTermination(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        System.out.println("线程池已关闭,ThreadLocal已正确清理");
    }
    
    // 3. 使用InheritableThreadLocal实现父子线程数据传递
    private static void demonstrateInheritableThreadLocal() {
        System.out.println("=== 演示InheritableThreadLocal ===");
        
        InheritableThreadLocal<String> inheritableThreadLocal = new InheritableThreadLocal<>();
        inheritableThreadLocal.set("父线程数据");
        
        Thread childThread = new Thread(() -> {
            System.out.println("子线程获取数据: " + inheritableThreadLocal.get());
        });
        
        childThread.start();
        
        try {
            childThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        // 清理
        inheritableThreadLocal.remove();
    }
}

2.3 ThreadLocal在实际项目中的应用:请求上下文管理

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * 请求上下文管理器
 * 用于在同一个请求的整个调用链中传递上下文信息
 */
public class RequestContextHolder {
    
    private RequestContextHolder() {
        // 工具类,禁止实例化
    }
    
    // 使用ThreadLocal存储请求上下文
    private static final ThreadLocal<RequestContext> REQUEST_CONTEXT = new ThreadLocal<>();
    
    /**
     * 请求上下文类
     */
    public static class RequestContext {
        private final String requestId;
        private final long startTime;
        private final Map<String, Object> attributes;
        
        public RequestContext() {
            this.requestId = generateRequestId();
            this.startTime = System.currentTimeMillis();
            this.attributes = new HashMap<>();
        }
        
        public RequestContext(String requestId) {
            this.requestId = requestId;
            this.startTime = System.currentTimeMillis();
            this.attributes = new HashMap<>();
        }
        
        private String generateRequestId() {
            return UUID.randomUUID().toString().replace("-", "");
        }
        
        public String getRequestId() {
            return requestId;
        }
        
        public long getStartTime() {
            return startTime;
        }
        
        public long getElapsedTime() {
            return System.currentTimeMillis() - startTime;
        }
        
        public void setAttribute(String key, Object value) {
            attributes.put(key, value);
        }
        
        @SuppressWarnings("unchecked")
        public <T> T getAttribute(String key) {
            return (T) attributes.get(key);
        }
        
        public void removeAttribute(String key) {
            attributes.remove(key);
        }
        
        public Map<String, Object> getAllAttributes() {
            return new HashMap<>(attributes);
        }
    }
    
    /**
     * 初始化请求上下文
     */
    public static void init() {
        REQUEST_CONTEXT.set(new RequestContext());
    }
    
    /**
     * 初始化请求上下文(指定requestId)
     */
    public static void init(String requestId) {
        REQUEST_CONTEXT.set(new RequestContext(requestId));
    }
    
    /**
     * 获取当前请求上下文
     */
    public static RequestContext getCurrentContext() {
        RequestContext context = REQUEST_CONTEXT.get();
        if (context == null) {
            throw new IllegalStateException("请求上下文未初始化,请确保在请求入口处调用RequestContextHolder.init()");
        }
        return context;
    }
    
    /**
     * 获取当前请求ID
     */
    public static String getCurrentRequestId() {
        return getCurrentContext().getRequestId();
    }
    
    /**
     * 设置上下文属性
     */
    public static void setAttribute(String key, Object value) {
        getCurrentContext().setAttribute(key, value);
    }
    
    /**
     * 获取上下文属性
     */
    @SuppressWarnings("unchecked")
    public static <T> T getAttribute(String key) {
        return getCurrentContext().getAttribute(key);
    }
    
    /**
     * 获取当前用户ID
     */
    public static String getCurrentUserId() {
        return getAttribute("userId");
    }
    
    /**
     * 设置当前用户ID
     */
    public static void setCurrentUserId(String userId) {
        setAttribute("userId", userId);
    }
    
    /**
     * 获取客户端IP
     */
    public static String getClientIp() {
        return getAttribute("clientIp");
    }
    
    /**
     * 设置客户端IP
     */
    public static void setClientIp(String clientIp) {
        setAttribute("clientIp", clientIp);
    }
    
    /**
     * 清理请求上下文(必须在请求结束时调用)
     */
    public static void clear() {
        REQUEST_CONTEXT.remove();
    }
    
    /**
     * 模拟业务服务类
     */
    public static class OrderService {
        public void createOrder(String productId, int quantity) {
            String requestId = RequestContextHolder.getCurrentRequestId();
            String userId = RequestContextHolder.getCurrentUserId();
            
            System.out.println(String.format("[%s] 用户 %s 创建订单 - 商品: %s, 数量: %d", 
                requestId, userId, productId, quantity));
            
            // 模拟调用其他服务
            InventoryService inventoryService = new InventoryService();
            inventoryService.deductInventory(productId, quantity);
        }
    }
    
    /**
     * 模拟库存服务
     */
    public static class InventoryService {
        public void deductInventory(String productId, int quantity) {
            String requestId = RequestContextHolder.getCurrentRequestId();
            
            System.out.println(String.format("[%s] 扣减库存 - 商品: %s, 数量: %d", 
                requestId, productId, quantity));
            
            // 这里可以继续传递上下文
        }
    }
    
    /**
     * 测试用例
     */
    public static void main(String[] args) {
        // 模拟HTTP请求处理
        simulateHttpRequest();
        
        // 模拟另一个并发请求
        new Thread(RequestContextHolder::simulateHttpRequest).start();
    }
    
    private static void simulateHttpRequest() {
        try {
            // 1. 请求开始时初始化上下文
            RequestContextHolder.init();
            
            // 2. 设置请求相关属性
            RequestContextHolder.setCurrentUserId("user_" + Thread.currentThread().getId());
            RequestContextHolder.setClientIp("192.168.1." + Thread.currentThread().getId());
            
            // 3. 执行业务逻辑
            OrderService orderService = new OrderService();
            orderService.createOrder("product_001", 2);
            
            // 4. 打印当前请求耗时
            long elapsed = RequestContextHolder.getCurrentContext().getElapsedTime();
            System.out.println(String.format("[%s] 请求处理完成,耗时: %dms", 
                RequestContextHolder.getCurrentRequestId(), elapsed));
            
        } finally {
            // 5. 请求结束时清理上下文(重要!)
            RequestContextHolder.clear();
        }
    }
}

三、CompletableFuture:异步编程的利器

3.1 CompletableFuture基础用法

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CompletableFutureBasicDemo {
    
    // 自定义线程池(生产环境建议使用)
    private static final ExecutorService customExecutor = 
        Executors.newFixedThreadPool(4, r -> {
            Thread thread = new Thread(r);
            thread.setName("custom-pool-" + thread.getId());
            return thread;
        });
    
    public static void main(String[] args) {
        System.out.println("主线程开始: " + Thread.currentThread().getName());
        
        // 示例1:基本异步执行
        basicAsyncExample();
        
        // 示例2:链式调用
        chainCallExample();
        
        // 示例3:组合多个Future
        combineFutureExample();
        
        // 示例4:异常处理
        exceptionHandlingExample();
        
        // 示例5:超时控制
        timeoutControlExample();
        
        // 关闭线程池
        customExecutor.shutdown();
    }
    
    private static void basicAsyncExample() {
        System.out.println("\n=== 示例1:基本异步执行 ===");
        
        // 方式1:使用默认的ForkJoinPool
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            sleep(1000);
            System.out.println("future1执行线程: " + Thread.currentThread().getName());
            return "Hello";
        });
        
        // 方式2:使用自定义线程池
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            sleep(500);
            System.out.println("future2执行线程: " + Thread.currentThread().getName());
            return "World";
        }, customExecutor);
        
        // 阻塞获取结果
        try {
            String result1 = future1.get();
            String result2 = future2.get();
            System.out.println("结果: " + result1 + " " + result2);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    private static void chainCallExample() {
        System.out.println("\n=== 示例2:链式调用 ===");
        
        CompletableFuture.supplyAsync(() -> {
            System.out.println("第一步:获取用户ID - " + Thread.currentThread().getName());
            sleep(300);
            return "user_123";
        }, customExecutor).thenApply(userId -> {
            System.out.println("第二步:根据用户ID查询用户信息 - " + Thread.currentThread().getName());
            sleep(300);
            return "用户: " + userId;
        }).thenAccept(userInfo -> {
            System.out.println("第三步:处理用户信息 - " + Thread.currentThread().getName());
            System.out.println("处理结果: " + userInfo);
        }).thenRun(() -> {
            System.out.println("第四步:清理资源 - " + Thread.currentThread().getName());
        });
        
        sleep(2000); // 等待异步任务完成
    }
    
    private static void combineFutureExample() {
        System.out.println("\n=== 示例3:组合多个Future ===");
        
        // 模拟并行调用多个服务
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            sleep(800);
            System.out.println("调用用户服务完成");
            return "用户信息";
        }, customExecutor);
        
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            sleep(600);
            System.out.println("调用商品服务完成");
            return "商品信息";
        }, customExecutor);
        
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            sleep(400);
            System.out.println("调用库存服务完成");
            return "库存信息";
        }, customExecutor);
        
        // 方式1:全部完成
        CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);
        allOf.thenRun(() -> {
            System.out.println("所有服务调用完成");
            try {
                System.out.println("用户服务结果: " + future1.get());
                System.out.println("商品服务结果: " + future2.get());
                System.out.println("库存服务结果: " + future3.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        
        // 方式2:任意一个完成
        CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2, future3);
        anyOf.thenAccept(result -> {
            System.out.println("最快返回的服务结果: " + result);
        });
        
        sleep(2000);
    }
    
    private static void exceptionHandlingExample() {
        System.out.println("\n=== 示例4:异常处理 ===");
        
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始执行可能失败的任务");
            if (Math.random() > 0.5) {
                throw new RuntimeException("随机失败!");
            }
            return "成功结果";
        }, customExecutor);
        
        // 异常处理方式1:exceptionally(类似catch)
        future.exceptionally(ex -> {
            System.out.println("捕获到异常: " + ex.getMessage());
            return "默认值";
        }).thenAccept(result -> {
            System.out.println("最终结果: " + result);
        });
        
        // 异常处理方式2:handle(无论成功失败都会执行)
        future.handle((result, ex) -> {
            if (ex != null) {
                System.out.println("处理异常: " + ex.getMessage());
                return "异常处理后的默认值";
            }
            return result + "(处理后的)";
        }).thenAccept(result -> {
            System.out.println("handle处理结果: " + result);
        });
        
        sleep(1000);
    }
    
    private static void timeoutControlExample() {
        System.out.println("\n=== 示例5:超时控制 ===");
        
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            sleep(3000); // 模拟耗时操作
            return "正常结果";
        }, customExecutor);
        
        // 使用completeOnTimeout设置超时默认值
        CompletableFuture<String> timeoutFuture = future
            .completeOnTimeout("超时默认值", 1000, TimeUnit.MILLISECONDS)
            .exceptionally(ex -> "异常默认值");
        
        try {
            String result = timeoutFuture.get(2000, TimeUnit.MILLISECONDS);
            System.out.println("超时控制结果: " + result);
        } catch (TimeoutException e) {
            System.out.println("获取结果超时");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    private static void sleep(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

3.2 CompletableFuture在实际项目中的应用:并行调用优化

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * 并行服务调用优化示例
 * 场景:订单详情页需要聚合多个服务的数据
 */
public class ParallelServiceCallDemo {
    
    // 模拟商品信息服务
    static class ProductService {
        public Product getProductById(String productId) {
            sleep(200); // 模拟网络延迟
            System.out.println(Thread.currentThread().getName() + " - 获取商品信息: " + productId);
            return new Product(productId, "商品" + productId, new BigDecimal("99.99"));
        }
    }
    
    // 模拟库存服务
    static class InventoryService {
        public Integer getStock(String productId) {
            sleep(150); // 模拟网络延迟
            System.out.println(Thread.currentThread().getName() + " - 获取库存: " + productId);
            return (int) (Math.random() * 100); // 模拟库存数量
        }
    }
    
    // 模拟价格服务
    static class PriceService {
        public BigDecimal getPrice(String productId) {
            sleep(100); // 模拟网络延迟
            System.out.println(Thread.currentThread().getName() + " - 获取价格: " + productId);
            return new BigDecimal(Math.random() * 100 + 50).setScale(2, BigDecimal.ROUND_HALF_UP);
        }
    }
    
    // 模拟评价服务
    static class ReviewService {
        public Review getReview(String productId) {
            sleep(250); // 模拟网络延迟
            System.out.println(Thread.currentThread().getName() + " - 获取评价: " + productId);
            return new Review(productId, (int) (Math.random() * 5) + 1, "评价内容");
        }
    }
    
    // 商品类
    static class Product {
        String id;
        String name;
        BigDecimal price;
        
        public Product(String id, String name, BigDecimal price) {
            this.id = id;
            this.name = name;
            this.price = price;
        }
    }
    
    // 评价类
    static class Review {
        String productId;
        int rating;
        String content;
        
        public Review(String productId, int rating, String content) {
            this.productId = productId;
            this.rating = rating;
            this.content = content;
        }
    }
    
    // 商品详情类
    static class ProductDetail {
        String productId;
        Product product;
        Integer stock;
        BigDecimal price;
        Review review;
        
        @Override
        public String toString() {
            return String.format("商品详情{id=%s, 名称=%s, 价格=%.2f, 库存=%d, 评分=%d星}", 
                productId, product.name, price, stock, review != null ? review.rating : 0);
        }
    }
    
    // 传统的串行调用方式
    static class TraditionalService {
        private final ProductService productService = new ProductService();
        private final InventoryService inventoryService = new InventoryService();
        private final PriceService priceService = new PriceService();
        private final ReviewService reviewService = new ReviewService();
        
        public ProductDetail getProductDetail(String productId) {
            System.out.println("\n=== 传统串行调用 ===");
            long startTime = System.currentTimeMillis();
            
            // 串行调用各个服务
            Product product = productService.getProductById(productId);
            Integer stock = inventoryService.getStock(productId);
            BigDecimal price = priceService.getPrice(productId);
            Review review = reviewService.getReview(productId);
            
            long endTime = System.currentTimeMillis();
            System.out.println("串行调用总耗时: " + (endTime - startTime) + "ms");
            
            ProductDetail detail = new ProductDetail();
            detail.productId = productId;
            detail.product = product;
            detail.stock = stock;
            detail.price = price;
            detail.review = review;
            
            return detail;
        }
    }
    
    // 使用CompletableFuture的并行调用方式
    static class ParallelService {
        private final ProductService productService = new ProductService();
        private final InventoryService inventoryService = new InventoryService();
        private final PriceService priceService = new PriceService();
        private final ReviewService reviewService = new ReviewService();
        private final ExecutorService executor = Executors.newFixedThreadPool(4);
        
        public ProductDetail getProductDetail(String productId) {
            System.out.println("\n=== 并行调用 ===");
            long startTime = System.currentTimeMillis();
            
            // 并行调用各个服务
            CompletableFuture<Product> productFuture = 
                CompletableFuture.supplyAsync(() -> productService.getProductById(productId), executor);
            
            CompletableFuture<Integer> stockFuture = 
                CompletableFuture.supplyAsync(() -> inventoryService.getStock(productId), executor);
            
            CompletableFuture<BigDecimal> priceFuture = 
                CompletableFuture.supplyAsync(() -> priceService.getPrice(productId), executor);
            
            CompletableFuture<Review> reviewFuture = 
                CompletableFuture.supplyAsync(() -> reviewService.getReview(productId), executor);
            
            // 等待所有任务完成,并组装结果
            CompletableFuture<ProductDetail> detailFuture = 
                CompletableFuture.allOf(productFuture, stockFuture, priceFuture, reviewFuture)
                    .thenApply(v -> {
                        ProductDetail detail = new ProductDetail();
                        detail.productId = productId;
                        try {
                            detail.product = productFuture.get();
                            detail.stock = stockFuture.get();
                            detail.price = priceFuture.get();
                            detail.review = reviewFuture.get();
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                        return detail;
                    });
            
            ProductDetail detail = detailFuture.join();
            
            long endTime = System.currentTimeMillis();
            System.out.println("并行调用总耗时: " + (endTime - startTime) + "ms");
            
            return detail;
        }
        
        public void shutdown() {
            executor.shutdown();
        }
    }
    
    // 批量查询优化:同时查询多个商品
    static class BatchQueryService {
        private final ProductService productService = new ProductService();
        private final InventoryService inventoryService = new InventoryService();
        private final ExecutorService executor = Executors.newFixedThreadPool(8);
        
        public Map<String, ProductDetail> batchQueryProductDetails(List<String> productIds) {
            System.out.println("\n=== 批量查询优化 ===");
            long startTime = System.currentTimeMillis();
            
            // 为每个商品创建并行查询任务
            List<CompletableFuture<ProductDetail>> futures = productIds.stream()
                .map(productId -> querySingleProduct(productId))
                .collect(Collectors.toList());
            
            // 等待所有查询完成
            CompletableFuture<Void> allFutures = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[0])
            );
            
            // 收集结果
            Map<String, ProductDetail> result = new ConcurrentHashMap<>();
            allFutures.thenRun(() -> {
                futures.forEach(future -> {
                    try {
                        ProductDetail detail = future.get();
                        result.put(detail.productId, detail);
                    } catch (Exception e) {
                        // 记录异常,但不影响其他结果
                        System.err.println("查询商品失败: " + e.getMessage());
                    }
                });
            }).join(); // 阻塞直到所有任务完成
            
            long endTime = System.currentTimeMillis();
            System.out.println("批量查询" + productIds.size() + "个商品总耗时: " + (endTime - startTime) + "ms");
            
            return result;
        }
        
        private CompletableFuture<ProductDetail> querySingleProduct(String productId) {
            return CompletableFuture.supplyAsync(() -> {
                Product product = productService.getProductById(productId);
                return product;
            }, executor).thenCombineAsync(
                CompletableFuture.supplyAsync(() -> inventoryService.getStock(productId), executor),
                (product, stock) -> {
                    ProductDetail detail = new ProductDetail();
                    detail.productId = productId;
                    detail.product = product;
                    detail.stock = stock;
                    return detail;
                }
            , executor).exceptionally(ex -> {
                // 异常处理:返回一个空的ProductDetail
                System.err.println("查询商品" + productId + "失败: " + ex.getMessage());
                ProductDetail detail = new ProductDetail();
                detail.productId = productId;
                return detail;
            });
        }
        
        public void shutdown() {
            executor.shutdown();
        }
    }
    
    // 超时和降级策略
    static class ResilientService {
        private final PriceService priceService = new PriceService();
        private final ExecutorService executor = Executors.newFixedThreadPool(2);
        
        public BigDecimal getPriceWithFallback(String productId) {
            System.out.println("\n=== 带超时和降级的服务调用 ===");
            
            // 主服务调用,设置超时时间
            CompletableFuture<BigDecimal> mainService = CompletableFuture.supplyAsync(() -> {
                return priceService.getPrice(productId);
            }, executor).orTimeout(300, TimeUnit.MILLISECONDS); // 300ms超时
            
            // 降级服务(缓存或备用服务)
            CompletableFuture<BigDecimal> fallbackService = CompletableFuture.supplyAsync(() -> {
                sleep(100); // 降级服务更快
                return new BigDecimal("88.88"); // 默认价格
            }, executor);
            
            // 使用主服务,超时时使用降级服务
            return mainService.exceptionally(ex -> {
                System.out.println("主服务调用失败,使用降级服务: " + ex.getMessage());
                return fallbackService.join();
            }).join();
        }
        
        public void shutdown() {
            executor.shutdown();
        }
    }
    
    public static void main(String[] args) {
        // 1. 对比串行和并行调用
        TraditionalService traditionalService = new TraditionalService();
        ParallelService parallelService = new ParallelService();
        
        ProductDetail traditionalResult = traditionalService.getProductDetail("P001");
        System.out.println("串行结果: " + traditionalResult);
        
        ProductDetail parallelResult = parallelService.getProductDetail("P001");
        System.out.println("并行结果: " + parallelResult);
        
        // 2. 批量查询演示
        List<String> productIds = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            productIds.add("P00" + i);
        }
        
        BatchQueryService batchService = new BatchQueryService();
        Map<String, ProductDetail> batchResults = batchService.batchQueryProductDetails(productIds);
        
        System.out.println("\n批量查询结果:");
        batchResults.forEach((id, detail) -> {
            System.out.println("  " + id + ": " + detail);
        });
        
        // 3. 带超时和降级的服务调用
        ResilientService resilientService = new ResilientService();
        BigDecimal price = resilientService.getPriceWithFallback("P001");
        System.out.println("\n最终价格: " + price);
        
        // 清理资源
        parallelService.shutdown();
        batchService.shutdown();
        resilientService.shutdown();
    }
    
    private static void sleep(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

3.3 性能对比测试与最佳实践

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
 * 并发性能对比测试
 */
public class ConcurrencyPerformanceTest {
    
    // 模拟一个耗时服务
    static class SlowService {
        private final int delayMs;
        
        public SlowService(int delayMs) {
            this.delayMs = delayMs;
        }
        
        public String process(int id) {
            try {
                // 模拟业务处理时间
                Thread.sleep(delayMs);
                return "Result-" + id;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "Error-" + id;
            }
        }
    }
    
    // 测试配置
    static class TestConfig {
        int taskCount;          // 任务数量
        int serviceDelayMs;     // 每个任务耗时(ms)
        int threadPoolSize;     // 线程池大小
        
        public TestConfig(int taskCount, int serviceDelayMs, int threadPoolSize) {
            this.taskCount = taskCount;
            this.serviceDelayMs = serviceDelayMs;
            this.threadPoolSize = threadPoolSize;
        }
    }
    
    // 测试结果
    static class TestResult {
        String methodName;
        long totalTime;
        int successCount;
        int failedCount;
        
        public TestResult(String methodName, long totalTime, int successCount, int failedCount) {
            this.methodName = methodName;
            this.totalTime = totalTime;
            this.successCount = successCount;
            this.failedCount = failedCount;
        }
        
        @Override
        public String toString() {
            return String.format("%-20s 总耗时: %6dms  成功: %4d  失败: %4d  QPS: %.2f", 
                methodName, totalTime, successCount, failedCount, 
                successCount * 1000.0 / totalTime);
        }
    }
    
    /**
     * 方法1:传统串行处理
     */
    static TestResult testSerial(TestConfig config) {
        SlowService service = new SlowService(config.serviceDelayMs);
        List<String> results = new ArrayList<>();
        
        long startTime = System.currentTimeMillis();
        
        for (int i = 0; i < config.taskCount; i++) {
            try {
                results.add(service.process(i));
            } catch (Exception e) {
                // 处理异常
            }
        }
        
        long endTime = System.currentTimeMillis();
        return new TestResult("串行处理", endTime - startTime, results.size(), 0);
    }
    
    /**
     * 方法2:使用ExecutorService
     */
    static TestResult testExecutorService(TestConfig config) throws InterruptedException {
        SlowService service = new SlowService(config.serviceDelayMs);
        ExecutorService executor = Executors.newFixedThreadPool(config.threadPoolSize);
        AtomicInteger successCount = new AtomicInteger(0);
        AtomicInteger failedCount = new AtomicInteger(0);
        
        long startTime = System.currentTimeMillis();
        
        List<Future<String>> futures = new ArrayList<>();
        for (int i = 0; i < config.taskCount; i++) {
            final int taskId = i;
            Future<String> future = executor.submit(() -> service.process(taskId));
            futures.add(future);
        }
        
        // 等待所有任务完成
        for (Future<String> future : futures) {
            try {
                future.get();
                successCount.incrementAndGet();
            } catch (Exception e) {
                failedCount.incrementAndGet();
            }
        }
        
        long endTime = System.currentTimeMillis();
        executor.shutdown();
        
        return new TestResult("ExecutorService", endTime - startTime, 
                             successCount.get(), failedCount.get());
    }
    
    /**
     * 方法3:使用CompletableFuture(默认线程池)
     */
    static TestResult testCompletableFutureDefault(TestConfig config) {
        SlowService service = new SlowService(config.serviceDelayMs);
        AtomicInteger successCount = new AtomicInteger(0);
        AtomicInteger failedCount = new AtomicInteger(0);
        
        long startTime = System.currentTimeMillis();
        
        List<CompletableFuture<Void>> futures = IntStream.range(0, config.taskCount)
            .mapToObj(i -> CompletableFuture.supplyAsync(() -> service.process(i))
                .thenAccept(result -> successCount.incrementAndGet())
                .exceptionally(ex -> {
                    failedCount.incrementAndGet();
                    return null;
                })
            ).collect(Collectors.toList());
        
        // 等待所有任务完成
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        
        long endTime = System.currentTimeMillis();
        
        return new TestResult("CompletableFuture(默认)", endTime - startTime, 
                             successCount.get(), failedCount.get());
    }
    
    /**
     * 方法4:使用CompletableFuture(自定义线程池)
     */
    static TestResult testCompletableFutureCustom(TestConfig config) {
        SlowService service = new SlowService(config.serviceDelayMs);
        ExecutorService executor = Executors.newFixedThreadPool(config.threadPoolSize);
        AtomicInteger successCount = new AtomicInteger(0);
        AtomicInteger failedCount = new AtomicInteger(0);
        
        long startTime = System.currentTimeMillis();
        
        List<CompletableFuture<Void>> futures = IntStream.range(0, config.taskCount)
            .mapToObj(i -> CompletableFuture.supplyAsync(() -> service.process(i), executor)
                .thenAccept(result -> successCount.incrementAndGet())
                .exceptionally(ex -> {
                    failedCount.incrementAndGet();
                    return null;
                })
            ).collect(Collectors.toList());
        
        // 等待所有任务完成
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        
        long endTime = System.currentTimeMillis();
        executor.shutdown();
        
        return new TestResult("CompletableFuture(自定义)", endTime - startTime, 
                             successCount.get(), failedCount.get());
    }
    
    /**
     * 方法5:使用并行流(Parallel Stream)
     */
    static TestResult testParallelStream(TestConfig config) {
        SlowService service = new SlowService(config.serviceDelayMs);
        AtomicInteger successCount = new AtomicInteger(0);
        AtomicInteger failedCount = new AtomicInteger(0);
        
        long startTime = System.currentTimeMillis();
        
        IntStream.range(0, config.taskCount)
            .parallel()
            .forEach(i -> {
                try {
                    service.process(i);
                    successCount.incrementAndGet();
                } catch (Exception e) {
                    failedCount.incrementAndGet();
                }
            });
        
        long endTime = System.currentTimeMillis();
        
        return new TestResult("并行流(ParallelStream)", endTime - startTime, 
                             successCount.get(), failedCount.get());
    }
    
    /**
     * 运行性能测试
     */
    static void runPerformanceTest(TestConfig config) throws InterruptedException {
        System.out.println("\n" + "=".repeat(80));
        System.out.println("性能测试配置: 任务数=" + config.taskCount + 
                          ", 任务耗时=" + config.serviceDelayMs + "ms" +
                          ", 线程数=" + config.threadPoolSize);
        System.out.println("-".repeat(80));
        
        List<TestResult> results = new ArrayList<>();
        
        // 运行各种测试
        results.add(testSerial(config));
        results.add(testExecutorService(config));
        results.add(testCompletableFutureDefault(config));
        results.add(testCompletableFutureCustom(config));
        results.add(testParallelStream(config));
        
        // 打印结果
        results.forEach(System.out::println);
        
        // 找出最佳方案
        TestResult best = results.stream()
            .min((r1, r2) -> Long.compare(r1.totalTime, r2.totalTime))
            .orElseThrow();
        
        System.out.println("-".repeat(80));
        System.out.println("最佳方案: " + best.methodName + " (耗时最少)");
        System.out.println("=".repeat(80));
    }
    
    /**
     * CompletableFuture最佳实践总结
     */
    static class CompletableFutureBestPractices {
        
        // 最佳实践1:合理配置线程池
        public static ExecutorService createOptimalThreadPool() {
            int coreCount = Runtime.getRuntime().availableProcessors();
            return new ThreadPoolExecutor(
                coreCount * 2,           // 核心线程数:CPU核心数 * 2
                coreCount * 4,           // 最大线程数:CPU核心数 * 4
                60L, TimeUnit.SECONDS,   // 空闲线程存活时间
                new LinkedBlockingQueue<>(1000), // 任务队列容量
                new ThreadFactoryBuilder()
                    .setNameFormat("business-pool-%d")
                    .setUncaughtExceptionHandler((t, e) -> 
                        System.err.println("线程" + t.getName() + "异常: " + e.getMessage()))
                    .build(),
                new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略:由调用者线程执行
            );
        }
        
        // 最佳实践2:链式调用优化
        public static CompletableFuture<String> optimizedChainCall() {
            return CompletableFuture.supplyAsync(() -> {
                System.out.println("第一步:获取原始数据");
                return "原始数据";
            }).thenApplyAsync(data -> {
                System.out.println("第二步:处理数据");
                return data + " -> 处理后";
            }).thenApplyAsync(data -> {
                System.out.println("第三步:转换格式");
                return data + " -> 转换后";
            }).exceptionally(ex -> {
                System.err.println("处理失败: " + ex.getMessage());
                return "默认值";
            });
        }
        
        // 最佳实践3:超时控制
        public static CompletableFuture<String> withTimeoutControl() {
            ExecutorService executor = createOptimalThreadPool();
            
            return CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(2000); // 模拟耗时操作
                    return "正常结果";
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }, executor)
            .completeOnTimeout("超时默认值", 1000, TimeUnit.MILLISECONDS)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    System.err.println("任务异常完成: " + ex.getMessage());
                } else {
                    System.out.println("任务完成,结果: " + result);
                }
            });
        }
        
        // 最佳实践4:资源清理
        public static void withResourceCleanup() {
            ExecutorService executor = createOptimalThreadPool();
            
            CompletableFuture.runAsync(() -> {
                System.out.println("执行任务...");
                // 模拟资源使用
                try {
                    // 业务逻辑
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    // 确保资源清理
                    System.out.println("清理资源...");
                }
            }, executor);
            
            // 优雅关闭
            executor.shutdown();
            try {
                if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                    executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        System.out.println("Java并发性能对比测试");
        System.out.println("测试环境: CPU核心数=" + Runtime.getRuntime().availableProcessors());
        
        // 测试场景1:IO密集型任务(短耗时,多任务)
        System.out.println("\n场景1:IO密集型(100个任务,每个耗时50ms)");
        TestConfig ioIntensiveConfig = new TestConfig(100, 50, 20);
        runPerformanceTest(ioIntensiveConfig);
        
        // 测试场景2:计算密集型任务(长耗时,少任务)
        System.out.println("\n场景2:计算密集型(20个任务,每个耗时500ms)");
        TestConfig cpuIntensiveConfig = new TestConfig(20, 500, 
            Runtime.getRuntime().availableProcessors());
        runPerformanceTest(cpuIntensiveConfig);
        
        // 测试场景3:混合型任务
        System.out.println("\n场景3:混合型(50个任务,每个耗时200ms)");
        TestConfig mixedConfig = new TestConfig(50, 200, 
            Runtime.getRuntime().availableProcessors() * 2);
        runPerformanceTest(mixedConfig);
        
        // 演示最佳实践
        System.out.println("\n" + "=".repeat(80));
        System.out.println("CompletableFuture最佳实践示例");
        System.out.println("-".repeat(80));
        
        // 示例1:链式调用优化
        System.out.println("1. 链式调用优化示例:");
        CompletableFutureBestPractices.optimizedChainCall()
            .thenAccept(result -> System.out.println("最终结果: " + result))
            .join();
        
        // 示例2:超时控制
        System.out.println("\n2. 超时控制示例:");
        CompletableFutureBestPractices.withTimeoutControl()
            .thenAccept(result -> System.out.println("超时控制结果: " + result))
            .join();
        
        // 示例3:资源清理
        System.out.println("\n3. 资源清理示例:");
        CompletableFutureBestPractices.withResourceCleanup();
        
        System.out.println("=".repeat(80));
    }
}

四、总结与最佳实践

关键技术点总结:

  1. ThreadLocal的核心要点:

    • 每个线程有独立的变量副本,线程安全
    • 必须在使用后调用remove()避免内存泄漏
    • 在线程池中使用时要格外小心
    • InheritableThreadLocal可以实现父子线程数据传递
  2. CompletableFuture的核心要点:

    • 提供了强大的异步编程能力
    • 支持链式调用、组合、异常处理等
    • 合理使用线程池,避免耗尽资源
    • 注意异常处理和超时控制
  3. 性能优化建议:

    • IO密集型任务:使用较大的线程池(核心数 * 2 \~ *4)
    • 计算密集型任务:使用较小的线程池(≈ 核心数)
    • 合理设置任务队列大小,避免内存溢出
    • 监控线程池状态,及时调整参数
  4. 生产环境注意事项:

    • 为线程池设置合理的命名,便于监控和排查
    • 实现优雅的关闭逻辑,避免任务丢失
    • 添加完善的监控和告警机制
    • 定期进行性能压测,优化线程池参数

通过深入理解ThreadLocal和CompletableFuture的原理和最佳实践,我们可以构建出高性能、可维护的并发Java应用。记住:并发编程没有银弹,只有合适的场景选择合适的工具和技术。

版权声明 ▶ 本网站名称:黄磊的博客
▶ 本文标题:Java并发编程实战:从ThreadLocal到CompletableFuture的深度解析
▶ 本文链接:https://www.huangleicole.com/backend-related/38.html
▶ 转载本站文章需要遵守:商业转载请联系站长,非商业转载请注明出处!!

如果觉得我的文章对你有用,请随意赞赏