亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

首頁 慕課教程 Scrapy 入門教程 Scrapy 入門教程 深入分析 Scrapy 的 Pipeline 原理

深入理解 Scrapy 的 Pipeline

今天我們來深入學習 Scrapy 框架 Pipeline 的工作原理。這一次我們采取一種新的學習方式:先提出疑問,然后從源碼中進行解答,直到最后我們徹底搞清楚 Pipeline 的工作流程。

1. 問題描述

這一小節我們將從源碼的角度來分析 Pipeline 的工作過程?,F在我先提出幾個疑問:

  • Scrapy 框架中使用 Pipeline 處理 Item 的代碼在哪里?為什么我在 settings.py 中設置了 ITEM_PIPELINES 屬性值,Scrapy 就能將其作為 Pipeline 去處理對應 Spider 生成的 Item 呢?
  • 定義 Pipeline 的那四個方法來自哪里?為什么一定需要 process_item() 方法?
  • 第12節中抓取起點月票榜小說時用到了圖片管道,該管道的一個詳細的處理流程是怎樣的,即它如何實現圖片下載?

帶著這些疑問,我們來進入源碼中尋找答案。

2. 源碼解惑

2.1 Item Pipeline 的管理器類

還記得上一小節我們追蹤 Spider 中間件的代碼時,在 scrapy/core/scraper.py 中找到了 Spider 中間件處理 Spider 模塊返回結果的方法,其代碼內容如下:

# 源碼位置:scrapy/core/scraper.py
# ...

class Scraper:
    # ...
    
    def _process_spidermw_output(self, output, request, response, spider):
        """Process each Request/Item (given in the output parameter) returned
        from the given spider
        """
        if isinstance(output, Request):
            # 如果spider中間件返回的是Request,則繼續調用引擎去處理請求
            self.crawler.engine.crawl(request=output, spider=spider)
        elif is_item(output):
            # 如果spider中間件返回的是item,則調用self.itemproc對象的process_item()方法處理
            self.slot.itemproc_size += 1
            dfd = self.itemproc.process_item(output, spider)
            dfd.addBoth(self._itemproc_finished, output, response, spider)
            return dfd
        elif output is None:
            pass
        else:
            # 打印錯誤日志
            # ...

從上面的代碼我們知道,對于 Spider 中間件模塊最后返回的 Item 類型數據會調用 self.itemproc 對象的 process_item() 方法處理,那么這個 self.itemproc 對象是什么呢?找到 Scraper 類的 __init__() 方法:

# 源碼位置:scrapy/core/scraper.py
# ...

class Scraper:

    def __init__(self, crawler):
        # ...
        itemproc_cls = load_object(crawler.settings['ITEM_PROCESSOR'])
        self.itemproc = itemproc_cls.from_crawler(crawler)
        # ...
        
    # ...

來看默認的配置中關于 ITEM_PROCESSOR 的值,如下:

# 源碼位置: scrapy/settings/default_settings.py
# ...
ITEM_PROCESSOR = 'scrapy.pipelines.ItemPipelineManager'
# ...

單看這個類的名稱,又是一個某某管理器類,前面我們學了下載中間件管理類、Spider 中間件管理類,分別管理下載中間件類以及 Spider 中間件類,維護所屬類方法的處理順序。這里我們也是需要一個同樣功能的管理類,來保證依次處理相應的 Item pipelines。我們進入該管理器類,閱讀其實現代碼:

# 源碼位置:scrapy/

from scrapy.middleware import MiddlewareManager
from scrapy.utils.conf import build_component_list
from scrapy.utils.defer import deferred_f_from_coro_f


