AQS

AbstractQueuedSynchronizer

(原创)

风住尘香花已尽 -李清照《武陵春·春晚》

在了解AQS前,我们需要先了解CLH lock,AQS是CLH的变种,CLH is Craig, Landin, and Hagersten (CLH) locks(三个发明者名字缩写)。

克雷格.兰丁&hagersten (CLH Lock)

大概就是CLH维护一个list(从尾部向前指的单向链表),每个线程有自己的两个ThreadLocal变量,一个是pre,一个是当前node。node中有一个布尔值的成员变量。入队后的每个线程自旋locked变量,检查前一个结点是否放锁。Thread-1为竞争成功的线程,因为头结点的已经默认初始化为false,当调用clh.lock()的时候,只有Thread-1线程进入临界区,其他线程设置自己的locked为ture,同时自旋检查前一个结点locked状态,一旦前驱结点的locked状态为false,当前结点就进入临界区。

clh.jpg

前面的CLH是实现AQS的基础,AQS是并发的基础设施,在ReentrantLock、ReentrantReadWriteLock、Semaphore、ThreadPoolExecutor的Worker都有用到,AQS阻塞队列是以head结点的线程为空的的双向队列。把每一个入队线程封装成Node结点,其中,Node中的thread就是线程本体,nextWaiter是用于条件队列(单向链表Node)。waitStatus的状态有:CANCELLED(1)表示当前结点的线程被中断,SIGNAL(-1)表示当前结点需要唤醒下一个结点,CONDITION(-2)用于表示条件队列的结点。PROPAGATE(-3)可传播,仅用于设置头结点。AQS同步队列(阻塞队列)如下。

aqs1.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
static final class Node {

// 共享模式
static final Node SHARED = new Node();

// 独占模式
static final Node EXCLUSIVE = null;


// 结点被取消,因超时或者被中断。被取消后不会再阻塞
static final int CANCELLED = 1;

// 用于当前结点唤醒下一个结点线程唤醒标识
static final int SIGNAL = -1;

// 用于条件队列
static final int CONDITION = -2;
//传播标识
static final int PROPAGATE = -3;

volatile int waitStatus;

volatile Node prev;

volatile Node next;

volatile Thread thread;

Node nextWaiter;

final boolean isShared() {
return nextWaiter == SHARED;
}
//返回当前结点的上一个结点。
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
//用于创建共享结点或初始化。
Node() {
}

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153

public final void acquire(int arg) {
//尝试获取锁,如果获取不成功,addWaiter把当前线程封装成独占模式结点,
// tryAcquire留给子类实现。
if (!tryAcquire(arg) &&
// 前面tryAcquire失败,则把该线程封装成Node的EXCLUSIVE节点入AQS阻塞队列。
// addWaiter入队成功后会返回当前线程结点。
// acquireQueued返回是否被interrupted,一般为false。
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// ReentrantLock为例子,非公平锁实现tryAcquire
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 第一次拿锁。
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 不是第一次,判断是否是同个线程。是就可重入。否则返回false
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

// 线程尝试获取锁失败后,封装成结点。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 阻塞队列不为空。完成head和tail的初始化。
if (pred != null) {
node.prev = pred;
//或许一次就成功了,或许失落出去。:)
if (compareAndSetTail(pred, node)) {
pred.next = node;
// 成功入队。
return node;
}
}
// 来到这里说明为head为空,或者多个线程同时争抢,compareAndSetTail,争抢失败。
enq(node);
// 返回成功入队封装当前线程的结点。
return node;
}
// 自旋,一直尝试入队,直到入队成功。head和tail在此处初始化。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
//这里初始化后,继续自旋。
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
}
// 若初始化完成,继续尝试入队,直到入队成功。
else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}




// 如果这个方法返回true,则真正阻塞自己selfInterrupt();
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋
for (;;) {
final Node p = node.predecessor();
// 如果前一个是头结点,再次尝试争抢锁。这个可能是其他线程的解锁操作,解锁后唤醒head节点的下一个结点。
// 线程被唤醒后,从parkAndCheckInterrupt返回,
// 看看可不可以抢到。
if (p == head && tryAcquire(arg)) {
// 成功抢到,设置当前结点为头结点。
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 前一个不是头结点,把前一个节点和当前结点传入shouldParkAfterFailedAcquire,找前面为SIGNAL的前驱结点。
// 同时设置前一个为0的节点为-1
// 当p找到了,p为SIGNAL(-1)时,
// shouldParkAfterFailedAcquire返回true。 此时parkAndCheckInterrupt真正的挂起自己。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 来到这,说明tryAcquire方法有异常,取消该节点。
if (failed)
cancelAcquire(node);
}
}

// 该方法被放入acquireQueued自旋中,找前一个为SIGNAL的结点,此处依赖它唤醒自己。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// pred为的ws为-1就成功返回。
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// 说明pred结点是被取消的
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
// 往前找<=0的结点。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 成功找到后,设置pred的下一个节点为当前结点。
pred.next = node;
}
else {

/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 来到这里说明,pred结点不是SIGNAL(-1),也不大于0,那么就可能是为0、CONDITION(-2)、PROPAGATE(-3)
// 每个新入队的结点都是0,并且,入队后都把前面的结点设置成-1。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 不成功继续在acquireQueued自旋。
return false;
}
// 线程挂起,返回是否中断。
private final boolean parkAndCheckInterrupt() {
//挂起线程。等待唤醒
LockSupport.park(this);
//被唤醒后,返回。
return Thread.interrupted();
}

解锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public final boolean release(int arg) {
//tryRelease子类实现。
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//唤醒后继结点。
unparkSuccessor(h);
return true;
}
return false;
}
// ReentrantLock实现的tryRelease
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// 进来的是头结点。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
// 状态为-1就重置为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
// 后继节点可能取消等待
if (s == null || s.waitStatus > 0) {
s = null;
// 从后往前找
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒s节点的线程。
LockSupport.unpark(s.thread);
}

如果线程1拿到锁后,在线程1没有放锁前,线程2尝试拿锁,拿锁失败。进入阻塞队列,此时,初始化head、tail。把head的waitStatus设置成-1,当前入队节点为waitStatus=0。

ConditionObject

条件队列。AQS内部类。只有获取锁后才有await/signal,signalAll操作,否则抛出IllegalMonitorStateException

  • ReentrantLock可以获取newCondition()每次调用都是新的条件队列。
  • 每个条件队列维护一个单向链表,被signal后,把条件队列的firstWaiter移到AQS阻塞队列。
  • 即使结点被中断了,仍然会进入阻塞队列
1
2
3
4
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public final void await() throws InterruptedException {
// 响应其他线程中断此线程
if (Thread.interrupted())
throw new InterruptedException();
// 加入条件队列,不是条件结点,将被移除。
Node node = addConditionWaiter();
// 返回重入次数
int savedState = fullyRelease(node);
int interruptMode = 0;
// 判断结点是否在阻塞队列。
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 唤醒后,检验中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 进入阻塞队列。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
// node.next为空,从后往前找。tail重新设置。
return findNodeFromTail(node);
}