java线程池

java创建线程的四种方式

继承Thread类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class MyThread extends Thread{
@Override
public void run() {
//重写run方法
for (int i = 0; i < 5; i++) {
System.out.println("i: "+i);
}
}
public static void main(String[] args) {
new MyThread().run();
new MyThread().start();
}
/**
* 这说明一下run()方法和start()方法的区别
*
* run()
* Runnable接口中的抽象方法,而Thread实现了Runnable接口,需要重写run()方法
* 而在Thread类重写的run()方法的源码中,只是调用了Runnable接口的run()方法
* 如果直接调用run方法,并不会启动新线程,程序中只有当前线程线程,
* 程序还是顺序执行,等待run方法体执行完毕后才可继续执行下面的代码,没有达到多线程的目的
* start()方法
* 启动新线程,处于就绪(可运行)状态,并没有运行,一旦得到cpu时间片,
* 就开始执行相应线程的run()方法,这里run()称为线程体,它包含了要执行的这个线程的内容,
* run()方法运行结束,此线程随即终止。
* start()无需等待run()执行完毕,即可继续执行下面的代码,进行了线程切换
*
*/
}

重写Runnable接口实现多线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

public class MyThread implements Runnable {
@Override
public void run() {
//重写run方法
for (int i = 0; i < 5; i++) {
System.out.println("i: "+i);
}
}
public static void main(String[] args) {
//创建Runnable的实例
MyThread myThread = new MyThread();
//该实例作为Thread的target,创建后这个thread对象才是真正的线程对象
Thread thread = new Thread(myThread);
thread.start();
}
}

重写Callable接口(不常用)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ThreadTest implements Callable<Integer> {

@Override
public Integer call() throws Exception {
// TODO Auto-generated method stub
System.out.println("当前线程:" + Thread.currentThread().getName());
int i = 10 / 2;
System.out.println("结果是:" + i);
return i;
}

public static void main(String[] args) throws InterruptedException, ExecutionException {
System.out.println("main--start");

Callable<Integer> callable = new ThreadTest();
FutureTask<Integer> futureTask = new FutureTask<Integer>(callable);

Thread thread = new Thread(futureTask);
thread.start();
// 等待线程执行完,获取返回结果:注意:获取返回结果是个阻塞的操作!!
Integer integer = futureTask.get();
System.out.println("线程返回的结果:" + integer);

System.out.println("main--end.");

}

}

通过线程池

使用线程池创建线程的优势:

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,会消耗系统资源,还会降低系统的稳定性

但是要做到合理的利用线程池,必须对其原理了如指掌。java可以通过Executors来创建线程池,它提供了5种线程池:

  • newFixedThreadPool:固定线程池大小,可控制线程最大并发数,超出的线程会在队列中等待

  • newSingleThreadExecutor:创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行

  • newCachedThreadPool:可缓存线程池,如果线程池长度超过处理需要,可回收空闲线程,若无需回收,则新建线程

  • newScheduledThreadPool:创建一个定长线程池,支持定时(scheduleWithFixedDelay()函数的initdelay参数)及周期(delay 参数)任务执行

  • newWorkStealingPool:创建一个单线程化的支持定时的线程池,可以用一个线程周期性执行任务(比如周期7天,一次任务才用1小时,使用多线程就会浪费资源)

但是线程池最好不要使用Executors创建,最好通过ThreadPoolExecutor的方式手动创建,这样的处理方式让我们更加明确线程池的运行规则,规避资源耗尽的风险。

使用ThreadPoolExecutor创建线程池

(1)方法签名:

1
2
3
4
5
6
7
8

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

(2)参数简介

  • corePoolSize 线程池核心池的大小
  • maximumPoolSize 线程池的最大线程数
  • keepAliveTime 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间
  • unit keepAliveTime 的时间单位
  • workQueue 用来储存等待执行任务的队列
  • threadFactory 线程工厂
  • handler 拒绝策略
  • corePoolSize:当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。

  • maximumPoolSize:线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。(如果使用了无界队列,这个参数无效)

    在创建了线程池后,线程池中并没有任何线程(默认),等到有任务来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者

    prestartCoreThread()方法
    当创建的线程数等于 corePoolSize 时,会加入阻塞队列。当队列满时,会创建线程执行任务直到线程池中的数量等于maximumPoolSize

  • keepAliveTime:线程池的工作线程空闲后,保持存活的时间。如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率

  • TimeUnit:可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)

  • runnableTaskQueue:用于保存等待执行的任务的阻塞队列。有5种可选择

    ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
    LinkedBlockingQueue:一个基于链表结构的无界阻塞队列,此队列按FIFO排序元素,吞吐量通常高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
    SynchronousQueue:默认。一个不存储元素的阻塞队列。每个线程的插入必须等另一个线程的移除,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
    PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
    DelayQueue: 一个使用优先级队列实现的无界阻塞队列。

  • ThreadFactory:通过线程工厂给每个创建出来的线程设置名字,帮助Debug和定位问题。可以使用默认线程工厂Executors.defaultThreadFactory()

  • RejectedExecutionHandler:当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。jdk提供4种策略

    ThreadPoolExecutor.AbortPolicy: 丢弃任务并抛出RejectedExecutionException异常(默认)
    ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常
    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

