Detailed explanation of JDK lock 02: AQS

1. AQS brief introduction

In this part, I will briefly describe AQS from three perspectives: what it is, what it does, and how to use it.

1.1 What is it?

The full name of AQS is AbstractQueuedSynchronizer. Its Chinese name translates to "queue synchronizer".

As the name suggests, it is built on two concepts: a queue and synchronization. The following sections explain the role of each.

1.2 What does it do?

AQS is the basic framework for building locks or other synchronization components.

When learning Java concurrency, the focus should be on how AQS is used to build locks, because the synchronizer is the key to implementing a lock.

  1. AQS uses an int member variable to represent the synchronization state. By modifying the synchronization state, the purpose of acquiring and releasing locks is achieved.

    For example, if a thread acquires a lock, it is equivalent to acquiring the synchronization state at this time.

    When a thread finishes its task, it releases the lock, which is equivalent to releasing the synchronization state.

  2. AQS completes the queuing of threads through the built-in FIFO queue.

    This is not difficult to understand. When a thread fails to acquire a lock, it can either block, or spin and retry without blocking. When multiple threads compete for an exclusive lock, only one of them succeeds and the others fail. How should the threads that lose the competition be managed? That is what the queue is for.

    • When a thread fails to acquire the lock, it enters the queue as the new tail node and goes into the waiting state.

1.3 How to use it?

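AQS is used through inheritance: a subclass extends the synchronizer and overrides its protected methods to manage the synchronization state. (These methods are not declared abstract. Their default implementations throw UnsupportedOperationException, so a subclass overrides only the ones it needs.)

AQS supports both exclusive acquisition and shared acquisition of the synchronization state.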

In fact, the essence of AQS is that it simplifies the implementation of locks. We do not need to care about the underlying operation logic such as synchronization state management, thread queuing, waiting and wake-up, and we only need to focus on the core functions of locks: locking and unlocking.

It can be understood like this:

  • The lock is oriented toward the user: it defines the interface through which the user interacts with the lock, and hides the implementation details.
  • AQS is oriented toward the implementer of the lock: it defines the interface between the lock implementer and the synchronizer, and hides the implementation details.

Section 3 uses AQS to implement a lock as a practical case.

2. Brief description of AQS methods

The following three methods are used to manage the synchronization state:

  1. getState(): used to get the synchronization state
  2. setState(int newState): used to set the synchronization state
  3. compareAndSetState(int expect, int update): use CAS to set the synchronization state to ensure atomicity
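
To make the three state methods concrete, here is a minimal sketch. StateProbe and its read/write/cas wrappers are hypothetical names introduced only for this illustration; they simply expose the protected AQS methods so that they can be called from main.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical helper: exposes the protected state methods of AQS for a demo.
class StateProbe extends AbstractQueuedSynchronizer {
    int read() { return getState(); }
    void write(int value) { setState(value); }
    boolean cas(int expect, int update) { return compareAndSetState(expect, update); }

    public static void main(String[] args) {
        StateProbe probe = new StateProbe();
        System.out.println(probe.read());     // 0: the initial state
        System.out.println(probe.cas(0, 1));  // true: state was 0 and is now 1
        System.out.println(probe.cas(0, 1));  // false: state is already 1, nothing changes
        probe.write(0);                       // unconditional write back to 0
        System.out.println(probe.read());     // 0
    }
}
```

The second cas call fails because the expected value no longer matches. This is exactly how a competing thread finds out that it lost the race for the lock.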

As mentioned in the first part, AQS already implements the queue maintenance logic for us. When we implement a lock, we only need to override the methods that acquire and release the synchronization state:

  1. tryAcquire(int arg): acquires the synchronization state exclusively. Returns a boolean: true if the acquisition succeeded, false if it failed.
  2. tryRelease(int arg): releases the synchronization state exclusively. Returns a boolean: true if the state is now fully released, false otherwise.
  3. tryAcquireShared(int arg): acquires the synchronization state in shared mode. Returns an int.
    • 0 means success, with no remaining resources
    • A value greater than 0 means success, with resources still remaining
    • A negative number means the acquisition failed
  4. tryReleaseShared(int arg): releases the synchronization state in shared mode. Returns a boolean.
    • Returns true if waiting nodes may be woken up after this release; otherwise returns false
  5. isHeldExclusively(): whether the synchronizer is held exclusively by the current thread

From these methods, we can already see that:

  • When the synchronization state is 0, the lock is free and any thread may acquire the synchronization state, that is, acquire the lock.

  • For reentrant locks, the lock implementation increments the state each time the owning thread acquires the lock (AQS does not do this by itself). If the thread acquires the lock repeatedly, the state keeps accumulating. When the thread releases the lock, the state must be decremented all the way to 0 before the lock counts as fully released.
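
The reentrant counting described above can be sketched as follows. ReentrantSync and holdCount are hypothetical names for this illustration. The logic follows the usual non-fair reentrant pattern, and overflow of the counter is ignored to keep the sketch short.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical reentrant synchronizer: state counts how often the owner holds the lock.
class ReentrantSync extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int arg) {
        Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // Lock is free: claim it with CAS so that only one thread wins
            if (compareAndSetState(0, arg)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            // Re-entry by the owner: only the owner reaches this branch, so no CAS is needed
            setState(c + arg);
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int arg) {
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        int c = getState() - arg;
        boolean free = (c == 0);
        if (free)
            setExclusiveOwnerThread(null);
        setState(c);
        return free; // only the release that reaches 0 reports a full release
    }

    int holdCount() { return getState(); }

    public static void main(String[] args) {
        ReentrantSync sync = new ReentrantSync();
        sync.acquire(1);
        sync.acquire(1);
        System.out.println(sync.holdCount()); // 2
        System.out.println(sync.release(1));  // false: still held once
        System.out.println(sync.release(1));  // true: fully released
    }
}
```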

3. AQS practical case

Using AQS, we implement a simple exclusive, non-reentrant lock, which means the state of the lock has only two values: 0 (free) and 1 (held).

Focus on the Sync inner class that extends AQS: it defines the core logic for acquiring and releasing the synchronization state.

This case illustrates the earlier point: AQS is oriented toward the implementer of the lock. It defines the interface between the lock implementer and the synchronizer, and hides the implementation details.

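import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

class ExclusiveLock implements Lock {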

    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // The lock is acquired if CAS changes the state from 0 to 1
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            // Releasing a lock that is not held is an error
            if (getState() == 0) throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // When state==1, it means it is in the occupied state
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        public Condition newCondition() {
            return new ConditionObject();
        }
    }

    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}
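
To check that this kind of lock really provides mutual exclusion, several threads can increment a plain int while holding it. The sketch below is self-contained, so it repeats a compact copy of the Sync class instead of depending on ExclusiveLock. ExclusiveLockDemo, run and joinQuietly are hypothetical names for this illustration.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class ExclusiveLockDemo {
    // Compact stand-in for the Sync class above, so that this snippet compiles on its own
    static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    }

    private final Sync sync = new Sync();
    private int counter = 0; // deliberately a plain int: it is protected only by sync

    // Starts `threads` workers that each increment the counter `perThread` times
    static int run(int threads, int perThread) {
        ExclusiveLockDemo demo = new ExclusiveLockDemo();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    demo.sync.acquire(1);     // what lock() delegates to
                    try {
                        demo.counter++;
                    } finally {
                        demo.sync.release(1); // what unlock() delegates to
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) joinQuietly(t);
        return demo.counter;
    }

    private static void joinQuietly(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(4, 10_000)); // 40000 when no increment is lost
    }
}
```

Without the acquire/release pair around counter++, the result would usually be lower, because concurrent increments would overwrite each other.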

4. Detailed explanation of AQS source code

4.1 Synchronization queue

As mentioned earlier, the synchronizer implements the management of threads and synchronization state through the synchronization queue.

Further explanation:

  1. When the current thread fails to acquire the synchronization state (lock), the synchronizer wraps the current thread in a Node, appends it to the synchronization queue, and blocks the thread.
  2. When the synchronization state (lock) is released, the thread in the node right after the head is woken up, and that thread tries to obtain the synchronization state again.

head points to the head node of the queue, which is the node that has acquired the synchronization state (lock).

Nodes that fail to obtain the synchronization state are appended to the queue in turn, and tail points to the last node.

Queue node status:

  1. CANCELLED(1): The value is 1, indicating that the node has given up waiting because of a timeout or an interrupt. A node never leaves this state once it has entered it.
  2. SIGNAL(-1): The value is -1, indicating that the thread of the successor node is waiting (or is about to wait), so the current node must wake it up when it releases the synchronization state or is cancelled.
  3. CONDITION(-2): The value is -2, indicating that the node is waiting on a Condition. When another thread calls signal on that Condition, the node is transferred from the condition queue to the synchronization queue.
  4. PROPAGATE(-3): The value is -3. It is used in shared mode and indicates that a shared release should be propagated, so the wake-up does not stop at the direct successor but may continue to the nodes after it.
  5. INITIAL(0): The value is 0, representing the initial state. Node defines no named constant for it; the source code simply compares against 0.

Looking at these values: a negative value means the node is in a valid state that still needs signalling, 0 is the initial state, and a positive value means the node has been cancelled. This is why the source code often checks only the sign (ws > 0 or ws < 0).
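
The Node class and its waitStatus field are internal to AQS, so user code cannot read them directly. The queue itself can still be observed through public monitoring methods such as getQueueLength(). The sketch below assumes a minimal exclusive synchronizer; QueueInspection and observeQueueLength are hypothetical names for this illustration.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class QueueInspection {
    // Minimal non-reentrant exclusive synchronizer (same idea as Sync in section 3)
    static final class Sync extends AbstractQueuedSynchronizer {
        @Override protected boolean tryAcquire(int arg) { return compareAndSetState(0, 1); }
        @Override protected boolean tryRelease(int arg) { setState(0); return true; }
    }

    // Returns the queue length observed while `waiters` threads are blocked
    static int observeQueueLength(int waiters) {
        Sync sync = new Sync();
        sync.acquire(1); // the calling thread holds the lock, so every other thread must queue
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                sync.acquire(1);
                sync.release(1);
            });
            threads[i].start();
        }
        // Wait until every waiter has been appended to the synchronization queue
        while (sync.getQueueLength() < waiters) {
            Thread.yield();
        }
        int observed = sync.getQueueLength();
        sync.release(1); // wakes the first waiter; each waiter wakes the next when it releases
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(observeQueueLength(3)); // 3
    }
}
```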

4.2 Exclusive acquisition

The core method is acquire(), shown below:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

This method is built from three methods: tryAcquire, addWaiter and acquireQueued.

Here is the execution flow of this method:

  1. Call tryAcquire to try to get the synchronization state. It returns a boolean:
    • If it returns true, the synchronization state was obtained. The negated value is false, so the && short-circuits and acquire returns directly.
    • If it returns false, the synchronization state was not obtained. The negated value is true, so the following methods are executed.
  2. After tryAcquire returns false, addWaiter constructs a node for the current thread in exclusive mode and appends it to the tail of the queue.
  3. acquireQueued makes the node try to acquire the synchronization state in an infinite loop, and does not return until it has been acquired.
    • It returns true if the thread was interrupted while waiting in the loop, otherwise false.
  4. If the thread is interrupted while waiting, acquire does not respond at that moment. The interrupt is only recorded, and selfInterrupt is executed after the synchronization state has been obtained, which restores the interrupt flag.
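
Step 4 can be observed from the outside: a thread that is interrupted while queued in acquire keeps waiting, and finds its interrupt flag set once acquire returns. The following is a minimal sketch under the assumption of a simple exclusive synchronizer; AcquireInterruptDemo and interruptIsDeferred are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class AcquireInterruptDemo {
    static final class Sync extends AbstractQueuedSynchronizer {
        @Override protected boolean tryAcquire(int arg) { return compareAndSetState(0, 1); }
        @Override protected boolean tryRelease(int arg) { setState(0); return true; }
    }

    // Returns true if the waiter found its interrupt flag set after acquire() returned
    static boolean interruptIsDeferred() {
        Sync sync = new Sync();
        AtomicBoolean flagAfterAcquire = new AtomicBoolean(false);
        sync.acquire(1); // hold the lock so that the waiter has to queue

        Thread waiter = new Thread(() -> {
            sync.acquire(1); // does not throw on interrupt, it keeps waiting
            flagAfterAcquire.set(Thread.currentThread().isInterrupted());
            sync.release(1);
        });
        waiter.start();
        // Wait until the waiter has been appended to the synchronization queue
        while (!sync.isQueued(waiter)) {
            Thread.yield();
        }

        waiter.interrupt(); // interrupt it while it is still in the queue
        sync.release(1);    // only now can the waiter acquire the state
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return flagAfterAcquire.get();
    }

    public static void main(String[] args) {
        System.out.println(interruptIsDeferred()); // true
    }
}
```

A lock that should react to the interrupt immediately would call acquireInterruptibly instead, as lockInterruptibly does in the practical case.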

Next, let's take a look at the source code of these three core methods

  • tryAcquire

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    

    This core method has only one line of code, and that line throws an exception. If you only look at the source code, it is really confusing.

    But combine it with the first part and the practical case: when we implement a lock, we create an inner class inside the lock, let it extend AQS, and override the core methods of AQS. tryAcquire is one of these core methods. The logic of acquiring the synchronization state (lock) has to be implemented by ourselves.

    This again shows that AQS simplifies the implementation of locks. We do not need to care about the underlying logic such as synchronization state management, thread queuing, waiting and wake-up. We only need to focus on the core functions of a lock: locking and unlocking.

  • addWaiter: see the comments in the code

    The if block appends the current node to the tail of the queue.

    • Note that compareAndSetTail is needed so that nodes are appended to the tail in a thread-safe way.

      If a plain LinkedList were used to maintain the node relationships, concurrent insertions by multiple threads could corrupt the number and order of the nodes.

    private Node addWaiter(Node mode) {
        // Constructs the current thread as a node in a synchronized queue
        Node node = new Node(Thread.currentThread(), mode);
        // Get the tail node, tail is a member variable of the AQS class
        Node pred = tail;
        // If the tail node is not empty
        if (pred != null) {
            // Set the prev of the current node as the tail node
            node.prev = pred;
            // Set the current node as the new tail node
            if (compareAndSetTail(pred, node)) {
                // Set the next of the old tail node as the current node
                pred.next = node;
                return node;
            }
        }
        // If the tail is null (queue not initialized yet) or the CAS above failed, fall back to enq, which retries in a loop
        enq(node);
        return node;
    }
    
    • enq: look directly at the comments

      private Node enq(final Node node) {
          // Spin (retry in a loop) until the node has been appended to the tail of the queue
          for (;;) {
              // Get the tail node of the current queue
              Node t = tail;
              if (t == null) {
                  // tail is null, so the queue is not initialized yet: install an empty dummy node as both head and tail
                  if (compareAndSetHead(new Node()))
                      tail = head;
              } else {
                  // tail is not null, so link the node in after the current tail
                  node.prev = t;
                  if (compareAndSetTail(t, node)) {
                      t.next = node;
                      return t;
                  }
              }
          }
      }
      

      Note: when the tail node in addWaiter is null, or when its single CAS attempt fails, enq is called. It retries in a loop, which guarantees that the node is eventually added.

  • acquireQueued: look directly at the comments

    final boolean acquireQueued(final Node node, int arg) {
        // failed indicates whether the synchronization status (lock) failed to be acquired
        boolean failed = true;
        try {
            // interrupted indicates whether it was interrupted during the acquisition process
            boolean interrupted = false;
            for (;;) {
                // Get its predecessor node through node.predecessor()
                final Node p = node.predecessor();
                // If the predecessor node is the head node head, it means that the current node can try to acquire the synchronization state (lock)
                if (p == head && tryAcquire(arg)) {
                    // Acquisition succeeded, so the current node becomes the new head
                    // setHead also clears the node's thread and prev fields
                    setHead(node);
                    // Clear next on the old head so that it can be garbage collected
                    p.next = null;
                    // The synchronization state (lock) has been acquired
                    failed = false;
                    // Report whether the thread was interrupted while waiting
                    return interrupted;
                }
                // If it is safe to park, wait here until unparked or interrupted
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // Marked as true if the waiting process was interrupted
                    interrupted = true;
            }
        } finally {
            // failed is still true only if an exception escaped the loop (for example from tryAcquire); in that case the node is set to the CANCELLED state
            if (failed)
                cancelAcquire(node);
        }
    }
    
    • shouldParkAfterFailedAcquire: checks the status of the predecessor of the current node and decides whether the current thread may park. See the comments.

      private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
          int ws = pred.waitStatus;
          // The predecessor is already SIGNAL, so it will wake this node on release: the current thread can safely park
          if (ws == Node.SIGNAL)
              return true;
          // If the status of the predecessor node is greater than 0, it is invalid.
          if (ws > 0) {
              // The while loop continues to find valid nodes before the predecessor node
              do {
                  node.prev = pred = pred.prev;
              } while (pred.waitStatus > 0);
              // Queue the current thread node after the valid node
              pred.next = node;
          } else {
              // The predecessor is valid (status 0 or PROPAGATE): CAS its status to SIGNAL so that it wakes this node when it releases the synchronization state
              compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
          }
          // Return false to continue the spin check state within the acquireQueued method
          return false;
      }
      
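    • parkAndCheckInterrupt: parks the thread (waiting state) and, once it wakes up, reports whether it was interrupted. Thread.interrupted() also clears the interrupt flag, which is why acquire calls selfInterrupt afterwards to restore it.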

      private final boolean parkAndCheckInterrupt() {
          LockSupport.park(this);
          return Thread.interrupted();
      }
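
The two steps of parkAndCheckInterrupt can be tried out in isolation. The sketch below copies them into a static helper (ParkDemo is a hypothetical name) and runs both cases on the current thread: waking up through a permit, and waking up through an interrupt. It relies on park returning immediately when a permit is already available or when the thread is already interrupted.

```java
import java.util.concurrent.locks.LockSupport;

class ParkDemo {
    // Same two steps as parkAndCheckInterrupt: park, then read and clear the interrupt flag
    static boolean parkAndCheckInterrupt(Object blocker) {
        LockSupport.park(blocker);
        return Thread.interrupted();
    }

    public static void main(String[] args) {
        Object blocker = new Object();

        // Case 1: a permit is already available, so park returns at once and no interrupt is seen
        LockSupport.unpark(Thread.currentThread());
        System.out.println(parkAndCheckInterrupt(blocker)); // false

        // Case 2: the thread is interrupted, so park returns at once and the interrupt is reported
        Thread.currentThread().interrupt();
        System.out.println(parkAndCheckInterrupt(blocker)); // true

        // Thread.interrupted() cleared the flag, so the interrupt would be lost
        // if acquire did not call selfInterrupt() later
        System.out.println(Thread.currentThread().isInterrupted()); // false
    }
}
```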
      

4.3 Exclusive release

The release method releases the synchronization state and then wakes up the successor of the head node, so that the successor can try to obtain the synchronization state. The source code is as follows:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // Wake up the successor node
            unparkSuccessor(h);
        return true;
    }
    return false;
}
  • tryRelease follows the same pattern as tryAcquire: we need to implement it ourselves

  • unparkSuccessor: wake up the successor node

    private void unparkSuccessor(Node node) {
        // node is the node whose successor should be woken (the head, when called from release)
        int ws = node.waitStatus;
        // A negative status (for example SIGNAL) is reset to 0 because the signal is being delivered now; this CAS is allowed to fail
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        // The node to wake is normally the direct successor
        Node s = node.next;
        // If that node is null or cancelled, search backwards from tail for the valid node closest to node
        if (s == null || s.waitStatus > 0) {
            s = null;
            // Walk the prev links: prev is set before a node is published by CAS, while next is set afterwards and may still be null
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
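
unparkSuccessor searches backwards from tail, and the enqueue protocol of addWaiter and enq shows why that direction is reliable: prev is assigned before the CAS publishes the node, while next is assigned only afterwards. The following is a simplified model of that protocol built on AtomicReference. CasQueueSketch and its methods are hypothetical names, and everything except the enqueue path is left out.

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified, hypothetical model of the AQS queue: only the enqueue path is shown
class CasQueueSketch {
    static final class Node {
        volatile Node prev;
        volatile Node next;
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Same shape as enq: retry until the node has been linked in as the new tail
    void enqueue(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Empty queue: install a dummy head first, then retry
                if (head.compareAndSet(null, new Node()))
                    tail.set(head.get());
            } else {
                node.prev = t; // set before publication, so it is always visible
                if (tail.compareAndSet(t, node)) { // only one thread can move tail away from t
                    t.next = node; // set after publication, so it may lag behind
                    return;
                }
            }
        }
    }

    // Counts the real nodes by walking prev links from tail back to the dummy head
    int size() {
        int n = 0;
        for (Node t = tail.get(); t != null && t != head.get(); t = t.prev) n++;
        return n;
    }

    static int concurrentEnqueue(int threads, int perThread) {
        CasQueueSketch queue = new CasQueueSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) queue.enqueue(new Node());
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(concurrentEnqueue(4, 1000)); // 4000: no node is lost
    }
}
```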
    

4.4 Shared acquisition

Let's first look at the difference between shared and exclusive access to synchronization state:

  • Shared acquisition allows multiple threads to acquire the synchronization state at the same time, while exclusive acquisition allows only one thread to acquire it and blocks the others.

A classic application of shared acquisition is Semaphore, which controls the number of threads accessing a particular resource at the same time.

Therefore, in the explanations that follow, "resource" is used in place of "synchronization state".

Call the acquireShared method to acquire resources in a shared way

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

tryAcquireShared has to be implemented by ourselves, but the meaning of its return value is already defined:

  • A negative number means the acquisition failed because the remaining resources are insufficient.
  • A non-negative number means the acquisition succeeded: 0 means no resources remain, and a positive number means some resources remain.

When the result is negative, doAcquireShared is called to enter the synchronization queue, and it does not return until the resource has been obtained.

private void doAcquireShared(int arg) {
    // Add the current thread node to the tail of the queue
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        // Spin: keep retrying until the resource is acquired
        for (;;) {
            final Node p = node.predecessor();
            // When the predecessor node of the current thread node is the head node
            if (p == head) {
                // try to get the resource again
                int r = tryAcquireShared(arg);
                // A non-negative result means the acquisition succeeded
                if (r >= 0) {
                    // Make this node the head. If r > 0, resources remain, so also wake up the following shared nodes
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
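
The return-value convention of tryAcquireShared is easiest to see with a counting synchronizer in the style of Semaphore. This is a minimal sketch, not the Semaphore source: PermitSync and tryOnce are hypothetical names, and state is assumed to hold the number of free permits.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical counting synchronizer: state holds the number of free permits
class PermitSync extends AbstractQueuedSynchronizer {
    PermitSync(int permits) { setState(permits); }

    @Override
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            // Negative: not enough permits, report failure without touching the state.
            // Otherwise CAS in the new count and report how many permits are left.
            if (remaining < 0 || compareAndSetState(available, remaining))
                return remaining;
        }
    }

    @Override
    protected boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            if (compareAndSetState(current, current + releases))
                return true; // waiting threads may now be able to acquire
        }
    }

    int tryOnce() { return tryAcquireShared(1); } // exposed for the demo only

    public static void main(String[] args) {
        PermitSync sync = new PermitSync(2);
        System.out.println(sync.tryOnce()); // 1: success, one permit left
        System.out.println(sync.tryOnce()); // 0: success, no permit left
        System.out.println(sync.tryOnce()); // -1: failure, acquireShared would queue here
        sync.releaseShared(1);
        System.out.println(sync.tryOnce()); // 0: the released permit is taken again
    }
}
```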

4.5 Shared release

Shared release calls the releaseShared method to release the specified number of resources. The tryReleaseShared method again has to be implemented by ourselves:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

The doReleaseShared method is mainly used to wake up the waiting successor nodes after the resources have been released:

private void doReleaseShared() {
    // Loop with CAS so that the wake-up is performed safely even when other threads release or change head at the same time
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
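
The effect of propagation can be seen with a one-shot gate: a single releaseShared call lets every waiting thread through, because each woken thread acquires successfully with a positive result and passes the wake-up on. The sketch follows the well-known boolean latch pattern; GateDemo, Gate and openGateFor are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class GateDemo {
    // One-shot gate: state 0 = closed, state 1 = open
    static final class Gate extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            return getState() != 0 ? 1 : -1; // positive when open, so wake-ups keep propagating
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);
            return true;
        }
    }

    // Returns how many of the `waiters` threads passed the gate after one release
    static int openGateFor(int waiters) {
        Gate gate = new Gate();
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                gate.acquireShared(1); // blocks while the gate is closed
                passed.incrementAndGet();
            });
            threads[i].start();
        }
        // Wait until every thread is queued behind the closed gate
        while (gate.getQueueLength() < waiters) {
            Thread.yield();
        }
        gate.releaseShared(1); // a single release opens the gate for everyone
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println(openGateFor(3)); // 3
    }
}
```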

5. Closing remarks

References:

  1. JDK5.0 source code

  2. The Art of Java Concurrent Programming

  3. Dark Horse Java Concurrent Programming Tutorial

There will be about 5 articles in this series. I will do my best to explain my understanding of JUC in an easy-to-understand way.

If there are any mistakes, please point them out, and I will learn from them and correct them promptly~

Posted by saltedm8 on Sat, 22 Oct 2022 04:07:38 +1030