Producer-consumer model
1. What is the producer-consumer model
- Producer: the party in the program responsible for generating data
- Consumer: the party in the program responsible for processing data
2. Why introduce the producer-consumer model
In concurrent programming, the producer-consumer model removes the tight coupling between producers and consumers by placing a container between them. Instead of communicating directly, the two sides communicate through a blocking queue. A fast producer does not have to wait for a consumer to finish processing: it puts its data in the queue and moves on, and consumers take data directly from the queue. The queue acts as a buffer that balances the working capacity of producers and consumers, which improves the data processing speed of the whole program.
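The buffering behaviour of a blocking queue can be seen in a single process. The sketch below is only an illustration: the function name `demo_bounded_buffer`, the `maxsize=2` limit, and the timeouts are assumptions chosen for the demo, not part of the original example. It shows that `put()` has to wait when the buffer is full and `get()` has to wait when it is empty.

```python
import queue  # only needed for the Full / Empty exception classes
from multiprocessing import Queue


def demo_bounded_buffer():
    """Record what happens when the buffer is full or empty."""
    events = []
    q = Queue(maxsize=2)  # the buffer holds at most two items
    q.put("item0")
    q.put("item1")

    try:
        # The buffer is full, so put() would block; give up after a short timeout.
        q.put("item2", timeout=0.2)
    except queue.Full:
        events.append("producer had to wait")

    events.append(q.get())       # the consumer frees one slot
    q.put("item2", timeout=1)    # now there is room again
    events.append(q.get())
    events.append(q.get())

    try:
        q.get(timeout=0.2)       # nothing left: get() would block
    except queue.Empty:
        events.append("consumer had to wait")
    return events


if __name__ == "__main__":
    print(demo_bounded_buffer())
```

Without the timeouts, the same calls would simply block until the other side catches up, which is how the queue balances a fast producer against a slow consumer.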
3. How to implement it
Through a queue: producer --------> queue --------> consumer
4. Examples of producers and consumers

    from multiprocessing import Process, Queue
    import time, random


    def producer(q, name, food):
        for i in range(3):
            res = f"{food}{i}"
            time.sleep(random.randint(1, 3))  # Simulate the time it takes to produce data
            q.put(res)  # Put the produced data into the queue
            print(f"\033[1;35m{name}:Produced:{res}\033[0m")


    def consumer(q, name):
        while True:
            res = q.get()  # Fetch data (blocks while the queue is empty)
            if res is None:
                break  # None is the sentinel: production is over, so stop
            time.sleep(random.randint(1, 3))  # Simulate the time it takes to process data
            print(f"\033[1;36m{name} ate {res}\033[0m")


    if __name__ == "__main__":
        q = Queue()  # Create the queue

        # Create three producer processes
        p1 = Process(target=producer, args=(q, "shawn", "sausage"))
        p2 = Process(target=producer, args=(q, "Patrick Star", "Hot dog"))
        p3 = Process(target=producer, args=(q, "Spongebob", "chicken"))

        # Create two consumer processes
        c1 = Process(target=consumer, args=(q, "Brother Octopus"))
        c2 = Process(target=consumer, args=(q, "Crab boss"))

        p1.start()
        p2.start()
        p3.start()
        c1.start()
        c2.start()

        # Wait for all producers to finish production
        p1.join()
        p2.join()
        p3.join()

        # The main process then puts two None values into the queue, one per consumer.
        # A consumer that gets None knows production is over and exits
        q.put(None)
        q.put(None)
        print("plankton :main")

    '''Sample output (the order varies between runs)
    shawn:Produced:sausage0
    Patrick Star:Produced:Hot dog0
    Brother Octopus ate sausage0
    Crab boss ate Hot dog0
    Patrick Star:Produced:Hot dog1
    shawn:Produced:sausage1
    Spongebob:Produced:chicken0
    Brother Octopus ate Hot dog1
    Spongebob:Produced:chicken1
    Patrick Star:Produced:Hot dog2
    Brother Octopus ate chicken0
    Crab boss ate sausage1
    shawn:Produced:sausage2
    Spongebob:Produced:chicken2
    plankton :main
    Crab boss ate Hot dog2
    Brother Octopus ate chicken1
    Crab boss ate sausage2
    Brother Octopus ate chicken2

    Process finished with exit code 0
    '''

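The sentinel technique above needs exactly one None per consumer, otherwise some consumer keeps waiting forever. A minimal sketch of that rule for any number of consumers follows. It is an illustration under stated assumptions: `run_pipeline`, `STOP`, and the second `done` queue are hypothetical names, and `multiprocessing.dummy` (the standard library's thread-backed mirror of the multiprocessing API) is used only so the sketch runs inside one process.

```python
from multiprocessing.dummy import Process, Queue  # thread-backed stand-ins

STOP = None  # sentinel meaning "no more data"


def producer(q, food, count):
    for i in range(count):
        q.put(f"{food}{i}")


def consumer(q, done):
    eaten = []
    while True:
        item = q.get()
        if item is STOP:
            break              # one sentinel stops exactly one consumer
        eaten.append(item)
    done.put(eaten)            # report what this consumer handled


def run_pipeline(foods, n_consumers, per_producer=3):
    q, done = Queue(), Queue()
    producers = [Process(target=producer, args=(q, food, per_producer))
                 for food in foods]
    consumers = [Process(target=consumer, args=(q, done))
                 for _ in range(n_consumers)]
    for p in producers + consumers:
        p.start()
    for p in producers:
        p.join()               # all real data is in the queue now
    for _ in consumers:
        q.put(STOP)            # one sentinel per consumer
    for c in consumers:
        c.join()
    # Collect one report per consumer and merge them.
    return sorted(item for _ in consumers for item in done.get())


if __name__ == "__main__":
    print(run_pipeline(["sausage", "chicken"], n_consumers=3))
```

Putting the sentinels only after every producer has been joined guarantees they sit behind all real items in the queue.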
5. A second producer-consumer approach using the JoinableQueue class (for reference)
- Creating a JoinableQueue instance
q = JoinableQueue([maxsize]): behaves like a Queue object, but additionally lets the consumer of an item notify the producer that the item has been processed successfully. The notification is implemented with a shared semaphore and a condition variable
- Methods
Method | Effect |
---|---|
q.task_done() | The consumer calls this method to signal that an item returned by q.get() has been processed. Calling it more times than items have been removed from the queue raises a ValueError |
q.join() | The producer calls this method to block until every item in the queue has been processed. It keeps blocking until q.task_done() has been called for each item put on the queue |
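The two methods in the table can be tried out in a single process before moving to a multi-process example. This is a small sketch, with `demo_task_done` as a hypothetical helper name: it calls task_done() once per item, shows that join() then returns immediately, and shows the ValueError raised by one call too many.

```python
from multiprocessing import JoinableQueue


def demo_task_done():
    q = JoinableQueue()
    for item in ("a", "b", "c"):
        q.put(item)            # each put() adds one unfinished task

    handled = []
    for _ in range(3):
        handled.append(q.get())
        q.task_done()          # one call per item returned by get()

    q.join()                   # returns at once: no unfinished tasks remain

    try:
        q.task_done()          # a fourth call has no matching item
    except ValueError:
        extra_call_rejected = True
    else:
        extra_call_rejected = False
    return handled, extra_call_rejected


if __name__ == "__main__":
    print(demo_task_done())
```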

    from multiprocessing import Process, JoinableQueue
    import time, random


    def producer(q, name, food):
        for i in range(3):
            res = f"{food}{i}"
            q.put(res)
            time.sleep(random.randint(1, 3))
            print(f"\033[1;35m{name}:Produced:{res}\033[0m")
        q.join()  # Block until task_done() has been called for every item put on the queue


    def consumer(q, name):
        while True:
            res = q.get()
            if res is None:  # Not reached in this version: no None sentinel is sent
                break
            time.sleep(random.randint(1, 3))
            print(f"\033[1;36m{name} ate {res}\033[0m")
            q.task_done()  # After each item, signal task_done; the queue's unfinished-task count drops by 1


    if __name__ == "__main__":
        q = JoinableQueue()  # Create the queue object

        # Create three producers
        p1 = Process(target=producer, args=(q, "shawn", "sausage"))
        p2 = Process(target=producer, args=(q, "Patrick Star", "Hot dog"))
        p3 = Process(target=producer, args=(q, "Spongebob", "chicken"))

        # Create two consumers
        c1 = Process(target=consumer, args=(q, "Brother Octopus"))
        c2 = Process(target=consumer, args=(q, "Crab boss"))

        # Make both consumers daemons: when the main process code ends, the consumers end with it
        c1.daemon = True
        c2.daemon = True

        p1.start()
        p2.start()
        p3.start()
        c1.start()
        c2.start()

        # Wait for the three producer processes to end
        p1.join()
        p2.join()
        p3.join()

        # How it works: every put() adds 1 to the queue's count of unfinished tasks
        # (a producer that puts three items adds 3)
        # Each time a consumer finishes an item it calls task_done(), which subtracts 1 from that count
        # When the count reaches 0, every item has been processed; q.join() stops blocking
        # and the producer process ends
        # The consumers then have nothing left to do. Because they are daemons, they are
        # terminated when the main process ends, which happens once the producers have ended

    '''Sample output (the order varies between runs)
    shawn:Produced:sausage0
    Spongebob:Produced:chicken0
    Brother Octopus ate sausage0
    Patrick Star:Produced:Hot dog0
    Crab boss ate Hot dog0
    shawn:Produced:sausage1
    Spongebob:Produced:chicken1
    Brother Octopus ate chicken0
    Crab boss ate sausage1
    shawn:Produced:sausage2
    Patrick Star:Produced:Hot dog1
    Brother Octopus ate chicken1
    Crab boss ate Hot dog1
    Spongebob:Produced:chicken2
    Patrick Star:Produced:Hot dog2
    Brother Octopus ate sausage2
    Crab boss ate chicken2
    Brother Octopus ate Hot dog2

    Process finished with exit code 0
    '''

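A variation on the example above is to call q.join() once, from the code that owns the queue, instead of in every producer. The sketch below illustrates that idea under stated assumptions: `run_until_drained`, the upper-casing step, and the `results` queue are hypothetical, and `multiprocessing.dummy` (the thread-backed mirror of the multiprocessing API in the standard library) is used only so the sketch runs inside one process.

```python
from multiprocessing.dummy import Process, JoinableQueue, Queue  # thread-backed


def consumer(q, results):
    while True:                      # never exits on its own: it is a daemon
        item = q.get()
        results.put(item.upper())    # stand-in for real processing
        q.task_done()                # this item is fully handled


def run_until_drained(items, n_consumers=2):
    q, results = JoinableQueue(), Queue()
    for _ in range(n_consumers):
        c = Process(target=consumer, args=(q, results))
        c.daemon = True              # ends together with the main program
        c.start()
    for item in items:
        q.put(item)
    q.join()                         # blocks until task_done() ran for every item
    return sorted(results.get() for _ in items)


if __name__ == "__main__":
    print(run_until_drained(["sausage", "chicken", "hot dog"]))
```

Because each result is stored before task_done() is called, every result is already available by the time q.join() returns.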
XIII Semaphore (for reference)
A mutex allows only one process to modify data at a time, while a Semaphore allows a fixed number of processes to do so at the same time. Take a barber shop with only three barbers: at most three people can have their hair cut at once, and everyone else waits until someone finishes. With Semaphore(3), each person who acquires the semaphore takes one of the three slots. Once all three are taken, later arrivals wait; as soon as one holder releases, a waiting person can acquire it.

    from multiprocessing import Semaphore, Process
    import time, random


    def haircut(sem, name):
        start_time = time.time()
        sem.acquire()  # Take one of the slots (blocks while none is free)
        print(f"{name} starts a haircut")
        time.sleep(random.randint(2, 3))  # Simulate the haircut time
        print(f"{name} haircut plus waiting time {time.time() - start_time:.2f}")
        sem.release()  # Give the slot back


    if __name__ == '__main__':
        sem = Semaphore(3)  # At most 3 processes hold the semaphore at once
        user_list = []
        for i in range(8):
            p = Process(target=haircut, args=(sem, f"Star{i}"))
            p.start()
            user_list.append(p)
        for obj in user_list:
            obj.join()
        print("close")

    '''Sample output (the order and times vary between runs)
    Star0 starts a haircut
    Star1 starts a haircut
    Star2 starts a haircut
    Star0 haircut plus waiting time 3.00
    Star3 starts a haircut
    Star1 haircut plus waiting time 3.00
    Star4 starts a haircut
    Star2 haircut plus waiting time 3.00
    Star5 starts a haircut
    Star3 haircut plus waiting time 4.93
    Star6 starts a haircut
    Star4 haircut plus waiting time 4.87
    Star7 starts a haircut
    Star5 haircut plus waiting time 5.82
    Star7 haircut plus waiting time 6.69
    Star6 haircut plus waiting time 7.74
    close

    Process finished with exit code 0
    '''

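The slot counting can also be checked without starting any processes, by acquiring the semaphore in non-blocking mode. This is a small sketch, with `demo_slots` as a hypothetical helper: `acquire(False)` returns True when a slot was free and False when a blocking call would have had to wait.

```python
from multiprocessing import Semaphore


def demo_slots(slots=3, customers=5):
    sem = Semaphore(slots)
    # Each customer tries to sit down without waiting.
    seated = [sem.acquire(False) for _ in range(customers)]

    sem.release()                      # one haircut finishes
    seat_after_release = sem.acquire(False)

    for _ in range(slots):             # hand back the slots still held
        sem.release()
    return seated, seat_after_release


if __name__ == "__main__":
    print(demo_slots())
```

In real code, `with sem:` is a convenient alternative to an explicit acquire()/release() pair, since the slot is released even if the body raises.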