热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Scrapy源码剖析:Scrapy如何完成抓取任务?

上一篇文章:Scrapy源码剖析:Scrapy有哪些核心组件?我们已经分析了Scrapy核心组件的主要职责,以及它们在初始化

上一篇文章:Scrapy源码剖析:Scrapy有哪些核心组件?我们已经分析了 Scrapy 核心组件的主要职责,以及它们在初始化时都完成了哪些工作。

这篇文章就让我们来看一下,也是 Scrapy 最核心的抓取流程是如何运行的,它是如何调度各个组件,完成整个抓取工作的。

运行入口

还是回到最初的入口,在Scrapy源码剖析:Scrapy是如何运行起来的?这篇文章中我们已经详细分析过了,在执行 Scrapy 命令时,主要经过以下几步:

  • 调用 cmdline.py 的 execute 方法

  • 找到对应的 命令实例 解析命令行

  • 构建 CrawlerProcess 实例,调用 crawl 和 start 方法开始抓取

而 crawl 方法最终是调用了 Cralwer 实例的 crawl,这个方法最终把控制权交给了Engine,而 start 方法注册好协程池,就开始异步调度执行了。

我们来看 Cralwer 的 crawl 方法:

@defer.inlineCallbacks
def crawl(self, *args, **kwargs):assert not self.crawling, "Crawling already taking place"self.crawling = Truetry:# 创建爬虫实例self.spider = self._create_spider(*args, **kwargs)# 创建引擎self.engine = self._create_engine()# 调用spider的start_requests 获取种子URLstart_requests = iter(self.spider.start_requests())# 调用engine的open_spider 交由引擎调度yield self.engine.open_spider(self.spider, start_requests)yield defer.maybeDeferred(self.engine.start)except Exception:if six.PY2:exc_info = sys.exc_info()self.crawling = Falseif self.engine is not None:yield self.engine.close()if six.PY2:six.reraise(*exc_info)raise

这里首先会创建出爬虫实例,然后创建引擎,之后调用了 spider 的 start_requests 方法,这个方法就是我们平时写的最多爬虫类的父类,它在 spiders/__init__.py 中定义:

def start_requests(self):# 根据定义好的start_urls属性 生成种子URL对象for url in self.start_urls:yield self.make_requests_from_url(url)def make_requests_from_url(self, url):# 构建Request对象return Request(url, dont_filter=True)构建请求

通过上面这段代码,我们能看到,平时我们必须要定义的 start_urls 属性,原来就是在这里用来构建 Request 的,来看 Request 的定义:

class Request(object_ref):def __init__(self, url, callback=None, method='GET', headers=None, body=None,COOKIEs=None, meta=None, encoding='utf-8', priority=0,dont_filter=False, errback=None):# 编码self._encoding = encoding# 请求方法self.method = str(method).upper()# 设置urlself._set_url(url)# 设置bodyself._set_body(body)assert isinstance(priority, int), "Request priority not an integer: %r" % priority# 优先级self.priority = priorityassert callback or not errback, "Cannot use errback without a callback"# 回调函数self.callback = callback# 异常回调函数self.errback = errback# COOKIEsself.COOKIEs = COOKIEs or {}# 构建Headerself.headers = Headers(headers or {}, encoding=encoding)# 是否需要过滤self.dont_filter = dont_filter# 附加信息self._meta = dict(meta) if meta else None

Request 对象比较简单,就是封装了请求参数、请求方法、回调以及可附加的属性信息。

当然,你也可以在子类中重写 start_requests 和 make_requests_from_url 这 2 个方法,用来自定义逻辑构建种子请求。

引擎调度

再回到 crawl 方法,构建好种子请求对象后,调用了 engine 的 open_spider:

@defer.inlineCallbacks
def open_spider(self, spider, start_requests=(), close_if_idle=True):assert self.has_capacity(), "No free spider slot when opening %r" % \spider.namelogger.info("Spider opened", extra={'spider': spider})# 注册_next_request调度方法 循环调度nextcall = CallLaterOnce(self._next_request, spider)# 初始化schedulerscheduler = self.scheduler_cls.from_crawler(self.crawler)# 调用爬虫中间件 处理种子请求start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)# 封装Slot对象slot = Slot(start_requests, close_if_idle, nextcall, scheduler)self.slot = slotself.spider = spider# 调用scheduler的openyield scheduler.open(spider)# 调用scrapyer的openyield self.scraper.open_spider(spider)# 调用stats的openself.crawler.stats.open_spider(spider)yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)# 发起调度slot.nextcall.schedule()slot.heartbeat.start(5)

在这里首先构建了一个 CallLaterOnce,之后把 _next_request 方法注册了进去,看此类的实现:

