Distributed locking mechanism

In a distributed project, an ordinary in-process lock (such as `synchronized` or `ReentrantLock`) is not enough to guarantee thread safety. The program runs on multiple machines, and each machine (each JVM) has its own lock object, so threads on different machines never contend for the same lock. To protect a shared resource across machines we need a distributed lock.

An example that motivates distributed locking

// Inventory deduction
package com.zhj.bean;

/**
 * Demo inventory holder used to illustrate overselling under concurrency.
 *
 * <p>The counter is a single static field shared by every {@code Stock}
 * instance in the JVM. This class performs no synchronization itself; callers
 * are expected to guard {@link #reduceStock()} with an external lock.
 *
 * @author zhj
 */
public class Stock {
    // Inventory: one JVM-wide counter shared by all Stock instances
    private static int num = 1;

    /**
     * Deducts one unit of inventory if any is left.
     *
     * <p>The one-second sleep between the check and the decrement simulates
     * slow business logic and deliberately widens the check-then-act window,
     * so that unsynchronized callers visibly oversell.
     *
     * @return {@code true} if a unit was deducted, {@code false} if the
     *         inventory was already empty
     */
    public boolean reduceStock() {
        if (num > 0) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Keep the original behavior (the deduction still happens) but
                // restore the interrupt flag so callers can observe it.
                Thread.currentThread().interrupt();
            }
            num--;
            return true;
        } else {
            return false;
        }
    }
}

The test method

package com.zhj.test;

import com.zhj.bean.Stock;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/ * * *@author zhj
 */
public class stockMain {

    private static Lock lock = new ReentrantLock();

    static class StockTask implements Runnable {

        @Override
        public void run(a) {

            lock.lock();

            try {
                boolean b = new Stock().reduceStock();
                if (b) {
                    System.out.println(Thread.currentThread().getName() + "Inventory reduction success.");
                } else {
                    System.out.println(Thread.currentThread().getName() + "Inventory reduction failure"); }}finally{ lock.unlock(); }}}public static void main(String[] args) {

        for (int i = 0; i < 10; i++) {
            new Thread(new StockTask(),"Thread"+ i).start(); }}}Copy the code

1. Use the database to solve the distributed lock problem

Create data table

-- One row per currently held lock; the unique index on lock_name is what
-- makes "insert a row" an atomic acquire operation.
CREATE TABLE `lock_db`.`lock_record`  (
  -- AUTO_INCREMENT because the lock code inserts rows with only lock_name set.
  `id` int NOT NULL AUTO_INCREMENT,
  `lock_name` varchar(255) NOT NULL COMMENT 'Unique lock name',
  PRIMARY KEY (`id`),
  UNIQUE INDEX `unique_lock_name`(`lock_name`) COMMENT 'Unique index'
);
Copy the code

To acquire the lock, a row with a unique lock name is inserted into the table; the unique index guarantees that only one client can insert it successfully. To release the lock, that row is deleted.

package com.zhj.lock;

import com.zhj.bean.LockRecord;
import com.zhj.mapper.LockRecordMapper;
import org.springframework.stereotype.Component;
import tk.mybatis.mapper.entity.Example;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/ * * *@author zhj
 */
@Component
public class DBLock implements Lock {

    private static final String LOCK_NAME = "db_lock_stock";
    @Resource
    private LockRecordMapper lockRecordMapper;

    @Override
    public void lock(a) {
        while (true) {
            boolean flag = tryLock();
            if (flag) {
                try {
                    LockRecord lockRecord = new LockRecord();
                    lockRecord.setLockName(LOCK_NAME);
                    lockRecordMapper.insert(lockRecord);
                    return;
                } catch (Exception e) {
                    System.out.println("-- -- -- -- -- -- --"); }}else {
                System.out.println("Waiting..."); }}}@Override
    public void lockInterruptibly(a) throws InterruptedException {}/** * Attempts to acquire the lock and initiates a query on the table with the specified name *@return* /
    @Override
    public boolean tryLock(a) {
        Example example = new Example(LockRecord.class);
        example.createCriteria().andEqualTo("lockName", LOCK_NAME);
        LockRecord lockRecord = lockRecordMapper.selectOneByExample(example);
        if (lockRecord == null) {
            return true;
        }
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public void unlock(a) {
        Example example = new Example(LockRecord.class);
        example.createCriteria().andEqualTo("lockName", LOCK_NAME);
        lockRecordMapper.deleteByExample(example);
    }

    @Override
    public Condition newCondition(a) {
        return null; }}Copy the code

2. Use Redis to solve the distributed lock problem

Avoid deadlocks by setting expiration time

package com.zhj.lock;

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/ * * *@author zhj
 */
@Component
public class RedisLock implements Lock {

    private static final String LOCK_NAME = "redis_lock_stock";

    @Resource
    private RedisTemplate redisTemplate;

    @Override
    public void lock(a) {
        while (true) {

            // Unlock is not executed. You can prevent a deadlock by setting the expiration time
            // Boolean flag = redisTemplate.opsForValue().setIfAbsent("lockName", LOCK_NAME);
            Boolean flag = redisTemplate.opsForValue().setIfAbsent("lockName", LOCK_NAME,15,TimeUnit.SECONDS);
            if (flag) {
                return;
            } else {
                System.out.println("Waiting for redis lock"); }}}@Override
    public void lockInterruptibly(a) throws InterruptedException {}@Override
    public boolean tryLock(a) {
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public void unlock(a) {
        redisTemplate.delete("lockName");
    }

    @Override
    public Condition newCondition(a) {
        return null; }}Copy the code

Use the official recommended Redisson implementation

To avoid a single point of failure, Redis is usually deployed as a cluster with replication. In that setup the primary node may go down after a client has acquired the lock but before the lock key has been replicated. A second client can then acquire the same lock on the newly promoted primary, so two clients hold the lock at the same time and overselling occurs. The Redis project proposed the Redlock algorithm to address this problem, and the Redisson client provides an implementation of it.

1. Import the dependency

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <!-- The original version number was lost; set this to the Redisson release you use. -->
    <version>${redisson.version}</version>
</dependency>

2. Use

package com.zhj.test;

import com.zhj.bean.Stock;
import com.zhj.lock.RedisLock;
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.config.Config;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import javax.annotation.Resource;

/ * * *@author zhj
 */
public class stockMain {

