Process pool (multiprocessing.Pool)

I. Process pool concept

1. What is a process pool?

  • 👉 A process pool is a technique built from resource processes and a management process: the management process hands work to a fixed set of pre-created resource processes

2. Why is there a process pool?

😮In busy periods there may be thousands of tasks to execute, while in idle periods there may be only a handful.
😒So when thousands of tasks need to be executed, do we really need to create thousands of processes?
😓First, creating a process takes time, and destroying a process also takes time.
😟Second, even if thousands of processes are started, the operating system cannot run them all at the same time, which hurts the efficiency of the program.
😥Therefore, we cannot start and stop processes without limit according to the number of tasks. So what should we do?

3. Concept of a process pool

  • 😺 Define a pool and put a fixed number of processes in it. When there is work to do, take a process from the pool to handle the task
  • 😸 When the task is finished, the process is not closed; it is put back into the pool and keeps waiting for new tasks
  • 😹 If there are many tasks to execute and the processes in the pool are not enough, a task has to wait until a previous task finishes and an idle process becomes available
  • 😻 In other words, since the number of processes in the pool is fixed, at most that fixed number of processes run at the same time
  • 😼 This does not increase the scheduling burden on the operating system, saves the time of creating and switching processes, and achieves a concurrency effect to a certain extent.

4. Resource process

  • 👉 Pre-created idle processes. The management process 🏊 distributes work to these idle processes for handling.

5. Management process 🏊

  • 👉 The management process is responsible for creating the resource processes, handing work over to idle resource processes, and reclaiming resource processes that have finished their work.

  • Interaction between resource process and management process

😱How can the management process effectively manage the resource processes and assign tasks to them?
👉Through IPC: signals, semaphores, message queues, pipes, etc.
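
To make that interaction concrete, here is a minimal sketch (not the internal implementation of Pool) in which a management side hands work to pre-created resource processes through a message queue; the names worker, work_queue and result_queue are invented for this illustration.

from multiprocessing import Process, Queue
import os

def worker(work_queue, result_queue):
    # Resource process: keep taking tasks from the queue until a None "stop" signal arrives
    for n in iter(work_queue.get, None):
        result_queue.put((os.getpid(), n * n))

if __name__ == '__main__':
    work_queue, result_queue = Queue(), Queue()
    # The management side pre-creates a fixed number of resource processes
    workers = [Process(target=worker, args=(work_queue, result_queue)) for _ in range(4)]
    for w in workers:
        w.start()
    for n in range(10):          # distribute work through the queue (IPC)
        work_queue.put(n)
    for _ in workers:            # one stop signal per resource process
        work_queue.put(None)
    for _ in range(10):
        print(result_queue.get())
    for w in workers:
        w.join()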

II. Use of the process pool

We can control the number of processes by maintaining a process pool. For example, httpd's process mode lets us specify the maximum and minimum number of processes. The Pool class of the multiprocessing module provides a specified number of processes for the user to call.

1. Create a process pool

  • Method: Pool([numprocess [, initializer [, initargs]]])
  • ps: in the multithreading article we introduced another way to create a process pool: from concurrent.futures import ProcessPoolExecutor (a short sketch follows the common-methods table below)
  • Parameters:

| parameter | effect |
| --- | --- |
| numprocess | The number of processes to create. If omitted, the value of os.cpu_count() is used by default |
| initializer | The callable to be executed when each worker process starts. Defaults to None |
| initargs | The tuple of arguments passed to initializer |
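
A minimal, hedged sketch of initializer and initargs (the function init_worker, the global _scale and the task function are invented for this illustration): each worker process runs the initializer exactly once when it starts.

from multiprocessing import Pool
import os

_scale = 1   # overwritten in every worker process by the initializer

def init_worker(scale):
    # Runs once in each worker process at startup
    global _scale
    _scale = scale
    print(f"worker {os.getpid()} initialized with scale={scale}")

def task(n):
    return n * _scale

if __name__ == '__main__':
    with Pool(processes=2, initializer=init_worker, initargs=(10,)) as p:
        print(p.map(task, range(5)))   # -> [0, 10, 20, 30, 40]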

2. Introduction to common methods

| method | effect |
| --- | --- |
| p.apply(func, args, kwargs) | (Synchronous call) Run func with the given arguments in one of the pool's worker processes and return the result. If you want to pass different arguments and run func concurrently, you have to call p.apply() from different threads, or use p.apply_async() |
| p.apply_async(func, args, kwargs) | (Asynchronous call) Run func with the given arguments in one of the pool's worker processes. The return value is an instance of the AsyncResult class. A callback can be supplied, and it receives the single return value of func as its argument |
| p.close() | Close the process pool so that no further tasks can be submitted. Tasks that are still pending will be completed before the worker processes terminate |
| p.join() | Wait for all worker processes to exit. This method can only be called after close() or terminate(), otherwise an error is raised |
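
As mentioned in the ps above, concurrent.futures.ProcessPoolExecutor offers a similar pool; as a rough sketch (not an exact one-to-one mapping), submit() plays the role of apply_async() and Future.result() plays the role of AsyncResult.get():

from concurrent.futures import ProcessPoolExecutor
import os

def task(n):
    return n * n, os.getpid()

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(task, i) for i in range(10)]  # roughly like apply_async
        print([f.result() for f in futures])                     # roughly like AsyncResult.get()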

3. Other methods

The following methods apply to the return value of apply_async() and map_async(), that is, to **AsyncResult** instances.

| method | effect |
| --- | --- |
| obj.get([timeout]) | Return the result, waiting for it to arrive if necessary. timeout is optional; if the result does not arrive within the specified time, a multiprocessing.TimeoutError is raised. If the remote operation raised an exception, it is re-raised when this method is called |
| obj.ready() | Return True if the call has completed |
| obj.successful() | Return True if the call completed without raising an exception. If the result is not ready yet, calling this method raises an exception |
| obj.wait([timeout]) | Wait for the result to become available; timeout is optional |
| p.terminate() | Immediately terminate all worker processes without performing any cleanup or finishing pending work. If the pool object is garbage collected, this method is called automatically (note that this is a method of the Pool object, not of AsyncResult) |
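
A minimal, hedged sketch of these methods in use (the worker function slow_square is invented for this illustration):

from multiprocessing import Pool, TimeoutError
import time

def slow_square(n):
    time.sleep(1)
    return n * n

if __name__ == '__main__':
    with Pool(2) as p:
        res = p.apply_async(slow_square, args=(3,))
        print(res.ready())                 # most likely False: the task has not finished yet
        try:
            print(res.get(timeout=0.1))    # waiting only 0.1s raises TimeoutError
        except TimeoutError:
            print("not ready within 0.1s")
        res.wait()                         # block until the result is available
        print(res.ready(), res.successful(), res.get())   # True True 9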

4. Synchronous call example (apply)

from multiprocessing import Pool
import time,os,random
def test(n):
    print(f"Subprocess:{os.getpid()}")
    time.sleep(2)
    return n*random.randint(2,9)

if __name__ == '__main__':
    n = os.cpu_count()     # Number of local CPU cores (mine is 4); the pool size can be customized and defaults to the CPU core count
    p = Pool(processes=n)  # Set the pool size; from start to finish only these four processes perform the tasks
    li = []
    start_time = time.time()
    for i in range(10):
        res = p.apply(test,args=(2,))  # Create ten tasks and call them synchronously
        li.append(res)
    p.close()  # Close the process pool first so that no new tasks can be submitted (this call can be omitted for synchronous calls)
    p.join()   # Must be called after close(), otherwise an error is raised; waits for all child processes to finish (can be omitted for synchronous calls)
    print(li)  # A synchronous call returns the final result directly (an asynchronous call returns an object, and get() is needed to obtain the value)
    print(f'Usage time:{time.time()-start_time}')
    
'''output
 Subprocess:7768
 Subprocess:16276
 Subprocess:17544
 Subprocess:15680
 Subprocess:7768
 Subprocess:16276
 Subprocess:17544
 Subprocess:15680
 Subprocess:7768
 Subprocess:16276
[4, 18, 14, 14, 12, 14, 16, 14, 6, 10]
Usage time:20.226498126983643
'''

From the above output we can see that only four processes ever run: 7768, 16276, 17544 and 15680. With synchronous submission, each task has to wait for the previous one to finish and return its result before the next one starts, so the whole run takes a little more than 20 seconds

5. Asynchronous call example (apply_async)

from multiprocessing import Pool
import time,os,random

def test(n):
    print(f"Subprocess:{os.getpid()}")
    time.sleep(2)
    return n*n*random.randint(2,9)

if __name__ == '__main__':
    n = os.cpu_count()     # Number of local CPU cores (mine is 4); the pool size can be customized and defaults to the CPU core count
    p = Pool(processes=n)  # Set the pool size; from start to finish only these four processes perform the tasks
    li = []
    start_time = time.time()
    for i in range(10):
        res = p.apply_async(test,args=(2,))  # Open ten tasks and use asynchronous invocation
        li.append(res)
    p.close()  # Close the process pool and no new processes will be added to the pool to prevent further operations
    p.join()   # join must be performed after the close function, or an error will be reported. After execution, wait for all child processes to end
    print(li)  # li holds AsyncResult objects: [<multiprocessing.pool.ApplyResult object at 0x000002318511B408>, ...]
    print([i.get() for i in li])  # Use the get method to get the value of asynchronous call (synchronous call does not have this method), and put it into the list for printing
    print(f"Usage time:{time.time()-start_time}")

'''output
 Subprocess:8636
 Subprocess:10828
 Subprocess:7432
 Subprocess:13976
 Subprocess:8636
 Subprocess:10828
 Subprocess:7432
 Subprocess:13976
 Subprocess:8636
 Subprocess:10828
[<multiprocessing.pool.ApplyResult object at 0x000001623059B308>, ... (remaining objects omitted)]
[16, 24, 24, 24, 16, 28, 36, 28, 8, 32]
Usage time:6.301024436950684
'''

It can also be seen from these results that only four processes work from beginning to end: 8636, 10828, 7432 and 13976. With asynchronous calls, submission does not block: when a task hits a blocking operation, the pool switches to other tasks, and once the pool is full, new tasks simply wait for a worker to become free. The results are AsyncResult objects, whose values have to be fetched with the get method, and the whole run takes a little more than 6 seconds

6. Example: a server using a process pool to limit the number of client-handling processes

  • Server
from socket import *
from multiprocessing import Pool
import os

s = socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)  # Reuse IP and ports
s.bind(("127.0.0.1",8055))
s.listen(5)

def connection(conn):
    print(f"Current process:{os.getpid()}")
    while 1:
        try:
            data = conn.recv(1024)
            if len(data) == 0:break
            conn.send("ABBA ABBA".encode("utf-8"))
        except Exception:
            break

if __name__ == '__main__':
    p = Pool(2)  # If not specified, the default number of native CPU cores
    print("connection....")
    while 1:
        conn,addr = s.accept()
        print(f"Connected{addr}")
        p.apply_async(connection,args=(conn,))
  • Client (we open four clients for experiments)
from socket import *

c = socket(AF_INET,SOCK_STREAM)
c.connect(("127.0.0.1",8055))

while 1:
    msg = input("content>>").strip()
    if len(msg) == 0:continue
    c.send(msg.encode("utf-8"))
    data = c.recv(1024)
    print(f"Server reply:{data.decode('utf-8')}")
  • Test effect

Four clients and one server:

Start all five programs (one server and four clients) and have the four clients send messages

The first two can send and receive messages, while the latter two are blocked


The server shows that two processes are started successfully: 8928 and 17584, and the remaining two are blocked


We now shut down the first two clients to see whether the process IDs change

After the first two clients are closed, the remaining two clients are served immediately, and the PIDs turn out to be the same two as before

III. Callback function

1. What is a callback function

Passing one function (that is, a reference to it; the concept of a pointer is weakened in Python) as a parameter to another function, which then calls it during its own processing: the function passed in is called a callback function

2. Simple example

def foo(n):
    print(f"foo output{n}")

def Bar(i,func):
    func(i)

for i in range(3):
    Bar(i,foo)
    
'''output
foo output 0
foo output 1
foo output 2
'''

3. Application scenario of callback function

When a task in the process pool finishes, it notifies the main process, and the main process then calls another function to handle the result. We can put time-consuming or blocking tasks into the process pool and specify a callback function that the main process is responsible for executing: the main process skips the I/O step when it runs the callback and directly receives the result of the task

  • A simple, easy-to-understand example
from multiprocessing import Pool
import os

def get(n):
    print(f"get--->{os.getpid()}")
    return n   # Return the result of task execution

def set(num):  # The callback; num receives the return value of get()
    print(f"set--->{os.getpid()} : {num**2}")

if __name__ == '__main__':
    p = Pool(3)
    nums = [2,3,4,1]
    li = []
    for i in nums:
        # Call asynchronously and set the callback with callback
        res = p.apply_async(get,args=(i,),callback=set)  
        li.append(res)

    p.close()  # Close process pool
    p.join()   # Wait for the child process to end
    print([ii.get() for ii in li])  # Use the get method to get the results
    
'''output
get--->8388
get--->8388
set--->8768 : 4
get--->8388
set--->8768 : 9
get--->8388
set--->8768 : 16
set--->8768 : 1
[2, 3, 4, 1]
'''
  • An example: getting the size of a web page's source code
from multiprocessing import Pool
import requests,os

def get_htm(url):
    print(f"process:{os.getpid()}Start getting:{url}Webpage")
    response = requests.get(url)
    if response.status_code == 200:   # If it is 200, it is successful
        return {'url':url,'text':response.text}
    else:
        return {'url':url,'text':''}  # Some web pages cannot be obtained. The setting is empty

def parse_htm(htm_dic):
    print(f'Process {os.getpid()} is processing the text of: {htm_dic["url"]}')
    parse_data = f"url:{htm_dic['url']} size:{len(htm_dic['text'])}"
    with open("./db.txt","a")as f:    # Save the URL and the corresponding web page source code size to a file
        f.write(f"{parse_data}\n")

if __name__ == '__main__':
    urls=[
        'https://zhuanlan.zhihu.com',
        'https://www.cnblogs.com',
        'https://www.python.org',
        'https://blog.csdn.net',
        'http://www.china.com.cn',
    ]
    p = Pool(3)  # Set the maximum number of processes in the process pool to 3
    li = []
    for url in urls:
        # Call asynchronously and set callback
        res = p.apply_async(get_htm,args=(url,),callback=parse_htm)  
        li.append(res)

    p.close()  # Close process pool
    p.join()   # Wait for the child process to end
    print([i.get() for i in li])  # Get results using get method
    
'''output
 Process 11484 starts fetching: https://zhuanlan.zhihu.com
 Process 17344 starts fetching: https://www.cnblogs.com
 Process 2688 starts fetching: https://www.python.org
 Process 11484 starts fetching: https://blog.csdn.net
 Process 3928 is processing the text of: https://zhuanlan.zhihu.com
 Process 17344 starts fetching: http://www.china.com.cn
 Process 3928 is processing the text of: https://www.cnblogs.com
 Process 3928 is processing the text of: https://blog.csdn.net
 Process 3928 is processing the text of: http://www.china.com.cn
 Process 3928 is processing the text of: https://www.python.org
[{'url': 'https://zhuanlan.zhihu.com', 'text': ''}, ... a bunch of page source (omitted)]
'''
  • Check the saved "db.txt" file


4. Crawling the Forbes rich list


from multiprocessing import Pool
import re
import requests

def get_htm(url,format1):
    response = requests.get(url)
    if response.status_code == 200:
        return (response.text,format1)
    else:
        return ('',format1)

def parse_htm(res):
    text,format1 = res
    data_list = re.findall(format1,text)
    for data in data_list:
        with open("Forbes ranking.txt","a",encoding="utf-8")as f:
            f.write(f"ranking:{data[0]},name:{data[1]},Worth:{data[2]},company:{data[3]},country:{data[4]}\n")

if __name__ == '__main__':
    url1 = "https://www.phb123.com/renwu/fuhao/shishi.html"
    # Use regular matching keywords
    format1 = re.compile(r'<td.*?"xh".*?>(\d+)<.*?title="(.*?)".*?alt.*?<td>(.*?)</td>.*?<td>(.*?)<.*?title="(.*?)"', re.S)
    url_list = [url1]
    for i in range(2, 16):  # The ranking has 15 pages in total; add all page links to the list
        url_list.append(f"https://www.phb123.com/renwu/fuhao/shishi_{i}.html")
    p = Pool()
    li = []
    for url in url_list:
        res = p.apply_async(get_htm,args=(url,format1),callback=parse_htm)
        li.append(res)

    p.close()
    p.join()
    print("Save complete")
  • Check the file to see if you're on the list

ps: if you just want the main process to wait for all tasks in the pool to finish and then process the results all at once, there is no need for a callback function; for example:
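
A minimal, hedged sketch of that pattern (the function square is invented for this illustration): the main process simply collects the AsyncResult objects and calls get() on them afterwards.

from multiprocessing import Pool

def square(n):
    return n * n

if __name__ == '__main__':
    with Pool(3) as p:
        async_results = [p.apply_async(square, args=(i,)) for i in range(5)]
        # The main process waits for every task and processes the results in one place
        print([r.get() for r in async_results])   # -> [0, 1, 4, 9, 16]
        # p.map(square, range(5)) would achieve the same thing in a single blocking call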
