CompletableFuture 是 Java 8 引入的一个类,用于支持异步编程和非阻塞 IO 操作。它提供了一种简洁而强大的方式来处理异步任务的结果。
1.
CompletableFuture 提供了四个静态方法来创建一个异步操作。
1.CompletableFuture.runAsync(Runnable runnable):
- 创建一个异步操作,该操作不返回结果。
- 接受一个 Runnable 参数,表示要执行的任务。
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// 执行异步任务
// ...
});
2.CompletableFuture.runAsync(Runnable runnable, Executor executor):
- 创建一个异步操作,并指定执行该操作的线程池。
- 接受一个 Runnable 参数,表示要执行的任务。
- 接受一个 Executor 参数,表示执行任务的线程池。
Executor executor = Executors.newFixedThreadPool(5);
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// 执行异步任务
// ...
}, executor);比如:
package com.dreams;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class UseFuture {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
System.out.println("当前线程线程:main开始");
CompletableFuture<Void> completableFuture =
CompletableFuture.runAsync(()->{
System.out.println("当前线程:"+Thread.currentThread().getId() + "开始");
},executorService);
System.out.println("当前线程线程:main结束");
}
}运行

3.CompletableFuture.supplyAsync(Supplier<U> supplier):
- 创建一个异步操作,该操作返回一个结果。
- 接受一个 Supplier<U> 参数,表示要执行的任务,并返回结果。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 执行异步任务
// 返回结果
return "Hello";
});
4.CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor):
- 创建一个异步操作,并指定执行该操作的线程池。
- 接受一个 Supplier<U> 参数,表示要执行的任务,并返回结果。
- 接受一个 Executor 参数,表示执行任务的线程池。
package com.dreams;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class UseFuture {
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(3);
System.out.println("当前线程: main开始");
CompletableFuture<Integer> completableFuture =
CompletableFuture.supplyAsync(()->{
Thread.currentThread().setName("线程1");
System.out.println("当前线程: "+Thread.currentThread().getName());
int result = 1024;
System.out.println(Thread.currentThread().getName() + "值: " + result);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return result;
},executorService);
//获取返回结果
Integer value = completableFuture.get();
System.out.println("当前线程: main结束,获取到值: "+value);
}
}运行,可以看到main线程会等待线程1完成返回结果

注意区别:
runAsync方法不支持返回值。
supplyAsync可以支持返回值。
2.线程串行化方法
CompletableFuture<T>完成线程串行化方法
thenApply方法:
当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。

thenAccept方法:
消费处理结果。接收任务的处理结果,并消费处理,无返回结果。

thenRun方法:
只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行 thenRun的后续操作

举例:
package com.dreams;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class UseFuture {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
// 线程1执行返回的结果:100
CompletableFuture<Integer> futureA =
CompletableFuture.supplyAsync(() -> {
int result = 100;
System.out.println("线程一:"+result);
return result;
});
// 线程2 获取到线程1执行的结果
CompletableFuture<Integer> futureB = futureA.thenApplyAsync((res)->{
System.out.println("线程二获取到线程一的值:"+res);
return res;
},executorService);
//线程3: 无法获取futureA或futureB或返回结果
CompletableFuture<Void> futureC = futureB.thenRunAsync(() -> {
System.out.println("线程三....");
}, executorService);
}
}运行

当CompletableFuture执行结束后,或者抛出异常的时候,可以执行特定的Action。
主要是下面的方法:

whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行
package com.dreams;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class UseFuture {
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(3);
System.out.println("当前线程: main开始");
CompletableFuture<Integer> completableFuture =
CompletableFuture.supplyAsync(()->{
Thread.currentThread().setName("线程1");
System.out.println("当前线程:"+Thread.currentThread().getName());
int result = 10;
System.out.println("线程1值: "+result);
return result;
},executorService)
.whenCompleteAsync((rs,exception)->{
System.out.println("whenComplete中的线程:" + Thread.currentThread().getName());
System.out.println("whenComplete中的线程获取到值:"+rs);
System.out.println("错误输出:" + exception);
});
System.out.println("当前线程: main结束");
}
}运行

异常情况
package com.dreams;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class UseFuture {
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(3);
System.out.println("当前线程: main开始");
CompletableFuture<Integer> completableFuture =
CompletableFuture.supplyAsync(()->{
Thread.currentThread().setName("线程1");
System.out.println("当前线程:"+Thread.currentThread().getName());
int result = 10/0;
System.out.println("线程1值: "+result);
return result;
},executorService)
.whenCompleteAsync((rs,exception)->{
System.out.println("whenComplete中的线程:" + Thread.currentThread().getName());
System.out.println("whenComplete中的线程获取到值:"+rs);
System.out.println("错误输出:" + exception);
});
System.out.println("当前线程: main结束");
}
}运行

4.等待多任务完成


举例:
all举例
package com.dreams;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class UseFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(4);
// 线程1
CompletableFuture<Integer> futureA =
CompletableFuture.supplyAsync(() -> {
Thread.currentThread().setName("线程1");
System.out.println(Thread.currentThread().getName()+"开始");
int res = 100;
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + "中2000millis后");
System.out.println(Thread.currentThread().getName()+"结束");
return res;
},executorService);
// 线程2
CompletableFuture<Integer> futureB =
CompletableFuture.supplyAsync(() -> {
Thread.currentThread().setName("线程2");
System.out.println(Thread.currentThread().getName()+"开始");
int res = 100;
System.out.println(Thread.currentThread().getName()+"结束");
return res;
},executorService);
CompletableFuture<Void> all = CompletableFuture.allOf(futureA,futureB);
all.get();
System.out.println("等待线程1,线程2完成后");
}
}运行

any举例:
将代码更改如下:
CompletableFuture<Object> any = CompletableFuture.anyOf(futureA, futureB);
Integer result = (Integer) any.get();
System.out.println("输出值: " + result);
System.out.println("等待一个线程完成后");


