In this article, we begin our practical journey of Spring AMQP project.

Send email to blog.720uenterp and choose Spring AMQP to install RabbitMQ

introduce

Through this project you will learn how to use Spring Boot to integrate Spring AMQP and send emails using RabbitMQ’s message queue mechanism. The message producer is responsible for sending the user’s mail message to the message queue, and the message consumer gets the mail message from the message queue for sending. This process, you can think of as the post office: when the mail you’re about to post is in the mailbox, you can be sure that the mailman will eventually deliver it to the recipient.

To prepare

This tutorial assumes RabbitMQ is already installed and running on localhost on standard port 5672. If you use different hosts, ports, connection Settings will need to be adjusted.

host = localhost
username = guest
password = guest
port = 5672
vhost = /Copy the code

Field trip

The preparatory work

This tutorial will build two projects: email-server-producer and email-server-consumer. Email-server-producer is a message producer project, and email-server-consumer is a message consumer project.

At the end of the tutorial, I’ll submit the complete code to Github, so you can read the tutorial with the source code and get better results.

Now begin the journey. We used Spring Boot to integrate Spring AMQP and built dependencies through Maven. (Due to space issues, I won’t paste the full pop.xml configuration, you can check out the full configuration file at Github source.)

<dependencies>
    <! -- spring boot-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context-support</artifactId>
    </dependency>

    <dependency>
        <groupId>javax.mail</groupId>
        <artifactId>mail</artifactId>
        <version>${javax.mail.version}</version>
    </dependency>

</dependencies>Copy the code

Building message Producers

We configure message producers using Java Config.

@Configuration
@ComponentScan(basePackages = {"com.lianggzone.rabbitmq"})
@PropertySource(value = {"classpath:application.properties"})
public class RabbitMQConfig {
    @Autowired
    private Environment env;

    @Bean
    public ConnectionFactory connectionFactory() throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(env.getProperty("mq.host").trim());
        connectionFactory.setPort(Integer.parseInt(env.getProperty("mq.port").trim()));
        connectionFactory.setVirtualHost(env.getProperty("mq.vhost").trim());
        connectionFactory.setUsername(env.getProperty("mq.username").trim());
        connectionFactory.setPassword(env.getProperty("mq.password").trim());
        return connectionFactory;
    }

    @Bean
    public CachingConnectionFactory cachingConnectionFactory() throws Exception {
        return new CachingConnectionFactory(connectionFactory());
    }

    @Bean
    public RabbitTemplate rabbitTemplate() throws Exception {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory());
        rabbitTemplate.setChannelTransacted(true);
        return rabbitTemplate;
    }

    @Bean
    public AmqpAdmin amqpAdmin() throws Exception {
        return new RabbitAdmin(cachingConnectionFactory());
    }

    @Bean
    Queue queue() {
        String name = env.getProperty("mq.queue").trim();
        // Whether to persist
        boolean durable = StringUtils.isNotBlank(env.getProperty("mq.queue.durable").trim())?
                Boolean.valueOf(env.getProperty("mq.queue.durable").trim()) : true; 
        // A private queue that can only be used by the creator
        boolean exclusive = StringUtils.isNotBlank(env.getProperty("mq.queue.exclusive").trim())?
                Boolean.valueOf(env.getProperty("mq.queue.exclusive").trim()) : false; 
        // Whether the queue is automatically deleted when all consumer clients are disconnected
        boolean autoDelete = StringUtils.isNotBlank(env.getProperty("mq.queue.autoDelete").trim())?
                Boolean.valueOf(env.getProperty("mq.queue.autoDelete").trim()) : false; 
        return new Queue(name, durable, exclusive, autoDelete);
    }

    @Bean
    TopicExchange exchange() {
        String name = env.getProperty("mq.exchange").trim();
        // Whether to persist
        boolean durable = StringUtils.isNotBlank(env.getProperty("mq.exchange.durable").trim())?
                Boolean.valueOf(env.getProperty("mq.exchange.durable").trim()) : true;
        // Whether the queue is automatically deleted when all consumer clients are disconnected
        boolean autoDelete = StringUtils.isNotBlank(env.getProperty("mq.exchange.autoDelete").trim())?
                Boolean.valueOf(env.getProperty("mq.exchange.autoDelete").trim()) : false;
        return new TopicExchange(name, durable, autoDelete);
    }

    @Bean
    Binding binding() {
        String routekey = env.getProperty("mq.routekey").trim();
        returnBindingBuilder.bind(queue()).to(exchange()).with(routekey); }}Copy the code

Queues, switches, and bindings are defined. In fact, this way Spring AMQP automatically creates a queue or exchange when it doesn’t exist. If you do not want to create them automatically, you can enable queues and switches in the RabbitMQ admin background and comment out the queue() and exchange() methods. In addition, we have removed the configuration information for creating queues or switches from the configuration file application.properties for better scaling. RabbitMQ configuration information is also included.

