Moment For Technology

Microservice - Distributed Lock (3) -Zookeeper solution

Posted on Dec. 2, 2022, 10:27 a.m. by Renee Hayre
Category: Back-end · Tags: Back-end, Microservices

1 Apache-Curator

As mentioned above, using temporary (ephemeral) sequential nodes prevents multiple clients from contending for the lock at the same time, which relieves the pressure on the server. Because all lock requests are queued in order, this scheme is a concrete implementation of a fair lock. The common locks provided by Apache Curator are:

  • InterProcessMutex: a fair-lock implementation; a reentrant, exclusive lock
  • InterProcessSemaphoreMutex: non-reentrant, an exclusive lock
  • InterProcessReadWriteLock: read-write lock
  • InterProcessSemaphoreV2: shared semaphore
  • InterProcessMultiLock: Multiple shared locks (container that manages multiple locks as a single entity)

2 Use Cases

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMultiLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
import org.apache.curator.framework.recipes.locks.Lease;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class DistributedLockDemo {

	// The path of the ZooKeeper lock node. All operations related to distributed locks are performed on this node
	private final String lockPath = "/distributed-lock";
	// The address of the ZooKeeper service is :(127.0.0.1:2181),
	/ / cluster format for: (127.0.0.1:2181127.00 0.1:2182127.00 0.1:2183)
	private String connectString="127.0.0.1:2181";
	// Exhibit client re-test strategy
	private RetryPolicy retry;
	// Exhibit the client object
	private CuratorFramework client1;
	// Client2 user impersonates other clients
	private CuratorFramework client2;

	// Initialize the resource
	@Before
	public void init(a) throws Exception {
		// Retry policy
		// The initial sleep time is 1000ms and the maximum retry times is 3
		retry = new ExponentialBackoffRetry(1000.3);
		// Create a client with 60000(ms) for session timeout and 15000(ms) for link timeout
		client1 = CuratorFrameworkFactory.newClient(connectString, 60000.15000, retry);
		client2 = CuratorFrameworkFactory.newClient(connectString, 60000.15000, retry);
		// Create a session
		client1.start();
		client2.start();
	}

	// Release resources
	@After
	public void close(a) {
		CloseableUtils.closeQuietly(client1);
	}

	/** * InterProcessMutex: reentrant, exclusive lock */
	@Test
	public void sharedReentrantLock(a) throws Exception {
		// Create a reentrant lock
		InterProcessMutex lock1 = new InterProcessMutex(client1, lockPath);
		// Lock2 is used to simulate other clients
		InterProcessMutex lock2 = new InterProcessMutex(client2, lockPath);
		
		// lock1 Obtains the lock
		lock1.acquire();
		try {
			// lock1 acquires the lock the second time
			lock1.acquire();
			try {
				// Lock2 times out to obtain the lock. Because the lock has been occupied by lock1, lock2 fails to obtain the lock and needs to wait for lock1 to be released
				Assert.assertFalse(lock2.acquire(2, TimeUnit.SECONDS));
			} finally{ lock1.release(); }}finally {
			// If a lock is acquired twice and released once, then the lock is still occupied.
			// If you comment the following line of code, you will find lock2 below
			// Failed to acquire the lock
			lock1.release();
		}
		
		// Lock2 can acquire the lock after lock1 is released
		Assert.assertTrue(lock2.acquire(2, TimeUnit.SECONDS));
		lock2.release();
	}
	
	/ * * * InterProcessSemaphoreMutex: non-reentrant, * / exclusive lock
	@Test
	public void sharedLock(a) throws Exception {
		InterProcessSemaphoreMutex lock1 = new InterProcessSemaphoreMutex(client1, lockPath);
		// Lock2 is used to simulate other clients
		InterProcessSemaphoreMutex lock2 = new InterProcessSemaphoreMutex(client2, lockPath);

		// Obtain the lock object
		lock1.acquire();

		// Test whether reentrant is possible
		// Return false because the lock has been acquired
		Assert.assertFalse(lock1.acquire(2, TimeUnit.SECONDS));// Lock1 returns false
		Assert.assertFalse(lock2.acquire(2, TimeUnit.SECONDS));// Lock2 returns false

		// lock1 Releases the lock
		lock1.release();

		// Lock2 succeeded in trying to acquire the lock because it was already released
		Assert.assertTrue(lock2.acquire(2, TimeUnit.SECONDS));// Returns true
		lock2.release();
		System.out.println("Test over.");
	}

	/ * * * InterProcessReadWriteLock: read-write lock. * features: read and write locks, reentrant * /
	@Test
	public void sharedReentrantReadWriteLock(a) throws Exception {
		// Create a read-write lock object, which is implemented as a fair lock
		InterProcessReadWriteLock lock1 = new InterProcessReadWriteLock(client1, lockPath);
		// Lock2 is used to simulate other clients
		InterProcessReadWriteLock lock2 = new InterProcessReadWriteLock(client2, lockPath);
		
		// Use lock1 to simulate read operations
		// Use lock2 to simulate write operations
		// Acquire the read lock (implemented using InterProcessMutex, so it is reentrant)
		final InterProcessLock readLock = lock1.readLock();
		// Obtain the write lock (implemented using InterProcessMutex, so it is reentrant)
		final InterProcessLock writeLock = lock2.writeLock();

		/** * Read and write the lock test object */
		class ReadWriteLockTest {
			// Test the data change field
			private Integer testData = 0;
			private SetThread threadSet = new HashSet();

			// Write data
			private void write(a) throws Exception {
				writeLock.acquire();
				try {
					Thread.sleep(10);
					testData++;
					System.out.println("Write data \t" + testData);
				} finally{ writeLock.release(); }}// Read the data
			private void read(a) throws Exception {
				readLock.acquire();
				try {
					Thread.sleep(10);
					System.out.println("Read data \t" + testData);
				} finally{ readLock.release(); }}// Wait for the end of the thread to prevent the current thread from exiting directly after the test call completes, causing the console to be unable to output information
			public void waitThread(a) throws InterruptedException {
				for(Thread thread : threadSet) { thread.join(); }}// Create the thread method
			private void createThread(final int type) {
				Thread thread = new Thread(new Runnable() {
					@Override
					public void run(a) {
						try {
							if (type == 1) {
								write();
							} else{ read(); }}catch(Exception e) { e.printStackTrace(); }}}); threadSet.add(thread); thread.start(); }// Test method
			public void test(a) {
				for (int i = 0; i  5; i++) {
					createThread(1);
				}
				for (int i = 0; i  5; i++) {
					createThread(2);
				}
			}
		}

		ReadWriteLockTest readWriteLockTest = new ReadWriteLockTest();
		readWriteLockTest.test();
		readWriteLockTest.waitThread();
	}

	/** * InterProcessSemaphoreV2 shared semaphore */
	@Test
	public void semaphore(a) throws Exception {
		// Create a semaphore that is implemented as a fair lock
		InterProcessSemaphoreV2 semaphore1 = new InterProcessSemaphoreV2(client1, lockPath, 6);
		// Semaphore2 is used to simulate other clients
		InterProcessSemaphoreV2 semaphore2 = new InterProcessSemaphoreV2(client2, lockPath, 6);

		// Obtain a license
		Lease lease1 = semaphore1.acquire();
		Assert.assertNotNull(lease1);
		/ / semaphore. GetParticipantNodes () returns the current participate in the node list of semaphore, the two clients in the same information
		Assert.assertEquals(semaphore1.getParticipantNodes(), semaphore2.getParticipantNodes());

		// Time out to get a license
		Lease lease2 = semaphore2.acquire(2, TimeUnit.SECONDS);
		Assert.assertNotNull(lease2);
		Assert.assertEquals(semaphore1.getParticipantNodes(), semaphore2.getParticipantNodes());

		// Obtain multiple licenses. Parameter: number of licenses
		CollectionLease leases = semaphore1.acquire(2);
		Assert.assertTrue(leases.size() == 2);
		Assert.assertEquals(semaphore1.getParticipantNodes(), semaphore2.getParticipantNodes());

		// Time out to get multiple licenses. The first parameter is the number of licenses
		CollectionLease leases2 = semaphore2.acquire(2.2, TimeUnit.SECONDS);
		Assert.assertTrue(leases2.size() == 2);
		Assert.assertEquals(semaphore1.getParticipantNodes(), semaphore2.getParticipantNodes());

		Semaphore currently has 3 licenses, Semaphore2 has 3 licenses, making a total of 6, so they can't license any more
		Assert.assertNull(semaphore1.acquire(2, TimeUnit.SECONDS));
		Assert.assertNull(semaphore2.acquire(2, TimeUnit.SECONDS));

		// Release a license
		semaphore1.returnLease(lease1);
		semaphore2.returnLease(lease2);
		// Release multiple licenses
		semaphore1.returnAll(leases);
		semaphore2.returnAll(leases2);
	}

	/ * * * InterProcessMutex reentrant, an exclusive lock * InterProcessSemaphoreMutex: non-reentrant, an exclusive lock * InterProcessMultiLock: Multiple shared locks (containers that manage multiple locks as a single entity) */
	@Test
	public void multiLock(a) throws Exception {
		InterProcessMutex mutex = new InterProcessMutex(client1, lockPath);
		InterProcessSemaphoreMutex semaphoreMutex = new InterProcessSemaphoreMutex(client2, lockPath);
		// Insert the above two locks into it
		InterProcessMultiLock multiLock = new InterProcessMultiLock(Arrays.asList(mutex, semaphoreMutex));
		// Get all locks in the parameter set
		multiLock.acquire();
		// The entire multiLock is non-reentrant because there is a non-reentrant lock
		Assert.assertFalse(multiLock.acquire(2, TimeUnit.SECONDS));
		// Mutex is a reentrant lock, so it can continue to acquire locks
		Assert.assertTrue(mutex.acquire(2, TimeUnit.SECONDS));
		SemaphoreMutex failed to obtain the lock because the lock is non-reentrant
		Assert.assertFalse(semaphoreMutex.acquire(2, TimeUnit.SECONDS));
		// Release all locks in the parameter set
		multiLock.release();
		// The lock in interProcessLock2 has been released, so it can be acquired
		Assert.assertTrue(semaphoreMutex.acquire(2, TimeUnit.SECONDS)); }}Copy the code

More on JAVA, high concurrency, microservices, architectures, solutions, middleware at github.com/yu120/lemon...

Search
About
mo4tech.com (Moment For Technology) is a global community where thousands of techies from across the globe hang out! Passionate technologists — be they gadget freaks, tech enthusiasts, coders, technopreneurs, or CIOs — you will find them all here.