class ItemPipelineManager(MiddlewareManager):

    component_name = 'item pipeline'

    @classmethod
    def _get_mwlist_from_settings(cls, settings):
        return build_component_list(settings.getwithbase('ITEM_PIPELINES'))

    def _add_middleware(self, pipe):
        super(ItemPipelineManager, self)._add_middleware(pipe)
        if hasattr(pipe, 'process_item'):
           self.methods['process_item'].append(deferred_f_from_coro_f(pipe.process_item))

    def process_item(self, item, spider):
        return self._process_chain('process_item', item, spider)

同樣,這個管理類直接就繼承了前面的中間件管理器類,其代碼量非常少,十分容易理解。

首先它和所有的中間件管理類一樣從全局配置中獲的對應管理的 pipelines,這個配置正是 ITEM_PIPELINES。其次,注意到這個 _add_middleware() 方法中有個調用父類的 _add_middleware() 方法,而父類中該方法的代碼如下:

# 源碼位置: scrapy/middleware.py
# ...

class MiddlewareManager:
    # ...
    
    def _add_middleware(self, mw):
        if hasattr(mw, 'open_spider'):
            self.methods['open_spider'].append(mw.open_spider)
        if hasattr(mw, 'close_spider'):
            self.methods['close_spider'].appendleft(mw.close_spider)

我們從而得知,在 pipeline 中會將 open_spider()、close_spider() 以及 process_item() 方法加入到對應的處理鏈中,且 MiddlewareManager 類中 from_crawler() 是一個類方法,因此對于繼承該類的子類也同樣會有該方法,也即具備了通過 Crawler 類對象實例化的能力。

2.2 Scrapy 框架內置的 Pipelines 分析

前面第12節中,我們在介紹 Scrapy 框架的管道內容時,使用了其內置的圖片處理管道 (ImagesPipeline),它對應的代碼位置為:scrapy/pipelines/images.py。接下來,我們將分析其源碼,看看如何實現圖片下載的功能。

首先看看類繼承關系:在 images.py 中定義的 ImagesPipeline 繼承了 files.py 中定義的 FilesPipeline 類;而 FilesPipeline 類又繼承至 media.py 中定義的 MediaPipeline 類。因此,我們先從分析基類開始,我們從管道的兩個核心方法開始入手:

  • 初始化方法:__init__() ;
  • Item 核心處理方法:process_item()

首先來看初始化的代碼,如下:

# 源碼位置:scrapy/pipelines/media.py
# ...
class MediaPipeline:

    LOG_FAILED_RESULTS = True

    class SpiderInfo:
        def __init__(self, spider):
            self.spider = spider
            self.downloading = set()
            self.downloaded = {}
            self.waiting = defaultdict(list)

    def __init__(self, download_func=None, settings=None):
        self.download_func = download_func

        if isinstance(settings, dict) or settings is None:
            settings = Settings(settings)
        resolve = functools.partial(self._key_for_pipe,
                                    base_class_name="MediaPipeline",
                                    settings=settings)
        self.allow_redirects = settings.getbool(
            resolve('MEDIA_ALLOW_REDIRECTS'), False
        )
        self._handle_statuses(self.allow_redirects)

    def _handle_statuses(self, allow_redirects):
        # 默認不允許重定向
        self.handle_httpstatus_list = None
        if allow_redirects:
            # 當設置了allow_redirects時,會考慮處理存在3xx的下載地址
            self.handle_httpstatus_list = SequenceExclude(range(300, 400))

    def _key_for_pipe(self, key, base_class_name=None, settings=None):
        """
        >>> MediaPipeline()._key_for_pipe("IMAGES")
        'IMAGES'
        >>> class MyPipe(MediaPipeline):
        ...     pass
        >>> MyPipe()._key_for_pipe("IMAGES", base_class_name="MediaPipeline")
        'MYPIPE_IMAGES'
        """
        class_name = self.__class__.__name__
        formatted_key = "{}_{}".format(class_name.upper(), key)
        if (
            not base_class_name
            or class_name == base_class_name
            or settings and not settings.get(formatted_key)
        ):
            return key
        return formatted_key
    
    # ...

