A series of high-quality articles will be released later here, and the classification is mainly based on language. In this article system, it is basically a relatively popular knowledge point or a relatively large knowledge point. I will reproduce it and then talk about it. talk about your understanding
classic topic
First, take out the classic milk delivery to explain the producer and consumer model. The topic is well known and I won't say much. Here we need to think about it, where is the conflict? It is in the operation of the milk carton by the producer and the consumer respectively. In the code, it is reflected in the reading and writing of a static variable (whether there is milk in the milk carton). Therefore, a lock is required here.
producer class
copy/** * @date 2021/12/6 11:16 */ public class Produec implements Runnable{ private Box b; public Produec(Box b){ this.b = b; } @Override public void run() { //Producers start milking for(int i=1;i<=10;i++){ b.inputMilk(i); } } }
consumer class
copy/** * @date 2021/12/6 11:11 */ public class Customer implements Runnable{ Box box ; Customer(){ } Customer(Box box){ this.box = box; } //The consumer must take the milk box delivered by the milkman. @Override public void run() { while(true){ box.getMilk(); } } }
Box class
copypublic class Box { //milk crate private int milk; //Add variables, milk box status private boolean state = false; //builder put milk public synchronized void inputMilk(int milk){ //If there is milk, enter the wait state if(state){ try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //If there is no milk, produce milk this.milk = milk; System.out.println("the milkman will be" + this.milk +": put milk in"); //After the generation is complete, modify the status of the milk box state = true; notifyAll(); } //consumer takes milk public synchronized void getMilk(){ //If there is no milk, then what to do, do not take! wait if(!state){ try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //There is milk, sure take it System.out.println("The milk drinker gets the first"+ this.milk + ": Milk"); state = false; notifyAll(); } }
main program main
copy/** * @date 2021/12/6 11:20 */ public class Main { public static void main(String[] args) { //shared memory area, milk crate Box box = new Box(); //Create the producer, the milkman Produec produec = new Produec(box); //Create a consumer, i.e. I Customer customer = new Customer(box); Thread t1 = new Thread(produec); Thread t2 = new Thread(customer); t1.start(); t2.start(); } }
operation result
copymilkman will be 1st: put milk in The sucker got the first place: Milk The milkman will be the 2nd: put milk in The sucker got the 2nd place: Milk milkman will be 3rd: put milk in The milk drinker got the 3rd place: Milk milkman will be 4th: put milk in The sucker got the 4th: Milk milkman will be 5th: put milk in The milk drinker got the 5th: Milk milkman will be 6th: put milk in Milky gets 6th: Milk milkman will be 7th: put milk in The sucker got the 7th: Milk milkman will be 8th: put milk in The milk drinker got the 8th: Milk milkman will be number 9: put milk in The sucker got the 9th: Milk milkman will be 10th: put milk in milk drinker got 10th: Milk
It's easy to understand, what if I modify the title here? Can 3 bottles of milk be placed in a milk carton at a time? how to modify
In fact, it is very simple, just modify the code in the box
box class
copy/** * @date 2021/12/6 10:59 */ public class Box { //milk crate private int milk; private int max = 3;//Indicates that the milk box can only hold 3 bottles of milk at most //Modify here, the milk box can hold 3 bottles of milk private static int count = 0; //builder put milk public synchronized void inputMilk(int milk) throws InterruptedException { //If the milk is full, to three bottles if(count == max){ try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //If not full, continue to produce milk this.milk = milk; System.out.println("the milkman will be" + this.milk +": put milk in"); //After the generation is complete, modify the amount of milk in the box count++; Thread.sleep(100); notifyAll(); } //consumer takes milk public synchronized void getMilk(){ //If there is no milk, then what to do, do not take! wait if(count == 0){ try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //There is milk, sure take it System.out.println("drinkers get milk"); count --; notifyAll(); } }
question
But here I encountered a bug, that is, when I set the milk in the box to 3, even if the producer stops, the consumer will not consume, and will not consume until the producer is full .

This is the case as shown above, and I didn’t figure it out in the end, maybe my strength is not enough. Here is a blind guess, it may be related to the keyword sychornized. According to my understanding, this result should be correct. When I stop the producer, the consumer will immediately step forward and consume it instead of producing three and then producing three.
But here I finally changed the milk in the box to 50, a little more, it is more normal. The principle of this bottom layer can only wait for me to come back later
question
The second is that when a consumer is new ly created in my main thread, there is an obvious phenomenon of overtaking milk. What is the reason for this? Compared with the previous one, this reason seems to have the feeling that it must be clarified.
copyThe milk drinker got the 155th: Milk The milk drinker got the 156th: Milk The milk drinker got the 157th: Milk The milk drinker got the 158th: Milk milkman will be 24th: put milk in milkman will be 25th: put milk in The milk drinker got the 159th: Milk
It can be seen that the milk is seriously overtaken here. The reason is actually very simple. If you think about it carefully, you will find that the problem lies in the sychronized keyword. This keyword locks an object under a class. As for the underlying principle, I The block is not very clear yet, and we need to continue to analyze it later
Test
Let's take a small test here: another classic question
Design requirements: Write a program to simulate the process synchronization problem in the ticket office of the station. The ticket office can accommodate up to 20 ticket buyers at any time, otherwise they need to wait outside. Each ticket buyer can be regarded as a process.
Reference: We can regard the inside of the station as a shared area, and the ticket office is equivalent to a consumer, and the station entrance is equivalent to a producer
consumer class
copypublic class Customer implements Runnable{ //The consumer class is the window Station station; Customer(Station station){ this.station = station;//Make sure the window is inside this station } @Override public void run(){ while (true){ try { station.getPeople(); } catch (InterruptedException e) { throw new RuntimeException(e); } } } }
producer class
copypublic class Produce implements Runnable{ //The producer is the gate of the platform, allowing ticket buyers to enter Station station; static int i =0 ;//Indicates the number of foreigners Produce(Station station){ this.station =station; } @Override public void run(){ while (true){ i++; try { station.inputPeople(i); } catch (InterruptedException e) { throw new RuntimeException(e); } } } }
inside the station
copypublic class Station { private int max =20;//Indicates that the maximum capacity of the platform is 20 people private int sum;//Indicates the number of people private static int count = 0;//Note that static variables are required here /** * The sum parameter means the number from the outside, or how many in total * @param sum */ public synchronized void inputPeople(int sum) throws InterruptedException { //If it's full, you don't need to enter if(count == max){ try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //If not full, continue to put in this.sum = sum; System.out.println("Inside the station"+this.sum+"people come in!"); //After entering, modify the number of people in the station count++; Thread.sleep(500); notifyAll();//wake up other threads } public synchronized void getPeople() throws InterruptedException { //If there is no one in the station, it will not be possible to sell tickets if(count == 0){ try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //someone at the station count--; System.out.println("A passenger bought a ticket successfully!,left in the station"+count+"people"); Thread.sleep(1000);//It is assumed here that the processing time for ticket sales is relatively long, so it may be full notifyAll(); } }
main function
copypublic class Main { public static void main(String[] args) { //shared memory area Station station = new Station(); //Create platform openings Produce produce = new Produce(station); //Create a ticket slot Customer customer =new Customer(station); Thread t1 = new Thread(produce); Thread t2 = new Thread(customer); t1.start(); t2.start(); } }
running result
copyThe first person in the station came in! ... The 16th person in the station came in! A passenger bought a ticket successfully!,15 people left in the station ... A passenger bought a ticket successfully!,0 people left in the station The 17th person in the station came in! ... The 26th person in the station came in! A passenger bought a ticket successfully!,There are 9 people left in the station
Semaphore
definition
Semaphore (semaphore) is used to control the number of threads that access specific resources at the same time, and coordinate each thread to ensure the reasonable use of common resources.
Semaphore controls access to shared resources by using counters. If the counter is greater than 0, access is allowed. If 0, access is denied. The counter counts the permissions that allow access to the shared resource. Therefore, to access a resource, a thread must be granted permission from the semaphore.
main method
copyvoid acquire() : Obtain a license from the semaphore, if no license is available, it will block and wait, void acquire(int permits) : Get the specified number of licenses, if no licenses are available, it will block and wait boolean tryAcquire(): Attempts to get a permit from the semaphore, returns if no permit is available false,will not block boolean tryAcquire(int permits): Attempts to acquire the specified number of licenses, returning if no licenses are available false boolean tryAcquire(int permits, long timeout, TimeUnit unit): Attempt to obtain permission from the semaphore within the specified time, and return if successful within the specified time true,otherwise return false void release(): To release a permission, don't forget to add the finally Note: Calling this method multiple times will increase the number of semaphore permissions to achieve the effect of dynamic expansion, such as: initial permits is 1, called twice release,The maximum allowed will change to 2 int availablePermits(): Get the licenses available for the current semaphore
Semaphore constructor
copypublic Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } permits The initial number of permits, that is, the maximum number of access threads fair when set to false When , the created semaphore is an unfair lock; when set to true The semaphore is a fair lock when
Summarize
Semaphore is an effective flow control tool, which is implemented based on AQS shared lock. We often use it to control access to limited resources.
- Before each resource is used, first apply for a semaphore. If the number of resources is not enough, it will block and wait;
- Every time a resource is released, a semaphore is released.
application
copy/** * Only 20 people can enter the station * This version is wrong. On the surface, the result is correct, but think about it carefully: * Acquiring permits on inbound and releasing permits on outbound, * But the impact of this is that when the station is not full, */ public class Station1 { static int N = 0;//Number of people at the station static int M = 0;//current number Semaphore people =new Semaphore(20); /** * pit stop */ void customIn() { new Thread() { @Override public void run() { while (true) { try { people.acquire(); M++; N++; System.out.println("No."+M+"individual entry station"); System.out.println("Current number of people at the station:"+N); sleep(1000); } catch (InterruptedException e) { System.out.println("Why don't you let me in?"); e.printStackTrace(); } } } }.start(); } /** * outbound */ void customOut() { new Thread() { @Override public void run() { while (true) { try { people.release(); System.out.println("No."+M+"individual ticket"); //play after buying tickets N--; sleep(2000);//Assuming it takes a little longer to buy tickets } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); } public static void main(String[] args) { Station1 station=new Station1(); station.customIn(); // station.customOut(); } }
The above is the semaphore solution to the station problem
Let's look at a semaphore solution to an Apple problem
copyimport java.util.concurrent.Semaphore; /** * Implement the producer consumer problem using semaphore semaphores. * The essence of the producer-consumer problem is actually a problem of synchronization and mutual exclusion. * The problem of synchronization is actually a problem of one after the other. A certain process or program must be executed in front of or behind the other, and they cannot be executed at the same time, which is what it means. * Mutual exclusion problem, that is, a resource cannot be occupied by two processes at the same time */ public class Test { //For example, this is the number of apples. private static Integer count = 0; //Create three semaphores //This is a producer license, and you can obtain up to 10 licenses, which means you can produce up to 10 apples. final Semaphore provider = new Semaphore(10); //This is a consumer license. an empty plate. final Semaphore consumer = new Semaphore(0); //The place to reflect mutual exclusion: to ensure the alternation between producers and consumers, so set up a mutex semaphore final Semaphore mutex = new Semaphore(1); public static void main(String[] args) { Test test = new Test(); new Thread(test.new Producer()).start(); new Thread(test.new Consumer()).start(); new Thread(test.new Producer()).start(); new Thread(test.new Consumer()).start(); new Thread(test.new Producer()).start(); new Thread(test.new Consumer()).start(); new Thread(test.new Producer()).start(); new Thread(test.new Consumer()).start(); } //The acquire() method attempts to obtain an admission permission. If not, the thread waits until a thread releases a permit or the current thread is interrupted. //The release() method is used to release a license after the thread has finished accessing the resource. to make resource access available to other threads waiting for permission. class Producer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } try { provider.acquire();//The producer goes first and gets permission, because final Semaphore provider = new Semaphore(10); mutex.acquire(); count++;// produce an apple System.out.println(Thread.currentThread().getName() + "Producers produce, there are currently a total of" + count); } catch (InterruptedException e) { e.printStackTrace(); } finally { mutex.release(); // The relationship between acquire and release: // There is no real license object included in the implementation, and the Semaphore does not associate a license with a thread, so a license acquired in one thread can be released in another. That is to say, there is no mandatory one-to-one relationship between acquire and release consumer.release();//After the apple is produced, the consumer obtains the license. } } } } class Consumer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (InterruptedException e1) { e1.printStackTrace(); } try { consumer.acquire(); mutex.acquire(); count--; System.out.println(Thread.currentThread().getName() + "Consumer consumption, currently a total of" + count); } catch (InterruptedException e) { e.printStackTrace(); } finally { mutex.release(); provider.release(); } } } } }
Here is the producer and consumer model, but the code is a little different. In the station we only use one semaphore variable, but in Apple I use three. why? Is there a difference?
Thoughts in the Semaphore
Regarding the above two, I have an opinion: First of all, we need to understand the mechanism of semaphore. In fact, when declaring, the default unfair lock in the constructor, the parameter peritms is very important. This is a permission number, which means that the current How many threads are supported at most to access. Therefore, in fact, the second way of writing is more perfect. A mutx ensures that only one thread reads and writes at a time. But because it happens that the model in our example is a variable quantity, so it doesn't need to be so troublesome.
Four ways to implement the consumer-producer model
Reference: https://blog.csdn.net/qq_34814092/article/details/124269492
https://99ding.men/api/v1/client/subscribe?token=a8f9450b84a1e1076dd5e098394868d3