mq.host=localhost
mq.username=guest
mq.password=guest
mq.port=5672
mq.vhost=/

mq.exchange=email_exchange
mq.exchange.durable=true
mq.exchange.autoDelete=false

mq.queue=email_queue
mq.queue.durable=true
mq.queue.exclusive=false
mq.queue.autoDelete=false

mq.routekey=email_routekeyCopy the code

In addition, suppose a producer sends to a switch and a consumer receives messages from a queue. At this point, binding queues to the exchange is critical to connecting these producers and consumers. In Spring AMQP, we define a Binding class to represent these connections. We use BindingBuilder to build a “streaming API” style.

BindingBuilder.bind(queue()).to(exchange()).with(routekey);Copy the code

Now that we’re close to the end, we need to define a way to put the sending mail task into the message queue. At this point, in order to better extend, we define an interface and an implementation class, programming based on the interface.

public interface EmailService {
    /** * The sending task stores the message queue * @param message * @throws Exception */
    void sendEmail(String message) throws Exception;
}Copy the code

Its implementation class overrides the sendEmail() method to transcode the message and write it to the message queue.

@Service
public class EmailServiceImpl implements EmailService{
    private static Logger logger = LoggerFactory.getLogger(EmailServiceImpl.class);

    @Resource( name = "rabbitTemplate" )
    private RabbitTemplate rabbitTemplate;

    @Value("${mq.exchange}")
    private String exchange;

    @Value("${mq.routekey}")
    private String routeKey;

    @Override
    public void sendEmail(String message) throws Exception {
        try {
            rabbitTemplate.convertAndSend(exchange, routeKey, message);
        }catch (Exception e){
            logger.error("EmailServiceImpl.sendEmail", ExceptionUtils.getMessage(e)); }}}Copy the code

So, let’s simulate another RESTful API call scenario to simulate a real scenario.

@RestController()
@RequestMapping(value = "/v1/emails")
public class EmailController {

    @Resource
    private EmailService emailService;

    @RequestMapping(method = RequestMethod.POST)
    public JSONObject add(@RequestBody JSONObject jsonObject) throws Exception {
        emailService.sendEmail(jsonObject.toJSONString());
        returnjsonObject; }}Copy the code

Finally, write a main method to get the Spring Boot service up and running.

@RestController
@EnableAutoConfiguration
@ComponentScan(basePackages = {"com.lianggzone.rabbitmq"})
public class WebMain {

    public static void main(String[] args) throws Exception { SpringApplication.run(WebMain.class, args); }}Copy the code

At this point, we’re done. We can send an HTTP request via Postman. (Postman is a powerful Chrome plugin for debugging and sending HTTP requests to web pages.)

{
    "to":"[email protected]"."subject":"email-server-producer"."text":"< HTML >

Mail test

Hello! This is mail test.

"
}Copy the code

See the illustration.

Take a look at RabbitMQ’s administrative background, where an unprocessed message appears. (Address: http://localhost:15672/#/queues)

And, just be sure not to send test messages to my E-mail inbox, which will remain “buoyed” by mail.

Building message consumers

After the message producer is complete, let’s build a message consumer project. Similarly, message consumers are configured using Java Config.

@Configuration
@ComponentScan(basePackages = {"com.lianggzone.rabbitmq"})
@PropertySource(value = {"classpath:application.properties"})
public class RabbitMQConfig {
    @Autowired
    private Environment env;

    @Bean
    public ConnectionFactory connectionFactory() throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(env.getProperty("mq.host").trim());
        connectionFactory.setPort(Integer.parseInt(env.getProperty("mq.port").trim()));
        connectionFactory.setVirtualHost(env.getProperty("mq.vhost").trim());
        connectionFactory.setUsername(env.getProperty("mq.username").trim());
        connectionFactory.setPassword(env.getProperty("mq.password").trim());
        return connectionFactory;
    }

    @Bean
    public CachingConnectionFactory cachingConnectionFactory() throws Exception {
        return new CachingConnectionFactory(connectionFactory());
    }

    @Bean
    public RabbitTemplate rabbitTemplate() throws Exception {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory());
        rabbitTemplate.setChannelTransacted(true);
        return rabbitTemplate;
    }

    @Bean
    public AmqpAdmin amqpAdmin() throws Exception {
        return new RabbitAdmin(cachingConnectionFactory());
    }

    @Bean
    public SimpleMessageListenerContainer listenerContainer(
            @Qualifier("mailMessageListenerAdapter") MailMessageListenerAdapter mailMessageListenerAdapter) throws Exception {
        String queueName = env.getProperty("mq.queue").trim();

        SimpleMessageListenerContainer simpleMessageListenerContainer =
                new SimpleMessageListenerContainer(cachingConnectionFactory());
        simpleMessageListenerContainer.setQueueNames(queueName);
        simpleMessageListenerContainer.setMessageListener(mailMessageListenerAdapter);
        // Set the manual ACK
        simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        returnsimpleMessageListenerContainer; }}Copy the code

