谈一谈ThreadPoolExecutor

(原创)

花自飘零水自流。 -李清照 《一剪梅·红藕香残玉簟秋》

​ 开发中,我们都有形或无形中用到线程池 (SpringBoot的@Async、Tomcat线程池 …..)。

  • 线程池可以降低资源消耗(如果配置适当)。

  • 线程池不用自己频繁创建、销毁线程。

  • 能更好的管理线程,提高对线程的管理,并可查看执行任务数。

  • 使用者设置参数,直接提交任务就行了

    本文将涉及到的类会讲一讲,会有些长,请耐心看。

    分析ThreadPoolExecutor之前我们先看看它的父接口和父类:

    1.PNG

    前奏

    Executor

    这个接口只有一个方法, 提交一个任务,没有返回任务的结果值。

    1
    void execute(Runnable command);
    1
    2
    3
    Executor executor = anExecutor;
    executor.execute(new RunnableTask1());
    executor.execute(new RunnableTask2());

    ExecutorService

    这个继承了Executor,有更多契约。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    // 任务继续执行并执行队列剩余的任务。不接受新任务。
    void shutdown();

    // 一旦调用,停止工作线程的任务(工作线程需响应中断,否则无法停止),不执行队列剩余任务,返回队列的任务。
    List<Runnable> shutdownNow();

    // 返回线程池是否Shutdown状态
    boolean isShutdown();

    // 返回线程池是否完全关闭状态
    boolean isTerminated();

    // 一直阻塞,直到线程池状态Terminated后回调。
    // true 线程池成功关闭后返回,false 超时
    boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException; //响应中断

    // 有返回值的任务提交。 函数式接口Callable<V>接口方法: V call() throws Exception;
    // 如果想要阻塞等待任务返回结果:exec.submit(aCallable).get();
    <T> Future<T> submit(Callable<T> task);

    // T result 自定义返回结果的类型
    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

    // 执行给定任务,当他们全部执行完成,返回结果和状态。 Future#isDone 任务可能正常完成终止或通过抛出异常终止
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException;

    // timeout 等待的最长时间, unit 超时参数的时间单位,
    //如果任务执行超时,这任务的状态将不是完成。
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
    long timeout, TimeUnit unit)
    throws InterruptedException;


    // 任务集合,只要有一个完成即可。未完成的任务将被取消 ExecutionException这个异常是没有任务成功完成
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException;

    // 同样的,只是设置了超时。TimeoutException这个异常是: 指定时间过去了,但是没有一个任务完成
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
    long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;

    AbstractExecutorService

    抽象类,实现ExecutorService接口。提供ExecutorService的默认实现。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115


    // value 返回的默认值
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
    }


    public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
    }
    // 下面的submit都差不多,只是任务类型不同。
    ...



    // 注意这个方法,invokeAny都会用到
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
    boolean timed, long nanos)
    throws InterruptedException, ExecutionException, TimeoutException {
    // 校验
    if (tasks == null)
    throw new NullPointerException();
    int ntasks = tasks.size();
    if (ntasks == 0)
    throw new IllegalArgumentException();

    // 存储任务返回值列表
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
    // 这个是安排提交的任务。
    // 这个类实现CompletionService接口。内部其实还是用的是AbstractExecutorService,可扩展。
    // 里面有存储完成任务的队列(completionQueue),对于任务的获取(take、poll、poll(timeout, unit))
    ExecutorCompletionService<T> ecs =
    new ExecutorCompletionService<T>(this);



    try {
    // 记录异常
    ExecutionException ee = null;
    // 是否设置了超时
    final long deadline = timed ? System.nanoTime() + nanos : 0L;

    // 任务迭代器
    Iterator<? extends Callable<T>> it = tasks.iterator();

    // 开始一个任务,并且存储返回值。
    futures.add(ecs.submit(it.next()));
    // 待执行任务数-1
    --ntasks;
    // 已添加任务数+1
    int active = 1;

    for (;;) {
    // 注意这里返回的是FutureTask<T>(callable);
    Future<T> f = ecs.poll();
    // 代表没有已完成的任务。
    if (f == null) {
    if (ntasks > 0) {
    // 待执行任务数-1
    --ntasks;
    // 重新加一个任务
    futures.add(ecs.submit(it.next()));
    // 已添加任务数+1
    ++active;
    }
    // 注意这里,一开始有人任务提交都是+1,那么什么时候为0呢?
    // 玄机就是在下面f!=null的时候,如果下面任务正常完成了那么肯定会return,除非异常,
    // 注意:下面 if (f != null)并没有 throw e,抛异常操作是在自旋之外。
    // 也就是说,这里的active==0,是所有任务都失败了。必须出去。
    else if (active == 0)
    break;
    // 检验是否超时
    else if (timed) {
    // 获取任务执行结果
    f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
    if (f == null)
    throw new TimeoutException();
    nanos = deadline - System.nanoTime();
    }
    else
    //阻塞获取已完成的任务
    f = ecs.take();
    }
    // 说明已经有完成的任务了,
    if (f != null) {
    // 减提交的任务数。
    --active;
    try {
    // 阻塞获取,除非有异常
    return f.get();
    } catch (ExecutionException eex) {
    ee = eex;
    } catch (RuntimeException rex) {
    ee = new ExecutionException(rex);
    }
    }
    }
    // 这里就是接自旋的异常。
    if (ee == null)
    ee = new ExecutionException();
    throw ee;
    // 无论是否正常完成,都取消其他任务。
    } finally {
    // 取消其他任务
    for (int i = 0, size = futures.size(); i < size; i++)
    futures.get(i).cancel(true);
    }
    }



    FutureTask

    • 一个可以取消的异步计算,get()将阻塞如果没完成任务
    • 可用于包装Callable或Runnable 对象
    • FutureTask可以提交给Executor用于执行
    • 状态转换:
    • NEW -> COMPLETING -> NORMAL
    • NEW -> COMPLETING -> EXCEPTIONAL
    • NEW -> CANCELLED
    • NEW -> INTERRUPTING -> INTERRUPTED

    下面我们看一看FutureTask

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    214
    215
    216
    217
    218
    219
    220
    221
    222
    223
    224
    225
    226
    227
    228
    229
    230
    231
    232
    233
    234
    235
    236
    237
    238
    239
    240
    241
    242
    243
    244
    245
    246
    247
    248
    249
    250
    251
    252
    253
    254
    255
    256
    257
    258
    259
    260
    261
    262
    263
    264
    265
    266
    267
    268
    269
    270
    271
    272
    273
    274
    275
    276
    277
    278
    279
    280
    281
    282
    283
    284
    285
    286
    287
    288
    289
    290
    291
    292
    293
    294
    295
    296
    297
    298
    299
    300
    301
    302
    303
    304
      
    private volatile int state;
    // 任务状态(state)。
    private static final int NEW = 0;
    private static final int COMPLETING = 1;
    private static final int NORMAL = 2;
    private static final int EXCEPTIONAL = 3;
    private static final int CANCELLED = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED = 6;


    // 任务返回值 或者 异常
    // 注意没有volatile ,受读写保护
    //只有cas成功的线程才能赋值。
    private Object outcome;

    //执行callable的线程
    private volatile Thread runner;

    // 等待的线程节点
    private volatile WaitNode waiters;



    public FutureTask(Callable<V> callable) {
    if (callable == null)
    throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;
    }
    // 注意这个 result 是给定的返回值
    public FutureTask(Runnable runnable, V result) {
    // 任务适配器 返回RunnableAdapter类型。如果调用callable.call(),任务将会执行,完成后返回result
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;
    }

    // 这个只要是CANCELLED、INTERRUPTING、INTERRUPTED状态,就是认为任务取消了
    public boolean isCancelled() {
    return state >= CANCELLED;
    }

    // 只要不是新建状态的任务都是Done状态,包括阻塞、异常、正常状态
    public boolean isDone() {
    return state != NEW;
    }


    public V get() throws InterruptedException, ExecutionException {
    int s = state;
    //只要是NEW、COMPLETING状态,就阻塞获取。
    if (s <= COMPLETING)
    //当多个线程进入时,等待任务完成。
    s = awaitDone(false, 0L);
    // 这里只是任务状态是NORMAL才成功返回,否则抛异常外面接。
    return report(s);
    }

    // 此方法响应中断。
    private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    //是否设置了超时。
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    // 自旋。
    for (;;) {
    // 如果该线程被中断了,那么移除等待节点,
    if (Thread.interrupted()) {
    //移除当前节点,第一次进来,q是为空的,也就是什么也不做。
    removeWaiter(q);
    throw new InterruptedException();
    }

    int s = state;
    // 不是新建状态,也不是执行任务的状态,返回state。
    if (s > COMPLETING) {
    if (q != null)
    q.thread = null;
    return s;
    }
    // 这个任务是在执行中,让出CPU调度
    else if (s == COMPLETING) // cannot time out yet
    Thread.yield();
    else if (q == null)
    //任务是新建状态,包装当前线程。
    q = new WaitNode();
    else if (!queued)
    //来到这里,就是线程包装WaitNode好了,进入链表等待。这个是cas入队新创建的当前线程。头插法。等待
    queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
    q.next = waiters, q);

    // 检验是否超时等待
    else if (timed) {
    nanos = deadline - System.nanoTime();
    if (nanos <= 0L) {
    removeWaiter(q);
    return state;
    }
    LockSupport.parkNanos(this, nanos);
    }
    else
    //线程入队了,可以阻塞了。一旦任务执行完成,也就是status>COMPLETING,
    //finishCompletion();后会LockSupport.unpark(t);
    //线程又会从这里醒来,再继续自旋,满足条件后return state
    LockSupport.park(this);
    }
    }

    // s等于status
    private V report(int s) throws ExecutionException {
    Object x = outcome;
    //NORMAL完成了任务,就返回
    if (s == NORMAL)
    // 返回outcome
    return (V)x;
    // 任务状态是CANCELLED、INTERRUPTING、INTERRUPTED,抛出取消异常。
    if (s >= CANCELLED)
    throw new CancellationException();
    //如果都不是,就是执行异常。
    throw new ExecutionException((Throwable)x);
    }


    // 移除中断、超时的节点,
    private void removeWaiter(WaitNode node) {
    // 注意这个细节,设置了thread为空,下面自旋有用。
    if (node != null) {
    node.thread = null;
    //标识外层循环
    retry:
    for (;;) {
    //不为空就遍历单向链表
    for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
    //下一个节点。
    s = q.next;
    //只要是不为空的thread,就不是刚开始设置的node。
    //那么就一直找
    if (q.thread != null)
    //记录前面的节点
    pred = q;
    //如果找到了当前thread为空。
    else if (pred != null) {
    //那么上一个节点,连接下一个节点。
    pred.next = s;
    //检验是否被其他线程改。这部分没搞懂。
    if (pred.thread == null) // check for race
    continue retry;
    }
    //如果pred为空,当前节点,q.thread为空
    //交换当前节点为下一个节点。
    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
    q, s))
    continue retry;
    }
    // 这里就是q=null,跳出自旋。
    break;
    }
    }
    }

    // 设置返回值
    protected void set(V v) {
    //前提是任务为新建状态。
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
    outcome = v;
    UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
    finishCompletion();
    }
    }

    // 设置返回异常结果
    protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
    outcome = t;
    UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
    finishCompletion();
    }
    }


    public void run() {
    // 保证任务是新建状态。如果是新建状态,设置执行任务的线程
    if (state != NEW ||
    !UNSAFE.compareAndSwapObject(this, runnerOffset,
    null, Thread.currentThread()))
    return;
    try {
    Callable<V> c = callable;
    if (c != null && state == NEW) {
    V result;
    boolean ran;
    try {
    //执行任务,并拿到返回值。
    result = c.call();
    ran = true;
    } catch (Throwable ex) {
    result = null;
    ran = false;
    // 设置异常原因
    setException(ex);
    }
    //任务执行完成
    if (ran)
    //设置任务执行状态,NEW->COMPLETING,设置任务的执行结果:outcome=result。
    //成功后在设置任务状态(UNSAFE.putOrderedInt):COMPLETING->NORMAL
    set(result);
    }
    } finally {
    // 无论是否正常完成,清除当前任务的线程。防止并发调用run
    runner = null;
    int s = state;
    //响应中断。
    if (s >= INTERRUPTING)
    handlePossibleCancellationInterrupt(s);
    }
    }


    // 此方法没设置result.
    protected boolean runAndReset() {
    if (state != NEW ||
    !UNSAFE.compareAndSwapObject(this, runnerOffset,
    null, Thread.currentThread()))
    return false;
    boolean ran = false;
    int s = state;
    try {
    Callable<V> c = callable;
    if (c != null && s == NEW) {
    try {
    c.call();
    ran = true;
    } catch (Throwable ex) {
    setException(ex);
    }
    }
    } finally {

    runner = null;
    // 再获取一下状态,响应中断
    s = state;
    if (s >= INTERRUPTING)
    handlePossibleCancellationInterrupt(s);
    }
    // 执行完成了并且还是任务还是新建状态
    return ran && s == NEW;
    }

    // 取消任务方法。
    // mayInterruptIfRunning状态标识:true :INTERRUPTING false :CANCELLED
    public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW &&
    UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
    mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
    // 任务状态不是新建的,或者cas失败
    return false;
    try {
    if (mayInterruptIfRunning) {
    try {
    Thread t = runner;
    if (t != null)
    // 中断
    t.interrupt();
    } finally { // final state
    // 设置INTERRUPTED状态。
    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
    }
    }
    } finally {
    finishCompletion();
    }
    return true;
    }


    // run方法的finally都用这个,用于唤醒线程并且把等待节点清空。
    private void finishCompletion() {

    for (WaitNode q; (q = waiters) != null;) {
    // cas设置 WaitNode为空
    if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
    for (;;) {
    Thread t = q.thread;
    // 只要还有thread,就一直唤醒。并且清空WaitNode节点。
    if (t != null) {
    q.thread = null;
    LockSupport.unpark(t);
    }
    WaitNode next = q.next;
    if (next == null)
    break;
    q.next = null; // unlink to help gc
    q = next;
    }
    break;
    }
    }
    // 子类实现
    done();
    // 已经执行完成了,无任务了。
    callable = null;
    }

    中篇

    ThreadPoolExecutor

    说了这么多,前面是铺垫,现在讲线程池了(全是重点),仔细读一读,不禁感叹,doug lea大师就是大师,既能把所有情况都考虑到,又能把代码写的如此简洁。让我们体会一下细节的力量,细节是洪水猛兽。

    把线程池化是提高资源的利用率,是统一管理资源,监控已有资源状态。

    这里记录下个人对源码结合注释的解读。

    线程池的生命周期状态:

    • RUNNING
    • SHUTDOWN
    • STOP
    • TIDYING
    • TERMINATED
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// workerCount : 有效的线程数量,存活线程。
// runState : 线程池状态。
// 限制线程数 : (2^29)-1 (about 500 million)
//------线程池状态
// RUNNING : 接受新任务,执行阻塞队列的任务
// SHUTDOWN : 不接受任务,但会执行阻塞队列的任务,以及继续执行正在执行的任务。
// STOP : 不接受任务,也不会执行阻塞队列的任务,同时中断正在执行的任务的线程。
// TIDYING : 所有任务完成,存活线程数为0,线程池状态转向tydying状态,并调用结束terminated(),钩子方法,
// TERMINATED: 当terminated()完成。
// 状态转换 : RUNNING -> SHUTDOWN call shutdown()
// SHUTDOWN -> TIDYING 阻塞队列无任务,线程无存活。
// STOP -> TIDYING 存活线程为空
// TIDYING -> TERMINATED 当terminated()完成

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

