Overview

ZK provides publish/subscribe functionality for distributed data. A typical publish/subscribe system defines a one-to-many subscription relationship: multiple subscribers listen to the same topic object at the same time, and the topic object notifies all of them when its state changes. ZK implements this distributed notification capability with its Watcher mechanism.

ZK allows clients to register a Watcher with the server. When a specified event on the server triggers the Watcher, the server sends an event notification to the corresponding client, which is how distributed notification is achieved.


The general process is as follows: the client registers a Watcher with the ZK server and, if the registration succeeds, stores the Watcher locally. When an event on the ZK server triggers the Watcher, the server sends a notification to the client, which looks up the corresponding Watcher in its ClientWatchManager and invokes the callback.

Watcher interface

So what exactly is a Watcher, and what is it for?

/**
 * This interface specifies the public interface an event handler class must
 * implement. A ZooKeeper client will get various events from the ZooKeeper
 * server it connects to. An application using such a client handles these
 * events by registering a callback object with the client. The callback object
 * is expected to be an instance of a class that implements Watcher interface.
 */
@InterfaceAudience.Public
public interface Watcher {
    void process(WatchedEvent event);
}

Once you register an implementation of this interface with the ZK server, its process method is called back whenever the server notifies the client of an event.

WatchedEvent

So what does a WatchedEvent contain?

public class WatchedEvent {
    /**
     * Enumeration of states the ZooKeeper may be at the event
     */
    private final KeeperState keeperState;
    /**
     * Enumeration of types of events that may occur on the ZooKeeper
     */
    private final EventType eventType;
    private String path;
}

KeeperState and EventType are enums that represent, respectively, the connection state at the time of the event and the event type. path is the node path the client is watching.

Common combinations of KeeperState and EventType

| KeeperState | EventType | Trigger condition | Notes |
| SyncConnected | None (-1) | The client successfully establishes a session with the server | The client and server are connected |
| SyncConnected | NodeCreated (1) | The data node watched by the Watcher is created | The client and server are connected |
| SyncConnected | NodeDeleted (2) | The data node watched by the Watcher is deleted | The client and server are connected |
| SyncConnected | NodeDataChanged (3) | The content of the watched data node changes (data content or data version) | The client and server are connected |
| SyncConnected | NodeChildrenChanged (4) | The child node list of the watched data node changes | The client and server are connected |
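To make the table concrete, here is a self-contained model of the pieces discussed so far: a Watcher callback, a WatchedEvent carrying state, type and path, and the integer codes from the table. The names mirror ZooKeeper's, but these are simplified stand-ins written for illustration (Java 16+ for the record), not the org.apache.zookeeper classes; only SyncConnected and the five event types listed above are modeled, and RecordingWatcher is a hypothetical example implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for ZooKeeper's KeeperState / EventType enums (illustrative only).
enum KeeperState {
    SyncConnected(3); // ZooKeeper's enum has more states; only this one is modeled here

    private final int intValue;

    KeeperState(int intValue) { this.intValue = intValue; }

    int getIntValue() { return intValue; }
}

enum EventType {
    None(-1), NodeCreated(1), NodeDeleted(2), NodeDataChanged(3), NodeChildrenChanged(4);

    private final int intValue;

    EventType(int intValue) { this.intValue = intValue; }

    int getIntValue() { return intValue; }

    // Reverse lookup from the numeric code shown in the table.
    static EventType fromInt(int value) {
        for (EventType t : values()) {
            if (t.intValue == value) {
                return t;
            }
        }
        throw new IllegalArgumentException("Unknown EventType: " + value);
    }
}

// What a callback receives: connection state, event type and the watched path.
record WatchedEvent(KeeperState state, EventType type, String path) {}

// The callback contract: a single method invoked for each delivered event.
interface Watcher {
    void process(WatchedEvent event);
}

// Example implementation that just records the events it is handed.
class RecordingWatcher implements Watcher {
    final List<WatchedEvent> received = new ArrayList<>();

    @Override
    public void process(WatchedEvent event) {
        received.add(event);
    }
}
```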

For the NodeDataChanged event type, a "change" covers both the node's data content and its dataVersion. As long as a client calls the data update interface, dataVersion changes whether or not the content actually changed, and that triggers the corresponding Watcher. Because the version always moves forward, this avoids the classic ABA problem of optimistic locking.
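A toy model makes the version argument concrete. VersionedNode below is a hypothetical class, not ZooKeeper code: every setData bumps the version and fires a one-shot data watch, even when the new bytes equal the old ones, and compareAndSet accepts a write only if the caller's expected version is still current. In an A, B, A sequence the content ends where it started, but the version has moved on, so a compare-and-set based on the stale version is rejected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical toy node: content plus a version that changes on every write.
class VersionedNode {
    private byte[] data;
    private int version = 0;
    private final List<Consumer<String>> watchers = new ArrayList<>();

    VersionedNode(byte[] initial) { this.data = initial.clone(); }

    int version() { return version; }

    byte[] data() { return data.clone(); }

    void watch(Consumer<String> callback) { watchers.add(callback); }

    // Unconditional write: the version is bumped even if the bytes are identical.
    void setData(byte[] newData) {
        data = newData.clone();
        version++;
        // One-shot semantics: fire the current registrations once, then forget them.
        List<Consumer<String>> toFire = new ArrayList<>(watchers);
        watchers.clear();
        toFire.forEach(w -> w.accept("NodeDataChanged"));
    }

    // Optimistic write: succeeds only if nobody wrote since expectedVersion was read.
    boolean compareAndSet(byte[] newData, int expectedVersion) {
        if (version != expectedVersion) {
            return false;
        }
        setData(newData);
        return true;
    }
}
```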

WatcherEvent

WatchedEvent contains the following method:

    /**
     * Convert WatchedEvent to type that can be sent over network
     */
    public WatcherEvent getWrapper() {
        return new WatcherEvent(eventType.getIntValue(), keeperState.getIntValue(), path);
    }

Broadly speaking, WatcherEvent and WatchedEvent represent the same thing: both encapsulate a server event. WatchedEvent is the object used for logical processing, while WatcherEvent is the entity used for transport. As the code above shows, a WatcherEvent is constructed from the values of WatchedEvent's fields.

The WatcherEvent Javadoc (people.apache.org/~larsgeorge…) shows that it implements the Record interface:

public class WatcherEvent
extends Object
implements org.apache.jute.Record

The Record interface defines the serialization and deserialization methods:

@InterfaceAudience.Public
public interface Record {
    void serialize(OutputArchive archive, String tag) throws IOException;
    void deserialize(InputArchive archive, String tag) throws IOException;
}
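The split between the two event classes can be sketched as follows. The wire form keeps only integers and a string, written with java.io.DataOutputStream as an int type, an int state, and a length-prefixed UTF-8 path. That byte layout is an assumption chosen for illustration (the real WatcherEvent is generated by jute and written through an OutputArchive), and both records are simplified stand-ins; the keeper state is kept as a plain int here, with 3 standing for SyncConnected.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

enum EventType {
    None(-1), NodeCreated(1), NodeDeleted(2), NodeDataChanged(3), NodeChildrenChanged(4);

    final int code;

    EventType(int code) { this.code = code; }

    static EventType fromInt(int code) {
        for (EventType t : values()) {
            if (t.code == code) return t;
        }
        throw new IllegalArgumentException("Unknown EventType: " + code);
    }
}

// Logical event used by callbacks.
record WatchedEvent(EventType type, int keeperState, String path) {
    // Same idea as getWrapper(): flatten into transport-friendly values.
    WatcherEvent getWrapper() {
        return new WatcherEvent(type.code, keeperState, path);
    }

    // The reverse direction, used after a notification is read off the wire.
    static WatchedEvent fromWrapper(WatcherEvent e) {
        return new WatchedEvent(EventType.fromInt(e.type()), e.state(), e.path());
    }
}

// Transport event: primitives and a string, plus (de)serialization.
record WatcherEvent(int type, int state, String path) {
    void serialize(DataOutputStream out) throws IOException {
        out.writeInt(type);
        out.writeInt(state);
        byte[] bytes = path.getBytes(StandardCharsets.UTF_8);
        out.writeInt(bytes.length); // length prefix, then the raw UTF-8 bytes
        out.write(bytes);
    }

    static WatcherEvent deserialize(DataInputStream in) throws IOException {
        int type = in.readInt();
        int state = in.readInt();
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        return new WatcherEvent(type, state, new String(bytes, StandardCharsets.UTF_8));
    }
}
```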

Related components


The related process

Broadly, the mechanism can be divided into three processes:

  • The client registers Watcher
  • The server handles Watcher
  • The client calls back Watcher

The client registers Watcher

When creating a ZooKeeper client instance, we can pass a default Watcher to the constructor:

 public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) 

This Watcher is saved in ZKWatchManager as the default Watcher for the entire session:

watchManager.defaultWatcher = watcher;

In addition, a ZK client can register a Watcher with the ZK server through three interfaces: getData, getChildren, and exists.


Let's take getData as an example:

public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException {
    ...
}

public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException {
    return getData(path, getDefaultWatcher(watch), stat);
}

If watch is true, getDefaultWatcher returns the default Watcher that was passed in when the ZooKeeper instance was created:

private Watcher getDefaultWatcher(boolean required) {
    if (required) {
        if (watchManager.defaultWatcher != null) {
            return watchManager.defaultWatcher;
        } else {
            throw new IllegalStateException("Default watcher is required, but it is null.");
        }
    }
    return null;
}
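The boolean overload is just a lookup of the session's default watcher. The sketch below models that dispatch with a hypothetical ClientModel class; it copies the null check and the IllegalStateException from getDefaultWatcher, but replaces the network request with a local list of registrations so it can run on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in: a watcher is a callback receiving the event path.
interface Watcher extends Consumer<String> {}

class ClientModel {
    private final Watcher defaultWatcher; // fixed at construction, like the constructor argument
    final List<String> registeredPaths = new ArrayList<>();
    final List<Watcher> registeredWatchers = new ArrayList<>();

    ClientModel(Watcher defaultWatcher) { this.defaultWatcher = defaultWatcher; }

    // Same decision as getDefaultWatcher(boolean required).
    private Watcher getDefaultWatcher(boolean required) {
        if (required) {
            if (defaultWatcher != null) {
                return defaultWatcher;
            }
            throw new IllegalStateException("Default watcher is required, but it is null.");
        }
        return null;
    }

    // The boolean overload delegates to the Watcher overload.
    void getData(String path, boolean watch) {
        getData(path, getDefaultWatcher(watch));
    }

    // Only a non-null watcher leads to a registration (cf. request.setWatch(watcher != null)).
    void getData(String path, Watcher watcher) {
        if (watcher != null) {
            registeredPaths.add(path);
            registeredWatchers.add(watcher);
        }
    }
}
```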

Here is the complete getData method:

public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException {
    final String clientPath = path;
    PathUtils.validatePath(clientPath);

    // the watch contains the un-chroot path
    // Create a watch registration for the data type
    WatchRegistration wcb = null;
    if (watcher != null) {
        wcb = new DataWatchRegistration(watcher, clientPath);
    }

    // Convert the client's chroot-relative path into the full path on the server side
    final String serverPath = prependChroot(clientPath);

    RequestHeader h = new RequestHeader();
    h.setType(ZooDefs.OpCode.getData);
    GetDataRequest request = new GetDataRequest();
    request.setPath(serverPath);
    // Flag whether a watcher is attached
    request.setWatch(watcher != null);
    GetDataResponse response = new GetDataResponse();

    ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
    if (r.getErr() != 0) {
        throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
    }
    if (stat != null) {
        DataTree.copyStat(response.getStat(), stat);
    }
    return response.getData();
}
  1. Create a DataWatchRegistration.
  2. Convert the path: if the client uses a chroot, the client path is turned into the full server path before the request is sent.
  3. Submit the request through ClientCnxn:
public ReplyHeader submitRequest(
        RequestHeader h,
        Record request,
        Record response,
        WatchRegistration watchRegistration,
        WatchDeregistration watchDeregistration) throws InterruptedException {
    ReplyHeader r = new ReplyHeader();
    Packet packet = queuePacket(
        h,
        r,
        request,
        response,
        null,
        null,
        null,
        null,
        watchRegistration,
        watchDeregistration);
    ...
    return r;
}

queuePacket then wraps the request in a Packet and adds it to outgoingQueue:

public Packet queuePacket(
        RequestHeader h,
        ReplyHeader r,
        Record request,
        Record response,
        AsyncCallback cb,
        String clientPath,
        String serverPath,
        Object ctx,
        WatchRegistration watchRegistration,
        WatchDeregistration watchDeregistration) {
    Packet packet = null;

    packet = new Packet(h, r, request, response, watchRegistration);
    ...
    synchronized (state) {
        ...
        outgoingQueue.add(packet);
    }
    ...
    return packet;
}

The SendThread then sends the request to the server, and the response is processed in SendThread#readResponse:

void readResponse(ByteBuffer incomingBuffer) throws IOException {
    ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ReplyHeader replyHdr = new ReplyHeader();

    replyHdr.deserialize(bbia, "header");
    switch (replyHdr.getXid()) {
    case PING_XID:
        ...
        return;
    case AUTHPACKET_XID:
        ...
        return;
    // Handle server-side notifications
    case NOTIFICATION_XID:
        LOG.debug("Got notification session id: 0x{}", Long.toHexString(sessionId));
        WatcherEvent event = new WatcherEvent();
        event.deserialize(bbia, "response");

        // convert from a server path to a client path
        if (chrootPath != null) {
            String serverPath = event.getPath();
            if (serverPath.compareTo(chrootPath) == 0) {
                event.setPath("/");
            } else if (serverPath.length() > chrootPath.length()) {
                event.setPath(serverPath.substring(chrootPath.length()));
            } else {
                LOG.warn("Got server path {} which is too short for chroot path {}.", event.getPath(), chrootPath);
            }
        }

        WatchedEvent we = new WatchedEvent(event);
        LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId));
        // Add to the event queue; EventThread will process it
        eventThread.queueEvent(we);
        return;
    default:
        break;
    }

    // Remove the Packet from pendingQueue
    Packet packet;
    synchronized (pendingQueue) {
        if (pendingQueue.size() == 0) {
            throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
        }
        packet = pendingQueue.remove();
    }
    /*
     * Since requests are processed in order, we better get a response
     * to the first request!
     */
    try {
        ...
    } finally {
        // Save the Watcher in ClientWatchManager
        finishPacket(packet);
    }
}
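Both directions of the chroot translation are plain string operations. The hypothetical ChrootPaths helper below puts them side by side: toServer follows the behavior of prependChroot called in getData (a client path of "/" maps to the chroot node itself), and toClient follows the NOTIFICATION_XID branch of readResponse shown above. It is a sketch of the rules, not ZooKeeper's code.

```java
// Hypothetical helper modeling chroot path translation on the client.
final class ChrootPaths {
    private final String chrootPath; // e.g. "/app"; null when no chroot is configured

    ChrootPaths(String chrootPath) { this.chrootPath = chrootPath; }

    // Client path -> server path, applied before a request is sent.
    String toServer(String clientPath) {
        if (chrootPath == null) {
            return clientPath;
        }
        // "/" on the client is the chroot node itself on the server.
        return clientPath.length() == 1 ? chrootPath : chrootPath + clientPath;
    }

    // Server path -> client path, applied to incoming notifications.
    String toClient(String serverPath) {
        if (chrootPath == null) {
            return serverPath;
        }
        if (serverPath.equals(chrootPath)) {
            return "/";
        }
        if (serverPath.length() > chrootPath.length()) {
            return serverPath.substring(chrootPath.length());
        }
        // readResponse only logs a warning in this case and leaves the path unchanged.
        return serverPath;
    }
}
```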

What this method does:

  1. Deserialize the reply header and check its XID to determine whether this is a server notification. If it is, add the event to the event queue and let EventThread handle it.
  2. Otherwise, remove the Packet from pendingQueue.
  3. Call finishPacket to do the follow-up processing:
protected void finishPacket(Packet p) {
    int err = p.replyHeader.getErr();
    if (p.watchRegistration != null) {
        p.watchRegistration.register(err);
    }
    ...
}

Finally, WatchRegistration#register adds the Watcher to the corresponding Map<String, Set<Watcher>> in ZKWatchManager, keyed by the client path.
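The client-side bookkeeping in this section can be condensed into a single-threaded model: a packet carries an optional registration, moves from outgoingQueue to pendingQueue when it is sent, and its watcher lands in a Map<String, Set<Watcher>> only after the reply arrives with error code 0, which is the rule a data watch registration applies. ClientQueuesModel and the other names are hypothetical, and the SendThread's I/O and locking are left out. Exists watches follow a different rule in ZooKeeper (they are also kept when the node does not exist yet), which this sketch does not model.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

interface Watcher { void process(String path); }

// Remembers which watcher to add, and under which path, once the reply is known.
record DataWatchRegistration(Watcher watcher, String clientPath) {
    void register(int err, Map<String, Set<Watcher>> dataWatches) {
        if (err == 0) { // a failed getData leaves no watch behind
            dataWatches.computeIfAbsent(clientPath, k -> new HashSet<>()).add(watcher);
        }
    }
}

record Packet(String serverPath, DataWatchRegistration registration) {}

class ClientQueuesModel {
    final Queue<Packet> outgoingQueue = new ArrayDeque<>();
    final Queue<Packet> pendingQueue = new ArrayDeque<>();
    final Map<String, Set<Watcher>> dataWatches = new HashMap<>();

    // queuePacket: the request is only queued; nothing is registered yet.
    void queuePacket(Packet p) { outgoingQueue.add(p); }

    // The send step writes the packet and keeps it until its reply comes back.
    void send() {
        Packet p = outgoingQueue.poll();
        if (p != null) {
            pendingQueue.add(p);
        }
    }

    // readResponse + finishPacket: replies arrive in order, so take the head.
    void onReply(int err) {
        Packet p = pendingQueue.remove();
        if (p.registration() != null) {
            p.registration().register(err, dataWatches);
        }
    }
}
```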

The server handles Watcher

Let's start with the main component classes.


WatchManager manages Watchers on the ZK server side. Internally it keeps two storage structures, watchTable and watch2Paths, which index Watchers along two different dimensions:

  • watchTable is indexed by data node path: it maps each path to the Watchers registered on it.
  • watch2Paths is indexed by Watcher: it maps each Watcher to the set of data node paths it watches.
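Why keep two maps? watchTable answers "who watches this path?" when an event fires, while watch2Paths answers "what does this connection watch?" when a connection closes and all of its watches must be dropped without scanning every path. A minimal sketch of both indexes, using a hypothetical TwoIndexWatchManager class with String ids standing in for ServerCnxn objects:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the server-side WatchManager's two indexes.
// Watchers are identified by strings here (think: one per client connection).
class TwoIndexWatchManager {
    // path -> watchers interested in that path
    final Map<String, Set<String>> watchTable = new HashMap<>();
    // watcher -> paths it is watching
    final Map<String, Set<String>> watch2Paths = new HashMap<>();

    // Returns false if this watcher was already watching this path.
    synchronized boolean addWatch(String path, String watcher) {
        watchTable.computeIfAbsent(path, k -> new HashSet<>()).add(watcher);
        return watch2Paths.computeIfAbsent(watcher, k -> new HashSet<>()).add(path);
    }

    // Connection closed: watch2Paths lists exactly the paths that need cleaning.
    synchronized void removeWatcher(String watcher) {
        Set<String> paths = watch2Paths.remove(watcher);
        if (paths == null) {
            return;
        }
        for (String path : paths) {
            Set<String> watchers = watchTable.get(path);
            if (watchers != null) {
                watchers.remove(watcher);
                if (watchers.isEmpty()) {
                    watchTable.remove(path);
                }
            }
        }
    }
}
```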

ServerCnxn represents a connection between a ZooKeeper client and the server. Its default implementation is NIOServerCnxn; a Netty-based implementation, NettyServerCnxn, has been available since 3.4.0.

ServerCnxn also implements the Watcher interface, so we can think of it as a Watcher object.

In other words, WatchManager stores the mapping between data node paths and ServerCnxn objects.


When the server receives a request from the client, FinalRequestProcessor#processRequest determines whether the request needs to register a Watcher.

case OpCode.getData: {
    lastOp = "GETD";
    GetDataRequest getDataRequest = new GetDataRequest();
    ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest);
    path = getDataRequest.getPath();
    // Call the method that handles the getData request
    rsp = handleGetDataRequest(getDataRequest, cnxn, request.authInfo);
    requestPathMetricsCollector.registerRequest(request.type, path);
    break;
}
private Record handleGetDataRequest(Record request, ServerCnxn cnxn, List<Id> authInfo) throws KeeperException, IOException {
    ...
    // The request only carries a boolean saying whether a Watcher should be registered;
    // if it is true, the connection object (cnxn) itself is passed in as the Watcher
    byte[] b = zks.getZKDatabase().getData(path, stat, getDataRequest.getWatch() ? cnxn : null);
    return new GetDataResponse(b, stat);
}
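// ZKDatabase#getData
public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException {
    return dataTree.getData(path, stat, watcher);
}

// DataTree#getData
public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException {
    DataNode n = nodes.get(path);
    byte[] data = null;
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    synchronized (n) {
        n.copyStat(stat);
        if (watcher != null) {
            // dataWatches is an instance of the IWatchManager interface
            dataWatches.addWatch(path, watcher);
        }
        data = n.data;
    }
    updateReadStat(path, data == null ? 0 : data.length);
    return data;
}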

The Watcher is finally stored in both watchTable and watch2Paths:

@Override
public boolean addWatch(String path, Watcher watcher) {
    return addWatch(path, watcher, WatcherMode.DEFAULT_WATCHER_MODE);
}

@Override
public synchronized boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode) {
    if (isDeadWatcher(watcher)) {
        return false;
    }
    // Get (or create) the set of Watchers registered on this path
    Set<Watcher> list = watchTable.get(path);
    if (list == null) {
        list = new HashSet<>(4);
        watchTable.put(path, list);
    }
    list.add(watcher);
    // Get (or create) the set of paths watched by this Watcher
    Set<String> paths = watch2Paths.get(watcher);
    if (paths == null) {
        paths = new HashSet<>();
        watch2Paths.put(watcher, paths);
    }

    watcherModeManager.setWatcherMode(watcher, path, watcherMode);
    return paths.add(path);
}

Triggering the Watcher

NodeDataChanged is triggered when a node's data content or its dataVersion changes.

So let's look at org.apache.zookeeper.server.DataTree#setData:

public Stat setData(String path, byte[] data, int version, long zxid, long time) throws KeeperException.NoNodeException {
    Stat s = new Stat();
    DataNode n = nodes.get(path);
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    byte[] lastdata = null;
    synchronized (n) {
        lastdata = n.data;
        nodes.preChange(path, n);
        n.data = data;
        n.stat.setMtime(time);
        n.stat.setMzxid(zxid);
        n.stat.setVersion(version);
        n.copyStat(s);
        nodes.postChange(path, n);
    }
    ...
    updateWriteStat(path, dataBytes);
    // Call the IWatchManager method
    dataWatches.triggerWatch(path, EventType.NodeDataChanged);
    return s;
}
@Override
public WatcherOrBitSet triggerWatch(String path, EventType type) {
    return triggerWatch(path, type, null);
}

@Override
public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) {
    // Wrap the event in a WatchedEvent
    WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
    Set<Watcher> watchers = new HashSet<>();
    PathParentIterator pathParentIterator = getPathParentIterator(path);
    synchronized (this) {
        for (String localPath : pathParentIterator.asIterable()) {
            Set<Watcher> thisWatchers = watchTable.get(localPath);
            // No watchers on this path
            if (thisWatchers == null || thisWatchers.isEmpty()) {
                continue;
            }
            Iterator<Watcher> iterator = thisWatchers.iterator();
            while (iterator.hasNext()) {
                Watcher watcher = iterator.next();
                WatcherMode watcherMode = watcherModeManager.getWatcherMode(watcher, localPath);
                if (watcherMode.isRecursive()) {
                    ...
                } else if (!pathParentIterator.atParentPath()) {
                    watchers.add(watcher);
                    if (!watcherMode.isPersistent()) {
                        // Remove from watchTable
                        iterator.remove();
                        Set<String> paths = watch2Paths.get(watcher);
                        if (paths != null) {
                            // Remove from watch2Paths
                            paths.remove(localPath);
                        }
                    }
                }
            }
        }
    }
    for (Watcher w : watchers) {
        if (supress != null && supress.contains(w)) {
            continue;
        }
        // Call the process method
        w.process(e);
    }
    ...
    return new WatcherOrBitSet(watchers);
}
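Stripped of the persistent and recursive modes, the default (one-shot) path through triggerWatch reduces to: collect the watchers registered on the exact path, remove them from both indexes while holding the lock, then call process outside the lock. The sketch below assumes that simplification; OneShotWatchManager and its String-based event type are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

interface Watcher { void process(String eventType, String path); }

// Hypothetical model of triggerWatch for standard one-shot watches only.
class OneShotWatchManager {
    private final Map<String, Set<Watcher>> watchTable = new HashMap<>();
    private final Map<Watcher, Set<String>> watch2Paths = new HashMap<>();

    synchronized void addWatch(String path, Watcher watcher) {
        watchTable.computeIfAbsent(path, k -> new HashSet<>()).add(watcher);
        watch2Paths.computeIfAbsent(watcher, k -> new HashSet<>()).add(path);
    }

    Set<Watcher> triggerWatch(String path, String eventType) {
        Set<Watcher> watchers;
        synchronized (this) {
            // remove(): a fired one-shot watch disappears from watchTable...
            watchers = watchTable.remove(path);
            if (watchers == null) {
                return Set.of();
            }
            // ...and from watch2Paths, keeping the two indexes consistent.
            for (Watcher w : watchers) {
                Set<String> paths = watch2Paths.get(w);
                if (paths != null) {
                    paths.remove(path);
                    if (paths.isEmpty()) {
                        watch2Paths.remove(w);
                    }
                }
            }
        }
        // Callbacks run outside the lock, as in the method above.
        for (Watcher w : watchers) {
            w.process(eventType, path);
        }
        return watchers;
    }

    synchronized boolean isWatched(String path) { return watchTable.containsKey(path); }
}
```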

As mentioned above, ServerCnxn implements the Watcher interface. Let's look at org.apache.zookeeper.server.NIOServerCnxn#process:

@Override
public void process(WatchedEvent event) {
    // The XID in the reply header is set to -1 (NOTIFICATION_XID), which is what SendThread#readResponse checks for
    ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0);

    // Convert the WatchedEvent into a WatcherEvent
    WatcherEvent e = event.getWrapper();
    // Send the notification to the client
    sendResponse(h, e, "notification", null, null, ZooDefs.OpCode.error);
}

The basic flow

  • Wrap the event in a WatchedEvent.
  • Look up the corresponding Watchers in watchTable, and remove them and their paths from watchTable and watch2Paths (a standard Watcher can only be triggered once).
  • Call each Watcher's process method.

The client calls back Watcher

Let’s start with the EventThread class


EventThread extends Thread. It holds pending events in a LinkedBlockingQueue named waitingEvents, and its run method keeps taking events from that queue and processing them.

We already know that the client handles notifications in SendThread#readResponse (this code also appeared in the client registration section above):

case NOTIFICATION_XID:
    LOG.debug("Got notification session id: 0x{}", Long.toHexString(sessionId));
    WatcherEvent event = new WatcherEvent();
    event.deserialize(bbia, "response");

    // convert from a server path to a client path
    if (chrootPath != null) {
        String serverPath = event.getPath();
        if (serverPath.compareTo(chrootPath) == 0) {
            event.setPath("/");
        } else if (serverPath.length() > chrootPath.length()) {
            event.setPath(serverPath.substring(chrootPath.length()));
        } else {
            LOG.warn("Got server path {} which is too short for chroot path {}.", event.getPath(), chrootPath);
        }
    }

    WatchedEvent we = new WatchedEvent(event);
    LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId));
    // Add to the event queue; EventThread will process it
    eventThread.queueEvent(we);
    return;

The event is then added to the waitingEvents queue:

public void queueEvent(WatchedEvent event) {
    queueEvent(event, null);
}

private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
    if (event.getType() == EventType.None && sessionState == event.getState()) {
        return;
    }
    sessionState = event.getState();
    final Set<Watcher> watchers;
    if (materializedWatchers == null) {
        // Fetch the matching Watchers from ClientWatchManager; this also removes them
        // from the corresponding Map, which is what makes a standard Watcher one-time
        watchers = watcher.materialize(event.getState(), event.getType(), event.getPath());
    } else {
        watchers = new HashSet<Watcher>();
        watchers.addAll(materializedWatchers);
    }
    WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
    // Add it to waitingEvents and wait for the run method to process it
    waitingEvents.add(pair);
}
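materialize is where the client side of the one-time rule lives: the watchers for an event are removed from their maps as they are collected. The sketch below models node events only, assuming removal rules that mirror ZKWatchManager's (data and exist watches for NodeCreated and NodeDataChanged, child watches for NodeChildrenChanged, all three for NodeDeleted). Session events (EventType.None) and the persistent watch modes are left out, and ClientWatchesModel is a hypothetical name.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

enum EventType { NodeCreated, NodeDeleted, NodeDataChanged, NodeChildrenChanged }

interface Watcher { void process(EventType type, String path); }

// Hypothetical model of client-side watch storage and materialize().
class ClientWatchesModel {
    final Map<String, Set<Watcher>> dataWatches = new HashMap<>();
    final Map<String, Set<Watcher>> existWatches = new HashMap<>();
    final Map<String, Set<Watcher>> childWatches = new HashMap<>();

    // Collects the watchers to notify and removes them, so each fires at most once.
    synchronized Set<Watcher> materialize(EventType type, String path) {
        Set<Watcher> result = new HashSet<>();
        switch (type) {
            case NodeCreated, NodeDataChanged -> {
                addTo(dataWatches.remove(path), result);
                addTo(existWatches.remove(path), result);
            }
            case NodeChildrenChanged -> addTo(childWatches.remove(path), result);
            case NodeDeleted -> {
                addTo(dataWatches.remove(path), result);
                addTo(existWatches.remove(path), result);
                addTo(childWatches.remove(path), result);
            }
        }
        return result;
    }

    private static void addTo(Set<Watcher> from, Set<Watcher> to) {
        if (from != null) {
            to.addAll(from);
        }
    }
}
```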

The run method:

public void run() {
    try {
        isRunning = true;
        while (true) {
            Object event = waitingEvents.take();
            if (event == eventOfDeath) {
                wasKilled = true;
            } else {
                processEvent(event);
            }
            ...
        }
    } catch (InterruptedException e) {
        ...
    }
}

private void processEvent(Object event) {
    try {
        if (event instanceof WatcherSetEventPair) {
            // each watcher will process the event
            WatcherSetEventPair pair = (WatcherSetEventPair) event;
            for (Watcher watcher : pair.watchers) {
                try {
                    // process is called serially and synchronously on this thread
                    watcher.process(pair.event);
                } catch (Throwable t) {
                    LOG.error("Error while calling watcher.", t);
                }
            }
        } ...
    } catch (Throwable t) {
        ...
    }
}
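The EventThread pattern (one consumer thread draining a LinkedBlockingQueue, with a sentinel object to stop it) is easy to reproduce with the standard library. The sketch below uses hypothetical names and shows why callbacks run serially: exactly one thread calls the watchers, so a slow watcher delays every event queued behind it, while a throwing watcher is caught so the loop keeps going. Unlike the real EventThread, which keeps draining after eventOfDeath, this version stops as soon as it reaches the sentinel.

```java
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical single-consumer event loop in the style of ClientCnxn.EventThread.
class EventLoop extends Thread {
    private static final Object EVENT_OF_DEATH = new Object(); // shutdown sentinel

    private record Pair(List<Consumer<String>> watchers, String event) {}

    private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();

    EventLoop() {
        super("event-loop");
        setDaemon(true);
    }

    void queueEvent(List<Consumer<String>> watchers, String event) {
        waitingEvents.add(new Pair(watchers, event));
    }

    void queueEventOfDeath() {
        waitingEvents.add(EVENT_OF_DEATH);
    }

    @Override
    public void run() {
        try {
            while (true) {
                Object item = waitingEvents.take();
                if (item == EVENT_OF_DEATH) {
                    return; // FIFO: everything queued before the sentinel was already handled
                }
                Pair pair = (Pair) item;
                for (Consumer<String> watcher : pair.watchers()) {
                    try {
                        watcher.accept(pair.event()); // serial: only this thread runs callbacks
                    } catch (RuntimeException e) {
                        System.err.println("Error while calling watcher: " + e);
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```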

Conclusion

The characteristics of the Watcher

  • One-time: once triggered, a standard Watcher is removed from storage on both the client and the server.
  • Serial execution on the client: client-side Watcher callbacks run serially and synchronously, which guarantees ordering. It also means that the processing logic of a single Watcher must not be allowed to hold up the callbacks of the entire client.
  • Lightweight: WatchedEvent is the smallest unit of the notification mechanism and contains only three parts: the notification state, the event type, and the node path. The node's content is not pushed to the client in the notification; after receiving the notification, the client must actively fetch the data from the server.
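Because a standard watch fires at most once and the event carries no data, the usual client pattern is to re-read and re-register inside the callback. The sketch below shows that loop against a hypothetical in-memory Store standing in for a ZooKeeper node; with the real client the same shape is typically written by calling getData(path, this, stat) from inside process. Note that several updates landing between a notification and the re-read are observed only as their latest value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory node with one-shot watches (stand-in for a ZooKeeper node).
class Store {
    private String data = "";
    private final List<Consumer<String>> watches = new ArrayList<>();

    // Like getData(path, watcher, stat): returns current data and registers a one-shot watch.
    String getData(Consumer<String> watcher) {
        watches.add(watcher);
        return data;
    }

    void setData(String newData) {
        data = newData;
        List<Consumer<String>> fired = new ArrayList<>(watches);
        watches.clear(); // one-shot: registrations are consumed by this event
        fired.forEach(w -> w.accept("NodeDataChanged")); // the event carries no payload
    }
}

// Keeps a local copy current by re-reading and re-registering on every event.
class ConfigFollower implements Consumer<String> {
    private final Store store;
    final List<String> observed = new ArrayList<>();

    ConfigFollower(Store store) {
        this.store = store;
        observed.add(store.getData(this)); // initial read plus first registration
    }

    @Override
    public void accept(String eventType) {
        // The notification says that something changed; fetching says what changed.
        observed.add(store.getData(this));
    }
}
```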