class CallLaterOnce(object):# 在twisted的reactor中循环调度一个方法def __init__(self, func, *a, **kw):self._func = funcself._a = aself._kw = kwself._call = Nonedef schedule(self, delay=0):# 上次发起调度 才可再次继续调度if self._call is None:# 注册self到callLater中self._call = reactor.callLater(delay, self)def cancel(self):if self._call:self._call.cancel()def __call__(self):# 上面注册的是self 所以会执行__call__self._call = Nonereturn self._func(*self._a, **self._kw) 

这里封装了循环执行的方法类,并且注册的方法会在 twisted 的 reactor 中异步执行,以后执行只需调用 schedule,就会注册 self 到 reactor 的 callLater 中,然后它会执行 __call__ 方法,最终执行的就是我们注册的方法。

而这里我们注册的方法就是引擎的 _next_request,也就是说,此方法会循环调度,直到程序退出。

之后调用了爬虫中间件的 process_start_requests 方法,你可以定义多个自己的爬虫中间件,每个类都重写此方法,爬虫在调度之前会分别调用你定义好的爬虫中间件,来处理初始化请求,你可以进行过滤、加工、筛选以及你想做的任何逻辑。

这样做的好处就是,把想做的逻辑拆分成多个中间件,每个中间件功能独立,而且维护起来更加清晰。

调度器

接下来就要开始调度任务了,这里首先调用了 Scheduler 的 open:

def open(self, spider):self.spider = spider# 实例化优先级队列self.mqs = self.pqclass(self._newmq)# 如果定义了dqdir则实例化基于磁盘的队列self.dqs = self._dq() if self.dqdir else None# 调用请求指纹过滤器的open方法return self.df.open()def _dq(self):# 实例化磁盘队列activef = join(self.dqdir, 'active.json')if exists(activef):with open(activef) as f:prios = json.load(f)else:prios = ()q = self.pqclass(self._newdq, startprios=prios)if q:logger.info("Resuming crawl (%(queuesize)d requests scheduled)",{'queuesize': len(q)}, extra={'spider': self.spider})return q

在 open 方法中,调度器会实例化出优先级队列,以及根据 dqdir是否配置,决定是否使用磁盘队列,最后调用了请求指纹过滤器的 open 方法,这个方法在父类 BaseDupeFilter 中定义:

class BaseDupeFilter(object):# 过滤器基类,子类可重写以下方法@classmethoddef from_settings(cls, settings):return cls()def request_seen(self, request):# 请求过滤return Falsedef open(self):# 可重写 完成过滤器的初始化工作passdef close(self, reason):# 可重写 完成关闭过滤器工作passdef log(self, request, spider):pas

请求过滤器提供了请求过滤的具体实现方式,Scrapy 默认提供了 RFPDupeFilter 过滤器实现过滤重复请求的逻辑,这里先对这个类有个了解,后面会讲具体是如何过滤重复请求的。

Scraper

再之后就调用 Scraper 的 open_spider 方法,在之前的文章中我们提到过,Scraper 类是连接 EngineSpiderItem Pipeline 这 3 个组件的桥梁:

@defer.inlineCallbacks
def open_spider(self, spider):self.slot = Slot()# 调用所有pipeline的open_spideryield self.itemproc.open_spider(spider)

这里的主要逻辑是 Scraper 调用所有 Pipeline 的 open_spider 方法,如果我们定义了多个 Pipeline 输出类,可以重写 open_spider 完成每个 Pipeline 在输出前的初始化工作。

循环调度

调用了一系列组件的 open 方法后,最后调用了 nextcall.schedule() 开始调度,也就是循环执行在上面注册的 _next_request 方法:

def _next_request(self, spider):# 此方法会循环调度slot = self.slotif not slot:return# 暂停if self.paused:return# 是否等待while not self._needs_backout(spider):# 从scheduler中获取request# 注意:第一次获取时,是没有的,也就是会break出来# 从而执行下面的逻辑if not self._next_request_from_scheduler(spider):break# 如果start_requests有数据且不需要等待if slot.start_requests and not self._needs_backout(spider):try:# 获取下一个种子请求request = next(slot.start_requests)except StopIteration:slot.start_requests = Noneexcept Exception:slot.start_requests = Nonelogger.error('Error while obtaining start requests',exc_info=True, extra={'spider': spider})else:# 调用crawl,实际是把request放入scheduler的队列中self.crawl(request, spider)# 空闲则关闭spiderif self.spider_is_idle(spider) and slot.close_if_idle:self._spider_idle(spider)def _needs_backout(self, spider):# 是否需要等待,取决4个条件# 1. Engine是否stop# 2. slot是否close# 3. downloader下载超过预设# 4. scraper处理response超过预设slot = self.slotreturn not self.running \or slot.closing \or self.downloader.needs_backout() \or self.scraper.slot.needs_backout()def _next_request_from_scheduler(self, spider):slot = self.slot# 从scheduler拿出下个requestrequest = slot.scheduler.next_request()if not request:return# 下载d = self._download(request, spider)# 注册成功、失败、出口回调方法d.addBoth(self._handle_downloader_output, request, spider)d.addErrback(lambda f: logger.info('Error while handling downloader output',exc_info=failure_to_exc_info(f),extra={'spider': spider}))d.addBoth(lambda _: slot.remove_request(request))d.addErrback(lambda f: logger.info('Error while removing request from slot',exc_info=failure_to_exc_info(f),extra={'spider': spider}))d.addBoth(lambda _: slot.nextcall.schedule())d.addErrback(lambda f: logger.info('Error while scheduling new request',exc_info=failure_to_exc_info(f),extra={'spider': spider}))return ddef crawl(self, request, spider):assert spider in self.open_spiders, \"Spider %r not opened when crawling: %s" % (spider.name, request)# request放入scheduler队列,调用nextcall的scheduleself.schedule(request, spider)self.slot.nextcall.schedule()def schedule(self, request, spider):self.signals.send_catch_log(signal=signals.request_scheduled,request=request, spider=spider)# 调用scheduler的enqueue_request,把request放入scheduler队列if not self.slot.scheduler.enqueue_request(request):self.signals.send_catch_log(signal=signals.request_dropped,request=request, spider=spider)

