RabbitConfig

/ / define a multithreaded consumption message listener factory @ Bean (" mutiListenerFactory ") SimpleRabbitListenerContainerFactory mutiListenerFactory () { SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory(); factoryConfigurer.configure(containerFactory, connectionFactory); containerFactory.setConcurrentConsumers(10); / / open 10 minimum thread consumption news containerFactory. SetMessageConverter (new Jackson2JsonMessageConverter ()); containerFactory.setPrefetchCount(10); containerFactory.setMaxConcurrentConsumers(20); containerFactory.setTxSize(10); return containerFactory; } @bean public Queue deadQueue() {HashMap<String, Object> args = new HashMap<>(); args.put("x-dead-letter-routing-key", environment.getProperty("rabbitMq.deadletter.routingkey")); / / TTL, after the send to args. The put (" x - dead - letter - exchange ", and the environment. The getProperty (the rabbitMq. Deadletter. "exchange")); //args.put("x-message-ttl", 5000); //args. Put ("x-max-length",10); //args. Put ("x-overflow","reject-publish"); / / overflow directly refused to return the new Queue (the environment) getProperty (" the rabbitMq. Deadletter. Queue "), true, false, false, args); } @bean public TopicExchange realDeadExchange() {return new TopicExchange(environment.getProperty("rabbitMq.realQueue.exchange"), true, false); } //3 rountingKey @bean public Binding deadBinding() {return BindingBuilder.bind(deadQueue()).to(realDeadExchange()).with(environment.getProperty("rabbitMq.realQueue.routingkey")); } @bean public Queue recvieDeadMsgQueue() {return new Queue("dead.msg.queue",true,false,false); } @bean public TopicExchange recvieDeadMsgExchange() {return new TopicExchange(environment.getProperty("rabbitMq.deadletter.exchange"), true, false); } @bean public Binding recvieDeadMsgBinding() {return BindingBuilder.bind(recvieDeadMsgQueue()).to(recvieDeadMsgExchange()).with(environment.getProperty("rabbitMq.deadletter. 
routingkey")); }Copy the code

Second, the producer side

/** Invoked when the broker confirms (ack) or refuses (nack) a published message. */
private final ConfirmCallback confirmCallback = new ConfirmCallback() {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String s) {
        log.info("correlationData: {}", correlationData);
        log.info("ack: {} ", ack);
        if (!ack) {
            // Apply a compensation mechanism here, e.g. look the message up
            // via correlationData.getId() and re-send or record it.
            log.info(" exception handling....");
        }
    }
};

/** Invoked when a published message is returned by the broker. */
private final ReturnCallback returnCallback = new ReturnCallback() {
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        log.info("return exchange: " + exchange + ", routingKey: " + routingKey
                + ", replyCode: " + replyCode + ", replyText: " + replyText);
        // handle the returned message here ...
    }
};

// Send a message.
// setExpiration applies to this single message only (per-message TTL, in milliseconds).
Message message = MessageBuilder.withBody(objectMapper.writeValueAsBytes(gameLotteryInfoEnity))
        .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
        .setContentEncoding("UTF-8")
        .setContentType("application/json")
        .setExpiration("15000")
        .build();
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
// NOTE(review): the body is built from gameLotteryInfoEnity but the correlation id
// comes from gameGiftInfoEnity - confirm this is intended and not a copy/paste slip.
CorrelationData correlationData = new CorrelationData(gameGiftInfoEnity.getId() + "");
// Arguments: exchange, routingKey, object, correlationData.
rabbitTemplate.convertAndSend(
        environment.getProperty("rabbitMq.realQueue.exchange"),
        environment.getProperty("rabbitMq.realQueue.routingkey"),
        message,
        correlationData);

Third, the consumer side

@RabbitListener(queues = "dead.msg.queue",containerFactory = "mutiListenerFactory") public void receiverDirectQueue(@Payload Message message, Channel channel, @Headers Map<String,Object> headers) throws IOException { log.info("msg1 : {}", message); try { channel.basicAck((Long)headers.get(AmqpHeaders.DELIVERY_TAG), true); / / confirmation message gameLotteryInfoMapper. Insert (objectMapper. ReadValue (message. GetBody (), GameLotteryInfoEnity. Class)); } catch (IOException e) {log.info(" an exception occurred while processing the winning message, re-queue the message "); channel.basicNack((Long)headers.get(AmqpHeaders.DELIVERY_TAG), true, true); }}Copy the code