上面的類中又定義了一個類:SpiderInfo ,這個類只是用來保存多個數據用的。此外,初始化方法中主要讀取相關的配置,判斷是否需要允許下載的 URL 重定向。該參數在 Scrapy 官方文檔中的說明如下:

圖片描述

處理管道媒體文件下載地址重定向問題

接下來是核心的處理 Item 的方法:

# 源碼位置:scrapy/pipelines/media.py
# ...
class MediaPipeline:
    # ...
    
    def process_item(self, item, spider):
        info = self.spiderinfo
        # 從item中獲取請求列表 
        requests = arg_to_iter(self.get_media_requests(item, info))
        # 形成相關的處理鏈表
        dlist = [self._process_request(r, info) for r in requests]
        dfd = DeferredList(dlist, consumeErrors=1)
        # 上述的處理全部完成后的回調
        return dfd.addCallback(self.item_completed, item, info)
    
    # ...

我們知道管道類中處理 Item 的核心方法是 process_item() ,上面的 process_item() 方法先調用對象的 get_media_requests() 方法從輸入的 item 中獲取相應的請求列表,然后在形成對應的請求列表,處理請求的方法為:_process_request(),最后所有的請求完成后會執行對象的 item_completed() 方法。

# 源碼位置:scrapy/pipelines/media.py
# ...

class MediaPipeline:
    # ...
    
    def get_media_requests(self, item, info):
        """Returns the media requests to download"""
        pass
    
    # ...

這個 get_media_requests() 需要在后續的繼承類中實現。接下來看處理下載請求的方法:

# 源碼位置:scrapy/pipelines/media.py
# ...

class MediaPipeline:
    # ...
    
    def _process_request(self, request, info):
        # 每個請求計算一個指紋,以保證后面不重復請求
        fp = request_fingerprint(request)
        # 請求回調
        cb = request.callback or (lambda _: _)
        # 請求錯誤回調
        eb = request.errback
        request.callback = None
        request.errback = None

        # 如果已經請求過了,直接取緩存的結果
        if fp in info.downloaded:
            return defer_result(info.downloaded[fp]).addCallbacks(cb, eb)

        # Otherwise, wait for result
        wad = Deferred().addCallbacks(cb, eb)
        # 將請求的回調鏈加入對應的請求key中
        info.waiting[fp].append(wad)

        # 檢查請求是否正在下載中,避免二次請求
        if fp in info.downloading:
            return wad

        # 將請求加入正在下載的隊列
        info.downloading.add(fp)
        # 創建Deferred對象,對應方法為self.media_to_download()
        dfd = mustbe_deferred(self.media_to_download, request, info)
        # 在self.media_to_download()方法處理完后回調self._check_media_to_download()方法
        dfd.addCallback(self._check_media_to_download, request, info)
        # 此外,再加入統一回調方法
        dfd.addBoth(self._cache_result_and_execute_waiters, fp, info)
        dfd.addErrback(lambda f: logger.error(
            f.value, exc_info=failure_to_exc_info(f), extra={'spider': info.spider})
        )
        return dfd.addBoth(lambda _: wad)  # it must return wad at last
    
    # ...

上面請求的過程在注釋中已詳細說明,這里處理下載請求主要涉及的方法為:self.media_to_download() 以及 self._check_media_to_download()。我們繼續查看該方法的代碼:

# 源碼位置:scrapy/pipelines/media.py
# ...

class MediaPipeline:
    # ...
    
    # Overridable Interface
    def media_to_download(self, request, info):
        """Check request before starting download"""
        pass
    
    def _check_media_to_download(self, result, request, info):
        if result is not None:
            return result
        if self.download_func:
            # this ugly code was left only to support tests. TODO: remove
            dfd = mustbe_deferred(self.download_func, request, info.spider)
            dfd.addCallbacks(
                callback=self.media_downloaded, callbackArgs=(request, info),
                errback=self.media_failed, errbackArgs=(request, info))
        else:
            self._modify_media_request(request)
            # 將請求發給引擎模塊,調用download()方法下載網頁
            dfd = self.crawler.engine.download(request, info.spider)
            dfd.addCallbacks(
                callback=self.media_downloaded, callbackArgs=(request, info),
                errback=self.media_failed, errbackArgs=(request, info))
        return dfd
    
    # ...

