编辑推荐: |
本文来自于oschina,本文介绍了经典的ServerSocket监听循环、Reactor
模式以及Worker线程等相关内容。 |
|
Java可扩展IO
Doug Lee
大纲
可扩展的网络服务
事件驱动
Reactor 模式
基础版
多线程版
其他变体
java.io包中分阻塞IO API一览
网络应用服务器
Web服务器,分布式对象系统等等
它们的共同特点
Read请求
解码请求报文
业务处理
编码响应报文
发送响应
实际应用中每一个步骤的不一样
XML解析
文件传输
动态生成网页
计算型服务
经典的服务设计 ![](images/2018122531.png)
每个线程运行一个handler
经典的ServerSocket监听循环
class Server
implements Runnable {
public void run() {
try {
ServerSocket ss = new ServerSocket(PORT);
while (!Thread.interrupted())
new Thread(new Handler(ss.accept())).start();
// 这里使用单线程或者线程池
} catch (IOException ex) { /* ... */ }
}
static class Handler implements Runnable {
final Socket socket;
Handler(Socket s) { socket = s; }
public void run() {
try {
byte[] input = new byte[MAX_INPUT];
socket.getInputStream().read(input);
byte[] output = process(input);
socket.getOutputStream().write(output);
} catch (IOException ex) { /* ... */ }
}
private byte[] process(byte[] cmd) { /* ... */
}
}
} |
Note: 异常处理省略
高可扩展性目标
压力持续增大时服务优雅的降级(客户端增多)
性能随着资源(CPU,内存,磁盘,带宽)的提升持续增加
高可用和性能的目标
低延迟
应对请求尖峰
服务质量可控
分而治之是解决扩展性问题的常用方法
分而治之
把处理流程切分成更小的task,每个task都是非阻塞的
只有当任务准备好才去执行,IO事件模型通常是触发器机制
java.nio包包含基本的非阻塞机制
非阻塞读和写
分发任务 把task关联到IO事件模型
更多想法
事件驱动设计
事件驱动设计
通常更高效
更少的资源 不需要为每一个client启动一个线程
减少开销 减少上下文切换以及更少的锁竞争
事件分发较慢 必须把action和时间绑定在一起
编程难度较高
必须把流程分割为一系列非阻塞的任务单元
与GUI的事件模型类似
不能消除所有的阻塞,如:GC、内存页错误等等
必须注意服务的状态的变化
Java AWT图形编程的事件驱动
![](images/2018122532.png)
事件驱动的IO模型与此类似,但设计上有所不同
Reactor模型
Reacttor负责分发事件到对应的handler,类似AWT线程
Handlers是非阻塞的任务,类似AWT中的ActionListeners
Manage负责把handler绑定到事件上
参见施密特等人所著的《Pattern-Oriented Software Architecture》卷2
以及Richard Stevens关于网络的数以及MattWelsh的SEDA框架等等
Reactor基础版 ![](images/2018122533.png)
单线程版
java.nio提供的支持
Channels
通道是文件、socket之间的连接, 支持非阻塞读取
Buffers
数组对象,可以被Channel直接读写
Selectors
负责筛选那些Channel有IO事件
SelectionKeys
保存IO事件状态和绑定对象
Reactor 1:创建
class Reactor
implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
Reactor(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind( new InetSocketAddress(port)
);
serverSocket.configureBlocking(false);
SelectionKey sk = serverSocket.register( selector,
SelectionKey.OP_ACCEPT );
sk.attach(new Acceptor());
}
/*
也可以用SPI provider,更明确一些:
SelectorProvider p = SelectorProvider.provider();
selector = p.openSelector();
serverSocket = p.openServerSocketChannel();
*/ |
Reactor 2:分发循环
public void
run() { // 通常启动一个新线程
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext())
dispatch((SelectionKey)(it.next());
selected.clear();
}
} catch (IOException ex) { /* ... */ }
}
void dispatch(SelectionKey k) {
Runnable r = (Runnable)(k.attachment());
if (r != null)
r.run();
} |
Reactor 3:Acceptor
class Acceptor
implements Runnable { // 内置类
public void run() {
try {
SocketChannel c = serverSocket.accept();
if (c != null)
new Handler(selector, c);
} catch (IOException ex) { /* ... */ }
}
}
} |
![](images/2018122534.png)
Reactor 4:建立Handler
final class
Handler implements Runnable {
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(MAXIN);
ByteBuffer output = ByteBuffer.allocate(MAXOUT);
static final int READING = 0, SENDING = 1;
int state = READING;
Handler(Selector sel, SocketChannel c) throws
IOException {
socket = c;
c.configureBlocking(false);
// 尝试监听读事件
sk = socket.register(sel, 0);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
sel.wakeup();
}
boolean inputIsComplete() { /* ... */ }
boolean outputIsComplete() { /* ... */ }
void process() { /* ... */ } |
Reactor 5:处理请求
public void
run() {
try {
if (state == READING) read();
else if (state == SENDING) send();
} catch (IOException ex) { /* ... */ }
}
void read() throws IOException {
socket.read(input);
if (inputIsComplete()) {
process();
state = SENDING;
// 读完之后,通常监听写事件
sk.interestOps(SelectionKey.OP_WRITE);
}
}
void send() throws IOException {
socket.write(output);
if (outputIsComplete())
sk.cancel();
}
} |
Handlers状态流转
GoF的状态模式的简单应用
class Handler
{ // ...
public void run() { // 初始状态是read
socket.read(input);
if (inputIsComplete()) {
process();
sk.attach(new Sender());
sk.interest(SelectionKey.OP_WRITE);
sk.selector().wakeup();
}
}
class Sender implements Runnable {
public void run(){ // ...
socket.write(output);
if (outputIsComplete())
sk.cancel();
}
}
} |
多线程版
启动适当的线程数,尤其是多核情况下
工作线程
Reactors必须快速触发handlers
Handler任务重,会拖慢Reactor
把计算型的工作交给其他的线程
多个Readctor线程
Reactor线程专注于IO操作
把压力分摊到其他的reactor
负载均衡要考虑适配CPU和IO速率
Worker线程
只处理计算型任务,加速Reactor线程
类似于POSA2书上的Proactor模式
比改成事件驱动模式简单
只处理计算型任务
重复IO比较难处理
最好在第一次读的时候就全部读出来到缓存中
使用线程池,方便控制
通常只需少量的线程,比客户端数量少的多
Worker线程池
![](images/2018122535.png)
带线程池的Handler
class Handler
implements Runnable {
// 使用 util.concurrent中的线程池
static PooledExecutor pool = new PooledExecutor(...);
static final int PROCESSING = 3;
// ...
synchronized void read() { // ...
socket.read(input);
if (inputIsComplete()) {
state = PROCESSING;
pool.execute(new Processer());
}
}
synchronized void processAndHandOff() {
process();
state = SENDING; // or rebind attachment
sk.interest(SelectionKey.OP_WRITE);
}
class Processer implements Runnable {
public void run() { processAndHandOff(); }
}
} |
调用任务的方式
链式传递
每个任务负责调用下一个任务,通常是效率最高的,但容易出问题
由到每个handler的分发器回调
通过设置状态,绑定对象等方式实现,GoF Mediator模式的变体
队列
就像上面的例子,通过队列传递buffer中的数据
Future
调用方通过join或者wait/notify方法获取每个task的执行结果
使用PooledExecutor
调度worker线程
主要方法:execute(Runnable r)
有如下控制参数
队列类型
最大线程数
最小线程数
"Warm" versus on-demand threads
自动回收空闲线程
需要的时候再创建新的线程
多种策略应对任务饱和
阻塞,丢弃,producer-runs,等等
多个Reactor线程
使用Reactor池
适配CPU和IO速率
静态创建或动态创建
每一个reactor都有自己的Selector,线程,分发循环
主acceptor分发给其他的reactors
Selector[] selectors;
int next = 0;
class Acceptor { // ...
public synchronized void run() { ...
Socket connection = serverSocket.accept();
if (connection != null)
new Handler(selectors[next], connection);
if (++next == selectors.length)
next = 0;
}
} |
多Reactor示例 ![](images/2018122536.png)
其它的java.nio特性
每个Reactor包含多个Selector
把不同的handler绑定到不同的IO事件时需要特别小心同步问题
文件传输
自动文件传输:file-to-net或者net-to-file复制
内存映射文件
通过buffers访问文件
直接访问buffer
有时可以达到零拷贝的目的
But have setup and finalization overhead
非常适合长连接应用
扩展网络连接
同时收到多个请求
客户端建立连接
客户端发送一连串的消息/请求
客户端断开连接
举个例子
数据库事务监视器
多人在线游戏、聊天室等等
扩展上文的基础网络模型
保持许多相对长时间的存活的客户端
跟踪客户端,保持会话状态(包括丢弃)
分布式服务,横跨多个主机
API一览
Buffer
ByteBuffer
(CharBuffer、LongBuffer等等)
Channel
SelectableChannel
SocketChannel
ServerSocketChannel
FileChannel
Selector
SelectionKey
Buffer
abstract class
Buffer {
int capacity();
int position();
Buffer position(int newPosition);
int limit();
Buffer limit(int newLimit);
Buffer mark();
Buffer reset();
Buffer clear();
Buffer flip();
Buffer rewind();
int remaining();
boolean hasRemaining();
boolean isReadOnly();
} |
![](images/2018122537.png)
ByteBuffer
abstract class
ByteBuffer extends Buffer {
static ByteBuffer allocateDirect(int capacity);
static ByteBuffer allocate(int capacity);
static ByteBuffer wrap(byte[] src, int offset,
int len);
static ByteBuffer wrap(byte[] src);
boolean isDirect();
ByteOrder order();
ByteBuffer order(ByteOrder bo);
ByteBuffer slice();
ByteBuffer duplicate();
ByteBuffer compact();
ByteBuffer asReadOnlyBuffer();
byte get();
byte get(int index);
ByteBuffer get(byte[] dst, int offset, int length);
ByteBuffer get(byte[] dst);
ByteBuffer put(byte b);
ByteBuffer put(int index, byte b);
ByteBuffer put(byte[] src, int offset, int length);
ByteBuffer put(ByteBuffer src);
ByteBuffer put(byte[] src);
char getChar();
char getChar(int index);
ByteBuffer putChar(char value);
ByteBuffer putChar(int index, char value);
CharBuffer asCharBuffer();
short getShort();
short getShort(int index);
ByteBuffer putShort(short value);
ByteBuffer putShort(int index, short value);
ShortBuffer asShortBuffer();
int getInt();
int getInt(int index);
ByteBuffer putInt(int value);
ByteBuffer putInt(int index, int value);
IntBuffer asIntBuffer();
long getLong();
long getLong(int index);
ByteBuffer putLong(long value);
ByteBuffer putLong(int index, long value);
LongBuffer asLongBuffer();
float getFloat();
float getFloat(int index);
ByteBuffer putFloat(float value);
ByteBuffer putFloat(int index, float value);
FloatBuffer asFloatBuffer();
double getDouble();
double getDouble(int index);
ByteBuffer putDouble(double value);
ByteBuffer putDouble(int index, double value);
DoubleBuffer asDoubleBuffer();
} |
Channel
interface Channel
{
boolean isOpen();
void close() throws IOException;
}
interface ReadableByteChannel extends Channel
{
int read(ByteBuffer dst) throws IOException;
}
interface WritableByteChannel extends Channel
{
int write(ByteBuffer src) throws IOException;
}
interface ScatteringByteChannel extends ReadableByteChannel
{
int read(ByteBuffer[] dsts, int offset, int length)
throws IOException;
int read(ByteBuffer[] dsts) throws IOException;
}
interface GatheringByteChannel extends WritableByteChannel
{
int write(ByteBuffer[] srcs, int offset, int length)
throws IOException;
int write(ByteBuffer[] srcs) throws IOException;
} |
SelectableChannel
abstract class
SelectableChannel implements Channel {
int validOps();
boolean isRegistered();
SelectionKey keyFor(Selector sel);
SelectionKey register(Selector sel, int ops) throws
ClosedChannelException;
void configureBlocking(boolean block) throws IOException;
boolean isBlocking();
Object blockingLock();
} |
SocketChannel
abstract class
SocketChannel implements ByteChannel ... {
static SocketChannel open() throws IOException;
Socket socket();
int validOps();
boolean isConnected();
boolean isConnectionPending();
boolean isInputOpen();
boolean isOutputOpen();
boolean connect(SocketAddress remote) throws IOException;
boolean finishConnect() throws IOException;
void shutdownInput() throws IOException;
void shutdownOutput() throws IOException;
int read(ByteBuffer dst) throws IOException;
int read(ByteBuffer[] dsts, int offset, int length)
throws IOException;
int read(ByteBuffer[] dsts) throws IOException;
int write(ByteBuffer src) throws IOException;
int write(ByteBuffer[] srcs, int offset, int length)
throws IOException;
int write(ByteBuffer[] srcs) throws IOException;
} |
ServerSocketChannel
abstract class
ServerSocketChannel extends ... {
static ServerSocketChannel open() throws IOException;
int validOps();
ServerSocket socket();
SocketChannel accept() throws IOException;
} |
FileChannel
abstract class
FileChannel implements ... {
int read(ByteBuffer dst);
int read(ByteBuffer dst, long position);
int read(ByteBuffer[] dsts, int offset, int length);
int read(ByteBuffer[] dsts);
int write(ByteBuffer src);
int write(ByteBuffer src, long position);
int write(ByteBuffer[] srcs, int offset, int length);
int write(ByteBuffer[] srcs);
long position();
void position(long newPosition);
long size();
void truncate(long size);
void force(boolean flushMetaDataToo);
int transferTo(long position, int count,
WritableByteChannel dst);
int transferFrom(ReadableByteChannel src,
long position, int count);
FileLock lock(long position, long size, boolean
shared);
FileLock lock();
FileLock tryLock(long pos, long size, boolean
shared);
FileLock tryLock();
static final int MAP_RO, MAP_RW, MAP_COW;
MappedByteBuffer map(int mode, long position,
int size);
}
NOTE: 所有的方法都抛IOException |
Selector
abstract class
Selector {
static Selector open() throws IOException;
Set keys();
Set selectedKeys();
int selectNow() throws IOException;
int select(long timeout) throws IOException;
int select() throws IOException;
void wakeup();
void close() throws IOException;
} |
SelectionKey
abstract class
SelectionKey {
static final int OP_READ, OP_WRITE, OP_CONNECT,
OP_ACCEPT;
SelectableChannel channel();
Selector selector();
boolean isValid();
void cancel();
int interestOps();
void interestOps(int ops);
int readyOps();
boolean isReadable();
boolean isWritable();
boolean isConnectable();
boolean isAcceptable();
Object attach(Object ob);
Object attachment();
} |
|