CompletableFuture异步编程实战

引言

在高并发系统中,异步编程是提升性能的关键手段。Java 8 引入的 CompletableFuture 提供了强大的异步编程能力。本文将分享在实际项目中的使用经验。

为什么需要异步编程

同步 vs 异步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 同步方式:串行执行,耗时 = 300ms + 400ms + 200ms = 900ms
public OrderDetail getOrderDetailSync(Long orderId) {
Order order = orderService.getOrder(orderId); // 300ms
User user = userService.getUser(order.getUserId()); // 400ms
List<Item> items = itemService.getItems(orderId); // 200ms

return assemble(order, user, items);
}

// 异步方式:并行执行,耗时 ≈ max(300, 400, 200) = 400ms
public OrderDetail getOrderDetailAsync(Long orderId) {
CompletableFuture<Order> orderFuture = CompletableFuture
.supplyAsync(() -> orderService.getOrder(orderId));

CompletableFuture<User> userFuture = orderFuture
.thenCompose(order -> CompletableFuture.supplyAsync(
() -> userService.getUser(order.getUserId())));

CompletableFuture<List<Item>> itemsFuture = CompletableFuture
.supplyAsync(() -> itemService.getItems(orderId));

// 等待所有结果
CompletableFuture.allOf(orderFuture, userFuture, itemsFuture).join();

return assemble(orderFuture.join(), userFuture.join(), itemsFuture.join());
}

CompletableFuture 核心方法

1. 创建异步任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// supplyAsync:有返回值
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 异步执行任务
return "Result";
});

// runAsync:无返回值
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// 异步执行任务
System.out.println("Task executed");
});

// 使用自定义线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(
() -> "Result",
executor
);

2. 链式调用

1
2
3
4
5
CompletableFuture<String> result = CompletableFuture
.supplyAsync(() -> fetchData()) // 获取数据
.thenApply(data -> process(data)) // 处理数据
.thenApply(processed -> transform(processed)) // 转换数据
.exceptionally(ex -> handleError(ex)); // 异常处理

3. 组合多个 Future

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// thenCombine:两个 Future 并行执行,结果合并
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");

CompletableFuture<String> combined = future1.thenCombine(future2, (f1, f2) -> f1 + " " + f2);
// 结果:Hello World

// thenCompose:链式调用,解决嵌套问题
CompletableFuture<User> userFuture = getUserId()
.thenCompose(userId -> getUser(userId));

// allOf:等待所有 Future 完成
CompletableFuture<Void> all = CompletableFuture.allOf(future1, future2, future3);

// anyOf:任意一个 Future 完成
CompletableFuture<Object> any = CompletableFuture.anyOf(future1, future2, future3);

实战案例

案例1:拼团优惠计算优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@Service
public class GroupBuyCalculationService {

@Autowired
private ExecutorService executorService;

/**
* 优化前:串行查询,耗时约 600ms
*/
public CalculationResult calculateSync(Long productId, Integer quantity) {
Product product = productService.getProduct(productId); // 150ms
Promotion promotion = promotionService.getPromotion(productId); // 200ms
UserLevel level = userService.getUserLevel(); // 100ms
Coupon coupon = couponService.getAvailableCoupon(productId); // 150ms

return doCalculate(product, promotion, level, coupon, quantity);
}

/**
* 优化后:并行查询,耗时约 200ms
*/
public CalculationResult calculateAsync(Long productId, Integer quantity) {
// 并行查询所有依赖数据
CompletableFuture<Product> productFuture = CompletableFuture
.supplyAsync(() -> productService.getProduct(productId), executorService);

CompletableFuture<Promotion> promotionFuture = CompletableFuture
.supplyAsync(() -> promotionService.getPromotion(productId), executorService);

CompletableFuture<UserLevel> levelFuture = CompletableFuture
.supplyAsync(() -> userService.getUserLevel(), executorService);

CompletableFuture<Coupon> couponFuture = CompletableFuture
.supplyAsync(() -> couponService.getAvailableCoupon(productId), executorService);

// 等待所有查询完成
CompletableFuture.allOf(productFuture, promotionFuture, levelFuture, couponFuture)
.join();

// 获取结果并计算
return doCalculate(
productFuture.join(),
promotionFuture.join(),
levelFuture.join(),
couponFuture.join(),
quantity
);
}
}

