Preface

This article walks through troubleshooting a case where a kafka consumer's offset lag kept increasing.

Check consumer lag

Group       Topic        Pid    Offset     logSize         Lag             Owner
demo-group demo-topic     0    9678273         9858394         180121          xxx-service-dpqpc-1510557406684-e2171bd6-0
demo-group demo-topic     1    9689443         9873522         184079          xxx-service-dpqpc-1510557406684-e2171bd6-1
demo-group demo-topic     2    9676875         9855874         178999          xxx-service-q7vch-1510557399475-b1d7d22c-0
demo-group demo-topic     3    9683393         9864518         181125          xxx-service-q7vch-1510557399475-b1d7d22c-1

The gap between the consumer's offset and logSize is far too large: the lag on every partition is over 100,000.
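For reference, output in this format typically comes from the old consumer offset checker; a possible invocation looks like this (the ZooKeeper address is an assumption, not the service's real config):

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group demo-group --topic demo-topic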

Normal condition

Group           Topic         Pid   Offset          logSize         Lag             Owner
demo-group      demo-topic    0     9860587         9860587         0               demo-group_tomcat2-1512984437115-fc1ee57b-0
demo-group      demo-topic    1     9875814         9875814         0               demo-group_tomcat2-1512984437115-fc1ee57b-0
demo-group      demo-topic    2     9858213         9858214         1               demo-group_tomcat2-1512984437115-fc1ee57b-1
demo-group      demo-topic    3     9866744         9866744         0               demo-group_tomcat2-1512984437115-fc1ee57b-2

A relatively small lag gap like this is normal.

View the topic's partitions

Topic:demo-topic    PartitionCount:4    ReplicationFactor:2    Configs:
    Topic: demo-topic    Partition: 0    Leader: 3    Replicas: 3,4    Isr: 4,3
    Topic: demo-topic    Partition: 1    Leader: 4    Replicas: 4,1    Isr: 1,4
    Topic: demo-topic    Partition: 2    Leader: 1    Replicas: 1,2    Isr: 1,2
    Topic: demo-topic    Partition: 3    Leader: 2    Replicas: 2,3    Isr: 2,3
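This layout is what the topic describe tool prints; a possible invocation (again assuming the ZooKeeper address):

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic demo-topic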

The topic has four partitions, so having four consumers is normal. The problem is most likely that the consumers are either consuming too slowly or have stopped consuming abnormally.

troubleshooting

jstack -l pid

2017-12-27 04:06:23
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.66-b17 mixed mode):

"Attach Listener" #12286 daemon prio=9 os_prio=0 tid=0x00007f2920001000 nid=0x3087 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"ConsumerFetcherThread-xxx-service-dpqpc-1510557406684-e2171bd6-0-3" #9263 prio=5 os_prio=0 tid=0x00007f287400d800 nid=0x2440 waiting on condition [0x00007f285e6eb000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007048874b0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
    at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
    at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

"ConsumerFetcherThread-xxx-service-dpqpc-1510557406684-e2171bd6-0-4" #9262 prio=5 os_prio=0 tid=0x00007f28740c2800 nid=0x243f waiting on condition [0x00007f291950d000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007048086d8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
    at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
    at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

"xxx-service-dpqpc-1510557406684-e2171bd6-leader-finder-thread" #9261 prio=5 os_prio=0 tid=0x0000000002302800 nid=0x243e waiting on condition [0x00007f28bd1df000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0000000703d06518> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:61)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

"consume-2" #62 prio=5 os_prio=0 tid=0x00007f28f8e86000 nid=0x51 waiting on condition [0x00007f28bd3e1000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000070440cd38> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

"consume-1" #61 prio=5 os_prio=0 tid=0x00007f28f8e84800 nid=0x50 waiting on condition [0x00007f28bd4e2000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000070440cd38> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

"xxx-service-dpqpc-1510557406684-e2171bd6_watcher_executor" #59 prio=5 os_prio=0 tid=0x00007f28fb685800 nid=0x4e waiting on condition [0x00007f28bd8e4000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007048878d0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:544)

The consume-1 and consume-2 threads above are the business threads that consume messages from kafka. In this dump they are idle, parked in the thread pool waiting for tasks, while the ConsumerFetcherThreads are blocked trying to enqueue fetched data.

error log

2017-12-16 12:53:34.257  INFO 7 --- [0.2.84.118:2181] k.consumer.ZookeeperConsumerConnector    : [xxx-service-q7vch-1510557399475-b1d7d22c], begin rebalancing consumer xxx-service-q7vch-1510557399475-b1d7d22c try #1
2017-12-16 12:53:34.348  INFO 7 --- [0.2.84.118:2181] kafka.consumer.ConsumerFetcherManager    : [ConsumerFetcherManager-1510557399586] Stopping leader finder thread
2017-12-16 12:53:34.348  INFO 7 --- [0.2.84.118:2181] kafka.consumer.ConsumerFetcherManager    : [ConsumerFetcherManager-1510557399586] Stopping all fetchers
2017-12-16 12:53:34.348  INFO 7 --- [0.2.84.118:2181] kafka.consumer.ConsumerFetcherManager    : [ConsumerFetcherManager-1510557399586] All connections stopped
2017-12-16 12:53:34.348  INFO 7 --- [0.2.84.118:2181] k.consumer.ZookeeperConsumerConnector    : [xxx-service-q7vch-1510557399475-b1d7d22c], Cleared all relevant queues for this fetcher
2017-12-16 12:53:34.348  INFO 7 --- [0.2.84.118:2181] k.consumer.ZookeeperConsumerConnector    : [xxx-service-q7vch-1510557399475-b1d7d22c], Cleared the data chunks in all the consumer message iterators
2017-12-16 12:53:34.348  INFO 7 --- [0.2.84.118:2181] k.consumer.ZookeeperConsumerConnector    : [xxx-service-q7vch-1510557399475-b1d7d22c], Committing all offsets after clearing the fetcher queues

The logs show almost no trace of messages being consumed, yet the lag is large.

Seeing the rebalance entries in the log, together with the jstack above showing the ConsumerFetcherThreads blocked in PartitionTopicInfo.enqueue, I suspected a deadlock or hang caused by rebalancing. Unfortunately the jstack had been taken without -l, so no deadlock information was available. Searching online turned up a thread titled "ConsumerFetcherThread deadlock?", which describes a similar problem in kafka 0.8.2.2, but that issue should already be fixed. Then I saw this reply:

The fetchers are blocked on the queue since it is full, is your consumer iterator stopped and hence not getting more data from it?
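That reply points at the bounded chunk queue inside the old high-level consumer: each KafkaStream is fed from a LinkedBlockingQueue whose capacity is set by queued.max.message.chunks, so if the iterator stops draining it, PartitionTopicInfo.enqueue eventually blocks. A minimal sketch of such a consumer setup, just to show where that queue is configured (the connection values are assumptions, not the service's real config):

import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumerSetup {
    public static ConsumerConnector create() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // assumed ZooKeeper address
        props.put("group.id", "demo-group");
        // Each KafkaStream is backed by a bounded queue of fetched chunks; once it is full,
        // the ConsumerFetcherThread blocks in PartitionTopicInfo.enqueue, as seen in the jstack.
        props.put("queued.max.message.chunks", "2");
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    }
}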

I began to wonder whether my business threads had died from an uncaught exception, leaving nothing to consume. So I restarted the program, confirmed from the log that messages were being consumed again, and took another jstack:

"ConsumerFetcherThread-xxx-376jt-1514353818187-b37be1c0-0-3" #81 prio=5 os_prio=0 tid=0x00007fe39c004000 nid=0x63 waiting on condition [0x00007fe3931f4000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007822ac4e0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
    at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
    at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

"ConsumerFetcherThread-xxx-376jt-1514353818187-b37be1c0-0-4" #80 prio=5 os_prio=0 tid=0x00007fe39c003000 nid=0x62 waiting on condition [0x00007fe3926ea000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007821c9a68> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
    at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
    at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

"xxx-376jt-1514353818187-b37be1c0-leader-finder-thread" #79 prio=5 os_prio=0 tid=0x0000000001f5a000 nid=0x61 waiting on condition [0x00007fe3920e7000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0000000782154c30> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:61)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

"consume-2" #62 prio=5 os_prio=0 tid=0x00007fe48da13800 nid=0x51 runnable [0x00007fe392ff1000]
   java.lang.Thread.State: RUNNABLE
    //......
    at org.springframework.data.mongodb.core.MongoTemplate.executeFindMultiInternal(MongoTemplate.java:1948)
    at org.springframework.data.mongodb.core.MongoTemplate.doFind(MongoTemplate.java:1768)
    at org.springframework.data.mongodb.core.MongoTemplate.doFind(MongoTemplate.java:1751)
    at org.springframework.data.mongodb.core.MongoTemplate.find(MongoTemplate.java:625)
    at org.springframework.data.mongodb.core.MongoTemplate.findOne(MongoTemplate.java:590)
    at org.springframework.data.mongodb.core.MongoTemplate.findOne(MongoTemplate.java:582)
    at com.xxx.consumer.KafkaStreamProcessor.process(KafkaStreamProcessor.java:37)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor$1.call(AsyncExecutionInterceptor.java:115)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

"consume-1" #61 prio=5 os_prio=0 tid=0x00007fe48e310000 nid=0x50 runnable [0x00007fe3930f2000]
   java.lang.Thread.State: RUNNABLE
    //...
    at org.springframework.data.mongodb.core.MongoTemplate$12.doInCollection(MongoTemplate.java:1157)
    at org.springframework.data.mongodb.core.MongoTemplate$12.doInCollection(MongoTemplate.java:1137)
    at org.springframework.data.mongodb.core.MongoTemplate.execute(MongoTemplate.java:463)
    at org.springframework.data.mongodb.core.MongoTemplate.doUpdate(MongoTemplate.java:1137)
    at org.springframework.data.mongodb.core.MongoTemplate.upsert(MongoTemplate.java:1099)
    at com.xxx.consumer.KafkaStreamProcessor.process(KafkaStreamProcessor.java:37)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor$1.call(AsyncExecutionInterceptor.java:115)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

"xxx-376jt-1514353818187-b37be1c0_watcher_executor" #59 prio=5 os_prio=0 tid=0x00007fe48fe7c000 nid=0x4e waiting on condition [0x00007fe3934f5000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0000000782155248> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:544)

Comparing the two dumps: the ConsumerFetcherThreads I had suspected, blocked in PartitionTopicInfo.enqueue, were in exactly the same state after the restart, so that state is probably normal.

The consume-1 and consume-2 threads, however, told a different story: in the problematic dump their stacks contained no business method at all, whereas after the restart they were busy inside KafkaStreamProcessor.process. It became clear that the problem was an exception in the business code that was not caught.

Business method

The original business method was roughly as follows:

@Async
public void process(KafkaStream<byte[], byte[]> stream){
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while (it.hasNext()) {
        System.out.println(Thread.currentThread().getName() + ":" + new String(it.next().message()));
    }
}

One thing puzzled me: if the business code does not catch the exception and the worker thread dies, the replacement thread should in theory get a new, higher number in its name. Yet experiments showed that after the @Async method threw, the thread number stayed the same.

spring-core-4.3.13.RELEASE-sources.jar!/org/springframework/util/CustomizableThreadCreator.java

public class CustomizableThreadCreator implements Serializable {
    private final AtomicInteger threadCount = new AtomicInteger(0);
    /**
     * Template method for the creation of a new {@link Thread}.
     * <p>The default implementation creates a new Thread for the given
     * {@link Runnable}, applying an appropriate thread name.
     * @param runnable the Runnable to execute
     * @see #nextThreadName()
     */
    public Thread createThread(Runnable runnable) {
        Thread thread = new Thread(getThreadGroup(), runnable, nextThreadName());
        thread.setPriority(getThreadPriority());
        thread.setDaemon(isDaemon());
        return thread;
    }

    /**
     * Return the thread name to use for a newly created {@link Thread}.
     * <p>The default implementation returns the specified thread name prefix
     * with an increasing thread count appended: e.g. "SimpleAsyncTaskExecutor-0".
     * @see #getThreadNamePrefix()
     */
    protected String nextThreadName() {
        return getThreadNamePrefix() + this.threadCount.incrementAndGet();
    }

    // ...
}

The threadCount here is only ever incremented and never decremented, so if a worker thread died unexpectedly, its replacement should carry a higher number in its name.

/Library/Java/JavaVirtualMachines/jdk1.8.0_71.jdk/Contents/Home/src.zip!/java/util/concurrent/ThreadPoolExecutor.java

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && !workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

Debugging showed that completedAbruptly was false, meaning the business task never propagated an exception to the worker thread. Then the @Async annotation's interception came to mind, and things started to make sense.
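The difference can be reproduced with a plain ThreadPoolExecutor (a standalone sketch, unrelated to the service's code): a task that throws and is run via execute() kills its worker, and the replacement gets a new name from the thread factory; the same task wrapped by submit() keeps the exception inside the FutureTask, so completedAbruptly ends up false and the worker survives.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class WorkerDeathDemo {
    public static void main(String[] args) throws Exception {
        AtomicInteger count = new AtomicInteger(0);
        ThreadFactory factory = r -> new Thread(r, "consume-" + count.incrementAndGet());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), factory);

        Runnable boom = () -> {
            System.out.println(Thread.currentThread().getName() + " running");
            throw new RuntimeException("simulated business exception");
        };

        // execute(): the exception escapes task.run(), completedAbruptly stays true,
        // processWorkerExit replaces the dead worker -> the next task runs on consume-2.
        pool.execute(boom);
        Thread.sleep(500);
        pool.execute(() -> System.out.println(Thread.currentThread().getName() + " still alive"));
        Thread.sleep(500);

        // submit(): the Runnable is wrapped in a FutureTask, which captures the exception,
        // so the worker completes normally (completedAbruptly = false) and keeps its name.
        pool.submit(boom);
        Thread.sleep(500);
        pool.submit(() -> System.out.println(Thread.currentThread().getName() + " still alive"));

        pool.shutdown();
    }
}

In the @Async case the interceptor's Callable catches the exception even earlier, before the FutureTask would, so the worker likewise survives and the thread names stay consume-1 and consume-2.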

AsyncExecutionInterceptor

spring-aop-4.3.13.RELEASE-sources.jar!/org/springframework/aop/interceptor/AsyncExecutionInterceptor.java

@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

    AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
    if (executor == null) {
        throw new IllegalStateException(
                "No executor specified and no default executor set on AsyncExecutionInterceptor either");
    }

    Callable<Object> task = new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            try {
                Object result = invocation.proceed();
                if (result instanceof Future) {
                    return ((Future<?>) result).get();
                }
            }
            catch (ExecutionException ex) {
                handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
            }
            catch (Throwable ex) {
                handleError(ex, userDeclaredMethod, invocation.getArguments());
            }
            return null;
        }
    };

    return doSubmit(task, executor, invocation.getMethod().getReturnType());
}

Methods annotated with @Async are intercepted by AsyncExecutionInterceptor, which wraps the invocation in a Callable and handles any exception itself via handleError, so no exception ever reaches the thread pool's worker. That is why the worker thread neither died nor left a stack trace, and consumption simply stopped.
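Given that, the essential fix is to catch exceptions inside the iterator loop so a single failing message cannot end the @Async task and silently stop consumption. A minimal sketch of the adjusted method (the handle helper and log field are placeholders, not part of the original code):

@Async
public void process(KafkaStream<byte[], byte[]> stream) {
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while (it.hasNext()) {
        byte[] payload = it.next().message();
        try {
            // Business handling goes here; keeping the catch inside the loop lets the
            // consuming thread keep draining the stream even if one message fails.
            handle(new String(payload));
        } catch (Exception e) {
            // Placeholder handling: log and move on, or route the message to a retry/dead-letter flow.
            log.error("failed to process kafka message", e);
        }
    }
}

Registering an AsyncUncaughtExceptionHandler would only make such a failure visible in the logs; it would not keep the loop running, so the try/catch inside the loop is the part that matters here.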

summary

  • When consuming from kafka, monitor the consumer's offset lag in real time to confirm that consumption keeps up with production.
  • The thread that iterates over a KafkaStream must catch exceptions itself; otherwise a single uncaught exception silently terminates the consuming task and consumption stops.

doc

  • ConsumerFetcherThread deadlock?
  • Java Highlevel Consumer is stuck and the lag is increasing