Äú¿ÉÒÔ¾èÖú£¬Ö§³ÖÎÒÃǵĹ«ÒæÊÂÒµ¡£

1Ôª 10Ôª 50Ôª





ÈÏÖ¤Â룺  ÑéÖ¤Âë,¿´²»Çå³þ?Çëµã»÷Ë¢ÐÂÑéÖ¤Âë ±ØÌî



  ÇóÖª ÎÄÕ ÎÄ¿â Lib ÊÓƵ iPerson ¿Î³Ì ÈÏÖ¤ ×Éѯ ¹¤¾ß ½²×ù Model Center   Code  
»áÔ±   
   
 
     
   
 ¶©ÔÄ
  ¾èÖú
Hadoop µÄ Server ¼°ÆäÏß³ÌÄ£ÐÍ·ÖÎö
 
À´Ô´£ºÂëÅ©Íø ·¢²¼ÓÚ£º2017-8-17
  8402  次浏览      27
 

Ò»¡¢Listener

ListenerỊ̈߳¬µ±Server´¦ÓÚÔËÐÐ״̬ʱ£¬Æ为Ôð¼àÌýÀ´×Ô¿Í»§¶ËµÄÁ¬½Ó£¬²¢Ê¹ÓÃSelectģʽ´¦ÀíAcceptʼþ¡£

ͬʱ£¬Ëü¿ªÆôÁËÒ»¸ö¿ÕÏÐÁ¬½Ó£¨Idle Connection£©´¦ÀíÀý³Ì£¬Èç¹ûÓйýÆڵĿÕÏÐÁ¬½Ó£¬¾Í¹Ø±Õ¡£Õâ¸öÀý³Ìͨ¹ýÒ»¸ö¼ÆʱÆ÷À´ÊµÏÖ¡£

µ±select²Ù×÷µ÷ÓÃʱ£¬Ëü¿ÉÄÜ»á×èÈû£¬Õâ¸øÁËÆäËüÏß³ÌÖ´ÐеĻú»á¡£µ±ÓÐacceptʼþ·¢Éú£¬Ëü¾Í»á±»»½ÐÑÒÔ´¦ÀíÈ«²¿µÄʼþ£¬´¦ÀíʼþÊǽøÐÐÒ»¸ödoAcceptµÄµ÷Óá£

doAccept£º

void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel;
while ((channel = server.accept()) != null) {

channel.configureBlocking(false);
channel.socket().setTcpNoDelay(tcpNoDelay);
channel.socket().setKeepAlive(true);

Reader reader = getReader();
Connection c = connectionManager.register(channel);
key.attach(c); // so closeCurrentConnection can get the object
reader.addConnection(c);
}
}

ÓÉÓÚ¶à¸öÁ¬½Ó¿ÉÄÜͬʱ·¢ÆðÉêÇ룬ËùÒÔÕâÀï²ÉÓÃÁËwhileÑ­»·´¦Àí¡£

ÕâÀï×î¹Ø¼üµÄÊÇÉèÖÃÁËн¨Á¢µÄsocketΪ·Ç×èÈû£¬ÕâÒ»µãÊÇ»ùÓÚÐÔÄܵĿ¼ÂÇ£¬·Ç×èÈûµÄ·½Ê½¾¡¿ÉÄܵĶÁÈ¡socket½ÓÊÕ»º³åÇøÖеÄÊý¾Ý£¬ÕâÒ»µã±£Ö¤Á˽«À´»áµ÷ÓÃÕâ¸ösocket½øÐнÓÊÕµÄReaderºÍ½øÐз¢Ë͵ÄResponderÏ̲߳»»áÒòΪ·¢ËͺͽÓÊÕ¶ø×èÈû£¬Èç¹ûÕû¸öͨѶ¹ý³Ì¶¼±È½Ï·±Ã¦£¬ÄÇôReaderºÍResponderÏ̵߳ľͿÉÒÔ¾¡Á¿²»×èÈûÔÚI/OÉÏ£¬ÕâÑù¿ÉÒÔÏÔÖø¼õÉÙÏß³ÌÉÏÏÂÎÄÇл»µÄ´ÎÊý£¬Ìá¸ßcpuµÄÀûÓÃÂÊ¡£