可以看到 media_to_download() 方法也是在繼承類中需要重寫的,_check_media_to_download() 方法則是核心處理下載文件或者圖片的地方。該方法中首先判斷是否有傳入的 download_func() 方法用于下載網頁,如果沒有則調用引擎模塊中的 download() 方法下載網頁數據,成功后調用 media_downloaded() 方法,失敗則調用 media_failed() 方法。最后我們來看下 self._cache_result_and_execute_waiters() 方法,其內容和邏輯比較簡單,就是緩存請求的數據并將請求清除等待隊列:

# 源碼位置:scrapy/pipelines/media.py
# ...

class MediaPipeline:
    # ...
    
    def _cache_result_and_execute_waiters(self, result, fp, info):
        if isinstance(result, Failure):
            # minimize cached information for failure
            result.cleanFailure()
            result.frames = []
            result.stack = None  
            context = getattr(result.value, '__context__', None)
            if isinstance(context, StopIteration):
                setattr(result.value, '__context__', None)
                
        # 下載隊列中移除該請求
        info.downloading.remove(fp)
        # 緩存下載請求結果
        info.downloaded[fp] = result 
        # 移除等待隊列中的該請求
        for wad in info.waiting.pop(fp):
            # 將原來請求的回調方法以及錯誤回調方法,加入回調處理鏈
            defer_result(result).chainDeferred(wad)

此時,我們總結下 MediaPipeline 類的核心處理流程:

圖片描述

管道下載媒體數據詳細流程

到此,MediaPipeline 類的核心方法我們已經研究完畢,接下來開始繼續學習 MediaPipeline 這個類。注意到該類中并沒有 process_item() 方法,因此它直接繼承父類的 process_item() 方法。從 MediaPipeline 類中可知在 _check_media_to_download() 方法中會下載相應的媒體文件,成功后會回調 media_downloaded() 方法

# 源碼位置:scrapy/pipelines/files.py
# ...

class FilesPipeline(MediaPipeline):
    # ...
    
    def media_downloaded(self, response, request, info):
        referer = referer_str(request)

        if response.status != 200:
            # 打印告警信息,下載失敗
            # ...
            raise FileException('download-error')

        if not response.body:
            # 打印告警信息,無下載內容
            # ...
            raise FileException('empty-content')

        status = 'cached' if 'cached' in response.flags else 'downloaded'
        # 打印debug信息
        self.inc_stats(info.spider, status)

        try:
            # 設置下載文件路徑
            path = self.file_path(request, response=response, info=info)
            # 將下載的內容保存成本地文件
            checksum = self.file_downloaded(response, request, info)
        except FileException as exc:
            # 打印異常信息
            # ...
            raise
        except Exception as exc:
            # 打印異常信息
            # ...
            raise FileException(str(exc))

        return {'url': request.url, 'path': path, 'checksum': checksum, 'status': status}
    
    # ...

從上面的代碼可知,在請求成功后,下載的內容保存在 response.body 中,上面的代碼就是將該文件內容保存成磁盤上的文件:

# 源碼位置:scrapy/pipelines/files.py
# ...

class FilesPipeline(MediaPipeline):
    # ...

    def file_downloaded(self, response, request, info):
        # 生成文件保存路徑
        path = self.file_path(request, response=response, info=info)
        # 獲取字節流形式的下載內容
        buf = BytesIO(response.body)
        checksum = md5sum(buf)
        buf.seek(0)
        # 持久化保存
        self.store.persist_file(path, buf, info)
        # 返回文件的md5值
        return checksum

