Advanced crawler applications (16. Multithreaded and multi-process crawlers)

Content summary

  1. The difference between processes and threads
  2. Implementing threads in Python
  3. Passing parameters to threads
  4. The Thread class
  5. Synchronizing threads with thread locks and semaphores
  6. Producers and consumers
  7. Implementing multiple processes
  8. A real project demonstrating how to build crawler applications with multithreading and multiprocessing

Main content

16.1 single_thread: a single thread

Use a single Python thread to call two functions, fun1 and fun2. Each function calls sleep to pause for a while. Because both run in a single thread, the two functions are executed one after the other, so the total running time is the sum of both sleeps (about 6 seconds).

from time import sleep, ctime

def fun1():
    print('Start running fun1:', ctime())
    # Sleep for 4 seconds
    sleep(4)
    print('fun1 finished:', ctime())

def fun2():
    print('Start running fun2:', ctime())
    # Sleep for 2 seconds
    sleep(2)
    print('fun2 finished:', ctime())

def main():
    print('Start time:', ctime())
    # Call fun1 and then fun2 in the same (main) thread
    fun1()
    fun2()
    print('End time:', ctime())

if __name__ == '__main__':
    main()
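Here is a minimal sketch for checking the timing claim without waiting 6 seconds. The helper name `task` and the shortened sleep times (0.2 s and 0.1 s) are assumptions made for illustration. When both calls run in one thread, the measured total should be roughly the sum of the two sleeps.

```python
import time

def task(name, seconds, log):
    # Simulate blocking work (for example, waiting for a network response)
    time.sleep(seconds)
    log.append(name)

def run_sequentially():
    log = []
    start = time.perf_counter()
    task('fun1', 0.2, log)  # fun2 cannot start until fun1 returns
    task('fun2', 0.1, log)
    elapsed = time.perf_counter() - start
    return log, elapsed

if __name__ == '__main__':
    order, elapsed = run_sequentially()
    print(order, round(elapsed, 1))
```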

16.2 _thread: the _thread library

  1. The start_new_thread function of the _thread module starts a new thread directly. Its first argument is a function, which becomes the thread function and is called automatically when the thread starts.
  2. The second argument holds the arguments passed to the thread function. It must be a tuple; use () when the function takes no arguments.
import _thread as thread
from time import sleep, ctime

def fun1():
    print('Start running fun1:', ctime())
    # Sleep for 4 seconds
    sleep(4)
    print('fun1 finished:', ctime())

def fun2():
    print('Start running fun2:', ctime())
    # Sleep for 2 seconds
    sleep(2)
    print('fun2 finished:', ctime())

def main():
    print('Start time:', ctime())
    # Start a thread that runs fun1 (no arguments, so an empty tuple)
    thread.start_new_thread(fun1, ())
    # Start a thread that runs fun2
    thread.start_new_thread(fun2, ())
    # Keep the main thread alive for 6 seconds: on most systems, threads
    # started with _thread are killed when the main thread exits
    sleep(6)
    print('End time:', ctime())

if __name__ == '__main__':
    main()

The output shows that fun2 does not sit idle while fun1 sleeps for 4 seconds. Both threads start at the same time, so fun2 finishes after 2 seconds, before fun1. The main thread's sleep(6) only keeps the program alive long enough for both threads to finish.
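This small sketch records the order in which the two `_thread` threads finish. It uses shortened, made-up sleep times (0.4 s and 0.2 s), and `worker`, `finished` and `done_lock` are hypothetical names. The slower function is started first, but the faster one should still finish first.

```python
import _thread as thread
import time

finished = []                        # completion order, filled in by the worker threads
done_lock = thread.allocate_lock()   # protects finished

def worker(name, seconds):
    time.sleep(seconds)
    with done_lock:                  # _thread locks support the with statement
        finished.append(name)

def main():
    start = time.perf_counter()
    thread.start_new_thread(worker, ('fun1', 0.4))
    thread.start_new_thread(worker, ('fun2', 0.2))
    # As in the example above, the main thread just sleeps long enough for both threads
    time.sleep(0.8)
    return time.perf_counter() - start

if __name__ == '__main__':
    elapsed = main()
    print(finished, round(elapsed, 1))
```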

16.3 multi_thread_args: passing parameters to threads

Use a for loop and the start_new_thread function to start 8 threads. Each thread function receives different argument values and prints the values it was given.

import random
from time import sleep
import _thread as thread
# Thread function; a and b are the arguments passed in through start_new_thread
def fun(a, b):
    print(a, b)
    # Sleep for a random time between 1 and 5 seconds (randint includes both ends)
    sleep(random.randint(1, 5))
# Start 8 threads
for i in range(8):
    # Pass 2 argument values to each thread function
    thread.start_new_thread(fun, (i + 1, 'a' * (i + 1)))
# Block the main thread until Enter is pressed, so the program does not exit
# (and kill the threads) before they have run
input()

Conclusion: the threads run concurrently and compete for the CPU, so the order of the output changes from run to run.
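The sketch below checks two details of passing arguments with `start_new_thread`. First, the arguments are unpacked from a tuple. Second, a single argument still needs a one-element tuple with a trailing comma. The sketch polls a shared list instead of waiting on `input()`. `single`, `wait_for` and `non_tuple_rejected` are hypothetical helpers, and the 2-second timeout is an arbitrary choice.

```python
import _thread as thread
import time

results = []                     # collected (a, b) or (x,) tuples
lock = thread.allocate_lock()    # protects results

def fun(a, b):
    with lock:
        results.append((a, b))

def single(x):
    with lock:
        results.append((x,))

def start_all(n):
    for i in range(n):
        # Two arguments are packed into a 2-tuple
        thread.start_new_thread(fun, (i + 1, 'a' * (i + 1)))
    # One argument still needs a tuple: note the trailing comma in (99,)
    thread.start_new_thread(single, (99,))

def wait_for(count, timeout=2.0):
    # Poll until `count` results have arrived, or give up after `timeout` seconds
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        with lock:
            if len(results) >= count:
                return True
        time.sleep(0.01)
    return False

def non_tuple_rejected():
    # (99) is just the int 99, not a tuple, so start_new_thread raises TypeError
    try:
        thread.start_new_thread(single, (99))
    except TypeError:
        return True
    return False

if __name__ == '__main__':
    start_all(8)
    print(wait_for(9), non_tuple_rejected())
```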

16.4 lock: thread locks

Usage of locks

The allocate_lock function creates a lock object, and the lock's acquire method obtains (locks) it.
When the lock is no longer needed, call the lock object's release method to release it.
To check whether the lock is currently held, call the lock object's locked method.
  1. Start two threads with two locks. Acquire both locks before starting the threads, so both are in the locked state, and pass each lock object to its own thread.
  2. When a thread function finishes its work, it calls release on its lock object. At the end of the main function, a while loop uses the locked method to check whether both locks have been released.
  3. As long as either lock is still held, the while loop keeps running. Once both are released, it exits and the program ends.
import _thread as thread
from time import sleep, ctime
# Thread function: index is an integer index, sec is the sleep time (in seconds), and lock is the lock object
def fun(index, sec, lock):
    print('Start executing {}, time: {}'.format(index, ctime()))
    # Sleep for sec seconds
    sleep(sec)
    print('Finished executing {}, time: {}'.format(index, ctime()))
    # Release the lock to signal that this thread is done
    lock.release()

def main():
    lock1 = thread.allocate_lock()
    # Acquire the lock, i.e. put it in the locked state
    lock1.acquire()
    # Start the first thread with the first lock: 10 is the index, 4 is the sleep time, lock1 is the lock object
    thread.start_new_thread(fun, (10, 4, lock1))

    lock2 = thread.allocate_lock()
    lock2.acquire()
    # Start the second thread with the second lock: 20 is the index, 2 is the sleep time, lock2 is the lock object
    thread.start_new_thread(fun, (20, 2, lock2))
    # Use a while loop and the locked method to check whether lock1 and lock2 have been released;
    # as long as either one is still locked, the loop does not exit
    while lock1.locked() or lock2.locked():
        # Sleep briefly so the waiting loop does not keep a CPU core busy
        sleep(0.1)

if __name__ == '__main__':
    main()
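This sketch shows one possible alternative to the `while ... locked()` loop. It assumes each thread only needs to signal that it is done. Calling `acquire()` on a lock that is already held blocks, so the main thread can simply acquire each lock again and sleep until the worker releases it. The sleep times are shortened, and the `log` list is added only to make the finishing order observable.

```python
import _thread as thread
import time

def fun(index, sec, lock, log):
    time.sleep(sec)
    log.append(index)
    # Any thread may release a _thread lock, so this signals "done" to the main thread
    lock.release()

def main():
    log = []
    locks = []
    for index, sec in ((10, 0.4), (20, 0.2)):
        lock = thread.allocate_lock()
        lock.acquire()  # start in the locked state
        thread.start_new_thread(fun, (index, sec, lock, log))
        locks.append(lock)
    # acquire() blocks until the worker has called release(), so the main
    # thread waits without spinning in a busy loop
    for lock in locks:
        lock.acquire()
    return log

if __name__ == '__main__':
    print(main())
```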

16.5 threading: another thread library

  1. An instance of the Thread class in the threading module is an object that runs a thread.
  2. The _thread module can be seen as the procedural version of threads, and the Thread class as the object-oriented version.
  3. The target keyword argument of the Thread constructor specifies the thread function, and the arguments for that function are given through the args keyword argument. Call the start method to start the thread.
import threading
from time import sleep,ctime
# Thread function. Index is an integer index and sec is the sleep time (unit: seconds)
def fun(index, sec):
    print('Start executing {}, time: {}'.format(index, ctime()))
    # Sleep for sec seconds
    sleep(sec)
    print('Finished executing {}, time: {}'.format(index, ctime()))

def main():
    # Create the first Thread object: target specifies the thread function fun, args passes index 10 and sleep time 4 s
    thread1 = threading.Thread(target=fun, args=(10, 4))
    thread1.start()
    # Create the second Thread object with index 20 and sleep time 2 s
    thread2 = threading.Thread(target=fun, args=(20, 2))
    thread2.start()
    # Wait for the first thread to finish
    thread1.join()
    # Wait for the second thread to finish
    thread2.join()

if __name__ == '__main__':
    main()

Conclusion: join waits for a thread to finish. There is no need to create, acquire and release locks by hand just to wait for the threads, as with _thread, so the threading library is more convenient.
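This sketch shows two more keyword arguments of the Thread constructor. `kwargs` passes keyword arguments to the target. `name` sets the thread name, which the target can read through `threading.current_thread()`. The `label` parameter and the `results` dictionary are made up for this example.

```python
import threading
import time

results = {}                     # index -> (label, thread name)
results_lock = threading.Lock()

def fun(index, sec, label='task'):
    time.sleep(sec)
    with results_lock:
        results[index] = (label, threading.current_thread().name)

def main():
    threads = [
        # args is a tuple of positional arguments, kwargs a dict of keyword arguments
        threading.Thread(target=fun, args=(10, 0.2), kwargs={'label': 'slow'}, name='worker-10'),
        threading.Thread(target=fun, args=(20, 0.1), name='worker-20'),
    ]
    for t in threads:
        t.start()
    for t in threads:
        # join() blocks until that thread's target function has returned
        t.join()
    return results

if __name__ == '__main__':
    print(main())
```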

16.6 thread_obj: using an object as the thread target

The target keyword argument can be not only a function but also an object.
In that case, the object's class must define a __call__ method, which is called automatically when the thread starts.

import threading
from time import sleep,ctime
# Class corresponding to thread object
class Mythread(object):
    # func represents the thread function, args represents the parameters of the thread function
    def __init__(self,func,args):
        # Assign the parameters of thread function and thread function to the member variables of the current class
        self.func = func
        self.args = args
    # This method is called when the thread starts
    def __call__(self):
        # Call the thread function, decompose the tuple type parameter into a single parameter value, and pass it into the thread function
        self.func(*self.args)

# Thread function
def fun(index, sec):
    print('Start executing {}, time: {}'.format(index, ctime()))
    # Sleep for sec seconds
    sleep(sec)
    print('Finished executing {}, time: {}'.format(index, ctime()))

def main():
    print('Execution start time:', ctime())
    # Create the first Thread object; its target is a Mythread object that calls fun with index 10 and sleep time 4 s
    thread1 = threading.Thread(target=Mythread(fun, (10, 4)))
    thread1.start()
    # Create the second Thread object: index 20, sleep time 2 s
    thread2 = threading.Thread(target=Mythread(fun, (20, 2)))
    thread2.start()
    # Create the third Thread object: index 30, sleep time 1 s
    thread3 = threading.Thread(target=Mythread(fun, (30, 1)))
    thread3.start()
    # Wait for execution to complete
    thread1.join()
    thread2.join()
    thread3.join()
    print('All thread functions have been executed:',ctime())

if __name__ == '__main__':
    main()
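`target` only has to be callable, so a `functools.partial` object or a lambda works as well as a class with `__call__`. The sketch below starts one thread with each kind of target. `CallableTask` mirrors the `Mythread` class above, and the sleep is left out so that only the argument passing is shown.

```python
import threading
from functools import partial

output = []                     # (index, sec) pairs recorded by the threads
output_lock = threading.Lock()  # protects output

def fun(index, sec):
    # Record the arguments instead of sleeping, so the result is easy to check
    with output_lock:
        output.append((index, sec))

class CallableTask:
    # Same idea as Mythread above: any object with __call__ can be a target
    def __init__(self, func, args):
        self.func = func
        self.args = args

    def __call__(self):
        self.func(*self.args)

def main():
    output.clear()
    targets = [
        CallableTask(fun, (10, 4)),  # object with __call__
        partial(fun, 20, 2),         # functools.partial pre-binds the arguments
        lambda: fun(30, 1),          # lambda that calls fun with fixed arguments
    ]
    threads = [threading.Thread(target=t) for t in targets]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(output)

if __name__ == '__main__':
    print(main())
```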

16.7 thread_inherit: subclassing Thread

  1. The subclass MyThread, derived from the Thread class, overrides the parent class's constructor and run method.
  2. Two threads are then created and started through the MyThread class, and the join method waits for both to finish before the program exits.
import threading
from time import ctime, sleep
# Subclass derived from the Thread class
class MyThread(threading.Thread):
    # Override the parent constructor: func is the thread function, args are its arguments, name is the thread name
    def __init__(self, func, args, name=''):
        # Call the parent constructor with the thread name
        super().__init__(name=name)
        self.func = func
        self.args = args

    # Override the parent run method; it runs in the new thread after start() is called
    def run(self):
        self.func(*self.args)

# Thread function
def fun(index, sec):
    print('Start executing {}, time: {}'.format(index, ctime()))
    # Sleep for sec seconds
    sleep(sec)
    print('Finished executing {}, time: {}'.format(index, ctime()))

def main():
    print('Start:', ctime())
    # Create the first thread and specify the thread name as "thread 1"
    thread1 = MyThread(fun,(10,4),"Thread 1")
    # Create the second thread and specify the thread name as "thread 2"
    thread2 = MyThread(fun, (20, 2), "Thread 2")
    thread1.start()
    thread2.start()
    print(thread1.name)
    print(thread2.name)
    thread1.join()
    thread2.join()

    print('end:',ctime())

if __name__ == '__main__':
    main()
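The following sketch shows a common extension of this pattern; it is not part of the original program. Because `run` executes inside the new thread, a subclass can store the thread function's return value on the thread object, and the main thread can read it after `join`. `ResultThread` and the doubled return value are hypothetical.

```python
import threading
import time

class ResultThread(threading.Thread):
    # Hypothetical subclass: like MyThread above, but it keeps the return value
    def __init__(self, func, args, name=''):
        super().__init__(name=name)
        self.func = func
        self.args = args
        self.result = None

    def run(self):
        # Executed in the new thread; the return value is saved for the caller
        self.result = self.func(*self.args)

def fun(index, sec):
    time.sleep(sec)
    return index * 2

def main():
    threads = [ResultThread(fun, (10, 0.2), 'Thread 1'),
               ResultThread(fun, (20, 0.1), 'Thread 2')]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # after join() returns, result has been set
    return {t.name: t.result for t in threads}

if __name__ == '__main__':
    print(main())
```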

16.8 lock_demo: the thread lock object

  1. The thread function uses a for loop to print the thread name and the value of the loop variable. A thread lock makes this loop an atomic operation with respect to the other threads that use the same lock (atomic: it cannot be interrupted by them).
  2. As a result, another thread can acquire the lock and run its own for loop only after the current thread's for loop has finished.
from atexit import register
import random
from threading import Thread, Lock, current_thread
from time import sleep, ctime
# Create a thread lock object
lock = Lock()
def fun():
    # Acquire the lock; the with block releases it automatically, even on an exception
    with lock:
        # While this thread holds the lock, no other thread can enter this loop,
        # so the five iterations of one thread are never interleaved with another's
        for i in range(5):
            print('Thread Name={} i={}'.format(current_thread().name, i))
            # Sleep for a random 1 to 5 seconds
            sleep(random.randint(1, 5))

def main():
    # Start three threads in a loop
    for i in range(3):
        Thread(target=fun).start()

# Registered with atexit: called when the program exits
@register
def on_exit():
    print('Thread execution completed:', ctime())

if __name__ == '__main__':
    main()
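A shared counter makes it easier to see why the lock matters. `counter += 1` reads, adds and writes back in separate steps. Without the lock, the final count is not guaranteed to be correct when several threads update it. Holding the lock around the update makes each increment atomic with respect to the other threads. The thread and iteration counts below are arbitrary.

```python
import threading

counter = 0
lock = threading.Lock()

def add(n):
    global counter
    for _ in range(n):
        # Read-modify-write on shared state, protected by the lock
        with lock:
            counter += 1

def run(n_threads=4, n=10000):
    global counter
    counter = 0
    threads = [threading.Thread(target=add, args=(n,)) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter

if __name__ == '__main__':
    print(run())
```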

16.9 semaphore: semaphores (resources)

This touches on some operating-system concepts. A semaphore is a counter that represents a limited number of resources handed out to processes (threads). When using it, you have to think about how the resources are acquired and released and avoid deadlock, otherwise the program can hang.
I won't go into detail here; readers can find more in the operating system column on this blog.
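In a crawler, a semaphore is typically used as a resource counter to cap how many threads may work at the same time. In this sketch, `download` is a hypothetical stand-in that only sleeps, and the limit of 2 is an arbitrary choice. The `with slots:` statement calls `acquire` on entry and `release` on exit.

```python
import threading
import time

MAX_CONCURRENT = 2                               # arbitrary limit for this sketch
slots = threading.BoundedSemaphore(MAX_CONCURRENT)
state_lock = threading.Lock()                    # protects active and peak
active = 0                                       # threads currently inside the limited section
peak = 0                                         # highest value of active seen so far

def download(i):
    global active, peak
    with slots:                                  # blocks while MAX_CONCURRENT threads are inside
        with state_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)                         # hypothetical stand-in for a network request
        with state_lock:
            active -= 1

def main(n=6):
    threads = [threading.Thread(target=download, args=(i,)) for i in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return peak

if __name__ == '__main__':
    print('Peak number of concurrent downloads:', main())
```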

Use an instance of the BoundedSemaphore class. Its acquire method obtains a resource (counter - 1), and its release method releases one (counter + 1).

from threading import BoundedSemaphore
Max = 3
# Create a semaphore object whose counter starts at Max; the counter can never exceed this value
semaphore = BoundedSemaphore(Max)
# _value is an internal attribute, printed here only to show the counter
print(semaphore._value)
# Acquire a resource: counter - 1
semaphore.acquire()
print(semaphore._value)
semaphore.acquire()
print(semaphore._value)
semaphore.acquire()
print(semaphore._value)
# The counter is 0, so no more resources are available: with False the call
# does not block, and acquire returns False
print(semaphore.acquire(False))
print(semaphore._value)

# Release a resource: counter + 1
semaphore.release()
print(semaphore._value)
semaphore.release()
print(semaphore._value)
semaphore.release()
print(semaphore._value)
# The counter is already at its maximum, so this release raises ValueError
semaphore.release()

When acquire is called with False and the counter is 0, it does not block. It returns False immediately, meaning the resource was not obtained. If the resource is obtained, it returns True.
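The sketch below contrasts `BoundedSemaphore` with the plain `Semaphore` class. It also shows the non-blocking and timeout forms of `acquire`; `acquire(False)` is the same as `acquire(blocking=False)`. The helper names are hypothetical.

```python
from threading import Semaphore, BoundedSemaphore

def over_release(sem):
    # Take one unit and give back two: one more release than acquire
    sem.acquire()
    sem.release()
    try:
        sem.release()
    except ValueError:
        return 'ValueError'  # BoundedSemaphore refuses to exceed its initial value
    return 'ok'              # a plain Semaphore lets the counter grow

def nonblocking_demo():
    sem = BoundedSemaphore(1)
    first = sem.acquire(blocking=False)   # True: the counter was 1
    second = sem.acquire(blocking=False)  # False: the counter is 0, returns at once
    third = sem.acquire(timeout=0.1)      # False after waiting up to 0.1 s
    sem.release()
    return first, second, third

if __name__ == '__main__':
    print(over_release(Semaphore(1)), over_release(BoundedSemaphore(1)))
    print(nonblocking_demo())
```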

16.10 A small example of semaphores and locks: the candy machine

  1. Simulate a candy machine that is refilled with candy while customers buy candy. The machine has 5 slots.
  2. When a slot is empty, new candy has to be added.
  3. When all five slots are full, no more candy can be added.
  4. When all five slots are empty, customers cannot buy candy.
    For simplicity, this example assumes that a customer always buys a whole slot of candy at once, and each refill fills one whole slot.
from atexit import register
from random import randrange
from threading import BoundedSemaphore,Lock,Thread
from time import sleep,ctime
# Create thread lock
lock = Lock()
# Defines the number of slots of the candy machine, which is also the maximum value of the semaphore counter
MAX = 5
# Create a semaphore object and specify the maximum value of the counter
candytray = BoundedSemaphore(MAX)
# Add new candy to one slot of the candy machine (one slot per call)
def refill():
    # Hold the thread lock so the refill (and its printed message) is atomic
    with lock:
        print('Adding candy...', end=' ')
        try:
            # Fill one slot: counter + 1
            candytray.release()
        except ValueError:
            # BoundedSemaphore refuses to go above MAX
            print('The candy machine is full, no candy can be added')
        else:
            print('Candy added successfully')

# A customer buys candy
def buy():
    # Hold the thread lock so the purchase (and its printed message) is atomic
    with lock:
        print('Buying candy...', end=' ')
        # Take one slot: counter - 1. With False the call does not block and
        # returns False when all five slots are empty
        if candytray.acquire(False):
            print('Candy purchased successfully')
        else:
            print('The candy machine is empty, no candy can be bought')

# Perform several refills
def producer(loops):
    for i in range(loops):
        refill()
        sleep(randrange(3))
# Perform several purchases
def consumer(loops):
    for i in range(loops):
        buy()
        sleep(randrange(3))

def main():
    print('Start:', ctime())
    # A random number from 2 to 5
    nloops = randrange(2, 6)
    # The semaphore counter starts at MAX, so the machine starts full
    print('The candy machine has %d slots!' % MAX)
    # Start a thread that runs the consumer function
    Thread(target=consumer, args=(randrange(nloops, nloops + MAX + 2),)).start()
    # Start a thread that runs the producer function
    Thread(target=producer, args=(nloops,)).start()

@register
def exit():
    print('Program execution completed:',ctime())

if __name__ == '__main__':
    main()
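The threads and random sleeps make every run different, so here is a deterministic, single-threaded walk-through of the same refill/buy logic. It assumes a machine with only 2 slots (instead of `MAX = 5`) and a fixed sequence of actions, and the functions return status strings instead of printing. The semaphore counter starts at its maximum, so the machine starts full.

```python
from threading import BoundedSemaphore, Lock

MAX = 2                          # hypothetical: 2 slots to keep the trace short
candytray = BoundedSemaphore(MAX)
lock = Lock()

def refill():
    with lock:
        try:
            candytray.release()  # counter + 1, raises ValueError above MAX
        except ValueError:
            return 'full'
        return 'refilled'

def buy():
    with lock:
        # counter - 1, or False immediately when the counter is 0
        return 'bought' if candytray.acquire(False) else 'empty'

def trace():
    # Fixed sequence of actions in one thread, so the outcome is predictable
    actions = [buy, buy, buy, refill, refill, refill]
    return [action() for action in actions]

if __name__ == '__main__':
    print(trace())
```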

16.11 producer_consumer: the classic producer-consumer case

The queue module provides a mechanism for communication between threads. The producer and the consumer share one queue:
the producer puts goods into the queue, and the consumer takes goods out of it.

from random import randrange
from time import sleep
from threading import Lock, Thread
from queue import Queue
# Thread lock, used only to keep each thread's printed messages together
lock = Lock()
# Subclass derived from Thread
class MyThread(Thread):
    def __init__(self, func, args):
        super().__init__(target=func, args=args)

# Add an item to the queue
def writeQ(queue):
    # Queue is already thread-safe, so put() needs no extra lock
    queue.put('commodity')
    with lock:
        print('Produced an item and added it to the queue, queue size:', queue.qsize())

# Take an item from the queue
def readQ(queue):
    # Block here, outside the lock: waiting on an empty queue while holding the
    # lock would stop the producer from ever acquiring it (a deadlock)
    val = queue.get()
    with lock:
        print('Consumed an item, queue size:', queue.qsize())

# The producer: adds `loops` items
def writer(queue, loops):
    for i in range(loops):
        writeQ(queue)
        sleep(randrange(1, 4))

# The consumer: takes `loops` items
def reader(queue, loops):
    for i in range(loops):
        readQ(queue)
        sleep(randrange(1, 4))

funcs = [writer,reader]
nfuncs = range(len(funcs))

def main():
    nloops = randrange(2,6)
    q = Queue(32)

    threads = []
    # Create two threads to run writer function and reader function
    for i in nfuncs:
        t = MyThread(funcs[i],(q,nloops))
        threads.append(t)
    # Start thread
    for i in nfuncs:
        threads[i].start()

    # Wait for 2 threads to end
    for i in nfuncs:
        threads[i].join()
    print('All the work is over')

if __name__ == '__main__':
    main()
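In the program above, both threads loop the same number of times, which is how they know when to stop. A common alternative, sketched here with hypothetical names, is for the producer to put a sentinel value (`None`) after its last item. The consumer can then stop without knowing the item count in advance. `Queue` handles its own locking, so this sketch needs no extra lock.

```python
import threading
from queue import Queue

SENTINEL = None  # hypothetical end-of-work marker

def producer(q, items):
    for item in items:
        q.put(item)      # blocks while the queue is full
    q.put(SENTINEL)      # signal that no more items will come

def consumer(q, out):
    while True:
        item = q.get()   # blocks until an item is available
        if item is SENTINEL:
            break
        out.append(item)

def main(n=10):
    q = Queue(maxsize=4)
    out = []
    threads = [
        threading.Thread(target=producer, args=(q, list(range(n)))),
        threading.Thread(target=consumer, args=(q, out)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return out

if __name__ == '__main__':
    print(main())
```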

16.12 multi_process: multiple processes

Differences between processes and threads:

Process: the smallest unit of resource allocation
Thread: the smallest unit of CPU scheduling
A process can contain multiple threads. In short, the operating system does not allocate resources to threads directly. It allocates them to processes, and the threads created inside a process share that process's resources.

Introduction to the process pool:

  1. When many processes are needed, you can use the process pool (the Pool class) of the multiprocessing module. The processes parameter of the Pool constructor specifies how many worker processes to create.
  2. The Pool class has a map method, which calls the worker (callback) function once for each item of the data passed to it and distributes the calls across the worker processes.
from multiprocessing import Pool
import time
# Worker function, executed in the child processes
def get_value(value):
    i = 0
    while i < 3:
        # Sleep for 1 second
        time.sleep(1)
        print(value, i)
        i += 1

if __name__ == '__main__':
    # Generate 5 values to be processed by the worker processes
    values = ['value{}'.format(i) for i in range(0, 5)]
    # Create a pool of 4 worker processes; the with block shuts the pool down afterwards
    with Pool(processes=4) as pool:
        # Call get_value once for each value; map blocks until all calls are done
        pool.map(get_value, values)

While the program is running, the task manager shows five Python processes for it: the main process and the four child processes created by Pool. There are five values but only four worker processes, so at least one worker handles two of the values.
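`Pool.map` also collects the return values of the worker function, in the same order as the input. To keep the sketch runnable without spawning processes, it uses `multiprocessing.pool.ThreadPool`, which offers the same `map` interface but is backed by threads. For real processes, replace it with `Pool(processes=4)`, keep the pool creation under `if __name__ == '__main__':`, and define the worker at module level so it can be pickled. Returning `value.upper()` is only a placeholder.

```python
from multiprocessing.pool import ThreadPool

def get_value(value):
    # Placeholder work: the return value is collected by map
    return value.upper()

def main():
    values = ['value{}'.format(i) for i in range(5)]
    with ThreadPool(processes=4) as pool:
        # map blocks until every call is done and keeps the order of values
        results = pool.map(get_value, values)
    return results

if __name__ == '__main__':
    print(main())
```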

16.13 Practical case: crawling the Douban Music Top250 list with multiple threads

  1. This example uses four threads to fetch and parse different pages at the same time.
  2. The URLs to crawl are stored in a list that serves as a shared URL pool.
  3. Taking a URL from this list is done by the get_url function, which is synchronized with a thread lock.
  4. Because get_url deletes each URL from the list after taking it, access to the list must be synchronized in a multithreaded environment. Otherwise, two threads could take the same URL or leave the list in an inconsistent state (dirty data).
import threading
import datetime
import re
import requests
from bs4 import BeautifulSoup
# Record the start time
starttime = datetime.datetime.now()
# Create a thread lock that guards the shared URL list
lock = threading.Lock()
# Take the next URL from the shared list; the lock makes this function thread-safe
def get_url():
    # Hold the lock while checking and removing, so two threads never take the same URL
    with lock:
        if len(urls) == 0:
            return ""
        # Remove the first URL from the list and return it
        return urls.pop(0)

# Request header
headers = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) '
                  'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.119 Safari/537.36',
}

def get_url_music(url, thread_name):
    html = requests.get(url, headers=headers, timeout=10)
    soup = BeautifulSoup(html.text, 'lxml')
    # Each album on the ranking page is linked through an <a class="nbg"> tag
    aTags = soup.find_all("a", attrs={"class": "nbg"})
    for aTag in aTags:
        get_music_info(aTag['href'], thread_name)

def get_music_info(url, thread_name):
    html = requests.get(url, headers=headers, timeout=10)
    soup = BeautifulSoup(html.text, 'lxml')
    name = soup.find(attrs={'id': 'wrapper'}).h1.span.text
    author = soup.find(attrs={'id': 'info'}).find('a').text
    # The labels match the Chinese page text: 流派 = genre, 发行时间 = release date, 出版者 = publisher
    styles = re.findall('<span class="pl">流派:</span>&nbsp;(.*?)<br />', html.text, re.S)
    style = styles[0].strip() if styles else 'unknown'
    release_times = re.findall('发行时间:</span>&nbsp;(.*?)<br />', html.text, re.S)
    release_time = release_times[0].strip() if release_times else 'unknown'
    publishers = re.findall('<span class="pl">出版者:</span>&nbsp;(.*?)<br />', html.text, re.S)
    publisher = publishers[0].strip() if publishers else 'unknown'

    score = soup.find(class_='ll rating_num').text
    info = {
        'name': name,
        'author': author,
        'style': style,
        'time': release_time,
        'publisher': publisher,
        'score': score
    }
    print(thread_name, info)

# Crawler thread class
class SpiderThread(threading.Thread):
    def __init__(self, name):
        # name is the thread name
        super().__init__(name=name)

    def run(self):
        # Keep taking URLs from the shared list until it is empty
        while True:
            url = get_url()
            if url == "":
                break
            get_url_music(url, self.name)

if __name__ == '__main__':
    # The first four ranking pages, 25 albums each (start=0, 25, 50, 75)
    urls = ['https://music.douban.com/top250?start={}'.format(i) for i in range(0, 100, 25)]
    print(len(urls))
    # Create four crawler threads
    threads = [SpiderThread('thread{}'.format(i)) for i in range(1, 5)]
    # Start the threads
    for t in threads:
        t.start()
    # Wait for all threads to finish
    for t in threads:
        t.join()
    print("Exit crawler")
    endtime = datetime.datetime.now()
    print('Elapsed time:', (endtime - starttime).seconds, 'seconds')
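The list-plus-lock URL pool above can also be built on `queue.Queue`. Its `get_nowait()` is already thread-safe and raises `queue.Empty` when nothing is left. The sketch below uses a hypothetical `fake_fetch` in place of `get_url_music`, so it runs without network access, and only checks that every URL is handled exactly once.

```python
import threading
from queue import Queue, Empty

def fake_fetch(url):
    # Hypothetical stand-in for get_url_music: no network access
    return len(url)

def crawl(urls, n_threads=4):
    url_queue = Queue()
    for url in urls:
        url_queue.put(url)
    seen = []                      # (thread name, url) pairs
    seen_lock = threading.Lock()   # protects seen

    def worker():
        while True:
            try:
                # Thread-safe removal: two threads never get the same URL
                url = url_queue.get_nowait()
            except Empty:
                return             # nothing left, so the thread ends
            fake_fetch(url)
            with seen_lock:
                seen.append((threading.current_thread().name, url))

    threads = [threading.Thread(target=worker, name='thread{}'.format(i + 1))
               for i in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return seen

if __name__ == '__main__':
    urls = ['https://music.douban.com/top250?start={}'.format(i) for i in range(0, 100, 25)]
    for name, url in crawl(urls):
        print(name, url)
```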

16.14 Practical case 2: crawling the Douban Music Top250 with a multiprocessing Pool

import requests
from bs4 import BeautifulSoup
import re
from multiprocessing import Pool

# Request header
headers = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) '
                  'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.119 Safari/537.36',
}

def get_url_music(url):
    html = requests.get(url, headers=headers, timeout=10)
    soup = BeautifulSoup(html.text, 'lxml')
    # Each album on the ranking page is linked through an <a class="nbg"> tag
    aTags = soup.find_all("a", attrs={"class": "nbg"})
    for aTag in aTags:
        get_music_info(aTag['href'])

def get_music_info(url):
    html = requests.get(url, headers=headers, timeout=10)
    soup = BeautifulSoup(html.text, 'lxml')
    name = soup.find(attrs={'id': 'wrapper'}).h1.span.text
    author = soup.find(attrs={'id': 'info'}).find('a').text
    # The labels match the Chinese page text: 流派 = genre, 发行时间 = release date, 出版者 = publisher
    styles = re.findall('<span class="pl">流派:</span>&nbsp;(.*?)<br />', html.text, re.S)
    style = styles[0].strip() if styles else 'unknown'
    release_times = re.findall('发行时间:</span>&nbsp;(.*?)<br />', html.text, re.S)
    release_time = release_times[0].strip() if release_times else 'unknown'
    publishers = re.findall('<span class="pl">出版者:</span>&nbsp;(.*?)<br />', html.text, re.S)
    publisher = publishers[0].strip() if publishers else 'unknown'

    score = soup.find(class_='ll rating_num').text
    info = {
        'name': name,
        'author': author,
        'style': style,
        'time': release_time,
        'publisher': publisher,
        'score': score
    }
    print(info)

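if __name__ == '__main__':
    # The first four ranking pages (start=0, 25, 50, 75)
    urls = ['https://music.douban.com/top250?start={}'.format(i) for i in range(0, 100, 25)]
    print(len(urls))
    # The four pages are distributed across a pool of four worker processes
    with Pool(processes=4) as pool:
        pool.map(get_url_music, urls)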
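Both crawlers repeat the same "find the field or fall back to 'unknown'" pattern for every label. A small helper could factor it out. `extract_field` and the sample HTML below are hypothetical, and the label has to be passed exactly as it appears in the page source.

```python
import re

def extract_field(html, label, default='unknown'):
    # Builds a pattern such as '<span class="pl">LABEL:</span>&nbsp;(.*?)<br />'
    pattern = '<span class="pl">{}:</span>&nbsp;(.*?)<br />'.format(re.escape(label))
    matches = re.findall(pattern, html, re.S)
    return matches[0].strip() if matches else default

# Made-up HTML fragment in the same shape as the patterns used above
sample = ('<span class="pl">Genre:</span>&nbsp; Pop <br />\n'
          '<span class="pl">Media:</span>&nbsp;CD<br />')

if __name__ == '__main__':
    print(extract_field(sample, 'Genre'), extract_field(sample, 'Publisher'))
```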

Epilogue

The crawler column is nearly finished. The last chapter will introduce the Scrapy framework, which saves a lot of work, for example on multithreading and data storage. I hope you will keep supporting it.
Two more posts tonight!!

Tags: Python crawler

Posted by geon on Sat, 16 Apr 2022 14:42:57 +0930