_next_request 方法首先调用 _needs_backout 检查是否需要等待,等待的条件有以下几种情况:

  • 引擎是否主动关闭

  • Slot是否关闭

  • 下载器在网络下载时是否超过预设参数

  • Scraper处理输出是否超过预设参数

如果不需要等待,则调用 _next_request_from_scheduler,此方法从名字上就能看出,主要是从 Schduler 中获取 Request

这里要注意,在第一次调用此方法时,Scheduler 中是没有放入任何 Request 的,这里会直接break 出来,执行下面的逻辑,而下面就会调用 crawl 方法,实际是把请求放到 Scheduler 的请求队列,放入队列的过程会经过请求过滤器校验是否重复。

下次再调用 _next_request_from_scheduler 时,就能从 Scheduler 中获取到下载请求,然后执行下载动作。

先来看第一次调度,执行 crawl:

def crawl(self, request, spider):assert spider in self.open_spiders, \"Spider %r not opened when crawling: %s" % (spider.name, request)# 放入Scheduler队列self.schedule(request, spider)# 进行下一次调度self.slot.nextcall.schedule()def schedule(self, request, spider):self.signals.send_catch_log(signal=signals.request_scheduled,request=request, spider=spider)# 放入Scheduler队列if not self.slot.scheduler.enqueue_request(request):self.signals.send_catch_log(signal=signals.request_dropped,request=request, spider=spider)

调用引擎的 crawl 实际就是把请求放入 Scheduler 的队列中,下面看请求是如何入队列的。

请求入队

Scheduler 请求入队方法:

def enqueue_request(self, request):# 请求入队 若请求过滤器验证重复 返回Falseif not request.dont_filter and self.df.request_seen(request):self.df.log(request, self.spider)return False# 磁盘队列是否入队成功dqok = self._dqpush(request)if dqok:self.stats.inc_value('scheduler/enqueued/disk', spider=self.spider)else:# 没有定义磁盘队列 则使用内存队列self._mqpush(request)self.stats.inc_value('scheduler/enqueued/memory', spider=self.spider)self.stats.inc_value('scheduler/enqueued', spider=self.spider)return Truedef _dqpush(self, request):# 是否定义磁盘队列if self.dqs is None:returntry:# Request对象转dictreqd = request_to_dict(request, self.spider)# 放入磁盘队列self.dqs.push(reqd, -request.priority)except ValueError as e:  # non serializable requestif self.logunser:msg = ("Unable to serialize request: %(request)s - reason:"" %(reason)s - no more unserializable requests will be"" logged (stats being collected)")logger.warning(msg, {'request': request, 'reason': e},exc_info=True, extra={'spider': self.spider})self.logunser = Falseself.stats.inc_value('scheduler/unserializable',spider=self.spider)returnelse:return Truedef _mqpush(self, request):# 入内存队列self.mqs.push(request, -request.priority)

在上一篇文章时有说到,调度器主要定义了 2 种队列:基于磁盘队列、基于内存队列。

如果在实例化 Scheduler 时候传入 jobdir,则使用磁盘队列,否则使用内存队列,默认使用内存队列。

指纹过滤

上面说到,在请求入队之前,首先会通过请求指纹过滤器检查请求是否重复,也就是调用了过滤器的 request_seen:

def request_seen(self, request):# 生成请求指纹fp = self.request_fingerprint(request)# 请求指纹如果在指纹集合中 则认为重复if fp in self.fingerprints:return True# 不重复则记录此指纹self.fingerprints.add(fp)# 实例化如果有path则把指纹写入文件if self.file:self.file.write(fp + os.linesep)def request_fingerprint(self, request):# 调用utils.request的request_fingerprintreturn request_fingerprint(request)

