Real-time message push based on RabbitMQ

Several ways to implement server-side push

Web applications are based on HTTP request/response mode and cannot maintain a long connection like TCP. Therefore, It is difficult for Web applications to implement real-time message push like mobile phones. At present, there are mainly the following message push modes for Web applications:

1. Short Ajax polling

Ajax polling mainly implements data loading through asynchronous periodic JS refresh task on the page side, but this method has poor real-time effect and great pressure on the server side.

2. Long polling

Long polling is also primarily done through Ajax, but unlike traditional Ajax applications, the server side of long polling blocks requests when there is no data until new data is generated or the request times out, and then the client reestablishes the connection to retrieve the data, as shown in Figure 1. However, the long polling server occupies resources for a long time, and it will bring great pressure to the server if messages are sent frequently.

Figure 1. Long polling implementation

3.WebSocket bidirectional communication

WebSocket is a new communication protocol in HTML5, which can realize full duplex communication between browser and server. If both browser and server support WebSocket protocol, message push implemented in this way is undoubtedly the most efficient and concise. The latest versions of Internet Explorer, Firefox, and Chrome all support WebSocket. Apache Tomcat 7.0.27 and later also support WebSocket.

Introduction of the RabbitMQ

AMQP (Advanced Message Queuing Protocol) is an open standard of application-layer protocols designed for message-oriented middleware. Message-oriented middleware is primarily used for decoupling between components so that the sender of a message does not need to be aware of the message consumer and vice versa. The main features of AMQP are message -, queue – and routing-oriented, reliable and secure. RabbitMQ is an open source IMPLEMENTATION of AMQP. The server is written in Erlang and supports a variety of clients, such as Python, Ruby,.NET, Java, JMS, C, PHP, ActionScript, XMPP, STOMP, and Ajax. It is used to store and forward messages in distributed systems, and has good performance in ease of use, scalability, and high availability. There are a few concepts that need to be understood before using RabbitMQ: Broker, Exchange, Queue, Binding, Routingkey, Producter, Consumer, and Channel.

1.Broker

This is simply the entity of the message queue server.

2.Exchange

Receives the message, forwards the message to the bound queue, and specifies the rules by which the message is routed to the queue.

3.Queue

Message queue carrier, used to store messages. Queues with the same attributes can be defined repeatedly, and each message will be put to one or more queues.

4.Binding

Bind, which binds exchanges and queues according to routing rules.

5.RoutingKey

Routing keyword, which Exchange uses to deliver messages.

6.Producter

Message producer, the program that produces the message.

7.Consumer

Message consumers, the programs that receive messages.

8.Channel

Message channels. Multiple channels can be established in each connection of the client, and each Channel represents a session.

Install the RabbitMQ service

The solution proposed in this article is based on a RabbitMQ message server, so the RabbitMQ service and related plug-ins need to be installed initially. RabbitMQ is developed in the Erlang language, so the Erlang runtime environment must be installed first. The following uses CentOS6.5 64-bit server as an example to describe the installation process.

1. Download erlang-r15B-02.1.el5.x86_64. RPM and install it

# RPM - the ivh Erlang R15B - 02.1 - el5. X86_64. RPMCopy the code

2. Download rabbitmq-server-3.2.1-1.noarch. RPM and install it

# RPM - the ivh the rabbitmq server - 3.2.1-1. Noarch. RPMCopy the code

3. Enable related plug-ins

# rabbitmq-plugins enable rabbitmq_management rabbitmq_web_stomp rabbitmq_stompCopy the code

4. Restart the RabbitMQ service

# service rabbitmq-server restartCopy the code

5. Check whether the installation is successful

To check the RabbitMQ running status, enter http://{server_ip}:15672 in the Web browser and log in to the RabbitMQ service using the default user name guest/guest and password.

Real-time message push based on RabbitMQ

