使用选择器时,需要将一个或多个可选择的通道注册到选择器对象中,注册后会返回一个选择键,选择器会记住这些通道以及这些通道感兴趣的操作,还会追踪对应的通道是否已经就绪。调用选择器对象的select( )方法,当有通道就绪时,相关的键会被更新。可以获取选择键的集合,从而找到已经就绪的通道。
// Minimal non-blocking server skeleton: a single selector multiplexes the
// listening channel and every accepted connection.
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(1234));
Selector selector = Selector.open();
// The listening channel is only interested in incoming connections.
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
    selector.select();
    Iterator<SelectionKey> itor = selector.selectedKeys().iterator();
    while (itor.hasNext()) {
        SelectionKey key = itor.next();
        // Remove the key from the selected set so it is not handled again
        // on the next pass of the outer loop.
        itor.remove();
        if (key.isAcceptable()) {
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            SocketChannel channel = server.accept();
            if (channel == null) {
                // A non-blocking accept() returns null when no connection
                // is pending; there is nothing to set up in that case.
                continue;
            }
            channel.configureBlocking(false);
            channel.write(ByteBuffer.wrap("hello".getBytes()));
            // From now on, watch the accepted connection for readable data.
            channel.register(selector, SelectionKey.OP_READ);
        } else if (key.isReadable()) {
            // read();
        }
    }
}
/**
 * Opens a selector by delegating to the provider returned by
 * {@code SelectorProvider.provider()}.
 *
 * @return the newly opened selector
 * @throws IOException if the provider fails to open a selector
 */
public static Selector open() throws IOException {
    SelectorProvider defaultProvider = SelectorProvider.provider();
    return defaultProvider.openSelector();
}
//WindowsSelectorProvider.javapublic AbstractSelector openSelector() throws IOException {return new WindowsSelectorImpl(this);}//WindowsSelectorImpl.javaWindowsSelectorImpl(SelectorProvider sp) throws IOException { super(sp); pollWrapper = new PollArrayWrapper(INIT_CAP); wakeupPipe = Pipe.open(); wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal(); // Disable the Nagle algorithm so that the wakeup is more immediate SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink(); (sink.sc).socket().setTcpNoDelay(true); wakeupSinkFd = ((SelChImpl)sink).getFDVal(); pollWrapper.addWakeupSocket(wakeupSourceFd, 0);}//PollArrayWrapper.javavoid addWakeupSocket(int fdVal, int index) { putDescriptor(index, fdVal); putEventOps(index, POLLIN);}
public final SelectionKey register(Selector sel,int ops) throws ClosedChannelException
第二个参数ops指示选择键的interest集,可以是OP_READ、OP_WRITE、OP_ACCEPT等,分别表示对读、写、接受新连接感兴趣。注册通道的核心方法是implRegister,仍然以Windows为例:
/**
 * Adds a selection key to this selector's bookkeeping structures.
 * All work happens while holding closeLock.
 *
 * @param ski the key to register
 * @throws ClosedSelectorException if pollWrapper is null
 */
protected void implRegister(SelectionKeyImpl ski) {
    synchronized (closeLock) {
        if (pollWrapper == null)
            throw new ClosedSelectorException();
        // May enlarge the arrays and may itself advance totalChannels,
        // so it has to run before totalChannels is used as the slot index.
        growIfNeeded();
        channelArray[totalChannels] = ski;
        ski.setIndex(totalChannels);
        fdMap.put(ski);
        keys.add(ski);
        pollWrapper.addEntry(totalChannels, ski);
        totalChannels++;
    }
}
先看看growIfNeeded方法
/**
 * Makes room for one more channel. When channelArray is full, it and the
 * poll wrapper are doubled in size. When totalChannels is a multiple of
 * MAX_SELECTABLE_FDS, a wakeup socket entry is inserted at the current
 * position and both totalChannels and threadsCount are incremented.
 */
