preface

This article surveys the replication approaches of major open-source products.

replication

Replication and partitioning/sharding are two essential capabilities of distributed systems (for details, see Replication, Sharding, and Routing). For large amounts of data, replication adds redundancy, keeps the system available, and improves read throughput. This article focuses on replication, under the assumption that each node is large enough to hold an entire copy of the data.

replication types

Based on whether a leader exists, and how many leaders there are, replication can be divided into:

  • Single-leader replication: the leader accepts all write operations and synchronizes them to the followers. Only the leader can accept writes; the followers can serve reads but not writes.

  • Multi-leader replication: multiple leaders on different nodes accept write operations at the same time, and each leader is also a follower of the other leaders. It suits multi-datacenter scenarios, but concurrent writes across datacenters add the complexity of conflict resolution.

  • Leaderless replication: there is no center and no master/slave distinction between replicas; any node can accept a request and then propagate the update to the other replicas.

leader-based replication modes

For details, see replica update strategies.

  • Sync replication: the leader waits for every follower to acknowledge before a write succeeds. This guarantees strong consistency, but write latency is set by the slowest follower, so it is rarely used when there are many followers.
  • Async replication: the leader acknowledges the write immediately and replicates in the background. Writes are fast, but reads from followers may return stale data.
  • Semi-sync replication: generally built on a quorum mechanism; a write succeeds once the required number of nodes have confirmed it, and reads query multiple nodes concurrently to ensure read consistency.
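
A minimal sketch of the three acknowledgment policies, assuming a leader that pushes a record to a set of follower callables (purely illustrative, not any product's API):

```python
import concurrent.futures

def replicate(followers, record, mode="semi_sync", quorum=None):
    """Replicate `record` to follower callables under a given ack policy."""
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=len(followers))
    futures = [pool.submit(f, record) for f in followers]

    if mode == "async":
        # Acknowledge immediately; followers catch up in the background.
        return True
    if mode == "sync":
        # Wait for every follower: strong consistency, slowest-node latency.
        return all(f.result() for f in futures)
    # Semi-sync: succeed as soon as a quorum of followers has acknowledged.
    needed = quorum if quorum is not None else len(followers) // 2 + 1
    acks = 0
    for fut in concurrent.futures.as_completed(futures):
        if fut.result():
            acks += 1
            if acks >= needed:
                return True
    return False
```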

leaderless replication topologies

Leaderless (decentralized) replication can be organized in three topologies: ring, star/tree, and mesh.

replication implementation

Replication is implemented mainly in the following ways:

  • Statement/trigger-based replication replicates the SQL statements themselves. The problem is that nondeterministic functions such as now(), rand(), and seq() make master/slave synchronization nondeterministic: executing now() or rand() on a replica yields a different result than on the master (see the sketch after this list). MySQL used this before 5.1; from 5.1 onward it switches to row-based log replication when a statement is nondeterministic.

  • Write-ahead-log (WAL) replication. WAL is an efficient logging algorithm for databases: for non-memory databases, disk I/O is the main bottleneck, and for the same amount of data a WAL-based system performs roughly half the disk writes of traditional rollback logging at transaction commit, which greatly improves disk I/O efficiency and hence database performance. This is what PostgreSQL uses.

  • The WAL is coupled to the database storage engine. A row-based log, also known as a logical log, is independent of the storage engine; this is the change-data-capture approach, which makes it easy to synchronize data between heterogeneous data sources.
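
To make the nondeterminism problem concrete, here is a hypothetical sketch (not MySQL's actual implementation) contrasting statement-based and row-based replication of an insert that calls rand():

```python
import random

# Statement-based replication ships the statement and re-executes it,
# so each node evaluates rand() independently and the replicas diverge.
def insert_random_row(table):
    table.append(random.random())   # the "statement": INSERT INTO t VALUES (rand())

master, replica = [], []
insert_random_row(master)
insert_random_row(replica)          # re-execution on the replica
print(master == replica)            # almost certainly False

# Row-based (logical-log) replication ships the evaluated row instead,
# so the replica applies exactly what the master stored.
master2, replica2 = [], []
insert_random_row(master2)
replica2.append(master2[-1])        # apply the logged row
print(master2 == replica2)          # True
```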

Replication issues

replication lag

  • A bounded replication log can be outrun. For example, if MongoDB's oplog is too small to keep up with the write rate, old operation log entries are discarded, the master/slave lag keeps growing, and replica synchronization eventually fails (see the sketch after this list).
  • Synchronizing a newly added node (for example, adding a replica online) generally uses a separate mechanism from the ongoing synchronization of healthy nodes: the new node first performs a full synchronization, and only after it has caught up does it switch to normal incremental synchronization.
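
As a hypothetical illustration of the bounded-log problem (loosely modeled on MongoDB's capped oplog, not its actual code), consider an oplog implemented as a fixed-size ring buffer:

```python
from collections import deque

oplog = deque(maxlen=4)   # a capped oplog: appending when full discards the oldest entry
follower_pos = 0          # the next sequence number the follower still needs

for seq in range(10):     # the master writes faster than the follower reads
    oplog.append(seq)

# The follower wakes up looking for entry 0, but the buffer has already
# discarded everything below 6: incremental sync is impossible and the
# follower must fall back to a full resynchronization.
oldest = oplog[0]
if follower_pos < oldest:
    print(f"follower needs {follower_pos}, oldest available is {oldest}: full resync required")
```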

master/slave failover

Replication generally adds redundancy and is used to keep a hot standby (which can serve queries) or a warm standby (which cannot) of the master.

  • When the primary node goes down, the question is which replica to promote to primary (a promotion sketch follows this list).
  • When the old master recovers, the data that diverged between the old master and the new master must be reconciled.
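
A common promotion rule, shown here as a simplified hypothetical sketch, is to pick the surviving replica with the most up-to-date log, for example the largest replicated offset:

```python
# Each surviving replica reports how far its log has advanced.
replicas = {"node-a": 1042, "node-b": 1039, "node-c": 1042}

# Promote the most up-to-date replica; iterate in sorted order so ties
# break deterministically and every observer elects the same node.
new_primary = max(sorted(replicas), key=lambda n: replicas[n])
print(new_primary)        # node-a

# Any writes the old master accepted beyond offset 1042 were never
# replicated; on recovery they must be discarded or manually reconciled.
```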

read consistency

Once replication supports reads, read consistency comes into play. Besides strong consistency, there are several common forms of eventual consistency:

  • Causal consistency: if process A informs process B after updating a piece of data, process B's subsequent reads of that data return the latest value written by A.
  • Read-your-writes consistency: after process A updates a piece of data, A itself always reads the latest value it wrote.
  • Session consistency: read-your-writes consistency scoped to a session; within one session, the client always reads the latest value after its own update.
  • Monotonic read consistency: once a process has read a particular value of a data item, the system never returns an older value to that process on subsequent reads.
  • Monotonic write consistency: all write operations from the same process are executed in the order they were issued.

From the reader's side, these boil down to: reading your own writes, causal reads (operations observed in order), and monotonic reads (never reading data older than what you have already seen).
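
One common way to get read-your-writes, sketched here with hypothetical routing logic, is to pin a session's reads to the leader for a short window after that session writes:

```python
import time

LAG_WINDOW = 1.0          # assumed upper bound on replication lag, in seconds
last_write_at = {}        # session id -> timestamp of that session's last write

def on_write(session_id):
    last_write_at[session_id] = time.time()

def choose_node(session_id, leader, follower):
    # Reads shortly after a session's own write go to the leader, so the
    # session always sees its own writes; older reads may use a follower.
    if time.time() - last_write_at.get(session_id, 0.0) < LAG_WINDOW:
        return leader
    return follower
```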

The quorum/RWN approach resolves read conflicts

write quorum

Assume a piece of data is replicated to three nodes. To ensure consistency, not all nodes need to confirm a write: two nodes (more than half) are enough. If there are two conflicting writes, only one of them can be recognized by more than half of the nodes, and that one wins; this is the write quorum. Slightly more formally, W > N/2: the number of nodes confirming a write, W, must be more than half of the number of replicas, N, which is also known as the replication factor.

read quorum

The read quorum is how many nodes must be contacted to guarantee that the latest data is seen. If write operations require confirmation from two nodes (W=2), contacting at least two nodes guarantees that one of them holds the latest value. If a write is confirmed by only one node (W=1), all three nodes must be read to be sure. When a write does not gather enough node support, update conflicts can occur, but such conflicts can always be detected by reading from enough nodes. So even when writes are not strongly consistent, reads can still be made strongly consistent.

RWN

  • R: the number of nodes contacted for a read operation
  • W: the number of nodes that must confirm a write operation
  • N: the number of replicas (the replication factor)

The relationship between the three is the inequality R + W > N: only when it holds is every read set guaranteed to overlap the latest write set, giving strongly consistent reads.
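
A minimal sketch of quorum reads and writes with versioned values (purely illustrative; N, W, and R are chosen so that R + W > N):

```python
N, W, R = 3, 2, 2                      # R + W > N, so read and write sets overlap

# Each replica stores a (version, value) pair; some replicas lag behind.
replicas = [(1, "old"), (1, "old"), (1, "old")]

def quorum_write(version, value):
    # A write succeeds once W replicas have acknowledged it.
    for i in range(W):
        replicas[i] = (version, value)

def quorum_read():
    # Read any R replicas and keep the value with the highest version;
    # since R + W > N, at least one of them saw the latest write.
    return max(replicas[-R:], key=lambda vv: vv[0])[1]

quorum_write(2, "new")                 # written to replicas 0 and 1
print(quorum_read())                   # reads replicas 1 and 2 -> "new"
```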

Replication overview of major open source products

| product | replication mode | implementation | other |
| --- | --- | --- | --- |
| mysql | master/slave, semi-synchronous | MySQL 5.0 and earlier support only statement-based replication; MySQL 5.1+ switches to row-based log replication when a statement is nondeterministic | handling master/slave lag |
| kafka | master/slave, semi-synchronous (ISR) | the leader writes the message and replicates it to the followers; a message is committed once every replica in the ISR has written it and ACKed the leader | the producer chooses whether to wait for the ISR ACK |
| elasticsearch | master/slave, semi-synchronous (replication=sync by default) | the consistency setting takes quorum, one, or all; the default is quorum | translog, fsync, and refresh |
| pg | master/slave, asynchronous | based on the write-ahead log | archive and streaming modes |
| redis | master/slave, asynchronous | incremental Redis protocol (full sync, incremental sync, long-lived connection) | Sentinel failover |
| mongo | master/slave, asynchronous (replica-set mode) | persistent ring buffer local.oplog.rs (initial_sync, steady-sync) | arbiter-based master election |

For instance, ES is based on a quorum mechanism whereas Kafka is based on the ISR mechanism, and both can also be configured for asynchronous replication. Newly added and recovering nodes are synchronized differently from healthy ones: new nodes generally perform a full synchronization first, while nodes in the normal state use incremental synchronization.

Kafka’s ISR (short for In-Sync Replicas: the set of replicas that are synchronized with the leader)

All replicas are collectively called the Assigned Replicas (AR). The ISR is the subset of AR that is keeping up with the leader; the leader maintains the ISR list. Followers synchronize data from the leader with some delay, and new followers start out in the OSR (Out-of-Sync Replicas). AR = ISR + OSR.
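
A hypothetical sketch of lag-based ISR membership (modeled on the idea behind Kafka's replica.lag.time.max.ms setting, not on Kafka's actual code):

```python
MAX_LAG_SECONDS = 10.0

# For each follower, the time it last fully caught up to the leader's log end.
last_caught_up = {"f1": 100.0, "f2": 99.5, "f3": 80.0}

def partition_replicas(now):
    # Followers that caught up recently stay in the ISR; the rest fall into
    # the OSR, so that AR = ISR + OSR.
    isr = {f for f, t in last_caught_up.items() if now - t <= MAX_LAG_SECONDS}
    osr = set(last_caught_up) - isr
    return isr, osr

isr, osr = partition_replicas(now=105.0)
print(isr, osr)                        # e.g. {'f1', 'f2'} {'f3'}
```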

After the producer sends a message to the broker, the leader writes it and replicates it to the followers. The message is committed only once every in-sync replica has written it. Commit latency is therefore bounded by the slowest ISR member, so it is important to detect slow replicas quickly: if a follower falls too far behind or fails, the leader removes it from the ISR.

(Figure: how the ISR, the HW (high watermark), and the LEO (log end offset) evolve during replication.)

Thus, Kafka’s replication mechanism is neither fully synchronous nor purely asynchronous. Fully synchronous replication would require all working followers to replicate a message before it is committed, which badly hurts throughput. In purely asynchronous mode, a message counts as committed as soon as the leader writes its log, so if the leader crashes while followers are still behind, data is lost. The ISR lets Kafka balance durability against throughput.
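
On the producer side this trade-off is exposed through the acks setting. A sketch using the kafka-python client (the broker address and topic name are placeholders):

```python
from kafka import KafkaProducer

# acks=0: fire and forget; acks=1: wait for the leader's write only;
# acks='all': wait until every replica in the ISR has acknowledged.
producer = KafkaProducer(bootstrap_servers="localhost:9092", acks="all")
producer.send("demo-topic", b"replicated with ISR acknowledgment")
producer.flush()
```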

ES replica consistency

The consistency of ES mainly has two aspects:

  • Refresh issues with the Lucene indexing mechanism


    Between Elasticsearch and the disk sits the filesystem cache. Documents in the in-memory index buffer are written to a new segment, but the new segment goes to the filesystem cache first (a cheap step) and is flushed to disk later (an expensive step). Once a file is in the cache, it can be opened and read like any other file.

    In Elasticsearch, this lightweight process of writing and opening a new segment is called refresh. By default, each shard refreshes automatically once per second, which is why Elasticsearch is described as near-real-time: changes to a document are not visible to search immediately, but become visible within about a second. This confuses new users, who index a document, search for it at once, and do not find it. The fix is a manual refresh via the refresh API. refresh_interval can be updated dynamically on an existing index; in production, when building a large new index, you can turn off auto-refresh and re-enable it when you start using the index (a refresh-control sketch appears after the list below).

  • How a write flows between the node that receives it and the response, when replication is involved:

  • 1) The client sends its request to Node1 (it could equally be sent to any other node).
  • 2) From the document _id, Node1 computes that the data belongs on shard 0; from the cluster state it finds that shard 0's primary is on Node3, so it forwards the request to Node3, which indexes the data.
  • 3) Node3 forwards the data in parallel to Node1 and Node2, which hold shard 0's replicas. Once the replicas report back, Node3 tells Node1, the original coordinating node, that the write succeeded, and Node1 returns success to the client.
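
As a sketch of controlling refresh from the Python elasticsearch client (the index name and interval values are placeholders, and exact signatures vary by client version):

```python
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")

# Disable auto-refresh while bulk-building a large index...
es.indices.put_settings(index="my-index", settings={"refresh_interval": "-1"})

# ...load the data, then restore the 1s default and force one refresh so
# the newly indexed documents become searchable immediately.
es.indices.put_settings(index="my-index", settings={"refresh_interval": "1s"})
es.indices.refresh(index="my-index")
```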

summary

The details of replication vary from product to product, but the underlying theory is the same. Beyond the replication modes described above, a mature application also needs to handle replication's anomaly scenarios.

doc

  • Deep interpretation of Kafka data reliability
  • Quorum/RWN and strong consistency
  • ES consistency problems
  • The ES write process
  • Read/write operations on Elasticsearch shards
  • ElasticSearch – Sharding internals