// Packing and unpacking ctl
// 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),
// 高3位保存runState,低29位保存workerCount,两个变量之间互不干扰。用一个变量去存储两个值,可避免在做相关决策时,出现不一致的
// 情况,不必为了维护两者的一致,而占用锁资源。
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
// 通过状态和线程数生成ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }

// 阻塞队列
private final BlockingQueue<Runnable> workQueue;
// 存活线程
private final HashSet<Worker> workers = new HashSet<Worker>();

private final ReentrantLock mainLock = new ReentrantLock();
// 条件队列
private final Condition termination = mainLock.newCondition();

// 当前最大线程池数
private int largestPoolSize;

// 完成任务数
private long completedTaskCount;

// 创建线程的线程工厂
private volatile ThreadFactory threadFactory;
// 拒绝策略(当线程池最大线程数为峰值并且队列已满,执行拒绝策略)
private volatile RejectedExecutionHandler handler;

// 当超过核心线程数的线程还有设置允许allowCoreThreadTimeOut,则使用此超时时间,
private volatile long keepAliveTime;

// 当此变量为false(默认),核心线程在没工作的时候也存活,总是等待任务。
private volatile boolean allowCoreThreadTimeOut;

// 核心线程数
private volatile int corePoolSize;

// 最大线程数
private volatile int maximumPoolSize;
// 默认拒绝策略,抛异常
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();





