Twisted 框架基础

今天我们会先简单过一遍 Twisted 框架中的一些核心知识点,但是 Twisted 框架庞大而又复杂,不适合在一节内容中全部囊括。我们只需要掌握在 Scrapy 框架中经常用到的那部分模块和方法即可。此外,我们将会重点分析 Scrapy 中对 Twisted 模块的进一步封装,帮助我们更好地理解接下来的源码分析过程。

1. Twisted 中的核心类和方法

Twisted 是用 Python 实现的基于事件驱动的网络引擎框架,是 Python 中一个强大的异步 IO 库,类似于 Java 中的 NIO。为了能更好的掌握 Twisted 模块,我们需要先弄清楚 Twisted 中几个核心的概念: reactor、Protocol、ProtocolFactory、Transport 以及 Deffered 等,我们接下来逐一说明。

1.1 Reactor

Twisted 实现了设计模式中的反应堆(reactor)模式,这种模式在单线程环境中调度多个事件源产生的事件到它们各自的事件处理例程中去。Twisted 的核心就是 reactor 事件循环。Reactor 可以感知网络、文件系统以及定时器事件。它等待然后处理这些事件,从特定于平台的行为中抽象出来,并提供统一的接口,使得在网络协议栈的任何位置对事件做出响应都变得简单。基本上reactor完成的任务就是:循环等待事件,然后处理事件。

1.2 Deferred 和 DeferredList

Deferred 对象以抽象化的方式表达了一种思想,即结果还尚不存在。它同样能够帮助管理产生这个结果所需要的回调链。当从函数中返回时,Deferred 对象承诺在某个时刻函数将产生一个结果。返回的 Deferred 对象中包含所有注册到事件上的回调引用,因此在函数间只需要传递这一个对象即可,跟踪这个对象比单独管理所有的回调要简单的多。

Deferred 对象包含一对回调链,一个是针对操作成功的回调,一个是针对操作失败的回调。初始状态下 Deferred 对象的两条链都为空。在事件处理的过程中,每个阶段都为其添加处理成功的回调和处理失败的回调。当一个异步结果到来时,Deferred 对象就被“激活”,那么处理成功的回调和处理失败的回调就可以以合适的方式按照它们添加进来的顺序依次得到调用。

注意:Deferred 对象只能被激活一次,如果试图重复激活将引发一个异常。

案例1:Deferred 的使用案例。

from twisted.internet import reactor, defer
from twisted.python import failure


def callback_func1(r):
    print('回调方法1: 结果=%s' % r)
    return r * r


def callback_func2(r):
    print('回调方法2: 传入结果=%s' % r)
    raise ValueError('value error')


def errback_func(f):
    print('错误回调:{}'.format(f))

d = defer.Deferred()
d.addCallback(callback_func1)
d.addCallback(callback_func2)

d.addErrback(errback_func)

d.callback(10)

上面我们创建了一个 Deferred 对象,然后在其中加入两个正常回调方法以及一个错误回调。接下来我们执行回调并带上一个参数,直接的结果如下:

PS D:\learning-notes\慕课网教程\scrapy-lessons\code>python chap23/deferred_test2.py
回调方法1: 结果=10
回调方法2: 传入结果=100
错误回调:[Failure instance: Traceback: <class 'ValueError'>: value error
d:/learning-notes/慕课网教程/scrapy-lessons/code/chap23/deferred_test2.py:24:<module>
D:\Program Files (x86)\python3\lib\site-packages\twisted\internet\defer.py:460:callback
D:\Program Files (x86)\python3\lib\site-packages\twisted\internet\defer.py:568:_startRunCallbacks
--- <exception caught here> ---
D:\Program Files (x86)\python3\lib\site-packages\twisted\internet\defer.py:654:_runCallbacks
d:/learning-notes/慕课网教程/scrapy-lessons/code/chap23/deferred_test2.py:12:callback_func2
]

可以看到,这里回调函数会依次执行,同时将回调返回结果作为下一个回调的输入。此外,在回调函数中手工抛出一个异常后,程序将进入错误回调链执行。

