I. Listener
The Listener is a thread. While the Server is running, it listens for connections from clients and handles Accept events in select mode.
It also starts an idle-connection handling routine that closes any idle connection that has expired. This routine is driven by a timer.

A select call may block, which gives other threads a chance to run. When accept events occur, the Listener is woken up to process all of them, and each event is handled by a call to doAccept.
doAccept:
    void doAccept(SelectionKey key) throws InterruptedException,
            IOException, OutOfMemoryError {
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        SocketChannel channel;
        // accept every connection that is currently pending
        while ((channel = server.accept()) != null) {
            channel.configureBlocking(false);
            channel.socket().setTcpNoDelay(tcpNoDelay);
            channel.socket().setKeepAlive(true);
            Reader reader = getReader();
            Connection c = connectionManager.register(channel);
            key.attach(c); // so closeCurrentConnection can get the object
            reader.addConnection(c);
        }
    }
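Because several clients may request connections at the same time, a while loop is used here to accept them all.
The most important point is that each newly established socket is set to non-blocking. This is a performance decision: a non-blocking read takes whatever data is already in the socket's receive buffer and returns, so the Reader thread that will later receive on this socket and the Responder thread that will send on it do not block on a send or a receive. When the whole communication path is busy, the Reader and Responder threads therefore spend as little time as possible blocked on I/O, which significantly reduces thread context switches and improves CPU utilization.
Finally, a Reader is obtained and the connection is added to that Reader's buffer queue, while the connection manager monitors and manages the connection's lifetime.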
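To make the accept loop concrete, here is a minimal, self-contained sketch of the same pattern using plain java.nio on the loopback interface. The class name AcceptLoopSketch and the helpers acceptAll and demo are made up for this illustration and are not part of Hadoop; the Reader and the connection manager are left out.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class AcceptLoopSketch {
    /** Accepts every pending connection on the key's channel, as doAccept does. */
    static List<SocketChannel> acceptAll(SelectionKey key) throws IOException {
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        List<SocketChannel> accepted = new ArrayList<>();
        SocketChannel channel;
        // a non-blocking accept() returns null once nothing is pending
        while ((channel = server.accept()) != null) {
            channel.configureBlocking(false);
            channel.socket().setTcpNoDelay(true);
            channel.socket().setKeepAlive(true);
            accepted.add(channel);
        }
        return accepted;
    }

    /** Opens a loopback server, connects one client, and returns how many were accepted. */
    static int demo() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false); // required before register()
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: any free port
            server.register(selector, SelectionKey.OP_ACCEPT);
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
            try (SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", port))) {
                int total = 0;
                selector.select(5000); // wait up to 5 s for the accept event
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isValid() && key.isAcceptable()) {
                        for (SocketChannel c : acceptAll(key)) {
                            total++;
                            c.close();
                        }
                    }
                }
                return total;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The loop ends on null rather than on an exception because the server channel itself is non-blocking, which is what lets one select wake-up drain several simultaneous connection requests.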
A Reader is obtained as follows:
    // the simplest form of load balancing
    Reader getReader() {
        currentReader = (currentReader + 1) % readers.length;
        return readers[currentReader];
    }
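The round-robin selection can be tried in isolation. The sketch below is an illustration only: RoundRobinPicker is a hypothetical class, and starting the index at 0 (so that the first pick is element 1) is an assumption of this example. There is no locking because, as in the article, only the single Listener thread is expected to call it.

```java
import java.util.ArrayList;
import java.util.List;

/** Round-robin picker modelled on getReader(); the names are illustrative. */
final class RoundRobinPicker<T> {
    private final List<T> targets;
    private int current; // assumed to start at 0, so the first pick is index 1

    RoundRobinPicker(List<T> targets) {
        if (targets.isEmpty()) {
            throw new IllegalArgumentException("need at least one target");
        }
        this.targets = new ArrayList<>(targets);
    }

    /** Advances the index and wraps around at the end of the list. */
    T next() {
        current = (current + 1) % targets.size();
        return targets.get(current);
    }
}
```

If more than one thread ever picked a Reader, the index update would need synchronization or an AtomicInteger.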
II. Reader
Once a newly established connection has been added to a Reader's buffer queue pendingConnections, the Reader is also woken up so that it can handle data reception on that connection.
    public void addConnection(Connection conn) throws InterruptedException {
        pendingConnections.put(conn);
        readSelector.wakeup();
    }
The Server is configured with multiple Reader threads, evidently to increase its capacity to serve connections concurrently.
The main logic of a Reader is as follows:
    while (true) {
        ...
        // take one connection; may block
        Connection conn = pendingConnections.take();
        // register a read event with the selector
        conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
        ...
        // run select; may block
        readSelector.select();
        ...
        // read the data for each selected key in turn
        for (keys) {
            doRead(key);
        }
        ...
    }
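While the Server is running, a Reader thread processes as many connections from its buffer queue as it can, registers a READ event for each of them, and uses select to be notified when data arrives on a connection. When data is ready, it reads as much as it can from the connections returned by select, so as to keep the Listener thread from starving for lack of run time.
If the Listener thread starves, concurrency drops sharply, and new connection requests from clients time out or cannot be established.
Note that a Reader may block when taking a connection from the buffer queue, because it uses the take method of LinkedBlockingQueue, which blocks while the queue is empty. The Reader thread thus blocks and yields run time to other threads.
A Reader thread is woken up on two occasions:
1. the Listener has established a new connection and added it to one Reader's buffer queue;
2. the select call returns.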
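The hand-off between Listener and Reader relies on Selector.wakeup() being "sticky": if it is called while no select is in progress, the next select returns immediately. The following sketch shows that interaction under simplifying assumptions. ReaderWakeupSketch is a hypothetical class, connections are plain strings, registration is simulated by moving the string to a second queue, and the pending queue is drained with poll() instead of the blocking take() shown above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class ReaderWakeupSketch implements Runnable {
    private final BlockingQueue<String> pendingConnections = new LinkedBlockingQueue<>();
    private final BlockingQueue<String> registered = new LinkedBlockingQueue<>();
    private final Selector readSelector;
    private volatile boolean running = true;

    ReaderWakeupSketch() {
        try {
            readSelector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Called by the "Listener": enqueue first, then wake the selector. */
    void addConnection(String conn) {
        pendingConnections.add(conn); // unbounded queue, so add() does not fail
        readSelector.wakeup();
    }

    @Override
    public void run() {
        try {
            while (running) {
                String conn;
                while ((conn = pendingConnections.poll()) != null) {
                    registered.add(conn); // stands in for channel.register(...)
                }
                readSelector.select(); // returns on I/O readiness or wakeup()
            }
            readSelector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void stop() {
        running = false;
        readSelector.wakeup();
    }

    /** Hands one connection to a running reader and returns what it registered. */
    static String demo() {
        ReaderWakeupSketch reader = new ReaderWakeupSketch();
        Thread t = new Thread(reader, "reader-sketch");
        t.start();
        try {
            reader.addConnection("conn-1");
            String seen = reader.registered.poll(5, TimeUnit.SECONDS);
            reader.stop();
            t.join(5000);
            return seen;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Enqueuing before calling wakeup() matters: in the opposite order the reader could wake up, find the queue empty, and go back to sleep in select with the connection still unregistered.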
The Reader's doRead mainly calls readAndProcess, which loops over the incoming data and receives the packet header, the context header, and the actual payload.
One method worth mentioning in this process is channelRead:
    private int channelRead(ReadableByteChannel channel,
            ByteBuffer buffer) throws IOException {
        // small buffers are read directly; large ones go through channelIO in chunks
        int count = (buffer.remaining() <= NIO_BUFFER_LIMIT)
                ? channel.read(buffer)
                : channelIO(channel, null, buffer);
        if (count > 0) {
            rpcMetrics.incrReceivedBytes(count);
        }
        return count;
    }
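channelRead checks how many bytes remain to be filled in the receive buffer (buffer.remaining()). If that exceeds the threshold NIO_BUFFER_LIMIT, it uses a chunking technique and reads several times in smaller pieces, to keep the JDK from applying its optimization of turning a large buffer into a direct buffer.
This may be because creating a direct buffer has some cost, and because before JDK 1.6 direct buffers were not reclaimed by the GC, since they are allocated in memory outside the JVM heap.
How effective this optimization actually is was not tested here, so it is skipped. It may also be intended to reduce the load on the GC.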
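The chunking technique itself is easy to reproduce: temporarily lower the buffer's limit so that each read sees only one chunk, then restore the limit. The sketch below illustrates that idea and is not the Hadoop channelIO method. ChunkedReadSketch, chunkedRead, demo and the chunkLimit parameter are names invented for this example, and an in-memory channel stands in for a socket.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

final class ChunkedReadSketch {
    /** Reads into buf in pieces of at most chunkLimit bytes; returns bytes read or -1 at EOF. */
    static int chunkedRead(ReadableByteChannel ch, ByteBuffer buf, int chunkLimit)
            throws IOException {
        int originalLimit = buf.limit();
        int initialRemaining = buf.remaining();
        int ret = 0;
        while (buf.remaining() > 0) {
            try {
                int ioSize = Math.min(buf.remaining(), chunkLimit);
                buf.limit(buf.position() + ioSize); // expose only one chunk to read()
                ret = ch.read(buf);
                if (ret < ioSize) {
                    break; // the channel had less than a full chunk, or hit EOF
                }
            } finally {
                buf.limit(originalLimit); // always restore the caller's limit
            }
        }
        int total = initialRemaining - buf.remaining();
        return total > 0 ? total : ret;
    }

    /** Reads dataLength zero bytes from memory into a heap buffer of bufferSize. */
    static int demo(int dataLength, int bufferSize, int chunkLimit) {
        ReadableByteChannel ch =
                Channels.newChannel(new ByteArrayInputStream(new byte[dataLength]));
        ByteBuffer buf = ByteBuffer.allocate(bufferSize);
        try {
            return chunkedRead(ch, buf, chunkLimit);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Stopping as soon as a read returns fewer bytes than requested keeps the method non-blocking in spirit: it takes what is available and leaves the rest for the next select round.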
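After a Reader has read a complete RpcRequest packet, it calls processOneRpc, which is where processing enters the business-logic stage. This method deserializes the RpcRequest header and data from the received packet, builds an RpcRequest object from them, sets the trace info needed by the client, and then constructs a Call object, as shown in the figure below:
[Figure: construction of the Call object]
From then on, Handlers work in units of Call, a wrapper object that carries the connection-related information.
Once the Call object exists, it is added to the Server's callQueue for a Handler to process. Because put is used, the Reader blocks when callQueue is full (the Handlers are busy), as shown below: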
    callQueue.put(call); // queue the call; maybe blocked here
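The blocking behaviour of put on a full queue is what gives the Reader its back-pressure. A small sketch, assuming a queue capacity of 1 and plain strings instead of Call objects (BackpressureSketch and its method are hypothetical names), shows a producer parked in put until a consumer frees a slot:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class BackpressureSketch {
    /** Returns true if the second put blocked until a slot was freed. */
    static boolean secondPutBlocksUntilTake() {
        BlockingQueue<String> callQueue = new LinkedBlockingQueue<>(1); // capacity 1
        try {
            callQueue.put("call-1"); // fills the queue
            Thread reader = new Thread(() -> {
                try {
                    callQueue.put("call-2"); // parks here: the queue is full
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "reader-sketch");
            reader.start();
            reader.join(200); // give the reader time to reach put()
            boolean blocked = reader.isAlive();
            callQueue.take(); // a "Handler" frees one slot
            reader.join(5000); // the blocked put can now complete
            return blocked && !reader.isAlive() && "call-2".equals(callQueue.peek());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

While a Reader is parked like this it reads nothing further from its connections, so a slow Handler pool eventually slows the clients down instead of letting requests pile up in memory.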
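III. Handler
A Handler invokes the appropriate business-logic interface to process a request, based on the method (the Call) and the parameters in the RPC request.
A Server has multiple Handlers, corresponding to multiple business interfaces; this article does not discuss the business logic itself.
The Handler logic is essentially as follows (exceptions and other minor details omitted):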
    public void run() {
        SERVER.set(Server.this);
        ByteArrayOutputStream buf =
                new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
        while (running) {
            try {
                final Call call = callQueue.take(); // pop the queue; maybe blocked here
                CurCall.set(call);
                try {
                    if (call.connection.user == null) {
                        value = call(call.rpcKind, call.connection.protocolName,
                                call.rpcRequest, call.timestamp);
                    } else {
                        value = call.connection.user.doAs(...);
                    }
                } catch (Throwable e) {
                    // omitted ...
                }
                CurCall.set(null);
                synchronized (call.connection.responseQueue) {
                    responder.doRespond(call);
                }
            } // catch clauses omitted
        }
    }
As shown, a Handler takes a Call from callQueue, invokes Server.call, and finally calls the Responder's doRespond method to send the result to the client.
The Server.call method:
    public Writable call(RPC.RpcKind rpcKind, String protocol,
            Writable rpcRequest, long receiveTime) throws Exception {
        // delegate to the invoker registered for this kind of RPC
        return getRpcInvoker(rpcKind).call(this, protocol,
                rpcRequest, receiveTime);
    }
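Server.call is a dispatch step: it looks up an invoker by RPC kind and delegates to it. The sketch below shows one way such a registry could look. Everything in it is hypothetical, including the Kind enum values, the Invoker interface, and the use of strings instead of Writable; it is meant to show the shape of the lookup, not the Hadoop API.

```java
import java.util.EnumMap;
import java.util.Map;

final class InvokerRegistrySketch {
    /** Hypothetical RPC kinds, for illustration only. */
    enum Kind { KIND_A, KIND_B }

    /** Hypothetical invoker: turns a request into a response. */
    interface Invoker {
        String call(String protocol, String request, long receiveTime);
    }

    private final Map<Kind, Invoker> invokers = new EnumMap<>(Kind.class);

    void register(Kind kind, Invoker invoker) {
        invokers.put(kind, invoker);
    }

    /** Looks up the invoker for the kind and delegates the call to it. */
    String call(Kind kind, String protocol, String request, long receiveTime) {
        Invoker invoker = invokers.get(kind);
        if (invoker == null) {
            throw new IllegalArgumentException("no invoker registered for " + kind);
        }
        return invoker.call(protocol, request, receiveTime);
    }
}
```

A Handler then needs to know only the kind carried by the Call; which serialization engine actually executes the request is decided by whoever registered the invoker.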
IV. Responder
A Server has only one Responder thread.
This thread repeatedly makes the following key calls to coordinate with the Handlers and send data:
    // this wait is for synchronization; see the analysis below
    waitPending();
    ...
    // start select; may block
    writeSelector.select(PURGE_INTERVAL);
    ...
    // if there are selected keys, send their data asynchronously, one by one
    for (selectorKeys) {
        doAsyncWrite(key);
    }
    ...
    // when the purge time is reached, build calls from the selector's
    // registered keys and purge them one by one
    for (Call call : calls) {
        doPurge(call, now);
    }
After a Handler calls doRespond, the result it produced is first appended to the tail of responseQueue rather than being written straight back to the client:
    void doRespond(Call call) throws IOException {
        synchronized (call.connection.responseQueue) {
            call.connection.responseQueue.addLast(call);
            if (call.connection.responseQueue.size() == 1) {
                // Note that inHandler = true here: a write event may be registered
                // with the selector so that the Responder's main loop sends the
                // remaining data via select.
                processResponse(call.connection.responseQueue, true);
            }
        }
    }
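The synchronized block above shows that responseQueue is a contended resource. Correspondingly:
the Handler is the producer and adds results to the queue;
the Responder is the consumer and takes results from the queue and sends them.
processResponse starts the send. It first takes one call from responseQueue without blocking, then makes a best-effort non-blocking attempt to send call.rpcResponse, and returns if everything was sent.
If some data remains unsent, the call is put back at the head of the queue. Because the inHandler parameter is true when the call comes from a Handler, writeSelector is woken up and a write event is registered. The incPending() method is there to hold back the Responder thread while the write event is being registered with the selector; this is analyzed below.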
    call.connection.responseQueue.addFirst(call);
    if (inHandler) {
        // set the serve time when the response has to be sent later
        call.timestamp = Time.now();
        incPending();
        try {
            // Wakeup the thread blocked on select, only then can the call
            // to channel.register() complete.
            writeSelector.wakeup();
            channel.register(writeSelector, SelectionKey.OP_WRITE, call);
        } catch (ClosedChannelException e) {
            // It's ok. The channel might be closed elsewhere.
            done = true;
        } finally {
            decPending();
        }
    }
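The "try once, push back the remainder" step can be modelled without sockets. In the sketch below, PartialWriteSketch and ThrottledChannel are hypothetical names: the channel accepts only a few bytes per write, which imitates a socket whose send buffer is nearly full, and the method returns false at the point where the real code would register OP_WRITE.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.LinkedList;

final class PartialWriteSketch {
    /** Accepts at most maxPerWrite bytes per call, like a nearly full socket buffer. */
    static final class ThrottledChannel implements WritableByteChannel {
        private final int maxPerWrite;
        int written;

        ThrottledChannel(int maxPerWrite) {
            this.maxPerWrite = maxPerWrite;
        }

        @Override
        public int write(ByteBuffer src) {
            int n = Math.min(maxPerWrite, src.remaining());
            src.position(src.position() + n); // consume n bytes
            written += n;
            return n;
        }

        @Override
        public boolean isOpen() {
            return true;
        }

        @Override
        public void close() {
        }
    }

    /** Returns true if the head response was fully sent, false if it was pushed back. */
    static boolean processResponse(LinkedList<ByteBuffer> responseQueue,
                                   WritableByteChannel channel) {
        synchronized (responseQueue) {
            if (responseQueue.isEmpty()) {
                return true;
            }
            ByteBuffer response = responseQueue.removeFirst();
            try {
                channel.write(response); // one non-blocking attempt
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            if (!response.hasRemaining()) {
                return true;
            }
            responseQueue.addFirst(response); // keep order: unfinished response stays first
            return false; // the caller would now register OP_WRITE with the selector
        }
    }
}
```

Putting the unfinished response back at the head, not the tail, is what preserves the order of bytes on the connection when later responses are already queued behind it.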
Back in the Responder's main loop, here is what happens once a write event has been registered with the selector:
    // If a Handler's call to responder.doRespond() is registering a write event
    // with the selector when this line runs, the Responder blocks here.
    // The purpose is clear: the select below can then pick up the data and
    // return at once, which reduces the number of times the thread blocks.
    waitPending(); // If a channel is being registered, wait.
    // select with a timeout so that, even when there is nothing to send, the
    // thread wakes up periodically to deal with Calls that have gone unhandled
    // for a long time
    writeSelector.select(PURGE_INTERVAL);
    Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
    while (iter.hasNext()) {
        SelectionKey key = iter.next();
        iter.remove();
        try {
            if (key.isValid() && key.isWritable()) {
                // asynchronous send
                doAsyncWrite(key);
            }
        } catch (IOException e) {
            LOG.info(Thread.currentThread().getName() +
                    ": doAsyncWrite threw exception " + e);
        }
    }
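The incPending / decPending / waitPending trio behaves like a small counting gate. The article does not show the method bodies, so the sketch below is an assumption about how such a gate can be written with intrinsic locks; the class name PendingGate and the demo method are invented for this example, and notifyAll is used here for simplicity.

```java
final class PendingGate {
    private int pending; // number of registrations currently in progress

    /** A Handler is about to register a channel with the selector. */
    synchronized void incPending() {
        pending++;
    }

    /** The registration finished (or failed); release any waiter. */
    synchronized void decPending() {
        pending--;
        notifyAll();
    }

    /** The Responder waits here until no registration is in progress. */
    synchronized void waitPending() throws InterruptedException {
        while (pending > 0) {
            wait();
        }
    }

    /** Returns true if a waiting thread was held back until decPending() ran. */
    static boolean demo() {
        PendingGate gate = new PendingGate();
        gate.incPending(); // a "Handler" starts registering
        Thread responder = new Thread(() -> {
            try {
                gate.waitPending();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "responder-sketch");
        responder.start();
        try {
            responder.join(200); // the responder should still be waiting
            boolean heldBack = responder.isAlive();
            gate.decPending(); // the registration is done
            responder.join(5000);
            return heldBack && !responder.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

The gate complements wakeup(): the wake-up gets the Responder out of select, and the gate keeps it from re-entering select until the Handler's register call has gone through.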
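The key points are all covered in the comments and are not repeated here. The code takes both synchronization and performance into account, which makes it worth studying.
V. Summary
This article has focused on the server side of Hadoop RPC. It is a well-crafted design with close attention to detail.
1. Synchronization:
The Listener produces and the Readers consume; the Readers produce and the Handlers consume; the Handlers produce and the Responder consumes.
They must therefore synchronize with one another. The Hadoop implementation both relies on the blocking put and take operations of BlockingQueue for synchronization and uses synchronized on contended resources.
2. Buffering:
The buffer queues also deserve attention. A Server receives a very large number of concurrent requests, and Handlers inevitably slow down while executing the business logic of a call, so a buffer is needed between request intake and processing.
Likewise, processing and sending can run at mismatched rates, so a buffer is needed there as well.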
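The producer/consumer chain and its buffers can be imitated in a few lines. The sketch below is a toy model under stated assumptions, not the Hadoop code: PipelineSketch is a hypothetical class, requests are integers, the bounded callQueue has an arbitrary capacity of 4, and a negative number serves as a shutdown marker for the handler threads.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class PipelineSketch {
    /** Pushes requests through a bounded queue to handler threads and collects the results. */
    static List<String> run(int requests, int handlers) {
        BlockingQueue<Integer> callQueue = new LinkedBlockingQueue<>(4); // bounded buffer
        BlockingQueue<String> responseQueue = new LinkedBlockingQueue<>();
        List<Thread> pool = new ArrayList<>();
        for (int i = 0; i < handlers; i++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        int call = callQueue.take(); // blocks while the queue is empty
                        if (call < 0) {
                            return; // shutdown marker
                        }
                        responseQueue.put("result-" + call);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "handler-" + i);
            t.start();
            pool.add(t);
        }
        try {
            for (int i = 0; i < requests; i++) {
                callQueue.put(i); // the "Reader": blocks while the queue is full
            }
            for (int i = 0; i < handlers; i++) {
                callQueue.put(-1); // one shutdown marker per handler
            }
            for (Thread t : pool) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        List<String> results = new ArrayList<>(responseQueue);
        Collections.sort(results);
        return results;
    }
}
```

The bounded queue absorbs short bursts, and once it is full the producer simply waits, so the rate mismatch between intake and processing is handled without any explicit signalling code.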
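3. Thread model:
The Listener is single-threaded, the Readers are multi-threaded, the Handlers are multi-threaded, and the Responder is single-threaded. Why this design?
The Listener handles accept events with select. A client usually establishes only a limited number of connections in a given period, and establishing a connection is fast, so one thread is enough. It hands each new connection straight to a Reader and can comfortably keep up with new connections.
The Handlers are multi-threaded because business logic is the bulk of the work and is likely to be I/O-intensive (HDFS). With too few threads, long-running business logic could leave most Handler threads blocked, so that lightweight calls would have to queue behind it and might starve. If the request queue filled by the Readers stays full for long, communication as a whole inevitably degrades. This is the classic high-concurrency situation in which response time must come down and throughput must go up; context switches are unavoidable here and not worth agonizing over, because concurrency comes first.
The Responder is single-threaded. It has a comparatively easy job: although there are many requests, after the Reader->Handler buffering and the Handlers' processing, every result that could be sent in one go has already been sent. The Responder mostly collects and handles the long results, using efficient select to find what can be sent and send it.
Here, a Handler calls responder.doRespond directly once the business logic finishes, because the call returns immediately and costs very little. The Handler therefore does not block on sending, which makes full use of the multi-threaded Handlers and reduces thread switches. It also lightens the load on the Responder, so that a single Responder thread is enough.
4. Locking
For Hadoop, locking is indispensable because of these synchronization needs. Performance matters, but from an engineering standpoint, considering the stability of the communication layer, code maintainability, and keeping the code structure relatively simple (for historical reasons the code is already very complex), most of the code uses synchronized, a pessimistic, heavyweight form of locking. This significantly reduces the complexity of synchronization between objects and reduces errors.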
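VI. (Supplement) RpcServer thread model
NameNode startup process:
[Figure: NameNode startup process]
Thread model
Listener (1 thread):
1. Listens for and accepts connections from clients, and puts each new connection into pendingConnections.
2. Cleans up idle connections.
3. Wakes up the Reader.
Reader (N threads): takes connections from pendingConnections, reads data, constructs a Call from the RpcRequest, and puts it into callQueue.
Handler (N threads):
1. Takes client calls from callQueue and executes them.
2. Calls the Responder to append the result to the tail of responseQueue. One send is attempted here; if the data is not fully sent, a WRITE event is registered with the selector and the Responder is woken up.
Responder (1 thread):
1. Sends data from responseQueue in FIFO order.
2. Handles the data selected by the selector.
3. Scans the pending responses and discards expired Calls.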
