Mind mapping

Preface

In the previous article introducing ZooKeeper, we learned the basic usage of ZooKeeper.

In fact, ZooKeeper has a wide range of applications, and distributed locking is just one of them. In this article, a ZooKeeper distributed lock is used to solve the oversold problem.

1. What is the seckill oversold problem?

Everyone should be familiar with seckill (flash-sale) activities, so there is no need to explain them at length.

It is not hard to imagine that in such a seckill scenario, multiple users compete for the same "resource", that is, multiple threads run concurrently, which easily produces inaccurate data: the oversold problem.

1.1 Project Demonstration

I used Spring Boot 2.0, MyBatis, MyBatis-Plus, and Spring MVC to build a simple project. GitHub address:

Github.com/yehongzhi/m…

Create a commodity information table:

CREATE TABLE `tb_commodity_info` (
  `id` varchar(32) NOT NULL,
  `commodity_name` varchar(512) DEFAULT NULL COMMENT 'Trade Name',
  `commodity_price` varchar(36) DEFAULT '0' COMMENT 'Commodity price',
  `number` int(10) DEFAULT '0' COMMENT 'Quantity of goods',
  `description` varchar(2048) DEFAULT ' ' COMMENT 'Product Description',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Commodity Information Sheet';

Add an item, [Barbecued pork bun], to the table:

The core code logic looks like this:

    @Override
    public boolean purchaseCommodityInfo(String commodityId, Integer number) throws Exception {
        //1. Query the number of items in the database
        TbCommodityInfo commodityInfo = commodityInfoMapper.selectById(commodityId);
        //2. Check whether the quantity of goods is greater than 0, or whether the quantity purchased is greater than the inventory
        Integer count = commodityInfo.getNumber();
        if (count <= 0 || number > count) {
            // Return false if the item quantity is less than or equal to 0, or the purchase quantity exceeds the stock
            return false;
        }
        //3. If the quantity in stock is greater than 0 and the quantity purchased is less than or equal to the quantity in stock. Then update the quantity of goods
        count -= number;
        commodityInfo.setNumber(count);
        boolean bool = commodityInfoMapper.updateById(commodityInfo) == 1;
        if (bool) {
            // If the update succeeds, the print purchase succeeds
            System.out.println("Purchase of goods [" + commodityInfo.getCommodityName() + "] success, quantity:" + number);
        }
        return bool;
    }

The logic diagram is as follows:

This logic is fine if requests are single-threaded.

But with multiple threads there is a problem. Now I'll create multiple threads that send requests via HttpClient and see what happens:

    public static void main(String[] args) throws Exception {
        // Request address
        String url = "http://localhost:8080/mall/commodity/purchase";
        // Request parameters: item ID and quantity
        Map<String, String> map = new HashMap<>();
        map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1");
        map.put("number", "1");
        // Create 10 threads that send requests via HttpClient
        for (int i = 0; i < 10; i++) {
            // Each thread simply sends one request
            CommodityThread commodityThread = new CommodityThread(url, map);
            commodityThread.start();
        }
    }
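
The CommodityThread used above just wraps the HTTP call; its code is not shown in this article, so here is a minimal sketch of what it might look like (an assumption for illustration, based on Apache HttpClient 4.x): each thread sends one POST request and prints the response.

import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of CommodityThread: each thread sends a single purchase request
public class CommodityThread extends Thread {
    private final String url;
    private final Map<String, String> params;

    public CommodityThread(String url, Map<String, String> params) {
        this.url = url;
        this.params = params;
    }

    @Override
    public void run() {
        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
            // Build the POST request with the form parameters (commodityId, number)
            HttpPost post = new HttpPost(url);
            List<NameValuePair> pairs = new ArrayList<>();
            params.forEach((k, v) -> pairs.add(new BasicNameValuePair(k, v)));
            post.setEntity(new UrlEncodedFormEntity(pairs, StandardCharsets.UTF_8));
            // Send the request and print the response body
            try (CloseableHttpResponse response = httpClient.execute(post)) {
                System.out.println(Thread.currentThread().getName() + " -> "
                        + EntityUtils.toString(response.getEntity()));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}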

To clarify: the initial stock of char siu bao is 100, and 10 threads each purchase one at the same time; assuming all purchases succeed, the remaining stock should be 90.

In fact, all 10 threads did purchase successfully:

But the inventory of goods in the database is not accurate:

2. Try using local locks

The general process of the above scenario is as follows:

It can be seen that the problem is caused by two threads "simultaneously" querying the remaining inventory and then updating it. To solve this, we just need to make multiple threads execute this logic sequentially, that is, add a lock.

The JDK provides two types of local locks: synchronized and Lock.

Either one works; for simplicity, let's use synchronized:

    // Use the synchronized modifier
    @Override
    public synchronized boolean purchaseCommodityInfo(String commodityId, Integer number) throws Exception {
        // ... omitted ...
    }
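
For reference, here is the same idea with the other kind of JDK local lock, java.util.concurrent.locks.ReentrantLock. This is a minimal sketch, reusing the mapper and entity from the method above; with an explicit lock, the release must always happen in a finally block:

    // The other kind of JDK local lock, shared by all threads calling this service
    private final ReentrantLock reentrantLock = new ReentrantLock();

    @Override
    public boolean purchaseCommodityInfo(String commodityId, Integer number) throws Exception {
        reentrantLock.lock();
        try {
            // Same query-and-update logic as before, now guarded by the explicit lock
            TbCommodityInfo commodityInfo = commodityInfoMapper.selectById(commodityId);
            Integer count = commodityInfo.getNumber();
            if (count <= 0 || number > count) {
                return false;
            }
            commodityInfo.setNumber(count - number);
            return commodityInfoMapper.updateById(commodityInfo) == 1;
        } finally {
            // Always release the lock, even if the purchase logic throws
            reentrantLock.unlock();
        }
    }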

Then test multi-threaded concurrent purchasing again and check the result:

Problem solved!!

You think that's the end of it? Look at the progress bar: things are not that simple.

We know that in real projects, more than one server is deployed, so let’s start two servers with port numbers 8080 and 8081 to simulate a real project scenario:

Write a test script that alternates requests between the two servers, simulating multiple servers handling users' purchase requests:

    public static void main(String[] args) throws Exception {
        // Request address
        String url = "http://localhost:%s/mall/commodity/purchase";
        // Request parameters: item ID and quantity
        Map<String, String> map = new HashMap<>();
        map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1");
        map.put("number", "1");
        // Create 10 threads that send requests via HttpClient
        for (int i = 0; i < 10; i++) {
            // Alternate between 8080 and 8081, so each server handles 5 requests
            String port = "808" + (i % 2);
            CommodityThread commodityThread = new CommodityThread(String.format(url, port), map);
            commodityThread.start();
        }
    }

First look at the purchase results; all of them succeed:

The key is whether the inventory quantity is correct:

All ten requests succeeded, so the stock should be 90, yet here it is 95. It turns out that a local lock cannot solve the oversold problem across multiple servers.

Here’s why:

In fact, the cause is similar to the multithreading problem: multiple servers query the database, read the same inventory, and then each updates it, producing incorrect data. To keep the inventory correct, the key is to ensure that only one server executes this logic at a time, that is, to add a distributed lock.

This is where distributed locks come in: they ensure that, among multiple servers, only one executes the critical logic at a time.

Distributed locks can be implemented in three main ways: Redis, ZooKeeper, and databases (such as MySQL).

3. Use ZooKeeper to implement a distributed lock

3.1 The principle

The lock is implemented using ZooKeeper's ephemeral sequential node feature. How does it work?

Suppose client A wants to acquire the lock. It creates an ephemeral sequential node under the "/Lock" path, then fetches the list of child nodes under "/Lock" and checks whether its own sequence number is the smallest. If it is, the lock is acquired!

Now another client, client B, wants the lock. It also creates an ephemeral sequential node under "/Lock", fetches the child node list, and checks whether its sequence number is the smallest. If it is not, locking fails, and it listens on the node immediately before its own.

Releasing the lock simply means deleting the ephemeral node. Suppose client A releases the lock by deleting node 01. This triggers the watch registered by node 02's client, which fetches the child node list again and checks whether its node is now the smallest. If it is, it acquires the lock.

With more clients the process is the same: each client first creates an ephemeral sequential node and then checks whether its sequence number is the smallest; if not, it listens on the previous node, forming a queue. This produces the effect of a lock and ensures that, among multiple servers, only one executes at a time.

If one of the clients crashes, ZooKeeper automatically deletes its ephemeral node, so the lock is released automatically.

3.2 Hand-writing the distributed lock

First, add the Maven dependencies:

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.4</version>
</dependency>

Then write a ZkLock class that implements the Lock interface:

public class ZkLock implements Lock {
    // counter for blocking when locking fails
    private static CountDownLatch cdl = new CountDownLatch(1);
    // IP port of the ZooKeeper server
    private static final String IP_PORT = "127.0.0.1:2181";
    // Lock root path
    private static final String ROOT_NODE = "/Lock";
    // The path to the last node
    private volatile String beforePath;
    // The path of the currently locked node
    private volatile String currPath;
    // Create a ZooKeeper client
    private ZkClient zkClient = new ZkClient(IP_PORT);

    public ZkLock() {
        // Check whether the root node exists
        if (!zkClient.exists(ROOT_NODE)) {
            // Create it if it does not exist
            zkClient.createPersistent(ROOT_NODE);
        }
    }

    // Lock
    public void lock() {
        if (tryLock()) {
            System.out.println("Lock successful!!");
        } else {
            // Attempt to lock failed, wait
            waitForLock();
            // Try locking again
            lock();
        }
    }

    // Try locking
    public synchronized boolean tryLock() {
        // Create our own ephemeral sequential node the first time
        if (StringUtils.isBlank(currPath)) {
            currPath = zkClient.createEphemeralSequential(ROOT_NODE + "/", "lock");
        }
        // Sort the child nodes
        List<String> children = zkClient.getChildren(ROOT_NODE);
        Collections.sort(children);

        // If the current node is the smallest, the lock is acquired
        if (currPath.equals(ROOT_NODE + "/" + children.get(0))) {
            return true;
        } else {
            // Not the smallest node: record the node just before ours so we can listen on it
            int beforePathIndex = Collections.binarySearch(children, currPath.substring(ROOT_NODE.length() + 1)) - 1;
            beforePath = ROOT_NODE + "/" + children.get(beforePathIndex);
            // Failed to lock
            return false;
        }
    }

    // Unlock
    public void unlock() {
        // Delete the node and close the client
        zkClient.delete(currPath);
        zkClient.close();
    }

    // Wait for the lock: block after a failed attempt and listen on the previous node
    private void waitForLock() {
        IZkDataListener listener = new IZkDataListener() {
            // Listen for node update events
            public void handleDataChange(String s, Object o) throws Exception {
            }

            // Listen for node deletion events
            public void handleDataDeleted(String s) throws Exception {
                // Unblock
                cdl.countDown();
            }
        };
        // Listen on the previous node
        this.zkClient.subscribeDataChanges(beforePath, listener);
        // Check whether the previous node still exists
        if (zkClient.exists(beforePath)) {
            try {
                System.out.println("Failed to acquire the lock, waiting");
                // Block and wait until the previous node is deleted
                cdl.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // Remove the listener
        zkClient.unsubscribeDataChanges(beforePath, listener);
    }

    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    public void lockInterruptibly() throws InterruptedException {
    }

    public Condition newCondition() {
        return null;
    }
}

Add a lock to the Controller layer:

    @PostMapping("/purchase")
    public boolean purchaseCommodityInfo(@RequestParam(name = "commodityId") String commodityId, @RequestParam(name = "number") Integer number) throws Exception {
        boolean bool;
        // Obtain the ZooKeeper distributed lock
        ZkLock zkLock = new ZkLock();
        try {
            // Lock
            zkLock.lock();
            // Call the seckill service method
            bool = commodityInfoService.purchaseCommodityInfo(commodityId, number);
        } catch (Exception e) {
            e.printStackTrace();
            bool = false;
        } finally {
            // Unlock
            zkLock.unlock();
        }
        return bool;
    }

Test again with the same two servers, 8080 and 8081, then run the test script:

    public static void main(String[] args) throws Exception {
        // Request address
        String url = "http://localhost:%s/mall/commodity/purchase";
        // Request parameters: item ID and quantity
        Map<String, String> map = new HashMap<>();
        map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1");
        map.put("number", "1");
        // Create 10 threads that send requests via HttpClient
        for (int i = 0; i < 10; i++) {
            // Alternate requests between 8080 and 8081
            String port = "808" + (i % 2);
            CommodityThread commodityThread = new CommodityThread(String.format(url, port), map);
            commodityThread.start();
        }
    }

The result is correct:

3.3 Use a ready-made wheel: Curator

Curator is an Apache open-source framework for working with ZooKeeper, and it includes a ready-made ZooKeeper distributed lock.

Of course, the distributed lock is only a small part of this framework; it has many other uses, which you can learn about on the official website.

First add Maven dependencies:

    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>4.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>4.3.0</version>
    </dependency>

Lock the places that need it, in the same way as before:

    @PostMapping("/purchase")
    public boolean purchaseCommodityInfo(@RequestParam(name = "commodityId") String commodityId,
                                         @RequestParam(name = "number") Integer number) throws Exception {
        boolean bool = false;
        // Set the retry policy
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000.3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
        // Start the client
        client.start();
        InterProcessMutex mutex = new InterProcessMutex(client, "/locks");
        try {
            / / lock
            if (mutex.acquire(3, TimeUnit.SECONDS)) {
                // Call the snap kill service methodbool = commodityInfoService.purchaseCommodityInfo(commodityId, number); }}catch (Exception e) {
            e.printStackTrace();
        } finally {
            / / unlock
            mutex.release();
            client.close();
        }
        return bool;
    }

4. Pits I encountered

At first I tried to write the distributed lock with the native ZooKeeper API, and it fell apart a bit. I ran into a lot of pits and finally gave up and used the ZkClient API instead. Maybe I'm just not good enough at it.

I’m going to share some of the problems I’ve encountered so that you can quickly locate the problem if you encounter the same type of exception.

4.1 Session expired

This error comes from the native ZooKeeper API. It mainly occurred when I stepped through the code in debug mode.

The native ZooKeeper client requires a session timeout to be set. In debug mode we usually sit at a breakpoint for a while, which easily exceeds the session timeout, so the session expires.

4.2 KeeperErrorCode = ConnectionLoss

This is also a native ZooKeeper API error. How does it occur?

The native ZooKeeper client connects to the server asynchronously. If the code calls create() or exists() before the connection has been established, this error is reported.

Solution: block with a CountDownLatch, release it once the connection succeeds, and only then call create(), exists(), and so on.
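
A minimal sketch of that approach with the native ZooKeeper API (the helper class name and the 30-second session timeout are assumptions for illustration):

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

// Hypothetical helper: blocks until the ZooKeeper session is actually connected
public class ZkConnectionHelper {

    public static ZooKeeper connect(String connectString) throws Exception {
        CountDownLatch connectedLatch = new CountDownLatch(1);
        // The constructor returns immediately; the connection is established asynchronously.
        // The second argument is the session timeout mentioned in pit 4.1.
        ZooKeeper zk = new ZooKeeper(connectString, 30000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // Release the latch once the session is connected
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    connectedLatch.countDown();
                }
            }
        });
        // Block here; only after this returns is it safe to call create(), exists(), etc.
        connectedLatch.await();
        return zk;
    }
}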

4.3 Data Inconsistency Occurs during Concurrent Query and Update

This one was a really big pit.

At first I added the distributed lock in the Service layer and thought I was done. Then I started 8080 and 8081 for a concurrent test. All 10 threads purchased successfully, but the result was still incorrect!

Then I thought the problem was my own implementation, so I switched to the distributed lock provided by the Curator framework; surely an open-source framework would be fine. To my surprise, it still didn't work.

Since the lock itself was not the problem, could it be a transaction problem? The next request was coming in before the previous transaction's inventory update had been committed. So I widened the scope of the lock and moved it up to the Controller layer. It worked!

You may have noticed that my examples above add the distributed lock at the Controller layer, but I don't really like writing too much logic in the Controller layer.

Maybe there is a more elegant way; my ability is limited, so if you have a better implementation, please share it~

Update: a reader in the comments suggested wrapping the original method in another layer, and I have tested that this works. So this is indeed a transaction issue.

Locking at the Controller layer works because the Controller layer has no transaction. I had originally put @Transactional on the service class, so every method in that class was transactional; as a result, after unlocking, the transaction that updates the database had not yet been committed, and the next request came in and read the stale data.

For a more elegant solution, place the @Transactional annotation on the seckill service method itself,

and then wrap it with a non-transactional method that does the locking, as sketched below.
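
A minimal sketch of that structure, assuming the service interface is called CommodityInfoService and using a hypothetical facade class; the point is that the @Transactional purchase method commits before the non-transactional wrapper releases the lock:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

// Hypothetical non-transactional facade: lock, call the @Transactional service method
// through the Spring proxy, and release the lock only after it returns
// (by which time the inventory update has been committed).
@Service
public class CommodityPurchaseFacade {

    @Autowired
    private CommodityInfoService commodityInfoService;

    public boolean purchaseWithLock(String commodityId, Integer number) throws Exception {
        ZkLock zkLock = new ZkLock();
        try {
            // Acquire the distributed lock first
            zkLock.lock();
            // The @Transactional purchase method commits before this call returns
            return commodityInfoService.purchaseCommodityInfo(commodityId, number);
        } finally {
            // Unlock only after the transaction has been committed
            zkLock.unlock();
        }
    }
}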

5. Summary

Finally, let’s review and summarize:

  • First, we simulated a single-machine, multi-threaded seckill scenario; on a single machine, a local lock solves the problem.
  • Then we simulated a multi-server, multi-threaded scenario; the idea is to solve it with a ZooKeeper-based distributed lock.
  • We explained the principle of how ZooKeeper implements a distributed lock.
  • Then we wrote the code to implement the distributed lock by hand, and also used the Curator framework.
  • Finally, we summarized the pits encountered along the way.

I hope you found this article useful.

If you want to be the first to see my new articles, you can search for the WeChat official account "Java technology enthusiast". Refusing to be a salted fish, I'm a programmer trying to be remembered. See you next time!

My ability is limited; if there is anything wrong or improper, please point it out so we can learn together!