案例2:批量数据处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Service
public class BatchProcessService {

/**
* 批量处理订单,每批 100 个
*/
public void batchProcessOrders(List<Long> orderIds) {
// 分批处理
List<List<Long>> batches = Lists.partition(orderIds, 100);

List<CompletableFuture<Void>> futures = batches.stream()
.map(batch -> CompletableFuture.runAsync(() -> processBatch(batch)))
.collect(Collectors.toList());

// 等待所有批次处理完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.join();
}

/**
* 带超时的批量处理
*/
public List<OrderResult> batchProcessWithTimeout(List<Long> orderIds) {
List<CompletableFuture<OrderResult>> futures = orderIds.stream()
.map(orderId -> CompletableFuture
.supplyAsync(() -> processOrder(orderId))
.orTimeout(5, TimeUnit.SECONDS) // 5秒超时
.exceptionally(ex -> {
log.error("处理订单失败: {}", orderId, ex);
return OrderResult.failed(orderId, ex.getMessage());
}))
.collect(Collectors.toList());

return futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
}

案例3:异步任务编排

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Service
public class OrderProcessService {

/**
* 订单处理流程编排
*/
public CompletableFuture<OrderResult> processOrder(Long orderId) {
return CompletableFuture
// 1. 校验订单
.supplyAsync(() -> validateOrder(orderId))

// 2. 扣减库存(依赖校验结果)
.thenCompose(validated -> {
if (!validated.isValid()) {
return CompletableFuture.completedFuture(
OrderResult.failed(orderId, "订单校验失败")
);
}
return deductStock(orderId)
.thenApply(stockResult -> new OrderContext(orderId, stockResult));
})

// 3. 创建支付单(依赖库存结果)
.thenCompose(context -> {
if (!context.isStockSuccess()) {
return CompletableFuture.completedFuture(
OrderResult.failed(orderId, "库存不足")
);
}
return createPayment(context.getOrderId())
.thenApply(payment -> context.withPayment(payment));
})

// 4. 发送通知(异步,不阻塞主流程)
.thenApply(context -> {
sendNotificationAsync(context.getOrderId());
return OrderResult.success(context.getOrderId());
})

// 异常处理
.exceptionally(ex -> {
log.error("订单处理失败: {}", orderId, ex);
return OrderResult.failed(orderId, ex.getMessage());
});
}

private void sendNotificationAsync(Long orderId) {
CompletableFuture.runAsync(() -> {
// 发送短信/邮件通知
notificationService.send(orderId);
}).exceptionally(ex -> {
log.error("发送通知失败: {}", orderId, ex);
return null;
});
}
}

最佳实践

1. 线程池配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Configuration
public class AsyncConfig {

@Bean("customExecutor")
public ExecutorService customExecutor() {
return new ThreadPoolExecutor(
4, // 核心线程数
8, // 最大线程数
60L, // 空闲线程存活时间
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000), // 任务队列
new ThreadFactoryBuilder()
.setNameFormat("async-pool-%d")
.build(),
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
}
}

2. 异常处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
// 可能抛出异常
if (someCondition) {
throw new RuntimeException("业务异常");
}
return "Success";
})
.exceptionally(ex -> {
// 处理异常,返回默认值
log.error("任务执行失败", ex);
return "Default Value";
})
.handle((result, ex) -> {
// 统一处理正常结果和异常
if (ex != null) {
return "Error: " + ex.getMessage();
}
return result;
});

3. 避免常见陷阱

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// ❌ 错误:在 thenApply 中执行阻塞操作
CompletableFuture.supplyAsync(() -> fetchData())
.thenApply(data -> {
// 阻塞操作会占用 ForkJoinPool 线程
Thread.sleep(1000);
return process(data);
});

// ✅ 正确:使用 thenCompose 链式调用
CompletableFuture.supplyAsync(() -> fetchData())
.thenCompose(data -> CompletableFuture.supplyAsync(() -> {
Thread.sleep(1000);
return process(data);
}));

// ❌ 错误:get() 会阻塞线程
try {
String result = future.get(); // 阻塞
} catch (Exception e) {
e.printStackTrace();
}

// ✅ 正确:使用 join() 或回调
String result = future.join(); // 非受检异常

// 或使用回调
future.thenAccept(result -> {
// 处理结果
});

性能对比

场景 同步耗时 异步耗时 提升
订单详情查询 900ms 400ms 55%
优惠计算 600ms 200ms 66%
批量处理(100条) 5000ms 800ms 84%

总结

CompletableFuture 是 Java 异步编程的利器:

  1. 链式调用:避免回调地狱
  2. 组合能力:支持多种组合方式
  3. 异常处理:完善的异常处理机制
  4. 性能提升:充分利用多核 CPU

合理使用 CompletableFuture,可以显著提升系统性能和吞吐量。


*本文基于电商平台性能优化实践,接口响应时间平均降低 50%+


CompletableFuture异步编程实战
https://zxyblog.top/2024/08/05/CompletableFuture异步编程实战/
作者
zxy
发布于
2024年8月5日
许可协议