Today, let's talk about task segmentation in Python, using a crawler as an example. We read the contents of a txt file that stores URLs and get back a list of URLs. We call this URL list a big task.
List segmentation
Without worrying about memory usage for now, we divide the big task above into small tasks. For example, suppose we are only allowed to access at most 5 URLs per second.
import os
import time

CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))

def read_file():
    file_path = os.path.join(CURRENT_DIR, "url_list.txt")
    with open(file_path, "r", encoding="utf-8") as fs:
        result = [i.strip() for i in fs.readlines()]
    return result

def fetch(url):
    print(url)

def run():
    max_count = 5
    url_list = read_file()
    for index in range(0, len(url_list), max_count):
        start = time.time()
        fetch(url_list[index:index + max_count])
        end = time.time() - start
        if end < 1:
            time.sleep(1 - end)

if __name__ == '__main__':
    run()
The key code is all in the for loop. First, we pass a third argument to range, which sets the iteration step to 5, so the index grows in steps of 5: 0, 5, 10...
Then we slice url_list, taking five elements at a time; which five we get moves forward as the index grows. If fewer than five elements remain at the end, slicing simply returns whatever is left, so the index never runs past the end of the list.
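To see that slicing behaviour concretely, here is a tiny standalone sketch with a made-up seven-element list standing in for the real URL file:

url_list = ["u1", "u2", "u3", "u4", "u5", "u6", "u7"]

# Step through the list 5 items at a time, just like the for loop above.
for index in range(0, len(url_list), 5):
    print(url_list[index:index + 5])

# Output:
# ['u1', 'u2', 'u3', 'u4', 'u5']
# ['u6', 'u7']   <- a slice past the end returns what is left, with no IndexError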
As the URL list grows, we will find that memory consumption grows with it, so we need to modify the code. We know that generators save memory, and after the change the code looks like this.
Generator segmentation
# -*- coding: utf-8 -*-
import os
import time
from itertools import islice

CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))

def read_file():
    file_path = os.path.join(CURRENT_DIR, "url_list.txt")
    with open(file_path, "r", encoding="utf-8") as fs:
        for i in fs:
            yield i.strip()

def fetch(url):
    print(url)

def run():
    max_count = 5
    url_gen = read_file()
    while True:
        url_list = list(islice(url_gen, 0, max_count))
        if not url_list:
            break
        start = time.time()
        fetch(url_list)
        end = time.time() - start
        if end < 1:
            time.sleep(1 - end)

if __name__ == '__main__':
    run()
First of all, we changed the way the file is read: instead of building a list, read_file is now a generator that yields one line at a time. This saves a lot of memory when the file-reading function is called.
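A rough way to see the saving, as a sketch with made-up data rather than the real URL file (exact byte counts vary by platform and Python version):

import sys

big_list = [str(i) for i in range(1_000_000)]   # every line held in memory at once
big_gen = (str(i) for i in range(1_000_000))    # nothing is produced until we ask for it

print(sys.getsizeof(big_list))  # several megabytes for the list object alone
print(sys.getsizeof(big_gen))   # only a couple of hundred bytes for the generator object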
Then the for loop above is replaced. Because of how generators work, looping over indexes is no longer suitable, since every iteration consumes elements from the generator. Instead we use itertools.islice to slice url_gen; islice is, in effect, slicing for generators, and here it pulls out 5 elements at a time. Because a generator has no __len__ method, we convert each slice to a list and check whether that list is empty, which tells us whether the iteration is over.
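The way islice consumes a generator is easier to see in a small standalone sketch, with a plain number generator standing in for url_gen:

from itertools import islice

def numbers():
    for i in range(12):
        yield i

gen = numbers()
print(list(islice(gen, 5)))  # [0, 1, 2, 3, 4]
print(list(islice(gen, 5)))  # [5, 6, 7, 8, 9]  each call continues where the previous one stopped
print(list(islice(gen, 5)))  # [10, 11]         fewer than 5 items are left
print(list(islice(gen, 5)))  # []               an empty list means the generator is exhausted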
The modified code performs much better and saves far more memory; reading a file with tens of millions of lines is no longer a problem.
In addition, an asynchronous crawler may need to slice an asynchronous generator, so next we discuss asynchronous generator segmentation.
Asynchronous generator segmentation
First, let's look at a simple asynchronous generator.
We know that calling the following function gives us a generator:
def foo():
    for i in range(20):
        yield i
If we add async before def, calling it gives us an asynchronous generator instead.
The complete example code is as follows:
import asyncio

async def foo():
    for i in range(20):
        yield i

async def run():
    async_gen = foo()
    async for i in async_gen:
        print(i)

if __name__ == '__main__':
    asyncio.run(run())
Slicing with async for is a bit more complicated; the aiostream module is recommended. With it, the code becomes the following:
import asyncio
from aiostream import stream

async def foo():
    for i in range(22):
        yield i

async def run():
    index = 0
    limit = 5
    while True:
        xs = stream.iterate(foo())
        ys = xs[index:index + limit]
        t = await stream.list(ys)
        if not t:
            break
        print(t)
        index += limit

if __name__ == '__main__':
    asyncio.run(run())
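Note that the code above rebuilds the stream from foo() on every pass and slices it from an ever larger index. If I remember the aiostream API correctly, its chunks operator can group an async generator into fixed-size lists in a single pass; treat the following as a sketch to check against the aiostream documentation rather than the definitive approach:

import asyncio
from aiostream import stream

async def foo():
    for i in range(22):
        yield i

async def run():
    # chunks(source, n) is expected to yield lists of at most n items each.
    async with stream.chunks(foo(), 5).stream() as chunked:
        async for chunk in chunked:
            print(chunk)

if __name__ == '__main__':
    asyncio.run(run())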