The ByteBuf advantage in Netty

What are the disadvantages of NIO's ByteBuffer?

1: The capacity cannot be expanded dynamically. A ByteBuffer's length is fixed at the initially specified value and can never grow

2: When switching from writing to reading, you must call buffer.flip() to enter read mode. If you are not careful you get subtle errors: you read nothing, or you read the wrong data
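The flip() pitfall is easy to reproduce with the JDK's own java.nio.ByteBuffer. A minimal sketch (class name is ours):

```java
import java.nio.ByteBuffer;

public class FlipPitfallDemo {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(10);
        buffer.put((byte) 1).put((byte) 2);        // position = 2 after writing

        // Without flip(), get() would read from position 2 - past the written data
        buffer.flip();                             // limit = 2, position = 0: "read mode"

        System.out.println(buffer.get());          // 1
        System.out.println(buffer.get());          // 2
        System.out.println(buffer.hasRemaining()); // false
    }
}
```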

ByteBuf’s strengths and enhancements

1: API operation is more convenient, you can directly write or directly read

2: Supports dynamic capacity expansion. When the data written is larger than the capacity of ByteBuf, the system dynamically expands the capacity without reporting an error

3: Provides a variety of ByteBuf implementations, allowing more flexible use

4: Provides an efficient zero-copy mechanism

5: Supports pooled (pooled-memory) allocation for ByteBuf
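To illustrate the zero-copy point (item 4), here is a small sketch (our own example, not from the original article) using Unpooled.wrappedBuffer and slice, which create views over existing memory rather than copying it:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class ZeroCopyDemo {
    public static void main(String[] args) {
        byte[] data = {1, 2, 3, 4, 5};

        // wrappedBuffer does not copy: the ByteBuf shares the original array
        ByteBuf wrapped = Unpooled.wrappedBuffer(data);
        data[0] = 9;
        System.out.println(wrapped.getByte(0)); // 9: same underlying memory

        // slice is also zero-copy: a view over a sub-range of the same buffer
        ByteBuf slice = wrapped.slice(1, 3);
        System.out.println(slice.getByte(0));   // 2
    }
}
```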

ByteBuf Example

ByteBuf operation

==There are three important properties in ByteBuf:==

1: capacity, the size initially specified for the ByteBuf

2: readIndex, the read position; during sequential reads it tracks the index of the next byte to read

3: writeIndex, the write position; during sequential writes it tracks the index of the next byte to write

==The common ByteBuf methods are as follows:==

1: getByte and setByte: random access to the data at a specified index; readIndex and writeIndex do not change

2: read*, which changes the value of readIndex

3: write*, which changes the value of writeIndex

4: discardReadBytes: discards the bytes already read and shifts the unread bytes to the front of the buffer

5: clear: clears the buffer

6: Search operation

7: Mark and reset

8: reference counting and release
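The demo below does not exercise items 6-8, so here is a small sketch of mark/reset and reference counting (our own example; class name is ours):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class MarkAndReleaseDemo {
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(8);
        buf.writeBytes(new byte[]{1, 2, 3, 4});

        buf.markReaderIndex();                 // remember readerIndex = 0
        buf.readByte();
        buf.readByte();
        buf.resetReaderIndex();                // roll back to the mark
        System.out.println(buf.readerIndex()); // 0

        // Reference counting: a fresh ByteBuf starts with refCnt() == 1
        System.out.println(buf.refCnt());      // 1
        buf.release();                         // drops to 0; the memory is reclaimed
        System.out.println(buf.refCnt());      // 0
    }
}
```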

Simple Demo example

Example of ByteBuf:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import java.util.Arrays;

public class ByteBufDemo {

    public static void main(String[] args) {
        // Allocate a non-pooled, 10-byte ByteBuf
        ByteBuf buf = Unpooled.buffer(10);

        // Inspect the ByteBuf
        System.out.println("---------- original ByteBuf ----------");
        System.out.println("ByteBuf parameter:" + buf.toString());
        System.out.println("Contents of ByteBuf:" + Arrays.toString(buf.array()) + "\n");

        // Write content to ByteBuf
        byte[] bytes = {1, 2, 3, 4, 5};
        buf.writeBytes(bytes);
        System.out.println("---------- ByteBuf after writing ----------");
        System.out.println("ByteBuf parameter:" + buf.toString());
        System.out.println("Contents of ByteBuf:" + Arrays.toString(buf.array()) + "\n");

        // Read from ByteBuf
        buf.readByte();
        buf.readByte();
        System.out.println("---------- ByteBuf after reading some content ----------");
        System.out.println("ByteBuf parameter:" + buf.toString());
        System.out.println("Contents of ByteBuf:" + Arrays.toString(buf.array()) + "\n");

        // Clear the read content
        // After the read data is cleared, the readIndex changes to 0 and the writeIndex changes to 3
        // The content that is not read later will be copied to the previous value, overwriting the original value
        // When written again,4 and 5 will be overwritten
        buf.discardReadBytes();
        System.out.println("---------- ByteBuf after discarding read data ----------");
        System.out.println("ByteBuf parameter:" + buf.toString());
        System.out.println("Contents of ByteBuf:" + Arrays.toString(buf.array()) + "\n");

        // Write the content to ByteBuf again
        byte[] bytes1 = {6};
        buf.writeBytes(bytes1);
        System.out.println("---------- ByteBuf after writing again ----------");
        System.out.println("ByteBuf parameter:" + buf.toString());
        System.out.println("Contents of ByteBuf:" + Arrays.toString(buf.array()) + "\n");

        // Clear the read and write indexes
        //readIndex and writeIndex are reset to 0. ByteBuf contents are not reset
        buf.clear();
        System.out.println("---------- ByteBuf after clearing the read/write indexes ----------");
        System.out.println("ByteBuf parameter:" + buf.toString());
        System.out.println("Contents of ByteBuf:" + Arrays.toString(buf.array()) + "\n");

        // Write the content to ByteBuf again
        byte[] bytes2 = {1, 2, 3};
        buf.writeBytes(bytes2);
        System.out.println("---------- ByteBuf after writing again ----------");
        System.out.println("ByteBuf parameter:" + buf.toString());
        System.out.println("Contents of ByteBuf:" + Arrays.toString(buf.array()) + "\n");

        // Empty the contents of ByteBuf
        // Do not reset readIndex and writeIndex
        buf.setZero(0, buf.capacity());
        System.out.println("---------- ByteBuf after zeroing its contents ----------");
        System.out.println("ByteBuf parameter:" + buf.toString());
        System.out.println("Contents of ByteBuf:" + Arrays.toString(buf.array()) + "\n");

        // Write data exceeding the specified capacity to ByteBuf again
        // Expansion will be performed
        byte[] bytes3 = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
        buf.writeBytes(bytes3);
        System.out.println("---------- ByteBuf after writing more than the capacity ----------");
        System.out.println("ByteBuf parameter:" + buf.toString());
        System.out.println("Contents of ByteBuf:" + Arrays.toString(buf.array()) + "\n");
    }
}

The above example uses an in-heap ByteBuf; now let's look at an out-of-heap (direct) ByteBuf example:

 // Allocate a non-pooled, 10-byte directBuffer
ByteBuf buf = Unpooled.directBuffer(10);

// Inspect the ByteBuf
System.out.println("---------- original ByteBuf ----------");
System.out.println("ByteBuf parameter:" + buf.toString());

Note that a direct buffer does not support the array() method; calling it throws java.lang.UnsupportedOperationException: direct buffer. Also, when you use a ByteBuf you go through its underlying allocator rather than creating one yourself, which we will cover later. As the figure above shows, readIndex and writeIndex split the buffer into three regions, and readIndex is always less than or equal to writeIndex: you cannot read a position that has not been written yet.

In-heap and out-of-heap memory

A socket is a network-communication API provided by the operating system to upper-layer applications. When the data to be read or written lives in the JVM heap, it must first be copied from the JVM heap to operating-system (native) memory before the socket can access it. The advantage of direct memory is that the socket can read it directly; the copy step is eliminated.
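The same distinction exists in plain NIO, so it can be demonstrated with the JDK alone: a heap buffer is backed by a byte[] inside the JVM, while a direct buffer lives in native memory. A minimal sketch (class name is ours):

```java
import java.nio.ByteBuffer;

public class HeapVsDirectDemo {
    public static void main(String[] args) {
        ByteBuffer heap = ByteBuffer.allocate(10);         // backed by a byte[] in the JVM heap
        ByteBuffer direct = ByteBuffer.allocateDirect(10); // native memory outside the heap

        System.out.println(heap.hasArray());   // true: there is a backing array to copy from
        System.out.println(direct.hasArray()); // false: no JVM-heap array behind it
        System.out.println(direct.isDirect()); // true
    }
}
```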

ByteBuf Dynamic capacity expansion

Dynamic expansion happens on write: when the ByteBuf's capacity is insufficient for the data being written, it must expand. So we need to trace the following code:

buf.writeBytes(bytes);

To trace the above writeBytes, we first go into the abstract class ByteBuf and enter the following abstract method:

public abstract ByteBuf writeBytes(byte[] src);

It has several implementation classes; we first enter AbstractByteBuf:

 @Override
 public ByteBuf writeBytes(byte[] src) {
      writeBytes(src, 0, src.length);
      return this;
  }

The following methods are called again:

 @Override
 public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
      // Check whether it can be written
      ensureWritable(length);
      setBytes(writerIndex, src, srcIndex, length);
      // Add the current write position to the length of the data to be written
      writerIndex += length;
      return this;
  }

Here src is the data to be written and length is its length. Next, look at ensureWritable:

@Override
public ByteBuf ensureWritable(int minWritableBytes) {
    // Parameter verification
    checkPositiveOrZero(minWritableBytes, "minWritableBytes");
    // Check whether the capacity can write so much data
    ensureWritable0(minWritableBytes);
    return this;
}
// Check whether the parameter is less than 0
public static int checkPositiveOrZero(int i, String name) {
    if (i < 0) {
         throw new IllegalArgumentException(name + ":" + i + " (expected: >= 0)");
     }
     return i;
 }

The ensureWritable0 method:

final void ensureWritable0(int minWritableBytes) {
        // Make sure the buffer is accessible
        ensureAccessible();
        // If the length of the written data is less than or equal to the capacity of the remaining writable data, return the data directly
        // That is, the capacity is sufficient for writing, and no expansion is required
        if (minWritableBytes <= writableBytes()) {
            return;
        }
        if (checkBounds) {
            //maxCapacity is the maximum value of int
            // Check whether the length of the data to be written is larger than the maximum capacity that can be written
            // If yes, throw an exception
            if (minWritableBytes > maxCapacity - writerIndex) {
                throw new IndexOutOfBoundsException(String.format(
                        "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                        writerIndex, minWritableBytes, maxCapacity, this));
            }
        }

        // The actual expansion: compute the new capacity
        int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);

        // Add the new capacity
        capacity(newCapacity);
}

The capacity calculation lives in the AbstractByteBufAllocator class:

// Constant: 4 MiB
static final int CALCULATE_THRESHOLD = 1048576 * 4; // 4 MiB page

 @Override
 public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
        // Check parameters
        checkPositiveOrZero(minNewCapacity, "minNewCapacity");
        //minNewCapacity = writerIndex + minWritableBytes
        // The index of the data that has been written plus the length of the data currently written is the minimum required capacity
        // If it is larger than the maximum capacity, throw an exception
        if (minNewCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
                    minNewCapacity, maxCapacity));
        }
        final int threshold = CALCULATE_THRESHOLD; // 4 MiB page
        // If the minimum capacity required is equal to 4M, return 4M as the expanded capacity
        if (minNewCapacity == threshold) {
            return threshold;
        }

        // If the minimum capacity is larger than 4M, expand the capacity as follows
        if (minNewCapacity > threshold) {
            // Round minNewCapacity down to a multiple of 4 MiB
            int newCapacity = minNewCapacity / threshold * threshold;
            // If the calculated capacity is greater than the maximum capacity minus 4M, assign the maximum capacity to the new capacity
            if (newCapacity > maxCapacity - threshold) {
                newCapacity = maxCapacity;
            } else {
                newCapacity += threshold;
            }
            return newCapacity;
        }

        // If the minimum capacity required is less than 4M, expand the capacity as follows
        int newCapacity = 64;
        while (newCapacity < minNewCapacity) {
            newCapacity <<= 1;
        }

        return Math.min(newCapacity, maxCapacity);
}

Finally, take a look at the capacity method (here, the heap implementation): the expanded capacity is applied to the ByteBuf by allocating a new array and copying the old contents over with System.arraycopy:

 @Override
    public ByteBuf capacity(int newCapacity) {
        checkNewCapacity(newCapacity);

        int oldCapacity = array.length;
        byte[] oldArray = array;
        if (newCapacity > oldCapacity) {
            byte[] newArray = allocateArray(newCapacity);
            System.arraycopy(oldArray, 0, newArray, 0, oldArray.length);
            setArray(newArray);
            freeArray(oldArray);
        } else if (newCapacity < oldCapacity) {
            byte[] newArray = allocateArray(newCapacity);
            int readerIndex = readerIndex();
            if (readerIndex < newCapacity) {
                int writerIndex = writerIndex();
                if (writerIndex > newCapacity) {
                    writerIndex(writerIndex = newCapacity);
                }
                System.arraycopy(oldArray, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
            } else {
                setIndex(newCapacity, newCapacity);
            }
            setArray(newArray);
            freeArray(oldArray);
        }
        return this;
    }

==The dynamic capacity expansion mechanism is summarized as follows:==

1: When a write* method is called, ensureWritable0 checks whether the remaining capacity is sufficient

2: If not, calculateNewCapacity computes the new capacity, and capacity() applies it

==Capacity expansion calculation:==

1: If the required capacity is not more than 4 MiB, expansion starts from 64 bytes and doubles each time until the result meets the required minimum capacity. For example, if the current capacity is 256 with 200 bytes written and another 60 bytes are written, the minimum required capacity is 260 bytes; the expanded capacity is then 64 × 2 × 2 × 2 = 512.

2: If the required capacity exceeds 4 MiB, the new capacity is: (minimum required capacity / 4 MiB) × 4 MiB + 4 MiB, using integer division. For example, if the current capacity is 3 MiB with 3 MiB written and another 2 MiB is written, the minimum required capacity is 5 MiB; the expanded capacity is then 5 / 4 × 4 + 4 = 8 MiB.
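The two rules can be sketched as a standalone method, a re-implementation for illustration that mirrors calculateNewCapacity above (class and method names are ours):

```java
public class CapacityDemo {

    static final int THRESHOLD = 1048576 * 4; // 4 MiB

    // Re-implementation of the expansion rules, for illustration only
    static int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
        if (minNewCapacity == THRESHOLD) {
            return THRESHOLD;
        }
        if (minNewCapacity > THRESHOLD) {
            // Round down to a multiple of 4 MiB, then add one more 4 MiB step
            int newCapacity = minNewCapacity / THRESHOLD * THRESHOLD;
            if (newCapacity > maxCapacity - THRESHOLD) {
                newCapacity = maxCapacity;
            } else {
                newCapacity += THRESHOLD;
            }
            return newCapacity;
        }
        // Below 4 MiB: start at 64 bytes and double until large enough
        int newCapacity = 64;
        while (newCapacity < minNewCapacity) {
            newCapacity <<= 1;
        }
        return Math.min(newCapacity, maxCapacity);
    }

    public static void main(String[] args) {
        System.out.println(calculateNewCapacity(260, Integer.MAX_VALUE));         // 512
        System.out.println(calculateNewCapacity(5 * 1048576, Integer.MAX_VALUE)); // 8388608 (8 MiB)
    }
}
```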

Figure 1: the required capacity is less than 4 MiB. Figure 2: the required capacity is greater than 4 MiB.

ByteBuf implementations

ByteBuf has 8 implementation combinations along 3 dimensions (pooled vs. unpooled, heap vs. direct, safe vs. unsafe):

ByteBuf class diagram

// in-heap
ByteBuf buf = Unpooled.buffer(10);
// out-of-heap (direct)
ByteBuf buf = Unpooled.directBuffer(10);

Netty provides the Unpooled utility class, which can be used directly; there is no equivalent utility for pooled buffers. Let's trace the source code to see how ByteBuf allocation works:

Unpooled.buffer allocation

First, go to the Unpooled class:

private static final ByteBufAllocator ALLOC = UnpooledByteBufAllocator.DEFAULT;
// Allocate the buffer in the heap using the default allocator
public static ByteBuf buffer(int initialCapacity) {
  return ALLOC.heapBuffer(initialCapacity);
}

We enter the ByteBufAllocator interface:

// Allocate a specified amount of buf in the heap
ByteBuf heapBuffer(int initialCapacity);

Then enter the abstract class AbstractByteBufAllocator:

// If no initial capacity is specified, the default initial capacity is 256
static final int DEFAULT_INITIAL_CAPACITY = 256;
// Max capacity, which is the maximum value of int
static final int DEFAULT_MAX_CAPACITY = Integer.MAX_VALUE;

@Override
public ByteBuf heapBuffer(int initialCapacity) {
   return heapBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
}

 @Override
 public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
     // If the initialized capacity is 0 and the maximum capacity is 0, return an empty Buf
     if (initialCapacity == 0 && maxCapacity == 0) {
         return emptyBuf;
     }
     validate(initialCapacity, maxCapacity);
     return newHeapBuffer(initialCapacity, maxCapacity);
 }

// Check parameters
private static void validate(int initialCapacity, int maxCapacity) {
   // Check the parameters
   checkPositiveOrZero(initialCapacity, "initialCapacity");
   // If the initialized capacity is greater than the maximum capacity, throw an exception
    if (initialCapacity > maxCapacity) {
        throw new IllegalArgumentException(String.format(
                "initialCapacity: %d (expected: not greater than maxCapacity(%d)", initialCapacity, maxCapacity));
    }
}

Then there is the newHeapBuffer abstract method:

protected abstract ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity);

Because the initialization here is unpooled, we enter the UnpooledByteBufAllocator class:

@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
     // PlatformDependent.hasUnsafe() checks whether the current platform supports Unsafe operations
     // Enter different classes depending on whether they are supported or not
     return PlatformDependent.hasUnsafe() ?
             new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
             new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
 }

If Unsafe is supported, we go to:

 InstrumentedUnpooledUnsafeHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
         super(alloc, initialCapacity, maxCapacity);
 }

If not supported, we enter:

InstrumentedUnpooledHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
         super(alloc, initialCapacity, maxCapacity);
}

Following the Unsafe-supported path, we enter the UnpooledUnsafeHeapByteBuf class:

 UnpooledUnsafeHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(alloc, initialCapacity, maxCapacity);
}

The parent class UnpooledHeapByteBuf is called again:

// The allocator
private final ByteBufAllocator alloc;
// The byte array, ByteBuf, is used as the underlying storage for data
byte[] array;

public UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(maxCapacity);

        // Check whether the allocator is empty
        checkNotNull(alloc, "alloc");
        // If the initialized capacity is greater than the maximum capacity, throw an exception
        if (initialCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
        }

        this.alloc = alloc;
        // Set the current array to be the allocated array
        setArray(allocateArray(initialCapacity));
        // Initialize readIndex and writeIndex
        setIndex(0, 0);
 }

// Allocate the array
protected byte[] allocateArray(int initialCapacity) {
     // Return an array of bytes with the capacity of initialCapacity
     return new byte[initialCapacity];
}

// Set the array
 private void setArray(byte[] initialArray) {
        array = initialArray;
        tmpNioBuf = null;
}

The setIndex method in the AbstractByteBuf class:

// Initialize readerIndex and writerIndex
@Override
public ByteBuf setIndex(int readerIndex, int writerIndex) {
    if (checkBounds) {
        checkIndexBounds(readerIndex, writerIndex, capacity());
    }
    setIndex0(readerIndex, writerIndex);
    return this;
}

final void setIndex0(int readerIndex, int writerIndex) {
      this.readerIndex = readerIndex;
      this.writerIndex = writerIndex;
}

That completes the allocation of an unpooled, in-heap ByteBuf. As you can see, a non-pooled, in-heap ByteBuf is ultimately backed by a plain byte array.

Unpooled.directBuffer allocation

Again we start in the Unpooled class:

public static ByteBuf directBuffer(int initialCapacity) {
     return ALLOC.directBuffer(initialCapacity);
}

Then go to the ByteBufAllocator interface:

ByteBuf directBuffer(int initialCapacity);

Then go to the AbstractByteBufAllocator class:

@Override
public ByteBuf directBuffer(int initialCapacity) {
     return directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
 }
 
@Override
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
     // Return an empty Buf if both the initialized capacity and the maximum capacity are 0
     if (initialCapacity == 0 && maxCapacity == 0) {
         return emptyBuf;
     }
     // Check parameters
     validate(initialCapacity, maxCapacity);
     return newDirectBuffer(initialCapacity, maxCapacity);
 }
 
protected abstract ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity);

Since the allocation is also unpooled, newDirectBuffer goes to the implementation of the UnpooledByteBufAllocator class:

@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
     final ByteBuf buf;
     // Also, it determines whether the unsafe operation is supported
     if (PlatformDependent.hasUnsafe()) {
         buf = noCleaner ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
                 new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
     } else {
         buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
     }
     return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
 }

Take InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf as an example (the other two are similar); it calls into the constructor of the UnpooledUnsafeNoCleanerDirectByteBuf class:

 UnpooledUnsafeNoCleanerDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(alloc, initialCapacity, maxCapacity);
    }

which in turn calls the superclass UnpooledUnsafeDirectByteBuf:

ByteBuffer buffer;

public UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(maxCapacity);
        if (alloc == null) {
            throw new NullPointerException("alloc");
        }
        // Check parameters
        checkPositiveOrZero(initialCapacity, "initialCapacity");
        checkPositiveOrZero(maxCapacity, "maxCapacity");
        if (initialCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
        }

        this.alloc = alloc;
        setByteBuffer(allocateDirect(initialCapacity), false);
}

// Allocate a NIO ByteBuffer
protected ByteBuffer allocateDirect(int initialCapacity) {
      return ByteBuffer.allocateDirect(initialCapacity);
}

final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
        if (tryFree) {
            ByteBuffer oldBuffer = this.buffer;
            if (oldBuffer != null) {
                if (doNotFree) {
                    doNotFree = false;
                } else {
                    freeDirect(oldBuffer);
                }
            }
        }
        this.buffer = buffer;
        memoryAddress = PlatformDependent.directBufferAddress(buffer);
        tmpNioBuf = null;
        capacity = buffer.remaining();
}

allocateDirect in the ByteBuffer class:

 public static ByteBuffer allocateDirect(int capacity) {
        return new DirectByteBuffer(capacity);
 }

==Summary:== an unpooled, out-of-heap ByteBuf is, as shown above, backed by NIO's DirectByteBuffer.

ByteBufAllocator class diagram

ByteBuf memory pooling

Allocate pooled memory

From the source code above we know how non-pooled memory is allocated; so how is pooled memory allocated? The diagram below shows the steps, which we will now analyze against the source code.

Memory cache pool

Netty's memory pool is modeled on jemalloc:

1: The cache pool has three regions: tiny, small, and normal

2: Each region is divided into cells of different sizes, and each cell can cache only a limited number of memory blocks of its size: tiny 512, small 256, normal 64. For example, each tiny size has a cache of 512 entries; once a cell's cache is full, a released block is not cached again but is freed directly.

Reclaim pooled memory

The process of allocating pooled memory

The earlier examples allocated unpooled memory; the following example allocates pooled memory:

 ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
 // The request of 495 bytes is normalized up to 496
 ByteBuf buf1 = allocator.ioBuffer(495);
 System.out.printf("buf1: 0x%X%n", buf1.memoryAddress());
 // On release, the block is returned to the tiny region's cell for this size
 buf1.release();

 // The second allocation reuses the cached block from the tiny cell (same memory address)
 ByteBuf buf2 = allocator.ioBuffer(495);
 System.out.printf("buf2: 0x%X%n", buf2.memoryAddress());
 buf2.release();

Let’s start with the ByteBufAllocator class:

// The default ByteBuf allocator, initialized in ByteBufUtil
ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;

To trace allocator.ioBuffer(495), first enter the AbstractByteBufAllocator class:

@Override
public ByteBuf ioBuffer(int initialCapacity) {
    // If Unsafe is supported, out-of-heap memory is allocated
    if (PlatformDependent.hasUnsafe()) {
        return directBuffer(initialCapacity);
    }
    // Unsafe is not supported, so heap memory is allocated
    return heapBuffer(initialCapacity);
}

The directBuffer method in the same class is then called:

@Override
public ByteBuf directBuffer(int initialCapacity) {
     return directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
 }

 @Override
 public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
     // Return an empty ByteBuf if the initialized capacity and maximum capacity are equal to 0
     if (initialCapacity == 0 && maxCapacity == 0) {
         return emptyBuf;
     }
     validate(initialCapacity, maxCapacity);
     return newDirectBuffer(initialCapacity, maxCapacity);
 }
 // Check parameters
 private static void validate(int initialCapacity, int maxCapacity) {
        checkPositiveOrZero(initialCapacity, "initialCapacity");
        if (initialCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "initialCapacity: %d (expected: not greater than maxCapacity(%d)", initialCapacity, maxCapacity));
        }
}

Since the default allocator is pooled, this then goes to the newDirectBuffer implementation in the PooledByteBufAllocator class:

// cache sizes Cache default value
DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);
 @Override
 protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
     // Get the cache object from the current thread
     PoolThreadCache cache = threadCache.get();
     // Get the Arena from the cache
      // An Arena can be understood as Netty's actual buffer allocation and management facility
     PoolArena<ByteBuffer> directArena = cache.directArena;

     final ByteBuf buf;
     // Allocate pooled memory if directArena is available
      if (directArena != null) {
          buf = directArena.allocate(cache, initialCapacity, maxCapacity);
      } else {
          // If directArena is not available, fall back to unpooled allocation
         buf = PlatformDependent.hasUnsafe() ?
                 UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                 new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
     }

     return toLeakAwareBuffer(buf);
 }

Tracing further, we enter the PoolArena class, which defines the three size classes Tiny, Small, and Normal:

enum SizeClass {
     Tiny,
     Small,
    Normal
}
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
      // Get a ByteBuf object
      PooledByteBuf<T> buf = newByteBuf(maxCapacity);
      // Allocate memory
      allocate(cache, buf, reqCapacity);
      return buf;
}

@Override
protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
    // If Unsafe is supported, initialize a PooledUnsafeDirectByteBuf
    if (HAS_UNSAFE) {
        return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
    } else { // Without Unsafe, initialize a PooledDirectByteBuf
        return PooledDirectByteBuf.newInstance(maxCapacity);
    }
}

Next, the PooledUnsafeDirectByteBuf class: a buf is taken from the thread-local recycler stack; if the stack is empty a new one is created, and on release the buf is returned to the stack.

// When recycler.get() finds nothing to reuse in the thread stack, newObject is called; the newly created buf is empty
private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() {
    @Override
    protected PooledUnsafeDirectByteBuf newObject(Handle<PooledUnsafeDirectByteBuf> handle) {
        return new PooledUnsafeDirectByteBuf(handle, 0);
    }
};

static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
    // Get a buf from the RECYCLER
    PooledUnsafeDirectByteBuf buf = RECYCLER.get();
    // It may be a previously used buf, so reset it before use
    buf.reuse(maxCapacity);
    return buf;
}

Then go back to the allocate method in the PoolArena class again and allocate memory:

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
         // Normalize the requested size (tiny sizes round up to a multiple of 16; larger sizes to the next power of two)
        final int normCapacity = normalizeCapacity(reqCapacity);
        // Whether the memory type to be allocated is tiny or Small
        if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
            int tableIdx;
            PoolSubpage<T>[] table;
            boolean tiny = isTiny(normCapacity);
            if (tiny) { // < 512 // allocate a tiny memory
                if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = tinyIdx(normCapacity);
                table = tinySubpagePools;
            } else {
                if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = smallIdx(normCapacity);
                table = smallSubpagePools;
            }

            final PoolSubpage<T> head = table[tableIdx];

          
            synchronized (head) {
                final PoolSubpage<T> s = head.next;
                if (s != head) {
                    assert s.doNotDestroy && s.elemSize == normCapacity;
                    long handle = s.allocate();
                    assert handle >= 0;
                    s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
                    incTinySmallAllocation(tiny);
                    return;
                }
            }
            synchronized (this) {
                // Allocate a new chunk of memory
                allocateNormal(buf, reqCapacity, normCapacity);
            }

            incTinySmallAllocation(tiny);
            return;
        }
        if (normCapacity <= chunkSize) {
            if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);
                ++allocationsNormal;
            }
        } else {
            // Huge allocations are never served via the cache so just call allocateHuge
            allocateHuge(buf, reqCapacity);
        }
}
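For reference, the rounding performed by normalizeCapacity at the top of allocate can be sketched in plain Java: tiny requests (< 512 bytes) round up to a multiple of 16, larger ones to the next power of two. This is a simplified illustration, not Netty's exact implementation:

```java
public class NormalizeCapacityDemo {
    // Round tiny requests (< 512) up to a multiple of 16
    static int roundTiny(int reqCapacity) {
        return (reqCapacity & 15) == 0 ? reqCapacity : (reqCapacity & ~15) + 16;
    }

    // Round larger requests up to the next power of two
    static int roundPowerOfTwo(int reqCapacity) {
        int n = reqCapacity - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return n + 1;
    }

    public static void main(String[] args) {
        System.out.println(roundTiny(100));        // 112
        System.out.println(roundPowerOfTwo(1000)); // 1024
    }
}
```

Normalizing sizes this way keeps the number of distinct bucket sizes small, so freed blocks can be reused by later requests of the same normalized size.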

The allocateTiny method in PoolThreadCache:

boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
    return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}

// Get buf from cache
    private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
        int idx = PoolArena.tinyIdx(normCapacity);
        if (area.isDirect()) {
            return cache(tinySubPageDirectCaches, idx);
        }
        return cache(tinySubPageHeapCaches, idx);
    }

The slot index for the requested size is computed by the tinyIdx method of the PoolArena class:

static int tinyIdx(int normCapacity) {
        return normCapacity >>> 4;
}
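Because tiny sizes are multiples of 16, shifting right by 4 maps each normalized size directly to its slot index in tinySubpagePools. A standalone illustration of the arithmetic:

```java
public class TinyIdxDemo {
    // Same computation as PoolArena.tinyIdx: divide the normalized capacity by 16
    static int tinyIdx(int normCapacity) {
        return normCapacity >>> 4;
    }

    public static void main(String[] args) {
        System.out.println(tinyIdx(16));  // 1
        System.out.println(tinyIdx(32));  // 2
        System.out.println(tinyIdx(496)); // 31, the largest tiny size
    }
}
```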

The allocate method of the PoolThreadCache class then assigns a cached memory cell to the buf:

private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
      if (cache == null) {
          // no cache found so just return false here
          return false;
      }
      boolean allocated = cache.allocate(buf, reqCapacity);
      if (++allocations >= freeSweepAllocationThreshold) {
          allocations = 0;
          trim();
      }
      return allocated;
  }

Here is a step diagram tracing the code path above. The source uses the tiny size class as an example; the other two size classes work similarly. The first allocation creates new memory; once that memory is successfully released back into the buffer pool, a later allocation of the same size is served directly from the pool instead of allocating new memory.
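That reuse behavior can be mimicked with a simplified, standalone cache; the class and method names here are illustrative stand-ins for MemoryRegionCache, not Netty code:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// A toy stand-in for MemoryRegionCache: released buffers are queued,
// and the next allocation of the same size is served from the queue.
public class ToyRegionCache {
    private final Deque<byte[]> queue = new ArrayDeque<>();
    private final int size;

    ToyRegionCache(int size) { this.size = size; }

    byte[] allocate() {
        byte[] cached = queue.poll();
        // Cache hit returns the recycled memory; miss allocates new memory
        return cached != null ? cached : new byte[size];
    }

    void free(byte[] mem) { queue.offer(mem); }

    public static void main(String[] args) {
        ToyRegionCache cache = new ToyRegionCache(32);
        byte[] first = cache.allocate();  // first allocation: new memory
        cache.free(first);                // recycled into the cache
        byte[] second = cache.allocate(); // served from the cache
        System.out.println(first == second); // true: same memory reused
    }
}
```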

The process of memory reclamation

Next, trace the release() method to see how the memory is reclaimed

buf1.release();

We first enter the AbstractReferenceCountedByteBuf class, which maintains the buf's reference counter refCnt, used for memory reclamation: retain() increments the counter by one, release() decrements it by one. deallocate() is called only when the counter drops to zero, and the deallocate() method is implemented by each concrete Buf subclass.

 @Override
 public boolean release() {
     return release0(1);
 }
 private boolean release0(int decrement) {
        int rawCnt = nonVolatileRawCnt(), realCnt = toLiveRealCnt(rawCnt, decrement);
        // Determine if buf is currently referenced, and if not, call deallocate
        if (decrement == realCnt) {
            if (refCntUpdater.compareAndSet(this, rawCnt, 1)) {
                deallocate();
                return true;
            }
            return retryRelease0(decrement);
        }
        return releaseNonFinal0(decrement, rawCnt, realCnt);
}
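The counting scheme described above can be sketched with a simplified standalone class. This is illustrative only; Netty's real implementation uses an optimized atomic field updater and a CAS retry loop, as seen in release0 above:

```java
import java.util.concurrent.atomic.AtomicInteger;

// A simplified sketch of reference counting: new buffers start at 1,
// retain() adds one, release() subtracts one, and deallocation
// happens exactly once, when the count reaches zero.
public class ToyRefCounted {
    private final AtomicInteger refCnt = new AtomicInteger(1);
    boolean deallocated = false;

    void retain() { refCnt.incrementAndGet(); }

    boolean release() {
        if (refCnt.decrementAndGet() == 0) {
            deallocate();
            return true;
        }
        return false;
    }

    private void deallocate() { deallocated = true; }

    public static void main(String[] args) {
        ToyRefCounted buf = new ToyRefCounted();
        buf.retain();                       // refCnt = 2
        System.out.println(buf.release());  // false: refCnt = 1, still live
        System.out.println(buf.release());  // true: refCnt = 0, deallocate() ran
    }
}
```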

Enter the PooledByteBuf class:

@Override
protected final void deallocate() {
    if (handle >= 0) {
        final long handle = this.handle;
        // Indicates that the current BUF does not use any memory area
        this.handle = -1;
        // Set memory to NULL
        memory = null;
        // Free the memory of buf
        chunk.arena.free(chunk, tmpNioBuf, handle, maxLength, cache);
        tmpNioBuf = null;
        chunk = null;
        // Put the buf object back onto the object-recycling stack
        recycle();
    }
}

Enter the PoolArena class again:

void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
        // Check whether it is unpooled
        if (chunk.unpooled) {
            int size = chunk.chunkSize();
            destroyChunk(chunk);
            activeBytesHuge.add(-size);
            deallocationsHuge.increment();
        } else {
            // The size class: Tiny, Small or Normal
            SizeClass sizeClass = sizeClass(normCapacity);
            // Try to put the memory back into the thread-local cache
            if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
                // cached so not free it.
                return;
            }
            freeChunk(chunk, handle, sizeClass, nioBuffer);
        }
}

// Determine the size class of the memory region
private SizeClass sizeClass(int normCapacity) {
        if (!isTinyOrSmall(normCapacity)) {
            return SizeClass.Normal;
        }
        return isTiny(normCapacity) ? SizeClass.Tiny : SizeClass.Small;
}
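The thresholds behind sizeClass (and the isTiny / isTinyOrSmall checks) can be illustrated standalone. The default 8 KB page size is assumed here; this is a sketch, not Netty's code:

```java
// Simplified size classification mirroring PoolArena's thresholds:
// tiny < 512 bytes, small < pageSize, normal otherwise.
public class SizeClassDemo {
    static final int PAGE_SIZE = 8192; // Netty's default page size

    static String sizeClass(int normCapacity) {
        if (normCapacity >= PAGE_SIZE) {
            return "Normal";
        }
        return normCapacity < 512 ? "Tiny" : "Small";
    }

    public static void main(String[] args) {
        System.out.println(sizeClass(32));    // Tiny
        System.out.println(sizeClass(1024));  // Small
        System.out.println(sizeClass(16384)); // Normal
    }
}
```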

Then go to the PoolThreadCache class:

boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer,
            long handle, int normCapacity, SizeClass sizeClass) {
     MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
     if (cache == null) {
         return false;
     }
     // Add to the cache queue
     return cache.add(chunk, nioBuffer, handle);
}

    private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
        // Determine which type of memory it is, and then recycle the memory to which part
        switch (sizeClass) {
        case Normal:
            return cacheForNormal(area, normCapacity);
        case Small:
            return cacheForSmall(area, normCapacity);
        case Tiny:
            return cacheForTiny(area, normCapacity);
        default:
            throw new Error();
        }
    }

    private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
        int idx = PoolArena.tinyIdx(normCapacity);
        if (area.isDirect()) {
            return cache(tinySubPageDirectCaches, idx);
        }
        return cache(tinySubPageHeapCaches, idx);
    }

The step diagram tracing the code above follows the path release() → release0() → deallocate() → PoolArena.free() → PoolThreadCache.add(), which returns the memory to the cache queue.

ByteBuf zero copy mechanism

Netty's zero-copy mechanism is an application-layer implementation; it is largely unrelated to the JVM's or the operating system's underlying memory mechanisms.

Several examples

==1: CompositeByteBuf combines several ByteBuf instances into a single logical ByteBuf without copying their contents==

public static void test1() {
       ByteBuf buf1 = Unpooled.buffer(4);
       ByteBuf buf2 = Unpooled.buffer(3);
       byte[] bytes1 = {1, 2};
       byte[] bytes2 = {3, 4, 5};
       buf1.writeBytes(bytes1);
       buf2.writeBytes(bytes2);
       CompositeByteBuf byteBuf = Unpooled.compositeBuffer();
       byteBuf = byteBuf.addComponents(true, buf1, buf2);
       System.out.println("byteBuf: " + byteBuf.toString());
}

In the output above, ridx is the read position for sequential reads, widx is the write position for sequential writes, cap is the capacity of the new ByteBuf, and components is the number of ByteBufs the new ByteBuf is composed of.

==2: The wrappedBuffer() method wraps a byte[] array into a ByteBuf object==

public static void test2() {
      byte[] bytes = {1, 2, 3, 4, 5};
      ByteBuf buf = Unpooled.wrappedBuffer(bytes);
      System.out.println("buf:" + buf.toString());
}

In the output, ridx is the read position for sequential reads, widx is the write position for sequential writes, and cap is the capacity of the ByteBuf. The new ByteBuf points at the original array's memory, so operations on it still act on the original array.

==3: The slice() method slices one ByteBuf object into multiple ByteBuf objects==

public static void test3() {
     ByteBuf buf = Unpooled.wrappedBuffer("hello".getBytes());
     ByteBuf byteBuf = buf.slice(1, 2);
     System.out.println("byteBuf:" + byteBuf.toString());
}

In the output you can see two ByteBufs: the original one, and the sliced one, whose contents are a reference into the original ByteBuf's memory rather than a copy.