tornado.ioloop 学习(一)

Tornado 是一个 Python 的 Web 框架和一个异步网络库。

Tornado is a Python web framework and asynchronous networking library

单纯作 Python 的 Web 框架,Tornado 并没有啥特色,和 Web.py 的接口类似。话说我学的第一个 Python Web 框架就是 Web.py,导致我对相似的 Tornado 恋恋不忘。于是乎,我就开始阅读 Tornado 的源代码了。

就像上面的介绍,Tornado 除了是一个 Web 框架,还是一个异步的网络库。而这个库的核心和灵魂就是 torndo.ioloop 了。

Content

socket

作为网络库,就不得不从 socket 说起了。 虽然也是读过大学的人,但是当时网络、操作系统完全等于没学过,只怪自己不努力,所以现在对于 socket,只能从应用的层面去学习了。

server

import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

s.bind(('127.0.0.1', 5000))  
s.listen(1)  
print "waiting for connect"  
conn, addr = s.accept()  
print conn, addr

while True:  
    data = conn.recv(1024)
    if len(data) == 1:
        if ord(data) == 4:
            conn.sendall('bye.\n')
            break
    conn.sendall('Hello\n')

s.close()  

首先是通过socket.socket创建一个socket对象,第一参数有三个可选

  • socket.AF_UNIX UNIX socket file,但是要系统支持
  • socket.AF_INET IPv4 地址
  • socket.AF_INET6 IPv6 地址

第二个参数,指定 socket 类型,有很多类型可选,但是常用的是下面两个

  • socket.SOCK_STREAM TCP 协议
  • socket.SOCK_DGRAM UDP 协议

然后通过s.bind来绑定socket监听的地址和端口,如果需要让所有地址都能连接上,可以指定0.0.0.0

最后通过s.accept开始等待客户端连接上,这里是阻塞的,意思就是如果没有客户端连接上,后面的代码是永远也不会执行的。

下面通过telnet 127.0.0.1 5000来测试上面的效果

> telnet 127.0.0.1 5000
Trying 127.0.0.1...  
Connected to localhost.  
Escape character is '^]'.  
hello  
Hello  
hi  
Hello  
bye.  
Connection closed by foreign host.  

当连接telnet连接上 server 时,代码就会进入循环中,等待客户端发送数据,当客户端发送了数据之后,server 就会向客户端发送hello,这里的s.recv也是阻塞的。

最后如果收到了客户端发来的EOF(C-D)的时候,就会退出循环,然后通过s.close关闭 socket 后退出程序。

client

上面是通过telnet作为客户端来测试,下面通过 Python 里的 socket 来建立链接

import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

s.connect(('127.0.0.1', 5000))  
s.sendall('hello')  
data = s.recv(1024)  
s.sendall(chr(4))  
s.close()  
print data

跟 server 一样,首先是初始化socket对象,然后通过s.connect连接到 server,接下来通过s.sendall发送数据到 server,这里的s.connects.recv也是阻塞的。

blocking network

上面的 server 和 client 都是阻塞的,或者说 server 只能接受一个 client 的连接,client 也只能连接到一个 server 上去。

但是实际情况比如一个网站(web server)是可以同时多个人浏览,web server 是 HTTP 协议,而 HTTP 协议是基于 TCP(socket)的,并且不止是接受一个连接。

socket based on threading

下面是 server 的例子

import socket  
import threading

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

s.bind(('127.0.0.1', 5000))  
s.listen(5)  
print "waiting for connect"


class Server(threading.Thread):

    def __init__(self, sock):
        self.sock = sock
        super(Server, self).__init__()

    def run(self):
        conn, addr = self.sock.accept()
        print "client {} joined\n".format(addr)
        while True:
            data = conn.recv(1024)
            if len(data) == 1:
                if ord(data) == 4:
                    break
            print "data {} from {}".format(data, addr)
            conn.sendall('Hello\n')
        self.sock.close()
        print "client {} left\n".format(addr)

thread = list()  
for i in xrange(5):  
    t = Server(s)
    thread.append(t)
    t.start()

通过threading来启动了5个socket.accept来接收 client 的连接,通过telnet可以同时启动5个 client,并且可以同时工作,如果需要接受更多的 client,可以增加线程数,不过增加线程带来的结果就是内存飚升,会影响性能,据说 windows 下有限制子线程数,也就是限制了客户端的连接数。

