Scrapy
I. The Scrapy framework - using Twisted
1. Know what reactor, getPage, and defer do:
from twisted.internet import reactor     # the event loop (it terminates once every socket has been removed from it)
from twisted.web.client import getPage   # creates socket objects (a socket is removed from the event loop automatically once its download finishes)
from twisted.internet import defer       # defer.Deferred: a special "socket" object (it sends no request and must be removed manually)
2. Steps for achieving concurrency with Twisted (a compact sketch follows this list; the numbered subsections below then build it up step by step):
- create sockets with getPage
- add the sockets to the event loop
- start the event loop (internally it sends the requests and receives the responses; once every socket request has completed, the loop terminates)
- stop the event loop
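A minimal sketch putting the four steps together, using the same getPage-based API as the rest of this article:

from twisted.internet import reactor, defer
from twisted.web.client import getPage

def response(content):
    print(content)

@defer.inlineCallbacks
def task():
    d = getPage(b"http://www.baidu.com")     # step 1: create the socket
    d.addCallback(response)
    yield d                                  # step 2: register it with the event loop

dd = defer.DeferredList([task()])
dd.addBoth(lambda _: reactor.stop())         # step 4: stop the loop once every request is done
reactor.run()                                # step 3: start the event loop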
1. Create a socket with getPage:
def response(content):
    print(content)

def task():                              # create the task
    url = "http://www.baidu.com"
    d = getPage(url.encode("utf-8"))     # create the socket object; the return value is a defer.Deferred
    d.addCallback(response)              # run the callback once the request/download completes
2. At this point we have only created the socket and attached its callback; the socket has not yet been added to the event loop. The following two steps register it with the event loop:
- decorate the task function with @defer.inlineCallbacks
- add a yield statement inside task: yield d
def response(content):
    print(content)

@defer.inlineCallbacks
def task():                              # create the task
    url = "http://www.baidu.com"
    d = getPage(url.encode("utf-8"))     # create the socket object; the return value is a defer.Deferred
    d.addCallback(response)              # run the callback once the request/download completes
    yield d
3. Start the event loop:
def response(content):
    print(content)

@defer.inlineCallbacks
def task():                              # create the task
    url = "http://www.baidu.com"
    d = getPage(url.encode("utf-8"))     # create the socket object; the return value is a defer.Deferred
    d.addCallback(response)              # run the callback once the request/download completes
    yield d                              # when task() runs, only this line actually adds the socket to the event loop

task()          # task must be called, otherwise its body never executes
reactor.run()   # start the event loop
Running the file now starts the event loop. Note that, as written, the loop is never stopped manually, so it will keep running forever.
4. Stop the event loop:
def response(content):
    print(content)

@defer.inlineCallbacks
def task():
    url = "http://www.baidu.com"
    d = getPage(url.encode('utf-8'))
    d.addCallback(response)
    yield d

def done(*args, **kwargs):   # stop the loop
    reactor.stop()

d = task()                      # the Deferred yielded inside task
dd = defer.DeferredList([d, ])  # put d into a DeferredList, which watches whether the request has completed or failed
dd.addBoth(done)                # once every Deferred in the list has completed or failed, done is called, and we stop the loop there
reactor.run()
#########################
# 1. create sockets with getPage
# 2. add the sockets to the event loop
# 3. start the event loop (it stops automatically)
#########################
def response(content):
    print(content)

@defer.inlineCallbacks
def task():
    url = "http://www.baidu.com"
    d = getPage(url.encode('utf-8'))
    d.addCallback(response)
    yield d

def done(*args, **kwargs):
    reactor.stop()

li = []
for i in range(5):   # start five tasks concurrently
    d = task()
    li.append(d)
dd = defer.DeferredList(li)
dd.addBoth(done)
reactor.run()
The above covers the basic use of Twisted for crawling. Next we modify the program to mimic what Twisted is doing under the hood, so that it shuts down more safely.
As mentioned above, defer.Deferred is a special "socket" object that is never removed automatically. We use it inside task by replacing:
    yield d   →   yield defer.Deferred()   # d is a socket object; a bare defer.Deferred is also a "socket" object, but it sends no request
After this change the event loop never stops on its own: even once every real socket has finished its download, the Deferred sends no request and is never removed, so the program never terminates.
The trick is to count completed downloads inside the callback that runs when a page is fetched. Once every download has returned, the crawl is finished, and we fire the Deferred manually with callback(None), which removes it. Every socket watched by the defer.DeferredList has then finished,
so its addBoth callback runs, and inside that callback we stop the event loop with reactor.stop(). This mechanism makes the shutdown of the crawl safer.
The concrete implementation:
_close = None
count = 0

def response(content):
    print(content)
    global count
    count += 1
    if count == 3:
        _close.callback(None)   # every download has returned: fire _close manually to remove the defer.Deferred

@defer.inlineCallbacks
def task():
    """
    The starting point of each spider: start_requests
    """
    url = "http://www.baidu.com"
    d1 = getPage(url.encode('utf-8'))
    d1.addCallback(response)

    url = "http://www.cnblogs.com"
    d2 = getPage(url.encode('utf-8'))
    d2.addCallback(response)

    url = "http://www.bing.com"
    d3 = getPage(url.encode('utf-8'))
    d3.addCallback(response)

    global _close
    _close = defer.Deferred()
    yield _close   # yield an unfired defer.Deferred so the event loop stays alive here; _close.callback(None) removes it manually

def done(*args, **kwargs):
    reactor.stop()

# one spider
spider1 = task()
dd = defer.DeferredList([spider1])
dd.addBoth(done)
reactor.run()
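The hard-coded count == 3 works here only because we know in advance that the spider issues exactly three requests; in a real engine new requests are produced by the parse callbacks, so the total is not known up front. A minimal variant of the same close mechanism that tracks outstanding downloads in a list instead of counting to a fixed number might look like the sketch below (pending is a name introduced here for illustration; it foreshadows the crawlling list used by the engine in the next version):

from twisted.internet import reactor, defer
from twisted.web.client import getPage

_close = None
pending = []     # Deferreds for downloads that have not finished yet

def response(content, d):
    print(content)
    pending.remove(d)
    if not pending:               # nothing left in flight: the crawl is done
        _close.callback(None)     # fire _close manually so the loop can shut down

@defer.inlineCallbacks
def task():
    global _close
    for url in ["http://www.baidu.com", "http://www.cnblogs.com", "http://www.bing.com"]:
        d = getPage(url.encode('utf-8'))
        pending.append(d)
        d.addCallback(response, d)   # pass the Deferred along so the callback can remove it
    _close = defer.Deferred()
    yield _close                     # keep the event loop alive until _close fires

dd = defer.DeferredList([task()])
dd.addBoth(lambda _: reactor.stop())
reactor.run()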
Next we wrap this mechanism into a small engine that drives a spider class, which is already quite close to how Scrapy is organised:
from twisted.internet import reactor      # the event loop (it terminates once every socket has been removed from it)
from twisted.web.client import getPage    # creates socket objects (removed from the event loop automatically once the download finishes)
from twisted.internet import defer        # defer.Deferred: a special "socket" object (sends no request; must be removed manually)
import queue
import types


class Request(object):
    def __init__(self, url, callback):
        self.url = url
        self.callback = callback


class HttpResponse(object):
    def __init__(self, content, request):
        self.content = content
        self.request = request
        self.url = request.url
        self.text = str(content, encoding='utf-8')


class ChoutiSpider(object):
    name = 'chouti'

    def start_requests(self):
        start_url = ['http://www.baidu.com', 'http://www.bing.com', ]
        for url in start_url:
            yield Request(url, self.parse)

    def parse(self, response):
        print(response)   # response wraps the downloaded page
        yield Request('http://www.cnblogs.com', callback=self.parse)


Q = queue.Queue()


class Engine(object):
    def __init__(self):
        self._close = None
        self.max = 5          # maximum number of concurrent requests
        self.crawlling = []   # requests currently in flight

    def get_response_callback(self, content, request):
        self.crawlling.remove(request)
        rep = HttpResponse(content, request)
        result = request.callback(rep)
        if isinstance(result, types.GeneratorType):
            for req in result:
                Q.put(req)

    def _next_request(self):
        """
        Take Request objects from the queue and send them,
        respecting the maximum concurrency limit.
        """
        print(self.crawlling, Q.qsize())
        if Q.qsize() == 0 and len(self.crawlling) == 0:
            self._close.callback(None)
            return
        if len(self.crawlling) >= self.max:
            return
        while len(self.crawlling) < self.max:
            try:
                req = Q.get(block=False)   # take a request if one is queued; raise immediately instead of waiting if the queue is empty
                self.crawlling.append(req)
                d = getPage(req.url.encode('utf-8'))   # create the socket object
                # once the page is downloaded, get_response_callback calls the parse method defined in the
                # user's spider and pushes any new requests it yields back onto the scheduler queue
                d.addCallback(self.get_response_callback, req)
                # while still below the maximum concurrency, go back to the scheduler for another Request
                d.addCallback(lambda _: reactor.callLater(0, self._next_request))
            except Exception as e:
                print(e)
                return

    @defer.inlineCallbacks
    def crawl(self, spider):
        # push the spider's initial Request objects onto the scheduler queue
        start_requests = iter(spider.start_requests())
        while True:
            try:
                request = next(start_requests)
                Q.put(request)
            except StopIteration as e:
                break

        # take requests from the scheduler and send them
        # self._next_request()
        reactor.callLater(0, self._next_request)

        self._close = defer.Deferred()
        yield self._close


spider = ChoutiSpider()

_active = set()
engine = Engine()

d = engine.crawl(spider)
_active.add(d)

dd = defer.DeferredList(_active)
dd.addBoth(lambda a: reactor.stop())
reactor.run()
Finally we split the pieces into Scrapy-style components: Request, HttpResponse, Scheduler, ExecutionEngine, Crawler, CrawlerProcess, and a command entry point:
from twisted.internet import reactor      # the event loop (it terminates once every socket has been removed from it)
from twisted.web.client import getPage    # creates socket objects (removed from the event loop automatically once the download finishes)
from twisted.internet import defer        # defer.Deferred: a special "socket" object (sends no request; must be removed manually)
from queue import Queue
import importlib
import types


class Request(object):
    """Wraps the information of a user request"""
    def __init__(self, url, callback):
        self.url = url
        self.callback = callback


class HttpResponse(object):
    def __init__(self, content, request):
        self.content = content
        self.request = request


class Scheduler(object):
    """Task scheduler"""
    def __init__(self):
        self.q = Queue()

    def open(self):
        pass

    def next_request(self):
        try:
            req = self.q.get(block=False)
        except Exception as e:
            req = None
        return req

    def enqueue_request(self, req):
        self.q.put(req)

    def size(self):
        return self.q.qsize()


class ExecutionEngine(object):
    """Engine: drives all the scheduling"""
    def __init__(self):
        self._close = None
        self.scheduler = None
        self.max = 5
        self.crawlling = []

    def get_response_callback(self, content, request):
        self.crawlling.remove(request)
        response = HttpResponse(content, request)
        result = request.callback(response)
        if isinstance(result, types.GeneratorType):
            for req in result:
                self.scheduler.enqueue_request(req)

    def _next_request(self):
        if self.scheduler.size() == 0 and len(self.crawlling) == 0:
            self._close.callback(None)
            return
        while len(self.crawlling) < self.max:
            req = self.scheduler.next_request()
            if not req:
                return
            self.crawlling.append(req)
            d = getPage(req.url.encode('utf-8'))
            d.addCallback(self.get_response_callback, req)
            d.addCallback(lambda _: reactor.callLater(0, self._next_request))

    @defer.inlineCallbacks
    def open_spider(self, start_requests):
        self.scheduler = Scheduler()
        yield self.scheduler.open()
        while True:
            try:
                req = next(start_requests)
            except StopIteration as e:
                break
            self.scheduler.enqueue_request(req)
        reactor.callLater(0, self._next_request)

    @defer.inlineCallbacks
    def start(self):
        self._close = defer.Deferred()
        yield self._close


class Crawler(object):
    """Wraps the scheduler and the engine"""
    def _create_engine(self):
        return ExecutionEngine()

    def _create_spider(self, spider_cls_path):
        """
        :param spider_cls_path: e.g. spider.chouti.ChoutiSpider
        :return:
        """
        module_path, cls_name = spider_cls_path.rsplit('.', maxsplit=1)
        m = importlib.import_module(module_path)
        cls = getattr(m, cls_name)
        return cls()

    @defer.inlineCallbacks
    def crawl(self, spider_cls_path):
        engine = self._create_engine()
        spider = self._create_spider(spider_cls_path)
        start_requests = iter(spider.start_requests())
        yield engine.open_spider(start_requests)
        yield engine.start()


class CrawlerProcess(object):
    """Starts the event loop"""
    def __init__(self):
        self._active = set()

    def crawl(self, spider_cls_path):
        """
        :param spider_cls_path:
        :return:
        """
        crawler = Crawler()
        d = crawler.crawl(spider_cls_path)
        self._active.add(d)

    def start(self):
        dd = defer.DeferredList(self._active)
        dd.addBoth(lambda _: reactor.stop())
        reactor.run()


class Commond(object):
    def run(self):
        crawl_process = CrawlerProcess()
        spider_cls_path_list = ['spider.chouti.ChoutiSpider', 'spider.cnblogs.CnblogsSpider', ]
        for spider_cls_path in spider_cls_path_list:
            crawl_process.crawl(spider_cls_path)
        crawl_process.start()


if __name__ == '__main__':
    cmd = Commond()
    cmd.run()
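This final version loads spiders by dotted path, so it expects a spider package next to the framework file. A minimal spider/chouti.py consistent with the ChoutiSpider used earlier might look like the sketch below; the "from engine import Request" line is an assumption (point it at whichever module the framework code above is saved in), and spider/cnblogs.py would follow the same pattern. An empty spider/__init__.py makes the directory importable as a package.

# spider/chouti.py -- hypothetical module matching the 'spider.chouti.ChoutiSpider' path above
from engine import Request   # assumption: the framework above is saved as engine.py


class ChoutiSpider(object):
    name = 'chouti'

    def start_requests(self):
        start_urls = ['http://www.baidu.com', 'http://www.bing.com']
        for url in start_urls:
            yield Request(url, callback=self.parse)

    def parse(self, response):
        # response is the HttpResponse wrapper; yielding another Request would re-enqueue it
        print(response.request.url, len(response.content))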