热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

celery爬虫使用

简介celery是一个基于分布式消息传输的异步任务队列,它专注于实时处理,同时也支持任务调度。它由三部分组成,消息中间件,

简介

celery是一个基于分布式消息传输的异步任务队列,它专注于实时处理,同时也支持任务调度。

它由三部分组成,消息中间件,任务执行单元任务执行结果存储组成。

官网 :http://www.celeryproject.org/                 下载:pip install celery

消息中间件:Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等。
任务执行单元:Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
任务结果存储:Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等。Celery多用来执行异步任务,将耗时的操作交由Celery去异步执行,比如发送邮件、短信、消息推送、音视频处理等。还可以执行定时任务,定时执行某件事情,比如Redis中的数据每天凌晨两点保存至mysql数据库,实现Redis的持久化。使用 Celery 之前请务必理解以下概念:
a. Celery Beat: 任务调度器,Beat 进程会读取配置文件的内容,周期性的将配置中到期需要执行的任务发送给任务队列
b. Celery Worker: 执行任务的消费者,通常会在多台服务器运行多个消费者来提高运行效率。
c. Broker: 消息代理,也是任务队列本身(通常是消息队列或者数据库),通常称为消息中间件,接收任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方。
d. Producer: 任务生产者,调用 Celery API 的函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。

项目结构如下:

其中,app_test.py为主程序,其代码如下:

from celery import Celeryapp = Celery('proj', include=['proj.tasks'])
app.config_from_object('proj.celeryconfig')if __name__ == '__main__':app.start()

tasks.py为任务函数,代码如下:

import re
import requests
from celery import group
from proj.app_test import app@app.task(trail=True)
# 并行调用任务
def get_content(urls):return group(C.s(url) for url in urls)()@app.task(trail=True)
def C(url):return parser.delay(url)@app.task(trail=True)
# 获取每个网页的name和description
def parser(url):req = requests.get(url)html = req.texttry:name = re.findall(r'(.+?)', html)[0]desc = re.findall(r'(.+?)', html)[0]if name is not None and desc is not None:return name, descexcept Exception as err:return '', ''

celeryconfig.py为celery的配置文件,代码如下:

BROKER_URL = 'redis://localhost' # 使用Redis作为消息代理CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # 把任务结果存在了RedisCELERY_TASK_SERIALIZER = 'msgpack' # 任务序列化和反序列化使用msgpack方案CELERY_RESULT_SERIALIZER = 'json' # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSONCELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] # 指定接受的内容类型

最后是我们的爬虫文件,scrapy.py,代码如下:

import time
import requests
from bs4 import BeautifulSoup
from proj.tasks import get_contentt1 = time.time()
url = "http://www.wikidata.org/w/index.php?title=Special:WhatLinksHere/Q5&limit=500&from=0"headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, \like Gecko) Chrome/67.0.3396.87 Safari/537.36'}req = requests.get(url, headers=headers)
soup = BeautifulSoup(req.text, "lxml")
human_list = soup.find(id='mw-whatlinkshere-list')('li')urls = []
for human in human_list:url = human.find('a')['href']urls.append('https://www.wikidata.org'+url)
#print(urls)# 调用get_content函数,并获取爬虫结果
result = get_content.delay(urls)
res = [v for v in result.collect()]for r in res:if isinstance(r[1], list) and isinstance(r[1][0], str):print(r[1])t2 = time.time() # 结束时间
print('耗时:%s' % (t2 - t1))

在后台启动redis,并切换至proj项目所在目录,运行命令:

celery -A proj.app_test worker -l info

输出结果如下(只显示最后几行的输出):

......
['Antoine de Saint-Exupery', 'French writer and aviator']
['', '']
['Sir John Barrow, 1st Baronet', 'English statesman']
['Amy Johnson', 'pioneering English aviator']
['Mike Oldfield', 'English musician, multi-instrumentalist']
['Willoughby Newton', 'politician from Virginia, USA']
['Mack Wilberg', 'American conductor']
耗时:80.05160284042358

在rdm中查看数据,如下:

 

Celery进阶—分布式爬虫

新建文件crawlertask.py,用于执行数据抓取任务,代码如下。

