The listening (watch) mechanism is an important feature of ZooKeeper. For example, the high-availability clusters and distributed locks built on ZooKeeper both take advantage of it.

If a watched node, or the data it carries, changes in ZooKeeper, the watch is triggered and the client that registered it is notified.

Registering a listener

Create a client with a default listener

To create a ZooKeeper client instance, you need the following parameters.

new ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)

The meanings of the three parameters are as follows:

  • connectString: the server address
  • sessionTimeout: the session timeout, in milliseconds
  • watcher: the default Watcher (listener)

This Watcher serves as the default for the entire ZooKeeper session and is stored in the client's ZKWatchManager as defaultWatcher. When watching is enabled for a node or its data but no other Watcher is specified, this default Watcher's process() method is called.
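The fallback rule can be sketched with the JDK alone. This is a minimal illustration, not ZooKeeper's code: `SimpleWatcher`, `MiniClient` and `resolve` are hypothetical names, and the only behavior assumed is the one described above (an explicit watcher wins, otherwise the default watcher is used when watching is enabled).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback type standing in for a watcher.
interface SimpleWatcher {
    void process(String event);
}

// Hypothetical client: models only the default-watcher fallback rule.
class MiniClient {
    private final SimpleWatcher defaultWatcher;

    MiniClient(SimpleWatcher defaultWatcher) {
        this.defaultWatcher = defaultWatcher;
    }

    // Picks the watcher to register: the explicit one if given,
    // otherwise the default watcher when watching is enabled, else none.
    SimpleWatcher resolve(boolean watch, SimpleWatcher explicit) {
        if (explicit != null) {
            return explicit;
        }
        return watch ? defaultWatcher : null;
    }
}

class DefaultWatcherDemo {
    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        MiniClient client = new MiniClient(ev -> log.add("default:" + ev));

        // No explicit watcher: the default one handles the event.
        client.resolve(true, null).process("NodeDataChanged");
        // Explicit watcher: it takes precedence over the default.
        client.resolve(true, ev -> log.add("custom:" + ev)).process("NodeDeleted");

        System.out.println(log);
    }
}
```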

Register a dedicated listener on a specific node

In addition, the ZooKeeper client can register a Watcher with the ZooKeeper server through the getData, exists and getChildren interfaces, which makes it easy to add watches in different situations:

getData(String path, Watcher watcher, Stat stat)

The watch mechanism takes effect only after the client has successfully connected to ZooKeeper. Four types of node events can be watched:

  1. Creation of nodes
  2. Deletion of nodes
  3. Changes to the information carried by the node
  4. Changes to the children of a node
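Which of the three registration calls leaves a watch that a given event fires can be written down as a small lookup. The sketch below follows the mapping as described in the ZooKeeper programmer's guide, to the best of my understanding; the `EventKind`, `WatchCall` and `WatchTriggers` types are hypothetical and only borrow ZooKeeper's event names.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical enum; constant names mirror ZooKeeper's event type names.
enum EventKind { NodeCreated, NodeDeleted, NodeDataChanged, NodeChildrenChanged }

// The three client calls that can register a watch.
enum WatchCall { EXISTS, GET_DATA, GET_CHILDREN }

class WatchTriggers {
    // Returns the registration calls whose watches are fired by the event.
    static Set<WatchCall> firedBy(EventKind kind) {
        switch (kind) {
            case NodeCreated:
                return EnumSet.of(WatchCall.EXISTS);
            case NodeDeleted:
                return EnumSet.allOf(WatchCall.class);
            case NodeDataChanged:
                return EnumSet.of(WatchCall.EXISTS, WatchCall.GET_DATA);
            case NodeChildrenChanged:
                return EnumSet.of(WatchCall.GET_CHILDREN);
            default:
                throw new IllegalArgumentException("unknown event: " + kind);
        }
    }

    public static void main(String[] args) {
        // Print the mapping for every event type.
        for (EventKind kind : EventKind.values()) {
            System.out.println(kind + " -> " + firedBy(kind));
        }
    }
}
```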

The underlying principle

The ZooKeeper listening mechanism is implemented with the observer pattern.

In the observer pattern, the most important element is a list that holds the observers.

The ZooKeeper listening mechanism implements such a list too: ZKWatchManager is maintained on the client side and WatchManager on the server side.

Client Watch registration implementation process

When sending a request that carries a Watch, the ZooKeeper client does two main things:

  • Mark the request as carrying a Watch
  • Store the Watcher in ZKWatchManager

Take the getData interface as an example. When sending a request with a Watch, the client first marks the request as carrying a Watch, and then records the relationship between the Watcher and the node through the DataWatchRegistration class:

public byte[] getData(final String path, Watcher watcher, Stat stat) {
    ...
    WatchRegistration wcb = null;
    // A non-null watcher means the caller wants to watch this node
    if (watcher != null) {
        wcb = new DataWatchRegistration(watcher, clientPath);
    }
    RequestHeader h = new RequestHeader();
    // Mark the request as carrying a watch
    request.setWatch(watcher != null);
    ...
    GetDataResponse response = new GetDataResponse();
    ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
    ...
}

When the client sends a request to the server, it wraps the request in a Packet object and adds it to the outgoingQueue:

public Packet queuePacket(RequestHeader h, ReplyHeader r, ...) {
    Packet packet = null;
    ...
    // The watch registration travels with the packet until the reply arrives
    packet = new Packet(h, r, request, response, watchRegistration);
    ...
    outgoingQueue.add(packet);
    ...
    return packet;
}

Finally, the ZooKeeper client sends the request to the server. The readResponse method of SendThread, the thread class that handles server responses, receives the server's reply and finally calls finishPacket() to register the Watch with ZKWatchManager:

private void finishPacket(Packet p) {
    int err = p.replyHeader.getErr();
    // Register the watch only now, passing along the reply's error code
    if (p.watchRegistration != null) {
        p.watchRegistration.register(err);
    }
    ...
}
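The point of passing err to register() is that the client only keeps the Watcher once the server has answered. A minimal sketch of that idea follows, assuming that an error code of 0 means success and ignoring any per-call special cases; `ClientWatchTable` and its methods are hypothetical names, not ZooKeeper classes, and -101 is used only as an example of a non-zero error code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the client-side watch table.
class ClientWatchTable {
    private final Map<String, Set<Runnable>> dataWatches = new HashMap<>();

    // Keeps the watcher only if the reply reported success
    // (assumption: error code 0 means OK).
    boolean register(int err, String clientPath, Runnable watcher) {
        if (err != 0) {
            return false;
        }
        dataWatches.computeIfAbsent(clientPath, p -> new HashSet<>()).add(watcher);
        return true;
    }

    // Number of watchers currently stored for the path.
    int watcherCount(String clientPath) {
        Set<Runnable> set = dataWatches.get(clientPath);
        return set == null ? 0 : set.size();
    }
}

class DeferredRegistrationDemo {
    public static void main(String[] args) {
        ClientWatchTable table = new ClientWatchTable();
        Runnable watcher = () -> System.out.println("data changed");

        // Successful reply: the watcher is stored.
        System.out.println(table.register(0, "/config", watcher));
        // Error reply: nothing is stored.
        System.out.println(table.register(-101, "/missing", watcher));
        System.out.println(table.watcherCount("/config") + " " + table.watcherCount("/missing"));
    }
}
```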

Server-side Watch registration implementation process

The ZooKeeper server handles a Watch registration as follows:

  • Check whether the received request carries a Watch registration
  • Store the corresponding Watcher in the WatchManager

Process of triggering the Watch event on the server

Take the setData interface (a change to a node's data) as an example.

After the setData method has changed the node data, it calls the WatchManager.triggerWatch method to fire the data-change event.

Set<Watcher> triggerWatch(String path, EventType type, ...) {
    WatchedEvent e = new WatchedEvent(type,
                                      KeeperState.SyncConnected, path);
    Set<Watcher> watchers;
    synchronized (this) {
        // Watches are one-time: take the path's watchers out of watchTable
        watchers = watchTable.remove(path);
        ...
        for (Watcher w : watchers) {
            // Also drop the path from each watcher's reverse mapping
            Set<String> paths = watch2Paths.get(w);
            if (paths != null) {
                paths.remove(path);
            }
        }
    }
    for (Watcher w : watchers) {
        if (supress != null && supress.contains(w)) {
            continue;
        }
        w.process(e);
    }
    return watchers;
}
The relationship between watchers and paths:

It is a bidirectional mapping: watchTable maps each path to its watchers, and watch2Paths maps each watcher to its paths.

Because ZooKeeper's watch mechanism is one-time (a watch is destroyed once triggered), when an event fires on a path (say path2), the watchers for that path are removed from watchTable and returned as watchers. Since the event on path2 has already fired, path2 is also removed from the paths of each of those watchers. Then process() is called on each Watcher in watchers to complete the listener callback.
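The two maps and the one-time removal can be tried out in isolation. The following is a simplified sketch under stated assumptions, not the server's WatchManager: `MiniWatcher` and `MiniWatchManager` are hypothetical, events are plain strings, and the suppression set from the excerpt above is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical watcher callback: receives the event type and the path.
interface MiniWatcher {
    void process(String type, String path);
}

class MiniWatchManager {
    // path -> watchers, and the reverse mapping watcher -> paths
    private final Map<String, Set<MiniWatcher>> watchTable = new HashMap<>();
    private final Map<MiniWatcher, Set<String>> watch2Paths = new HashMap<>();

    synchronized void addWatch(String path, MiniWatcher w) {
        watchTable.computeIfAbsent(path, k -> new HashSet<>()).add(w);
        watch2Paths.computeIfAbsent(w, k -> new HashSet<>()).add(path);
    }

    Set<MiniWatcher> triggerWatch(String path, String type) {
        Set<MiniWatcher> watchers;
        synchronized (this) {
            // One-time semantics: the path's watchers leave the table now.
            watchers = watchTable.remove(path);
            if (watchers == null) {
                return new HashSet<>();
            }
            for (MiniWatcher w : watchers) {
                Set<String> paths = watch2Paths.get(w);
                if (paths != null) {
                    paths.remove(path);
                }
            }
        }
        // Callbacks run outside the lock.
        for (MiniWatcher w : watchers) {
            w.process(type, path);
        }
        return watchers;
    }

    // Number of paths the watcher is still registered on.
    synchronized int pathsWatchedBy(MiniWatcher w) {
        Set<String> paths = watch2Paths.get(w);
        return paths == null ? 0 : paths.size();
    }
}

class MiniWatchManagerDemo {
    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        MiniWatcher w = (type, path) -> calls.add(type + "@" + path);
        MiniWatchManager manager = new MiniWatchManager();
        manager.addWatch("/path1", w);
        manager.addWatch("/path2", w);

        manager.triggerWatch("/path2", "NodeDataChanged"); // fires once
        manager.triggerWatch("/path2", "NodeDataChanged"); // watch already gone
        System.out.println(calls + ", still watching " + manager.pathsWatchedBy(w) + " path(s)");
    }
}
```

The second trigger on /path2 finds nothing to notify, which is why a client that wants continuous notifications has to register the watch again after each event.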

Procedure for handling client callback

SendThread

The readResponse method of SendThread is where the client handles all responses from the server. When replyHdr.getXid() returns -1, the response is a notification, and the event is finally handed over to the EventThread by calling eventThread.queueEvent().

if (replyHdr.getXid() == -1) {
    ...
    WatcherEvent event = new WatcherEvent();
    event.deserialize(bbia, "response");
    ...
    // Convert the server-side path back to the client's (chroot-relative) path
    if (chrootPath != null) {
        String serverPath = event.getPath();
        if (serverPath.compareTo(chrootPath) == 0)
            event.setPath("/");
        ...
        event.setPath(serverPath.substring(chrootPath.length()));
        ...
    }
    WatchedEvent we = new WatchedEvent(event);
    ...
    eventThread.queueEvent(we);
}
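The chroot handling in that excerpt amounts to turning a server-side path back into the path the client originally used. A standalone sketch of that conversion, with a hypothetical `ChrootPaths.toClientPath` helper; how paths outside the chroot are treated here is an assumption of this sketch, not taken from ZooKeeper.

```java
class ChrootPaths {
    // Converts a server-side path into the client-visible path by dropping
    // the chroot prefix. A null chrootPath means no chroot is configured.
    static String toClientPath(String serverPath, String chrootPath) {
        if (chrootPath == null) {
            return serverPath;
        }
        if (serverPath.equals(chrootPath)) {
            // The chroot node itself is the client's root.
            return "/";
        }
        if (serverPath.startsWith(chrootPath + "/")) {
            return serverPath.substring(chrootPath.length());
        }
        // Assumption of this sketch: paths outside the chroot stay unchanged.
        return serverPath;
    }

    public static void main(String[] args) {
        System.out.println(toClientPath("/app/config", "/app"));
        System.out.println(toClientPath("/app", "/app"));
        System.out.println(toClientPath("/config", null));
    }
}
```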

EventThread

According to the triggered event type, the client looks up the listeners registered for the corresponding path and puts them in the result set. Because a ZooKeeper watch is destroyed once triggered, the listeners are also removed from the ZKWatchManager.

public Set<Watcher> materialize(...)
{
    Set<Watcher> result = new HashSet<Watcher>();
    ...
    switch (type) {
    ...
    case NodeDataChanged:
    case NodeCreated:
        // remove() both fetches the watchers and drops them (one-time watch)
        synchronized (dataWatches) {
            addTo(dataWatches.remove(clientPath), result);
        }
        synchronized (existWatches) {
            addTo(existWatches.remove(clientPath), result);
        }
        break;
    ...
    }
    return result;
}

After the listeners are removed, the Watchers that were found are placed in the waitingEvents task queue, and the run method of the EventThread class takes them from the queue and processes the events.

Handling an event is nothing more than executing the process() method we wrote when we registered the listener.
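The queue-plus-thread arrangement can be illustrated with a plain BlockingQueue. This is a rough sketch rather than the client's EventThread: `MiniEventThread` is hypothetical, callbacks are modeled as Runnable, and the STOP marker used for shutdown is an assumption of this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class MiniEventThread extends Thread {
    // Marker object that tells the thread to stop (assumption of this sketch).
    private static final Runnable STOP = new Runnable() {
        @Override
        public void run() { }
    };

    private final BlockingQueue<Runnable> waitingEvents = new LinkedBlockingQueue<>();

    // Called by the receiving side: hand a callback over to this thread.
    void queueEvent(Runnable callback) {
        waitingEvents.add(callback);
    }

    void shutdown() {
        waitingEvents.add(STOP);
    }

    @Override
    public void run() {
        try {
            while (true) {
                Runnable task = waitingEvents.take(); // blocks until an event arrives
                if (task == STOP) {
                    break;
                }
                task.run(); // this is where the user's process() would run
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class MiniEventThreadDemo {
    public static void main(String[] args) throws InterruptedException {
        List<String> handled = Collections.synchronizedList(new ArrayList<>());
        MiniEventThread eventThread = new MiniEventThread();
        eventThread.start();

        eventThread.queueEvent(() -> handled.add("NodeDataChanged /config"));
        eventThread.queueEvent(() -> handled.add("NodeDeleted /lock"));
        eventThread.shutdown();
        eventThread.join();

        System.out.println(handled);
    }
}
```

Because a single thread drains the queue, callbacks run one at a time in the order they were queued, and a slow callback delays the ones behind it.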

Conclusion

ZooKeeper's listening mechanism is based on the observer pattern. It works by maintaining a table on both the client (ZKWatchManager) and the server (WatchManager) to store the listener objects.

Listeners are registered as part of calling the interface. The client first prepares the registration locally and sends the request to the server; the server registers the watch if the request carries one, and once the reply arrives the client stores the Watcher in its own ZKWatchManager.

The procedure for triggering a listening event:

  1. The server uses the triggered path to find the corresponding set of listeners in WatchManager and calls their process() method, which sends the notification to the client that registered each listener;
  2. The client checks whether the response is a notification event, uses ZKWatchManager to find the corresponding set of listeners, and calls their process() method to run the corresponding handling.

The client and the server use different Watcher implementations

When a listener event is triggered, the client and the server follow almost the same steps (find the Watchers by path, then execute process()), but their process() methods do different things. The process() of the Watcher on the server sends the path and the triggered event type to the client. The client then finds its own Watchers by path and event type and executes their process(), and this is the handling code that the developer actually wrote.
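The split between the two process() implementations can be mimicked without any networking. In this sketch, which rests on stated assumptions, the "connection" is just a Consumer, and `Notification`, `ServerSideWatcher` and `ClientSideWatches` are hypothetical names; only the event type and the path cross from the server side to the client side.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// What travels from server to client in this sketch: type and path only.
final class Notification {
    final String type;
    final String path;

    Notification(String type, String path) {
        this.type = type;
        this.path = path;
    }
}

// Server side: process() runs no user code, it only forwards a notification.
class ServerSideWatcher {
    private final Consumer<Notification> connection;

    ServerSideWatcher(Consumer<Notification> connection) {
        this.connection = connection;
    }

    void process(String type, String path) {
        connection.accept(new Notification(type, path));
    }
}

// Client side: holds the developer's callbacks, keyed by path.
class ClientSideWatches {
    private final Map<String, Consumer<Notification>> watches = new HashMap<>();

    void register(String path, Consumer<Notification> callback) {
        watches.put(path, callback);
    }

    // Looks up the local callback for the notified path and runs it once.
    void onNotification(Notification n) {
        Consumer<Notification> callback = watches.remove(n.path);
        if (callback != null) {
            callback.accept(n);
        }
    }
}

class TwoSidedWatchDemo {
    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        ClientSideWatches client = new ClientSideWatches();
        client.register("/config", n -> log.add("reload after " + n.type + " on " + n.path));

        // The server-side watcher is wired to the client's notification handler.
        ServerSideWatcher serverWatcher = new ServerSideWatcher(client::onNotification);
        serverWatcher.process("NodeDataChanged", "/config");

        System.out.println(log);
    }
}
```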

Think: why does ZooKeeper maintain two Watcher lists (ZKWatchManager and WatchManager)?

We can argue by contradiction to show why this design is a good one.

  1. Suppose only the client maintains the Watcher list. When an event is triggered on the server, the server has no Watcher list and does not know which clients subscribed to the event, so it can only send the event to every client. That wastes bandwidth, and it wastes client resources on handling notifications nobody asked for.
  2. Suppose only the server maintains the Watcher list. After an event is triggered on the server, the server notifies the clients that subscribed to it, but a client has no Watcher object and cannot run the corresponding handling. The server would then have to send the handling logic itself over the network, which increases the load on the network.