项目中需要用到定时器和循环执行。去网上搜了一下,比较常见的有一下集中。运用Python线程执行轮询操作,也有运用Linux系统的Cron,Celery的文章最多,但是太麻烦。看看就知道,Celery 需要一个发送和接受消息的传输者。RabbitMQ 和 Redis 中间人的消息传输支持所有特性,但也提供大量其他实验性方案的支持,包括用 SQLite 进行本地开发。需要用到队列,对于这点需求简直就是大材小用。最后找到了比较合适的Flask-APScheduler。
介绍
看看 github的flask-apscheduler介绍。
- Loads scheduler configuration from Flask configuration.(支持从Flask中加载调度)
- Loads job definitions from Flask configuration.(支持从Flask中加载任务配置)
- Allows to specify the hostname which the scheduler will run on.(允许指定服务器运行任务)
- Provides a REST API to manage the scheduled jobs.(提供Rest接口管理任务)
- Provides authentication for the REST API.(提供Rest接口认证)
安装及配置
pip install Flask-APScheduler
在Flask配置文件中添加
SCHEDULER_API_ENABLED = True
JOBS = [{'id': 'job_1h_data','func': job_1h_data,'args': '','trigger': {'type': 'cron','day_of_week': "0-6",'hour': '*','minute': '1','second': '0'}},{'id': 'job_announce','func': exchange_an,'args': '','trigger': 'interval','seconds': 300}
]
上面指定了每一小时获取所有货币24h最高位以及交易所公告。
获取公告
def exchange_an():""":param start_date: 开始时间 YYYY-MM-DD HH:MM:SS:param end_date: 结束时间 YYYY-MM-DD HH:MM:SS:return: 推送消息,保持数据库"""current_local = time.time()start_date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(current_local - 300))end_date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(current_local))announce = pro.query('exchange_ann', start_date=start_date, end_date=end_date)print('请求交易所公告...')for x in announce.values:s = {'title': x[0],'content': x[1],'type': x[2],'url': x[3],'datetime': x[4]}value = json.dumps(s)print(value)mqttClient.publish('system/ex_announce', value)
动态添加任务
# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetimedef aps_test(x):print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), xscheduler = BlockingScheduler()
scheduler.add_job(func=aps_test, args=('定时任务',), trigger='cron', second='*/5')
scheduler.add_job(func=aps_test, args=('一次性任务',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12))
scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3, id='interval_task')scheduler.start()
"""
暂停任务
"""
scheduler.pause_job('interval_task')
"""
恢复任务
"""
scheduler.resume_job('interval_task')
"""
删除任务
"""
scheduler.remove_job('interval_task')
apscheduler支持添加三种方式的任务,分别是定时任务,一次性任务及循环任务。同时也包含了对任务的控制。
总结
因为是单机版本,所以指定服务器运行任务,Rest接口管理任务,Rest接口认证就不写了。后续有需求在继续。
欢迎长按下图 -> 识别图中二维码或者微信扫一扫关注我的公众号