Preface

This chapter continues the Netty source code analysis from the previous chapter and studies how Netty handles outbound events. The examples still use the custom protocol from the previous chapter. It covers:

  • The ChannelOutboundBuffer outbound buffer.
  • User code calls channel.write, which triggers the outbound write event.
  • User code calls channel.flush, which triggers the outbound flush event.
  • How to implement custom encoding with Netty's MessageToByteEncoder.

Part 1: ChannelOutboundBuffer

A ChannelOutboundBuffer is one-to-one with a Channel. Each connection between a client and a server holds its own outbound buffer, whose lifetime is the same as the Channel's.

public final class ChannelOutboundBuffer {
    private final Channel channel;
}
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    private final Unsafe unsafe;
    protected abstract class AbstractUnsafe implements Unsafe {
        private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
    }
}

The ChannelOutboundBuffer is only a buffer: the user code's write merely writes bytes into it. A flush is required to write the buffered data to the underlying Channel.

Even with writeAndFlush, the underlying execution is still write first, followed by flush.

// AbstractChannelHandlerContext#invokeWriteAndFlush
void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
        invokeFlush0();
    }
    // ...
}
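
For completeness, a minimal usage sketch (channel and msg are assumed from the previous chapter's example): writeAndFlush returns a ChannelFuture that completes once the data has actually been flushed, so a listener can observe the outcome.

// Sketch: writeAndFlush == write + flush, with a listener observing the result
private static void writeAndFlushExample(Channel channel, Object msg) {
    channel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> {
        if (!future.isSuccess()) {
            future.cause().printStackTrace(); // e.g. the channel was already closed
        }
    });
}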

1. Flow control

The ChannelOutboundBuffer, the outbound buffer, controls the rate at which user code writes. The unwritable field indicates whether the Channel is writable.

private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");

private volatile int unwritable; 

To check whether a Channel is writable, the Channel reads the ChannelOutboundBuffer instance through its associated unsafe and calls the buffer's isWritable method.

// Channel
@Override
public boolean isWritable() {
    ChannelOutboundBuffer buf = unsafe.outboundBuffer();
    return buf != null && buf.isWritable();
}

The ChannelOutboundBuffer simply checks whether unwritable is 0.

// ChannelOutboundBuffer
public boolean isWritable() {
    return unwritable == 0;
}

The lowest bit of unwritable is used by the Netty framework itself to decide whether the channel is writable.

// ChannelOutboundBuffer
// Netty marks the channel as writable
private void setWritable(boolean invokeLater) {
    for (;;) {
        final int oldValue = unwritable;
        // Clear the lowest bit (set it to 0), meaning writable
        final int newValue = oldValue & ~1;
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            // ...
        }
    }
}

// Netty marks the channel as unwritable
private void setUnwritable(boolean invokeLater) {
    for (;;) {
        final int oldValue = unwritable;
        // OR oldValue with 1: set the lowest bit to 1, meaning unwritable
        final int newValue = oldValue | 1;
        // Try to update the flag to unwritable and trigger fireChannelWritabilityChanged
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            // ...
        }
    }
}

In addition, the upper 31 bits of unwritable define user-controlled writability flags, which user code can use as an extension point.

// ChannelOutboundBuffer
private static int writabilityMask(int index) {
    if (index < 1 || index > 31) {
        throw new IllegalArgumentException("index: " + index + " (expected: 1~31)");
    }
    return 1 << index;
}

// Clear the user-defined flag bit at the given index (mark as writable)
private void setUserDefinedWritability(int index) {
    final int mask = ~writabilityMask(index);
    for (;;) {
        final int oldValue = unwritable;
        final int newValue = oldValue & mask;
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            // ...
        }
    }
}

// Set the user-defined flag bit at the given index (mark as unwritable)
private void clearUserDefinedWritability(int index) {
    final int mask = writabilityMask(index);
    for (;;) {
        final int oldValue = unwritable;
        final int newValue = oldValue | mask;
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            // ...
        }
    }
}

// Check whether the flag bit at the given index is 0
public boolean getUserDefinedWritability(int index) {
    return (unwritable & writabilityMask(index)) == 0;
}
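
A small standalone sketch of how these bits compose (plain bit arithmetic, not Netty code): the channel is writable only when every bit, Netty's bit 0 and all user-defined bits, is clear.

// Sketch of the unwritable bitmask: bit 0 belongs to Netty, bits 1..31 to user code
private static void unwritableBitsDemo() {
    int unwritable = 0;                    // all bits clear -> isWritable() would return true
    unwritable |= 1;                       // Netty sets bit 0 (high water mark exceeded)
    unwritable |= (1 << 5);                // a user-defined flag at index 5 is also set
    System.out.println(unwritable == 0);   // false: not writable
    unwritable &= ~1;                      // Netty clears bit 0 (back below the low water mark)
    System.out.println(unwritable == 0);   // still false: the user flag keeps the channel unwritable
    unwritable &= ~(1 << 5);               // user code clears index 5
    System.out.println(unwritable == 0);   // true: writable again
}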

When totalPendingSize, the number of pending bytes, changes, it may trigger an update of the unwritable flag.

// ChannelOutboundBuffer
private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
            AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");
// Number of pending bytes
private volatile long totalPendingSize;

// Increase the number of pending bytes
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size == 0) {
        return;
    }
    // Update the amount of data waiting to be flushed
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
    // If the data waiting to be flushed exceeds the high water mark (64 KB by default),
    // mark the channel as unwritable and fire channelWritabilityChanged
    if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
        setUnwritable(invokeLater);
    }
}

// Decrease the number of pending bytes
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
    if (size == 0) {
        return;
    }
    // Update the number of pending bytes
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
    // If the number of pending bytes drops below the low water mark (32 KB by default),
    // mark the channel as writable, which may trigger fireChannelWritabilityChanged
    if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
        setWritable(invokeLater);
    }
}

Whether the writability flag needs to change depends on the water marks configured in the ChannelConfig. By default, the channel is marked unwritable when the number of pending bytes exceeds 64 KB, and is marked writable again when it falls below 32 KB.

public class DefaultChannelConfig implements ChannelConfig {
    private volatile WriteBufferWaterMark writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
}

public final class WriteBufferWaterMark {

    private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024;
    private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024;

    public static final WriteBufferWaterMark DEFAULT =
            new WriteBufferWaterMark(DEFAULT_LOW_WATER_MARK, DEFAULT_HIGH_WATER_MARK, false);
}

**What happens when you write more than 64 KB of data?** The example below writes 1 MB, and nothing visibly changes: the writes still succeed, because crossing the high water mark only flips the unwritable flag; it does not block or reject writes.

// Example
private static void write64K(Channel channel) throws IOException {
    for (int i = 0; i < 1024; i++) {
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
        buffer.writeBytes(new byte[1024]);
        channel.write(buffer);
    }
    channel.flush();
    channel.close();
}
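
To actually observe the flag flipping while this example runs, one could register an extra inbound handler that reacts to channelWritabilityChanged (a sketch; the handler name is made up):

// Sketch: logs every writability change caused by crossing the water marks
public class WritabilityLogger extends ChannelInboundHandlerAdapter {
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
        System.out.println("writable=" + ctx.channel().isWritable()
                + ", bytesBeforeUnwritable=" + ctx.channel().bytesBeforeUnwritable());
        ctx.fireChannelWritabilityChanged(); // keep propagating the inbound event
    }
}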

So how do you use the flow control functionality provided by ChannelOutboundBuffer?

When the writing thread finds that the channel is unwritable, it waits synchronously until the Channel becomes writable again. Note that the waiting thread is the user's business thread, not Netty's IO thread.

public final class MySQLCommandExecuteEngine implements CommandExecuteEngine {

    @Override
    public void writeQueryData(final ChannelHandlerContext context, final BackendConnection backendConnection,
                               final QueryCommandExecutor queryCommandExecutor, final int headerPackagesCount) throws SQLException {
        while (queryCommandExecutor.next()) {
            count++;
            // If the channel is not writable, flush once and block until the channel becomes writable
            while (!context.channel().isWritable() && context.channel().isActive()) {
                context.flush();                                       // 1
                backendConnection.getResourceSynchronizer().doAwait(); // 2
            }
            // ...
            // DatabaseCommunicationEngine turns a cached row of MergedResult into a Packet
            DatabasePacket dataValue = queryCommandExecutor.getQueryData();
            context.write(dataValue);
            // ...
        }
    }
}

When the IO thread finds that the channel has become writable again, it wakes up the blocked business thread.

public final class FrontendChannelInboundHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelWritabilityChanged(final ChannelHandlerContext context) {
        if (context.channel().isWritable()) {
            backendConnection.getResourceSynchronizer().doNotify(); // 3
        }
    }
}

Problem: since context.flush at code point 1 runs on the user's business thread, thread A is really just submitting an asynchronous flush task to the EventLoop. If that task completes immediately on the EventLoop thread B, and B calls notify to wake up waiting threads (code point 3) before thread A has actually reached the await call (code point 2), the wakeup is lost and thread A may never be woken up.

Because of this, ShardingProxy only applies flow control when the amount of data in the outbound buffer is very large: it configures a high water mark of 16 MB and a low water mark of 8 MB. In theory, thread B only fires the wakeup after 8 MB of data has been written to the peer, by which time thread A should already be waiting.

private void groupsNio(final ServerBootstrap bootstrap) {
    workerGroup = new NioEventLoopGroup();
    bootstrap.group(bossGroup, workerGroup)
             // ...
             .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024 * 1024, 16 * 1024 * 1024))
             // ...
}

So the ResourceSynchronizer in ShardingProxy is only a lightweight synchronizer whose use is constrained by this scenario.
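
For reference, such a lightweight synchronizer can be built from a lock and a condition. The sketch below is only an illustration, not ShardingProxy's actual code; using a timed await additionally bounds the lost-wakeup window described above.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Illustration only: a minimal lock/condition synchronizer in the spirit of ResourceSynchronizer
public final class SimpleResourceSynchronizer {

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();

    public void doAwait() {
        lock.lock();
        try {
            // Timed wait: even if a notify is lost, the caller wakes up and re-checks isWritable()
            condition.await(1, TimeUnit.SECONDS);
        } catch (final InterruptedException ex) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    public void doNotify() {
        lock.lock();
        try {
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
}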

2. Data structure

The ChannelOutboundBuffer uses two linked-list queues to store data.

unflushedEntry points to the entries that have not yet been scheduled for sending; the write method wraps data into an entry and appends it to this list.

flushedEntry points to the entries waiting to be flushed; before sending, flush moves entries from the unflushedEntry list to the flushedEntry list.

public final class ChannelOutboundBuffer {
    // Entries waiting to be flushed; entries are moved here from unflushedEntry before flush sends them
    private Entry flushedEntry;
    // Entries not yet scheduled for sending; the write method appends wrapped ByteBufs here
    private Entry unflushedEntry;
    // The tail entry, i.e. the last ByteBuf written into the buffer
    private Entry tailEntry;
    // The number of entries waiting to be flushed
    private int flushed;
}

Each list node is encapsulated as an Entry that holds the data to be sent and the list pointer.

static final class Entry {
    // Recycler handle, used because Entry instances come from an object pool
    private final Handle<Entry> handle;
    // Points to the next entry
    Entry next;
    // The message, which can be treated as a ByteBuf here
    Object msg;
    // ... other fields omitted
}

When user code writes data with the write method, the WriteTask eventually passes the ByteBuf in as msg; it is wrapped into an Entry and appended to the unflushedEntry list.

public void addMessage(Object msg, int size, ChannelPromise promise) {
    // Encapsulate ByteBuf into Entry
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    // tailEntry handling
    if (tailEntry == null) {
        flushedEntry = null;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
    }
    tailEntry = entry;
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }

    // Increase the number of bytes to be processed
    incrementPendingOutboundBytes(entry.pendingSize, false);
}

The user calls flush to write the buffered data to the peer. First, addFlush moves all entries from unflushedEntry to flushedEntry. As you can see, simply moving a pointer transfers all data from the unflushed queue to the flushed queue.

// Move all entries from unflushedEntry to flushedEntry
public void addFlush() {
    Entry entry = unflushedEntry;
    if (entry != null) {
        // If nothing is currently marked as flushed, point flushedEntry at the first unflushed entry
        if (flushedEntry == null) {
            flushedEntry = entry;
        }
        // Count the number of entries waiting to be flushed
        do {
            flushed ++;
            entry = entry.next;
        } while (entry != null);
        unflushedEntry = null;
    }
}

Then each entry in the flushedEntry list is written to the peer in a loop and removed.

private void removeEntry(Entry e) {
    if (-- flushed == 0) {
        // The number of entries to send has dropped to 0: the flushed list is fully sent, clear it
        flushedEntry = null;
        // If the removed node is the tail, also clear the unflushed list and the tail pointer
        if (e == tailEntry) {
            tailEntry = null;
            unflushedEntry = null;
        }
    } else {
        // Move the flushed pointer to the next node
        flushedEntry = e.next;
    }
}

Part 2: write

Handling of the write event: write only places data into the ChannelOutboundBuffer; it is not immediately sent to the peer. You must call flush or writeAndFlush to send the buffered data to the peer.

// Example
private static void justWrite(Channel channel) throws InterruptedException {
    while (channel.isActive()) {
        String json = "{\"data\": \"hello\"}";
        ByteBuf magic = ByteBufAllocator.DEFAULT.buffer().writeShort(XHeader.MAGIC);
        channel.write(magic);
        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer()
                .writeByte(XHeader.VERSION_1)
                .writeByte(XHeader.REQUEST)
                .writeInt(json.length())
                .writeBytes(json.getBytes());
        channel.write(byteBuf);
        Thread.sleep(1000);
        channel.flush();
    }
}

The write outbound event propagates along a path similar to other outbound events, but it tends to be asynchronous because the business thread and the IO thread are usually different threads. So when channel.write executes, TailContext actually submits a WriteTask to the EventLoop bound to the Channel and wakes up the Selector.

// TailContext (AbstractChannelHandlerContext)
private void write(Object msg, boolean flush, ChannelPromise promise) {
    // ...
    // Find the next outbound Handler
    final AbstractChannelHandlerContext next = findContextOutbound(flush ?
            (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        // ... skipped for now: in our scenario the business thread differs from the IO thread
    } else {
        // The business thread executes write: submit a WriteTask to the EventLoop bound to the channel
        final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
        if (!safeExecute(executor, task, promise, m, !flush)) {
            task.cancel();
        }
    }
}
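
As a side note, which branch is taken depends only on the calling thread. A minimal sketch (channel and msg are assumed from the earlier examples):

// Sketch: user code can hop onto the EventLoop explicitly; otherwise Netty queues a WriteTask for it
private static void writeOnEventLoop(Channel channel, Object msg) {
    if (channel.eventLoop().inEventLoop()) {
        channel.write(msg);                                      // invoked inline on the IO thread
    } else {
        channel.eventLoop().execute(() -> channel.write(msg));   // run the write on the IO thread
    }
}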

1. Create WriteTask

WriteTask binds together the next Handler (Context), the msg to write, the ChannelPromise, and whether to flush. Because WriteTask instances come from an object pool, the init method assigns the member variables each time an instance is obtained.

static final class WriteTask implements Runnable {
    // WriteTask object pool
    private static final ObjectPool<WriteTask> RECYCLER = ObjectPool.newPool(new ObjectCreator<WriteTask>() {
        @Override
        public WriteTask newObject(Handle<WriteTask> handle) {
            return new WriteTask(handle);
        }
    });

    static WriteTask newInstance(AbstractChannelHandlerContext ctx,
            Object msg, ChannelPromise promise, boolean flush) {
        WriteTask task = RECYCLER.get();
        // Initialize the WriteTask
        init(task, ctx, msg, promise, flush);
        return task;
    }

    private final Handle<WriteTask> handle;
    private AbstractChannelHandlerContext ctx;
    private Object msg;
    private ChannelPromise promise;
    private int size; // the sign bit also encodes whether to flush

    protected static void init(WriteTask task, AbstractChannelHandlerContext ctx,
                               Object msg, ChannelPromise promise, boolean flush) {
        task.ctx = ctx;
        task.msg = msg;
        task.promise = promise;

        if (ESTIMATE_TASK_SIZE_ON_SUBMIT) { // defaults to true
            // Estimate the size of the write object in bytes, default = msg size + 32
            task.size = ctx.pipeline.estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
            ctx.pipeline.incrementPendingOutboundBytes(task.size);
        } else {
            task.size = 0;
        }
        // Mark whether a flush is needed in the sign bit of size:
        // OR-ing with Integer.MIN_VALUE makes size < 0
        if (flush) {
            task.size |= Integer.MIN_VALUE;
        }
    }
}
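
A tiny sketch of the sign-bit trick with assumed numbers: the flush flag lives in the sign bit of size, and the magnitude is recovered later with & Integer.MAX_VALUE.

// Sketch: packing the flush flag into the sign bit of size
private static void sizeSignBitDemo() {
    int size = 1024 + 32;                         // estimated msg size + WRITE_TASK_OVERHEAD
    size |= Integer.MIN_VALUE;                    // mark "flush after write": size becomes negative
    System.out.println(size < 0);                 // true -> run() will call invokeWriteAndFlush
    System.out.println(size & Integer.MAX_VALUE); // 1056 -> the original byte count is recovered
}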

During WriteTask initialization, the estimated size of the msg object is recorded by calling DefaultChannelPipeline#incrementPendingOutboundBytes, which in turn asks the channel's outbound buffer to increase the number of pending bytes.

// DefaultChannelPipeline
protected void incrementPendingOutboundBytes(long size) {
    ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
    if (buffer != null) {
        buffer.incrementPendingOutboundBytes(size);
    }
}

If the ChannelOutboundBuffer decides there is too much pending data, it marks the channel as unwritable and fires channelWritabilityChanged through the pipeline.

// ChannelOutboundBuffer
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size == 0) {
        return;
    }
    // Update the amount of pending data
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
    // If the pending data exceeds the high water mark (64 KB by default),
    // mark the channel as unwritable and fire channelWritabilityChanged
    if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
        setUnwritable(invokeLater);
    }
}

private void setUnwritable(boolean invokeLater) {
    for (;;) {
        final int oldValue = unwritable;
        final int newValue = oldValue | 1;
        // Try to mark the channel as unwritable and trigger fireChannelWritabilityChanged
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            if (oldValue == 0 && newValue != 0) {
                fireChannelWritabilityChanged(invokeLater);
            }
            break;
        }
    }
}

2. Submit WriteTask

TailContext submits the created WriteTask to the EventLoop bound to the Channel, and the submission wakes up the Selector blocked in the select method.

private static boolean safeExecute(EventExecutor executor, Runnable runnable,
        ChannelPromise promise, Object msg, boolean lazy) {
    // ...
    executor.execute(runnable);
}

Recall how the EventLoop submits tasks: the immediate parameter is true, which means the blocked Selector is woken up immediately.

// SingleThreadEventExecutor
private void execute(Runnable task, boolean immediate) {
    boolean inEventLoop = inEventLoop();
    // Add the task to the task queue
    addTask(task);
    if (!inEventLoop) {
        // If the current thread is not the EventLoop's bound thread, try to start the EventLoop thread
        startThread();
        if (isShutdown()) {
            // ...
        }
    }
    // Wake up the EventLoop
    // The key is NioEventLoop's wakeup method, which calls java.nio.channels.Selector#wakeup
    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}

// NioEventLoop
protected void wakeup(boolean inEventLoop) {
    // If called from another thread and the selector has not already been woken up, wake it
    if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
        selector.wakeup();
    }
}

3. Execute WriteTask

Because submitting the WriteTask wakes up the EventLoop thread that may be blocked in Selector.select, the EventLoop handles any ready IO events and then immediately starts processing the WriteTask in its task queue (see Chapter 11, EventLoop).

Focus here on the run method of WriteTask.

@Override
public void run() {
    try {
        // Decrease the number of pending bytes
        decrementPendingOutboundBytes();
        if (size >= 0) {
            // size >= 0 means a plain write: just write into the buffer
            ctx.invokeWrite(msg, promise);
        } else {
            // size < 0 means writeAndFlush is required
            ctx.invokeWriteAndFlush(msg, promise);
        }
    } finally {
        // Return the WriteTask to the object pool
        recycle();
    }
}

Mirroring WriteTask creation, decrementPendingOutboundBytes reduces the number of pending bytes recorded in the ChannelOutboundBuffer.

// WriteTask
private void decrementPendingOutboundBytes() {
    if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
        ctx.pipeline.decrementPendingOutboundBytes(size & Integer.MAX_VALUE);
    }
}

// DefaultChannelPipeline
protected void decrementPendingOutboundBytes(long size) {
    ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
    if (buffer != null) {
        buffer.decrementPendingOutboundBytes(size);
    }
}

Likewise, if the outbound buffer's pending byte count drops below the low water mark, the channel is marked writable again, which may trigger fireChannelWritabilityChanged.

// ChannelOutboundBuffer
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
    if (size == 0) {
        return;
    }
    // Update the number of pending bytes
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
    // If the number of pending bytes drops below the low water mark (32 KB by default),
    // mark the channel as writable, which may trigger fireChannelWritabilityChanged
    if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
        setWritable(invokeLater);
    }
}

private void setWritable(boolean invokeLater) {
    for (;;) {
        final int oldValue = unwritable;
        final int newValue = oldValue & ~1;
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            if (oldValue != 0 && newValue == 0) {
                fireChannelWritabilityChanged(invokeLater);
            }
            break;
        }
    }
}

After updating the ChannelOutboundBuffer, WriteTask invokes the next Context after TailContext to handle the object being written. That next Context is the encoder XRequestEncoder.

/**
 * +------------------+----------------------+--------------------+--------------------+
 * | magic number 2 B | protocol version 1 B | message type 1 B   | data length 4 B    |
 * +------------------+----------------------+--------------------+--------------------+
 */
public class XRequestEncoder extends MessageToByteEncoder<XRequest> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    protected void encode(ChannelHandlerContext ctx, XRequest msg, ByteBuf out) throws Exception {
        out.writeShort(XHeader.MAGIC);
        out.writeByte(XHeader.VERSION_1);
        out.writeByte(XHeader.REQUEST);
        byte[] bytes = objectMapper.writeValueAsBytes(msg);
        out.writeInt(bytes.length);
        out.writeBytes(bytes);
    }
}

After passing through XRequestEncoder, the event finally propagates to HeadContext, which calls Unsafe to perform the actual write.

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    unsafe.write(msg, promise);
}

AbstractUnsafe's write does three things: it calls the AbstractChannel#filterOutboundMessage hook, estimates the msg size, and appends the msg as an entry to the ChannelOutboundBuffer's linked list (adding to the ChannelOutboundBuffer raises the outbound buffer's pendingSize, which may make the channel unwritable, as shown in Part 1).

@Override
public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        // ...
    }
    int size;
    try {
        // Hook executed before msg is added to the outbound buffer
        msg = filterOutboundMessage(msg);
        // For a ByteBuf, size == ByteBuf#readableBytes
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        // ...
        return;
    }
    // Append the msg as an entry to the outbound buffer's linked list
    outboundBuffer.addMessage(msg, size, promise);
}

For NioSocketChannel, filterOutboundMessage mainly verifies that msg is a ByteBuf or FileRegion, since after the various conversions in user code there is no guarantee that msg is still a ByteBuf.

// AbstractNioByteChannel
@Override
protected final Object filterOutboundMessage(Object msg) {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
        if (buf.isDirect()) {
            return msg;
        }
        // If direct memory is not being used, try converting to a direct buffer
        return newDirectBuffer(buf);
    }

    if (msg instanceof FileRegion) {
        return msg;
    }

    throw new UnsupportedOperationException(...);
}
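
A small sketch of what this means for user code (assuming the channel from the earlier examples): a heap buffer still works, but it may be copied into direct memory by filterOutboundMessage, so asking the allocator for a direct buffer up front avoids the extra copy.

// Sketch: heap vs direct buffers on the write path
private static void writeHeapVsDirect(Channel channel) {
    ByteBuf heap = Unpooled.buffer().writeBytes("hello".getBytes());
    channel.write(heap);      // may be converted via newDirectBuffer(...)

    ByteBuf direct = channel.alloc().directBuffer().writeBytes("hello".getBytes());
    channel.write(direct);    // passes through filterOutboundMessage unchanged
    channel.flush();
}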

Part 3: flush

The flush process is similar to write, so here we go straight to Unsafe's flush method.

// AbstractUnsafe
@Override
public final void flush() {
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    // Move unflushedEntry to flushedEntry
    outboundBuffer.addFlush();
    // Loop over flushedEntry and write each ByteBuf to the peer
    flush0();
}
Copy the code

First, the outbound buffer's addFlush method moves the entries to be sent into the flushed list, as described above. Then flush0 calls the abstract AbstractChannel#doWrite method.

// AbstractUnsafe
protected void flush0() {
    if (inFlush0) {
        return;
    }
    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null || outboundBuffer.isEmpty()) {
        return;
    }
    inFlush0 = true;
    if (!isActive()) {
        // ...
        return;
    }
    try {
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        // ...
    } finally {
        inFlush0 = false;
    }
}

/**
 * Flush the content of the given buffer to the remote peer.
 */
// AbstractChannel
protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;

NioSocketChannel's doWrite loops, writing out all flushed entries in the outbound buffer, unless the writeSpinCount limit (16 iterations by default) is exceeded.

// NioSocketChannel
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    SocketChannel ch = javaChannel();
    // By default, at most 16 flush iterations are allowed
    int writeSpinCount = config().getWriteSpinCount();
    do {
        // If there is no Entry to flush in the buffer, we are done
        if (in.isEmpty()) {
            clearOpWrite();
            return;
        }
        // Get the upper limit on the number of bytes written to the peer in one batch
        int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
        // Convert Netty ByteBufs into JDK ByteBuffers
        ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
        // The number of ByteBuffers
        int nioBufferCnt = in.nioBufferCount();

        switch (nioBufferCnt) {
            case 0:
                // The buffer contains something other than ByteBuf, e.g. a FileRegion
                writeSpinCount -= doWrite0(in);
                break;
            case 1: {
                // A single ByteBuffer is written to the peer
                ByteBuffer buffer = nioBuffers[0];
                int attemptedBytes = buffer.remaining();
                final int localWrittenBytes = ch.write(buffer);
                if (localWrittenBytes <= 0) {
                    incompleteWrite(true);
                    return;
                }
                adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
            default: {
                long attemptedBytes = in.nioBufferSize();
                // Write the ByteBuffers to the peer in one gathering write
                final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                // Adjust the maximum number of bytes written to the peer in one batch
                adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
                        maxBytesPerGatheringWrite);
                // Remove flushed entries based on how many bytes were written
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
        }
    } while (writeSpinCount > 0);
    // If processing did not finish, register interest in the WRITE event
    incompleteWrite(writeSpinCount < 0);
}

The doWrite loop does several things internally:

  • ChannelOutboundBuffer.nioBuffers: converts Netty ByteBufs into JDK ByteBuffers, so that the underlying JDK Channel can be called to write the data.
  • ch.write: calls the JDK Channel's write method to write the JDK ByteBuffers to the peer in one batch.
  • adjustMaxBytesPerGatheringWrite: adjusts the upper limit on the number of bytes written per batch.
  • ChannelOutboundBuffer.removeBytes: removes flushed entries according to the number of bytes actually written.
  • incompleteWrite: if 16 loop iterations were not enough to finish, registers interest in the WRITE event on the SelectionKey.

How does adjustMaxBytesPerGatheringWrite adjust the per-batch byte limit?

private void adjustMaxBytesPerGatheringWrite(int attempted, int written, int oldMaxBytesPerGatheringWrite) {
    if (attempted == written) {
        if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
            ((NioSocketChannelConfig) config).setMaxBytesPerGatheringWrite(attempted << 1);
        }
    } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
        ((NioSocketChannelConfig) config).setMaxBytesPerGatheringWrite(attempted >>> 1);
    }
}

attempted is the number of bytes this batch tried to write, i.e. the bytes remaining to be flushed in the current iteration (it shrinks over successive iterations). written is the number of bytes actually written to the peer. oldMaxBytesPerGatheringWrite is the current upper limit on bytes per batch.

When attempted equals written and attempted * 2 > oldMaxBytesPerGatheringWrite, the limit is raised to attempted * 2: the OS accepted everything, so it may be able to take more bytes per call, and raising the limit dynamically speeds up subsequent flushes. For example, if attempted == written == 200 KB and the current limit is 256 KB, the limit is raised to 400 KB.

When attempted is larger than 4 KB and written is smaller than attempted / 2, the limit is tightened to attempted / 2: the OS cannot keep up with the current write rate, so the per-batch size is reduced. For example, if attempted is 200 KB but only 80 KB were written, the limit drops to 100 KB.

What is the initial value of maxBytesPerGatheringWrite?

When the Channel is constructed, its NioSocketChannelConfig is constructed at the same time, and the default limit is computed by the calculateMaxBytesPerGatheringWrite method.

private final class NioSocketChannelConfig extends DefaultSocketChannelConfig {
    private volatile int maxBytesPerGatheringWrite = Integer.MAX_VALUE;

    private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {
        super(channel, javaSocket);
        calculateMaxBytesPerGatheringWrite();
    }

    private void calculateMaxBytesPerGatheringWrite() {
        // Multiply by 2 to give some extra space in case the OS can process write data faster than we can provide.
        int newSendBufferSize = getSendBufferSize() << 1;
        if (newSendBufferSize > 0) {
            setMaxBytesPerGatheringWrite(newSendBufferSize);
        }
    }
}

The value depends on the JDK Socket's SO_SNDBUF, which in turn depends on the operating system. On my machine it is 128 KB, so maxBytesPerGatheringWrite = 128 KB << 1 = 256 KB.

// DefaultSocketChannelConfig
@Override
public int getSendBufferSize() {
    try {
        return javaSocket.getSendBufferSize();
    } catch (SocketException e) {
        throw new ChannelException(e);
    }
}

// Socket
public synchronized int getSendBufferSize() throws SocketException {
    if (isClosed())
        throw new SocketException("Socket is closed");
    int result = 0;
    Object o = getImpl().getOption(SocketOptions.SO_SNDBUF);
    if (o instanceof Integer) {
        result = ((Integer) o).intValue();
    }
    return result;
}

How do you configure it?

Configure ChannelOption.SO_SNDBUF, in bytes. Netty then calls the Socket's native setter to set the SO_SNDBUF size.

public class XClient {
    public static void main(String[] args) throws Exception {
        Bootstrap bootstrap = new Bootstrap();
        NioEventLoopGroup group = new NioEventLoopGroup(1);
        try {
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_SNDBUF, 1024 * 1024)
                    // ...
        }
    }
}

During Channel initialization, DefaultSocketChannelConfig#setSendBufferSize is called to set the SO_SNDBUF option.

@Override
public SocketChannelConfig setSendBufferSize(int sendBufferSize) {
    try {
        javaSocket.setSendBufferSize(sendBufferSize);
    } catch (SocketException e) {
        throw new ChannelException(e);
    }
    return this;
}

How does ChannelOutboundBuffer.removeBytes remove flushed entries based on the number of bytes written to the peer?

Since ch.write writes to the peer in batches, the number of bytes written in a batch is used to decide which entries can be removed from the list.

The logic is a common pattern: loop until writtenBytes reaches 0. Each iteration either removes the entry at the head of the list (when its readable bytes are fully covered), or, when the remaining writtenBytes no longer cover the head entry's readable bytes, simply advances the head entry's read index and stops. For example, if writtenBytes is 5000 and the flushed entries hold 2048, 2048 and 4096 readable bytes, the first two entries are removed and the third entry's readerIndex advances by 904.

// writtenBytes is the number of bytes written to the peer in this batch
public void removeBytes(long writtenBytes) {
    for (;;) {
        // Get the msg of the current flushedEntry
        Object msg = current();
        final ByteBuf buf = (ByteBuf) msg;
        final int readerIndex = buf.readerIndex();
        final int readableBytes = buf.writerIndex() - readerIndex;

        if (readableBytes <= writtenBytes) {
            // The ByteBuf's readable bytes are no more than the bytes actually written to the peer,
            // so this ByteBuf has been fully written and its entry can be removed
            if (writtenBytes != 0) {
                progress(readableBytes);
                writtenBytes -= readableBytes;
            }
            // Removing the flushedEntry may drop the pending bytes below the low water mark,
            // making the channel writable again
            remove();
        } else { // readableBytes > writtenBytes
            // The remaining writtenBytes do not cover the head entry's readable bytes,
            // so just advance the head entry's read index
            if (writtenBytes != 0) {
                buf.readerIndex(readerIndex + (int) writtenBytes);
                progress(writtenBytes);
            }
            break;
        }
    }
    clearNioBuffers();
}

Part 4: encode

Netty provides the abstract class MessageToByteEncoder to make custom encoding easy for users.

public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {
}

After extending MessageToByteEncoder, the user only needs to implement the encode method. There is nothing fancy about the implementation: it just writes the entity class into a ByteBuf using its own encoding scheme.

The custom XRequestEncoder writes an XRequest into a ByteBuf.

public class XRequestEncoder extends MessageToByteEncoder<XRequest> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    protected void encode(ChannelHandlerContext ctx, XRequest msg, ByteBuf out) throws Exception {
        out.writeShort(XHeader.MAGIC);
        out.writeByte(XHeader.VERSION_1);
        out.writeByte(XHeader.REQUEST);
        byte[] bytes = objectMapper.writeValueAsBytes(msg);
        out.writeInt(bytes.length);
        out.writeBytes(bytes);
    }
}

The custom XResponseEncoder writes an XResponse into a ByteBuf.

public class XResponseEncoder extends MessageToByteEncoder<XResponse> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    protected void encode(ChannelHandlerContext ctx, XResponse msg, ByteBuf out) throws Exception {
        out.writeShort(XHeader.MAGIC);
        out.writeByte(XHeader.VERSION_1);
        out.writeByte(XHeader.RESPONSE);
        byte[] bytes = objectMapper.writeValueAsBytes(msg);
        out.writeInt(bytes.length);
        out.writeBytes(bytes);
    }
}

1. The role of generic I

The generic parameter I of MessageToByteEncoder represents the entity class to be encoded. A Pipeline can hold multiple encoders; each encoder only handles messages passed to the write method whose type matches its generic I. If channel.write passes an XRequest, XRequestEncoder handles it; if it passes an XResponse, XResponseEncoder handles it (see the sketch below).
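
A sketch of wiring both encoders into the same pipeline (the Bootstrap/ChannelInitializer setup is assumed from the previous chapter):

// Sketch: both encoders coexist in one pipeline. For an outbound write the event travels
// from tail to head; an encoder whose generic type does not match simply passes the msg on.
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) {
        ch.pipeline().addLast(new XRequestEncoder());
        ch.pipeline().addLast(new XResponseEncoder());
        // ... decoders and business handlers from the previous chapter
    }
});
// channel.write(xRequest)  -> encoded by XRequestEncoder
// channel.write(xResponse) -> encoded by XResponseEncoder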

When MessageToByteEncoder is constructed, a TypeParameterMatcher is created.

public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {

    private final TypeParameterMatcher matcher;
    private final boolean preferDirect;

    protected MessageToByteEncoder(boolean preferDirect) {
        // Pass in the current instance, the generic class, and the type-parameter name to get a TypeParameterMatcher
        matcher = TypeParameterMatcher.find(this, MessageToByteEncoder.class, "I");
        this.preferDirect = preferDirect;
    }
}

TypeParameterMatcher is an abstract class with a single match method that checks whether the argument matches. The implementation class is ReflectiveMatcher.

private static final class ReflectiveMatcher extends TypeParameterMatcher {
    private final Class<?> type;

    ReflectiveMatcher(Class<?> type) {
        this.type = type;
    }

    @Override
    public boolean match(Object msg) {
        return type.isInstance(msg);
    }
}

MessageToByteEncoder's write method follows the chain-of-responsibility pattern: it checks whether msg is an instance of the generic type I. If it is, encoding is performed; otherwise, the msg is passed on to the next Handler (possibly another encoder).

// Use the matcher to determine whether this encoder can handle msg
public boolean acceptOutboundMessage(Object msg) throws Exception {
    return matcher.match(msg);
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = null;
    try {
        // If msg is an instance of the generic type I, this encoder handles it
        if (acceptOutboundMessage(msg)) {
            I cast = (I) msg;
            // ...
        } else {
            // Otherwise pass it on to the next Handler
            ctx.write(msg, promise);
        }
    } catch (EncoderException e) {
        throw e;
    } catch (Throwable e) {
        throw new EncoderException(e);
    } finally {
        if (buf != null) {
            buf.release();
        }
    }
}

2. Call encode

The logic of calling the user encode method is also relatively simple.

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = null;
    try {
        if (acceptOutboundMessage(msg)) {
            // Safe cast
            I cast = (I) msg;
            // Allocate the ByteBuf; subclasses can override this
            buf = allocateBuffer(ctx, cast, preferDirect);
            // The user's encode
            encode(ctx, cast, buf);
            if (buf.isReadable()) {
                // The user wrote data into the ByteBuf, so it keeps propagating outbound;
                // it eventually reaches HeadContext and is written into the outbound buffer
                ctx.write(buf, promise);
            } else {
                // The user wrote nothing into the ByteBuf: release it and propagate an empty buffer
                buf.release();
                ctx.write(Unpooled.EMPTY_BUFFER, promise);
            }
            buf = null;
        }
        // ... (catch and finally omitted, same as the previous listing)
    }
}

The extension point for users is the allocateBuffer method, which lets a subclass create an appropriately sized ByteBuf to write into; preferDirect defaults to true.

/**
 * Allocate a {@link ByteBuf} which will be used as argument of {@link #encode(ChannelHandlerContext, I, ByteBuf)}.
 * Sub-classes may override this method to return {@link ByteBuf} with a perfect matching {@code initialCapacity}.
 */
protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, I msg,
                           boolean preferDirect) throws Exception {
    if (preferDirect) {
        return ctx.alloc().ioBuffer();
    } else {
        return ctx.alloc().heapBuffer();
    }
}

Conclusion

  • The ChannelOutboundBuffer outbound buffer:

    write puts data into this buffer; flush writes the buffered data to the peer; writeAndFlush combines the two operations.

    The ChannelOutboundBuffer also provides flow control based on the user-configured WriteBufferWaterMark: the default low water mark is 32 KB and the default high water mark is 64 KB.

    Internally, the framework calls incrementPendingOutboundBytes to increase the buffer's pendingSize; if it exceeds the high water mark, the Channel is marked unwritable and a channelWritabilityChanged inbound event is triggered.

    The framework calls decrementPendingOutboundBytes to decrease the buffer's pendingSize; if it falls below the low water mark, the Channel is marked writable and a channelWritabilityChanged inbound event is triggered.

    User code can handle channelWritabilityChanged events to control the write rate, following the approach of ShardingProxy's MySQLCommandExecuteEngine.

    The ChannelOutboundBuffer uses two linked-list queues to store data.

    unflushedEntry holds the entries that have not yet been scheduled for sending; the write method wraps data into entries and appends them to this list.

    flushedEntry holds the entries waiting to be flushed; before sending, flush moves entries from unflushedEntry to flushedEntry.

  • User code calls channel.write, which triggers the outbound write event.

    TailContext submits a WriteTask to the Channel's EventLoop instead of writing directly to the peer.

    Creating the WriteTask increases the ChannelOutboundBuffer's pendingSize.

    Executing the WriteTask decreases the ChannelOutboundBuffer's pendingSize.

    Unsafe (AbstractUnsafe) wraps the ByteBuf into an Entry and puts it into the ChannelOutboundBuffer, which increases the ChannelOutboundBuffer's pendingSize again.

    These increases and decreases of pendingSize are what may trigger channelWritabilityChanged events.

  • User code calls channel.flush, which triggers the outbound flush event.

    The flush event is ultimately handled by the Unsafe, which writes the ByteBufs cached in the ChannelOutboundBuffer's entries to the peer through the Channel.

    Once a ByteBuf has been fully written, its Entry is removed from the list, which reduces the ChannelOutboundBuffer's pendingSize; if it falls below the low water mark, the Channel becomes writable and channelWritabilityChanged is triggered.

  • Extending the MessageToByteEncoder abstract class provided by Netty enables custom encoding.

    The generic type of MessageToByteEncoder represents the entity class the encoder can handle.

    public class XRequestEncoder extends MessageToByteEncoder<XRequest> {
        private final ObjectMapper objectMapper = new ObjectMapper();

        @Override
        protected void encode(ChannelHandlerContext ctx, XRequest msg, ByteBuf out) throws Exception {
            out.writeShort(XHeader.MAGIC);
            out.writeByte(XHeader.VERSION_1);
            out.writeByte(XHeader.REQUEST);
            byte[] bytes = objectMapper.writeValueAsBytes(msg);
            out.writeInt(bytes.length);
            out.writeBytes(bytes);
        }
    }

    A ChannelPipeline can have multiple encoders, each of which is responsible for different classes of coding work.

    If an encoder matches the message's class but its encode method writes nothing into the ByteBuf out, the msg is effectively discarded: an empty buffer is propagated instead.