引言
在高并发系统中,异步编程是提升性能的关键手段。Java 8 引入的 CompletableFuture 提供了强大的异步编程能力。本文将分享在实际项目中的使用经验。
为什么需要异步编程
同步 vs 异步
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public OrderDetail getOrderDetailSync(Long orderId) { Order order = orderService.getOrder(orderId); User user = userService.getUser(order.getUserId()); List<Item> items = itemService.getItems(orderId); return assemble(order, user, items); }
public OrderDetail getOrderDetailAsync(Long orderId) { CompletableFuture<Order> orderFuture = CompletableFuture .supplyAsync(() -> orderService.getOrder(orderId)); CompletableFuture<User> userFuture = orderFuture .thenCompose(order -> CompletableFuture.supplyAsync( () -> userService.getUser(order.getUserId()))); CompletableFuture<List<Item>> itemsFuture = CompletableFuture .supplyAsync(() -> itemService.getItems(orderId)); CompletableFuture.allOf(orderFuture, userFuture, itemsFuture).join(); return assemble(orderFuture.join(), userFuture.join(), itemsFuture.join()); }
|
CompletableFuture 核心方法
1. 创建异步任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { return "Result"; });
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { System.out.println("Task executed"); });
ExecutorService executor = Executors.newFixedThreadPool(10); CompletableFuture<String> future = CompletableFuture.supplyAsync( () -> "Result", executor );
|
2. 链式调用
1 2 3 4 5
| CompletableFuture<String> result = CompletableFuture .supplyAsync(() -> fetchData()) .thenApply(data -> process(data)) .thenApply(processed -> transform(processed)) .exceptionally(ex -> handleError(ex));
|
3. 组合多个 Future
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<String> combined = future1.thenCombine(future2, (f1, f2) -> f1 + " " + f2);
CompletableFuture<User> userFuture = getUserId() .thenCompose(userId -> getUser(userId));
CompletableFuture<Void> all = CompletableFuture.allOf(future1, future2, future3);
CompletableFuture<Object> any = CompletableFuture.anyOf(future1, future2, future3);
|
实战案例
案例1:拼团优惠计算优化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| @Service public class GroupBuyCalculationService { @Autowired private ExecutorService executorService;
public CalculationResult calculateSync(Long productId, Integer quantity) { Product product = productService.getProduct(productId); Promotion promotion = promotionService.getPromotion(productId); UserLevel level = userService.getUserLevel(); Coupon coupon = couponService.getAvailableCoupon(productId); return doCalculate(product, promotion, level, coupon, quantity); }
public CalculationResult calculateAsync(Long productId, Integer quantity) { CompletableFuture<Product> productFuture = CompletableFuture .supplyAsync(() -> productService.getProduct(productId), executorService); CompletableFuture<Promotion> promotionFuture = CompletableFuture .supplyAsync(() -> promotionService.getPromotion(productId), executorService); CompletableFuture<UserLevel> levelFuture = CompletableFuture .supplyAsync(() -> userService.getUserLevel(), executorService); CompletableFuture<Coupon> couponFuture = CompletableFuture .supplyAsync(() -> couponService.getAvailableCoupon(productId), executorService); CompletableFuture.allOf(productFuture, promotionFuture, levelFuture, couponFuture) .join(); return doCalculate( productFuture.join(), promotionFuture.join(), levelFuture.join(), couponFuture.join(), quantity ); } }
|
案例2:批量数据处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| @Service public class BatchProcessService {
public void batchProcessOrders(List<Long> orderIds) { List<List<Long>> batches = Lists.partition(orderIds, 100); List<CompletableFuture<Void>> futures = batches.stream() .map(batch -> CompletableFuture.runAsync(() -> processBatch(batch))) .collect(Collectors.toList()); CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) .join(); }
public List<OrderResult> batchProcessWithTimeout(List<Long> orderIds) { List<CompletableFuture<OrderResult>> futures = orderIds.stream() .map(orderId -> CompletableFuture .supplyAsync(() -> processOrder(orderId)) .orTimeout(5, TimeUnit.SECONDS) .exceptionally(ex -> { log.error("处理订单失败: {}", orderId, ex); return OrderResult.failed(orderId, ex.getMessage()); })) .collect(Collectors.toList()); return futures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()); } }
|
案例3:异步任务编排
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| @Service public class OrderProcessService {
public CompletableFuture<OrderResult> processOrder(Long orderId) { return CompletableFuture .supplyAsync(() -> validateOrder(orderId)) .thenCompose(validated -> { if (!validated.isValid()) { return CompletableFuture.completedFuture( OrderResult.failed(orderId, "订单校验失败") ); } return deductStock(orderId) .thenApply(stockResult -> new OrderContext(orderId, stockResult)); }) .thenCompose(context -> { if (!context.isStockSuccess()) { return CompletableFuture.completedFuture( OrderResult.failed(orderId, "库存不足") ); } return createPayment(context.getOrderId()) .thenApply(payment -> context.withPayment(payment)); }) .thenApply(context -> { sendNotificationAsync(context.getOrderId()); return OrderResult.success(context.getOrderId()); }) .exceptionally(ex -> { log.error("订单处理失败: {}", orderId, ex); return OrderResult.failed(orderId, ex.getMessage()); }); } private void sendNotificationAsync(Long orderId) { CompletableFuture.runAsync(() -> { notificationService.send(orderId); }).exceptionally(ex -> { log.error("发送通知失败: {}", orderId, ex); return null; }); } }
|
最佳实践
1. 线程池配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Configuration public class AsyncConfig { @Bean("customExecutor") public ExecutorService customExecutor() { return new ThreadPoolExecutor( 4, 8, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000), new ThreadFactoryBuilder() .setNameFormat("async-pool-%d") .build(), new ThreadPoolExecutor.CallerRunsPolicy() ); } }
|
2. 异常处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| CompletableFuture<String> future = CompletableFuture .supplyAsync(() -> { if (someCondition) { throw new RuntimeException("业务异常"); } return "Success"; }) .exceptionally(ex -> { log.error("任务执行失败", ex); return "Default Value"; }) .handle((result, ex) -> { if (ex != null) { return "Error: " + ex.getMessage(); } return result; });
|
3. 避免常见陷阱
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| CompletableFuture.supplyAsync(() -> fetchData()) .thenApply(data -> { Thread.sleep(1000); return process(data); });
CompletableFuture.supplyAsync(() -> fetchData()) .thenCompose(data -> CompletableFuture.supplyAsync(() -> { Thread.sleep(1000); return process(data); }));
try { String result = future.get(); } catch (Exception e) { e.printStackTrace(); }
String result = future.join();
future.thenAccept(result -> { });
|
性能对比
| 场景 |
同步耗时 |
异步耗时 |
提升 |
| 订单详情查询 |
900ms |
400ms |
55% |
| 优惠计算 |
600ms |
200ms |
66% |
| 批量处理(100条) |
5000ms |
800ms |
84% |
总结
CompletableFuture 是 Java 异步编程的利器:
- 链式调用:避免回调地狱
- 组合能力:支持多种组合方式
- 异常处理:完善的异常处理机制
- 性能提升:充分利用多核 CPU
合理使用 CompletableFuture,可以显著提升系统性能和吞吐量。
*本文基于电商平台性能优化实践,接口响应时间平均降低 50%+