×îºó£¬»ñÈ¡ÁËÒ»¸öReader£¬½«´ËÁ¬½Ó¼ÓÈëReaderµÄ»º³å¶ÓÁУ¬Í¬Ê±ÈÃÁ¬½Ó¹ÜÀíÆ÷¼àÊÓ²¢¹ÜÀíÕâ¸öÁ¬½ÓµÄÉú´æÆÚ¡£

»ñÈ¡ReaderµÄ·½Ê½ÈçÏ£º

//×î¼òµ¥µÄ¸ºÔؾùºâ
Reader getReader() {
currentReader = (currentReader + 1) % readers.length;
return readers[currentReader];
}

¶þ¡¢Reader

µ±Ò»¸öн¨Á¢µÄÁ¬½Ó±»¼ÓÈëReaderµÄ»º³å¶ÓÁÐpendingConnectionsÖ®ºó£¬ReaderÒ²±»»½ÐÑ£¬ÒÔ´¦Àí´ËÁ¬½ÓÉϵÄÊý¾Ý½ÓÊÕ¡£

public void addConnection(Connection conn) throws InterruptedException {
pendingConnections.put(conn);
readSelector.wakeup();
}

ServerÖÐÅäÖÃÁ˶à¸öReaderỊ̈߳¬ÏÔÈ»ÊÇΪÁËÌá¸ß²¢·¢·þÎñÁ¬½ÓµÄÄÜÁ¦¡£

ÏÂÃæÊÇReaderµÄÖ÷ÒªÂß¼­£º

while(true) {
...
//È¡³öÒ»¸öÁ¬½Ó£¬¿ÉÄÜ×èÈû
Connection conn = pendingConnections.take();
//Ïòselect×¢²áÒ»¸ö¶Áʼþ
conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
...
//½øÐÐselect£¬¿ÉÄÜ×èÈû
readSelector.select();
...
//ÒÀ´Î¶ÁÈ¡Êý¾Ý
for(keys){
doRead(key);
}
...
}

µ±Server»¹ÔÚÔËÐÐʱ£¬ReaderÏ߳̾¡¿ÉÄܶàµØ´¦Àí»º³å¶ÓÁÐÖеÄÁ¬½Ó£¬×¢²áÿһ¸öÁ¬½ÓµÄREADʼþ£¬²ÉÓÃselectģʽÀ´»ñÈ¡Á¬½ÓÉÏÓÐÊý¾Ý½ÓÊÕµÄ֪ͨ¡£µ±ÓÐÊý¾ÝÐèÒª½ÓÊÕʱ£¬Ëü¾¡×î´ó¿ÉÄܶÁÈ¡select·µ»ØµÄÁ¬½ÓÉϵÄÊý¾Ý£¬ÒÔ·ÀÖ¹ListenerÏß³ÌÒòΪûÓÐÔËÐÐʱ¼ä¶ø·¢Éú¼¢¶ö£¨starving£©¡£

Èç¹ûListenerÏ̼߳¢¶ö£¬Ôì³ÉµÄ½á¹ûÊDz¢·¢ÄÜÁ¦¼±¾çϽµ£¬À´×Ô¿Í»§¶ËµÄÐÂÁ¬½ÓÇëÇó³¬Ê±»òÎÞ·¨½¨Á¢¡£

×¢ÒâÔÚ´Ó»º³å¶ÓÁÐÖлñÈ¡Á¬½Óʱ£¬Reader¿ÉÄܻᷢÉú×èÈû£¬ÒòΪËü²ÉÓÃÁËLinkedBlockingQueueÀàÖеÄtake·½·¨£¬Õâ¸ö·½·¨ÔÚ¶ÓÁÐΪ¿Õʱ»á×èÈû£¬ÕâÑùReaderÏ̵߳ÃÒÔ×èÈû£¬ÒÔ¸øÆäËüÏß³ÌÖ´ÐеÄʱ¼ä¡£

ReaderÏ̵߳Ļ½ÐÑʱ»úÓÐÁ½¸ö£º

Listener½¨Á¢ÁËÐÂÁ¬½Ó£¬²¢½«´ËÁ¬½Ó¼ÓÈë1¸öReaderµÄ»º³å¶ÓÁÐ;

selectµ÷Ó÷µ»Ø¡£

ÔÚReaderµÄdoReadµ÷ÓÃÖÐ,ÆäÖ÷Òªµ÷ÓÃÁËreadAndProcess·½·¨£¬´Ë·½·¨Ñ­»·´¦ÀíÊý¾Ý£¬½ÓÊÕÊý¾Ý°üµÄÍ·²¿¡¢ÉÏÏÂÎÄÍ·²¿ºÍÕæÕýµÄÊý¾Ý¡£

