0%

异步编程CompletableFuture

创建异步任务

创建任务可以通过runAsyncsupplyAsync两个方法来进行,区别是第一个不带返回参数,第二个可以带返回参数。

对于Async后缀的方法,可以通过自定义执行器,也可以通过默认的执行器ForkJoinPool来执行。

runAsync

1
2
3
4
5
6
7
8
9
/**
* 异步运行,不带返回值
*
*/
@Test
public void runAsyncTest() {
CompletableFuture.runAsync(() -> System.out.println(getCurrentThreadName() + "CompletableFuture runASync method"), threadPoolExecutor)
.join();
}

supplyAsync

1
2
3
4
5
6
7
8
9
10
/**
* 异步运行,带返回值
*
*/
@Test
public void supplyAsyncTest() {
CompletableFuture.supplyAsync(() -> getCurrentThreadName() + "CompletableFuture supplyASync method", threadPoolExecutor)
.thenAccept(System.out::println)
.join();
}

任务回调

无参无返回值

无参无返回值指的是不接收上一步的参数,不返回参数,只是作为上一个任务结束后的下一个任务。

Async后缀结尾的方法代表自定义执行器,也可使用默认执行器,不带后缀的代表与父任务共享执行器。

thenRun

在上一个任务执行完后调用,和父任务共享执行器。

1
2
3
4
5
6
7
8
9
/**
* 上一个任务完成后用上一个任务的线程执行下一个任务
*/
@Test
public void thenRunTest() {
CompletableFuture.runAsync(() -> System.out.println(getCurrentThreadName() + "CompletableFuture runASync method"), threadPoolExecutor)
.thenRun(() -> System.out.println(getCurrentThreadName() + "CompletableFuture thenRun method"))
.join();
}

thenRunAsync

在上一个任务执行完后调用,使用自定义的执行器。

1
2
3
4
5
6
7
8
9
/**
* 上一个任务完成后使用默认线程池异步执行下一个任务
*/
@Test
public void thenRunAsyncTest() {
CompletableFuture.runAsync(() -> System.out.println(getCurrentThreadName() + "CompletableFuture runASync method"), threadPoolExecutor)
.thenRunAsync(() -> System.out.println(getCurrentThreadName() + "CompletableFuture thenRun method"))
.join();
}

有参无返回值

thenAccept

通过接收父任务的返回来结果来执行任务。

1
2
3
4
5
6
@Test
public void thenAcceptTest() {
CompletableFuture.supplyAsync(() -> getCurrentThreadName() + "CompletableFuture supplyASync method", threadPoolExecutor)
.thenAccept((str) -> System.out.println(getCurrentThreadName() + "\n" + str))
.join();
}

有参有返回值

thenApply

通过接收父任务的返回结果来重新组装生成新的对象并返回。

1
2
3
4
5
6
7
@Test
public void thenApplyTest() {
CompletableFuture.supplyAsync(() -> getCurrentThreadName() + "CompletableFuture supplyASync method", threadPoolExecutor)
.thenApply((result) -> getCurrentThreadName() + "\n" + result)
.thenAccept(System.out::println)
.join();
}

异常处理

exceptionally

某个任务执行异常时,执行的回调方法;并且有抛出异常作为参数,传递到回调方法。

1
2
3
4
5
6
7
8
9
@Test
public void exceptionallyTest() {
CompletableFuture.supplyAsync(() -> {
throw new RuntimeException();
}).exceptionally((e) -> {
e.printStackTrace();
return "程序异常";
}).thenAccept(System.out::println);
}

handle

某个任务执行完成后,接收父任务返回结果,执行回调方法,有返回值的;且可以通过异常参数判断父任务是否发生异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* handle方法可以对上个任务的异常进行处理
*/
@Test
public void handleAsyncTest() {
// CompletableFuture.supplyAsync(() -> getCurrentThreadName() + "CompletableFuture supplyASync method", threadPoolExecutor)
// .handleAsync((result,e)->{
// if (null==e){
// return "无异常";
// }else {
// e.printStackTrace();
// return "有异常";
// }
// },threadPoolExecutor)
// .thenAccept(System.out::println);
CompletableFuture.supplyAsync(() -> "23".substring(4), threadPoolExecutor)
.handleAsync((result, e) -> {
if (null == e) {
return "无异常";
} else {
e.printStackTrace();
return "有异常";
}
}, threadPoolExecutor)
.thenAccept(System.out::println)
.join();
}

whenComplete

某个任务执行完成后,接收父任务返回结果,执行回调方法,无返回值的;且可以通过异常参数判断父任务是否发生异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Test
public void whenCompleteTest() {
CompletableFuture<String> normal = CompletableFuture.supplyAsync(() -> getCurrentThreadName() + "supplyAsync method", threadPoolExecutor)
.whenComplete((result, e) -> {
if (null != e) {
e.printStackTrace();
} else {
System.out.println(result);
}
});
CompletableFuture<String> exception = CompletableFuture.supplyAsync(() -> createException() + getCurrentThreadName() + "supplyAsync method with exception", threadPoolExecutor)
.whenComplete((result, e) -> {
if (null != e) {
e.printStackTrace();
} else {
System.out.println(result);
}
});
CompletableFuture.allOf(normal, exception).join();
}

任务组合

thenCompose

用来连接两个CompletableFuture,是生成一个新的CompletableFuture

1
2
3
4
5
6
7
8
9
/**
* 用来连接两个CompletableFuture,是生成一个新的CompletableFuture。
*/
@Test
public void thenComposeTest() throws InterruptedException {
CompletableFuture.supplyAsync(() -> getCurrentThreadName() + "CompletableFuture supplyASync method", threadPoolExecutor)
.thenCompose((str) -> CompletableFuture.runAsync(() -> System.out.println(str)))
.join();
}

thenCombine

等两个任务执行完毕后回调任务。

1
2
3
4
5
6
7
8
@Test
public void thenCombineTest() {
CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> getCurrentThreadName() + "CompletableFuture supplyASync method", threadPoolExecutor);
CompletableFuture<String> stringCompletableFuture2 = CompletableFuture.supplyAsync(() -> getCurrentThreadName() + "CompletableFuture supplyASync method-2", threadPoolExecutor);
stringCompletableFuture.thenCombine(stringCompletableFuture2, (result1, result2) ->
result1 + "\n" + result2 + "\n" + getCurrentThreadName()
).thenAccept(System.out::println).join();
}