Semaphore and producer-consumer model

This is part of a series of articles organized mainly by programming language. Each article takes a popular or substantial topic, reproduces it in code, and then discusses my understanding of it.

classic topic

First, the classic milk delivery problem illustrates the producer-consumer model. The problem is well known, so I won't restate it. The question to ask is: where is the conflict? It lies in the producer and the consumer both operating on the milk box. In the code this shows up as reads and writes of a shared field (whether there is milk in the box), so a lock is required.

producer class

/**
 * @date 2021/12/6 11:16
 */
public class Produec implements Runnable{
    private  Box b;
 
    public Produec(Box b){
        this.b = b;
    }
 
    @Override
    public void run() {
        //The producer delivers 10 bottles of milk
        for(int i=1;i<=10;i++){
            b.inputMilk(i);
        }
    }
}

consumer class

/**
 * @date 2021/12/6 11:11
 */
public class Customer implements Runnable{
    Box box ;
    Customer(){
 
    }
    Customer(Box box){
        this.box = box;
    }
    //The consumer must take the milk box delivered by the milkman.
    @Override
    public void run() {
        while(true){
            box.getMilk();
        }
    }
}

Box class

public class Box {
    //Number of the bottle currently in the milk box
    private int milk;

    //State of the milk box: true means it holds a bottle
    private boolean state = false;

    //The producer puts milk in
    public synchronized void inputMilk(int milk){
        //While there is milk, wait. A loop (not an if) re-checks the
        //condition after every wake-up, including spurious ones.
        while(state){
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
        //There is no milk, so put a bottle in
        this.milk = milk;
        System.out.println("The milkman puts in bottle " + this.milk);
        //After putting the bottle in, update the state of the milk box
        state = true;
        notifyAll();
    }

    //The consumer takes milk
    public synchronized void getMilk(){
        //While there is no milk, there is nothing to take, so wait
        while(!state){
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }

        //There is milk, so take it
        System.out.println("The consumer takes bottle " + this.milk);
        state = false;
        notifyAll();
    }
}

main program main

/**
 * @date 2021/12/6 11:20
 */
public class Main {
    public static void main(String[] args) {
 
        //shared memory area, milk crate
        Box box = new Box();
 
        //Create the producer, the milkman
        Produec produec = new Produec(box);
 
        //Create a consumer, i.e. I
        Customer customer = new Customer(box);
 
        Thread t1 = new Thread(produec);
        Thread t2 = new Thread(customer);
 
        t1.start();
        t2.start();
 
    }
}

Output

The milkman puts in bottle 1
The consumer takes bottle 1
The milkman puts in bottle 2
The consumer takes bottle 2
The milkman puts in bottle 3
The consumer takes bottle 3
The milkman puts in bottle 4
The consumer takes bottle 4
The milkman puts in bottle 5
The consumer takes bottle 5
The milkman puts in bottle 6
The consumer takes bottle 6
The milkman puts in bottle 7
The consumer takes bottle 7
The milkman puts in bottle 8
The consumer takes bottle 8
The milkman puts in bottle 9
The consumer takes bottle 9
The milkman puts in bottle 10
The consumer takes bottle 10

That is easy to follow. Now suppose I change the problem so that the milk box can hold up to 3 bottles at a time. How should the code change?

It is simple: only the code in Box needs to change.

Box class

/**
 * @date 2021/12/6 10:59
 */
public class Box {
    //Number of the bottle most recently put in
    private int milk;
    private final int max = 3;//The milk box holds at most 3 bottles

    //Number of bottles currently in the box. An instance field is enough,
    //because the producer and the consumer share one Box object.
    private int count = 0;

    //The producer puts milk in
    public synchronized void inputMilk(int milk) {
        //While the box is full (3 bottles), wait
        while(count == max){
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }

        //Not full, so keep putting milk in
        this.milk = milk;
        System.out.println("The milkman puts in bottle " + this.milk);
        //After putting the bottle in, update the number of bottles in the box
        count++;
        try {
            //Note: sleep() does not release the lock, so the consumer stays blocked meanwhile
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        notifyAll();
    }

    //The consumer takes milk
    public synchronized void getMilk(){
        //While there is no milk, there is nothing to take, so wait
        while(count == 0){
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }

        //There is milk, so take it
        System.out.println("The consumer takes a bottle of milk");
        count--;
        notifyAll();
    }
}

First question

Here I ran into what looked like a bug: with the box capacity set to 3, the consumer does not consume even while the producer is paused, and only starts once the producer has filled the box.

I did not fully work it out at the time. My guess was that it is related to the synchronized keyword, and that guess holds up. Thread.sleep() is called inside the synchronized method, so the producer keeps the lock while it sleeps. When inputMilk returns, the producer loops straight back and usually reacquires the lock before the blocked consumer gets a turn, because intrinsic locks make no fairness guarantee. The consumer typically gets in only when the producer calls wait() on a full box. I had expected the consumer to step in as soon as the producer paused, rather than seeing three bottles produced and then three consumed.

When I changed the capacity to 50, the output looked more normal. I will have to come back to the underlying mechanism later.
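One way to test the guess is to swap the intrinsic lock for an explicitly fair one. The sketch below is not the Box class from this article: FairBox, its fields and run() are hypothetical names, and it replaces synchronized/wait/notifyAll with a fair ReentrantLock and two Condition objects. With a fair lock, a producer that asks for the lock again queues behind a consumer that is already waiting, so the two threads should alternate more closely.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical variant of Box for illustration; not the article's original class.
class FairBox {
    private final int max;          // capacity of the milk box
    private int count = 0;          // bottles currently in the box
    private int peak = 0;           // highest count observed, for checking
    private final ReentrantLock lock = new ReentrantLock(true); // true = fair (FIFO) lock
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    FairBox(int max) { this.max = max; }

    void inputMilk() throws InterruptedException {
        lock.lock();
        try {
            while (count == max) notFull.await();   // box full: wait for a consumer
            count++;
            peak = Math.max(peak, count);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    void getMilk() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) notEmpty.await();    // box empty: wait for the producer
            count--;
            notFull.signal();
        } finally {
            lock.unlock();
        }
    }

    // Runs one producer and one consumer; returns {bottles left, peak count}.
    static int[] run(int max, int bottles) throws InterruptedException {
        FairBox box = new FairBox(max);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < bottles; i++) box.inputMilk();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < bottles; i++) box.getMilk();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        return new int[] { box.count, box.peak };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run(3, 10);
        System.out.println("left in box: " + r[0] + ", peak: " + r[1]);
    }
}
```

Fairness costs throughput, so this is a diagnostic aid rather than a recommendation. Moving the sleep out of the locked section is another option worth trying.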

Second question

The second issue is that when I create an additional consumer in the main thread, milk is clearly over-consumed: more bottles are taken than were delivered. What is the reason? Unlike the previous issue, this one feels like it has to be cleared up.

The milk drinker got the 155th: Milk
 The milk drinker got the 156th: Milk
 The milk drinker got the 157th: Milk
 The milk drinker got the 158th: Milk
 milkman will be 24th: put milk in
 milkman will be 25th: put milk in
 The milk drinker got the 159th: Milk

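The milk is badly over-consumed here. I first suspected the synchronized keyword, which locks on an object (for an instance method, the Box instance itself), but the lock does what it should. A more likely cause is a wait() call guarded by if instead of while. With two consumers, notifyAll() wakes both of them; the first takes the milk, and the second, which passed its if check before it went to sleep, carries on without re-checking and takes milk that is no longer there. Re-checking the condition in a while loop after every wake-up avoids this. How synchronized is implemented underneath is something I still need to study.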
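To check the effect of the guard with several consumers, here is a minimal sketch. GuardedBox, minSeen and run() are hypothetical names made up for this example. Every wait() sits in a while loop, and the class records the lowest count it ever sees: a negative value would mean a consumer took milk that was not there.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical class for illustration; only the while-guard matters here.
class GuardedBox {
    private final int max;
    private int count = 0;      // bottles currently in the box
    private int minSeen = 0;    // lowest count observed; below 0 would mean over-consumption

    GuardedBox(int max) { this.max = max; }

    synchronized void put() throws InterruptedException {
        while (count == max) wait();    // box full: wait until a consumer takes one
        count++;
        notifyAll();
    }

    synchronized void take() throws InterruptedException {
        while (count == 0) wait();      // re-check after every wake-up
        count--;
        minSeen = Math.min(minSeen, count);
        notifyAll();
    }

    // Starts several consumers and one producer; returns {bottles left, lowest count seen}.
    static int[] run(int consumers, int perConsumer) throws InterruptedException {
        GuardedBox box = new GuardedBox(3);
        List<Thread> threads = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            threads.add(new Thread(() -> {
                try {
                    for (int i = 0; i < perConsumer; i++) box.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        threads.add(new Thread(() -> {
            try {
                for (int i = 0; i < consumers * perConsumer; i++) box.put();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        for (Thread t : threads) t.start();
        for (Thread t : threads) t.join();
        return new int[] { box.count, box.minSeen };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run(3, 50);
        System.out.println("left in box: " + r[0] + ", lowest count seen: " + r[1]);
    }
}
```

Replacing the two while keywords with if is an easy experiment for reproducing the over-consumption, although the result then depends on thread timing.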

Test

Let's try a small exercise with another classic problem.

Design requirements: Write a program that simulates process synchronization at a station ticket office. The ticket office holds at most 20 ticket buyers at any time; anyone else must wait outside. Each ticket buyer can be regarded as a process.

Reference: Treat the inside of the station as the shared area. The ticket window acts as the consumer, and the station entrance acts as the producer.

consumer class

public class Customer implements Runnable{
    //The consumer class is the window
    Station station;
 
    Customer(Station station){
        this.station = station;//Make sure the window is inside this station
    }
 
    @Override
    public void run(){
        while (true){
            try {
                station.getPeople();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

producer class

public class Produce implements Runnable{
    //The producer is the gate of the platform, allowing ticket buyers to enter
    Station station;
 
    static int i = 0;//Sequence number of the person arriving from outside
 
    Produce(Station station){
        this.station =station;
    }
 
    @Override
    public void run(){
        while (true){
            i++;
            try {
                station.inputPeople(i);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

inside the station

public class Station {

    private final int max = 20;//Maximum capacity of the station: 20 people

    private int sum;//Sequence number of the person who entered last

    //Number of people currently inside. An instance field is enough here,
    //because both threads share the same Station object.
    private int count = 0;

    /**
     * Lets one person into the station.
     * @param sum sequence number of the person arriving from outside
     */
    public synchronized void inputPeople(int sum) throws InterruptedException {
        //While the station is full, nobody can enter. The loop re-checks
        //the condition after every wake-up.
        while(count == max){
            wait();
        }

        //Not full, so let the person in
        this.sum = sum;
        System.out.println("Person " + this.sum + " entered the station!");
        //After entering, update the number of people in the station
        count++;
        Thread.sleep(500);//Note: sleep() keeps the lock, so the ticket window is blocked meanwhile
        notifyAll();//Wake up the other threads

    }

    public synchronized void getPeople() throws InterruptedException {
        //While there is nobody in the station, no ticket can be sold
        while(count == 0){
            wait();
        }

        //Someone is in the station
        count--;
        System.out.println("A passenger bought a ticket successfully! " + count + " people left in the station");
        Thread.sleep(1000);//Ticket sales are assumed to take longer, so the station may fill up
        notifyAll();
    }

}

main function

public class Main {
    public static void main(String[] args) {
 
        //shared memory area
        Station station = new Station();
 
        //Create platform openings
        Produce produce = new Produce(station);
 
        //Create a ticket slot
        Customer customer =new Customer(station);
 
        Thread t1 = new Thread(produce);
        Thread t2 = new Thread(customer);
 
        t1.start();
        t2.start();
 
    }
}

running result

Person 1 entered the station!
...
Person 16 entered the station!
A passenger bought a ticket successfully! 15 people left in the station
...
A passenger bought a ticket successfully! 0 people left in the station
Person 17 entered the station!
...
Person 26 entered the station!
A passenger bought a ticket successfully! 9 people left in the station

Semaphore

definition

A Semaphore controls how many threads may access a particular resource at the same time, coordinating the threads so that a shared resource is used sensibly.

A Semaphore controls access to a shared resource through a counter of permits. If the counter is greater than 0, access is allowed and the counter is decremented. If it is 0, the requesting thread blocks until a permit is released (or, with tryAcquire, is refused). To access the resource, a thread must therefore first obtain a permit from the semaphore.
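A minimal sketch of the counter behaviour described above. PermitGateDemo and its variable names are hypothetical. The semaphore starts with 0 permits, so the worker thread blocks in acquire until the main thread releases one.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; shows that a thread blocks while the permit counter is 0.
class PermitGateDemo {
    // Returns {entered before release, entered after release}.
    static boolean[] run() throws InterruptedException {
        Semaphore gate = new Semaphore(0);              // counter starts at 0: no access yet
        AtomicBoolean entered = new AtomicBoolean(false);

        Thread worker = new Thread(() -> {
            gate.acquireUninterruptibly();              // blocks while the counter is 0
            entered.set(true);
        });
        worker.start();

        while (!gate.hasQueuedThreads()) {              // wait until the worker is queued on the semaphore
            Thread.sleep(1);
        }
        boolean before = entered.get();                 // still false: the worker is blocked

        gate.release();                                 // counter 0 -> 1, the worker may proceed
        worker.join();
        boolean after = entered.get();                  // true: the worker got its permit
        return new boolean[] { before, after };
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = run();
        System.out.println("entered before release: " + r[0] + ", after release: " + r[1]);
    }
}
```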

main method

void acquire(): Acquires one permit from the semaphore; blocks until a permit is available or the thread is interrupted.

void acquire(int permits): Acquires the given number of permits; blocks until that many are available.

boolean tryAcquire(): Tries to acquire one permit; returns false immediately, without blocking, if none is available.

boolean tryAcquire(int permits): Tries to acquire the given number of permits; returns false immediately if that many are not available.

boolean tryAcquire(int permits, long timeout, TimeUnit unit): Tries to acquire the given number of permits within the given time; returns true on success and false if the timeout elapses first.

void release(): Releases one permit; remember to call it in a finally block. Note: every call adds a permit, even beyond the initial count, which gives the effect of dynamic expansion. For example, with 1 initial permit, one acquire followed by two release calls leaves 2 permits available.

int availablePermits(): Returns the number of permits currently available.
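The methods listed above can be tried out in a single thread. The following is a small sketch; SemaphoreApiDemo and run() are hypothetical names, and the 50 ms timeout is an arbitrary choice.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class exercising the Semaphore methods from one thread.
class SemaphoreApiDemo {
    static String run() throws InterruptedException {
        Semaphore s = new Semaphore(1);                 // 1 initial permit

        boolean first = s.tryAcquire();                 // takes the only permit
        boolean second = s.tryAcquire();                // none left: returns false at once
        boolean timed = s.tryAcquire(1, 50, TimeUnit.MILLISECONDS); // gives up after the timeout

        s.release();                                    // gives the permit back
        s.release();                                    // extra release: adds a second permit
        int afterRelease = s.availablePermits();

        boolean three = s.tryAcquire(3);                // only 2 available: returns false
        s.acquire(2);                                   // takes both permits without blocking
        int atEnd = s.availablePermits();

        return first + "," + second + "," + timed + "," + afterRelease + "," + three + "," + atEnd;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints true,false,false,2,false,0
    }
}
```

The extra release() is what raises the count above the initial value, which is why release calls are normally paired with a successful acquire.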

Semaphore constructor

 public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
 
public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
 
permits: the initial number of permits, that is, the maximum number of threads that can hold a permit at the same time (unless extra release calls add more)
fair: false creates a nonfair semaphore (the default); true creates a fair semaphore that grants permits in FIFO order

Summary

Semaphore is an effective flow-control tool, built on the shared mode of AQS (AbstractQueuedSynchronizer). It is commonly used to control access to a limited resource.

  • Before using a resource, acquire a permit from the semaphore. If no permit is available, the thread blocks and waits;
  • After releasing a resource, release the permit.

application

import java.util.concurrent.Semaphore;

/**
 * Only 20 people can enter the station.
 * This version is wrong. On the surface the result looks correct, but think about it carefully:
 * permits are acquired on entering and released on leaving,
 * but customOut releases permits whether or not anyone is inside, even when the station is not full,
 * so the permit count can grow past 20. M and N are also updated without synchronization.
 */
public class Station1 {
    static int N = 0;//Number of people currently in the station
    static int M = 0;//Sequence number of the latest arrival
    Semaphore people =new Semaphore(20);

    /**
     * Entering the station
     */
   void customIn() {
       new Thread() {
           @Override
           public void run() {
               while (true) {
                   try {
                       people.acquire();
                       M++;
                       N++;
                       System.out.println("Person No." + M + " entered the station");
                       System.out.println("Current number of people in the station: " + N);
                       sleep(1000);
                   } catch (InterruptedException e) {
                       System.out.println("Why don't you let me in?");
                       e.printStackTrace();
                   }
               }
           }
       }.start();
    }
 
    /**
     * Leaving the station
     */
    void customOut() {
        new Thread() {
            @Override
            public void run() {
                while (true) {
                    try {
                        people.release();
                        System.out.println("Person No." + M + " bought a ticket");
                        //Leave after buying the ticket
                        N--;
                        sleep(2000);//Assume buying a ticket takes a little longer
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                }
            }.start();
    }
    public static void main(String[] args) {
       Station1 station=new Station1();
        station.customIn();
//        station.customOut();
 
 
    }
}
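For comparison, here is one possible corrected version, offered as a sketch rather than the definitive fix. Station2, visit() and run() are hypothetical names. Each ticket buyer is its own thread: it acquires a permit when entering and releases that same permit in a finally block when leaving, so the permit count can never exceed the capacity. The counters use AtomicInteger because several threads update them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical corrected station; every release is paired with an earlier acquire.
class Station2 {
    private final Semaphore seats;                              // one permit per place in the station
    private final AtomicInteger inside = new AtomicInteger();   // people currently inside
    private final AtomicInteger peak = new AtomicInteger();     // highest number seen inside

    Station2(int capacity) {
        seats = new Semaphore(capacity);
    }

    // One ticket buyer: enter, spend some time buying a ticket, leave.
    void visit(long millis) throws InterruptedException {
        seats.acquire();                                        // wait outside if the station is full
        try {
            int now = inside.incrementAndGet();
            peak.accumulateAndGet(now, Math::max);
            Thread.sleep(millis);                               // buying the ticket
        } finally {
            inside.decrementAndGet();
            seats.release();                                    // give the place back on the way out
        }
    }

    // Sends `buyers` threads through the station; returns {peak, free permits, people inside}.
    static int[] run(int capacity, int buyers) throws InterruptedException {
        Station2 station = new Station2(capacity);
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < buyers; i++) {
            threads.add(new Thread(() -> {
                try {
                    station.visit(5);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        for (Thread t : threads) t.start();
        for (Thread t : threads) t.join();
        return new int[] { station.peak.get(), station.seats.availablePermits(), station.inside.get() };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run(20, 60);
        System.out.println("peak inside: " + r[0] + ", free permits: " + r[1] + ", inside now: " + r[2]);
    }
}
```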

The above is the semaphore approach to the station problem.

Next, let's look at a semaphore solution to the apple problem.

import java.util.concurrent.Semaphore;
/**
 * Implements the producer-consumer problem using semaphores.
 * The essence of the producer-consumer problem is synchronization plus mutual exclusion.
 * Synchronization is about ordering: one thread must run before or after another, and the two cannot run at the same time.
 * Mutual exclusion means that a resource cannot be held by two threads at the same time.
 */
public class Test {
    //The number of apples currently on the plate
    private static int count = 0;

    //Create three semaphores

    //Producer permits (free slots). At most 10 permits can be obtained, so at most 10 apples can be on the plate.
    final Semaphore provider = new Semaphore(10);
    //Consumer permits (available apples). Starts at 0 because the plate is empty.
    final Semaphore consumer = new Semaphore(0);
    //Mutual exclusion: a binary semaphore so that only one thread updates count at a time
    final Semaphore mutex = new Semaphore(1);
 
    public static void main(String[] args) {
        Test test = new Test();
        new Thread(test.new Producer()).start();
        new Thread(test.new Consumer()).start();
        new Thread(test.new Producer()).start();
        new Thread(test.new Consumer()).start();
        new Thread(test.new Producer()).start();
        new Thread(test.new Consumer()).start();
        new Thread(test.new Producer()).start();
        new Thread(test.new Consumer()).start();
    }
    //The acquire() method tries to obtain a permit. If none is available, the thread waits until another thread releases a permit or the current thread is interrupted.
    //The release() method gives a permit back once the thread has finished with the resource, so that other waiting threads can access it.
    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    provider.acquire();//Wait for a free slot; 10 are available at the start
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;//No permit was acquired, so nothing must be released
                }
                mutex.acquireUninterruptibly();//Short critical section protecting count
                try {
                    count++;//Produce an apple
                    System.out.println(Thread.currentThread().getName()
                            + " producer produced, current total: " + count);
                } finally {
                    mutex.release();
                }
                //A Semaphore does not tie permits to threads, so a permit acquired in one
                //thread can be released in another; acquire and release need not be paired.
                consumer.release();//An apple is available, so grant the consumer a permit
            }
        }
    }
 
    class Consumer implements Runnable {
 
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
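                try {
                    consumer.acquire();//Wait until an apple is available
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;//No permit was acquired, so nothing must be released
                }
                mutex.acquireUninterruptibly();//Short critical section protecting count
                try {
                    count--;//Consume an apple
                    System.out.println(Thread.currentThread().getName()
                            + " consumer consumed, current total: " + count);
                } finally {
                    mutex.release();
                }
                provider.release();//A slot is free again, so grant the producer a permit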
            }
        }
    }
}

This is also the producer-consumer model, but the code is a little different. The station example uses a single semaphore, while the apple example uses three. Why? Is there a difference?

Thoughts on Semaphore

My view on the two versions: first, we need to understand how Semaphore works. The one-argument constructor creates a nonfair semaphore by default, and the permits parameter is the important part: it is the number of permits, that is, the maximum number of threads allowed access at the same time. The second version is therefore the more complete one. Its mutex ensures that only one thread reads or writes count at a time, while the other two semaphores count free slots and available apples. The station example only tracks a single quantity, the number of people inside, so I did not go to that much trouble there.

Four ways to implement the consumer-producer model

Reference: https://blog.csdn.net/qq_34814092/article/details/124269492


Posted by astaroth on Thu, 01 Dec 2022 20:51:45 +1030