Today, I’m here to tell you about Redis publish/subscribe.

Maybe some friends are strange to this function, not quite clear what this function is, it doesn’t matter little black brother first to give an example.

Suppose we have a business scenario where, after an order is placed and paid on the website, we need to notify the inventory service for shipment processing.

It is not difficult to realize the above business. We only need to provide the inventory service to the relevant suppliers, and only need to call the inventory service after placing orders and paying.

Later, if there is a new business, such as points service, he needs to obtain the results of ordering and paying, and then increase the user’s points.

It is also easy to implement, let the points service also provide an interface, after the order payment just call the inventory service.

If only two businesses need to obtain the results of order and payment, that is also good, the program transformation is also fast. But as business continues to grow, more and more new business is said to be the result of ordering and paying.

In this case, we find that the above system architecture has many problems:

First, the order and payment business is heavily coupled with other businesses. Whenever a new business needs payment results, the order and payment business needs to be changed.

Second, if too many services are called, the response time of the order and payment interface will become longer. In addition, if the response of any downstream interface becomes slow, the response of the order and payment interface will become longer synchronously.

Third, if either downstream interface fails, data inconsistency may result. For example, A is called first, B is called after success, and C is called again.

If an exception occurs during the call of interface B, it may cause the return failure of the order and payment interface at this time, but interface A has actually been successfully called at this time, which means that it has processed the result of the successful order and payment internally.

This will lead to the three downstream interfaces A, B and C. A has successfully obtained the payment results, but B and C have not, resulting in inconsistent data of the three systems.

In fact, if we think about it carefully, for the order and payment business, it does not need to care about the result of the downstream call, as long as there is some mechanism to notify them.

So that brings us to today’s publish-subscribe mechanism.

Redis publish and subscribe

Redis provides a messaging mechanism based on a publish/subscribe model, in which publishers and subscribers do not need to communicate directly.

As shown in the figure above, the message publisher only needs to publish the message on the channel that he wants to specify, and every client that subscribes to that channel can receive the message.

Using the mechanism of Redis publication and subscription, for the above business, the order and payment business only needs to send messages to the channel of payment results, and other downstream businesses subscribe to the channel of payment results, they can receive corresponding messages, and then make business processing.

This decouples the call relationship between the upstream and downstream of the system.

Let’s take a look at how to publish subscriptions using Redis.

Redis provides a set of commands that can be used to publish messages, subscribe to channels, unsubscribe, and subscribe by pattern.

First let’s look at how to publish a message, which is as simple as using the publish directive:

publish channel message
Copy the code

In the figure above, we send a message to the pay_result channel using the publish directive. We can see that Redis returns 0 to us, which actually represents the number of current subscribers. Since there are no subscriptions at this point, it returns 0.

Next we use subscribe to subscribe to one or more channels

subscribe channel [channel ...]
Copy the code

As shown above, we subscribe to the channel pay_result, and when other clients send messages to the channel,

The current subscriber will receive the message.

We are using the subscribe command, we need the main points:

First, after executing the subscription command, the client will enter the subscription state and can only receive the four commands: SUBSCRIBE, pSUBSCRIBE, unsubscribe and punsubscribe.

Second, newly subscribed clients will not be able to receive messages before this channel, because Redis does not persist published messages.

Compared to many professional MQ, such as Kafka and RocketMQ, Redis publish-subscribe functionality is a bit primitive. However, Redis publish-and-subscribe is simple, and if the current scenario can tolerate these disadvantages, you can still choose to use it.

In addition to the above features, Redis also supports pattern-matching subscriptions. Simply put, clients can subscribe to a pattern marked with an asterisk (*), and if certain channel names match the pattern, clients that subscribe to the pattern will receive messages when other clients send messages to those channels.

To use the Redis subscription model, we need to use a new instruction, pSUBSCRIBE.

We execute the following command:

psubscribe pay.*
Copy the code

So once other clients send messages to channels starting with pay, such as pay_result and pay_xxx, we can receive messages.

If we need to unsubscribe, we need to use the corresponding punsubscribe instruction, such as unsubscribe from the above mode:

punsubscribe pay.*
Copy the code

Redis client publish subscription usage

Develop publish/subscribe based on Jedis

