Producer consumer model

1. What is the producer consumer model

  • Producer: the party in the program responsible for generating data
  • Consumer: the party in the program responsible for processing data

2. Why introduce the producer consumer model

In concurrent programming, the producer consumer model removes the tight coupling between producers and consumers by placing a container between them. Instead of communicating directly, they communicate through a blocking queue. A fast producer does not have to wait for a consumer to finish processing: it puts its data on the queue and moves on, and consumers take data from the queue when they are ready. The queue acts as a buffer that balances the different working speeds of producers and consumers, which improves the data processing speed of the whole program.

3. How to implement it

Through a queue: producer --------> queue --------> consumer
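
The flow above can be sketched in a few lines. This is only a minimal illustration, not the multiprocessing example that follows: it assumes threads and the standard library's queue.Queue so that it runs in a single script, and the names run_pipeline, maxsize and peak are hypothetical. It shows the queue acting as a bounded buffer: put() blocks while the queue is full, and get() blocks while it is empty.

```python
import queue
import threading


def run_pipeline(items, maxsize=2):
    """Pass items from one producer thread to one consumer thread.

    Assumption: items must not contain None, which is used as the end marker.
    """
    q = queue.Queue(maxsize=maxsize)  # put() blocks while maxsize items are waiting
    consumed = []
    peak = 0  # largest queue size the producer observed right after a put()

    def producer():
        nonlocal peak
        for item in items:
            q.put(item)                  # blocks if the buffer is full
            peak = max(peak, q.qsize())  # qsize() is approximate, but never above maxsize
        q.put(None)                      # tell the consumer that production is over

    def consumer():
        while True:
            item = q.get()               # blocks if the buffer is empty
            if item is None:
                break
            consumed.append(item)

    threads = [threading.Thread(target=producer), threading.Thread(target=consumer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return consumed, peak
```

Calling run_pipeline(list(range(5)), maxsize=2) returns the five items in production order, and the observed queue size never exceeds 2.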

4. Examples of producers and consumers

from multiprocessing import Process, Queue
import time, random


def producer(q, name, food):
    for i in range(3):
        res = f"{food}{i}"
        time.sleep(random.randint(1, 3))  # Simulated producer data output time
        q.put(res)             # Put the generated data into the queue
        print(f"\033[1;35m{name}:Produced:{res}\033[0m")


def consumer(q, name):
    while True:
        res = q.get()           # Fetch an item; blocks while the queue is empty
        if res is None:         # None is the end marker: no more data will arrive
            break
        time.sleep(random.randint(1, 3))   # Simulate the time spent processing the item
        print(f"\033[1;36m{name} ate {res}\033[0m")


if __name__ == "__main__":
    q = Queue()  # Create queue
    # Start three producer processes
    p1 = Process(target=producer, args=(q, "shawn", "sausage")) 
    p2 = Process(target=producer, args=(q, "Patrick Star", "Hot dog"))
    p3 = Process(target=producer, args=(q, "Spongebob", "chicken"))
    # Start two consumer processes
    c1 = Process(target=consumer, args=(q, "Brother Octopus"))
    c2 = Process(target=consumer, args=(q, "Crab boss"))

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
    # Wait for all producers to finish production and end the process
    p1.join()
    p2.join()
    p3.join()
    # Put one None per consumer on the queue; each consumer exits when it receives its None
    q.put(None)
    q.put(None)
    print("plankton :main")
    
'''output
shawn:Produced:sausage0
Patrick Star:Produced:Hot dog0
Brother Octopus ate sausage0
Crab boss ate Hot dog0
Patrick Star:Produced:Hot dog1
shawn:Produced:sausage1
Spongebob:Produced:chicken0
Brother Octopus ate Hot dog1
Spongebob:Produced:chicken1
Patrick Star:Produced:Hot dog2
Brother Octopus ate chicken0
Crab boss ate sausage1
shawn:Produced:sausage2
Spongebob:Produced:chicken2
plankton :main
Crab boss ate Hot dog2
Brother Octopus ate chicken1
Crab boss ate sausage2
Brother Octopus ate chicken2
'''
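
The example above puts exactly two None values on the queue because there are exactly two consumers. The sketch below generalizes that rule to any number of consumers. It is an illustration under stated assumptions rather than a rewrite of the example: it uses threads and queue.Queue instead of processes so that it runs in a single script, and the function run and its parameters are hypothetical names.

```python
import queue
import threading


def run(n_producers=3, n_consumers=2, per_producer=3):
    """Run producers and consumers, then stop every consumer with its own None."""
    q = queue.Queue()
    results = []
    lock = threading.Lock()  # protects results, which all consumers append to

    def producer(pid):
        for i in range(per_producer):
            q.put((pid, i))

    def consumer():
        while True:
            item = q.get()
            if item is None:  # end marker: this consumer is done
                break
            with lock:
                results.append(item)

    producers = [threading.Thread(target=producer, args=(p,)) for p in range(n_producers)]
    consumers = [threading.Thread(target=consumer) for _ in range(n_consumers)]
    for t in producers + consumers:
        t.start()
    for t in producers:
        t.join()              # all real data is on the queue after this point
    for _ in consumers:
        q.put(None)           # one end marker per consumer, placed after all data
    for t in consumers:
        t.join()
    return sorted(results)
```

Because the end markers are queued only after every producer has finished, each consumer takes exactly one None and no item is left unprocessed. With fewer markers than consumers, at least one consumer would block in q.get() forever.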

5. The second producer consumer model uses the JoinableQueue class (for awareness)

  • Creating a JoinableQueue instance

q = JoinableQueue([maxsize]): behaves like a Queue object, but additionally lets the consumer of an item notify the producer that the item has been processed successfully. The notification is implemented with a shared semaphore and a condition variable.

  • Methods

q.task_done(): called by the consumer to signal that an item returned by q.get() has been processed. A ValueError is raised if it is called more times than there were items placed in the queue.

q.join(): called by the producer to block until every item in the queue has been processed, that is, until q.task_done() has been called once for each item that was put in the queue.

from multiprocessing import Process, JoinableQueue
import time, random


def producer(q, name, food):
    for i in range(3):
        res = f"{food}{i}"
        q.put(res)
        time.sleep(random.randint(1, 3))
        print(f"\033[1;35m{name}:Produced:{res}\033[0m")
    q.join()  # Block until every item put on the queue has been marked done with task_done()


def consumer(q, name):
    while True:
        res = q.get()
        if res is None:
            break
        time.sleep(random.randint(1, 3))
        print(f"\033[1;36m{name} ate {res}\033[0m")
        q.task_done()
        # Each task_done() call lowers the queue's count of unfinished items by 1


if __name__ == "__main__":
    q = JoinableQueue()  # Create an object
    # Create three producers
    p1 = Process(target=producer, args=(q, "shawn", "sausage"))
    p2 = Process(target=producer, args=(q, "Patrick Star", "Hot dog"))
    p3 = Process(target=producer, args=(q, "Spongebob", "chicken"))
    # Create two consumers
    c1 = Process(target=consumer, args=(q, "Brother Octopus"))
    c2 = Process(target=consumer, args=(q, "Crab boss"))
    # Mark both consumers as daemons so that they are terminated when the main process exits
    c1.daemon = True
    c2.daemon = True
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
    # Wait for the end of the three producer processes
    p1.join()
    p2.join()
    p3.join()
# Principle analysis: the queue keeps a count of unfinished items. Every q.put() raises it by 1 (nine times in total for three producers with three items each).
# Each time a consumer finishes an item and calls q.task_done(), the count drops by 1.
# When the count reaches 0, every item has been processed; q.join() stops blocking and the producer processes end.
# The consumers then have nothing left to do. Because they are daemons, they are terminated when the main process ends after the producers have finished.

'''output
shawn:Produced:sausage0
Spongebob:Produced:chicken0
Brother Octopus ate sausage0
Patrick Star:Produced:Hot dog0
Crab boss ate Hot dog0
shawn:Produced:sausage1
Spongebob:Produced:chicken1
Brother Octopus ate chicken0
Crab boss ate sausage1
shawn:Produced:sausage2
Patrick Star:Produced:Hot dog1
Brother Octopus ate chicken1
Crab boss ate Hot dog1
Spongebob:Produced:chicken2
Patrick Star:Produced:Hot dog2
Brother Octopus ate sausage2
Crab boss ate chicken2
Brother Octopus ate Hot dog2
'''
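
The counting behaviour of task_done() and join() can also be checked without starting any extra process. The following is a small sketch that uses multiprocessing.JoinableQueue inside a single process; the function name demo_task_done and the item values are made up for illustration.

```python
from multiprocessing import JoinableQueue


def demo_task_done():
    """Show how put(), task_done() and join() affect the unfinished-item count."""
    q = JoinableQueue()
    for item in ("a", "b", "c"):
        q.put(item)          # each put() raises the count of unfinished items by 1

    got = []
    for _ in range(3):
        got.append(q.get())  # get() alone does not lower the count
        q.task_done()        # task_done() lowers it by 1

    q.join()                 # returns at once: the count is back to 0

    try:
        q.task_done()        # one call more than there were items
    except ValueError:
        extra_call_rejected = True
    else:
        extra_call_rejected = False
    return got, extra_call_rejected
```

If the three task_done() calls were left out, q.join() would block forever, which is why a consumer has to call task_done() for every item it finishes.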

XIII Semaphore (for awareness)

A mutex allows only one process to modify data at a time, while a Semaphore allows a fixed number of processes to do so at the same time. Take a barber shop with only three barbers: at most three customers can have their hair cut at once, and everyone else has to wait until a barber becomes free. With Semaphore(3), each acquire() takes one of three slots by decrementing an internal counter that starts at 3. When the counter reaches 0, further callers block until someone calls release(), which frees a slot for one waiting caller.
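
The counter can be observed directly with non-blocking acquire calls. This is a minimal single-process sketch, assuming nothing beyond multiprocessing.Semaphore itself; the function name demo_semaphore and the parameter slots are hypothetical.

```python
from multiprocessing import Semaphore


def demo_semaphore(slots=3):
    """Try to take slots + 1 slots without blocking, then free one and retry."""
    sem = Semaphore(slots)
    # acquire(block=False) returns True if a slot was free and False otherwise
    attempts = [sem.acquire(block=False) for _ in range(slots + 1)]
    sem.release()                            # one holder leaves
    after_release = sem.acquire(block=False) # so one more caller can get in
    for _ in range(slots):
        sem.release()                        # give every slot back
    return attempts, after_release
```

With slots=3 the first three attempts succeed and the fourth fails; after a single release() the next attempt succeeds again.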

from multiprocessing import Semaphore,Process
import time,random

def haircut(sem,name):
    start_time = time.time()
    sem.acquire()  # Take one of the slots; blocks while all of them are in use
    print(f"{name} starts a haircut")
    time.sleep(random.randint(2,3)) # Simulate the haircut time
    print(f"{name} haircut plus waiting time {time.time()-start_time:.2f}")
    sem.release()  # Give the slot back

if __name__ == '__main__':
    sem = Semaphore(3)  # At most 3 processes can hold the semaphore at the same time
    user_list = []
    for i in range(8):
        p = Process(target=haircut,args=(sem,f"Star{i}"))
        p.start()
        user_list.append(p)

    for obj in user_list:
        obj.join()
    print("close")
    
'''output
Star0 starts a haircut
Star1 starts a haircut
Star2 starts a haircut
Star0 haircut plus waiting time 3.00
Star3 starts a haircut
Star1 haircut plus waiting time 3.00
Star4 starts a haircut
Star2 haircut plus waiting time 3.00
Star5 starts a haircut
Star3 haircut plus waiting time 4.93
Star6 starts a haircut
Star4 haircut plus waiting time 4.87
Star7 starts a haircut
Star5 haircut plus waiting time 5.82
Star7 haircut plus waiting time 6.69
Star6 haircut plus waiting time 7.74
close
'''
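
A semaphore can also be used as a context manager, which releases the slot even if the protected code raises an exception. The sketch below is a thread-based analogue of the barber shop, assuming threading.Semaphore instead of multiprocessing.Semaphore so that the number of simultaneous haircuts can be counted in shared memory. The names run_shop, customers and barbers are hypothetical.

```python
import threading
import time


def run_shop(customers=8, barbers=3):
    """Return the largest number of haircuts that were in progress at once."""
    sem = threading.Semaphore(barbers)
    lock = threading.Lock()  # protects the two counters below
    active = 0
    peak = 0

    def haircut():
        nonlocal active, peak
        with sem:            # acquire() on entry, release() on exit
            with lock:
                active += 1
                peak = max(peak, active)
            time.sleep(0.05) # simulate the haircut time
            with lock:
                active -= 1

    threads = [threading.Thread(target=haircut) for _ in range(customers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return peak
```

However the threads are scheduled, the returned peak never exceeds the number of barbers. The same with statement works with the multiprocessing.Semaphore object used in the example above.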

Tags: Python Multithreading multiple processes

Posted by mndwn on Sun, 17 Apr 2022 23:42:51 +0930