1.PNG

​ 图1 提交任务的判断

经上图的提交任务的策略顺序来看,阻塞队列更像一个起到缓冲作用的用法,当核心线程都在跑任务,还要提交任务,就进缓冲。注意这个缓冲,如果把设置的Integer最大,如果场景的任务非常多。是很容易OOM的。因为非核心线程没用上,更不用说拒绝策略了。

线程池的七大参数,根据场景设置,事实是这个并不好设置,需要经验。

构造方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

// 七个参数:corePoolSize核心线程数、
// maximumPoolSize最大线程数、
// keepAliveTime默认允许非核心线程存活时间,unit时间单位,workQueue阻塞队列
// threadFactory创建线程的工厂。
// RejectedExecutionHandler 阻绝策略
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

提交任务的入口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31


public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 这里成功后,返回出去,任务成功添加,并且是核心线程执行。
if (addWorker(command, true))
return;
// 到了这里,线程池状态发生了改变,再看看线程池状态,>SHUTDOWN,
c = ctl.get();
}
// 来到这线程池状态发生了改变,或者线程数大于等于核心线程数,任务会入队。
// 线程池状态是RUNNING,并且任务成功入队,
if (isRunning(c) && workQueue.offer(command)) {
// 再次查看线程池状态。
int recheck = ctl.get();
// 一旦>RUNNING,就不接受提交任务,执行拒绝策略。
if (! isRunning(recheck) && remove(command))
reject(command);
// 来到这,说明线程池状态正常:<=SHUTDOWN,这里判断下工作线程是不是为0,毕竟还有任务,
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 这里的情况是,当前线程数大于核心线程数,并且任务队列满了,尝试创建 thread,此时若成功,即:workers >=corePoolSize,
// workers< maximumPoolSize,否则不是正常的线程池状态,执行拒绝策略。
else if (!addWorker(command, false))
reject(command);
}

