preface

Because RheaKV is going to be a long story, so I’m going to break it up into several chapters, and I’m going to talk about what RheaKV initialization does.

Let’s start with an example. Let’s start with an example:

public static void main(final String[] args) throws Exception {
    final PlacementDriverOptions pdOpts = PlacementDriverOptionsConfigured.newConfigured()
            .withFake(true) // use a fake pd
            .config();
    final StoreEngineOptions storeOpts = StoreEngineOptionsConfigured.newConfigured() //
            .withStorageType(StorageType.RocksDB)
            .withRocksDBOptions(RocksDBOptionsConfigured.newConfigured().withDbPath(Configs.DB_PATH).config())
            .withRaftDataPath(Configs.RAFT_DATA_PATH)
            .withServerAddress(new Endpoint("127.0.0.1".8181))
            .config();

    final RheaKVStoreOptions opts = RheaKVStoreOptionsConfigured.newConfigured() //
            .withClusterName(Configs.CLUSTER_NAME) //
            .withInitialServerList(Configs.ALL_NODE_ADDRESSES)
            .withStoreEngineOptions(storeOpts) //
            .withPlacementDriverOptions(pdOpts) //
            .config();
    System.out.println(opts);
    final Node node = new Node(opts);
    node.start();
    Runtime.getRuntime().addShutdownHook(new Thread(node::stop));
    System.out.println("server1 start OK");
}
Copy the code

The no PD setting is used here to simplify logic

Implementation of Node:

public class Node {

    private final RheaKVStoreOptions options;

    private RheaKVStore              rheaKVStore;

    public Node(RheaKVStoreOptions options) {
        this.options = options;
    }

    public void start(a) {
        this.rheaKVStore = new DefaultRheaKVStore();
        this.rheaKVStore.init(this.options);
    }

    public void stop(a) {
        this.rheaKVStore.shutdown();
    }

    public RheaKVStore getRheaKVStore(a) {
        returnrheaKVStore; }}Copy the code

So what you’re doing here is initializing a DefaultRheaKVStore and calling its init method to initialize it

RheaKV stores DefaultRheaKVStore by default

Because DefaultRheaKVStore is initialized in the init method, here’s how to initialize DefaultRheaKVStore.

