Preface

This chapter discusses the Nacos configuration center started as a cluster (-Dnacos.standalone=false) with the embedded Derby data source (-DembeddedStorage=true). Because each node holds its own copy of the data, data consistency is achieved through the JRaft framework. This covers both write consistency and read consistency (linear consistency).
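
For reference, a minimal sketch of such a startup: each node is launched with the two system properties above plus a conf/cluster.conf listing the cluster members (the addresses below are placeholders, default port 8848).

# conf/cluster.conf
192.168.0.1:8848
192.168.0.2:8848
192.168.0.3:8848

# JVM system properties on each node
-Dnacos.standalone=false
-DembeddedStorage=true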

One, Write Consistency

1. Differences with MySQL

Recall the POST /v1/cs/configs publish-configuration interface. Based on the MySQL data source, this interface does several things:

  • Update server configuration (database)
  • Update local configurations on all servers in the cluster
  • Respond to client long polling
// ConfigController
@PostMapping
public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
        @RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,...) throws NacosException {

    // ...
    ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
    configInfo.setType(type);
    if (StringUtils.isBlank(betaIps)) {
        if (StringUtils.isBlank(tag)) {
            // Update the database configuration
            persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true);
            // Publish the ConfigDataChangeEvent event
            ConfigChangePublisher
                    .notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
        } else {
            // ...
        }
    } else {
        // ...
    }
    return true;
}

In a cluster based on the embedded Derby data source, publishing a configuration does similar things, but in a different way.

  • Derby stores one copy of data on each node, and writes ensure that the data is consistent on each node.
  • The ConfigChangePublisher.notifyConfigChange method does nothing when the server is started as a cluster with embedded storage; other means are needed to synchronize the data into each node's Derby database, file system and memory, and to respond to long polling.
public class ConfigChangePublisher {
    public static void notifyConfigChange(ConfigDataChangeEvent event) {
        // When started as a cluster (-Dnacos.standalone=false) with the embedded data source (-DembeddedStorage=true), do nothing
        if (PropertyUtil.isEmbeddedStorage() && !EnvUtil.getStandaloneMode()) {
            return;
        }
        NotifyCenter.publishEvent(event);
    }
}

2. Submit tasks

For consistent writes, the SOFA-JRaft framework requires constructing a Task and submitting it to the leader's NodeImpl via Node.apply(Task); here's how Nacos does that.

Taking updating a configuration as an example: with the Derby data source, the PersistService implementation class is EmbeddedStoragePersistServiceImpl, and its updateConfigInfo method is responsible for updating the configuration.

// EmbeddedStoragePersistServiceImpl
@Override
public void updateConfigInfo(final ConfigInfo configInfo, final String srcIp, final String srcUser,
        final Timestamp time, final Map<String, Object> configAdvanceInfo, final boolean notify) {
    try {
        // 1. SELECT ID,data_id,group_id,tenant_id,app_name,content,md5,type FROM config_info
        // WHERE data_id=? AND group_id=? AND tenant_id=?
        ConfigInfo oldConfigInfo = findConfigInfo(configInfo.getDataId(), configInfo.getGroup(),
                configInfo.getTenant());
        // ...

        // 2. UPDATE config_info SET content=?, md5=?, src_ip=?, src_user=?, gmt_modified=?, app_name=?, c_desc=?, c_use=?, effect=?, type=?, c_schema=?
        // WHERE data_id=? AND group_id=? AND tenant_id=?
        updateConfigInfoAtomic(configInfo, srcIp, srcUser, time, configAdvanceInfo);

        // ... ignore tag-related logic

        // 3. INSERT INTO his_config_info (id,data_id,group_id,tenant_id,app_name,content,md5,src_ip,src_user,gmt_modified,op_type) VALUES(?,?,?,?,?,?,?,?,?,?,?)
        insertConfigHistoryAtomic(oldConfigInfo.getId(), oldConfigInfo, srcIp, srcUser, time, "U");

        // 4. Place ConfigDumpEvent into ThreadLocal as extendInfo
        EmbeddedStorageContextUtils.onModifyConfigInfo(configInfo, srcIp, time);
        databaseOperate.blockUpdate();
    } finally {
        EmbeddedStorageContextUtils.cleanAllContext();
    }
}

Despite their names, the *Atomic methods do not actually execute any SQL. They simply store the SQL and its parameters via EmbeddedStorageContextUtils.addSqlContext(sql, args), which wraps them into a ModifyRequest and puts it into a ThreadLocal.

// EmbeddedStoragePersistServiceImpl
@Override
public void updateConfigInfoAtomic(final ConfigInfo configInfo, final String srcIp, final String srcUser,
        final Timestamp time, Map<String, Object> configAdvanceInfo) {
    // ... ignore parameter handling
    final String sql = "UPDATE config_info SET content=? , md5 = ? , src_ip=? ,src_user=? ,gmt_modified=? ,app_name=? ,"
            + "c_desc=? ,c_use=? ,effect=? ,type=? ,c_schema=? WHERE data_id=? AND group_id=? AND tenant_id=?";

    final Object[] args = new Object[] {configInfo.getContent(), md5Tmp, srcIp, srcUser, time, appNameTmp, desc,
            use, effect, type, schema, configInfo.getDataId(), configInfo.getGroup(), tenantTmp};
    EmbeddedStorageContextUtils.addSqlContext(sql, args);
}

// EmbeddedStorageContextUtils
private static final ThreadLocal<ArrayList<ModifyRequest>> SQL_CONTEXT = ThreadLocal.withInitial(ArrayList::new);
  
public static void addSqlContext(String sql, Object... args) {
    ArrayList<ModifyRequest> requests = SQL_CONTEXT.get();
    ModifyRequest context = new ModifyRequest();
    context.setExecuteNo(requests.size());
    context.setSql(sql);
    context.setArgs(args);
    requests.add(context);
    SQL_CONTEXT.set(requests);
}

Once all of the above SQL has been stored into the ThreadLocal, EmbeddedStorageContextUtils.onModifyConfigInfo(configInfo, srcIp, time) wraps the configuration information into a ConfigDumpEvent and puts it into another ThreadLocal as extend info.

// EmbeddedStorageContextUtils

private static final ThreadLocal<Map<String, String>> EXTEND_INFO_CONTEXT = ThreadLocal.withInitial(HashMap::new);

public static void onModifyConfigInfo(ConfigInfo configInfo, String srcIp, Timestamp time) {
    if (!EnvUtil.getStandaloneMode()) {
        ConfigDumpEvent event = ConfigDumpEvent.builder().remove(false).namespaceId(configInfo.getTenant())
                .dataId(configInfo.getDataId()).group(configInfo.getGroup()).isBeta(false)
                .content(configInfo.getContent()).type(configInfo.getType()).handleIp(srcIp)
                .lastModifiedTs(time.getTime()).build();

        Map<String, String> extendInfo = new HashMap<>(2);
        extendInfo.put(Constants.EXTEND_INFO_CONFIG_DUMP_EVENT, JacksonUtils.toJson(event));
        EmbeddedStorageContextUtils.putAllExtendInfo(extendInfo);
    }
}

public static void putAllExtendInfo(Map<String, String> map) {
    Map<String, String> old = EXTEND_INFO_CONTEXT.get();
    old.putAll(map);
    EXTEND_INFO_CONTEXT.set(old);
}

At this point all of the necessary data is prepared, and databaseOperate.blockUpdate() performs the update. The DatabaseOperate implementation class here is DistributedDatabaseOperateImpl; it packages both the ModifyRequest list (SQL and args) and the ConfigDumpEvent from the ThreadLocals into a WriteRequest.
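
The blockUpdate method itself is not shown below; as a rough sketch (assuming, without copying the Nacos source verbatim, that it simply hands the ThreadLocal SQL context to the update method through a getCurrentSqlContext-style accessor, with a null consumer, i.e. a synchronous write), it looks like this, after which the actual update method shown next does the real work:

// Sketch only, not verbatim Nacos source: blockUpdate forwards the accumulated
// ModifyRequest list to update() with a null consumer (synchronous write through JRaft).
@Override
public Boolean blockUpdate() {
    return update(EmbeddedStorageContextUtils.getCurrentSqlContext(), null);
}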

// DistributedDatabaseOperateImpl
@Override
public Boolean update(List<ModifyRequest> sqlContext, BiConsumer<Boolean, Throwable> consumer) {
    try {
        // {timestamp}-{group}-{ip:port}-{signature}
        final String key =
                System.currentTimeMillis() + "-" + group() + "-" + memberManager.getSelf().getAddress() + "-"
                        + MD5Utils.md5Hex(sqlContext.toString(), Constants.ENCODE);
        WriteRequest request = WriteRequest.newBuilder().setGroup(group()).setKey(key)
                // List<ModifyRequest>
                .setData(ByteString.copyFrom(serializer.serialize(sqlContext)))
                // There may be ConfigDumpEvent
                .putAllExtendInfo(EmbeddedStorageContextUtils.getCurrentExtendInfo())
                .setType(sqlContext.getClass().getCanonicalName()).build();
        // A null consumer means a synchronous request
        if (Objects.isNull(consumer)) {
            // JRaftProtocol.write
            Response response = this.protocol.write(request);
            if (response.getSuccess()) {
                return true;
            }
            return false;
        } else {
            // ... ignore the asynchronous request path
        }
        return true;
    } catch (TimeoutException e) {
        throw new NacosRuntimeException(NacosException.SERVER_ERROR, e.toString());
    } catch (Throwable e) {
        throw new NacosRuntimeException(NacosException.SERVER_ERROR, e.toString());
    }
}

The implementation class behind protocol.write is JRaftProtocol. The synchronous request is simply adapted to the asynchronous one, with a 10-second timeout.

// JRaftProtocol
@Override
public Response write(WriteRequest request) throws Exception {
    CompletableFuture<Response> future = writeAsync(request);
    return future.get(10_000L, TimeUnit.MILLISECONDS);
}

@Override
public CompletableFuture<Response> writeAsync(WriteRequest request) {
    return raftServer.commit(request.getGroup(), request, new CompletableFuture<>());
}

JRaftServer's commit method distinguishes whether the current node is the leader or a follower.

// JRaftServer
public CompletableFuture<Response> commit(final String group, final Message data,
        final CompletableFuture<Response> future) {
    final RaftGroupTuple tuple = findTupleByGroup(group);
    FailoverClosureImpl closure = new FailoverClosureImpl(future);
    // If the current node is the leader, the task will be submitted to the leader first
    final Node node = tuple.node;
    if (node.isLeader()) {
        // If the current node is the leader, execute it directly
        applyOperation(node, data, closure);
    } else {
        // If it is follower, redirect to the leader
        invokeToLeader(group, data, rpcRequestTimeoutMs, closure);
    }
    return future;
}

If the current node is a follower, the request is forwarded to the leader, which performs the write operation.

// JRaftServer
private void invokeToLeader(final String group, final Message request, final int timeoutMillis,
        FailoverClosure closure) {
    try {
        final Endpoint leaderIp = Optional.ofNullable(getLeader(group))
                .orElseThrow(() -> new NoLeaderException(group)).getEndpoint();
        cliClientService.getRpcClient().invokeAsync(leaderIp, request, new InvokeCallback() {
            @Override
            public void complete(Object o, Throwable ex) {
                if (Objects.nonNull(ex)) {
                    closure.setThrowable(ex);
                    closure.run(new Status(RaftError.UNKNOWN, ex.getMessage()));
                    return;
                }
                closure.setResponse((Response) o);
                closure.run(Status.OK());
            }

            @Override
            public Executor executor() {
                return RaftExecutor.getRaftCliServiceExecutor();
            }
        }, timeoutMillis);
    } catch (Exception e) {
        closure.setThrowable(e);
        closure.run(new Status(RaftError.UNKNOWN, e.toString()));
    }
}

If the current node is the leader, the write can be performed directly without an RPC call. As you can see, SOFA-JRaft's Node.apply(Task) method is used to submit the write request.

// JRaftServer
public void applyOperation(Node node, Message data, FailoverClosure closure) {
    final Task task = new Task();
    task.setDone(new NacosClosure(data, status -> {
        NacosClosure.NacosStatus nacosStatus = (NacosClosure.NacosStatus) status;
        closure.setThrowable(nacosStatus.getThrowable());
        closure.setResponse(nacosStatus.getResponse());
        closure.run(nacosStatus);
    }));
    task.setData(ByteBuffer.wrap(data.toByteArray()));
    node.apply(task);
}

3. Apply Log

Once a Task is submitted to the SOFA-JRaft framework, the framework handles the whole process (log replication, majority commit) and finally invokes the onApply method of the user-implemented state machine. At that point the log has been committed by a majority of nodes and can be applied to the local state machine on every node.

NacosStateMachine is Nacos's implementation of the SOFA-JRaft StateMachine; focus on its onApply method.

protected final RequestProcessor processor;

// Triggered after the log is committed, to apply it to the current node
@Override
public void onApply(Iterator iter) {
    int index = 0;
    int applied = 0;
    Message message;
    NacosClosure closure = null;
    try {
        while (iter.hasNext()) {
            Status status = Status.OK();
            try {
                // On the leader node, done is not null, which saves deserializing the payload again
                if (iter.done() != null) {
                    closure = (NacosClosure) iter.done();
                    message = closure.getMessage();
                } else {
                    final ByteBuffer data = iter.getData();
                    message = ProtoMessageUtil.parse(data.array());
                }
                // Write request
                if (message instanceof WriteRequest) {
                    Response response = processor.onApply((WriteRequest) message);
                    postProcessor(response, closure);
                }
                // Consistent read downgraded to the raft process
                if (message instanceof ReadRequest) {
                    Response response = processor.onRequest((ReadRequest) message);
                    postProcessor(response, closure);
                }
            } catch (Throwable e) {
                index++;
                status.setError(RaftError.UNKNOWN, e.toString());
                Optional.ofNullable(closure).ifPresent(closure1 -> closure1.setThrowable(e));
                throw e;
            } finally {
                Optional.ofNullable(closure).ifPresent(closure1 -> closure1.run(status));
            }
            applied++;
            index++;
            iter.next();
        }
    } catch (Throwable t) {
        iter.setErrorAndRollback(index - applied,
                new Status(RaftError.ESTATEMACHINE, "StateMachine meet critical error: %s.",
                        ExceptionUtil.getStackTrace(t)));
    }
}

The WriteRequest is eventually handed to a RequestProcessor, whose implementation class is again DistributedDatabaseOperateImpl. All the SQL is executed against the Derby database, and the ConfigDumpEvent is taken out of the WriteRequest's extend info and published.

public class DistributedDatabaseOperateImpl extends RequestProcessor4CP implements BaseDatabaseOperate {
    // After a majority of the raft cluster commits the log, apply it on this node: execute the SQL and publish the dump event
    @Override
    public Response onApply(WriteRequest log) {
        final ByteString byteString = log.getData();
        List<ModifyRequest> sqlContext = serializer.deserialize(byteString.toByteArray(), List.class);
        final Lock lock = readLock;
        lock.lock();
        try {
            boolean isOk = false;
            if (log.containsExtendInfo(DATA_IMPORT_KEY)) {
                isOk = doDataImport(jdbcTemplate, sqlContext);
            } else {
                sqlContext.sort(Comparator.comparingInt(ModifyRequest::getExecuteNo));
                // 1. Execute the SQL
                isOk = update(transactionTemplate, jdbcTemplate, sqlContext);
                // 2. Handle the extend info, which may contain a ConfigDumpEvent
                ConfigExecutor.executeEmbeddedDump(() -> handleExtendInfo(log.getExtendInfoMap()));
            }

            return Response.newBuilder().setSuccess(isOk).build();

        } catch (BadSqlGrammarException | DataIntegrityViolationException e) {
            return Response.newBuilder().setSuccess(false).setErrMsg(e.toString()).build();
        } catch (DataAccessException e) {
            throw new ConsistencyException(e.toString());
        } catch (Throwable t) {
            throw t;
        } finally {
            lock.unlock();
        }
    }

    // Deserialize the ConfigDumpEvent from the extend info and publish it
    private void handleExtendInfo(Map<String, String> extendInfo) {
      if (extendInfo.containsKey(Constants.EXTEND_INFO_CONFIG_DUMP_EVENT)) {
        String jsonVal = extendInfo.get(Constants.EXTEND_INFO_CONFIG_DUMP_EVENT);
        if (StringUtils.isNotBlank(jsonVal)) {
          NotifyCenter.publishEvent(JacksonUtils.toObj(jsonVal, ConfigDumpEvent.class));
        }
        return;
      }
      // ...
    }
}

From here on, the ConfigDumpEvent is handled just as it is with the MySQL data source: the configuration is dumped from the Derby database to the local file system, then synchronized into the in-memory CacheItem, and finally the client's long-polling requests are answered. As the figure in Chapter 2 shows, only the logic between the /configs interface and the ConfigDumpEvent differs from a non-Derby cluster.
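
As a rough sketch of that handling (the class and method names below approximate the real Nacos dump handler rather than quoting it), a NotifyCenter subscriber consumes the ConfigDumpEvent and dumps the content to disk and memory, which in turn triggers the long-polling response:

// Approximate sketch of the ConfigDumpEvent handling, not the exact Nacos source.
public class ConfigDumpEventSubscriber extends Subscriber<ConfigDumpEvent> {

    @Override
    public void onEvent(ConfigDumpEvent event) {
        // Dump the new content to the local file system and refresh the in-memory CacheItem/md5.
        // In Nacos this work is done by ConfigCacheService.dump(...); a successful dump publishes
        // LocalDataChangeEvent, which wakes up the clients long polling on this dataId/group.
        ConfigCacheService.dump(event.getDataId(), event.getGroup(), event.getNamespaceId(),
                event.getContent(), event.getLastModifiedTs(), event.getType());
    }

    @Override
    public Class<? extends Event> subscribeType() {
        return ConfigDumpEvent.class;
    }
}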

Two, Linear Consistent Reads

When the cluster is started with Derby, queries mostly do not read the Derby database directly; they read the local file system instead. For example, here is the logic of the GET /v1/cs/configs query-configuration interface.

// ConfigServletInner
public String doGetConfig(HttpServletRequest request, HttpServletResponse response, String dataId, String group, String tenant, String tag, String clientIp) throws IOException, ServletException {
    // ...
    // If deployed on a single machine and using a Derby data source, query the real-time configuration
    if (PropertyUtil.isDirectRead()) {
      configInfoBase = persistService.findConfigInfo(dataId, group, tenant);
    } else {
      // If a cluster is deployed or mysql is used, read the configuration in the local file system
      file = DiskUtil.targetFile(dataId, group, tenant);
    }
    // ...
}

So the linear consistent read mainly happens in the dump phase, when data from the database is dumped to the local file system. For example, after the configuration center starts, the data in the Derby database needs to be dumped to the local file system for later queries.

In the PersistService implementation class EmbeddedStoragePersistServiceImpl, the queryConfigInfo query method looks like this.

@Override
public ConfigInfoWrapper queryConfigInfo(final String dataId, final String group, final String tenant) {
    String tenantTmp = StringUtils.isBlank(tenant) ? StringUtils.EMPTY : tenant;
    final String sql = "SELECT ID,data_id,group_id,tenant_id,app_name,content,type,gmt_modified,md5 FROM config_info WHERE data_id=? AND group_id=? AND tenant_id=?";

    return databaseOperate.queryOne(sql, new Object[] {dataId, group, tenantTmp}, CONFIG_INFO_WRAPPER_ROW_MAPPER);
}

Since the server is started as a cluster, this goes into the DistributedDatabaseOperateImpl#queryOne method.

@Override
public <R> R queryOne(String sql, Object[] args, RowMapper<R> mapper) {
    try {
        byte[] data = serializer.serialize(
                SelectRequest.builder().queryType(QueryType.QUERY_ONE_WITH_MAPPER_WITH_ARGS).sql(sql).args(args)
                        .className(mapper.getClass().getCanonicalName()).build());

        final boolean blockRead = EmbeddedStorageContextUtils
                .containsExtendInfo(Constants.EXTEND_NEED_READ_UNTIL_HAVE_DATA);
        
        Response response = innerRead(
                ReadRequest.newBuilder().setGroup(group()).setData(ByteString.copyFrom(data)).build(), blockRead);
        if (response.getSuccess()) {
            return serializer.deserialize(response.getData().toByteArray(),
                    ClassUtils.resolveGenericTypeByInterface(mapper.getClass()));
        }
        throw new NJdbcException(response.getErrMsg(), response.getErrMsg());
    } catch (Exception e) {
        throw new NacosRuntimeException(NacosException.SERVER_ERROR, e.toString());
    }
}

private Response innerRead(ReadRequest request, boolean blockRead) throws Exception {
    if (blockRead) {
      return (Response) protocol.aGetData(request).join();
    }
    return protocol.getData(request);
}

Finally, we go to JRaftProtocol to handle ReadRequest.

// JRaftProtocol
@Override
public CompletableFuture<Response> aGetData(ReadRequest request) {
    return raftServer.get(request);
}

JRaftServer's get method handles the ReadRequest. SOFA-JRaft's Node.readIndex method is called first for a consistent read; if it fails, the request falls back to the full raft process (majority commit, log applied, the same path as a write).

// JRaftServer
CompletableFuture<Response> get(final ReadRequest request) {
    final String group = request.getGroup();
    CompletableFuture<Response> future = new CompletableFuture<>();
    final RaftGroupTuple tuple = findTupleByGroup(group);
    final Node node = tuple.node;
    final RequestProcessor processor = tuple.processor;
    // 1. ReadIndex is preferred for consistency reading
    try {
        node.readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
            @Override
            public void run(Status status, long index, byte[] reqCtx) {
                // 2. After the JRaft processing is complete, a callback is received here and the client responds
                if (status.isOk()) {
                    try {
                        Response response = processor.onRequest(request);
                        future.complete(response);
                    } catch (Throwable t) {
                        future.completeExceptionally(new ConsistencyException());
                    }
                    return;
                }
                // ...
            }
        });
        return future;
    } catch (Throwable e) {
        // 3. readIndex failed: fall back to reading from the leader through the raft process
        readFromLeader(request, future);
        return future;
    }
}

When the SOFA-JRaft framework determines that readIndex <= applyIndex, the ReadIndexClosure receives its callback and the data can be read from the local state machine, i.e. from the current node's Derby data source.

// DistributedDatabaseOperateImpl
public Response onRequest(final ReadRequest request) {
    final SelectRequest selectRequest = serializer
            .deserialize(request.getData().toByteArray(), SelectRequest.class);
    final RowMapper<Object> mapper = RowMapperManager.getRowMapper(selectRequest.getClassName());
    final byte type = selectRequest.getQueryType();
    readLock.lock();
    Object data;
    try {
        switch (type) {
            case QueryType.QUERY_ONE_WITH_MAPPER_WITH_ARGS:
                data = queryOne(jdbcTemplate, selectRequest.getSql(), selectRequest.getArgs(), mapper);
                break;
            case QueryType.QUERY_ONE_NO_MAPPER_NO_ARGS:
                data = queryOne(jdbcTemplate, selectRequest.getSql(),
                        ClassUtils.findClassByName(selectRequest.getClassName()));
                break;
            // ...
            default:
                throw new IllegalArgumentException("Unsupported data query categories");
        }
        ByteString bytes = data == null ? ByteString.EMPTY : ByteString.copyFrom(serializer.serialize(data));
        return Response.newBuilder().setSuccess(true).setData(bytes).build();
    } catch (Exception e) {
        return Response.newBuilder().setSuccess(false).setErrMsg(...).build();
    } finally {
        readLock.unlock();
    }
}

Conclusion

Background: the Nacos configuration center is started as a cluster with the embedded Derby data source.

  • Write consistency
    • Encapsulate the SQL and args as a ModifyRequest and put it into a ThreadLocal
    • Place the ConfigDumpEvent into another ThreadLocal as extend info
    • Using the data in the ThreadLocals, build a WriteRequest and call SOFA-JRaft's Node.apply(Task) method to submit it to the raft cluster
    • Once a majority of nodes commit the log, the onApply method of NacosStateMachine is invoked on every node (on the leader once it sees the majority commit, on followers as they replicate the log); the SQL in the ModifyRequest list is executed and the ConfigDumpEvent in the request's extend info is published
  • Linear consistent reading
    • Scenario: Dump data from a Derby database to a local file system
    • JRaftServer calls SOFA-JRaft's Node.readIndex method to perform a consistent read. If the consistent read fails, it falls back to the full raft process (the same path as a write) to achieve linear consistency.
    • When the SOFA-JRaft framework finds readIndex <= applyIndex, the ReadIndexClosure receives a callback and data can be read from the local state machine, i.e. from the current node's Derby data source.