🍞 网络与Web编程

🎯 创建TCP服务器

创建一个TCP服务器的一个简单方法是使用 socketserver 库。例如,下面是一个简单的应答服务器:

from socketserver import BaseRequestHandler, TCPServer

class EchoHandler(BaseRequestHandler):
    def handle(self):
        print('Got connection from', self.client_address)
        while True:

            msg = self.request.recv(8192)
            if not msg:
                break
            self.request.send(msg)

if __name__ == '__main__':
    serv = TCPServer(('', 20000), EchoHandler)
    serv.serve_forever()

在这段代码中,定义了一个特殊的处理类,实现了一个 handle() 方法,用来为客户端连接服务。 request 属性是客户端socket,client_address 有客户端地址。 为了测试这个服务器,运行它并打开另外一个Python进程连接这个服务器:

>>> from socket import socket, AF_INET, SOCK_STREAM
>>> s = socket(AF_INET, SOCK_STREAM)
>>> s.connect(('localhost', 20000))
>>> s.send(b'Hello')
5
>>> s.recv(8192)
b'Hello'
>>>

很多时候,可以很容易的定义一个不同的处理器。下面是一个使用 StreamRequestHandler 基类将一个类文件接口放置在底层socket上的例子:

from socketserver import StreamRequestHandler, TCPServer

class EchoHandler(StreamRequestHandler):
    def handle(self):
        print('Got connection from', self.client_address)
        # self.rfile is a file-like object for reading
        for line in self.rfile:
            # self.wfile is a file-like object for writing
            self.wfile.write(line)

if __name__ == '__main__':
    serv = TCPServer(('', 20000), EchoHandler)
    serv.serve_forever()

⚠️ socketserver 可以让我们很容易的创建简单的TCP服务器。 但是,需要注意的是,默认情况下这种服务器是单线程的,一次只能为一个客户端连接服务。 如果想处理多个客户端,可以初始化一个 ForkingTCPServer 或者是 ThreadingTCPServer 对象。例如:

from socketserver import ThreadingTCPServer


if __name__ == '__main__':
    serv = ThreadingTCPServer(('', 20000), EchoHandler)
    serv.serve_forever()

使用fork或线程服务器有个潜在问题就是它们会为每个客户端连接创建一个新的进程或线程。 由于客户端连接数是没有限制的,因此一个恶意的黑客可以同时发送大量的连接让服务器奔溃。

如果担心这个问题,可以创建一个预先分配大小的工作线程池或进程池。 先创建一个普通的非线程服务器,然后在一个线程池中使用 serve_forever() 方法来启动它们。

if __name__ == '__main__':
    from threading import Thread
    NWORKERS = 16
    serv = TCPServer(('', 20000), EchoHandler)
    for n in range(NWORKERS):
        t = Thread(target=serv.serve_forever)
        t.daemon = True
        t.start()
    serv.serve_forever()