private void growIfNeeded() {
    if (channelArray.length == totalChannels) {
        int newSize = totalChannels * 2; // Make a larger array
        SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
        // The copy starts at index 1; index 0 is not copied.
        System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1);
        channelArray = temp;
        pollWrapper.grow(newSize);
    }
    if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed
        pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels);
        totalChannels++;
        threadsCount++;
    }
}
做了两件事:
/**
 * Performs one selection operation: processes the deregister queue, polls in
 * the calling thread (and in helper threads, if any), then updates the
 * selected keys.
 *
 * @param timeout stored in this.timeout before polling
 * @return 0 if interruptTriggered was already set before polling, otherwise
 *         the value returned by updateSelectedKeys()
 * @throws ClosedSelectorException if channelArray is null
 * @throws IOException declared by this method; an IOException thrown by the
 *         main poll is saved in finishLock instead of propagating directly
 */
protected int doSelect(long timeout) throws IOException {
    if (channelArray == null)
        throw new ClosedSelectorException();
    this.timeout = timeout; // set selector timeout
    processDeregisterQueue();
    if (interruptTriggered) {
        resetWakeupSocket();
        return 0;
    }
    // Calculate number of helper threads needed for poll. If necessary
    // threads are created here and start waiting on startLock
    adjustThreadsCount();
    finishLock.reset(); // reset finishLock
    // Wakeup helper threads, waiting on startLock, so they start polling.
    // Redundant threads will exit here after wakeup.
    startLock.startThreads();
    // do polling in the main thread. Main thread is responsible for
    // first MAX_SELECTABLE_FDS entries in pollArray.
    try {
        begin();
        try {
            subSelector.poll();
        } catch (IOException e) {
            finishLock.setException(e); // Save this exception
        }
        // Main thread is out of poll(). Wakeup others and wait for them
        if (threads.size() > 0)
            finishLock.waitForHelperThreads();
    } finally {
        end();
    }
    // NOTE(review): presumably rethrows an exception saved via setException;
    // checkForException() is not shown here - confirm.
    finishLock.checkForException();
    processDeregisterQueue();
    int updated = updateSelectedKeys();
    // Done with poll(). Set wakeupSocket to nonsignaled for the next run.
    resetWakeupSocket();
    return updated;
}
processDeregisterQueue方法主要是对已取消的键集合进行处理,通过调用cancel()方法将选择键加入已取消的键集合中,这个键并不会立即注销,而是在下一次select操作时进行注销,注销操作在implDereg完成
/**
 * Removes a selection key from this selector. The freed slot is filled with
 * the last entry so the arrays stay dense, then the key is dropped from
 * fdMap, keys and selectedKeys and deregistered.
 *
 * @param ski the key to remove; its index is set to -1
 * @throws IOException declared by this method
 */