After talking about the Redis publish subscribe directive, let’s take a look at how the Java Redis client uses publish subscribe.

The following examples are based on Jedis, and the Maven version is:

<dependency>
 <groupId>redis.clients</groupId>
 <artifactId>jedis</artifactId>
 <version>3.1.0</version>
</dependency>
Copy the code

Other Redis clients are much the same.

Jedis publishing code is relatively simple, requiring only a call to the Jedis class’s Publish method.

// Do not use this method in production environment. JedisPool is recommended
Jedis jedis = new Jedis("localhost".6379);
jedis.auth("xxxxx");
jedis.publish("pay_result"."hello world");
Copy the code

The subscription code is relatively complex, we need to inherit the JedisPubSub implementation of the relevant methods, once other clients send messages to the subscribed channel, JedisPubSub will call the corresponding methods.

private static class MyListener extends JedisPubSub {
    @Override
    public void onMessage(String channel, String message) {
        System.out.println("Received subscription channel:" + channel + "News:" + message);

    }

    @Override
    public void onPMessage(String pattern, String channel, String message) {
        System.out.println("Received specific subscription channel:" + channel + Subscription Model: + pattern + "News:"+ message); }}Copy the code

Next we need to call the SUBSCRIBE method of the Jedis class:

Jedis jedis = new Jedis("localhost".6379);
jedis.auth("xxx");
jedis.subscribe(new MyListener(), "pay_result");
Copy the code

Subscribers will receive messages when other clients send messages to the Pay_result channel.

Note, however, that jedis#subscribe is a blocking method that blocks the main thread when called, so if you need to use it in a formal project that needs to be run on an asynchronous thread, the code is not shown here.

Develop publish subscriptions based on Spring-data-Redis

The native Jedis publish-subscribe operation is still relatively complex. Many of our applications have been developed based on SpringBoot. Using Spring-boot-starter-data-redis simplifies publish-subscribe development.

First we need to introduce the corresponding Startter dependency:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>lettuce-core</artifactId>
            <groupId>io.lettuce</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
</dependency>
Copy the code

Here we use Jedis as the underlying connection client, so we need to exclude lettuce and then introduce Jedis dependencies.

Then we need to create a message receiving class with methods to consume messages:

@Slf4j
public class Receiver {
    private AtomicInteger counter = new AtomicInteger();

    public void receiveMessage(String message) {
        log.info("Received <" + message + ">");
        counter.incrementAndGet();
    }

    public int getCount(a) {
        returncounter.get(); }}Copy the code

Then we just need to inject spring-Redis related beans, for example:

  • StringRedisTemplateTo operate the Redis command
  • MessageListenerAdapter, message listener, which can be injected into this class to create the message receiving class above us Receiver
  • RedisConnectionFactoryTo create the Redis underlying connection
@Configuration
public class MessageConfiguration {

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // Subscribe to the specified channel using ChannelTopic
        // The subscription pattern uses PatternTopic
        container.addMessageListener(listenerAdapter, new ChannelTopic("pay_result"));

        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        // Inject Receiver, which specifies the receiving method in the class
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

    @Bean
    Receiver receiver(a) {
        return new Receiver();
    }

    @Bean
    StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
        return newStringRedisTemplate(connectionFactory); }}Copy the code

Finally we use StringRedisTemplate#convertAndSend to send the message, and the Receiver will receive a message.

@SpringBootApplication
public class MessagingRedisApplication {
    public static void main(String[] args) throws InterruptedException {

        ApplicationContext ctx = SpringApplication.run(MessagingRedisApplication.class, args);

        StringRedisTemplate template = ctx.getBean(StringRedisTemplate.class);
        Receiver receiver = ctx.getBean(Receiver.class);

        while (receiver.getCount() == 0) {
            template.convertAndSend("pay_result"."Hello from Redis!");
            Thread.sleep(500L);
        }

        System.exit(0); }}Copy the code

Redis publishes subscription applications

The Redis Sentinel node was detected

Redis Sentinel is a high availability solution of Redis, which can automatically promote the secondary node to the primary node when the primary node fails.

We are not going to explain the details of Redis Sentinel, but we are going to look at how Redis Sentinel uses the publish-subscribe mechanism.