我们可以看到,提交任务跟addWorker关系密切,线程池的状态尤为重要。

好了,这儿我们先看看Worker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145




//我们看到 worker继承了AQS,用独占不可重入的设计。每个woker都有自己锁,防止执行任务时被中断。

private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;

// 记录线程
final Thread thread;

// 任务
Runnable firstTask;

// 每个线程的执行完成的任务数
volatile long completedTasks;


Worker(Runnable firstTask) {
//注意这里,非常重要,这个-1,防止中断,包括shutDownNow的方法中断。
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}


// 线程开始执行自己的任务时,先执行外界给的firstTask,完成后,自旋从队列取任务(阻塞)
public void run() {
runWorker(this);
}

// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
//是否自己持有锁。
protected boolean isHeldExclusively() {
return getState() != 0;
}

protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

// shutDownNow()使用,相当于worker自杀。
void interruptIfStarted() {
Thread t;
// getState大于0才能自我中断,也就是在执行构造方法的时候不能中断。
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}


// 到这就是,每个线程执行的方法。
// 太妙了,全是重点。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 拿到使用者给的第一个任务
Runnable task = w.firstTask;
w.firstTask = null;
// 这儿是因为在构造worker设置状态为-1, unlock后,也就是release(1)
// release会tryRelease,其中 setState(0);这是中断的前提条件
// 因为中断可以是tryLock,也可以自我中断,但首先需满足getStaus()!=0
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// getTask()可能会阻塞住,也可能返回为空
while (task != null || (task = getTask()) != null) {
// 注意,这个是不可重入锁,到了这里,有任务,并且不会响应shutdown中断。
w.lock();
// 到了这里会检查下是否达到中断自己的条件
// 检验线程池状态,如果线程池是STOP、TIDYING、TERMINATED状态,确保自己中断
// 这个用意是响应shutDownNow的线程自杀。也就是不让执行任务了。
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
// 前提条件符合,并且自己还没有中断标识
!wt.isInterrupted())
// 正式中断自己
wt.interrupt();
try {
// 线程池状态正常,开始执行任务。
// 执行任务前需要执行的方法,留给子类。可是这里并没有抓异常
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 这里开始执行任务
task.run();

// 抓异常抛出并给到thrown,用于finally的afterExecute
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 骚操作,留给子类,执行任务可能会有异常,子类要实现就可以把异常拿到做处理。
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
// 如果没到这里,那就说明上面任务中出现了异常(completedAbruptly=true),
// 如果到了这里,在getTask()返回了空,
completedAbruptly = false;
} finally {
// 如果来到这,就是线程执行任务时,有异常了或者getTask()返回了空。线程本该执行任务完成后从任务队列取任务。
processWorkerExit(w, completedAbruptly);
}
}



