1. Foreword

The previous blog introduced simple and practical open-source Zookeeper clients. This blog explains Zookeeper's typical application scenarios.

2. Typical application scenarios

Zookeeper is a highly available distributed data management and coordination framework that guarantees data consistency in distributed environments. More and more distributed systems, such as Hadoop, HBase, and Kafka, use Zookeeper as a core component.

2.1 Data Publish/Subscribe

A data publish/subscribe system is essentially a configuration center: publishers publish data to Zookeeper nodes so that subscribers can subscribe to it. In this way configuration information can be centrally managed and dynamically updated. Publish/subscribe systems generally follow one of two design modes: push or pull. In push mode the server actively sends data updates to all subscribed clients; in pull mode each client polls the server for the latest data. Zookeeper combines push and pull: a client registers a Watcher with the server on the node it cares about, and once that node's data changes, the server pushes a Watcher event notification to the client, which then goes to the server to fetch the latest data.

If configuration information is stored in Zookeeper for centralized management, an application actively fetches the configuration from the Zookeeper server at startup and at the same time registers a Watcher listener on the configuration node. Whenever the configuration changes, the server notifies all subscribed clients in real time so that each of them can retrieve the latest configuration.
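As a rough illustration of this pattern, the sketch below pulls the configuration at startup and re-reads it on every change notification. It is a minimal sketch, not a production client: the ensemble address localhost:2181 and the configuration node /configs/app1/db.url are hypothetical, and session-expiry recovery is omitted.

```java
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ConfigSubscriber implements Watcher {
    // Hypothetical ensemble address and configuration node, used only for illustration.
    private static final String CONNECT_STRING = "localhost:2181";
    private static final String CONFIG_PATH = "/configs/app1/db.url";

    private final CountDownLatch connected = new CountDownLatch(1);
    private ZooKeeper zk;
    private volatile String currentConfig;

    public void start() throws Exception {
        zk = new ZooKeeper(CONNECT_STRING, 5000, this);
        connected.await();                       // wait for the session to be established
        readConfig();                            // pull the configuration actively at startup
    }

    private void readConfig() throws KeeperException, InterruptedException {
        Stat stat = new Stat();
        // Watchers are one-shot, so re-register this object as the Watcher on every read.
        byte[] data = zk.getData(CONFIG_PATH, this, stat);
        currentConfig = new String(data);
        System.out.println("config version " + stat.getVersion() + ": " + currentConfig);
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getState() == Event.KeeperState.SyncConnected
                && event.getType() == Event.EventType.None) {
            connected.countDown();               // session established
        } else if (event.getType() == Event.EventType.NodeDataChanged) {
            try {
                readConfig();                    // push notification received: pull the latest data
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
```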

2.2 Load Balancing

Load balancing is a fairly common computer networking technique used to distribute load among multiple computers, network connections, CPUs, disk drives, or other resources, in order to optimize resource usage, maximize throughput, minimize response time, and avoid overload.

Using Zookeeper to implement a dynamic DNS (DDNS) service:

· Domain name configuration: first create a node in Zookeeper for each domain name, for example /DDNS/app1/server.app1.company1.com, and store the corresponding IP address and port configuration as the node's data.

· Domain name resolution: the application obtains the IP address and port configuration from the domain name node and performs the resolution itself. At the same time, the application registers a data-change Watcher listener on the domain node so that it is notified of domain changes.

· Domain name change: if the IP address or port number changes, the domain name configuration needs to be updated. In this case you only need to update the data of the corresponding domain name node; Zookeeper sends the change notification to the subscribed clients, and each client then fetches the domain name configuration again.
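A minimal sketch of both sides of this flow, assuming an already connected org.apache.zookeeper.ZooKeeper handle (created as in the earlier sketch) and that the domain node stores a plain "IP:port" string; the example values in the comments are illustrative.

```java
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class DdnsExample {
    // Domain name node from the text; its data is assumed to be a plain "IP:port" string.
    private static final String DOMAIN_NODE = "/DDNS/app1/server.app1.company1.com";

    // Domain name resolution: read "IP:port" from the node and watch it for changes.
    static String[] resolve(ZooKeeper zk, Watcher watcher) throws Exception {
        byte[] data = zk.getData(DOMAIN_NODE, watcher, new Stat());
        return new String(data).split(":");      // e.g. {"192.168.0.10", "8080"} (illustrative)
    }

    // Domain name change: updating the node's data makes Zookeeper notify all subscribed clients.
    static void update(ZooKeeper zk, String ip, int port) throws Exception {
        zk.setData(DOMAIN_NODE, (ip + ":" + port).getBytes(), -1);   // -1 accepts any version
    }
}
```

Because the resolver registers a Watcher when it reads the node, a single setData call on the publisher side is enough to make every subscribed resolver fetch the domain configuration again.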

2.3 Naming Service

Naming services are a common scenario in distributed systems. In a distributed system, the named entity can be a machine in a cluster, a service address, or a remote object. Through a naming service, a client can obtain the resource entity, the service address, or the provider's information from the specified name; in other words, the naming service helps application systems locate and use resources by name. In a broader sense, a naming service does not always point at a real entity resource: in a distributed environment, an upper-layer application often only needs a globally unique name, and Zookeeper can be used to implement such a distributed, globally unique ID generation mechanism.

By calling Zookeeper's node-creation API, a sequential node can be created, and the complete node name is returned in the API's return value. Using this feature, a globally unique ID can be generated as follows:

1. According to the task type, the client calls the create interface to create a sequential node under the corresponding type node, for example with the prefix “job-”.

2. After the node is created, a complete node name is returned, for example, job-00000001.

3. The client concatenates the task type with the returned value; the result can be used as a globally unique ID, for example type2-job-00000001.
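A minimal sketch of these three steps, assuming an already connected ZooKeeper handle and a hypothetical persistent parent node per task type (here /ids/type2), created in advance:

```java
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class GlobalIdGenerator {
    // Assumes a persistent parent node per task type, e.g. /ids/type2, created in advance.
    static String nextId(ZooKeeper zk, String type) throws Exception {
        // Creating a sequential node returns its full path, e.g. /ids/type2/job-0000000001.
        String path = zk.create("/ids/" + type + "/job-", new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        String nodeName = path.substring(path.lastIndexOf('/') + 1);   // "job-0000000001"
        return type + "-" + nodeName;                                  // e.g. "type2-job-0000000001"
    }
}
```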

2.4 Distributed coordination/notification

Zookeeper's Watcher registration and asynchronous notification mechanism can coordinate and notify different machines, and even different systems, in a distributed environment, enabling real-time processing of data changes. Typically, different clients register Watchers on the same data node in Zookeeper and monitor changes to that node (both the node itself and its children). When the data node changes, all subscribed clients receive the corresponding Watcher notification and react accordingly.

A MySQL data replication bus is a real-time data replication framework used for asynchronous data replication and data change notification between different MySQL database instances. The whole system consists of a MySQL database cluster, a message queue system, a task management and monitoring platform, and a Zookeeper cluster; it is a data bus system comprising producers, replication pipelines, and data consumers.

Zookeeper is mainly responsible for distributed coordination. In the concrete implementation, the data replication component is divided into three modules by function: Core (implements the core logic of data replication, encapsulating replication as a pipeline and abstracting the concepts of producer and consumer), Server (starts and stops replication tasks), and Monitor (monitors the running status of the tasks and raises an alarm if an exception or fault occurs during data replication).

Each module runs on the server as an independent process, and the data and configuration information during running are saved on Zookeeper.

  

(1) Task registration. When the Core process starts, it registers the task under the /mysql_replicator/tasks node, for example by creating the child node /mysql_replicator/tasks/copy_hot_item. If the child node already exists during registration, the task has already been registered by another task machine, so this machine does not need to create the node again.

(2) Task hot backup. To cope with task or host failures, the replication component adopts a “hot backup” disaster-recovery mode: the same replication task is deployed on different hosts, and the active and standby task machines use Zookeeper to detect each other's running status. Regardless of whether it created the task node in step (1), every machine registers its host name under the /mysql_replicator/tasks/copy_hot_item/instances node; the node type is an ephemeral (temporary) sequential node. After creating its node, each task machine obtains the name of the node it created and the list of all child nodes, compares them, and determines whether its own node has the smallest sequence number among all children. If so, it marks its own status as RUNNING, and the other machines mark themselves as STANDBY. This is called the “smallest sequence number first” strategy; see the election sketch after this list.

(3) Hot backup switchover. After the statuses are marked, the machine marked RUNNING starts normal data replication, while the machines marked STANDBY enter standby. Each STANDBY machine registers a child-node-change listener on the /mysql_replicator/tasks/copy_hot_item/instances node. Once the RUNNING machine goes down and its connection to Zookeeper is lost, its corresponding ephemeral node disappears, all remaining clients are notified, and a new election is held.

(4) Recording the execution status. Because the RUNNING machine may be replaced at any time, it needs to preserve its runtime context so that a STANDBY machine taking over can continue from it.

(5) Console coordination. The main job of the Server module is task control, using Zookeeper to control and coordinate the different tasks. The Server writes the producer metadata and consumer information of each replication task into the task node /mysql_replicator/tasks/copy_hot_item as configuration, so that all task machines working on that task share the replication task's configuration.
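The “smallest sequence number first” election in step (2) can be sketched roughly as follows, assuming an already connected ZooKeeper handle and that the parent node /mysql_replicator/tasks/copy_hot_item/instances already exists; this only shows the election step, not the rest of the replication component.

```java
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class HotStandbyElection {
    private static final String INSTANCES = "/mysql_replicator/tasks/copy_hot_item/instances";

    // Trailing 10-digit counter that Zookeeper appends to sequential node names.
    static int seq(String name) {
        return Integer.parseInt(name.substring(name.length() - 10));
    }

    // Returns true if this machine has the smallest sequence number and should run as RUNNING.
    static boolean electRunning(ZooKeeper zk, String hostname) throws Exception {
        // Register this machine as an ephemeral sequential child, e.g. .../instances/host1-0000000003.
        String myPath = zk.create(INSTANCES + "/" + hostname + "-", new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        String myName = myPath.substring(myPath.lastIndexOf('/') + 1);

        List<String> children = zk.getChildren(INSTANCES, false);
        int mySeq = seq(myName);
        for (String child : children) {
            if (seq(child) < mySeq) {
                return false;                     // someone registered earlier: stay STANDBY
            }
        }
        return true;                              // smallest sequence number: mark ourselves RUNNING
    }
}
```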

In the hot backup scheme above, every task is assigned at least two task machines (one RUNNING and one STANDBY, i.e. an active and a standby machine). If many MySQL instances need to be replicated, this consumes too many machines. In that case a cold backup scheme is needed, in which all tasks are divided into groups.

Each Core process is configured to belong to a group. If a Core process is marked group1, then after it starts it goes to the corresponding group1 node in Zookeeper and fetches the list of all tasks. When it traverses a task such as copy_hot_item and finds that the task's instances node has no children, it creates an ephemeral sequential node such as /mysql_replicator/task-groups/group1/copy_hot_item/instances/[Hostname]-1. The other Core processes create similar child nodes under the instances node, and RUNNING is determined by the “smallest sequence number first” strategy. The difference from hot backup is that the other Core processes then delete the child nodes they created and move on to traverse the next task node; this process is called a cold backup scan. All Core processes continuously perform cold backup scans of the tasks in their own group during the scan cycle.

In most distributed systems, communication between machines comes down to three things: heartbeat detection, work progress reporting, and system scheduling.

(1) Heartbeat detection. Different machines need to detect whether each other is running normally, and Zookeeper can be used to implement this. Based on the characteristics of ephemeral nodes (an ephemeral node's lifetime is bound to the client session; once the session ends, the node no longer exists), different machines can create ephemeral child nodes under a specified Zookeeper node and determine from those children whether the corresponding client machines are alive. Detecting through Zookeeper greatly reduces the coupling between the systems; see the sketch after this list.

(2) Work progress reporting. After a task is dispatched to different machines, the execution progress usually needs to be reported to the dispatching system in real time. A node can be chosen in Zookeeper, and each task client creates an ephemeral child node under it. This not only shows whether the task machine is alive, but also lets each machine write its task progress into its child node so that the central system can read the execution progress in real time.

(3) System scheduling. Zookeeper can implement the following scheduling pattern: a distributed system consists of a console and a number of client systems, and the console's responsibility is to send instructions to all clients to make them carry out the corresponding business logic. When an administrator performs an operation on the console, it actually modifies the data of certain Zookeeper nodes, and Zookeeper pushes these data changes to the subscribed clients as event notifications.
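A rough sketch covering heartbeat detection and progress reporting, assuming an already connected ZooKeeper handle and a hypothetical persistent parent node /machines created in advance; every worker announces itself with an ephemeral node, and any coordinating machine can list the children to see who is alive.

```java
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class HeartbeatExample {
    // Hypothetical persistent parent node under which every worker announces itself.
    private static final String MACHINES = "/machines";

    // Heartbeat: the ephemeral node exists exactly as long as this worker's session does.
    static void announce(ZooKeeper zk, String hostname) throws Exception {
        zk.create(MACHINES + "/" + hostname, "progress=0".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    }

    // Work progress report: the worker overwrites its node data as the task advances.
    static void reportProgress(ZooKeeper zk, String hostname, int percent) throws Exception {
        zk.setData(MACHINES + "/" + hostname, ("progress=" + percent).getBytes(), -1);
    }

    // Any other machine can list the ephemeral children to see which workers are alive.
    static List<String> aliveMachines(ZooKeeper zk) throws Exception {
        return zk.getChildren(MACHINES, false);
    }
}
```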

2.5 Cluster Management

Zookeeper provides the following two features:

· If a client registers a Watcher listener on a Zookeeper data node, the Zookeeper server sends a change notification to the subscribed client whenever that node's data content or its list of child nodes changes.

· For ephemeral (temporary) nodes created in Zookeeper, once the session between the client and the server ends, the ephemeral node is deleted automatically.

The monitoring system registers a Watcher listener on the /clusterServers node, and whenever a machine is dynamically added, an ephemeral node is created under it, such as /clusterServers/[Hostname]. In this way the monitoring system can observe machine changes in real time.
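A minimal sketch of this membership monitoring, assuming an already connected ZooKeeper handle and an existing persistent /clusterServers node; each server registers an ephemeral child, and the monitor re-reads the child list on every change notification.

```java
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ClusterMonitor implements Watcher {
    private final ZooKeeper zk;

    public ClusterMonitor(ZooKeeper zk) throws Exception {
        this.zk = zk;
        listServers();                             // initial read, which also registers the Watcher
    }

    // Each cluster machine calls this on startup to register itself as an ephemeral child.
    public static void register(ZooKeeper zk, String hostname) throws Exception {
        zk.create("/clusterServers/" + hostname, new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    }

    private void listServers() throws Exception {
        // Watchers are one-shot, so re-register on every read of the child list.
        List<String> servers = zk.getChildren("/clusterServers", this);
        System.out.println("live servers: " + servers);
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeChildrenChanged) {
            try {
                listServers();                     // a machine joined or left: refresh the view
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
```

The distributed log collection system below is a typical application of this pattern and shows how Zookeeper implements cluster management in practice.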

The core job of a distributed log collection system is to collect logs scattered across different machines. In a typical architecture, the system divides all the machines whose logs need to be collected into several groups, and each group corresponds to one collector; a collector is a background machine used to collect logs. In a large-scale distributed log collection scenario, two problems need to be solved:

· The log source machines keep changing

· The collector machines keep changing

Whether the log source machines or the collector machines change, the problem ultimately comes down to how to quickly, reasonably, and dynamically assign the corresponding log source machines to each collector. The procedure using Zookeeper is as follows.

(1) Collector registration. Each collector machine creates its own node under the /logs/collector node when it starts, for example /logs/collector/[Hostname].

(2) Task distribution. After all collector machines have created their corresponding nodes, the system divides all log source machines into groups according to the number of child nodes under the collector node, and then writes each group's machine list into the child node created by the corresponding collector machine, such as /logs/collector/host1. In this way, each collector machine can read the list of log source machines from its own collector node and start collecting logs.

(3) Status reporting. Even after task distribution is complete, machines may go down at any time, so the collectors need a status reporting mechanism: after creating its own node, each collector machine also creates a status child node under it, such as /logs/collector/host1/status, and periodically writes its status information to this node. This can be regarded as a heartbeat mechanism; the collector usually also writes its log collection progress there, and the log system determines whether a collector machine is alive by checking the last update time of the status child node.

(4) Dynamic allocation. If a collector machine goes down or a new machine joins, the collection tasks need to be dynamically reallocated. During operation, the log collection system watches the changes of all child nodes under the /logs/collector node; once a machine stops reporting or a new machine joins, it reassigns the tasks, usually in one of two ways:

· Global dynamic allocation. When a collector machine goes down or a new machine joins, the system immediately regroups all the log source machines according to the new list of collector machines and reallocates them to the surviving collector machines.

· Local dynamic allocation. When each collector machine reports its log collection status, it also reports its own load. If a machine goes down, the log system redistributes the tasks previously assigned to that machine to the collectors with a low load; likewise, if a new machine joins, some tasks are transferred from the machines with a high load to the new machine.

The preceding steps have completely described the workflow of the log collection system. There are two points to note.

(1) Node type. Creating ephemeral nodes under the /logs/collector node would be a natural way to determine whether a machine is alive, but if a machine died its node would be deleted and the list of log source machines recorded on that node would be cleared as well. Therefore persistent nodes are used to identify each collector machine, and a /logs/collector/[Hostname]/status node is created under each of them to represent the collector's status. In this way the system can not only monitor all machines, but also recover the task assignment after a machine goes down.

(2) Node monitoring by the log system. If the Watcher mechanism were used here, the volume of notification messages would create a large network overhead, so the log system instead actively polls the collector nodes. This saves network traffic at the cost of some delay; a polling sketch follows.
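A rough sketch of such a polling check, assuming an already connected ZooKeeper handle; it treats a collector as dead if its status node has not been updated within a timeout. The timeout is illustrative, and the check assumes the checker's clock is roughly in sync with the Zookeeper servers'.

```java
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class CollectorLivenessCheck {
    private static final long TIMEOUT_MS = 30_000;   // illustrative liveness timeout

    // Poll the status child node and judge liveness by its last modification time.
    static boolean isAlive(ZooKeeper zk, String collectorHost) throws Exception {
        Stat stat = zk.exists("/logs/collector/" + collectorHost + "/status", false);
        if (stat == null) {
            return false;                            // status node was never created
        }
        return System.currentTimeMillis() - stat.getMtime() < TIMEOUT_MS;
    }
}
```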

2.6 Master Election

In a distributed system, a Master is often used to coordinate the other units of the cluster and has the authority to decide on changes to the distributed system's state. For example, in read/write-separation scenarios the clients' write requests are usually handled by the Master; in other scenarios the Master performs some complex logic and synchronizes the results to the other units. Zookeeper's strong consistency guarantees that node creation is globally unique even under distributed, highly concurrent conditions; that is, Zookeeper guarantees that clients cannot create a data node that already exists.

For example, create a node named after the date, /master_election/2016-11-12. Every day at the appointed time, all machines in the client cluster try to create an ephemeral node under it, such as /master_election/2016-11-12/binding; only one client can succeed, and that client becomes the Master. The other clients register a child-node-change listener on the /master_election/2016-11-12 node, and once the current Master goes down, its ephemeral node disappears, the remaining clients are notified and re-elect the Master.
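A minimal sketch of this election, assuming an already connected ZooKeeper handle and that the day node /master_election/2016-11-12 already exists; whichever client manages to create the ephemeral binding node becomes Master, and the others watch for its disappearance so they can run the election again.

```java
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class MasterElection {
    private static final String BINDING = "/master_election/2016-11-12/binding";

    // Returns true if this client managed to create the node and is therefore the Master.
    static boolean tryBecomeMaster(ZooKeeper zk, Watcher reElectionWatcher) throws Exception {
        try {
            zk.create(BINDING, new byte[0],
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            return true;                             // only one client can succeed
        } catch (KeeperException.NodeExistsException e) {
            // Someone else is Master: watch the node so we are notified when it disappears
            // and can call tryBecomeMaster again.
            zk.exists(BINDING, reElectionWatcher);
            return false;
        }
    }
}
```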

2.7 Distributed Lock

Distributed locks are used to control synchronized access to shared resources in a distributed system, ensuring consistency when different systems access one or a group of resources. Distributed locks are classified into exclusive locks and shared locks.

An exclusive lock is also called a write lock or a monopoly lock. If transaction T1 adds an exclusive lock to data object O1, then during the entire lock period only T1 is allowed to read and update O1; no other transaction can perform any operation on O1 until T1 releases the lock.

(1) Acquiring the lock. To obtain an exclusive lock, every client calls the create interface to create an ephemeral child node /exclusive_lock/lock under the /exclusive_lock node. Zookeeper guarantees that only one client can create it successfully; the clients that fail register a listener on the /exclusive_lock node and wait.

(2) Releasing the lock. When the client holding the lock goes down, or finishes its business logic normally, the ephemeral node is deleted. All clients listening on the /exclusive_lock node are then notified and can compete for the lock again.
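A rough sketch of this exclusive lock, assuming an already connected ZooKeeper handle and an existing persistent /exclusive_lock node. One small deviation from the description above: instead of listening on the parent node's child list, the waiting client here watches the lock node itself for deletion, which serves the same purpose.

```java
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ExclusiveLock {
    private static final String LOCK = "/exclusive_lock/lock";

    // Block until the exclusive lock is acquired.
    static void acquire(ZooKeeper zk) throws Exception {
        while (true) {
            try {
                zk.create(LOCK, new byte[0],
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                return;                              // we created the ephemeral node: lock is ours
            } catch (KeeperException.NodeExistsException e) {
                // Lock is held by someone else: wait until the node disappears, then retry.
                CountDownLatch released = new CountDownLatch(1);
                Watcher watcher = event -> {
                    if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                        released.countDown();
                    }
                };
                if (zk.exists(LOCK, watcher) != null) {
                    released.await();
                }
                // If exists() returned null, the lock was already released: retry immediately.
            }
        }
    }

    // Releasing the lock: deleting the node (or the holder crashing) lets others compete again.
    static void release(ZooKeeper zk) throws Exception {
        zk.delete(LOCK, -1);
    }
}
```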

A shared lock is also called a read lock. If transaction T1 adds a shared lock to data object O1, then T1 can only read O1, and other transactions can only add shared locks (not exclusive locks) to O1 until all shared locks on it are released.

(1) Acquiring the lock. To obtain a shared lock, every client creates an ephemeral sequential node under the /shared_lock node: a read request creates a node such as /shared_lock/host1-R-00000001, and a write request creates a node such as /shared_lock/host2-W-00000002.

(2) Determining the read/write order. Different transactions can read a data object at the same time, while an update must wait until no transaction is currently reading or writing it. Zookeeper is used to determine this distributed read/write order, in roughly four steps.

1. After creating its node, the client gets all child nodes under the /shared_lock node and registers a listener for child-node changes on that node.

2. Determine the position of its own node's sequence number among all child nodes.

3. For read requests: if there is no child node with a smaller sequence number, or all child nodes with smaller sequence numbers are read requests, the client has obtained the shared lock and starts its read logic; if any of them is a write request, it waits. For write requests: if its node is not the one with the smallest sequence number, it waits.

4. After receiving the Watcher notification, repeat Step 1.

(3) Releasing the lock. The process of releasing the lock is the same as for the exclusive lock.

The shared lock implementation above meets the needs of lock competition in ordinary distributed clusters, but problems appear as the number of machines grows. The following analysis focuses on step 3, determining the read/write order.

Consider the following sequence of events among several clients that hold child nodes under /shared_lock:

1. host1 finishes reading the data and removes its node /shared_lock/host1-R-00000001.

2. The remaining four machines are all notified that a child node has been removed, and each fetches the new list of child nodes from the /shared_lock node.

3. Each machine re-determines its own position in the read/write order. host2 finds that its sequence number is now the smallest and performs its write operation, while the remaining machines continue to wait.

4. And so on.

As can be seen, after the host1 client removes its shared lock, Zookeeper sends a child-node-change Watcher notification to all machines, yet the notification is useful only to host2; it has no effect on the other machines. A large number of Watcher notifications and child-node list fetches are thus repeated needlessly, causing considerable network impact and overhead. Worse, if many clients whose nodes are still present finish their transactions at the same moment, or many transactions are interrupted so that their nodes disappear together, the Zookeeper server has to send a huge number of event notifications to all the other clients in a short time. This is known as the herd effect.

The following changes can be made to avoid the herd effect.

1. The client calls the create interface, as before creating an ephemeral sequential node named like /shared_lock/[Hostname]-[request type]-[sequence number].

2. The client calls the getChildren interface to get a list of all the children that have been created (without registering any Watcher).

3. If the shared lock cannot be obtained yet, the client calls the exists interface to register a Watcher on a preceding node. For read requests: register the Watcher on the last write-request node whose sequence number is smaller than its own. For write requests: register the Watcher on the last node whose sequence number is smaller than its own.

4. Wait for the notification from Watcher and go to Step 2.

The main change in this scheme is that each lock competitor only watches for the existence of the relevant node that precedes its own under /shared_lock, instead of watching every change to the whole child list.
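A sketch of this improved registration step, assuming an already connected ZooKeeper handle, an existing /shared_lock node, and child names of the form hostname-R-0000000001 / hostname-W-0000000002; it only shows how a competitor decides whether it holds the lock and, if not, which single node to watch.

```java
import java.util.Comparator;
import java.util.List;

import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class SharedLockHelper {
    private static final String ROOT = "/shared_lock";

    // Trailing 10-digit counter appended by Zookeeper to sequential node names.
    static int seq(String name) {
        return Integer.parseInt(name.substring(name.length() - 10));
    }

    // myName is this client's own child node name, e.g. "host1-R-0000000004".
    // Returns true if the shared lock is acquired; otherwise registers the watcher
    // on the single preceding node this client has to wait for.
    static boolean acquireOrWatch(ZooKeeper zk, String myName, boolean isRead, Watcher watcher)
            throws Exception {
        List<String> children = zk.getChildren(ROOT, false);      // step 2: no Watcher here
        children.sort(Comparator.comparingInt(SharedLockHelper::seq));

        String target = null;                                      // node we must wait for, if any
        for (String child : children) {
            if (seq(child) >= seq(myName)) {
                break;                                             // only smaller sequence numbers matter
            }
            boolean childIsWrite = child.contains("-W-");
            if (!isRead || childIsWrite) {
                // Read request: remember the last smaller write request.
                // Write request: remember the last smaller node of any kind.
                target = child;
            }
        }
        if (target == null) {
            return true;                                           // nothing to wait for: lock acquired
        }
        // Step 3: watch only the relevant preceding node instead of the whole child list.
        if (zk.exists(ROOT + "/" + target, watcher) == null) {
            return acquireOrWatch(zk, myName, isRead, watcher);    // it vanished meanwhile: re-check
        }
        return false;
    }
}
```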

2.8 Distributed Queue

Distributed queues can be simply divided into two kinds: the first-in-first-out (FIFO) queue model, and the Barrier model, which waits until all queue elements have gathered and then processes them together.

FIFO means first in, first out: only after the request that entered the queue first has been handled does processing of the following request begin. A FIFO queue built on Zookeeper is similar to a shared lock model in which every request is a write: all clients create ephemeral sequential nodes under the /queue_fifo node, such as /queue_fifo/host1-00000001.

After creating a node, perform the following steps.

1. Get all the children of /queue_fifo by calling the getChildren interface, that is, get all the elements in the queue.

2. Determine the position of its own node's sequence number among all child nodes.

3. If its sequence number is not the smallest, wait and register a Watcher listener on the last node whose sequence number is smaller than its own.

4. After receiving the Watcher notification, repeat Step 1.
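A rough sketch of this queue, assuming an already connected ZooKeeper handle and an existing persistent /queue_fifo node; each element is an ephemeral sequential node, and an element is processed only once its node has the smallest sequence number.

```java
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class FifoQueue {
    private static final String ROOT = "/queue_fifo";

    static int seq(String name) {
        return Integer.parseInt(name.substring(name.length() - 10));
    }

    // Enqueue: create an ephemeral sequential node such as /queue_fifo/host1-0000000001.
    static String enqueue(ZooKeeper zk, String hostname, byte[] payload) throws Exception {
        return zk.create(ROOT + "/" + hostname + "-", payload,
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }

    // Block until this element is at the head of the queue (smallest sequence number).
    static void awaitTurn(ZooKeeper zk, String myPath) throws Exception {
        String myName = myPath.substring(myPath.lastIndexOf('/') + 1);
        while (true) {
            List<String> children = zk.getChildren(ROOT, false);
            children.sort(Comparator.comparingInt(FifoQueue::seq));
            int myIndex = children.indexOf(myName);
            if (myIndex == 0) {
                return;                              // head of the queue: this element can be processed
            }
            // Wait for the node immediately ahead of us to go away, then re-check.
            CountDownLatch gone = new CountDownLatch(1);
            String previous = children.get(myIndex - 1);
            if (zk.exists(ROOT + "/" + previous, event -> gone.countDown()) != null) {
                gone.await();
            }
        }
    }
}
```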

Barrier: a number N is stored in advance as the data of the /queue_barrier node, and the barrier opens only when the number of child nodes under /queue_barrier reaches N. All clients create an ephemeral node under /queue_barrier, such as /queue_barrier/host1.

After creating a node, perform the following steps.

1. Call getData to obtain the data content of the /queue_barrier node, for example, 10.

2. Get all children under /queue_barrier by calling getChildren and register Watcher listening for changes to child nodes.

3. Count the number of child nodes.

4. If the number of child nodes is less than 10, wait.

5. After receiving the Watcher notification, repeat Step 3.
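A rough sketch of the barrier, assuming an already connected ZooKeeper handle and that /queue_barrier already exists with the required count (for example "10") stored as its data.

```java
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class BarrierExample {
    private static final String ROOT = "/queue_barrier";

    // Join the barrier and block until enough participants have arrived.
    static void enterAndAwait(ZooKeeper zk, String hostname) throws Exception {
        // Step 1: the threshold is stored as the data of /queue_barrier, e.g. "10".
        int required = Integer.parseInt(new String(zk.getData(ROOT, false, null)));

        // Announce this participant with an ephemeral child such as /queue_barrier/host1.
        zk.create(ROOT + "/" + hostname, new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        while (true) {
            CountDownLatch changed = new CountDownLatch(1);
            // Steps 2-3: count the children while registering a Watcher for child-list changes.
            int count = zk.getChildren(ROOT, event -> changed.countDown()).size();
            if (count >= required) {
                return;                              // the barrier is open: start processing
            }
            changed.await();                         // steps 4-5: wait for the next notification
        }
    }
}
```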

3. Summary

This blog has explained how Zookeeper's features can be used to build typical applications, showing the powerful role Zookeeper plays in solving distributed problems. Based on Zookeeper's guarantee of distributed data consistency and its other features, developers can build their own distributed systems. Thank you for reading.