(3)示例

连接池配置类:ThreadPoolConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Configuration
public class ThreadPoolConfig {

@Bean(value = "myThreadPool")
public ExecutorService buildMyThreadPool(){
// 例如,"rpc-pool-%d"会产生像线程名称 "rpc-pool-0","rpc-pool-1","rpc-pool-2"
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("my-thread-%d").build();

ExecutorService executorService = new ThreadPoolExecutor(
15
,30
,0
,TimeUnit.MILLISECONDS
,new ArrayBlockingQueue<>(1000)
,threadFactory
,new ThreadPoolExecutor.CallerRunsPolicy()
);
return executorService;
}
}

注入并使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class Test {

@Resource(name = "myThreadPool")
private ExecutorService myThreadPool;

/**
* 简单执行任务
*/
@org.junit.Test
public void test(){
for (int i = 0; i < 10; i++) {
final int index = i;
myThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println("index: "+index);
}
});
}
myThreadPool.shutdown();
}
}

任务提交给线程池之后的处理策略(主要有4点)

  • 如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;

  • 如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;

  • 如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;

  • 如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。

82214e045b8f08f9050d3de180c428c2dce.jpg

CompletableFuture 异步编排

runAsync 和 supplyAsync方法

CompletableFuture 提供了四个静态方法来创建一个异步操作:

1
2
3
4
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。

  • runAsync方法不支持返回值。
  • supplyAsync方法可以支持返回值。

runAsync方法示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

package com.xieh;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CompletableFutureTest {
public static ExecutorService executorService = new ThreadPoolExecutor(5, 100, 3L, TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(1000), Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());

public static void main(String[] args) throws InterruptedException, ExecutionException {
System.out.println("main---start");
// 没有返回值
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getName());
int i = 10 / 2;
System.out.println("计算结果:" + i);
}, executorService);
// 什么都不返回,调用get方法,就变成了阻塞操作!
// future.get();
System.out.println("main---end");
executorService.shutdown();
}
}

img

supplyAsync方法示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class CompletableFutureTest {
public static ExecutorService executorService = new ThreadPoolExecutor(5, 100, 3L, TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(1000), Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());

public static void main(String[] args) throws InterruptedException, ExecutionException {
System.out.println("main---start");

// 有返回值
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getName());
int i = 10 / 2;
System.out.println("计算结果:" + i);
return i;
}, executorService);

// 获取返回结果,调用get方法,就变成了阻塞操作!
Integer integer = future.get();
System.out.println("main---end:" + integer);
executorService.shutdown();
}
}

img

计算结果完成时的回调方法

当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:

1
2
3
4
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

可以看到Action的类型是BiConsumer<? super T,? super Throwable>它可以处理正常的计算结果,或者异常情况。

whenComplete 和 whenCompleteAsync 的区别
whenComplete:是执行当前任务的线程继续执行 whenComplete 的任务。
whenCompleteAsync:是把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。

runAsync方法计算结果完成时的后续操作示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

public class CompletableFutureTest {
public static ExecutorService executorService = new ThreadPoolExecutor(5, 100, 3L, TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(1000), Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());

public static void main(String[] args) throws InterruptedException, ExecutionException {
System.out.println("main---start");

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getName());
int i = 10 / 0;
System.out.println("计算结果:" + i);

}, executorService);

future.whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(Void t, Throwable action) {
System.out.println("whenComplete执行完成!" + Thread.currentThread().getName());
}

});

future.whenCompleteAsync(new BiConsumer<Void, Throwable>() {
@Override
public void accept(Void t, Throwable action) {
System.out.println("whenCompleteAsync执行完成!" + Thread.currentThread().getName());
}

});

future.exceptionally(new Function<Throwable, Void>() {
@Override
public Void apply(Throwable t) {
System.out.println("执行失败!" + t.getMessage());
return null;
}
});

// 获取返回结果,调用get方法,就变成了阻塞操作!
// future.get();
System.out.println("main---end:");
executorService.shutdown();
}
}

img

