When I was first learning ZooKeeper, I often forgot the basics, such as how to install it and how to run common commands. This article therefore records the cluster setup process, the common ZooKeeper client commands, and how to use the Java API, both as a quick reference for anyone who needs it and as a backup for my own future review.

1. Building a ZooKeeper distributed cluster

Cluster planning

ZooKeeper is deployed on hadoop102, hadoop103, and hadoop104.

Unpack and install

  1. Decompress the Zookeeper installation package to the /opt/module/ directory

    tar -zxvf zookeeper-3.5.7.tar.gz -C /opt/module/
  2. Synchronize the contents of /opt/module/zookeeper-3.5.7 to hadoop103 and hadoop104

    xsync zookeeper-3.5.7/

Configuring the Server Number

  • Create a zkData directory in /opt/module/zookeeper-3.5.7/

  • Create a myid file in /opt/module/zookeeper-3.5.7/zkData

  • Edit the myid file and write this server's number into it

  • Copy the ZooKeeper configuration to another host

    xsync myid

    The xsync distribution script:

    #!/bin/bash
    #1. Check the number of arguments
    if [ $# -lt 1 ]
    then
      echo Not Enough Arguments!
      exit;
    fi
    #2. Traverse all the machines in the cluster
    for host in hadoop102 hadoop103 hadoop104
    do
      echo ====================  $host  ====================
      #3. Traverse all files/directories and send them one by one
      for file in $@
      do
        #4. Check whether the file exists
        if [ -e $file ]
        then
          #5. Get the parent directory
          pdir=$(cd -P $(dirname $file); pwd)
          #6. Get the file name
          fname=$(basename $file)
          ssh $host "mkdir -p $pdir"
          rsync -av $pdir/$fname $host:$pdir
        else
          echo $file does not exist!
        fi
      done
    done

    Quick query script:

    #!/bin/bash
    for i in hadoop102 hadoop103 hadoop104
    do
      echo "======== $i ========"
      ssh $i "jps" | grep -v Jps
    done
    # Make it executable: chmod +x all
    # Move it into the PATH: sudo mv all /bin/

Configure the zoo.cfg file

  • Rename zoo_sample.cfg in /opt/module/zookeeper-3.5.7/conf to zoo.cfg

  • Open the zoo.cfg file

  • Modify the data storage path

    dataDir=/opt/module/zookeeper-3.5.7/zkData
    #add
    server.2=hadoop102:2888:3888
    server.3=hadoop103:2888:3888
    server.4=hadoop104:2888:3888
  • Synchronize the zoo.cfg configuration file

Configuration parameter parsing:

server.A=B:C:D

A is a number indicating the server ID.

In cluster mode, a file named myid is placed in the dataDir directory; it contains a single number, which is the value of A. When ZooKeeper starts, it reads this file and compares the value against the configuration in zoo.cfg to determine which server it is.

B is the address of the server.

C is the port over which this server's follower exchanges information with the Leader of the cluster.

D is the election port: if the Leader fails, the servers communicate over this port to elect a new Leader.
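
To make the mapping concrete, here is a small hypothetical Java helper (the class, method, and regex are my own illustration, not part of ZooKeeper) that splits one `server.A=B:C:D` line from zoo.cfg into its four parts:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ServerEntryParser {
    // Matches e.g. "server.2=hadoop102:2888:3888"
    private static final Pattern ENTRY =
            Pattern.compile("server\\.(\\d+)=([^:]+):(\\d+):(\\d+)");

    public static String describe(String line) {
        Matcher m = ENTRY.matcher(line.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a server entry: " + line);
        }
        // A = server id, B = host, C = follower<->leader port, D = election port
        return "id=" + m.group(1) + " host=" + m.group(2)
                + " quorumPort=" + m.group(3) + " electionPort=" + m.group(4);
    }

    public static void main(String[] args) {
        System.out.println(describe("server.2=hadoop102:2888:3888"));
        // prints: id=2 host=hadoop102 quorumPort=2888 electionPort=3888
    }
}
```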

Cluster operations

  • Start ZooKeeper on each node

    bin/zkServer.sh start
  • Check the status

    bin/zkServer.sh status

2. Basic ZooKeeper client commands

ZooKeeper client commands are executed against a running ZooKeeper service.

First, run the following command to open a new session and enter the client shell:

$ bin/zkCli.sh

Client command line operation

Basic command syntax    Description
help            Display all commands
ls path         List the children of a znode; -w watches for child changes, -s adds secondary (stat) information
ls2 path        View detailed data about a node (deprecated in 3.5.x; use ls -s)
create          Create a node; -s appends a sequence number, -e makes it ephemeral (removed when the session ends)
get path        Get the value of a node; -w watches for data changes, -s adds secondary information
set             Set the value of a node
stat            View node status
delete          Delete a node (it must have no children)
deleteall       Delete a node and its children recursively

Example:

#Start the client:
bin/zkCli.sh
#Displays all operation commands
help
#List the children of the root znode
ls /
#View detailed data about the current node (ls2 is deprecated; ls -s is equivalent)
ls -s /
#Create two common nodes
create /A "node1"
create /A/B "node2"
#Get the value of a node (-s also prints the stat)
get -s /A
node1
cZxid = 0x100000003
ctime = Wed Aug 29 00:03:23 CST 2018
mZxid = 0x100000003
mtime = Wed Aug 29 00:03:23 CST 2018
pZxid = 0x100000004
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 1
#Create an ephemeral node (visible only while the current session is alive)
create -e /A/C "LSnode"
#Create nodes with sequence numbers
#If the parent has no sequential children yet, numbering starts from 0; if two sequential nodes already exist, the next one gets sequence 2, and so on.
create -s /A/B/C "node3"
Created /A/B/C0000000000
create -s /A/B/D "node4"
Created /A/B/D0000000001
#Modify node data
set /A "NEWnode"
#Watch a node's value for changes
get -w /A
#Watch a node's children for changes (path changes)
ls -w /A
#Delete a node (it must have no children)
delete /A/B/C0000000000
#Delete a node recursively
deleteall /A/B
#Viewing Node Status
stat /A
cZxid = 0x100000003
ctime = Wed Aug 29 00:03:23 CST 2018
mZxid = 0x100000011
mtime = Wed Aug 29 00:21:23 CST 2018
pZxid = 0x100000014
cversion = 9
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 4
numChildren = 1
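
The suffix that `create -s` produced above is a 10-digit, zero-padded counter maintained on the parent node. A minimal sketch of that formatting in plain Java (no ZooKeeper connection needed; the helper name is my own):

```java
public class SequentialName {
    // ZooKeeper appends the parent's next sequence value as a 10-digit, zero-padded suffix
    public static String withSequence(String prefix, int sequence) {
        return prefix + String.format("%010d", sequence);
    }

    public static void main(String[] args) {
        System.out.println(withSequence("/A/B/C", 0));  // /A/B/C0000000000
        System.out.println(withSequence("/A/B/D", 1));  // /A/B/D0000000001
    }
}
```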

3. Using the Java API

  1. Create a Maven project and add the dependencies to the pom.xml file:
<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-core</artifactId>
			<version>2.8.2</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
		<dependency>
			<groupId>org.apache.zookeeper</groupId>
			<artifactId>zookeeper</artifactId>
			<version>3.5.7</version>
		</dependency>
</dependencies>
  2. Create the log4j.properties file
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Set the default spark-shell log level to ERROR. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=ERROR

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=ERROR
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
  3. Create a ZooKeeper client
private static String connectString =
 "hadoop102:2181,hadoop103:2181,hadoop104:2181";
	private static int sessionTimeout = 2000;
	private ZooKeeper zkClient = null;

	@Before
	public void init() throws Exception {

		zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
			@Override
			public void process(WatchedEvent event) {

				// Callback invoked when an event notification arrives (the user's business logic)
				System.out.println(event.getType() + "--" + event.getPath());

				// Re-register the watch (ZooKeeper watches are one-shot)
				try {
					zkClient.getChildren("/", true);
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		});
	}
  4. Create a child node
// Create a child node
@Test
public void create() throws Exception {
	// Arg 1: path of the node to create; Arg 2: node data; Arg 3: node ACL; Arg 4: node type
	String nodeCreated = zkClient.create("/A", "node1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
  5. Get child nodes and listen for node changes
@Test
public void getChildren() throws Exception {
	List<String> children = zkClient.getChildren("/", true);
	for (String child : children) {
		System.out.println(child);
	}
	// Block so the process stays alive and the watcher keeps firing
	Thread.sleep(Long.MAX_VALUE);
}
  6. Check whether a znode exists
// Check whether a znode exists
@Test
public void exist() throws Exception {
	Stat stat = zkClient.exists("/eclipse", false);
	System.out.println(stat == null ? "not exist" : "exist");
}

Listening for server nodes going online and offline dynamically

In a distributed system, server nodes can go online and offline dynamically, and any client should be able to sense these changes in real time.

Concrete implementation:

First, create the /servers node on the cluster:

create /servers "servers"

Server-side registration code:

package com.test.zkcase;
import java.io.IOException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;

public class DistributeServer {

	private static String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
	private static int sessionTimeout = 2000;
	private ZooKeeper zk = null;
	private String parentNode = "/servers";
	
	// Create a client connection to zk
	public void getConnect() throws IOException {
		zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
			@Override
			public void process(WatchedEvent event) {
			}
		});
	}

	// Register the server
	public void registServer(String hostname) throws Exception{
		String create = zk.create(parentNode + "/server", hostname.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
		System.out.println(hostname +" is online "+ create);
	}
	
	// Business function
	public void business(String hostname) throws Exception{
		System.out.println(hostname+" is working ...");
		Thread.sleep(Long.MAX_VALUE);
	}
	
	public static void main(String[] args) throws Exception {
		// 1 Get a zk connection
		DistributeServer server = new DistributeServer();
		server.getConnect();

		// 2 Register this server's information with zk
		server.registServer(args[0]);

		// 3 Start the business logic
		server.business(args[0]);
	}
}

Client code

package com.test.zkcase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class DistributeClient {
	private static String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
	private static int sessionTimeout = 2000;
	private ZooKeeper zk = null;
	private String parentNode = "/servers";

	// Create a client connection to zk
	public void getConnect() throws IOException {
		zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
			@Override
			public void process(WatchedEvent event) {
				// Start listening again
				try {
					getServerList();
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		});
	}

	// Get the server list information
	public void getServerList() throws Exception {
		
		// 1 Gets information about the child node of the server and listens on the parent node
		List<String> children = zk.getChildren(parentNode, true);

        // 2 Storage server information list
		ArrayList<String> servers = new ArrayList<>();
		
        // 3 Get the host name stored in each child node
		for (String child : children) {
			byte[] data = zk.getData(parentNode + "/" + child, false, null);

			servers.add(new String(data));
		}

        // 4 Displays the server list
		System.out.println(servers);
	}

	// Business function
	public void business() throws Exception {

		System.out.println("client is working ...");
		Thread.sleep(Long.MAX_VALUE);
	}

	public static void main(String[] args) throws Exception {

		// 1 get zK connection
		DistributeClient client = new DistributeClient();
		client.getConnect();

		// 2 Get the server list from the children of /servers
		client.getServerList();

		// 3 Start the business logic
		client.business();
	}
}
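
The client above reprints the whole server list on every change. To report which servers actually went offline between two callbacks, one could diff the previous child list against the current one; a self-contained sketch (the `ServerDiff` helper is hypothetical, not part of the example):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ServerDiff {
    // Returns the entries present in 'previous' but missing from 'current',
    // i.e. the servers that went offline between the two snapshots
    public static Set<String> wentOffline(List<String> previous, List<String> current) {
        Set<String> offline = new HashSet<>(previous);
        offline.removeAll(current);
        return offline;
    }

    public static void main(String[] args) {
        List<String> before = List.of("hadoop102", "hadoop103", "hadoop104");
        List<String> after = List.of("hadoop102", "hadoop104");
        System.out.println(wentOffline(before, after)); // [hadoop103]
    }
}
```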