亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

Twisted 框架基礎

今天我們會先簡單過一遍 Twisted 框架中的一些核心知識點,但是 Twisted 框架龐大而又復雜,不適合在一節內容中全部囊括。我們只需要掌握在 Scrapy 框架中經常用到的那部分模塊和方法即可。此外,我們將會重點分析 Scrapy 中對 Twisted 模塊的進一步封裝,幫助我們更好地理解接下來的源碼分析過程。

1. Twisted 中的核心類和方法

Twisted 是用 Python 實現的基于事件驅動的網絡引擎框架,是 Python 中一個強大的異步 IO 庫,類似于 Java 中的 NIO。為了能更好的掌握 Twisted 模塊,我們需要先弄清楚 Twisted 中幾個核心的概念: reactor、Protocol、ProtocolFactory、Transport 以及 Deffered 等,我們接下來逐一說明。

1.1 Reactor

Twisted 實現了設計模式中的反應堆(reactor)模式,這種模式在單線程環境中調度多個事件源產生的事件到它們各自的事件處理例程中去。Twisted 的核心就是 reactor 事件循環。Reactor 可以感知網絡、文件系統以及定時器事件。它等待然后處理這些事件,從特定于平臺的行為中抽象出來,并提供統一的接口,使得在網絡協議棧的任何位置對事件做出響應都變得簡單?;旧蟫eactor完成的任務就是:循環等待事件,然后處理事件。

1.2 Deferred 和 DeferredList

Deferred 對象以抽象化的方式表達了一種思想,即結果還尚不存在。它同樣能夠幫助管理產生這個結果所需要的回調鏈。當從函數中返回時,Deferred 對象承諾在某個時刻函數將產生一個結果。返回的 Deferred 對象中包含所有注冊到事件上的回調引用,因此在函數間只需要傳遞這一個對象即可,跟蹤這個對象比單獨管理所有的回調要簡單的多。

Deferred 對象包含一對回調鏈,一個是針對操作成功的回調,一個是針對操作失敗的回調。初始狀態下 Deferred 對象的兩條鏈都為空。在事件處理的過程中,每個階段都為其添加處理成功的回調和處理失敗的回調。當一個異步結果到來時,Deferred 對象就被“激活”,那么處理成功的回調和處理失敗的回調就可以以合適的方式按照它們添加進來的順序依次得到調用。

注意:Deferred 對象只能被激活一次,如果試圖重復激活將引發一個異常。

案例1:Deferred 的使用案例。

from twisted.internet import reactor, defer
from twisted.python import failure


def callback_func1(r):
    print('回調方法1: 結果=%s' % r)
    return r * r


def callback_func2(r):
    print('回調方法2: 傳入結果=%s' % r)
    raise ValueError('value error')


def errback_func(f):
    print('錯誤回調:{}'.format(f))

d = defer.Deferred()
d.addCallback(callback_func1)
d.addCallback(callback_func2)

d.addErrback(errback_func)

d.callback(10)