一般来讲,一个 TCPServer 在实例化的时候会绑定并激活相应的 socket 。 不过,有时候想通过设置某些选项去调整底下的 socket,可以设置参数bind_and_activate=False` 。如下:

if __name__ == '__main__':
    serv = TCPServer(('', 20000), EchoHandler, bind_and_activate=False)
    # Set up various socket options
    serv.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
    # Bind and activate
    serv.server_bind()
    serv.server_activate()
    serv.serve_forever()

上面的 socket 选项是一个非常普遍的配置项,它允许服务器重新绑定一个之前使用过的端口号。 由于要被经常使用到,它被放置到类变量中,可以直接在 TCPServer 上面设置。 在实例化服务器的时候去设置它的值,如下所示:

if __name__ == '__main__':
    TCPServer.allow_reuse_address = True
    serv = TCPServer(('', 20000), EchoHandler)
    serv.serve_forever()

在上面示例中,我们演示了两种不同的处理器基类( BaseRequestHandlerStreamRequestHandler )。 StreamRequestHandler 更加灵活点,能通过设置其他的类变量来支持一些新的特性。比如:

import socket

class EchoHandler(StreamRequestHandler):
    # Optional settings (defaults shown)
    timeout = 5                      # Timeout on all socket operations
    rbufsize = -1                    # Read buffer size
    wbufsize = 0                     # Write buffer size
    disable_nagle_algorithm = False  # Sets TCP_NODELAY socket option
    def handle(self):
        print('Got connection from', self.client_address)
        try:
            for line in self.rfile:
                # self.wfile is a file-like object for writing
                self.wfile.write(line)
        except socket.timeout:
            print('Timed out!')

最后,还需要注意的是绝大部分Python的高层网络模块(比如HTTP、XML-RPC等)都是建立在 socketserver 功能之上。 也就是说,直接使用 socket 库来实现服务器也并不是很难。 下面是一个使用 socket 直接编程实现的一个服务器简单例子:

from socket import socket, AF_INET, SOCK_STREAM

def echo_handler(address, client_sock):
    print('Got connection from {}'.format(address))
    while True:
        msg = client_sock.recv(8192)
        if not msg:
            break
        client_sock.sendall(msg)
    client_sock.close()

def echo_server(address, backlog=5):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind(address)
    sock.listen(backlog)
    while True:
        client_sock, client_addr = sock.accept()
        echo_handler(client_addr, client_sock)

if __name__ == '__main__':
    echo_server(('', 20000))

🎯 创建UDP服务器

跟TCP一样,UDP服务器也可以通过使用 socketserver 库很容易的被创建。 例如,下面是一个简单的时间服务器:

from socketserver import BaseRequestHandler, UDPServer
import time

class TimeHandler(BaseRequestHandler):
    def handle(self):
        print('Got connection from', self.client_address)
        # Get message and client socket
        msg, sock = self.request
        resp = time.ctime()
        sock.sendto(resp.encode('ascii'), self.client_address)

if __name__ == '__main__':
    serv = UDPServer(('', 20000), TimeHandler)
    serv.serve_forever()

跟之前一样,先定义一个实现 handle() 特殊方法的类,为客户端连接服务。 这个类的 request 属性是一个包含了数据报和底层socket对象的元组。client_address 包含了客户端地址。

我们来测试下这个服务器,首先运行它,然后打开另外一个Python进程向服务器发送消息:

>>> from socket import socket, AF_INET, SOCK_DGRAM
>>> s = socket(AF_INET, SOCK_DGRAM)
>>> s.sendto(b'', ('localhost', 20000))
0
>>> s.recvfrom(8192)
(b'Wed Aug 15 20:35:08 2012', ('127.0.0.1', 20000))
>>>

⚠️ 一个典型的UDP服务器接收到达的数据报(消息)和客户端地址。如果服务器需要做应答, 它要给客户端回发一个数据报。对于数据报的传送, 应该使用socket的 sendto()recvfrom() 方法。 尽管传统的 send()recv() 也可以达到同样的效果, 但是前面的两个方法对于UDP连接而言更普遍。

由于没有底层的连接,UPD服务器相对于TCP服务器来讲实现起来更加简单。 不过,UDP天生是不可靠的(因为通信没有建立连接,消息可能丢失)。 因此需要由自己来决定该怎样处理丢失消息的情况。这个已经不在本书讨论范围内了, 不过通常来说,如果可靠性对于程序很重要,需要借助于序列号、重试、超时以及一些其他方法来保证。 UDP通常被用在那些对于可靠传输要求不是很高的场合。例如,在实时应用如多媒体流以及游戏领域, 无需返回恢复丢失的数据包(程序只需简单的忽略它并继续向前运行)。

UDPServer 类是单线程的,也就是说一次只能为一个客户端连接服务。 实际使用中,这个无论是对于UDP还是TCP都不是什么大问题。 如果想要并发操作,可以实例化一个 ForkingUDPServerThreadingUDPServer 对象:

from socketserver import ThreadingUDPServer

   if __name__ == '__main__':
    serv = ThreadingUDPServer(('',20000), TimeHandler)
    serv.serve_forever()

直接使用 socket 来实现一个UDP服务器也不难,下面是一个例子:

from socket import socket, AF_INET, SOCK_DGRAM
import time

def time_server(address):
    sock = socket(AF_INET, SOCK_DGRAM)
    sock.bind(address)
    while True:
        msg, addr = sock.recvfrom(8192)
        print('Got message from', addr)
        resp = time.ctime()
        sock.sendto(resp.encode('ascii'), addr)

if __name__ == '__main__':
    time_server(('', 20000))

🎯 简单的客户端认证

可以利用 hmac 模块实现一个连接握手,从而实现一个简单而高效的认证过程。下面是代码示例:

import hmac
import os

def client_authenticate(connection, secret_key):
    '''
    Authenticate client to a remote service.
    connection represents a network connection.
    secret_key is a key known only to both client/server.
    '''
    message = connection.recv(32)
    hash = hmac.new(secret_key, message)
    digest = hash.digest()
    connection.send(digest)

def server_authenticate(connection, secret_key):
    '''
    Request client authentication.
    '''
    message = os.urandom(32)
    connection.send(message)
    hash = hmac.new(secret_key, message)
    digest = hash.digest()
    response = connection.recv(len(digest))
    return hmac.compare_digest(digest,response)

基本原理是当连接建立后,服务器给客户端发送一个随机的字节消息(这里例子中使用了 os.urandom() 返回值)。 客户端和服务器同时利用hmac和一个只有双方知道的密钥来计算出一个加密哈希值。然后客户端将它计算出的摘要发送给服务器, 服务器通过比较这个值和自己计算的是否一致来决定接受或拒绝连接。摘要的比较需要使用 hmac.compare_digest() 函数。 使用这个函数可以避免遭到时间分析攻击,不要用简单的比较操作符(==)。 为了使用这些函数,需要将它集成到已有的网络或消息代码中。例如,对于sockets,服务器代码应该类似下面:

from socket import socket, AF_INET, SOCK_STREAM

secret_key = b'peekaboo'
def echo_handler(client_sock):
    if not server_authenticate(client_sock, secret_key):
        client_sock.close()
        return
    while True:

        msg = client_sock.recv(8192)
        if not msg:
            break
        client_sock.sendall(msg)

def echo_server(address):
    s = socket(AF_INET, SOCK_STREAM)
    s.bind(address)
    s.listen(5)
    while True:
        c,a = s.accept()
        echo_handler(c)

echo_server(('', 18000))

Within a client, you would do this:

from socket import socket, AF_INET, SOCK_STREAM

secret_key = b'peekaboo'

s = socket(AF_INET, SOCK_STREAM)
s.connect(('localhost', 18000))
client_authenticate(s, secret_key)
s.send(b'Hello World')
resp = s.recv(1024)

⚠️ hmac 认证的一个常见使用场景是内部消息通信系统和进程间通信。 例如,如果编写的系统涉及到一个集群中多个处理器之间的通信, 可以使用本节方案来确保只有被允许的进程之间才能彼此通信。 事实上,基于 hmac 的认证被 multiprocessing 模块使用来实现子进程直接的通信。

还有一点需要强调的是连接认证和加密是两码事。 认证成功之后的通信消息是以明文形式发送的,任何人只要想监听这个连接线路都能看到消息(尽管双方的密钥不会被传输)。

🎯 理解事件驱动的IO

事件驱动I/O本质上来讲就是将基本I/O操作(比如读和写)转化为程序需要处理的事件。 例如,当数据在某个socket上被接受后,它会转换成一个 receive 事件,然后被定义的回调方法或函数来处理。 作为一个可能的起始点,一个事件驱动的框架可能会以一个实现了一系列基本事件处理器方法的基类开始:

class EventHandler:
    def fileno(self):
        'Return the associated file descriptor'
        raise NotImplemented('must implement')

    def wants_to_receive(self):
        'Return True if receiving is allowed'
        return False

    def handle_receive(self):
        'Perform the receive operation'
        pass

    def wants_to_send(self):
        'Return True if sending is requested'
        return False

    def handle_send(self):
        'Send outgoing data'
        pass

这个类的实例作为插件被放入类似下面这样的事件循环中:

import select

def event_loop(handlers):
    while True:
        wants_recv = [h for h in handlers if h.wants_to_receive()]
        wants_send = [h for h in handlers if h.wants_to_send()]
        can_recv, can_send, _ = select.select(wants_recv, wants_send, [])
        for h in can_recv:
            h.handle_receive()
        for h in can_send:
            h.handle_send()

事件循环的关键部分是 select() 调用,它会不断轮询文件描述符从而激活它。 在调用 select() 之前,事件循环会询问所有的处理器来决定哪一个想接受或发生。 然后它将结果列表提供给 select() 。然后 select() 返回准备接受或发送的对象组成的列表。 然后相应的 handle_receive()handle_send() 方法被触发。

编写应用程序的时候,EventHandler 的实例会被创建。例如,下面是两个简单的基于UDP网络服务的处理器例子:

import socket
import time

class UDPServer(EventHandler):
    def __init__(self, address):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(address)

    def fileno(self):
        return self.sock.fileno()

    def wants_to_receive(self):
        return True

class UDPTimeServer(UDPServer):
    def handle_receive(self):
        msg, addr = self.sock.recvfrom(1)
        self.sock.sendto(time.ctime().encode('ascii'), addr)

class UDPEchoServer(UDPServer):
    def handle_receive(self):
        msg, addr = self.sock.recvfrom(8192)
        self.sock.sendto(msg, addr)

if __name__ == '__main__':
    handlers = [ UDPTimeServer(('',14000)), UDPEchoServer(('',15000))  ]
    event_loop(handlers)

测试这段代码,试着从另外一个Python解释器连接它:

>>> from socket import *
>>> s = socket(AF_INET, SOCK_DGRAM)
>>> s.sendto(b'',('localhost',14000))
0
>>> s.recvfrom(128)
(b'Tue Sep 18 14:29:23 2012', ('127.0.0.1', 14000))
>>> s.sendto(b'Hello',('localhost',15000))
5
>>> s.recvfrom(128)
(b'Hello', ('127.0.0.1', 15000))
>>>

实现一个TCP服务器会更加复杂一点,因为每一个客户端都要初始化一个新的处理器对象。 下面是一个TCP应答客户端例子:

class TCPServer(EventHandler):
    def __init__(self, address, client_handler, handler_list):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
        self.sock.bind(address)
        self.sock.listen(1)
        self.client_handler = client_handler
        self.handler_list = handler_list

    def fileno(self):
        return self.sock.fileno()

    def wants_to_receive(self):
        return True

    def handle_receive(self):
        client, addr = self.sock.accept()
        # Add the client to the event loop's handler list
        self.handler_list.append(self.client_handler(client, self.handler_list))

class TCPClient(EventHandler):
    def __init__(self, sock, handler_list):
        self.sock = sock
        self.handler_list = handler_list
        self.outgoing = bytearray()

    def fileno(self):
        return self.sock.fileno()

    def close(self):
        self.sock.close()
        # Remove myself from the event loop's handler list
        self.handler_list.remove(self)

    def wants_to_send(self):
        return True if self.outgoing else False

    def handle_send(self):
        nsent = self.sock.send(self.outgoing)
        self.outgoing = self.outgoing[nsent:]

class TCPEchoClient(TCPClient):
    def wants_to_receive(self):
        return True

    def handle_receive(self):
        data = self.sock.recv(8192)
        if not data:
            self.close()
        else:
            self.outgoing.extend(data)

if __name__ == '__main__':
   handlers = []
   handlers.append(TCPServer(('',16000), TCPEchoClient, handlers))
   event_loop(handlers)

TCP例子的关键点是从处理器中列表增加和删除客户端的操作。 对每一个连接,一个新的处理器被创建并加到列表中。当连接被关闭后,每个客户端负责将其从列表中删除。 如果运行程序并试着用Telnet或类似工具连接,它会将发送的消息回显给。并且它能很轻松的处理多客户端连接。

⚠️ 实际上所有的事件驱动框架原理跟上面的例子相差无几。实际的实现细节和软件架构可能不一样, 但是在最核心的部分,都会有一个轮询的循环来检查活动socket,并执行响应操作。

事件驱动I/O的一个可能好处是它能处理非常大的并发连接,而不需要使用多线程或多进程。 也就是说,select() 调用(或其他等效的)能监听大量的socket并响应它们中任何一个产生事件的。 在循环中一次处理一个事件,并不需要其他的并发机制。

事件驱动I/O的缺点是没有真正的同步机制。 如果任何事件处理器方法阻塞或执行一个耗时计算,它会阻塞所有的处理进程。 调用那些并不是事件驱动风格的库函数也会有问题,同样要是某些库函数调用会阻塞,那么也会导致整个事件循环停止。

对于阻塞或耗时计算的问题可以通过将事件发送个其他单独的线程或进程来处理。 不过,在事件循环中引入多线程和多进程是比较棘手的, 下面的例子演示了如何使用 concurrent.futures 模块来实现:

from concurrent.futures import ThreadPoolExecutor
import os

class ThreadPoolHandler(EventHandler):
    def __init__(self, nworkers):
        if os.name == 'posix':
            self.signal_done_sock, self.done_sock = socket.socketpair()
        else:
            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            server.bind(('127.0.0.1', 0))
            server.listen(1)
            self.signal_done_sock = socket.socket(socket.AF_INET,
                                                  socket.SOCK_STREAM)
            self.signal_done_sock.connect(server.getsockname())
            self.done_sock, _ = server.accept()
            server.close()

        self.pending = []
        self.pool = ThreadPoolExecutor(nworkers)

    def fileno(self):
        return self.done_sock.fileno()

    # Callback that executes when the thread is done
    def _complete(self, callback, r):

        self.pending.append((callback, r.result()))
        self.signal_done_sock.send(b'x')

    # Run a function in a thread pool
    def run(self, func, args=(), kwargs={},*,callback):
        r = self.pool.submit(func, *args, **kwargs)
        r.add_done_callback(lambda r: self._complete(callback, r))

    def wants_to_receive(self):
        return True

    # Run callback functions of completed work
    def handle_receive(self):
        # Invoke all pending callback functions
        for callback, result in self.pending:
            callback(result)
            self.done_sock.recv(1)
        self.pending = []

在代码中,run() 方法被用来将工作提交给回调函数池,处理完成后被激发。 实际工作被提交给 ThreadPoolExecutor 实例。 不过一个难点是协调计算结果和事件循环,为了解决它,我们创建了一对socket并将其作为某种信号量机制来使用。 当线程池完成工作后,它会执行类中的 _complete() 方法。 这个方法再某个socket上写入字节之前会讲挂起的回调函数和结果放入队列中。 fileno() 方法返回另外的那个socket。 因此,这个字节被写入时,它会通知事件循环, 然后 handle_receive() 方法被激活并为所有之前提交的工作执行回调函数。 坦白讲,说了这么多连我自己都晕了。 下面是一个简单的服务器,演示了如何使用线程池来实现耗时的计算:

# A really bad Fibonacci implementation
def fib(n):
    if n < 2:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

class UDPFibServer(UDPServer):
    def handle_receive(self):
        msg, addr = self.sock.recvfrom(128)
        n = int(msg)
        pool.run(fib, (n,), callback=lambda r: self.respond(r, addr))

    def respond(self, result, addr):
        self.sock.sendto(str(result).encode('ascii'), addr)

if __name__ == '__main__':
    pool = ThreadPoolHandler(16)
    handlers = [ pool, UDPFibServer(('',16000))]
    event_loop(handlers)

运行这个服务器,然后试着用其它Python程序来测试它:

from socket import *
sock = socket(AF_INET, SOCK_DGRAM)
for x in range(40):
    sock.sendto(str(x).encode('ascii'), ('localhost', 16000))
    resp = sock.recvfrom(8192)
    print(resp[0])

应该能在不同窗口中重复的执行这个程序,并且不会影响到其他程序,尽管当数字便越来越大时候它会变得越来越慢。

已经阅读完了这一小节,那么应该使用这里的代码吗?也许不会。应该选择一个可以完成同样任务的高级框架。 不过,如果理解了基本原理,就能理解这些框架所使用的核心技术。 作为对回调函数编程的替代,事件驱动编码有时候会使用到协程。

🍞 并发编程

🎯 线程间通信

⚠️ 编写涉及到大量的线程间同步问题的代码会让人痛不欲生。

比较合适的方式是使用 队列 来进行线程间通信或者每个把线程当作一个 Actor ,利用Actor模型来控制并发。

🧩 使用队列

从一个线程向另一个线程发送数据最安全的方式可能就是使用 queue 库中的队列了。创建一个被多个线程共享的 Queue 对象,这些线程通过使用 put()get() 操作来向队列中添加或者删除元素。 例如:

from queue import Queue
from threading import Thread

# A thread that produces data
def producer(out_q):
    while True:
        # Produce some data
        ...
        out_q.put(data)

# A thread that consumes data
def consumer(in_q):
    while True:
# Get some data
        data = in_q.get()
        # Process the data
        ...

# Create the shared queue and launch both threads
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()

Queue 对象已经包含了必要的锁,所以可以通过它在多个线程间多安全地共享数据。 当使用队列时,协调生产者和消费者的关闭问题可能会有一些麻烦。一个通用的解决方法是在队列中放置一个特殊的值,当消费者读到这个值的时候,终止执行。例如:

from queue import Queue
from threading import Thread

# Object that signals shutdown
_sentinel = object()

# A thread that produces data
def producer(out_q):
    while running:
        # Produce some data
        ...
        out_q.put(data)

    # Put the sentinel on the queue to indicate completion
    out_q.put(_sentinel)

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data = in_q.get()

        # Check for termination
        if data is _sentinel:
            in_q.put(_sentinel)
            break

        # Process the data
        ...

本例中有一个特殊的地方:消费者在读到这个特殊值之后立即又把它放回到队列中,将之传递下去。这样,所有监听这个队列的消费者线程就可以全部关闭了。 尽管队列是最常见的线程间通信机制,但是仍然可以自己通过创建自己的数据结构并添加所需的锁和同步机制来实现线程间通信。最常见的方法是使用 Condition 变量来包装的数据结构。下边这个例子演示了如何创建一个线程安全的优先级队列,如同1.5节中介绍的那样。

import heapq
import threading

class PriorityQueue:
    def __init__(self):
        self._queue = []
        self._count = 0
        self._cv = threading.Condition()
    def put(self, item, priority):
        with self._cv:
            heapq.heappush(self._queue, (-priority, self._count, item))
            self._count += 1
            self._cv.notify()

    def get(self):
        with self._cv:
            while len(self._queue) == 0:
                self._cv.wait()
            return heapq.heappop(self._queue)[-1]

使用队列来进行线程间通信是一个单向、不确定的过程。通常情况下,没有办法知道接收数据的线程是什么时候接收到的数据并开始工作的。不过队列对象提供一些基本完成的特性,比如下边这个例子中的 task_done()join()

from queue import Queue
from threading import Thread

# A thread that produces data
def producer(out_q):
    while running:
        # Produce some data
        ...
        out_q.put(data)

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data = in_q.get()

        # Process the data
        ...
        # Indicate completion
        in_q.task_done()

# Create the shared queue and launch both threads
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()

# Wait for all produced items to be consumed
q.join()

如果一个线程需要在一个“消费者”线程处理完特定的数据项时立即得到通知,可以把要发送的数据和一个 Event 放到一起使用,这样“生产者”就可以通过这个Event对象来监测处理的过程了。示例如下:

from queue import Queue
from threading import Thread, Event

# A thread that produces data
def producer(out_q):
    while running:
        # Produce some data
        ...
        # Make an (data, event) pair and hand it to the consumer
        evt = Event()
        out_q.put((data, evt))
        ...
        # Wait for the consumer to process the item
        evt.wait()

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data, evt = in_q.get()
        # Process the data
        ...
        # Indicate completion
        evt.set()

基于简单队列编写多线程程序在多数情况下是一个比较明智的选择。从线程安全队列的底层实现来看,无需在的代码中使用锁和其他底层的同步机制,这些只会把的程序弄得乱七八糟。此外,使用队列这种基于消息的通信机制可以被扩展到更大的应用范畴,比如,可以把的程序放入多个进程甚至是分布式系统而无需改变底层的队列结构。 使用线程队列有一个要注意的问题是,向队列中添加数据项时并不会复制此数据项,线程间通信实际上是在线程间传递对象引用。如果担心对象的共享状态,那最好只传递不可修改的数据结构(如:整型、字符串或者元组)或者一个对象的深拷贝。例如:

from queue import Queue
from threading import Thread
import copy

# A thread that produces data
def producer(out_q):
    while True:
        # Produce some data
        ...
        out_q.put(copy.deepcopy(data))

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data = in_q.get()
        # Process the data
        ...

Queue 对象提供一些在当前上下文很有用的附加特性。比如在创建 Queue 对象时提供可选的 size 参数来限制可以添加到队列中的元素数量。对于“生产者”与“消费者”速度有差异的情况,为队列中的元素数量添加上限是有意义的。比如,一个“生产者”产生项目的速度比“消费者” “消费”的速度快,那么使用固定大小的队列就可以在队列已满的时候阻塞队列,以免未预期的连锁效应扩散整个程序造成死锁或者程序运行失常。在通信的线程之间进行“流量控制”是一个看起来容易实现起来困难的问题。如果发现自己曾经试图通过摆弄队列大小来解决一个问题,这也许就标志着的程序可能存在脆弱设计或者固有的可伸缩问题。 get()put() 方法都支持非阻塞方式和设定超时,例如:

import queue
q = queue.Queue()

try:
    data = q.get(block=False)
except queue.Empty:
    ...

try:
    q.put(item, block=False)
except queue.Full:
    ...

try:
    data = q.get(timeout=5.0)
except queue.Empty:
    ...

这些操作都可以用来避免当执行某些特定队列操作时发生无限阻塞的情况,比如,一个非阻塞的 put() 方法和一个固定大小的队列一起使用,这样当队列已满时就可以执行不同的代码。比如输出一条日志信息并丢弃。

def producer(q):
    ...
    try:
        q.put(item, block=False)
    except queue.Full:
        log.warning('queued item %r discarded!', item)

如果试图让消费者线程在执行像 q.get() 这样的操作时,超时自动终止以便检查终止标志,应该使用 q.get() 的可选参数 timeout ,如下:

_running = True

def consumer(q):
    while _running:
        try:
            item = q.get(timeout=5.0)
            # Process item
            ...
        except queue.Empty:
            pass

最后,有 q.qsize()q.full()q.empty() 等实用方法可以获取一个队列的当前大小和状态。但要注意,这些方法都不是线程安全的。可能对一个队列使用 empty() 判断出这个队列为空,但同时另外一个线程可能已经向这个队列中插入一个数据项。所以,最好不要在的代码中使用这些方法。

🧩 Actor模式

actor模式是一种最古老的也是最简单的并行和分布式计算解决方案。 事实上,它天生的简单性是它如此受欢迎的重要原因之一。 简单来讲,一个actor就是一个并发执行的任务,只是简单的执行发送给它的消息任务。 响应这些消息时,它可能还会给其他actor发送更进一步的消息。 actor之间的通信是单向和异步的。因此,消息发送者不知道消息是什么时候被发送, 也不会接收到一个消息已被处理的回应或通知。

结合使用一个线程和一个队列可以很容易的定义actor,例如:

from queue import Queue
from threading import Thread, Event

# Sentinel used for shutdown
class ActorExit(Exception):
    pass

class Actor:
    def __init__(self):
        self._mailbox = Queue()

    def send(self, msg):
        '''
        Send a message to the actor
        '''
        self._mailbox.put(msg)

    def recv(self):
        '''
        Receive an incoming message
        '''
        msg = self._mailbox.get()
        if msg is ActorExit:
            raise ActorExit()
        return msg

    def close(self):
        '''
        Close the actor, thus shutting it down
        '''
        self.send(ActorExit)

    def start(self):
        '''
        Start concurrent execution
        '''
        self._terminated = Event()
        t = Thread(target=self._bootstrap)

        t.daemon = True
        t.start()

    def _bootstrap(self):
        try:
            self.run()
        except ActorExit:
            pass
        finally:
            self._terminated.set()

    def join(self):
        self._terminated.wait()

    def run(self):
        '''
        Run method to be implemented by the user
        '''
        while True:
            msg = self.recv()

# Sample ActorTask
class PrintActor(Actor):
    def run(self):
        while True:
            msg = self.recv()
            print('Got:', msg)

# Sample use
p = PrintActor()
p.start()
p.send('Hello')
p.send('World')
p.close()
p.join()

这个例子中,使用actor实例的 send() 方法发送消息给它们。 其机制是,这个方法会将消息放入一个队里中, 然后将其转交给处理被接受消息的一个内部线程。 close() 方法通过在队列中放入一个特殊的哨兵值(ActorExit)来关闭这个actor。 用户可以通过继承Actor并定义实现自己处理逻辑run()方法来定义新的actor。 ActorExit 异常的使用就是用户自定义代码可以在需要的时候来捕获终止请求 (异常被get()方法抛出并传播出去)。

如果放宽对于同步和异步消息发送的要求, 类actor对象还可以通过生成器来简化定义。例如:

def print_actor():
    while True:

        try:
            msg = yield      # Get a message
            print('Got:', msg)
        except GeneratorExit:
            print('Actor terminating')

# Sample use
p = print_actor()
next(p)     # Advance to the yield (ready to receive)
p.send('Hello')
p.send('World')
p.close()

actor模式的魅力就在于它的简单性。 实际上,这里仅仅只有一个核心操作 send() . 甚至,对于在基于actor系统中的“消息”的泛化概念可以已多种方式被扩展。 例如,可以以元组形式传递标签消息,让actor执行不同的操作,如下:

class TaggedActor(Actor):
    def run(self):
        while True:
             tag, *payload = self.recv()
             getattr(self,'do_'+tag)(*payload)

    # Methods correponding to different message tags
    def do_A(self, x):
        print('Running A', x)

    def do_B(self, x, y):
        print('Running B', x, y)

# Example
a = TaggedActor()
a.start()
a.send(('A', 1))      # Invokes do_A(1)
a.send(('B', 2, 3))   # Invokes do_B(2,3)
a.close()
a.join()

作为另外一个例子,下面的actor允许在一个工作者中运行任意的函数, 并且通过一个特殊的Result对象返回结果:

from threading import Event
class Result:
    def __init__(self):
        self._evt = Event()
        self._result = None

    def set_result(self, value):
        self._result = value

        self._evt.set()

    def result(self):
        self._evt.wait()
        return self._result

class Worker(Actor):
    def submit(self, func, *args, **kwargs):
        r = Result()
        self.send((func, args, kwargs, r))
        return r

    def run(self):
        while True:
            func, args, kwargs, r = self.recv()
            r.set_result(func(*args, **kwargs))

# Example use
worker = Worker()
worker.start()
r = worker.submit(pow, 2, 3)
worker.close()
worker.join()
print(r.result())

最后,“发送”一个任务消息的概念可以被扩展到多进程甚至是大型分布式系统中去。 例如,一个类actor对象的 send() 方法可以被编程让它能在一个套接字连接上传输数据 或通过某些消息中间件(比如AMQP、ZMQ等)来发送。

🎯 锁

需要对多线程程序中的临界区加锁以避免竞争条件。

要在多线程程序中安全使用可变对象,需要使用 threading 库中的 Lock 对象,就像下边这个例子这样:

import threading

class SharedCounter:
    '''
    A counter object that can be shared by multiple threads.
    '''
    def __init__(self, initial_value = 0):
        self._value = initial_value
        self._value_lock = threading.Lock()

    def incr(self,delta=1):
        '''
        Increment the counter with locking
        '''
        with self._value_lock:
             self._value += delta

    def decr(self,delta=1):
        '''
        Decrement the counter with locking
        '''
        with self._value_lock:
             self._value -= delta

Lock 对象和 with 语句块一起使用可以保证互斥执行,就是每次只有一个线程可以执行 with 语句包含的代码块。with语句会在这个代码块执行前自动获取锁,在执行结束后自动释放锁。

线程调度本质上是不确定的,因此,在多线程程序中错误地使用锁机制可能会导致随机数据损坏或者其他的异常行为,我们称之为竞争条件。为了避免竞争条件,最好只在临界区(对临界资源进行操作的那部分代码)使用锁。

⚠️ 为了避免出现死锁的情况,使用锁机制的程序应该设定为 每个线程一次只允许获取一个锁 。如果不能这样做的话,就需要更高级的死锁避免机制.

一个多线程程序,其中线程需要一次获取多个锁,此时如何避免死锁问题。

在多线程程序中,死锁问题很大一部分是由于线程同时获取多个锁造成的。举个例子:一个线程获取了第一个锁,然后在获取第二个锁的 时候发生阻塞,那么这个线程就可能阻塞其他线程的执行,从而导致整个程序假死。 解决死锁问题的一种方案是为程序中的每一个锁分配一个唯一的id,然后只允许按照升序规则来使用多个锁,这个规则使用上下文管理器 是非常容易实现的,示例如下:

import threading
from contextlib import contextmanager

# Thread-local state to stored information on locks already acquired
_local = threading.local()

@contextmanager
def acquire(*locks):
    # Sort locks by object identifier
    locks = sorted(locks, key=lambda x: id(x))

    # Make sure lock order of previously acquired locks is not violated
    acquired = getattr(_local,'acquired',[])
    if acquired and max(id(lock) for lock in acquired) >= id(locks[0]):
        raise RuntimeError('Lock Order Violation')

    # Acquire all of the locks
    acquired.extend(locks)
    _local.acquired = acquired

    try:
        for lock in locks:
            lock.acquire()
        yield
    finally:
        # Release locks in reverse order of acquisition
        for lock in reversed(locks):
            lock.release()
        del acquired[-len(locks):]

如何使用这个上下文管理器呢?可以按照正常途径创建一个锁对象,但不论是单个锁还是多个锁中都使用 acquire() 函数来申请锁, 示例如下:

import threading
import time
x_lock = threading.Lock()
y_lock = threading.Lock()

def thread_1():
    while True:
        time.sleep(0.1)
        with acquire(x_lock, y_lock):
            print('Thread-1')

def thread_2():
    while True:
        time.sleep(0.1)
        with acquire(y_lock, x_lock):
            print('Thread-2')

t1 = threading.Thread(target=thread_1)
t1.daemon = True
t1.start()

t2 = threading.Thread(target=thread_2)
t2.daemon = True
t2.start()

time.sleep(1)

如果执行这段代码,会发现它即使在不同的函数中以不同的顺序获取锁也没有发生死锁。 其关键在于,在第一段代码中,我们对这些锁进行了排序。通过排序,使得不管用户以什么样的顺序来请求锁,这些锁都会按照固定的顺序被获取。 如果有多个 acquire() 操作被嵌套调用,可以通过线程本地存储(TLS)来检测潜在的死锁问题。 假设的代码是这样写的:

import threading
import time
x_lock = threading.Lock()
y_lock = threading.Lock()

def thread_1():
    while True:
        time.sleep(0.1)
        with acquire(x_lock):
            with acquire(y_lock):
                print('Thread-1')

def thread_2():
    while True:
        time.sleep(0.1)
        with acquire(y_lock):
            with acquire(x_lock):
                print('Thread-2')

t1 = threading.Thread(target=thread_1)
t1.daemon = True
t1.start()

t2 = threading.Thread(target=thread_2)
t2.daemon = True
t2.start()

time.sleep(1)

如果运行这个版本的代码,必定会有一个线程发生崩溃,异常信息可能像这样:

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/local/lib/python3.3/threading.py", line 639, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.3/threading.py", line 596, in run
    self._target(*self._args, **self._kwargs)
  File "deadlock.py", line 49, in thread_1
    with acquire(y_lock):
  File "/usr/local/lib/python3.3/contextlib.py", line 48, in __enter__
    return next(self.gen)
  File "deadlock.py", line 15, in acquire
    raise RuntimeError("Lock Order Violation")
RuntimeError: Lock Order Violation
>>>

发生崩溃的原因在于,每个线程都记录着自己已经获取到的锁。 acquire() 函数会检查之前已经获取的锁列表, 由于锁是按照升序排列获取的,所以函数会认为之前已获取的锁的id必定小于新申请到的锁,这时就会触发异常。

🎯 保存线程的状态信息

有时在多线程编程中,需要只保存当前运行线程的状态。 要这么做,可使用 thread.local() 创建一个本地线程存储对象。 对这个对象的属性的保存和读取操作都只会对执行线程可见,而其他线程并不可见。

作为使用本地存储的一个有趣的实际例子, 考虑定义过的 LazyConnection 上下文管理器类。 下面我们对它进行一些小的修改使得它可以适用于多线程:

from socket import socket, AF_INET, SOCK_STREAM
import threading

class LazyConnection:
    def __init__(self, address, family=AF_INET, type=SOCK_STREAM):
        self.address = address
        self.family = AF_INET
        self.type = SOCK_STREAM
        self.local = threading.local()

    def __enter__(self):
        if hasattr(self.local, 'sock'):
            raise RuntimeError('Already connected')
        self.local.sock = socket(self.family, self.type)
        self.local.sock.connect(self.address)
        return self.local.sock

    def __exit__(self, exc_ty, exc_val, tb):
        self.local.sock.close()
        del self.local.sock

代码中,自己观察对于 self.local 属性的使用。 它被初始化为一个 threading.local() 实例。 其他方法操作被存储为 self.local.sock 的套接字对象。 有了这些就可以在多线程中安全的使用 LazyConnection 实例了。例如:

from functools import partial
def test(conn):
    with conn as s:
        s.send(b'GET /index.html HTTP/1.0\r\n')
        s.send(b'Host: www.python.org\r\n')

        s.send(b'\r\n')
        resp = b''.join(iter(partial(s.recv, 8192), b''))

    print('Got {} bytes'.format(len(resp)))

if __name__ == '__main__':
    conn = LazyConnection(('www.python.org', 80))

    t1 = threading.Thread(target=test, args=(conn,))
    t2 = threading.Thread(target=test, args=(conn,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

它之所以行得通的原因是每个线程会创建一个自己专属的套接字连接(存储为self.local.sock)。 因此,当不同的线程执行套接字操作时,由于操作的是不同的套接字,因此它们不会相互影响。

⚠️

在大部分程序中创建和操作线程特定状态并不会有什么问题。 不过,当出了问题的时候,通常是因为某个对象被多个线程使用到,用来操作一些专用的系统资源, 比如一个套接字或文件。不能让所有线程共享一个单独对象, 因为多个线程同时读和写的时候会产生混乱。 本地线程存储通过让这些资源只能在被使用的线程中可见来解决这个问题。

本节中,使用 thread.local() 可以让 LazyConnection 类支持一个线程一个连接, 而不是对于所有的进程都只有一个连接。

其原理是,每个 threading.local() 实例为每个线程维护着一个单独的实例字典。 所有普通实例操作比如获取、修改和删除值仅仅操作这个字典。 每个线程使用一个独立的字典就可以保证数据的隔离了。

🎯 线程池

concurrent.futures 函数库有一个 ThreadPoolExecutor 类可以被用来完成这个任务。 下面是一个简单的TCP服务器,使用了一个线程池来响应客户端:

from socket import AF_INET, SOCK_STREAM, socket
from concurrent.futures import ThreadPoolExecutor

def echo_client(sock, client_addr):
    '''
    Handle a client connection
    '''
    print('Got connection from', client_addr)
    while True:
        msg = sock.recv(65536)
        if not msg:
            break
        sock.sendall(msg)
    print('Client closed connection')
    sock.close()

def echo_server(addr):
    pool = ThreadPoolExecutor(128)
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind(addr)
    sock.listen(5)
    while True:
        client_sock, client_addr = sock.accept()
        pool.submit(echo_client, client_sock, client_addr)

echo_server(('',15000))

🎯 简单的并行编程(进程池)

concurrent.futures 库提供了一个 ProcessPoolExecutor 类, 可被用来在一个单独的Python解释器中执行计算密集型函数。 不过,要使用它,首先要有一些计算密集型的任务。 我们通过一个简单而实际的例子来演示它。假定有个Apache web服务器日志目录的gzip压缩包:

logs/
   20120701.log.gz
   20120702.log.gz
   20120703.log.gz
   20120704.log.gz
   20120705.log.gz
   20120706.log.gz
   ...

进一步假设每个日志文件内容类似下面这样:

124.115.6.12 - - [10/Jul/2012:00:18:50 -0500] "GET /robots.txt ..." 200 71
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /ply/ ..." 200 11875
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /favicon.ico ..." 404 369
61.135.216.105 - - [10/Jul/2012:00:20:04 -0500] "GET /blog/atom.xml ..." 304 -
...

下面是一个脚本,在这些日志文件中查找出所有访问过robots.txt文件的主机:

# findrobots.py

import gzip
import io
import glob

def find_robots(filename):
    '''
    Find all of the hosts that access robots.txt in a single log file
    '''
    robots = set()
    with gzip.open(filename) as f:
        for line in io.TextIOWrapper(f,encoding='ascii'):
            fields = line.split()
            if fields[6] == '/robots.txt':
                robots.add(fields[0])
    return robots

def find_all_robots(logdir):
    '''
    Find all hosts across and entire sequence of files
    '''
    files = glob.glob(logdir+'/*.log.gz')
    all_robots = set()
    for robots in map(find_robots, files):
        all_robots.update(robots)
    return all_robots

if __name__ == '__main__':
    robots = find_all_robots('logs')
    for ipaddr in robots:
        print(ipaddr)

前面的程序使用了通常的map-reduce风格来编写。 函数 find_robots() 在一个文件名集合上做map操作,并将结果汇总为一个单独的结果, 也就是 find_all_robots() 函数中的 all_robots 集合。 现在,假设想要修改这个程序让它使用多核CPU。 很简单——只需要将map()操作替换为一个 concurrent.futures 库中生成的类似操作即可。 下面是一个简单修改版本:

# findrobots.py

import gzip
import io
import glob
from concurrent import futures

def find_robots(filename):
    '''
    Find all of the hosts that access robots.txt in a single log file

    '''
    robots = set()
    with gzip.open(filename) as f:
        for line in io.TextIOWrapper(f,encoding='ascii'):
            fields = line.split()
            if fields[6] == '/robots.txt':
                robots.add(fields[0])
    return robots

def find_all_robots(logdir):
    '''
    Find all hosts across and entire sequence of files
    '''
    files = glob.glob(logdir+'/*.log.gz')
    all_robots = set()
    with futures.ProcessPoolExecutor() as pool:
        for robots in pool.map(find_robots, files):
            all_robots.update(robots)
    return all_robots

if __name__ == '__main__':
    robots = find_all_robots('logs')
    for ipaddr in robots:
        print(ipaddr)

通过这个修改后,运行这个脚本产生同样的结果,但是在四核机器上面比之前快了3.5倍。 实际的性能优化效果根据的机器CPU数量的不同而不同。

⚠️

ProcessPoolExecutor 的典型用法如下:

from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor() as pool:
    ...
    do work in parallel using pool
    ...

其原理是,一个 ProcessPoolExecutor 创建N个独立的Python解释器, N是系统上面可用CPU的个数。可以通过提供可选参数给 ProcessPoolExecutor(N) 来修改 处理器数量。这个处理池会一直运行到with块中最后一个语句执行完成, 然后处理池被关闭。不过,程序会一直等待直到所有提交的工作被处理完成。

被提交到池中的工作必须被定义为一个函数。有两种方法去提交。 如果想让一个列表推导或一个 map() 操作并行执行的话,可使用 pool.map() :

# A function that performs a lot of work
def work(x):
    ...
    return result

# Nonparallel code
results = map(work, data)

# Parallel implementation
with ProcessPoolExecutor() as pool:
    results = pool.map(work, data)

另外,可以使用 pool.submit() 来手动的提交单个任务:

# Some function
def work(x):
    ...
    return result

with ProcessPoolExecutor() as pool:
    ...
    # Example of submitting work to the pool
    future_result = pool.submit(work, arg)

    # Obtaining the result (blocks until done)
    r = future_result.result()
    ...

如果手动提交一个任务,结果是一个 Future 实例。 要获取最终结果,需要调用它的 result() 方法。 它会阻塞进程直到结果被返回来。

如果不想阻塞,还可以使用一个回调函数,例如:

def when_done(r):
    print('Got:', r.result())

with ProcessPoolExecutor() as pool:
     future_result = pool.submit(work, arg)
     future_result.add_done_callback(when_done)

回调函数接受一个 Future 实例,被用来获取最终的结果(比如通过调用它的result()方法)。 尽管处理池很容易使用,在设计大程序的时候还是有很多需要注意的地方,如下几点:

  • 这种并行处理技术只适用于那些可以被分解为互相独立部分的问题。
  • 被提交的任务必须是简单函数形式。对于方法、闭包和其他类型的并行执行还不支持。
  • 函数参数和返回值必须兼容pickle,因为要使用到进程间的通信,所有解释器之间的交换数据必须被序列化
  • 被提交的任务函数不应保留状态或有副作用。除了打印日志之类简单的事情,

一旦启动不能控制子进程的任何行为,因此最好保持简单和纯洁——函数不要去修改环境。

🎯 实现消息发布/订阅模型

要实现发布/订阅的消息通信模式, 通常要引入一个单独的“交换机”或“网关”对象作为所有消息的中介。 也就是说,不直接将消息从一个任务发送到另一个,而是将其发送给交换机, 然后由交换机将它发送给一个或多个被关联任务。下面是一个非常简单的交换机实现例子:

from collections import defaultdict

class Exchange:
    def __init__(self):
        self._subscribers = set()

    def attach(self, task):
        self._subscribers.add(task)

    def detach(self, task):
        self._subscribers.remove(task)

    def send(self, msg):
        for subscriber in self._subscribers:
            subscriber.send(msg)

# Dictionary of all created exchanges
_exchanges = defaultdict(Exchange)

# Return the Exchange instance associated with a given name
def get_exchange(name):
    return _exchanges[name]

一个交换机就是一个普通对象,负责维护一个活跃的订阅者集合,并为绑定、解绑和发送消息提供相应的方法。 每个交换机通过一个名称定位,get_exchange() 通过给定一个名称返回相应的 Exchange 实例。

下面是一个简单例子,演示了如何使用一个交换机:

# Example of a task.  Any object with a send() method

class Task:
    ...
    def send(self, msg):
        ...

task_a = Task()
task_b = Task()

# Example of getting an exchange
exc = get_exchange('name')

# Examples of subscribing tasks to it
exc.attach(task_a)
exc.attach(task_b)

# Example of sending messages
exc.send('msg1')
exc.send('msg2')

# Example of unsubscribing
exc.detach(task_a)
exc.detach(task_b)

尽管对于这个问题有很多的变种,不过万变不离其宗。 消息会被发送给一个交换机,然后交换机会将它们发送给被绑定的订阅者。

⚠️

通过队列发送消息的任务或线程的模式很容易被实现并且也非常普遍。 不过,使用发布/订阅模式的好处更加明显。

首先,使用一个交换机可以简化大部分涉及到线程通信的工作。 无需去写通过多进程模块来操作多个线程,只需要使用这个交换机来连接它们。 某种程度上,这个就跟日志模块的工作原理类似。 实际上,它可以轻松的解耦程序中多个任务。

其次,交换机广播消息给多个订阅者的能力带来了一个全新的通信模式。 例如,可以使用多任务系统、广播或扇出。 还可以通过以普通订阅者身份绑定来构建调试和诊断工具。 例如,下面是一个简单的诊断类,可以显示被发送的消息:

class DisplayMessages:
    def __init__(self):
        self.count = 0
    def send(self, msg):
        self.count += 1
        print('msg[{}]: {!r}'.format(self.count, msg))

exc = get_exchange('name')
d = DisplayMessages()
exc.attach(d)

最后,该实现的一个重要特点是它能兼容多个“task-like”对象。 例如,消息接受者可以是actor(12.10小节介绍)、协程、网络连接或任何实现了正确的 send() 方法的东西。

关于交换机的一个可能问题是对于订阅者的正确绑定和解绑。 为了正确的管理资源,每一个绑定的订阅者必须最终要解绑。 在代码中通常会是像下面这样的模式:

exc = get_exchange('name')
exc.attach(some_task)
try:
    ...
finally:
    exc.detach(some_task)

某种意义上,这个和使用文件、锁和类似对象很像。 通常很容易会忘记最后的 detach() 步骤。 为了简化这个,可以考虑使用上下文管理器协议。 例如,在交换机对象上增加一个 subscribe() 方法,如下:

from contextlib import contextmanager
from collections import defaultdict

class Exchange:
    def __init__(self):
        self._subscribers = set()

    def attach(self, task):
        self._subscribers.add(task)

    def detach(self, task):
        self._subscribers.remove(task)

    @contextmanager
    def subscribe(self, *tasks):
        for task in tasks:
            self.attach(task)
        try:
            yield
        finally:
            for task in tasks:
                self.detach(task)

    def send(self, msg):
        for subscriber in self._subscribers:
            subscriber.send(msg)

# Dictionary of all created exchanges
_exchanges = defaultdict(Exchange)

# Return the Exchange instance associated with a given name
def get_exchange(name):
    return _exchanges[name]

# Example of using the subscribe() method
exc = get_exchange('name')
with exc.subscribe(task_a, task_b):
     ...
     exc.send('msg1')
     exc.send('msg2')
     ...

# task_a and task_b detached here

最后还应该注意的是关于交换机的思想有很多种的扩展实现。 例如,交换机可以实现一整个消息通道集合或提供交换机名称的模式匹配规则。 交换机还可以被扩展到分布式计算程序中(比如,将消息路由到不同机器上面的任务中去)。

🎯 使用生成器代替线程

想使用生成器(协程)替代系统线程来实现并发。这个有时又被称为用户级线程或绿色线程。

要使用生成器实现自己的并发,首先要对生成器函数和 yield 语句有深刻理解。 yield 语句会让一个生成器挂起它的执行,这样就可以编写一个调度器, 将生成器当做某种“任务”并使用任务协作切换来替换它们的执行。 要演示这种思想,考虑下面两个使用简单的 yield 语句的生成器函数:

# Two simple generator functions
def countdown(n):
    while n > 0:
        print('T-minus', n)
        yield
        n -= 1
    print('Blastoff!')

def countup(n):
    x = 0
    while x < n:
        print('Counting up', x)
        yield
        x += 1

这些函数在内部使用yield语句,下面是一个实现了简单任务调度器的代码:

from collections import deque

class TaskScheduler:
    def __init__(self):
        self._task_queue = deque()

    def new_task(self, task):
        '''
        Admit a newly started task to the scheduler
        '''
        self._task_queue.append(task)

    def run(self):
        '''
        Run until there are no more tasks
        '''
        while self._task_queue:
            task = self._task_queue.popleft()
            try:
                # Run until the next yield statement
                next(task)
                self._task_queue.append(task)
            except StopIteration:
                # Generator is no longer executing
                pass

# Example use
sched = TaskScheduler()
sched.new_task(countdown(10))
sched.new_task(countdown(5))
sched.new_task(countup(15))
sched.run()

TaskScheduler 类在一个循环中运行生成器集合——每个都运行到碰到yield语句为止。 运行这个例子,输出如下:

T-minus 10
T-minus 5
Counting up 0
T-minus 9
T-minus 4
Counting up 1
T-minus 8
T-minus 3
Counting up 2
T-minus 7
T-minus 2
...

到此为止,我们实际上已经实现了一个“操作系统”的最小核心部分。 生成器函数就是任务,而yield语句是任务挂起的信号。 调度器循环检查任务列表直到没有任务要执行为止。

实际上,可能想要使用生成器来实现简单的并发。 那么,在实现actor或网络服务器的时候可以使用生成器来替代线程的使用。

下面的代码演示了使用生成器来实现一个不依赖线程的actor:

from collections import deque

class ActorScheduler:
    def __init__(self):
        self._actors = {}          # Mapping of names to actors
        self._msg_queue = deque()   # Message queue

    def new_actor(self, name, actor):
        '''
        Admit a newly started actor to the scheduler and give it a name
        '''
        self._msg_queue.append((actor,None))
        self._actors[name] = actor

    def send(self, name, msg):
        '''
        Send a message to a named actor
        '''
        actor = self._actors.get(name)
        if actor:
            self._msg_queue.append((actor,msg))

    def run(self):
        '''
        Run as long as there are pending messages.
        '''
        while self._msg_queue:
            actor, msg = self._msg_queue.popleft()
            try:
                 actor.send(msg)
            except StopIteration:
                 pass

# Example use
if __name__ == '__main__':
    def printer():
        while True:
            msg = yield
            print('Got:', msg)

    def counter(sched):
        while True:
            # Receive the current count
            n = yield
            if n == 0:
                break
            # Send to the printer task
            sched.send('printer', n)
            # Send the next count to the counter task (recursive)
            sched.send('counter', n-1)

    sched = ActorScheduler()
    # Create the initial actors
    sched.new_actor('printer', printer())
    sched.new_actor('counter', counter(sched))

    # Send an initial message to the counter to initiate
    sched.send('counter', 10000)
    sched.run()

完全弄懂这段代码需要更深入的学习,但是关键点在于收集消息的队列。 本质上,调度器在有需要发送的消息时会一直运行着。 计数生成器会给自己发送消息并在一个递归循环中结束。

🎯 多个线程队列轮询

有一个线程队列集合,想为到来的元素轮询它们, 就跟为一个客户端请求去轮询一个网络连接集合的方式一样。

对于轮询问题的一个常见解决方案中有个很少有人知道的技巧,包含了一个隐藏的回路网络连接。 本质上讲其思想就是:对于每个想要轮询的队列,创建一对连接的套接字。 然后在其中一个套接字上面编写代码来标识存在的数据, 另外一个套接字被传给 select() 或类似的一个轮询数据到达的函数。下面的例子演示了这个思想:

import queue
import socket
import os

class PollableQueue(queue.Queue):
    def __init__(self):
        super().__init__()
        # Create a pair of connected sockets
        if os.name == 'posix':
            self._putsocket, self._getsocket = socket.socketpair()
        else:
            # Compatibility on non-POSIX systems
            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            server.bind(('127.0.0.1', 0))
            server.listen(1)
            self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self._putsocket.connect(server.getsockname())
            self._getsocket, _ = server.accept()
            server.close()

    def fileno(self):
        return self._getsocket.fileno()

    def put(self, item):
        super().put(item)
        self._putsocket.send(b'x')

    def get(self):
        self._getsocket.recv(1)
        return super().get()

在这个代码中,一个新的 Queue 实例类型被定义,底层是一个被连接套接字对。 在Unix机器上的 socketpair() 函数能轻松的创建这样的套接字。 在Windows上面,必须使用类似代码来模拟它。 然后定义普通的 get()put() 方法在这些套接字上面来执行I/O操作。 put() 方法再将数据放入队列后会写一个单字节到某个套接字中去。 而 get() 方法在从队列中移除一个元素时会从另外一个套接字中读取到这个单字节数据。

fileno() 方法使用一个函数比如 select() 来让这个队列可以被轮询。 它仅仅只是暴露了底层被 get() 函数使用到的socket的文件描述符而已。

下面是一个例子,定义了一个为到来的元素监控多个队列的消费者:

import select
import threading

def consumer(queues):
    '''
    Consumer that reads data on multiple queues simultaneously
    '''
    while True:
        can_read, _, _ = select.select(queues,[],[])
        for r in can_read:
            item = r.get()
            print('Got:', item)

q1 = PollableQueue()
q2 = PollableQueue()
q3 = PollableQueue()
t = threading.Thread(target=consumer, args=([q1,q2,q3],))
t.daemon = True
t.start()

# Feed data to the queues
q1.put(1)
q2.put(10)
q3.put('hello')
q2.put(15)
...

如果试着运行它,会发现这个消费者会接受到所有的被放入的元素,不管元素被放进了哪个队列中。