编辑推荐: |
本文来自于oschina,本文介绍了经典的ServerSocket监听循环、Reactor
模式以及Worker线程等相关内容。 |
|
Java可扩展IO
Doug Lee
大纲
可扩展的网络服务
事件驱动
Reactor 模式
基础版
多线程版
其他变体
java.io包中分阻塞IO API一览
网络应用服务器
Web服务器,分布式对象系统等等
它们的共同特点
Read请求
解码请求报文
业务处理
编码响应报文
发送响应
实际应用中每一个步骤的不一样
XML解析
文件传输
动态生成网页
计算型服务
经典的服务设计
每个线程运行一个handler
经典的ServerSocket监听循环
class Server
implements Runnable {
public void run() {
try {
ServerSocket ss = new ServerSocket(PORT);
while (!Thread.interrupted())
new Thread(new Handler(ss.accept())).start();
// 这里使用单线程或者线程池
} catch (IOException ex) { /* ... */ }
}
static class Handler implements Runnable {
final Socket socket;
Handler(Socket s) { socket = s; }
public void run() {
try {
byte[] input = new byte[MAX_INPUT];
socket.getInputStream().read(input);
byte[] output = process(input);
socket.getOutputStream().write(output);
} catch (IOException ex) { /* ... */ }
}
private byte[] process(byte[] cmd) { /* ... */
}
}
} |
Note: 异常处理省略
高可扩展性目标
压力持续增大时服务优雅的降级(客户端增多)
性能随着资源(CPU,内存,磁盘,带宽)的提升持续增加
高可用和性能的目标
低延迟
应对请求尖峰
服务质量可控
分而治之是解决扩展性问题的常用方法
分而治之
把处理流程切分成更小的task,每个task都是非阻塞的
只有当任务准备好才去执行,IO事件模型通常是触发器机制
java.nio包包含基本的非阻塞机制
非阻塞读和写
分发任务 把task关联到IO事件模型
更多想法
事件驱动设计
事件驱动设计
通常更高效
更少的资源 不需要为每一个client启动一个线程
减少开销 减少上下文切换以及更少的锁竞争
事件分发较慢 必须把action和时间绑定在一起
编程难度较高
必须把流程分割为一系列非阻塞的任务单元
与GUI的事件模型类似
不能消除所有的阻塞,如:GC、内存页错误等等
必须注意服务的状态的变化
Java AWT图形编程的事件驱动
事件驱动的IO模型与此类似,但设计上有所不同
Reactor模型
Reacttor负责分发事件到对应的handler,类似AWT线程
Handlers是非阻塞的任务,类似AWT中的ActionListeners
Manage负责把handler绑定到事件上
参见施密特等人所著的《Pattern-Oriented Software Architecture》卷2
以及Richard Stevens关于网络的数以及MattWelsh的SEDA框架等等
Reactor基础版
单线程版
java.nio提供的支持
Channels
通道是文件、socket之间的连接, 支持非阻塞读取
Buffers
数组对象,可以被Channel直接读写
Selectors
负责筛选那些Channel有IO事件
SelectionKeys
保存IO事件状态和绑定对象
Reactor 1:创建
class Reactor
implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
Reactor(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind( new InetSocketAddress(port)
);
serverSocket.configureBlocking(false);
SelectionKey sk = serverSocket.register( selector,
SelectionKey.OP_ACCEPT );
sk.attach(new Acceptor());
}
/*
也可以用SPI provider,更明确一些:
SelectorProvider p = SelectorProvider.provider();
selector = p.openSelector();
serverSocket = p.openServerSocketChannel();
*/ |
Reactor 2:分发循环
public void
run() { // 通常启动一个新线程
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext())
dispatch((SelectionKey)(it.next());
selected.clear();
}
} catch (IOException ex) { /* ... */ }
}
void dispatch(SelectionKey k) {
Runnable r = (Runnable)(k.attachment());
if (r != null)
r.run();
} |
Reactor 3:Acceptor
class Acceptor
implements Runnable { // 内置类
public void run() {
try {
SocketChannel c = serverSocket.accept();
if (c != null)
new Handler(selector, c);
} catch (IOException ex) { /* ... */ }
}
}
} |
Reactor 4:建立Handler
final class
Handler implements Runnable {
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(MAXIN);
ByteBuffer output = ByteBuffer.allocate(MAXOUT);
static final int READING = 0, SENDING = 1;
int state = READING;
Handler(Selector sel, SocketChannel c) throws
IOException {
socket = c;
c.configureBlocking(false);
// 尝试监听读事件
sk = socket.register(sel, 0);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
sel.wakeup();
}
boolean inputIsComplete() { /* ... */ }
boolean outputIsComplete() { /* ... */ }
void process() { /* ... */ } |
Reactor 5:处理请求
public void
run() {
try {
if (state == READING) read();
else if (state == SENDING) send();
} catch (IOException ex) { /* ... */ }
}
void read() throws IOException {
socket.read(input);
if (inputIsComplete()) {
process();
state = SENDING;
// 读完之后,通常监听写事件
sk.interestOps(SelectionKey.OP_WRITE);
}
}
void send() throws IOException {
socket.write(output);
if (outputIsComplete())
sk.cancel();
}
} |
Handlers状态流转
GoF的状态模式的简单应用
class Handler
{ // ...
public void run() { // 初始状态是read
socket.read(input);
if (inputIsComplete()) {
process();
sk.attach(new Sender());
sk.interest(SelectionKey.OP_WRITE);
sk.selector().wakeup();
}
}
class Sender implements Runnable {
public void run(){ // ...
socket.write(output);
if (outputIsComplete())
sk.cancel();
}
}
} |
多线程版
启动适当的线程数,尤其是多核情况下
工作线程
Reactors必须快速触发handlers
Handler任务重,会拖慢Reactor
把计算型的工作交给其他的线程
多个Readctor线程
Reactor线程专注于IO操作
把压力分摊到其他的reactor
负载均衡要考虑适配CPU和IO速率
Worker线程
只处理计算型任务,加速Reactor线程
类似于POSA2书上的Proactor模式
比改成事件驱动模式简单
只处理计算型任务
重复IO比较难处理
最好在第一次读的时候就全部读出来到缓存中
使用线程池,方便控制
通常只需少量的线程,比客户端数量少的多
Worker线程池
带线程池的Handler
class Handler
implements Runnable {
// 使用 util.concurrent中的线程池
static PooledExecutor pool = new PooledExecutor(...);
static final int PROCESSING = 3;
// ...
synchronized void read() { // ...
socket.read(input);
if (inputIsComplete()) {
state = PROCESSING;
pool.execute(new Processer());
}
}
synchronized void processAndHandOff() {
process();
state = SENDING; // or rebind attachment
sk.interest(SelectionKey.OP_WRITE);
}
class Processer implements Runnable {
public void run() { processAndHandOff(); }
}
} |
调用任务的方式
链式传递
每个任务负责调用下一个任务,通常是效率最高的,但容易出问题
由到每个handler的分发器回调
通过设置状态,绑定对象等方式实现,GoF Mediator模式的变体
队列
就像上面的例子,通过队列传递buffer中的数据
Future
调用方通过join或者wait/notify方法获取每个task的执行结果
使用PooledExecutor
调度worker线程
主要方法:execute(Runnable r)
有如下控制参数
队列类型
最大线程数
最小线程数
"Warm" versus on-demand threads
自动回收空闲线程
需要的时候再创建新的线程
多种策略应对任务饱和
阻塞,丢弃,producer-runs,等等
多个Reactor线程
使用Reactor池
适配CPU和IO速率
静态创建或动态创建
每一个reactor都有自己的Selector,线程,分发循环
主acceptor分发给其他的reactors
Selector[] selectors;
int next = 0;
class Acceptor { // ...
public synchronized void run() { ...
Socket connection = serverSocket.accept();
if (connection != null)
new Handler(selectors[next], connection);
if (++next == selectors.length)
next = 0;
}
} |
多Reactor示例
其它的java.nio特性
每个Reactor包含多个Selector
把不同的handler绑定到不同的IO事件时需要特别小心同步问题
文件传输
自动文件传输:file-to-net或者net-to-file复制
内存映射文件
通过buffers访问文件
直接访问buffer
有时可以达到零拷贝的目的
But have setup and finalization overhead
非常适合长连接应用
扩展网络连接
同时收到多个请求
客户端建立连接
客户端发送一连串的消息/请求
客户端断开连接
举个例子
数据库事务监视器
多人在线游戏、聊天室等等
扩展上文的基础网络模型
保持许多相对长时间的存活的客户端
跟踪客户端,保持会话状态(包括丢弃)
分布式服务,横跨多个主机
API一览
Buffer
ByteBuffer
(CharBuffer、LongBuffer等等)
Channel
SelectableChannel
SocketChannel
ServerSocketChannel
FileChannel
Selector
SelectionKey
Buffer
abstract class
Buffer {
int capacity();
int position();
Buffer position(int newPosition);
int limit();
Buffer limit(int newLimit);
Buffer mark();
Buffer reset();
Buffer clear();
Buffer flip();
Buffer rewind();
int remaining();
boolean hasRemaining();
boolean isReadOnly();
} |
ByteBuffer
abstract class
ByteBuffer extends Buffer {
static ByteBuffer allocateDirect(int capacity);
static ByteBuffer allocate(int capacity);
static ByteBuffer wrap(byte[] src, int offset,
int len);
static ByteBuffer wrap(byte[] src);
boolean isDirect();
ByteOrder order();
ByteBuffer order(ByteOrder bo);
ByteBuffer slice();
ByteBuffer duplicate();
ByteBuffer compact();
ByteBuffer asReadOnlyBuffer();
byte get();
byte get(int index);
ByteBuffer get(byte[] dst, int offset, int length);
ByteBuffer get(byte[] dst);
ByteBuffer put(byte b);
ByteBuffer put(int index, byte b);
ByteBuffer put(byte[] src, int offset, int length);
ByteBuffer put(ByteBuffer src);
ByteBuffer put(byte[] src);
char getChar();
char getChar(int index);
ByteBuffer putChar(char value);
ByteBuffer putChar(int index, char value);
CharBuffer asCharBuffer();
short getShort();
short getShort(int index);
ByteBuffer putShort(short value);
ByteBuffer putShort(int index, short value);
ShortBuffer asShortBuffer();
int getInt();
int getInt(int index);
ByteBuffer putInt(int value);
ByteBuffer putInt(int index, int value);
IntBuffer asIntBuffer();
long getLong();
long getLong(int index);
ByteBuffer putLong(long value);
ByteBuffer putLong(int index, long value);
LongBuffer asLongBuffer();
float getFloat();
float getFloat(int index);
ByteBuffer putFloat(float value);
ByteBuffer putFloat(int index, float value);
FloatBuffer asFloatBuffer();
double getDouble();
double getDouble(int index);
ByteBuffer putDouble(double value);
ByteBuffer putDouble(int index, double value);
DoubleBuffer asDoubleBuffer();
} |
Channel
interface Channel
{
boolean isOpen();
void close() throws IOException;
}
interface ReadableByteChannel extends Channel
{
int read(ByteBuffer dst) throws IOException;
}
interface WritableByteChannel extends Channel
{
int write(ByteBuffer src) throws IOException;
}
interface ScatteringByteChannel extends ReadableByteChannel
{
int read(ByteBuffer[] dsts, int offset, int length)
throws IOException;
int read(ByteBuffer[] dsts) throws IOException;
}
interface GatheringByteChannel extends WritableByteChannel
{
int write(ByteBuffer[] srcs, int offset, int length)
throws IOException;
int write(ByteBuffer[] srcs) throws IOException;
} |
SelectableChannel
abstract class
SelectableChannel implements Channel {
int validOps();
boolean isRegistered();
SelectionKey keyFor(Selector sel);
SelectionKey register(Selector sel, int ops) throws
ClosedChannelException;
void configureBlocking(boolean block) throws IOException;
boolean isBlocking();
Object blockingLock();
} |
SocketChannel
abstract class
SocketChannel implements ByteChannel ... {
static SocketChannel open() throws IOException;
Socket socket();
int validOps();
boolean isConnected();
boolean isConnectionPending();
boolean isInputOpen();
boolean isOutputOpen();
boolean connect(SocketAddress remote) throws IOException;
boolean finishConnect() throws IOException;
void shutdownInput() throws IOException;
void shutdownOutput() throws IOException;
int read(ByteBuffer dst) throws IOException;
int read(ByteBuffer[] dsts, int offset, int length)
throws IOException;
int read(ByteBuffer[] dsts) throws IOException;
int write(ByteBuffer src) throws IOException;
int write(ByteBuffer[] srcs, int offset, int length)
throws IOException;
int write(ByteBuffer[] srcs) throws IOException;
} |
ServerSocketChannel
abstract class
ServerSocketChannel extends ... {
static ServerSocketChannel open() throws IOException;
int validOps();
ServerSocket socket();
SocketChannel accept() throws IOException;
} |
FileChannel
abstract class
FileChannel implements ... {
int read(ByteBuffer dst);
int read(ByteBuffer dst, long position);
int read(ByteBuffer[] dsts, int offset, int length);
int read(ByteBuffer[] dsts);
int write(ByteBuffer src);
int write(ByteBuffer src, long position);
int write(ByteBuffer[] srcs, int offset, int length);
int write(ByteBuffer[] srcs);
long position();
void position(long newPosition);
long size();
void truncate(long size);
void force(boolean flushMetaDataToo);
int transferTo(long position, int count,
WritableByteChannel dst);
int transferFrom(ReadableByteChannel src,
long position, int count);
FileLock lock(long position, long size, boolean
shared);
FileLock lock();
FileLock tryLock(long pos, long size, boolean
shared);
FileLock tryLock();
static final int MAP_RO, MAP_RW, MAP_COW;
MappedByteBuffer map(int mode, long position,
int size);
}
NOTE: 所有的方法都抛IOException |
Selector
abstract class
Selector {
static Selector open() throws IOException;
Set keys();
Set selectedKeys();
int selectNow() throws IOException;
int select(long timeout) throws IOException;
int select() throws IOException;
void wakeup();
void close() throws IOException;
} |
SelectionKey
abstract class
SelectionKey {
static final int OP_READ, OP_WRITE, OP_CONNECT,
OP_ACCEPT;
SelectableChannel channel();
Selector selector();
boolean isValid();
void cancel();
int interestOps();
void interestOps(int ops);
int readyOps();
boolean isReadable();
boolean isWritable();
boolean isConnectable();
boolean isAcceptable();
Object attach(Object ob);
Object attachment();
} |
|