select

为了解决阻塞的问题,牛逼的人类发明了select这个系统调用,多路 I/O 复用,可以在单线程里同时处理多个 socket 连接。在类 Unix 系统中,网络被抽象成了文件,可读可写,所以作为一个文件描述符,可以被select这个系统调用监视。

import socket  
import select

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

s.bind(('127.0.0.1', 5000))  
s.listen(5)  
rlist = [s]  
wlist = []  
message = dict()

print "waiting connect"  
while rlist:  
    readable, writable, exceptional = select.select(rlist, wlist, rlist)
    for sock in readable:
        if sock is s:
            conn, addr = s.accept()
            print "client {} joined.".format(addr)
            message[conn] = list()
            rlist.append(conn)
        else:  # conn
            data = sock.recv(1024)
            if len(data) == 1:
                if ord(data) == 4:
                    sock.close()
                    if sock in wlist:
                        wlist.remove(sock)
                    if sock in rlist:
                        rlist.remove(sock)
                    if sock in message:
                        del message[sock]
                    print "client {} left.".format(addr)
                    continue
            message[sock].append('Hello\n')
            if sock not in wlist:
                wlist.append(sock)

    for sock in writable:
        if sock in message:
            if message[sock]:
                data = message[sock].pop()
                if data:
                    sock.sendall(data)

    for sock in exceptional:
        rlist.remove(sock)
        print "error occur."
        sock.close()
        del message[sock]

上面代码看着变得复杂起来了,其实细细看来还是很好理解的,select.select接受三个参数,

  • rlist 可读的文件描述符
  • wlist 可写的文件描述符
  • xlist 异常的对象

并且select.select 会阻塞,直到有可操作的对象(文件描述符),然后返回readable, writable, exceptional,然后通过循环遍历这三个对象(文件描述符数组)进行操作。

readable循环里,如果可读的对象是当前socket对象,说明有新的连接进入了,通过sock.accept得到的连接对象,这个连接对象也是一个可读可写的文件描述符,所以要得到新的连接的数据,将这个对象也放入到rlist里面去。当readable循环里的对象不是当前socket对时,那肯定就是建立的连接有了客户端发来的数据了,通过sock.recv就可以得到数据了。同理writable

使用select,其中需要监视的文件描述符随着连接的加入,会不断的变多,因为select是遍历的数组,数组的长度是有限制的,并且对于不活跃的文件描述符,select还是会不停的遍历,这样效率极低。

epoll

由于select会有各种限制,所有在 Linux 中,又出现了epoll

import socket  
import select

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

s.bind(('0.0.0.0', 5000))  
s.listen(1)  
s.setblocking(0)  
print "waiting for connect"

epoll = select.epoll()  
epoll.register(s.fileno(), select.EPOLLIN)


connections = {}  
requests = {}  
responses = {}  
clients = {}  
try:  
    while True:
        events = epoll.poll(1)
        for fileno, event in events:
            if fileno == s.fileno():  # sock
                conn, addr = s.accept()
                conn.setblocking(0)
                epoll.register(conn.fileno(), select.EPOLLIN)  # add conn to read
                connections[conn.fileno()] = conn
                clients[conn.fileno()] = addr
                print "client {} joined".format(addr)
            elif event & select.EPOLLIN:
                data = connections[fileno].recv(1024)
                print "recv data {} from client {}".format(data, clients[fileno])
                if len(data) == 1:
                    if ord(data) == 4:
                        responses[fileno] = "bye\n"
                else:
                    responses[fileno] = "hello\n"
                epoll.modify(fileno, select.EPOLLOUT)
            elif event & select.EPOLLOUT:
                if fileno in responses:
                    try:
                        connections[fileno].sendall(responses[fileno])
                        if responses[fileno] == "bye\n":
                            epoll.modify(fileno, 0)
                            connections[fileno].shutdown(socket.SHUT_RDWR)
                        else:
                            epoll.modify(fileno, select.EPOLLIN)
                        del responses[fileno]
                    except:
                        epoll.modify(fileno, 0)
            elif event & select.EPOLLHUP:
                epoll.unregister(fileno)
                connections[fileno].close()
                del connections[fileno]
                print "client {} left".format(clients[fileno])
                del clients[fileno]
finally:  
    epoll.unregister(s.fileno())
    epoll.close()
    s.close()