RabbitMQ has a number of third-party plugins that extend the AMQP protocol. Web STOMP plug-in is a STOMP text protocol plug-in based on AMQP. It can easily realize real-time messaging between browser and server by using WebSocket, as shown in Figure 2 below:

Figure 2. Real-time messaging implementation between browser and server

Message sender

The following example illustrates the process, with Java as the RabbitMQ client message sender and the Web browser as the message consumer.

Listing 1. Java-side code
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Program { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); 192.168.1.101 factory. SetHost (" "); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("rabbitmq", "fanout"); String routingKey = "rabbitmq_routingkey"; String message = "{\"name\":\"Welcome to RabbitMQ message push! \ "} "; channel.basicPublish("rabbitmq", routingKey,null, message.getBytes()); System.out.println("[x] Sent Message:"+message); channel.close(); connection.close(); }}Copy the code

We use the Java Client Library provided by RabbitMQ to send messages. The process of using the message queue is roughly shown in Figure 3:

Figure 3. Client delivery message flow

After exchange receives the message, it routes the message according to the key and binding set, and finally sends the message to one or more queues for processing. RabbitMQ has pre-configured exchanges. If the client has not declared an exchange, RabbitMQ uses the default exchange based on the exchange type, as shown in Table 1.

Table 1. Preset Exchange names
Name Default pre declared names
Direct exchange amq.direct
Fanount exchange amq.fanout
Topic exchange amq.topic
Heades exchange amq.headers

Exchange type

There are several types of Exchange:

1.Direct exchange

Direct Exchange delivers messages based on keys. Only messages whose keys are exactly the same as the binding routing key are received. See Figure 4 on the official website for a more intuitive view of Direct Exchange.

Figure 4. Direct exchange

2.Fanount exchange

Fanount does not care about the key at all, and broadcasts messages directly. All queues bound to the switch receive messages, as shown in Figure 5 on the official website.

Figure 5. Fanount exchange

3.Topic exchange

Topic exchanges will pattern match the key and then post the message to the queue that matches the set routing key.

4.Headers exchange

Header Exchange uses the message Header instead of the routing key as the key for routing. However, this type of exchange is rarely used in practical applications.

Message persistence

RabbitMQ supports message persistence, that is, message data is persisted to disk. If the message server is disconnected, the message will be sent again the next time it is enabled. Message queue persistence The parts of Exchange (durable=1), Queue (durable=1), and message (delivery_mode=2) must be durable. For data security reasons, messages are typically persisted.

Message receiver

Listing 2.JavaScript code
// Stomp.js boilerplate if (location.search == '? Ws') {var ws = new WebSocket (ws: / / 192.168.1.102:15674 / ws' '); } else {var ws = new SockJS (' http://192.168.1.102:15674/stomp '); } // Init Client var client = Stomp.over(ws); // SockJS does not support heart-beat: disable heart-beats client.heartbeat.outgoing = 0; client.heartbeat.incoming = 0; // Declare on_connect var on_connect = function(x) { client.subscribe("/exchange/rabbitmq/rabbitmq_routingkey", function(d) { print_first(d.body); }); }; // Declare on_error var on_error = function() { console.log('error'); }; // Conect to RabbitMQ client.connect('guest', 'guest', on_connect, on_error, '/');Copy the code

The RabbitMQ Web STOMP plug-in can be understood as a bridge between THE HTML5 WebSocket and STOMP protocol to enable browsers to use RabbitMQ. With STOMP and Web STOMP enabled on the RabbitMQ message server, the browser can easily communicate with the RabbitMQ server using a WebSocket or SockerJS client.

RabbitMQ Web STOMP is a bridge to STOMP and therefore follows STOMP syntax. STOMP is a frame-based protocol, similar to HTTP frame. A Frame contains a command, an optional set of headers, and a body. The user agent of STOMP Client can play two roles, or possibly both: as a producer, sending messages to the server via SEND Frame; As the consumer, send the SUBCRIBE Frame to the destination and get the MESSAGE from the server through the MESSAGE Frame.

