To reduce GC pressure and avoid frequently requesting memory from the OS and returning it, Netty implements its own memory management scheme based on the ideas of jemalloc. Both heap memory and direct memory can be managed by Netty in this way. The downside is that developers have to release resources as soon as they are done with them; otherwise memory leaks will occur.

In summary, managing your own memory leads to better performance, but also increases the likelihood of memory leaks. To help catch them, Netty provides ResourceLeakDetector, which tracks allocated resources and reports leaks so that developers can find and fix them promptly.

How does Netty do it? Take ByteBuf as an example. To detect a leak, Netty creates a WeakReference to the ByteBuf and registers it with a ReferenceQueue (refQueue). If the ByteBuf is collected by GC without release() having been called, the JVM adds the WeakReference to refQueue, and Netty uses that to determine that a resource leaked. Once it detects a leak, it calls reportLeak() to report it.
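The mechanism can be tried out with nothing but the JDK. The following is a minimal sketch, not Netty's code: MiniLeakDetector, Tracker, track() and pollLeaks() are hypothetical names, and the set of active trackers is an assumption borrowed from the allLeaks field shown later in this article.

```java
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical miniature of the idea; these are not Netty's classes.
final class MiniLeakDetector {
    // The JVM enqueues a Tracker here after its referent has been collected.
    private final ReferenceQueue<Object> refQueue = new ReferenceQueue<>();
    // Trackers that have not been closed yet. Holding them strongly keeps the
    // weak references themselves reachable until they are polled.
    private final Set<Tracker> active = ConcurrentHashMap.newKeySet();

    static final class Tracker extends WeakReference<Object> {
        private final Set<Tracker> active;

        Tracker(Object referent, ReferenceQueue<Object> queue, Set<Tracker> active) {
            super(referent, queue);
            this.active = active;
            active.add(this);
        }

        // Called by a well-behaved owner when the resource is released.
        boolean close() {
            clear(); // drop the referent so the GC has nothing left to enqueue
            return active.remove(this);
        }
    }

    Tracker track(Object resource) {
        return new Tracker(resource, refQueue, active);
    }

    // Drains the queue and counts trackers that were never closed.
    int pollLeaks() {
        int leaks = 0;
        Reference<?> ref;
        while ((ref = refQueue.poll()) != null) {
            if (active.remove(ref)) {
                leaks++; // collected while still active: release was skipped
            }
        }
        return leaks;
    }
}
```

In a real run the garbage collector does the enqueueing, so the timing is not deterministic. In a test, calling enqueue() on a tracker simulates collection of its referent.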

The author drew a flow chart to describe the general workflow of ResourceLeakDetector.

1. ResourceLeakDetector

To avoid frequently creating and destroying ByteBuf objects, Netty pools them for reuse through Recycler. At the same time, to avoid frequent memory allocation and release, Netty manages the underlying memory with a jemalloc-style allocator.

ByteBuf is not the memory itself. ByteBuf is a Java object that needs memory to work. The ByteBuf object can be recycled through Recycler, while its memory is managed by the jemalloc-style allocator.

When a Channel has data to read, Netty by default creates a ByteBuf using PooledByteBufAllocator, writes the data into the ByteBuf, and propagates the ChannelRead event through the pipeline. Here is a simple example that leaks a ByteBuf on purpose:

// VM args: -Dio.netty.leakDetection.level=PARANOID (100% sampling)
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public class LeakDemo {
	public static void main(String[] args) throws InterruptedException {
		ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(1024);
		// Drop the only strong reference without calling release()
		buf = null;
		System.gc();
		Thread.sleep(1000);
		// Allocate again; the leak is detected and reported during this call
		PooledByteBufAllocator.DEFAULT.buffer(1024);
	}

	@Override
	protected void finalize() throws Throwable {
		System.out.println("finalize...");
	}
}

Run the program, and the console reports the resource leak with the following output:

16:02:31.409 [main] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
	io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:402)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
	io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:116)
	io.netty.example.a.LeakDemo.main(LeakDemo.java:21)

1.1 Detection levels

Netty provides four detection levels. Each level has different sampling rates and costs. You can select an appropriate level based on actual conditions.

Level      Description
DISABLED   Detection is disabled
SIMPLE     Simple detection: samples a small fraction of objects, does not report the location of the leak
ADVANCED   Advanced detection: samples a small fraction of objects, reports the location of the leak
PARANOID   Paranoid detection: samples 100% of objects, reports the location of the leak

Adjust the detection level with the JVM parameter -Dio.netty.leakDetection.level=PARANOID.
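To make the role of that property concrete, here is a rough sketch of how a level name could be resolved from a system property. It is an illustration only: DetectionLevel, parse() and fromSystemProperty() are hypothetical names, the enum only mirrors the four level names listed above, and falling back to SIMPLE is an assumption about the default.

```java
import java.util.Locale;

// Hypothetical helper; the constants mirror the four level names listed above.
enum DetectionLevel {
    DISABLED, SIMPLE, ADVANCED, PARANOID;

    // Resolves a level from its name, ignoring case and surrounding spaces.
    // Missing or unknown values yield the supplied fallback.
    static DetectionLevel parse(String value, DetectionLevel fallback) {
        if (value == null) {
            return fallback;
        }
        try {
            return valueOf(value.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            return fallback;
        }
    }

    // Reads the property used in this article; SIMPLE as fallback is an assumption.
    static DetectionLevel fromSystemProperty() {
        return parse(System.getProperty("io.netty.leakDetection.level"), SIMPLE);
    }
}
```

Starting the JVM with -Dio.netty.leakDetection.level=paranoid would make fromSystemProperty() return PARANOID in this sketch.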

1.2 Source Code Analysis

Take a look at the source code and see how Netty detects resource leaks and reports them to users. Before looking at ResourceLeakDetector itself, let’s look at a few of the more important classes.

1.2.1 DefaultResourceLeak

DefaultResourceLeak is the default resource leak tracker. It extends WeakReference, so it holds a weak reference to the tracked object. When the tracked object is collected by GC, the JVM adds the WeakReference to refQueue, which is how Netty determines whether there is a resource leak. Let’s look at its fields first:

// Head of the singly linked list of trace records.
// Each time the tracked object is accessed, the access stack is recorded here.
@SuppressWarnings("unused")
private volatile TraceRecord head;
@SuppressWarnings("unused")
private volatile int droppedRecords;

// Set of active trackers
private final Set<DefaultResourceLeak<?>> allLeaks;
// Identity hash code of the tracked object, used to check that the object being closed is the tracked one
private final int trackedHash;

Now look at the constructor:

/**
 * @param referent the tracked object, here the ByteBuf
 * @param refQueue the weak reference queue
 * @param allLeaks the set of active trackers
 */
DefaultResourceLeak(
        Object referent,
        ReferenceQueue<Object> refQueue,
        Set<DefaultResourceLeak<?>> allLeaks) {
    super(referent, refQueue);

    assert referent != null;

    // Identity hash of the tracked object. On close it is used to check that
    // the object being closed is the tracked one.
    trackedHash = System.identityHashCode(referent);
    // Add this DefaultResourceLeak to the set of active trackers
    allLeaks.add(this);
    // TraceRecord.BOTTOM marks the end of the list
    headUpdater.set(this, new TraceRecord(TraceRecord.BOTTOM));
    this.allLeaks = allLeaks;
}

Creating a DefaultResourceLeak is relatively simple. The important part is the creation of the TraceRecord, the class that records the stack trace.

1.2.2 TraceRecord

TraceRecord records the stack trace of an access to the tracked object. It extends Throwable so that it can obtain the array of stack trace elements by calling Throwable.getStackTrace().

The TraceRecord class itself is not complicated. What matters is its toString() method, which builds the access stack information for the tracked object.

The attributes are as follows:

// Additional prompts
private final String hintString;
// Next node
private final TraceRecord next;
private final int pos;

Constructor:

/**
 * @param next the next node
 * @param hint additional hints
 */
TraceRecord(TraceRecord next, Object hint) {
    // This needs to be generated even if toString() is never called as it may change later on.
    hintString = hint instanceof ResourceLeakHint ? ((ResourceLeakHint) hint).toHintString() : hint.toString();
    this.next = next;
    this.pos = next.pos + 1;
}

The toString() method, which builds the stack trace text for the tracked object:

// Build the stack trace information
@Override
public String toString() {
    StringBuilder buf = new StringBuilder(2048);
    if (hintString != null) {
        buf.append("\tHint: ").append(hintString).append(NEWLINE);
    }

    // Get the stack information
    StackTraceElement[] array = getStackTrace();
    // Skip the first three elements, which belong to ResourceLeakDetector itself
    out: for (int i = 3; i < array.length; i++) {
        StackTraceElement element = array[i];
        String[] exclusions = excludedMethods.get();
        for (int k = 0; k < exclusions.length; k += 2) {
            if (exclusions[k].equals(element.getClassName())
                && exclusions[k + 1].equals(element.getMethodName())) { // lgtm[java/index-out-of-bounds]
                continue out;
            }
        }
        buf.append('\t');
        buf.append(element.toString());
        buf.append(NEWLINE);
    }
    return buf.toString();
}
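The trick of extending Throwable purely to snapshot the call stack is easy to try in isolation. Below is a simplified sketch under stated assumptions: CallSiteRecord and CallSiteDemo are hypothetical names, and the filter excludes whole classes, whereas the code above matches class and method pairs.

```java
import java.util.List;

// Hypothetical illustration of a Throwable used only as a stack snapshot.
final class CallSiteRecord extends Throwable {
    private static final long serialVersionUID = 1L;
    private final String hint;

    CallSiteRecord(String hint) {
        // The implicit Throwable constructor captures the current stack trace.
        this.hint = hint;
    }

    // Renders the captured frames, skipping frames of excluded classes.
    String render(List<String> excludedClasses) {
        StringBuilder buf = new StringBuilder();
        if (hint != null) {
            buf.append("\tHint: ").append(hint).append('\n');
        }
        for (StackTraceElement element : getStackTrace()) {
            if (excludedClasses.contains(element.getClassName())) {
                continue; // noise frame, leave it out of the report
            }
            buf.append('\t').append(element).append('\n');
        }
        return buf.toString();
    }
}

final class CallSiteDemo {
    // The frame of this method becomes part of the recorded stack.
    static CallSiteRecord touchedHere() {
        return new CallSiteRecord("demo");
    }
}
```

The record is never thrown. It is created at the point of access and kept around until a report needs to be rendered.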

1.2.3 DefaultResourceLeak

Back to DefaultResourceLeak. Take PooledByteBufAllocator.newDirectBuffer(), which allocates pooled direct memory, as an example. It does not return the ByteBuf immediately after creating it. A leak of the ByteBuf has to be detectable, so the ByteBuf is wrapped first.

@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    // Allocate a pooled ByteBuf backed by direct memory (details omitted here)
    PoolThreadCache cache = threadCache.get();
    PoolArena<ByteBuffer> directArena = cache.directArena;
    final ByteBuf buf;
    if (directArena != null) {
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        buf = PlatformDependent.hasUnsafe() ?
            UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
            new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }

    // Wrap the buffer so that a leak can be detected
    return toLeakAwareBuffer(buf);
}

ToLeakAwareBuffer () determines whether it is a simple test or an advanced test and returns a different wrapper class, which will be said later.

// Wrap the buffer so that a leak can be detected
protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {
    ResourceLeakTracker<ByteBuf> leak;
    // Get the detection level
    switch (ResourceLeakDetector.getLevel()) {
        case SIMPLE: // Simple detection: return the SimpleLeakAwareByteBuf wrapper
            leak = AbstractByteBuf.leakDetector.track(buf);
            if (leak != null) {
                buf = new SimpleLeakAwareByteBuf(buf, leak);
            }
            break;
        case ADVANCED:
        case PARANOID: // Advanced detection: return the AdvancedLeakAwareByteBuf wrapper
            leak = AbstractByteBuf.leakDetector.track(buf);
            if (leak != null) {
                // Wrap the ByteBuf as an AdvancedLeakAwareByteBuf
                buf = new AdvancedLeakAwareByteBuf(buf, leak);
            }
            break;
        default: // Detection disabled: return the buffer without wrapping
            break;
    }
    return buf;
}

The AbstractByteBuf.leakDetector.track(buf) method is the core. It returns a leak tracker for buf. When buf is released normally, the wrapper class closes the tracker automatically. If the resource leaks instead, the tracker makes the leak detectable and a report is issued.

public final ResourceLeakTracker<T> track(T obj) {
    return track0(obj);
}

The work is handed over to track0(), which does two main things: it creates a tracker and reports leaks.

// Create a leak tracker for obj
@SuppressWarnings("unchecked")
private DefaultResourceLeak track0(T obj) {
    // Get the detection level
    Level level = ResourceLeakDetector.level;
    if (level == Level.DISABLED) {
        // Detection is disabled
        return null;
    }

    // Below PARANOID: decide by sampling whether to track this object
    if (level.ordinal() < Level.PARANOID.ordinal()) {
        if ((PlatformDependent.threadLocalRandom().nextInt(samplingInterval)) == 0) {
            reportLeak();
            return new DefaultResourceLeak(obj, refQueue, allLeaks);
        }
        return null;
    }
    // Report leaks found so far
    reportLeak();
    // PARANOID: track every object
    return new DefaultResourceLeak(obj, refQueue, allLeaks);
}
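The sampling decision in track0() can be isolated and measured. The sketch below models only that decision and uses the JDK's ThreadLocalRandom in place of PlatformDependent.threadLocalRandom(); SamplingSketch, shouldTrack() and trackedFraction() are hypothetical names.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical names; only the decision logic is modelled, not Netty's classes.
final class SamplingSketch {
    enum Level { DISABLED, SIMPLE, ADVANCED, PARANOID }

    // Returns true when the object being allocated should get a tracker.
    static boolean shouldTrack(Level level, int samplingInterval) {
        if (level == Level.DISABLED) {
            return false;
        }
        if (level.ordinal() < Level.PARANOID.ordinal()) {
            // Roughly one allocation in samplingInterval is picked.
            return ThreadLocalRandom.current().nextInt(samplingInterval) == 0;
        }
        return true; // PARANOID tracks every object
    }

    // Empirical fraction of tracked objects over a number of trials.
    static double trackedFraction(Level level, int samplingInterval, int trials) {
        int tracked = 0;
        for (int i = 0; i < trials; i++) {
            if (shouldTrack(level, samplingInterval)) {
                tracked++;
            }
        }
        return (double) tracked / trials;
    }
}
```

With an interval of 128, trackedFraction() should hover around 1/128, which is why a leak on a rarely executed code path can take a long time to show up below the PARANOID level.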

Several fields are used here:

/*
 * When a ByteBuf is tracked, a WeakReference pointing to it is created. If the ByteBuf
 * has no strong references left and is collected by GC, the JVM places the WeakReference
 * in refQueue, which is used to determine whether a leak has occurred.
 */
private final ReferenceQueue<Object> refQueue = new ReferenceQueue<Object>();

// Leak records that have already been reported
private final Set<String> reportedLeaks =
    Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

// Sampling interval, default 128
private final int samplingInterval;

// Set of active trackers
private final Set<DefaultResourceLeak<?>> allLeaks =
    Collections.newSetFromMap(new ConcurrentHashMap<DefaultResourceLeak<?>, Boolean>());

Having covered the creation of DefaultResourceLeak, let’s look at how leaks are reported.

1.2.3.1 reportLeak()

reportLeak() reports resource leaks. As mentioned above, when a tracked object is collected by GC, the JVM adds its WeakReference to refQueue. reportLeak() therefore drains refQueue and, for each leaked tracker it takes out, calls toString() to obtain the stack information. reportUntracedLeak() is simple: it just writes the report to the logger.

// Report leaks
private void reportLeak() {
    if (!needReport()) {
        // No reporting required: just clear the references taken from refQueue
        clearRefQueue();
        return;
    }

    // Drain refQueue and report leaks
    for (;;) {
        DefaultResourceLeak ref = (DefaultResourceLeak) refQueue.poll();
        if (ref == null) {
            // No more leaked objects, exit the loop
            break;
        }

        // Clean up the tracker first
        if (!ref.dispose()) {
            continue;
        }

        // toString() yields the leak records
        String records = ref.toString();
        // Remember the records so that the same leak is reported only once
        if (reportedLeaks.add(records)) {
            // Call logger.error() to report the resource leak
            if (records.isEmpty()) {
                reportUntracedLeak(resourceType);
            } else {
                reportTracedLeak(resourceType, records);
            }
        }
    }
}
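The reportedLeaks set is what keeps a leak on a hot code path from flooding the log: identical records are reported only once. A small sketch of that deduplication, with a list standing in for logger.error(); LeakReportDeduper, report() and messages() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of report deduplication; not Netty's code.
final class LeakReportDeduper {
    // Record strings that have been reported before.
    private final Set<String> reported = ConcurrentHashMap.newKeySet();
    // Stands in for logger.error() so the effect can be inspected.
    private final List<String> messages = new ArrayList<>();

    void report(String resourceType, String records) {
        if (!reported.add(records)) {
            return; // identical records were already reported
        }
        if (records.isEmpty()) {
            // No access records available: only the fact of the leak is known.
            messages.add("LEAK: " + resourceType + " (no access records)");
        } else {
            messages.add("LEAK: " + resourceType + records);
        }
    }

    List<String> messages() {
        return messages;
    }
}
```

Because the key is the rendered record text, two leaks count as the same leak only when their recorded stacks are identical.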

DefaultResourceLeak.toString() builds the stack information that lets the user see where the leak happened. It simply iterates over the TraceRecord list and concatenates the stack information of the method calls.

@Override
public String toString() {
    TraceRecord oldHead = headUpdater.getAndSet(this, null);
    if (oldHead == null) {
        // Already closed
        return EMPTY_STRING;
    }

    final int dropped = droppedRecordsUpdater.get(this);
    int duped = 0;

    int present = oldHead.pos + 1;
    StringBuilder buf = new StringBuilder(present * 2048).append(NEWLINE);
    buf.append("Recent access records: ").append(NEWLINE);

    int i = 1;
    Set<String> seen = new HashSet<String>(present);
    // Walk the TraceRecord list and assemble the stack information
    for (; oldHead != TraceRecord.BOTTOM; oldHead = oldHead.next) {
        String s = oldHead.toString();
        if (seen.add(s)) {
            if (oldHead.next == TraceRecord.BOTTOM) {
                buf.append("Created at:").append(NEWLINE).append(s);
            } else {
                buf.append('#').append(i++).append(':').append(NEWLINE).append(s);
            }
        } else {
            duped++;
        }
    }

    if (duped > 0) {
        buf.append(":")
            .append(duped)
            .append(" leak records were discarded because they were duplicates")
            .append(NEWLINE);
    }

    if (dropped > 0) {
        buf.append(":")
            .append(dropped)
            .append(" leak records were discarded because the leak record count is targeted to ")
            .append(TARGET_RECORDS)
            .append(". Use system property ")
            .append(PROP_TARGET_RECORDS)
            .append(" to increase the limit.")
            .append(NEWLINE);
    }

    buf.setLength(buf.length() - NEWLINE.length());
    return buf.toString();
}
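The traversal is easier to follow on a stripped-down list. In the sketch below the nodes carry plain strings instead of stack traces, and the footer text is simplified; RecordChain, Node and render() are hypothetical names. The newest record sits at the head, and the node just before BOTTOM is the oldest one, which is why it is labelled "Created at".

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the list traversal; not Netty's code.
final class RecordChain {
    // Immutable node; BOTTOM is the sentinel that terminates every chain.
    static final class Node {
        static final Node BOTTOM = new Node();
        final String text;
        final Node next;
        final int pos;

        private Node() {
            text = null;
            next = null;
            pos = -1;
        }

        Node(Node next, String text) {
            this.text = text;
            this.next = next;
            this.pos = next.pos + 1; // position counted from the oldest record
        }
    }

    // Renders newest to oldest, skipping records whose text was already seen.
    static String render(Node head) {
        StringBuilder buf = new StringBuilder("Recent access records:\n");
        Set<String> seen = new HashSet<>();
        int i = 1;
        int duped = 0;
        for (Node n = head; n != Node.BOTTOM; n = n.next) {
            if (!seen.add(n.text)) {
                duped++;
                continue;
            }
            if (n.next == Node.BOTTOM) {
                buf.append("Created at:\n").append(n.text).append('\n');
            } else {
                buf.append('#').append(i++).append(":\n").append(n.text).append('\n');
            }
        }
        if (duped > 0) {
            buf.append(duped).append(" duplicate records omitted\n");
        }
        return buf.toString();
    }
}
```

Building a chain of "alloc", "read", "read", "write" (oldest first) and rendering it from the newest node yields entries #1 write, #2 read, then the "Created at" entry for alloc, plus a note about one omitted duplicate.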

At this point, the process of creating a resource leak tracker and reporting a leak is complete.

2. Simple detection and advanced detection

After creating the ResourceLeakTracker, Netty also wraps the ByteBuf using the decorator pattern. The decorator class has two jobs:

  1. Close the tracker when the tracked object is released.
  2. Record the stack whenever the tracked object is accessed.

Only advanced detection needs the second job, so Netty provides two wrapper classes: SimpleLeakAwareByteBuf for simple detection and AdvancedLeakAwareByteBuf for advanced detection. The difference is that simple detection does not record the stacks from which the tracked object is accessed; it only reports that a leak happened. This keeps the overhead low, but the location of the leak cannot be determined.

The decorator classes hold a native ByteBuf and delegate all operations to it, inserting extra logic before and after the methods that need to be enhanced.

I won’t post the entire code for space reasons, just the enhanced release() method.

// Enhanced release
@Override
public boolean release() {
    if (super.release()) { // The object was released successfully
        closeLeak(); // Close the tracker
        return true;
    }
    return false;
}

closeLeak() calls DefaultResourceLeak.close() to stop tracking:

// Stop tracking
@Override
public boolean close() {
    // Remove this tracker from the set of active trackers
    if (allLeaks.remove(this)) {
        // Clear the weak reference
        clear();
        // Drop the TraceRecord list
        headUpdater.set(this, null);
        return true;
    }
    return false;
}
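The decorator idea can be reduced to a few lines. The following sketch uses hypothetical stand-ins (RefCounted for ByteBuf, LeakTracker for ResourceLeakTracker, PlainResource and LeakAwareResource for the wrapped and wrapping classes) and shows only the release path: the tracker is closed when the final release succeeds, not on every release() call.

```java
// Hypothetical minimal stand-ins for ByteBuf and ResourceLeakTracker.
interface RefCounted {
    boolean release(); // true when the reference count has reached zero
}

interface LeakTracker {
    boolean close();
}

// A plain reference-counted resource with no leak awareness.
final class PlainResource implements RefCounted {
    private int refCnt = 1;

    void retain() {
        refCnt++;
    }

    @Override
    public boolean release() {
        if (refCnt <= 0) {
            throw new IllegalStateException("already released");
        }
        return --refCnt == 0;
    }
}

// Decorator: delegates to the wrapped resource and closes the tracker
// once the wrapped resource reports that it was really released.
final class LeakAwareResource implements RefCounted {
    private final RefCounted delegate;
    private final LeakTracker tracker;

    LeakAwareResource(RefCounted delegate, LeakTracker tracker) {
        this.delegate = delegate;
        this.tracker = tracker;
    }

    @Override
    public boolean release() {
        if (delegate.release()) {
            tracker.close(); // released properly, so stop tracking
            return true;
        }
        return false;
    }
}
```

Keeping the leak logic in a wrapper means the plain resource pays nothing when detection is disabled or the object was not sampled.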

AdvancedLeakAwareByteBuf additionally has to record the access stack, and it does so in a large number of methods, for example touch():

/**
 * @param hint additional information for the trace record
 * @return this buffer
 */
@Override
public ByteBuf touch(Object hint) {
    leak.record(hint);
    return this;
}

leak.record() calls DefaultResourceLeak.record0() to record the stack information: it creates a TraceRecord and adds it to the linked list. The tracking code is not posted here.
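Since that code is not shown, here is a rough idea of what adding a record to a shared list without locks can look like. This is a simplified sketch and not Netty's record0(): it uses AtomicReference instead of the headUpdater field updater seen earlier, it ignores the record limit behind TARGET_RECORDS and the droppedRecords counter, and AccessLog, Entry and hammer() are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of a lock-free prepend; not Netty's record0().
final class AccessLog {
    static final class Entry {
        static final Entry BOTTOM = new Entry(null, null); // end-of-list sentinel
        final String hint;
        final Entry next;

        Entry(String hint, Entry next) {
            this.hint = hint;
            this.next = next;
        }
    }

    // null means "closed"; Entry.BOTTOM means "open but empty".
    private final AtomicReference<Entry> head = new AtomicReference<>(Entry.BOTTOM);

    // Prepends a record; retries if another thread changed head in between.
    boolean record(String hint) {
        Entry oldHead;
        Entry newHead;
        do {
            oldHead = head.get();
            if (oldHead == null) {
                return false; // already closed, nothing to record
            }
            newHead = new Entry(hint, oldHead);
        } while (!head.compareAndSet(oldHead, newHead));
        return true;
    }

    void close() {
        head.set(null);
    }

    int size() {
        int n = 0;
        for (Entry e = head.get(); e != null && e != Entry.BOTTOM; e = e.next) {
            n++;
        }
        return n;
    }

    // Lets several threads record concurrently and returns the final size.
    static int hammer(int threads, int perThread) {
        AccessLog log = new AccessLog();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    log.record("touch");
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return log.size();
    }
}
```

Because every entry is immutable and only the head pointer changes, a reader such as toString() can walk the list safely while other threads keep recording.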

3. Summary

Netty uses WeakReference to determine whether an object has leaked, and enhances the tracked object by wrapping it in a decorator class. When the tracked object is released, the tracker is closed automatically; otherwise the leak is reported once it is detected.

If resource leak detection is enabled, Netty creates a leak tracker, ResourceLeakTracker, for the tracked object. ResourceLeakTracker contains a singly linked list of TraceRecord nodes that represent the stacks from which the object was accessed. If a resource leak occurs, Netty builds the location of the leak from the linked list and logs it.

Netty provides two detection mechanisms: simple and advanced. With advanced detection, Netty also records the access stacks of the tracked object, so a report can quickly pinpoint where the resource leaked. However, this mechanism incurs significant overhead and is not recommended in production.