上面无限循环之前,是常规的 socket 对象的创建,然后是epoll对象的创建和 socket 对象的文件描述符的fileno作为EPOLLIN事件加入到epoll里去监视。

然后就进入了无限次的循环中,然后通过epoll.poll系统调用,获取可操作的文件描述符,和对应的事件,因为只注册了一个EPOLLIN事件,所以当有 client 要连接的时候,就可以通过socket.accpet来建立连接,然后再把建立的连接(也是一个文件描述符)加入到epoll中去监视读数据。当有EPOLLIN进入时,就可以从可读连接里去读数据了,在这里我设定如果读到数据就给 client 发送hello,所以就把当前连接(文件描述符)修改为EPOLLOUT事件,然后在进入EPOLLOUT事件时,就把数据发送给客户端去。

那么 epoll不会不停的扫描所有的文件描述符,而是在有事件发生时,才会处理这个事件。

tornado.ioloop

为什么看tornado.ioloop会有上面这些,因为特么的直接看tornado.ioloop,我看不太懂,经过上面一步一步下来,对于理解tornado.ioloop里的代码有很大的帮助。

因为是以学习为目的,所以从 Tornadov1.2这个版本的tornado.ioloop来看,因为少,才557行,所以荣誉理解。

tornado.ioloop是以epoll为基础的,但是在不支持epoll的平台,会选择该平台对应的实现来做。

这里开始

# Choose a poll implementation. Use epoll if it is available, fall back to
# select() for non-Linux platforms
if hasattr(select, "epoll"):  
    # Python 2.6+ on Linux
    _poll = select.epoll
elif hasattr(select, "kqueue"):  
    # Python 2.6+ on BSD or Mac
    _poll = _KQueue
else:  
    try:
        # Linux systems with our C module installed
        import epoll
        _poll = _EPoll
    except:
        # All other systems
        import sys
        if "linux" in sys.platform:
            logging.warning("epoll module not found; using select()")
        _poll = _Select

如果是 Python 2.6+ 在 Linux 上,选择select下的 epoll,如果是在 BSD 或者 Mac,则选择kqueue,如果没有则首先尝试 Tronado 写的 C 扩展的epoll模块,最后实在不行,就使用select

Tronado 将不同的平台不同的 poll 实现了相同的接口,_Select_KQueue_EPoll,方便在IOLoop里使用。

IOLoop

  • add_handler相当于 epoll.register
  • update_handler相当于epoll.modify
  • remove_handler相当于epoll.unregister

然后就是IOLoop.start了,也是一个无限次循环,然后抛弃其他的不看,直接进入243行的event_pairs = self._impl.poll(poll_timeout),然后是267行这里,得到文件描述符fdevents,接下来就像上述的epoll操作一样了,只是tornado.ioloop增加了一些 helper ,方便了操作而已。简化下来就是下述的代码了

class IOLoop(object):

    def __init__(self, impl=None):
        self._impl = impl or _poll()

    def add_handler(self, fd, handler, events):
        self._impl.register(fd, events | self.ERROR)

    def update_handler(self, fd, events):
        self._impl.modify(fd, events | self.ERROR)

    def remove_handler(self, fd):
        self._impl.unregister(fd)

    def start(self):
        while True:
            event_pairs = self._impl.poll(poll_timeout)

            self._events.update(event_pairs)
            while self._events:
                fd, events = self._events.popitem()
                self._handlers[fd](fd, events)

上面就是极其简化后的IOLoop了,看起来是不是跟epoll很像啊。

然后结合tornado.ioloop的示例代码

import errno  
import functools  
import ioloop  
import socket  
def connection_ready(sock, fd, events):  
    while True:
        try:
            connection, address = sock.accept()
        except socket.error, e:
            if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
                raise
            return
        connection.setblocking(0)
        handle_connection(connection, address)

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)  
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  
sock.setblocking(0)  
sock.bind(("", port))  
sock.listen(128)  
io_loop = ioloop.IOLoop.instance()  
callback = functools.partial(connection_ready, sock)  
io_loop.add_handler(sock.fileno(), callback, io_loop.READ)  
io_loop.start()  

在之前,完全是不懂上面的示例代码的意思,经过层层剥茧,从最原始的 socket 开始理解,就很好的理解了tornado.ioloop对于网络的处理了。

refs