To use STOMP with WebSocket in a Web page, you only need to download stomp.js. Considering that older browsers do not support WebSocket, SockJS provides simulated support for WebSocket. See the following code listing for using STOMP in Web pages:

Listing 3.JavaScript code
// Initialize the ws object if (location.search == '? Ws') {var ws = new WebSocket (ws: / / 192.168.1.102:15674 / ws' '); } else {var ws = new SockJS (' http://192.168.1.102:15674/stomp '); } // create a connection var client = stomp.over (ws); // SockJS does not support heart-beat: disable heart-beats client.heartbeat.outgoing = 0; client.heartbeat.incoming = 0; Var on_connect = function(x) {console.log('connect successfully'); // Send a message client.send(destination,head,body); // Send message client.subcribe(destination,callback); Subcribe (destination,function(message){message.ack (); },{ack:'client'}); Var tx = client.begin(); client.send(destination,head,body); tx.commit(); }; Var on_error = function(error) {console.log(error. Headers. Message); }; // Connect to message server client.connect(login, password, on_connect, on_error, '/');Copy the code

The destinations mentioned above are defined in RabbitMQ Web STOM. There are four types of RabbitMQ Web STOM:

1./exchange/<exchangeName>

For SUBCRIBE frames, the destination is usually in the form of /exchange/<exchangeName>/[/pattern]. The destination creates a unique, auto-deleted queue named <exchangeName> and binds this queue to the given exchange according to pattern to implement message subscription to this queue.

For SEND frames, the destination is usually in the form of /exchange/<exchangeName>/[/routingKey]. In this case the message is sent to the defined Exchange and the routingKey is specified.

2./queue/<queueName>

For the SUBCRIBE frame, Destination defines a <queueName> shared queue and implements a message subscription to that queue.

For a SEND Frame, destination only defines a <queueName> shared queue when sending a message for the first time. The message is sent to the default Exchange, the routingKey being <queueName>.

3./amq/queue/<queueName>

In this case neither SUBCRIBE frame nor SEND Frame will generate a queue. But if the queue does not exist, the SUBCRIBE frame will report an error.

For SUBCRIBE frame, destination implements message subscription to queue <queueName>.

For SEND Frame, messages are sent directly to queue <queueName> using the default exhcange.

4./topic/<topicName>

For SUBCRIBE frames, destination creates an automatically deleted, non-persistent queue and binds it to the AMq. topic exchange according to the Routingkey <topicName>, and implements a subscription to this queue.

For SEND Frame, the message is sent to the AMq. Topic Exchange with the routingKey <topicName>.

Figure 6. The Java side sends messages

Figure 7. Real-time response on JavaScript side

Click for a larger version

summary

WebSocket, as a new generation client-server asynchronous communication method provided by HTML5, can easily complete the two-way communication between the front end and the back end. The RabbitMQ service provides a STOMP plug-in that Bridges webSockets to enable both active push and asynchronous processing of messages. In the traditional Web development, there are many real-time requirements for state changes, such as the need to broadcast its real-time state after the resource is occupied. Using the solution proposed in this paper, it can be convenient to push it to all listening clients. Therefore, in new J2EE development projects, it is recommended to use the solution presented in this article instead of the ajax polling method to refresh the state.

Download resources

  • The example code (stomp.zip | 59 k)

On the topic

  • Refer to the implementation of million-level real-time message push service in Worktile for other message push schemes.
  • For details about the principle of Web long connection, see Long Connection and Long Polling of Web communication.
  • See AMQP and RabbitMQ to learn more about the AMQP protocol.
  • See the RabbitMQ website to learn more about the RabbitMQ message server.
  • See Comet: Server push based on HTTP long Connections to see how Comet implements long connection push.
  • DeveloperWorks Open Source Technology Topics: Find a wealth of operational information, tools, and project updates to help you master open source technologies and apply them to IBM products.