background

Just recently, ZooKeeper may be used in the project, so I sorted out the previous ZK materials, and everyone is interested in taking a look.

Introduction to Basic Concepts

ZooKeeper is a distributed, open source distributed application collaboration service. ZooKeeper is designed to encapsulate complex and error-prone distributed consistency services into an efficient and reliable set of primitives that can be delivered to users in a series of easy-to-use interfaces. Hereinafter referred to as ZK.

Typical Application Scenarios

  • Configuration management, similar to a database

  • DNS service

  • Group member Management

  • A distributed lock

    As ZK data are stored in memory, the data volume is mostly maintained in hundreds of megabytes, and the database data of tens of GB is also common.

ZK cluster setup

Docker-compose is strongly recommended for development and testing, which is very convenient. Production requires multiple independent machines for cluster implementation, otherwise there would be no need to use ZK.

version: '3.1'

services:
  zoo1:
    image: zookeeper
    restart: always
    hostname: zoo1
    ports:
      - 2181: 2181
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: Server. 1 = 0.0.0.0:2888-3888; 2181 server.2=zoo2:2888:3888; 2181 server.3=zoo3:2888:3888; 2181

  zoo2:
    image: zookeeper
    restart: always
    hostname: zoo2
    ports:
      - 2182: 2181
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888; 2181 Server. 2 = 0.0.0.0:2888-3888; 2181 server.3=zoo3:2888:3888; 2181

  zoo3:
    image: zookeeper
    restart: always
    hostname: zoo3
    ports:
      - 2183: 2181
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888; 2181 server.2=zoo2:2888:3888; 2181 Server. 3 = 0.0.0.0:2888-3888; 2181
Copy the code

ZK interaction based on Java

Rely on the library

dependencies {
  implementation "Org. Apache. They are: they are: 2.6.2." "
  implementation "Org. Apache. Curator: curator - recipes: 4.2.0"
  implementation "Org. Apache. Curator: curator - x - discovery: 4.2.0"
  implementation "Org. Apache. Curator: curator - x - discovery - server: 4.2.0"

  testImplementation "Junit: junit: 4.12"
  testImplementation "com.google.truth:truth:1.0"
}
Copy the code

The original API

Basic test cases

package org.yao;

import com.google.common.collect.ImmutableList;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.Transaction;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;

import static com.google.common.truth.Truth.assertThat;

public class ZooKeeperTests {
  private String pathPrefix = "/multi";
  private ZooKeeper zk;
  private CountDownLatch startLatch;
  private CountDownLatch closeLatch;
  private AsyncCallback.MultiCallback callback;

  private String path1 = pathPrefix + "1";
  private String path2 = pathPrefix + "2";
  private byte[] data1 = {0x1};
  private byte[] data2 = {0x2};

  @Before
  public void setUp(a) throws Exception {
    // Register the callback, and after the callback, stop waiting
    startLatch = new CountDownLatch(1);
    callback =
        (int rc, String path, Object ctx, List<OpResult> opResults) -> {
          assertThat(rc).isEqualTo(KeeperException.Code.OK.intValue());
          System.out.printf("delete multi executed");
          closeLatch.countDown();
        };
    zk = new ZooKeeper("localhost".2181.new DefaultWatcher());
    startLatch.await();
  }

  @After
  public void tearDown(a) throws Exception {
      // Clean up zk links
    closeLatch.await();
    zk.close();
  }

