0x00 Abstract

SOFARegistry is Ant Financial's open-source, production-grade service registry, designed for timely delivery and high availability. This article, the second in a series, examines how MetaServer is implemented and introduces its basic functions: registration, storage, notification, and renewal.

0x01 MetaServer registration

1.1 Leader entry point

MetaServer's business logic starts from setLeaderProcessListener.

As mentioned earlier, the MetaServer cluster uses the Raft protocol internally for leader election and replication, and it can keep serving requests as long as a majority of its nodes are alive, that is, as long as fewer than half of them fail.
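The majority rule can be made concrete with a little arithmetic. The sketch below is not SOFARegistry or SOFAJRaft code; the class and method names are illustrative only.

```java
// Illustrative only: QuorumSketch is not part of SOFARegistry or SOFAJRaft.
final class QuorumSketch {

    private QuorumSketch() {
    }

    /** Smallest number of nodes that forms a majority of the cluster. */
    static int quorum(int clusterSize) {
        return clusterSize / 2 + 1;
    }

    /** Largest number of simultaneous failures that still leaves a majority alive. */
    static int tolerableFailures(int clusterSize) {
        return clusterSize - quorum(clusterSize);
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 5; n++) {
            System.out.println(n + " nodes: quorum=" + quorum(n)
                + ", tolerable failures=" + tolerableFailures(n));
        }
    }
}
```

A 3-node cluster survives one failure, and a 4-node cluster still survives only one, which is why odd cluster sizes are the usual choice.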

The Raft protocol consists of three parts:

  • Leader Election
  • Log Replication
  • Safety

If you use JRaft, you need to implement its state machine. In MetaServer, the JRaft state machine (FSM) implementation is the ServiceStateMachine class (more on Raft later).

After Raft elects the MetaServer leader, ServiceStateMachine invokes the listener registered through setLeaderProcessListener, which in turn calls registerCurrentNode. This registers the current Meta node with the MetaServer.

raftServer.setLeaderProcessListener(new LeaderProcessListener() {
    @Override
    public void startProcess() {
        executorManager.startScheduler();
        PeerId leader = new PeerId(NetUtil.getLocalAddress().getHostAddress(),
            metaServerConfig.getRaftServerPort());
        registerCurrentNode();  
        raftServer.sendNotify(leader, "leader");
    }
    // ... remaining callbacks are omitted in this excerpt
});

1.2 Registration

Registration is carried out by Registry. The registration interface is defined as follows:

public interface Registry<T extends Node> {
    NodeChangeResult setNodes(List<T> nodes);
    NodeChangeResult register(T node);
    void cancel(String connectId, NodeType nodeType);
    void evict();
    void renew(T node, int duration);
    void getOtherDataCenterNodeAndUpdate(NodeType nodeType);
    DataCenterNodes getDataCenterNodes(NodeType nodeType);
    NodeChangeResult getAllNodes(NodeType nodeType);
    void pushNodeListChange(NodeType nodeType);
}

A concrete implementation looks like this:

public class MetaServerRegistry implements Registry<Node> {
    @Override
    public NodeChangeResult register(Node node) {
        StoreService storeService = ServiceFactory.getStoreService(node.getNodeType());
        return storeService.addNode(node);
    }
}

Depending on the node type, Registry obtains the corresponding StoreService to add the node. Here, the node type is “META”.

node = {MetaNode} "MetaNode{ip=192.168.1.2}"
 nodeType = {Node$NodeType} "META"
 nodeUrl = {URL} "URL{address='192.168.1.2:0'}"
 dataCenter = "DefaultDataCenter"
 name = "192.168.1.2"
 regionId = null
 nodeStatus = {Node$NodeStatus} "INIT"

The StoreService is looked up in storeServiceMap inside ServiceFactory.

For a MetaNode, for example, the corresponding StoreService is MetaStoreService, so MetaStoreService performs the storage.

storeServiceMap = {HashMap}  
 {Node$NodeType} "SESSION" -> {SessionStoreService} 
 {Node$NodeType} "META" -> {MetaStoreService} 
 {Node$NodeType} "DATA" -> {DataStoreService} 
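The lookup by node type can be sketched in a few lines. This is a minimal sketch under simplifying assumptions: the nested NodeType and StoreService types are reduced stand-ins, nodes are represented by their IP string, and an EnumMap is used where the debugger output above shows a HashMap.

```java
import java.util.EnumMap;
import java.util.Map;

// Illustrative stand-ins only: the real Node, StoreService and ServiceFactory types are richer.
final class StoreServiceLookupSketch {

    enum NodeType { META, DATA, SESSION }

    /** Reduced to a single method for the sketch. */
    interface StoreService {
        String addNode(String ipAddress);
    }

    // One StoreService per node type, filled once when the class is loaded.
    private static final Map<NodeType, StoreService> STORE_SERVICE_MAP =
        new EnumMap<>(NodeType.class);

    static {
        STORE_SERVICE_MAP.put(NodeType.META, ip -> "MetaStoreService stored " + ip);
        STORE_SERVICE_MAP.put(NodeType.DATA, ip -> "DataStoreService stored " + ip);
        STORE_SERVICE_MAP.put(NodeType.SESSION, ip -> "SessionStoreService stored " + ip);
    }

    private StoreServiceLookupSketch() {
    }

    static StoreService getStoreService(NodeType nodeType) {
        StoreService storeService = STORE_SERVICE_MAP.get(nodeType);
        if (storeService == null) {
            throw new IllegalArgumentException("No StoreService registered for " + nodeType);
        }
        return storeService;
    }

    /** Mirrors the shape of Registry#register: pick the service by node type, then delegate. */
    static String register(NodeType nodeType, String ipAddress) {
        return getStoreService(nodeType).addNode(ipAddress);
    }

    public static void main(String[] args) {
        System.out.println(register(NodeType.META, "192.168.1.2"));
    }
}
```

Keeping the dispatch in one map means that supporting a new node type only requires registering one more entry.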

1.3 Storage Service

The StoreService then stores the Node in a Repository. MetaStoreService, for example, is implemented as follows:

public class MetaStoreService implements StoreService<MetaNode> {
    @RaftReference(uniqueId = "metaServer")
    private RepositoryService<String, RenewDecorate<MetaNode>> metaRepositoryService;

    public NodeChangeResult addNode(MetaNode metaNode) {
        NodeChangeResult nodeChangeResult;
        String ipAddress = metaNode.getNodeUrl().getIpAddress();
        write.lock();
        try {
            // Store to repository (automatically synchronize to cluster via jraft)
            metaRepositoryService.put(ipAddress, new RenewDecorate(metaNode,
                RenewDecorate.DEFAULT_DURATION_SECS));

            // Trigger notification (need to notify data/session)
            nodeChangeResult = getNodeChangeResult();
            firePushDataListTask(nodeChangeResult, "addMetaNode");
            firePushSessionListTask(nodeChangeResult, "addMetaNode");
        } finally {
            write.unlock();
        }
        return nodeChangeResult;
    }
}

The storage process is summarized as follows:

 register   +--------------------+   addNode   +------------------+
 ---------> | MetaServerRegistry | ----------> | MetaStoreService |
            +--------------------+             +--------+---------+
                                                        |
                 +--------------------------------------+
                 |
                 |        +-----------------------+        +-----------------------------+
                 +------> | metaRepositoryService | -----> | Map(String, NodeRepository) |
                 |        +-----------------------+        |          registry           |
                 |                                         +-----------------------------+
                 | sendTaskEvent
                 |        +---------------------+      +----------------+      +------------------------+
                 +------> | TaskListenerManager | ---> | TaskDispatcher | ---> | DataNodeChangePushTask |
                          +---------------------+      +----------------+      +------------------------+


1.4 Repository service

Repository is a classic pattern that encapsulates data query and storage logic.

A Repository is a separate layer between the domain layer and the data mapping layer (data access layer). It keeps the domain layer unaware of the data access layer by offering a collection-like interface for accessing domain objects.

A Repository can be thought of as a warehouse keeper: the domain layer tells the Repository what it needs, and the Repository fetches it, without the domain layer knowing where the item is actually stored.

Here, a Node is not persisted; it is kept in memory. Storing a Node in a Repository is simple, so the real questions are these: in a distributed setting, how is the state of a Node newly added to one Repository propagated to the other nodes in the cluster, and how is the data kept consistent?

1.4.1 MetaRepositoryService

Within the cluster, the data held by MetaRepositoryService is protected by the Raft protocol, which keeps it consistent.

The back-end Repository can be seen as a SOFAJRaft state machine: every operation on the Map is replicated within the cluster through the Raft protocol, which keeps the cluster consistent. As the code shows, MetaRepositoryService is annotated with @RaftService, which is part of the Raft integration.

To explain this, we introduce the two Raft annotations ahead of time: RaftReference and RaftService.

These two annotations can be thought of as the interface that the Raft layer exposes to Registry. RaftReference corresponds to a client-side proxy, and RaftService corresponds to the server-side implementation. Why do it this way? Because the data must stay consistent, a purely local call has to be turned into an asynchronous network call that goes through the Raft protocol.

  • The concrete implementation classes of RepositoryService are annotated with @RaftService, which declares them as the server side;
  • Every field that references a RepositoryService is annotated with @RaftReference, so calling RepositoryService is equivalent to a client calling the server;
  • Fields annotated with @RaftReference are replaced with a dynamic proxy whose handler is the ProxyHandler class: each method call is wrapped as a ProcessRequest and sent to the RaftServer through the RaftClient.
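The proxy idea in the list above can be illustrated with the JDK's own java.lang.reflect.Proxy. This is a minimal in-process sketch, not the SOFARegistry implementation: the Request and Server classes and the reference helper are hypothetical stand-ins for ProcessRequest, the RaftServer, and the @RaftReference injection, and no network or Raft replication is involved.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// All names are hypothetical; this is not the SOFARegistry ProxyHandler or ProcessRequest.
final class RaftProxySketch {

    /** Contract shared by the client-side proxy and the server-side implementation. */
    interface Repository {
        String put(String key, String value);

        String get(String key);
    }

    /** Plays the role of the class annotated with @RaftService. */
    static final class InMemoryRepository implements Repository {
        private final Map<String, String> store = new ConcurrentHashMap<>();

        @Override
        public String put(String key, String value) {
            return store.put(key, value);
        }

        @Override
        public String get(String key) {
            return store.get(key);
        }
    }

    /** Request envelope: which method to call and with which arguments. */
    static final class Request {
        final String methodName;
        final Class<?>[] parameterTypes;
        final Object[] arguments;

        Request(String methodName, Class<?>[] parameterTypes, Object[] arguments) {
            this.methodName = methodName;
            this.parameterTypes = parameterTypes;
            this.arguments = arguments;
        }
    }

    /** Stand-in for the server: resolves the requested method and invokes the real service. */
    static final class Server {
        private final Repository service;
        final AtomicInteger handled = new AtomicInteger();

        Server(Repository service) {
            this.service = service;
        }

        Object handle(Request request) throws Throwable {
            handled.incrementAndGet();
            Method method = service.getClass().getMethod(request.methodName, request.parameterTypes);
            try {
                return method.invoke(service, request.arguments);
            } catch (InvocationTargetException e) {
                throw e.getCause(); // surface the service's own exception to the caller
            }
        }
    }

    /** Plays the role of @RaftReference: returns a proxy that forwards every call as a Request. */
    static <T> T reference(Class<T> type, Server server) {
        InvocationHandler handler = (proxy, method, args) ->
            server.handle(new Request(method.getName(), method.getParameterTypes(), args));
        Object instance = Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] { type }, handler);
        return type.cast(instance);
    }

    public static void main(String[] args) {
        Server server = new Server(new InMemoryRepository());
        Repository repository = reference(Repository.class, server);
        repository.put("192.168.1.2", "MetaNode");
        System.out.println(repository.get("192.168.1.2"));
        System.out.println("requests handled by the server: " + server.handled.get());
    }
}
```

The caller only ever sees the Repository interface, so swapping the in-process Server for a real remote call would not change any calling code. That is the property the annotations rely on.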

Back to the MetaRepositoryService code, as follows:

@RaftService(uniqueId = "metaServer")
public class MetaRepositoryService extends AbstractSnapshotProcess
                          implements RepositoryService<String, RenewDecorate<MetaNode>> {
    @Autowired
    private NodeConfig                                 nodeConfig;

    private Map<String/*dataCenter*/, NodeRepository> registry          = new ConcurrentHashMap<>();
    private Set<String>                                snapShotFileNames = new HashSet<>();
}

1.4.2 put operation

When a new node is added, MetaRepositoryService performs a put operation. Because callers hold a proxy, the put call is wrapped as a ProcessRequest and sent to the RaftServer through the RaftClient, so data consistency is guaranteed by the Raft protocol.

@Override
public RenewDecorate<MetaNode> put(String ipAddress, RenewDecorate<MetaNode> metaNode,
                                   Long currentTimeMillis) {
    try {
        String dataCenter = metaNode.getRenewal().getDataCenter();
        NodeRepository<MetaNode> metaNodeRepository = registry.get(dataCenter);
        if (metaNodeRepository == null) {
            NodeRepository<MetaNode> nodeRepository = new NodeRepository<>(dataCenter,
                new ConcurrentHashMap<>(), currentTimeMillis);
            // The put operation is actually invoked on the server
            metaNodeRepository = registry.put(dataCenter, nodeRepository); 
            if (metaNodeRepository == null) {
                metaNodeRepository = nodeRepository;
            }
        }
        metaNodeRepository.setVersion(currentTimeMillis);
        Map<String/*ipAddress*/, RenewDecorate<MetaNode>> metaNodes = metaNodeRepository
            .getNodeMap();
        RenewDecorate oldRenewDecorate = metaNodes.get(ipAddress);
        metaNodes.put(ipAddress, metaNode);
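    } catch (Exception e) {
        // The original error handling is elided in this excerpt;
        // rethrowing here only keeps the snippet well-formed.
        throw new RuntimeException(e);
    }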
    return metaNode;
}

1.4.3 Node data storage

Node data is ultimately stored in an in-memory hash table. The storage structure is as follows:

// RepositoryService underlying storage
Map<String/*dataCenter*/, NodeRepository> registry;

// NodeRepository underlying storage
Map<String/*ipAddress*/, RenewDecorate<T>> nodeMap;

Once the RenewDecorate has been stored in the Map, the node registration process is complete. How this is combined with the Raft protocol to synchronize data is discussed later.
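The two-level structure can be sketched as a map of maps. In this minimal sketch nodes are plain strings rather than RenewDecorate<T>, the class name is made up for illustration, and computeIfAbsent replaces the explicit get/put/null-check sequence used in the article's put method.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: nodes are plain strings here instead of RenewDecorate<T>.
final class NodeStorageSketch {

    // dataCenter -> (ipAddress -> node)
    private final Map<String, Map<String, String>> registry = new ConcurrentHashMap<>();

    /** Stores a node and returns the previous entry for that IP, or null. */
    String put(String dataCenter, String ipAddress, String node) {
        return registry.computeIfAbsent(dataCenter, dc -> new ConcurrentHashMap<>())
            .put(ipAddress, node);
    }

    /** Removes a node and returns it, or null when it was not registered. */
    String remove(String dataCenter, String ipAddress) {
        Map<String, String> nodes = registry.get(dataCenter);
        return nodes == null ? null : nodes.remove(ipAddress);
    }

    int nodeCount(String dataCenter) {
        Map<String, String> nodes = registry.get(dataCenter);
        return nodes == null ? 0 : nodes.size();
    }

    public static void main(String[] args) {
        NodeStorageSketch storage = new NodeStorageSketch();
        storage.put("DefaultDataCenter", "192.168.1.2", "MetaNode");
        System.out.println(storage.nodeCount("DefaultDataCenter"));
        storage.remove("DefaultDataCenter", "192.168.1.2");
        System.out.println(storage.nodeCount("DefaultDataCenter"));
    }
}
```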

Node removal follows similar logic: the node information is deleted from the Map, and a change event is also stored in the queue.

The final result is as follows:

this = {MetaRepositoryService} 
 registry = {ConcurrentHashMap}  size = 1
  "DefaultDataCenter" -> {NodeRepository} 
 snapShotFileNames = {HashSet}  size = 1

1.5 Data consistency through JRaft

The member list is stored in a Repository. The Repository is wrapped by the consistency protocol layer and serves as the state machine implementation for SOFAJRaft. Every operation on the Repository is synchronized to the other nodes, and the storage layer is operated on through Registry.

During synchronization, JRaft calls MetaRepositoryService directly, which keeps the data consistent inside the cluster.

This is how the other nodes are synchronized, and it differs from the previous section: there, a node actively stored the data; here, the other nodes are synchronized passively.
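The difference between active storage and passive synchronization can be shown with a toy replicated log. The sketch below is not SOFAJRaft: it has no elections, no networking, and no quorum acknowledgement, and every name in it is hypothetical. It only shows that nodes applying the same committed commands in the same order end up with the same map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// A toy model, not SOFAJRaft: no elections, no networking, no quorum acknowledgement.
final class ReplicatedMapSketch {

    /** One replicated command: store value under key. */
    static final class PutCommand {
        final String key;
        final String value;

        PutCommand(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Each node owns one of these and feeds it committed log entries in order. */
    static final class StateMachine {
        private final Map<String, String> repository = new HashMap<>();
        private int lastApplied;

        /** Applies every committed entry that has not been applied yet. */
        void applyCommitted(List<PutCommand> log, int commitIndex) {
            while (lastApplied < commitIndex) {
                PutCommand command = log.get(lastApplied++);
                repository.put(command.key, command.value);
            }
        }

        Map<String, String> snapshot() {
            return new HashMap<>(repository);
        }
    }

    public static void main(String[] args) {
        List<PutCommand> log = new ArrayList<>();
        StateMachine leader = new StateMachine();
        StateMachine follower = new StateMachine();

        // The leader appends the write to the log; here it is treated as committed at once.
        log.add(new PutCommand("192.168.1.2", "MetaNode"));
        int commitIndex = log.size();

        leader.applyCommitted(log, commitIndex);   // active side: the node that received the write
        follower.applyCommitted(log, commitIndex); // passive side: replays the same entry
        System.out.println(leader.snapshot().equals(follower.snapshot()));
    }
}
```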

1.6 Notifying Data and Session nodes

1.6.1 Notification operations

firePushDataListTask and firePushSessionListTask notify all Data and Session nodes that the MetaServer list has changed.

public class MetaStoreService implements StoreService<MetaNode> {

    @Override
    public NodeChangeResult addNode(MetaNode metaNode) {

            // Store to repository (automatically synchronize to cluster via jraft)
            metaRepositoryService.put(ipAddress, new RenewDecorate(metaNode,
                RenewDecorate.DEFAULT_DURATION_SECS));

            // Trigger notification (need to notify data/session), that's what we're talking about
            nodeChangeResult = getNodeChangeResult();
            firePushDataListTask(nodeChangeResult, "addMetaNode");
            firePushSessionListTask(nodeChangeResult, "addMetaNode");
    }
}

firePushDataListTask and firePushSessionListTask in turn send a TaskEvent to taskListenerManager:

private void firePushDataListTask(NodeChangeResult nodeChangeResult, String nodeOperate) {
    TaskEvent taskEvent = new TaskEvent(nodeChangeResult, TaskType.DATA_NODE_CHANGE_PUSH_TASK);
    taskEvent.setAttribute(Constant.PUSH_NEED_CONFIRM_KEY, false);
    taskEvent.setAttribute(Constant.PUSH_TARGET_TYPE, NodeType.DATA);
    taskEvent.setAttribute(Constant.PUSH_TARGET_OPERATOR_TYPE, nodeOperate);
    taskListenerManager.sendTaskEvent(taskEvent);
}

private void firePushSessionListTask(NodeChangeResult nodeChangeResult, String nodeOperate) {
    //notify all session node
    TaskEvent taskEvent = new TaskEvent(nodeChangeResult, TaskType.DATA_NODE_CHANGE_PUSH_TASK);
    taskEvent.setAttribute(Constant.PUSH_TARGET_TYPE, NodeType.SESSION);
    taskEvent.setAttribute(Constant.PUSH_TARGET_OPERATOR_TYPE, nodeOperate);
    taskListenerManager.sendTaskEvent(taskEvent);
}

This triggers the notification, so the Data nodes and Session nodes are informed.

How are the Data and Session nodes actually notified? Through listeners, which are invoked by DefaultTaskListenerManager#sendTaskEvent.

1.6.2 Sending Notification Messages

TaskListenerManager is a class that distributes various notification messages.

public class DefaultTaskListenerManager implements TaskListenerManager {

    public void sendTaskEvent(TaskEvent taskEvent) {
        Collection<TaskListener> taskListeners = this.taskListeners.get(taskEvent.getTaskType());
        for (TaskListener taskListener : taskListeners) {
            taskListener.handleEvent(taskEvent);
        }
    }
}

A number of handlers are registered through taskListenerManager.addTaskListener(taskListener) to process these messages. The runtime variables below show which listener handles which task type.

this.taskListeners = {ArrayListMultimap} 
 expectedValuesPerKey = 3
 map = {HashMap}  size = 4
  {TaskEvent$TaskType} "RECEIVE_STATUS_CONFIRM_NOTIFY_TASK" -> {ArrayList}  size = 1
    0 = {ReceiveStatusConfirmNotifyTaskListener} 
  {TaskEvent$TaskType} "PERSISTENCE_DATA_CHANGE_NOTIFY_TASK" -> {ArrayList}  size = 1
    0 = {PersistenceDataChangeNotifyTaskListener} 
  {TaskEvent$TaskType} "SESSION_NODE_CHANGE_PUSH_TASK" -> {ArrayList}  size = 1
    0 = {SessionNodeChangePushTaskListener} 
  {TaskEvent$TaskType} "DATA_NODE_CHANGE_PUSH_TASK" -> {ArrayList}  size = 1
    0 = {DataNodeChangePushTaskListener} 

The relevant message handler is called in sendTaskEvent to handle it.
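The register-then-dispatch pattern can be sketched independently of SOFARegistry. In this minimal sketch the supportedType accessor, the ListenerManager class, and RecordingListener are hypothetical names, and an EnumMap of lists stands in for the Guava ArrayListMultimap visible in the debugger output.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Illustrative only: reduced versions of TaskEvent, TaskListener and the listener manager.
final class TaskDispatchSketch {

    enum TaskType { DATA_NODE_CHANGE_PUSH_TASK, SESSION_NODE_CHANGE_PUSH_TASK }

    static final class TaskEvent {
        final TaskType taskType;
        final String payload;

        TaskEvent(TaskType taskType, String payload) {
            this.taskType = taskType;
            this.payload = payload;
        }
    }

    interface TaskListener {
        TaskType supportedType(); // hypothetical accessor used as the registration key

        void handleEvent(TaskEvent event);
    }

    static final class ListenerManager {
        private final Map<TaskType, List<TaskListener>> listeners = new EnumMap<>(TaskType.class);

        void addTaskListener(TaskListener listener) {
            listeners.computeIfAbsent(listener.supportedType(), type -> new ArrayList<>())
                .add(listener);
        }

        /** Delivers the event to every listener registered for its task type. */
        void sendTaskEvent(TaskEvent event) {
            List<TaskListener> matching =
                listeners.getOrDefault(event.taskType, Collections.emptyList());
            for (TaskListener listener : matching) {
                listener.handleEvent(event);
            }
        }
    }

    /** Listener that simply records the payloads it receives. */
    static final class RecordingListener implements TaskListener {
        private final TaskType type;
        final List<String> received = new ArrayList<>();

        RecordingListener(TaskType type) {
            this.type = type;
        }

        @Override
        public TaskType supportedType() {
            return type;
        }

        @Override
        public void handleEvent(TaskEvent event) {
            received.add(event.payload);
        }
    }

    public static void main(String[] args) {
        ListenerManager manager = new ListenerManager();
        RecordingListener dataListener = new RecordingListener(TaskType.DATA_NODE_CHANGE_PUSH_TASK);
        manager.addTaskListener(dataListener);
        manager.sendTaskEvent(new TaskEvent(TaskType.DATA_NODE_CHANGE_PUSH_TASK, "addMetaNode"));
        System.out.println(dataListener.received);
    }
}
```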

1.6.3 Asynchronously Processing Messages

Take the following listener as an example of message processing.

DataNodeChangePushTaskListener handles DataNode-related messages.

public class DataNodeChangePushTaskListener implements TaskListener {
  
    private TaskDispatcher<String, MetaServerTask> dataSingleTaskDispatcher;
    private TaskDispatcher<String, MetaServerTask> sessionSingleTaskDispatcher;
  
    public void handleEvent(TaskEvent event) {

        NodeType nodeType = (NodeType) event.getAttribute(Constant.PUSH_TARGET_TYPE);
        switch (nodeType) {
            case SESSION:
                MetaServerTask sessionNodeChangePushTask = new DataNodeChangePushTask(
                    NodeType.SESSION, metaServerConfig);
                sessionNodeChangePushTask.setTaskEvent(event);
                sessionSingleTaskDispatcher.dispatch(sessionNodeChangePushTask.getTaskId(),
                    sessionNodeChangePushTask, sessionNodeChangePushTask.getExpiryTime());
                break;
            case DATA:
                MetaServerTask dataNodeChangePushTask = new DataNodeChangePushTask(NodeType.DATA,
                    metaServerConfig);
                dataNodeChangePushTask.setTaskEvent(event);
                dataSingleTaskDispatcher.dispatch(dataNodeChangePushTask.getTaskId(),
                    dataNodeChangePushTask, dataNodeChangePushTask.getExpiryTime());
                break;
        }
    }
}

TaskDispatcher dispatches the tasks asynchronously, and TaskExecutors then runs them.

For a DataNode, the call eventually reaches DataNodeChangePushTask, which is executed by DataNodeSingleTaskProcessor.

DataNodeChangePushTask handles the interaction with each DataNode.

public class DataNodeChangePushTask extends AbstractMetaServerTask {
    private final SessionNodeService sessionNodeService;
    private final DataNodeService    dataNodeService;
    final private MetaServerConfig   metaServerConfig;
    final private NodeType           nodeType;
    private NodeChangeResult         nodeChangeResult;
    private Map<String, DataNode>    targetNodes; 

    public void execute() {
        switch (nodeType) {
            case SESSION:
                sessionNodeService.pushDataNodes(nodeChangeResult);
                break;
            case DATA:
                dataNodeService
                    .pushDataNodes(nodeChangeResult, targetNodes, confirm, confirmNodeIp);
                break;
        }
    }
}

DataNodeServiceImpl, for example, performs the actual communication with the nodes.

public class DataNodeServiceImpl implements DataNodeService {

    @Autowired
    private NodeExchanger         dataNodeExchanger;

    @Autowired
    private StoreService          dataStoreService;

    @Autowired
    private AbstractServerHandler dataConnectionHandler;

    @Override
    public void pushDataNodes(NodeChangeResult nodeChangeResult, Map<String, DataNode> targetNodes,
                              boolean confirm, String confirmNodeIp) {

        if (nodeChangeResult != null) {

            List<Throwable> exceptions = new ArrayList<>();
            NodeConnectManager nodeConnectManager = getNodeConnectManager();

            Collection<InetSocketAddress> connections = nodeConnectManager.getConnections(null);

            // add register confirm
            StoreService storeService = ServiceFactory.getStoreService(NodeType.DATA);
            DataCenterNodes dataCenterNodes = storeService.getDataCenterNodes();
            Map<String, DataNode> registeredNodes = dataCenterNodes.getNodes();

            for (InetSocketAddress address : connections) {
                try {
                    if (targetNodes != null && !targetNodes.isEmpty()) {
                        if (!targetNodes.keySet().contains(address.getAddress().getHostAddress())) {
                            continue;
                        }
                    } else {
                        if (!registeredNodes.keySet().contains(address.getAddress().getHostAddress())) {
                            continue;
                        }
                    }

                    Request<NodeChangeResult> nodeChangeRequestRequest = new Request<NodeChangeResult>() {
                        @Override
                        public NodeChangeResult getRequestBody() {
                            return nodeChangeResult;
                        }

                        @Override
                        public URL getRequestUrl() {
                            return new URL(address);
                        }
                    };
                    // Communicate between nodes
                    Response response = dataNodeExchanger.request(nodeChangeRequestRequest);

                    if (confirm) {
                        Object result = response.getResult();
                        if (result instanceof CommonResponse) {
                            CommonResponse genericResponse = (CommonResponse) result;
                            if (genericResponse.isSuccess()) {
                                confirmStatus(address, confirmNodeIp);
                            } 
                        } 
                    }
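                } catch (Exception e) {
                    // The original error handling is elided in this excerpt; collecting the
                    // failure in `exceptions` is an assumption that keeps the snippet well-formed.
                    exceptions.add(e);
                }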
            }
        }
    }
}  

1.6.4 Another way to generate notifications

ExecutorManager periodically calls pushNodeListChange and generates notifications when necessary.

The check looks for node changes recorded in DataConfirmStatusService.

scheduler.schedule(
        new TimedSupervisorTask("CheckDataNodeListChangePush", scheduler, checkNodeListChangePushExecutor,
                metaServerConfig.getSchedulerCheckNodeListChangePushTimeout(), TimeUnit.SECONDS,
                metaServerConfig.getSchedulerCheckNodeListChangePushExpBackOffBound(),
                () -> metaServerRegistry.pushNodeListChange(NodeType.DATA)),
        metaServerConfig.getSchedulerCheckNodeListChangePushFirstDelay(), TimeUnit.SECONDS);

For example:

public class DataStoreService implements StoreService<DataNode> {

    public void pushNodeListChange() {
        NodeOperator<DataNode> fireNode;
        if ((fireNode = dataConfirmStatusService.peekConfirmNode()) != null) {
            NodeChangeResult nodeChangeResult = getNodeChangeResult();
            Map<String, Map<String, DataNode>> map = nodeChangeResult.getNodes();
            Map<String, DataNode> addNodes = map.get(nodeConfig.getLocalDataCenter());
            if (addNodes != null) {
                Map<String, DataNode> previousNodes = dataConfirmStatusService.putExpectNodes(
                    fireNode.getNode(), addNodes);

                if (!previousNodes.isEmpty()) {
                    // generate a notification
                    firePushDataListTask(fireNode, nodeChangeResult, previousNodes, true);
                }
            }
            // generate a notification
            firePushSessionListTask(nodeChangeResult, fireNode.getNodeOperate().toString());
        }
    }
}

0x02 Node registration

2.1 DataApplication

2.1.1 DataConnectionHandler

When a DataApplication is started, the DataConnectionHandler first responds.

connected:, DataConnectionHandler (com.alipay.sofa.registry.server.meta.remoting.connection)
onEvent:, ConnectionEventAdapter (com.alipay.sofa.registry.remoting.bolt)
onEvent:, ConnectionEventListener (com.alipay.remoting)
run:, ConnectionEventHandler$1 (com.alipay.remoting)
runWorker:, ThreadPoolExecutor (java.util.concurrent)
run:, ThreadPoolExecutor$Worker (java.util.concurrent)
run:, Thread (java.lang)

At this stage the handler only records the connection.

public class DataConnectionHandler extends AbstractServerHandler implements NodeConnectManager {
    private Map<String/*connectId*/, InetSocketAddress> connections = new ConcurrentHashMap<>();

    @Override
    public void connected(Channel channel) throws RemotingException {
        super.connected(channel);
        addConnection(channel);
    }
}

2.1.2 DataNodeHandler

The DataNodeHandler then handles the registration request.

reply:, DataNodeHandler (com.alipay.sofa.registry.server.meta.remoting.handler)
handleRequest:, SyncUserProcessorAdapter (com.alipay.sofa.registry.remoting.bolt)
dispatchToUserProcessor:, RpcRequestProcessor (com.alipay.remoting.rpc.protocol)
doProcess:, RpcRequestProcessor (com.alipay.remoting.rpc.protocol)
run:, RpcRequestProcessor$ProcessTask (com.alipay.remoting.rpc.protocol)
runWorker:, ThreadPoolExecutor (java.util.concurrent)
run:, ThreadPoolExecutor$Worker (java.util.concurrent)
run:, Thread (java.lang)

The specific code is as follows.

public class DataNodeHandler extends AbstractServerHandler<DataNode> {
    @Autowired
    private Registry            metaServerRegistry;

    public Object reply(Channel channel, DataNode dataNode) {
        NodeChangeResult nodeChangeResult;
        nodeChangeResult = metaServerRegistry.register(dataNode);
        return nodeChangeResult;
    }
}

The next step is to call the Store service to add nodes.

public NodeChangeResult register(Node node) {
    StoreService storeService = ServiceFactory.getStoreService(node.getNodeType());
    return storeService.addNode(node);
}

The code goes to DataStoreService, which again calls RepositoryService to register and store the node.

public class DataStoreService implements StoreService<DataNode> {
    @Autowired
    private TaskListenerManager                                taskListenerManager;

    @RaftReference(uniqueId = "dataServer")
    private RepositoryService<String, RenewDecorate<DataNode>> dataRepositoryService;

    @RaftReference(uniqueId = "dataServer")
    private NodeConfirmStatusService<DataNode>                 dataConfirmStatusService;

    public NodeChangeResult addNode(DataNode dataNode) {
        NodeChangeResult nodeChangeResult;
        String ipAddress = dataNode.getNodeUrl().getIpAddress();

            dataRepositoryService.put(ipAddress, new RenewDecorate(dataNode,
                RenewDecorate.DEFAULT_DURATION_SECS));
            renew(dataNode, 30);
            nodeChangeResult = getNodeChangeResult();
            dataConfirmStatusService.putConfirmNode(dataNode, DataOperator.ADD);

        return nodeChangeResult;
    }
}

dataConfirmStatusService.putConfirmNode also stores a change event in a queue. The event is consumed later to push the updated data.

@Override
public void putConfirmNode(DataNode node, DataOperator nodeOperate) {
	expectNodesOrders.put(new NodeOperator(node, nodeOperate));
}

2.1.3 DataConfirmStatusService

DataConfirmStatusService is also annotated with RaftService, which indicates that it is a storage synchronized by Raft protocol.

The following storage structures will be synchronized.

  • expectNodesOrders stores node change events;
  • expectNodes stores the nodes that still have to acknowledge a change event; a NodeOperator is removed from expectNodesOrders only after the other nodes have confirmed it;
  • snapShotFileNames holds the snapshot file names.

@RaftService(uniqueId = "dataServer")
public class DataConfirmStatusService extends AbstractSnapshotProcess
                                      implements NodeConfirmStatusService<DataNode> {
  
    private ConcurrentHashMap<DataNode/*node*/, Map<String/*ipAddress*/, DataNode>> expectNodes       = new ConcurrentHashMap<>();
    private BlockingQueue<NodeOperator>                                             expectNodesOrders = new LinkedBlockingQueue<>();
    // The initializer is missing from the excerpt; it is restored here by analogy with MetaRepositoryService.
    private Set<String>                                                             snapShotFileNames = new HashSet<>();

    public void putConfirmNode(DataNode node, DataOperator nodeOperate) {
        expectNodesOrders.put(new NodeOperator(node, nodeOperate));
    }

    public NodeOperator<DataNode> peekConfirmNode() {
        return expectNodesOrders.peek();
    }
}

2.1.4 Consumption

Change events are stored in the BlockingQueue expectNodesOrders. Where are they consumed? Reading the source shows that, contrary to what one might expect, no thread performs a blocking read on the queue.

Instead, ExecutorManager starts a timed task that polls the queue. It periodically calls Registry#pushNodeListChange, which takes the head of the queue and consumes it. Data and Session each have their own task.

The process is as follows:

  1. Peek at the head of the expectNodesOrders queue; if it is null, return immediately;
  2. Get the node list of the current data center and store it in expectNodes;
  3. Submit the node change push task (firePushXxListTask);
  4. Process the task: call the pushXxxNodes method of XxNodeService (for example, DataNodeService#pushDataNodes), which obtains all node connections through the ConnectionHandler and sends them the node list;
  5. When a reply is received and confirmation is required, call StoreService#confirmNodeStatus to remove that node from expectNodes;
  6. Once all nodes have been removed from expectNodes, remove the operation from expectNodesOrders.
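The six steps above can be condensed into a small peek-confirm-poll sketch. It is a simplification under stated assumptions: nodes and peers are plain IP strings, the class and field names are hypothetical, the timed task is replaced by a direct method call, and the handling of evicted or disconnected peers is left out.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative only: a reduced model of expectNodesOrders and expectNodes.
final class ConfirmQueueSketch {

    // Pending change events, oldest first (plays the role of expectNodesOrders).
    private final Queue<String> pendingChanges = new LinkedBlockingQueue<>();
    // Changed node -> peers that have not acknowledged yet (plays the role of expectNodes).
    private final Map<String, Set<String>> awaitingAck = new ConcurrentHashMap<>();

    void putConfirmNode(String changedNode) {
        pendingChanges.add(changedNode);
    }

    /** One tick of the timed task: returns the peers to notify, or an empty set. */
    Set<String> pushNodeListChange(Set<String> currentPeers) {
        String head = pendingChanges.peek();              // step 1: peek, do not remove
        if (head == null) {
            return Collections.emptySet();
        }
        Set<String> waiting = ConcurrentHashMap.newKeySet();
        waiting.addAll(currentPeers);
        awaitingAck.put(head, waiting);                   // step 2: remember who must confirm
        return new HashSet<>(waiting);                    // step 3: the caller pushes to these peers
    }

    /** Steps 5 and 6: record one acknowledgement; true once the head event is fully confirmed. */
    boolean confirm(String peerIp) {
        String head = pendingChanges.peek();
        if (head == null) {
            return false;
        }
        Set<String> waiting = awaitingAck.get(head);
        if (waiting == null) {
            return false;
        }
        waiting.remove(peerIp);
        if (waiting.isEmpty()) {
            awaitingAck.remove(head);
            pendingChanges.poll();                        // only now does the event leave the queue
            return true;
        }
        return false;
    }

    int pendingCount() {
        return pendingChanges.size();
    }

    public static void main(String[] args) {
        ConfirmQueueSketch queue = new ConfirmQueueSketch();
        queue.putConfirmNode("10.0.0.3");
        Set<String> peers = new HashSet<>();
        peers.add("10.0.0.1");
        peers.add("10.0.0.2");
        System.out.println("notify: " + queue.pushNodeListChange(peers).size() + " peers");
        System.out.println("confirmed after first ack: " + queue.confirm("10.0.0.1"));
        System.out.println("confirmed after second ack: " + queue.confirm("10.0.0.2"));
        System.out.println("pending events: " + queue.pendingCount());
    }
}
```

Peeking instead of polling keeps the event at the head of the queue until every peer has acknowledged it, so a push that fails halfway is retried on the next tick rather than lost.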

The details are as follows.

2.1.4.1 Periodically Submitting a Node Change Push task

This section corresponds to the first three steps in the preceding steps.

The MetaServerRegistry’s pushNodeListChange function is called periodically to handle a Data scheduled task:

scheduler.schedule(
        new TimedSupervisorTask("CheckDataNodeListChangePush", scheduler, checkNodeListChangePushExecutor,
                metaServerConfig.getSchedulerCheckNodeListChangePushTimeout(), TimeUnit.SECONDS,
                metaServerConfig.getSchedulerCheckNodeListChangePushExpBackOffBound(),
                () -> metaServerRegistry.pushNodeListChange(NodeType.DATA)),
        metaServerConfig.getSchedulerCheckNodeListChangePushFirstDelay(), TimeUnit.SECONDS);

pushNodeListChange in MetaServerRegistry is defined as follows:

@Override
public void pushNodeListChange(NodeType nodeType) {
    StoreService storeService = ServiceFactory.getStoreService(nodeType);
    if (storeService != null) {
        storeService.pushNodeListChange();
    }
}

DataStoreService does the following here:

  1. Peek at the head of the expectNodesOrders queue; if it is null, return immediately;
  2. Get the node list of the current data center and store it in expectNodes;
  3. Submit the node change push task (firePushXxListTask).

public void pushNodeListChange() {
    NodeOperator<DataNode> fireNode;
    // Peek at the head of the expectNodesOrders queue; if it is null, return immediately
    if ((fireNode = dataConfirmStatusService.peekConfirmNode()) != null) {
        NodeChangeResult nodeChangeResult = getNodeChangeResult();
        Map<String, Map<String, DataNode>> map = nodeChangeResult.getNodes();
        Map<String, DataNode> addNodes = map.get(nodeConfig.getLocalDataCenter());
        if (addNodes != null) {
            // Get the node list of the current data center and store it in expectNodes
            Map<String, DataNode> previousNodes = dataConfirmStatusService.putExpectNodes(
                fireNode.getNode(), addNodes);
            // Submit the node change push task (firePushXxListTask)
            if (!previousNodes.isEmpty()) {
                firePushDataListTask(fireNode, nodeChangeResult, previousNodes, true);
            }
        }
        // Submit the node change push task (firePushXxListTask)
        firePushSessionListTask(nodeChangeResult, fireNode.getNodeOperate().toString());
    }
}

This submits the task for asynchronous processing.

2.1.4.2 Periodically Processing Tasks asynchronously

This section corresponds to the last three steps above.

The tasks are then processed asynchronously:

  1. Process the task: call the pushXxxNodes method of XxNodeService (for example, DataNodeService#pushDataNodes), which obtains all node connections through the ConnectionHandler and sends them the node list;
  2. When a reply is received and confirmation is required, call StoreService#confirmNodeStatus to remove that node from expectNodes;
  3. Once all nodes have been removed from expectNodes, remove the operation from expectNodesOrders.

public class DataNodeServiceImpl implements DataNodeService {

    public void pushDataNodes(NodeChangeResult nodeChangeResult, Map<String, DataNode> targetNodes,
                              boolean confirm, String confirmNodeIp) {

        if (nodeChangeResult != null) {
            NodeConnectManager nodeConnectManager = getNodeConnectManager();
            Collection<InetSocketAddress> connections = nodeConnectManager.getConnections(null);

            // add register confirm
            StoreService storeService = ServiceFactory.getStoreService(NodeType.DATA);
            DataCenterNodes dataCenterNodes = storeService.getDataCenterNodes();
            Map<String, DataNode> registeredNodes = dataCenterNodes.getNodes();

            // Get all node connections through ConnectionHandler and send the list of nodes.
            for (InetSocketAddress address : connections) {
                try {
                    Request<NodeChangeResult> nodeChangeRequestRequest = new Request<NodeChangeResult>() {
                        @Override
                        public NodeChangeResult getRequestBody() {
                            return nodeChangeResult;
                        }

                        @Override
                        public URL getRequestUrl() {
                            return new URL(address);
                        }
                    };
                    // Send the node list
                    Response response = dataNodeExchanger.request(nodeChangeRequestRequest);

                    if (confirm) {
                        Object result = response.getResult();
                        if (result instanceof CommonResponse) {
                            CommonResponse genericResponse = (CommonResponse) result;
                            if (genericResponse.isSuccess()) {
                             // After receiving the reply, if confirmation is needed, the 'StoreService#confirmNodeStatus' method will be called to remove the node from expectNodes;
                                confirmStatus(address, confirmNodeIp);
                            }
                        } 
                    }

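                } catch (Exception e) {
                    // The original error handling is elided in this excerpt;
                    // rethrowing here only keeps the snippet well-formed.
                    throw new RuntimeException(e);
                }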
            }
        }
    }  
}
2.1.4.3 Confirmation Procedure

The confirmation step mentioned above, in more detail:

private void confirmStatus(InetSocketAddress address, String confirmNodeIp) {
    String ipAddress = address.getAddress().getHostAddress();
    dataStoreService.confirmNodeStatus(ipAddress, confirmNodeIp);
}

Next comes DataStoreService:

  1. After a reply is received, and if confirmation is required, the StoreService#confirmNodeStatus method is called to remove the node from expectNodes;
  2. Once all the nodes have been removed from expectNodes, the operation is removed from expectNodesOrders.

public void confirmNodeStatus(String ipAddress, String confirmNodeIp) {
    NodeOperator<DataNode> fireNode = dataConfirmStatusService.peekConfirmNode();
    if (fireNode != null) {
        String fireNodeIp = fireNode.getNode().getIp();
        Map<String/*ipAddress*/, DataNode> waitNotifyNodes = dataConfirmStatusService
            .getExpectNodes(fireNode.getNode());

        if (waitNotifyNodes != null) {
            Set<String> removeIp = getRemoveIp(waitNotifyNodes.keySet());
            removeIp.add(ipAddress);

            // Remove the node from expectNodes.
            waitNotifyNodes = dataConfirmStatusService.removeExpectConfirmNodes(
                fireNode.getNode(), removeIp);

            if (waitNotifyNodes.isEmpty()) {
                // All nodes have been notified, or the disconnected ones have been evicted.
                try {
                    // Once every node has been removed from expectNodes, remove the operation from expectNodesOrders.
                    if (null != dataConfirmStatusService
                        .removeExpectNodes((dataConfirmStatusService.pollConfirmNode()).getNode())) {
                        // An added node in init status must be notified.
                        if (fireNode.getNodeOperate() == DataOperator.ADD) {
                            notifyConfirm(fireNode.getNode());
                        }
                    }
                }
            }
        } else {
            try {
                // The awaited nodes do not exist.
                dataConfirmStatusService.pollConfirmNode();
            }
        }
    }
}
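The two-step bookkeeping described above (acknowledgements shrink expectNodes, and an empty set retires the operation from expectNodesOrders) can be sketched in isolation. This is a minimal illustration, not the SOFARegistry implementation: the class ConfirmTracker, its methods, and the use of plain strings for operations and IPs are all assumptions made for the example.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical, simplified stand-in for the expectNodesOrders / expectNodes bookkeeping.
class ConfirmTracker {
    // Pending change operations, oldest first (plays the role of expectNodesOrders).
    private final Queue<String> pendingOperations = new ArrayDeque<>();
    // For each pending operation, the IPs that still have to confirm (plays the role of expectNodes).
    private final Map<String, Set<String>> awaiting = new HashMap<>();

    // Record a change and the nodes that must acknowledge it.
    synchronized void expect(String operation, Set<String> ips) {
        pendingOperations.add(operation);
        awaiting.put(operation, new HashSet<>(ips));
    }

    // Handle one acknowledgement; returns true when the oldest operation is fully confirmed.
    synchronized boolean confirm(String ip) {
        String head = pendingOperations.peek();
        if (head == null) {
            return false; // nothing is waiting for confirmation
        }
        Set<String> waiting = awaiting.get(head);
        waiting.remove(ip);
        if (waiting.isEmpty()) {
            // Every node has confirmed, so retire the operation itself.
            pendingOperations.poll();
            awaiting.remove(head);
            return true;
        }
        return false;
    }

    synchronized int pendingCount() {
        return pendingOperations.size();
    }
}
```

Only the oldest operation is ever confirmed, which mirrors the peek-then-poll pattern in confirmNodeStatus: later operations stay queued until the current one has been acknowledged by every expected node.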
2.1.4.4 Summarize the consumption process

The consumption process is summarized as follows:

[Diagram flattened during extraction. Recoverable flow: ServiceStateMachine -> Processor -> DataConfirmStatusService, which puts the operation into expectNodesOrders. In a loop, the TimedSupervisorTask started by ExecutorManager calls metaServerRegistry.pushNodeListChange, which takes fireNode from expectNodesOrders (peekConfirmNode), records the nodes to be notified in expectNodes (putExpectNodes), and fires firePushDataListTask and firePushSessionListTask through taskListenerManager.sendTaskEvent. On the consuming side, TaskExecutors -> DataNodeSingleTaskProcessor -> DataNodeChangePushTask -> DataNodeServiceImpl -> StoreService -> confirmStatus, which removes entries from expectNodes and then from expectNodesOrders.]


2.1.5 summary

The data node registration process is summarized as follows.

                                                                                                       +----------------------------+
                                                              put        +----------------------+      | Map(String, NodeRepository)|
                                                            +----------> |dataRepositoryService +----> | registry                   |
                                                            |            +----------------------+      +----------------------------+
                                                            |
                 register                   addNode         |
+----------------+      +------------------+        +-------+--------+
| DataNodeHandler+----->+metaServerRegistry+------->+DataStoreService|                                       TimedSupervisorTask
+----------------+      +------------------+        +-------+--------+
                                                            |            +------------------------+          +------------------+
                                                            +----------> |dataConfirmStatusService| +------> |pushNodeListChange|
                                                          putConfirmNode +------------------------+          +------------------+


2.2 SessionNodeHandler

Registration of a Session node works almost the same way as registration of a Data node.

SessionNodeHandler is defined as follows:

public class SessionNodeHandler extends AbstractServerHandler<SessionNode> {

    @Autowired
    private Registry            metaServerRegistry;

    public Object reply(Channel channel, SessionNode sessionNode) {
        NodeChangeResult nodeChangeResult;
        try {
            nodeChangeResult = metaServerRegistry.register(sessionNode);
        }
        return nodeChangeResult;
    }
}

The business logic is then handled by SessionStoreService.

public class SessionStoreService implements StoreService<SessionNode> {
  
    public NodeChangeResult addNode(SessionNode sessionNode) {

        try {
            String ipAddress = sessionNode.getNodeUrl().getIpAddress();

            sessionRepositoryService.put(ipAddress, new RenewDecorate(sessionNode,
                RenewDecorate.DEFAULT_DURATION_SECS));

            sessionVersionRepositoryService.checkAndUpdateVersions(nodeConfig.getLocalDataCenter(),
                System.currentTimeMillis());

            renew(sessionNode, 30);
            sessionConfirmStatusService.putConfirmNode(sessionNode, DataOperator.ADD);

        } 

        return dataStoreService.getNodeChangeResult();
    }
}

Next comes SessionRepositoryService.

Note that the map here is keyed by IP address, unlike the Data repository, whose registry is keyed by data center.

@RaftService(uniqueId = "sessionServer")
public class SessionRepositoryService extends AbstractSnapshotProcess
                                      implements RepositoryService<String, RenewDecorate<SessionNode>> {
    /** session node store */
    private ConcurrentHashMap<String/*ipAddress*/, RenewDecorate<SessionNode>> registry = new ConcurrentHashMap<>();

    public RenewDecorate<SessionNode> put(String ipAddress, RenewDecorate<SessionNode> sessionNode,
                                          Long currentTimeMillis) {
        try {
            RenewDecorate oldRenewDecorate = registry.get(ipAddress);
            registry.put(ipAddress, sessionNode);
        }
        return sessionNode;
    }
}

As you can see from the previous section, DataServer and SessionServer both have handlers to handle node registration requests. The act of registration is done by Registry.

For example, the metaServerHandlers bean:

@Bean(name = "metaServerHandlers")
public Collection<AbstractServerHandler> metaServerHandlers() {
    Collection<AbstractServerHandler> list = new ArrayList<>();
    list.add(metaConnectionHandler());
    list.add(getNodesRequestHandler());
    return list;
}

0x03 Renewing registration information

3.1 Key classes

When a node is registered, node information is wrapped up in RenewDecorate. This is the key to renewing and evicting registration information. This class is defined as follows:

public class RenewDecorate<T> implements Serializable {
    public static final int DEFAULT_DURATION_SECS = 15;
    private T               renewal;             // the wrapped node object
    private long            beginTimestamp;      // registration time
    private volatile long   lastUpdateTimestamp; // last renewal time
    private long            duration;            // timeout period

    public boolean isExpired() {
        return System.currentTimeMillis() > lastUpdateTimestamp + duration;
    }

    public void renew() {
        lastUpdateTimestamp = System.currentTimeMillis() + duration;
    }

    public void renew(long durationSECS) {
        lastUpdateTimestamp = System.currentTimeMillis() + durationSECS * 1000;
    }
}

The object holds the registered node together with its registration time, last renewal time, and timeout period.

Renewal amounts to updating lastUpdateTimestamp. Expiry is decided by checking whether System.currentTimeMillis() - lastUpdateTimestamp > duration holds; if it does, the node is considered to have timed out and is evicted.
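The renew-and-expire rule can be tried out on its own with a small lease wrapper. The sketch below is an assumption-laden illustration rather than the RenewDecorate source: the class name Lease, the injected clock, and the millisecond duration are all hypothetical, and the expiry check follows the rule as stated in the prose (now minus last renewal greater than duration).

```java
import java.util.function.LongSupplier;

// Hypothetical lease wrapper; a simplified illustration, not the RenewDecorate source.
class Lease<T> {
    private final T value;
    private final long durationMillis;
    private final LongSupplier clock;   // injected so that expiry can be exercised without sleeping
    private volatile long lastRenewedAt;

    Lease(T value, long durationMillis, LongSupplier clock) {
        this.value = value;
        this.durationMillis = durationMillis;
        this.clock = clock;
        this.lastRenewedAt = clock.getAsLong();
    }

    T get() {
        return value;
    }

    // Renewal only moves the timestamp forward to the current time.
    void renew() {
        lastRenewedAt = clock.getAsLong();
    }

    // Expired once more than durationMillis has passed since the last renewal.
    boolean isExpired() {
        return clock.getAsLong() - lastRenewedAt > durationMillis;
    }
}
```

In production code the clock would typically be System::currentTimeMillis; passing it in keeps the class testable with a fake time source.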

3.2 Execution Path

As you can see, renew can be called from multiple execution paths.

Path 1:

renew:, RenewDecorate (com.alipay.sofa.registry.server.meta.store)
replace:, DataRepositoryService (com.alipay.sofa.registry.server.meta.repository.service)
replace:, DataRepositoryService (com.alipay.sofa.registry.server.meta.repository.service)
replace:, RepositoryService (com.alipay.sofa.registry.server.meta.repository)
invokeInterface_L3_L: (java.lang.invoke.LambdaForm$DMH)
reinvoke: (java.lang.invoke.LambdaForm$BMH)
invoker: (java.lang.invoke.LambdaForm$MH)
invokeExact_MT: (java.lang.invoke.LambdaForm$MH)
invokeWithArguments:, MethodHandle (java.lang.invoke)
process:, Processor (com.alipay.sofa.registry.jraft.processor)
onApply:, ServiceStateMachine (com.alipay.sofa.registry.jraft.bootstrap)
doApplyTasks:, FSMCallerImpl (com.alipay.sofa.jraft.core)
doCommitted:, FSMCallerImpl (com.alipay.sofa.jraft.core)
runApplyTask:, FSMCallerImpl (com.alipay.sofa.jraft.core)
access$:, FSMCallerImpl (com.alipay.sofa.jraft.core)
onEvent:, FSMCallerImpl$ApplyTaskHandler (com.alipay.sofa.jraft.core)
onEvent:, FSMCallerImpl$ApplyTaskHandler (com.alipay.sofa.jraft.core)
run:, BatchEventProcessor (com.lmax.disruptor)
run:, Thread (java.lang)

Path 2:

renew:, DataStoreService (com.alipay.sofa.registry.server.meta.store)
addNode:, DataStoreService (com.alipay.sofa.registry.server.meta.store)
addNode:, DataStoreService (com.alipay.sofa.registry.server.meta.store)
register:, MetaServerRegistry (com.alipay.sofa.registry.server.meta.registry)
reply:, DataNodeHandler (com.alipay.sofa.registry.server.meta.remoting.handler)
reply:, DataNodeHandler (com.alipay.sofa.registry.server.meta.remoting.handler)
handleRequest:, SyncUserProcessorAdapter (com.alipay.sofa.registry.remoting.bolt)
dispatchToUserProcessor:, RpcRequestProcessor (com.alipay.remoting.rpc.protocol)
doProcess:, RpcRequestProcessor (com.alipay.remoting.rpc.protocol)
run:, RpcRequestProcessor$ProcessTask (com.alipay.remoting.rpc.protocol)
runWorker:, ThreadPoolExecutor (java.util.concurrent)
run:, ThreadPoolExecutor$Worker (java.util.concurrent)
run:, Thread (java.lang)

For example, in DataStoreService

@Override
public NodeChangeResult addNode(DataNode dataNode) {
    NodeChangeResult nodeChangeResult;
    String ipAddress = dataNode.getNodeUrl().getIpAddress();
    write.lock();
    try {
        dataRepositoryService.put(ipAddress, new RenewDecorate(dataNode,
            RenewDecorate.DEFAULT_DURATION_SECS));
        renew(dataNode, 30);  // renew the lease
        nodeChangeResult = getNodeChangeResult();
        dataConfirmStatusService.putConfirmNode(dataNode, DataOperator.ADD);
    } finally {
        write.unlock();
    }
    return nodeChangeResult;
}

addNode calls renew:


public void renew(DataNode dataNode, int duration) {

    String ipAddress = dataNode.getNodeUrl().getIpAddress();
    RenewDecorate renewer = dataRepositoryService.get(ipAddress);
    if (renewer == null) {
        addNode(dataNode); // not registered yet: register it
    } else {
        // renew the existing registration
        if (duration > 0) {
            dataRepositoryService.replace(ipAddress, new RenewDecorate(dataNode, duration));
        } else {
            dataRepositoryService.replace(ipAddress, new RenewDecorate(dataNode, RenewDecorate.DEFAULT_DURATION_SECS));
        }
    }
}

Because the Repository relies on Raft to keep its data consistent, the call to dataRepositoryService.replace is intercepted by a proxy and ends up in ProxyHandler.

@Override
public Object invoke(Object proxy, Method method, Object[] args) {
    try {
        ProcessRequest request = new ProcessRequest();
        request.setMethodArgSigs(createParamSignature(method.getParameterTypes()));
        request.setMethodName(method.getName());
        request.setMethodArgs(args);
        request.setServiceName(serviceId);

        if (Processor.getInstance().isLeaderReadMethod(method)) {
            return doInvokeMethod(request);
        }
        return client.sendRequest(request);
    }
}
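The interception itself relies on the standard JDK dynamic proxy mechanism (java.lang.reflect.Proxy with an InvocationHandler). The following sketch shows only that mechanism, under stated assumptions: KeyValueRepository and ProxySketch are hypothetical names, and instead of sending a request through a Raft client the handler simply records the call.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical repository interface used only for this illustration.
interface KeyValueRepository {
    String replace(String key, String value);
}

class ProxySketch {
    // Records every intercepted call instead of sending it over the network.
    static final List<String> SENT = new ArrayList<>();

    static KeyValueRepository createStub() {
        InvocationHandler handler = (proxy, method, args) -> {
            // A real handler would package the method name and arguments into a
            // request object and hand it to a Raft client; here it is only logged.
            SENT.add(method.getName() + Arrays.toString(args));
            return null; // this sketch has no remote result to return
        };
        return (KeyValueRepository) Proxy.newProxyInstance(
            KeyValueRepository.class.getClassLoader(),
            new Class<?>[] { KeyValueRepository.class },
            handler);
    }
}
```

Callers use the stub exactly like a local repository, which is why DataStoreService can call replace without knowing that the write is replicated.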

Once the request has travelled through Raft to the server, the call stack looks like this:

replace:, DataRepositoryService (com.alipay.sofa.registry.server.meta.repository.service)
replace:, DataRepositoryService (com.alipay.sofa.registry.server.meta.repository.service)
replace:, RepositoryService (com.alipay.sofa.registry.server.meta.repository)
invokeInterface_L3_L: (java.lang.invoke.LambdaForm$DMH)
reinvoke: (java.lang.invoke.LambdaForm$BMH)
invoker: (java.lang.invoke.LambdaForm$MH)
invokeExact_MT: (java.lang.invoke.LambdaForm$MH)
invokeWithArguments:, MethodHandle (java.lang.invoke)
process:, Processor (com.alipay.sofa.registry.jraft.processor)
onApply:, ServiceStateMachine (com.alipay.sofa.registry.jraft.bootstrap)
doApplyTasks:, FSMCallerImpl (com.alipay.sofa.jraft.core)
doCommitted:, FSMCallerImpl (com.alipay.sofa.jraft.core)
runApplyTask:, FSMCallerImpl (com.alipay.sofa.jraft.core)
access$100:, FSMCallerImpl (com.alipay.sofa.jraft.core)
onEvent:, FSMCallerImpl$ApplyTaskHandler (com.alipay.sofa.jraft.core)
onEvent:, FSMCallerImpl$ApplyTaskHandler (com.alipay.sofa.jraft.core)
run:, BatchEventProcessor (com.lmax.disruptor)
run:, Thread (java.lang)

The replace method that finally completes the renewal is as follows:

@Override
public RenewDecorate<DataNode> replace(String ipAddress, RenewDecorate<DataNode> dataNode,
                                       Long currentTimeMillis) {
    try {
        String dataCenter = dataNode.getRenewal().getDataCenter();
        NodeRepository<DataNode> dataNodeRepository = registry.get(dataCenter);

        if (dataNodeRepository != null) {
            Map<String/*ipAddress*/, RenewDecorate<DataNode>> dataNodes = dataNodeRepository
                .getNodeMap();
            RenewDecorate<DataNode> oldRenewDecorate = dataNodes.get(ipAddress);
            if (oldRenewDecorate != null && oldRenewDecorate.getRenewal() != null) {
                oldRenewDecorate.setRenewal(dataNode.getRenewal());
                oldRenewDecorate.renew();
            }
        }
        return dataNode;
    }
}

The details are shown below.

[Diagram flattened during extraction. Recoverable flow. Client side: DataStoreService -> replace -> DataRepositoryService stub -> Proxy -> invoke -> ProxyHandler -> sendRequest -> RaftClient -> network. Server side: RaftServerHandler -> received -> FSMCallerImpl -> onApply -> StateMachine -> process -> Processor -> replace -> DataRepositoryService.]


3.3 RenewNodesRequestHandler

Like registration, renewal has its own handler, RenewNodesRequestHandler, and the request is ultimately processed by StoreService. In addition, if the node being renewed cannot be found, a registration is triggered instead.

The renewal handler is registered during initialization.

        @Bean(name = "sessionServerHandlers")
        public Collection<AbstractServerHandler> sessionServerHandlers() {
            Collection<AbstractServerHandler> list = new ArrayList<>();
            list.add(sessionConnectionHandler());
            list.add(sessionNodeHandler());
            list.add(renewNodesRequestHandler()); // Register with Server handler
            list.add(getNodesRequestHandler());
            list.add(fetchProvideDataRequestHandler());
            return list;
        }

        @Bean(name = "dataServerHandlers")
        public Collection<AbstractServerHandler> dataServerHandlers() {
            Collection<AbstractServerHandler> list = new ArrayList<>();
            list.add(dataConnectionHandler());
            list.add(getNodesRequestHandler());
            list.add(dataNodeHandler());
            list.add(renewNodesRequestHandler()); // Register with Server handler
            list.add(fetchProvideDataRequestHandler());
            return list;
        }


Specific definitions are as follows:

public class RenewNodesRequestHandler extends AbstractServerHandler<RenewNodesRequest> {
    @Autowired
    private Registry            metaServerRegistry;

    public Object reply(Channel channel, RenewNodesRequest renewNodesRequest) {
        Node renewNode = null;
        renewNode = renewNodesRequest.getNode();
        metaServerRegistry.renew(renewNode, renewNodesRequest.getDuration());
        return null;
    }

    public Class interest() {
        return RenewNodesRequest.class;
    }

    public HandlerType getType() {
        return HandlerType.PROCESSER;
    }
}

0x04 Eviction

Eviction is scheduled in ExecutorManager#startScheduler. One of the scheduled tasks calls Registry#evict, which walks the Map held by each StoreService to collect the expired nodes and then removes them from the Repository by calling StoreService#removeNodes; the removal also triggers a change notification. By default, this task runs every three seconds.
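The core of such a sweep, collecting expired entries and removing them in one pass, can be sketched as follows. This is a simplified illustration under assumptions: EvictionSketch and evictExpired are hypothetical names, and each node is reduced to an IP mapped to an absolute expiry time in milliseconds.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class EvictionSketch {
    // Removes every entry whose expiry time lies before nowMillis and returns the evicted keys.
    static List<String> evictExpired(Map<String, Long> expiryByIp, long nowMillis) {
        List<String> evicted = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = expiryByIp.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() < nowMillis) {
                evicted.add(entry.getKey());
                it.remove(); // safe removal while iterating
            }
        }
        return evicted;
    }
}
```

A periodic runner such as ScheduledExecutorService#scheduleWithFixedDelay could invoke this method every few seconds, and the returned list is what a caller would use to send change notifications.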

4.1 configuration

public void startScheduler() {
    scheduler.schedule(new TimedSupervisorTask("HeartbeatCheck", scheduler, heartbeatCheckExecutor,
                    metaServerConfig.getSchedulerHeartbeatTimeout(), TimeUnit.SECONDS,
                    metaServerConfig.getSchedulerHeartbeatExpBackOffBound(), () -> metaServerRegistry.evict()),
            metaServerConfig.getSchedulerHeartbeatFirstDelay(), TimeUnit.SECONDS);
}  

4.2 Eviction

The evict method iterates over the StoreServices, fetches the expired nodes from each one, and removes them.

public class MetaServerRegistry implements Registry<Node> {

    public void evict() {
        for (NodeType nodeType : NodeType.values()) {
            StoreService storeService = ServiceFactory.getStoreService(nodeType);
            if (storeService != null) {
                Collection<Node> expiredNodes = storeService.getExpired();
                if (expiredNodes != null && !expiredNodes.isEmpty()) {
                    storeService.removeNodes(expiredNodes);
                }
            }
        }
    }  
}    

The call then continues into DataStoreService.

@Override
public void removeNodes(Collection<DataNode> nodes) {
    write.lock();
    try {
        if (nodes != null && !nodes.isEmpty()) {
            for (DataNode dataNode : nodes) {
                String ipAddress = dataNode.getNodeUrl().getIpAddress();
                RenewDecorate<DataNode> dataNodeRemove = dataRepositoryService
                    .remove(ipAddress);
                if (dataNodeRemove != null) {
                    dataConfirmStatusService.putConfirmNode(dataNode, DataOperator.REMOVE);
                }
            }
        }
    } finally {
        write.unlock();
    }
}

Finally, DataRepositoryService is called:

@Override
public RenewDecorate<DataNode> remove(Object key, Long currentTimeMillis) {
    try {
        String ipAddress = (String) key;
        String dataCenter = nodeConfig.getLocalDataCenter();

        NodeRepository<DataNode> dataNodeRepository = registry.get(dataCenter);
        if (dataNodeRepository != null) {
            Map<String/*ipAddress*/, RenewDecorate<DataNode>> dataNodes = dataNodeRepository
                .getNodeMap();
            if (dataNodes != null) {
                RenewDecorate<DataNode> oldRenewDecorate = dataNodes.remove(ipAddress);
                dataNodeRepository.setVersion(currentTimeMillis);
                return oldRenewDecorate;
            }
        }
    }
}

0x05 Node List Query

The Data, Meta, and Session servers all provide getNodesRequestHandler, which handles requests for the current node list by fetching the data from the underlying Repository. The structure of the result is defined by the NodeChangeResult class, which contains the node list and version number of each data center.

public class NodeChangeResult<T extends Node> implements Serializable {
    private final NodeType                                               nodeType;
    private Map<String/*dataCenter id*/, Map<String /*ipAddress*/, T>> nodes;
    private Long                                                         version;
    private Map<String/*dataCenter*/, Long /*version*/>                dataCenterListVersions;
    /** local dataCenter id */
    private String                                                       localDataCenter;
}

5.1 configuration

Data, Meta, and Session Server all provide getNodesRequestHandler. Details are as follows:

@Bean(name = "sessionServerHandlers")
public Collection<AbstractServerHandler> sessionServerHandlers() {
    Collection<AbstractServerHandler> list = new ArrayList<>();
    list.add(sessionConnectionHandler());
    list.add(sessionNodeHandler());
    list.add(renewNodesRequestHandler());
    list.add(getNodesRequestHandler()); // configured here
    list.add(fetchProvideDataRequestHandler());
    return list;
}

@Bean(name = "dataServerHandlers")
public Collection<AbstractServerHandler> dataServerHandlers() {
    Collection<AbstractServerHandler> list = new ArrayList<>();
    list.add(dataConnectionHandler());
    list.add(getNodesRequestHandler()); // configured here
    list.add(dataNodeHandler());
    list.add(renewNodesRequestHandler());
    list.add(fetchProvideDataRequestHandler());
    return list;
}

@Bean(name = "metaServerHandlers")
public Collection<AbstractServerHandler> metaServerHandlers() {
    Collection<AbstractServerHandler> list = new ArrayList<>();
    list.add(metaConnectionHandler());
    list.add(getNodesRequestHandler()); // configured here
    return list;
}

The getNodesRequestHandler bean is created as follows:

@Configuration
public static class MetaServerRemotingConfiguration {
    @Bean
    public AbstractServerHandler getNodesRequestHandler() {
        return new GetNodesRequestHandler();
    }
}

5.2 the response handler

Requests for the node list arrive through Bolt and are answered by GetNodesRequestHandler.

public class GetNodesRequestHandler extends AbstractServerHandler<GetNodesRequest> {
    @Autowired
    private Registry            metaServerRegistry;

    public Object reply(Channel channel, GetNodesRequest getNodesRequest) {
        NodeChangeResult nodeChangeResult;
        try {
            nodeChangeResult = metaServerRegistry.getAllNodes(getNodesRequest.getNodeType());
        }
        return nodeChangeResult;
    }
}

The details are shown in the figure below:

                      +-------------------------+
                      | SyncUserProcessorAdapter|
                      +-----------+-------------+
                                  |
                                  |
                                  v  handleRequest
                       +----------+-----------+
                       |GetNodesRequestHandler|
                       +----------+-----------+
                                  |
                                  |
                                  v  getAllNodes
                         +--------+---------+
                         |MetaServerRegistry|
                         +--------+---------+
                                  |
                                  |
                                  v  getNodeChangeResult
                          +-------+--------+
                          |MetaStoreService|
                          +----+------+----+
                               |      |
                   +-----------+      +------------+
                   |                               |
      getNodeMap   v                               v  getNodeMap
+------------------+-----------+        +----------+-------------------+
|dataCenter, metaNodeRepository|  ...   |dataCenter, metaNodeRepository|
+------------------------------+        +------------------------------+


We can go a little deeper.

5.3 the Registry operation

Registry simply calls StoreService.

public class MetaServerRegistry implements Registry<Node> {
    @Override
    public NodeChangeResult getAllNodes(NodeType nodeType) {
        StoreService storeService = ServiceFactory.getStoreService(nodeType);
        return storeService.getNodeChangeResult();
    }
}

5.4 StoreService operation

The StoreService iterates over the data centers, collects the node list and version of each one, and returns the combined result.

public class MetaStoreService implements StoreService<MetaNode> {

    public NodeChangeResult getNodeChangeResult() {

        NodeChangeResult nodeChangeResult = new NodeChangeResult(NodeType.META);
        String localDataCenter = nodeConfig.getLocalDataCenter();
        Map<String/*dataCenter*/, NodeRepository> metaRepositoryMap = metaRepositoryService.getNodeRepositories();
        ConcurrentHashMap<String/*dataCenter*/, Map<String/*ipAddress*/, MetaNode>> pushNodes = new ConcurrentHashMap<>();
        Map<String/*dataCenter*/, Long> versionMap = new ConcurrentHashMap<>();

        metaRepositoryMap.forEach((dataCenter, metaNodeRepository) -> {

            if (localDataCenter.equalsIgnoreCase(dataCenter)) {
                nodeChangeResult.setVersion(metaNodeRepository.getVersion());
            }
            versionMap.put(dataCenter, metaNodeRepository.getVersion());

            Map<String, RenewDecorate<MetaNode>> dataMap = metaNodeRepository.getNodeMap();
            Map<String, MetaNode> newMap = new ConcurrentHashMap<>();
            dataMap.forEach((ip, dataNode) -> newMap.put(ip, dataNode.getRenewal()));
            pushNodes.put(dataCenter, newMap);
        });

        nodeChangeResult.setLocalDataCenter(localDataCenter);
        nodeChangeResult.setNodes(pushNodes);
        nodeChangeResult.setDataCenterListVersions(versionMap);
        return nodeChangeResult;
    }
}

Results:

nodeChangeResult = 
 nodeType = {Node$NodeType} "DATA"
 nodes = {ConcurrentHashMap}  size = 1
  "DefaultDataCenter" -> {ConcurrentHashMap}  size = 0
 version = 1601126414990
 dataCenterListVersions = {ConcurrentHashMap}  
  "DefaultDataCenter" -> 1601126414990
 localDataCenter = "DefaultDataCenter"

5.5 Data Synchronization during Node Change

MetaServer detects nodes coming online or going offline through their network connections. Every DataServer runs a ConnectionRefreshTask that periodically polls MetaServer for information about the data nodes. Note that besides DataServer actively pulling node information from MetaServer, MetaServer also actively pushes NodeChangeResult requests to each node to announce changes. Pushing and pulling lead to the same final state.
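One common way for a receiver to end up in the same state regardless of whether an update was pushed or pulled is to compare version numbers and keep only the newest list. The sketch below illustrates that idea only; whether SOFARegistry applies exactly this rule is an assumption, and NodeListView with its methods is a hypothetical class invented for the example.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical local view of the node list, fed by both pushed and pulled updates.
class NodeListView {
    private long version = -1L;
    private Map<String, String> nodes = Collections.emptyMap();

    // Applies an update only if it is newer than the one already held; returns whether it was applied.
    synchronized boolean apply(long newVersion, Map<String, String> newNodes) {
        if (newVersion <= version) {
            return false; // stale or duplicate update, whichever channel delivered it
        }
        version = newVersion;
        nodes = new HashMap<>(newNodes);
        return true;
    }

    synchronized long version() {
        return version;
    }

    synchronized int size() {
        return nodes.size();
    }
}
```

With this rule the order in which push and pull deliver the same change does not matter: the first arrival is applied and the second is ignored as a duplicate.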

The next article explores how MetaServer uses Raft in detail.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

If you would like timely updates on my articles, or the technical material I recommend, please follow the account.

0xFF References

Because of the length limit, the references will be published together at the end of the series.