Python practical skills: big task segmentation

Today, let's talk about task segmentation in Python, using a web crawler as the example. We read a txt file that stores URLs, one per line, and end up with a list of URLs. We will call this URL list the big task.

List segmentation

Ignoring memory consumption for now, let's split the big task above into small ones. Suppose the constraint is that we may fetch at most 5 URLs per second.

import os
import time

CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))

def read_file():
    file_path = os.path.join(CURRENT_DIR, "url_list.txt")
    with open(file_path, "r", encoding="utf-8") as fs:
        result = [i.strip() for i in fs.readlines()]
    return result

def fetch(urls):
    # Stand-in for the real crawling logic: just print the batch of URLs.
    print(urls)

def run():
    max_count = 5  # at most 5 URLs per batch
    url_list = read_file()
    for index in range(0, len(url_list), max_count):
        start = time.time()
        fetch(url_list[index:index + max_count])
        elapsed = time.time() - start
        # If the batch took less than a second, sleep for the rest of it
        # so we never exceed 5 URLs per second.
        if elapsed < 1:
            time.sleep(1 - elapsed)


if __name__ == '__main__':
    run()

The key code is all in the for loop. The third argument to range sets the iteration step to 5, so index advances in steps of 5: 0, 5, 10, and so on.
Then url_list is sliced, taking up to five elements at a time, and the window moves forward as index grows. If fewer than five elements remain at the end, the slice simply returns whatever is left; slicing never goes out of bounds the way indexing would.
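
To see both behaviors in isolation, here is a small standalone illustration (the example.com URLs are made up):

url_list = [f"https://example.com/page/{i}" for i in range(12)]

for index in range(0, len(url_list), 5):
    batch = url_list[index:index + 5]
    print(index, len(batch))

# Output:
# 0 5
# 5 5
# 10 2   <- slicing past the end just returns the remaining elements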

As the URL list grows, memory consumption grows with it, and at some point we need to change the approach. Since a generator produces values lazily and therefore saves memory, the code becomes the following after the change.

Generator segmentation

# -*- coding: utf-8 -*-
import os
import time
from itertools import islice

CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))


def read_file():
    file_path = os.path.join(CURRENT_DIR, "url_list.txt")
    with open(file_path, "r", encoding="utf-8") as fs:
        for i in fs:
            # Yield one stripped line at a time instead of building a list.
            yield i.strip()


def fetch(urls):
    # Stand-in for the real crawling logic: just print the batch of URLs.
    print(urls)


def run():
    max_count = 5  # at most 5 URLs per batch
    url_gen = read_file()
    while True:
        # Pull the next (up to) 5 items off the generator.
        url_list = list(islice(url_gen, 0, max_count))
        if not url_list:
            break
        start = time.time()
        fetch(url_list)
        elapsed = time.time() - start
        if elapsed < 1:
            time.sleep(1 - elapsed)


if __name__ == '__main__':
    run()

First, we changed the way the file is read: instead of building a list, read_file now yields one line at a time as a generator, so calling it no longer loads the whole file into memory.
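
The difference is easy to demonstrate with a rough sketch; the exact byte counts depend on the Python version, but the orders of magnitude hold:

import sys

as_list = [i for i in range(1_000_000)]  # every element is materialised up front
as_gen = (i for i in range(1_000_000))   # values are produced on demand

print(sys.getsizeof(as_list))  # on the order of megabytes
print(sys.getsizeof(as_gen))   # no more than a few hundred bytes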

Then the original for loop was replaced. A generator is not suited to the index-and-slice approach, because it cannot be indexed and every pass over it consumes its elements. Instead we use itertools.islice to slice url_gen, taking up to 5 elements on each call; islice is essentially slicing for iterators. Since a generator has no __len__ method, we cannot ask how many items remain, so we convert each slice to a list and check whether that list is empty to know when the iteration is over.
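
Here is a minimal sketch of how repeated islice calls walk through a generator in chunks; the numbers generator below is just a stand-in for read_file:

from itertools import islice

def numbers():
    for i in range(12):
        yield i

gen = numbers()
while True:
    chunk = list(islice(gen, 5))
    if not chunk:
        break
    print(chunk)

# Output:
# [0, 1, 2, 3, 4]
# [5, 6, 7, 8, 9]
# [10, 11]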

The modified code performs well while using far less memory; a file with tens of millions of lines is no longer a problem.
In addition, an asynchronous crawler may need to slice an asynchronous generator, so next we discuss segmentation of asynchronous generators.

Asynchronous generator segmentation

First, let's look at a simple asynchronous generator.
We know that calling the following function gives us an ordinary (synchronous) generator:

def foo():
    for i in range(20):
        yield i
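
For reference, calling foo() only creates a generator object; values are pulled from it lazily, for example with next():

gen = foo()
print(gen)        # <generator object foo at 0x...>
print(next(gen))  # 0
print(next(gen))  # 1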

If you add async in front of def, calling it produces an asynchronous generator instead.
The complete example code is as follows:

import asyncio

async def foo():
    for i in range(20):
        yield i


async def run():
    async_gen = foo()
    async for i in async_gen:
        print(i)


if __name__ == '__main__':
    asyncio.run(run())
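
Before reaching for an extra library, it is worth noting why the earlier tricks no longer apply: an asynchronous generator is neither subscriptable nor usable with itertools.islice, so both of the lines below raise a TypeError.

from itertools import islice

async_gen = foo()
# async_gen[0:5]        # TypeError: an async generator is not subscriptable
# islice(async_gen, 5)  # TypeError: an async generator is not an ordinary iterable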

Slicing inside an async for loop is a bit more involved, so the aiostream module is recommended. With it, the code becomes the following:

import asyncio
from aiostream import stream

async def foo():
    for i in range(22):
        yield i


async def run():
    index = 0
    limit = 5

    while True:
        # Recreate the async generator on every pass and wrap it as a stream;
        # earlier items are produced again and skipped by the slice below.
        xs = stream.iterate(foo())
        ys = xs[index:index + limit]
        # Running the sliced stream collects up to `limit` items into a list.
        t = await stream.list(ys)
        if not t:
            break
        print(t)
        index += limit


if __name__ == '__main__':
    asyncio.run(run())
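
As a side note, aiostream also provides a chunks operator that batches an asynchronous sequence directly, which avoids recreating the generator on every pass. A minimal sketch, assuming the same foo() as above (chunks is not part of the code shown earlier):

import asyncio
from aiostream import stream

async def foo():
    for i in range(22):
        yield i


async def run():
    # chunks(source, n) yields lists of up to n items from the async source.
    async with stream.chunks(foo(), 5).stream() as batches:
        async for batch in batches:
            print(batch)


if __name__ == '__main__':
    asyncio.run(run())

The rate-limiting logic from the earlier examples can then be applied to each batch in the same way.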