Õâ¸ö¹ý³ÌÖÐÖµµÃÒ»ÌáµÄÊÇÏÂÃæµÄÕâ¸öchannelRead·½·¨£º

private int channelRead(ReadableByteChannel channel,
ByteBuffer buffer) throws IOException {

int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
channel.read(buffer) : channelIO(channel, null, buffer);
if (count > 0) {
rpcMetrics.incrReceivedBytes(count);
}
return count;
}

channelRead»áÅжÏÊý¾Ý½ÓÊÕÊý×ébufferÖеÄÊ£Óàδ¶ÁÊý¾Ý£¬Èç¹û´óÓÚÒ»¸öÁÙ½çÖµNIO_BUFFER_LIMIT£¬¾Í²ÉÈ¡·ÖƬµÄ¼¼ÇÉÀ´¶à´ÎµØ¶Á£¬ÒÔ·ÀÖ¹jdk¶Ôlarge buffer²ÉÈ¡±äΪdirect bufferµÄÓÅ»¯¡£

ÕâÒ»µã£¬Ò²ÐíÊÇ¿¼Âǵ½direct bufferÔÚ½¨Á¢Ê±»áÓÐһЩ¿ªÏú£¬Í¬Ê±ÔÚjdk1.6֮ǰdirect buffer²»»á±»GC»ØÊÕ£¬ÒòΪËüÃÇ·ÖÅäÔÚJVMµÄ¶ÑÍâµÄÄÚ´æ¿Õ¼äÖС£

µ½µ×ÕâÑùÓÅ»¯µÄЧ¹ûÈçºÎ£¬Ã»ÓвâÊÔ£¬Ò²¾ÍÂÔ¹ý¡£Ò²ÐíÊÇΪÁ˼õÉÙGCµÄ¸ºµ£¡£

ÔÚReader¶ÁÈ¡µ½Ò»¸öÍêÕûµÄRpcRequest°üÖ®ºó£¬»áµ÷ÓÃprocessOneRpc·½·¨£¬´Ëµ÷Óý«½øÈëÒµÎñÂß¼­»·½Ú¡£Õâ¸ö·½·¨£¬»á´Ó½ÓÊܵ½µÄÊý¾Ý°üÖУ¬·´ÐòÁл¯³öRpcRequestµÄÍ·²¿ºÍÊý¾Ý£¬ÒÀ´Ë¹¹ÔìÒ»¸öRpcRequest¶ÔÏó£¬ÉèÖÿͻ§¶ËÐèÒªµÄ¸ú×ÙÐÅÏ¢£¨trace info£©£¬È»ºó¹¹ÔìÒ»¸öCall¶ÔÏó£¬ÈçÏÂͼËùʾ£º

´Ëºó£¬ÔÚHandler´¦Àíʱ£¬¾ÍÒÔCallΪµ¥Î»£¬ÕâÊÇÒ»¸ö°üº¬ÁËÓëÁ¬½ÓÏà¹ØÐÅÏ¢µÄ·â×°¶ÔÏó¡£

ÓÐÁËCall¶ÔÏóºó£¬½«Æä¼ÓÈëServerµÄcallQueue¶ÓÁУ¬ÒÔ¹©Handler´¦Àí¡£ÒòΪ²ÉÓÃÁËput·½·¨£¬ËùÒÔµ±callQueueÂúʱ£¨Handler棩£¬Reader»á·¢Éú×èÈû£¬ÈçÏÂËùʾ£º

callQueue.put(call); // queue the call; maybe blocked here

Èý¡¢Handler

Handler¾ÍÊǸù¾ÝrpcÇëÇóÖеķ½·¨£¨Call£©¼°²ÎÊý£¬À´µ÷ÓÃÏàÓ¦µÄÒµÎñÂß¼­½Ó¿ÚÀ´´¦ÀíÇëÇó¡£

Ò»¸öServerÖÐÓжà¸öHandler,¶ÔÓ¦¶à¸öÒµÎñ½Ó¿Ú£¬±¾Æª²»ÌÖÂÛ¾ßÌåÒµÎñÂß¼­¡£

