Exclusive mode and sharing mode of AQS

Exclusive mode and sharing mode of AQS

As ReentrantLock is an exclusive lock, the knowledge of exclusive lock can refer to AQS theoretical knowledge (I) and AQS fair lock and unfair lock (II). This paper focuses on the sharing mode. And the explanation of sharing mode in this article is mainly CountDownLatch.

1, Concept

AQS provides two working modes: exclusive mode and shared mode. All its subclasses either implement and use the API of its exclusive function or use the API of shared function instead of using two sets of APIs at the same time. Even its most famous subclass ReentrantReadWriteLock is realized through two internal classes: read lock and write lock.

Exclusive mode means that when a lock is successfully acquired by a thread, other threads cannot acquire the lock. Shared mode means that when a lock is successfully acquired by a thread, other threads may still acquire the lock.

1.1 exclusive mode

At the same time, only one thread can get the lock to execute, and the lock status is only 0 and 1.

1.2 sharing mode

At the same time, multiple threads can get the lock and work together. The state of the lock is greater than or equal to 0.

1.3 API comparison

Exclusive Mode Sharing mode
tryAcquire(int arg)tryAcquireShared(int arg)
acquire(int arg)acquireShared(int arg)
acquireQueued(final Node node, int arg)doAcquireShared(int arg)
tryRelease(int arg)tryReleaseShared(int arg)
release(int arg)releaseShared(int arg)

2, AQS important method

2.1 acquireShared

The calling chain of this method: tryAcquireShared - > doAcquireShared. According to the specific requirements of the elicited subclass, the elicited subclass is used to analyze the important methods. doAcquireShared has been implemented in AQS and can be called directly.

/**
 * Acquires in shared uninterruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireShared(int arg) {
    /*Set the node to sharing mode and insert it into the tail of the queue*/
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            /*Obtain the precursor node. If it is the head node, try to obtain the lock*/
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                /*r>=0 Indicates that the lock is obtained successfully and the thread is awakened*/
                if (r >= 0) {
                   /*Set up a new head node and wake up the thread in the synchronization queue*/
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    /*Detect whether the thread has been interrupted. If it has been interrupted, call the interrupt logic again*/
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

The main path of doAcquireShared method is addwaiter - > tryacquireshared - > sethead() - > doreleaseshared(), which corresponds to the following businesses:

2.1.1 thread information is packaged into nodes and added to the tail of the queue;

2.1.2 competitive lock resources

2.1.3 get the node head node and put it first in the queue

2.1.4 wake up the node after the current node. If the queue header is modified, wake up the next one circularly.

2.2 releaseShared

Call chain of this method: tryrereleaseshared - > dorreleaseshared. Tryrereleaseshared is implemented by subclasses according to different business requirements. It is specifically analyzed in the important methods of Semaphore. doReleaseShared has been implemented in AQS and can be called directly.

private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        // Infinite loop
        for (;;) {
            // Save header node
            Node h = head;
            if (h != null && h != tail) { // The head node is not empty and the head node is not the tail node
                // Get the waiting state of the header node
                int ws = h.waitStatus; 
                if (ws == Node.SIGNAL) { // The status is SIGNAL
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // Continue without success
                        continue;            // loop to recheck cases
                    // Release successor node
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // Status 0 and unsuccessful, continue
                    continue;                // loop on failed CAS
            }
            if (h == head) // If the head node changes, continue the cycle  
                break;
        }
    }

releaseShared realizes the accelerated wake-up, which is mainly realized by the logic of h == head. For example, in a scenario where h == head does not hold in doreaseshared, thread a at the user level releases the lock, and thread t1 at the head of the queue is awakened. t1 calls setHeadAndPropagate method to set the head node to t1, but does not call unparksuccess method in doreaseshared. At this time, thread b at the user level releases the lock, Wake up the thread t2 at the head of the queue. t2 calls setHeadAndPropagate to set the new head node to t2. At this time, t1 continues to execute. Finally, it is found that the head element of the queue has changed. Continue the for loop to call the unparksuccess method to wake up the head element.

When h == head in doReleaseShared is not established, the logic of entering the for loop to continuously wake up the threads in the synchronization queue is mainly an optimization logic to accelerate wake-up. When the head node changes, it indicates that more than one thread releases the lock at this time. In the sharing mode, the lock can be held by more than one thread, so it should tend to wake up more threads in the synchronization queue to obtain the lock.

3, CountDownLatch important methods

3.1 concept

CountDownLatch is a synchronization tool class used to coordinate synchronization between multiple threads.

CountDownLatch enables a thread to wait for other threads to complete their work before continuing execution. It is implemented with a counter. The initial value of the counter is the number of threads. When each thread completes its task, the value of the counter will be reduced by one. When the counter value is 0, it means that all threads have completed some tasks, and then the thread waiting on CountDownLatch can resume executing the next task.

3.2 properties

3.2.1 Sync class

The help class is also an abstract class. It inherits AQS and adds some methods as needed for CountDownLatch to call.

Sync construction method () 1.2
//The initialization value is used for counting
Sync(int count) {
    setState(count);
}
3.2.1.2 tryAcquireShared
//If the state value is equal to, the lock is valid; otherwise, the lock is invalid
protected int tryAcquireShared(int acquires) {
  return (getState() == 0) ? 1 : -1;
}
3.2.1.3 tryReleaseShared
 //Call once and subtract the initial value once
 protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
}
3.2.1.4 getCount

Get the value of the lock.

3.2.2 CountDownLatch() construction method

The constructor can construct a CountDownLatch initialized with a given count, and the constructor completes the initialization of sync and sets the number of states.

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

3.2.3 countDown()

Function: this function decrements the count of the latch. If the count reaches zero, all waiting threads will be released.

    public void countDown() {
        sync.releaseShared(1);
    }
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

As can be seen from the code, the main process of countDown () is tryrereleaseshared - > dorreleaseshared The tryrereleaseshared method has been briefly analyzed above. Each time the tryrereleaseshared method is called, the state value will be reduced by one. Only when the value is reduced to 0, it starts to execute dorreleaseshared() to release the thread;

3.2.4 await()

This function causes the current thread to wait until the latch counts down to zero, unless the thread is interrupted.

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

It can be seen from the code that the main method call chain is tryacquireshared - > doacquiresharedinterruptible. Doacquireeshared interruptibly is similar to the doacquireeshared method body. This article will not cover it, but focus on the logic.

When tryAcquireShared is less than 0, it indicates that the state has not been consumed, and the thread stays at the code boundary to continue consumption.

Note: CountDownLatch does not use queues, only spins.

4, Summary

4.1 differences

4.1.1 node object

In the Node class, use nextWaiter to identify nodes in SHARED mode and EXCLUSIVE mode.

The node object of shared mode is

static final Node SHARED = new Node();
Node node = new Node(Thread.currentThread(), mode);       

The node object in exclusive mode is

 static final Node EXCLUSIVE = null;
 Node node = new Node(Thread.currentThread(), mode);

4.1.2 timing of thread wake-up

In the shared mode, the head node can wake up the successor node immediately after acquiring the shared lock without waiting for the release of the shared lock. There are two wake-up threads. One is that the subsequent thread can wake up immediately after acquiring the shared lock, but the subsequent thread must be the thread in the shared mode; The second is to wake up the subsequent thread after releasing the lock. Here, I think the subsequent thread wake up after releasing the lock can include exclusive mode, but the premise is that all shared mode locks in front of all exclusive modes have been released.

Tags: Java Multithreading

Posted by lional on Thu, 14 Apr 2022 10:17:25 +0930