

The previous articles covered most of the RocketMQ basics. This time, let's work through a realistic basic case, hee hee.

  • RocketMQ (1)
  • RocketMQ (2)
  • RocketMQ (3)

Case introduction

Business analysis

We simulate the [Order] and [Payment] flows of an e-commerce shopping site.

Place the order

  1. The user sends an order request to the order system
  2. The order system calls the order service through RPC to place the order
  3. The order service calls the coupon service to deduct the coupon
  4. The order service calls the inventory service to verify and deduct the inventory
  5. The order service calls the user service to deduct the user balance
  6. The order service confirms the order

Payment

  1. The user requests the payment system
  2. The payment system calls the API of the third-party payment platform to initiate the payment process
  3. After the user successfully pays through the third-party payment platform, the third-party payment platform will call back and notify the payment system
  4. The payment system calls the order service to modify the order state
  5. The payment system calls the credits service to add credits
  6. The payment system calls the logging service to log

Problem analysis

After the user submits the order, the inventory, the coupon, and the balance are all deducted successfully. If the order confirmation step then fails, the inventory, coupon, and balance all need to be rolled back.

Question 1. How to ensure data integrity?

Use MQ to ensure the integrity of system data after an order failure
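The idea can be sketched without any MQ at all. Below is a minimal sketch, assuming an in-memory queue stands in for the RocketMQ topic; the names `CompensationSketch`, `CancelMessage`, `placeOrder`, and `consumeCancelMessages` are made up for illustration and are not from the project.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch: an in-memory queue stands in for the MQ topic.
class CompensationSketch {
    // Describes what has to be rolled back for a failed order.
    static final class CancelMessage {
        final long orderId;
        final int goodsNumber;

        CancelMessage(long orderId, int goodsNumber) {
            this.orderId = orderId;
            this.goodsNumber = goodsNumber;
        }
    }

    static int stock = 10;
    static final Queue<CancelMessage> cancelTopic = new ArrayDeque<>();

    // Deducts stock, then "confirms" the order; on failure it publishes a cancel message.
    static boolean placeOrder(long orderId, int goodsNumber, boolean confirmFails) {
        try {
            stock -= goodsNumber; // deduct inventory
            if (confirmFails) {
                throw new IllegalStateException("order confirmation failed");
            }
            return true;
        } catch (RuntimeException e) {
            // Do not roll back inline; tell the downstream services to do it.
            cancelTopic.add(new CancelMessage(orderId, goodsNumber));
            return false;
        }
    }

    // Consumer side: each service undoes its own change when it sees the message.
    static void consumeCancelMessages() {
        CancelMessage msg;
        while ((msg = cancelTopic.poll()) != null) {
            stock += msg.goodsNumber; // roll the inventory back
        }
    }
}
```

The order service only has to publish one message; every service that changed data subscribes and compensates on its own.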

Question 2

After the user pays successfully through a third-party payment platform (Alipay, WeChat), the platform asynchronously notifies the merchant's payment system of the payment result through a callback API. Based on that result, the payment system modifies the order status, records a payment log, and adds points for the user.

When the merchant's payment system receives the asynchronous notification, how can it respond quickly to the third-party payment platform?

Data is distributed through MQ to improve system processing performance
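As a rough illustration, the callback handler only records the result and acknowledges right away, and the slow work happens later in the subscribers. This is a sketch under assumptions: a plain list stands in for the MQ, and `CallbackFanOutSketch`, `onPaymentCallback`, and `dispatch` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: a list stands in for the MQ topic.
class CallbackFanOutSketch {
    // Subscribers standing in for the order, points, and log services.
    static final List<Consumer<String>> subscribers = new ArrayList<>();
    static final List<String> pending = new ArrayList<>();

    // Callback handler: enqueue the payment result and acknowledge immediately.
    static String onPaymentCallback(String payResult) {
        pending.add(payResult);
        return "success";
    }

    // Runs later; in a real system this work happens inside each MQ consumer.
    static void dispatch() {
        for (String msg : pending) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(msg);
            }
        }
        pending.clear();
    }
}
```

The response time of the callback no longer depends on how many services have to react to the payment.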

Technical analysis

Technology selection

  • SpringBoot
  • Dubbo
  • Zookeeper
  • RocketMQ
  • Mysql

Integrating RocketMQ with SpringBoot

Message producer

Add the dependencies

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.1.RELEASE</version>
</parent>

<properties>
    <rocketmq-spring-boot-starter-version>2.0.3</rocketmq-spring-boot-starter-version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>${rocketmq-spring-boot-starter-version}</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.6</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

The configuration file

# application.properties
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
rocketmq.producer.group=my-group

Start the class

@SpringBootApplication
public class MQProducerApplication {
    public static void main(String[] args) {
        // Run the class that carries @SpringBootApplication
        SpringApplication.run(MQProducerApplication.class);
    }
}

The test class

@RunWith(SpringRunner.class)
@SpringBootTest(classes = {MQProducerApplication.class})
public class ProducerTest {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void test1() {
        // Send a plain string to the "springboot-mq" topic
        rocketMQTemplate.convertAndSend("springboot-mq", "hello springboot rocketmq");
    }
}

Message consumer

Add the dependencies

Same as the message producer

The configuration file

Same as the message producer

Start the class

@SpringBootApplication
public class MQConsumerApplication {
    public static void main(String[] args) {
        // Run the class that carries @SpringBootApplication
        SpringApplication.run(MQConsumerApplication.class);
    }
}

Message listener

@Slf4j
@Component
@RocketMQMessageListener(topic = "springboot-mq", consumerGroup = "springboot-mq-consumer-1")
public class Consumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info("Received message: " + message);
    }
}

Integrating Dubbo with SpringBoot

Download the dubbo-spring-boot-starter dependency package

Install dubbo-spring-boot-starter into the local repository

mvn install -Dmaven.test.skip=true

Set up the Zookeeper cluster

Preparation

  1. Install the JDK
  2. Upload Zookeeper to the server
  3. Decompress Zookeeper, create a data directory, and rename the zoo_sample.cfg file in conf to zoo.cfg
  4. Create /usr/local/zookeeper-cluster and copy the decompressed Zookeeper into the following three directories

/usr/local/zookeeper-cluster/zookeeper-1
/usr/local/zookeeper-cluster/zookeeper-2
/usr/local/zookeeper-cluster/zookeeper-3

Modify /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg

clientPort=2181
dataDir=/usr/local/zookeeper-cluster/zookeeper-1/data

Modify /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg

clientPort=2182
dataDir=/usr/local/zookeeper-cluster/zookeeper-2/data

Modify /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg

clientPort=2183
dataDir=/usr/local/zookeeper-cluster/zookeeper-3/data

Configure the cluster

  1. Create a myid file in each ZooKeeper data directory, with contents 1, 2, and 3 respectively. This file records the ID of each server.

  2. In each instance's zoo.cfg, configure the client access port and the list of cluster server addresses.

    The list of cluster server addresses is as follows:

server.1=192.168.25.140:2881:3881
server.2=192.168.25.140:2882:3882
server.3=192.168.25.140:2883:3883

Description: server.<server ID>=<server IP address>:<port for communication between servers>:<port for voting between servers>
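To make the entry format concrete (server ID, then host, then the two ports separated by colons), here is a small parser. It is only an illustration: `ZkServerEntry` is a hypothetical helper and not part of ZooKeeper.

```java
// Hypothetical helper for illustration; not part of ZooKeeper itself.
class ZkServerEntry {
    final int id;
    final String host;
    final int peerPort;      // communication between servers
    final int electionPort;  // voting between servers

    ZkServerEntry(int id, String host, int peerPort, int electionPort) {
        this.id = id;
        this.host = host;
        this.peerPort = peerPort;
        this.electionPort = electionPort;
    }

    // Parses a line such as "server.1=192.168.25.140:2881:3881".
    static ZkServerEntry parse(String line) {
        String[] kv = line.trim().split("=", 2);
        if (kv.length != 2 || !kv[0].startsWith("server.")) {
            throw new IllegalArgumentException("not a server entry: " + line);
        }
        int id = Integer.parseInt(kv[0].substring("server.".length()));
        String[] parts = kv[1].split(":");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected host:port:port in: " + line);
        }
        return new ZkServerEntry(id, parts[0],
                Integer.parseInt(parts[1]), Integer.parseInt(parts[2]));
    }
}
```

The ID parsed from the key is the same number that goes into that server's myid file.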

Start the cluster

Starting the cluster means starting each instance separately.

RPC Service Interface

public interface IUserService {
    public String sayHello(String name);
}

Service provider

Add the dependencies

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.1.RELEASE</version>
</parent>

<dependencies>
    <!--dubbo-->
    <dependency>
        <groupId>com.alibaba.spring.boot</groupId>
        <artifactId>dubbo-spring-boot-starter</artifactId>
        <version>2.0.0</version>
    </dependency>
    <!--spring-boot-starter-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <exclusions>
            <exclusion>
                <artifactId>log4j-to-slf4j</artifactId>
                <groupId>org.apache.logging.log4j</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <!--zookeeper-->
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.10</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.9</version>
        <exclusions>
            <exclusion>
                <artifactId>slf4j-log4j12</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <!--API-->
    <dependency>
        <groupId>com.itheima.demo</groupId>
        <artifactId>dubbo-api</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
</dependencies>

The configuration file

# application.properties
spring.application.name=dubbo-demo-provider
spring.dubbo.application.id=dubbo-demo-provider
spring.dubbo.application.name=dubbo-demo-provider
spring.dubbo.registry.address=zookeeper://192.168.25.140:2181;zookeeper://192.168.25.140:2182;zookeeper://192.168.25.140:2183
spring.dubbo.server=true
spring.dubbo.protocol.name=dubbo
spring.dubbo.protocol.port=20880

Start the class

@EnableDubboConfiguration
@SpringBootApplication
public class ProviderBootstrap {

    public static void main(String[] args) throws IOException {
        SpringApplication.run(ProviderBootstrap.class,args);
    }

}

The service implementation

@Component
@Service(interfaceClass = IUserService.class)
public class UserServiceImpl implements IUserService{
    @Override
    public String sayHello(String name) {
        return "hello:"+name;
    }
}

Service consumer

Add the dependencies

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.1.RELEASE</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!--dubbo-->
    <dependency>
        <groupId>com.alibaba.spring.boot</groupId>
        <artifactId>dubbo-spring-boot-starter</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <exclusions>
            <exclusion>
                <artifactId>log4j-to-slf4j</artifactId>
                <groupId>org.apache.logging.log4j</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <!--zookeeper-->
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.10</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.9</version>
        <exclusions>
            <exclusion>
                <artifactId>slf4j-log4j12</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <!--API-->
    <dependency>
        <groupId>com.itheima.demo</groupId>
        <artifactId>dubbo-api</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
</dependencies>

The configuration file

# application.properties
spring.application.name=dubbo-demo-consumer
spring.dubbo.application.name=dubbo-demo-consumer
spring.dubbo.application.id=dubbo-demo-consumer
spring.dubbo.registry.address=zookeeper://192.168.25.140:2181;zookeeper://192.168.25.140:2182;zookeeper://192.168.25.140:2183

Start the class

@EnableDubboConfiguration
@SpringBootApplication
public class ConsumerBootstrap {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerBootstrap.class);
    }
}

Controller

@RestController
@RequestMapping("/user")
public class UserController {

    // Remote proxy for the Dubbo service
    @Reference
    private IUserService userService;

    @RequestMapping("/sayHello")
    public String sayHello(String name) {
        return userService.sayHello(name);
    }
}

Environment setup

The database

Coupon table

Goods table

Order table

Order goods log table

User table

User balance log table

Order payment table

MQ message production table

MQ message consumption table

Project initialization

The Shop system uses Maven for project management.

Project overview

  • Parent project: shop-parent
  • Order system: shop-order-web
  • Payment system: shop-pay-web
  • Coupon service: shop-coupon-service
  • Order service: shop-order-service
  • Payment service: shop-pay-service
  • Goods service: shop-goods-service
  • User service: shop-user-service
  • Entity classes: shop-pojo
  • Persistence layer: shop-dao
  • Interface layer: shop-api
  • Utility project: shop-common

There are 12 projects in total.

Project relationships

Using MyBatis reverse engineering

Code generation

Use MyBatis reverse engineering to generate the CRUD persistence-layer code for each data table

Importing the code

  • Import the entity classes into the shop-pojo project
  • Import the corresponding Mapper classes and configuration files into the service-layer projects

Introduction to common classes

  • ID generator

    IDWorker: Twitter snowflake algorithm

  • Exception handling class

    CustomerException: Custom exception class

    CastException: Exception throwing class

  • Constant class

    ShopCode: System status class

  • Response entity class

    Result: Encapsulates the response status and response information
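For readers who have not met the snowflake algorithm before, here is a minimal sketch of the idea: a 64-bit ID built from a timestamp, a worker ID, and a per-millisecond sequence. This is not the project's `IDWorker`; the class name `SnowflakeSketch`, the single 10-bit worker field, and the 12-bit sequence are assumptions chosen for illustration.

```java
// Hypothetical sketch of a snowflake-style ID generator; not the project's IDWorker.
class SnowflakeSketch {
    private static final long EPOCH = 1288834974657L;   // custom epoch in milliseconds
    private static final long WORKER_BITS = 10L;
    private static final long SEQUENCE_BITS = 12L;
    private static final long MAX_WORKER = ~(-1L << WORKER_BITS);      // 1023
    private static final long SEQUENCE_MASK = ~(-1L << SEQUENCE_BITS); // 4095

    private final long workerId;
    private long lastTimestamp = -1L;
    private long sequence = 0L;

    SnowflakeSketch(long workerId) {
        if (workerId < 0 || workerId > MAX_WORKER) {
            throw new IllegalArgumentException("workerId out of range");
        }
        this.workerId = workerId;
    }

    synchronized long nextId() {
        long now = System.currentTimeMillis();
        if (now < lastTimestamp) {
            throw new IllegalStateException("clock moved backwards");
        }
        if (now == lastTimestamp) {
            sequence = (sequence + 1) & SEQUENCE_MASK;
            if (sequence == 0) {
                // Sequence exhausted for this millisecond: wait for the next one.
                while ((now = System.currentTimeMillis()) <= lastTimestamp) {
                    // busy-wait
                }
            }
        } else {
            sequence = 0L;
        }
        lastTimestamp = now;
        // Layout: [timestamp | workerId | sequence]
        return ((now - EPOCH) << (WORKER_BITS + SEQUENCE_BITS))
                | (workerId << SEQUENCE_BITS)
                | sequence;
    }
}
```

Because the timestamp sits in the high bits, IDs from one generator are strictly increasing, which suits an order ID used as a primary key.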

Order business

Order basic process

The interface definition

  • IOrderService
public interface IOrderService {
    /**
     * Confirm the order
     * @param order
     * @return Result
     */
    Result confirmOrder(TradeOrder order);
}

Business class implementation

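@Slf4j
@Component
@Service(interfaceClass = IOrderService.class)
public class OrderServiceImpl implements IOrderService {

    @Override
    public Result confirmOrder(TradeOrder order) {
        //1. Check the order
        checkOrder(order);
        //2. Generate the pre-order
        savePreOrder(order);
        try {
            //3. Deduct the inventory
            reduceGoodsNum(order);
            //4. Deduct the coupon
            changeCoponStatus(order);
            //5. Deduct the user balance
            reduceMoneyPaid(order);
            //6. Confirm the order
            updateOrderStatus(order);
            return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
        } catch (Exception e) {
            //1. Order confirmation failed: send the failure message (see "Failure compensation mechanism")
            //2. Return the failure status
            return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
        }
    }
}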

Check the order

private void checkOrder(TradeOrder order) {
    //1. Check that the order exists
    if (order == null) {
        CastException.cast(ShopCode.SHOP_ORDER_INVALID);
    }
    //2. Check that the goods exist
    TradeGoods goods = goodsService.findOne(order.getGoodsId());
    if (goods == null) {
        CastException.cast(ShopCode.SHOP_GOODS_NO_EXIST);
    }
    //3. Check that the user exists
    TradeUser user = userService.findOne(order.getUserId());
    if (user == null) {
        CastException.cast(ShopCode.SHOP_USER_NO_EXIST);
    }
    //4. Check that the order price matches the goods price
    if (order.getGoodsPrice().compareTo(goods.getGoodsPrice()) != 0) {
        CastException.cast(ShopCode.SHOP_GOODS_PRICE_INVALID);
    }
    //5. Check that the order quantity does not exceed the stock
    //   (">" rather than ">=", so that buying the last items is allowed)
    if (order.getGoodsNumber() > goods.getGoodsNumber()) {
        CastException.cast(ShopCode.SHOP_GOODS_NUM_NOT_ENOUGH);
    }
    log.info("Order validation passed");
}
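The checks rely on a helper that turns a status code into an exception, so a failed check aborts the method. The article does not show how `CastException` is implemented; the sketch below is one plausible shape, with hypothetical names (`CastSketch`, `Code`, `BusinessException`).

```java
// Hypothetical sketch of a "cast a status code into an exception" helper.
class CastSketch {
    // Stand-in for a status enum like ShopCode.
    enum Code {
        ORDER_INVALID("order is invalid"),
        GOODS_NO_EXIST("goods do not exist");

        final String message;

        Code(String message) {
            this.message = message;
        }
    }

    // Unchecked, so callers do not need a throws clause.
    static final class BusinessException extends RuntimeException {
        final Code code;

        BusinessException(Code code) {
            super(code.message);
            this.code = code;
        }
    }

    // Always throws; the caller never continues past a failed check.
    static void cast(Code code) {
        throw new BusinessException(code);
    }

    static void checkOrder(Object order) {
        if (order == null) {
            cast(Code.ORDER_INVALID);
        }
    }
}
```

Since every failed check throws, later checks can safely dereference the objects validated before them.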

Generate the pre-order

private Long savePreOrder(TradeOrder order) {
    //1. Set the order status to "not confirmed" (invisible)
    order.setOrderStatus(ShopCode.SHOP_ORDER_NO_CONFIRM.getCode());
    //2. Generate the order ID
    order.setOrderId(idWorker.nextId());
    //3. Check that the shipping fee is correct
    BigDecimal shippingFee = calculateShippingFee(order.getOrderAmount());
    if (order.getShippingFee().compareTo(shippingFee) != 0) {
        CastException.cast(ShopCode.SHOP_ORDER_SHIPPINGFEE_INVALID);
    }
    //4. Check that the order total is correct
    BigDecimal orderAmount = order.getGoodsPrice().multiply(new BigDecimal(order.getGoodsNumber()));
    // BigDecimal is immutable, so the result of add() must be assigned
    orderAmount = orderAmount.add(shippingFee);
    if (orderAmount.compareTo(order.getOrderAmount()) != 0) {
        CastException.cast(ShopCode.SHOP_ORDERAMOUNT_INVALID);
    }
    //5. Check that the coupon is valid
    Long couponId = order.getCouponId();
    if (couponId != null) {
        TradeCoupon coupon = couponService.findOne(couponId);
        // The coupon does not exist
        if (coupon == null) {
            CastException.cast(ShopCode.SHOP_COUPON_NO_EXIST);
        }
        // The coupon has already been used
        if ((ShopCode.SHOP_COUPON_ISUSED.getCode().toString()).equals(coupon.getIsUsed().toString())) {
            CastException.cast(ShopCode.SHOP_COUPON_INVALIED);
        }
        order.setCouponPaid(coupon.getCouponPrice());
    } else {
        order.setCouponPaid(BigDecimal.ZERO);
    }
    //6. Check that the balance used is valid
    BigDecimal moneyPaid = order.getMoneyPaid();
    if (moneyPaid != null) {
        // Compare the balance used with 0
        int r = order.getMoneyPaid().compareTo(BigDecimal.ZERO);
        // The balance used is less than 0
        if (r == -1) {
            CastException.cast(ShopCode.SHOP_MONEY_PAID_LESS_ZERO);
        }
        // The balance used is greater than 0
        if (r == 1) {
            // Query the user information
            TradeUser user = userService.findOne(order.getUserId());
            if (user == null) {
                CastException.cast(ShopCode.SHOP_USER_NO_EXIST);
            }
            // The balance used must not exceed the user's account balance
            if (user.getUserMoney().compareTo(order.getMoneyPaid().longValue()) < 0) {
                CastException.cast(ShopCode.SHOP_MONEY_PAID_INVALID);
            }
            order.setMoneyPaid(order.getMoneyPaid());
        }
    } else {
        order.setMoneyPaid(BigDecimal.ZERO);
    }
    //7. Calculate the amount to pay
    order.setPayAmount(orderAmount.subtract(order.getCouponPaid()).subtract(order.getMoneyPaid()));
    //8. Set the order creation time
    order.setAddTime(new Date());
    //9. Save the pre-order
    int r = orderMapper.insert(order);
    if (ShopCode.SHOP_SUCCESS.getCode() != r) {
        CastException.cast(ShopCode.SHOP_ORDER_SAVE_ERROR);
    }
    log.info("Order: [" + order.getOrderId() + "] pre-order generated successfully");
    return order.getOrderId();
}
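One detail is easy to get wrong in amount checks like this one: `BigDecimal` is immutable, so `add()` returns a new value instead of changing the receiver, and amounts should be compared with `compareTo` rather than `equals`. A minimal sketch follows; `OrderAmountSketch` and its method names are made up for illustration.

```java
import java.math.BigDecimal;

// Hypothetical sketch of the order total calculation.
class OrderAmountSketch {
    static BigDecimal orderAmount(BigDecimal goodsPrice, int goodsNumber, BigDecimal shippingFee) {
        BigDecimal amount = goodsPrice.multiply(new BigDecimal(goodsNumber));
        // add() does not modify 'amount'; the returned value has to be kept.
        amount = amount.add(shippingFee);
        return amount;
    }

    // compareTo ignores scale (49.80 vs 49.8); equals does not.
    static boolean sameAmount(BigDecimal a, BigDecimal b) {
        return a.compareTo(b) == 0;
    }
}
```

If the result of `add()` is dropped, the shipping fee silently disappears from the total and the comparison against the submitted order amount fails for every order with a non-zero fee.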

Deducting the inventory

  • Call the goods service through Dubbo to deduct the inventory

private void reduceGoodsNum(TradeOrder order) {
    TradeGoodsNumberLog goodsNumberLog = new TradeGoodsNumberLog();
    goodsNumberLog.setGoodsId(order.getGoodsId());
    goodsNumberLog.setOrderId(order.getOrderId());
    goodsNumberLog.setGoodsNumber(order.getGoodsNumber());
    Result result = goodsService.reduceGoodsNum(goodsNumberLog);
    if (result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())) {
        CastException.cast(ShopCode.SHOP_REDUCE_GOODS_NUM_FAIL);
    }
    log.info("Order: [" + order.getOrderId() + "] deducted inventory [" + order.getGoodsNumber() + " items] successfully");
}

The goods service deducts the inventory

@Override
public Result reduceGoodsNum(TradeGoodsNumberLog goodsNumberLog) {
    // Validate the request parameters
    if (goodsNumberLog == null ||
            goodsNumberLog.getGoodsId() == null ||
            goodsNumberLog.getOrderId() == null ||
            goodsNumberLog.getGoodsNumber() == null ||
            goodsNumberLog.getGoodsNumber().intValue() <= 0) {
        CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
    }
    TradeGoods goods = goodsMapper.selectByPrimaryKey(goodsNumberLog.getGoodsId());
    if (goods.getGoodsNumber() < goodsNumberLog.getGoodsNumber()) {
        // Insufficient inventory
        CastException.cast(ShopCode.SHOP_GOODS_NUM_NOT_ENOUGH);
    }
    // Reduce the inventory
    goods.setGoodsNumber(goods.getGoodsNumber() - goodsNumberLog.getGoodsNumber());
    goodsMapper.updateByPrimaryKey(goods);

    // Record the inventory operation log (negative number = deduction)
    goodsNumberLog.setGoodsNumber(-(goodsNumberLog.getGoodsNumber()));
    goodsNumberLog.setLogTime(new Date());
    goodsNumberLogMapper.insert(goodsNumberLog);

    return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
}
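Reading the stock, checking it, and writing it back are separate steps, so two concurrent orders could both pass the check. A common way around that is to make "check and decrement" a single conditional update. The sketch below shows the idea in memory with a compare-and-set loop; it is an alternative for illustration, not what the article's code does, and `StockSketch` is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: check-and-decrement as one atomic step.
class StockSketch {
    private final AtomicInteger stock;

    StockSketch(int initial) {
        this.stock = new AtomicInteger(initial);
    }

    // Returns true if the stock was reduced, false if there was not enough.
    boolean reduce(int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        while (true) {
            int current = stock.get();
            if (current < n) {
                return false;
            }
            // Succeeds only if nobody changed the stock since we read it.
            if (stock.compareAndSet(current, current - n)) {
                return true;
            }
        }
    }

    int remaining() {
        return stock.get();
    }
}
```

In a database the same effect usually comes from putting the stock condition into the WHERE clause of the UPDATE and checking the affected row count.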

Deducting the coupon

private void changeCoponStatus(TradeOrder order) {
    // Check whether the user used a coupon
    if (!StringUtils.isEmpty(order.getCouponId())) {
        TradeCoupon coupon = couponService.findOne(order.getCouponId());
        coupon.setIsUsed(ShopCode.SHOP_COUPON_ISUSED.getCode());
        coupon.setUsedTime(new Date());
        coupon.setOrderId(order.getOrderId());
        Result result = couponService.changeCouponStatus(coupon);
        // Check the result
        if (result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())) {
            // Failed to use the coupon
            CastException.cast(ShopCode.SHOP_COUPON_USE_FAIL);
        }
        log.info("Order: [" + order.getOrderId() + "] used coupon [" + coupon.getCouponPrice() + " yuan] successfully");
    }
}

The coupon service changes the status of the coupon

@Override
public Result changeCouponStatus(TradeCoupon coupon) {
    try {
        // Validate the request parameters
        if (coupon == null || StringUtils.isEmpty(coupon.getCouponId())) {
            CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
        }
        // Update the coupon status to "used"
        couponMapper.updateByPrimaryKey(coupon);
        return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
    } catch (Exception e) {
        return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
    }
}

Deduct user balance

  • Call the user service to deduct the balance

private void reduceMoneyPaid(TradeOrder order) {
    // Check whether the balance used in the order is valid
    if (order.getMoneyPaid() != null && order.getMoneyPaid().compareTo(BigDecimal.ZERO) == 1) {
        TradeUserMoneyLog userMoneyLog = new TradeUserMoneyLog();
        userMoneyLog.setOrderId(order.getOrderId());
        userMoneyLog.setUserId(order.getUserId());
        userMoneyLog.setUseMoney(order.getMoneyPaid());
        userMoneyLog.setMoneyLogType(ShopCode.SHOP_USER_MONEY_PAID.getCode());
        // Deduct the balance
        Result result = userService.changeUserMoney(userMoneyLog);
        if (result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())) {
            CastException.cast(ShopCode.SHOP_USER_MONEY_REDUCE_FAIL);
        }
        log.info("Order: [" + order.getOrderId() + "] deducted balance [" + order.getMoneyPaid() + " yuan] successfully");
    }
}

The user service updates the balance

@Override
public Result changeUserMoney(TradeUserMoneyLog userMoneyLog) {
    // Validate the request parameters
    if (userMoneyLog == null
            || userMoneyLog.getUserId() == null
            || userMoneyLog.getUseMoney() == null
            || userMoneyLog.getOrderId() == null
            || userMoneyLog.getUseMoney().compareTo(BigDecimal.ZERO) <= 0) {
        CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
    }

    // Query whether the order already has a payment record
    TradeUserMoneyLogExample userMoneyLogExample = new TradeUserMoneyLogExample();
    userMoneyLogExample.createCriteria()
            .andUserIdEqualTo(userMoneyLog.getUserId())
            .andOrderIdEqualTo(userMoneyLog.getOrderId());
    int count = userMoneyLogMapper.countByExample(userMoneyLogExample);

    TradeUser tradeUser = new TradeUser();
    tradeUser.setUserId(userMoneyLog.getUserId());
    tradeUser.setUserMoney(userMoneyLog.getUseMoney().longValue());

    // Decide by the type of balance operation
    // [Payment operation]
    if (userMoneyLog.getMoneyLogType().equals(ShopCode.SHOP_USER_MONEY_PAID.getCode())) {
        // The order has already been paid: throw an exception
        if (count > 0) {
            CastException.cast(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY);
        }
        // Deduct the balance from the user account
        userMapper.reduceUserMoney(tradeUser);
    }
    // [Refund operation]
    if (userMoneyLog.getMoneyLogType().equals(ShopCode.SHOP_USER_MONEY_REFUND.getCode())) {
        // The order has not been paid, so it cannot be refunded: throw an exception
        if (count == 0) {
            CastException.cast(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY);
        }
        // Prevent multiple refunds
        userMoneyLogExample = new TradeUserMoneyLogExample();
        userMoneyLogExample.createCriteria()
                .andUserIdEqualTo(userMoneyLog.getUserId())
                .andOrderIdEqualTo(userMoneyLog.getOrderId())
                .andMoneyLogTypeEqualTo(ShopCode.SHOP_USER_MONEY_REFUND.getCode());
        count = userMoneyLogMapper.countByExample(userMoneyLogExample);
        if (count > 0) {
            CastException.cast(ShopCode.SHOP_USER_MONEY_REFUND_ALREADY);
        }
        // Add the balance back to the user account
        userMapper.addUserMoney(tradeUser);
    }

    // Record the balance usage log
    userMoneyLog.setCreateTime(new Date());
    userMoneyLogMapper.insert(userMoneyLog);
    return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
}
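The balance log doubles as an idempotency record: a payment is rejected if the order already has one, and a refund is rejected unless there is exactly a payment and no earlier refund. Here is a minimal in-memory sketch of that rule; `BalanceLogSketch` and its members are hypothetical, and unlike the service above it counts only payment entries when checking for a duplicate payment.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: the operation log decides whether a pay/refund is allowed.
class BalanceLogSketch {
    enum Type { PAID, REFUND }

    private static final class Entry {
        final long orderId;
        final Type type;

        Entry(long orderId, Type type) {
            this.orderId = orderId;
            this.type = type;
        }
    }

    private long balance;
    private final List<Entry> log = new ArrayList<>();

    BalanceLogSketch(long balance) {
        this.balance = balance;
    }

    private long count(long orderId, Type type) {
        return log.stream().filter(e -> e.orderId == orderId && e.type == type).count();
    }

    void change(long orderId, long amount, Type type) {
        if (amount <= 0) {
            throw new IllegalArgumentException("amount must be positive");
        }
        if (type == Type.PAID) {
            if (count(orderId, Type.PAID) > 0) {
                throw new IllegalStateException("order already paid");
            }
            balance -= amount;
        } else {
            if (count(orderId, Type.PAID) == 0) {
                throw new IllegalStateException("order not paid, cannot refund");
            }
            if (count(orderId, Type.REFUND) > 0) {
                throw new IllegalStateException("order already refunded");
            }
            balance += amount;
        }
        // Record the operation so a repeated request is rejected next time.
        log.add(new Entry(orderId, type));
    }

    long balance() {
        return balance;
    }
}
```

This is what makes a redelivered rollback message harmless: the second refund for the same order is refused instead of adding money twice.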

Confirming the order

private void updateOrderStatus(TradeOrder order) {
    order.setOrderStatus(ShopCode.SHOP_ORDER_CONFIRM.getCode());
    order.setPayStatus(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY.getCode());
    order.setConfirmTime(new Date());
    int r = orderMapper.updateByPrimaryKey(order);
    if (r <= 0) {
        CastException.cast(ShopCode.SHOP_ORDER_CONFIRM_FAIL);
    }
    log.info("Order: [" + order.getOrderId() + "] status changed successfully");
}

Failure compensation mechanism

Message sender

  • Configure the RocketMQ property values

rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
rocketmq.producer.group=orderProducerGroup

mq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic
mq.order.tag.confirm=order_confirm
mq.order.tag.cancel=order_cancel
  • Inject template class and attribute value information
 @Autowired
 private RocketMQTemplate rocketMQTemplate;

 @Value("${mq.order.topic}")
 private String topic;

 @Value("${mq.order.tag.cancel}")
 private String cancelTag;
  • Send the order failure message

@Override
public Result confirmOrder(TradeOrder order) {
    //1. Check the order
    //2. Generate the pre-order
    try {
        //3. Deduct the inventory
        //4. Deduct the coupon
        //5. Deduct the user balance
        //6. Confirm the order
    } catch (Exception e) {
        // Order confirmation failed: send a message so that each service can roll back
        CancelOrderMQ cancelOrderMQ = new CancelOrderMQ();
        cancelOrderMQ.setOrderId(order.getOrderId());
        cancelOrderMQ.setCouponId(order.getCouponId());
        cancelOrderMQ.setGoodsId(order.getGoodsId());
        cancelOrderMQ.setGoodsNumber(order.getGoodsNumber());
        cancelOrderMQ.setUserId(order.getUserId());
        cancelOrderMQ.setUserMoney(order.getMoneyPaid());
        try {
            sendMessage(topic,
                    cancelTag,
                    cancelOrderMQ.getOrderId().toString(),
                    JSON.toJSONString(cancelOrderMQ));
        } catch (Exception e1) {
            e1.printStackTrace();
            CastException.cast(ShopCode.SHOP_MQ_SEND_MESSAGE_FAIL);
        }
        return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
    }
}

private void sendMessage(String topic, String tags, String keys, String body) throws Exception {
    // Check whether the topic is empty
    if (StringUtils.isEmpty(topic)) {
        CastException.cast(ShopCode.SHOP_MQ_TOPIC_IS_EMPTY);
    }
    // Check whether the message body is empty
    if (StringUtils.isEmpty(body)) {
        CastException.cast(ShopCode.SHOP_MQ_MESSAGE_BODY_IS_EMPTY);
    }
    // Encode explicitly as UTF-8, because the consumer decodes the body as UTF-8
    Message message = new Message(topic, tags, keys, body.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    // Send the message
    rocketMQTemplate.getProducer().send(message);
}

Message receiver

  • Configure the RocketMQ property values

rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
mq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic

Create a listener class to consume the message

@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",
        consumerGroup = "${mq.order.consumer.group.name}",
        messageModel = MessageModel.BROADCASTING)
public class CancelOrderConsumer implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        ...
    }
}

Rolling back the inventory

Message consumer

@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",
        consumerGroup = "${mq.order.consumer.group.name}",
        messageModel = MessageModel.BROADCASTING)
public class CancelMQListener implements RocketMQListener<MessageExt> {

    @Value("${mq.order.consumer.group.name}")
    private String groupName;

    @Autowired
    private TradeGoodsMapper goodsMapper;

    @Autowired
    private TradeMqConsumerLogMapper mqConsumerLogMapper;

    @Autowired
    private TradeGoodsNumberLogMapper goodsNumberLogMapper;

    @Override
    public void onMessage(MessageExt messageExt) {
        String msgId = null;
        String tags = null;
        String keys = null;
        String body = null;
        try {
            //1. Parse the message content
            msgId = messageExt.getMsgId();
            tags = messageExt.getTags();
            keys = messageExt.getKeys();
            body = new String(messageExt.getBody(), "UTF-8");

            log.info("Message received successfully");

            //2. Query the message consumption record
            TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey();
            primaryKey.setMsgTag(tags);
            primaryKey.setMsgKey(keys);
            primaryKey.setGroupName(groupName);
            TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primaryKey);

            if (mqConsumerLog != null) {
                //3. The message has been consumed before
                //3.1 Get the message processing status
                Integer status = mqConsumerLog.getConsumerStatus();
                // Already processed: return
                if (ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode().intValue() == status.intValue()) {
                    log.info("Message: " + msgId + " has already been processed");
                    return;
                }
                // Being processed: return
                if (ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode().intValue() == status.intValue()) {
                    log.info("Message: " + msgId + " is being processed");
                    return;
                }
                // Processing failed earlier
                if (ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL.getCode().intValue() == status.intValue()) {
                    // Get the number of times the message has been processed
                    Integer times = mqConsumerLog.getConsumerTimes();
                    if (times > 3) {
                        log.info("Message: " + msgId + " has been processed more than 3 times and will not be processed again");
                        return;
                    }
                    mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());

                    // Update using an optimistic lock in the database
                    TradeMqConsumerLogExample example = new TradeMqConsumerLogExample();
                    TradeMqConsumerLogExample.Criteria criteria = example.createCriteria();
                    criteria.andMsgTagEqualTo(mqConsumerLog.getMsgTag());
                    criteria.andMsgKeyEqualTo(mqConsumerLog.getMsgKey());
                    criteria.andGroupNameEqualTo(groupName);
                    criteria.andConsumerTimesEqualTo(mqConsumerLog.getConsumerTimes());
                    int r = mqConsumerLogMapper.updateByExampleSelective(mqConsumerLog, example);
                    if (r <= 0) {
                        // The update did not succeed: another thread modified the record concurrently
                        log.info("Concurrent modification, will be handled later");
                        // Stop here so the message is not processed twice
                        return;
                    }
                }
            } else {
                //4. The message has not been consumed before
                mqConsumerLog = new TradeMqConsumerLog();
                mqConsumerLog.setMsgTag(tags);
                mqConsumerLog.setMsgKey(keys);
                // The group name is part of the lookup key above, so it is set here as well
                mqConsumerLog.setGroupName(groupName);
                mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());
                mqConsumerLog.setMsgBody(body);
                mqConsumerLog.setMsgId(msgId);
                mqConsumerLog.setConsumerTimes(0);

                // Add the message processing record to the database
                mqConsumerLogMapper.insert(mqConsumerLog);
            }

            //5. Roll back the inventory
            MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);
            Long goodsId = mqEntity.getGoodsId();
            TradeGoods goods = goodsMapper.selectByPrimaryKey(goodsId);
            goods.setGoodsNumber(goods.getGoodsNumber() + mqEntity.getGoodsNum());
            goodsMapper.updateByPrimaryKey(goods);

            // Record the inventory operation log
            TradeGoodsNumberLog goodsNumberLog = new TradeGoodsNumberLog();
            goodsNumberLog.setOrderId(mqEntity.getOrderId());
            goodsNumberLog.setGoodsId(goodsId);
            goodsNumberLog.setGoodsNumber(mqEntity.getGoodsNum());
            goodsNumberLog.setLogTime(new Date());
            goodsNumberLogMapper.insert(goodsNumberLog);

            //6. Change the message processing status to success
            mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode());
            mqConsumerLog.setConsumerTimestamp(new Date());
            mqConsumerLogMapper.updateByPrimaryKey(mqConsumerLog);
            log.info("Inventory rollback succeeded");
        } catch (Exception e) {
            e.printStackTrace();
            TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey();
            primaryKey.setMsgTag(tags);
            primaryKey.setMsgKey(keys);
            primaryKey.setGroupName(groupName);
            TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primaryKey);
            if (mqConsumerLog == null) {
                // The database has no record yet
                mqConsumerLog = new TradeMqConsumerLog();
                mqConsumerLog.setMsgTag(tags);
                mqConsumerLog.setMsgKey(keys);
                mqConsumerLog.setGroupName(groupName);
                mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL.getCode());
                mqConsumerLog.setMsgBody(body);
                mqConsumerLog.setMsgId(msgId);
                mqConsumerLog.setConsumerTimes(1);
                mqConsumerLogMapper.insert(mqConsumerLog);
            } else {
                // Mark the record as failed so that a later delivery can retry it
                mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL.getCode());
                mqConsumerLog.setConsumerTimes(mqConsumerLog.getConsumerTimes() + 1);
                mqConsumerLogMapper.updateByPrimaryKeySelective(mqConsumerLog);
            }
        }
    }
}
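The core of the listener is an idempotency check: a message key is processed only by whoever wins the right to move its record into the "processing" state. Below is a minimal in-memory sketch of that state machine, assuming a `ConcurrentHashMap` stands in for the consumer log table; `IdempotentConsumerSketch`, `Record`, and the method names are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch: a map stands in for the MQ consumer log table.
class IdempotentConsumerSketch {
    enum Status { PROCESSING, SUCCESS, FAIL }

    // Immutable record; replaced as a whole on every state change.
    static final class Record {
        final Status status;
        final int times;

        Record(Status status, int times) {
            this.status = status;
            this.times = times;
        }
    }

    static final int MAX_RETRIES = 3;

    private final ConcurrentMap<String, Record> records = new ConcurrentHashMap<>();

    // Returns true if the caller may process the message with this key.
    boolean tryAcquire(String key) {
        Record existing = records.putIfAbsent(key, new Record(Status.PROCESSING, 0));
        if (existing == null) {
            return true; // first delivery
        }
        if (existing.status != Status.FAIL || existing.times > MAX_RETRIES) {
            return false; // done, in progress, or retried too often
        }
        // Optimistic lock: succeeds only if the record is still the one we read.
        return records.replace(key, existing, new Record(Status.PROCESSING, existing.times));
    }

    void markSuccess(String key) {
        records.computeIfPresent(key, (k, r) -> new Record(Status.SUCCESS, r.times));
    }

    void markFailed(String key) {
        records.computeIfPresent(key, (k, r) -> new Record(Status.FAIL, r.times + 1));
    }
}
```

A listener would call `tryAcquire` first, do the rollback only when it returns true, and then call `markSuccess` or `markFailed`. The three-argument `replace` plays the role of the database update whose WHERE clause includes the previous retry count.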

The other rollbacks (coupon and balance) follow the same process, so I'm not going to write them out here.

At the end

This is a real case in our project.


Six pulse excalibur | Original article. If there are any errors in this blog, corrections are welcome.