0x00 Abstract

SOFARegistry is Ant Financial's open source, production-grade, time-sensitive and highly available service registry.

This series of articles focuses on design and architecture. Across several articles, it summarizes the implementation mechanisms and architectural ideas of DataServer and SOFARegistry from multiple perspectives, so that readers can learn how Alibaba designs such systems.

This article describes how SOFARegistry handles changes to DataServer nodes within the local data center (equipment room).

0x02 Business context

2.1 DataServer Data Consistency

DataServer provides the core data storage in SOFARegistry. Data is sharded by consistent hashing on dataInfoId, and each shard is backed up on multiple replicas to keep the data highly available. This layer can be scaled out as the volume of service data grows.
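The following minimal sketch illustrates sharding by consistent hash with replicas. It is not SOFARegistry's ConsistentHash class. The class name HashRingSketch, the MD5-based hash and the virtual-node count are assumptions. Only the method name getNUniqueNodesFor echoes the one referenced later in BackupTriad's Javadoc. Each DataServer IP is placed on the ring several times. A dataInfoId is owned by the first node clockwise from its hash, and the next distinct nodes act as its backups.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.TreeMap;

// Hypothetical consistent-hash ring (not SOFARegistry's ConsistentHash).
class HashRingSketch {
    private final TreeMap<Long, String> ring = new TreeMap<>();

    HashRingSketch(Collection<String> nodeIps, int virtualNodes) {
        for (String ip : nodeIps) {
            for (int i = 0; i < virtualNodes; i++) {
                ring.put(hash(ip + "#" + i), ip);   // each IP appears virtualNodes times
            }
        }
    }

    /** Up to n distinct node IPs for the key: the owner first, then the backups. */
    List<String> getNUniqueNodesFor(String dataInfoId, int n) {
        List<String> result = new ArrayList<>();
        if (ring.isEmpty()) {
            return result;
        }
        long h = hash(dataInfoId);
        // Walk clockwise from h, wrapping around to the start of the ring.
        List<String> walk = new ArrayList<>(ring.tailMap(h, true).values());
        walk.addAll(ring.headMap(h, false).values());
        for (String ip : walk) {
            if (!result.contains(ip)) {
                result.add(ip);
            }
            if (result.size() == n) {
                break;
            }
        }
        return result;
    }

    // First 8 bytes of an MD5 digest as a signed long (an arbitrary choice for the sketch).
    private static long hash(String key) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(key.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (d[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Each key depends only on its neighbours on the ring. Adding or removing one DataServer therefore changes the triads of only part of the keys, so only that part of the data has to migrate.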

If a DataServer fails, MetaServer notifies all DataServers and SessionServers. The failed node's data shards then fail over to other replicas, and the DataServers migrate the sharded data within the cluster.

2.2 Local Equipment Room Policy

Here, "Data Center" refers to the local equipment room (machine room).

  • Data is backed up only within the local data center;
  • Each data center has its own consistent hash;

Alibaba's cross-data-center backup presumably belongs to the Global part, which is not open source.

So this article focuses on how the nodes within the local data center handle the change.

0x03 General Logic

DataServers notify each other of data version numbers.

  • A new node uses NotifyOnlineRequest to tell the other online nodes: I am new, please update your configuration accordingly, and tell me which data versions you have.
  • An online service node uses NotifyFetchDatumRequest to tell the new node: I have the versions of the data you need, come and get them.

We can therefore conclude the following. After receiving a DataServer change message from MetaServer, all DataServers in the same data center notify each other of the updated version number.

  • notifyOnline sends a NotifyOnlineRequest, and the NotifyOnlineHandler on the other DataServers processes it.
  • notifyToFetch sends a NotifyFetchDatumRequest, and the NotifyFetchDatumHandler on the other DataServers processes it.
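The small in-memory sketch below makes the two notifications concrete. PeerSketch and its methods are hypothetical stand-ins, not SOFARegistry classes. A datum is reduced to a dataInfoId with a version, and direct method calls replace the real network requests.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical peer; direct calls stand in for NotifyOnlineRequest / NotifyFetchDatumRequest.
class PeerSketch {
    final String ip;
    final boolean working;
    final Map<String, Long> datumVersions = new HashMap<>(); // dataInfoId -> version
    final Set<String> notWorking = new HashSet<>();           // peers that announced themselves as new

    PeerSketch(String ip, boolean working) {
        this.ip = ip;
        this.working = working;
    }

    /** New node -> every other peer: "I am new" (role of NotifyOnlineRequest). */
    void notifyOnline(List<PeerSketch> peers) {
        for (PeerSketch peer : peers) {
            if (peer != this) {
                peer.notWorking.add(ip);   // what the receiving side records
            }
        }
    }

    /** Working peer -> target: "these are my versions, come and get them". */
    boolean notifyToFetch(PeerSketch target) {
        if (!working) {
            return false;                  // only WORKING nodes advertise their data
        }
        target.onNotifyFetch(this, new HashMap<>(datumVersions));
        return true;
    }

    /** Receiver pulls each datum whose advertised version is newer than its own. */
    void onNotifyFetch(PeerSketch source, Map<String, Long> advertised) {
        advertised.forEach((dataInfoId, version) -> {
            if (version > datumVersions.getOrDefault(dataInfoId, -1L)) {
                datumVersions.put(dataInfoId, source.datumVersions.get(dataInfoId));
            }
        });
    }
}
```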

0x04 Message

4.1 LocalDataServerChangeEvent

As mentioned in the previous article, DataServerChangeEventHandler handles DataServerChangeEvent. When the change concerns the local data center, it triggers a LocalDataServerChangeEvent.

public class LocalDataServerChangeEvent implements Event {
    private Map<String, DataNode> localDataServerMap;
    private long                  localDataCenterversion;
    private Set<String>           newJoined;
    private long                  version;
}

4.2 Source

LocalDataServerChangeEvent originates from DataServerChangeEventHandler.

MetaServer detects nodes coming online or going offline through their network connections. Every DataServer runs a ConnectionRefreshTask that periodically polls MetaServer for data-node information. DataServers also do not rely on pulling alone: MetaServer actively sends NodeChangeResult requests to each node when node information changes. Push and pull end up with the same result.
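The pull side can be pictured as a scheduled task that emits an event only when the membership version moves forward. This sketch rests on several assumptions. The LongSupplier stands in for the GetNodesRequest round trip, and the LongConsumer stands in for posting a DataServerChangeEvent to EventCenter. RefreshTaskSketch is a hypothetical name, not ConnectionRefreshTask itself.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

// Hypothetical polling task: fetch the meta version, post an event only on forward progress.
class RefreshTaskSketch {
    private final LongSupplier fetchMetaVersion;   // stand-in for GetNodesRequest
    private final LongConsumer postChangeEvent;    // stand-in for eventCenter.post(...)
    private final AtomicLong lastSeen = new AtomicLong(-1L);

    RefreshTaskSketch(LongSupplier fetchMetaVersion, LongConsumer postChangeEvent) {
        this.fetchMetaVersion = fetchMetaVersion;
        this.postChangeEvent = postChangeEvent;
    }

    /** One pull round. */
    void pollOnce() {
        onVersion(fetchMetaVersion.getAsLong());
    }

    /** Shared by pull (pollOnce) and a hypothetical push path: ignore versions already seen. */
    void onVersion(long version) {
        long previous = lastSeen.getAndAccumulate(version, Math::max);
        if (version > previous) {
            postChangeEvent.accept(version);
        }
    }

    /** Runs pollOnce periodically on a single background thread. */
    ScheduledExecutorService start(long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(this::pollOnce, 0, periodMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

Pushed and pulled results pass through the same version check, so each change is posted once regardless of which path delivered it.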

When the data-node information returned by polling changes, a DataServerChangeEvent is posted to EventCenter. Its handler may determine that the node information of the local data center has changed. In that case it posts a new LocalDataServerChangeEvent. The handler of that event, LocalDataServerChangeEventHandler, decides whether the current node is a new node. If it is, the handler sends a NotifyOnlineRequest to the other nodes, as shown in the figure.

LocalDataServerChangeEvent is produced in the doHandle function of DataServerChangeEventHandler.

0x05 Message processing

LocalDataServerChangeEventHandler handles data-node change events within the local data center. It can also be seen as the intra-cluster data synchronizer.

When it handles a LocalDataServerChangeEvent, LocalDataServerChangeEventHandler decides whether the current node is a new node. If it is, it sends a NotifyOnlineRequest to the other nodes. This is how a DataServer newly added to the data center is processed.

5.1 LocalDataServerChangeEventHandler

The key members of LocalDataServerChangeEventHandler are:

private BlockingQueue<LocalDataServerChangeEvent> events = new LinkedBlockingDeque<>(); 

private class LocalClusterDataSyncer implements Runnable

The explanation is as follows:

  • afterPropertiesSet starts a thread that runs LocalClusterDataSyncer for asynchronous processing.
  • doHandle puts the event into the events queue, from which LocalClusterDataSyncer consumes it asynchronously.

LocalDataServerChangeEventHandler thus applies uniform deferred, asynchronous processing. After it receives a LocalDataServerChangeEvent from EventCenter, it places the event into the events queue. The internal LocalClusterDataSyncer then processes the event asynchronously.

Inside LocalClusterDataSyncer is:

  • If the local server is WORKING, it compares data and notifies the relevant DataServers to sync ("if local server is working, compare sync data");
  • If the local server is not WORKING, it is a new server and notifies the other servers ("if local server is not working, notify others that i am newer");

LocalDataServerChangeEventHandler is defined as follows:

public class LocalDataServerChangeEventHandler extends
                                              AbstractEventHandler<LocalDataServerChangeEvent> {

    @Autowired
    private DataServerConfig                          dataServerConfig;

    @Autowired
    private LocalDataServerCleanHandler               localDataServerCleanHandler;

    @Autowired
    private DataServerCache                           dataServerCache;

    @Autowired
    private DataNodeExchanger                         dataNodeExchanger;

    @Autowired
    private DataNodeStatus                            dataNodeStatus;

    @Autowired
    private DatumCache                                datumCache;

    @Autowired
    private DatumLeaseManager                         datumLeaseManager;
  
    private BlockingQueue<LocalDataServerChangeEvent> events    = new LinkedBlockingDeque<>();  
}

5.1.1 Sending messages

In the doHandle function, the latest messages are placed in BlockingQueue.

public void doHandle(LocalDataServerChangeEvent localDataServerChangeEvent) {
    isChanged.set(true);

    // Better change to Listener pattern
    localDataServerCleanHandler.reset();
    datumLeaseManager.reset();

    events.offer(localDataServerChangeEvent);
}

5.1.2 Starting the Engine

Once the bean has been initialized, afterPropertiesSet starts the consuming thread.

@Override
public void afterPropertiesSet() throws Exception {
    super.afterPropertiesSet();
    start();
}

public void start() {
    Executor executor = ExecutorFactory
        .newSingleThreadExecutor(LocalDataServerChangeEventHandler.class.getSimpleName());
    executor.execute(new LocalClusterDataSyncer());
}

LocalClusterDataSyncer then consumes the messages and runs the business logic.

0x06 Consuming notification messages

Inside this thread, LocalClusterDataSyncer consumes events in an endless loop.

private class LocalClusterDataSyncer implements Runnable {

    @Override
    public void run() {
        while (true) {
            try {
                LocalDataServerChangeEvent event = events.take();
                //if size of events is greater than 0, not handle and continue, only handle the last one in the queue
                if (events.size() > 0) {
                    continue;
                }
                long changeVersion = event.getVersion();
                isChanged.set(false);
                if (LocalServerStatusEnum.WORKING == dataNodeStatus.getStatus()) {
                    //if local server is working, compare sync data
                    notifyToFetch(event, changeVersion);
                } else {
                    dataServerCache.checkAndUpdateStatus(changeVersion);
                    //if local server is not working, notify others that i am newer
                    notifyOnline(changeVersion);
                    dataServerCache.updateItem(event.getLocalDataServerMap(),
                        event.getLocalDataCenterversion(),
                        dataServerConfig.getLocalDataCenter());
                }
            } catch (Throwable t) {
                // error handling is elided in the article; the loop keeps running
            }
        }
    }
}
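The rule "only handle the last one in the queue" is easy to miss, so the sketch below isolates it. Events are reduced to version numbers, and step() is one iteration of the run() loop above. CoalescingSyncerSketch is a hypothetical name. The WORKING / not-WORKING branch is reduced to recording the handled version.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical reduction of LocalClusterDataSyncer's coalescing behaviour.
class CoalescingSyncerSketch {
    final BlockingQueue<Long> events = new LinkedBlockingDeque<>();
    final List<Long> handled = new ArrayList<>();

    /** Role of doHandle: enqueue and return immediately. */
    void offer(long version) {
        events.offer(version);
    }

    /** One loop iteration; returns false when the event was skipped or the thread was interrupted. */
    boolean step() {
        Long version;
        try {
            version = events.take();          // blocks until an event arrives
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        if (!events.isEmpty()) {
            return false;                     // a newer event is queued: skip this one
        }
        handled.add(version);                 // real code: notifyToFetch or notifyOnline
        return true;
    }
}
```

Three changes queued in quick succession therefore cost only one round of notifyToFetch or notifyOnline, driven by the newest version.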

[Key points]

Every DataServer receives the DataServerChangeEvent from MetaServer. Because the change concerns the local data center, the event is converted into a LocalDataServerChangeEvent.

Since every DataServer receives the event, both the newly online server and the servers already online receive it. This is the main point of this article.

6.1 New node

On the new node, the LocalDataServerChangeEvent handler LocalDataServerChangeEventHandler decides whether the current node is a new node. If it is, it sends a NotifyOnlineRequest to the other nodes, as shown in the figure.

Figure: logic of the new node when a DataServer node goes online

The figure above shows how a newly added node handles a node-change message. Nodes that are already online go through the same initial steps when they receive the message. The difference is that LocalDataServerChangeEventHandler computes the changed nodes from the hash ring. In a scale-out scenario, the changed node is the new node. In a scale-in scenario, the changed node is the successor of the offline node on the hash ring, which takes over the offline node's data shard range, together with its backup nodes.

In short, a newly added node uses NotifyOnlineRequest to tell the nodes already online: I am new, please update your configuration accordingly.

6.1.1 notifyOnline

notifyOnline gets all DataServerNodes of the local data center from DataServerNodeFactory. It then sends a NotifyOnlineRequest to each of them in turn, announcing: I am online.

When notified, the other online DataServers start interacting with the new node.

/**
 * notify other dataservers that this server is online newly
 *
 * @param changeVersion
 */
private void notifyOnline(long changeVersion) {
    Map<String, DataServerNode> dataServerNodeMap = DataServerNodeFactory
        .getDataServerNodes(dataServerConfig.getLocalDataCenter());
    for (Entry<String, DataServerNode> serverEntry : dataServerNodeMap.entrySet()) {
        while (true) {
            String ip = serverEntry.getKey();
            DataServerNode dataServerNode = DataServerNodeFactory.getDataServerNode(
                dataServerConfig.getLocalDataCenter(), ip);
            try {
                final Connection connection = dataServerNode.getConnection();
                CommonResponse response = (CommonResponse) dataNodeExchanger.request(
                    new Request() {

                        @Override
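                        public Object getRequestBody() {
                            return new NotifyOnlineRequest(DataServerConfig.IP,
                                changeVersion);
                        }

                        @Override
                        public URL getRequestUrl() {
                            return new URL(connection.getRemoteIP(), connection.getRemotePort());
                        }
                    }).getResult();
                // the article elides the response check; assumed: stop retrying once the peer acknowledges
                if (response != null && response.isSuccess()) {
                    break;
                }
            } catch (Exception e) {
                // error handling elided in the article: log and retry
            }
        }
    }
}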

6.2 Online Service Nodes

The online service node traverses the data items in its own memory and filters out those that fall within the shard range of the changed node. It then sends a NotifyFetchDatumRequest to the changed node and its backup nodes. On receiving the request, the handler on those nodes (NotifyFetchDatumHandler.fetchDatum) synchronizes the data from the sender, as shown in the figure.

Note that in this figure the JVMs on the left and right are swapped compared with the previous figure.

Figure: logic of the existing nodes when DataServer nodes change

That is, the online service node uses NotifyFetchDatumRequest to tell the new node: I have the data you need, come and get it.

Here are some important functions:

6.2.1 notifyToFetch

notifyToFetch notifies newly online DataServers to fetch datum (its Javadoc reads "notify onlined newly dataservers to fetch datum"). The receivers can then update themselves based on the request message.

The specific steps of notifyToFetch are:

  • First, the new server list is obtained from the event. It is held in three forms: dataServerMapIn (a Map), dataServerNodeList (a List) and dataServerMap (a ConcurrentHashMap);
  • Build a consistentHash from the new servers;
  • Call toBeSyncMap = getToBeSyncMap(consistentHash) to get the map of datum to be synced, that is, which IP needs to synchronize which data;
    • Traverse toBeSyncMap. For each toBeSyncEntry, get its IP and dataInfoMap, which is of type Map<String, Map<String, BackupTriad>>;
      • Iterate over each Entry<String, Map<String, BackupTriad>> dataCenterEntry of dataInfoMap; the entry key is a dataCenter;
        • Iterate over each Entry<String, BackupTriad> dataTriadEntry of the inner map; its key is a dataInfoId;
          • Get the Datum from datumCache by dataInfoId;
          • Record the Datum version: versionMap.put(dataInfoId, datum.getVersion());
      • Add this dataCenter's versions to the overall map allVersionMap: allVersionMap.put(dataCenter, versionMap);
      • If allVersionMap is not empty:
        • Remove the corresponding IP from dataServerCache;
        • Tell the DataServer at this IP what it needs to synchronize: doNotify(ip, allVersionMap, changeVersion), i.e. the dataInfoIds in each dataCenter together with their version numbers;
        • Remove the IP from dataServerCache;
  • If dataServerMap (the ConcurrentHashMap) is non-empty, iterate over its keys. Each key is a targetIp; remove it from dataServerCache;
  • dataServerCache updates its server list based on dataServerMapIn.
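The sketch below models the version-map step of the procedure above. It is written under simplifying assumptions. toBeSync maps ip to dataCenter to a set of dataInfoIds, with the BackupTriad values dropped. datumVersions maps dataCenter to dataInfoId to version and stands in for DatumCache. The notifier plays the role of doNotify. NotifyToFetchSketch and notifyPeers are hypothetical names.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;

// Hypothetical reduction of the allVersionMap construction in notifyToFetch.
class NotifyToFetchSketch {
    /** Builds ip -> dataCenter -> (dataInfoId -> version) and notifies only IPs with something to fetch. */
    static int notifyPeers(Map<String, Map<String, Set<String>>> toBeSync,
                           Map<String, Map<String, Long>> datumVersions,
                           BiConsumer<String, Map<String, Map<String, Long>>> notifier) {
        int notified = 0;
        for (Map.Entry<String, Map<String, Set<String>>> ipEntry : toBeSync.entrySet()) {
            Map<String, Map<String, Long>> allVersionMap = new HashMap<>();
            for (Map.Entry<String, Set<String>> dcEntry : ipEntry.getValue().entrySet()) {
                String dataCenter = dcEntry.getKey();
                Map<String, Long> cached =
                    datumVersions.getOrDefault(dataCenter, Collections.<String, Long>emptyMap());
                for (String dataInfoId : dcEntry.getValue()) {
                    Long version = cached.get(dataInfoId);
                    if (version != null) {        // datum not in the local cache: nothing to offer
                        allVersionMap.computeIfAbsent(dataCenter, k -> new HashMap<>())
                            .put(dataInfoId, version);
                    }
                }
            }
            if (!allVersionMap.isEmpty()) {       // skip IPs with nothing to fetch
                notifier.accept(ipEntry.getKey(), allVersionMap);
                notified++;
            }
        }
        return notified;
    }
}
```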

6.2.2 getToBeSyncMap

getToBeSyncMap finds the IP addresses that need to be notified and which dataInfoIds each IP needs to synchronize, as follows:

  • The function argument is consistentHash, newly calculated from the new servers;
  • consistentHashOld is the old hash, calculated by dataServerCache.calculateOldConsistentHash for the local data center;
  • For each Datum in datumCache, a new triad is computed. In detail:
  • Get all datumCache data into allMap, and traverse every dataCenterEntry in allMap:
    • For that data center, traverse its datumMap:
    • Traverse the datumMap by dataInfoId:
      • Calculate the new backupNodes with the new consistentHash;
      • Use consistentHashOld to get the old backupTriad;
      • Get newJoinedNodes from backupTriad.getNewJoined: the nodes of the new triad that are not in the old backupTriad, or that are in it but in the notWorking set;
      • For each node in newJoinedNodes, add an entry to toBeSyncMap, whose type is Map<String/*ip*/, Map<String/*datacenter*/, Map<String/*datainfoId*/, BackupTriad>>>;
  • Return toBeSyncMap, that is, which IP needs to synchronize which data.
private Map<String/*ip*/, Map<String/*datacenter*/, Map<String/*datainfoId*/, BackupTriad>>> getToBeSyncMap(
        ConsistentHash<DataNode> consistentHash) {

    Map<String, Map<String, Map<String, BackupTriad>>> toBeSyncMap = new HashMap<>();
    Map<String, List<DataNode>> triadCache = new HashMap<>();

    ConsistentHash<DataNode> consistentHashOld = dataServerCache
        .calculateOldConsistentHash(dataServerConfig.getLocalDataCenter());

    // ... traversal of datumCache elided in the article (see the steps listed above)

    return toBeSyncMap;
}
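The core of this traversal is a per-key diff between the old and new triads. The simplified sketch below assumes a single data center and expresses the two hash rings as functions from dataInfoId to the list of owner-plus-backup IPs. ToBeSyncSketch is a hypothetical name. Its inner condition is the same test that getNewJoined applies.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Function;

// Hypothetical single-data-center reduction of getToBeSyncMap.
class ToBeSyncSketch {
    /** Returns ip -> dataInfoIds that the ip must fetch after the ring change. */
    static Map<String, Set<String>> toBeSync(Collection<String> dataInfoIds,
                                             Function<String, List<String>> oldTriad,
                                             Function<String, List<String>> newTriad,
                                             Set<String> notWorking) {
        Map<String, Set<String>> result = new TreeMap<>();
        for (String dataInfoId : dataInfoIds) {
            Set<String> before = new HashSet<>(oldTriad.apply(dataInfoId));
            for (String ip : newTriad.apply(dataInfoId)) {
                // newly responsible for this key, or responsible but not yet WORKING
                if (!before.contains(ip) || notWorking.contains(ip)) {
                    result.computeIfAbsent(ip, k -> new TreeSet<>()).add(dataInfoId);
                }
            }
        }
        return result;
    }
}
```

Only keys whose triad actually gained a node produce entries, which keeps the resulting NotifyFetchDatumRequests proportional to the data that moved.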

6.2.3 getNewJoined

getNewJoined finds the nodes of the new triad that are either not in the stored triad, or are in it but not in the working state.

public List<DataNode> getNewJoined(List<DataNode> newTriad, Set<String> notWorking) {
    List<DataNode> list = new ArrayList<>();
    for (DataNode node : newTriad) {
        String ip = node.getIp();
        if (!ipSetOfNode.contains(ip) || notWorking.contains(ip)) {
            list.add(node);
        }
    }
    return list;
}

6.2.4 BackupTriad

BackupTriad holds the list of DataNodes (owner plus backups) that store a given dataInfoId.

public class BackupTriad {
    /** dataInfoId */
    private String         dataInfoId;

    /**
     * calculate current dataServer list Consistent hash to get dataInfoId belong node and backup node list
     * @see  ConsistentHash#ConsistentHash(int, java.util.Collection)
     * @see  com.alipay.sofa.registry.consistency.hash.ConsistentHash#getNUniqueNodesFor(java.lang.Object, int)
     */
    private List<DataNode> triad;

    private Set<String>    ipSetOfNode = new HashSet<>();

    /**
     * constructor
     * @param dataInfoId
     * @param triad
     */
    public BackupTriad(String dataInfoId, List<DataNode> triad) {
        this.dataInfoId = dataInfoId;
        this.triad = triad;
        for (DataNode node : triad) {
            ipSetOfNode.add(node.getIp());
        }
    }
}

At runtime, a BackupTriad looks like this:

backupTriad = {BackupTriad@1400} "BackupTriad{dataInfoId='TestDataInfoId', ipSetOfNode=[192.168.0.2, 192.168.0.1, 192.168.0.3]}"
 dataInfoId = "TestDataInfoId"
 triad = {ArrayList@1399}  size = 3
  0 = {DataNode@1409} "DataNode = {IP 192.168.0.1}"
  1 = {DataNode@1410} "DataNode = {IP 192.168.0.2}"
  2 = {DataNode@1411} "DataNode = {IP 192.168.0.3}"
 ipSetOfNode = {HashSet@1403}  size = 3
  0 = "192.168.0.2"
  1 = "192.168.0.1"
  2 = "192.168.0.3"

0x07 Where does changeVersion come from

In the code above, a version is read from LocalDataServerChangeEvent and used for subsequent processing. The same version is also used to set the version number in dataServerCache.

LocalDataServerChangeEvent event = events.take();
long changeVersion = event.getVersion();
if (LocalServerStatusEnum.WORKING == dataNodeStatus.getStatus()) {
    //if local server is working, compare sync data
    notifyToFetch(event, changeVersion);
} else {
    dataServerCache.checkAndUpdateStatus(changeVersion);
    //if local server is not working, notify others that i am newer
    notifyOnline(changeVersion);
}

Now let's trace where this version comes from when the set of DataServers changes, working backwards from where it is used.

7.1 Version number and changes

7.1.1 DataServerCache

Since the version number is set on dataServerCache, let's start with DataServerCache. It has two related fields: curVersion and newDataServerChangeItem (a DataServerChangeItem).

getDataCenterNewVersion reads the version number of a given data center from newDataServerChangeItem, which DataServerCache stores.

DataServerCache is defined as follows:

public class DataServerCache {
  
    /** new input dataServer list and version */
    private volatile DataServerChangeItem                 newDataServerChangeItem = new DataServerChangeItem();
  
    private AtomicLong                                    curVersion              = new AtomicLong(-1L);

    public Long getDataCenterNewVersion(String dataCenter) {
        synchronized (DataServerCache.class) {
            Map<String, Long> versionMap = newDataServerChangeItem.getVersionMap();
            if (versionMap.containsKey(dataCenter)) {
                return versionMap.get(dataCenter);
            } else {
                return null;
            }
        }
    }
}

7.1.2 Settings and Usage

In DataServerCache, only addStatus assigns curVersion. Among the externally called methods, only synced and addNotWorkingServer call addStatus.

newDataServerChangeItem is set in compareAndSet.

public Map<String, Set<String>> compareAndSet(DataServerChangeItem newItem, FromType fromType) {
    // ... computation of changedMap elided in the article
    if (!changedMap.isEmpty()) {
        newDataServerChangeItem = newItem;
    }
    // ... remainder elided in the article
    return changedMap;
}

The logic is as follows:

                        +-----------------------------+
                        |[DataServerCache]            |
                        |                             |
compareAndSet +-------------> DataServerChangeItem    |
                        |                             |
                        |     curVersion              |
                        |     ^        ^              |
                        |     |        |              |
                        +-----------------------------+
                              |        |
synced +----------------------+        |
                                       |
addNotWorkingServer+-------------------+

7.1.3 Two Design points

There are two design points for DataServerCache:

  • What is curVersion used for?
  • What is newDataServerChangeItem used for?

We can infer that each data center has its own version number, which drives all state control within it. The versionMap field in the definition of DataServerChangeItem supports this: changes are controlled by version number.

DataServerChangeItem is defined as follows:

public class DataServerChangeItem {

    /** datacenter -> Map<ip, DataNode> */
    private Map<String, Map<String, DataNode>> serverMap;

    /** datacenter -> version */
    private Map<String, Long>                  versionMap;
}

From this we can see:

  • curVersion is the latest version number of the data center;
  • newDataServerChangeItem is the change data corresponding to that latest version number.
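A reduced model of the newDataServerChangeItem side of DataServerCache may help. The sketch keeps only the dataCenter-to-version map and answers getDataCenterNewVersion lookups. It adopts a new map only when that map differs. This is a simplification of compareAndSet, which in the real class compares server lists. DataServerCacheSketch and this simplification are assumptions.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical reduction: only dataCenter -> version is kept, server lists are omitted.
class DataServerCacheSketch {
    private volatile Map<String, Long> newVersionMap = Collections.emptyMap();

    /** Adopts the incoming map only if it differs; returns true when it changed. */
    synchronized boolean compareAndSet(Map<String, Long> incoming) {
        if (incoming.equals(newVersionMap)) {
            return false;
        }
        newVersionMap = Collections.unmodifiableMap(new HashMap<>(incoming));
        return true;
    }

    /** Same contract as getDataCenterNewVersion: null when the data center is unknown. */
    Long getDataCenterNewVersion(String dataCenter) {
        return newVersionMap.get(dataCenter);
    }
}
```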

So the questions become:

  • Where does the DataServerChangeItem come from?
  • Where does curVersion come from?

Reading the source code shows that both come from Meta Server. The following sections trace the process.

7.2 Data Server

7.2.1 Actively fetching changes

Let's walk through the process.

Updates reach a DataServer in one of two ways. Either MetaServer pushes the update to all DataServers, or each DataServer periodically polls MetaServer for updates.

In the polling case, the DataServer actively sends a GetNodesRequest to obtain the update.

Taking active polling as an example, the DataServer calls metaServerService.getDateServers to obtain a DataServerChangeItem from MetaServer. It then builds a DataServerChangeEvent from that item.

public class ConnectionRefreshTask extends AbstractTask {

    @Autowired
    private IMetaServerService metaServerService;

    @Autowired
    private EventCenter        eventCenter;

    @Override
    public void handle() {
        DataServerChangeItem dataServerChangeItem = metaServerService.getDateServers();
        if (dataServerChangeItem != null) {
            eventCenter
                .post(new DataServerChangeEvent(dataServerChangeItem, FromType.CONNECT_TASK));
        }
    }
}

In DefaultMetaServiceImpl, you can see that the DataServerChangeItem is extracted from the NodeChangeResult retrieved from the Meta Server.

public class DefaultMetaServiceImpl implements IMetaServerService {
    @Override
    public DataServerChangeItem getDateServers() {
        Map<String, Connection> connectionMap = metaServerConnectionFactory
            .getConnections(dataServerConfig.getLocalDataCenter());
        String leader = getLeader().getIp();
        if (connectionMap.containsKey(leader)) {
            Connection connection = connectionMap.get(leader);
            if (connection.isFine()) {
                try {
                    GetNodesRequest request = new GetNodesRequest(NodeType.DATA);
                    Object obj = metaNodeExchanger.request(new Request() {
                        @Override
                            public Object getRequestBody() {
                            return request;
                        }

                        @Override
                            public URL getRequestUrl() {
                            return new URL(connection.getRemoteIP(), connection.getRemotePort());
                        }
                    }).getResult();
                    if (obj instanceof NodeChangeResult) {
                        NodeChangeResult<DataNode> result = (NodeChangeResult<DataNode>) obj;
                        Map<String, Long> versionMap = result.getDataCenterListVersions();
                        versionMap.put(result.getLocalDataCenter(), result.getVersion());
                        return new DataServerChangeItem(result.getNodes(), versionMap);
                    }
                } catch (Exception e) {
                    // error handling elided in the article
                }
            }
        }
        String newip = refreshLeader().getIp();
        return null;
    }
}

The logic is as follows:

+  Data Server
|
|
|  +------------------+
|  | NodeChangeResult |
|  +-------+----------+
|          |                                +--------------------------+
|          |                                |[DataServerCache]         |
|          |                                |                          |
|          +---------------->compareAndSet------> DataServerChangeItem |
|     DataServerChangeItem                  |                          |
|                                           |     curVersion           |
|                                           |     ^        ^           |
|                                           |     |        |           |
|                                           +--------------------------+
|                                                 |        |
|                             synced +-------------        |
|                                                          |
|                             addNotWorkingServer----------+
|
|
+


7.3 Meta Server

7.3.1 Setting the Version number

Now let's turn to Meta Server. When the set of DataServers changes, DataStoreService functions such as put and remove call dataNodeRepository to set the version number to the current timestamp:

dataNodeRepository.setVersion(currentTimeMillis);

7.3.2 Extracting the Version Number

When meta Server receives the GetNodesRequest, a NodeChangeResult is generated.

DataStoreService calls dataNodeRepository to get the version number, which is set in NodeChangeResult.

public class DataStoreService implements StoreService<DataNode> {
    @Override
    public NodeChangeResult getNodeChangeResult() {

        NodeChangeResult nodeChangeResult = new NodeChangeResult(NodeType.DATA);

        try {
            String localDataCenter = nodeConfig.getLocalDataCenter();
            Map<String/*dataCenter*/, NodeRepository> dataNodeRepositoryMap = dataRepositoryService
                    .getNodeRepositories();

            ConcurrentHashMap<String/*dataCenter*/, Map<String/*ipAddress*/, DataNode>> pushNodes = new ConcurrentHashMap<>();
            Map<String/*dataCenter*/, Long> versionMap = new ConcurrentHashMap<>();

            dataNodeRepositoryMap.forEach((dataCenter, dataNodeRepository) -> {
               // Get the version number here
                if (localDataCenter.equalsIgnoreCase(dataCenter)) {                
                    nodeChangeResult.setVersion(dataNodeRepository.getVersion());
                }
            
                versionMap.put(dataCenter, dataNodeRepository.getVersion());

                Map<String, RenewDecorate<DataNode>> dataMap = dataNodeRepository.getNodeMap();
                Map<String, DataNode> newMap = new ConcurrentHashMap<>();
                dataMap.forEach((ip, dataNode) -> newMap.put(ip, dataNode.getRenewal()));
                pushNodes.put(dataCenter, newMap);
            });

            nodeChangeResult.setNodes(pushNodes);
            nodeChangeResult.setDataCenterListVersions(versionMap);
            nodeChangeResult.setLocalDataCenter(localDataCenter);
        } catch (Exception e) {
            // error handling elided in the article
        }
        // return
        return nodeChangeResult;
    }
}
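On the Meta Server side, the version is simply the timestamp of the last membership change. The sketch below models one node set per data center. It stamps the version on put and on an effective remove, and it snapshots the per-data-center versions, which is the versionMap part of getNodeChangeResult. MetaVersionSketch and its methods are hypothetical. nowMillis is passed in explicitly instead of calling System.currentTimeMillis(), so that the behaviour is deterministic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of per-data-center node repositories versioned by timestamp.
class MetaVersionSketch {
    private final Map<String, Set<String>> repoNodes = new ConcurrentHashMap<>();  // dataCenter -> IPs
    private final Map<String, Long> repoVersions = new ConcurrentHashMap<>();      // dataCenter -> version

    void put(String dataCenter, String ip, long nowMillis) {
        repoNodes.computeIfAbsent(dataCenter, k -> ConcurrentHashMap.newKeySet()).add(ip);
        repoVersions.put(dataCenter, nowMillis);   // like dataNodeRepository.setVersion(currentTimeMillis)
    }

    void remove(String dataCenter, String ip, long nowMillis) {
        Set<String> nodes = repoNodes.get(dataCenter);
        if (nodes != null && nodes.remove(ip)) {   // only a real change bumps the version
            repoVersions.put(dataCenter, nowMillis);
        }
    }

    /** Snapshot of dataCenter -> version, as placed into NodeChangeResult. */
    Map<String, Long> versionSnapshot() {
        return new HashMap<>(repoVersions);
    }
}
```

A DataServer receiving this snapshot would then, as in getDateServers, put the local data center's version into the map before building a DataServerChangeItem.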

Details are as follows:

[Meta Server]

+-----------------------+     +------------------------------+
| DataRepositoryService +---->| dataNodeRepository           |
+-----------------------+     | setVersion(currentTimeMillis)|
                              +--------------+---------------+
                                             ^
                                             | getVersion
                              +--------------+---------------+
                              | DataStoreService             |
                              +--------------+---------------+
                                             |
                                             | getNodeChangeResult
                                             v
                                     NodeChangeResult
                                             |
[Data Server]                                v
                              +--------------+---------------+
                              | NodeChangeResult             |
                              +--------------+---------------+
                                             |
                                             | DataServerChangeItem
                                             v
                                       compareAndSet
                                             |
                              +--------------v---------------+
                              | [DataServerCache]            |
                              |   DataServerChangeItem       |
                              |   curVersion                 |
                              |     ^            ^           |
                              +-----+------------+-----------+
                                    |            |
                                 synced   addNotWorkingServer

As shown in the mobile-friendly figure:

7.4 Data Server

7.4.1 Get changes

Let’s go back to Data Server.

When the DataServer receives the NodeChangeResult, it extracts the DataServerChangeItem.

public class DefaultMetaServiceImpl implements IMetaServerService {

    @Override
    public DataServerChangeItem getDateServers() {
        ......
                    GetNodesRequest request = new GetNodesRequest(NodeType.DATA);
                    Object obj = metaNodeExchanger.request(new Request() {
                        ......
                    }).getResult();

                    if (obj instanceof NodeChangeResult) {
                        NodeChangeResult<DataNode> result = (NodeChangeResult<DataNode>) obj;
                        Map<String, Long> versionMap = result.getDataCenterListVersions();
                        versionMap.put(result.getLocalDataCenter(), result.getVersion());
                        return new DataServerChangeItem(result.getNodes(), versionMap);
                    }
        ......
    }
}

From here, the flow rejoins section 7.2.1 on actively fetching changes. A DataServerChangeEvent is posted and then converted into a LocalDataServerChangeEvent, which connects back to the code discussed earlier.

0x08 Data Server Follow-up Procedure

We need to look further at how dataServerCache.curVersion and newDataServerChangeItem are handled.

8.1 newDataServerChangeItem

It is used in the doHandle function of DataServerChangeEventHandler:

for (Entry<String, Set<String>> changeEntry : changedMap.entrySet()) {
    String dataCenter = changeEntry.getKey();
    Set<String> ips = changeEntry.getValue();
    Long newVersion = dataServerCache.getDataCenterNewVersion(dataCenter);
}

This calls dataServerCache.getDataCenterNewVersion, which reads the version from newDataServerChangeItem:

public Long getDataCenterNewVersion(String dataCenter) {
    synchronized (DataServerCache.class) {
        Map<String, Long> versionMap = newDataServerChangeItem.getVersionMap();
        if (versionMap.containsKey(dataCenter)) {
            return versionMap.get(dataCenter);
        } else {
            return null;
        }
    }
}

A LocalDataServerChangeEvent is then built. The version taken from newDataServerChangeItem is passed in as the local version, localDataCenterversion.

public LocalDataServerChangeEvent(Map<String, DataNode> localDataServerMap,
                                  Set<String> newJoined, long version,
                                  long localDataCenterversion) {
    this.localDataServerMap = localDataServerMap;
    this.newJoined = newJoined;
    this.version = version;
    this.localDataCenterversion = localDataCenterversion;
}

DataServerCache updates the data accordingly.

dataServerCache.updateItem(dataServerMapIn, event.getLocalDataCenterversion(),
    dataServerConfig.getLocalDataCenter());

8.2 curVersion

Following curVersion brings us to notifyToFetch and notifyOnline.

8.2.1 Sending the version number

We have already explained how the version number is sent:

  • The online service node uses NotifyFetchDatumRequest to tell the new node, I have the data you need, come and get it.
  • A newly added node uses a NotifyOnlineRequest to tell the other nodes that are already online that I am new and you can configure it accordingly.

Therefore, we conclude that after receiving the data Server change message from Meta Server, all data servers in the same data Center will inform each other of the upgrade version number.

  • NotifyOnline sends a NotifyOnlineRequest, and the NotifyOnlineHandler of other Data servers does the corresponding processing.

  • NotifyToFetch sends a NotifyFetchDatumRequest, and the NotifyFetchDatumHandler of other Data servers does the corresponding processing.
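The exchange above can be modeled as a small simulation. This sketch assumes that all nodes share one change version and that every online node replies to the newcomer. The Message and Kind types are hypothetical and carry far fewer fields than the real NotifyOnlineRequest and NotifyFetchDatumRequest.

```java
import java.util.ArrayList;
import java.util.List;

class VersionExchangeSketch {
    enum Kind { NOTIFY_ONLINE, NOTIFY_FETCH_DATUM }

    // One message on the wire: which request, the change version it carries,
    // and who sends it to whom (hypothetical, simplified).
    static final class Message {
        final Kind kind;
        final long version;
        final String from;
        final String to;

        Message(Kind kind, long version, String from, String to) {
            this.kind = kind;
            this.version = version;
            this.from = from;
            this.to = to;
        }

        @Override
        public String toString() {
            return kind + "(v" + version + ") " + from + " -> " + to;
        }
    }

    // For each online node: the new node announces itself, and the online node
    // tells the new node to fetch. The ordering here is arbitrary; in a real
    // cluster these requests are sent independently.
    static List<Message> exchange(long changeVersion, String newIp, List<String> onlineIps) {
        List<Message> messages = new ArrayList<>();
        for (String onlineIp : onlineIps) {
            messages.add(new Message(Kind.NOTIFY_ONLINE, changeVersion, newIp, onlineIp));
            messages.add(new Message(Kind.NOTIFY_FETCH_DATUM, changeVersion, onlineIp, newIp));
        }
        return messages;
    }
}
```

With two online nodes, exchange yields four messages: each online node receives one NotifyOnlineRequest and sends one NotifyFetchDatumRequest back.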

8.2.2 Receiving the version number

Let's look at what the new and the online DataServer nodes do after receiving the version number.

  • NotifyFetchDatumHandler: processing on the new node

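This handles a data pull request. When it is triggered, the current DataServer node compares versions. If the change version in the request is not older than the node's cached curVersion, the node checks each datum listed in the request and fetches any datum that is missing locally, older than the sender's copy, or at the same version but with no publishers left, so that its data is up to date.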

  • NotifyOnlineHandler: processing on the online nodes

This handler is triggered when another DataServer node comes online. The current node stores the new node's information in its cache, which is used to manage node status (INITIAL or WORKING).
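A minimal sketch of the bookkeeping described above, assuming the cache simply maps each peer IP to an INITIAL or WORKING status. PeerStatusSketch and markWorking are hypothetical; only the name addNotWorkingServer comes from DataServerCache, whose real method also takes a version.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of how an online node could track peers announced via
// NotifyOnlineRequest; not the real DataServerCache.
class PeerStatusSketch {
    enum Status { INITIAL, WORKING }

    private final Map<String, Status> peers = new HashMap<>();

    // On NotifyOnlineRequest: remember the newcomer as INITIAL (not yet working).
    // putIfAbsent keeps a repeated notification from demoting a WORKING peer.
    synchronized void addNotWorkingServer(String ip) {
        peers.putIfAbsent(ip, Status.INITIAL);
    }

    // Hypothetical promotion once the peer has finished syncing.
    synchronized void markWorking(String ip) {
        peers.put(ip, Status.WORKING);
    }

    // Returns null if the peer has never been announced.
    synchronized Status statusOf(String ip) {
        return peers.get(ip);
    }
}
```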

Thus, both NotifyOnlineHandler and NotifyFetchDatumHandler determine whether to continue processing based on the curVersion stored in the local dataServerCache.

public class NotifyOnlineHandler extends AbstractServerHandler<NotifyOnlineRequest> {

    @Autowired
    private DataServerCache dataServerCache;

    @Override
    public Object doHandle(Channel channel, NotifyOnlineRequest request) {
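        // Change version carried by the new node's NotifyOnlineRequest
        long version = request.getVersion();
        // Ignore requests that belong to a change older than the locally cached curVersion
        if (version >= dataServerCache.getCurVersion()) {
            // Record the newcomer as a not-yet-working server for this version
            dataServerCache.addNotWorkingServer(version, request.getIp());
        }
        return CommonResponse.buildSuccessResponse();
    }
}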
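Both handlers gate on the same comparison, version >= curVersion. The sketch below isolates that check, assuming curVersion only ever moves forward. CurVersionGateSketch and advanceTo are hypothetical names used for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical isolation of the curVersion check used by both handlers.
class CurVersionGateSketch {
    private final AtomicLong curVersion = new AtomicLong(0L);

    // Move curVersion forward to newVersion if it is larger (e.g. when a newer
    // change arrives from MetaServer); never moves it backward.
    long advanceTo(long newVersion) {
        return curVersion.accumulateAndGet(newVersion, Math::max);
    }

    // Same test as `version >= dataServerCache.getCurVersion()`:
    // requests tagged with an older change version are stale and ignored.
    boolean accepts(long requestVersion) {
        return requestVersion >= curVersion.get();
    }
}
```

Using >= rather than > lets a request for the current change through, while notifications from a superseded change (a smaller version) are dropped.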

NotifyFetchDatumHandler, in turn, calls synced.

public class NotifyFetchDatumHandler extends AbstractServerHandler<NotifyFetchDatumRequest> {

    private static final Logger         LOGGER = LoggerFactory
                                                   .getLogger(NotifyFetchDatumHandler.class);

    @Autowired
    private DataServerCache             dataServerCache;

    @Autowired
    private DataServerConnectionFactory dataServerConnectionFactory;

    @Autowired
    private DataChangeEventCenter       dataChangeEventCenter;

    @Autowired
    private Exchange                    boltExchange;

    @Autowired
    private DataServerConfig            dataServerConfig;

    @Autowired
    private DatumCache                  datumCache;

    @Autowired
    private LocalDataServerCleanHandler localDataServerCleanHandler;

    @Override
    public Object doHandle(Channel channel, NotifyFetchDatumRequest request) {
        ParaCheckUtil.checkNotBlank(request.getIp(), "ip");

        // On receiving a NotifyFetchDatumRequest from another data node, delay the datum clean task until all datums have been fetched
        localDataServerCleanHandler.reset();

        Map<String, Map<String, Long>> versionMap = request.getDataVersionMap();
        long version = request.getChangeVersion();
        String ip = request.getIp();
        if (version >= dataServerCache.getCurVersion()) {
            if (versionMap.isEmpty()) {
                dataServerCache.synced(version, ip);
            } else {
                ExecutorFactory.getCommonExecutor().execute(() -> {
                    for (Entry<String, Map<String, Long>> dataCenterEntry : versionMap.entrySet()) {
                        String dataCenter = dataCenterEntry.getKey();
                        Map<String, Long> map = dataCenterEntry.getValue();
                        for (Entry<String, Long> dataInfoEntry : map.entrySet()) {
                            String dataInfoId = dataInfoEntry.getKey();
                            Datum datum = datumCache.get(dataCenter, dataInfoId);
                            if (datum != null) {
                                long inVersion = dataInfoEntry.getValue();
                                long currentVersion = datum.getVersion();
                                if (currentVersion > inVersion) {
                                    // Local copy is newer: nothing to fetch
                                    continue;
                                } else if (currentVersion == inVersion) {
                                    // Same version: LocalDataServerCleanHandler may have removed all publishers,
                                    // so fetch from the other node unless publishers are still present
                                    if (!datum.getPubMap().isEmpty()) {
                                        continue;
                                    }
                                }
                            }
                            // Datum missing, older, or same version with no publishers: pull it from ip
                            fetchDatum(ip, dataCenter, dataInfoId);
                        }
                    }
                    // Record that this node has synced from ip for this change version
                    dataServerCache.synced(version, ip);
                });
            }
        }
        return CommonResponse.buildSuccessResponse();
    }
}
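The per-datum decision inside the executor can be read as a pure function. The sketch below restates it that way; FetchDecisionSketch and shouldFetch are hypothetical names, and a null localVersion stands for a datum missing from datumCache.

```java
// Hypothetical restatement of NotifyFetchDatumHandler's per-datum check.
class FetchDecisionSketch {
    // localVersion: version of the local Datum, or null if none is cached.
    // localHasPublishers: whether the local Datum's pubMap is non-empty.
    // remoteVersion: the version listed in the request's dataVersionMap.
    static boolean shouldFetch(Long localVersion, boolean localHasPublishers, long remoteVersion) {
        if (localVersion == null) {
            return true;                  // nothing cached locally
        }
        if (localVersion > remoteVersion) {
            return false;                 // local copy is newer
        }
        if (localVersion == remoteVersion) {
            return !localHasPublishers;   // publishers may have been cleaned locally
        }
        return true;                      // remote copy is newer
    }
}
```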

The overall picture so far is as follows:

[Diagram: A Data Server obtains a NodeChangeResult from the Meta Server via getNodeChangeResult and applies it to DataServerCache through compareAndSet on DataServerChangeItem; DataServerCache holds curVersion. DataStoreService (getVersion), DataRepositoryService, and dataNodeRepository (setVersion(currentTimeMillis)) also appear alongside the cache. In an existing Data Server, NotifyOnlineHandler is triggered by the new Data Server and calls getCurVersion and addNotWorkingServer. In the new Data Server, NotifyFetchDatumHandler is triggered by an existing Data Server and calls getCurVersion and synced.]


At this point, we have fully traced the version number process.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

WeChat official account: Rosie's Thoughts

If you want timely updates on my articles or the technical resources I recommend, please follow.