  @Test
  public void testMulti(a) throws Exception {
    closeLatch = new CountDownLatch(1);

    // Create two znodes
    Op createOp1 = Op.create(path1, data1, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    Op createOp2 = Op.create(path2, data2, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

    // Synchronous API
    zk.multi(ImmutableList.of(createOp1, createOp2));
    System.out.println("create multi executed");

    assertThat(zk.getData(path1, false.null)).isEqualTo(data1);
    assertThat(zk.getData(path2, false.null)).isEqualTo(data2);

    // Delete two znodes
    Op deleteOp1 = Op.delete(path1, -1);
    Op deleteOp2 = Op.delete(path2, -1);

    // Asynchronous API
    zk.multi(ImmutableList.of(deleteOp1, deleteOp2), callback, null);
  }

  @Test
  public void testTransaction(a) throws Exception {
    closeLatch = new CountDownLatch(1);

    // Create two znodes
    Transaction tx = zk.transaction();
    tx.create(path1, data1, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    tx.create(path2, data2, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

    // Synchronous API
    tx.commit();
    System.out.println("transaction committed");

    assertThat(zk.getData(path1, false.null)).isEqualTo(data1);
    assertThat(zk.getData(path2, false.null)).isEqualTo(data2);

    // Delete two znodes
    tx = zk.transaction();
    tx.delete(path1, -1);
    tx.delete(path2, -1);

    // Asynchronous API
    tx.commit(callback, null);
  }

  @Test
  public void testTransactionWithCheck(a) throws Exception {
    closeLatch = new CountDownLatch(0);

    {
      Transaction tx = zk.transaction();
      tx.create(path1, data1, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
      tx.create(path2, data2, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
      tx.check(path1, 0);
      tx.check(path2, 0);
      tx.commit();
    }

    {
      Transaction tx = zk.transaction();
      tx.check(path1, 0);
      tx.check(path2, 0);
      tx.delete(path1, 0);
      tx.delete(path2, 0); tx.commit(); }}/** * getChildren does not list descendants recursively. */
  @Test
  public void testGetChilren(a) throws Exception {
    closeLatch = new CountDownLatch(0);
    List<String> paths = zk.getChildren("/a".false);
    System.out.printf("child paths: %s\n", paths);
  }

  class DefaultWatcher implements Watcher {
    @Override
    public void process(WatchedEvent event) {
      if (event.getType() == Event.EventType.None
          && event.getState() == Event.KeeperState.SyncConnected) {
        System.out.println("zookeeper client connected"); startLatch.countDown(); }}}}Copy the code

Curator of the API

Curator introduce

Curator is a Java client library for ZooKeeper. Curator aims to simplify the use of ZK. Code that does not use a co-curator handles its own ConnectionLossException. And provides a lock, service discovery encapsulation of the more complete implementation, to avoid duplication of wheel.

The test case

package org.yao;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;

import static com.google.common.truth.Truth.assertThat;

/** * Example code to demonstrate the usage of Curator client and framework. */
public class CuratorTests {
  private CuratorFramework client;
  private String connectString = "localhost:2181";
  private RetryPolicy retryPolicy;

  @Before
  public void setUp(a) {
    retryPolicy = new ExponentialBackoffRetry(1000.3);
    client = CuratorFrameworkFactory.newClient(connectString, retryPolicy);

    /* // Fluent style client = CuratorFrameworkFactory.builder() .connectString(connectString) .retryPolicy(retryPolicy) .build(); * /

    // Start client
    client.start();
  }

  @After
  public void tearDown(a) {
    client.close();
  }

  // create -> getData -> delete in synchronous mode
  @Test
  public void testSyncOp(a) throws Exception {
    String path = "/one";
    byte[] data = {'1'};
    client.create().withMode(CreateMode.PERSISTENT).forPath(path, data);

    byte[] actualData = client.getData().forPath(path);
    assertThat(data).isEqualTo(actualData);

    client.delete().forPath(path);

    client.close();
  }


  // create -> getData -> delete in asynchronous mode
  @Test
  public void testAsyncOp(a) throws Exception {
    String path = "/two";
    final byte[] data = {'2'};
    final CountDownLatch latch = new CountDownLatch(1);

    // Use listener only for callbacks
    client
        .getCuratorListenable()
        .addListener(
            (CuratorFramework c, CuratorEvent event) -> {
              switch (event.getType()) {
                case CREATE:
                  System.out.printf("znode '%s' created\n", event.getPath());
                  // 2. getData
                  c.getData().inBackground().forPath(event.getPath());
                  break;
                case GET_DATA:
                  System.out.printf("got the data of znode '%s'\n", event.getPath());
                  assertThat(event.getData()).isEqualTo(data);
                  // 3. Delete
                  c.delete().inBackground().forPath(path);
                  break;
                case DELETE:
                  System.out.printf("znode '%s' deleted\n", event.getPath());
                  latch.countDown();
                  break; }});// 1. create
    client.create().withMode(CreateMode.PERSISTENT).inBackground().forPath(path, data);

    latch.await();

    client.close();
  }

  @Test
  public void testWatch(a) throws Exception {
    String path = "/three";
    byte[] data = {'3'};
    byte[] newData = {'4'};
    CountDownLatch latch = new CountDownLatch(1);

    // Use listener only for watches
    client
        .getCuratorListenable()
        .addListener(
            (CuratorFramework c, CuratorEvent event) -> {
              switch (event.getType()) {
                case WATCHED:
                  WatchedEvent we = event.getWatchedEvent();
                  System.out.println("watched event: " + we);
                  if (we.getType() == Watcher.Event.EventType.NodeDataChanged
                      && we.getPath().equals(path)) {
                    // 4. watch triggered
                    System.out.printf("got the event for the triggered watch\n");
                    byte[] actualData = c.getData().forPath(path);
                    assertThat(actualData).isEqualTo(newData);
                  }
                  latch.countDown();
                  break; }});// 1. create
    client.create().withMode(CreateMode.PERSISTENT).forPath(path, data);
    // 2. getData and register a watch
    byte[] actualData = client.getData().watched().forPath(path);
    assertThat(actualData).isEqualTo(data);

    // 3. setData
    client.setData().forPath(path, newData);
    latch.await();

    // 5. delete
    client.delete().forPath(path);
  }

  @Test
  public void testCallbackAndWatch(a) throws Exception {
    String path = "/four";
    byte[] data = {'4'};
    byte[] newData = {'5'};
    CountDownLatch latch = new CountDownLatch(2);

    // Use listener for both callbacks and watches
    client
        .getCuratorListenable()
        .addListener(
            (CuratorFramework c, CuratorEvent event) -> {
              switch (event.getType()) {
                case CREATE:
                  // 2. callback for create
                  System.out.printf("znode '%s' created\n", event.getPath());
                  // 3. getData and register a watch
                  assertThat(client.getData().watched().forPath(path)).isEqualTo(data);
                  // 4. setData
                  client.setData().forPath(path, newData);
                  latch.countDown();
                  break;
                case WATCHED:
                  WatchedEvent we = event.getWatchedEvent();
                  System.out.println("watched event: " + we);
                  if (we.getType() == Watcher.Event.EventType.NodeDataChanged
                      && we.getPath().equals(path)) {
                    // 5. watch triggered
                    System.out.printf("got the event for the triggered watch\n");
                    assertThat(c.getData().forPath(path)).isEqualTo(newData);
                  }
                  latch.countDown();
                  break; }});// 1. create
    client.create().withMode(CreateMode.PERSISTENT).inBackground().forPath(path, data);

    latch.await();

    // 6. deleteclient.delete().forPath(path); }}Copy the code

A test case for distributed locks

package com.zew.learn;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.junit.Test;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class CuratorLockTests {
    private final String connectString = "10.0.4.162:2181";
    private final String lockPath = "/locks";
    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;


    @Test
    public void lockTest(a) {
        // Create a QTY thread pool to simulate a concurrent terminal scenario
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final FakeLimitedResource resource = new FakeLimitedResource();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = () -> {
                    // Create a client for a curator
                    CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, new ExponentialBackoffRetry(1000.3));
                    try {
                        client.start();

                        ExampleClientThatLocks example = new ExampleClientThatLocks(client, lockPath, resource, "Client " + index);
                        for (int j = 0; j < REPETITIONS; ++j) {
                            example.doWork(10, TimeUnit.SECONDS); }}catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } catch (Exception e) {
                        e.printStackTrace();
                        // log or do something
                    } finally {
                        CloseableUtils.closeQuietly(client);
                    }
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        assertTrue(true);
    }

    static class FakeLimitedResource {
        private final AtomicBoolean inUse = new AtomicBoolean(false);
        // Simulate finite resource usage, with a random time interval
        public void use(a) throws InterruptedException {
            if(! inUse.compareAndSet(false.true)) {
                fail("Only one client can use the lock, so this means zK is letting multiple clients get the lock.");
                throw new IllegalStateException("Only one client can use this lock");

            }

            try {
                Thread.sleep((long) (3 * Math.random()));
            } finally {
                inUse.set(false); }}}/** * The class responsible for locking */
    static class ExampleClientThatLocks {
        private final InterProcessMutex lock;
        private final FakeLimitedResource resource;
        private final String clientName;

        public ExampleClientThatLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
            this.resource = resource;
            this.clientName = clientName;
            // Create a lock, although the client is different, but lockPath can be used to lock the effect
            lock = new InterProcessMutex(client, lockPath);
        }
         /** * The core code of the actual lock is the following sentence *@paramTime Indicates the duration *@paramUnit Time unit *@throwsException Throws an Exception */
        public void doWork(long time, TimeUnit unit) throws Exception {
            // Try to acquire the lock, throw an exception if it has not been acquired for longer
            if(! lock.acquire(time, unit)) {throw new IllegalStateException(clientName + "Unable to obtain lock");
            }

            try {
                System.out.println(clientName + "Got the lock.");
                resource.use();
            } finally {
                System.out.println(clientName + "Release lock");
                lock.release(); // always release the lock in a finally block}}}}Copy the code

Service discovery test case

package org.yao;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceProvider;
import org.junit.Test;

import static com.google.common.truth.Truth.assertThat;

public class ServiceDiscoveryTests {
  private String connectString = "localhost:2181";

  /** Shows the basic usage for curator-x-discovery. */
  @Test
  public void testBasics(a) throws Exception {
    CuratorFramework client = null;
    ServiceDiscovery<String> discovery = null;
    ServiceProvider<String> provider = null;
    String serviceName = "test";
    String basePath = "/services";

    try {
      client = CuratorFrameworkFactory.newClient(connectString, new RetryOneTime(1));
      client.start();

      ServiceInstance<String> instance1 =
          ServiceInstance.<String>builder().payload("plant").name(serviceName).port(10064).build();
      ServiceInstance<String> instance2 =
          ServiceInstance.<String>builder().payload("animal").name(serviceName).port(10065).build();

      System.out.printf("instance1 id: %s\n", instance1.getId());
      System.out.printf("instance2 id: %s\n", instance2.getId());

      discovery =
          ServiceDiscoveryBuilder.builder(String.class)
              .basePath(basePath)
              .client(client)
              .thisInstance(instance1)
              .build();
      discovery.start();
      discovery.registerService(instance2);

      provider = discovery.serviceProviderBuilder().serviceName(serviceName).build();
      provider.start();

      assertThat(provider.getInstance().getId()).isNotEmpty();
      assertThat(provider.getAllInstances()).containsExactly(instance1, instance2);

      client.delete().deletingChildrenIfNeeded().forPath(basePath);
    } finally{ CloseableUtils.closeQuietly(provider); CloseableUtils.closeQuietly(discovery); CloseableUtils.closeQuietly(client); }}}Copy the code

The resources

  1. ZooKeeper combat and source code analysis
  2. curator.apache.org/
  3. Github.com/apache/cura…