热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

celery爬虫使用

简介celery是一个基于分布式消息传输的异步任务队列,它专注于实时处理,同时也支持任务调度。它由三部分组成,消息中间件,

简介

celery是一个基于分布式消息传输的异步任务队列,它专注于实时处理,同时也支持任务调度。

它由三部分组成,消息中间件,任务执行单元任务执行结果存储组成。

官网 :http://www.celeryproject.org/                 下载:pip install celery

消息中间件:Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等。
任务执行单元:Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
任务结果存储:Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等。Celery多用来执行异步任务,将耗时的操作交由Celery去异步执行,比如发送邮件、短信、消息推送、音视频处理等。还可以执行定时任务,定时执行某件事情,比如Redis中的数据每天凌晨两点保存至mysql数据库,实现Redis的持久化。使用 Celery 之前请务必理解以下概念:
a. Celery Beat: 任务调度器,Beat 进程会读取配置文件的内容,周期性的将配置中到期需要执行的任务发送给任务队列
b. Celery Worker: 执行任务的消费者,通常会在多台服务器运行多个消费者来提高运行效率。
c. Broker: 消息代理,也是任务队列本身(通常是消息队列或者数据库),通常称为消息中间件,接收任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方。
d. Producer: 任务生产者,调用 Celery API 的函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。

项目结构如下:

其中,app_test.py为主程序,其代码如下:

from celery import Celeryapp = Celery('proj', include=['proj.tasks'])
app.config_from_object('proj.celeryconfig')if __name__ == '__main__':app.start()

tasks.py为任务函数,代码如下:

import re
import requests
from celery import group
from proj.app_test import app@app.task(trail=True)
# 并行调用任务
def get_content(urls):return group(C.s(url) for url in urls)()@app.task(trail=True)
def C(url):return parser.delay(url)@app.task(trail=True)
# 获取每个网页的name和description
def parser(url):req = requests.get(url)html = req.texttry:name = re.findall(r'(.+?)', html)[0]desc = re.findall(r'(.+?)', html)[0]if name is not None and desc is not None:return name, descexcept Exception as err:return '', ''

celeryconfig.py为celery的配置文件,代码如下:

BROKER_URL = 'redis://localhost' # 使用Redis作为消息代理CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # 把任务结果存在了RedisCELERY_TASK_SERIALIZER = 'msgpack' # 任务序列化和反序列化使用msgpack方案CELERY_RESULT_SERIALIZER = 'json' # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSONCELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] # 指定接受的内容类型

最后是我们的爬虫文件,scrapy.py,代码如下:

import time
import requests
from bs4 import BeautifulSoup
from proj.tasks import get_contentt1 = time.time()
url = "http://www.wikidata.org/w/index.php?title=Special:WhatLinksHere/Q5&limit=500&from=0"headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, \like Gecko) Chrome/67.0.3396.87 Safari/537.36'}req = requests.get(url, headers=headers)
soup = BeautifulSoup(req.text, "lxml")
human_list = soup.find(id='mw-whatlinkshere-list')('li')urls = []
for human in human_list:url = human.find('a')['href']urls.append('https://www.wikidata.org'+url)
#print(urls)# 调用get_content函数,并获取爬虫结果
result = get_content.delay(urls)
res = [v for v in result.collect()]for r in res:if isinstance(r[1], list) and isinstance(r[1][0], str):print(r[1])t2 = time.time() # 结束时间
print('耗时:%s' % (t2 - t1))

在后台启动redis,并切换至proj项目所在目录,运行命令:

celery -A proj.app_test worker -l info

输出结果如下(只显示最后几行的输出):

......
['Antoine de Saint-Exupery', 'French writer and aviator']
['', '']
['Sir John Barrow, 1st Baronet', 'English statesman']
['Amy Johnson', 'pioneering English aviator']
['Mike Oldfield', 'English musician, multi-instrumentalist']
['Willoughby Newton', 'politician from Virginia, USA']
['Mack Wilberg', 'American conductor']
耗时:80.05160284042358

在rdm中查看数据,如下:

 

Celery进阶—分布式爬虫

新建文件crawlertask.py,用于执行数据抓取任务,代码如下。

