一.
Tornado是什么?
Facebook发布了开源网络服务器框架Tornado,该平台基于Facebook刚刚收购的社交聚合网站FriendFeed的实时信息服务开发而来.Tornado由Python编写,是一款轻量级的Web服务器,同时又是一个开发框架。采用非阻塞I/O模型(epoll),主要是为了应对高并发
访问量而被开发出来,尤其适用于comet应用。
二. 为什么要阅读Tornado的源代码
Tornado由前google员工开发, 代码非常精练, 实现也很轻巧,
加上清晰的注释和丰富的demo, 我们可以很容易的阅读分析tornado. 通过阅读Tornado的源码,
你将学到:
1.理解Tornado的内部实现, 使用tornado进行web开发将更加得心应手
2.如何实现一个高性能,非阻塞的http服务器
3.如何实现一个web框架
4.各种网络编程的知识, 比如epoll
5.python编程的绝佳实践
三. 从http服务器开始
Tornado不仅是一个web开发框架, 还自己实现了一个http服务器.
谈到http服务器, 我们自然想到C10K.
其中介绍了很多种服务器的编程模型, tornado的http服务器采用的是:
多进程 + 非阻塞 + epoll + pre-fork 模型
在分析tornado服务器之前, 有必要了解web服务器的工作流程.
四 http服务器工作三部曲
从实现上来说, web服务器是这样工作的:
(1) 创建listen socket, 在指定的监听端口, 等待客户端请求的到来
(2) listen socket接受客户端的请求, 得到client
socket, 接下来通过client socket与客户端通信
(3) 处理客户端的请求, 首先从client socket读取http请求的协议头,
如果是post协议, 还可能要
读取客户端上传的数据, 然后处理请求, 准备好客户端需要的数据, 通过client
socket写给客户端
五 Hello World from Http Server
为了更加理解web服务器的工作流程, 我们使用python编写一个简单的http服务器,
返回Hello, World给浏览器
Python代码
import socket def handle_request(client): buf = client.recv(1024) print buf client.send("HTTP/1.1 200 OK\r\n\r\n") client.send("Hello, World") def main(): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(('localhost',8080)) sock.listen(5) while True: connection, address = sock.accept() handle_request(connection) connection.close() if __name__ == '__main__': main() |
运行如下:
六. Hello World from Tornado Http Server
Tornado不能算是一个完整的http服务器, 它只实现小部分的http协议,
大部分要靠用户去实现.
tornado其实是一个服务器开发框架, 使用它我们可以快速的开发一个高效的http服务器.
下面我们
就使用tornado再写一个Hello, World的Http服务器.
Python代码
#!/usr/bin/env python # -*- coding:utf-8 -*- import tornado.httpserver import tornado.ioloop def handle_request(request): message = "Hello World from Tornado Http Server" request.write("HTTP/1.1 200 OK\r\nContent-Length: %d\r\n\r\n%s" % ( len(message), message)) request.finish() http_server = tornado.httpserver.HTTPServer(handle_request) http_server.listen(8080) tornado.ioloop.IOLoop.instance().start() |
运行如下:
实现非常简单, 只需要定义自己的处理方法, 其它的东西全部交给Tornado完成.
简单看一下Tornado做了哪些工作.
首先创建HTTPServer类, 并把我们的处理方法传递过去然后在8080开始监听最后启动事件循环,
开始监听网络事件. 主要是socket的读和写
到了这里, 我有点等不及了, 迫切想了解tornado的内部实现是怎么样的.
特别是想知道Tornado的IOLoop到底是如何工作的. 接下来我们开始解剖Tornado
七. Tornado服务器概览
理解了web服务器的工作流程之后, 我们再来看看Tornado服务器是如何实现这些处理流程的.
Tornado服务器有3大核心模块:
(1) IOLoop
与我们上面那个简陋的http服务器不同, Tornado为了实现高并发和高性能,
使用了一个IOLoop来处理socket的读写事件, IOLoop基于epoll, 可以高效的响应网络事件.
这是Tornado高效的保证.
(2) IOStream
为了在处理请求的时候, 实现对socket的异步读写, Tornado实现了IOStream类,
用来处理socket的异步读写.
(3) HTTPConnection
这个类用来处理http的请求, 包括读取http请求头, 读取post过来的数据,
调用用户自定义的处理方法,以及把响应数据写给客户端socket
下面这幅图描述了tornado服务器的大体处理流程, 接下来我们将会详细分析每一步流程的实现
八. 创建listen socket
httpserver.py, 定位到bind方法:
Python代码
for res in socket.getaddrinfo(address, port, family, socket.SOCK_STREAM, 0, socket.AI_PASSIVE | socket.AI_ADDRCONFIG): af, socktype, proto, canonname, sockaddr = res # 创建listen socket sock = socket.socket(af, socktype, proto) # 设置socket的属性 flags = fcntl.fcntl(sock.fileno(), fcntl.F_GETFD) flags |= fcntl.FD_CLOEXEC fcntl.fcntl(sock.fileno(), fcntl.F_SETFD, flags) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) if af == socket.AF_INET6: if hasattr(socket, "IPPROTO_IPV6"): sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1) sock.setblocking(0) # bind 和 listen sock.bind(sockaddr) sock.listen(128) # 加入ioloop self._sockets[sock.fileno()] = sock if self._started: self.io_loop.add_handler(sock.fileno(), self._handle_events, ioloop.IOLoop.READ)<span style="white-space: normal;"> </span> |
这是实现web服务器的标准步骤, 首先getaddrinfo返回服务器的所有网卡信息,
每块网卡上都要创建监听客户端的请求.
按照socket -> bind -> listen步骤走下来,
最后把新建的listen socket加入ioloop. 那么ioloop又是个什么东西呢?
暂时我们把ioloop理解为一个事件容器. 用户把socket和回调函数注册到容器中,
容器内部会轮询socket, 一旦某个socket可以读写, 就调用回调函数来处理socket的读写事件.
这里, 我们只监听listen socket的读事件, 回调函数为_handle_events,
一旦listen socket可读, 说明客户端请求到来, 然后调用_handle_events接受客户端的请求.
九. accept
httpserver.py, 定位到_handle_events. 这个方法接受客户端的请求.
为了便于分析, 我把处理ssl那部分代码剥离出去了.
Python代码
def _handle_events(self, fd, events): while True: try: connection, address = self._sockets[fd].accept() except socket.error, e: if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN): return raise try: stream = iostream.IOStream(connection, io_loop=self.io_loop) HTTPConnection(stream, address, self.request_callback, self.no_keep_alive, self.xheaders) except: logging.error("Error in connection callback", exc_info=True) |
accept方法返回客户端的socket(注意connection的类型是socket),
以及客户端的地址然后创建IOStream对象, 用来处理socket的异步读写. 这一步会调用ioloop.add_handler把client
socket加入ioloop再然后创建HTTPConnection, 处理用户的请求.
十. 创建IOStream
10.1 何为IOStream
accept完成后, 我们就可以用client socket与客户端通信了.
为了实现对client socket的异步读写, 我们为client socket创建两个缓冲区: _read_buffer和_write_buffer,
写: 先写到_write_buffer, 读: 从_read_buffer读. 这样我们就不用直接读写socket,
进而实现异步读写. 这些操作都封装在IOStream类中, 概括来说,IOStream对socket的读写做了一层封装,
通过使用两个缓冲区, 实现对socket的异步读写.
10.2 IOStream的初始化
IOStream与socket是一一对应的, 初始化主要做4个工作
(1) 初始化IOStream对应的socket
(2) 分配输入缓冲区_write_buffer
(3) 分配输出缓冲区_read_buffer
(4) 把socket加入ioloop, 这样当socket可读写的时候,
调用回调函数_handle_events把数据从socket读入buffer, 或者把数据从buffer发送给socket找到iosteram.py,
定位到__init__方法
Python代码
self.socket = socket self.io_loop = io_loop or ioloop.IOLoop.instance() self._read_buffer = collections.deque() self._write_buffer = collections.deque() self.io_loop.add_handler( self.socket.fileno(), self._handle_events, self._state) |
10.3 IOStream提供的接口
IOStream对外提供了3个接口, 用来对socket的读写
(1) write(data)
把数据写入IOStream的_write_buffer
(2) read_until(delimiter, callback)
从_read_buffer读取数据, delimiter作为读取结束符,
完了调用callback
(3) read_bytes(num_of_bytes, callback)
从_read_buffer读取指定大小的数据, 完了调用callback
read_until和read_bytes都会调用_read_from_buffer把从buffer读取数据,
然后调用_consume消耗掉buffer中
的数据.
10.4 体验异步IO
下面我们来看一个异步IO的实例, 这是一个异步http client的例子,
使用IOStream来下载http://nginx.net/index.html
Python代码
#!/usr/bin/env python # -*- coding:utf-8 -*- from tornado import ioloop from tornado import iostream import socket def send_request(): stream.write("GET /index.html HTTP/1.0\r\nHost: nginx.net\r\n\r\n") stream.read_until("\r\n\r\n", on_headers) def on_headers(data): headers = {} for line in data.split("\r\n"): parts = line.split(":") if len(parts) == 2: headers[parts[0].strip()] = parts[1].strip() stream.read_bytes(int(headers["Content-Length"]), on_body) def on_body(data): print data stream.close() ioloop.IOLoop.instance().stop() s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) stream = iostream.IOStream(s) stream.connect(("nginx.net", 80), send_request) ioloop.IOLoop.instance().start() |
首先调用connect连接服务器, 完成后回调send_request发出请求,
并读取服务器返回的http协议头, 然后回调on_headers解析协议头, 然后调用read_bytes读取数据体,
然后回调on_body把数据打印出来. 最后关闭stream可以看到, 这一系列的调用都是通过回调函数实现的,
这就是异步的处理方式.
10.5 IOStream响应ioloop事件
上面提到, IOStream初始化的时候, 把socket加入ioloop,
一旦socket可读写, 就调用回调函数_handle_events处理IO事件. 打开iostream.py,
定位到_handle_events
Python代码
def _handle_events(self, fd, events): if not self.socket: logging.warning("Got events for closed stream %d", fd) return try: if events & self.io_loop.READ: self._handle_read() if not self.socket: return if events & self.io_loop.WRITE: if self._connecting: self._handle_connect() self._handle_write() if not self.socket: return if events & self.io_loop.ERROR: # We may have queued up a user callback in _handle_read or # _handle_write, so don't close the IOStream until those # callbacks have had a chance to run. self.io_loop.add_callback(self.close) return state = self.io_loop.ERROR if self.reading(): state |= self.io_loop.READ if self.writing(): state |= self.io_loop.WRITE if state != self._state: self._state = state self.io_loop.update_handler(self.socket.fileno(), self._state) except: logging.error("Uncaught exception, closing connection.", exc_info=True) self.close() raise |
可以看到_handle_events根据IO事件的类型, 来调用不同的处理函数,
对于可读事件, 调用handle_read来处理.handle_read会从socket读取数据, 然后把数据存到_read_buffer.
十一. 处理请求 -- HTTPConnection
HttpConnection类专门用来处理http请求, 处理http请求的一般流程是:
HTTPConnection实现了一系列的函数用来处理这些流程, 参见下图:
至于每个函数是如何实现的, 可以参考代码
十二. IOLoop
在Tornado服务器中, IOLoop是调度的核心模块, Tornado服务器回把所有的socket描述符都注册到IOLoop,
注册的时候指明回调处理函数, IOLoop内部不断的监听IO事件, 一旦发现某个socket可读写, 就调用其注册时指定的回调函数.
IOLoop的结构图如下所示:
下面我们使用IOLoop实现一个简单的TCP服务器, 看完之后相信可以对IOLoop有一个大概的了解.
12.1 A Simple TCP Server Using IOLoop
Python代码
#!/usr/bin/env python # -*- coding:utf-8 -*- from tornado import ioloop from tornado import iostream import socket import errno import functools def handle_connection(client, address): client.send("Hello World from A Simple TCP Server") client.close() def connection_ready(sock, fd, events): while True: try: connection, address = sock.accept() except socket.error, e: if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN): raise return connection.setblocking(0) handle_connection(connection, address) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setblocking(0) sock.bind(("localhost", 8080)) sock.listen(128) io_loop = ioloop.IOLoop.instance() callback = functools.partial(connection_ready, sock) io_loop.add_handler(sock.fileno(), callback, io_loop.READ) io_loop.start() |
创建完listen socket后, 再得到IOLoop的实例, 后面回介绍IOLoop的单例模式.然后调用add_handle把listen
socket注册到ioloop中, 指定监听事件为READ, 指定回调函数为connection_ready.
这样客户端来了一个连接后, 就会调用connecion_ready来处理连接.
12.2 单例模式
看了很多IOLoop的代码, 有一个地方相信大家注意到了, 得到IOLoop对象的时候,
都是通过instance()返回的. 事实上,IOLoop使用了单例模式. 在Tornado运行的整个过程中,
只有一个IOLoop实例. 仅需一个 IOLoop实例, 就可以处理全部的IO事件. 以前学习J2EE的时候接触过Java的单例模式,
接下来看看Python是如何实现单例模式的.
Python代码
#!/usr/bin/env python # -*- coding:utf-8 -*- import os class IOLoop(object): @classmethod def instance(cls): if not hasattr(cls, "_instance"): cls._instance = cls() return cls._instance @classmethod def initialized(cls): """Returns true if the singleton instance has been created.""" return hasattr(cls, "_instance") def service(self): print 'Hello,World' print IOLoop.initialized(), ioloop = IOLoop.instance() ioloop.service() if os.fork() == 0: print IOLoop.initialized(), ioloop = IOLoop.instance() ioloop.service() |
代码直接从ioloop.py文件抽取下来的, 演示了Python单例模式的实现方法.
实现相当简洁, 这得益于python强大的自省功能. 代码中使用了cls, 这不是一个关键字, 像self一样,
cls是python的一个built-in变量. self表示类的实例, 而cls表示类,cls一般用于static
method, 因为static method无须实例化就可以调用, 所以传递cls给static method.
然后调用cls()可以创建对象. 就像调用IOLoop()一样.
最后两句话:
Always use 'self' for the first argument
to instance methods.
Always use 'cls' for the first argument
to class methods.
|