上面的 Deferred 对象都是针对单个事件的异步回调,而 DeferredList 对象使我们可以将一个 Deferred 对象列表视为一个 Deferred 对象,然后我们启动多个异步操作并且在它们全部完成后再执行回调,而无论它们成功或者失败。

案例2: DeferredList 的使用案例。

"""
DeferredList 对象使用实例
"""
from twisted.internet import defer

def print_result(r):
    print(r)


def add_num_10(r):
    return r + 10


def add_num_20(r):
    return r + 20


d1 = defer.Deferred()
d2 = defer.Deferred()
# 一定要在DeferredList之前才能回调生效
d1.addCallback(add_num_10)
d2.addCallback(add_num_20)
d = defer.DeferredList([d1, d2])
d.addCallback(print_result)
# 回调的顺序无关,只和加入到DeferredList中的元素顺序相关
d1.callback(1) 
d2.callback(2)


d1 = defer.Deferred()
d2 = defer.Deferred()
d = defer.DeferredList([d1, d2])
# 回调无效
d1.addCallback(add_num_10)
d2.addCallback(add_num_20)
d.addCallback(print_result)
d1.callback(1) 
d2.callback(2)

运行结果如下:

PS D:\learning-notes\慕课网教程\scrapy-lessons\code>python chap23/deferredlist_example.py
[(True, 11), (True, 22)]
[(True, 1), (True, 2)]

我们可以看到,上面的代码有一个 DeferredList 列表,其回调结果返回的是列表中 Deferred 对象的回调结果,这个结果的顺序是 DeferredList 中元素的顺序。例如我们改动下回调的顺序:

d2.callback(2) 
d1.callback(1)

输出的结果和原来还是一样:

PS D:\learning-notes\慕课网教程\scrapy-lessons\code>python chap23/deferredlist_example.py
[(True, 11), (True, 22)]
[(True, 1), (True, 2)]

1.3 Transports

Transports 代表网络中两个通信结点之间的连接。Transports 负责描述连接的细节,比如连接是 TCP 的还是 UDP 等。它们被设计为“满足最小功能单元,同时具有最大程度的可复用性,而且从协议实现中分离出来,这让许多协议可以采用相同类型的传输。Transports 实现了 ITransports 接口,它包含如下的方法:

  • write():以非阻塞的方式按顺序依次将数据写到物理连接上;
  • writeSequence():将一个字符串列表写到物理连接上;
  • loseConnection():将所有挂起的数据写入,然后关闭连接;
  • getPeer():取得连接中对端的地址信息;
  • getHost():取得连接中本端的地址信息;

1.4 Factory 和 Protocol

Protocols 描述了如何以异步的方式处理网络中的事件。HTTP、DNS 以及 IMAP 是应用层协议中的例子。Protocols实现了 IProtocol 接口,它包含如下的方法:

  • makeConnection(): 在 transport 对象和服务器之间建立一个连接;
  • connectionMade(): 连接建立起来后调用;
  • dataReceived(): 接收数据时调用;
  • connectionLost(): 关闭连接时调用;

Factory 和 Protocol 有严格的不同。Factory 的工作是管理连接事件,并且创建 Protocol 对象处理每一个成功的连接。一旦连接建立,Protocol 对象就接管下面的工作了,包括收发数据和决定是否关闭连接。

1.5 inlineCallbacks

在【文献4】中对该装饰器有详细的介绍,我们先来看下该装饰器的作用:

twisted.internet.defer.inlineCallbacks 装饰器是用于同步【异步操作】 的。它用于装饰生成器函数。调用该装饰器装饰的生成器函数会返回一个Deferred对象

其大致执行流程如下 (取自文献4):
图片描述

inlineCallbacks 装饰器执行流程

2. Scrapy 中对 Twisted 模块的进一步封装

qi我们来看看 Scrapy 框架源码中对 Twisted 模块的一些封装代码,主要有两个文件:scrapy/utils/defer.pyscrapy/utils/reactor.py

图片描述

Scrapy 源码中对 Twisted 模块的简单封装文件

其中和后面源码分析中紧密相关的主要是 defer.py 文件,我们也先重点先学习这里的代码。