handlerµÄÂß¼­»ù±¾ÈçÏ£¨ÂÔÈ¥Òì³£ºÍÆäËü´ÎÒªÐÅÏ¢£©£º

public void run() {
SERVER.set(Server.this);
ByteArrayOutputStream buf =
new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
while (running) {
try {
final Call call = callQueue.take(); // pop the queue; maybe blocked here
CurCall.set(call);
try {
if (call.connection.user == null) {
value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest,
call.timestamp);
} else {
value =
call.connection.user.doAs(...);
}
} catch (Throwable e) {
//ÂÔ ...
}
CurCall.set(null);
synchronized (call.connection.responseQueue) {
responder.doRespond(call);
}
}

¿É¼û£¬Handler´ÓcallQueueÖÐÈ¡³öÒ»¸öCall£¬È»ºóµ÷ÓÃÕâ¸öServer.call·½·¨£¬×îºóµ÷ÓÃResponderµÄdoResponde·½·¨½«½á¹û·¢Ë͸ø¿Í»§¶Ë¡£

Server.call·½·¨£º

public Writable call(RPC.RpcKind rpcKind, String protocol,
Writable rpcRequest, long receiveTime) throws Exception {
return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
receiveTime);
}

ËÄ¡¢Responder

Ò»¸öServerÖ»ÓÐ1¸öResponderÏ̡߳£

´ËÏ̲߳»¶Ï½øÐÐÈçϼ¸¸öÖØÒªµ÷ÓÃÒÔºÍHandlerЭµ÷²¢·¢ËÍÊý¾Ý£º

//Õâ¸öwaitÊÇͬ²½×÷Ó㬾ßÌå¼ûÏÂÃæ·ÖÎö
waitPending();
...
//¿ªÊ¼select£¬»òÐí»á×èÈû
writeSelector.select(PURGE_INTERVAL);
...
//Èç¹ûselectKeysÓÐÊý¾Ý£¬¾ÍÒÀ´ÎÒì²½·¢ËÍÊý¾Ý
for(selectorKeys){
doAsyncWrite(key);
}
...
//µ±µ½´ï¶ªÆúʱ¼ä£¬»á´ÓselectedKeys¹¹Ôìcalls£¬²¢ÒÀ´Î¶ªÆú
for(Call call : calls) {
doPurge(call, now);
}

µ±Handlerµ÷ÓÃdoRespond·½·¨ºó£¬handler´¦ÀíµÄ½á¹û±»¼ÓÈëresponseQueueµÄ¶Ó⣬¶ø²»ÊÇÁ¢¼´·¢Ëͻؿͻ§¶Ë£º

void doRespond(Call call) throws IOException {
synchronized (call.connection.responseQueue) {
call.connection.responseQueue.addLast(call);
if (call.connection.responseQueue.size() == 1) {
//×¢ÒâÕâÀïisHandler = true,±íʾ¿ÉÄÜ»áÏòselect

×¢²áдʼþÒÔÔÚResponderÖ÷Ñ­»·ÖÐͨ¹ýselect´¦ÀíÊý¾Ý·¢ËÍ
processResponse(call.connection.responseQueue, true);
}
}
}

ÉÏÃæµÄsynchronized ¿ÉÒÔ¿´³ö£¬responseQueueÊÇÕùÓÃ×ÊÔ´£¬ÏàÓ¦µÄ£º

HandlerÊÇÉú²úÕߣ¬½«½á¹û¼ÓÈë¶ÓÁУ»

ResponderÊÇÏû·ÑÕߣ¬´Ó¶ÓÁÐÖÐÈ¡³ö½á¹û²¢·¢ËÍ¡£

processResponse½«Æô¶¯Responder½øÐз¢ËÍ£¬Ê×ÏÈ´ÓresponseQueueÖÐÒÔ·Ç×èÈû·½Ê½È¡³öÒ»¸öcall£¬È»ºóÒÔ·Ç×èÈû·½Ê½¾¡Á¦·¢ËÍcall.rpcResponse£¬Èç¹û·¢ËÍÍê±Ï£¬Ôò·µ»Ø¡£

