0x00 Abstract

SOFARegistry is Ant Financial's open-source, production-grade, highly available service registry with strong timeliness.

This series of articles focuses on design and architecture: across multiple articles, it summarizes the implementation mechanisms and architectural ideas of DataServer and SOFARegistry from several perspectives, so that you can learn how Alibaba designs such systems.

This article, part 15, analyzes how ChangeNotifier is implemented to inform related modules: hey, new data changes are coming, brothers, let's get to work.

0x01 Business Scope

1.1 Overview

When a publisher goes online or offline, publishDataProcessor or unPublishDataHandler is triggered and adds a data change event to the dataChangeEventCenter, which is used to notify the event change center of data changes asynchronously. After receiving the event, the event change center adds it to a queue.

  1. The dataChangeEventCenter then processes the online/offline data asynchronously according to the event type: the change information is turned into a ChangeNotifier, then into an Operator, and stored in AbstractAcceptorStore.

  2. At the same time, the DataChangeHandler publishes the change information through the ChangeNotifier, notifying other nodes to synchronize data.

Due to space constraints, earlier articles skipped over ChangeNotifier. This article explains ChangeNotifier event change notification in detail and walks through the whole process again, so it revisits some material from the previous articles.

The following diagram shows what ChangeNotifier does.

        +--------------------+
        | PublishDataHandler |
        +--------+-----------+
                 |
                 |
                 |  publisher
                 |
                 v
       +---------+------------+
       |DataChangeEventCenter |
       +---------+------------+
                 |
                 |
                 | ChangeData
                 v
       +---------+------------+
       | DataChangeEventQueue |
       +---------+------------+
                 |
                 |
                 |  ChangeData
                 v
         +-------+----------+
         | DataChangeHandler|
         +-------+----------+
                 |
                 |
                 |  ChangeData
                 v
          +------+--------+              +------------+
          | ChangeNotifier|  +-------->  | datumCache |
          +------+--------+              +------------+
                 |
                 |
                 v
             +---+------+
             | notifier |
             +---+------+
                 |
                 v
     +-----------+---------------+
     |                           |
     v                           v
+----+----------------+   +------+----------+
|SessionServerNotifier|   |  BackUpNotifier |
+----+----------------+   +------+----------+
     |                           |
     |                           |
     |                           |
     |                           v
  +--v------------+       +------+----------------+
  | sessionServer |       | AbstractAcceptorStore |
  +---------------+       +-----------------------+

1.2 Data Changes

Data changes in two directions:

  • Changes of data server nodes;

  • Changes of the data itself, namely changes of Publishers and Subscribers;

ChangeNotifier is responsible for notifying related modules of Publisher and Subscriber changes. Change notification is a form of decoupling.

0x02 Data structure

We first need to look at the data structure of the notification.

2.1 Interface Definition

IDataChangeNotifier is the notification interface definition:

public interface IDataChangeNotifier {
    Set<DataSourceTypeEnum> getSuitableSource();

    /**
     * @param datum
     * @param lastVersion
     */
    void notify(Datum datum, Long lastVersion);
}

2.2 Derived classes

IDataChangeNotifier has four derived classes, which correspond to four kinds of data change. The purpose of each can be roughly inferred from its name.

public class BackUpNotifier implements IDataChangeNotifier

public class SessionServerNotifier implements IDataChangeNotifier

public class SnapshotBackUpNotifier implements IDataChangeNotifier

public class TempPublisherNotifier implements IDataChangeNotifier

2.3 Bean

The corresponding Bean is as follows:

@Bean(name = "dataChangeNotifiers")
public List<IDataChangeNotifier> dataChangeNotifiers() {
    List<IDataChangeNotifier> list = new ArrayList<>();
    list.add(sessionServerNotifier());
    list.add(tempPublisherNotifier());
    list.add(backUpNotifier());
    return list;
}

0x03 Process

Let’s go over the process from the beginning.

3.1 Adding Messages

When a publisher goes online or offline, publishDataProcessor or unPublishDataHandler is triggered and adds a data change event to the dataChangeEventCenter, which is used to notify the event change center of data changes asynchronously. After receiving the event, the event change center adds it to a queue.

Here’s how it works at DataServer:

3.1.1 PublishDataHandler

PublishDataHandler handles PublishDataRequest. When a Publisher arrives, it puts a message into the DataChangeEventCenter with the following call:

dataChangeEventCenter.onChange(publisher, dataServerConfig.getLocalDataCenter());

The specific code is as follows:

public class PublishDataHandler extends AbstractServerHandler<PublishDataRequest> {

    @Autowired
    private ForwardService                 forwardService;

    @Autowired
    private SessionServerConnectionFactory sessionServerConnectionFactory;

    @Autowired
    private DataChangeEventCenter          dataChangeEventCenter;

    @Autowired
    private DataServerConfig               dataServerConfig;

    @Autowired
    private DatumLeaseManager              datumLeaseManager;

    @Autowired
    private ThreadPoolExecutor             publishProcessorExecutor;

    @Override
    public Object doHandle(Channel channel, PublishDataRequest request) {
        Publisher publisher = Publisher.internPublisher(request.getPublisher());
        if (forwardService.needForward()) {
            CommonResponse response = new CommonResponse();
            response.setSuccess(false);
            response.setMessage("Request refused, Server status is not working");
            return response;
        }

        dataChangeEventCenter.onChange(publisher, dataServerConfig.getLocalDataCenter());

        if (publisher.getPublishType() != PublishType.TEMPORARY) {
            String connectId = WordCache.getInstance().getWordCache(
                publisher.getSourceAddress().getAddressString());
            sessionServerConnectionFactory.registerConnectId(request.getSessionServerProcessId(),
                connectId);
            // record the renew timestamp
            datumLeaseManager.renew(connectId);
        }

        return CommonResponse.buildSuccessResponse();
    }
}

The specific logic is as follows:

        +--------------------+
        | PublishDataHandler |
        +--------+-----------+
                 |
                 |
                 |  publisher
                 |
                 v
       +---------+------------+
       |DataChangeEventCenter |
       +---------+------------+

3.1.2 DataChangeEventCenter

At the heart of the DataChangeEventCenter is an array of DataChangeEventQueue.

The dataChangeEventCenter.onChange function first computes a hash from the Publisher's DataInfoId to decide which queue the DataChangeEvent should go to, and then calls onChange on that queue.

public class DataChangeEventCenter {
    /** queues of DataChangeEvent */
    private DataChangeEventQueue[] dataChangeEventQueues;

    @Autowired
    private DatumCache             datumCache;

    @PostConstruct
    public void init() {
        if (isInited.compareAndSet(false, true)) {
            queueCount = dataServerConfig.getQueueCount();
            dataChangeEventQueues = new DataChangeEventQueue[queueCount];
            for (int idx = 0; idx < queueCount; idx++) {
                dataChangeEventQueues[idx] = new DataChangeEventQueue(idx, dataServerConfig, this,
                    datumCache);
                dataChangeEventQueues[idx].start();
            }
        }
    }

    /**
     * receive changed publisher, then wrap it into the DataChangeEvent and put it into dataChangeEventQueue
     *
     * @param publisher
     * @param dataCenter
     */
    public void onChange(Publisher publisher, String dataCenter) {
        int idx = hash(publisher.getDataInfoId());
        Datum datum = new Datum(publisher, dataCenter);
        if (publisher instanceof UnPublisher) {
            datum.setContainsUnPub(true);
        }
        if (publisher.getPublishType() != PublishType.TEMPORARY) {
            dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
                DataSourceTypeEnum.PUB, datum));
        } else {
            dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
                DataSourceTypeEnum.PUB_TEMP, datum));
        }
    }
}
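
The routing step described above, hashing the DataInfoId to pick one queue, can be sketched in isolation. This is only a minimal illustration: the class HashedQueuesSketch, the use of String events, and the choice of Math.floorMod are assumptions made for the example, since the excerpt does not show how hash is implemented in SOFARegistry.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the queue array; not the SOFARegistry class. */
class HashedQueuesSketch {
    private final List<List<String>> queues = new ArrayList<>();

    HashedQueuesSketch(int queueCount) {
        for (int i = 0; i < queueCount; i++) {
            queues.add(new ArrayList<>());
        }
    }

    /** Maps a key to a stable index; floorMod keeps it non-negative even for negative hash codes. */
    int indexFor(String dataInfoId) {
        return Math.floorMod(dataInfoId.hashCode(), queues.size());
    }

    /** Appends the event to the queue chosen by the key. */
    void onChange(String dataInfoId, String event) {
        queues.get(indexFor(dataInfoId)).add(event);
    }

    List<String> queue(int idx) {
        return queues.get(idx);
    }

    public static void main(String[] args) {
        HashedQueuesSketch center = new HashedQueuesSketch(4);
        center.onChange("com.example.ServiceA", "pub-1");
        center.onChange("com.example.ServiceA", "pub-2");
        // Both events for the same dataInfoId land in the same queue, in order.
        System.out.println(center.queue(center.indexFor("com.example.ServiceA")));
    }
}
```

Because every event for one DataInfoId goes to the same queue, a single consumer thread per queue sees those events in order, which is what makes the later merge step safe.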

The main data members of DataChangeEventQueue are as follows:

public class DataChangeEventQueue {
    /** a block queue that stores all data change events */
    private final BlockingQueue<IDataChangeEvent>      eventQueue;

    private final Map<String, Map<String, ChangeData>> CHANGE_DATA_MAP_FOR_MERGE = new ConcurrentHashMap<>();

    private final DelayQueue<ChangeData>               CHANGE_QUEUE              = new DelayQueue();

    private DataChangeEventCenter                      dataChangeEventCenter;

    private DatumCache                                 datumCache;
}
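
CHANGE_QUEUE is a java.util.concurrent.DelayQueue, so an element only becomes available to take() once its delay has expired. The sketch below shows that behavior with a hypothetical Change element; how the real ChangeData computes its delay is not shown in the excerpt, so the delays here are assumptions chosen for illustration.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayedChangeSketch {
    /** Hypothetical delayed element; the real ChangeData carries a Datum and more. */
    static final class Change implements Delayed {
        final String dataInfoId;
        final long readyAtNanos;

        Change(String dataInfoId, long delayMillis) {
            this.dataInfoId = dataInfoId;
            this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Puts two changes and returns the id of the one that becomes available first. */
    static String firstReady() {
        DelayQueue<Change> queue = new DelayQueue<>();
        queue.put(new Change("slow", 200));
        queue.put(new Change("fast", 20));
        try {
            return queue.take().dataInfoId; // blocks until the shortest delay has expired
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(firstReady());
    }
}
```

A delay of this kind gives several events for the same DataInfoId a window in which they can be merged into one ChangeData before a consumer takes it.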

The execution engine is a thread that blocks on the BlockingQueue eventQueue. When a message arrives, the thread takes it out and handles it according to the message type.

public void start() {
    Executor executor = ExecutorFactory
            .newSingleThreadExecutor(String.format("%s_%s", DataChangeEventQueue.class.getSimpleName(), getName()));
    executor.execute(() -> {
        while (true) {
            try {
                IDataChangeEvent event = eventQueue.take();
                DataChangeScopeEnum scope = event.getScope();
                if (scope == DataChangeScopeEnum.DATUM) {
                    DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
                    // temporary push data is notified as soon as possible and is not merged into normal pub data
                    if (dataChangeEvent.getSourceType() == DataSourceTypeEnum.PUB_TEMP) {
                        addTempChangeData(dataChangeEvent.getDatum(), dataChangeEvent.getChangeType(),
                                dataChangeEvent.getSourceType());
                    } else {
                        handleDatum(dataChangeEvent.getChangeType(), dataChangeEvent.getSourceType(),
                                dataChangeEvent.getDatum());
                    }
                } else if (scope == DataChangeScopeEnum.CLIENT) {
                    handleClientOff((ClientChangeEvent) event);
                } else if (scope == DataChangeScopeEnum.SNAPSHOT) {
                    handleSnapshot((DatumSnapshotEvent) event);
                }
            } catch (Throwable e) {
                // error handling omitted in this excerpt
            }
        }
    });
}

For Publisher messages, the handleDatum function handles the event differently depending on whether changeType is COVER or MERGE.

In this step, the ChangeData is also put into the delay queue via CHANGE_QUEUE.put(changeData).

private void handleDatum(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType, Datum targetDatum) {
    lock.lock();
    try {
        //get changed datum
        ChangeData changeData = getChangeData(targetDatum.getDataCenter(), targetDatum.getDataInfoId(),
            sourceType, changeType);
        Datum cacheDatum = changeData.getDatum();
        if (changeType == DataChangeTypeEnum.COVER || cacheDatum == null) {
            changeData.setDatum(targetDatum);
        } else {
            Map<String, Publisher> targetPubMap = targetDatum.getPubMap();
            Map<String, Publisher> cachePubMap = cacheDatum.getPubMap();
            for (Publisher pub : targetPubMap.values()) {
                String registerId = pub.getRegisterId();
                Publisher cachePub = cachePubMap.get(registerId);
                if (cachePub != null) {
                    // if the registerTimestamp of cachePub is greater than the registerTimestamp of pub, it means
                    // that pub is not the newest data, should be ignored
                    if (pub.getRegisterTimestamp() < cachePub.getRegisterTimestamp()) {
                        continue;
                    }
                    // if pub and cachePub both are publisher, and sourceAddress of both are equal,
                    // and version of cachePub is greater than version of pub, should be ignored
                    if (!(pub instanceof UnPublisher) && !(cachePub instanceof UnPublisher)
                        && pub.getSourceAddress().equals(cachePub.getSourceAddress())
                        && cachePub.getVersion() > pub.getVersion()) {
                        continue;
                    }
                }
                cachePubMap.put(registerId, pub);
                cacheDatum.setVersion(targetDatum.getVersion());
            }
        }
    } finally {
        lock.unlock();
    }
}
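
The two "ignore" rules in the MERGE branch can be isolated as a small predicate. The sketch below is an illustration only: Pub is a hypothetical, simplified record (the real Publisher has many more fields), and an unPub boolean stands in for the instanceof UnPublisher check.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PubMergeSketch {
    /** Simplified, hypothetical publisher record. */
    static final class Pub {
        final String registerId;
        final long registerTimestamp;
        final long version;
        final String sourceAddress;
        final boolean unPub;

        Pub(String registerId, long registerTimestamp, long version, String sourceAddress, boolean unPub) {
            this.registerId = registerId;
            this.registerTimestamp = registerTimestamp;
            this.version = version;
            this.sourceAddress = sourceAddress;
            this.unPub = unPub;
        }
    }

    /** Returns true when the incoming publisher should overwrite the cached one. */
    static boolean accepts(Pub cached, Pub incoming) {
        if (cached == null) {
            return true; // nothing cached for this registerId yet
        }
        if (incoming.registerTimestamp < cached.registerTimestamp) {
            return false; // incoming was registered earlier, so it is not the newest data
        }
        boolean bothPublishers = !incoming.unPub && !cached.unPub;
        if (bothPublishers && incoming.sourceAddress.equals(cached.sourceAddress)
            && cached.version > incoming.version) {
            return false; // same source address and the cached version is newer
        }
        return true;
    }

    /** Merges incoming publishers into the cache, keyed by registerId. */
    static void merge(Map<String, Pub> cache, List<Pub> incoming) {
        for (Pub pub : incoming) {
            if (accepts(cache.get(pub.registerId), pub)) {
                cache.put(pub.registerId, pub);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Pub> cache = new HashMap<>();
        cache.put("r1", new Pub("r1", 100L, 5L, "10.0.0.1:9600", false));
        merge(cache, Arrays.asList(
            new Pub("r1", 90L, 6L, "10.0.0.1:9600", false),    // older timestamp: ignored
            new Pub("r2", 100L, 1L, "10.0.0.2:9600", false))); // new registerId: added
        System.out.println(cache.get("r1").version + " " + cache.containsKey("r2"));
    }
}
```

Note that an UnPublisher bypasses the version comparison, so an unpublish with a newer or equal timestamp always replaces the cached entry.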

The specific logic is as follows:

        +--------------------+
        | PublishDataHandler |
        +--------+-----------+
                 |
                 |
                 |  publisher
                 |
                 v
       +---------+------------+
       |DataChangeEventCenter |
       +---------+------------+
                 |
                 |
                 | ChangeData
                 v
       +---------+------------+
       | DataChangeEventQueue |
       +---------+------------+

3.2 Consuming messages & sending notifications

The DataChangeHandler consumes ChangeData from each DataChangeEventQueue and sends the notifications.

public class DataChangeHandler {

    @Autowired
    private DataChangeEventCenter     dataChangeEventCenter;

    @Autowired
    private DatumCache                datumCache;

    @Resource
    private List<IDataChangeNotifier> dataChangeNotifiers;

    @PostConstruct
    public void start() {
        DataChangeEventQueue[] queues = dataChangeEventCenter.getQueues();
        int queueCount = queues.length;
        Executor executor = ExecutorFactory.newFixedThreadPool(queueCount, DataChangeHandler.class.getSimpleName());
        Executor notifyExecutor = ExecutorFactory
                .newFixedThreadPool(dataServerConfig.getQueueCount() * 5, this.getClass().getSimpleName());
        for (int idx = 0; idx < queueCount; idx++) {
            final DataChangeEventQueue dataChangeEventQueue = queues[idx];
            final String name = dataChangeEventQueue.getName();
            executor.execute(() -> {
                while (true) {
                    try {
                        final ChangeData changeData = dataChangeEventQueue.take();
                        notifyExecutor.execute(new ChangeNotifier(changeData, name));
                    } catch (Throwable e) {
                        // error handling omitted in this excerpt
                    }
                }
            });
        }
    }
}

3.2.1 DataChangeHandler

DataChangeHandler continuously takes messages from the queues of the DataChangeEventCenter and processes them.

3.2.2 Class definition

public class DataChangeHandler {

    @Autowired
    private DataServerConfig          dataServerConfig;

    @Autowired
    private DataChangeEventCenter     dataChangeEventCenter;

    @Autowired
    private DatumCache                datumCache;

    @Resource
    private List<IDataChangeNotifier> dataChangeNotifiers;
}

3.2.3 Execution Engine

Here is a two-tier thread model.

  • executor = ExecutorFactory.newFixedThreadPool(queueCount)

  • notifyExecutor= ExecutorFactory.newFixedThreadPool(dataServerConfig.getQueueCount() * 5)

You can think of executor as the control threads and notifyExecutor as the worker threads; there are five times as many worker threads as control threads.

  • The DataChangeHandler iterates over all DataChangeEventQueues in the DataChangeEventCenter.
  • For each DataChangeEventQueue, it runs one control thread on executor.
  • In that control thread, it takes ChangeData from the DataChangeEventQueue and, for each ChangeData, creates a ChangeNotifier and hands it to a worker thread of notifyExecutor for processing.

@PostConstruct
public void start() {
    DataChangeEventQueue[] queues = dataChangeEventCenter.getQueues();
    int queueCount = queues.length;
    Executor executor = ExecutorFactory.newFixedThreadPool(queueCount, DataChangeHandler.class.getSimpleName());
    Executor notifyExecutor = ExecutorFactory
            .newFixedThreadPool(dataServerConfig.getQueueCount() * 5, this.getClass().getSimpleName());

    for (int idx = 0; idx < queueCount; idx++) {
        final DataChangeEventQueue dataChangeEventQueue = queues[idx];
        final String name = dataChangeEventQueue.getName();
        executor.execute(() -> {
            while (true) {
                final ChangeData changeData = dataChangeEventQueue.take();
                notifyExecutor.execute(new ChangeNotifier(changeData, name));
            }
        });
    }
}
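
The two-tier model can be reproduced with plain java.util.concurrent classes. The sketch below is an approximation under stated assumptions: TwoTierSketch, the String items, and the latch-based shutdown are invented for the example so that it terminates, whereas the real control threads loop forever and use ExecutorFactory.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class TwoTierSketch {
    /** Drains the queues with one control thread each and returns the handled items, sorted. */
    static List<String> drain(List<BlockingQueue<String>> queues, int expectedItems) {
        ExecutorService control = Executors.newFixedThreadPool(queues.size());
        ExecutorService workers = Executors.newFixedThreadPool(queues.size() * 5);
        List<String> handled = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(expectedItems);
        for (BlockingQueue<String> queue : queues) {
            control.execute(() -> {
                try {
                    while (true) {
                        String item = queue.take();   // the control thread only takes items...
                        workers.execute(() -> {       // ...and hands each one to a worker
                            handled.add(item);
                            done.countDown();
                        });
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // shutdownNow() ends the loop
                }
            });
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        control.shutdownNow();
        workers.shutdown();
        List<String> sorted = new ArrayList<>(handled);
        Collections.sort(sorted);
        return sorted;
    }

    static List<String> demo() {
        BlockingQueue<String> q0 = new LinkedBlockingQueue<>(Arrays.asList("a1", "a2"));
        BlockingQueue<String> q1 = new LinkedBlockingQueue<>(Arrays.asList("b1"));
        return drain(Arrays.asList(q0, q1), 3);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Keeping the control thread free of real work means a slow notification cannot stall the queue it reads from; the cost is that items from one queue may complete out of order on the worker pool.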

3.2.4 Service Execution

For each ChangeData, a ChangeNotifier is created to process it. The ChangeNotifier publishes the change information and informs other nodes to synchronize data.

ChangeNotifier checks the type of the changeData and handles it accordingly.

  • If it is SnapshotData, then:
    • Obtain the SnapshotData;
    • Call datumCache.putSnapshot to store it;
    • Call notify to send the notification;
  • If it is any other type, then:
    • For a pub or unPub merge, call datum.updateVersion();
    • If the source type is PUB_TEMP, call notifyTempPub(datum, sourceType, changeType);
    • Otherwise, if the version has changed, call notify(datum, sourceType, lastVersion);

Details are as follows:

private class ChangeNotifier implements Runnable {

    private ChangeData changeData;
    private String     name;

    @Override
    public void run() {
        if (changeData instanceof SnapshotData) {
           ......
        } else {
            Datum datum = changeData.getDatum();

            String dataCenter = datum.getDataCenter();
            String dataInfoId = datum.getDataInfoId();
            DataSourceTypeEnum sourceType = changeData.getSourceType();
            DataChangeTypeEnum changeType = changeData.getChangeType();

            if (changeType == DataChangeTypeEnum.MERGE
                && sourceType != DataSourceTypeEnum.BACKUP
                && sourceType != DataSourceTypeEnum.SYNC) {
                // update the version for a pub or unPub merge into the cache;
                // if the version were produced before the merge, a smaller version could override a bigger one
                datum.updateVersion();
            }

            long version = datum.getVersion();

            try {
                if (sourceType == DataSourceTypeEnum.CLEAN) {
                    if (datumCache.cleanDatum(dataCenter, dataInfoId)) {
                        ......
                    }
                } else if (sourceType == DataSourceTypeEnum.PUB_TEMP) {
                    notifyTempPub(datum, sourceType, changeType);
                } else {
                    MergeResult mergeResult = datumCache.putDatum(changeType, datum);
                    Long lastVersion = mergeResult.getLastVersion();

                    if (lastVersion != null
                        && lastVersion.longValue() == LocalDatumStorage.ERROR_DATUM_VERSION) {
                        return;
                    }

                    //lastVersion null means first add datum
                    if (lastVersion == null || version != lastVersion) {
                        if (mergeResult.isChangeFlag()) {
                            notify(datum, sourceType, lastVersion);
                        }
                    }
                }
            } catch (Exception e) {
                // error handling omitted in this excerpt
            }
        }
    }
}
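
The condition that decides whether a normal (non-CLEAN, non-PUB_TEMP) change is notified can be written as one predicate. This is a sketch: VersionGateSketch is a hypothetical helper, and the value of ERROR_VERSION is an assumed sentinel, because the excerpt does not show the value of LocalDatumStorage.ERROR_DATUM_VERSION.

```java
class VersionGateSketch {
    /** Assumed sentinel for a failed merge; the real constant's value is not shown in the excerpt. */
    static final long ERROR_VERSION = -2L;

    /**
     * Mirrors the checks in run(): skip on the error sentinel, then notify only if this is
     * the first datum (lastVersion == null) or the version changed, and the merge changed data.
     */
    static boolean shouldNotify(long version, Long lastVersion, boolean changeFlag) {
        if (lastVersion != null && lastVersion == ERROR_VERSION) {
            return false;
        }
        boolean versionMoved = lastVersion == null || version != lastVersion;
        return versionMoved && changeFlag;
    }

    public static void main(String[] args) {
        System.out.println(shouldNotify(10L, null, true)); // first datum for this dataInfoId
        System.out.println(shouldNotify(10L, 10L, true));  // version unchanged
    }
}
```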

The specific logic is as follows:

        +--------------------+
        | PublishDataHandler |
        +--------+-----------+
                 |
                 |
                 |  publisher
                 |
                 v
       +---------+------------+
       |DataChangeEventCenter |
       +---------+------------+
                 |
                 |
                 | ChangeData
                 v
       +---------+------------+
       | DataChangeEventQueue |
       +---------+------------+
                 |
                 |
                 |  ChangeData
                 v
         +-------+----------+
         | DataChangeHandler|
         +-------+----------+
                 |
                 |
                 |  ChangeData
                 v
          +------+--------+              +------------+
          | ChangeNotifier|  +-------->  | datumCache |
          +------+--------+              +------------+

3.2.5 Notification

The notify function iterates over dataChangeNotifiers to find the notifiers that support the Datum's SourceType.

Which source types a notifier supports is determined by its getSuitableSource.

private void notify(Datum datum, DataSourceTypeEnum sourceType, Long lastVersion) {
    for (IDataChangeNotifier notifier : dataChangeNotifiers) {
        if (notifier.getSuitableSource().contains(sourceType)) {
            notifier.notify(datum, lastVersion);
        }
    }
}
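
To see how getSuitableSource filters the notifiers, here is a self-contained sketch of the same dispatch loop. Everything in it is illustrative: the SourceType enum is a reduced stand-in for DataSourceTypeEnum, a String replaces Datum, and RecordingNotifier is a hypothetical notifier that only records what it receives. The source sets mirror the PUB and SYNC entries shown for SessionServerNotifier and BackUpNotifier in this article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

class NotifierDispatchSketch {
    enum SourceType { PUB, SYNC, BACKUP }

    interface Notifier {
        Set<SourceType> getSuitableSource();

        void notify(String dataInfoId, Long lastVersion);
    }

    /** Hypothetical notifier that records which notifications it received. */
    static final class RecordingNotifier implements Notifier {
        private final String name;
        private final Set<SourceType> sources;
        private final List<String> log;

        RecordingNotifier(String name, Set<SourceType> sources, List<String> log) {
            this.name = name;
            this.sources = sources;
            this.log = log;
        }

        @Override
        public Set<SourceType> getSuitableSource() {
            return sources;
        }

        @Override
        public void notify(String dataInfoId, Long lastVersion) {
            log.add(name + ":" + dataInfoId);
        }
    }

    /** Calls every notifier whose suitable sources contain the given type. */
    static void dispatch(List<Notifier> notifiers, String dataInfoId, SourceType type, Long lastVersion) {
        for (Notifier notifier : notifiers) {
            if (notifier.getSuitableSource().contains(type)) {
                notifier.notify(dataInfoId, lastVersion);
            }
        }
    }

    static List<String> demo(SourceType type) {
        List<String> log = new ArrayList<>();
        List<Notifier> notifiers = Arrays.asList(
            new RecordingNotifier("session", EnumSet.of(SourceType.PUB, SourceType.SYNC), log),
            new RecordingNotifier("backup", EnumSet.of(SourceType.PUB), log));
        dispatch(notifiers, "svcA", type, null);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo(SourceType.PUB));
        System.out.println(demo(SourceType.SYNC));
    }
}
```

The caller never needs to know which notifiers exist; adding a new notifier only means adding it to the list and declaring its source types.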

The corresponding Bean is:

@Bean(name = "dataChangeNotifiers")
public List<IDataChangeNotifier> dataChangeNotifiers() {
    List<IDataChangeNotifier> list = new ArrayList<>();
    list.add(sessionServerNotifier());
    list.add(tempPublisherNotifier());
    list.add(backUpNotifier());
    return list;
}

3.2.6 BackUpNotifier synchronization

BackUpNotifier's notify calls syncDataService.appendOperator, which in effect wraps the Datum in an Operator and stores it in AbstractAcceptorStore.

public class BackUpNotifier implements IDataChangeNotifier {

    @Autowired
    private SyncDataService syncDataService;

    @Override
    public Set<DataSourceTypeEnum> getSuitableSource() {
        Set<DataSourceTypeEnum> set = new HashSet<>();
        set.add(DataSourceTypeEnum.PUB);
        return set;
    }

    @Override
    public void notify(Datum datum, Long lastVersion) {
        syncDataService.appendOperator(new Operator(datum.getVersion(), lastVersion, datum,
            DataSourceTypeEnum.BACKUP));
    }
}

3.2.7 SessionServerNotifier Notifying data changes

SessionServerNotifier is much more complicated.

public class SessionServerNotifier implements IDataChangeNotifier {

    private AsyncHashedWheelTimer          asyncHashedWheelTimer;

    @Autowired
    private DataServerConfig               dataServerConfig;

    @Autowired
    private Exchange                       boltExchange;

    @Autowired
    private SessionServerConnectionFactory sessionServerConnectionFactory;

    @Autowired
    private DatumCache                     datumCache;
  
    @Override
    public Set<DataSourceTypeEnum> getSuitableSource() {
        Set<DataSourceTypeEnum> set = new HashSet<>();
        set.add(DataSourceTypeEnum.PUB);
        set.add(DataSourceTypeEnum.SYNC);
        set.add(DataSourceTypeEnum.SNAPSHOT);
        return set;
    }
}

3.2.7.1 Time wheel

SessionServerNotifier sets up a time wheel with a 500 millisecond tick.

@PostConstruct
public void init() {
    ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
    threadFactoryBuilder.setDaemon(true);
    asyncHashedWheelTimer = new AsyncHashedWheelTimer(threadFactoryBuilder.setNameFormat(
        "Registry-SessionServerNotifier-WheelTimer").build(), 500, TimeUnit.MILLISECONDS, 1024,
        dataServerConfig.getSessionServerNotifierRetryExecutorThreadSize(),
        dataServerConfig.getSessionServerNotifierRetryExecutorQueueSize(), threadFactoryBuilder
            .setNameFormat("Registry-SessionServerNotifier-WheelExecutor-%d").build(),
        new TaskFailedCallback() {
            @Override
            public void executionRejected(Throwable e) {
                LOGGER.error("executionRejected: " + e.getMessage(), e);
            }

            @Override
            public void executionFailed(Throwable e) {
                LOGGER.error("executionFailed: " + e.getMessage(), e);
            }
        });
}

From the business perspective, when a Publisher change arrives, the notify function of the DataChangeHandler iterates over dataChangeNotifiers to find the notifiers that support the Datum's SourceType.

private void notify(Datum datum, DataSourceTypeEnum sourceType, Long lastVersion) {
    for (IDataChangeNotifier notifier : dataChangeNotifiers) {
        if (notifier.getSuitableSource().contains(sourceType)) {
            notifier.notify(datum, lastVersion);
        }
    }
}

The notify function in SessionServerNotifier iterates through all connections currently cached and notifies each one.

@Override
public void notify(Datum datum, Long lastVersion) {
    DataChangeRequest request = new DataChangeRequest(datum.getDataInfoId(),
        datum.getDataCenter(), datum.getVersion());
    List<Connection> connections = sessionServerConnectionFactory.getSessionConnections();
    for (Connection connection : connections) {
        doNotify(new NotifyCallback(connection, request));
    }
}

Specific notification function:

private void doNotify(NotifyCallback notifyCallback) {
    Connection connection = notifyCallback.connection;
    DataChangeRequest request = notifyCallback.request;
    try {
        //check connection active
        if (!connection.isFine()) {
            return;
        }
        Server sessionServer = boltExchange.getServer(dataServerConfig.getPort());
        sessionServer.sendCallback(sessionServer.getChannel(connection.getRemoteAddress()),
            request, notifyCallback, dataServerConfig.getRpcTimeout());
    } catch (Exception e) {
        onFailed(notifyCallback);
    }
}

The time wheel is used to retry failed calls.

As long as the maximum number of retries has not been reached and the version in the request is still the current one, the notification is retried after a delay.

private void onFailed(NotifyCallback notifyCallback) {

    DataChangeRequest request = notifyCallback.request;
    Connection connection = notifyCallback.connection;
    notifyCallback.retryTimes++;

    //check version, if it's fall behind, stop retry
    long _currentVersion = datumCache.get(request.getDataCenter(), request.getDataInfoId()).getVersion();
    if (request.getVersion() != _currentVersion) {
        return;
    }

    if (notifyCallback.retryTimes <= dataServerConfig.getNotifySessionRetryTimes()) {
        this.asyncHashedWheelTimer.newTimeout(timeout -> {
            //check version, if it's fall behind, stop retry
            long currentVersion = datumCache.get(request.getDataCenter(), request.getDataInfoId()).getVersion();
            if (request.getVersion() == currentVersion) {
                doNotify(notifyCallback);
            }
        }, getDelayTimeForRetry(notifyCallback.retryTimes), TimeUnit.MILLISECONDS);
    }
}
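
The retry policy above (bounded retries, a delay before each retry, and giving up once the cached version has moved on) can be sketched without the registry classes. The example below makes several assumptions: a ScheduledExecutorService stands in for AsyncHashedWheelTimer, delayMillis is a hypothetical linear backoff because getDelayTimeForRetry is not shown in the excerpt, and every send attempt is made to fail so that the retry path is exercised.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

class RetrySketch {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final int maxRetries;
    private final LongSupplier currentVersion;
    private final AtomicInteger attempts = new AtomicInteger();
    private final CountDownLatch finished = new CountDownLatch(1);

    RetrySketch(int maxRetries, LongSupplier currentVersion) {
        this.maxRetries = maxRetries;
        this.currentVersion = currentVersion;
    }

    /** Hypothetical backoff: 10 ms per retry made so far. */
    static long delayMillis(int retryTimes) {
        return 10L * retryTimes;
    }

    /** One send attempt; it always fails here so that the retry path runs. */
    void send(long requestVersion, int retryTimes) {
        attempts.incrementAndGet();
        onFailed(requestVersion, retryTimes + 1);
    }

    private void onFailed(long requestVersion, int retryTimes) {
        boolean stale = requestVersion != currentVersion.getAsLong();
        if (stale || retryTimes > maxRetries) {
            finished.countDown(); // give up: newer data exists or the retry budget is spent
            return;
        }
        timer.schedule(() -> {
            // re-check the version when the timer fires, as the data may have changed meanwhile
            if (requestVersion == currentVersion.getAsLong()) {
                send(requestVersion, retryTimes);
            } else {
                finished.countDown();
            }
        }, delayMillis(retryTimes), TimeUnit.MILLISECONDS);
    }

    /** Runs the failing send to completion and returns how many attempts were made. */
    static int attemptsFor(int maxRetries, long requestVersion, long cachedVersion) {
        RetrySketch sketch = new RetrySketch(maxRetries, () -> cachedVersion);
        sketch.send(requestVersion, 0);
        try {
            sketch.finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        sketch.timer.shutdownNow();
        return sketch.attempts.get();
    }

    public static void main(String[] args) {
        System.out.println(attemptsFor(3, 1L, 1L)); // version still current: initial try plus retries
        System.out.println(attemptsFor(3, 1L, 2L)); // version already stale: no retry
    }
}
```

Checking the version both before scheduling and again when the timer fires avoids pushing a notification for data that a newer change has already superseded.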

The specific logic is as follows:

        +--------------------+
        | PublishDataHandler |
        +--------+-----------+
                 |
                 |
                 |  publisher
                 |
                 v
       +---------+------------+
       |DataChangeEventCenter |
       +---------+------------+
                 |
                 |
                 | ChangeData
                 v
       +---------+------------+
       | DataChangeEventQueue |
       +---------+------------+
                 |
                 |
                 |  ChangeData
                 v
         +-------+----------+
         | DataChangeHandler|
         +-------+----------+
                 |
                 |
                 |  ChangeData
                 v
          +------+--------+              +------------+
          | ChangeNotifier|  +-------->  | datumCache |
          +------+--------+              +------------+
                 |
                 |
                 v
             +---+------+
             | notifier |
             +---+------+
                 |
                 v
     +-----------+---------------+
     |                           |
     v                           v
+----+----------------+   +------+----------+
|SessionServerNotifier|   |  BackUpNotifier |
+----+----------------+   +------+----------+
     |                           |
     |                           |
     |                           |
     |                           v
  +--v------------+       +------+----------------+
  | sessionServer |       | AbstractAcceptorStore |
  +---------------+       +-----------------------+

0x04 Summary

This article expands on one point of the registry, the event change notification ChangeNotifier, and uses SessionServerNotifier and BackUpNotifier as examples to explain how ChangeNotifier works and how it is used. It also sorts out related pieces such as dataChangeEventCenter. I hope it is helpful.

At DataServer, data changes in two directions:

  • Changes of data server nodes;

  • Changes of the data itself, namely changes of Publishers and Subscribers;

ChangeNotifier is responsible for notifying related modules of Publisher and Subscriber changes. Change notification is a form of decoupling.