utils.request 的 request_fingerprint 逻辑如下:

def request_fingerprint(request, include_headers=None):"""生成请求指纹"""# 指纹生成是否包含headersif include_headers:include_headers = tuple(to_bytes(h.lower())for h in sorted(include_headers))cache = _fingerprint_cache.setdefault(request, {})if include_headers not in cache:# 使用sha1算法生成指纹fp = hashlib.sha1()fp.update(to_bytes(request.method))fp.update(to_bytes(canonicalize_url(request.url)))fp.update(request.body or b'')if include_headers:for hdr in include_headers:if hdr in request.headers:fp.update(hdr)for v in request.headers.getlist(hdr):fp.update(v)cache[include_headers] = fp.hexdigest()return cache[include_headers]

这个过滤器先是通过 Request 对象生成一个请求指纹,在这里使用 sha1 算法,并记录到指纹集合,每次请求入队前先到这里验证一下指纹集合,如果已存在,则认为请求重复,则不会重复入队列。

不过如果我想不校验重复,也想重复爬取怎么办?看 enqueue_request 的第一行判断,仅需将 Request 实例的 dont_filter 设置为 True 就可以重复抓取此请求,非常灵活。

Scrapy 就是通过此逻辑实现重复请求的过滤,默认情况下,重复请求是不会进行重复抓取的。

下载请求

请求第一次进来后,肯定是不重复的,那么则会正常进入调度器队列。之后下一次调度,再次调用 _next_request_from_scheduler 方法,此时调用调度器的 next_request 方法,就是从调度器队列中取出一个请求,这次就要开始进行网络下载了,也就是调用 _download:

def _download(self, request, spider):# 下载请求slot = self.slotslot.add_request(request)def _on_success(response):# 成功回调 结果必须是Request或Responseassert isinstance(response, (Response, Request))if isinstance(response, Response):# 如果下载后结果为Response 返回Responseresponse.request = requestlogkws = self.logformatter.crawled(request, response, spider)logger.log(*logformatter_adapter(logkws), extra={'spider': spider})self.signals.send_catch_log(signal=signals.response_received, \response=response, request=request, spider=spider)return responsedef _on_complete(_):# 此次下载完成后 继续进行下一次调度slot.nextcall.schedule()return _# 调用Downloader进行下载dwld = self.downloader.fetch(request, spider)# 注册成功回调dwld.addCallbacks(_on_success)# 结束回调dwld.addBoth(_on_complete)return dwld

在进行网络下载时,调用了 Downloader 的 fetch:

def fetch(self, request, spider):def _deactivate(response):# 下载结束后删除此记录self.active.remove(request)return response# 下载前记录处理中的请求self.active.add(request)# 调用下载器中间件download 并注册下载成功的回调方法是self._enqueue_requestdfd = self.middleware.download(self._enqueue_request, request, spider)# 注册结束回调return dfd.addBoth(_deactivate)

这里调用下载器中间件的 download,并注册下载成功的回调方法是 _enqueue_request,来看下载方法:

def download(self, download_func, request, spider):@defer.inlineCallbacksdef process_request(request):# 如果下载器中间件有定义process_request 则依次执行for method in self.methods['process_request']:response = yield method(request=request, spider=spider)assert response is None or isinstance(response, (Response, Request)), \'Middleware %s.process_request must return None, Response or Request, got %s' % \(six.get_method_self(method).__class__.__name__, response.__class__.__name__)# 如果下载器中间件有返回值 直接返回此结果if response:defer.returnValue(response)# 如果下载器中间件没有返回值,则执行注册进来的方法 也就是Downloader的_enqueue_requestdefer.returnValue((yield download_func(request=request,spider=spider)))@defer.inlineCallbacksdef process_response(response):assert response is not None, 'Received None in process_response'if isinstance(response, Request):defer.returnValue(response)# 如果下载器中间件有定义process_response 则依次执行for method in self.methods['process_response']:response = yield method(request=request, response=response,spider=spider)assert isinstance(response, (Response, Request)), \'Middleware %s.process_response must return Response or Request, got %s' % \(six.get_method_self(method).__class__.__name__, type(response))if isinstance(response, Request):defer.returnValue(response)defer.returnValue(response)@defer.inlineCallbacksdef process_exception(_failure):exception = _failure.value# 如果下载器中间件有定义process_exception 则依次执行for method in self.methods['process_exception']:response = yield method(request=request, exception=exception,spider=spider)assert response is None or isinstance(response, (Response, Request)), \'Middleware %s.process_exception must return None, Response or Request, got %s' % \(six.get_method_self(method).__class__.__name__, type(response))if response:defer.returnValue(response)defer.returnValue(_failure)# 注册执行、错误、回调方法deferred = mustbe_deferred(process_request, request)deferred.addErrback(process_exception)deferred.addCallback(process_response)return deferred

