RabbitMQ

Brief introduction:

AMQP (Advanced Message Queuing Protocol)

AMQP (Advanced Message Queuing Protocol) is an open application-layer protocol standard for message-oriented middleware that provides unified messaging services. Clients and middleware that implement the protocol can exchange messages regardless of the client or middleware product and regardless of the development language. Implementations include RabbitMQ, which is written in Erlang.

JMS

JMS is the Java Message Service application interface, an API for message-oriented middleware (MOM) on the Java platform. It is used to send messages and communicate asynchronously between two applications or within a distributed system. JMS is a Java EE specification, comparable to JDBC in that it defines an interface that vendors implement. Many message-oriented middleware products implement the JMS specification, ActiveMQ among them. RabbitMQ does not officially provide a JMS implementation package, but the open source community does.

Differences between AMQP and JMS

  • JMS unifies message operations by defining a common interface; AMQP unifies the format of the exchanged data by defining a protocol.

  • JMS is limited to the Java language; AMQP is only a protocol and does not prescribe an implementation, so it is cross-language.

  • JMS specifies two message models (point-to-point and publish/subscribe); the message models of AMQP are richer.

This section describes RabbitMQ concepts

Overview

RabbitMQ is a message queue developed in Erlang and based on the Advanced Message Queuing Protocol (AMQP). It provides a way for applications to communicate with each other, and message queues are widely used in distributed system development. RabbitMQ official address: www.rabbitmq.com/

RabbitMQ provides six modes: Simple, Work queues, Publish/Subscribe, Routing, Topics, and RPC (a remote call rather than a messaging mode, so it is not covered here). The official site introduces each mode: www.rabbitmq.com/getstarted….

Introduction to related Components

Message

A message is anonymous and consists of a header and a body. The body is opaque to the broker, and the header is a set of optional attributes, including routing-key, priority (the priority of this message relative to other messages), delivery-mode (which indicates whether the message may need persistent storage), and so on.

Publisher

The producer of messages: a client application that publishes messages to an exchange.

Exchange

An exchange receives the messages sent by producers and routes them to queues in the server. It is the first stop for a message arriving at the broker: the exchange looks up the message's routing key in its table of bindings and dispatches the message to the matching queues.

Binding

A binding associates a message queue with an exchange. It is a routing rule that connects the exchange to the queue based on a routing key, so an exchange can be thought of as a routing table made up of bindings.
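The idea that an exchange is a routing table made up of bindings can be sketched in a few lines of plain Java. This is only a toy model of exact-match (direct) routing, not RabbitMQ code: the class name `DirectExchangeSketch`, its methods, and the queue and key names are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a direct exchange: a routing table built from bindings. */
class DirectExchangeSketch {
    // routing key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    /** Adds a binding: messages with this routing key go to this queue. */
    void bind(String queueName, String routingKey) {
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queueName);
    }

    /** Returns the queues a message with the given routing key is delivered to. */
    List<String> route(String routingKey) {
        return List.copyOf(bindings.getOrDefault(routingKey, List.of()));
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("orders", "order.created");
        exchange.bind("audit", "order.created");
        exchange.bind("billing", "invoice.paid");

        System.out.println(exchange.route("order.created")); // [orders, audit]
        System.out.println(exchange.route("unknown.key"));   // []
    }
}
```

One routing key can map to several queues, which is how a single published message ends up in more than one queue; a key with no binding matches nothing.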

Queue

A message queue holds messages until they are delivered to consumers. It is both the container and the destination of a message. A message can be placed in one or more queues, where it stays until a consumer connects to the queue and picks it up.

Connection

A network connection, such as a TCP connection, between a producer or consumer and the broker.

Channel

A channel is an independent two-way data stream inside a multiplexed connection. It is a virtual connection built on top of a real TCP connection. AMQP commands are sent through channels: publishing messages, subscribing to queues, and receiving messages all happen on a channel. Because establishing and tearing down TCP connections is expensive for the operating system, channels were introduced so that one TCP connection can be reused.
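As a rough illustration of multiplexing, the sketch below lets two channels share one connection by tagging every frame with a channel number. It is a toy model in plain Java, not the client library: `SketchConnection`, `SketchChannel`, and the frame format are invented for this example.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of channel multiplexing: many channels share one connection. */
class MultiplexSketch {
    /** Stands in for the single TCP connection; it records frames in send order. */
    static class SketchConnection {
        private final List<String> wire = new ArrayList<>();
        private int nextChannelId = 1;

        SketchChannel createChannel() {
            return new SketchChannel(this, nextChannelId++);
        }

        List<String> framesOnWire() {
            return List.copyOf(wire);
        }
    }

    /** A lightweight virtual connection identified by a channel number. */
    static class SketchChannel {
        private final SketchConnection connection;
        private final int id;

        SketchChannel(SketchConnection connection, int id) {
            this.connection = connection;
            this.id = id;
        }

        /** Tags the command with the channel number before it reaches the shared wire. */
        void send(String command) {
            connection.wire.add("ch" + id + ":" + command);
        }
    }

    public static void main(String[] args) {
        SketchConnection connection = new SketchConnection();
        SketchChannel publisher = connection.createChannel();
        SketchChannel subscriber = connection.createChannel();

        publisher.send("basic.publish");
        subscriber.send("basic.consume");
        publisher.send("basic.publish");

        // One connection carried the traffic of both channels:
        // [ch1:basic.publish, ch2:basic.consume, ch1:basic.publish]
        System.out.println(connection.framesOnWire());
    }
}
```

Creating a second channel costs only a counter increment here, which mirrors why channels are cheap compared with opening another TCP connection.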

Consumer

The consumer of messages: a client application that retrieves messages from a message queue.

Virtual Host

Virtual hosts exist for multi-tenancy and security reasons. A virtual host groups a set of exchanges, message queues, and related objects. It is a separate server domain that shares the same authentication and encryption environment. Each vhost is essentially a mini RabbitMQ server with its own queues, exchanges, bindings, and permission mechanism. The vhost is a basic AMQP concept and must be specified when connecting. The default vhost for RabbitMQ is /.

Broker

The application that receives and distributes messages; it represents the message queue server entity. RabbitMQ Server is a message broker.

RabbitMQ installation (the following steps are based on CentOS 7)

1. Install the dependencies

Install the dependencies online:

yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz

2. Install Erlang

Download the following files from the official website:

erlang-18.3-1.el7.centos.x86_64.rpm
socat-1.7.3.2-5.el7.lux.x86_64.rpm
rabbitmq-server-3.6.5-1.noarch.rpm

Install command

rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm

Possible errors

If the installation fails with a GLIBC version error, the glibc version on the machine is too low. List the GLIBC versions that the current machine provides:

strings /lib64/libc.so.6 | grep GLIBC

In this example the highest version listed is 2.12, while 2.15 is required, so glibc must be upgraded.
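When comparing the installed version with the required one, the parts between the dots have to be compared as numbers; a plain string comparison would rank 2.9 above 2.12. The following is a small sketch of that comparison. The class and method names are made up for illustration, and the version strings are the ones mentioned above.

```java
/** Compares dotted version strings such as "2.12" and "2.15" numerically. */
class GlibcVersionCheck {
    /** Returns a negative, zero, or positive value, like Comparator.compare. */
    static int compareVersions(String a, String b) {
        String[] left = a.split("\\.");
        String[] right = b.split("\\.");
        int length = Math.max(left.length, right.length);
        for (int i = 0; i < length; i++) {
            // Missing components count as 0, so "2.15" equals "2.15.0".
            int l = i < left.length ? Integer.parseInt(left[i]) : 0;
            int r = i < right.length ? Integer.parseInt(right[i]) : 0;
            if (l != r) {
                return Integer.compare(l, r);
            }
        }
        return 0;
    }

    /** True when the installed version is older than the required one. */
    static boolean needsUpgrade(String installed, String required) {
        return compareVersions(installed, required) < 0;
    }

    public static void main(String[] args) {
        System.out.println(needsUpgrade("2.12", "2.15")); // true
        System.out.println(needsUpgrade("2.17", "2.15")); // false
    }
}
```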

  • Update the installation dependencies with yum

sudo yum install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gcc make -y

  • Download the RPM packages

wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-utils-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-static-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-common-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-devel-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-headers-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/nscd-2.17-55.el6.x86_64.rpm &

  • Install the RPM packages

sudo rpm -Uvh *-2.17-55.el6.x86_64.rpm --force --nodeps

  • After the installation, check the glibc version again; it should now be 2.17

strings /lib64/libc.so.6 | grep GLIBC

3. Install RabbitMQ

The socat package downloaded earlier is a dependency of rabbitmq-server, so install it first if it is not already present.

rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

4. Common commands

Start, stop, and restart the service

service rabbitmq-server start
service rabbitmq-server stop
service rabbitmq-server restart

Configuration file

cd /usr/share/doc/rabbitmq-server-3.6.5/
cp rabbitmq.config.example /etc/rabbitmq/rabbitmq.config

Install the RabbitMQ management interface

Enable the management plugin

rabbitmq-plugins enable rabbitmq_management

Modify the default configuration

vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app

Access the management console

http://<IP address>:15672

This section describes basic information about the control console

User role levels

1. Super administrator

Can log in to the management console, view all information, and manage users and policies.

2. Monitoring

Can log in to the management console and view RabbitMQ node information (number of processes, memory usage, disk usage, and so on).

3. Policymaker

Can log in to the management console and manage policies, but cannot view node information (the part marked by the red box in the figure above).

4. Management

Can only log in to the management console; cannot see node information or manage policies.

5. Other

Cannot log in to the management console; this level is typically used for ordinary producers and consumers.
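The five levels can be summarised as a small capability table. The enum below is only a restatement of the list above in code; the type name, constant names, and flag names are made up for illustration and are not part of any RabbitMQ API.

```java
/** The five user levels listed above, modelled as capability flags. */
enum UserLevelSketch {
    SUPER_ADMINISTRATOR(true, true, true, true),
    MONITORING(true, true, false, false),
    POLICYMAKER(true, false, true, false),
    MANAGEMENT(true, false, false, false),
    OTHER(false, false, false, false);

    final boolean canLogIn;          // may open the management console
    final boolean canViewNodeInfo;   // may see processes, memory, disk usage
    final boolean canManagePolicies; // may create and edit policies
    final boolean canManageUsers;    // may create and edit users

    UserLevelSketch(boolean canLogIn, boolean canViewNodeInfo,
                    boolean canManagePolicies, boolean canManageUsers) {
        this.canLogIn = canLogIn;
        this.canViewNodeInfo = canViewNodeInfo;
        this.canManagePolicies = canManagePolicies;
        this.canManageUsers = canManageUsers;
    }

    public static void main(String[] args) {
        // Print one row per level so the differences are easy to compare.
        for (UserLevelSketch level : values()) {
            System.out.printf("%-20s login=%b nodeInfo=%b policies=%b users=%b%n",
                    level, level.canLogIn, level.canViewNodeInfo,
                    level.canManagePolicies, level.canManageUsers);
        }
    }
}
```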

Add user

Virtual Hosts configuration

Concept

MySQL, for example, has the concept of a database and lets you grant users permissions on databases and tables. RabbitMQ has a similar permission model: one RabbitMQ installation can host several virtual message servers, called virtual hosts. Each virtual host behaves like an independent RabbitMQ server and is isolated from the others, so exchanges, queues, and messages in different virtual hosts cannot interact. A virtual host is the equivalent of a database in MySQL. Virtual host names usually start with a slash (/).
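The isolation between virtual hosts can be pictured as one namespace of queue names per vhost. The sketch below is a toy model in plain Java; `VirtualHostSketch`, its methods, and the queue name `orders` are hypothetical, while `/demo` and `/` are the vhost names used in this article.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model of virtual hosts as isolated namespaces for queue names. */
class VirtualHostSketch {
    // vhost name -> queue names declared inside that vhost
    private final Map<String, Set<String>> queuesByVhost = new HashMap<>();

    /** Declares a queue inside one vhost only. */
    void declareQueue(String vhost, String queue) {
        queuesByVhost.computeIfAbsent(vhost, v -> new HashSet<>()).add(queue);
    }

    /** A queue is visible only through the vhost it was declared in. */
    boolean queueExists(String vhost, String queue) {
        return queuesByVhost.getOrDefault(vhost, Set.of()).contains(queue);
    }

    public static void main(String[] args) {
        VirtualHostSketch broker = new VirtualHostSketch();
        broker.declareQueue("/demo", "orders");

        System.out.println(broker.queueExists("/demo", "orders")); // true
        System.out.println(broker.queueExists("/", "orders"));     // false
    }
}
```

The same queue name can therefore be reused in two vhosts without conflict, much as two MySQL databases can each contain a table with the same name.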

Create a virtual host

Set virtual host permissions

RabbitMQ Quick start

Basic Steps

1. Create a project 2. Add the dependency 3. Write a producer to send messages 4. Write a consumer to receive messages

1. Create a project

2. Add the dependency

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.6.0</version>
</dependency>

3. Write a producer to send messages

package com.pjh;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        /* Create a connection factory */
        ConnectionFactory connectionFactory = new ConnectionFactory();
        /* Set the RabbitMQ host address; the default is localhost */
        connectionFactory.setHost("121.196.111.120");
        /* Connection port */
        connectionFactory.setPort(5672);
        /* Virtual host name; the default is / */
        connectionFactory.setVirtualHost("/demo");
        /* Connection user name */
        connectionFactory.setUsername("guest");
        /* Connection password */
        connectionFactory.setPassword("guest");
        /* Create a connection */
        Connection connection = connectionFactory.newConnection();
        /* Create a channel */
        Channel channel = connection.createChannel();
        /*
         * Parameter 1: queue name; the queue is created if it does not exist yet
         * Parameter 2: whether the queue is durable
         * Parameter 3: whether the queue is exclusive to this connection
         * Parameter 4: whether the queue is deleted automatically when no longer in use
         * Parameter 5: other queue arguments
         */
        channel.queueDeclare("demo", true, false, false, null);
        /* Message to send */
        String message = "Hello World!!";
        /*
         * Parameter 1: exchange name; an empty string selects the default exchange
         * Parameter 2: routing key; in simple mode this can be the queue name
         * Parameter 3: other message properties
         * Parameter 4: message body
         */
        channel.basicPublish("", "demo", null, message.getBytes(StandardCharsets.UTF_8));
        System.out.println("The following message has been sent: ");
        System.out.println(message);
        /* Close the resources */
        channel.close();
        connection.close();
    }
}

4. Write a consumer to receive messages

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        /* Get a connection */
        Connection connection = getConnection();
        /* Create a channel */
        Channel channel = connection.createChannel();
        /*
         * Parameter 1: queue name
         * Parameter 2: whether the queue is durable
         * Parameter 3: whether the queue is exclusive to this connection
         * Parameter 4: whether the queue is deleted automatically when no longer in use
         * Parameter 5: other queue arguments
         */
        channel.queueDeclare("demo", true, false, false, null);
        /* Create a consumer and define how each message is handled */
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            /**
             * consumerTag: the consumer tag, which can be specified in channel.basicConsume
             * envelope: the message envelope, from which the message ID, routing key, and exchange can be read
             * properties: message properties
             * body: message body
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Routing key
                System.out.println("Routing key: " + envelope.getRoutingKey());
                // Exchange
                System.out.println("Exchange: " + envelope.getExchange());
                // Message ID (delivery tag)
                System.out.println("Message ID: " + envelope.getDeliveryTag());
                // Message body
                System.out.println("Received message: " + new String(body, "UTF-8"));
            }
        };
        /*
         * Listen for messages
         * Parameter 1: queue name
         * Parameter 2: automatic acknowledgement. If true, the message is acknowledged to MQ
         *              automatically as soon as it is delivered and MQ removes it; if false,
         *              the message must be acknowledged manually.
         * Parameter 3: callback invoked when a message is received
         */
        String demo = channel.basicConsume("demo", true, defaultConsumer);
        System.out.println(demo);
        // Do not close the resources; the consumer should keep listening for messages
        // channel.close();
        // connection.close();
    }

    public static Connection getConnection() throws IOException, TimeoutException {
        /* Create a connection factory */
        ConnectionFactory connectionFactory = new ConnectionFactory();
        /* Set the host address; the default is the local address */
        connectionFactory.setHost("121.196.111.120");
        /* Set the connection port number */
        connectionFactory.setPort(5672);
        /* Set the virtual host name */
        connectionFactory.setVirtualHost("/demo");
        /* Set the connection user name */
        connectionFactory.setUsername("guest");
        /* Set the connection password */
        connectionFactory.setPassword("guest");
        /* Create a connection */
        return connectionFactory.newConnection();
    }
}

This section describes related method parameters

channel.queueDeclare

channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)

1. queue:

The queue name.

2. durable:

Whether the queue is persistent.

  • When durable = false, the queue is non-persistent. Because the queue is stored in memory, it is lost when RabbitMQ or the server restarts.
  • When durable = true, the queue is persistent and is not lost when RabbitMQ restarts. When RabbitMQ exits, it stores the queue information in Erlang's own Mnesia database, which is read back when RabbitMQ restarts.
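The effect of the durable flag on a restart can be sketched with a toy model. The class below is plain Java rather than RabbitMQ code, and `DurableQueueSketch`, its methods, and the queue names are made up for illustration. It covers queue definitions only; whether the messages inside a queue survive is a separate matter that also involves the delivery-mode attribute mentioned earlier.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of how the durable flag decides which queues survive a restart. */
class DurableQueueSketch {
    // queue name -> durable flag, kept in declaration order
    private final Map<String, Boolean> queues = new LinkedHashMap<>();

    void declare(String name, boolean durable) {
        queues.put(name, durable);
    }

    /** Simulates a broker restart: only durable queue definitions remain. */
    void restart() {
        queues.values().removeIf(durable -> !durable);
    }

    List<String> queueNames() {
        return new ArrayList<>(queues.keySet());
    }

    public static void main(String[] args) {
        DurableQueueSketch broker = new DurableQueueSketch();
        broker.declare("durable-queue", true);
        broker.declare("transient-queue", false);

        broker.restart();
        System.out.println(broker.queueNames()); // [durable-queue]
    }
}
```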

3. exclusive:

Whether the queue is exclusive.

  • When exclusive = true, the queue is exclusive. An exclusive queue is visible only to the Connection that first declared it. It is private, similar to holding a lock, and is deleted automatically when that Connection is closed.

  • When exclusive = false, the queue is non-exclusive and can be used by channels of different connections.

Notes:

  • The visibility of an exclusive queue is per connection. Different channels of the same Connection can access an exclusive queue created on that Connection at the same time. Other connections cannot access it; an attempt fails with an error: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'hello-testExclusice' in vhost '/'. The following declarations are fine:

Channel channel = connection.createChannel();
Channel channel2 = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, true, false, null);
channel2.queueDeclare(QUEUE_NAME, false, true, false, null);

If channel and channel2 were created on different connections, the channel2.queueDeclare() call above would fail.

  • "First declared" means that once a Connection has declared an exclusive queue, other connections are not allowed to declare an exclusive queue with the same name. This differs from ordinary queues. Even if the queue is durable (durable = true), an exclusive queue is deleted automatically once the connection is closed or the client exits. This kind of queue suits scenarios in which a single client both sends and reads the messages.
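The ownership rules in the notes above can be modelled in a few lines. This is a toy sketch in plain Java, not the broker's implementation; `ExclusiveQueueSketch`, the connection ids, and the use of `IllegalStateException` in place of the real channel error are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of exclusive queues: each one is owned by a single connection. */
class ExclusiveQueueSketch {
    // exclusive queue name -> id of the connection that first declared it
    private final Map<String, String> owners = new HashMap<>();

    /** Declares an exclusive queue; only the owning connection may redeclare it. */
    void declareExclusive(String connectionId, String queue) {
        String owner = owners.putIfAbsent(queue, connectionId);
        if (owner != null && !owner.equals(connectionId)) {
            throw new IllegalStateException(
                    "RESOURCE_LOCKED - cannot obtain exclusive access to locked queue '" + queue + "'");
        }
    }

    /** Closing a connection deletes every exclusive queue it owns. */
    void closeConnection(String connectionId) {
        owners.values().removeIf(connectionId::equals);
    }

    boolean exists(String queue) {
        return owners.containsKey(queue);
    }

    public static void main(String[] args) {
        ExclusiveQueueSketch broker = new ExclusiveQueueSketch();
        broker.declareExclusive("conn-1", "hello");
        broker.declareExclusive("conn-1", "hello"); // same connection: allowed
        try {
            broker.declareExclusive("conn-2", "hello"); // other connection: rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        broker.closeConnection("conn-1");
        System.out.println(broker.exists("hello")); // false
    }
}
```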

4. autoDelete:

Whether to delete the queue automatically. If autoDelete = true, the queue is deleted automatically once all of its consumers have disconnected. Note that this does not mean the queue is deleted whenever no consumer is connected to it: a queue that a producer has declared but that no consumer has ever connected to is not deleted.

5. arguments:

Other parameters for the queue, for example x-message-ttl, x-expires, x-max-length, x-max-length-bytes, x-dead-letter-exchange, x-dead-letter-routing-key, x-max-priority, and so on.
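The arguments parameter is an ordinary `Map<String, Object>`, so it can be built with the standard library. The sketch below shows one possible map; the class name, the values, and the names `demo.dlx` and `demo.dead` are made up for illustration, and the `queueDeclare` call is left as a comment because it needs a live channel.

```java
import java.util.HashMap;
import java.util.Map;

/** Builds an example arguments map for channel.queueDeclare. */
class QueueArgumentsSketch {
    static Map<String, Object> buildArguments() {
        Map<String, Object> arguments = new HashMap<>();
        // Messages that stay in the queue longer than 60 seconds expire.
        arguments.put("x-message-ttl", 60_000);
        // The queue holds at most 1000 messages.
        arguments.put("x-max-length", 1_000);
        // Hypothetical exchange and routing key for dead-lettered messages.
        arguments.put("x-dead-letter-exchange", "demo.dlx");
        arguments.put("x-dead-letter-routing-key", "demo.dead");
        return arguments;
    }

    public static void main(String[] args) {
        Map<String, Object> arguments = buildArguments();
        // With a real channel, this map would take the place of the null fifth argument:
        // channel.queueDeclare("demo", true, false, false, arguments);
        System.out.println(arguments.get("x-message-ttl")); // 60000
    }
}
```

Keep in mind that redeclaring an existing queue with different arguments is rejected by the broker, so the map should match what the queue was first declared with.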