µ±»¹ÓÐÊ£ÓàÊý¾Ýδ·¢ËÍ£¬½«call²åÈë¶ÓÁеĵÚÒ»¸öλÖã¬ÓÉÓÚisHandler²ÎÊý£¬ÔÚÀ´×ÔHandlerµÄµ÷ÓÃÖд«ÈëΪtrue£¬ËùÒԻỽÐÑwriteSelector£¬²¢×¢²áÒ»¸öдʼþ£¬ÆäÖÐincPending()·½·¨£¬ÊÇΪÁËÔÚÏòselector×¢²áдʼþʱ£¬×èÈûResponderỊ̈߳¬ºóÃæÓзÖÎö¡£

call.connection.responseQueue.addFirst(call);

if (inHandler) {
// set the serve time when the response has to be sent later
call.timestamp = Time.now();

incPending();
try {
// Wakeup the thread blocked on select, only then can the call
// to channel.register() complete.
writeSelector.wakeup();
channel.register(writeSelector, SelectionKey.OP_WRITE, call);
} catch (ClosedChannelException e) {
//Its ok. channel might be closed else where.
done = true;
} finally {
decPending();
}
}

Ôٻص½ResponderµÄÖ÷Ñ­»·£¬¿´¿´Èç¹ûÏòselect×¢²áÁËдʼþ»á·¢Éúʲô£º

//Ö´ÐÐÕâ¾äʱ£¬Èç¹ûHandlerµ÷ÓõÄresponder.doResonde()

ÕýÔÚÏòselect×¢²áдʼþ£¬ÕâÀï¾Í»á×èÈû
//Ä¿µÄºÜÏÔÈ»£¬ÊÇΪÁËϾäµÄselectÄÜ»ñÈ¡Êý¾Ý²¢Á¢¼´·µ»Ø£¬

Õâ¾Í¼õÉÙÁË×èÈû·¢ÉúµÄ´ÎÊý
waitPending(); // If a channel is being registered, wait.

//ÕâÀïÓó¬Ê±×èÈûÀ´select,ÊÇΪÁËÄܹ»ÔÚûÓÐÊý¾Ý·¢ËÍʱ£¬

¶¨ÆÚ»½ÐÑ£¬ÒÔ´¦Àí³¤ÆÚδµÃµ½´¦ÀíµÄCall
writeSelector.select(PURGE_INTERVAL);
Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
try {
if (key.isValid() && key.isWritable()) {
//Òì²½·¢ËÍ
doAsyncWrite(key);
}
} catch (IOException e) {
LOG.info(Thread.currentThread().getName() + ":

doAsyncWrite threw exception " + e);
}
}

ÖصãÄÚÈݶ¼×öÁË×¢ÊÍ£¬²»ÔÙ׸Êö¡£¿ÉÒÔ¿´³ö£¬¼È¿¼ÂÇͬ²½£¬ÓÖ¿¼ÂÇÐÔÄÜ£¬ÕâÊÇÖµµÃѧϰµÄµØ·½¡£

Îå¡¢×ܽá

±¾Æª×ÅÖØ·ÖÎöÁËhadoopµÄrpcµ÷ÓÃÖÐserver²¿·Ö£¬¿ÉÒÔ¿´³ö£¬ÕâÊÇÒ»¸ö¾«Á¼µÄÉè¼Æ£¬¿¼ÂǵĺÜϸ¡£

1.¹ØÓÚͬ²½£º

ListenerÉú²ú£¬ReaderÏû·Ñ£»ReaderÉú²ú£¬HandlerÏû·Ñ£¬HandlerÉú²ú£¬ResponderÏû·Ñ¡£

ËùÒÔËüÃÇÖ®¼ä±ØÐëͬ²½.ÔÚ¾ßÌåµÄhadoopʵÏÖÖУ¬¼ÈÓÐÀûÓÃBlockingQueueµÄput&take²Ù×÷ʵÏÖ×èÈû£¬ÒԴﵽͬ²½Ä¿µÄ£¬Ò²¶ÔÕùÓÃ×ÊԴʹÓÃsynchronizedÀ´ÊµÏÖͬ²½¡£

2.¹ØÓÚ»º³å£º

ÆäÖм¸¸ö»º³å¶ÓÁÐÒ²ÖµµÃ¹Ø×¢.ServerµÄ²¢·¢ÇëÇó»áÌرð¶à£¬¶øHandlerÔÚÖ´ÐÐcall½øÐÐÒµÎñÂß¼­Ê±£¬¿Ï¶¨»áÂýÏÂÀ´£¬ËùÒÔ±ØÐ뽨Á¢ÇëÇóºÍ´¦ÀíÖ®¼äµÄ»º³å¡£