supplyAsync方法计算结果完成时的后续操作示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class CompletableFutureTest {
public static ExecutorService executorService = new ThreadPoolExecutor(5, 100, 3L, TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(1000), Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());

public static void main(String[] args) throws InterruptedException, ExecutionException {
System.out.println("main---start");

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getName());
int i = 10 / 0;
System.out.println("计算结果:" + i);
return i;
}, executorService).whenComplete((result, exception) -> {
// 虽然能得到异常信息,但是无法修改返回数据
System.out.println("whenComplete异步任务完成了,结果是:" + result + ";异常是:" + exception);
}).whenCompleteAsync((result, exception) -> {
System.out.println("whenCompleteAsync异步任务完成了,结果是:" + result + ";异常是:" + exception);
}).exceptionally(throwable -> {
// 如果执行失败,可以设置默认返回值
return 10;
});
// 获取返回结果
Integer integer = future.get();
System.out.println("main---end:" + integer);
executorService.shutdown();
}
}

img

handle 方法

handle 是执行任务完成时对结果的处理。
handle 方法和 thenApply 方法处理方式基本一样。不同的是 handle 是在任务完成后再执行,还可以处理异常的任务。thenApply 只可以执行正常的任务,任务出现异常则不执行 thenApply 方法。

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

public class CompletableFutureTest {
public static ExecutorService executorService = new ThreadPoolExecutor(5, 100, 3L, TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(1000), Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());

public static void main(String[] args) throws InterruptedException, ExecutionException {
System.out.println("main---start");

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getName());
int i = 10 / 0;
System.out.println("计算结果:" + i);
return i;
}, executorService).whenComplete((result, exception) -> {
// 虽然能得到异常信息,但是无法修改返回数据
System.out.println("whenComplete异步任务完成了,结果是:" + result + ";异常是:" + exception);
}).whenCompleteAsync((result, exception) -> {
System.out.println("whenCompleteAsync异步任务完成了,结果是:" + result + ";异常是:" + exception);
}).handle((result, throwable) -> {
if (result != null) {
return result * 2;
}
if (throwable != null) {
return 1;
}
return 0;

// 注意:handle与exceptionally都可以控制返回值,谁先被调用就以谁的为准(先被调用者的返回值为准)
}).exceptionally(throwable -> {
// 如果执行失败,可以设置默认返回值
return 10;
});
// 获取返回结果
Integer integer = future.get();
System.out.println("main---end:" + integer);
executorService.shutdown();
}
}

img

线程串行化

  • thenApply 方法:当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。获取上一个任务返回的结果,并返回当前任务的返回值

  • thenAccept 消费处理结果:接收任务的处理结果,并消费处理,无返回结果。

  • thenRun 方法:跟 thenAccept 方法不一样的是,不关心任务的处理结果。只要上面的任务执行完成,就开始执行 thenRun 。

1
2
3
4
5
6
7
8
9
10
11
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);

thenRun代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class CompletableFutureTest {
public static ExecutorService executorService = new ThreadPoolExecutor(5, 100, 3L, TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(1000), Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());

public static void main(String[] args) throws InterruptedException, ExecutionException {
System.out.println("main---start");

// thenRun不能获取上一步的执行结果
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getName());
int i = 10 / 2;
System.out.println("计算结果:" + i);
return i;
}, executorService).thenRunAsync(() -> {
System.out.println("任务2启动了...");
}, executorService);

System.out.println("main---end:");
executorService.shutdown();
}
}

img

thenAccept代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class CompletableFutureTest {
public static ExecutorService executorService = new ThreadPoolExecutor(5, 100, 3L, TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(1000), Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());

public static void main(String[] args) throws InterruptedException, ExecutionException {
System.out.println("main---start");

// thenAccept能获取到上一步的结果,但是无返回值
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getName());
int i = 10 / 2;
System.out.println("计算结果:" + i);
return i;
}, executorService).thenAcceptAsync((result) -> {
System.out.println("任务2启动了,上一步的结果是:" + result);
}, executorService);

System.out.println("main---end:");
executorService.shutdown();
}
}

img

thenApply代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class CompletableFutureTest {
public static ExecutorService executorService = new ThreadPoolExecutor(5, 100, 3L, TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(1000), Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());

public static void main(String[] args) throws InterruptedException, ExecutionException {
System.out.println("main---start");

// thenApply能获取到上一步的结果,可以有返回值
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getName());
int i = 10 / 2;
System.out.println("计算结果:" + i);
return i;
}, executorService).thenApplyAsync(result -> {
System.out.println("任务2开启了,上一步的结果是:" + result);
return "thenApplyAsync的新结果";
}, executorService);

String string = future.get();
System.out.println("main---end:" + string);
executorService.shutdown();
}
}

img

任务合并

  • thenCombine:thenCombine 会把 两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理。

  • thenAcceptBoth:当两个CompletionStage都执行完成后,把结果一块交给thenAcceptBoth来进行消耗。

  • thenCompose:thenCompose 方法允许你对两个 CompletionStage 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作。

  • applyToEither:两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的转化操作。

  • acceptEither:两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的消耗操作。

  • runAfterEither:两个CompletionStage,任何一个完成了都会执行下一步的操作(Runnable)。

  • runAfterBoth:两个CompletionStage,都完成了计算才会执行下一步的操作(Runnable)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);

