Redis is a high-performance key-value database and one of the most widely used NoSQL stores. Because Redis executes commands on a single thread, it processes them one at a time and in order, which is what lets us use it as a lightweight message queue. This article shows how to integrate Redis into Spring Boot and use it as a message queue.

I. What is a message queue

A message queue is a container that holds messages during their transmission. — Baidu Encyclopedia

A message can be thought of as data that passes through a computer or an entire computer network.

A queue is one of the basic data structures, and its defining property is first-in, first-out (FIFO) ordering.

So a message queue is a container that holds messages and hands them out in first-in, first-out order.
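The first-in, first-out behavior can be seen in a few lines of plain Java. This is only a sketch with an in-memory java.util.Queue; the class name FifoDemo and the sample messages are made up for illustration, and nothing here talks to Redis.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Illustrative only: an in-memory queue, not a Redis-backed one.
class FifoDemo {

    // Enqueue every message, then dequeue until the queue is empty and
    // return the order in which the messages came out.
    static List<String> roundTrip(List<String> messages) {
        Queue<String> queue = new ArrayDeque<>();
        for (String m : messages) {
            queue.offer(m);            // add at the tail
        }
        List<String> out = new ArrayList<>();
        while (!queue.isEmpty()) {
            out.add(queue.poll());     // remove from the head
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [first, second, third]
        System.out.println(roundTrip(Arrays.asList("first", "second", "third")));
    }
}
```

Messages leave the queue in exactly the order they entered it.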

Why do message queues exist?

  1. Asynchrony: In a B/S (browser/server) architecture, the client sends a request and the server may need a long time to process it. If the client waits until processing is finished, client resources are wasted. With a message queue, the server pushes the request onto the queue right away and a dedicated message handler processes it later, so the client does not spend a long time waiting for the server's response.
  2. Decoupling: In traditional development, modules call each other directly. Such a system is hard to extend, sharing data between modules is troublesome, and every module has to keep worrying about whether the modules it calls are down. With a message queue, modules no longer call each other directly but communicate through data, and when a module goes down the data is still held in the queue. The most typical form is the producer-consumer model, which is the one used in this article.
  3. Peak shaving: At certain moments the number of concurrent requests exceeds the maximum processing capacity of the system. If nothing is done, the system crashes. With a message queue, the server pushes requests onto the queue and a dedicated message handler consumes them at a sustainable rate, which reduces the pressure on the server.
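The three points above can be sketched with a producer and a consumer that only share a queue. This is a minimal illustration, assuming an in-process BlockingQueue as a stand-in for the real message queue; the class name ProducerConsumerDemo and the STOP marker are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative only: an in-process BlockingQueue stands in for the message queue.
class ProducerConsumerDemo {

    private static final String STOP = "__stop__"; // hypothetical end-of-stream marker

    // The producer enqueues the requests without waiting for them to be handled;
    // a separate consumer thread drains the queue at its own pace.
    static List<String> run(List<String> requests) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> handled = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String msg = queue.take();   // blocks until a message arrives
                    if (STOP.equals(msg)) {
                        return;
                    }
                    handled.add("handled:" + msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        for (String r : requests) {
            queue.offer(r);                      // the producer never calls the consumer directly
        }
        queue.offer(STOP);

        try {
            consumer.join();                     // wait only so the demo can return the result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled;
    }

    public static void main(String[] args) {
        // prints [handled:order-1, handled:order-2]
        System.out.println(run(Arrays.asList("order-1", "order-2")));
    }
}
```

The producer and the consumer know nothing about each other beyond the queue, and a burst of requests simply makes the queue longer instead of overloading the consumer.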

Here’s a quick look at message queues

As can be seen from the figure above, the message queue acts as a middleman and we can manipulate the message queue to ensure the stability of our system.

II. Environment setup

Java environment: JDK1.8

Spring Boot version: 2.2.1.RELEASE

Redis-server version: 3.2.100

III. Related dependencies

Only the Redis-related dependencies are shown here:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-redis</artifactId>
</dependency>

A word about these two dependencies:

  • The first provides Redis (NoSQL) support
  • The second combines Spring Integration with Redis; it is added here mainly for its distributed lock

IV. Configuration file

Only redis-related configurations are shown here

# Redis host
spring.redis.host=localhost
# Redis database index, starting from 0; it can be checked in a Redis GUI client
spring.redis.database=1
# Redis port
spring.redis.port=6379
# Redis password
spring.redis.password=
# Redis timeout (ms), default is 2000
spring.redis.timeout=5000
# Connection pool: maximum number of connections
spring.redis.jedis.pool.max-active=16
# Connection pool: minimum idle connections
spring.redis.jedis.pool.min-idle=0
# Connection pool: maximum idle connections
spring.redis.jedis.pool.max-idle=16
# Connection pool: maximum blocking wait time (a negative number means no limit)
spring.redis.jedis.pool.max-wait=-1
# Client name used when connecting to Redis
spring.redis.client-name=mall

V. Code configuration

When Redis is used as a message queue in Spring Boot, the work is done mainly by RedisTemplate’s convertAndSend() method and the MessageListener interface. So we will register a RedisTemplate and a class that implements MessageListener in the IoC container. Without further ado, let’s look at the code.

Configuring RedisTemplate

The main purpose of configuring RedisTemplate is to set the serialization method, which solves the garbled-text problem and also reduces performance overhead.

/**
 * RedisTemplate configuration
 */
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
    LOGGER.debug("Redis serialization configuration starts");
    RedisTemplate<String, Object> template = new RedisTemplate<>();
    template.setConnectionFactory(factory);
    // Serializer used for values (JSON)
    RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer();
    // Set the default serialization method of the template
    template.setDefaultSerializer(serializer);
    template.setKeySerializer(new StringRedisSerializer());
    template.setHashValueSerializer(serializer);
    LOGGER.debug("Redis serialization configuration ends");
    return template;
}

On line 12, template.setDefaultSerializer(serializer) sets GenericJackson2JsonRedisSerializer as the default serializer.

On line 13, template.setKeySerializer(new StringRedisSerializer()) configures StringRedisSerializer for keys.

On line 14, template.setHashValueSerializer(serializer) configures GenericJackson2JsonRedisSerializer for the values of hashes.

A brief introduction to the serializers that can be used with RedisTemplate

Serializer | Description
StringRedisSerializer | Converts between String and bytes; in testing it could not serialize arbitrary objects, so it is usually used for keys
OxmSerializer | Serializes objects to XML, which is essentially a string
JdkSerializationRedisSerializer | The default; serializes an object to binary bytes with Java serialization and requires the object to implement the Serializable interface
GenericFastJsonRedisSerializer | JSON serialization; serializes objects with the FastJSON library
GenericJackson2JsonRedisSerializer | JSON serialization; serializes objects with the Jackson library
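To see why the default, Serializable-based serialization looks garbled in a Redis client, it helps to compare the raw bytes. The sketch below uses only the JDK (no Spring or Redis classes), on the assumption that a JDK-based serializer writes the same bytes as ObjectOutputStream; the class and method names are made up for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Illustrative only: plain JDK code, no Redis or Spring classes involved.
class SerializationBytesDemo {

    // Bytes produced by standard Java serialization.
    static byte[] jdkBytes(Object value) {
        try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Bytes of the same text encoded as UTF-8, as a string serializer would write it.
    static byte[] utf8Bytes(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(hex(jdkBytes("demo")));   // begins with the stream header aced0005
        System.out.println(hex(utf8Bytes("demo")));  // prints 64656d6f
    }
}
```

The Java-serialized form carries a binary header and type markers in front of the text, which is what shows up as unreadable characters; string and JSON serializers store readable text instead.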

VI. Redis queue listener (consumer)

As mentioned above, the class associated with the Redis queue listener is an interface named MessageListener. Here is the source code for this interface

public interface MessageListener {
    void onMessage(Message message, @Nullable byte[] pattern);
}

As you can see, the interface has a single method, onMessage(Message message, @Nullable byte[] pattern), which is the callback invoked when a message arrives on a subscribed channel. The two parameters are:

  • message: the Redis message object, which has only two methods
    • byte[] getBody(): returns the message body as bytes
    • byte[] getChannel(): returns the message channel as bytes
  • pattern: the subscription pattern that matched the channel, as bytes; it is marked @Nullable, and for a pattern without wildcards, as used in this article, it holds the same value as message.getChannel()

Having introduced the interface, let’s implement a simple Redis queue listener

@Component
public class RedisListener implements MessageListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisListener.class);

    @Override
    public void onMessage(Message message, byte[] pattern) {
        LOGGER.debug("Message received, pattern = {}", new String(pattern));
        LOGGER.debug("Message received, channel = {}", new String(message.getChannel()));
        LOGGER.debug("Raw message = {}", new String(message.getBody()));
        // Create a serializer for deserialization. It must match the earlier configuration:
        // the default serializer was set to GenericJackson2JsonRedisSerializer,
        // so the same implementation is used here.
        RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer();
        LOGGER.debug("Deserialized message = {}", serializer.deserialize(message.getBody()));
    }
}

The code is simple: it only logs the key information carried by the parameters. Note that the RedisSerializer implementation used here must be the same as the one configured above.

Once the queue listener is implemented, we need to add this listener to the Redis queue listener container as follows:

@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory factory,
                                               RedisListener redisListener) {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(factory);
    // Bind the listener to the channel name
    container.addMessageListener(redisListener, new PatternTopic("demo-channel"));
    return container;
}

These lines create a new Redis message listener container, bind the listener to a channel name, and return the container.

Note that this channel name must be the same as the channel name used to push messages, described below; otherwise the listener will never receive them.

VII. Redis queue push service (producer)

The producer uses the RedisTemplate we configured above.

The code is as follows:

@Service
public class Publisher {
    @Autowired
    private RedisTemplate<String, Object> redis;

    public void publish(Object msg) {
        redis.convertAndSend("demo-channel", msg);
    }
}

The key code is line 7: redis.convertAndSend() pushes a message (the second argument) to a channel (the first argument).

Again, the producer and the consumer must use the same channel name.
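Why the names have to match can be shown with a toy broker. The sketch below is a hypothetical in-memory stand-in that delivers messages by exact channel name; ChannelNameDemo and its methods are invented for illustration and say nothing about how Redis or Spring implement delivery.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Illustrative only: a hypothetical in-memory broker, not Redis.
class ChannelNameDemo {

    private final Map<String, List<Consumer<String>>> subscribers = new ConcurrentHashMap<>();

    void subscribe(String channel, Consumer<String> listener) {
        subscribers.computeIfAbsent(channel, c -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // Delivers the message to every listener of the channel and
    // returns how many listeners received it.
    int publish(String channel, String message) {
        List<Consumer<String>> listeners = subscribers.getOrDefault(channel, new ArrayList<>());
        listeners.forEach(l -> l.accept(message));
        return listeners.size();
    }

    public static void main(String[] args) {
        ChannelNameDemo broker = new ChannelNameDemo();
        List<String> received = new ArrayList<>();
        broker.subscribe("demo-channel", received::add);

        System.out.println(broker.publish("demo-channel", "hello")); // prints 1
        System.out.println(broker.publish("demo_channel", "lost"));  // prints 0: nobody listens here
        System.out.println(received);                                // prints [hello]
    }
}
```

A message sent to a misspelled channel is not an error; it simply reaches no listener, which is why a mismatch is easy to miss.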

At this point, the producers and consumers of the message queue are all written.

VIII. Problems encountered and solutions

1. Spring Boot uses log4j2 logging framework

After I added the spring-boot-starter-log4j2 dependency and removed spring-boot-starter-logging in spring-boot-starter-web, I still received the following error when running the project:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:.../.m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:.../.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.12.1/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]

This error is caused by multiple logging frameworks on the Maven classpath. Dependency analysis shows that spring-boot-starter-data-redis also depends on spring-boot-starter-logging. The fix is simple: exclude it there as well. The full dependency configuration is shown below.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-redis</artifactId>
</dependency>

2. Redis queue listener thread safety issues

The Redis queue listener works like this: one thread listens on the queue, and whenever a message arrives a new thread is created to consume it. If you remember, I said at the start that Redis can serve as a message queue because of its single-threaded nature. But if the listener creates a new thread for every message it receives, the benefit of that single-threaded nature is lost on the consumer side, and thread-safety issues appear as well.

A single consumer (one consumer per channel) solution

The simplest approach is to lock the onMessage() method. It is simple and it works, but it does not control the rate at which the listener consumes messages, and unbounded thread creation can eventually exhaust system resources.

So how do we solve this? With a thread pool.

In the configuration that adds the listener to the container, the RedisMessageListenerContainer class provides a setTaskExecutor(Executor taskExecutor) method for giving the listener container a thread pool. Once the pool is configured, all listener threads come from it, so we can tune the pool to control the rate at which messages are consumed.
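The effect of a bounded pool can be checked without Spring. The sketch below is a plain java.util.concurrent illustration; the class and method names are invented, and the tasks only simulate listener callbacks. An ExecutorService like this one implements java.util.concurrent.Executor, so it is the kind of object that could be handed to setTaskExecutor(Executor).

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative only: plain java.util.concurrent, no Spring classes.
class BoundedListenerPoolDemo {

    // Runs `messages` simulated callbacks on a pool of `poolSize` threads and
    // returns how many distinct threads were actually used.
    static int distinctThreadsUsed(int poolSize, int messages) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(messages);
        for (int i = 0; i < messages; i++) {
            pool.execute(() -> {
                threadNames.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        try {
            done.await(5, TimeUnit.SECONDS);   // wait for all callbacks to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return threadNames.size();
    }

    public static void main(String[] args) {
        // With a pool of one thread, callbacks run one after another on the same thread.
        System.out.println(distinctThreadsUsed(1, 100)); // prints 1
    }
}
```

A pool of size 1 restores one-at-a-time processing for a single consumer, while a larger fixed size caps concurrency. Note that Executors.newFixedThreadPool queues waiting tasks without limit, so a slow consumer still accumulates a backlog in memory.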

Multiple consumers (multiple consumers on a channel) solution

Compared with multiple consumers, the single-consumer problem is simple, because Java’s built-in locks only control execution inside their own process and cannot affect other processes. Today, however, we often develop for distributed environments, so multiple consumers on one channel have to be handled.

So how do we solve this? With distributed locks.

Here’s a brief overview of what a distributed lock is:

In a distributed environment, only one client at a time can obtain the lock from a shared store (such as Redis), and only the client holding the lock may run the protected code.

A distributed lock generally has to meet the following requirements: exclusivity (only one client can hold the lock at a time), deadlock avoidance (the lock is released automatically after a timeout), and high availability (the mechanism for acquiring and releasing the lock must be highly available and perform well).
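Exclusivity and release-on-timeout can be sketched in a few lines. The example below is a hypothetical in-memory stand-in for the shared store: a real distributed lock keeps this state in a shared service such as Redis, and every name here (ExpiringLockDemo, tryLock, unlock) is invented for illustration. The current time is passed in as a parameter so the behavior is deterministic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: an in-memory stand-in for a shared lock store.
class ExpiringLockDemo {

    private static final class Holder {
        final String owner;
        final long expiresAtMillis;

        Holder(String owner, long expiresAtMillis) {
            this.owner = owner;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Holder> locks = new ConcurrentHashMap<>();

    // Exclusivity: succeeds only if the key is free or the previous holder's lease has expired.
    boolean tryLock(String key, String owner, long ttlMillis, long nowMillis) {
        Holder candidate = new Holder(owner, nowMillis + ttlMillis);
        Holder result = locks.compute(key, (k, current) ->
                (current == null || current.expiresAtMillis <= nowMillis) ? candidate : current);
        return result == candidate;
    }

    // Only the current owner may release the lock.
    boolean unlock(String key, String owner) {
        Holder current = locks.get(key);
        return current != null && current.owner.equals(owner) && locks.remove(key, current);
    }

    public static void main(String[] args) {
        ExpiringLockDemo store = new ExpiringLockDemo();
        System.out.println(store.tryLock("lock", "client-A", 1000, 0));    // true
        System.out.println(store.tryLock("lock", "client-B", 1000, 500));  // false: A still holds it
        System.out.println(store.tryLock("lock", "client-B", 1000, 1000)); // true: A's lease expired
    }
}
```

The expiry is what prevents a deadlock when a client crashes while holding the lock; high availability, the third requirement, depends on the shared store itself and is not modeled here.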

In the dependencies section we imported spring-integration-redis, which contains a number of useful utility classes. The distributed lock utility we will use next is RedisLockRegistry.

First, how to use it. After importing the dependency, configure a bean:

@Bean
public RedisLockRegistry redisLockRegistry(RedisConnectionFactory factory) {
    // The expiration is given in milliseconds: 60000 ms = 60 s
    return new RedisLockRegistry(factory, "demo-lock", 60000L);
}

A note on the RedisLockRegistry constructor: the first parameter is the Redis connection factory; the second is the lock prefix, so a lock obtained from the registry is stored under a key named “demo-lock:KEY_NAME”; the third is the lock expiration time in milliseconds, which defaults to 60000 (60 seconds).

To use the lock, change the listener as follows:

@Component
public class RedisListener implements MessageListener {

    @Autowired
    private RedisLockRegistry redisLockRegistry;

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisListener.class);

    @Override
    public void onMessage(Message message, byte[] pattern) {
        Lock lock = redisLockRegistry.obtain("lock");
        // Lock before entering try, so that finally only releases a lock we actually hold
        lock.lock();
        try {
            LOGGER.debug("Message received, pattern = {}", new String(pattern));
            LOGGER.debug("Message received, channel = {}", new String(message.getChannel()));
            LOGGER.debug("Raw message = {}", new String(message.getBody()));
            // Create a serializer for deserialization. It must match the earlier configuration:
            // the default serializer was set to GenericJackson2JsonRedisSerializer,
            // so the same implementation is used here.
            RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer();
            LOGGER.debug("Deserialized message = {}", serializer.deserialize(message.getBody()));
        } catch (Exception e) {
            LOGGER.error("Failed to handle message", e);
        } finally {
            lock.unlock(); // Unlock
        }
    }
}

Compared with the earlier listener, this version only adds an injected RedisLockRegistry, a call to redisLockRegistry.obtain() to get the lock, one call to lock it, and one call to unlock it. That is all it takes to use the distributed lock.

Note that redisLockRegistry.obtain() returns a lock of type RedisLock, a private inner class that implements the Lock interface. We cannot create an instance of it from outside, so the obtain() method is the only way to get the lock.
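Because the returned object is a java.util.concurrent.locks.Lock, the usual Lock idioms apply, including waiting for the lock only for a limited time. The sketch below shows that pattern with a ReentrantLock standing in for the lock from the registry, so that it runs without Redis; TryLockDemo and runWithLock are hypothetical names.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative only: ReentrantLock stands in for the Lock returned by a lock registry.
class TryLockDemo {

    // Runs the task only if the lock is acquired within the timeout.
    // Returns true if the task ran, false if the lock could not be acquired.
    static boolean runWithLock(Lock lock, long timeoutMillis, Runnable task) {
        boolean acquired;
        try {
            acquired = lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        if (!acquired) {
            return false;          // someone else holds the lock: skip or retry later
        }
        try {
            task.run();
            return true;
        } finally {
            lock.unlock();         // released even if the task throws
        }
    }

    public static void main(String[] args) {
        Lock lock = new ReentrantLock();
        boolean ran = runWithLock(lock, 100, () -> System.out.println("handling message"));
        System.out.println(ran);   // prints true
    }
}
```

Using tryLock with a timeout keeps a consumer from blocking indefinitely when another instance holds the lock, at the cost of having to decide what to do with a message that was skipped.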