sequence

This paper mainly studies Netty’s Resource Detector

Abnormal LEAK

The 2019-04-02 15:23:17. 026 ERROR 1 - [reactor - HTTP - epoll - 2] io.net ty. Util. ResourceLeakDetector: LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
#1:
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:286)
	io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:427)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:328)
	io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
	java.base/java.lang.Thread.run(Thread.java:835)
#2:
	io.netty.buffer.AdvancedLeakAwareByteBuf.forEachByte(AdvancedLeakAwareByteBuf.java:670)
	io.netty.handler.codec.http.HttpObjectDecoder$HeaderParser.parse(HttpObjectDecoder.java:801)
	io.netty.handler.codec.http.HttpObjectDecoder.readHeaders(HttpObjectDecoder.java:601)
	io.netty.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:227)
	io.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:202)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:502)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:441)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278)
	io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:427)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:328)
	io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
	java.base/java.lang.Thread.run(Thread.java:835)
#3:
	io.netty.buffer.AdvancedLeakAwareByteBuf.forEachByte(AdvancedLeakAwareByteBuf.java:670)
	io.netty.handler.codec.http.HttpObjectDecoder$HeaderParser.parse(HttpObjectDecoder.java:801)
	io.netty.handler.codec.http.HttpObjectDecoder.readHeaders(HttpObjectDecoder.java:581)
	io.netty.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:227)
	io.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:202)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:502)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:441)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278)
	io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:427)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:328)
	io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
	java.base/java.lang.Thread.run(Thread.java:835)
#4:
	io.netty.buffer.AdvancedLeakAwareByteBuf.forEachByte(AdvancedLeakAwareByteBuf.java:670)
	io.netty.handler.codec.http.HttpObjectDecoder$HeaderParser.parse(HttpObjectDecoder.java:801)
	io.netty.handler.codec.http.HttpObjectDecoder$LineParser.parse(HttpObjectDecoder.java:850)
	io.netty.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:208)
	io.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:202)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:502)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:441)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278)
	io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:427)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:328)
	io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
	java.base/java.lang.Thread.run(Thread.java:835)
#5:
	io.netty.buffer.AdvancedLeakAwareByteBuf.getUnsignedByte(AdvancedLeakAwareByteBuf.java:160)
	io.netty.handler.codec.http.HttpObjectDecoder.skipControlCharacters(HttpObjectDecoder.java:566)
	io.netty.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:202)
	io.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:202)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:502)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:441)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278)
	io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:427)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:328)
	io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
	java.base/java.lang.Thread.run(Thread.java:835)
#6:
	Hint: 'reactor.left.httpCodec' will handle the message from this point.
	io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:427)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:328)
	io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
	java.base/java.lang.Thread.run(Thread.java:835)
#7:
	Hint: 'DefaultChannelPipeline$HeadContext#0' will handle the message from this point.
	io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:427)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:328)
	io.netty.util.concurrent.SingleThreadEventExecutorA $5.run(SingleThreadEventExecutor.java:905)
	java.base/java.lang.Thread.run(Thread.java:835)
Created at:
	io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:339)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176)
	io.netty.channel.unix.PreferredDirectByteBufAllocator.ioBuffer(PreferredDirectByteBufAllocator.java:53)
	io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114)
	io.netty.channel.epoll.EpollRecvByteAllocatorHandle.allocate(EpollRecvByteAllocatorHandle.java:77)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:784)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:427)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:328)
	io.netty.util.concurrent.SingleThreadEventExecutorA $5.run(SingleThreadEventExecutor.java:905)
	java.base/java.lang.Thread.run(Thread.java:835)
: 9 leak records were discarded because the leak record count is targeted to 4. Use system property io.netty.leakDetection.targetRecords to increase the limit.
Copy the code

ResourceLeakDetector

Netty – common – 4.1.33. Final – sources. The jar! /io/netty/util/ResourceLeakDetector.java

public class ResourceLeakDetector<T> {

    private static final String PROP_LEVEL_OLD = "io.netty.leakDetectionLevel";
    private static final String PROP_LEVEL = "io.netty.leakDetection.level";
    private static final Level DEFAULT_LEVEL = Level.SIMPLE;