ÁíÍ⣬´¦ÀíºÍ·¢ËÍÖ®¼äҲͬÑù»á³öÏÖËÙÂʲ»Æ¥ÅäµÄÏÖÏó£¬Í¬ÑùÐèÒª»º³å¡£

3.¹ØÓÚÏß³ÌÄ£ÐÍ£º

Listenerµ¥Ị̈߳¬Reader¶àỊ̈߳¬Handler¶àỊ̈߳¬Responderµ¥Ị̈߳¬ÎªÊ²Ã´»áÕâÑùÉè¼Æ£¿Listener²ÉÓÃselectģʽ´¦Àíacceptʼþ£¬Ò»¸ö¿Í»§¶ËÔÚÒ»¶Îʱ¼äÄÚÒ»°ãÖ»½¨Á¢ÓÐÏ޴εÄÁ¬½Ó£¬¶øÇÒÁ¬½ÓµÄ½¨Á¢ÊDZȽϿìµÄ£¬ËùÒÔµ¥Ïß³Ì×ã¹»Ó¦¸¶£¬½¨Á¢ºóÖ±½Ó¶ª¸øReader£¬´Ó¶ø×Ô¼ººÜ´ÓÈݵØÓ¦¸¶ÐÂÁ¬½Ó¡£Handler¶àỊ̈߳¬ÒµÎñÂß¼­ÊÇ´óÍ·£¬Óֺܴó¿ÉÄÜ»áÇ£ÉæI/OÃܼ¯(HDFS)£¬Èç¹ûÏß³ÌÉÙ£¬ºÄʱ¹ý³¤µÄÒµÎñÂß¼­¿ÉÄܾͻáÈô󲿷ֵÄHandlerÏ̴߳¦ÓÚ×èÈû£¬ÕâÑùÇá¿ìµÄÒµÎñÂß¼­Ò²±ØÐëÅŶӣ¬¿ÉÄܻᷢÉú¼¢¶ö¡£Èç¹ûReaderÊÕ¼¯µÄÇëÇó¶ÓÁг¤Ê±¼ä´¦ÓÚÂúµÄ״̬£¬Õû¸öͨѶ±ØÈ»¶ñ»¯£¬ËùÒÔÕâÊǵäÐ͵ÄÐèÒª½µµÍÏìӦʱ¼ä¡¢ÌáÉýÍÌÍÂÁ¿µÄ¸ß²¢·¢Ê±¿Ì£¬Õâ¸öʱ¿ÌµÄÉÏÏÂÎÄÇл»ÊDZØÐëµÄ£¬²»¾À½á£¬²¢·¢ÎªÖØ¡£ResponderÊǵ¥Ị̈߳¬ÏÔÈ»£¬Responder»á±È½ÏÇáËÉ£¬ÒòΪËäÈ»ÇëÇóºÜ¶à£¬µ«¾­¹ýReader->HandlerµÄ»º³åºÍHandlerµÄ´¦Àí£¬ÉÏÒ»ÅúÄÜ·¢ËÍÍêµÄ½á¹ûÒѾ­·¢ËÍÁË¡£Responder¸ü¶àµÄÊÇËѼ¯²¢´¦ÀíÄÇЩ³¤½á¹û£¬²¢Í¨¹ý¸ßЧselectģʽÀ´»ñÈ¡½á¹û²¢·¢ËÍ¡£

ÕâÀHandlerÔÚÒµÎñÂß¼­µ÷ÓÃÍê±ÏÖ±½Óµ÷ÓÃÁËresponder.doRespond·¢ËÍ£¬ÊÇÒòΪÕâÊǸöÁ¢¼´·µ»ØµÄµ÷Óã¬Õâ¸öµ÷ÓõĺÄʱÊǺÜÉٵģ¬ËùÒÔ²»±ØÈÃHandlerÒòΪ·¢ËͶø×èÈû£¬½øÒ»²½³ä·Ö·¢»ÓÁËHandler¶àÏ̵߳ÄÄÜÁ¦£¬¼õÉÙÁËÏß³ÌÇл»µÄ»ú»á£¬Ç¿µ÷ÁËÆä¶àÏ̲߳¢·¢µÄÓÅÊÆ£¬Í¬Ê±ÓÖΪresponder¼õ¸º£¬ÒÔÔöÇ¿Responderµ¥Ïß³Ì×÷Õ½µÄÐÅÐÄ¡£

