Communication protocol

ZooKeeper (ZK) implements its own communication protocol on top of TCP/IP to handle network communication between clients and servers, and between servers. The overall design of the protocol is quite simple.

The client initiates a connection and sends a handshake packet to negotiate the session timeout. After the negotiation succeeds, the server returns a session ID and the negotiated timeout value. While the session is alive, the client must send ping packets within the timeout period. The basic rule of the protocol between the ZooKeeper client and server is request/response: the client sends a request, receives a response, and acts according to that response.

Requests are sent in the following format:

  • Message length + xid + request. The message length and the xid are 4 bytes each. The command (op code) is also 4 bytes and must be the first 4 bytes of the request. Each request's xid must be unique.
  • The commonly used commands are numbered 1 to 11 (for example, 4 is getData and 11 is ping); the close command is -11.
  • Special requests use fixed xids: watch_xid is -1, ping_xid is -2 and auth_xid is -4. For ordinary requests, the xid is a counter that starts from 0 and is incremented by one for each request.
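As a minimal sketch of the request framing above, the following Python packs the length, xid and command as big-endian 4-byte signed integers (network byte order, the same `'!i'` format kazoo uses). `frame_request` and the constant names are hypothetical helpers for illustration, not part of kazoo's or ZooKeeper's API.

```python
import struct

# Fixed xids for special requests (values as described above)
WATCH_XID = -1
PING_XID = -2
AUTH_XID = -4


def frame_request(xid: int, op_type: int, body: bytes) -> bytes:
    """Frame an ordinary request: length + xid + command + body.

    The 4-byte length prefix counts everything after itself.
    All integers are big-endian signed 32-bit values.
    """
    payload = struct.pack("!ii", xid, op_type) + body
    return struct.pack("!i", len(payload)) + payload
```

For example, `frame_request(1, 4, body)` would frame a getData request carrying xid 1, with `body` holding the already-serialized request fields.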

Responses have the following format:

  • Message length + header + response. The message length is 4 bytes and gives the total length of header + response.
  • The header consists of xid, zxid and err, which are 4, 8 and 4 bytes respectively. The response body varies with the request type.
  • Depending on the xid in the header, responses fall into four types: watch, ping, auth and data.
  • These carry, respectively, a watch event, a heartbeat reply, an authentication result and the data for an ordinary request, and the client handles each accordingly.
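The reply header and the xid-based dispatch can be sketched as follows, assuming the whole packet has already been read from the socket. `parse_reply` and `SPECIAL_XIDS` are illustrative names; the `'!iqi'` format matches the 4 + 8 + 4 byte header described above.

```python
import struct

REPLY_HEADER = struct.Struct("!iqi")  # xid (4) + zxid (8) + err (4) = 16 bytes

# Special xids as described above; any other xid answers a data request
SPECIAL_XIDS = {-1: "watch", -2: "ping", -4: "auth"}


def parse_reply(packet: bytes):
    """Split a length-prefixed reply into (kind, xid, zxid, err, body).

    `packet` starts with the 4-byte message length.
    """
    (length,) = struct.unpack_from("!i", packet, 0)
    frame = packet[4:4 + length]
    xid, zxid, err = REPLY_HEADER.unpack_from(frame, 0)
    kind = SPECIAL_XIDS.get(xid, "data")
    body = frame[REPLY_HEADER.size:]  # empty for a ping reply
    return kind, xid, zxid, err, body
```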

Message structure

Handshake messages

The request message body

protocol_version + zxid + timeout + session_id + passwd_len + passwd + read_only, with byte lengths 4, 8, 4, 8, 4, 16 and 1 respectively. The timeout value can be 0, and the password can be any 16 bytes. Note: the handshake packet has no xid and no command field.
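A sketch of serializing the handshake request with the field sizes listed above. `build_connect_request` is a hypothetical helper, and its defaults (zxid and session ID of 0, a zeroed 16-byte password) are assumptions for a brand-new session. Only the 4-byte length prefix precedes the fields; there is no xid or command.

```python
import struct


def build_connect_request(timeout_ms: int, last_zxid_seen: int = 0,
                          session_id: int = 0, passwd: bytes = b"\x00" * 16,
                          read_only: bool = False,
                          protocol_version: int = 0) -> bytes:
    """Serialize a handshake request (hypothetical helper).

    protocol_version (4) + zxid (8) + timeout (4) + session_id (8)
    + passwd_len (4) + passwd (16) + read_only (1), after a 4-byte length.
    """
    body = struct.pack("!iqiq", protocol_version, last_zxid_seen,
                       timeout_ms, session_id)
    body += struct.pack("!i", len(passwd)) + passwd
    body += struct.pack("!?", read_only)  # 1-byte flag: 0 or 1
    return struct.pack("!i", len(body)) + body
```

With a 45-byte body plus the 4-byte length prefix, the packet is 49 bytes on the wire.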

The response message body

protocol_version + timeout + session_id + passwd_len + passwd + read_only, with byte lengths 4, 4, 8, 4, 16 and 1. Note: the handshake response has no reply header.
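Parsing the handshake response could look like the following sketch. `parse_connect_response` is hypothetical and assumes the 4-byte length prefix has already been stripped. As in kazoo's `Connect.deserialize`, a missing trailing `read_only` byte is treated as `False`.

```python
import struct


def parse_connect_response(frame: bytes):
    """Parse a handshake response body (no reply header, prefix stripped).

    protocol_version (4) + timeout (4) + session_id (8) + passwd_len (4)
    + passwd + optional read_only (1).
    """
    protocol_version, timeout, session_id = struct.unpack_from("!iiq", frame, 0)
    offset = 16
    (passwd_len,) = struct.unpack_from("!i", frame, offset)
    offset += 4
    passwd = frame[offset:offset + passwd_len]
    offset += passwd_len
    # The read_only byte may be absent; default to False in that case
    read_only = len(frame) > offset and frame[offset] == 1
    return protocol_version, timeout, session_id, passwd, read_only
```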

Sample output

2020-04-21 19:26:53.990 cms_watcher info info cms_watcher [pid:240] [Thread-4] [connection.py:646 _connect] Connecting to 30.3.3.60:9888, use_ssl: False
2020-04-21 19:26:53.990 localhost cms_watcher info info cms_watcher [pid:240] [Thread-4] [connection.py:650 _connect] Using session_id: 144131667822575626 session_passwd: b'41f366ef7005bc5c859b7fc56fa40872'
2020-04-21 19:26:53.990 localhost cms_watcher info info cms_watcher [pid:240] [Thread-4] [connection.py:299 _submit] Sending request(xid=None): Connect(protocol_version=0, last_zxid_seen=12884901993, time_out=30000, session_id=144131667822575626, passwd=b'A\xf3f\xefp\x05\xbc\\\x85\x9b\x7f\xc5o\xa4\x08r', read_only=None)
2020-04-21 19:26:53.991 cms_watcher info cms_watcher [pid:240] [Thread-4] [connection.py:285 _invoke] Read response Connect(protocol_version=0, last_zxid_seen=0, time_out=30000, session_id=144131667822575626, passwd=b'A\xf3f\xefp\x05\xbc\\\x85\x9b\x7f\xc5o\xa4\x08r', read_only=False)
2020-04-21 19:26:53.991 cms_watcher info cms_watcher [pid:240] [Thread-4] [connection.py:694 _connect] Session created, session_id: 144131667822575626 session_passwd: b'41f366ef7005bc5c859b7fc56fa40872' negotiated session timeout: 30000 connect timeout: 10000.0 read timeout: 20000.0
2020-04-21 19:26:53.991 localhost cms_watcher info info cms_watcher [pid:240] [Thread-4] [client.py:463 _session_callback] test: cur state CONNECTED, old state CONNECTING

Ping message

The request message body

type (the ping request has a single field, the command value 11; the complete packet is 12 bytes: a 4-byte length, a 4-byte xid of -2 and the 4-byte command)
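Since a ping is just three 4-byte integers, a hypothetical `build_ping` helper reduces to a single `struct` call, assuming the fixed ping xid -2 and command value 11 given above:

```python
import struct

PING_XID = -2  # fixed xid for pings
PING_OP = 11   # ping command value


def build_ping() -> bytes:
    # The length prefix (8) counts only the xid and command that follow it
    return struct.pack("!iii", 8, PING_XID, PING_OP)
```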

The response message body

res_len + header + res (a ping response can be identified by its xid alone, so the client only needs to parse the header)

Sample output

2020-04-21 20:05:03.971 localhost cms_watcher info info cms_watcher [pid:240] [Thread-4] [connection.py:603 _connect_attempt] test: Send ping
2020-04-21 20:05:03.971 localhost cms_watcher info info cms_watcher [pid:240] [Thread-4] [connection.py:490 _send_ping] test: Send ping
2020-04-21 20:05:03.971 localhost cms_watcher info info cms_watcher [pid:240] [Thread-4] [connection.py:299 _submit] Sending request(xid=-2): Ping()
2020-04-21 20:05:03.973 localhost cms_watcher info info cms_watcher [pid:240] [Thread-4] [connection.py:606 _connect_attempt] test: Read socket
2020-04-21 20:05:03.973 localhost cms_watcher info info cms_watcher [pid:240] [Thread-4] [connection.py:415 _read_socket] test: Received Ping

GetData message

The request message body

type + path_len + path + watcher, with type = 4. path_len (4 bytes) is the length of path, the znode path to query. watcher (1 byte, value 1 or 0) determines whether a watch is registered on the node.
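The GetData request can be framed the same way. This sketch assumes the path is encoded as UTF-8 (as kazoo does when writing strings) and maps `watch` to the 1-byte watcher flag; `build_get_data` is a hypothetical helper.

```python
import struct

GET_DATA_OP = 4  # getData command value


def build_get_data(xid: int, path: str, watch: bool) -> bytes:
    """Frame a GetData request: length + xid + type + path_len + path + watcher."""
    encoded = path.encode("utf-8")
    body = struct.pack("!i", len(encoded)) + encoded + struct.pack("!?", watch)
    payload = struct.pack("!ii", xid, GET_DATA_OP) + body
    return struct.pack("!i", len(payload)) + payload
```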

The response message body

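data_len + data + stat. data_len is the 4-byte length of data. stat consists of 11 fields of 8, 8, 8, 8, 4, 4, 4, 8, 4, 4 and 8 bytes (czxid, mzxid, ctime, mtime, version, cversion, aversion, ephemeralOwner, dataLength, numChildren, pzxid), 68 bytes in total.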
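Parsing the response body (after the reply header) might look like this. The `Stat` field names follow ZooKeeper's Stat structure, and the `'!qqqqiiiqiiq'` format corresponds to the 68-byte layout above. `parse_get_data_response` is hypothetical; treating a `data_len` of -1 as "no data" follows the convention of kazoo's `read_buffer`.

```python
import struct
from collections import namedtuple

# 11 fields: 8, 8, 8, 8, 4, 4, 4, 8, 4, 4, 8 bytes = 68 bytes
STAT = struct.Struct("!qqqqiiiqiiq")
Stat = namedtuple("Stat", "czxid mzxid ctime mtime version cversion aversion "
                          "ephemeral_owner data_length num_children pzxid")


def parse_get_data_response(body: bytes, offset: int = 0):
    """Parse data_len + data + stat; return (data, stat, new_offset).

    A data_len of -1 means the node holds no data (returned as None).
    """
    (data_len,) = struct.unpack_from("!i", body, offset)
    offset += 4
    if data_len < 0:
        data = None
    else:
        data = body[offset:offset + data_len]
        offset += data_len
    stat = Stat._make(STAT.unpack_from(body, offset))
    return data, stat, offset + STAT.size
```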

Sample output

2020-04-21 20:25:13.460 localhost cms_watcher info info cms_watcher [pid:9078] [Thread-4] [connection.py:610 _connect_attempt] test: Send Request
2020-04-21 20:25:13.460 localhost cms_watcher info info cms_watcher [pid:9078] [Thread-4] [connection.py:482 _send_request] test: Send Request xid 4
2020-04-21 20:25:13.460 localhost cms_watcher info info cms_watcher [pid:9078] [Thread-4] [connection.py:299 _submit] Sending request(xid=4): GetData(path='/cms/config/items/item.cts_cfg', watcher=<bound method DataWatch._watcher of <kazoo.recipe.watchers.DataWatch object at 0x7ff860ba3438>>)
2020-04-21 20:25:13.461 localhost cms_watcher info info cms_watcher [pid:9078] [Thread-4] [connection.py:606 _connect_attempt] test: Read socket
2020-04-21 20:25:13.461 localhost cms_watcher info info cms_watcher [pid:9078] [Thread-4] [connection.py:448 _read_socket] test: Reading for header ReplyHeader(xid=4, zxid=17179869239, err=0)

Serialization and deserialization

How kazoo assembles and parses data according to the ZK protocol

Request byte stream

The following code from kazoo shows how a request object is serialized into the byte stream written to the socket:

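def _submit(self, request, timeout, xid=None):
	"""Submit a request object with a timeout value and optional
	xid"""
	b = bytearray()
	# The handshake (Connect) is submitted with xid=None, so no xid is written
	if xid:
		b.extend(int_struct.pack(xid))
	# Connect.type is None, so the handshake has no command field either
	if request.type:
		b.extend(int_struct.pack(request.type))
	# Append the request-specific body
	b += request.serialize()
	self.logger.log(
		(BLATHER if isinstance(request, Ping) else logging.DEBUG),
		"Sending request(xid=%s): %s", xid, request)
	# Prefix the packet with its 4-byte length and write it to the socket
	self._write(int_struct.pack(len(b)) + b, timeout)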

As the code shows, depending on the request, it first decides whether to write the xid field and the type (command) field, as described in the protocol above, then appends the bytes serialized from the request object, and finally prefixes the buffer with its 4-byte length before writing it out. The request is an instance of one of the classes defined in kazoo/protocol/serialization.py, such as the Connect class used for the handshake:

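import struct
from collections import namedtuple

# int_long_int_long_struct ('!iqiq'), int_int_long_struct ('!iiq'),
# bool_struct, write_buffer and read_buffer are module-level helpers
# defined in the same file.


class Connect(namedtuple('Connect', 'protocol_version last_zxid_seen'
                         ' time_out session_id passwd read_only')):
    type = None  # the handshake has no command field

    def serialize(self):
        b = bytearray()
        # protocol_version (4) + last_zxid_seen (8) + time_out (4) + session_id (8)
        b.extend(int_long_int_long_struct.pack(
            self.protocol_version, self.last_zxid_seen, self.time_out,
            self.session_id))
        # passwd_len (4) + passwd (16)
        b.extend(write_buffer(self.passwd))
        # read_only (1)
        b.extend([1 if self.read_only else 0])
        return b

    @classmethod
    def deserialize(cls, bytes, offset):
        # The response carries no zxid: protocol_version + timeout + session_id
        proto_version, timeout, session_id = int_int_long_struct.unpack_from(
            bytes, offset)
        offset += int_int_long_struct.size
        password, offset = read_buffer(bytes, offset)

        try:
            # Use == rather than `is`: identity comparison of ints is unreliable
            read_only = bool_struct.unpack_from(bytes, offset)[0] == 1
            offset += bool_struct.size
        except struct.error:
            # The trailing read_only byte may be absent; default to False
            read_only = False
        return cls(proto_version, 0, timeout, session_id, password,
                   read_only), offset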

The response byte stream

The following code shows how the socket byte stream is deserialized into an object:

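def _read_header(self, timeout):
	# The first 4 bytes hold the length of the rest of the packet
	b = self._read(4, timeout)
	length = int_struct.unpack(b)[0]
	# Read the rest of the packet: header + response body
	b = self._read(length, timeout)
	# Parse the 16-byte reply header (xid, zxid, err); offset points at the body
	header, offset = ReplyHeader.deserialize(b, 0)
	return header, b, offset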

The code first reads 4 bytes from the socket. According to the protocol above, these 4 bytes hold the message length, that is, the size of the rest of the packet. It then reads exactly that many bytes and parses the reply header from the start of the buffer, returning the offset so that the response body can be deserialized into the specific object afterwards.
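The `_read` helper used above is not shown here. On a plain blocking socket, reading a length-prefixed packet needs a loop, because `recv()` may return fewer bytes than requested. The following sketch (hypothetical `recv_exact` and `read_packet`, with no timeout or SSL handling) illustrates the idea:

```python
import socket
import struct


def recv_exact(sock: socket.socket, n: int) -> bytes:
    """Read exactly n bytes; recv() may return fewer, so loop until done."""
    chunks = []
    remaining = n
    while remaining:
        chunk = sock.recv(remaining)
        if not chunk:
            raise ConnectionError("socket closed mid-packet")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def read_packet(sock: socket.socket) -> bytes:
    """Read one length-prefixed packet and return it without the prefix."""
    (length,) = struct.unpack("!i", recv_exact(sock, 4))
    return recv_exact(sock, length)
```

For example, after `a, b = socket.socketpair()` and `a.sendall(struct.pack("!i", 3) + b"abc")`, `read_packet(b)` returns `b"abc"`.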

import struct
from collections import namedtuple

reply_header_struct = struct.Struct('!iqi')  # xid (4) + zxid (8) + err (4)


class ReplyHeader(namedtuple('ReplyHeader', 'xid, zxid, err')):
    @classmethod
    def deserialize(cls, bytes, offset):
        """Given bytes and the current bytes offset, return a
        :class:`ReplyHeader` instance and the new offset"""
        new_offset = offset + reply_header_struct.size
        return cls._make(
            reply_header_struct.unpack_from(bytes, offset)), new_offset