public synchronized boolean init(final RheaKVStoreOptions opts) {
    //1. If already started, return directly
    if (this.started) {
        LOG.info("[DefaultRheaKVStore] already started.");
        return true;
    }
    this.opts = opts;
    // init placement driver
    // 2. Set PD according to PDoptions
    final PlacementDriverOptions pdOpts = opts.getPlacementDriverOptions();
    final String clusterName = opts.getClusterName();
    Requires.requireNonNull(pdOpts, "opts.placementDriverOptions");
    Requires.requireNonNull(clusterName, "opts.clusterName");
    // Set the cluster
    if (Strings.isBlank(pdOpts.getInitialServerList())) {
        // if blank, extends parent's value
        pdOpts.setInitialServerList(opts.getInitialServerList());
    }
    // In no PD scenario, RheaKV provides Fake PD Client
    if (pdOpts.isFake()) {
        this.pdClient = new FakePlacementDriverClient(opts.getClusterId(), clusterName);
    } else {
        this.pdClient = new RemotePlacementDriverClient(opts.getClusterId(), clusterName);
    }
    // Initialize PD
    if (!this.pdClient.init(pdOpts)) {
        LOG.error("Fail to init [PlacementDriverClient].");
        return false;
    }
    // init store engine
    //3. Initialize the storage engine
    final StoreEngineOptions stOpts = opts.getStoreEngineOptions();
    if(stOpts ! =null) {
        stOpts.setInitialServerList(opts.getInitialServerList());
        this.storeEngine = new StoreEngine(this.pdClient);
        // Initialize the storage engine
        if (!this.storeEngine.init(stOpts)) {
            LOG.error("Fail to init [StoreEngine].");
            return false; }}// Get the IP address and port number of the current node
    final Endpoint selfEndpoint = this.storeEngine == null ? null : this.storeEngine.getSelfEndpoint();
    final RpcOptions rpcOpts = opts.getRpcOptions();
    Requires.requireNonNull(rpcOpts, "opts.rpcOptions");
    //4. Initialize a RpcService and override the getLeader method
    this.rheaKVRpcService = new DefaultRheaKVRpcService(this.pdClient, selfEndpoint) {

        @Override
        public Endpoint getLeader(final long regionId, final boolean forceRefresh, final long timeoutMillis) {
            final Endpoint leader = getLeaderByRegionEngine(regionId);
            if(leader ! =null) {
                return leader;
            }
            return super.getLeader(regionId, forceRefresh, timeoutMillis); }};if (!this.rheaKVRpcService.init(rpcOpts)) {
        LOG.error("Fail to init [RheaKVRpcService].");
        return false;
    }
    // The number of retries. The default value is two
    this.failoverRetries = opts.getFailoverRetries();
    / / the default 5000
    this.futureTimeoutMillis = opts.getFutureTimeoutMillis();
    // Whether to read data only from the leader. Default is true
    this.onlyLeaderRead = opts.isOnlyLeaderRead();
    //5. Initialize kvDispatcher, which Rimmer considers true
    if (opts.isUseParallelKVExecutor()) {
        // Get the current CPU
        final int numWorkers = Utils.cpus();
        // If you move 4 to the left, multiply by 16
        final int bufSize = numWorkers << 4;
        final String name = "parallel-kv-executor";
        final ThreadFactory threadFactory = Constants.THREAD_AFFINITY_ENABLED
                // Whether to enable thread affinity ThreadFactory
                ? new AffinityNamedThreadFactory(name, true) : new NamedThreadFactory(name, true);
        // Initialize the Dispatcher
        this.kvDispatcher = new TaskDispatcher(bufSize, numWorkers, WaitStrategyType.LITE_BLOCKING_WAIT,
                threadFactory);
    }
    this.batchingOpts = opts.getBatchingOptions();
    // The default is true
    if (this.batchingOpts.isAllowBatching()) {
        // These batching do not know what they are used for
        this.getBatching = new GetBatching(KeyEvent::new."get_batching".new GetBatchingHandler("get".false));
        this.getBatchingOnlySafe = new GetBatching(KeyEvent::new."get_batching_only_safe".new GetBatchingHandler("get_only_safe".true));
        this.putBatching = new PutBatching(KVEvent::new."put_batching".new PutBatchingHandler("put"));
    }
    LOG.info("[DefaultRheaKVStore] start successfully, options: {}.", opts);
    return this.started = true;
}
Copy the code
  1. Check whether it is started, if it is started, then return directly
  2. Set PD based on PDoptions. PD is the global central controller and manages the scheduling of the entire cluster and maintains the RegionRouteTable routing table. Here we do not enable the PD, so instantiate a FakePlacementDriverClient, and initialized
  3. To initialize the storage engine, the StoreEngine storage engine supports MemoryDB and RocksDB. RocksDB is used here, and init method will be added below
  4. This section describes how to initialize a rheaKVRpcService to encapsulate THE RPC Client of the KV storage service and implement Failover logic. FutureTimeoutMillis is 5000 milliseconds. By default, only data is read from the leader
  5. Initialize kvDispatcher

Initialize the storage engine

Init (); init (); StoreEngine init ();

StoreEngine#init