在下载过程中,首先找到所有定义好的下载器中间件,包括内置定义好的,也可以自己扩展下载器中间件,下载前先依次执行 process_request,可对 Request 进行加工、处理、校验等操作,然后发起真正的网络下载,也就是第一个参数 download_func,在这里是 Downloader 的 _enqueue_request 方法:

下载成功后回调 Downloader的 _enqueue_request:

def _enqueue_request(self, request, spider):# 加入下载请求队列key, slot = self._get_slot(request, spider)request.meta['download_slot'] = keydef _deactivate(response):slot.active.remove(request)return responseslot.active.add(request)deferred = defer.Deferred().addBoth(_deactivate)# 下载队列slot.queue.append((request, deferred))# 处理下载队列self._process_queue(spider, slot)return deferreddef _process_queue(self, spider, slot):if slot.latercall and slot.latercall.active():return# 如果延迟下载参数有配置 则延迟处理队列now = time()delay = slot.download_delay()if delay:penalty = delay - now + slot.lastseenif penalty > 0:slot.latercall = reactor.callLater(penalty, self._process_queue, spider, slot)return# 处理下载队列while slot.queue and slot.free_transfer_slots() > 0:slot.lastseen = now# 从下载队列中取出下载请求request, deferred = slot.queue.popleft()# 开始下载dfd = self._download(slot, request, spider)dfd.chainDeferred(deferred)# 延迟if delay:self._process_queue(spider, slot)breakdef _download(self, slot, request, spider):# 注册方法 调用handlers的download_requestdfd = mustbe_deferred(self.handlers.download_request, request, spider)# 注册下载完成回调方法def _downloaded(response):self.signals.send_catch_log(signal=signals.response_downloaded,response=response,request=request,spider=spider)return responsedfd.addCallback(_downloaded)slot.transferring.add(request)def finish_transferring(_):slot.transferring.remove(request)# 下载完成后调用_process_queueself._process_queue(spider, slot)return _return dfd.addBoth(finish_transferring)

这里也维护了一个下载队列,可根据配置达到延迟下载的要求。真正发起下载请求是调用了 self.handlers.download_request:

def download_request(self, request, spider):# 获取请求的schemescheme = urlparse_cached(request).scheme# 根据scheeme获取下载处理器handler = self._get_handler(scheme)if not handler:raise NotSupported("Unsupported URL scheme '%s': %s" %(scheme, self._notconfigured[scheme]))# 开始下载 并返回结果return handler.download_request(request, spider)def _get_handler(self, scheme):# 根据scheme获取对应的下载处理器# 配置文件中定义好了http、https、ftp等资源的下载处理器if scheme in self._handlers:return self._handlers[scheme]if scheme in self._notconfigured:return Noneif scheme not in self._schemes:self._notconfigured[scheme] = 'no handler available for that scheme'return Nonepath = self._schemes[scheme]try:# 实例化下载处理器dhcls = load_object(path)dh = dhcls(self._crawler.settings)except NotConfigured as ex:self._notconfigured[scheme] = str(ex)return Noneexcept Exception as ex:logger.error('Loading "%(clspath)s" for scheme "%(scheme)s"',{"clspath": path, "scheme": scheme},exc_info=True,  extra={'crawler': self._crawler})self._notconfigured[scheme] = str(ex)return Noneelse:self._handlers[scheme] = dhreturn self._handlers[scheme]

下载前,先通过解析 request 的 scheme 来获取对应的下载处理器,默认配置文件中定义的下载处理器如下:

DOWNLOAD_HANDLERS_BASE = {'file': 'scrapy.core.downloader.handlers.file.FileDownloadHandler','http': 'scrapy.core.downloader.handlers.http.HTTPDownloadHandler','https': 'scrapy.core.downloader.handlers.http.HTTPDownloadHandler','s3': 'scrapy.core.downloader.handlers.s3.S3DownloadHandler','ftp': 'scrapy.core.downloader.handlers.ftp.FTPDownloadHandler',
}

然后调用 download_request 方法,完成网络下载,这里不再详细讲解每个处理器的实现,简单来说,你可以把它想象成封装好的网络下载库,输入URL,它会给你输出下载结果,这样方便理解。

在下载过程中,如果发生异常情况,则会依次调用下载器中间件的 process_exception 方法,每个中间件只需定义自己的异常处理逻辑即可。

如果下载成功,则会依次执行下载器中间件的 process_response 方法,每个中间件可以进一步处理下载后的结果,最终返回。

这里值得提一下,process_request 方法是每个中间件顺序执行的,而 process_response 和 process_exception 方法是每个中间件倒序执行的,具体可看一下 DownaloderMiddlewareManager 的 _add_middleware 方法,就可以明白是如何注册这个方法链的。

拿到最终的下载结果后,再回到 ExecuteEngine 的 _next_request_from_scheduler 中,会看到调用了 _handle_downloader_output,也就是处理下载结果的逻辑:

def _handle_downloader_output(self, response, request, spider):# 下载结果必须是Request、Response、Failure其一assert isinstance(response, (Request, Response, Failure)), response# 如果是Request 则再次调用crawl 执行Scheduler的入队逻辑if isinstance(response, Request):self.crawl(response, spider)return# 如果是Response或Failure 则调用scraper的enqueue_scrape进一步处理# 主要是和Spiders和Pipeline交互d = self.scraper.enqueue_scrape(response, request, spider)d.addErrback(lambda f: logger.error('Error while enqueuing downloader output',exc_info=failure_to_exc_info(f),extra={'spider': spider}))return d

拿到下载结果后,主要分 2 个逻辑:

  • 如果返回的是 Request 实例,则直接再次放入 Scheduler 请求队列

  • 如果返回的是是 Response 或 Failure 实例,则调用 Scraper 的 enqueue_scrape 方法,做进一步处理

处理下载结果

请求入队逻辑不用再说,前面已经讲过。现在主要看 Scraper 的 enqueue_scrape,看Scraper 组件是如何处理后续逻辑的:

def enqueue_scrape(self, response, request, spider):# 加入Scrape处理队列slot = self.slotdfd = slot.add_response_request(response, request)def finish_scraping(_):slot.finish_response(response, request)self._check_if_closing(spider, slot)self._scrape_next(spider, slot)return _dfd.addBoth(finish_scraping)dfd.addErrback(lambda f: logger.error('Scraper bug processing %(request)s',{'request': request},exc_info=failure_to_exc_info(f),extra={'spider': spider}))self._scrape_next(spider, slot)return dfddef _scrape_next(self, spider, slot):while slot.queue:# 从Scraper队列中获取一个待处理的任务response, request, deferred = slot.next_response_request_deferred()self._scrape(response, request, spider).chainDeferred(deferred)def _scrape(self, response, request, spider):assert isinstance(response, (Response, Failure))# 调用_scrape2继续处理dfd = self._scrape2(response, request, spider)# 注册异常回调dfd.addErrback(self.handle_spider_error, request, response, spider)# 出口回调dfd.addCallback(self.handle_spider_output, request, response, spider)return dfddef _scrape2(self, request_result, request, spider):# 如果结果不是Failure实例 则调用爬虫中间件管理器的scrape_responseif not isinstance(request_result, Failure):return self.spidermw.scrape_response(self.call_spider, request_result, request, spider)else:# 直接调用call_spiderdfd = self.call_spider(request_result, request, spider)return dfd.addErrback(self._log_download_errors, request_result, request, spider)

首先把请求和响应加入到 Scraper 的处理队列中,然后从队列中获取到任务,如果不是异常结果,则调用爬虫中间件管理器的 scrape_response 方法:

def scrape_response(self, scrape_func, response, request, spider):fname = lambda f:'%s.%s' % (six.get_method_self(f).__class__.__name__,six.get_method_function(f).__name__)def process_spider_input(response):# 执行一系列爬虫中间件的process_spider_inputfor method in self.methods['process_spider_input']:try:result = method(response=response, spider=spider)assert result is None, \'Middleware %s must returns None or ' \'raise an exception, got %s ' \% (fname(method), type(result))except:return scrape_func(Failure(), request, spider)# 执行完中间件的一系列process_spider_input方法后 执行call_spiderreturn scrape_func(response, request, spider)def process_spider_exception(_failure):# 执行一系列爬虫中间件的process_spider_exceptionexception = _failure.valuefor method in self.methods['process_spider_exception']:result = method(response=response, exception=exception, spider=spider)assert result is None or _isiterable(result), \'Middleware %s must returns None, or an iterable object, got %s ' % \(fname(method), type(result))if result is not None:return resultreturn _failuredef process_spider_output(result):# 执行一系列爬虫中间件的process_spider_outputfor method in self.methods['process_spider_output']:result = method(response=response, result=result, spider=spider)assert _isiterable(result), \'Middleware %s must returns an iterable object, got %s ' % \(fname(method), type(result))return result# 执行process_spider_inputdfd = mustbe_deferred(process_spider_input, response)# 注册异常回调dfd.addErrback(process_spider_exception)# 注册出口回调dfd.addCallback(process_spider_output)return dfd

有没有感觉套路很熟悉?与上面下载器中间件调用方式非常相似,也调用一系列的前置方法,再执行真正的处理逻辑,最后执行一系列的后置方法。

回调爬虫

接下来看一下,Scrapy 是如何执行我们写好的爬虫逻辑的,也就是 call_spider 方法,这里回调我们写好的爬虫类:

def call_spider(self, result, request, spider):# 回调爬虫模块result.request = requestdfd = defer_result(result)# 注册回调方法 取得request.callback 如果未定义则调用爬虫模块的parse方法dfd.addCallbacks(request.callback or spider.parse, request.errback)return dfd.addCallback(iterate_spider_output)