线程从阻塞队列获取任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// 从阻塞队列获取任务。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
// 自旋
for (;;) {
int c = ctl.get();
// 获取线程池状态
int rs = runStateOf(c);

// Check if queue empty only if necessary.
// 老规矩,检查线程池状态。
// 线程池状态是SHUTDOWN、STOP、TIDYING、TERMINATED
// 并且,>= (STOP、TIDYING、TERMINATED)或者任务队列为空
// 我们可以看到,即使SHUTDOWN状态,任务队列还有任务,就不会返回
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
//减完工作线程数就返回了。
return null;
}
// 线程数
int wc = workerCountOf(c);

// Are workers subject to culling?
// 允许核心线程空闲超时,或者线程数超过了核心线程数。
// 这个超时设置,我们可以看到,如果是使用者可以定义allowCoreThreadTimeOut为true,或者有非核心线程了。
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// (wc > maximumPoolSize || (timed && timedOut)) :
// 只要是超过最大线程数。 或者允许了核心线程空闲超时、有非核心线程活动,但是没有任务
// timedOut 在下面判断
// 我们可以看到,只要线程数没超过最大线程数,没有允许核心线程空闲超时,并且wc<=核心线程,就不会返回空,会阻塞获取任务
// 返回空的,是当前线程数超过核心线程数,并且队列没任务,
// 那么线程会返回到processWorkerExit,进行清理空闲线程操作,
// completedAbruptly = false;为了干掉超过核心线程数的线程
if (
(wc > maximumPoolSize || (timed && timedOut))
&&
// 注意:是大于1,只要有2个线程以上存活、或者线程数为1却没任务
(wc > 1 || workQueue.isEmpty())) {
// 减去多余的线程
if (compareAndDecrementWorkerCount(c))
return null;
// 自旋,compareAndDecrementWorkerCount失败,发生竞争。
continue;
}

