AI摘要
文章通过微服务上下文传递问题,系统解析ThreadLocal原理、内存泄漏风险与最佳实践,并深入演示CompletableFuture的链式、并行、异常及超时处理,结合性能测试给出IO/计算密集型线程池配置建议,总结生产环境监控与优雅关闭要点。
一、引言:一个微服务上下文传递的难题
在微服务架构中,我们经常需要在不同线程间传递用户上下文(如用户ID、请求ID等)。最初我们使用静态Map来存储,结果遇到了严重的数据错乱问题:
// ❌ 错误示例:静态Map存储上下文
public class UserContextHolder {
private static final Map<String, Object> context = new ConcurrentHashMap<>();
public static void set(String key, Object value) {
context.put(key, value);
}
public static Object get(String key) {
return context.get(key);
}
}在多线程环境下,多个请求会相互覆盖上下文数据。这让我深刻认识到:共享可变状态是并发编程万恶之源。
二、ThreadLocal:线程安全的局部变量
2.1 ThreadLocal的核心原理
ThreadLocal为每个线程提供独立的变量副本,避免了线程间的数据竞争。
ThreadLocal基本使用
import java.util.concurrent.TimeUnit;
public class ThreadLocalDemo {
// 创建ThreadLocal实例
private static final ThreadLocal<String> userContext = new ThreadLocal<>();
private static final ThreadLocal<Integer> requestIdContext = ThreadLocal.withInitial(() -> 0);
public static void main(String[] args) throws InterruptedException {
// 创建并启动多个线程
Thread thread1 = new Thread(new UserRequest("用户A", 1001), "Thread-1");
Thread thread2 = new Thread(new UserRequest("用户B", 1002), "Thread-2");
Thread thread3 = new Thread(new UserRequest("用户C", 1003), "Thread-3");
thread1.start();
thread2.start();
thread3.start();
// 等待所有线程执行完成
thread1.join();
thread2.join();
thread3.join();
System.out.println("所有线程执行完成");
}
// 用户请求处理类
static class UserRequest implements Runnable {
private final String userName;
private final int requestId;
public UserRequest(String userName, int requestId) {
this.userName = userName;
this.requestId = requestId;
}
@Override
public void run() {
try {
// 设置线程局部变量
userContext.set(userName);
requestIdContext.set(requestId);
// 模拟业务处理
processRequest();
} finally {
// 重要:必须清理ThreadLocal,防止内存泄漏
userContext.remove();
requestIdContext.remove();
}
}
private void processRequest() {
// 获取当前线程的上下文数据
String currentUser = userContext.get();
Integer currentRequestId = requestIdContext.get();
System.out.println(Thread.currentThread().getName() +
" -> 用户: " + currentUser +
", 请求ID: " + currentRequestId);
// 模拟嵌套方法调用,上下文依然可以获取
nestedMethod();
}
private void nestedMethod() {
// 嵌套方法中依然可以获取到ThreadLocal中的数据
System.out.println(Thread.currentThread().getName() +
" -> 嵌套方法获取用户: " + userContext.get());
}
}
}2.2 ThreadLocal的内存泄漏问题与解决方案
import java.lang.ref.WeakReference;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ThreadLocalMemoryLeakDemo {
// 模拟大对象
static class BigObject {
private final byte[] data = new byte[10 * 1024 * 1024]; // 10MB
@Override
protected void finalize() throws Throwable {
System.out.println("BigObject被GC回收");
super.finalize();
}
}
public static void main(String[] args) throws InterruptedException {
// 1. 演示内存泄漏的场景
demonstrateMemoryLeak();
// 等待GC
System.gc();
TimeUnit.SECONDS.sleep(3);
System.out.println("\n--- 分隔线 ---\n");
// 2. 演示正确使用ThreadLocal
demonstrateCorrectUsage();
// 等待GC
System.gc();
TimeUnit.SECONDS.sleep(3);
}
private static void demonstrateMemoryLeak() {
System.out.println("=== 演示ThreadLocal内存泄漏 ===");
ThreadLocal<BigObject> threadLocal = new ThreadLocal<>();
ExecutorService executor = Executors.newSingleThreadExecutor();
// 在线程池线程中设置ThreadLocal
executor.submit(() -> {
threadLocal.set(new BigObject());
System.out.println("线程池线程设置了BigObject");
// 注意:这里没有调用remove()!
});
// 等待任务完成
executor.shutdown();
try {
executor.awaitTermination(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("线程池已关闭,但ThreadLocal中的BigObject可能还未被回收");
}
private static void demonstrateCorrectUsage() {
System.out.println("=== 演示ThreadLocal正确用法 ===");
ThreadLocal<BigObject> threadLocal = new ThreadLocal<>();
ExecutorService executor = Executors.newSingleThreadExecutor();
// 在线程池线程中设置ThreadLocal,并正确清理
executor.submit(() -> {
try {
threadLocal.set(new BigObject());
System.out.println("线程池线程设置了BigObject");
// 模拟业务处理
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
// 关键:使用完后一定要remove!
threadLocal.remove();
System.out.println("线程池线程清理了ThreadLocal");
}
});
// 等待任务完成
executor.shutdown();
try {
executor.awaitTermination(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("线程池已关闭,ThreadLocal已正确清理");
}
// 3. 使用InheritableThreadLocal实现父子线程数据传递
private static void demonstrateInheritableThreadLocal() {
System.out.println("=== 演示InheritableThreadLocal ===");
InheritableThreadLocal<String> inheritableThreadLocal = new InheritableThreadLocal<>();
inheritableThreadLocal.set("父线程数据");
Thread childThread = new Thread(() -> {
System.out.println("子线程获取数据: " + inheritableThreadLocal.get());
});
childThread.start();
try {
childThread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 清理
inheritableThreadLocal.remove();
}
}2.3 ThreadLocal在实际项目中的应用:请求上下文管理
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* 请求上下文管理器
* 用于在同一个请求的整个调用链中传递上下文信息
*/
public class RequestContextHolder {
private RequestContextHolder() {
// 工具类,禁止实例化
}
// 使用ThreadLocal存储请求上下文
private static final ThreadLocal<RequestContext> REQUEST_CONTEXT = new ThreadLocal<>();
/**
* 请求上下文类
*/
public static class RequestContext {
private final String requestId;
private final long startTime;
private final Map<String, Object> attributes;
public RequestContext() {
this.requestId = generateRequestId();
this.startTime = System.currentTimeMillis();
this.attributes = new HashMap<>();
}
public RequestContext(String requestId) {
this.requestId = requestId;
this.startTime = System.currentTimeMillis();
this.attributes = new HashMap<>();
}
private String generateRequestId() {
return UUID.randomUUID().toString().replace("-", "");
}
public String getRequestId() {
return requestId;
}
public long getStartTime() {
return startTime;
}
public long getElapsedTime() {
return System.currentTimeMillis() - startTime;
}
public void setAttribute(String key, Object value) {
attributes.put(key, value);
}
@SuppressWarnings("unchecked")
public <T> T getAttribute(String key) {
return (T) attributes.get(key);
}
public void removeAttribute(String key) {
attributes.remove(key);
}
public Map<String, Object> getAllAttributes() {
return new HashMap<>(attributes);
}
}
/**
* 初始化请求上下文
*/
public static void init() {
REQUEST_CONTEXT.set(new RequestContext());
}
/**
* 初始化请求上下文(指定requestId)
*/
public static void init(String requestId) {
REQUEST_CONTEXT.set(new RequestContext(requestId));
}
/**
* 获取当前请求上下文
*/
public static RequestContext getCurrentContext() {
RequestContext context = REQUEST_CONTEXT.get();
if (context == null) {
throw new IllegalStateException("请求上下文未初始化,请确保在请求入口处调用RequestContextHolder.init()");
}
return context;
}
/**
* 获取当前请求ID
*/
public static String getCurrentRequestId() {
return getCurrentContext().getRequestId();
}
/**
* 设置上下文属性
*/
public static void setAttribute(String key, Object value) {
getCurrentContext().setAttribute(key, value);
}
/**
* 获取上下文属性
*/
@SuppressWarnings("unchecked")
public static <T> T getAttribute(String key) {
return getCurrentContext().getAttribute(key);
}
/**
* 获取当前用户ID
*/
public static String getCurrentUserId() {
return getAttribute("userId");
}
/**
* 设置当前用户ID
*/
public static void setCurrentUserId(String userId) {
setAttribute("userId", userId);
}
/**
* 获取客户端IP
*/
public static String getClientIp() {
return getAttribute("clientIp");
}
/**
* 设置客户端IP
*/
public static void setClientIp(String clientIp) {
setAttribute("clientIp", clientIp);
}
/**
* 清理请求上下文(必须在请求结束时调用)
*/
public static void clear() {
REQUEST_CONTEXT.remove();
}
/**
* 模拟业务服务类
*/
public static class OrderService {
public void createOrder(String productId, int quantity) {
String requestId = RequestContextHolder.getCurrentRequestId();
String userId = RequestContextHolder.getCurrentUserId();
System.out.println(String.format("[%s] 用户 %s 创建订单 - 商品: %s, 数量: %d",
requestId, userId, productId, quantity));
// 模拟调用其他服务
InventoryService inventoryService = new InventoryService();
inventoryService.deductInventory(productId, quantity);
}
}
/**
* 模拟库存服务
*/
public static class InventoryService {
public void deductInventory(String productId, int quantity) {
String requestId = RequestContextHolder.getCurrentRequestId();
System.out.println(String.format("[%s] 扣减库存 - 商品: %s, 数量: %d",
requestId, productId, quantity));
// 这里可以继续传递上下文
}
}
/**
* 测试用例
*/
public static void main(String[] args) {
// 模拟HTTP请求处理
simulateHttpRequest();
// 模拟另一个并发请求
new Thread(RequestContextHolder::simulateHttpRequest).start();
}
private static void simulateHttpRequest() {
try {
// 1. 请求开始时初始化上下文
RequestContextHolder.init();
// 2. 设置请求相关属性
RequestContextHolder.setCurrentUserId("user_" + Thread.currentThread().getId());
RequestContextHolder.setClientIp("192.168.1." + Thread.currentThread().getId());
// 3. 执行业务逻辑
OrderService orderService = new OrderService();
orderService.createOrder("product_001", 2);
// 4. 打印当前请求耗时
long elapsed = RequestContextHolder.getCurrentContext().getElapsedTime();
System.out.println(String.format("[%s] 请求处理完成,耗时: %dms",
RequestContextHolder.getCurrentRequestId(), elapsed));
} finally {
// 5. 请求结束时清理上下文(重要!)
RequestContextHolder.clear();
}
}
}三、CompletableFuture:异步编程的利器
3.1 CompletableFuture基础用法
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class CompletableFutureBasicDemo {
// 自定义线程池(生产环境建议使用)
private static final ExecutorService customExecutor =
Executors.newFixedThreadPool(4, r -> {
Thread thread = new Thread(r);
thread.setName("custom-pool-" + thread.getId());
return thread;
});
public static void main(String[] args) {
System.out.println("主线程开始: " + Thread.currentThread().getName());
// 示例1:基本异步执行
basicAsyncExample();
// 示例2:链式调用
chainCallExample();
// 示例3:组合多个Future
combineFutureExample();
// 示例4:异常处理
exceptionHandlingExample();
// 示例5:超时控制
timeoutControlExample();
// 关闭线程池
customExecutor.shutdown();
}
private static void basicAsyncExample() {
System.out.println("\n=== 示例1:基本异步执行 ===");
// 方式1:使用默认的ForkJoinPool
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
System.out.println("future1执行线程: " + Thread.currentThread().getName());
return "Hello";
});
// 方式2:使用自定义线程池
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
sleep(500);
System.out.println("future2执行线程: " + Thread.currentThread().getName());
return "World";
}, customExecutor);
// 阻塞获取结果
try {
String result1 = future1.get();
String result2 = future2.get();
System.out.println("结果: " + result1 + " " + result2);
} catch (Exception e) {
e.printStackTrace();
}
}
private static void chainCallExample() {
System.out.println("\n=== 示例2:链式调用 ===");
CompletableFuture.supplyAsync(() -> {
System.out.println("第一步:获取用户ID - " + Thread.currentThread().getName());
sleep(300);
return "user_123";
}, customExecutor).thenApply(userId -> {
System.out.println("第二步:根据用户ID查询用户信息 - " + Thread.currentThread().getName());
sleep(300);
return "用户: " + userId;
}).thenAccept(userInfo -> {
System.out.println("第三步:处理用户信息 - " + Thread.currentThread().getName());
System.out.println("处理结果: " + userInfo);
}).thenRun(() -> {
System.out.println("第四步:清理资源 - " + Thread.currentThread().getName());
});
sleep(2000); // 等待异步任务完成
}
private static void combineFutureExample() {
System.out.println("\n=== 示例3:组合多个Future ===");
// 模拟并行调用多个服务
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
sleep(800);
System.out.println("调用用户服务完成");
return "用户信息";
}, customExecutor);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
sleep(600);
System.out.println("调用商品服务完成");
return "商品信息";
}, customExecutor);
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
sleep(400);
System.out.println("调用库存服务完成");
return "库存信息";
}, customExecutor);
// 方式1:全部完成
CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);
allOf.thenRun(() -> {
System.out.println("所有服务调用完成");
try {
System.out.println("用户服务结果: " + future1.get());
System.out.println("商品服务结果: " + future2.get());
System.out.println("库存服务结果: " + future3.get());
} catch (Exception e) {
e.printStackTrace();
}
});
// 方式2:任意一个完成
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2, future3);
anyOf.thenAccept(result -> {
System.out.println("最快返回的服务结果: " + result);
});
sleep(2000);
}
private static void exceptionHandlingExample() {
System.out.println("\n=== 示例4:异常处理 ===");
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("开始执行可能失败的任务");
if (Math.random() > 0.5) {
throw new RuntimeException("随机失败!");
}
return "成功结果";
}, customExecutor);
// 异常处理方式1:exceptionally(类似catch)
future.exceptionally(ex -> {
System.out.println("捕获到异常: " + ex.getMessage());
return "默认值";
}).thenAccept(result -> {
System.out.println("最终结果: " + result);
});
// 异常处理方式2:handle(无论成功失败都会执行)
future.handle((result, ex) -> {
if (ex != null) {
System.out.println("处理异常: " + ex.getMessage());
return "异常处理后的默认值";
}
return result + "(处理后的)";
}).thenAccept(result -> {
System.out.println("handle处理结果: " + result);
});
sleep(1000);
}
private static void timeoutControlExample() {
System.out.println("\n=== 示例5:超时控制 ===");
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
sleep(3000); // 模拟耗时操作
return "正常结果";
}, customExecutor);
// 使用completeOnTimeout设置超时默认值
CompletableFuture<String> timeoutFuture = future
.completeOnTimeout("超时默认值", 1000, TimeUnit.MILLISECONDS)
.exceptionally(ex -> "异常默认值");
try {
String result = timeoutFuture.get(2000, TimeUnit.MILLISECONDS);
System.out.println("超时控制结果: " + result);
} catch (TimeoutException e) {
System.out.println("获取结果超时");
} catch (Exception e) {
e.printStackTrace();
}
}
private static void sleep(long millis) {
try {
TimeUnit.MILLISECONDS.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}3.2 CompletableFuture在实际项目中的应用:并行调用优化
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* 并行服务调用优化示例
* 场景:订单详情页需要聚合多个服务的数据
*/
public class ParallelServiceCallDemo {
// 模拟商品信息服务
static class ProductService {
public Product getProductById(String productId) {
sleep(200); // 模拟网络延迟
System.out.println(Thread.currentThread().getName() + " - 获取商品信息: " + productId);
return new Product(productId, "商品" + productId, new BigDecimal("99.99"));
}
}
// 模拟库存服务
static class InventoryService {
public Integer getStock(String productId) {
sleep(150); // 模拟网络延迟
System.out.println(Thread.currentThread().getName() + " - 获取库存: " + productId);
return (int) (Math.random() * 100); // 模拟库存数量
}
}
// 模拟价格服务
static class PriceService {
public BigDecimal getPrice(String productId) {
sleep(100); // 模拟网络延迟
System.out.println(Thread.currentThread().getName() + " - 获取价格: " + productId);
return new BigDecimal(Math.random() * 100 + 50).setScale(2, BigDecimal.ROUND_HALF_UP);
}
}
// 模拟评价服务
static class ReviewService {
public Review getReview(String productId) {
sleep(250); // 模拟网络延迟
System.out.println(Thread.currentThread().getName() + " - 获取评价: " + productId);
return new Review(productId, (int) (Math.random() * 5) + 1, "评价内容");
}
}
// 商品类
static class Product {
String id;
String name;
BigDecimal price;
public Product(String id, String name, BigDecimal price) {
this.id = id;
this.name = name;
this.price = price;
}
}
// 评价类
static class Review {
String productId;
int rating;
String content;
public Review(String productId, int rating, String content) {
this.productId = productId;
this.rating = rating;
this.content = content;
}
}
// 商品详情类
static class ProductDetail {
String productId;
Product product;
Integer stock;
BigDecimal price;
Review review;
@Override
public String toString() {
return String.format("商品详情{id=%s, 名称=%s, 价格=%.2f, 库存=%d, 评分=%d星}",
productId, product.name, price, stock, review != null ? review.rating : 0);
}
}
// 传统的串行调用方式
static class TraditionalService {
private final ProductService productService = new ProductService();
private final InventoryService inventoryService = new InventoryService();
private final PriceService priceService = new PriceService();
private final ReviewService reviewService = new ReviewService();
public ProductDetail getProductDetail(String productId) {
System.out.println("\n=== 传统串行调用 ===");
long startTime = System.currentTimeMillis();
// 串行调用各个服务
Product product = productService.getProductById(productId);
Integer stock = inventoryService.getStock(productId);
BigDecimal price = priceService.getPrice(productId);
Review review = reviewService.getReview(productId);
long endTime = System.currentTimeMillis();
System.out.println("串行调用总耗时: " + (endTime - startTime) + "ms");
ProductDetail detail = new ProductDetail();
detail.productId = productId;
detail.product = product;
detail.stock = stock;
detail.price = price;
detail.review = review;
return detail;
}
}
// 使用CompletableFuture的并行调用方式
static class ParallelService {
private final ProductService productService = new ProductService();
private final InventoryService inventoryService = new InventoryService();
private final PriceService priceService = new PriceService();
private final ReviewService reviewService = new ReviewService();
private final ExecutorService executor = Executors.newFixedThreadPool(4);
public ProductDetail getProductDetail(String productId) {
System.out.println("\n=== 并行调用 ===");
long startTime = System.currentTimeMillis();
// 并行调用各个服务
CompletableFuture<Product> productFuture =
CompletableFuture.supplyAsync(() -> productService.getProductById(productId), executor);
CompletableFuture<Integer> stockFuture =
CompletableFuture.supplyAsync(() -> inventoryService.getStock(productId), executor);
CompletableFuture<BigDecimal> priceFuture =
CompletableFuture.supplyAsync(() -> priceService.getPrice(productId), executor);
CompletableFuture<Review> reviewFuture =
CompletableFuture.supplyAsync(() -> reviewService.getReview(productId), executor);
// 等待所有任务完成,并组装结果
CompletableFuture<ProductDetail> detailFuture =
CompletableFuture.allOf(productFuture, stockFuture, priceFuture, reviewFuture)
.thenApply(v -> {
ProductDetail detail = new ProductDetail();
detail.productId = productId;
try {
detail.product = productFuture.get();
detail.stock = stockFuture.get();
detail.price = priceFuture.get();
detail.review = reviewFuture.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
return detail;
});
ProductDetail detail = detailFuture.join();
long endTime = System.currentTimeMillis();
System.out.println("并行调用总耗时: " + (endTime - startTime) + "ms");
return detail;
}
public void shutdown() {
executor.shutdown();
}
}
// 批量查询优化:同时查询多个商品
static class BatchQueryService {
private final ProductService productService = new ProductService();
private final InventoryService inventoryService = new InventoryService();
private final ExecutorService executor = Executors.newFixedThreadPool(8);
public Map<String, ProductDetail> batchQueryProductDetails(List<String> productIds) {
System.out.println("\n=== 批量查询优化 ===");
long startTime = System.currentTimeMillis();
// 为每个商品创建并行查询任务
List<CompletableFuture<ProductDetail>> futures = productIds.stream()
.map(productId -> querySingleProduct(productId))
.collect(Collectors.toList());
// 等待所有查询完成
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
// 收集结果
Map<String, ProductDetail> result = new ConcurrentHashMap<>();
allFutures.thenRun(() -> {
futures.forEach(future -> {
try {
ProductDetail detail = future.get();
result.put(detail.productId, detail);
} catch (Exception e) {
// 记录异常,但不影响其他结果
System.err.println("查询商品失败: " + e.getMessage());
}
});
}).join(); // 阻塞直到所有任务完成
long endTime = System.currentTimeMillis();
System.out.println("批量查询" + productIds.size() + "个商品总耗时: " + (endTime - startTime) + "ms");
return result;
}
private CompletableFuture<ProductDetail> querySingleProduct(String productId) {
return CompletableFuture.supplyAsync(() -> {
Product product = productService.getProductById(productId);
return product;
}, executor).thenCombineAsync(
CompletableFuture.supplyAsync(() -> inventoryService.getStock(productId), executor),
(product, stock) -> {
ProductDetail detail = new ProductDetail();
detail.productId = productId;
detail.product = product;
detail.stock = stock;
return detail;
}
, executor).exceptionally(ex -> {
// 异常处理:返回一个空的ProductDetail
System.err.println("查询商品" + productId + "失败: " + ex.getMessage());
ProductDetail detail = new ProductDetail();
detail.productId = productId;
return detail;
});
}
public void shutdown() {
executor.shutdown();
}
}
// 超时和降级策略
static class ResilientService {
private final PriceService priceService = new PriceService();
private final ExecutorService executor = Executors.newFixedThreadPool(2);
public BigDecimal getPriceWithFallback(String productId) {
System.out.println("\n=== 带超时和降级的服务调用 ===");
// 主服务调用,设置超时时间
CompletableFuture<BigDecimal> mainService = CompletableFuture.supplyAsync(() -> {
return priceService.getPrice(productId);
}, executor).orTimeout(300, TimeUnit.MILLISECONDS); // 300ms超时
// 降级服务(缓存或备用服务)
CompletableFuture<BigDecimal> fallbackService = CompletableFuture.supplyAsync(() -> {
sleep(100); // 降级服务更快
return new BigDecimal("88.88"); // 默认价格
}, executor);
// 使用主服务,超时时使用降级服务
return mainService.exceptionally(ex -> {
System.out.println("主服务调用失败,使用降级服务: " + ex.getMessage());
return fallbackService.join();
}).join();
}
public void shutdown() {
executor.shutdown();
}
}
public static void main(String[] args) {
// 1. 对比串行和并行调用
TraditionalService traditionalService = new TraditionalService();
ParallelService parallelService = new ParallelService();
ProductDetail traditionalResult = traditionalService.getProductDetail("P001");
System.out.println("串行结果: " + traditionalResult);
ProductDetail parallelResult = parallelService.getProductDetail("P001");
System.out.println("并行结果: " + parallelResult);
// 2. 批量查询演示
List<String> productIds = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
productIds.add("P00" + i);
}
BatchQueryService batchService = new BatchQueryService();
Map<String, ProductDetail> batchResults = batchService.batchQueryProductDetails(productIds);
System.out.println("\n批量查询结果:");
batchResults.forEach((id, detail) -> {
System.out.println(" " + id + ": " + detail);
});
// 3. 带超时和降级的服务调用
ResilientService resilientService = new ResilientService();
BigDecimal price = resilientService.getPriceWithFallback("P001");
System.out.println("\n最终价格: " + price);
// 清理资源
parallelService.shutdown();
batchService.shutdown();
resilientService.shutdown();
}
private static void sleep(long millis) {
try {
TimeUnit.MILLISECONDS.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}3.3 性能对比测试与最佳实践
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* 并发性能对比测试
*/
public class ConcurrencyPerformanceTest {
// 模拟一个耗时服务
static class SlowService {
private final int delayMs;
public SlowService(int delayMs) {
this.delayMs = delayMs;
}
public String process(int id) {
try {
// 模拟业务处理时间
Thread.sleep(delayMs);
return "Result-" + id;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "Error-" + id;
}
}
}
// 测试配置
static class TestConfig {
int taskCount; // 任务数量
int serviceDelayMs; // 每个任务耗时(ms)
int threadPoolSize; // 线程池大小
public TestConfig(int taskCount, int serviceDelayMs, int threadPoolSize) {
this.taskCount = taskCount;
this.serviceDelayMs = serviceDelayMs;
this.threadPoolSize = threadPoolSize;
}
}
// 测试结果
static class TestResult {
String methodName;
long totalTime;
int successCount;
int failedCount;
public TestResult(String methodName, long totalTime, int successCount, int failedCount) {
this.methodName = methodName;
this.totalTime = totalTime;
this.successCount = successCount;
this.failedCount = failedCount;
}
@Override
public String toString() {
return String.format("%-20s 总耗时: %6dms 成功: %4d 失败: %4d QPS: %.2f",
methodName, totalTime, successCount, failedCount,
successCount * 1000.0 / totalTime);
}
}
/**
* 方法1:传统串行处理
*/
static TestResult testSerial(TestConfig config) {
SlowService service = new SlowService(config.serviceDelayMs);
List<String> results = new ArrayList<>();
long startTime = System.currentTimeMillis();
for (int i = 0; i < config.taskCount; i++) {
try {
results.add(service.process(i));
} catch (Exception e) {
// 处理异常
}
}
long endTime = System.currentTimeMillis();
return new TestResult("串行处理", endTime - startTime, results.size(), 0);
}
/**
* 方法2:使用ExecutorService
*/
static TestResult testExecutorService(TestConfig config) throws InterruptedException {
SlowService service = new SlowService(config.serviceDelayMs);
ExecutorService executor = Executors.newFixedThreadPool(config.threadPoolSize);
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger failedCount = new AtomicInteger(0);
long startTime = System.currentTimeMillis();
List<Future<String>> futures = new ArrayList<>();
for (int i = 0; i < config.taskCount; i++) {
final int taskId = i;
Future<String> future = executor.submit(() -> service.process(taskId));
futures.add(future);
}
// 等待所有任务完成
for (Future<String> future : futures) {
try {
future.get();
successCount.incrementAndGet();
} catch (Exception e) {
failedCount.incrementAndGet();
}
}
long endTime = System.currentTimeMillis();
executor.shutdown();
return new TestResult("ExecutorService", endTime - startTime,
successCount.get(), failedCount.get());
}
/**
* 方法3:使用CompletableFuture(默认线程池)
*/
static TestResult testCompletableFutureDefault(TestConfig config) {
SlowService service = new SlowService(config.serviceDelayMs);
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger failedCount = new AtomicInteger(0);
long startTime = System.currentTimeMillis();
List<CompletableFuture<Void>> futures = IntStream.range(0, config.taskCount)
.mapToObj(i -> CompletableFuture.supplyAsync(() -> service.process(i))
.thenAccept(result -> successCount.incrementAndGet())
.exceptionally(ex -> {
failedCount.incrementAndGet();
return null;
})
).collect(Collectors.toList());
// 等待所有任务完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
long endTime = System.currentTimeMillis();
return new TestResult("CompletableFuture(默认)", endTime - startTime,
successCount.get(), failedCount.get());
}
/**
* 方法4:使用CompletableFuture(自定义线程池)
*/
static TestResult testCompletableFutureCustom(TestConfig config) {
SlowService service = new SlowService(config.serviceDelayMs);
ExecutorService executor = Executors.newFixedThreadPool(config.threadPoolSize);
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger failedCount = new AtomicInteger(0);
long startTime = System.currentTimeMillis();
List<CompletableFuture<Void>> futures = IntStream.range(0, config.taskCount)
.mapToObj(i -> CompletableFuture.supplyAsync(() -> service.process(i), executor)
.thenAccept(result -> successCount.incrementAndGet())
.exceptionally(ex -> {
failedCount.incrementAndGet();
return null;
})
).collect(Collectors.toList());
// 等待所有任务完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
long endTime = System.currentTimeMillis();
executor.shutdown();
return new TestResult("CompletableFuture(自定义)", endTime - startTime,
successCount.get(), failedCount.get());
}
/**
* 方法5:使用并行流(Parallel Stream)
*/
static TestResult testParallelStream(TestConfig config) {
SlowService service = new SlowService(config.serviceDelayMs);
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger failedCount = new AtomicInteger(0);
long startTime = System.currentTimeMillis();
IntStream.range(0, config.taskCount)
.parallel()
.forEach(i -> {
try {
service.process(i);
successCount.incrementAndGet();
} catch (Exception e) {
failedCount.incrementAndGet();
}
});
long endTime = System.currentTimeMillis();
return new TestResult("并行流(ParallelStream)", endTime - startTime,
successCount.get(), failedCount.get());
}
/**
* 运行性能测试
*/
static void runPerformanceTest(TestConfig config) throws InterruptedException {
System.out.println("\n" + "=".repeat(80));
System.out.println("性能测试配置: 任务数=" + config.taskCount +
", 任务耗时=" + config.serviceDelayMs + "ms" +
", 线程数=" + config.threadPoolSize);
System.out.println("-".repeat(80));
List<TestResult> results = new ArrayList<>();
// 运行各种测试
results.add(testSerial(config));
results.add(testExecutorService(config));
results.add(testCompletableFutureDefault(config));
results.add(testCompletableFutureCustom(config));
results.add(testParallelStream(config));
// 打印结果
results.forEach(System.out::println);
// 找出最佳方案
TestResult best = results.stream()
.min((r1, r2) -> Long.compare(r1.totalTime, r2.totalTime))
.orElseThrow();
System.out.println("-".repeat(80));
System.out.println("最佳方案: " + best.methodName + " (耗时最少)");
System.out.println("=".repeat(80));
}
/**
* CompletableFuture最佳实践总结
*/
static class CompletableFutureBestPractices {
// 最佳实践1:合理配置线程池
public static ExecutorService createOptimalThreadPool() {
int coreCount = Runtime.getRuntime().availableProcessors();
return new ThreadPoolExecutor(
coreCount * 2, // 核心线程数:CPU核心数 * 2
coreCount * 4, // 最大线程数:CPU核心数 * 4
60L, TimeUnit.SECONDS, // 空闲线程存活时间
new LinkedBlockingQueue<>(1000), // 任务队列容量
new ThreadFactoryBuilder()
.setNameFormat("business-pool-%d")
.setUncaughtExceptionHandler((t, e) ->
System.err.println("线程" + t.getName() + "异常: " + e.getMessage()))
.build(),
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略:由调用者线程执行
);
}
// 最佳实践2:链式调用优化
public static CompletableFuture<String> optimizedChainCall() {
return CompletableFuture.supplyAsync(() -> {
System.out.println("第一步:获取原始数据");
return "原始数据";
}).thenApplyAsync(data -> {
System.out.println("第二步:处理数据");
return data + " -> 处理后";
}).thenApplyAsync(data -> {
System.out.println("第三步:转换格式");
return data + " -> 转换后";
}).exceptionally(ex -> {
System.err.println("处理失败: " + ex.getMessage());
return "默认值";
});
}
// 最佳实践3:超时控制
public static CompletableFuture<String> withTimeoutControl() {
ExecutorService executor = createOptimalThreadPool();
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000); // 模拟耗时操作
return "正常结果";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}, executor)
.completeOnTimeout("超时默认值", 1000, TimeUnit.MILLISECONDS)
.whenComplete((result, ex) -> {
if (ex != null) {
System.err.println("任务异常完成: " + ex.getMessage());
} else {
System.out.println("任务完成,结果: " + result);
}
});
}
// 最佳实践4:资源清理
public static void withResourceCleanup() {
ExecutorService executor = createOptimalThreadPool();
CompletableFuture.runAsync(() -> {
System.out.println("执行任务...");
// 模拟资源使用
try {
// 业务逻辑
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
// 确保资源清理
System.out.println("清理资源...");
}
}, executor);
// 优雅关闭
executor.shutdown();
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) throws InterruptedException {
System.out.println("Java并发性能对比测试");
System.out.println("测试环境: CPU核心数=" + Runtime.getRuntime().availableProcessors());
// 测试场景1:IO密集型任务(短耗时,多任务)
System.out.println("\n场景1:IO密集型(100个任务,每个耗时50ms)");
TestConfig ioIntensiveConfig = new TestConfig(100, 50, 20);
runPerformanceTest(ioIntensiveConfig);
// 测试场景2:计算密集型任务(长耗时,少任务)
System.out.println("\n场景2:计算密集型(20个任务,每个耗时500ms)");
TestConfig cpuIntensiveConfig = new TestConfig(20, 500,
Runtime.getRuntime().availableProcessors());
runPerformanceTest(cpuIntensiveConfig);
// 测试场景3:混合型任务
System.out.println("\n场景3:混合型(50个任务,每个耗时200ms)");
TestConfig mixedConfig = new TestConfig(50, 200,
Runtime.getRuntime().availableProcessors() * 2);
runPerformanceTest(mixedConfig);
// 演示最佳实践
System.out.println("\n" + "=".repeat(80));
System.out.println("CompletableFuture最佳实践示例");
System.out.println("-".repeat(80));
// 示例1:链式调用优化
System.out.println("1. 链式调用优化示例:");
CompletableFutureBestPractices.optimizedChainCall()
.thenAccept(result -> System.out.println("最终结果: " + result))
.join();
// 示例2:超时控制
System.out.println("\n2. 超时控制示例:");
CompletableFutureBestPractices.withTimeoutControl()
.thenAccept(result -> System.out.println("超时控制结果: " + result))
.join();
// 示例3:资源清理
System.out.println("\n3. 资源清理示例:");
CompletableFutureBestPractices.withResourceCleanup();
System.out.println("=".repeat(80));
}
}四、总结与最佳实践
关键技术点总结:
ThreadLocal的核心要点:
- 每个线程有独立的变量副本,线程安全
- 必须在使用后调用
remove()避免内存泄漏 - 在线程池中使用时要格外小心
InheritableThreadLocal可以实现父子线程数据传递
CompletableFuture的核心要点:
- 提供了强大的异步编程能力
- 支持链式调用、组合、异常处理等
- 合理使用线程池,避免耗尽资源
- 注意异常处理和超时控制
性能优化建议:
- IO密集型任务:使用较大的线程池(核心数 * 2 \~ *4)
- 计算密集型任务:使用较小的线程池(≈ 核心数)
- 合理设置任务队列大小,避免内存溢出
- 监控线程池状态,及时调整参数
生产环境注意事项:
- 为线程池设置合理的命名,便于监控和排查
- 实现优雅的关闭逻辑,避免任务丢失
- 添加完善的监控和告警机制
- 定期进行性能压测,优化线程池参数
通过深入理解ThreadLocal和CompletableFuture的原理和最佳实践,我们可以构建出高性能、可维护的并发Java应用。记住:并发编程没有银弹,只有合适的场景选择合适的工具和技术。