0x00 Abstract

SOFARegistry is Ant Financial’s open-source, production-grade service registry, designed for timeliness and high availability.

This series of articles focuses on design and architecture: across multiple articles, it summarizes the implementation mechanisms and architectural ideas of DataServer and SOFARegistry from several perspectives, so that readers can learn how Alibaba approaches design.

This is the fifteenth article; it covers renewal and eviction.

0x01 Business Scope

Renewal and eviction are important functions of service registration and discovery:

1.1 Failure Eviction

A service instance does not always go offline gracefully. It may stop working because of an out-of-memory error, a network failure, or a similar problem, in which case the service registry never receives a “service offline” request.

To remove such unavailable instances from the service list, the server creates a scheduled task at startup. By default, every 60 seconds this task removes from the list any service that has not renewed within the timeout (90 seconds by default).

1.2 Service Renewal

After registering, a service provider maintains a heartbeat that continuously tells the server “I am alive”, which prevents the server’s eviction task from removing the instance from the service list. This operation is called renewing the service (renew).
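As a minimal sketch of how these two mechanisms fit together, a registry can keep the last renewal time per instance and sweep expired entries periodically. This is not SOFARegistry's code: the class LeaseTable and its methods are hypothetical, and the 90-second timeout only mirrors the default mentioned above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration of renew + evict; not taken from any real registry.
class LeaseTable {
    private final Map<String, Long> lastRenewMillis = new ConcurrentHashMap<>();
    private final long timeoutMillis;

    LeaseTable(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Called on registration and on every heartbeat. */
    void renew(String instanceId, long nowMillis) {
        lastRenewMillis.put(instanceId, nowMillis);
    }

    /** Called by a periodic task; removes and returns instances whose lease expired. */
    List<String> evictExpired(long nowMillis) {
        List<String> evicted = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastRenewMillis.entrySet()) {
            // remove(key, value) only succeeds if no renew arrived in the meantime
            if (nowMillis - e.getValue() > timeoutMillis
                    && lastRenewMillis.remove(e.getKey(), e.getValue())) {
                evicted.add(e.getKey());
            }
        }
        return evicted;
    }

    boolean contains(String instanceId) {
        return lastRenewMillis.containsKey(instanceId);
    }

    public static void main(String[] args) {
        LeaseTable table = new LeaseTable(90_000L);
        table.renew("a", 0L);
        table.renew("b", 0L);
        table.renew("b", 60_000L);                        // "b" keeps sending heartbeats
        System.out.println(table.evictExpired(120_000L)); // prints [a]
    }
}
```

Passing the current time in as a parameter keeps the sketch deterministic; a real server would read the clock inside a scheduled task.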

0x02 DatumLeaseManager

On the Data Server side, DatumLeaseManager implements “failure elimination” and “service renewal” functions.

2.1 Definition

The main variables of DatumLeaseManager are as follows:

  • connectIdRenewTimestampMap: records the time of the most recent heartbeat for each connectId;

  • locksForConnectId: a lock per connectId, so that only one thread operates on a connectId at a time and only one task can be created for each connectId;

Specific definitions are as follows:

public class DatumLeaseManager implements AfterWorkingProcess {
    /** record the latest heartbeat time for each connectId, format: connectId -> lastRenewTimestamp */
    private final Map<String, Long>            connectIdRenewTimestampMap = new ConcurrentHashMap<>();

    /** lock for connectId , format: connectId -> true */
    private ConcurrentHashMap<String, Boolean> locksForConnectId          = new ConcurrentHashMap();

    private volatile boolean                   serverWorking              = false;

    private volatile boolean                   renewEnable                = true;

    private AsyncHashedWheelTimer              datumAsyncHashedWheelTimer;

    @Autowired
    private DataServerConfig                   dataServerConfig;

    @Autowired
    private DisconnectEventHandler             disconnectEventHandler;

    @Autowired
    private DatumCache                         datumCache;

    @Autowired
    private DataNodeStatus                     dataNodeStatus;

    private ScheduledThreadPoolExecutor        executorForHeartbeatLess;

    private ScheduledFuture<?>                 futureForHeartbeatLess;
}
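The role of locksForConnectId can be illustrated with a small, self-contained sketch. The class OneTaskPerKey and its method names are hypothetical; the only point is that ConcurrentHashMap.putIfAbsent returns null for exactly one caller per key, so it can act as a lightweight “only one pending task” guard.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper showing the putIfAbsent guard idiom.
class OneTaskPerKey {
    private final ConcurrentHashMap<String, Boolean> locks = new ConcurrentHashMap<>();

    /** Returns true only for the caller that acquired the slot for this key. */
    boolean tryAcquire(String key) {
        // putIfAbsent returns the previous value, or null if the key was absent.
        return locks.putIfAbsent(key, Boolean.TRUE) == null;
    }

    /** Frees the slot so that a later caller can acquire it again. */
    void release(String key) {
        locks.remove(key);
    }

    public static void main(String[] args) {
        OneTaskPerKey guard = new OneTaskPerKey();
        System.out.println(guard.tryAcquire("conn-1")); // true: first caller wins
        System.out.println(guard.tryAcquire("conn-1")); // false: a task is already pending
        guard.release("conn-1");
        System.out.println(guard.tryAcquire("conn-1")); // true again after release
    }
}
```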

2.2 Renewal

2.2.1 Data Structure

In the DatumLeaseManager, there are mainly the following data structures for renewal.

private ConcurrentHashMap<String, Boolean> locksForConnectId  = new ConcurrentHashMap();

private AsyncHashedWheelTimer              datumAsyncHashedWheelTimer;

2.2.2 Invocation

renew is called from the following handlers, all of which extend AbstractServerHandler.

public class PublishDataHandler extends AbstractServerHandler<PublishDataRequest>

public class DatumSnapshotHandler extends AbstractServerHandler<DatumSnapshotRequest> 

public class RenewDatumHandler extends AbstractServerHandler<RenewDatumRequest> implements AfterWorkingProcess	

public class UnPublishDataHandler extends AbstractServerHandler<UnPublishDataRequest>

2.2.3 Renewal Logic

Here DatumLeaseManager records the latest timestamp and then calls scheduleEvictTask.

public void renew(String connectId) {
    // record the renew timestamp
    connectIdRenewTimestampMap.put(connectId, System.currentTimeMillis());
    // try to trigger evict task
    scheduleEvictTask(connectId, 0);
}

In detail, scheduleEvictTask works as follows:

  • Return immediately if the current connectId is already locked;
  • Otherwise add a timed task to the timing wheel. When the timer fires:
    • Release the lock for the current connectId;
    • Get the last renewal time of the current connectId. If it does not exist, the connectId has already been removed, so return;
    • If renewal is currently disabled, only set the delay for the next run. In a non-working state no cleanup may be done, because renew requests cannot be received at that time;
    • If the last renewal has expired, call evict (only when the connectId still owns publishers) and stop scheduling;
    • If it has not expired, call scheduleEvictTask(connectId, nextDelaySec) to schedule the next check.

The specific code is as follows:

/**
 * trigger evict task: if connectId expired, create ClientDisconnectEvent to cleanup datums bind to the connectId
 * PS: every connectId allows only one task to be created
 */
private void scheduleEvictTask(String connectId, long delaySec) {
    delaySec = (delaySec <= 0) ? dataServerConfig.getDatumTimeToLiveSec() : delaySec;

    // lock for connectId: every connectId allows only one task to be created
    Boolean ifAbsent = locksForConnectId.putIfAbsent(connectId, true);
    if (ifAbsent != null) {
        return;
    }

    datumAsyncHashedWheelTimer.newTimeout(_timeout -> {
        boolean continued = true;
        long nextDelaySec = 0;
        try {
            // release lock
            locksForConnectId.remove(connectId);

            // get lastRenewTime of this connectId
            Long lastRenewTime = connectIdRenewTimestampMap.get(connectId);
            if (lastRenewTime == null) {
                // connectId is already clientOff
                return;
            }

            /*
             * 1. lastRenewTime expires, then:
             *   - build ClientOffEvent and hand it to DataChangeEventCenter.
             *   - It will not be scheduled next time, so terminated.
             * 2. lastRenewTime not expires, then:
             *   - trigger the next schedule
             */
            boolean isExpired =
                    System.currentTimeMillis() - lastRenewTime > dataServerConfig.getDatumTimeToLiveSec() * 1000L;
            if (!isRenewEnable()) {
                nextDelaySec = dataServerConfig.getDatumTimeToLiveSec();
            } else if (isExpired) {
                int ownPubSize = getOwnPubSize(connectId);
                if (ownPubSize > 0) {
                    evict(connectId);
                }
                connectIdRenewTimestampMap.remove(connectId, lastRenewTime);
                continued = false;
            } else {
                nextDelaySec = dataServerConfig.getDatumTimeToLiveSec()
                               - (System.currentTimeMillis() - lastRenewTime) / 1000L;
                nextDelaySec = nextDelaySec <= 0 ? 1 : nextDelaySec;
            }
        } catch (Throwable e) {
            // error handling is not shown in this excerpt
        }
        if (continued) {
            scheduleEvictTask(connectId, nextDelaySec);
        }
    }, delaySec, TimeUnit.SECONDS);
}
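To make the rescheduling arithmetic concrete, here is a small sketch of the two computations performed in the timer callback. The class and method names are hypothetical, and the 15-second time-to-live is only an example value, not a documented default.

```java
// Hypothetical helper mirroring the arithmetic in scheduleEvictTask.
class EvictDelay {
    /** True if more than ttlSec seconds have passed since the last renew. */
    static boolean isExpired(long nowMillis, long lastRenewMillis, long ttlSec) {
        return nowMillis - lastRenewMillis > ttlSec * 1000L;
    }

    /** Seconds to wait until the lease could expire, never less than 1. */
    static long nextDelaySec(long nowMillis, long lastRenewMillis, long ttlSec) {
        long delay = ttlSec - (nowMillis - lastRenewMillis) / 1000L;
        return delay <= 0 ? 1 : delay;
    }

    public static void main(String[] args) {
        long ttlSec = 15; // example value only
        // Last renew 4 s ago: not expired, check again in 15 - 4 = 11 s.
        System.out.println(isExpired(4_000L, 0L, ttlSec));     // false
        System.out.println(nextDelaySec(4_000L, 0L, ttlSec));  // 11
        // Last renew exactly 15 s ago: not yet greater than the TTL, so the
        // computed delay of 0 is clamped to 1 s.
        System.out.println(isExpired(15_000L, 0L, ttlSec));    // false
        System.out.println(nextDelaySec(15_000L, 0L, ttlSec)); // 1
    }
}
```

The clamp to 1 second avoids passing a zero delay back into scheduleEvictTask, where a value of 0 or less would be replaced by the full time-to-live.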

2.2.4 Diagram

The details are shown in the following figure

[Figure: PublishDataHandler.doHandle calls renew on DatumLeaseManager; inside DatumLeaseManager, scheduleEvictTask registers a newTimeout on AsyncHashedWheelTimer, whose callback leads either to evict or back to scheduleEvictTask.]

Or as shown below:

+------------------+  +-------------------+   +------------------------+
|PublishDataHandler|  | DatumLeaseManager |   |  AsyncHashedWheelTimer |
+--------+---------+  +--------+----------+   +-----------+------------+
         |                     |           new            |
      doHandle                 +------------------------> |
         |      renew          |                          |
         +-------------------> |                          |
         |                     |                          |
         |                     |                          |
         |             scheduleEvictTask                  |
         |                     |                          |
         |                     |        newTimeout        |
         |        +----------> +------------------------> |
         |        |            |                          |
         |        |            |                          |
         |        |            |                          |
         |        |            |           No             +
         |        |            | <---------------+ if (ownPubSize > 0)
         |        |            |                          +
         |        |            v                          |
         |        +--+ scheduleEvictTask                  | Yes
         |                     +                          v
         |                     |                        evict
         |                     |                          |
         v                     v                          v

2.3 Eviction

2.3.1 Data Structure

In the DatumLeaseManager, there are mainly the following data structures for eviction.

private ScheduledThreadPoolExecutor        executorForHeartbeatLess;

private ScheduledFuture<?>                 futureForHeartbeatLess;

There are two invocation paths, so that eviction can be checked when the data changes:

  • Invocation at startup;
  • Explicit invocation.

2.3.2 Explicit Call

The LocalDataServerChangeEventHandler class calls datumLeaseManager.reset(), which in turn triggers evict.

@Override
public void doHandle(LocalDataServerChangeEvent localDataServerChangeEvent) {
    isChanged.set(true);

    // Better change to Listener pattern
    localDataServerCleanHandler.reset();
    datumLeaseManager.reset();

    events.offer(localDataServerChangeEvent);
}

DatumLeaseManager.reset() calls scheduleEvictTaskForHeartbeatLess, which restarts the eviction thread.

public synchronized void reset() {
    if (futureForHeartbeatLess != null) {
        futureForHeartbeatLess.cancel(false);
    }
    scheduleEvictTaskForHeartbeatLess();
}
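The cancel-and-reschedule idiom used by reset() can be sketched in isolation. The class Rescheduler below is hypothetical and uses only java.util.concurrent; the period is passed in directly rather than read from a configuration object.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "cancel the old periodic task, then schedule a fresh one".
class Rescheduler {
    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    private final Runnable task;
    private final long periodMillis;
    private ScheduledFuture<?> future;

    Rescheduler(Runnable task, long periodMillis) {
        this.task = task;
        this.periodMillis = periodMillis;
    }

    /** Restarts the timer: the next run happens one full period from now. */
    synchronized ScheduledFuture<?> reset() {
        if (future != null) {
            future.cancel(false); // do not interrupt a run that is already in progress
        }
        future = executor.scheduleWithFixedDelay(task, periodMillis, periodMillis,
            TimeUnit.MILLISECONDS);
        return future;
    }

    void shutdown() {
        executor.shutdownNow();
    }

    public static void main(String[] args) {
        Rescheduler r = new Rescheduler(() -> System.out.println("sweep"), 60_000L);
        ScheduledFuture<?> first = r.reset();
        ScheduledFuture<?> second = r.reset();
        System.out.println(first.isCancelled());  // true: replaced by the second schedule
        System.out.println(second.isCancelled()); // false
        r.shutdown();
    }
}
```

One consequence of this design is that every reset pushes the next sweep a full period into the future, so frequent resets delay the sweep.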

2.3.3 Invocation at Startup

At startup, the eviction thread is started.

@PostConstruct
public void init() {
    ...
    executorForHeartbeatLess = new ScheduledThreadPoolExecutor(1, threadFactoryBuilder
        .setNameFormat("Registry-DatumLeaseManager-ExecutorForHeartbeatLess").build());
    scheduleEvictTaskForHeartbeatLess();
}

2.3.4 Eviction

The actual eviction is carried out by a scheduled task, EvictTaskForHeartbeatLess.

private void scheduleEvictTaskForHeartbeatLess() {
    futureForHeartbeatLess = executorForHeartbeatLess.scheduleWithFixedDelay(
        new EvictTaskForHeartbeatLess(), dataServerConfig.getDatumTimeToLiveSec(),
        dataServerConfig.getDatumTimeToLiveSec(), TimeUnit.SECONDS);
}

When the timer fires, all current connectIds are retrieved from datumCache and iterated. Any connectId that has no recorded renew timestamp (that is, no heartbeat) and still owns publishers is evicted.

/**
 * evict own connectIds with heartbeat less
 */
private class EvictTaskForHeartbeatLess implements Runnable {

    @Override
    public void run() {
        // If in a non-working state, cannot clean up because the renew request cannot be received at this time.
        if (!isRenewEnable()) {
            return;
        }

        Set<String> allConnectIds = datumCache.getAllConnectIds();
        for (String connectId : allConnectIds) {
            Long timestamp = connectIdRenewTimestampMap.get(connectId);
            // no heartbeat
            if (timestamp == null) {
                int ownPubSize = getOwnPubSize(connectId);
                if (ownPubSize > 0) {
                    evict(connectId);
                }
            }
        }
    }
}
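The selection rule in run() can be reduced to a pure function for illustration. HeartbeatLessFilter and its parameters are hypothetical stand-ins: allConnectIds plays the role of datumCache.getAllConnectIds(), renewTimestamps plays the role of connectIdRenewTimestampMap, and ownPubSize stands in for getOwnPubSize.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.ToIntFunction;

// Hypothetical sketch; it only mirrors the filtering condition of the sweep.
class HeartbeatLessFilter {
    /** Returns connectIds that have publishers but no recorded renew timestamp. */
    static Set<String> select(Set<String> allConnectIds,
                              Map<String, Long> renewTimestamps,
                              ToIntFunction<String> ownPubSize) {
        Set<String> toEvict = new TreeSet<>();
        for (String connectId : allConnectIds) {
            boolean noHeartbeat = renewTimestamps.get(connectId) == null;
            if (noHeartbeat && ownPubSize.applyAsInt(connectId) > 0) {
                toEvict.add(connectId);
            }
        }
        return toEvict;
    }

    public static void main(String[] args) {
        Set<String> all = Set.of("c1", "c2", "c3");
        Map<String, Long> renew = Map.of("c1", 1_000L);    // only c1 has sent a heartbeat
        Map<String, Integer> pubs = Map.of("c1", 2, "c2", 1, "c3", 0);
        System.out.println(select(all, renew, pubs::get)); // prints [c2]
    }
}
```

Note that this sweep does not compare timestamps at all: expiry of connectIds that did send heartbeats is handled by the per-connectId timer task, while this task only catches connectIds that never renewed.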

evict is called here:

private void evict(String connectId) {
    disconnectEventHandler.receive(new ClientDisconnectEvent(connectId, System
        .currentTimeMillis(), 0));
}

The details are as follows:

+--------------------------------------------------+
| DatumLeaseManager                                |
|                                                  |
|                                                  |
| EvictTaskForHeartbeatLess.run                    |
|                                                  |
| +----------------------------------------------+ |
| |                                              | |
| |                    |                         | |
| |                    |                         | |
| |                    v                         | |
| |                                              | |
| | allConnectIds = datumCache.getAllConnectIds()| |
| |                                              | |
| |                    |                         | |
| |                    |  for (allConnectIds)    | |
| |                    v                         | |
| |                                              | |
| |         connectIdRenewTimestampMap           | |
| |                                              | |
| |                    |                         | |
| |                    | no heartbeat            | |
| |                    v                         | |
| |                                              | |
| |                  evict                       | |
| |                                              | |
| +----------------------------------------------+ |
+--------------------------------------------------+

2.3.5 Eviction Processing Services

2.3.5.1 Forwarding the Eviction Message

The eviction message needs to be forwarded. This corresponds to DisconnectEventHandler.receive, which calls EVENT_QUEUE.add(event):

public class DisconnectEventHandler implements InitializingBean, AfterWorkingProcess {

    /** * a DelayQueue that contains client disconnect events */
    private final DelayQueue<DisconnectEvent>           EVENT_QUEUE        = new DelayQueue<>();

    @Autowired
    private SessionServerConnectionFactory              sessionServerConnectionFactory;

    @Autowired
    private DataChangeEventCenter                       dataChangeEventCenter;

    @Autowired
    private DataNodeStatus                              dataNodeStatus;

    private static final BlockingQueue<DisconnectEvent> noWorkQueue        = new LinkedBlockingQueue<>();

    public void receive(DisconnectEvent event) {
        if (dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) {
            noWorkQueue.add(event);
            return;
        }
        EVENT_QUEUE.add(event);
    }
}

afterPropertiesSet starts a thread that loops, taking events from EVENT_QUEUE and processing them:

  • Remove the corresponding connection from sessionServerConnectionFactory;
  • Send a ClientChangeEvent notification to dataChangeEventCenter.

Details are as follows:

@Override
public void afterPropertiesSet() {
    Executor executor = ExecutorFactory
            .newSingleThreadExecutor(DisconnectEventHandler.class.getSimpleName());
    executor.execute(() -> {
        while (true) {
            try {
                DisconnectEvent disconnectEvent = EVENT_QUEUE.take();
                if (disconnectEvent.getType() == DisconnectTypeEnum.SESSION_SERVER) {
                    SessionServerDisconnectEvent event     = (SessionServerDisconnectEvent) disconnectEvent;
                    String                       processId = event.getProcessId();
                    //check processId confirm remove,and not be registered again when delay time
                    String sessionServerHost = event.getSessionServerHost();
                    if (sessionServerConnectionFactory
                            .removeProcessIfMatch(processId,sessionServerHost)) {
                        Set<String> connectIds = sessionServerConnectionFactory
                                .removeConnectIds(processId);

                        if (connectIds != null && !connectIds.isEmpty()) {
                            for (String connectId : connectIds) {
                                unPub(connectId, event.getRegisterTimestamp());
                            }
                        }
                    }
                } else {
                    ClientDisconnectEvent event = (ClientDisconnectEvent) disconnectEvent;
                    unPub(event.getConnectId(), event.getRegisterTimestamp());
                }
            } catch (Throwable e) {
                // error handling is not shown in this excerpt
            }
        }
    });
}

/**
 * @param connectId
 * @param registerTimestamp
 */
private void unPub(String connectId, long registerTimestamp) {
    dataChangeEventCenter.onChange(new ClientChangeEvent(connectId, dataServerConfig
        .getLocalDataCenter(), registerTimestamp));
}
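The queue-and-consumer shape of DisconnectEventHandler can be sketched with the JDK's DelayQueue alone. Everything below is hypothetical (the DelayedEvent class, its fields, and the delays chosen); it only shows that take() hands out elements once their delay has elapsed, in order of expiry.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical event type; the real DisconnectEvent is not reproduced here.
class DelayedEvent implements Delayed {
    final String connectId;
    private final long fireAtNanos;

    DelayedEvent(String connectId, long delayMillis) {
        this.connectId = connectId;
        this.fireAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining delay; zero or negative means the element may be taken.
        return unit.convert(fireAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS),
            other.getDelay(TimeUnit.NANOSECONDS));
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedEvent> queue = new DelayQueue<>();
        queue.add(new DelayedEvent("slow", 200)); // becomes available after about 200 ms
        queue.add(new DelayedEvent("now", 0));    // available immediately
        System.out.println(queue.take().connectId); // now
        System.out.println(queue.take().connectId); // slow
    }
}
```

In the eviction path above, ClientDisconnectEvent is constructed with a last argument of 0, which suggests (as an assumption about that constructor) that such events are meant to be processed without additional delay.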

As shown in the figure below

+--------------------------------------------------+
| DatumLeaseManager                                |
|                                                  |
|                                                  |
| EvictTaskForHeartbeatLess.run                    |
|                                                  |
| +----------------------------------------------+ |
| |                                              | |
| |                    |                         | |
| |                    |                         | |
| |                    v                         | |
| |                                              | |
| | allConnectIds = datumCache.getAllConnectIds()| |
| |                                              | |
| |                    |                         | |
| |                    |  for (allConnectIds)    | |         +------------------------+
| |                    v                         | |         |                        |
| |                                              | |         | DisconnectEventHandler |
| |         connectIdRenewTimestampMap           | |         |                        |
| |                                              | |         |    +-------------+     |
| |                    |                         | |         |    | noWorkQueue |     |
| |                    | no heartbeat            | |         |    +-------------+     |
| |                    v                         | | receive |                        |
| |                                              | |         |   +--------------+     |
| |                  evict   +---------------------------------> |  EVENT_QUEUE |     |
| |                                              | |         |   +--------------+     |
| +----------------------------------------------+ |         +------------------------+
+--------------------------------------------------+

2.3.5.2 DataChangeEventCenter Forwarding

The logic then moves to DataChangeEventCenter, which also acts as a forwarder.

public class DataChangeEventCenter {

    /**
     * queues of DataChangeEvent
     */
    private DataChangeEventQueue[] dataChangeEventQueues;

    /**
     * receive changed publisher, then wrap it into the DataChangeEvent and put it into dataChangeEventQueue
     *
     * @param publisher
     * @param dataCenter
     */
    public void onChange(Publisher publisher, String dataCenter) {
        int idx = hash(publisher.getDataInfoId());
        Datum datum = new Datum(publisher, dataCenter);
        if (publisher instanceof UnPublisher) {
            datum.setContainsUnPub(true);
        }
        if (publisher.getPublishType() != PublishType.TEMPORARY) {
            dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
                DataSourceTypeEnum.PUB, datum));
        } else {
            dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
                DataSourceTypeEnum.PUB_TEMP, datum));
        }
    }
}

2.3.5.3 DataChangeEventQueue Processing

DataChangeEventQueue processes the data to be evicted by calling methods such as addTempChangeData and handleDatum.

When an event is taken from the queue, it is handled according to its DataChangeScopeEnum value:

  • If the scope is DataChangeScopeEnum.DATUM, check dataChangeEvent.getSourceType():
    • If it is DataSourceTypeEnum.PUB_TEMP, addTempChangeData adds the ChangeData to CHANGE_QUEUE;
    • Otherwise, call handleDatum;
  • If the scope is DataChangeScopeEnum.CLIENT, call handleClientOff((ClientChangeEvent) event);
  • If the scope is DataChangeScopeEnum.SNAPSHOT, call handleSnapshot((DatumSnapshotEvent) event).
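The branching above can be sketched as a small routing function. The enums and the returned strings below are hypothetical stand-ins; they only mirror the names in the list and do not reproduce the real DataChangeEventQueue.

```java
// Hypothetical enums and return values; they only mirror the branching described above.
class EventDispatchSketch {
    enum Scope { DATUM, CLIENT, SNAPSHOT }
    enum SourceType { PUB, PUB_TEMP }

    /** Returns the name of the handler that the branching above would select. */
    static String route(Scope scope, SourceType sourceType) {
        switch (scope) {
            case DATUM:
                // Only the DATUM scope looks at the source type.
                return sourceType == SourceType.PUB_TEMP ? "addTempChangeData" : "handleDatum";
            case CLIENT:
                return "handleClientOff";
            case SNAPSHOT:
                return "handleSnapshot";
            default:
                throw new IllegalArgumentException("unknown scope: " + scope);
        }
    }

    public static void main(String[] args) {
        System.out.println(route(Scope.DATUM, SourceType.PUB_TEMP)); // addTempChangeData
        System.out.println(route(Scope.DATUM, SourceType.PUB));      // handleDatum
        System.out.println(route(Scope.CLIENT, null));               // handleClientOff
    }
}
```

Since unPub wraps the connectId in a ClientChangeEvent, the eviction flow in this article presumably ends in the CLIENT branch, that is, in handleClientOff.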

For more detail, see: Ant Financial SOFARegistry’s Message Bus asynchronous processing.

+--------------------------------------------------+
| DatumLeaseManager                                |
|                                                  |
|                                                  |
| EvictTaskForHeartbeatLess.run                    |
|                                                  |
| +----------------------------------------------+ |
| |                                              | |
| |                    |                         | |
| |                    |                         | |
| |                    v                         | |
| |                                              | |
| | allConnectIds = datumCache.getAllConnectIds()| |
| |                                              | |
| |                    |                         | |
| |                    |  for (allConnectIds)    | |         +------------------------+
| |                    v                         | |         |                        |
| |                                              | |         | DisconnectEventHandler |
| |         connectIdRenewTimestampMap           | |         |                        |
| |                                              | |         |    +-------------+     |
| |                    |                         | |         |    | noWorkQueue |     |
| |                    | no heartbeat            | |         |    +-------------+     |
| |                    v                         | | receive |                        |
| |                                              | |         |   +--------------+     |
| |                  evict   +---------------------------------> |  EVENT_QUEUE |     |
| |                                              | |         |   +--------------+     |
| +----------------------------------------------+ |         +------------------------+
+--------------------------------------------------+               |
                                                                   |
+----------------------+                                           |   onChange
| DataChangeEventQueue |                                           v
|                      |                                  +--------+------------------+
|                      |                                  |   DataChangeEventCenter   |
|     +------------+   |                                  |                           |
|     | eventQueue |   |      add  DataChangeEvent        |                           |
|     +------------+   |                                  | +-----------------------+ |
|                      |  <-----------------------------+ | | dataChangeEventQueues | |
|  addTempChangeData   |                                  | +-----------------------+ |
|                      |                                  +---------------------------+
|  handleDatum         |
|                      |
+----------------------+

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

To receive timely updates on my articles, or to see the technical material I recommend, please follow the account.