上面的代碼是不是已經夠清楚了?最后文件內容是 buf,保存的方法是 self.store.persist_file(path, buf, info),該方法是支持將下載內容保存成多種形式,比如保存到本地文件中、保存到 FTP 服務器上,甚至可以通過 S3 接口保存到云存儲中。來看看保存成本地文件形式的代碼,其實和我們平時寫的文件操作一樣,都是 open() 方法打開文件句柄,然后使用 wb 模式將內容寫到文件中。

# 源碼位置:scrapy/pipelines/files.py
# ...

class FSFilesStore:
    # ...
    
    def persist_file(self, path, buf, info, meta=None, headers=None):
        absolute_path = self._get_filesystem_path(path)
        self._mkdir(os.path.dirname(absolute_path), info)
        # 保存文件
        with open(absolute_path, 'wb') as f:
            f.write(buf.getvalue())
            
     # ...

最后對于 ImagesPipeline 類,其基本處理流程不變,只不過最后的保存方式和普通文件管道不一樣,我們來看下面幾個方法:

# 源碼位置:scrapy/pipelines/images.py
# ...

class ImagesPipeline(FilesPipeline):
    # ...
    
    def file_downloaded(self, response, request, info):
        return self.image_downloaded(response, request, info)

    def image_downloaded(self, response, request, info):
        checksum = None
        for path, image, buf in self.get_images(response, request, info):
            if checksum is None:
                buf.seek(0)
                checksum = md5sum(buf)
            width, height = image.size
            # 保存成圖片形式
            self.store.persist_file(
                path, buf, info,
                meta={'width': width, 'height': height},
                headers={'Content-Type': 'image/jpeg'})
        return checksum

    def get_images(self, response, request, info):
        path = self.file_path(request, response=response, info=info)
        # 下載的圖片內容主體
        orig_image = Image.open(BytesIO(response.body))

        width, height = orig_image.size
        if width < self.min_width or height < self.min_height:
            raise ImageException("Image too small (%dx%d < %dx%d)" %
                                 (width, height, self.min_width, self.min_height))

        image, buf = self.convert_image(orig_image)
        yield path, image, buf

        for thumb_id, size in self.thumbs.items():
            thumb_path = self.thumb_path(request, thumb_id, response=response, info=info)
            thumb_image, thumb_buf = self.convert_image(image, size)
            yield thumb_path, thumb_image, thumb_buf

    def convert_image(self, image, size=None):
        # 圖片轉換格式
        # ...

        if size:
            image = image.copy()
            image.thumbnail(size, Image.ANTIALIAS)

        buf = BytesIO()
        image.save(buf, 'JPEG')
        return image, buf

至于上面的代碼細節限于篇幅就不再深究了,有興趣的可以課后去深入學習,這里主要是使用了 Python 的一個專門用來處理圖片的第三方模塊:PIL。掌握了該模塊的基本用法后,再看這些代碼就一目了然了,都是非常常規和基礎的代碼。

好了,本小節的內容就到這里了。如果你能堅持看到這里,在回過頭看看前面提出的問題,是否在心里都已經有了準確的答案?所有的疑問其實在看一遍源碼之后便會豁然開朗,我們也能理解 Scrapy 中設置的參數的含義以及其作用,這些是我們后續深入定制化 Scrapy 框架的基礎,一定要掌握。

3. 小結

本小節中我們先提出了 3 個問題,然后帶著問題進入 Scrapy 的源碼尋找答案,在最后完整看完 Pipeline 的工作代碼后,在回過頭來看原來的問題時,答案已經一目了然了。這種學習源碼的方式也是非常有效的,帶著問題去看代碼。此外,我們沒有深究代碼細節,主要是根據架構圖的數據導向來學習源碼,課后也希望讀者能繼續深入這塊的代碼研究,提出問題,然后解答問題,最后完全掌握該模塊。