The Redis Sentinel node mainly uses the publish-subscription mechanism to realize the discovery of new nodes and the exchange of states between the main nodes.

As shown below, each Sentinel node will regularly send messages to the _sentinel_: Hello channel, and each Sentinel will subscribe to this node.

Once a node sends a message to the channel, other nodes can receive the message immediately.

In this way, once a new node joins, it sends a message to the channel, and when other nodes receive it, it determines that the node does not exist in the local list, so it can be added to the local node list as a new node.

In addition, each message sent to this channel can contain the status information of nodes, which can be used as the basis for Sentinel leader election later.

The above is for the Redis server, for the client, we can also use the publish and subscribe mechanism.

When Redis Sentinel fails over the primary node, each stage of the process is provided externally through a publish subscription.

For our client, we are more concerned about the master node after the switchover, so that we can timely switch the connection of the master node (the old node is now faulty and can no longer accept operation instructions).

Clients can subscribe to the +switch-master channel and publish messages about the master node once Redis Sentinel has finished its failover of the master node.

Redission Distributed lock

The Redission open source framework provides some convenient methods to operate Redis, among which the famous Redission implements distributed locking based on Redis.

Today we look at the implementation of Redis distributed lock how to use Redis publish and subscribe mechanism, improve lock performance.

PS: Redission distributed lock implementation principle, can refer to the previous article:

  1. Implementation of reentrant distributed lock
  2. Redis distributed lock, seemingly simple, actually not simple

First let’s look at the redission lock method:

Redisson redisson = ....
RLock redissonLock = redisson.getLock("xxxx");
redissonLock.lock();
Copy the code

RLock inherits the Java standard Lock interface and calls the Lock method. If the current Lock has been acquired by another client, the current locked thread will be blocked until the other client releases the Lock.

How does the currently blocking thread sense that the distributed lock has been released?

There are actually two ways to do this:

The first clock, periodic query distribution lock state, once found that the lock has been released (Redis does not exist this key value), then to lock.

The pseudo-code is as follows:

while (true) {
  boolean result=lock();
  if (!result) {
    Thread.sleep(N);
  }
}
Copy the code

This approach is simple to implement, but has many disadvantages.

If the scheduled task duration is too short, the query times are too many. In fact, these queries are invalid.

If the sleep time of scheduled tasks is too long, the locking time is too long, resulting in poor locking performance.

Then the second implementation scheme is to use the mechanism of service notification. When the distributed lock is released, the client can receive the message of lock release and then add the lock in the first time.

This service notification mechanism we can use Redis publish subscribe model.

When a thread fails to lock, it will subscribe to the redisson_lock__channel_xxx (xx stands for lock name) channel, listen for messages using an asynchronous thread, and then block the current thread using Java Semaphore.

Once other clients unlock, redission sends an unlock message to this redisson_lock__channel_xxx.

When the asynchronous thread receives the message, it will call Semaphore to release the Semaphore, which will wake up the currently blocked thread to lock.

Ps: Here is only a brief description of redission locking part of the principle, for space, here is no message parsing source.

Interested partners can look at redission lock source code.

Through the publisk-subscribe mechanism, blocked threads can be woken up in time, reducing invalid idle queries and effectively improving the efficiency of locking.

Ps: This way, the performance does improve, but the implementation of the complexity is also very high, this part of the source code something, fast dizzy.

conclusion

Today we mainly introduce Redis publish and subscribe function, mainly corresponding to Redis command is:

  • subscribe channel [channel …] Subscribe to one or more channels
  • Unsubscribe channel Unsubscribe a specified channel
  • Publish Channel Message Publishes a message
  • Psubscribe Pattern Subscribes to a specified pattern
  • Punsubscribe Pattern Unsubscribes the specified pattern

We can use Redis publish and subscribe function to achieve simple MQ function, to achieve upstream and downstream decoupling.

Note, however, that since messages published by Redis are not persisted, new subscribed clients will not receive historical messages.

So, if the current business scenario does not tolerate these shortcomings, use professional MQ instead.

Finally, two Redis publishing and subscribing scenarios are introduced for your reference.

Welcome to pay attention to my public account: procedures to get daily dry goods push. If you are interested in my topics, you can also follow my blog: studyidea.cn