try {
// timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// 到了这里,说明不允许核心线程空闲超时,而且没有非核心线程,这个take是会阻塞的。也就是没有任务,一直在等。
workQueue.take();
if (r != null)
return r;
// 拿到任务为空。说明可以尝试 kill 线程了
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

在getTask中,会检查线程池状态,并且符合条件杀掉多余的线程。

下面我们看看添加工作线程的方法。太妙了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102


private boolean addWorker(Runnable firstTask, boolean core) {
// 外层循环
// 注意这里没有上大锁。
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
// 检查线程池状态和队列的状态。
// 线程池状态>=(SHUTDOWN、STOP、TIDYING、TERMINATED)
if (rs >= SHUTDOWN &&
// 能来到这,说明线程池状态非正常,
// 这个条件( rs == SHUTDOWN, firstTask == null , ! workQueue.isEmpty() )
// 只要有一个为false,就符合退出条件。
// SHUTDOWN,要是还提交任务就是不允许啦。
// 我们看到即使SHUTDOWN状态, firstTask为空(这个是其他地方有用),
// 队列还有任务,就不会返回false,也就是说,会继续创建线程
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 里层循环
for (;;) {
// 获取工作线程数
int wc = workerCountOf(c);
// 结合核心最大线程数、最大线程数限制,判断是否符合创建线程条件,
if (wc >= CAPACITY ||
// 注意:
// 如果是core,大于等于 corePoolSize,不让创建线程
// 如果非核心线程,大于等于maximumPoolSize,不让创建线程
wc >= (core ? corePoolSize : maximumPoolSize))
return false;

// 符合后,就先增加线程数
if (compareAndIncrementWorkerCount(c))
// 注意,这里是跳出外层循环,---出口
break retry;
// 如果失败,就说明其他线程抢先一步增加了线程,发生竞争,再次循环
c = ctl.get(); // Re-read ctl
// 在看看是不是线程池状态发生了变化。
if (runStateOf(c) != rs)
// 发生了变化,跳到外界循环,检查线程池状态。
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 到了这里说明上面成功增加了线程数。
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {

w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 注意:到了这里,才拿这把大锁。只允许线程池中一个线程进入创建线程。
// 另外,tryTerminate(),也会拿这个把大锁,也就是说,一旦此线程拿到了这把大锁,
// 那么,尝试关闭线程池的线程让他等着,等我先造一个线程完后再关闭吧。
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// 再次检查线程池状态,如果为SHUTDOWN状态,firstTask为空,还创个啥
// 这里抛出了IllegalThreadStateException异常,但是没有catch,抛到外层
// 哪里会出现firstTask为空呢,
// 也就是在: public void setCorePoolSize(int corePoolSize)
// public boolean prestartCoreThread() 提前加一个核心线程。
// void ensurePrestart()
// public int prestartAllCoreThreads() 提前启动所有核心线程。相当于预热。
// private void processWorkerExit。处理线程退出。
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 好了,该检验的都检验了,加线程了。
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
// 这个是记录当前的线程池的峰值线程数。
largestPoolSize = s;
// 设置已添加线程标识
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//如果成功添加了,开始任务。
if (workerAdded) {
//这里会执行runWorker方法,
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144




// 这里就是有异常后处理 completedAbruptly可以看做异常、任务队列没有任务的标注
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//有异常,减少工作线程数
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
// 注意,上一把大锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 拿到该线程已完成的任务数
completedTaskCount += w.completedTasks;
// 移除当前的线程。此线程可能时执行任务有异常,或者线程池的线程数超过核心线程数并且队列没有任务,
workers.remove(w);
} finally {
mainLock.unlock();
}

// 尝试关闭线程,
tryTerminate();

int c = ctl.get();
// 如果线程池在 RUNNING SHUTDOWN状态
if (runStateLessThan(c, STOP)) {
// completedAbruptly=false ,getTask()为空。来到了这。
if (!completedAbruptly) {
// allowCoreThreadTimeOut:允许核心线程超时。 允许的话,等空闲超时后线程数就是为0了,
// 因为都允许核心线程空闲超时了,还有什么可留恋的。
// 如果不允许,那就最小的线程数的就是corePoolSize数。
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 有可能队列新提交了任务,线程不能死。
if (min == 0 && ! workQueue.isEmpty())
// 留个活口
min = 1;
// 先看看线程池的线程数是不是符合条件,符合了就不用增加线程了。
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 到了这里,就是队列还有任务,但是没有线程
addWorker(null, false);
}
}


// 关闭线程池:
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 提前设置线程池的状态,自旋cas
// 里面会先看线程池状态是否已经被其他线程设置了,这样就不用再重复设置了。
advanceRunState(SHUTDOWN);
// 这里会中断所有workers线程,
// 注意里面的interruptIdleWorkers是false(如果为true,只是tryLock一次),也会有把大锁(可重入),
// 但是仅仅是tryLock(),也就是在执行任务的线程不会中断。
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}


// shutdown用的是false
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 第二个条件,不可重入锁,已经在执行任务的线程不会中断。tryLock是cas 0 -> 1
// 失败了就下一个worker
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}



// 符合关闭条件,则会自旋一个一个中断线程

final void tryTerminate() {
// 自旋,
for (;;) {
int c = ctl.get();
// 线程池状态为 RUNNING,不让关闭
if (isRunning(c) ||
// 或者线程池状态为 TIDYING、TERMINATED,说明其他线程已经在关闭了,此时,不用尝试关闭了
runStateAtLeast(c, TIDYING) ||
// 来到这里就是SHUTDOWN、STOP状态
// 线程池状态为SHUTDOWN但是还有任务,就不关闭了
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 前面进行状态检验,
// 来到这就说明可以关闭了。
// 这个时候,线程池状态可能是SHUTDOWN但是队列没任务,也可能是STOP状态
if (workerCountOf(c) != 0) { // Eligible to terminate
// 看看尝试中断一个线程。
// 其实这个时候 工作线程会在getTask(),看线程池状态,是shutdown的话,并且任务队列为空
// 自己会先减工作线程数的。decrementWorkerCount,并且retuen null。这个时候就用到processWorkerExit。
interruptIdleWorkers(ONLY_ONE);
return;
}
// 到了这里,符合关闭线程池了,工作线程数为0
// 拿大锁,只允许一个线程。
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// cas线程池状态,准备关闭,先设置TIDYING状态,并且工作线程数为0
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 子类实现
terminated();
} finally {
// 设置成功后,在设置成TERMINATED状态
ctl.set(ctlOf(TERMINATED, 0));
// 这个时候 唤醒条件队列在等待TERMINATED的条件的线程,
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}

马上关闭:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 提前设置线程池状态。
advanceRunState(STOP);
// 注意,这个是线程自杀
interruptWorkers();
// 拿到任务队列的任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}


// 这里跟shutdown不同的是,这里会执行worker自己的中断方法,不用尝试加锁,也就是正在执行任务的线程也会中断。
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}

尾声

好了,到了这就是接近尾声了,我们看看一些辅助方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 这个方法是等待指定时间,看能不能等到线程池关闭后,用户做一些生命周期的操作
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}



还可以设置核心线程数,最大线程数,keepAliveTime,拒绝策略,线程工厂。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

public void setThreadFactory(ThreadFactory threadFactory)

public void setRejectedExecutionHandler(RejectedExecutionHandler handler)

public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
if (workerCountOf(ctl.get()) > corePoolSize)
// 中断空闲的线程。
interruptIdleWorkers();
// 如果是调大了核心线程数
else if (delta > 0) {
// 取相差了的核心线程数,和任务队列的任务数做对比。取最小的数
int k = Math.min(delta, workQueue.size());
// 1:取任务队列的数,说明新增的核心线程比任务队列的数还要大。启动核心线程拿任务
// 2:取新增的核心线程的数,说明任务队列的数比新增的核心线程数还要大。
while (k-- > 0 && addWorker(null, true)) {
// 只要任务队列为空就不加线程了。
if (workQueue.isEmpty())
break;
}
}
}
public void setMaximumPoolSize(int maximumPoolSize)

public void setKeepAliveTime(long time, TimeUnit unit)
// 清除被取消的任务
public void purge()

// 获取正在工作的线程
public int getActiveCount()

// 获取线程数达到的峰值
public int getLargestPoolSize()

// 获取线程完成的任务(包括有异常的任务)+正在执行的任务+任务队列的任务
public long getTaskCount()

// 允许核心线程超时回收
public void allowCoreThreadTimeOut(boolean value)

拒绝策略(当然可以自己实现RejectedExecutionHandler自定义自己的拒绝策略),我们看看线程池默认提供的拒绝策略:

  • CallerRunsPolicy : 拿去任务的线程自己执行(前提是线程池状态正常run)

  • AbortPolicy: 默认抛RejectedExecutionHandler异常

  • DiscardPolicy:忽略此任务

  • DiscardOldestPolicy:把任务队列的最老的任务弹出,新增现在提交的任务

结束。