    private static final String PROP_TARGET_RECORDS = "io.netty.leakDetection.targetRecords";
    private static final int DEFAULT_TARGET_RECORDS = 4;

    private static final String PROP_SAMPLING_INTERVAL = "io.netty.leakDetection.samplingInterval";
    // There is a minor performance benefit in TLR ifthis is a power of 2. private static final int DEFAULT_SAMPLING_INTERVAL = 128; private static final int TARGET_RECORDS; static final int SAMPLING_INTERVAL; /** * Represents the level of resource leak detection. */ public enum Level { /** * Disables resource leak detection. */  DISABLED, /** * Enables simplistic sampling resource leak detectionwhich reports there is a leak or not,
         * at the cost of small overhead (default).
         */
        SIMPLE,
        /**
         * Enables advanced sampling resource leak detection which reports where the leaked object was accessed
         * recently at the cost of high overhead.
         */
        ADVANCED,
        /**
         * Enables paranoid resource leak detection which reports where the leaked object was accessed recently,
         * at the cost of the highest possible overhead (for testing purposes only).
         */
        PARANOID;

        /**
         * Returns level based on string value. Accepts also string that represents ordinal number of enum.
         *
         * @param levelStr - level string : DISABLED, SIMPLE, ADVANCED, PARANOID. Ignores case.
         * @return corresponding level or SIMPLE level in case of no match.
         */
        static Level parseLevel(String levelStr) {
            String trimmedLevelStr = levelStr.trim();
            for (Level l : values()) {
                if (trimmedLevelStr.equalsIgnoreCase(l.name()) || trimmedLevelStr.equals(String.valueOf(l.ordinal()))) {
                    returnl; }}return DEFAULT_LEVEL;
        }
    }

    private static Level level;

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ResourceLeakDetector.class);

    static {
        final boolean disabled;
        if (SystemPropertyUtil.get("io.netty.noResourceLeakDetection") != null) {
            disabled = SystemPropertyUtil.getBoolean("io.netty.noResourceLeakDetection".false);
            logger.debug("-Dio.netty.noResourceLeakDetection: {}", disabled);
            logger.warn(
                    "-Dio.netty.noResourceLeakDetection is deprecated. Use '-D{}={}' instead.",
                    PROP_LEVEL, DEFAULT_LEVEL.name().toLowerCase());
        } else {
            disabled = false;
        }

        Level defaultLevel = disabled? Level.DISABLED : DEFAULT_LEVEL;

        // First read old property name
        String levelStr = SystemPropertyUtil.get(PROP_LEVEL_OLD, defaultLevel.name());

        // If new property name is present, use it
        levelStr = SystemPropertyUtil.get(PROP_LEVEL, levelStr);
        Level level = Level.parseLevel(levelStr);

        TARGET_RECORDS = SystemPropertyUtil.getInt(PROP_TARGET_RECORDS, DEFAULT_TARGET_RECORDS);
        SAMPLING_INTERVAL = SystemPropertyUtil.getInt(PROP_SAMPLING_INTERVAL, DEFAULT_SAMPLING_INTERVAL);

        ResourceLeakDetector.level = level;
        if (logger.isDebugEnabled()) {
            logger.debug("-D{}: {}", PROP_LEVEL, level.name().toLowerCase());
            logger.debug("-D{}: {}", PROP_TARGET_RECORDS, TARGET_RECORDS);
        }
    }

    /**
     * @deprecated Use {@link #setLevel(Level)} instead.
     */
    @Deprecated
    public static void setEnabled(boolean enabled) {
        setLevel(enabled? Level.SIMPLE : Level.DISABLED);
    }

    /**
     * Returns {@code true} if resource leak detection is enabled.
     */
    public static boolean isEnabled() {
        return getLevel().ordinal() > Level.DISABLED.ordinal();
    }

    /**
     * Sets the resource leak detection level.
     */
    public static void setLevel(Level level) {
        if (level == null) {
            throw new NullPointerException("level");
        }
        ResourceLeakDetector.level = level;
    }

    /**
     * Returns the current resource leak detection level.
     */
    public static Level getLevel() {
        return level;
    }

    /** the collection of active resources */
    private final Set<DefaultResourceLeak<?>> allLeaks =
            Collections.newSetFromMap(new ConcurrentHashMap<DefaultResourceLeak<?>, Boolean>());

    private final ReferenceQueue<Object> refQueue = new ReferenceQueue<Object>();
    private final ConcurrentMap<String, Boolean> reportedLeaks = PlatformDependent.newConcurrentHashMap();

    private final String resourceType;
    private final int samplingInterval;

    //......

    /**
     * Creates a new {@link ResourceLeakTracker} which is expected to be closed via
     * {@link ResourceLeakTracker#close(Object)} when the related resource is deallocated.
     *
     * @return the {@link ResourceLeakTracker} or {@code null}
     */
    @SuppressWarnings("unchecked")
    public final ResourceLeakTracker<T> track(T obj) {
        return track0(obj);
    }

    private DefaultResourceLeak track0(T obj) {
        Level level = ResourceLeakDetector.level;
        if (level == Level.DISABLED) {
            return null;
        }

        if (level.ordinal() < Level.PARANOID.ordinal()) {
            if ((PlatformDependent.threadLocalRandom().nextInt(samplingInterval)) == 0) {
                reportLeak();
                return new DefaultResourceLeak(obj, refQueue, allLeaks);
            }
            return null;
        }
        reportLeak();
        return new DefaultResourceLeak(obj, refQueue, allLeaks);
    }

    private void reportLeak() {
        if(! logger.isErrorEnabled()) { clearRefQueue();return;
        }

        // Detect and report previous leaks.
        for (;;) {
            @SuppressWarnings("unchecked")
            DefaultResourceLeak ref = (DefaultResourceLeak) refQueue.poll();
            if (ref == null) {
                break;
            }

            if(! ref.dispose()) {continue;
            }

            String records = ref.toString();
            if (reportedLeaks.putIfAbsent(records, Boolean.TRUE) == null) {
                if (records.isEmpty()) {
                    reportUntracedLeak(resourceType);
                } else {
                    reportTracedLeak(resourceType, records);
                }
            }
        }
    }

    /**
     * This method is called when a traced leak is detected. It can be overridden for tracking how many times leaks
     * have been detected.
     */
    protected void reportTracedLeak(String resourceType, String records) {
        logger.error(
                "LEAK: {}.release() was not called before it's garbage-collected. " +
                "See http://netty.io/wiki/reference-counted-objects.html for more information.{}",
                resourceType, records);
    }

    /**
     * This method is called when an untraced leak is detected. It can be overridden for tracking how many times leaks
     * have been detected.
     */
    protected void reportUntracedLeak(String resourceType) {
        logger.error("LEAK: {}.release() was not called before it's garbage-collected. " +
                "Enable advanced leak reporting to find out where the leak occurred. " +
                "To enable advanced leak reporting, " +
                "specify the JVM option '-D{}={}' or call {}.setLevel() " +
                "See http://netty.io/wiki/reference-counted-objects.html for more information.", resourceType, PROP_LEVEL, Level.ADVANCED.name().toLowerCase(), simpleClassName(this)); } / /... }Copy the code
  • The ResourceLeakDetector uses the Level enumeration to define four different leak detection levels: DISABLED, SIMPLE, ADVANCED, and PARANOID. The default level is SIMPLE. You can use – Dio.net ty. LeakDetection. Level = advanced Settings
  • ResourceLeakDetector static block of code will read io.net. Ty noResourceLeakDetection system property, if the show set to false, the defaultLevel to DISABLED; If this parameter is not set, disabled is false and defaultLevel is SIMPLE by default. ResourceLeakDetector TARGET_RECORDS (io.netty.leakDetection.targetRecords) and SAMPLING_INTERVAL (io.netty.leakDetection.samplingInterval), where targetRecords defaults to 4 and samplingInterval defaults to 128
  • ResourceLeakDetector provides the track method to create ResourceLeakTracker; The track method internally calls the Track0 method. If the level is PARANOID, call reportLeak immediately and create DefaultResourceLeak, otherwise use random numbers to determine (PlatformDependent.threadLocalRandom().nextInt(samplingInterval)) == 0) call reportLeak and create DefaultResourceLeak; The reportLeak method has a for loop that continuously fetches DefaultResourceLeak from the refQueue and then calls reportUntracedLeak or reportTracedLeak for error