    // private static Lock lock = new ReentrantLock();
    // Distributed lock
    @Resource
    private static RedisLock redisLock;
    // private static DBLock dbLock;
    private static RLock rLock;

    static {
        ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("applicationContext.xml");
        // dbLock = classPathXmlApplicationContext.getBean(DBLock.class);
        redisLock = classPathXmlApplicationContext.getBean(RedisLock.class);
        Config config = new Config();
        config.useSingleServer().setAddress("Redis: / / 127.0.0.1:6379").setDatabase(1);
        Redisson redisson = (Redisson) Redisson.create(config);
        rLock = redisson.getLock("redis_lock_stock");
    }

    static class StockTask implements Runnable {

        @Override
        public void run(a) {

            // dbLock.lock();
            // redisLock.lock();
            rLock.lock();

            boolean b = new Stock().reduceStock();

            // dbLock.unlock();
            // redisLock.unlock();
            rLock.unlock();
            if (b) {
                System.out.println(Thread.currentThread().getName() + "Inventory reduction success.");
            } else {
                System.out.println(Thread.currentThread().getName() + "Inventory reduction failure"); }}}public static void main(String[] args) {

        for (int i = 0; i < 10; i++) {
            new Thread(new StockTask(),"Thread"+ i).start(); }}}Copy the code

3. Use ZooKeeper to implement distributed locks

Each thread that wants the lock creates an ephemeral sequential node under a common parent. After creating its node, the thread fetches all child nodes and sorts them, then checks whether its own node has the smallest sequence number. If it is the smallest, the thread holds the lock, performs its work, and releases the lock by deleting its node. If it is not the smallest, the thread watches the node immediately before its own; when that node is deleted it checks again, and so on.

package com.zhj.lock;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/ * * *@author zhj
 */
public class ZKLock implements Lock {
    / / the client
    private ZooKeeper zk;
    // Zk is a directory structure, locks
    private String root = "/locks";
    // Name of the lock
    private String lockName;
    // The sequence node created by the current thread
    private ThreadLocal<String> nodeId = new ThreadLocal<>();
    // Wait for zkClient to connect to the server synchronously
    private CountDownLatch connectedSignal = new CountDownLatch(1);
    private final static int sessionTimeout = 3000;
    private final static byte[] data = new byte[0];

    public ZKLock(String config, String lockName) {
        this.lockName = lockName;

        try {
            zk = new ZooKeeper(config, sessionTimeout, new Watcher() {

                @Override
                public void process(WatchedEvent event) {
                    // Establish the connection
                    if(event.getState() == Event.KeeperState.SyncConnected) { connectedSignal.countDown(); }}}); connectedSignal.await(); Stat stat = zk.exists(root,false);
            if (null == stat) {
                // Create the root nodezk.create(root, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); }}catch (Exception e) {
            throw newRuntimeException(e); }}// Listen for temporary node deletion
    class LockWatcher implements Watcher {
        private CountDownLatch latch = null;
        public LockWatcher(CountDownLatch latch) {
            this.latch = latch;
        }
        @Override
        public void process(WatchedEvent event) {
            if(event.getType() == Event.EventType.NodeDeleted) { latch.countDown(); }}}@Override
    public void lock(a) {
        try {
            // Create temporary child nodes
            String myNode = zk.create(root + "/" + lockName , data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);

            System.out.println(Thread.currentThread().getName()+myNode+ "created");

            // Fetch all child nodes
            List<String> subNodes = zk.getChildren(root, false);
            TreeSet<String> sortedNodes = new TreeSet<>();
            for(String node :subNodes) {
                sortedNodes.add(root +"/" +node);
            }

            String smallNode = sortedNodes.first();

            if (myNode.equals(smallNode)) {
                // If it is the smallest node, the lock is obtained
                System.out.println(Thread.currentThread().getName()+ myNode+"get lock");
                this.nodeId.set(myNode);
                return;
            }

            String preNode = sortedNodes.lower(myNode);

            CountDownLatch latch = new CountDownLatch(1);
            Stat stat = zk.exists(preNode, new LockWatcher(latch));// Register the listener simultaneously.
            // Check whether a node smaller than oneself exists. If not, register for listening without waiting for the lock
            if(stat ! =null) {
                System.out.println(Thread.currentThread().getName()+myNode+
                        " waiting for " + root + "/" + preNode + " released lock");

                latch.await();// Wait, where you should always wait for another thread to release the lock
                nodeId.set(myNode);
                latch = null; }}catch (Exception e) {
            throw newRuntimeException(e); }}@Override
    public void lockInterruptibly(a) throws InterruptedException {}@Override
    public boolean tryLock(a) {
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public void unlock(a) {
        try {
            System.out.println(Thread.currentThread().getName()+ "unlock ");
            if (null! = nodeId) { zk.delete(nodeId.get(), -1);
            }
            nodeId.remove();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch(KeeperException e) { e.printStackTrace(); }}@Override
    public Condition newCondition(a) {
        return null; }}Copy the code

Distributed queue

Queue feature: first-in, first-out