上面我們創建了一個 Deferred 對象,然后在其中加入兩個正?;卣{方法以及一個錯誤回調。接下來我們執行回調并帶上一個參數,直接的結果如下:

PS D:\learning-notes\慕課網教程\scrapy-lessons\code>python chap23/deferred_test2.py
回調方法1: 結果=10
回調方法2: 傳入結果=100
錯誤回調:[Failure instance: Traceback: <class 'ValueError'>: value error
d:/learning-notes/慕課網教程/scrapy-lessons/code/chap23/deferred_test2.py:24:<module>
D:\Program Files (x86)\python3\lib\site-packages\twisted\internet\defer.py:460:callback
D:\Program Files (x86)\python3\lib\site-packages\twisted\internet\defer.py:568:_startRunCallbacks
--- <exception caught here> ---
D:\Program Files (x86)\python3\lib\site-packages\twisted\internet\defer.py:654:_runCallbacks
d:/learning-notes/慕課網教程/scrapy-lessons/code/chap23/deferred_test2.py:12:callback_func2
]

可以看到,這里回調函數會依次執行,同時將回調返回結果作為下一個回調的輸入。此外,在回調函數中手工拋出一個異常后,程序將進入錯誤回調鏈執行。

上面的 Deferred 對象都是針對單個事件的異步回調,而 DeferredList 對象使我們可以將一個 Deferred 對象列表視為一個 Deferred 對象,然后我們啟動多個異步操作并且在它們全部完成后再執行回調,而無論它們成功或者失敗。

案例2: DeferredList 的使用案例。

"""
DeferredList 對象使用實例
"""
from twisted.internet import defer

def print_result(r):
    print(r)


def add_num_10(r):
    return r + 10


def add_num_20(r):
    return r + 20


d1 = defer.Deferred()
d2 = defer.Deferred()
# 一定要在DeferredList之前才能回調生效
d1.addCallback(add_num_10)
d2.addCallback(add_num_20)
d = defer.DeferredList([d1, d2])
d.addCallback(print_result)
# 回調的順序無關,只和加入到DeferredList中的元素順序相關
d1.callback(1) 
d2.callback(2)


d1 = defer.Deferred()
d2 = defer.Deferred()
d = defer.DeferredList([d1, d2])
# 回調無效
d1.addCallback(add_num_10)
d2.addCallback(add_num_20)
d.addCallback(print_result)
d1.callback(1) 
d2.callback(2)

運行結果如下:

PS D:\learning-notes\慕課網教程\scrapy-lessons\code>python chap23/deferredlist_example.py
[(True, 11), (True, 22)]
[(True, 1), (True, 2)]

我們可以看到,上面的代碼有一個 DeferredList 列表,其回調結果返回的是列表中 Deferred 對象的回調結果,這個結果的順序是 DeferredList 中元素的順序。例如我們改動下回調的順序:

d2.callback(2) 
d1.callback(1)

輸出的結果和原來還是一樣:

PS D:\learning-notes\慕課網教程\scrapy-lessons\code>python chap23/deferredlist_example.py
[(True, 11), (True, 22)]
[(True, 1), (True, 2)]

1.3 Transports

Transports 代表網絡中兩個通信結點之間的連接。Transports 負責描述連接的細節,比如連接是 TCP 的還是 UDP 等。它們被設計為“滿足最小功能單元,同時具有最大程度的可復用性,而且從協議實現中分離出來,這讓許多協議可以采用相同類型的傳輸。Transports 實現了 ITransports 接口,它包含如下的方法:

  • write():以非阻塞的方式按順序依次將數據寫到物理連接上;
  • writeSequence():將一個字符串列表寫到物理連接上;
  • loseConnection():將所有掛起的數據寫入,然后關閉連接;
  • getPeer():取得連接中對端的地址信息;
  • getHost():取得連接中本端的地址信息;

1.4 Factory 和 Protocol

Protocols 描述了如何以異步的方式處理網絡中的事件。HTTP、DNS 以及 IMAP 是應用層協議中的例子。Protocols實現了 IProtocol 接口,它包含如下的方法:

  • makeConnection(): 在 transport 對象和服務器之間建立一個連接;
  • connectionMade(): 連接建立起來后調用;
  • dataReceived(): 接收數據時調用;
  • connectionLost(): 關閉連接時調用;

Factory 和 Protocol 有嚴格的不同。Factory 的工作是管理連接事件,并且創建 Protocol 對象處理每一個成功的連接。一旦連接建立,Protocol 對象就接管下面的工作了,包括收發數據和決定是否關閉連接。

1.5 inlineCallbacks

在【文獻4】中對該裝飾器有詳細的介紹,我們先來看下該裝飾器的作用:

twisted.internet.defer.inlineCallbacks 裝飾器是用于同步【異步操作】 的。它用于裝飾生成器函數。調用該裝飾器裝飾的生成器函數會返回一個Deferred對象。

其大致執行流程如下 (取自文獻4):
圖片描述

inlineCallbacks 裝飾器執行流程

2. Scrapy 中對 Twisted 模塊的進一步封裝

qi我們來看看 Scrapy 框架源碼中對 Twisted 模塊的一些封裝代碼,主要有兩個文件:scrapy/utils/defer.pyscrapy/utils/reactor.py。

圖片描述

Scrapy 源碼中對 Twisted 模塊的簡單封裝文件

其中和后面源碼分析中緊密相關的主要是 defer.py 文件,我們也先重點先學習這里的代碼。

# 源碼位置:scrapy/utils/defer.py
# ...


def defer_fail(_failure):
    from twisted.internet import reactor
    d = defer.Deferred()
    reactor.callLater(0.1, d.errback, _failure)
    return d


def defer_succeed(result):
    from twisted.internet import reactor
    d = defer.Deferred()
    reactor.callLater(0.1, d.callback, result)
    return d


def defer_result(result):
    if isinstance(result, defer.Deferred):
        return result
    elif isinstance(result, failure.Failure):
        return defer_fail(result)
    else:
        return defer_succeed(result)


def mustbe_deferred(f, *args, **kw):
    try:
        result = f(*args, **kw)
    except IgnoreRequest as e:
        return defer_fail(failure.Failure(e))
    except Exception:
        return defer_fail(failure.Failure())
    else:
        return defer_result(result)
    
# ...

上面代碼中定義的 mustbe_deferred() 方法在后面會經常使用,它的調用會牽扯到最前面的三個方法:

  • defer_fail():等同于 twisted.internet.defer.fail,會延遲到下一個 reactor 循環才會調用錯誤回調方法;
  • defer_succeed():等同于 twisted.internet.defer.succeed,會延遲到下一個 reactor 循環才會調用成功的回調方法;
  • defer_result(): 根據結果分別進行延遲的成功回調或者錯誤回調;
# 源碼位置:scrapy/utils/defer.py
# ...

def parallel(iterable, count, callable, *args, **named):
    coop = task.Cooperator()
    # 封裝調用的方法
    work = (callable(elem, *args, **named) for elem in iterable)
    return defer.DeferredList([coop.coiterate(work) for _ in range(count)])


def process_chain(callbacks, input, *a, **kw):
    """Return a Deferred built by chaining the given callbacks"""
    d = defer.Deferred()
    # 添加回調鏈
    for x in callbacks:
        d.addCallback(x, *a, **kw)
    d.callback(input)
    return d


def process_chain_both(callbacks, errbacks, input, *a, **kw):
    """Return a Deferred built by chaining the given callbacks and errbacks"""
    d = defer.Deferred()
    for cb, eb in zip(callbacks, errbacks):
        d.addCallbacks(
            callback=cb, errback=eb,
            callbackArgs=a, callbackKeywords=kw,
            errbackArgs=a, errbackKeywords=kw,
        )
    if isinstance(input, failure.Failure):
        d.errback(input)
    else:
        d.callback(input)
    return d


def process_parallel(callbacks, input, *a, **kw):
    dfds = [defer.succeed(input).addCallback(x, *a, **kw) for x in callbacks]
    d = defer.DeferredList(dfds, fireOnOneErrback=1, consumeErrors=1)
    d.addCallbacks(lambda r: [x[1] for x in r], lambda f: f.value.subFailure)
    return d

我們來分別對這四個方法進行說明:

  • parallel():對給定可迭代的對象并行調用,使用不超過 count 個并發調用。該方法通過封裝 Twisted 中的 task.Cooperator 對象,實現對任務的并發調用;
  • process_chain():代碼非常簡單,就是單純的將輸入的 callbacks 依次加入到回調鏈中并返回對應的 Deferred 對象;
  • process_chain_both():對上面方法的擴展,構建正?;卣{鏈以及錯誤回調鏈,并返回對應的 Deferred 對象;
  • process_parallel():返回一個包含對給定回調的所有成功調用輸出的 Deferred 對象;

上述這些方法可以通過閱讀其實現理解其功能,最后剩余的幾個方法在后續的源碼介紹中沒有用到,故暫時不進行介紹,大家可以自行參考相關資料理解。

3. 小結

本小節中我們介紹了 Twisted 框架中的幾個核心概念,主要是 reactor、Factory、Protocol 以及 Deferred 等,這些是 Twisted 的核心組成部分,也會在 Scrapy 中大量應用和進一步封裝。此外,我們完成了一些基于 Twisted 模塊的案例,這些也幫助我們很好的理解了 Twisted 模塊的使用。最后我們進一步分析了 Scrapy 框架中對 Twisted 模塊核心方法所做的一些封裝,這些也為了后續更好地理解 Scrapy 框架源碼打好堅實的基礎。