欢迎各位兄弟 发布技术文章

这里的技术是共享的

Event Loop

event loop 对象包含两个部分:event 和 loop。event 负责 I/O 事件通知而 loop 负责循环处理 I/O 通知并在就绪时调用回调。这里 event 的含义与 select 中的 event mask 类似。

BaseEventLoop 类实现了基本的 loop 部分,而类似于 BaseSelectorEventLoop 这样的类实现了基于 selector 的 event 部分。

event loop 内部维护着两个容器:_ready 和 _scheduled。类型分别是 deque 和 list。_ready 代表已经可以执行,_scheduled 代表计划执行。_scheduled 中的 handle 是可以 cancel 的。

一次 loop 的基本流程可以参见 _run_once() 方法,其说明文档如下:

This calls all currently ready callbacks, polls for I/O,
schedules the resulting callbacks, and finally schedules
'call_later' callbacks.

流程为:

  1. 将 _scheduled 中已 canceled 的 handle 去掉
  2. 检查 _ready 和 _scheduled 以确定一个用于 _selector.select() 的 timeout 值

     timeout = None
     if self._ready:
         timeout = 0
     elif self._scheduled:
         # Compute the desired timeout.
         when = self._scheduled[0]._when
         timeout = max(0, when - self.time())
    
  3. 通过 _selector.select() 获得一个 event_list 并 _process_events() 之
  4. _process_events 即为将 得到的 events(handle)添加到 _ready 中
  5. 顺序检查 _scheduled 将其中 .when() 到期的 handle 挪到 _ready 中
  6. 顺序执行 _ready 中的 handle (handle._run())

故 eventloop 计划异步任务的基本方法就是将延时任务添加到 _scheduled 中,以及将即时任务添加到 _ready 中。延时任务的来源有 await futureloop.create_task() 等(最简单的方法应该是直接实例化 Future 实例,但这种做法除非在测试,一般不必用于真实业务中)。即时任务的来源基本有三种:call_soon() 的调用、_shceduled 到期,和 selector.select() 的返回。在 IO 处理中一般主要依赖第三种机制。

callback

callback 类型是普通的函数(不能是 coroutine)。

可以使用的方法有 call_soon 和 call_at。(call_later 是通过 call_at 实现的)

调用 call_soon 会将一个 Handle 压入 _ready

调用 call_at 会将一个 TimerHandle 压入 _scheduled

create_task

Task 用于处理 coroutine。底层机制上实际仍然依赖 callback。

I/O

EventLoop 对 IO 的支持依赖 selector,因此基本可以想象一下它的实现逻辑,下面以 BaseSelectorEventLoop 类的 add_reader 方法为例

def add_reader(self, fd, callback, *args):
    """Add a reader callback."""
    self._check_closed()
    handle = events.Handle(callback, args, self)
    try:
        key = self._selector.get_key(fd)
    except KeyError:
        self._selector.register(fd, selectors.EVENT_READ,
                                (handle, None))
    else:
        mask, (reader, writer) = key.events, key.data
        self._selector.modify(fd, mask | selectors.EVENT_READ,
                              (handle, writer))
        if reader is not None:
            reader.cancel()

当你调用 loop.create_connection 或 asyncio.open_connection 这类操作 IO 的方法时, add_reader 就会被调用。他会把你要使用的 fd 注册到 selector 里面,注册的 data 参数是一个 Handle 对象。如调用 asyncio.open_connection时,这个 callback 就是 _SelectorSocketTransport._read_ready 方法。这样当 selector 发现这个 fd 就绪的时候,_read_ready就会被调用。

Handle

Handle 是对一个回调的封装,是调用 call 类方法返回的一个对象,标识一个任务。拥有 cancel() 和 _run() 方法。

Task & Coroutine


Coroutine

使用 async def 定义一个 coroutine function,调用 coroutine function 可以得到一个 coroutine object。下文中统称 coroutine,具体含义的辩解依赖上下文。

区别两种对象的一个显式方法是使用 asyncio.iscoroutine() 和 asyncio.iscoroutinefunction() 函数。

coroutine 的执行依赖 event loop。

较早期版本(3.5 以前)定义 coroutine 的方法是使用 @asyncio.coroutine 装饰器,await 也需要用 yield from 替代。asyncio 的 sleep() 函数定义如下:

@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds)."""
    future = futures.Future(loop=loop)
    h = future._loop.call_later(delay, future._set_result_unless_cancelled, result)
    try:
        return (yield from future)
    finally:
        h.cancel()

Future

class asyncio.Future(*, loop=None) 是对一个可调用对象的异步执行控制或者说代理的封装。因此具有如下方法:

  • cancel()
  • cancelled()
  • done()
  • result()
  • exception()
  • add_done_callback(fn)
  • remove_done_callback(fn)
  • set_result(result)
  • set_exception(exception)

注意 Future 并不包含可执行对象的本体,他只保存状态、结果、额外的回调函数这些东西。这也是上面称之为代理的原因。因为实际的调用过程是在 event loop 里发生的,event loop 负责在异步执行完成后向 future 对象写入 result 或 exception。这是异步任务的基本逻辑。

future 实例有三种状态:

  • PENDING
  • CANCELLED
  • FINISHED

初始状态为 PENDING,当调用 cancel() 方法时会立即进入 CANCELLED 状态并 schedule callbacks。当被调用 set_result()时会进入 FINISHED 状态,并 schedule callbacks。当然两种情况下传入 callback 的参数会不同。

schedule callbacks 依然依赖 event loop 来执行:

def _schedule_callbacks(self):
    """Internal: Ask the event loop to call all callbacks.

    The callbacks are scheduled to be called as soon as possible. Also
    clears the callback list.
    """
    callbacks = self._callbacks[:]
    if not callbacks:
        return

    self._callbacks[:] = []
    for callback in callbacks:
        self._loop.call_soon(callback, self)

Task

class asyncio.Task(coro, *, loop=None) 是 Future 的子类。因为 Future 没有保存其相关可执行对象的信息,我们 schedule the execution of a coroutine 这件事一般是通过 Task 对象来做的。

create_task 通过调用 _loop.call_soon(self._step) 来 schedule coroutine,而 Task 的核心是其 _step() 方法,_step(self, value=None, exc=None) 的核心代码是: (更多异常处理代码没有贴进来)

    try:
        if exc is not None:
            result = coro.throw(exc)
        else:
            result = coro.send(value)
    except StopIteration as exc:
        self.set_result(exc.value)
    else:
        if isinstance(result, futures.Future):
            # Yielded Future must come from Future.__iter__().
            if result._blocking:
                result._blocking = False
                result.add_done_callback(self._wakeup)
                self._fut_waiter = result
        elif result is None:
            # Bare yield relinquishes control for one event loop iteration.
            self._loop.call_soon(self._step)

result is None 的情况发生在 coroutine 中嵌套 await 时,否则 coroutine return 的值会以 StopIeration 的 value 属性的形式抛出。这便是当捕获到此异常时会调用 set_result 的原因。调用此方法意味着 coroutine(Future) 的结束。

因为 Coroutine 中可以 await future,所以 Task 提供了一种机制用于当 future 完成时唤醒父级协程。即为当 await future 时,task 对象会将此 future 保存到 _fut_waiter 对象中,并为其添加一个名为 _wake_up() 的回调。

Transport & Protocol


Transport 是 asyncio 提供的一个抽象了通信接口的类。操作 transport 时可以不再关心具体的通信对象是 socket 还是 pipe。它的基本继承结构是这样的:

BaseTransport
    get_extra_info
    close

ReadTransport(BaseTransport)
    pause_reading
    resume_reading

WriteTransport(BaseTransport)
    set_write_buffer_limits
    get_write_buffer_size
    write
    writelines
    write_eof
    can_write_eof
    abort

Transport(ReadTransport, Writetransport)

Protocol 是当创建连接时与 Transport 一起被创建的一个业务层对象。一般用户需要子类化一个 Protocol 并覆盖他的一些方法以实现设计的功能。Protocol 的继承结构是这样的

BaseProtocol
    connection_made
    connection_lost
    pause_writing
    resume_writing

Protocol(BaseProtocol)
    data_received
    eof_received

Protocol 默认定义了以上的事件回调,由 event_loop 负责在事件发生时调用。protocol 的生命周期中这些回调的顺序为:

start -> CM [-> DR*] [-> ER?] -> CL -> end

* CM: connection_made()
* DR: data_received()
* ER: eof_received()
* CL: connection_lost()

其中 connection_made 会传入对应的 Transport 对象用于标识远端连接,可以使用这个对象返回消息。因此一个简单的 echo server 的 Protocol 至少应该是这样的:

class EchoProtocol(Protocol):
    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        self.transport.write(data)

event_loop

Protocol 的实例化与回调调用都依赖 EventLoop,具体的注册机制为调用 loop 的 create_connection 和 create_server协程方法。分别用于创建客户端与服务器连接。

@coroutine
def create_connection(self, protocol_factory, host=None, port=None, *,
                      ssl=None, family=0, proto=0, flags=0, sock=None,
                      local_addr=None, server_hostname=None):
    """Connect to a TCP server.

    Create a streaming transport connection to a given Internet host and
    port: socket family AF_INET or socket.AF_INET6 depending on host (or
    family if specified), socket type SOCK_STREAM. protocol_factory must be
    a callable returning a protocol instance.

    This method is a coroutine which will try to establish the connection
    in the background.  When successful, the coroutine returns a
    (transport, protocol) pair.
    """
    .
    .
    .
    return transport, protocol

@coroutine
def create_server(self, protocol_factory, host=None, port=None,
                  *,
                  family=socket.AF_UNSPEC,
                  flags=socket.AI_PASSIVE,
                  sock=None,
                  backlog=100,
                  ssl=None,
                  reuse_address=None):
    """Create a TCP server bound to host and port.

    Return a Server object which can be used to stop the service.

    This method is a coroutine.
    """
    .
    .
    .
    return server

其中 host 和 port 并不是必需的,可以用一个 socket 实例代替。

Streams


Stream 是对 Transport + Protocol 模式的又一层封装(而非取代)。当使用 asyncio.open_connection 创建客户端时,得到的是一对 StreamReader 和 StreamWriter 对象。

StreamReader 的核心方法是 _wakeup_waiter 和 feed_data。当 Protocol(StreamProtocol) 的 data_received(data) 被调用时,data 会被直接转发给 feed_data,然后 feed_data 会把 data 放到 self.buffer 里并调用 _wakeup_waiter,这里的 waiter 是之前如 data = await reader.read(n) 这样的语句生成的 Future。

_wakeup_waiter 的代码其实基本上只是调用了一下 self._waiter.set_result(None)。这里之所以使用 None 只是为了将 waiter 的状态从 _PENDING 改为 _FINISHED,真正的数据获取(read(n))是从 buffer 里取的。

Stream 之于 Protocol 的优势在于对 I/O 事件的处理方式,从被动的 def data_received(data) 转变为主动的 data = await reader.read(n)。将 asyncio 的编程模式从回调式变成了一种类似同步编程的方式。

StreamReaderProtocol

这是用来替代 StreamReader 成为 Protocol 的一个没什么特别作用只是在转发方法调用的 helper class,其之所以存在是因为要避免 StreamReader 直接成为 Protocol 的子类。否则用户就可以直接通过 StreamReader 访问到 Protocol 的方法,而这种情形与 StreamReader 的设计目标相悖。

因此这个类的存在一般可以忽视。

举个栗子


下面通过一个最简单的 socket read 操作解释一下 event_loop 的调度方式:

import asyncio

loop = asyncio.get_event_loop()


async def foo(loop):
    reader, writer = await asyncio.open_connection('127.0.0.1', 1234, loop=loop)
    for i in range(2):
        data = await reader.read(100)
        print(data.decode())

loop.run_until_complete(loop.create_task(foo()))

上例尝试创建一个 socket 连接 1234 端口,并读取两次数据。内部流程基本是这样的:

  1. foo() 的调用返回了一个 coroutine 对象,loop 被要求运行到这个对象完成为止
  2. 第一次 _run_once 开始,foo() 开始执行,直到 await open_connection
  3. openconnection 的本质是创建一个 _SelectorSocketTransport 对象,在这个类的 `_init方法中调用了三次loop.call_soon()` 三个回调分别是
     * self._protocol.connection_made, self
     * self._loop.add_reader, self._sock_fd, self._read_ready
     * waiter._set_result_unless_cancelled, None
    
  4. 第二次 _run_once 开始,前面三个回调被依次调用。现在
     * protocol 绑定了 transport
     * _read_ready 被注册到 selector 中
     * await open_connection 被标记为已完成
    
  5. 第三次 _run_once 开始,代码执行到了 await reader.read() 处。read 先调用一次 await _wait_for_data() 将自己挂起,挂起的方式为,_wait_for_data 为 reader 创建一个 无意义的 Future:self._waiter = Future(loop=self._loop)。等待其他程序为此 _waiter set_result
  6. 第四次 _run_once 开始,selector 发现之前注册的 fd 有数据可用,遂将 _read_ready 加入 _ready 中,因为直接加入了 _ready,这些 _read_ready 会在本轮内执行
  7. _read_ready 直接调用 self._sock.recv() 读取数据,然后将数据传给 self._protocol.data_received(data),因为我们创建 StreamReader 用的是 open_connection 函数,这里 _protocol 类型是 StreamReaderProtocol ,他会把数据再次转发给 self._stream_reader.feed_data(data) 。
  8. feed_data 会将 data 直接存入 self._buffer,然后调用 self._wakeup_waiter()。_wakeup_waiter 其实就是执行一下 self._waiter.set_result(None)。因为这个 waiter 只是作为一个挂起机制在用,并没有人期待他的 result,所以这里传一个 None 就可以了。第五条中挂起的 reader 将在下一轮恢复执行。
  9. 第五次 _run_once 开始,reader.read() 恢复执行,此时因为已经历过一次 select,buffer 中基本可以确信已有数据。于是直接将 buffer 中的数据取走返回。
  10. foo() 里面的代码得以继续执行,data 被打印出来。然后开始下一次循环。

例子中使用了 for 循环来实现同一 socket 的多次读操作,是因为 foo() 这个 task 一旦执行完毕,由 open_connection 创建的 reader 对象就会被关闭。

可见 Stream 的核心思路便是一旦需要读数据就使用 _wait_for_data 将自身挂起,并等待 selector 将之唤醒。同时异步编程的基本思路便是:以 Handle 为单位,不断把下一个 handle 交给 loop 的下一次 _run_once 执行。而 coroutine 又在异步之上增加了挂起/唤醒功能,使得任务流调度更加灵活和接近同步。

来自  https://my.oschina.net/lionets/blog/499803

唯物品评历史

关注"唯物品评历史",跟着泪痕春雨先生,读懂历史,看彻人生

打开隐藏二维码