tornado.ioloop 学习(一)
Tornado 是一个 Python 的 Web 框架和一个异步网络库。
Tornado is a Python web framework and asynchronous networking library
单纯作 Python 的 Web 框架,Tornado 并没有啥特色,和 Web.py 的接口类似。话说我学的第一个 Python Web 框架就是 Web.py,导致我对相似的 Tornado 恋恋不忘。于是乎,我就开始阅读 Tornado 的源代码了。
就像上面的介绍,Tornado 除了是一个 Web 框架,还是一个异步的网络库。而这个库的核心和灵魂就是 torndo.ioloop 了。
Content
socket
作为网络库,就不得不从 socket 说起了。
虽然也是读过大学的人,但是当时网络、操作系统完全等于没学过,只怪自己不努力,所以现在对于 socket,只能从应用的层面去学习了。
server
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('127.0.0.1', 5000))
s.listen(1)
print "waiting for connect"
conn, addr = s.accept()
print conn, addr
while True:
data = conn.recv(1024)
if len(data) == 1:
if ord(data) == 4:
conn.sendall('bye.\n')
break
conn.sendall('Hello\n')
s.close()
首先是通过socket.socket
创建一个socket
对象,第一参数有三个可选
socket.AF_UNIX
UNIX socket file,但是要系统支持socket.AF_INET
IPv4 地址socket.AF_INET6
IPv6 地址
第二个参数,指定 socket 类型,有很多类型可选,但是常用的是下面两个
socket.SOCK_STREAM
TCP 协议socket.SOCK_DGRAM
UDP 协议
然后通过s.bind
来绑定socket
监听的地址和端口,如果需要让所有地址都能连接上,可以指定0.0.0.0
最后通过s.accept
开始等待客户端连接上,这里是阻塞的,意思就是如果没有客户端连接上,后面的代码是永远也不会执行的。
下面通过telnet 127.0.0.1 5000
来测试上面的效果
> telnet 127.0.0.1 5000
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
Hello
hi
Hello
bye.
Connection closed by foreign host.
当连接telnet
连接上 server 时,代码就会进入循环中,等待客户端发送数据,当客户端发送了数据之后,server 就会向客户端发送hello
,这里的s.recv
也是阻塞的。
最后如果收到了客户端发来的EOF
(C-D)的时候,就会退出循环,然后通过s.close
关闭 socket 后退出程序。
client
上面是通过telnet
作为客户端来测试,下面通过 Python 里的 socket 来建立链接
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(('127.0.0.1', 5000))
s.sendall('hello')
data = s.recv(1024)
s.sendall(chr(4))
s.close()
print data
跟 server 一样,首先是初始化socket
对象,然后通过s.connect
连接到 server,接下来通过s.sendall
发送数据到 server,这里的s.connect
和s.recv
也是阻塞的。
blocking network
上面的 server 和 client 都是阻塞的,或者说 server 只能接受一个 client 的连接,client 也只能连接到一个 server 上去。
但是实际情况比如一个网站(web server)是可以同时多个人浏览,web server 是 HTTP 协议,而 HTTP 协议是基于 TCP(socket)的,并且不止是接受一个连接。
socket based on threading
下面是 server 的例子
import socket
import threading
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('127.0.0.1', 5000))
s.listen(5)
print "waiting for connect"
class Server(threading.Thread):
def __init__(self, sock):
self.sock = sock
super(Server, self).__init__()
def run(self):
conn, addr = self.sock.accept()
print "client {} joined\n".format(addr)
while True:
data = conn.recv(1024)
if len(data) == 1:
if ord(data) == 4:
break
print "data {} from {}".format(data, addr)
conn.sendall('Hello\n')
self.sock.close()
print "client {} left\n".format(addr)
thread = list()
for i in xrange(5):
t = Server(s)
thread.append(t)
t.start()
通过threading
来启动了5个socket.accept
来接收 client 的连接,通过telnet
可以同时启动5个 client,并且可以同时工作,如果需要接受更多的 client,可以增加线程数,不过增加线程带来的结果就是内存飚升,会影响性能,据说 windows 下有限制子线程数,也就是限制了客户端的连接数。
select
为了解决阻塞的问题,牛逼的人类发明了select
这个系统调用,多路 I/O 复用,可以在单线程里同时处理多个 socket 连接。在类 Unix 系统中,网络被抽象成了文件,可读可写,所以作为一个文件描述符,可以被select
这个系统调用监视。
import socket
import select
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('127.0.0.1', 5000))
s.listen(5)
rlist = [s]
wlist = []
message = dict()
print "waiting connect"
while rlist:
readable, writable, exceptional = select.select(rlist, wlist, rlist)
for sock in readable:
if sock is s:
conn, addr = s.accept()
print "client {} joined.".format(addr)
message[conn] = list()
rlist.append(conn)
else: # conn
data = sock.recv(1024)
if len(data) == 1:
if ord(data) == 4:
sock.close()
if sock in wlist:
wlist.remove(sock)
if sock in rlist:
rlist.remove(sock)
if sock in message:
del message[sock]
print "client {} left.".format(addr)
continue
message[sock].append('Hello\n')
if sock not in wlist:
wlist.append(sock)
for sock in writable:
if sock in message:
if message[sock]:
data = message[sock].pop()
if data:
sock.sendall(data)
for sock in exceptional:
rlist.remove(sock)
print "error occur."
sock.close()
del message[sock]
上面代码看着变得复杂起来了,其实细细看来还是很好理解的,select.select
接受三个参数,
rlist
可读的文件描述符wlist
可写的文件描述符xlist
异常的对象
并且select.select
会阻塞,直到有可操作的对象(文件描述符),然后返回readable, writable, exceptional
,然后通过循环遍历这三个对象(文件描述符数组)进行操作。
在readable
循环里,如果可读的对象是当前socket
对象,说明有新的连接进入了,通过sock.accept
得到的连接对象,这个连接对象也是一个可读可写的文件描述符,所以要得到新的连接的数据,将这个对象也放入到rlist
里面去。当readable
循环里的对象不是当前socket
对时,那肯定就是建立的连接有了客户端发来的数据了,通过sock.recv
就可以得到数据了。同理writable
。
使用select
,其中需要监视的文件描述符随着连接的加入,会不断的变多,因为select
是遍历的数组,数组的长度是有限制的,并且对于不活跃的文件描述符,select
还是会不停的遍历,这样效率极低。
epoll
由于select
会有各种限制,所有在 Linux 中,又出现了epoll
。
import socket
import select
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('0.0.0.0', 5000))
s.listen(1)
s.setblocking(0)
print "waiting for connect"
epoll = select.epoll()
epoll.register(s.fileno(), select.EPOLLIN)
connections = {}
requests = {}
responses = {}
clients = {}
try:
while True:
events = epoll.poll(1)
for fileno, event in events:
if fileno == s.fileno(): # sock
conn, addr = s.accept()
conn.setblocking(0)
epoll.register(conn.fileno(), select.EPOLLIN) # add conn to read
connections[conn.fileno()] = conn
clients[conn.fileno()] = addr
print "client {} joined".format(addr)
elif event & select.EPOLLIN:
data = connections[fileno].recv(1024)
print "recv data {} from client {}".format(data, clients[fileno])
if len(data) == 1:
if ord(data) == 4:
responses[fileno] = "bye\n"
else:
responses[fileno] = "hello\n"
epoll.modify(fileno, select.EPOLLOUT)
elif event & select.EPOLLOUT:
if fileno in responses:
try:
connections[fileno].sendall(responses[fileno])
if responses[fileno] == "bye\n":
epoll.modify(fileno, 0)
connections[fileno].shutdown(socket.SHUT_RDWR)
else:
epoll.modify(fileno, select.EPOLLIN)
del responses[fileno]
except:
epoll.modify(fileno, 0)
elif event & select.EPOLLHUP:
epoll.unregister(fileno)
connections[fileno].close()
del connections[fileno]
print "client {} left".format(clients[fileno])
del clients[fileno]
finally:
epoll.unregister(s.fileno())
epoll.close()
s.close()
上面无限循环之前,是常规的 socket 对象的创建,然后是epoll
对象的创建和 socket 对象的文件描述符的fileno
作为EPOLLIN
事件加入到epoll
里去监视。
然后就进入了无限次的循环中,然后通过epoll.poll
系统调用,获取可操作的文件描述符,和对应的事件,因为只注册了一个EPOLLIN
事件,所以当有 client 要连接的时候,就可以通过socket.accpet
来建立连接,然后再把建立的连接(也是一个文件描述符)加入到epoll
中去监视读数据。当有EPOLLIN
进入时,就可以从可读连接里去读数据了,在这里我设定如果读到数据就给 client 发送hello
,所以就把当前连接(文件描述符)修改为EPOLLOUT
事件,然后在进入EPOLLOUT
事件时,就把数据发送给客户端去。
那么 epoll
不会不停的扫描所有的文件描述符,而是在有事件发生时,才会处理这个事件。
tornado.ioloop
为什么看tornado.ioloop
会有上面这些,因为特么的直接看tornado.ioloop
,我看不太懂,经过上面一步一步下来,对于理解tornado.ioloop
里的代码有很大的帮助。
因为是以学习为目的,所以从 Tornado 的v1.2这个版本的tornado.ioloop
来看,因为少,才557行,所以荣誉理解。
tornado.ioloop
是以epoll
为基础的,但是在不支持epoll
的平台,会选择该平台对应的实现来做。
从这里开始
# Choose a poll implementation. Use epoll if it is available, fall back to
# select() for non-Linux platforms
if hasattr(select, "epoll"):
# Python 2.6+ on Linux
_poll = select.epoll
elif hasattr(select, "kqueue"):
# Python 2.6+ on BSD or Mac
_poll = _KQueue
else:
try:
# Linux systems with our C module installed
import epoll
_poll = _EPoll
except:
# All other systems
import sys
if "linux" in sys.platform:
logging.warning("epoll module not found; using select()")
_poll = _Select
如果是 Python 2.6+ 在 Linux 上,选择select
下的 epoll
,如果是在 BSD 或者 Mac,则选择kqueue
,如果没有则首先尝试 Tronado 写的 C 扩展的epoll
模块,最后实在不行,就使用select
。
Tronado 将不同的平台不同的 poll 实现了相同的接口,_Select
、_KQueue
、_EPoll
,方便在IOLoop
里使用。
在IOLoop
add_handler
相当于epoll.register
update_handler
相当于epoll.modify
remove_handler
相当于epoll.unregister
然后就是IOLoop.start
了,也是一个无限次循环,然后抛弃其他的不看,直接进入243行的event_pairs = self._impl.poll(poll_timeout)
,然后是267行这里,得到文件描述符fd
和events
,接下来就像上述的epoll
操作一样了,只是tornado.ioloop
增加了一些 helper ,方便了操作而已。简化下来就是下述的代码了
class IOLoop(object):
def __init__(self, impl=None):
self._impl = impl or _poll()
def add_handler(self, fd, handler, events):
self._impl.register(fd, events | self.ERROR)
def update_handler(self, fd, events):
self._impl.modify(fd, events | self.ERROR)
def remove_handler(self, fd):
self._impl.unregister(fd)
def start(self):
while True:
event_pairs = self._impl.poll(poll_timeout)
self._events.update(event_pairs)
while self._events:
fd, events = self._events.popitem()
self._handlers[fd](fd, events)
上面就是极其简化后的IOLoop
了,看起来是不是跟epoll
很像啊。
然后结合tornado.ioloop
的示例代码
import errno
import functools
import ioloop
import socket
def connection_ready(sock, fd, events):
while True:
try:
connection, address = sock.accept()
except socket.error, e:
if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
raise
return
connection.setblocking(0)
handle_connection(connection, address)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setblocking(0)
sock.bind(("", port))
sock.listen(128)
io_loop = ioloop.IOLoop.instance()
callback = functools.partial(connection_ready, sock)
io_loop.add_handler(sock.fileno(), callback, io_loop.READ)
io_loop.start()
在之前,完全是不懂上面的示例代码的意思,经过层层剥茧,从最原始的 socket 开始理解,就很好的理解了tornado.ioloop
对于网络的处理了。