pip install apscheduler
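`BaseScheduler`

- `def __init__(self, gconfig={}, **options)`
  - `gconfig`: can be regarded as a global initialization parameter.
  - `**options`: can be regarded as custom initialization parameters.
    - `logger`: str/logging.Logger, used to output log messages.
    - `timezone`: str/datetime.tzinfo, the time zone.
    - `jobstore_retry_interval`: int/float, the minimum retry interval when a job store raises an exception.
    - `job_defaults`: dict, default parameters for newly created jobs.
    - `jobstores`: dict, job store alias -> job store instance or configuration dict.
    - `executors`: dict, executor alias -> executor instance or configuration dict.
- `configure(gconfig={}, prefix='apscheduler.', **options)`: sets parameters before the scheduler starts running.
- `start(paused=False)`: starts the scheduler. With `paused=True`, the scheduler does not actually process jobs until `resume()` is called.
- `shutdown(wait=True)`: shuts down the scheduler. With `wait=True`, it waits for all running jobs to finish.
- `pause()`: pauses job processing; used together with `resume()`.
- `resume()`: resumes job processing; used together with `pause()`.
- `add/remove_executor`
- `add/remove_jobstore`
- `add/remove_listener`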
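
`add_job()`: takes many parameters.

- `func`
- `trigger`: str or trigger object; its initialization parameters are taken from `**trigger_args`.
- `args`: positional arguments for `func`.
- `kwargs`: keyword arguments for `func`.
- `id`
- `name`
- `misfire_grace_time`
- `coalesce`
- `max_instances`
- `next_run_time`
- `jobstore`: str, selected by alias.
- `executor`: str, selected by alias.
- `replace_existing`
- `**trigger_args`: passed to the trigger.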

Other job-management methods:

- `modify_job(job_id, jobstore=None, **changes)`
- `reschedule_job(job_id, jobstore=None, trigger=None, **trigger_args)`
- `pause/resume_job`: pauses/resumes a single job.
- `get_jobs(jobstore=None, pending=None)`
- `get_job(job_id, jobstore=None)`
- `remove_job(job_id, jobstore=None)`
- `remove_all_jobs(jobstore=None)`
- `print_jobs(jobstore=None, out=sys.stdout)`

`BackgroundScheduler` and `BlockingScheduler` examples:

    from datetime import datetime
    import os
    import time

    from apscheduler.schedulers.background import BackgroundScheduler
    from apscheduler.schedulers.blocking import BlockingScheduler


    def tick():
        print('Tick! The time is: %s' % datetime.now())


    # BackgroundScheduler example
    scheduler = BackgroundScheduler()
    scheduler.add_job(tick, 'interval', seconds=3)
    scheduler.start()
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
    try:
        # This is here to simulate application activity (which keeps the main thread alive).
        while True:
            time.sleep(2)
    except (KeyboardInterrupt, SystemExit):
        # Not strictly necessary if daemonic mode is enabled but should be done if possible
        scheduler.shutdown()

    # BlockingScheduler example
    scheduler = BlockingScheduler()
    scheduler.add_job(tick, 'interval', seconds=3)
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        pass

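`apscheduler.triggers.cron.CronTrigger`

- `year`: int/str, 4-digit year
- `month`: int/str, [1, 12]
- `day`: int/str, [1, 31]
- `week`: int/str, [1, 53]
- `day_of_week`: int/str, [0, 6], starting from Monday; the first three lowercase letters of the English weekday name (`mon` to `sun`) also work.
- `hour`: int/str, [0, 23]
- `minute`: int/str, [0, 59]
- `second`: int/str, [0, 59]
- `start_date`: datetime/str
- `end_date`: datetime/str
- `timezone`: datetime.tzinfo|str
- `jitter`: int/None, shifts the run time by a random offset within [-jitter, jitter] seconds.

Fields that are not specified default to `*` or to their minimum value: `*` for fields more significant than the least significant specified one, and the minimum value for the less significant ones. For example, with `day=1, minute=20`, `second` is set to `0` and all other fields are `*`.

Expression | Field | Description |
---|---|---|
`*` | any | Fire on every value |
`*/a` | any | Fire every `a` values, starting from the minimum |
`a-b` | any | Fire on any value within the `a-b` range (`a` must be smaller than `b`) |
`a-b/c` | any | Fire every `c` values within the `a-b` range |
`xth y` | day | Fire on the `x`-th occurrence of weekday `y` within the month |
`last x` | day | Fire on the last occurrence of weekday `x` within the month |
`last` | day | Fire on the last day within the month |
`x,y,z` | any | Fire on any matching expression; can combine any number of any of the above expressions |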

`apscheduler.triggers.interval.IntervalTrigger`

- `weeks`: int
- `days`: int
- `hours`: int
- `minutes`: int
- `seconds`: int
- `start_date`: datetime/str
- `end_date`: datetime/str
- `timezone`: datetime.tzinfo|str
- `jitter`: int/None, shifts the run time by a random offset within [-jitter, jitter] seconds.

`apscheduler.triggers.date.DateTrigger(run_date=None, timezone=None)`

- `run_date`: datetime|str, the time at which to run.
- `timezone`: datetime.tzinfo|str

`apscheduler.triggers.combining.AndTrigger(triggers, jitter=None)`

`apscheduler.triggers.combining.OrTrigger(triggers, jitter=None)`

In the `scheduler.add_job` method, the trigger-related parameters are:

- `trigger=None`: `date`/`interval`/`cron`.
- `**trigger_args`: all keyword arguments that `add_job` itself does not define are passed on as arguments to the trigger object.

Examples (`sched` and `scheduler` are scheduler instances; `my_job` and `job_function` are job callables):

    from datetime import datetime

    from apscheduler.triggers.combining import AndTrigger, OrTrigger
    from apscheduler.triggers.cron import CronTrigger
    from apscheduler.triggers.interval import IntervalTrigger

    # date
    # Option 1
    sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])
    # Option 2
    sched.add_job(my_job, 'date', run_date='2009-11-06 16:30:05', args=['text'])
    # Option 3 (run immediately)
    sched.add_job(my_job, args=['text'])

    # interval
    # Option 1
    sched.add_job(job_function, 'interval', hours=2,
                  start_date='2010-10-10 09:30:00', end_date='2014-06-15 11:00:00')
    # Option 2
    @sched.scheduled_job('interval', id='my_job_id', hours=2)
    def job_function():
        print("Hello World")

    # cron
    # Option 1
    sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30,
                  end_date='2014-05-30')
    # Option 2
    @sched.scheduled_job('cron', id='my_job_id', day='last sun')
    def some_decorated_task():
        print("I am printed at 00:00:00 on the last Sunday of every month!")
    # Option 3
    sched.add_job(job_function, CronTrigger.from_crontab('0 0 1-15 may-aug *'))

    # AndTrigger
    trigger = AndTrigger([IntervalTrigger(hours=2), CronTrigger(day_of_week='sat,sun')])
    scheduler.add_job(job_function, trigger)

    # OrTrigger
    trigger = OrTrigger([CronTrigger(day_of_week='mon', hour=2),
                         CronTrigger(day_of_week='tue', hour=15)])
    scheduler.add_job(job_function, trigger)

- Executors: set the `executors` parameter when configuring the scheduler; when calling `add_job`, specify the alias of the corresponding executor.
- Job stores: set the `jobstores` parameter when configuring the scheduler; when calling `add_job`, specify the alias of the corresponding job store.

Configuration example:

    from pytz import utc

    from apscheduler.schedulers.background import BackgroundScheduler
    from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
    from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

    jobstores = {
        'mongo': {'type': 'mongodb'},
        'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
    }
    executors = {
        'default': {'type': 'threadpool', 'max_workers': 20},
        'processpool': ProcessPoolExecutor(max_workers=5)
    }
    # executors = {
    #     'default': ThreadPoolExecutor(20),
    #     'processpool': ProcessPoolExecutor(5)
    # }
    job_defaults = {
        'coalesce': False,
        'max_instances': 3
    }
    scheduler = BackgroundScheduler()

    # .. do something else here, maybe add jobs etc.

    scheduler.configure(jobstores=jobstores, executors=executors,
                        job_defaults=job_defaults, timezone=utc)