You’re smart enough to see the difference. There is an additional listenerContainer() method in this code. Yes, it is a listener container that listens to message queues for message processing. Notice how we set up the manual ACK. By default, it uses auto reply, in which the message queue deletes the message from the message queue immediately after it is sent. If the consumer does not send an ACK due to an outage or connection failure, RabbitMQ will re-send the message to the next consumer listening on the queue to ensure the reliability of the message.

Of course, we also define the application.properties configuration file.

mq.host=localhost
mq.username=guest
mq.password=guest
mq.port=5672
mq.vhost=/

mq.queue=email_queueCopy the code

In addition, we created a MailMessageListenerAdapter class to consume news.

@Component("mailMessageListenerAdapter")
public class MailMessageListenerAdapter extends MessageListenerAdapter {

    @Resource
    private JavaMailSender mailSender;

    @Value("${mail.username}")
    private String mailUsername;

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            // Parse the RabbitMQ message body
            String messageBody = new String(message.getBody());
            MailMessageModel mailMessageModel = JSONObject.toJavaObject(JSONObject.parseObject(messageBody), MailMessageModel.class);
            // Send an email
            String to =  mailMessageModel.getTo();
            String subject = mailMessageModel.getSubject();
            String text = mailMessageModel.getText();
            sendHtmlMail(to, subject, text);
            / / ACK manually
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }catch(Exception e){ e.printStackTrace(); }}/** * Send email * @param to * @param subject * @param text * @throws Exception */
    private void sendHtmlMail(String to, String subject, String text) throws Exception {
        MimeMessage mimeMessage = mailSender.createMimeMessage();
        MimeMessageHelper mimeMessageHelper = new MimeMessageHelper(mimeMessage);
        mimeMessageHelper.setFrom(mailUsername);
        mimeMessageHelper.setTo(to);
        mimeMessageHelper.setSubject(subject);
        mimeMessageHelper.setText(text, true);
        // Send an emailmailSender.send(mimeMessage); }}Copy the code

In the onMessage() method, we do three things:

  1. Parse the message body from the RabbitMQ message queue.
  2. Sends an email to the target mailbox based on the message body.
  3. Manually acknowledge the ACK and let the message queue delete the message.

Here, the jsonObject.toJavaObject () method uses FastJSON to convert the JSON string into the entity object MailMessageModel. Note that @data is an annotation to the Lombok class library.

@Data
public class MailMessageModel {
    @JSONField(name = "from")
    private String from;

    @JSONField(name = "to")
    private String to;

    @JSONField(name = "subject")
    private String subject;

    @JSONField(name = "text")
    private String text;

    @Override
    public String toString() {
        StringBuffer sb = new StringBuffer();
        sb.append("Email{from:").append(this.from).append(",");
        sb.append("to:").append(this.to).append(",");
        sb.append("subject:").append(this.subject).append(",");
        sb.append("text:").append(this.text).append("}");
        returnsb.toString(); }}Copy the code

Spring has good support for Java Mail. There are several types of messages: simple text messages, HTML text messages, embedded images messages, messages containing attachments. Here, we’ve wrapped a simple sendHtmlMail() to send the mail.

Oh, and we’re missing a mail configuration class.

@Configuration
@PropertySource(value = {"classpath:mail.properties"})
@ComponentScan(basePackages = {"com.lianggzone.rabbitmq"})
public class EmailConfig {
    @Autowired
    private Environment env;

    @Bean(name = "mailSender")
    public JavaMailSender mailSender() {
        // Create a Mail sender, which mainly provides the Mail sending interface, transparent creation of Java Mail MimeMessage, and Mail sending configuration
        JavaMailSenderImpl mailSender = new JavaMailSenderImpl();
        // If the email address is a common email address, for example, non-SSL authentication
        mailSender.setHost(env.getProperty("mail.host").trim());
        mailSender.setPort(Integer.parseInt(env.getProperty("mail.port").trim()));
        mailSender.setUsername(env.getProperty("mail.username").trim());
        mailSender.setPassword(env.getProperty("mail.password").trim());
        mailSender.setDefaultEncoding("utf-8");
        // Configure the mail server
        Properties props = new Properties();
        // Ask the server to authenticate the user name and password
        props.put("mail.smtp.auth"."true");
        props.put("mail.smtp.timeout"."25000");
        mailSender.setJavaMailProperties(props);
        returnmailSender; }}Copy the code

This configuration information is maintained in the configuration file mail.properties.

mail.host=smtp163..com
mail.port=25Mail. username= username mail.password= passwordCopy the code

Finally, let’s write a main method to get the Spring Boot service running.

At this point, we have also completed a message consumer project that will continuously process mail messages from the message queue.

The source code

Related examples Complete code: github.com/lianggzone/…

(after)

More wonderful articles, all in the “server-side thinking” wechat public account!