对照下面的脑图,理解整个Scrapy下载流程:
首先接着上篇,Engine中注册到事件循环的_next_request_from_scheduler()方法开始。
实际上注册的是_next_request(),但_next_request()中真正执行的是_next_request_from_scheduler()
看下这个方法:
1 | def _next_request_from_scheduler(self, spider): |
结合上面我总结的脑图,实际上是调用Downloader的fetch()方法:1
2
3
4
5
6
7
8def fetch(self, request, spider):
def _deactivate(response):
self.active.remove(request)
return response
self.active.add(request)
dfd = self.middleware.download(self._enqueue_request, request, spider)
return dfd.addBoth(_deactivate)
这个fetch方法有会调用middleware的download方法;这里的middleware是DownloaderMiddlewareManager,会实例化配置的所有下载中间件。并在download()方法中依序执行process_request、process_response方法,其中process_request是正序执行、process_response是逆序执行。并且会在所有中间件执行process_request之后,process_response执行之前执行真正的下载方法,这个方法是Downloader调用middleware时注册进来的_enqueue_request方法;看下面这个方法:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37def _enqueue_request(self, request, spider):
key, slot = self._get_slot(request, spider)
request.meta['download_slot'] = key
def _deactivate(response):
slot.active.remove(request)
return response
slot.active.add(request)
deferred = defer.Deferred().addBoth(_deactivate)
slot.queue.append((request, deferred))
self._process_queue(spider, slot)
return deferred
def _process_queue(self, spider, slot):
if slot.latercall and slot.latercall.active():
return
# Delay queue processing if a download_delay is configured
now = time()
delay = slot.download_delay()
if delay:
penalty = delay - now + slot.lastseen
if penalty > 0:
slot.latercall = reactor.callLater(penalty, self._process_queue, spider, slot)
return
# Process enqueued requests if there are free slots to transfer for this slot
while slot.queue and slot.free_transfer_slots() > 0:
slot.lastseen = now
request, deferred = slot.queue.popleft()
dfd = self._download(slot, request, spider)
dfd.chainDeferred(deferred)
# prevent burst if inter-request delays were configured
if delay:
self._process_queue(spider, slot)
break
看这里会做相应的并发控制,最终会调用_download()方法;1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28def _download(self, slot, request, spider):
# The order is very important for the following deferreds. Do not change!
# 1. Create the download deferred
dfd = mustbe_deferred(self.handlers.download_request, request, spider)
# 2. Notify response_downloaded listeners about the recent download
# before querying queue for next request
def _downloaded(response):
self.signals.send_catch_log(signal=signals.response_downloaded,
response=response,
request=request,
spider=spider)
return response
dfd.addCallback(_downloaded)
# 3. After response arrives, remove the request from transferring
# state to free up the transferring slot so it can be used by the
# following requests (perhaps those which came from the downloader
# middleware itself)
slot.transferring.add(request)
def finish_transferring(_):
slot.transferring.remove(request)
self._process_queue(spider, slot)
return _
return dfd.addBoth(finish_transferring)
这里会调用handles的download_request()方法;handlers是DownloadHandlers,会加载配置中所有的DOWNLOAD_HANDLERS并实例化。
1 | def download_request(self, request, spider): |
这个地方会根据Request的协议类型,调用相应的handler执行下载请求;下面以Http为例,继续下面的流程;
HttpDownloadHandler其实是HTTP10DownloadHandler的扩展,实际的下载器是HTTP10DownloadHandler
1 | def download_request(self, request, spider): |
这里直接调用的Twisted的相关网络实现来完成下载请求的。