Process pool (multiprocessing.Pool)
I Process pool concepts
1. What is a process pool?
- 👉 A process pool is a technique built from a group of resource processes and a management process
2. Why is there a process pool?
😮 At busy times there may be thousands of tasks to perform, while at idle times there are only a few sporadic ones. 😒 So when thousands of tasks need to run, should we create thousands of processes? 😓 First, creating a process takes time, and destroying a process takes time too. 😟 Second, even if thousands of processes are started, the operating system cannot run them all at the same time, which hurts the efficiency of the program. 😥 Therefore, we cannot start and stop processes without limit as tasks come and go. So what should we do?
3. The concept of a process pool
- 😺 Define a pool and put a fixed number of processes in it. When there is work to do, take a process from the pool to handle the task
- 😸 When a task is finished, the process is not closed; it is put back into the pool to wait for the next task
- 😹 If there are many tasks and the processes in the pool are all busy, new tasks must wait until a running process finishes its task and becomes idle again
- 😻 In other words, since the number of processes in the pool is fixed, at most that fixed number of processes run at any one time
- 😼 This avoids increasing the scheduling burden on the operating system, saves the cost of creating and destroying processes, and still achieves a degree of concurrency
4. Resource processes
- 👉 Pre-created idle processes. The management process distributes work to these idle resource processes in the pool 🏊 for processing.
5. Management process 🏊
- 👉 The management process is responsible for creating the resource processes, handing work over to idle resource processes for processing, and recycling resource processes that have finished their work.
6. Interaction between the resource processes and the management process
- 😱 How does the management process effectively manage the resource processes and assign them tasks? 👉 Through IPC: signals, semaphores, message queues, pipes, and so on.
II Use of process pool
We can control the number of processes by maintaining a process pool. For example, httpd's process mode lets you specify the maximum and minimum number of processes. The Pool class in the multiprocessing module provides a specified number of processes for the user to call.
1. Create a process pool
- Method: Pool([numprocess[, initializer[, initargs]]])
- ps: in the multithreading article we introduced another way to create a process pool: from concurrent.futures import ProcessPoolExecutor
- parameter
parameter | effect |
---|---|
numprocess | The number of processes to create. If omitted, defaults to the value of os.cpu_count() |
initializer | The callable object to be executed when each working process starts. The default is None |
initargs | Parameter group passed to initializer |
2. Introduction to common methods
method | effect |
---|---|
p.apply(func,args,kwargs) | (synchronous call) Run func with the given arguments in a worker process of the pool and block until the result is returned. To run func concurrently with different arguments, call p.apply() from different threads or use p.apply_async() |
p.apply_async(func,args,kwargs) | (asynchronous call) Run func with the given arguments in a worker process and return immediately. The result is an instance of the AsyncResult class. A callback can be specified, which receives func's return value as its single argument |
p.close( ) | Close the pool so that no further tasks can be submitted. Tasks already submitted are completed before the worker processes terminate |
p.join( ) | Wait for all worker processes to exit. Can only be called after close() or terminate(), otherwise an error is raised |
3. Other methods
The following methods apply to the return value of apply_async() and map_async(), i.e. they are methods of the **AsyncResult** instance
method | effect |
---|---|
obj.get([timeout]) | Return the result, waiting for it to arrive if necessary. timeout is optional; if the result does not arrive within the given time, multiprocessing.TimeoutError is raised. If the remote operation raised an exception, it is re-raised when this method is called |
obj.ready( ) | Returns True if the call is complete |
obj.successful( ) | Returns True if the call completed without raising an exception. If this method is called before the result is ready, an exception is raised |
obj.wait([timeout]) | Wait for the result to become available. The parameter is timeout |
p.terminate( ) | (a Pool method, not an AsyncResult method) Immediately terminate all worker processes without performing cleanup or finishing pending work. Called automatically if the pool object is garbage collected |
4. Synchronous call example (apply)
```python
from multiprocessing import Pool
import time, os, random

def test(n):
    print(f"Subprocess:{os.getpid()}")
    time.sleep(2)
    return n * random.randint(2, 9)

if __name__ == '__main__':
    n = os.cpu_count()     # number of local CPU cores (4 here); the pool size is customizable and defaults to this
    p = Pool(processes=n)  # set the pool size; only these four processes will ever execute the tasks
    li = []
    start_time = time.time()
    for i in range(10):
        res = p.apply(test, args=(2,))  # submit ten tasks, called synchronously
        li.append(res)
    p.close()  # close the pool so no new tasks can be submitted (optional for synchronous calls)
    p.join()   # must come after close(), or an error is raised; waits for all child processes (optional for synchronous calls)
    print(li)  # a synchronous call returns the final result directly
               # (an asynchronous call returns an object, so get() is needed to fetch the value)
    print(f'Usage time:{time.time()-start_time}')
'''output
Subprocess:7768
Subprocess:16276
Subprocess:17544
Subprocess:15680
Subprocess:7768
Subprocess:16276
Subprocess:17544
Subprocess:15680
Subprocess:7768
Subprocess:16276
[4, 18, 14, 14, 12, 14, 16, 14, 6, 10]
Usage time:20.226498126983643
'''
```
From the output above we can see that there are only ever four processes: 7768, 16276, 17544 and 15680. With synchronous submission, each task must wait for the previous one to finish and return its result before the next can start, so the run takes a little more than 20 seconds.
5. Asynchronous call example (apply_async)
```python
from multiprocessing import Pool
import time, os, random

def test(n):
    print(f"Subprocess:{os.getpid()}")
    time.sleep(2)
    return n * n * random.randint(2, 9)

if __name__ == '__main__':
    n = os.cpu_count()     # number of local CPU cores (4 here); the pool size is customizable and defaults to this
    p = Pool(processes=n)  # set the pool size; only these four processes will perform the tasks
    li = []
    start_time = time.time()
    for i in range(10):
        res = p.apply_async(test, args=(2,))  # submit ten tasks, called asynchronously
        li.append(res)
    p.close()  # close the pool so no new tasks can be submitted
    p.join()   # must come after close(), or an error is raised; waits for all child processes
    print(li)  # AsyncResult objects: [<multiprocessing.pool.ApplyResult object at 0x...>, ...]
    print([i.get() for i in li])  # get() fetches each asynchronous result (synchronous calls have no such method)
    print(f"Usage time:{time.time()-start_time}")
'''output
Subprocess:8636
Subprocess:10828
Subprocess:7432
Subprocess:13976
Subprocess:8636
Subprocess:10828
Subprocess:7432
Subprocess:13976
Subprocess:8636
Subprocess:10828
[<multiprocessing.pool.ApplyResult object at 0x000001623059B308>, ...ellipsis]
[16, 24, 24, 24, 16, 28, 36, 28, 8, 32]
Usage time:6.301024436950684
'''
```
The output again shows only four processes working from start to finish: 8636, 10828, 7432 and 13976. With asynchronous calls, the main process does not wait on each task: submitted work is picked up by whichever worker is free, and once the pool is full, new tasks simply wait for a worker to become available. The results are AsyncResult objects whose values must be retrieved with get(), and the whole run takes a little more than 6 seconds.
6. Example: a server using a process pool to limit the number of client processes
- Server
```python
from socket import *
from multiprocessing import Pool
import os

s = socket(AF_INET, SOCK_STREAM)
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)  # reuse the IP and port
s.bind(("127.0.0.1", 8055))
s.listen(5)

def connection(conn):
    print(f"Current process:{os.getpid()}")
    while 1:
        try:
            data = conn.recv(1024)
            if len(data) == 0:
                break
            conn.send("ABBA ABBA".encode("utf-8"))
        except Exception:
            break

if __name__ == '__main__':
    p = Pool(2)  # defaults to the number of local CPU cores if not specified
    print("connection....")
    while 1:
        conn, addr = s.accept()
        print(f"Connected {addr}")
        p.apply_async(connection, args=(conn,))
```
- Client (we open four clients for experiments)
```python
from socket import *

c = socket(AF_INET, SOCK_STREAM)
c.connect(("127.0.0.1", 8055))
while 1:
    msg = input("content>>").strip()
    if len(msg) == 0:
        continue
    c.send(msg.encode("utf-8"))
    data = c.recv(1024)
    print(f"Server reply:{data.decode('utf-8')}")
```
- Test effect
Four clients and one server:
Start all five programs and let the four clients send messages
The first two clients can send and receive messages, while the latter two are blocked
The server shows that two processes were started, 8928 and 17584; the remaining two clients are blocked
We close the first two client processes to see whether the process IDs change
After closing the first two clients, the latter two are served immediately, and their handlers turn out to have the same two PIDs as before
III Callback function
1. What is a callback function
Pass a reference to one function (i.e. its memory address; the concept of a pointer is de-emphasized in Python) as a parameter to another function, which then invokes it. The function passed in is called a callback function
2. Simple example
```python
def foo(n):
    print(f"foo output {n}")

def bar(i, func):
    func(i)  # bar "calls back" into foo through the passed-in reference

for i in range(3):
    bar(i, foo)
'''output
foo output 0
foo output 1
foo output 2
'''
```
3. Application scenario of callback function
When a task in the process pool finishes, it notifies the main process, which then handles the result itself: the main process invokes another function to process it. We can put time-consuming or blocking tasks into the process pool and specify a callback function that the main process executes. Because the main process runs the callback only after the task is done, it skips the I/O wait and directly receives the task's result.
- Give a simple and easy to understand example
```python
from multiprocessing import Pool
import os

def get(n):
    print(f"get--->{os.getpid()}")
    return n  # return the result of the task

def set(num):  # the callback receives the task's return value as num
    print(f"set--->{os.getpid()} : {num**2}")

if __name__ == '__main__':
    p = Pool(3)
    nums = [2, 3, 4, 1]
    li = []
    for i in nums:
        res = p.apply_async(get, args=(i,), callback=set)  # asynchronous call with a callback
        li.append(res)
    p.close()  # close the process pool
    p.join()   # wait for the child processes to finish
    print([ii.get() for ii in li])  # use get() to fetch the results
'''output
get--->8388
get--->8388
set--->8768 : 4
get--->8388
set--->8768 : 9
get--->8388
set--->8768 : 16
set--->8768 : 1
[2, 3, 4, 1]
'''
```
- Get an example of the size of the web page source code
```python
from multiprocessing import Pool
import requests, os

def get_htm(url):
    print(f"Process {os.getpid()} starts fetching {url}")
    response = requests.get(url)
    if response.status_code == 200:  # 200 means success
        return {'url': url, 'text': response.text}
    return {'url': url, 'text': ''}  # some pages cannot be fetched; leave the text empty

def parse_htm(htm_dic):
    print(f"Process {os.getpid()} is parsing the text of {htm_dic['url']}")
    parse_data = f"url:{htm_dic['url']} size:{len(htm_dic['text'])}"
    with open("./db.txt", "a") as f:  # save each URL and its page-source size to a file
        f.write(f"{parse_data}\n")

if __name__ == '__main__':
    urls = [
        'https://zhuanlan.zhihu.com',
        'https://www.cnblogs.com',
        'https://www.python.org',
        'https://blog.csdn.net',
        'http://www.china.com.cn',
    ]
    p = Pool(3)  # at most 3 processes in the pool
    li = []
    for url in urls:  # call asynchronously and set the callback
        res = p.apply_async(get_htm, args=(url,), callback=parse_htm)
        li.append(res)
    p.close()  # close the process pool
    p.join()   # wait for the child processes to finish
    print([i.get() for i in li])  # use get() to fetch the results
'''output
Process 11484 starts fetching https://zhuanlan.zhihu.com
Process 17344 starts fetching https://www.cnblogs.com
Process 2688 starts fetching https://www.python.org
Process 11484 starts fetching https://blog.csdn.net
Process 3928 is parsing the text of https://zhuanlan.zhihu.com
Process 17344 starts fetching http://www.china.com.cn
Process 3928 is parsing the text of https://www.cnblogs.com
Process 3928 is parsing the text of https://blog.csdn.net
Process 3928 is parsing the text of http://www.china.com.cn
Process 3928 is parsing the text of https://www.python.org
[{'url': 'https://zhuanlan.zhihu.com', 'text': ''}, ... page source omitted]
'''
```
- Check the saved "db.txt" file
4. Crawl the Forbes global rankings
```python
from multiprocessing import Pool
import re
import requests

def get_htm(url, format1):
    response = requests.get(url)
    if response.status_code == 200:
        return (response.text, format1)
    return ('', format1)

def parse_htm(res):
    text, format1 = res
    data_list = re.findall(format1, text)
    for data in data_list:
        with open("Forbes ranking.txt", "a", encoding="utf-8") as f:
            f.write(f"ranking:{data[0]},name:{data[1]},worth:{data[2]},company:{data[3]},country:{data[4]}\n")

if __name__ == '__main__':
    url1 = "https://www.phb123.com/renwu/fuhao/shishi.html"
    # regular expression matching the ranking fields
    format1 = re.compile(
        r'<td.*?"xh".*?>(\d+)<.*?title="(.*?)".*?alt.*?<td>(.*?)</td>.*?<td>(.*?)<.*?title="(.*?)"',
        re.S)
    url_list = [url1]
    for i in range(2, 16):  # 15 pages of rankings in total; add every link to the list
        url_list.append(f"https://www.phb123.com/renwu/fuhao/shishi_{i}.html")
    p = Pool()
    li = []
    for url in url_list:
        res = p.apply_async(get_htm, args=(url, format1), callback=parse_htm)
        li.append(res)
    p.close()
    p.join()
    print("Save complete")
```
- Check the file to see if you're on the list
ps: if the main process is going to wait for all tasks in the pool to finish anyway and then process the results all at once, there is no need for a callback function