看到这里,你应该更熟悉,平时我们写的最多的爬虫代码,parse 则是第一个回调方法。之后爬虫类拿到下载结果,就可以定义下载后的 callback 方法,也是在这里进行回调执行的。

处理输出

在与爬虫类交互完成之后,Scraper 调用了 handle_spider_output 方法处理爬虫的输出结果:

def handle_spider_output(self, result, request, response, spider):# 处理爬虫输出结果if not result:return defer_succeed(None)it = iter_errback(result, self.handle_spider_error, request, response, spider)# 注册_process_spidermw_outputdfd = parallel(it, self.concurrent_items,self._process_spidermw_output, request, response, spider)return dfddef _process_spidermw_output(self, output, request, response, spider):# 处理Spider模块返回的每一个Request/Itemif isinstance(output, Request):# 如果结果是Request 再次入Scheduler的请求队列self.crawler.engine.crawl(request=output, spider=spider)elif isinstance(output, (BaseItem, dict)):# 如果结果是BaseItem/dictself.slot.itemproc_size += 1# 调用Pipeline的process_itemdfd = self.itemproc.process_item(output, spider)dfd.addBoth(self._itemproc_finished, output, response, spider)return dfdelif output is None:passelse:typename = type(output).__name__logger.error('Spider must return Request, BaseItem, dict or None, ''got %(typename)r in %(request)s',{'request': request, 'typename': typename},extra={'spider': spider})

执行完我们自定义的解析逻辑后,解析方法可返回新的 Request 或 BaseItem 实例。

如果是新的请求,则再次通过 Scheduler 进入请求队列,如果是 BaseItem 实例,则调用 Pipeline 管理器,依次执行 process_item。我们想输出结果时,只需要定义 Pepeline 类,然后重写这个方法就可以了。

ItemPipeManager 处理逻辑:

class ItemPipelineManager(MiddlewareManager):component_name = 'item pipeline'@classmethoddef _get_mwlist_from_settings(cls, settings):return build_component_list(settings.getwithbase('ITEM_PIPELINES'))def _add_middleware(self, pipe):super(ItemPipelineManager, self)._add_middleware(pipe)if hasattr(pipe, 'process_item'):self.methods['process_item'].append(pipe.process_item)def process_item(self, item, spider):# 依次调用Pipeline的process_itemreturn self._process_chain('process_item', item, spider)

可以看到 ItemPipeManager 也是一个中间件,和之前下载器中间件管理器和爬虫中间件管理器类似,如果子类有定义 process_item,则依次执行它。

执行完之后,调用 _itemproc_finished:

def _itemproc_finished(self, output, item, response, spider):self.slot.itemproc_size -= 1if isinstance(output, Failure):ex = output.value# 如果在Pipeline处理中抛DropItem异常 忽略处理结果if isinstance(ex, DropItem):logkws = self.logformatter.dropped(item, ex, response, spider)logger.log(*logformatter_adapter(logkws), extra={'spider': spider})return self.signals.send_catch_log_deferred(signal=signals.item_dropped, item=item, response=response,spider=spider, exception=output.value)else:logger.error('Error processing %(item)s', {'item': item},exc_info=failure_to_exc_info(output),extra={'spider': spider})else:logkws = self.logformatter.scraped(output, response, spider)logger.log(*logformatter_adapter(logkws), extra={'spider': spider})return self.signals.send_catch_log_deferred(signal=signals.item_scraped, item=output, response=response,spider=spider)

这里可以看到,如果想在 Pipeline 中丢弃某个结果,直接抛出 DropItem 异常即可,Scrapy 会进行对应的处理。

到这里,抓取结果会根据自定义的输出类,然后输出到指定位置,而新的 Request 则会再次进入请求队列,等待引擎下一次调度,也就是再次调用 ExecutionEngine 的 _next_request,直至请求队列没有新的任务,整个程序退出。

CrawlerSpider

以上,基本上整个核心抓取流程就讲完了。

这里再简单说一下 CrawlerSpider 类,我们平时用的也比较多,它其实就是继承了 Spider 类,然后重写了 parse 方法(这也是继承此类不要重写此方法的原因),并结合 Rule 规则类,来完成 Request 的自动提取逻辑。

Scrapy 提供了这个类方便我们更快速地编写爬虫代码,我们也可以基于此类进行再次封装,让我们的爬虫代码写得更简单。

由此我们也可看出,Scrapy 的每个模块的实现都非常纯粹,每个组件都通过配置文件定义连接起来,如果想要扩展或替换,只需定义并实现自己的处理逻辑即可,其他模块均不受任何影响,所以我们也可以看到,业界有非常多的 Scrapy 插件,都是通过此机制来实现的。

总结

这篇文章的代码量较多,也是 Scrapy 最为核心的抓取流程,如果你能把这块逻辑搞清楚了,那对 Scrapy 开发新的插件,或者在它的基础上进行二次开发也非常简单了。

