celery是一个基于分布式消息传输的异步任务队列,它专注于实时处理,同时也支持任务调度。
它由三部分组成,消息中间件,任务执行单元和任务执行结果存储组成。
官网 :http://www.celeryproject.org/ 下载:pip install celery
消息中间件:Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等。
任务执行单元:Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
任务结果存储:Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等。Celery多用来执行异步任务,将耗时的操作交由Celery去异步执行,比如发送邮件、短信、消息推送、音视频处理等。还可以执行定时任务,定时执行某件事情,比如Redis中的数据每天凌晨两点保存至mysql数据库,实现Redis的持久化。使用 Celery 之前请务必理解以下概念:
a. Celery Beat: 任务调度器,Beat 进程会读取配置文件的内容,周期性的将配置中到期需要执行的任务发送给任务队列
b. Celery Worker: 执行任务的消费者,通常会在多台服务器运行多个消费者来提高运行效率。
c. Broker: 消息代理,也是任务队列本身(通常是消息队列或者数据库),通常称为消息中间件,接收任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方。
d. Producer: 任务生产者,调用 Celery API 的函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。
其中,app_test.py为主程序,其代码如下:
from celery import Celeryapp = Celery('proj', include=['proj.tasks'])
app.config_from_object('proj.celeryconfig')if __name__ == '__main__':app.start()
tasks.py为任务函数,代码如下:
import re
import requests
from celery import group
from proj.app_test import app@app.task(trail=True)
# 并行调用任务
def get_content(urls):return group(C.s(url) for url in urls)()@app.task(trail=True)
def C(url):return parser.delay(url)@app.task(trail=True)
# 获取每个网页的name和description
def parser(url):req = requests.get(url)html = req.texttry:name = re.findall(r'(.+?)', html)[0]desc = re.findall(r'(.+?)', html)[0]if name is not None and desc is not None:return name, descexcept Exception as err:return '', ''
celeryconfig.py为celery的配置文件,代码如下:
BROKER_URL = 'redis://localhost' # 使用Redis作为消息代理CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # 把任务结果存在了RedisCELERY_TASK_SERIALIZER = 'msgpack' # 任务序列化和反序列化使用msgpack方案CELERY_RESULT_SERIALIZER = 'json' # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSONCELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] # 指定接受的内容类型
最后是我们的爬虫文件,scrapy.py,代码如下:
import time
import requests
from bs4 import BeautifulSoup
from proj.tasks import get_contentt1 = time.time()
url = "http://www.wikidata.org/w/index.php?title=Special:WhatLinksHere/Q5&limit=500&from=0"headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, \like Gecko) Chrome/67.0.3396.87 Safari/537.36'}req = requests.get(url, headers=headers)
soup = BeautifulSoup(req.text, "lxml")
human_list = soup.find(id='mw-whatlinkshere-list')('li')urls = []
for human in human_list:url = human.find('a')['href']urls.append('https://www.wikidata.org'+url)
#print(urls)# 调用get_content函数,并获取爬虫结果
result = get_content.delay(urls)
res = [v for v in result.collect()]for r in res:if isinstance(r[1], list) and isinstance(r[1][0], str):print(r[1])t2 = time.time() # 结束时间
print('耗时:%s' % (t2 - t1))
在后台启动redis,并切换至proj项目所在目录,运行命令:
celery -A proj.app_test worker -l info
输出结果如下(只显示最后几行的输出):
......
['Antoine de Saint-Exupery', 'French writer and aviator']
['', '']
['Sir John Barrow, 1st Baronet', 'English statesman']
['Amy Johnson', 'pioneering English aviator']
['Mike Oldfield', 'English musician, multi-instrumentalist']
['Willoughby Newton', 'politician from Virginia, USA']
['Mack Wilberg', 'American conductor']
耗时:80.05160284042358
在rdm中查看数据,如下:
新建文件crawlertask.py,用于执行数据抓取任务,代码如下。
#coding:utf-8import requests
from bs4 import BeautifulSoup
from celery import Celery,platformsapp = Celery('tasks',broker='redis://localhost:6379/0')
app.conf.CELERY_RESULT_BACKEND='redis://localhost:6379/0'platforms.C_FORCE_ROOT=Truedef format_str(str):return str.replace("\n","").replace(" ","").replace("\t","")@app.task
def get_urls_in_pages(from_page_num,to_page_num):urls=[]search_word='计算机'url_part_1='http://www.phei.com.cn/module/goods/'\'searchkey.jsp?Page='url_part_2='&Page=2&searchKey='for i in range(from_page_num,to_page_num+1):urls.append(url_part_1+str(i)+url_part_2+search_word)all_href_list=[]for url in urls:resp=requests.get(url)bs=BeautifulSoup(resp.text)a_list=bs.find_all('a')needed_list=[]for a in a_list:if 'href' in a.attrs:href_val=a['href']title=a.textif 'bookid' in href_val and 'shopcar0.jsp' not in href_val and title!='':if [title,href_val] not in needed_list:needed_list.append([format_str(title),format_str(href_val)])all_href_list+=needed_listall_href_file = open(str(from_page_num)+'_'+str(to_page_num)+'_'+'all_hrefs.txt','w')for href in all_href_list:all_href_file.write('\t'.join(href)+'\n')all_href_file.close()return len(all_href_list)
【部署服务器方法】
将以上脚本部署到两台云端服务器 , 并且在云端开启redis服务,然后执行:
celery worker -A crawlertask -l info -c 10
【部署从机方法】
将上面的脚本部署到两台主机A和B,然后各自运行下面的命令:
celery -A crawl_douban worker -l info
在本机新建文件task_dist.py用于异步分发任务,代码如下:
from celery import Celery
from threading import Thread
import timeredis_ips={0:'redis://101.200.163.195:6379/0',1:'redis://112.124.28.41:6379/0',2:'redis://112.124.28.41:6379/0',3:'redis://101.200.163.195:6379/0',
}def send_task_and_get_results(ind,from_page,to_page):app=Celery('crawlertask',broker=redis_ips[ind])app.conf.CELERY_RESULT_BACKEND=redis_ips[ind]result=app.send_task('crawlertask.get_urls_in_pages',args=(from_page,to_page))print(redis_ips[ind],result.get())if __name__=='__main__':t1=time.time()page_ranges_lst=[(1,10),(11,20),(21,30),(31,40),]th_lst = []for ind, page_range in enumerate(page_ranges_lst):th = Thread(target=send_task_and_get_results,args=(ind,page_range[0], page_range[1]))th_lst.append(th)for th in th_lst:th.start()for th in th_lst:th.join()t2 = time.time()print("用时:", t2 - t1)
以爬douban小说为例 首先启动Redis,新建文件crawl_douban.py
import requests
import time
import redis
from celery import Celery
from bs4 import BeautifulSoup
from configparser import ConfigParsercp=ConfigParser()
cp.read('config')#获取配置信息
db_host=cp.get(section='redis',option='db_host')
db_port=cp.getint('redis','db_port')
db_pwd=cp'redis'#redis连接
pool = redis.ConnectionPool(host=db_host, port=db_port, db=15, password=db_pwd)
r = redis.StrictRedis(connection_pool=pool)
set_name='crawl:douban'app = Celery('crawl', include=['task'], broker='redis://:{}@{}:{}/12'.format(db_pwd,db_host,db_port), backend='redis://:{}@{}:{}/13'.format(db_pwd,db_host,db_port))#官方推荐使用json作为消息序列化方式
app.conf.update(CELERY_TIMEZONE='Asia/Shanghai',CELERY_ENABLE_UTC=True,CELERY_ACCEPT_CONTENT=['json'],CELERY_TASK_SERIALIZER='json',CELERY_RESULT_SERIALIZER='json',
)headers={'User-Agent':'',}@app.task
def crawl(url):res=requests.get(url,headers=headers)#延迟2秒time.sleep(2)soup=BeautifulSoup(res.text,'lxml')items=soup.select('.subject-list .subject-item .info h2 a')titles=[item['title'] for item in items]#将小说的title存入redis数据库r.sadd(set_name,(url,titles,time.time()))print(titles)return (url,titles)
将上面的脚本部署到两台主机A和B,然后各自运行下面的命令:
celery -A crawl_douban worker -l info
在本机C新建文件task_dispatcher.py用于异步分发任务,代码如下:
from crawl_douban import app
from crawl_douban import crawldef manage_crawl(urls):for url in urls:app.send_task('crawl_douban.crawl', args=(url,))#上句也可以写成 crawl.apply_async(args=(url,)) 或 crawl.delay(url)if __name__ == '__main__':start_url = 'https://book.douban.com/tag/小说'#爬去10页,每页20本书url_list = ['{}?start={}&type=T'.format(start_url, page * 20) for page in range(10)]manage_crawl(url_list)
运行task_dispatcher.py,跑完用时2.8s
celery worker -A tasks --loglevel=info --concurrency=5# 参数”-A”指定了Celery实例的位置
# 参数”loglevel”指定了日志等级,也可以不加,默认为warning。
# 参数”concurrency”指定最大并发数,默认为CPU核数。