DefaultResourceLeak

Netty – common – 4.1.33. Final – sources. The jar! /io/netty/util/ResourceLeakDetector.java

    private static final class DefaultResourceLeak<T>
            extends WeakReference<Object> implements ResourceLeakTracker<T>, ResourceLeak {

        @SuppressWarnings("unchecked") // generics and updaters donot mix. private static final AtomicReferenceFieldUpdater<DefaultResourceLeak<? >, Record> headUpdater = (AtomicReferenceFieldUpdater) AtomicReferenceFieldUpdater.newUpdater(DefaultResourceLeak.class, Record.class,"head");

        @SuppressWarnings("unchecked") // generics and updaters donot mix. private static final AtomicIntegerFieldUpdater<DefaultResourceLeak<? >> droppedRecordsUpdater = (AtomicIntegerFieldUpdater) AtomicIntegerFieldUpdater.newUpdater(DefaultResourceLeak.class,"droppedRecords");

        @SuppressWarnings("unused")
        private volatile Record head;
        @SuppressWarnings("unused") private volatile int droppedRecords; private final Set<DefaultResourceLeak<? >> allLeaks; private final int trackedHash; DefaultResourceLeak( Object referent, ReferenceQueue<Object> refQueue, Set<DefaultResourceLeak<? >> allLeaks) { super(referent, refQueue); assert referent ! = null; // Store thehash of the tracked object to later assert it in the close(...) method.
            // It's important that we not store a reference to the referent as this would disallow it from // be collected via the WeakReference. trackedHash = System.identityHashCode(referent); allLeaks.add(this); // Create a new Record so we always have the creation stacktrace included. headUpdater.set(this, new Record(Record.BOTTOM)); this.allLeaks = allLeaks; } @Override public void record() { record0(null); } @Override public void record(Object hint) { record0(hint); } /** * This method works by exponentially backing off as more records are present in the stack. Each record has a * 1 /  2^n chance of dropping the top most record and replacing it with itself. This has a number of convenient * properties: * * 
      
    *
  1. The current record is always recorded. This is due to the compare and swap dropping the top most * record, rather than the to-be-pushed record. *
  2. The very last access will always be recorded. This comes as a property of 1. *
  3. It is possible to retain more records than the target, based upon the probability distribution. *
  4. It is easy to keep a precise record of the number of elements in the stack, since each element has to * know how tall the stack is. *
* * In this particular implementation, there are also some advantages. A thread local random is used to decide * if something should be recorded. This means that if there is a deterministic access pattern, it is now * possible to see what other accesses occur, rather than always dropping them. Second, after * {@link #TARGET_RECORDS} accesses, backoff occurs. This matches typical access patterns, * where there are either a high number of accesses (i.e. a cached buffer), or low (an ephemeral buffer), but * not many in between. * * The use of atomics avoids serializing a high number of accesses, when most of the records will be thrown * away. High contention only happens when there are very few existing records, which is only likely when the * object isn'
t shared! If this is a problem, the loop can be aborted and the record dropped, because another * thread won the race. */ private void record0(Object hint) { // Check TARGET_RECORDS > 0 here to avoid similar check before remove from and add to lastRecords if (TARGET_RECORDS > 0) { Record oldHead; Record prevHead; Record newHead; boolean dropped; do { if ((prevHead = oldHead = headUpdater.get(this)) == null) { // already closed. return; } final int numElements = oldHead.pos + 1; if (numElements >= TARGET_RECORDS) { final int backOffFactor = Math.min(numElements - TARGET_RECORDS, 30); if (dropped = PlatformDependent.threadLocalRandom().nextInt(1 << backOffFactor) != 0) { prevHead = oldHead.next; } } else { dropped = false; } newHead = hint ! = null ? new Record(prevHead, hint) : new Record(prevHead); }while(! headUpdater.compareAndSet(this, oldHead, newHead));if (dropped) { droppedRecordsUpdater.incrementAndGet(this); } } } boolean dispose() { clear(); return allLeaks.remove(this); } @Override public boolean close() { if (allLeaks.remove(this)) { // Call clear so the reference is not even enqueued. clear(); headUpdater.set(this, null); return true; } return false; } @Override public boolean close(T trackedObject) { // Ensure that the object that was tracked is the same as the one that was passed to close(...) . assert trackedHash == System.identityHashCode(trackedObject); try {return close(); } finally { // This method will do `synchronized(trackedObject)` and we should be sure this will not cause deadlock. // It should not, because somewhere up the callstack should be a (successful) `trackedObject.release`, // therefore it is unreasonable that anyone else, anywhere, is holding a lock on the trackedObject. // (Unreasonable but possible, unfortunately.) reachabilityFence0(trackedObject); } } /** * Ensures that the object referenced by the given reference remains * <a href="package-summary.html#reachability"><em>strongly reachable</em></a>, * regardless of any prior actions of the program that might otherwise cause * the object to become unreachable; thus, the referenced object is not * reclaimable by garbage collection at least until after the invocation of * this method. * * <p> Recent versions of the JDK have a nasty habit of prematurely deciding objects are unreachable. * see: https://stackoverflow.com/questions/26642153/finalize-called-on-strongly-reachable-object-in-java-8 * The Java 9 method Reference.reachabilityFence offers a solution to this problem. * * <p> This method is always implemented as a synchronization on {@code ref}, not as * {@code Reference.reachabilityFence}for consistency across platforms and to allow building on JDK 6-8. * <b>It is the caller's responsibility to ensure that this synchronization will not cause deadlock. * * @param ref the reference. If {@code null}, this method has no effect. * @see java.lang.ref.Reference#reachabilityFence */ private static void reachabilityFence0(Object ref) { if (ref ! = null) { // Empty synchronized is ok: https://stackoverflow.com/a/31933260/1151521 synchronized (ref) { } } } @Override public String toString() { Record oldHead = headUpdater.getAndSet(this, null); if (oldHead == null) { // Already closed return EMPTY_STRING; } final int dropped = droppedRecordsUpdater.get(this); int duped = 0; int present = oldHead.pos + 1; // Guess about 2 kilobytes per stack trace StringBuilder buf = new StringBuilder(present * 2048).append(NEWLINE); buf.append("Recent access records: ").append(NEWLINE); int i = 1; Set seen = new HashSet (present); for (; oldHead ! = Record.BOTTOM; oldHead = oldHead.next) { String s = oldHead.toString(); if (seen.add(s)) { if (oldHead.next == Record.BOTTOM) { buf.append("Created at:").append(NEWLINE).append(s); } else { buf.append(' #').append(i++).append(':').append(NEWLINE).append(s);}}else{ duped++; }}if (duped > 0) { buf.append(":") .append(duped) .append(" leak records were discarded because they were duplicates") .append(NEWLINE); } if (dropped > 0) { buf.append(":") .append(dropped) .append(" leak records were discarded because the leak record count is targeted to ") .append(TARGET_RECORDS) .append(". Use system property ") .append(PROP_TARGET_RECORDS) .append(" to increase the limit.") .append(NEWLINE); } buf.setLength(buf.length() - NEWLINE.length()); returnbuf.toString(); }}Copy the code
  • DefaultResourceLeak is a private static class defined by ResourceLeakDetector, which inherits WeakReference class and implements ResourceLeakTracker(Record and close methods are defined) interface; The record method internally calls the Record0 method, which updates newHead to the new Record; The close method will remove allLeaks, which is passed in when defaultResourceleakDetector creates DefaultResourceLeak. DefaultResourceLeak adds itself to allLeaks

SimpleLeakAwareByteBuf

Netty – netty – 4.1.33. Final/buffer/SRC/main/Java/IO/netty/buffer/SimpleLeakAwareByteBuf. Java

class SimpleLeakAwareByteBuf extends WrappedByteBuf {

    /**
     * This object's is associated with the {@link ResourceLeakTracker}. When {@link ResourceLeakTracker#close(Object)} * is called this object will be used as the argument. It is also assumed that this object is used when * {@link ResourceLeakDetector#track(Object)} is called to create {@link #leak}. */ private final ByteBuf trackedByteBuf; final ResourceLeakTracker
      
        leak; SimpleLeakAwareByteBuf(ByteBuf wrapped, ByteBuf trackedByteBuf, ResourceLeakTracker
       
         leak) { super(wrapped); this.trackedByteBuf = ObjectUtil.checkNotNull(trackedByteBuf, "trackedByteBuf"); this.leak = ObjectUtil.checkNotNull(leak, "leak"); } SimpleLeakAwareByteBuf(ByteBuf wrapped, ResourceLeakTracker
        
          leak) { this(wrapped, wrapped, leak); } / /... @Override public boolean release() { if (super.release()) { closeLeak(); return true; } return false; } @Override public boolean release(int decrement) { if (super.release(decrement)) { closeLeak(); return true; } return false; } private void closeLeak() { // Close the ResourceLeakTracker with the tracked ByteBuf as argument. This must be the same that was used when // calling DefaultResourceLeak.track(...) . boolean closed = leak.close(trackedByteBuf); assert closed; } private ByteBuf unwrappedDerived(ByteBuf derived) { // We only need to unwrap SwappedByteBuf implementations as these will be the only ones that may end up in // the AbstractLeakAwareByteBuf implementations beside slices / duplicates and "real" buffers. ByteBuf unwrappedDerived = unwrapSwapped(derived); if (unwrappedDerived instanceof AbstractPooledDerivedByteBuf) { // Update the parent to point to this buffer so we correctly close the ResourceLeakTracker. ((AbstractPooledDerivedByteBuf) unwrappedDerived).parent(this); ResourceLeakTracker
         
           newLeak = AbstractByteBuf.leakDetector.track(derived); if (newLeak == null) { // No leak detection, just return the derived buffer. return derived; } return newLeakAwareByteBuf(derived, newLeak); } return newSharedLeakAwareByteBuf(derived); } / /... }
         
        
       
      Copy the code
  • SimpleLeakAwareByteBuf inherits WrappedByteBuf and its constructor requires that the ResourceLeakTracker be passed in
  • SimpleLeakAwareByteBuf overwrites WrappedByteBuf’s retainedSlice, retainedDuplicate, and readRetainedSlice methods, all of which internally call the unwrappedDerived method. UnwrappedDerived method in unwrappedDerived object are called when AbstractPooledDerivedByteBuf types AbstractByteBuf. LeakDetector. Track to track
  • SimpleLeakAwareByteBuf also overwrites WrappedByteBuf’s release method and calls closeLeak if the parent class release is successfully called. Use leak.close(trackedByteBuf) to release trackedByteBuf

AdvancedLeakAwareByteBuf

Netty – netty – 4.1.33. Final/buffer/SRC/main/Java/IO/netty/buffer/AdvancedLeakAwareByteBuf. Java

final class AdvancedLeakAwareByteBuf extends SimpleLeakAwareByteBuf {

    private static final String PROP_ACQUIRE_AND_RELEASE_ONLY = "io.netty.leakDetection.acquireAndReleaseOnly";
    private static final boolean ACQUIRE_AND_RELEASE_ONLY;

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AdvancedLeakAwareByteBuf.class);

    static {
        ACQUIRE_AND_RELEASE_ONLY = SystemPropertyUtil.getBoolean(PROP_ACQUIRE_AND_RELEASE_ONLY, false);

        if (logger.isDebugEnabled()) {
            logger.debug("-D{}: {}", PROP_ACQUIRE_AND_RELEASE_ONLY, ACQUIRE_AND_RELEASE_ONLY);
        }

        ResourceLeakDetector.addExclusions(
                AdvancedLeakAwareByteBuf.class, "touch"."recordLeakNonRefCountingOperation");
    }

    AdvancedLeakAwareByteBuf(ByteBuf buf, ResourceLeakTracker<ByteBuf> leak) {
        super(buf, leak);
    }

    AdvancedLeakAwareByteBuf(ByteBuf wrapped, ByteBuf trackedByteBuf, ResourceLeakTracker<ByteBuf> leak) {
        super(wrapped, trackedByteBuf, leak);
    }

    static void recordLeakNonRefCountingOperation(ResourceLeakTracker<ByteBuf> leak) {
        if(! ACQUIRE_AND_RELEASE_ONLY) { leak.record(); }} / /... @Override public ByteBuf order(ByteOrder endianness) { recordLeakNonRefCountingOperation(leak);return super.order(endianness);
    }

    @Override
    public ByteBuf slice() {
        recordLeakNonRefCountingOperation(leak);
        return super.slice();
    }

    @Override
    public ByteBuf slice(int index, int length) {
        recordLeakNonRefCountingOperation(leak);
        returnsuper.slice(index, length); } / /... @Override public ByteBufretain() {
        leak.record();
        return super.retain();
    }

    @Override
    public ByteBuf retain(int increment) {
        leak.record();
        return super.retain(increment);
    }

    @Override
    public boolean release() {
        leak.record();
        return super.release();
    }

    @Override
    public boolean release(int decrement) {
        leak.record();
        return super.release(decrement);
    }

    @Override
    public ByteBuf touch() {
        leak.record();
        return this;
    }

    @Override
    public ByteBuf touch(Object hint) {
        leak.record(hint);
        returnthis; } / /... }Copy the code
  • AdvancedLeakAwareByteBuf inherited SimpleLeakAwareByteBuf, it to cover method, these methods covered or internal leak through recordLeakNonRefCountingOperation calls. The record, Or call leak.record directly

summary

  • The ResourceLeakDetector uses the Level enumeration to define four different leak detection levels: DISABLED, SIMPLE, ADVANCED, and PARANOID. The default level is SIMPLE. You can use – Dio.net ty. LeakDetection. Level = advanced to set; ResourceLeakDetector TARGET_RECORDS (io.netty.leakDetection.targetRecords) and SAMPLING_INTERVAL (io.netty.leakDetection.samplingInterval), where targetRecords defaults to 4 and samplingInterval defaults to 128
  • ResourceLeakDetector provides the track method to create ResourceLeakTracker; The track method internally calls the Track0 method. If the level is PARANOID, call reportLeak immediately and create DefaultResourceLeak, otherwise use random numbers to determine (PlatformDependent.threadLocalRandom().nextInt(samplingInterval)) == 0) call reportLeak and create DefaultResourceLeak; The reportLeak method has a for loop that continuously fetches DefaultResourceLeak from the refQueue and then calls reportUntracedLeak or reportTracedLeak for error
  • DefaultResourceLeak is a private static class defined by ResourceLeakDetector, which inherits WeakReference class and implements ResourceLeakTracker(Record and close methods are defined) interface; The record method internally calls the Record0 method, which updates newHead to the new Record; The close method will remove allLeaks, which is passed in when defaultResourceleakDetector creates DefaultResourceLeak. DefaultResourceLeak adds itself to allLeaks
  • SimpleLeakAwareByteBuf inherits WrappedByteBuf and its constructor requires that the ResourceLeakTracker be passed; SimpleLeakAwareByteBuf overwrites WrappedByteBuf’s retainedSlice, retainedDuplicate, and readRetainedSlice methods, all of which internally call the unwrappedDerived method. UnwrappedDerived method in unwrappedDerived object are called when AbstractPooledDerivedByteBuf types AbstractByteBuf. LeakDetector. Track track; SimpleLeakAwareByteBuf also overwrites WrappedByteBuf’s release method and calls closeLeak if the parent class release is successfully called. Use leak.close(trackedByteBuf) to release trackedByteBuf
  • AdvancedLeakAwareByteBuf inherited SimpleLeakAwareByteBuf, it to cover method, these methods covered or internal leak through recordLeakNonRefCountingOperation calls. The record, Either call leak.record directly; In addition, there are SimpleLeakAwareCompositeByteBuf and AdvancedLeakAwareCompositeByteBuf, Their support for Leak Detect is similar to SimpleLeakAwareByteBuf and AdvancedLeakAwareByteBuf

doc

  • Netty’s resource leak detection mechanism
  • A Netty ByteBuf Memory Leak Story and the Lessons Learned
  • In 4.0.23. Final, Seeing io.net ty. Util. ResourceLeakDetector – LEAK: ByteBuf.release() was not called before it’s garbage-collected #2774