总结一下整个抓取流程,还是用这两张图表示再清楚不过:

Scrapy 整体给我的感觉是,虽然它只是个单机版的爬虫框架,但我们可以非常方便地编写插件,或者自定义组件替换默认的功能,从而定制化我们自己的爬虫,最终可以实现一个功能强大的爬虫框架,例如分布式、代理调度、并发控制、可视化、监控等功能,它的灵活度非常高。

更多阅读

2020 年最佳流行 Python 库 Top 10

2020 Python中文社区热门文章 Top 10

5分钟快速掌握 Python 定时任务框架

特别推荐


点击下方阅读原文加入社区会员


推荐阅读
  • 本文介绍了Python爬虫技术基础篇面向对象高级编程(中)中的多重继承概念。通过继承,子类可以扩展父类的功能。文章以动物类层次的设计为例,讨论了按照不同分类方式设计类层次的复杂性和多重继承的优势。最后给出了哺乳动物和鸟类的设计示例,以及能跑、能飞、宠物类和非宠物类的增加对类数量的影响。 ... [详细]
  • 本文介绍了在处理不规则数据时如何使用Python自动提取文本中的时间日期,包括使用dateutil.parser模块统一日期字符串格式和使用datefinder模块提取日期。同时,还介绍了一段使用正则表达式的代码,可以支持中文日期和一些特殊的时间识别,例如'2012年12月12日'、'3小时前'、'在2012/12/13哈哈'等。 ... [详细]
  • 本文介绍了在iOS开发中使用UITextField实现字符限制的方法,包括利用代理方法和使用BNTextField-Limit库的实现策略。通过这些方法,开发者可以方便地限制UITextField的字符个数和输入规则。 ... [详细]
  • MySQL语句大全:创建、授权、查询、修改等【MySQL】的使用方法详解
    本文详细介绍了MySQL语句的使用方法,包括创建用户、授权、查询、修改等操作。通过连接MySQL数据库,可以使用命令创建用户,并指定该用户在哪个主机上可以登录。同时,还可以设置用户的登录密码。通过本文,您可以全面了解MySQL语句的使用方法。 ... [详细]
  • 本文介绍了RxJava在Android开发中的广泛应用以及其在事件总线(Event Bus)实现中的使用方法。RxJava是一种基于观察者模式的异步java库,可以提高开发效率、降低维护成本。通过RxJava,开发者可以实现事件的异步处理和链式操作。对于已经具备RxJava基础的开发者来说,本文将详细介绍如何利用RxJava实现事件总线,并提供了使用建议。 ... [详细]
  • 本文分析了Wince程序内存和存储内存的分布及作用。Wince内存包括系统内存、对象存储和程序内存,其中系统内存占用了一部分SDRAM,而剩下的30M为程序内存和存储内存。对象存储是嵌入式wince操作系统中的一个新概念,常用于消费电子设备中。此外,文章还介绍了主电源和后备电池在操作系统中的作用。 ... [详细]
  • Netty源代码分析服务器端启动ServerBootstrap初始化
    本文主要分析了Netty源代码中服务器端启动的过程,包括ServerBootstrap的初始化和相关参数的设置。通过分析NioEventLoopGroup、NioServerSocketChannel、ChannelOption.SO_BACKLOG等关键组件和选项的作用,深入理解Netty服务器端的启动过程。同时,还介绍了LoggingHandler的作用和使用方法,帮助读者更好地理解Netty源代码。 ... [详细]
  • Iamtryingtocreateanarrayofstructinstanceslikethis:我试图创建一个这样的struct实例数组:letinstallers: ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • ZSI.generate.Wsdl2PythonError: unsupported local simpleType restriction ... [详细]
  • 本文详细介绍了Spring的JdbcTemplate的使用方法,包括执行存储过程、存储函数的call()方法,执行任何SQL语句的execute()方法,单个更新和批量更新的update()和batchUpdate()方法,以及单查和列表查询的query()和queryForXXX()方法。提供了经过测试的API供使用。 ... [详细]
  • 标题: ... [详细]
  • Java在运行已编译完成的类时,是通过java虚拟机来装载和执行的,java虚拟机通过操作系统命令JAVA_HOMEbinjava–option来启 ... [详细]
  • 背景应用安全领域,各类攻击长久以来都危害着互联网上的应用,在web应用安全风险中,各类注入、跨站等攻击仍然占据着较前的位置。WAF(Web应用防火墙)正是为防御和阻断这类攻击而存在 ... [详细]
  • 大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记
    本文介绍了大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记,包括outputFormat接口实现类、自定义outputFormat步骤和案例。案例中将包含nty的日志输出到nty.log文件,其他日志输出到other.log文件。同时提供了一些相关网址供参考。 ... [详细]
author-avatar
mobiledu2502861197
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有