# 源码位置:scrapy/utils/defer.py
# ...


def defer_fail(_failure):
    from twisted.internet import reactor
    d = defer.Deferred()
    reactor.callLater(0.1, d.errback, _failure)
    return d


def defer_succeed(result):
    from twisted.internet import reactor
    d = defer.Deferred()
    reactor.callLater(0.1, d.callback, result)
    return d


def defer_result(result):
    if isinstance(result, defer.Deferred):
        return result
    elif isinstance(result, failure.Failure):
        return defer_fail(result)
    else:
        return defer_succeed(result)


def mustbe_deferred(f, *args, **kw):
    try:
        result = f(*args, **kw)
    except IgnoreRequest as e:
        return defer_fail(failure.Failure(e))
    except Exception:
        return defer_fail(failure.Failure())
    else:
        return defer_result(result)
    
# ...

上面代码中定义的 mustbe_deferred() 方法在后面会经常使用,它的调用会牵扯到最前面的三个方法:

  • defer_fail():等同于 twisted.internet.defer.fail,会延迟到下一个 reactor 循环才会调用错误回调方法;
  • defer_succeed():等同于 twisted.internet.defer.succeed,会延迟到下一个 reactor 循环才会调用成功的回调方法;
  • defer_result(): 根据结果分别进行延迟的成功回调或者错误回调;
# 源码位置:scrapy/utils/defer.py
# ...

def parallel(iterable, count, callable, *args, **named):
    coop = task.Cooperator()
    # 封装调用的方法
    work = (callable(elem, *args, **named) for elem in iterable)
    return defer.DeferredList([coop.coiterate(work) for _ in range(count)])


def process_chain(callbacks, input, *a, **kw):
    """Return a Deferred built by chaining the given callbacks"""
    d = defer.Deferred()
    # 添加回调链
    for x in callbacks:
        d.addCallback(x, *a, **kw)
    d.callback(input)
    return d


def process_chain_both(callbacks, errbacks, input, *a, **kw):
    """Return a Deferred built by chaining the given callbacks and errbacks"""
    d = defer.Deferred()
    for cb, eb in zip(callbacks, errbacks):
        d.addCallbacks(
            callback=cb, errback=eb,
            callbackArgs=a, callbackKeywords=kw,
            errbackArgs=a, errbackKeywords=kw,
        )
    if isinstance(input, failure.Failure):
        d.errback(input)
    else:
        d.callback(input)
    return d


def process_parallel(callbacks, input, *a, **kw):
    dfds = [defer.succeed(input).addCallback(x, *a, **kw) for x in callbacks]
    d = defer.DeferredList(dfds, fireOnOneErrback=1, consumeErrors=1)
    d.addCallbacks(lambda r: [x[1] for x in r], lambda f: f.value.subFailure)
    return d

我们来分别对这四个方法进行说明:

  • parallel():对给定可迭代的对象并行调用,使用不超过 count 个并发调用。该方法通过封装 Twisted 中的 task.Cooperator 对象,实现对任务的并发调用;
  • process_chain():代码非常简单,就是单纯的将输入的 callbacks 依次加入到回调链中并返回对应的 Deferred 对象;
  • process_chain_both():对上面方法的扩展,构建正常回调链以及错误回调链,并返回对应的 Deferred 对象;
  • process_parallel():返回一个包含对给定回调的所有成功调用输出的 Deferred 对象;

上述这些方法可以通过阅读其实现理解其功能,最后剩余的几个方法在后续的源码介绍中没有用到,故暂时不进行介绍,大家可以自行参考相关资料理解。

3. 小结

本小节中我们介绍了 Twisted 框架中的几个核心概念,主要是 reactorFactoryProtocol 以及 Deferred 等,这些是 Twisted 的核心组成部分,也会在 Scrapy 中大量应用和进一步封装。此外,我们完成了一些基于 Twisted 模块的案例,这些也帮助我们很好的理解了 Twisted 模块的使用。最后我们进一步分析了 Scrapy 框架中对 Twisted 模块核心方法所做的一些封装,这些也为了后续更好地理解 Scrapy 框架源码打好坚实的基础。