This article may be reprinted. When reprinting, please link to the original and credit the author, Li Yanpeng.





Kclient: A Simple and Efficient Kafka Client Framework

1 Background

Message queues are widely used on the Internet, mostly for asynchronous processing, module decoupling, and smoothing traffic peaks under high concurrency. Among open-source message queues, the Apache project Kafka offers the best performance. Kafka is developed in Scala to support high concurrency, achieves high performance by exploiting the operating system's page cache, comes with partitioning and distribution built in, and has clients in many languages, making it very convenient to use.

Kclient is a simple and easy-to-use framework that wraps Kafka's producer and consumer clients, offering efficient integration, high performance, and high stability.

Before reading about kclient's features, architecture, and usage, you should have a basic understanding of Kafka. If you are new to Kafka and comfortable with English, refer to the official online documentation: Kafka 0.8.2 Documentation. Readers who prefer Chinese can find Kafka documentation in Chinese on the Internet. It covers Kafka usage guides as well as the design principles behind Kafka's performance: operating system caching, persistence, partitioning, high availability, and so on.

This article covers the background, features, architecture design, usage guide, API introduction, background monitoring and management, the message processor template project, and performance benchmarking. To quickly build a Kafka processor service with kclient, see the message processor template project section. To learn about kclient's other usage modes, features, and monitoring capabilities, see the sections on functions and features, the usage guide, the API introduction, and background monitoring and management. For a deeper understanding of kclient's architecture design and performance, see the architecture design and performance benchmark sections.

2 Functions and Features

2.1 Easy to Use

Kclient simplifies the Kafka client API, especially on the consumer side. Consumer developers only need to implement the MessageHandler interface (or one of its subclasses), put the business logic inside the handler, and start the wrapped consumer server in the main thread. Kclient provides several commonly used MessageHandlers that automatically convert messages into domain objects or JSON objects, so developers can focus on business processing. With service source annotations, an ordinary service method can be declared as a fully functional Kafka message processor; this is easy to use and keeps the code clear at a glance, while the framework guarantees processor performance through various thread pool techniques.

It can be used in any of the following ways:

  1. Use the Java API directly;
  2. Seamless integration with the Spring environment;
  3. Service source annotations that start the Kafka message queue processor with an annotation declaration.

In addition, it provides a template project for message processors based on annotations, from which Kafka’s message processors can be rapidly developed through configuration.

2.2 High Performance

To achieve high performance in different business scenarios, it provides different threading models:

  1. A synchronous threading model for lightweight services;
  2. An asynchronous threading model suitable for IO-intensive services (further divided into a thread pool shared by all consumer streams and a dedicated thread pool per stream).

In addition, thread pools in the asynchronous model also support thread pools with a fixed number of threads and thread pools with a scalable number of threads.

2.3 High Stability

At the framework level, common exceptions are handled and errors are logged so that messages can later be recovered or replayed manually; graceful shutdown and restart are also implemented.

3 Architecture Design

3.1 Thread Model

1. Synchronous thread model

In this threading model, the client uses one thread per consumer stream; each thread consumes messages from the Kafka queue and performs the business processing in the same thread. We call these threads consuming threads, and the pool they live in the message-consuming thread pool. Because business is handled inside the message-consuming thread pool, this model is mostly used for lightweight business such as cache lookups and local computation.
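The synchronous model can be illustrated with plain JDK classes — here a BlockingQueue stands in for a Kafka stream, and all class and method names are illustrative, not kclient's actual API:

```java
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Illustrative sketch of the synchronous model: one consuming thread per
// stream; the business handler runs inside the consuming thread itself.
public class SyncConsumerSketch {
    public static Thread startStream(BlockingQueue<String> stream, Consumer<String> handler) {
        Thread t = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    String message = stream.take(); // blocks like a Kafka stream iterator
                    handler.accept(message);        // lightweight work done in-line
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // exit cleanly on interruption
            }
        });
        t.start();
        return t;
    }
}
```

Because the handler runs in the consuming thread, any slow processing directly delays the next take from the stream, which is why this model only fits lightweight business.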

2. Asynchronous threading model

In this threading model, the client uses one thread per consumer stream; each thread consumes messages from the Kafka queue and passes them to a backend asynchronous thread pool, where the business is processed. We still call the former pool the message-consuming thread pool, and the latter the asynchronous business thread pool. This model suits heavy business such as large amounts of disk IO, network IO, complex computation, and calls to external systems.
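The asynchronous model differs only in the hand-off: the consuming thread does nothing but move messages into the business pool. A sketch with illustrative names (again, not kclient's API):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

// Illustrative sketch of the asynchronous model: the consuming thread only
// takes messages off the stream and submits them to a backend business pool.
public class AsyncConsumerSketch {
    public static Thread startStream(BlockingQueue<String> stream,
                                     ExecutorService businessPool,
                                     Consumer<String> handler) {
        Thread t = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    String message = stream.take();
                    businessPool.submit(() -> handler.accept(message)); // heavy work off-thread
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.start();
        return t;
    }
}
```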

The backend asynchronous business thread pool is further divided into a thread pool shared by all consumer streams and a dedicated thread pool per stream.

1). A thread pool shared by all consumer streams

Creating a single thread pool for all consumer streams uses slightly less memory than creating one pool per stream, but because multiple streams share the same pool, the streams can interfere with each other under heavy load. For example, suppose a business uses two partitions and two streams in one-to-one correspondence, and the producer supplies a custom hash function (replacing the default key hash) so that one stream (partition) handles ordinary users and the other handles VIP users. If the two streams share one thread pool and a large number of ordinary-user messages are produced, the few VIP-user messages queued behind them will starve. This scenario could also be solved with two topics, one for ordinary users and one for VIP users, but then an extra topic must be maintained and the client must explicitly choose the target topic when sending, which is not much of an improvement.
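The custom key-hash routing in this scenario could be sketched as follows; the partitioner below is hypothetical and only illustrates the idea of reserving a partition for VIP users:

```java
// Hypothetical partitioner: reserve the last partition for VIP users so that
// a flood of ordinary-user messages cannot starve VIP processing.
public class VipPartitionerSketch {
    public static int partition(String userId, boolean isVip, int numPartitions) {
        if (numPartitions < 2) {
            throw new IllegalArgumentException("need at least 2 partitions");
        }
        if (isVip) {
            return numPartitions - 1;                              // dedicated VIP partition
        }
        // spread ordinary users over the remaining partitions
        return (userId.hashCode() & 0x7fffffff) % (numPartitions - 1);
    }
}
```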

2). A dedicated thread pool per stream

With a dedicated thread pool per stream, messages in different streams are processed by different asynchronous pools: they are isolated, independent, and do not affect each other. This performs better when streams (partitions) have different priorities or when message volume is unevenly distributed across streams. Creating multiple thread pools does use a little more memory, but that is not a big problem.

In addition, asynchronous business thread pools support thread pools with a fixed number of threads and thread pools with a scalable number of threads.

1). If the hardware resources for core services are guaranteed, core services have dedicated resource pools, or online traffic is predictable, use a thread pool with a fixed number of threads.

2). If non-core services are typically co-located and share resources with each other, and online traffic is not fixed, use a thread pool with a scalable number of threads.
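These two pool flavors can be sketched with the JDK's java.util.concurrent (illustrative factory methods, not kclient's internals):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of the two thread pool flavors discussed above.
public class PoolFactorySketch {
    // Fixed-size pool: stable footprint for core services with predictable traffic.
    public static ExecutorService fixedPool(int threads) {
        return Executors.newFixedThreadPool(threads);
    }

    // Scalable pool: grows from min to max threads under load; idle threads
    // above the core size are reclaimed after 60 seconds.
    public static ExecutorService scalablePool(int min, int max) {
        return new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<>());
    }
}
```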

3.2 Exception Handling

For business exceptions raised during message processing, Throwable is caught above the business-processing layer and the error is recorded in a dedicated error-recovery log. The failed messages can later be processed manually, replayed, or cleaned up based on that log. TODO: consider an exception-listener architecture — a pluggable listener pattern for exception handling, with error logging as the default behavior.

Because the default exception handling catches the exception, records the error in the dedicated error-recovery log, and moves on to the next message, a bad release or a malformed upstream message format could cause every message to fail and fill up the error-recovery log. We therefore need alarm and monitoring systems to surface such problems.
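A minimal sketch of this catch-everything-and-record strategy (an in-memory list stands in for the error-recovery log; all names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch of the default behavior: catch everything, record the failed
// message for later manual recovery or replay, and keep consuming.
public class ErrorRecoverySketch {
    private final List<String> recoveryLog = new ArrayList<>(); // stands in for a log file

    public void process(String message, Consumer<String> business) {
        try {
            business.accept(message);
        } catch (Throwable t) {
            recoveryLog.add(message); // recorded for replay; processing continues
        }
    }

    public List<String> recoveryLog() {
        return recoveryLog;
    }
}
```

Note that the loop never stops on failure — which is exactly why monitoring is needed to detect a systematic error before the recovery log fills up.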

3.3 Graceful Shutdown

The consumer itself is an event-driven server: much as Tomcat receives an HTTP request and returns an HTTP response, the consumer receives a Kafka message, processes it, and may send the result on to the next message queue. The consumer is therefore quite complex — beyond the threading model, exception handling, performance, stability, and availability all need consideration. Because the consumer is a background server, we must also consider how to shut it down gracefully while it is processing messages, so that processing is not interrupted and no messages are lost.

The key to a graceful shutdown is solving the following three problems:

  1. How do we know that the JVM is exiting?
  2. How do we prevent worker threads from being killed during JVM exit, losing the messages they are processing?
  3. If a worker thread is blocked, how do we interrupt it so it can exit?

First question: if the process is running in the foreground of a console, you can send an exit signal to the JVM with Ctrl+C, or with kill -2 PID or kill -15 PID — but not kill -9 PID, or the process will exit unconditionally. When the JVM receives the exit signal, it invokes the registered shutdown hooks, and the JVM shutdown hook we register performs the graceful shutdown.

Second problem: threads are either daemon threads or non-daemon threads, and a thread inherits the daemon attribute from its parent by default, so if a thread pool is created by a daemon thread, its worker threads are daemon threads too. If the worker threads are daemon threads, the JVM exit hook must wait for each worker to finish the message it is currently processing before letting the JVM exit. If they are non-daemon threads, the JVM will not exit until the workers have exited, even after receiving the exit signal, so the messages being processed are not lost.
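A small demo of the daemon-flag inheritance this relies on:

```java
public class DaemonFlagDemo {
    public static void main(String[] args) {
        Thread worker = new Thread(() -> {});
        // A new thread inherits the daemon flag of the thread that created it;
        // main is a non-daemon thread, so worker is non-daemon by default.
        System.out.println(worker.isDaemon()); // prints false
    }
}
```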

Third problem: a worker thread may be blocked while consuming messages from the Kafka server, and it must be interrupted so it can exit without messages being dropped. A worker may also block while processing business logic — for example, disk or network IO — and fail to finish within the specified exit timeout; in that case we also interrupt the thread. The resulting InterruptedException is caught by the default exception handler, written to the error log, and the worker thread exits.
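Put together, the shutdown sequence might be sketched like this, assuming a worker pool wrapped by the consumer (method names are illustrative, not kclient's API):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of the graceful-shutdown sequence: stop intake, wait for in-flight
// messages, then interrupt workers that are still blocked.
public class GracefulShutdownSketch {
    public static void shutdownGracefully(ExecutorService workers, long waitSeconds) {
        workers.shutdown(); // no new tasks accepted
        try {
            if (!workers.awaitTermination(waitSeconds, TimeUnit.SECONDS)) {
                workers.shutdownNow(); // interrupts blocked workers (InterruptedException)
            }
        } catch (InterruptedException e) {
            workers.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public static void installHook(ExecutorService workers, long waitSeconds) {
        // Ctrl+C / kill -15 triggers this hook; kill -9 bypasses it entirely.
        Runtime.getRuntime().addShutdownHook(
                new Thread(() -> shutdownGracefully(workers, waitSeconds)));
    }
}
```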

4 Usage Guide

Kclient can be used in three ways; whichever you choose, follow the steps below to quickly build Kafka producer and consumer programs.

4.1 Procedure

  1. After downloading the source code, run the following command in the project root directory to install the JAR into your local Maven repository.

    mvn install

  2. Add a dependency on KClient to your project pom.xml file.

     <dependency>
         <groupId>com.robert.kafka</groupId>
         <artifactId>kclient-core</artifactId>
         <version>0.0.1</version>
     </dependency>
  3. Create two topics, test1 and test2.

  4. Then copy kafka-consumer.properties and kafka-producer.properties from the config directory to your project classpath, usually src/main/resources.

4.2 Java API

The Java API provides the most straightforward and easiest way to use KClient.

Build the Producer example:

KafkaProducer kafkaProducer = new KafkaProducer("kafka-producer.properties", "test");

for (int i = 0; i < 10; i++) {
    Dog dog = new Dog();
    dog.setName("Yours " + i);
    dog.setId(i);
    kafkaProducer.sendBean2Topic("test", dog);

    System.out.format("Sending dog: %d \n", i + 1);

    Thread.sleep(100);
}

Build the Consumer example:

DogHandler mbe = new DogHandler();

KafkaConsumer kafkaConsumer = new KafkaConsumer("kafka-consumer.properties", "test", 1, mbe);
try {
    kafkaConsumer.startup();

    try {
        System.in.read();
    } catch (IOException e) {
        e.printStackTrace();
    }
} finally {
    kafkaConsumer.shutdownGracefully();
}

public class DogHandler extends BeanMessageHandler<Dog> {
    public DogHandler() {
        super(Dog.class);
    }

    protected void doExecuteBean(Dog dog) {
        System.out.format("Receiving dog: %s\n", dog);
    }
}

4.3 Spring Environment Integration

Kclient integrates seamlessly with the Spring environment; you can use KafkaProducer and KafkaConsumer just like any other Spring bean.

Build the Producer example:

ApplicationContext ac = new ClassPathXmlApplicationContext("kafka-producer.xml");

KafkaProducer kafkaProducer = (KafkaProducer) ac.getBean("producer");

for (int i = 0; i < 10; i++) {
    Dog dog = new Dog();
    dog.setName("Yours " + i);
    dog.setId(i);
    kafkaProducer.send2Topic("test", JSON.toJSONString(dog));

    System.out.format("Sending dog: %d \n", i + 1);

    Thread.sleep(100);
}
<bean name="producer" class="com.robert.kafka.kclient.core.KafkaProducer" init-method="init">
    <property name="propertiesFile" value="kafka-producer.properties"/>
    <property name="defaultTopic" value="test"/>
</bean>

Build the Consumer example:

ApplicationContext ac = new ClassPathXmlApplicationContext(
        "kafka-consumer.xml");

KafkaConsumer kafkaConsumer = (KafkaConsumer) ac.getBean("consumer");
try {
    kafkaConsumer.startup();

    try {
        System.in.read();
    } catch (IOException e) {
        e.printStackTrace();
    }
} finally {
    kafkaConsumer.shutdownGracefully();
}

public class DogHandler extends BeanMessageHandler<Dog> {
    public DogHandler() {
        super(Dog.class);
    }

    protected void doExecuteBean(Dog dog) {
        System.out.format("Receiving dog: %s\n", dog);
    }
}
<bean name="dogHandler" class="com.robert.kafka.kclient.sample.api.DogHandler" />

<bean name="consumer" class="com.robert.kafka.kclient.core.KafkaConsumer" init-method="init">
    <property name="propertiesFile" value="kafka-consumer.properties" />
    <property name="topic" value="test" />
    <property name="streamNum" value="1" />
    <property name="handler" ref="dogHandler" />
</bean>

4.4 Service source code annotations

Kclient also offers a declarative programming model similar to Spring's: Kafka processor methods are declared with annotations, and the threading model, exception handling, and service startup and shutdown are all handled automatically by the background service. This greatly simplifies the API and improves developer productivity.

Declaring a Kafka message handler with annotations:

@KafkaHandlers
public class AnnotatedDogHandler {
    @InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test", streamNum = 1)
    @OutputProducer(propertiesFile = "kafka-producer.properties", defaultTopic = "test1")
    public Cat dogHandler(Dog dog) {
        System.out.println("Annotated dogHandler handles: " + dog);

        return new Cat(dog);
    }

    @InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test1", streamNum = 1)
    public void catHandler(Cat cat) throws IOException {
        System.out.println("Annotated catHandler handles: " + cat);

        throw new IOException("Man made exception.");
    }

    @ErrorHandler(exception = IOException.class, topic = "test1")
    public void ioExceptionHandler(IOException e, String message) {
        System.out.println("Annotated excepHandler handles: " + e);
    }
}

Annotation launcher:

public static void main(String[] args) {
    ApplicationContext ac = new ClassPathXmlApplicationContext(
            "annotated-kafka-consumer.xml");

    try {
        System.in.read();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

Note the Spring environment configuration:

<bean name="kclientBoot" class="com.robert.kafka.kclient.boot.KClientBoot" init-method="init"/>

<context:component-scan base-package="com.robert.kafka.kclient.sample.annotation" />

5 API Introduction

5.1 Producer API

The KafkaProducer class provides a rich API for sending different types of messages: plain strings, ordinary beans, JSON objects, and more. In each of these APIs you may specify a target topic, or omit it and use the default topic. Messages may be sent with or without keys.

Send a string message:

public void send(String message);
public void send2Topic(String topicName, String message); 
public void send(String key, String message); 
public void send2Topic(String topicName, String key, String message); 
public void send(Collection<String> messages); 
public void send2Topic(String topicName, Collection<String> messages); 
public void send(Map<String, String> messages); 
public void send2Topic(String topicName, Map<String, String> messages);

Send Bean messages:

public <T> void sendBean(T bean); 
public <T> void sendBean2Topic(String topicName, T bean); 
public <T> void sendBean(String key, T bean); 
public <T> void sendBean2Topic(String topicName, String key, T bean); 
public <T> void sendBeans(Collection<T> beans); 
public <T> void sendBeans2Topic(String topicName, Collection<T> beans); 
public <T> void sendBeans(Map<String, T> beans); 
public <T> void sendBeans2Topic(String topicName, Map<String, T> beans);

Send JSON object messages:

public void sendObject(JSONObject jsonObject); 
public void sendObject2Topic(String topicName, JSONObject jsonObject); 
public void sendObject(String key, JSONObject jsonObject); 
public void sendObject2Topic(String topicName, String key, JSONObject jsonObject); 
public void sendObjects(JSONArray jsonArray); 
public void sendObjects2Topic(String topicName, JSONArray jsonArray); 
public void sendObjects(Map<String, JSONObject> jsonObjects); 
public void sendObjects2Topic(String topicName, Map<String, JSONObject> jsonObjects);

5.2 Consumer API

The KafkaConsumer class provides a rich set of constructors for specifying the parameters of the Kafka consumer server, including the thread pool policy, thread pool type, number of streams, and so on.

Initialize with a properties file:

public KafkaConsumer(String propertiesFile, String topic, int streamNum, MessageHandler handler);
public KafkaConsumer(String propertiesFile, String topic, int streamNum, int fixedThreadNum, MessageHandler handler);
public KafkaConsumer(String propertiesFile, String topic, int streamNum, int fixedThreadNum, boolean isSharedThreadPool, MessageHandler handler);
public KafkaConsumer(String propertiesFile, String topic, int streamNum, int minThreadNum, int maxThreadNum, MessageHandler handler);
public KafkaConsumer(String propertiesFile, String topic, int streamNum, int minThreadNum, int maxThreadNum, boolean isSharedThreadPool, MessageHandler handler);

Initialize with a Properties object:

public KafkaConsumer(Properties properties, String topic, int streamNum, MessageHandler handler);
public KafkaConsumer(Properties properties, String topic, int streamNum, int fixedThreadNum, MessageHandler handler);
public KafkaConsumer(Properties properties, String topic, int streamNum, int fixedThreadNum, boolean isSharedThreadPool, MessageHandler handler);
public KafkaConsumer(Properties properties, String topic, int streamNum, int minThreadNum, int maxThreadNum, MessageHandler handler);
public KafkaConsumer(Properties properties, String topic, int streamNum, int minThreadNum, int maxThreadNum, boolean isSharedThreadPool, MessageHandler handler);

5.3 Message Processor

The message processor hierarchy provides a base interface plus abstract classes that implement different levels of functionality, maximizing reuse while keeping concerns decoupled. Developers can choose which abstract class to extend according to their needs.

Interface definition:

public interface MessageHandler {
    public void execute(String message);
}

Abstract classes for handling exceptions safely:

public abstract class SafelyMessageHandler implements MessageHandler {
    public void execute(String message) {
        try {
            doExecute(message);
        } catch (Throwable t) {
            handleException(t, message);
        }
    }

    protected void handleException(Throwable t, String message) {
        for (ExceptionHandler excepHandler : excepHandlers) {
            if (t.getClass() == IllegalStateException.class
                    && t.getCause() != null
                    && t.getCause().getClass() == InvocationTargetException.class
                    && t.getCause().getCause() != null)
                t = t.getCause().getCause();

            if (excepHandler.support(t)) {
                try {
                    excepHandler.handle(t, message);
                } catch (Exception e) {
                    log.error(
                            "Exception happens when the handler {} is handling the exception {} and the message {}. Please check if the exception handler is configured properly.",
                            excepHandler.getClass(), t.getClass(), message);
                    log.error(
                            "The stack of the new exception on exception is, ",
                            e);
                }
            }
        }
    }

    protected abstract void doExecute(String message);
}

Type-oriented abstract classes:

public abstract class BeanMessageHandler<T> extends SafelyMessageHandler { ... }

public abstract class BeansMessageHandler<T> extends SafelyMessageHandler { ... }

public abstract class DocumentMessageHandler extends SafelyMessageHandler { ... }

public abstract class ObjectMessageHandler extends SafelyMessageHandler { ... }

public abstract class ObjectsMessageHandler extends SafelyMessageHandler { ... }

5.4 Message handler Annotations

Kclient can declare Kafka message handlers through annotations, as described in the service source code annotations section of the usage guide above. Kclient provides the annotations @KafkaHandlers, @InputConsumer, @OutputProducer, and @ErrorHandler.

@KafkaHandlers:

@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface KafkaHandlers {
}

@InputConsumer:

@Target({ ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface InputConsumer {
    String propertiesFile() default "";

    String topic() default "";

    int streamNum() default 1;

    int fixedThreadNum() default 0;

    int minThreadNum() default 0;

    int maxThreadNum() default 0;
}

@OutputProducer:

@Target({ ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface OutputProducer {
    String propertiesFile() default "";

    String defaultTopic() default "";
}

@ErrorHandler:

@Target({ ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ErrorHandler {
    Class<? extends Throwable> exception() default Throwable.class;

    String topic() default "";
}

6 Message Processor Template Project

6.1 Quick Development Wizard

You can quickly develop a Kafka processor service by following these steps.

  1. Download the kclient-processor template project from this repository, modify pom.xml according to your business needs, and import it into Eclipse.

  2. Rename the com.robert.kclient.app.handler.AnimalsHandler class according to your business needs, and modify its annotations accordingly. Here you can import business services to process messages.

     @KafkaHandlers
     public class AnimalsHandler {
         @InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test", streamNum = 1)
         @OutputProducer(propertiesFile = "kafka-producer.properties", defaultTopic = "test1")
         public Cat dogHandler(Dog dog) {
             System.out.println("Annotated dogHandler handles: " + dog);
    
             return new Cat(dog);
         }
    
         @InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test1", streamNum = 1)
         public void catHandler(Cat cat) throws IOException {
             System.out.println("Annotated catHandler handles: " + cat);
    
             throw new IOException("Man made exception.");
         }
    
         @ErrorHandler(exception = IOException.class, topic = "test1")
         public void ioExceptionHandler(IOException e, String message) {
             System.out.println("Annotated excepHandler handles: " + e);
         }
     }
  3. Run mvn package to build a self-starting JAR that includes the Spring Boot functionality.

  4. Run java -jar kclient-processor.jar to start the service.

6.2 Background Monitoring and Management

The KClient template project provides a back-end management interface to monitor and manage message processing services.

1. Welcome – Used to verify whether the service is started successfully.

curl http://localhost:8080/

2. Service status – Displays the number of processors.

curl http://localhost:8080/status

3. Restart – restarts the message-processing service.

curl http://localhost:8080/restart

7 Performance Benchmarks

The benchmark should cover producer send QPS and consumer processing QPS, with use cases for single-threaded and multithreaded producers.

Use case 1: Performance comparison of the lightweight service synchronous and asynchronous threading model.

Use case 2: Performance comparison between synchronous and asynchronous threading models for heavyweight services.

Use case 3: Performance comparison of a thread pool shared by all consumer flows and a thread pool exclusive to each flow in the heavyweight service asynchronous threading model.

Use case 4: Performance comparison of a pool with a certain number of threads versus a pool with a scalable number of threads in the heavyweight service asynchronous threading model.

Since the author has not yet had time to run these four benchmark scenarios, they are left to the reader as an exercise.

8 Further Thoughts

Although the kclient project designed and implemented in this article provides many advanced features, is easy to use, and has been used in production by the author at several companies, some details still need improvement.

  1. The management RESTful service in the kclient-processor project currently only provides an API for retrieving status; it needs to be further enriched with monitoring of thread pools and message-processing performance.

  2. The exception parameter in the @ErrorHandler annotation is currently mandatory; it could be derived from the method's first parameter to simplify configuration for developers.

  3. The template project is still incomplete; startup and shutdown scripts that readers can copy and use directly need to be added.

  4. Although production use has shown that kclient has no performance problems, the development of a middleware system needs a closed loop: performance tests should be run as soon as possible and a test report produced.