public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;

public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);

public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);

public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);

public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);

thenCombine代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private static void thenCombine() throws Exception {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "hello";
}
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "hello";
}
});
CompletableFuture<String> result = future1.thenCombine(future2, new BiFunction<String, String, String>() {
@Override
public String apply(String t, String u) {
return t+" "+u;
}
});
System.out.println(result.get());
}

thenAcceptBoth代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

private static void thenAcceptBoth() throws Exception {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1="+t);
return t;
}
});

CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2="+t);
return t;
}
});
f1.thenAcceptBoth(f2, new BiConsumer<Integer, Integer>() {
@Override
public void accept(Integer t, Integer u) {
System.out.println("f1="+t+";f2="+u+";");
}
});
}

thenCompose代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private static void thenCompose() throws Exception {
CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
System.out.println("t1="+t);
return t;
}
}).thenCompose(new Function<Integer, CompletionStage<Integer>>() {
@Override
public CompletionStage<Integer> apply(Integer param) {
return CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = param *2;
System.out.println("t2="+t);
return t;
}
});
}

});
System.out.println("thenCompose result : "+f.get());
}

applyToEither 代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

private static void applyToEither() throws Exception {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1="+t);
return t;
}
});
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2="+t);
return t;
}
});

CompletableFuture<Integer> result = f1.applyToEither(f2, new Function<Integer, Integer>() {
@Override
public Integer apply(Integer t) {
System.out.println(t);
return t * 2;
}
});

System.out.println(result.get());
}

acceptEither代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private static void acceptEither() throws Exception {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1="+t);
return t;
}
});

CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2="+t);
return t;
}
});
f1.acceptEither(f2, new Consumer<Integer>() {
@Override
public void accept(Integer t) {
System.out.println(t);
}
});
}

runAfterEither代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private static void runAfterEither() throws Exception {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1="+t);
return t;
}
});

CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2="+t);
return t;
}
});
f1.runAfterEither(f2, new Runnable() {

@Override
public void run() {
System.out.println("上面有一个已经完成了。");
}
});
}

runAfterBoth代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private static void runAfterBoth() throws Exception {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1="+t);
return t;
}
});

CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2="+t);
return t;
}
});
f1.runAfterBoth(f2, new Runnable() {

@Override
public void run() {
System.out.println("上面两个任务都执行完成了。");
}
});
}

allof代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
private void method() throws ExecutionException, InterruptedException {
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}

return "f1";
});

f1.whenCompleteAsync(new BiConsumer<String, Throwable>() {
@Override
public void accept(String s, Throwable throwable) {
System.out.println(System.currentTimeMillis() + ":" + s);
}
});

CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}

return "f2";
});

f2.whenCompleteAsync(new BiConsumer<String, Throwable>() {
@Override
public void accept(String s, Throwable throwable) {
System.out.println(System.currentTimeMillis() + ":" + s);
}
});

CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2);

//阻塞,直到所有任务结束。
System.out.println(System.currentTimeMillis() + ":阻塞");
all.join();
System.out.println(System.currentTimeMillis() + ":阻塞结束");

//一个需要耗时2秒,一个需要耗时3秒,只有当最长的耗时3秒的完成后,才会结束。
System.out.println("任务均已完成。");
}

输出:

1
2
3
4
5
06-12 20:16:37.400 31142-31142/zhangphil.test I/System.out: 1528805797400:阻塞
06-12 20:16:39.406 31142-31171/zhangphil.test I/System.out: 1528805799406:f2
06-12 20:16:40.404 31142-31170/zhangphil.test I/System.out: 1528805800404:f1
06-12 20:16:40.404 31142-31142/zhangphil.test I/System.out: 1528805800404:阻塞结束
任务均已完成。

任务超时

JDK 9带来了两种新方法,可以为每个人提供渴望的功能 - 这对于确保在使用异步处理时的正确弹性至关重要。

orTimeout

简单地说,在调用上述方法之后,如果未在指定的超时内完成,将来会抛出ExecutionException。

一个简单的例子:

1
2
3
4
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(this::computeEndlessly)
.orTimeout(1, TimeUnit.SECONDS);

future.get(); // java.util.concurrent.ExecutionException after waiting for 1 second

由于设置了timeout为1秒,那么在get那里等待1秒后抛错

completeOnTimeout

在这种情况下,我们可以在达到超时后返回默认值:

1
2
3
4
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(this::computeEndlessly)
.completeOnTimeout(42, 1, TimeUnit.SECONDS);

Integer result = future.get(); // 42

超时1秒后不是报错,而是返回了预设的42这个值,前提条件是你必须预设默认值。