public synchronized boolean init(final StoreEngineOptions opts) {
    if (this.started) {
        LOG.info("[StoreEngine] already started.");
        return true;
    }
    this.storeOpts = Requires.requireNonNull(opts, "opts");
    Endpoint serverAddress = Requires.requireNonNull(opts.getServerAddress(), "opts.serverAddress");
    // Get the IP address and port
    final int port = serverAddress.getPort();
    final String ip = serverAddress.getIp();
    // If the incoming IP is empty, set the boot machine IP as the IP of serverAddress
    if (ip == null || Utils.IP_ANY.equals(ip)) {
        serverAddress = new Endpoint(NetUtil.getLocalCanonicalHostName(), port);
        opts.setServerAddress(serverAddress);
    }
    // Get the measurement reporting time
    final long metricsReportPeriod = opts.getMetricsReportPeriod();
    // init region options
    List<RegionEngineOptions> rOptsList = opts.getRegionEngineOptionsList();
    //1. If RegionEngineOptions is empty, one is initialized by default
    if (rOptsList == null || rOptsList.isEmpty()) {
        // -1 region
        final RegionEngineOptions rOpts = new RegionEngineOptions();
        rOpts.setRegionId(Constants.DEFAULT_REGION_ID);
        rOptsList = Lists.newArrayList();
        rOptsList.add(rOpts);
        opts.setRegionEngineOptionsList(rOptsList);
    }
    // Get the cluster name
    final String clusterName = this.pdClient.getClusterName();
    //2. Traverse the rOptsList collection and set the parameters for the RegionEngineOptions object in it
    for (final RegionEngineOptions rOpts : rOptsList) {
        RegionId: RaftGroupId: RegionId: RaftGroupId: RegionId: RaftGroupId
        rOpts.setRaftGroupId(JRaftHelper.getJRaftGroupId(clusterName, rOpts.getRegionId()));
        rOpts.setServerAddress(serverAddress);
        rOpts.setInitialServerList(opts.getInitialServerList());
        if (rOpts.getNodeOptions() == null) {
            // copy common node options
            rOpts.setNodeOptions(opts.getCommonNodeOptions() == null ? new NodeOptions() : opts
                .getCommonNodeOptions().copy());
        }
        // If the measurement reporting time was not set, reset it
        if (rOpts.getMetricsReportPeriod() <= 0 && metricsReportPeriod > 0) {
            // extends store opts 300rOpts.setMetricsReportPeriod(metricsReportPeriod); }}// init store
    // 3. Initialize Store and region in Store
    final Store store = this.pdClient.getStoreMetadata(opts);
    if (store == null || store.getRegions() == null || store.getRegions().isEmpty()) {
        LOG.error("Empty store metadata: {}.", store);
        return false;
    }
    this.storeId = store.getId();
    // init executors
    //4. Initialize the actuator
    if (this.readIndexExecutor == null) {
        this.readIndexExecutor = StoreEngineHelper.createReadIndexExecutor(opts.getReadIndexCoreThreads());
    }
    if (this.raftStateTrigger == null) {
        this.raftStateTrigger = StoreEngineHelper.createRaftStateTrigger(opts.getLeaderStateTriggerCoreThreads());
    }
    if (this.snapshotExecutor == null) {
        this.snapshotExecutor = StoreEngineHelper.createSnapshotExecutor(opts.getSnapshotCoreThreads());
    }
    // Init RPC executors Default value: false
    final boolean useSharedRpcExecutor = opts.isUseSharedRpcExecutor();
    //5. Initialize the RPC remote executor to execute RPCServer Processors
    if(! useSharedRpcExecutor) {if (this.cliRpcExecutor == null) {
            this.cliRpcExecutor = StoreEngineHelper.createCliRpcExecutor(opts.getCliRpcCoreThreads());
        }
        if (this.raftRpcExecutor == null) {
            this.raftRpcExecutor = StoreEngineHelper.createRaftRpcExecutor(opts.getRaftRpcCoreThreads());
        }
        if (this.kvRpcExecutor == null) {
            this.kvRpcExecutor = StoreEngineHelper.createKvRpcExecutor(opts.getKvRpcCoreThreads()); }}// init metrics
    // Do metrics
    startMetricReporters(metricsReportPeriod);
    // init rpc server
    //6. Initialize rpcServer for other services to call
    this.rpcServer = new RpcServer(port, true.true);
    // Add various processors to the server
    RaftRpcServerFactory.addRaftRequestProcessors(this.rpcServer, this.raftRpcExecutor, this.cliRpcExecutor);
    StoreEngineHelper.addKvStoreRequestProcessor(this.rpcServer, this);
    if (!this.rpcServer.start()) {
        LOG.error("Fail to init [RpcServer].");
        return false;
    }
    // init db store
    //7. Select db according to different types
    if(! initRawKVStore(opts)) {return false;
    }
    // init all region engine
    // 8. Initialize RegionEngine for each region
    if(! initAllRegionEngine(opts, store)) { LOG.error("Fail to init all [RegionEngine].");
        return false;
    }
    // heartbeat sender
    // If self-managed cluster is enabled, the heartbeat sender needs to be initialized
    if (this.pdClient instanceof RemotePlacementDriverClient) {
        HeartbeatOptions heartbeatOpts = opts.getHeartbeatOptions();
        if (heartbeatOpts == null) {
            heartbeatOpts = new HeartbeatOptions();
        }
        this.heartbeatSender = new HeartbeatSender(this);
        if (!this.heartbeatSender.init(heartbeatOpts)) {
            LOG.error("Fail to init [HeartbeatSender].");
            return false; }}this.startTime = System.currentTimeMillis();
    LOG.info("[StoreEngine] start successfully: {}.".this);
    return this.started = true;
}
Copy the code

Let’s look down from the code marked above:

  1. RegionEngineOptionsList checks whether StoreEngineOptions’ regionEngineOptionsList is empty. If it is empty, it initializes one by default and adds it to the rOptsList collection
  2. Iterate over the rOptsList collection and set the cluster information for the RegionEngineOptions object in it
  3. Instantiate the Store and initialize the region in it according to RegionEngineOptions
  4. Initializes the actuator
  5. Initialize the RPC remote executor to execute RPCServer Processors
  6. Initialize rpcServer for other services to call
  7. The StoreEngine storage engine supports MemoryDB and RocksDB. MemoryDB is implemented based on ConcurrentSkipListMap.
  8. Initialize the RegionEngine for each region

Initialize stores and regions in stores

Here is called pdClient getStoreMetadata method to initialize, here we see FakePlacementDriverClient implementation: FakePlacementDriverClient# getStoreMetadata

public Store getStoreMetadata(final StoreEngineOptions opts) {
    // instantiate store
    final Store store = new Store();
    final List<RegionEngineOptions> rOptsList = opts.getRegionEngineOptionsList();
    final List<Region> regionList = Lists.newArrayListWithCapacity(rOptsList.size());
    store.setId(-1);
    store.setEndpoint(opts.getServerAddress());
    for (final RegionEngineOptions rOpts : rOptsList) {
        // Initialize the Region instance according to rOpts and add it to the regionList
        regionList.add(getLocalRegionMetadata(rOpts));
    }
    store.setRegions(regionList);
    return store;
}
Copy the code

This method instantiates a store and iterates through the rOptsList collection. Within the loop, getLocalRegionMetadata is invoked to instantiate region based on RegionEngineOptions. It is then added to the regionList collection. There is a one-to-one correspondence between the subscripts of the main rOptsList and regionList lists, which is used in the following code.

Here you can understand a little bit:

This diagram makes sense. There are many regions under each store.

Then let’s look at how Region is initialized: Here is to call the superclass AbstractPlacementDriverClient FakePlacementDriverClient getLocalRegionMetadata for initialization AbstractPlacementDriverClient#getLocalRegionMetadata

protected Region getLocalRegionMetadata(final RegionEngineOptions opts) {
    final long regionId = Requires.requireNonNull(opts.getRegionId(), "opts.regionId");
    Requires.requireTrue(regionId >= Region.MIN_ID_WITH_MANUAL_CONF, "opts.regionId must >= "
                                                                     + Region.MIN_ID_WITH_MANUAL_CONF);
    Requires.requireTrue(regionId < Region.MAX_ID_WITH_MANUAL_CONF, "opts.regionId must < "
                                                                    + Region.MAX_ID_WITH_MANUAL_CONF);
    final byte[] startKey = opts.getStartKeyBytes();
    final byte[] endKey = opts.getEndKeyBytes();
    final String initialServerList = opts.getInitialServerList();
    // Instantiate region
    final Region region = new Region();
    final Configuration conf = new Configuration();
    // region
    region.setId(regionId);
    region.setStartKey(startKey);
    region.setEndKey(endKey);
    region.setRegionEpoch(new RegionEpoch(-1, -1));
    // peers
    Requires.requireTrue(Strings.isNotBlank(initialServerList), "opts.initialServerList is blank");
    // Resolve the cluster IP address and port to peer
    conf.parse(initialServerList);
    // Each region holds cluster information
    region.setPeers(JRaftHelper.toPeerList(conf.listPeers()));
    this.regionRouteTable.addOrUpdateRegion(region);
    return region;
}
Copy the code

Region is the smallest KV data unit, which can be understood as a data partition or fragment. Each Region has an interval [startKey, endKey], where the initialization is null. Automatically split and copy relocation based on request traffic, load, and data volume. Region Multiple replicas Replication Builds Raft Groups to Store data on different Store nodes. Data is synchronized to all nodes in the same Group using the Raft protocol log Replication function.

RegionRouteTable (regionRouteTable); regionRouteTable (regionRouteTable);

public void addOrUpdateRegion(final Region region) {
    Requires.requireNonNull(region, "region");
    Requires.requireNonNull(region.getRegionEpoch(), "regionEpoch");
    final long regionId = region.getId();
    final byte[] startKey = BytesUtil.nullToEmpty(region.getStartKey());
    final StampedLock stampedLock = this.stampedLock;
    final long stamp = stampedLock.writeLock();
    try {
        this.regionTable.put(regionId, region.copy());
        this.rangeTable.put(startKey, regionId);
    } finally{ stampedLock.unlockWrite(stamp); }}Copy the code

In this method, region is stored into the regionTable based on the regionId, and then into the rangeTable based on the startKey as the key.

Initialize the RegionEngine for each region

RegionEngine is initialized in initAllRegionEngine: StoreEngine#initAllRegionEngine

private boolean initAllRegionEngine(final StoreEngineOptions opts, final Store store) {
    Requires.requireNonNull(opts, "opts");
    Requires.requireNonNull(store, "store");
    // Get the home directory
    String baseRaftDataPath = opts.getRaftDataPath();
    if (Strings.isNotBlank(baseRaftDataPath)) {
        try {
            FileUtils.forceMkdir(new File(baseRaftDataPath));
        } catch (final Throwable t) {
            LOG.error("Fail to make dir for raftDataPath: {}.", baseRaftDataPath);
            return false; }}else {
        baseRaftDataPath = "";
    }
    final Endpoint serverAddress = opts.getServerAddress();
    final List<RegionEngineOptions> rOptsList = opts.getRegionEngineOptionsList();
    final List<Region> regionList = store.getRegions();
    // regionList is initialized from rOptsList, so check that the same number is the same
    Requires.requireTrue(rOptsList.size() == regionList.size());
    for (int i = 0; i < rOptsList.size(); i++) {
        // Get the corresponding RegionEngineOptions and Region
        final RegionEngineOptions rOpts = rOptsList.get(i);
        final Region region = regionList.get(i);
        // If the region path is empty, reset the value
        if (Strings.isBlank(rOpts.getRaftDataPath())) {
            final String childPath = "raft_data_region_" + region.getId() + "_" + serverAddress.getPort();
            rOpts.setRaftDataPath(Paths.get(baseRaftDataPath, childPath).toString());
        }
        Requires.requireNonNull(region.getRegionEpoch(), "regionEpoch");
        // Initialize RegionEngine based on Region
        final RegionEngine engine = new RegionEngine(region, this);
        if (engine.init(rOpts)) {
            //KV Server Server request processing service
            // Each RegionKVService corresponds to a Region and only processes requests in its Region category
            final RegionKVService regionKVService = new DefaultRegionKVService(engine);
            RegionKVServiceTable (regionKVServiceTable
            registerRegionKVService(regionKVService);
            // Set the region and engine mapping table
            this.regionEngineTable.put(region.getId(), engine);
        } else {
            LOG.error("Fail to init [RegionEngine: {}].", region);
            return false; }}return true;
}
Copy the code

The method first initializes a baseRaftDataPath as the primary directory and then pulls out both rOptsList and regionList, iterating over the rOptsList, RegionEngine is instantiated for each region, Finally, RegionKVService is put into the regionKVServiceTable mapping table, and Region is put into the regionEngineTable mapping table

RegionKVServic is the request processing service of the KV Server. A StoreEngine contains many RegionKVServices. Each RegionKVService corresponds to a Region. Only requests within its own Region category are processed.

Initialize RegionEngine# init

public synchronized boolean init(final RegionEngineOptions opts) {
    if (this.started) {
        LOG.info("[RegionEngine: {}] already started.".this.region);
        return true;
    }
    this.regionOpts = Requires.requireNonNull(opts, "opts");
    // Instantiate the state machine
    this.fsm = new KVStoreStateMachine(this.region, this.storeEngine);

    // node options
    NodeOptions nodeOpts = opts.getNodeOptions();
    if (nodeOpts == null) {
        nodeOpts = new NodeOptions();
    }
    // If the measurement interval is greater than zero, then the measurement is enabled
    final long metricsReportPeriod = opts.getMetricsReportPeriod();
    if (metricsReportPeriod > 0) {
        // metricsReportPeriod > 0 means enable metrics
        nodeOpts.setEnableMetrics(true);
    }
    // Initialize the cluster configuration
    nodeOpts.setInitialConf(new Configuration(JRaftHelper.toJRaftPeerIdList(this.region.getPeers())));
    nodeOpts.setFsm(this.fsm);
    // Initializes the paths of various logs
    final String raftDataPath = opts.getRaftDataPath();
    try {
        FileUtils.forceMkdir(new File(raftDataPath));
    } catch (final Throwable t) {
        LOG.error("Fail to make dir for raftDataPath {}.", raftDataPath);
        return false;
    }
    if (Strings.isBlank(nodeOpts.getLogUri())) {
        final Path logUri = Paths.get(raftDataPath, "log");
        nodeOpts.setLogUri(logUri.toString());
    }
    if (Strings.isBlank(nodeOpts.getRaftMetaUri())) {
        final Path meteUri = Paths.get(raftDataPath, "meta");
        nodeOpts.setRaftMetaUri(meteUri.toString());
    }
    if (Strings.isBlank(nodeOpts.getSnapshotUri())) {
        final Path snapshotUri = Paths.get(raftDataPath, "snapshot");
        nodeOpts.setSnapshotUri(snapshotUri.toString());
    }
    LOG.info("[RegionEngine: {}], log uri: {}, raft meta uri: {}, snapshot uri: {}.".this.region,
        nodeOpts.getLogUri(), nodeOpts.getRaftMetaUri(), nodeOpts.getSnapshotUri());
    final Endpoint serverAddress = opts.getServerAddress();
    final PeerId serverId = new PeerId(serverAddress, 0);
    final RpcServer rpcServer = this.storeEngine.getRpcServer();
    this.raftGroupService = new RaftGroupService(opts.getRaftGroupId(), serverId, nodeOpts, rpcServer, true);
    // Initialize the node
    this.node = this.raftGroupService.start(false);
    RouteTable.getInstance().updateConfiguration(this.raftGroupService.getGroupId(), nodeOpts.getInitialConf());
    if (this.node ! =null) {
        final RawKVStore rawKVStore = this.storeEngine.getRawKVStore();
        final Executor readIndexExecutor = this.storeEngine.getReadIndexExecutor();
        //RaftRawKVStore is the storage implementation of the RawKVStore interface based on the Raft replication state machine KVStoreStateMachine in RheaKV
        // The Raft entry for RheaKV, where the Raft process starts
        this.raftRawKVStore = new RaftRawKVStore(this.node, rawKVStore, readIndexExecutor);
        // Intercept requests for metrics
        this.metricsRawKVStore = new MetricsRawKVStore(this.region.getId(), this.raftRawKVStore);
        // metrics config
        if (this.regionMetricsReporter == null && metricsReportPeriod > 0) {
            final MetricRegistry metricRegistry = this.node.getNodeMetrics().getMetricRegistry();
            if(metricRegistry ! =null) {
                final ScheduledExecutorService scheduler = this.storeEngine.getMetricsScheduler();
                // start raft node metrics reporter
                this.regionMetricsReporter = Slf4jReporter.forRegistry(metricRegistry) //
                    .prefixedWith("region_" + this.region.getId()) //
                    .withLoggingLevel(Slf4jReporter.LoggingLevel.INFO) //
                    .outputTo(LOG) //
                    .scheduleOn(scheduler) //.shutdownExecutorOnStop(scheduler ! =null) //
                    .build();
                this.regionMetricsReporter.start(metricsReportPeriod, TimeUnit.SECONDS); }}this.started = true;
        LOG.info("[RegionEngine] start successfully: {}.".this);
    }
    return this.started;
}
Copy the code

This is where I find some examples that I was familiar with in the first lecture. If not, check out my first article: 1. Source Code Analysis of SOFAJRaft — What does SOFAJRaft do when it starts? This is where the state machine is instantiated, which is an instance of KVStoreStateMachine; Set LogUri, RaftMetaUri, SnapshotUri, and get rpcServer in storeEngine. Start raftGroupService to return the initialized node. We then instantiate raftRawKVStore, which is the Raft entry point to RheaKV from where we start the Raft process and all the RheaKV data is processed.

conclusion

RheaKV initialization also covers a lot of content. This article explains which components need to be initialized when RheaKV starts, what is the relationship between Store and Region, where JRaft is started, where the state machine is set, etc. The content is also very rich. It is also a pleasure to see a good architectural design.