protected void implDereg(SelectionKeyImpl ski) throws IOException {
    int i = ski.getIndex();
    assert (i >= 0);
    if (i != totalChannels - 1) {
        // Copy end one over it
        SelectionKeyImpl endChannel = channelArray[totalChannels-1];
        channelArray[i] = endChannel;
        endChannel.setIndex(i);
        pollWrapper.replaceEntry(pollWrapper, totalChannels - 1,
                                 pollWrapper, i);
    }
    channelArray[totalChannels - 1] = null;
    totalChannels--;
    ski.setIndex(-1);
    // When the count lands one past a MAX_SELECTABLE_FDS boundary, one more
    // slot is dropped together with a helper thread.
    if ( totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {
        totalChannels--;
        threadsCount--; // The last thread has become redundant.
    }
    fdMap.remove(ski); // Remove the key from fdMap, keys and selectedKeys
    keys.remove(ski);
    selectedKeys.remove(ski);
    deregister(ski);
    SelectableChannel selch = ski.channel();
    // Kill the channel only once it is both closed and no longer registered.
    if (!selch.isOpen() && !selch.isRegistered())
        ((SelChImpl)selch).kill();
}
private void adjustThreadsCount() { if (threadsCount > threads.size()) { // More threads needed. Start more threads. for (int i = threads.size(); i < threadsCount; i++) { SelectThread newThread = new SelectThread(i); threads.add(newThread); newThread.setDaemon(true); newThread.start(); } } else if (threadsCount < threads.size()) { // Some threads become redundant. Remove them from the threads List. for (int i = threads.size() - 1 ; i >= threadsCount; i--) threads.remove(i).makeZombie(); } }
当前线程数如果比threadsCount小就新建线程,如果比threadsCount大就移除多余的线程,比较容易理解。来看看线程的run方法:
public void run() { while (true) { // poll loop // wait for the start of poll. If this thread has become // redundant, then exit. if (startLock.waitForStart(this)) return; // call poll() try { subSelector.poll(index); } catch (IOException e) { // Save this exception and let other threads finish. finishLock.setException(e); } // notify main thread, that this thread has finished, and // wakeup others, if this thread is the first to finish. finishLock.threadFinished(); }}
/**
 * Blocks the calling helper thread until runsCounter differs from the
 * thread's lastRun value, i.e. until a new run has been started.
 *
 * @param thread the helper thread that is waiting
 * @return true if the thread is a zombie and its run() should exit;
 *         false if the thread should go on and poll
 */
private synchronized boolean waitForStart(SelectThread thread) {
    while (true) {
        while (runsCounter == thread.lastRun) {
            try {
                startLock.wait();
            } catch (InterruptedException e) {
                // Keep waiting, but preserve the interrupt status.
                Thread.currentThread().interrupt();
            }
        }
        if (thread.isZombie()) { // redundant thread
            return true; // will cause run() to exit.
        } else {
            thread.lastRun = runsCounter; // update lastRun
            return false; // will cause run() to poll.
        }
    }
}
// Triggers threads, waiting on this lock to start polling.private synchronized void startThreads() { runsCounter++; // next run notifyAll(); // wake up threads.}
/**
 * Called by the main thread after its own poll returns. If no helper has
 * finished yet (threadsToFinish still equals threads.size()), wakeup() is
 * called; the method then waits until threadsToFinish reaches zero.
 */
private synchronized void waitForHelperThreads() {
    if (threadsToFinish == threads.size()) {
        // no helper threads finished yet. Wakeup them up.
        wakeup();
    }
    while (threadsToFinish != 0) {
        try {
            finishLock.wait();
        } catch (InterruptedException e) {
            // Interrupted - set interrupted state.
            Thread.currentThread().interrupt();
        }
    }
}
private synchronized void threadFinished() { if (threadsToFinish == threads.size()) { // finished poll() first // if finished first, wakeup others wakeup(); } threadsToFinish--; if (threadsToFinish == 0) // all helper threads finished poll(). notify(); // notify the main thread }
/**
 * Advances updateCount and collects the poll results of the main
 * sub-selector and of every helper thread's sub-selector.
 *
 * @return the sum of the values returned by each processSelectedKeys call
 */
private int updateSelectedKeys() {
    updateCount++;
    // Main thread's sub-selector first, then each helper in list order.
    int total = subSelector.processSelectedKeys(updateCount);
    for (SelectThread helper : threads) {
        total += helper.subSelector.processSelectedKeys(updateCount);
    }
    return total;
}
/**
 * Runs processFDSet over the read, write and exception descriptor sets, in
 * that order, each with its matching poll event mask.
 *
 * @param updateCount value passed through to every processFDSet call
 * @return the sum of the three processFDSet results
 */
private int processSelectedKeys(long updateCount) {
    int fromReads = processFDSet(updateCount, readFds,
                                 PollArrayWrapper.POLLIN,
                                 false);
    int fromWrites = processFDSet(updateCount, writeFds,
                                  PollArrayWrapper.POLLCONN |
                                  PollArrayWrapper.POLLOUT,
                                  false);
    // Only the exception set is flagged as such (last argument true).
    int fromExcepts = processFDSet(updateCount, exceptFds,
                                   PollArrayWrapper.POLLIN |
                                   PollArrayWrapper.POLLCONN |
                                   PollArrayWrapper.POLLOUT,
                                   true);
    return fromReads + fromWrites + fromExcepts;
}
/**
 * Walks one descriptor set returned by poll and updates the ready ops of the
 * matching keys, adding keys to selectedKeys where appropriate.
 *
 * @param updateCount identifier of the current update pass; compared with
 *        each entry's clearedCount and updateCount
 * @param fds descriptor array; fds[0] holds the number of descriptors and
 *        the descriptors themselves start at index 1
 * @param rOps poll event bits to translate into ready ops
 * @param isExceptFds true when fds is the exception set
 * @return how many keys were counted as updated in this call
 */
private int processFDSet(long updateCount, int[] fds, int rOps,
                         boolean isExceptFds)
{
    int numKeysUpdated = 0;
    for (int i = 1; i <= fds[0]; i++) {
        int desc = fds[i];
        if (desc == wakeupSourceFd) {
            // The wakeup descriptor only records the interrupt; it is never
            // reported as a selected key.
            synchronized (interruptLock) {
                interruptTriggered = true;
            }
            continue;
        }
        MapEntry me = fdMap.get(desc);
        // If me is null, the key was deregistered in the previous
        // processDeregisterQueue.
        if (me == null)
            continue;
        SelectionKeyImpl sk = me.ski;

        // The descriptor may be in the exceptfds set because there is
        // OOB data queued to the socket. If there is OOB data then it
        // is discarded and the key is not added to the selected set.
        if (isExceptFds &&
            (sk.channel() instanceof SocketChannelImpl) &&
            discardUrgentData(desc))
        {
            continue;
        }

        if (selectedKeys.contains(sk)) { // Key in selected set
            if (me.clearedCount != updateCount) {
                // First time this key is seen in the current pass: the
                // ready ops are set rather than added to.
                if (sk.channel.translateAndSetReadyOps(rOps, sk) &&
                    (me.updateCount != updateCount)) {
                    me.updateCount = updateCount;
                    numKeysUpdated++;
                }
            } else { // The readyOps have been set; now add
                if (sk.channel.translateAndUpdateReadyOps(rOps, sk) &&
                    (me.updateCount != updateCount)) {
                    me.updateCount = updateCount;
                    numKeysUpdated++;
                }
            }
            me.clearedCount = updateCount;
        } else { // Key is not in selected set yet
            if (me.clearedCount != updateCount) {
                sk.channel.translateAndSetReadyOps(rOps, sk);
                // Select the key only if a ready op is also an interest op.
                if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
                    selectedKeys.add(sk);
                    me.updateCount = updateCount;
                    numKeysUpdated++;
                }
            } else { // The readyOps have been set; now add
                sk.channel.translateAndUpdateReadyOps(rOps, sk);
                if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
                    selectedKeys.add(sk);
                    me.updateCount = updateCount;
                    numKeysUpdated++;
                }
            }
            me.clearedCount = updateCount;
        }
    }
    return numKeysUpdated;
}
}
3、忽略OOB data(搜了一下:out of band data指带外数据,有时也称为加速数据,是指连接双方中的一方发生重要事情,想要迅速地通知对方时所发送的数据),这也不是用户关心的;
/**
 * Translates poll events into ready ops, starting from the key's current
 * ready ops so that new ops are added to the existing ones.
 *
 * @param ops poll event bits
 * @param sk the key whose ready ops are updated
 * @return the result of translateReadyOps
 */
public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {
    int currentReadyOps = sk.nioReadyOps();
    return translateReadyOps(ops, currentReadyOps, sk);
}
/**
 * Translates native poll event bits into SelectionKey ready ops for this
 * channel and stores the result on the key.
 *
 * @param ops poll event bits reported for this channel
 * @param initialOps ready ops to start from before new bits are added
 * @param sk the key whose ready ops are updated
 * @return true if the new ready ops contain a bit that was not in the key's
 *         previous ready ops; false otherwise, including the POLLNVAL case
 */
public boolean translateReadyOps(int ops, int initialOps,
                                 SelectionKeyImpl sk) {
    int intOps = sk.nioInterestOps(); // Do this just once, it synchronizes
    int oldOps = sk.nioReadyOps();
    int newOps = initialOps;

    if ((ops & PollArrayWrapper.POLLNVAL) != 0) {
        // This should only happen if this channel is pre-closed while a
        // selection operation is in progress
        // ## Throw an error if this channel has not been pre-closed
        return false;
    }

    if ((ops & (PollArrayWrapper.POLLERR
                | PollArrayWrapper.POLLHUP)) != 0) {
        // On error or hang-up every op in the interest set is marked ready.
        newOps = intOps;
        sk.nioReadyOps(newOps);
        // No need to poll again in checkConnect,
        // the error will be detected there
        readyToConnect = true;
        return (newOps & ~oldOps) != 0;
    }

    // Readable: POLLIN reported, OP_READ requested, channel connected.
    if (((ops & PollArrayWrapper.POLLIN) != 0) &&
        ((intOps & SelectionKey.OP_READ) != 0) &&
        (state == ST_CONNECTED))
        newOps |= SelectionKey.OP_READ;

    // Connectable: POLLCONN reported, OP_CONNECT requested, and the channel
    // is unconnected or has a connection pending.
    if (((ops & PollArrayWrapper.POLLCONN) != 0) &&
        ((intOps & SelectionKey.OP_CONNECT) != 0) &&
        ((state == ST_UNCONNECTED) || (state == ST_PENDING))) {
        newOps |= SelectionKey.OP_CONNECT;
        readyToConnect = true;
    }

    // Writable: POLLOUT reported, OP_WRITE requested, channel connected.
    if (((ops & PollArrayWrapper.POLLOUT) != 0) &&
        ((intOps & SelectionKey.OP_WRITE) != 0) &&
        (state == ST_CONNECTED))
        newOps |= SelectionKey.OP_WRITE;

    sk.nioReadyOps(newOps);
    return (newOps & ~oldOps) != 0;
}
/** @return true if the OP_ACCEPT bit is set in this key's ready ops */
public final boolean isAcceptable() {
    int ready = readyOps();
    return (ready & OP_ACCEPT) != 0;
}

/** @return true if the OP_CONNECT bit is set in this key's ready ops */
public final boolean isConnectable() {
    int ready = readyOps();
    return (ready & OP_CONNECT) != 0;
}

/** @return true if the OP_WRITE bit is set in this key's ready ops */
public final boolean isWritable() {
    int ready = readyOps();
    return (ready & OP_WRITE) != 0;
}

/** @return true if the OP_READ bit is set in this key's ready ops */
public final boolean isReadable() {
    int ready = readyOps();
    return (ready & OP_READ) != 0;
}
/**
 * Signals the wakeup socket unless an interrupt has already been triggered.
 * The check and the flag update are both done while holding interruptLock.
 *
 * @return this selector
 */
public Selector wakeup() {
    synchronized (interruptLock) {
        if (interruptTriggered) {
            // Already signalled; nothing more to do.
            return this;
        }
        setWakeupSocket();
        interruptTriggered = true;
    }
    return this;
}
//WindowsSelectorImpl.javaprivate void setWakeupSocket() { setWakeupSocket0(wakeupSinkFd);}private native void setWakeupSocket0(int wakeupSinkFd); //WindowsSelectorImpl.c JNIEXPORT void JNICALLJava_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this, jint scoutFd){ /* Write one byte into the pipe */ send(scoutFd, (char*)&POLLIN, 1, 0);}
/**
 * Opens a pipe through the provider returned by
 * {@code SelectorProvider.provider()}.
 *
 * @return the newly opened pipe
 * @throws IOException if the provider fails to open a pipe
 */
public static Pipe open() throws IOException {
    SelectorProvider provider = SelectorProvider.provider();
    return provider.openPipe();
}

/**
 * Creates a {@code PipeImpl} bound to this provider.
 *
 * @return the new pipe
 * @throws IOException if the pipe cannot be constructed
 */
public Pipe openPipe() throws IOException {
    Pipe pipe = new PipeImpl(this);
    return pipe;
}

/**
 * Runs an {@code Initializer} for this pipe as a privileged action.
 *
 * @param sp the provider handed to the Initializer
 * @throws IOException the cause of the PrivilegedActionException raised by
 *         the privileged action, cast to IOException
 */
PipeImpl(final SelectorProvider sp) throws IOException {
    try {
        AccessController.doPrivileged(new Initializer(sp));
    } catch (PrivilegedActionException wrapped) {
        // Unwrap and rethrow the exception thrown inside the action.
        Throwable cause = wrapped.getCause();
        throw (IOException) cause;
    }
}
创建了一个PipeImpl对象,AccessController.doPrivileged调用后紧接着会执行Initializer的run方法:
public Void run() throws IOException { ServerSocketChannel ssc = null; SocketChannel sc1 = null; SocketChannel sc2 = null; try { // loopback address InetAddress lb = InetAddress.getByName(""); assert (lb.isLoopbackAddress()); // bind ServerSocketChannel to a port on the loopback address ssc = ServerSocketChannel.open(); ssc.socket().bind(new InetSocketAddress(lb, 0)); // Establish connection (assumes connections are eagerly // accepted) InetSocketAddress sa = new InetSocketAddress(lb, ssc.socket().getLocalPort()); sc1 = SocketChannel.open(sa); ByteBuffer bb = ByteBuffer.allocate(8); long secret = rnd.nextLong(); bb.putLong(secret).flip(); sc1.write(bb); // Get a connection and verify it is legitimate for (;;) { sc2 = ssc.accept(); bb.clear(); sc2.read(bb); bb.rewind(); if (bb.getLong() == secret) break; sc2.close(); } // Create source and sink channels source = new SourceChannelImpl(sp, sc1); sink = new SinkChannelImpl(sp, sc2); } catch (IOException e) { }}
该方法创建了两个通道sc1和sc2,这两个通道都绑定了本地ip,然后sc1向sc2写入了一个随机长整型的数(用于校验接受到的连接是否合法),这两个通道分别作为管道的source与sink端。这相当于利用了回送地址(loopback address)自己向自己写数据,来达到通知的目的。 看看Sun Solaris的实现:
PipeImpl(SelectorProvider sp) { int[] fdes = new int[2]; IOUtil.initPipe(fdes, true); FileDescriptor sourcefd = new FileDescriptor(); IOUtil.setfdVal(sourcefd, fdes[0]); source = new SourceChannelImpl(sp, sourcefd); FileDescriptor sinkfd = new FileDescriptor(); IOUtil.setfdVal(sinkfd, fdes[1]); sink = new SinkChannelImpl(sp, sinkfd);}JNIEXPORT void JNICALLJava_sun_nio_ch_IOUtil_initPipe(JNIEnv *env, jobject this, jintArray intArray, jboolean block){ int fd[2]; jint *ptr = 0; if (pipe(fd) < 0) { JNU_ThrowIOExceptionWithLastError(env, "Pipe failed"); return; } if (block == JNI_FALSE) { if ((configureBlocking(fd[0], JNI_FALSE) < 0) || (configureBlocking(fd[1], JNI_FALSE) < 0)) { JNU_ThrowIOExceptionWithLastError(env, "Configure blocking failed"); } } ptr = (*env)->GetPrimitiveArrayCritical(env, intArray, 0); ptr[0] = fd[0]; ptr[1] = fd[1]; (*env)->ReleasePrimitiveArrayCritical(env, intArray, ptr, 0);}