Very practical, Schedule module, Python periodic task artifact

If you want to execute a Python script periodically on a Linux server, the most famous choice should be cronab script, but cronab has the following disadvantages:

1. It is inconvenient to perform second level tasks.

2. When there are hundreds of scheduled tasks to be performed, the management of cronab will be particularly inconvenient.

Another option is Celery, but the configuration of Celery is troublesome. If you just need a lightweight scheduling tool, Celery will not be a good choice.

When you want to use a lightweight task scheduling tool, and you want it to be as simple and easy to use as possible without external dependencies. It's best to accommodate all the basic functions of Crontab, then the Schedule module is your best choice.

Using it to schedule tasks may only require a few lines of code. Feel the following:

# Python daily
import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

The above code indicates that the job function is executed every 10 minutes, which is very simple and convenient. You just need to introduce the schedule module by calling scedule Every Time type do(job) publishes periodic tasks.

The published periodic task needs to use run_ The pending function is used to detect whether it is executed, so a While loop needs to poll the function continuously.

The following describes the installation and primary and advanced use of the Schedule module.

1. Preparation

Please choose one of the following ways to enter the command to install dependencies:

  1. Windows environment opens Cmd (start run CMD).
  2. Open terminal in MacOS environment (Command + space, enter Terminal).
  3. If you use the VSCode editor or pychart, you can directly use the Terminal
pip install schedule

2. Basic use

The most basic use has been mentioned at the beginning of the article. Here are more examples of scheduling tasks:

# Python daily
import schedule
import time

def job():
    print("I'm working...")

# Perform the task every ten minutes
schedule.every(10).minutes.do(job)
# Perform tasks every hour
schedule.every().hour.do(job)
# Perform the task at 10:30 every day
schedule.every().day.at("10:30").do(job)
# Perform tasks every month
schedule.every().monday.do(job)
# Every Wednesday at 13:15
schedule.every().wednesday.at("13:15").do(job)
# Execute the task every 17th second of every minute
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

You can see that the above examples cover the configuration from month to second. However, if you want to run the task only once, you can match it as follows:

# Python daily
import schedule
import time

def job_that_executes_once():
    # The task written here will be executed only once
    return schedule.CancelJob

schedule.every().day.at('22:30').do(job_that_executes_once)

while True:
    schedule.run_pending()
    time.sleep(1)

Parameter transfer

If you have parameters that need to be passed to the job for execution, you only need to do this:

# Python daily
import schedule

def greet(name):
    print('Hello', name)

# do() passes additional parameters to the job function
schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')

Get all current jobs

If you want to get all your current assignments:

# Python daily
import schedule

def hello():
    print('Hello world')

schedule.every().second.do(hello)

all_jobs = schedule.get_jobs()

Cancel all jobs

If some mechanism is triggered, you need to clear all jobs of the current program immediately:

# Python daily
import schedule

def greet(name):
    print('Hello {}'.format(name))

schedule.every().second.do(greet)

schedule.clear()

Label function

When setting the job, you can tag the job to facilitate the subsequent management of the job, so that you can obtain the job or cancel the job through tag filtering.

# Python daily
import schedule

def greet(name):
    print('Hello {}'.format(name))

# . tag labeling
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')

# get_ Jobs: you can get all the tasks of this tag
friends = schedule.get_jobs('friend')

# Cancel all daily tasks tags
schedule.clear('daily-tasks')

Set job deadline

If you need to make an assignment due at a certain time, you can use this method:

# Python daily
import schedule
from datetime import datetime, timedelta, time

def job():
    print('Boo')

# Run the job every hour and stop after 18:30
schedule.every(1).hours.until("18:30").do(job)

# Operation every hour, 2030-01-01 18:33 today
schedule.every(1).hours.until("2030-01-01 18:33").do(job)

# 8 hours after each job is stopped
schedule.every(1).hours.until(timedelta(hours=8)).do(job)

# Run the job every hour and stop after 11:32:42
schedule.every(1).hours.until(time(11, 33, 42)).do(job)

# Operate every hour and stop after 11:36:20, May 17, 2020
schedule.every(1).hours.until(datetime(2020, 5, 17, 11, 36, 20)).do(job)

After the deadline, the job will not run.

Run all jobs immediately, regardless of their schedule

If a mechanism is triggered and you need to run all jobs immediately, you can call schedule run_ all() :

# Python daily
import schedule

def job_1():
    print('Foo')

def job_2():
    print('Bar')

schedule.every().monday.at("12:40").do(job_1)
schedule.every().tuesday.at("16:40").do(job_2)

schedule.run_all()

# Run all jobs immediately at 10 second intervals
schedule.run_all(delay_seconds=10)

3. Advanced use

Decorator scheduling

If you think the setting operation is too verbose, you can also use the decorator mode:

# Python daily
from schedule import every, repeat, run_pending
import time

# This decorator has the same effect as schedule every(10). minutes. do(job)
@repeat(every(10).minutes)
def job():
    print("I am a scheduled job")

while True:
    run_pending()
    time.sleep(1)

Parallel execution

By default, Schedule executes all jobs in order. The reason behind this is that it is difficult to find a parallel execution model that makes everyone happy.

However, you can solve this limitation by running each job in the form of multithreading:

# Python daily
import threading
import time
import schedule

def job1():
    print("I'm running on thread %s" % threading.current_thread())
def job2():
    print("I'm running on thread %s" % threading.current_thread())
def job3():
    print("I'm running on thread %s" % threading.current_thread())

def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()

schedule.every(10).seconds.do(run_threaded, job1)
schedule.every(10).seconds.do(run_threaded, job2)
schedule.every(10).seconds.do(run_threaded, job3)

while True:
    schedule.run_pending()
    time.sleep(1)

Logging

The Schedule module also supports logging, which can be used as follows:

# Python daily
import schedule
import logging

logging.basicConfig()
schedule_logger = logging.getLogger('schedule')
# The log level is DEBUG
schedule_logger.setLevel(level=logging.DEBUG)

def job():
    print("Hello, Logs")

schedule.every().second.do(job)

schedule.run_all()

schedule.clear()

The effect is as follows:

DEBUG:schedule:Running *all* 1 jobs with 0s delay in between
DEBUG:schedule:Running job Job(interval=1, unit=seconds, do=job, args=(), kwargs={})
Hello, Logs
DEBUG:schedule:Deleting *all* jobs

exception handling

Schedule will not automatically catch exceptions. It will throw exceptions directly, which will lead to a serious problem: all subsequent jobs will be interrupted, so we need to catch these exceptions.

You can capture manually, but some unexpected situations require the program to capture automatically. You can do it with a decorator:

# Python daily
import functools

def catch_exceptions(cancel_on_failure=False):
    def catch_exceptions_decorator(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            try:
                return job_func(*args, **kwargs)
            except:
                import traceback
                print(traceback.format_exc())
                if cancel_on_failure:
                    return schedule.CancelJob
        return wrapper
    return catch_exceptions_decorator

@catch_exceptions(cancel_on_failure=True)
def bad_task():
    return 1 / 0

schedule.every(5).minutes.do(bad_task)

So, bad_ Any error encountered during task execution will be caught_ Exceptions capture, which is very important to ensure the normal operation of scheduling tasks.

Tags: Python

Posted by marginalboy on Tue, 19 Apr 2022 09:52:06 +0930