#coding:utf-8import requests
from bs4 import BeautifulSoup
from celery import Celery,platformsapp = Celery('tasks',broker='redis://localhost:6379/0')
app.conf.CELERY_RESULT_BACKEND='redis://localhost:6379/0'platforms.C_FORCE_ROOT=Truedef format_str(str):return str.replace("\n","").replace(" ","").replace("\t","")@app.task
def get_urls_in_pages(from_page_num,to_page_num):urls=[]search_word='计算机'url_part_1='http://www.phei.com.cn/module/goods/'\'searchkey.jsp?Page='url_part_2='&Page=2&searchKey='for i in range(from_page_num,to_page_num+1):urls.append(url_part_1+str(i)+url_part_2+search_word)all_href_list=[]for url in urls:resp=requests.get(url)bs=BeautifulSoup(resp.text)a_list=bs.find_all('a')needed_list=[]for a in a_list:if 'href' in a.attrs:href_val=a['href']title=a.textif 'bookid' in href_val and 'shopcar0.jsp' not in href_val and title!='':if [title,href_val] not in needed_list:needed_list.append([format_str(title),format_str(href_val)])all_href_list+=needed_listall_href_file = open(str(from_page_num)+'_'+str(to_page_num)+'_'+'all_hrefs.txt','w')for href in all_href_list:all_href_file.write('\t'.join(href)+'\n')all_href_file.close()return len(all_href_list)

【部署服务器方法】

将以上脚本部署到两台云端服务器 , 并且在云端开启redis服务,然后执行:

celery worker -A crawlertask -l info -c 10

【部署从机方法】

将上面的脚本部署到两台主机A和B,然后各自运行下面的命令:

celery -A crawl_douban worker -l info

在本机新建文件task_dist.py用于异步分发任务,代码如下:

from celery import Celery
from threading import Thread
import timeredis_ips={0:'redis://101.200.163.195:6379/0',1:'redis://112.124.28.41:6379/0',2:'redis://112.124.28.41:6379/0',3:'redis://101.200.163.195:6379/0',
}def send_task_and_get_results(ind,from_page,to_page):app=Celery('crawlertask',broker=redis_ips[ind])app.conf.CELERY_RESULT_BACKEND=redis_ips[ind]result=app.send_task('crawlertask.get_urls_in_pages',args=(from_page,to_page))print(redis_ips[ind],result.get())if __name__=='__main__':t1=time.time()page_ranges_lst=[(1,10),(11,20),(21,30),(31,40),]th_lst = []for ind, page_range in enumerate(page_ranges_lst):th = Thread(target=send_task_and_get_results,args=(ind,page_range[0], page_range[1]))th_lst.append(th)for th in th_lst:th.start()for th in th_lst:th.join()t2 = time.time()print("用时:", t2 - t1)

举例:

以爬douban小说为例 首先启动Redis,新建文件crawl_douban.py

import requests
import time
import redis
from celery import Celery
from bs4 import BeautifulSoup
from configparser import ConfigParsercp=ConfigParser()
cp.read('config')#获取配置信息
db_host=cp.get(section='redis',option='db_host')
db_port=cp.getint('redis','db_port')
db_pwd=cp'redis'#redis连接
pool = redis.ConnectionPool(host=db_host, port=db_port, db=15, password=db_pwd)
r = redis.StrictRedis(connection_pool=pool)
set_name='crawl:douban'app = Celery('crawl', include=['task'], broker='redis://:{}@{}:{}/12'.format(db_pwd,db_host,db_port), backend='redis://:{}@{}:{}/13'.format(db_pwd,db_host,db_port))#官方推荐使用json作为消息序列化方式
app.conf.update(CELERY_TIMEZONE='Asia/Shanghai',CELERY_ENABLE_UTC=True,CELERY_ACCEPT_CONTENT=['json'],CELERY_TASK_SERIALIZER='json',CELERY_RESULT_SERIALIZER='json',
)headers={'User-Agent':'',}@app.task
def crawl(url):res=requests.get(url,headers=headers)#延迟2秒time.sleep(2)soup=BeautifulSoup(res.text,'lxml')items=soup.select('.subject-list .subject-item .info h2 a')titles=[item['title'] for item in items]#将小说的title存入redis数据库r.sadd(set_name,(url,titles,time.time()))print(titles)return (url,titles)

将上面的脚本部署到两台主机A和B,然后各自运行下面的命令:

celery -A crawl_douban worker -l info

在本机C新建文件task_dispatcher.py用于异步分发任务,代码如下:

from crawl_douban import app
from crawl_douban import crawldef manage_crawl(urls):for url in urls:app.send_task('crawl_douban.crawl', args=(url,))#上句也可以写成 crawl.apply_async(args=(url,)) 或 crawl.delay(url)if __name__ == '__main__':start_url = 'https://book.douban.com/tag/小说'#爬去10页,每页20本书url_list = ['{}?start={}&type=T'.format(start_url, page * 20) for page in range(10)]manage_crawl(url_list)

