1. Introduction

  • The Curator framework is a wrapper around the native ZooKeeper API that handles many of the low-level details of ZooKeeper client development.
  • It provides abstractions for common ZooKeeper use cases (distributed locks, cluster leader election, shared counters, caches, distributed queues, and so on) and exposes a fluent-style API. It is one of the most widely used ZooKeeper clients.

1.1 Deficiencies of the native ZooKeeper API

  • The connection object is created asynchronously, so developers must write their own code to wait for the connection
  • There is no automatic reconnection after a session timeout
  • A Watcher fires only once and must be registered again after each event
  • Recursive creation of tree nodes is not supported

1.2 Curator features

  • Handles reconnection after a session timeout
  • Re-registers Watchers automatically
  • Simplifies the development APIs
  • Follows a fluent-style API
  • Provides distributed locks, shared counters, caching, and other mechanisms

1.3 Dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>2.12.0</version>
    </dependency>
    <!-- Provides advanced features such as cache event listening, leader election, distributed locks, and distributed barriers. -->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>2.12.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.10</version>
        <type>jar</type>
    </dependency>
</dependencies>

2. Connect and close

  • Curator uses the factory and builder design patterns: you supply the connection details and obtain a client connected to the ZooKeeper server.
      public static void main(String[] args) {
        // Base sleep time of 1 s between retries, at most 3 retries
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory
                                              .builder()
                                              .connectString("192.168.233.133:2181,192.168.233.131:2181,192.168.233.132:2181")
                                              .sessionTimeoutMs(5000)
                                              .retryPolicy(retryPolicy)
                                              .namespace("create")
                                              .build();
        client.start();  // Start the client
        log.info(client.isStarted());
        client.close();  // Close the client
      }

    connectString: the server addresses and port numbers, separated by commas.

    sessionTimeoutMs: the session timeout, in milliseconds.

    retryPolicy: the reconnection policy.

    namespace: the root node path; every path used by this client is resolved relative to it.
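
To make the two retry parameters more concrete, here is a minimal sketch of an exponential backoff schedule in plain Java. It is not Curator's ExponentialBackoffRetry implementation: the class name BackoffSketch and the plain doubling rule are assumptions chosen for illustration, and real retry policies typically add randomness to the wait.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of an exponential backoff schedule (not Curator code). */
class BackoffSketch {

    /**
     * Returns the sleep time in milliseconds before each retry.
     * The first retry waits baseSleepMs; every later retry doubles the previous wait.
     */
    public static List<Long> schedule(long baseSleepMs, int maxRetries) {
        List<Long> sleeps = new ArrayList<>();
        long sleep = baseSleepMs;
        for (int retry = 0; retry < maxRetries; retry++) {
            sleeps.add(sleep);
            sleep *= 2; // exponential growth (assumed rule, no jitter)
        }
        return sleeps;
    }

    public static void main(String[] args) {
        // Same values as the example above: 1000 ms base sleep, at most 3 retries.
        System.out.println(schedule(1000, 3)); // prints [1000, 2000, 4000]
    }
}
```

After the last retry fails, the operation gives up and the error is reported to the caller.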

2.1 Test template

  • Every test has to open the client before it runs and close it afterwards, so it is convenient to put both steps in a test template.
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.log4j.Logger;
    import org.junit.After;
    import org.junit.Before;

    public class CreateTest {
      private final static Logger log = Logger.getLogger(CreateTest.class);
      private String connectString = "192.168.233.133:2181,192.168.233.131:2181,192.168.233.132:2181";
      CuratorFramework client;
      Integer sessionTimeoutMs = 5000;
      Integer baseSleepTimeMs = 1000;
      Integer maxRetries = 3;
      String namespace = "create";

      // Open the client before each test
      @Before
      public void before() {
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
        client = CuratorFrameworkFactory
                .builder()
                .connectString(connectString)
                .sessionTimeoutMs(sessionTimeoutMs)
                .retryPolicy(retryPolicy)
                .namespace(namespace)
                .build();
        client.start();
        log.info("Client is open");
      }

      // Close the client after each test
      @After
      public void after() {
        client.close();
        log.info("Client closed");
      }
    }

3. Add a node

3.1 Case 1: Simple Creation

  @Test
  public void testCreate() throws Exception {
    client.create()
            .withMode(CreateMode.PERSISTENT)
            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
            .forPath("/node", "data".getBytes());
    log.info("Create finished");
  }

3.2 Case 2: Creating a Node with Custom Permissions

  @Test
  public void testCreate2() throws Exception {
    // Id and ACL come from org.apache.zookeeper.data
    // Grant all permissions to clients connecting from this IP address
    Id ip = new Id("ip", "192.168.233.133");
    List<ACL> acl = Collections.singletonList(new ACL(ZooDefs.Perms.ALL, ip));
    client.create()
            .withMode(CreateMode.PERSISTENT)
            .withACL(acl)
            .forPath("/node1", "data".getBytes());
    log.info("Create finished");
  }

3.3 Case 3: Recursive Node Creation

  • With .creatingParentsIfNeeded(), any missing parent nodes are created recursively.
      @Test
      public void testCreate3() throws Exception {
        // Create nodes recursively
        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                .forPath("/node2/node33", "data".getBytes());
        log.info("Create finished");
      }
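
As a rough illustration of what "recursive" means here, the following plain-Java sketch lists the paths that have to exist for a nested node, parents first. ParentPathsSketch and pathsToCreate are hypothetical names introduced for this example; this is not how Curator implements the feature internally.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: lists the paths a recursive create has to ensure, parents first. */
class ParentPathsSketch {

    public static List<String> pathsToCreate(String path) {
        List<String> result = new ArrayList<>();
        int idx = path.indexOf('/', 1); // skip the leading slash
        while (idx != -1) {
            result.add(path.substring(0, idx)); // a parent path
            idx = path.indexOf('/', idx + 1);
        }
        result.add(path); // finally the target node itself
        return result;
    }

    public static void main(String[] args) {
        System.out.println(pathsToCreate("/node2/node33")); // prints [/node2, /node2/node33]
    }
}
```

With the native API each of these paths would need its own create call, issued in this order.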

3.4 Case 4: Creating a Node asynchronously

  • The callback method receives two arguments: curatorFramework is the client itself, and curatorEvent holds the result of the operation.
      @Test
      public void testCreate4() throws Exception {
        // Create nodes asynchronously
        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                .inBackground(new BackgroundCallback() {
                  @Override
                  public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                    log.info(curatorFramework == client);  // true
                    log.info("getResultCode(): " + curatorEvent.getResultCode());  // 0 means the node was created successfully
                    log.info("getType(): " + curatorEvent.getType().toString());  // The operation type: CREATE
                    log.info("getPath(): " + curatorEvent.getPath());   // The node path
                  }
                })
                .forPath("/node2/node38", "data".getBytes());
        log.info("Create finished");
      }
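
Because a background call returns immediately, the calling method can finish before the callback has run. One common way to wait for it is a CountDownLatch. The sketch below shows the pattern in plain Java; the Callback interface and createInBackground method are hypothetical stand-ins that only simulate an asynchronous operation, not Curator types.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch: block the caller until a background callback has run. */
class AsyncWaitSketch {

    /** Stand-in for a result callback (assumption, not the Curator interface). */
    interface Callback {
        void processResult(String path, int resultCode);
    }

    /** Simulates an asynchronous create: returns immediately, calls back on another thread. */
    static void createInBackground(ExecutorService executor, String path, Callback callback) {
        executor.execute(() -> callback.processResult(path, 0));
    }

    public static String runAndWait() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<String> seen = new AtomicReference<>();
        try {
            createInBackground(executor, "/node2/node38", (path, rc) -> {
                seen.set(path + " rc=" + rc);
                done.countDown(); // signal that the callback has finished
            });
            // Without this wait the caller could return before the callback runs.
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callback did not run in time");
            }
            return seen.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndWait()); // prints /node2/node38 rc=0
    }
}
```

In a test, the same latch can be counted down at the end of processResult and awaited at the end of the test method.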

4. Update nodes

4.1 Case 1: Updating a node

  @Test
  public void testSet() throws Exception {
    client.setData()
            .forPath("/node", "set".getBytes());
    log.info("Update finished");
  }

4.2 Case 2: Updating a Node with the Version

  @Test
  public void testSet2() throws Exception {
    client.setData()
            .withVersion(1)  // Update only if the node's current version is 1
            .forPath("/node", "12".getBytes());
    log.info("Update finished");
  }
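
Passing a version makes the update conditional: it is applied only if the node's current data version matches, which protects against overwriting a concurrent change. The following in-memory sketch mimics that check. VersionedNodeSketch is a hypothetical class for illustration only; it returns false on a mismatch, whereas ZooKeeper reports a bad-version error instead.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical in-memory node that mimics a version-checked update (not ZooKeeper code). */
class VersionedNodeSketch {
    private byte[] data = new byte[0];
    private int version = 0;

    /**
     * Applies the update only if expectedVersion equals the current version.
     * Returns true and increments the version on success, false on a mismatch.
     */
    public synchronized boolean setData(byte[] newData, int expectedVersion) {
        if (expectedVersion != version) {
            return false; // the node was changed since the caller last read it
        }
        data = newData.clone();
        version++;
        return true;
    }

    public synchronized int getVersion() {
        return version;
    }

    public synchronized String getDataAsString() {
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        VersionedNodeSketch node = new VersionedNodeSketch();
        System.out.println(node.setData("a".getBytes(StandardCharsets.UTF_8), 0)); // true, version becomes 1
        System.out.println(node.setData("b".getBytes(StandardCharsets.UTF_8), 0)); // false, version 0 is stale
        System.out.println(node.setData("b".getBytes(StandardCharsets.UTF_8), 1)); // true, version becomes 2
    }
}
```

A typical caller reads the node together with its version, computes the new value, and retries from the read step if the conditional update is rejected.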

4.3 Case 3: Update a node with callback method

  @Test
  public void testSet3() throws Exception {
    client.setData()
            .inBackground(new BackgroundCallback() {
              @Override
              public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                log.info(curatorEvent.getResultCode());  // 0
                log.info(curatorEvent.getType());  // SET_DATA
                log.info(curatorEvent.getPath());  // /node
                log.info(curatorEvent.getStat().toString());  // The node's Stat fields as a comma-separated string
              }
            })
            .forPath("/node", "432".getBytes());
    log.info("Update finished");
  }

5. Delete the node

5.1 Case 1: Deleting a Node

  @Test
  public void testDelete() throws Exception {
    client.delete()
            .forPath("/node");
    log.info("Deletion completed");
  }

5.2 Case 2: Deleting nodes Recursively

  @Test
  public void testDelete1() throws Exception {
    client.delete()
            .deletingChildrenIfNeeded()  // Also delete any child nodes
            .forPath("/node2");
    log.info("Deletion completed");
  }

5.3 Case 3: Delete a node using the Callback method

  @Test
  public void testDelete3() throws Exception {
    client.delete()
            .deletingChildrenIfNeeded()
            .inBackground(new BackgroundCallback() {
              @Override
              public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                log.info(curatorEvent.getType());  // DELETE
                log.info(curatorEvent.getPath());  // /node1
              }
            })
            .forPath("/node1");
    log.info("Deletion completed");
  }

6. View the node

6.1 Case 1: Viewing a Node

  @Test
  public void testGet() throws Exception {
    byte[] data = client.getData()
            .forPath("/node2");
    log.info(new String(data));
  }

6.2 Case 2: Viewing the Value and status of a Node

  @Test
  public void testGet2() throws Exception {
    Stat stat = new Stat();
    byte[] data = client.getData()
            .storingStatIn(stat)  // Fill stat with the node's status
            .forPath("/node2");
    log.info(new String(data));
    log.info(stat.getVersion());
  }

6.3 Case 3: Check a node using the Callback method

  @Test
  public void testGet3() throws Exception {
    client.getData()
            .inBackground(new BackgroundCallback() {
              @Override
              public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                log.info(new String(curatorEvent.getData()));  // 4134134
                log.info(curatorEvent.getStat().toString());  // The node's Stat fields as a comma-separated string
                log.info(curatorEvent.getType().toString());  // GET_DATA
              }
            })
            .forPath("/node2");
  }

7. View child nodes

7.1 Case 1: Viewing all child nodes of a node

  @Test
  public void testChildren() throws Exception {
    List<String> children = client.getChildren()
            .forPath("/");
    log.info(children.toString());
  }

7.2 Case 2: View all child nodes of a node using the callback method

  @Test
  public void testChildren2() throws Exception {
    client.getChildren()
            .inBackground(new BackgroundCallback() {
              @Override
              public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                log.info(curatorEvent.getPath()); // /
                log.info(curatorEvent.getType().toString());  // CHILDREN
                log.info(curatorEvent.getChildren().toString());  // [node, node2, node3]
              }
            })
            .forPath("/");
  }

8. Check whether the node exists

8.1 Case 1: Checking whether a node exists

  @Test
  public void testExists() throws Exception {
    // checkExists() returns null when the node does not exist
    Stat stat = client.checkExists()
            .forPath("/node");
    if (stat != null)
      log.info(stat.toString());
    else
      log.info("Node does not exist");
  }

8.2 Case 2: The callback method is used to check whether a node exists

  @Test
  public void testExists1() throws Exception {
    client.checkExists()
            .inBackground(new BackgroundCallback() {
              @Override
              public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                log.info(curatorEvent.getType().toString());  // EXISTS
                Stat stat = curatorEvent.getStat();
                if (stat != null)
                  log.info(stat.toString());  // The node's Stat fields as a comma-separated string
                else
                  log.info("Node does not exist");
              }
            })
            .forPath("/node");
  }

9. Watcher

  • Curator provides two kinds of Watcher (Cache) for listening to node changes
  • NodeCache: watches one specific node for creation, data changes, and deletion. Changes to its child nodes are not reported.
  • PathChildrenCache: watches the child nodes of a znode. When a child node is added, updated, or removed, the cache updates its state to hold the latest set of children together with their data and status.
  • Unlike a native Watcher, these listeners keep working after an event has fired, so they do not need to be registered again
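
The difference between a one-shot watcher and a listener that keeps working can be sketched with a small in-memory model. Everything below (OneShotWatchSketch, watch, watchForever) is hypothetical and only illustrates the re-registration idea; it is not how NodeCache or PathChildrenCache are implemented.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory node whose watchers fire only once, like a native one-shot watcher. */
class OneShotWatchSketch {

    interface Watcher {
        void onChange(String data);
    }

    private final List<Watcher> watchers = new ArrayList<>();

    /** Registers a watcher that is notified of the next change only. */
    public void watch(Watcher w) {
        watchers.add(w);
    }

    public void setData(String newData) {
        List<Watcher> toFire = new ArrayList<>(watchers);
        watchers.clear(); // one-shot: every watcher is dropped before it fires
        for (Watcher w : toFire) {
            w.onChange(newData);
        }
    }

    /** Cache-style helper: re-registers itself after each event so it keeps listening. */
    public void watchForever(Watcher w) {
        watch(new Watcher() {
            @Override
            public void onChange(String data) {
                w.onChange(data);
                watch(this); // register again for the next change
            }
        });
    }

    public static List<String> demo() {
        List<String> log = new ArrayList<>();
        OneShotWatchSketch node = new OneShotWatchSketch();
        node.watch(d -> log.add("once:" + d));
        node.watchForever(d -> log.add("always:" + d));
        node.setData("1");
        node.setData("2");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [once:1, always:1, always:2]
    }
}
```

The plain watcher misses the second change, while the self-registering one sees both.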

9.1 Case 1: NodeCache

  @Test
  public void testWatch() throws Exception {
    // Observe changes to the node
    NodeCache nodeCache = new NodeCache(client, "/node22");
    nodeCache.start();
    nodeCache.getListenable()
            .addListener(new NodeCacheListener() {
              @Override
              public void nodeChanged() throws Exception {
                ChildData currentData = nodeCache.getCurrentData();
                if (currentData != null) {
                  log.info(currentData.getPath());
                  log.info(new String(currentData.getData()));
                } else {
                  log.info("The node was deleted.");
                }
              }
            });
    Thread.sleep(60000);  // Sleep for 60 s so that changes can be observed
    nodeCache.close();
  }

9.2 Case 2: PathChildrenCache

  @Test
  public void testWatch2() throws Exception {
    // Observe changes to the child nodes; true means the children's data is cached as well
    PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/node22", true);
    pathChildrenCache.start();
    pathChildrenCache.getListenable()
            .addListener(new PathChildrenCacheListener() {
              @Override
              public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                log.info(pathChildrenCacheEvent.getType());  // CHILD_ADDED, CHILD_REMOVED, CHILD_UPDATED
                ChildData childData = pathChildrenCacheEvent.getData();
                if (childData == null) {
                  return;  // Connection-state events carry no node data
                }
                log.info(childData.toString());  // ChildData{path='/node22/child', stat=..., data=[50, 50]}
                log.info(new String(childData.getData()));
                log.info(childData.getPath());  // /node22/child
                log.info(childData.getStat().toString());  // The child node's Stat fields as a comma-separated string
              }
            });
    Thread.sleep(60000);  // Sleep for 60 s so that changes can be observed
    pathChildrenCache.close();
  }

10. Transactions

10.1 Case 1: Creating two nodes using a transaction

  @Test
  public void testTransaction() throws Exception {
    // Either both nodes are created or neither is
    client.inTransaction()
            .create().forPath("/node100", "100".getBytes())
            .and()  // Chain the next operation
            .create().forPath("/node101", "101".getBytes())
            .and()  // Chain the next operation
            .commit();  // Commit the transaction
    log.info("Commit successful");
  }

11. Distributed locks

11.1 Using distributed reentrant exclusive locks

  • With an exclusive lock, every client competes for the same lock node, /lock. Each acquire request adds an internal sequential node under /lock; the caller continues when it is that node's turn and blocks otherwise. When the lock is released, the sequential node added for it is deleted. (The underlying principle is essentially the same as that of a distributed lock built directly on ZooKeeper.)
      @Test
      public void testMutex() throws Exception {
        // Exclusive lock
        InterProcessLock lock = new InterProcessMutex(client, "/lock");
        log.info("Waiting to acquire lock object");
        lock.acquire();
        try {
          for (int i = 0; i < 3; i++) {
            Thread.sleep(3000);
            System.out.println(i);
          }
        } finally {
          lock.release();  // Always release, even if the work above fails
        }
        log.info("Release lock");
      }
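
The "whose turn is it" rule can be sketched in plain Java: among the sequential nodes under the lock node, the one with the lowest sequence number holds the lock, and each waiter only needs to watch the node directly ahead of it. The node names and the LockOrderSketch class below are hypothetical; Curator's real node names and internals differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of lock ordering by sequence number (not Curator code). */
class LockOrderSketch {

    /** Extracts the numeric sequence suffix, e.g. "lock-0000000012" gives 12. */
    static int sequenceOf(String nodeName) {
        return Integer.parseInt(nodeName.substring(nodeName.lastIndexOf('-') + 1));
    }

    /** The node with the smallest sequence number currently holds the lock. */
    public static String holder(List<String> children) {
        return Collections.min(children, Comparator.comparingInt(LockOrderSketch::sequenceOf));
    }

    /**
     * Returns the node that 'me' waits on (its immediate predecessor),
     * or null if 'me' holds the lock or is not in the list.
     */
    public static String predecessor(List<String> children, String me) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparingInt(LockOrderSketch::sequenceOf));
        int idx = sorted.indexOf(me);
        return idx <= 0 ? null : sorted.get(idx - 1);
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList("lock-0000000007", "lock-0000000005", "lock-0000000006");
        System.out.println(holder(children));                         // prints lock-0000000005
        System.out.println(predecessor(children, "lock-0000000007")); // prints lock-0000000006
        System.out.println(predecessor(children, "lock-0000000005")); // prints null
    }
}
```

When the holder's node is deleted on release, the next node in sequence becomes the lowest and its owner proceeds.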

11.2 Using read/write Locks

  • Read locks and write locks are two different kinds of lock, and they interact when both compete for the same lock node.
  • While a read lock is held, other read locks can still be acquired, but write locks must wait.
  • While a write lock is held, neither read locks nor write locks can be acquired.
      /**
       * While a read lock is held, other read locks may still read data, but write locks are blocked.
       * While a write lock is held, no other read or write lock can be acquired.
       * @throws Exception
       */
      @Test
      public void testReadLock() throws Exception {
        InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, "/lock");
        InterProcessLock readLock = interProcessReadWriteLock.readLock();
        log.info("Waiting to acquire read lock object");
        readLock.acquire();
        for (int i = 0; i < 10; i++) {
          Thread.sleep(3000);
          System.out.println(i);
        }
        readLock.release();
        log.info("Release lock");
      }

      @Test
      public void testWriteLock() throws Exception {
        InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, "/lock");
        InterProcessLock writeLock = interProcessReadWriteLock.writeLock();
        log.info("Waiting to acquire write lock object");
        writeLock.acquire();
        for (int i = 0; i < 10; i++) {
          Thread.sleep(3000);
          System.out.println(i);
        }
        writeLock.release();
        log.info("Release lock");
      }
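
The compatibility rules above can be tried out without a ZooKeeper cluster by using the JDK's ReentrantReadWriteLock, which follows the same read/write rules inside a single JVM. This is only an analogy, assumed here for illustration: it is not InterProcessReadWriteLock and it does not coordinate separate processes. ReadWriteSketch is a hypothetical class name.

```java
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical single-JVM illustration of read/write lock compatibility. */
class ReadWriteSketch {

    /** Tries the lock from another thread and releases it again if the attempt succeeded. */
    static boolean canAcquireFromOtherThread(ExecutorService other, Lock lock) {
        try {
            return other.submit(() -> {
                boolean ok = lock.tryLock();
                if (ok) {
                    lock.unlock();
                }
                return ok;
            }).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    /** Returns {read-while-read, write-while-read, read-while-write, write-while-write}. */
    public static boolean[] demo() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        ExecutorService other = Executors.newSingleThreadExecutor();
        boolean[] result = new boolean[4];
        try {
            rw.readLock().lock(); // this thread holds a read lock
            try {
                result[0] = canAcquireFromOtherThread(other, rw.readLock());
                result[1] = canAcquireFromOtherThread(other, rw.writeLock());
            } finally {
                rw.readLock().unlock();
            }
            rw.writeLock().lock(); // this thread holds the write lock
            try {
                result[2] = canAcquireFromOtherThread(other, rw.readLock());
                result[3] = canAcquireFromOtherThread(other, rw.writeLock());
            } finally {
                rw.writeLock().unlock();
            }
        } finally {
            other.shutdown();
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // prints [true, false, false, false]
    }
}
```

Only the first combination succeeds: a second reader may enter while a read lock is held, and every other combination has to wait.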