0 x00 the

SOFARegistry is ant Financial’s open source, production-grade, time-sensitive, and highly available service registry.

This series of articles focuses on analyzing design and architecture, that is, using multiple articles to summarize the implementation mechanism and architecture ideas of DataServer or SOFARegistry from multiple perspectives, so that you can learn how Ali designs.

This article, part seven, introduces the push and pull model for SOFARegistry network operations.

0x01 Related Concepts

Push or Pull??

1.1 Push model and pull model

In the observer model, it can be divided into push model and pull model.

Push model: The subject object pushes the details of the subject to the observer, usually all or part of the subject object’s data, whether the observer needs it or not.

Pull model: The topic object transmits only a small amount of information when notifying the observer. If the observer needs more specific information, the observer actively goes to the topic object to get it, which is equivalent to the observer pulling data from the topic object.

The detailed analysis of the two models is as follows:

1.1.1 Deduction Model:

Features:
  • Based on client/server mechanism, the server actively sends information to the client technology;
  • The “tweet” approach means that the Subject maintains a list of observers, and whenever an update occurs, the Subject actively pushes the update to various observers.
  • There is no obvious customer request before the server sends the information to the client. The push transaction is initiated by the server.
  • The subject object pushes the details of the subject to the observer, usually all or part of the subject object’s data, whether the observer needs it or not.
  • The push model assumes that the subject object knows the data the observer needs;
Advantages:
  • Push mode can make information actively and quickly find users/customers, and the initiative and real-time of information is better.
  • Efficient. If no update occurs, there is no action to update the message push, which means that every time a message push occurs after an actual update event, it makes sense.
  • In real time. Notification is triggered the first time an event occurs.
  • Notification times can be set by the Subject to avoid some busy times.
  • It can express the sequence of events
Disadvantages:
  • The accuracy is poor, and the information may not necessarily meet the needs of customers. Push mode does not guarantee that information can be sent to the client;
  • Because push mode uses broadcast mechanism, if the client happens to be connected and on the same channel as the server, push mode is effective;
  • The push mode cannot track the state and adopts the open-loop control mode without user feedback.
  • The information pushed is usually all or part of the subject object’s data, whether or not the observer needs it;

1.1.2 the pull model

Features:
  • Is a transaction actively initiated by the client. The server has their own information in the specified address (such as IP, port), the client sent requests to the specified address, the need for their own resources “pull” back;
  • The “pull” method means that each Observer maintains a list of subjects they are interested in and decides to go to the Subject to obtain the corresponding updated data at the appropriate time.
  • The pull model is that the subject object does not know what data the observer needs. If there is no way, it simply passes itself to the observer and allows the observer to value the data as needed.
Advantages:
  • It can not only accurately obtain the resources it needs, but also timely feedback the status of the client to the server.
  • If there are many observers, it may be difficult or bloated for the Subject to maintain a list of subscribers, thus freeing the subscription relationship to be completed by the Observer.
  • An Observer can ignore change events that it does not care about and simply fetch events that it is interested in;
  • The Observer can decide when to fetch update events;
  • The pull format gives the Subject more control over the access of each Observer to each query update;
Disadvantages:
  • The biggest disadvantage is not timely;

1.2 Guava LoadingCache

Guava is a memory caching module in Google Guava for caching data into JVM memory. In actual project development, some common or commonly used data is often cached to facilitate fast access.

Google Guava Cache provides Cache reclamation based on volume, time, and reference. The volume-based approach adopts LRU algorithm in internal implementation, which makes good use of the garbage collection mechanism of Java virtual machine based on reference collection.

The cache constructor, CacheBuilder, provides a cache object with various parameters set in builder mode. The internal class Segment of the cache core class LocalCache is very similar to the ConcurrentHashMap of JDk1.7 and earlier. All inherited from ReetrantLock, and six queues for a rich local caching scheme.

0x02 Business Domain

2.1 Application Scenarios

SOFARegistry’s business features are:

  • SOFARegistry is divided into three clusters: MetaServer, SessionServer, and DataServer.
  • DataServer, SessionServer, and MetaServer are essentially web applications;
  • Complex systems have several areas where consistency concerns need to be considered, such as when a service Publisher goes online or is disconnected, the data is registered to DataServer via SessionServer. At this point, the DataServer data and the SessionServer will have a temporary inconsistency.
  • SOFARegistry takes a different approach to the consistency requirements of different modules. For MetaServer module, Raft protocol with strong consistency is used to ensure the consistency of cluster information. For data modules, SOFARegistry chooses AP to ensure availability while ensuring final consistency.