4.¹ØÓÚËø

¶ÔHadoopÀ´½²£¬ÒòΪͬ²½ÐèÇó£¬ËùÒÔ¼ÓËøÊDZز»¿ÉÉٵġ£ÐÔÄÜÊÇÐèÒª¿¼ÂÇ£¬µ«ÊÇ´Ó¹¤³ÌµÄ½Ç¶ÈÉÏÀ´¿´£¬Í¨Ñ¶²ãµÄÎȶ¨ÐÔ¡¢´úÂë¿Éά»¤ÐÔ¡¢±£³Ö´úÂë½á¹¹µÄÏà¶Ô¼òµ¥ÐÔ£¨Æä´úÂëÒòÀúÊ·Ô­ÒòÒѷdz£¸´ÔÓ£©£¬´ó²¿·Ö²ÉÓÃÁËsynchronizedÕâÖÖ±¯¹ÛµÃ¡¢ÖØÐ͵ļÓËø·½Ê½£¬ÕâÑù£¬¿ÉÒÔÏÔÖø¼õÉÙ¶ÔÏóÖ®¼äͬ²½µÄ¸´ÔÓÐÔ£¬¼õÉÙ´íÎóµÄ·¢Éú¡£

Áù¡¢£¨²¹³ä£©RpcServer Ïß³ÌÄ£ÐÍ

NameNodeÆô¶¯¹ý³Ì£º

Ïß³ÌÄ£ÐÍ

Listener 1¸ö£º

1.¼àÌý²¢½ÓÊÜÀ´×Ô¿Í»§¶ËµÄÁ¬½Ó.½«Ð½¨Á¬½Ó·ÅÈëpendingConnections.

2.ÇåÀí¿ÕÏÐÁ¬½Ó.

3.»½ÐÑReader.

Reader N¸ö : ´ÓpendingConnectionsÖлñÈ¡Á¬½Ó£¬¶ÁÈ¡Êý¾Ý£¬´ÓRpcRequest¹¹ÔìCall£¬²¢·ÅÈëcallQueue.

Handler N ¸ö£º

1.´ÓcallQueue»ñÈ¡¿Í»§¶Ëµ÷ÓÃcall£¬²¢Ö´ÐÐ.

2.µ÷ÓÃResponder£¬½«½á¹û¼ÓÈëresponseQueueµÄβ²¿.ÕâÀï»áµ÷ÓÃÒ»´Î·¢ËÍ.Èç¹ûÊý¾Ýδ·¢ËÍÍ꣬ע²áWRITEʼþµ½selector.²¢»½ÐÑResponder.

Responder 1¸ö£º

1.´ÓresponseQueueÖа´ÕÕFIFO˳Ðò·¢ËÍÊý¾Ý.

2.´¦Àíselector select³öµÄÊý¾Ý.

3.ɨÃècallQueue,²¢¶ªÆú¹ýÆÚµÄCall.

 

   
8402 ´Îä¯ÀÀ       27
Ïà¹ØÎÄÕÂ

»ùÓÚEAµÄÊý¾Ý¿â½¨Ä£
Êý¾ÝÁ÷½¨Ä££¨EAÖ¸ÄÏ£©
¡°Êý¾Ýºþ¡±£º¸ÅÄî¡¢ÌØÕ÷¡¢¼Ü¹¹Óë°¸Àý
ÔÚÏßÉ̳ÇÊý¾Ý¿âϵͳÉè¼Æ ˼·+Ч¹û
 
Ïà¹ØÎĵµ

GreenplumÊý¾Ý¿â»ù´¡Åàѵ
MySQL5.1ÐÔÄÜÓÅ»¯·½°¸
ijµçÉÌÊý¾ÝÖÐ̨¼Ü¹¹Êµ¼ù
MySQL¸ßÀ©Õ¹¼Ü¹¹Éè¼Æ
Ïà¹Ø¿Î³Ì

Êý¾ÝÖÎÀí¡¢Êý¾Ý¼Ü¹¹¼°Êý¾Ý±ê×¼
MongoDBʵս¿Î³Ì
²¢·¢¡¢´óÈÝÁ¿¡¢¸ßÐÔÄÜÊý¾Ý¿âÉè¼ÆÓëÓÅ»¯
PostgreSQLÊý¾Ý¿âʵսÅàѵ