#coding:utf-8import requests
from bs4 import BeautifulSoup
from celery import Celery,platformsapp = Celery('tasks',broker='redis://localhost:6379/0')
app.conf.CELERY_RESULT_BACKEND='redis://localhost:6379/0'platforms.C_FORCE_ROOT=Truedef format_str(str):return str.replace("\n","").replace(" ","").replace("\t","")@app.task
def get_urls_in_pages(from_page_num,to_page_num):urls=[]search_word='计算机'url_part_1='http://www.phei.com.cn/module/goods/'\'searchkey.jsp?Page='url_part_2='&Page=2&searchKey='for i in range(from_page_num,to_page_num+1):urls.append(url_part_1+str(i)+url_part_2+search_word)all_href_list=[]for url in urls:resp=requests.get(url)bs=BeautifulSoup(resp.text)a_list=bs.find_all('a')needed_list=[]for a in a_list:if 'href' in a.attrs:href_val=a['href']title=a.textif 'bookid' in href_val and 'shopcar0.jsp' not in href_val and title!='':if [title,href_val] not in needed_list:needed_list.append([format_str(title),format_str(href_val)])all_href_list+=needed_listall_href_file = open(str(from_page_num)+'_'+str(to_page_num)+'_'+'all_hrefs.txt','w')for href in all_href_list:all_href_file.write('\t'.join(href)+'\n')all_href_file.close()return len(all_href_list)

【部署服务器方法】

将以上脚本部署到两台云端服务器 , 并且在云端开启redis服务,然后执行:

celery worker -A crawlertask -l info -c 10

【部署从机方法】

将上面的脚本部署到两台主机A和B,然后各自运行下面的命令:

celery -A crawl_douban worker -l info

在本机新建文件task_dist.py用于异步分发任务,代码如下:

from celery import Celery
from threading import Thread
import timeredis_ips={0:'redis://101.200.163.195:6379/0',1:'redis://112.124.28.41:6379/0',2:'redis://112.124.28.41:6379/0',3:'redis://101.200.163.195:6379/0',
}def send_task_and_get_results(ind,from_page,to_page):app=Celery('crawlertask',broker=redis_ips[ind])app.conf.CELERY_RESULT_BACKEND=redis_ips[ind]result=app.send_task('crawlertask.get_urls_in_pages',args=(from_page,to_page))print(redis_ips[ind],result.get())if __name__=='__main__':t1=time.time()page_ranges_lst=[(1,10),(11,20),(21,30),(31,40),]th_lst = []for ind, page_range in enumerate(page_ranges_lst):th = Thread(target=send_task_and_get_results,args=(ind,page_range[0], page_range[1]))th_lst.append(th)for th in th_lst:th.start()for th in th_lst:th.join()t2 = time.time()print("用时:", t2 - t1)

举例:

以爬douban小说为例 首先启动Redis,新建文件crawl_douban.py

import requests
import time
import redis
from celery import Celery
from bs4 import BeautifulSoup
from configparser import ConfigParsercp=ConfigParser()
cp.read('config')#获取配置信息
db_host=cp.get(section='redis',option='db_host')
db_port=cp.getint('redis','db_port')
db_pwd=cp'redis'#redis连接
pool = redis.ConnectionPool(host=db_host, port=db_port, db=15, password=db_pwd)
r = redis.StrictRedis(connection_pool=pool)
set_name='crawl:douban'app = Celery('crawl', include=['task'], broker='redis://:{}@{}:{}/12'.format(db_pwd,db_host,db_port), backend='redis://:{}@{}:{}/13'.format(db_pwd,db_host,db_port))#官方推荐使用json作为消息序列化方式
app.conf.update(CELERY_TIMEZONE='Asia/Shanghai',CELERY_ENABLE_UTC=True,CELERY_ACCEPT_CONTENT=['json'],CELERY_TASK_SERIALIZER='json',CELERY_RESULT_SERIALIZER='json',
)headers={'User-Agent':'',}@app.task
def crawl(url):res=requests.get(url,headers=headers)#延迟2秒time.sleep(2)soup=BeautifulSoup(res.text,'lxml')items=soup.select('.subject-list .subject-item .info h2 a')titles=[item['title'] for item in items]#将小说的title存入redis数据库r.sadd(set_name,(url,titles,time.time()))print(titles)return (url,titles)

将上面的脚本部署到两台主机A和B,然后各自运行下面的命令:

celery -A crawl_douban worker -l info

在本机C新建文件task_dispatcher.py用于异步分发任务,代码如下:

from crawl_douban import app
from crawl_douban import crawldef manage_crawl(urls):for url in urls:app.send_task('crawl_douban.crawl', args=(url,))#上句也可以写成 crawl.apply_async(args=(url,)) 或 crawl.delay(url)if __name__ == '__main__':start_url = 'https://book.douban.com/tag/小说'#爬去10页,每页20本书url_list = ['{}?start={}&type=T'.format(start_url, page * 20) for page in range(10)]manage_crawl(url_list)

