ZooKeeper Curator Basic Usage (1)

Preface

Developing directly against the native ZooKeeper (ZK) API is tedious and inefficient. Curator is a ZK client framework provided by Apache that wraps the native API in a simpler, fluent interface.

1. Establish a connection

To use the Curator framework, add the dependency to the POM file:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.0.0</version>
</dependency>

CuratorFrameworkFactory creates the client used to connect to ZK. It exposes two entry points, newClient and builder; newClient is a shortcut that uses the builder internally, so both end with a call to build().

CuratorFrameworkFactory.newClient has two overloads:

public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)

  • connectString: comma-separated host:port list of the ZK servers; in a clustered environment, list every instance
  • retryPolicy: the retry policy applied when an operation fails or the connection is lost
  • sessionTimeoutMs: session timeout; the default is 60 x 1000 ms
  • connectionTimeoutMs: connection timeout; the default is 15 x 1000 ms

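Sample code:

// Comma-separated host:port list of the ZK servers
final String ZK_SERVERS = "localhost:2181,localhost:2182,localhost:2183";
final int RETRY_TIMES = 3;
// Base sleep time between retries, in milliseconds
final int RETRY_BASE_SLEEP_MS = 5000;
final int sessionTimeoutMs = 60 * 1000;
final int connectionTimeoutMs = 15 * 1000;

// ExponentialBackoffRetry(baseSleepTimeMs, maxRetries)
final RetryPolicy retry = new ExponentialBackoffRetry(RETRY_BASE_SLEEP_MS, RETRY_TIMES);
// Option 1: newClient with the default session and connection timeouts
CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_SERVERS, retry);
// Option 2: builder with explicit timeouts (shown for comparison, not started here)
CuratorFramework client2 = CuratorFrameworkFactory.builder()
        .connectString(ZK_SERVERS)
        .sessionTimeoutMs(sessionTimeoutMs)
        .connectionTimeoutMs(connectionTimeoutMs)
        .retryPolicy(retry)
        .build();
// start() must be called exactly once before the client is used
client.start();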

ExponentialBackoffRetry is a commonly used retry policy. In the sample code it retries up to three times, starting from a base sleep time of 5000 ms; the wait is not fixed but grows, with some randomization, on each retry. If the session and connection timeouts are not specified, the default values are used. Pay special attention to client.start(): it only needs to be executed once, and calling it again throws IllegalStateException("Cannot be started more than once").
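To make the retry timing more concrete, here is a minimal sketch of how the wait before each retry can grow. It assumes the sleep time is computed as baseSleepTimeMs * max(1, random integer in [0, 2^(retryCount + 1))), which is how ExponentialBackoffRetry appears to work; the class BackoffSketch and its method names are hypothetical and not part of Curator, and the real class additionally caps the sleep time at a maximum value.

```java
import java.util.Random;

// Hypothetical illustration class; it is not part of Curator.
final class BackoffSketch {

    // Assumed shape of the computation:
    // base * max(1, random int in [0, 2^(retryCount + 1))).
    static long sleepMs(int baseSleepTimeMs, int retryCount, Random random) {
        int factor = Math.max(1, random.nextInt(1 << (retryCount + 1)));
        return (long) baseSleepTimeMs * factor;
    }

    // Largest possible wait for a given retry (retryCount starts at 0).
    static long maxSleepMs(int baseSleepTimeMs, int retryCount) {
        return (long) baseSleepTimeMs * ((1 << (retryCount + 1)) - 1);
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int retry = 0; retry < 3; retry++) {
            System.out.println("retry " + retry
                    + ": sleep " + sleepMs(5000, retry, random)
                    + " ms (at most " + maxSleepMs(5000, retry) + " ms)");
        }
    }
}
```

Under this assumption, with a base of 5000 ms the first retry waits 5000 ms, the second between 5000 and 15000 ms, and the third between 5000 and 35000 ms.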

2. Create a node

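// The parent node must exist before a child can be created under it
client.create().forPath("/5AnJam");
// Create an ephemeral child node that carries data
client.create().withMode(CreateMode.EPHEMERAL).forPath("/5AnJam/test", "Hello world".getBytes(StandardCharsets.UTF_8));

// List the children of /5AnJam
List<String> l = client.getChildren().forPath("/5AnJam");
System.out.println(Arrays.toString(l.toArray()));

// Read the data back, decoding with the same charset used for writing
String val1 = new String(client.getData().forPath("/5AnJam/test"), StandardCharsets.UTF_8);
System.out.println(val1);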

Create a new node with create().forPath. forPath accepts either (String path) or (String path, byte[] data), so the data argument is optional, and the node type is set with withMode. CreateMode has 7 values, corresponding to ZooKeeper's znode types. Interview cheat sheets usually say that a znode has 4 types, but there are actually 7 (the CreateMode enum from ZooKeeper is shown here for reference):

public enum CreateMode {
    PERSISTENT(0, false, false, false, false),
    PERSISTENT_SEQUENTIAL(2, false, true, false, false),
    EPHEMERAL(1, true, false, false, false),
    EPHEMERAL_SEQUENTIAL(3, true, true, false, false),
    CONTAINER(4, false, false, true, false),
    PERSISTENT_WITH_TTL(5, false, false, false, true),
    PERSISTENT_SEQUENTIAL_WITH_TTL(6, false, true, false, true);

    private CreateMode(int flag, boolean ephemeral, boolean sequential, boolean isContainer, boolean isTTL) {
        this.flag = flag;
        this.ephemeral = ephemeral;
        this.sequential = sequential;
        this.isContainer = isContainer;
        this.isTTL = isTTL;
    }
}

The last three types deserve a closer look:

  • CONTAINER: A container node is similar to a persistent node, except that the ZK server periodically scans container nodes. Once the last child of a container has been deleted, the container becomes a candidate for deletion by the server.
  • PERSISTENT_WITH_TTL: TTL is the node's time to live. Like any persistent node, a PERSISTENT_WITH_TTL node does not depend on the client connection; it is used to create a node with an expiration date. If the node is not modified within the TTL and has no children, it becomes a candidate for deletion by the server.
  • PERSISTENT_SEQUENTIAL_WITH_TTL: Similar to PERSISTENT_WITH_TTL, except that the node is sequential: ZK appends a monotonically increasing counter to its name.
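The "ordered" part of the sequential types refers to the name ZooKeeper assigns: the server appends a 10-digit, zero-padded counter to the requested path. The sketch below only imitates that naming locally to show what the resulting paths look like. SequentialNameSketch and the /5AnJam/task- prefix are made up for illustration, and in a real cluster the counter is chosen by the server, not by the client.

```java
import java.util.Locale;

// Hypothetical illustration class; it does not talk to ZooKeeper.
final class SequentialNameSketch {

    // Imitates the name of a sequential znode:
    // requested path + 10-digit zero-padded counter.
    static String sequentialName(String requestedPath, int counter) {
        return String.format(Locale.ROOT, "%s%010d", requestedPath, counter);
    }

    public static void main(String[] args) {
        for (int counter = 0; counter < 3; counter++) {
            System.out.println(sequentialName("/5AnJam/task-", counter));
        }
    }
}
```

Because the counter is zero-padded, sorting the child names as plain strings also sorts them by creation order, which is what recipes such as queues and locks rely on.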

Tips

  1. Do not create a node that already exists; otherwise an exception (NodeExistsException) is thrown
  2. A plain create() does not create missing parent nodes. The code above creates the ephemeral node /5AnJam/test, but the first step cannot be omitted: /5AnJam must exist before /5AnJam/test is created. (Curator's create builder also offers creatingParentsIfNeeded() for this case.)
  3. Child nodes cannot be created under ephemeral nodes; they can only be created under persistent nodes
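The three tips above can be summarized in a toy in-memory model. This is only a sketch of the rules, not ZooKeeper or Curator: CreateRulesSketch is a hypothetical class, and where it throws IllegalStateException a real server reports KeeperException subclasses (NodeExistsException, NoNodeException and NoChildrenForEphemeralsException respectively).

```java
import java.util.HashMap;
import java.util.Map;

// Toy in-memory model of the three creation rules; NOT ZooKeeper or Curator.
final class CreateRulesSketch {

    // path -> true if the node is ephemeral
    private final Map<String, Boolean> nodes = new HashMap<>();

    void create(String path, boolean ephemeral) {
        // Rule 1: an existing node cannot be created again.
        if (nodes.containsKey(path)) {
            throw new IllegalStateException("node already exists: " + path);
        }
        int lastSlash = path.lastIndexOf('/');
        if (lastSlash > 0) { // top-level nodes hang directly off the root "/"
            String parent = path.substring(0, lastSlash);
            Boolean parentEphemeral = nodes.get(parent);
            // Rule 2: the parent must already exist.
            if (parentEphemeral == null) {
                throw new IllegalStateException("parent does not exist: " + parent);
            }
            // Rule 3: ephemeral nodes cannot have children.
            if (parentEphemeral) {
                throw new IllegalStateException("ephemeral nodes cannot have children: " + parent);
            }
        }
        nodes.put(path, ephemeral);
    }

    boolean exists(String path) {
        return nodes.containsKey(path);
    }

    public static void main(String[] args) {
        CreateRulesSketch zk = new CreateRulesSketch();
        zk.create("/5AnJam", false);     // persistent parent
        zk.create("/5AnJam/test", true); // ephemeral child
        try {
            zk.create("/5AnJam/test/child", false);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```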

3. Modify the node value

// Overwrite the data of an existing node
client.setData().forPath("/5AnJam/test", "Hello again".getBytes(StandardCharsets.UTF_8));
String val2 = new String(client.getData().forPath("/5AnJam/test"), StandardCharsets.UTF_8);
System.out.println(val2);

Curator modifies the value of a node with setData().forPath. If the node does not exist, an exception (NoNodeException) is thrown.

4. Delete a node

// Delete the child first, then the parent
client.delete().forPath("/5AnJam/test");
client.delete().forPath("/5AnJam");
// checkExists() returns null when the node does not exist
Stat nodeStat = client.checkExists().forPath("/5AnJam");
System.out.println(null == nodeStat);

delete().forPath does not delete recursively. Nodes have to be deleted from the bottom up (children first); deleting a node that still has children throws an exception (NotEmptyException). Curator's delete builder also offers deletingChildrenIfNeeded() for removing a whole subtree. After a node is deleted, checkExists().forPath can be used to confirm the deletion: it returns null instead of a Stat when the node no longer exists.
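The bottom-up requirement can be sketched as a simple ordering problem: given the paths of a subtree, delete the deepest ones first. The helper below is a hypothetical illustration (DeleteOrderSketch is not a Curator class) that only computes the order; each path would then be passed to delete().forPath in turn.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper; it only computes an order and does not talk to ZooKeeper.
final class DeleteOrderSketch {

    // Number of path segments, e.g. "/5AnJam/test" has depth 2.
    static int depth(String path) {
        return path.split("/").length - 1;
    }

    // Returns the paths ordered so that every child comes before its parent.
    static List<String> bottomUp(List<String> paths) {
        List<String> ordered = new ArrayList<>(paths);
        // Deeper paths first; the sort is stable, so equal depths keep their order.
        ordered.sort((a, b) -> Integer.compare(depth(b), depth(a)));
        return ordered;
    }

    public static void main(String[] args) {
        List<String> order = bottomUp(Arrays.asList("/5AnJam", "/5AnJam/test"));
        System.out.println(order); // child first, then parent
    }
}
```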

5. Node monitoring

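// Watch /5AnJam; NodeCache keeps a local copy of the node's data
NodeCache nodeCache = new NodeCache(client, "/5AnJam");
// true: build the initial cache contents before start() returns
nodeCache.start(true);
nodeCache.getListenable().addListener(() -> {
    // getCurrentData() returns null when the node does not exist
    ChildData current = nodeCache.getCurrentData();
    if (current != null) {
        System.out.println(new String(current.getData(), StandardCharsets.UTF_8));
    }
});
client.create().forPath("/5AnJam");
client.setData().forPath("/5AnJam", "1".getBytes(StandardCharsets.UTF_8));
client.setData().forPath("/5AnJam", "2".getBytes(StandardCharsets.UTF_8));
client.setData().forPath("/5AnJam", "3".getBytes(StandardCharsets.UTF_8));
client.setData().forPath("/5AnJam", "4".getBytes(StandardCharsets.UTF_8));
// Keep the main thread alive (without busy-waiting) so the listener can run
Thread.sleep(Long.MAX_VALUE);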

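NodeCache provides node watching. The "cache" in its name refers to the local copy of the node that NodeCache keeps: it compares this copy with the node on the ZK server, and when the two differ, it updates the copy and notifies the registered listeners. Changes made in quick succession may be coalesced, so the listener is not guaranteed to fire once per setData call. Note that NodeCache lives in the curator-recipes artifact, which has to be on the classpath in addition to curator-framework.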
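The compare-and-notify idea can be sketched without a ZK server. The class below is a toy model, not Curator's NodeCache: all names are hypothetical, the "server data" is simply passed in by the caller, and the real class is driven by ZooKeeper watches and compares more than the raw bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy illustration of "compare the cached copy with the server copy,
// notify listeners on a difference". NOT Curator's NodeCache.
final class NodeCacheSketch {

    private final List<Runnable> listeners = new ArrayList<>();
    private byte[] cached; // last data seen for the node; null means "node absent"

    void addListener(Runnable listener) {
        listeners.add(listener);
    }

    byte[] getCurrentData() {
        return cached;
    }

    // Called with the node's current data on the server (null if it does not exist).
    void refresh(byte[] serverData) {
        if (Arrays.equals(cached, serverData)) {
            return; // nothing changed, nobody is notified
        }
        cached = serverData;
        for (Runnable listener : listeners) {
            listener.run();
        }
    }

    public static void main(String[] args) {
        NodeCacheSketch cache = new NodeCacheSketch();
        cache.addListener(() -> {
            byte[] data = cache.getCurrentData();
            System.out.println(data == null ? "node deleted" : new String(data, StandardCharsets.UTF_8));
        });
        cache.refresh("1".getBytes(StandardCharsets.UTF_8)); // changed: listener runs
        cache.refresh("1".getBytes(StandardCharsets.UTF_8)); // same data: listener is skipped
        cache.refresh("2".getBytes(StandardCharsets.UTF_8)); // changed: listener runs
        cache.refresh(null);                                 // node removed: listener runs
    }
}
```

The null branch in the listener mirrors a detail worth keeping in mind with the real NodeCache as well: once the watched node is deleted there is no current data, so a listener should check for null before reading it.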

6. Complete sample code

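import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

public class CuratorBasicUsage {
    public static void main(String[] args) throws Exception {
        final String ZK_SERVERS = "localhost:2181,localhost:2182,localhost:2183";
        final int RETRY_TIMES = 3;
        // Base sleep time between retries, in milliseconds
        final int RETRY_BASE_SLEEP_MS = 5000;
        final int sessionTimeoutMs = 60 * 1000;
        final int connectionTimeoutMs = 15 * 1000;

        // 1. Establish a connection
        final RetryPolicy retry = new ExponentialBackoffRetry(RETRY_BASE_SLEEP_MS, RETRY_TIMES);
        CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_SERVERS, retry);
        // Equivalent builder form with explicit timeouts (shown for comparison, not started)
        CuratorFramework client2 = CuratorFrameworkFactory.builder()
                .connectString(ZK_SERVERS)
                .sessionTimeoutMs(sessionTimeoutMs)
                .connectionTimeoutMs(connectionTimeoutMs)
                .retryPolicy(retry)
                .build();

        client.start();

        // 2. Create a node: the parent first, then the ephemeral child
        client.create().forPath("/5AnJam");
        client.create().withMode(CreateMode.EPHEMERAL).forPath("/5AnJam/test", "Hello world".getBytes(StandardCharsets.UTF_8));

        List<String> l = client.getChildren().forPath("/5AnJam");
        System.out.println(Arrays.toString(l.toArray()));

        String val1 = new String(client.getData().forPath("/5AnJam/test"), StandardCharsets.UTF_8);
        System.out.println(val1);

        // 3. Modify the node value
        client.setData().forPath("/5AnJam/test", "Hello again".getBytes(StandardCharsets.UTF_8));
        String val2 = new String(client.getData().forPath("/5AnJam/test"), StandardCharsets.UTF_8);
        System.out.println(val2);

        // 4. Delete the nodes bottom-up and confirm the deletion
        client.delete().forPath("/5AnJam/test");
        client.delete().forPath("/5AnJam");
        Stat nodeStat = client.checkExists().forPath("/5AnJam");
        System.out.println(null == nodeStat);

        // 5. Node monitoring
        NodeCache nodeCache = new NodeCache(client, "/5AnJam");
        nodeCache.start(true);
        nodeCache.getListenable().addListener(() -> {
            // getCurrentData() returns null once the node has been deleted
            ChildData current = nodeCache.getCurrentData();
            if (current != null) {
                System.out.println(new String(current.getData(), StandardCharsets.UTF_8));
            }
        });
        client.create().forPath("/5AnJam");
        client.setData().forPath("/5AnJam", "1".getBytes(StandardCharsets.UTF_8));
        client.setData().forPath("/5AnJam", "2".getBytes(StandardCharsets.UTF_8));
        client.setData().forPath("/5AnJam", "3".getBytes(StandardCharsets.UTF_8));
        client.setData().forPath("/5AnJam", "4".getBytes(StandardCharsets.UTF_8));
        client.delete().forPath("/5AnJam");
        // Keep the main thread alive (without busy-waiting) so the listener can run
        Thread.sleep(Long.MAX_VALUE);
    }
}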
