Detailed explanation of JDK lock 02: AQS
1. AQS brief introduction
In this part, I will briefly describe AQS from three perspectives: what it is, what it does, and how to use it
1.1 What is it?
The full name of AQS is AbstractQueuedSynchronizer, and the Chinese name is Queue Synchronizer.
If you split the Chinese, you can see that it must be inseparable from the two concepts of queue and synchronization. The following is a further explanation of their functions.
1.2 What do you do?
AQS is the basic framework for building locks or other synchronization components.
If you learn Java concurrency, the focus is on how AQS builds locks, because synchronizers are the key to implementing locks!
-
AQS uses an int member variable to represent the synchronization state. By modifying the synchronization state, the purpose of acquiring and releasing locks is achieved.
For example, if a thread acquires a lock, it is equivalent to acquiring the synchronization state at this time.
When a thread finishes its task, it releases the lock, which is equivalent to releasing the synchronization state.
-
AQS completes the queuing of threads through the built-in FIFO queue.
This is actually not difficult to understand. When a thread fails to acquire a lock, it can choose to be stuck in a blocking state, or it can spin and retry non-blocking; when multiple threads acquire the lock exclusively, only one thread can acquire the lock successfully, and the others will fail. So how should you manage these locks that fail to compete? That's what queues do.
- When the thread fails to acquire the lock, it will enter the queue, become the tail node of the queue, and enter the waiting state
1.3 How to use it?
AQS is implemented by inheritance: subclasses manage synchronization state by inheriting synchronizer and implementing its abstract methods.
AQS supports exclusive acquisition of synchronization status and shared acquisition of synchronization status
In fact, the essence of AQS is that it simplifies the implementation of locks. We do not need to care about the underlying operation logic such as synchronization state management, thread queuing, waiting and wake-up, and we only need to focus on the core functions of locks: locking and unlocking.
It can be understood like this:
- The lock is user-oriented, it defines the interface for the user to interact with the lock, and hides the implementation details
- AQS is a lock-oriented implementer, which defines the interface between the implementer of the lock and the synchronizer, hiding the implementation details
Later, I will write a practical case to use AQS to implement a lock.
2. Brief description of AQS method
The following three methods are used to manage the synchronization state
- getState(): used to get the synchronization state
- setState(int newState): used to set the synchronization state
- compareAndSetState(int expect, int update): use CAS to set the synchronization state to ensure atomicity
As mentioned in the first part: AQS has already helped us implement the maintenance logic of the queue. When we implement the lock, we only need to rewrite the method of acquiring the lock.
- tryAcquire(int arg): Exclusively acquires the synchronization state, the return value is a boolean type, true is the acquisition success, false is the acquisition failure.
- tryRelease(int arg): Exclusively releases the synchronization state, the return value is a boolean type, true means the release is successful, and false means the release fails.
- tryAcquireShared(int arg): Acquires the synchronization state in a shared manner, and the return value is of type int.
- Returns 0 for success and no remaining resources
- A value greater than 0 is returned to indicate success, and there are still remaining resources
- Returning a negative number means the acquisition failed
- tryReleaseShared(int arg): Shared release synchronization state, the return value is boolean.
- Returns true if subsequent waiting nodes are allowed to wake up after release; otherwise returns false
- isHeldExclusively(): Whether the current synchronizer is exclusive to the thread
Through a simple understanding of these methods, you can initially understand:
-
When the synchronization state state is 0, it is possible for other threads to acquire the synchronization state, that is, to acquire the lock.
-
For reentrant locks, the synchronization state state is automatically incremented after the thread takes the lock exclusively. If the thread has been repeatedly acquiring the lock, the state will continue to accumulate; when the thread releases the lock, the state must be decremented to 0 to be considered a complete release.
3. AQS practical case
By using AQS, an exclusive non-reentrant lock is simply implemented, which means that the state of the lock has only two states: 0 and 1.
Focus on the Sync inner class inherited from AQS, which customizes the core logic of acquiring and releasing the synchronization state.
This case confirms this sentence: AQS is a lock-oriented implementer, which defines the interface between the implementer of the lock and the synchronizer, and hides the implementation details
class ExclusiveLock implements Lock { private static class Sync extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire(int arg) { // When the state is set to 1 through CAS, it means the lock is successful if (compareAndSetState(0 ,1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } @Override protected boolean tryRelease(int arg) { // When the lock is released, if it is found that the lock has been released, there is an exception. if (getState() == 0) throw new IllegalArgumentException(); setExclusiveOwnerThread(null); setState(0); return true; } // When state==1, it means it is in the occupied state @Override protected boolean isHeldExclusively() { return getState() == 1; } public Condition newCondition() { return new ConditionObject(); } } private final Sync sync = new Sync(); @Override public void lock() { sync.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override public boolean tryLock() { return sync.tryAcquire(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(time)); } @Override public void unlock() { sync.release(1); } @Override public Condition newCondition() { return sync.newCondition(); } }
4. Detailed explanation of AQS source code
4.1 Synchronized Queue
As mentioned earlier, the synchronizer implements the management of threads and synchronization state through the synchronization queue.
Further explanation:
- When the current thread fails to acquire the synchronization state (lock), the synchronizer will construct the current thread as a node Node and add it to the synchronization queue, blocking the current thread
- When the synchronization state (lock) is released, the thread of the first node of the queue will wake up, and then the thread will try to obtain the synchronization state again.
As shown in the following figure: head points to the head node of the current queue, which has acquired the synchronization state (lock).
Nodes that fail to obtain synchronization status will be added to the queue in turn, and tail maintains the tail node.
Queue node status:
- CANCELLED(1): The value is 1, indicating that the current node needs to cancel the waiting due to the waiting timeout or interruption, and the node will not change after entering this state.
- SIGNAL(-1): The value is -1, indicating that the thread of the successor node is in a waiting state, waiting for the current node to wake up.
- CONDITION(-2): The value is -2, indicating that the node is waiting on the Condition. If other threads call the signal method on the Condition, the node in the CONDITION state will be transferred from the waiting queue to the synchronization queue.
- PROPAGATE(-3): The value is -3. In shared mode, the predecessor node will not only wake up its successor node, but also may wake up the successor node.
- INITIAL(0): The value is 0, representing the initial state
Observe the node status and find that: a negative value indicates that it is in a valid waiting state, while a positive value indicates that the node has been canceled.
4.2 Exclusive acquisition
The core method is acquire() , as shown below
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
This method consists of three methods: tryAcquire , acquireQueued and addWaiter .
Here is the execution flow of this method:
- Call the tryAcquire method to try to get the synchronization state, and its return value is of type boolean:
- If it returns True, it means that the synchronization status was obtained successfully. And after its negation, it is False, the method ends here, and returns directly
- If it returns False, it means that the synchronization status failed to be obtained. If it is negated, it is True, and the subsequent method needs to be further executed.
- After tryAcquire returns False, continue to execute the addWaiter method, construct a synchronization node, and use this method to add the constructed Node node to the tail of the queue in exclusive mode
- The acquireQueued method makes the node acquire the synchronization state in an infinite loop, and does not return until it is acquired
- Returns True if it was interrupted during the infinite loop, otherwise returns False
- If the thread node is interrupted while waiting, it will not respond. The selfInterrupt method will not be executed until the synchronization state is obtained
Next, let's take a look at the source code of these three core methods
-
tryAcquire
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
This core method has only one line of code, and it is the code that throws an exception. If you only look at the source code, it is really confusing.
But combined with the content of the first part and the actual combat case: when we implement a lock, we need to create an inner class inside the lock, let it inherit AQS, and rewrite the core methods in AQS. These core methods include tryAcquire. This logic of acquiring the synchronization state (lock) needs to be implemented by ourselves.
This further explains: AQS simplifies the implementation of locks. We do not need to care about the underlying operation logic such as synchronization state management, thread queuing, waiting and wake-up, and we only need to focus on locking and unlocking the core functions of locks.
-
addWaiter: look directly at the comments
The content executed in the if block is to add the current node to the tail of the queue
-
Note the need to use the compareAndSetTail method to ensure that nodes are added to the tail in a thread-safe manner.
If only a simple LinkedList is used to handle node relationships, it will cause multiple threads to be added to the linked list concurrently, causing confusion in the number and order of nodes.
private Node addWaiter(Node mode) { // Constructs the current thread as a node in a synchronized queue Node node = new Node(Thread.currentThread(), mode); // Get the tail node, tail is a member variable of the AQS class Node pred = tail; // If the tail node is not empty if (pred != null) { // Set the prev of the current node as the tail node node.prev = pred; // Set the current node as the new tail node if (compareAndSetTail(pred, node)) { // Set the next of the old tail node as the current node pred.next = node; return node; } } // If the tail node is empty, call enq spin to enter the queue enq(node); return node; }
-
enq: look directly at the comments
private Node enq(final Node node) { // It's a way of spin. Spin until Node is added to the end of the queue for (;;) { // Get the tail node of the current queue Node t = tail; if (t == null) { // If tail is empty, it means that the queue is empty, you need to create a new empty node as head and tail nodes if (compareAndSetHead(new Node())) tail = head; } else { // If tail is not empty, it means that there are thread nodes in the queue, and enter the queue operation node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
It should be noted here: when the tail node in addWaiter is empty, the enq method is called to add the node in a spin mode, which can ensure that the node is added successfully
-
-
acquireQueued: look directly at the comments
final boolean acquireQueued(final Node node, int arg) { // failed indicates whether the synchronization status (lock) failed to be acquired boolean failed = true; try { // interrupted indicates whether it was interrupted during the acquisition process boolean interrupted = false; for (;;) { // Get its predecessor node through node.predecessor() final Node p = node.predecessor(); // If the predecessor node is the head node head, it means that the current node can try to acquire the synchronization state (lock) if (p == head && tryAcquire(arg)) { // If the acquisition is successful, the current node becomes the new head node head // The setHead method will set the current node's prev to null setHead(node); // Setting the next of the old head node to null helps gc to recycle the old head node p.next = null; // Represents the successful acquisition of the synchronization state (lock) failed = false; // return if interrupted return interrupted; } // If the current thread node can rest, it will enter the waiting state until it is unpark ed if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // Marked as true if the waiting process was interrupted interrupted = true; } } finally { // If the synchronization state is not successfully obtained during the waiting process (interrupted or timed out), the thread node is set to the CANCELLED state if (failed) cancelAcquire(node); } }
-
shouldParkAfterFailedAcquire: This method is used to check the status and determine whether the predecessor node of the current thread node is still valid. see notes
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // If the status of the precursor node is valid and returns true, the current thread node can enter the waiting state and wait for wake-up. if (ws == Node.SIGNAL) return true; // If the status of the predecessor node is greater than 0, it is invalid. if (ws > 0) { // The while loop continues to find valid nodes before the predecessor node do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); // Queue the current thread node after the valid node pred.next = node; } else { // If the predecessor node is valid, set its state to SIGNAL in CAS mode, so as to notify the current thread node after releasing the synchronization state compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } // Return false to continue the spin check state within the acquireQueued method return false; }
-
parkAndCheckInterrupt: This method is used to enter the waiting state
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
-
4.3 Exclusive release
The release method is used to release the synchronization state, and will wake up the successor node of the current thread node, so that the successor node tries to obtain the synchronization state. The source code is as follows:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // Wake up the successor node unparkSuccessor(h); return true; } return false; }
-
The design pattern of tryRelease is the same as tryAcquire, and we need to implement it ourselves
-
unparkSuccessor: wake up the successor node
private void unparkSuccessor(Node node) { // node is the current thread node, get its status int ws = node.waitStatus; // If the status is less than 0, it needs to be set to 0 (valid to invalid), the process is allowed to fail if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // Get the successor node of the current node, the general node is not empty and valid Node s = node.next; // But if the node is empty or invalid (in CANCELLED state), you need to find a valid node if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
4.4 Shared access
Let's first look at the difference between shared and exclusive access to synchronization state:
- At the same time, shared acquisition allows multiple threads to acquire the synchronization state at the same time, while exclusive access allows only one thread to acquire, and other threads are blocked
A classic application of shared acquisition is Semaphore, which controls the number of threads accessing a particular resource at the same time.
Therefore, in the explanations that follow, Resource is used instead of Synchronized State.
Call the acquireShared method to acquire resources in a shared way
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
Among them tryAcquireShared needs to be implemented manually by ourselves, but the return result type has been defined:
- When a negative number is returned, it means the acquisition failed and the remaining resources are insufficient
- When it returns a non-negative number, it means the acquisition is successful. If it is 0, it means that there are no remaining resources; if it is a positive number, it means that there are remaining resources.
When it is negative, the doAcquireShared method needs to be called to enter the waiting queue, and it will not return until the resource is obtained.
private void doAcquireShared(int arg) { // Add the current thread node to the tail of the queue final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; // Spin repeats attempt to acquire resource for (;;) { final Node p = node.predecessor(); // When the predecessor node of the current thread node is the head node if (p == head) { // try to get the resource again int r = tryAcquireShared(arg); // If the number of resources is greater than or equal to 0, the acquisition is successful if (r >= 0) { // Set the head node. If r > 0, there are remaining resources, wake up the thread node after setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
4.5 Shared release
Shared release will call the releaseShared method to release the specified number of resources. The tryReleaseShared method still needs to be implemented by ourselves
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
The doReleaseShared method is mainly used to wake up the subsequent nodes in the waiting state after releasing the resources
private void doReleaseShared() { // Ensure that resources are released thread-safely by looping and CAS operations for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } }
5. Write it down
references:
-
JDK5.0 source code
-
The Art of Java Concurrent Programming
There will be about 5 articles in this series. I try my best to write my understanding of JUC in an easy-to-understand manner.
But if there are any mistakes, please point them out, and I will learn and improve in time~
If you think my content is good, you can leave a message in the comment area to support it~
Welcome to visit my personal blog~