2.2 points

Through business, we can think of the following problems:

  • New message data needs to be updated in real time. To achieve second-level notifications, this generally requires pushing the model;
  • But the extrapolation model is difficult to ensure stability;
  • Push mode, the client code is simple, by the server to push data, eliminating the unnecessary polling operations of the client. But the push logic needs to be complicated on the server side.
  • Pull mode, need to maintain offset, load balancing, etc.

2.3 Solutions

For the above problems, the industry generally adopts a combination of “push” and “pull”. For example, the server is only responsible for notifying “some data is ready”, and the client determines whether and when to obtain the data.

2.4 Ali Plan

Let’s start by looking at what Ali did.

2.4.1 Application of various models

In SOFARegistry ‘, various models are applied, such as:

  • The communication between SessionServer and DataServer is based on push-pull mechanism
    • Tweet: DataServer notifies the SessionServer when data changes, and the SessionServer checks to see if it needs to update (compare version).
    • Rachel: In addition to DataServer actively pushing, SessionServer will actively query the version information of all datainfoids to DataServer every certain interval (default 30 seconds). Then it compares the version of the SessionServer memory. If the version is changed, it obtains data from the DataServer. This “pull” logic, is mainly a supplement to “push”, if there is a mistake in the process of “push” can be made up in time at this time.
  • SOFARegistry’s service discovery pattern uses a combination of push and pull.
    • When the subscription information is published to the server, the client can query the address list once, obtain the full data, and store the corresponding service ID version information in the Session call layer. If the server releases data changes later, it notifies the Session of the call layer through the service ID version change. Session stores client subscriptions to know which clients need this service information, and then decides whether to push it to subscribers with older versions according to the size of the version number. The client also determines whether to update the result of this push to cover memory by comparing the version.
    • In addition, in order to avoid the failure of obtaining a change notice, the version number difference is regularly compared, and the data required by subscribers with lower versions are regularly pulled to ensure the consistency of data.
  • Between Client and SessionServer, completely based on push mechanism
    • The SessionServer directly pushes dataInfoId data to the Client after receiving DataServer data change notifications or after the SessionServer periodically queries DataServer to find that data has changed and obtains the data again. If the process fails to be successfully pushed to the Client due to network problems, the SessionServer will retry the process a certain number of times (default: 5). If the process fails, the SessionServer will continue to run the DataServer every 30 seconds. Data is pushed to the Client again.

Below is a comparison of data push in the two scenarios.

2.4.2 Push-pull model

In terms of the problem domain involved in this paper, Ant Financial adopts the classic push-pull model to maintain Data consistency. Here, we only take the maintenance of Data consistency between Session Server and Data Server as an example. The general logic is as follows:

SOFARegistry uses a LoadingCache data structure to cache data synchronized from DataServer in SessionServer.

  • The pull model:
    • Each entry in the cache has an expiration time. You can set the expiration time (30s by default) when you pull data from the cache.
    • This expiration allows the cache to periodically query DataServer for datainfoids of all subs of the current session, and compare them with the latest push version of the session record (seecom.alipay.sofa.registry.server.session.store.SessionInterests#interestVersions) Is smaller than DataServer, indicating that it needs to be pushed.
    • Then the SessionServer automatically obtains the dataInfoId data from the DataServer (cached in the cache) and pushes it to the client.
    • This “pull” logic, is mainly a supplement to “push”, if there is a mistake in the process of “push” can be made up in time at this time.
  • The push model:
    • When data is updated in the DataServer, it sends a request to the SessionServer to invalidate the cache entry.
    • When the SessionServer checks that it needs to update (compare version), it actively obtains data from DataServer.
    • SessionServer updates invalid entries.

0x03 Pull Model in Session Server

SOFARegistry uses the Guava LoadingCache data structure. By setting expiration times for entries in the cache, the cache periodically pulls data from the DataServer to replace expired entries.

A rough example of the model is as follows, which will be explained in detail below:

 +-----------------------------------------+
 |            Session Server               |
 |                                         |
 | +-------------------------------------+ |
 | |        SessionCacheService          | |
 | |                                     | |
 | | +--------------------------------+  | |
 | | |                                |  | |
 | | |    LoadingCache<Key, Value>    |  | |
 | | |            +                   |  | |
 | | |            |  expireAfterWrite |  | |
 | | |            |                   |  | |
 | | |            v                   |  | |
 | | |     DatumCacheGenerator        |  | |
 | | |            +                   |  | |
 | | +--------------------------------+  | |
 | +-------------------------------------+ |
 |                |                        |
 |                v                        |
 |       +--------+------------+           |
 |       | DataNodeServiceImpl |           |
 |       +--------+------------+           |
 |                |                        |
 +-----------------------------------------+
                  |
                  |   GetDataRequest
                  |
+-------------------------------------------+
                  |
                  |
                  v
          +-------+-----------+
          |   Data Server     |
          +-------------------+
Copy the code

3.1 the Bean

The related beans are defined as follows, where SessionCacheService applies Guava LoadingCache and DatumCacheGenerator is the concrete load implementation.

@Configuration
public static class SessionCacheConfiguration {

    @Bean
    public CacheService sessionCacheService(a) {
        return new SessionCacheService();
    }

    @Bean(name = "com.alipay.sofa.registry.server.session.cache.DatumKey")
    public DatumCacheGenerator datumCacheGenerator(a) {
        return newDatumCacheGenerator(); }}Copy the code

3.2 Code Analysis

The pull model is implemented in the SessionCacheService class, which has the following code for a truncated version

public class SessionCacheService implements CacheService {
    private final LoadingCache<Key, Value> readWriteCacheMap; 
    privateMap<String, CacheGenerator> cacheGenerators; . }Copy the code

And you can see the core of that is utilization

private final LoadingCache<Key, Value> readWriteCacheMap;
Copy the code

3.2.1 Cache structure

An example of constructing a LoadingCache is as follows:

  • Its cache pool size is 1000, and Guava starts to reclaim old cache entries as they approach that size.

  • Its Settings cache after writing, setting time 31000 ms invalid;

  • Generate a CacheLoader class for automatic loading by calling generatePayload;

The code is as follows:

this.readWriteCacheMap = CacheBuilder.newBuilder().maximumSize(1000L)
    .expireAfterWrite(31000, TimeUnit.MILLISECONDS).build(new CacheLoader<Key, Value>() {
        @Override
        public Value load(Key key) {
            returngeneratePayload(key); }});Copy the code

3.2.2 get value

The function to get value is simple:

@Override
public Value getValue(final Key key) throws CacheAccessException {
    Value payload = null;
    payload = readWriteCacheMap.get(key);
    return payload;
}

@Override
public Map<Key, Value> getValues(final Iterable<Key> keys) throws CacheAccessException {
    Map<Key, Value> valueMap = null;
    valueMap = readWriteCacheMap.getAll(keys);
    return valueMap;
}
Copy the code

3.2.3 Batch Clearing

Clear bulk cache objects. This API is used when the Data Server actively sends Push Data to the Session Server, which triggers an active fetch.

@Override
public void invalidate(Key... keys) {
    for(Key key : keys) { readWriteCacheMap.invalidate(key); }}Copy the code

3.2.4 Automatic Loading

Automatic loading is done using the CacheGenerator.

private Value generatePayload(Key key) {
    Value value = null;
    switch (key.getKeyType()) {
        case OBJ:
            EntityType entityType = key.getEntityType();
            CacheGenerator cacheGenerator = cacheGenerators
                .get(entityType.getClass().getName());
            value = cacheGenerator.generatePayload(key);
            break;
        case JSON:
            break;
        case XML:
            break;
        default:
            value = new Value(new HashMap<String, Object>());
            break;
    }
    return value;
}
Copy the code

3.2.5 Setting Loading

Setup loading is done with the following code.

/**
 * Setter method for property <tt>cacheGenerators</tt>.
 *
 * @param cacheGenerators  value to be assigned to property cacheGenerators
 */
@Autowired
public void setCacheGenerators(Map<String, CacheGenerator> cacheGenerators) {
    this.cacheGenerators = cacheGenerators;
}
Copy the code

The runtime parameters are as follows:

cacheGenerators = {LinkedHashMap@3368}  size = 1
 "com.alipay.sofa.registry.server.session.cache.DatumKey" -> {DatumCacheGenerator@3374} 
Copy the code

3.3 Loading class implementation

Loading the class is done with the DatumCacheGenerator.

public class DatumCacheGenerator implements CacheGenerator {
    @Autowired
    private DataNodeService     dataNodeService;

    @Override
    public Value generatePayload(Key key) {

        EntityType entityType = key.getEntityType();
        if (entityType instanceof DatumKey) {
            DatumKey datumKey = (DatumKey) entityType;

            String dataCenter = datumKey.getDataCenter();
            String dataInfoId = datumKey.getDataInfoId();

            if (isNotBlank(dataCenter) && isNotBlank(dataInfoId)) {
                return newValue(dataNodeService.fetchDataCenter(dataInfoId, dataCenter)); }}return null;
    }

    public boolean isNotBlank(String ss) {
        returnss ! =null&&! ss.isEmpty(); }}Copy the code

As you can see, loading is a DataNodeServiceImpl request to DataServer.

public class DataNodeServiceImpl implements DataNodeService {
    @Autowired
    private NodeExchanger         dataNodeExchanger;

    @Autowired
    private NodeManager           dataNodeManager;
  
    @Override
    public Datum fetchDataCenter(String dataInfoId, String dataCenterId) {

        Map<String/*datacenter*/, Datum> map = getDatumMap(dataInfoId, dataCenterId);
        if(map ! =null && map.size() > 0) {
            return map.get(dataCenterId);
        }
        return null;
    }
  
    @Override
    public Map<String, Datum> getDatumMap(String dataInfoId, String dataCenterId) {

        Map<String/*datacenter*/, Datum> map;

        try {
            GetDataRequest getDataRequest = new GetDataRequest();

            //dataCenter null means all dataCenters
            if(dataCenterId ! =null) {
                getDataRequest.setDataCenter(dataCenterId);
            }

            getDataRequest.setDataInfoId(dataInfoId);

            Request<GetDataRequest> getDataRequestStringRequest = new Request<GetDataRequest>() {

                @Override
                public GetDataRequest getRequestBody(a) {
                    return getDataRequest;
                }

                @Override
                public URL getRequestUrl(a) {
                    return getUrl(dataInfoId);
                }

                @Override
                public Integer getTimeout(a) {
                    returnsessionServerConfig.getDataNodeExchangeForFetchDatumTimeOut(); }}; Response response = dataNodeExchanger.request(getDataRequestStringRequest); Object result = response.getResult(); GenericResponse genericResponse = (GenericResponse) result;if(genericResponse.isSuccess()) { map = (Map<String, Datum>) genericResponse.getData(); map.forEach((dataCenter, datum) -> Datum.internDatum(datum)); }}returnmap; }}Copy the code

The pull model is shown in the figure below:

 +-----------------------------------------+
 |            Session Server               |
 |                                         |
 | +-------------------------------------+ |
 | |        SessionCacheService          | |
 | |                                     | |
 | | +--------------------------------+  | |
 | | |                                |  | |
 | | |    LoadingCache<Key, Value>    |  | |
 | | |            +                   |  | |
 | | |            |  expireAfterWrite |  | |
 | | |            |                   |  | |
 | | |            v                   |  | |
 | | |     DatumCacheGenerator        |  | |
 | | |            +                   |  | |
 | | +--------------------------------+  | |
 | +-------------------------------------+ |
 |                |                        |
 |                v                        |
 |       +--------+------------+           |
 |       | DataNodeServiceImpl |           |
 |       +--------+------------+           |
 |                |                        |
 +-----------------------------------------+
                  |
                  |   GetDataRequest
                  |
+-------------------------------------------+
                  |
                  |
                  v
          +-------+-----------+
          |   Data Server     |
          +-------------------+
Copy the code

0 x04 push model

When data is updated in DataServer, it sends a request to the SessionServer to invalidate the corresponding entry, prompting the SessionServer to update the invalid entry.

4.1 Initiate promotion activities

DataChangeRequest in Data Server

When the Data Server has Data changes, it proactively sends DataChangeRequest to the Session Server.

The specific code is in the SessionServerNotifier, which is related to the previous Notifier:

public class SessionServerNotifier implements IDataChangeNotifier {

    private AsyncHashedWheelTimer          asyncHashedWheelTimer;

    @Autowired
    private DataServerConfig               dataServerConfig;

    @Autowired
    private Exchange                       boltExchange;

    @Autowired
    private SessionServerConnectionFactory sessionServerConnectionFactory;

    @Autowired
    private DatumCache                     datumCache;
  
    @Override
    public void notify(Datum datum, Long lastVersion) {
        DataChangeRequest request = new DataChangeRequest(datum.getDataInfoId(),
            datum.getDataCenter(), datum.getVersion());
        List<Connection> connections = sessionServerConnectionFactory.getSessionConnections();
        for (Connection connection : connections) {
            doNotify(newNotifyCallback(connection, request)); }}private void doNotify(NotifyCallback notifyCallback) {
        Connection connection = notifyCallback.connection;
        DataChangeRequest request = notifyCallback.request;
        try {
            //check connection active
            if(! connection.isFine()) {return;
            }
            Server sessionServer = boltExchange.getServer(dataServerConfig.getPort());
            sessionServer.sendCallback(sessionServer.getChannel(connection.getRemoteAddress()),
                request, notifyCallback, dataServerConfig.getRpcTimeout());
        } catch(Exception e) { onFailed(notifyCallback); }}}Copy the code

4.2 Receiving Tweet Messages

DataChangeRequestHandler in Session Server

At Session Server, the DataChangeRequestHandler is responsible for responding to incoming tweet messages, DataChangeRequest.

As you can see, it invalidates the Cache by calling the following code, which then reloads the value from the Data Server.

sessionCacheService.invalidate(new Key(KeyType.OBJ, DatumKey.class.getName(), new DatumKey(
        dataChangeRequest.getDataInfoId(), dataChangeRequest.getDataCenter())));
Copy the code

The code for the abridged version is as follows:

public class DataChangeRequestHandler extends AbstractClientHandler {

    /** * store subscribers */
    @Autowired
    private Interests                        sessionInterests;

    @Autowired
    private SessionServerConfig              sessionServerConfig;

    @Autowired
    private ExecutorManager                  executorManager;

    @Autowired
    private CacheService                     sessionCacheService;

    @Autowired
    private DataChangeRequestHandlerStrategy dataChangeRequestHandlerStrategy;

    @Override
    public Object reply(Channel channel, Object message) {
        DataChangeRequest dataChangeRequest = (DataChangeRequest) message;
        dataChangeRequest.setDataCenter(dataChangeRequest.getDataCenter());
        dataChangeRequest.setDataInfoId(dataChangeRequest.getDataInfoId());

        //update cache when change
        sessionCacheService.invalidate(new Key(KeyType.OBJ, DatumKey.class.getName(), new DatumKey(
            dataChangeRequest.getDataInfoId(), dataChangeRequest.getDataCenter())));

        try {
            boolean result = sessionInterests.checkInterestVersions(
                dataChangeRequest.getDataCenter(), dataChangeRequest.getDataInfoId(),
                dataChangeRequest.getVersion());
            fireChangFetch(dataChangeRequest);
        } 

        return null;
    }

    private void fireChangFetch(DataChangeRequest dataChangeRequest) { dataChangeRequestHandlerStrategy.doFireChangFetch(dataChangeRequest); }}Copy the code

So our architecture diagram changes to:

+----------------------------------------------------------------+ | Session Server | | | | +-----------------------------------------------------------+ | | | SessionCacheService | | | | | | | | +-------------------------------------------------------+ | | | | | | | | | | | LoadingCache<Key, Value> <----------+ | | | | | | + | | | | | | | | expireAfterWrite | invalidate | | | | | | | | | | | | | | v | | | | | | | DatumCacheGenerator | | | | | | | + | | | | | | +-------------------------------------------------------+ | | | +-----------------------------------------------------------+ | | | | | | v | | | +--------+------------+ +---------+----------------+ | | | DataNodeServiceImpl | | DataChangeRequestHandler | | | +--------+------------+ + + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | | | ^ | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | | GetDataRequest | | DataChangeRequest | | +--------------------------------------------------------------------+ | | | Pull | Push v | +-+----------------------------+-+ | Data Server | +--------------------------------+Copy the code

Here’s what’s on the phone:

Let’s take a look inside SessionServer and see how the SessionServer does a subsequent push, notifying the Client, when a tweet is received. We mentioned earlier: Client and SessionServer, completely push based mechanism.

4.3 Extended treatment Strategy

DefaultDataChangeRequestHandlerStrategy

The previous code comes to the part that handles the dataChangeRequest.

dataChangeRequestHandlerStrategy.doFireChangFetch(dataChangeRequest);
Copy the code

The rest is strategy-listener – Task (explained in a follow-up article).

public class DefaultDataChangeRequestHandlerStrategy implements DataChangeRequestHandlerStrategy {
    @Autowired
    private TaskListenerManager taskListenerManager;

    @Override
    public void doFireChangFetch(DataChangeRequest dataChangeRequest) {
        TaskEvent taskEvent = newTaskEvent(dataChangeRequest.getDataInfoId(), TaskEvent.TaskType.DATA_CHANGE_FETCH_CLOUD_TASK); taskListenerManager.sendTaskEvent(taskEvent); }}Copy the code

4.4 Extend processing the Listener

DataChangeFetchCloudTaskListener

Support DATA_CHANGE_FETCH_CLOUD_TASK DataChangeFetchCloudTaskListener configuration in the support function.

@Override
public TaskType support(a) {
    return TaskType.DATA_CHANGE_FETCH_CLOUD_TASK;
}
Copy the code

The specific code is as follows:

public class DataChangeFetchCloudTaskListener implements TaskListener {

    @Autowired
    private Interests                                    sessionInterests;

    @Autowired
    private SessionServerConfig                          sessionServerConfig;

    /** * trigger task com.alipay.sofa.registry.server.meta.listener process */
    @Autowired
    private TaskListenerManager                          taskListenerManager;

    @Autowired
    private ExecutorManager                              executorManager;

    @Autowired
    private CacheService                                 sessionCacheService;

    private volatile TaskDispatcher<String, SessionTask> singleTaskDispatcher;

    private TaskProcessor                                dataNodeSingleTaskProcessor;

    public DataChangeFetchCloudTaskListener(TaskProcessor dataNodeSingleTaskProcessor) {
        this.dataNodeSingleTaskProcessor = dataNodeSingleTaskProcessor;
    }

    public TaskDispatcher<String, SessionTask> getSingleTaskDispatcher(a) {
        if (singleTaskDispatcher == null) {
            synchronized (this) {
                if (singleTaskDispatcher == null) {
                    singleTaskDispatcher = TaskDispatchers.createSingleTaskDispatcher(
                        TaskDispatchers.getDispatcherName(TaskType.DATA_CHANGE_FETCH_CLOUD_TASK
                            .getName()), sessionServerConfig.getDataChangeFetchTaskMaxBufferSize(),
                        sessionServerConfig.getDataChangeFetchTaskWorkerSize(), 1000.100, dataNodeSingleTaskProcessor); }}}return singleTaskDispatcher;
    }

    @Override
    public TaskType support(a) {
        return TaskType.DATA_CHANGE_FETCH_CLOUD_TASK;
    }

    @Override
    public void handleEvent(TaskEvent event) {
        SessionTask dataChangeFetchTask = newDataChangeFetchCloudTask(sessionServerConfig, taskListenerManager, sessionInterests, executorManager, sessionCacheService); dataChangeFetchTask.setTaskEvent(event); getSingleTaskDispatcher().dispatch(dataChangeFetchTask.getTaskId(), dataChangeFetchTask, dataChangeFetchTask.getExpiryTime()); }}Copy the code

4.5 Extend Task processing

DataChangeFetchCloudTask

The DataChangeFetchCloudTask does a subsequent push, which notifies the Client.

public class DataChangeFetchCloudTask extends AbstractSessionTask {
    private final SessionServerConfig sessionServerConfig;

    private Interests                 sessionInterests;

    /** * trigger task com.alipay.sofa.registry.server.meta.listener process */
    private final TaskListenerManager taskListenerManager;

    private final ExecutorManager     executorManager;

    private String                    fetchDataInfoId;

    private final CacheService        sessionCacheService;
}
Copy the code

It gets the IP of each Subscriber and sends several messages to the taskListenerManager, such as:

  • RECEIVED_DATA_MULTI_PUSH_TASK;
  • USER_DATA_ELEMENT_PUSH_TASK;
  • USER_DATA_ELEMENT_MULTI_PUSH_TASK;

So we can push the client.

@Override
public void execute(a) {
    Map<String/*dataCenter*/, Datum> datumMap = getDatumsCache();

    if(datumMap ! =null && !datumMap.isEmpty()) {

        PushTaskClosure pushTaskClosure = getTaskClosure(datumMap);

        for (ScopeEnum scopeEnum : ScopeEnum.values()) {
            Map<InetSocketAddress, Map<String, Subscriber>> map = getCache(fetchDataInfoId,
                scopeEnum);
            if(map ! =null && !map.isEmpty()) {
                for (Entry<InetSocketAddress, Map<String, Subscriber>> entry : map.entrySet()) {
                    Map<String, Subscriber> subscriberMap = entry.getValue();
                    if(subscriberMap ! =null && !subscriberMap.isEmpty()) {
                        List<String> subscriberRegisterIdList = new ArrayList<>(
                            subscriberMap.keySet());

                        //select one row decide common info
                        Subscriber subscriber = subscriberMap.values().iterator().next();

                        //remove stopPush subscriber avoid push duplicateevictReSubscribers(subscriberMap.values()); fireReceivedDataMultiPushTask(datumMap, subscriberRegisterIdList, scopeEnum, subscriber, subscriberMap, pushTaskClosure); } } } } pushTaskClosure.start(); }}Copy the code

Taking RECEIVED_DATA_MULTI_PUSH_TASK as an example, our architecture flow chart changes as follows:

+-------------------------------------------------------------------------------------------------------------------+ | Session Server +-------------------------------------+ | | | ReceivedDataMultiPushTaskListener | | | +-----------------------------------------------------------+ +------+------------------------------+ | | | SessionCacheService | ^ | | | | | RECEIVED_DATA_MULTI_PUSH_TASK | | | +-------------------------------------------------------+ | | | | | | | | +---+------------------------+ | | | | LoadingCache<Key, Value> <----------+ | | | DataChangeFetchCloudTask | | | | | + | | | +---+------------------------+ | | | | | expireAfterWrite | invalidate | | ^ | | | | | | | | | | | | | v | | | | | | | | DatumCacheGenerator | | | +-----+----------------------------+ | | | | + | | | | DataChangeFetchCloudTaskListener | | | | +-------------------------------------------------------+ | +-----+----------------------------+ | | +-----------------------------------------------------------+ ^ | | | | | DATA_CHANGE_FETCH_CLOUD_TASK | | v | | | | +--------+------------+ +---------+----------------+ +--------+--------------------------------+ | | | DataNodeServiceImpl | | DataChangeRequestHandler +-----> | DefaultDataChangeRequestHandlerStrategy | | | + + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + + + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- - + + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | | | ^ | +-------------------------------------------------------------------------------------------------------------------+ | ^ GetDataRequest | | DataChangeRequest | | +-------------------------------------------------------------------------------------------------------------------+ | ^ | Pull | Push v | +-+----------------------------+-+ | Data Server | +--------------------------------+Copy the code

Here’s what’s on the phone:

0 x05 summary

This paper explains the classic push-pull model adopted by Ant Financial to maintain Data consistency, taking Data consistency maintenance between Session Server and Data Server as an example. The general logic is as follows:

SOFARegistry uses a LoadingCache data structure to cache data synchronized from DataServer in SessionServer.

  • The pull model:
    • Each entry in the cache has an expiration time. You can set the expiration time (30s by default) when you pull data from the cache.
    • This expiration allows the cache to periodically query DataServer for datainfoids of all subs of the current session, and compare them with the latest push version of the session record (seecom.alipay.sofa.registry.server.session.store.SessionInterests#interestVersions) Is smaller than DataServer, indicating that it needs to be pushed.
    • Then the SessionServer automatically obtains the dataInfoId data from the DataServer (cached in the cache) and pushes it to the client.
    • This “pull” logic, is mainly a supplement to “push”, if there is a mistake in the process of “push” can be made up in time at this time.
  • The push model:
    • When data is updated in the DataServer, it sends a request to the SessionServer to invalidate the cache entry.
    • When the SessionServer checks that it needs to update (compare version), it actively obtains data from DataServer.
    • SessionServer updates invalid entries.

We can use it for reference in daily development.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

If you want to get a timely news feed of personal articles, or want to see the technical information of personal recommendations, please pay attention.

0 XFF reference

Guava LoadingCache details and utility classes

Google Guava Cache fully parsed

Ant gold uniform service registry data consistency analysis | SOFARegistry parsing