运行task_dispatcher.py,跑完用时2.8s

 

celery worker -A tasks --loglevel=info --concurrency=5# 参数”-A”指定了Celery实例的位置
# 参数”loglevel”指定了日志等级,也可以不加,默认为warning。
# 参数”concurrency”指定最大并发数,默认为CPU核数。

 


推荐阅读
  • 本文提供了一套实用的方法论,旨在帮助开发者构建能够应对高并发请求且易于扩展的Web服务。内容涵盖了服务器架构、数据库管理、缓存策略以及异步处理等多个方面。 ... [详细]
  • 本文详细探讨了HTML表单中GET和POST请求的区别,包括它们的工作原理、数据传输方式、安全性及适用场景。同时,通过实例展示了如何在Servlet中处理这两种请求。 ... [详细]
  • 深入解析Redis内存对象模型
    本文详细介绍了Redis内存对象模型的关键知识点,包括内存统计、内存分配、数据存储细节及优化策略。通过实际案例和专业分析,帮助读者全面理解Redis内存管理机制。 ... [详细]
  • 深入解析 Android IPC 中的 Messenger 机制
    本文详细介绍了 Android 中基于消息传递的进程间通信(IPC)机制——Messenger。通过实例和源码分析,帮助开发者更好地理解和使用这一高效的通信工具。 ... [详细]
  • 利用GitHub热门资源,成功斩获阿里、京东、腾讯三巨头Offer
    Spring框架作为Java生态系统中的重要组成部分,因其强大的功能和灵活的扩展性,被广泛应用于各种规模的企业级应用开发中。本文将通过一份在GitHub上获得极高评价的Spring全家桶文档,探讨如何掌握Spring框架及其相关技术,助力职业发展。 ... [详细]
  • 微软Exchange服务器遭遇2022年版“千年虫”漏洞
    微软Exchange服务器在新年伊始遭遇了一个类似于‘千年虫’的日期处理漏洞,导致邮件传输受阻。该问题主要影响配置了FIP-FS恶意软件引擎的Exchange 2016和2019版本。 ... [详细]
  • 探讨如何真正掌握Java EE,包括所需技能、工具和实践经验。资深软件教学总监李刚分享了对毕业生简历中常见问题的看法,并提供了详尽的标准。 ... [详细]
  • 本文探讨了Java编程的核心要素,特别是其面向对象的特性,并详细介绍了Java虚拟机、类装载器体系结构、Java类文件和Java API等关键技术。这些技术使得Java成为一种功能强大且易于使用的编程语言。 ... [详细]
  • 全面解析运维监控:白盒与黑盒监控及四大黄金指标
    本文深入探讨了白盒和黑盒监控的概念,以及它们在系统监控中的应用。通过详细分析基础监控和业务监控的不同采集方法,结合四个黄金指标的解读,帮助读者更好地理解和实施有效的监控策略。 ... [详细]
  • 远程过程调用(RPC)是一种允许客户端通过网络请求服务器执行特定功能的技术。它简化了分布式系统的交互,使开发者可以像调用本地函数一样调用远程服务,并获得返回结果。本文将深入探讨RPC的工作原理、发展历程及其在现代技术中的应用。 ... [详细]
  • 利用RabbitMQ实现高效延迟任务处理
    本文详细探讨了如何利用RabbitMQ实现延迟任务,包括其应用场景、实现原理、系统设计以及具体的Spring Boot实现方式。 ... [详细]
  • Working with Errors in Go 1.13
    作者|陌无崖 ... [详细]
  • 本文将介绍如何编写一些有趣的VBScript脚本,这些脚本可以在朋友之间进行无害的恶作剧。通过简单的代码示例,帮助您了解VBScript的基本语法和功能。 ... [详细]
  • 本文介绍了Java并发库中的阻塞队列(BlockingQueue)及其典型应用场景。通过具体实例,展示了如何利用LinkedBlockingQueue实现线程间高效、安全的数据传递,并结合线程池和原子类优化性能。 ... [详细]
  • 优化Flask应用的并发处理:解决Mysql连接过多问题
    本文探讨了在Flask应用中通过优化后端架构来应对高并发请求,特别是针对Mysql 'too many connections' 错误的解决方案。我们将介绍如何利用Redis缓存、Gunicorn多进程和Celery异步任务队列来提升系统的性能和稳定性。 ... [详细]
author-avatar
易_拉罐
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有