运行task_dispatcher.py,跑完用时2.8s

 

celery worker -A tasks --loglevel=info --concurrency=5# 参数”-A”指定了Celery实例的位置
# 参数”loglevel”指定了日志等级,也可以不加,默认为warning。
# 参数”concurrency”指定最大并发数,默认为CPU核数。

 


推荐阅读
  • 历经两个月,他成功斩获阿里巴巴Offer
    经过两个月的努力,一位普通的双非本科毕业生最终成功获得了阿里巴巴的录用通知。 ... [详细]
  • 本文提供了一套实用的方法论,旨在帮助开发者构建能够应对高并发请求且易于扩展的Web服务。内容涵盖了服务器架构、数据库管理、缓存策略以及异步处理等多个方面。 ... [详细]
  • RabbitMQ 核心组件解析
    本文详细介绍了RabbitMQ的核心概念,包括其基本原理、应用场景及关键组件,如消息、生产者、消费者、信道、交换机、路由键和虚拟主机等。 ... [详细]
  • 优化Flask应用的并发处理:解决Mysql连接过多问题
    本文探讨了在Flask应用中通过优化后端架构来应对高并发请求,特别是针对Mysql 'too many connections' 错误的解决方案。我们将介绍如何利用Redis缓存、Gunicorn多进程和Celery异步任务队列来提升系统的性能和稳定性。 ... [详细]
  • 本文深入探讨了MySQL中常见的面试问题,包括事务隔离级别、存储引擎选择、索引结构及优化等关键知识点。通过详细解析,帮助读者在面对BAT等大厂面试时更加从容。 ... [详细]
  • 本文深入探讨了UNIX/Linux系统中的进程间通信(IPC)机制,包括消息传递、同步和共享内存等。详细介绍了管道(Pipe)、有名管道(FIFO)、Posix和System V消息队列、互斥锁与条件变量、读写锁、信号量以及共享内存的使用方法和应用场景。 ... [详细]
  • 前言无论是对于刚入行工作还是已经工作几年的java开发者来说,面试求职始终是你需要直面的一件事情。首先梳理自己的知识体系,针对性准备,会有事半功倍的效果。我们往往会把重点放在技术上 ... [详细]
  • 字节跳动夏季招聘面试经验分享
    本文详细记录了字节跳动夏季招聘的面试经历,涵盖了一、二、三轮面试的技术问题及项目讨论,旨在为准备类似面试的求职者提供参考。 ... [详细]
  • 本文探讨了Web开发与游戏开发之间的主要区别,旨在帮助开发者更好地理解两种开发领域的特性和需求。文章基于作者的实际经验和网络资料整理而成。 ... [详细]
  • Spring Boot + RabbitMQ 消息确认机制详解
    本文详细介绍如何在 Spring Boot 项目中使用 RabbitMQ 的消息确认机制,包括消息发送确认和消息接收确认,帮助开发者解决在实际操作中可能遇到的问题。 ... [详细]
  • 本题要求在一组数中反复取出两个数相加,并将结果放回数组中,最终求出最小的总加法代价。这是一个经典的哈夫曼编码问题,利用贪心算法可以有效地解决。 ... [详细]
  • Django xAdmin 使用指南(第一部分)
    本文介绍如何在Django项目中集成和使用xAdmin,这是一个增强版的管理界面,提供了比Django默认admin更多的功能。文中详细描述了集成步骤及配置方法。 ... [详细]
  • 一、搭建项目创建Maven项目导入rabbitmq包com.rabbitmqamqp-clien ... [详细]
  • 利用GitHub热门资源,成功斩获阿里、京东、腾讯三巨头Offer
    Spring框架作为Java生态系统中的重要组成部分,因其强大的功能和灵活的扩展性,被广泛应用于各种规模的企业级应用开发中。本文将通过一份在GitHub上获得极高评价的Spring全家桶文档,探讨如何掌握Spring框架及其相关技术,助力职业发展。 ... [详细]
  • 本文总结了近年来在实际项目中使用消息中间件的经验和常见问题,旨在为Java初学者和中级开发者提供实用的参考。文章详细介绍了消息中间件在分布式系统中的作用,以及如何通过消息中间件实现高可用性和可扩展性。 ... [详细]
author-avatar
易_拉罐
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有