Takeaway

Apache Pulsar is a multi-tenant, high-performance messaging system for communication between services. It offers low latency, separation of reads and writes, cross-region replication, rapid scaling, and flexible fault tolerance. Its native support for cross-region replication, combined with its tenant- and namespace-level abstractions, gives it the flexibility to support a variety of cross-region replication schemes for different scenarios.

About the author

RanXiaoLong

Apache Pulsar Committer, R&D Engineer, Tencent Cloud Micro Service Product Center

Apache BookKeeper Contributor

Why geo-replication

Geo-replication serves two purposes. First, it makes it easy to spread a service across multiple machine rooms. Second, it copes with failures at the machine room level: if one machine room becomes unavailable, the service can be moved to another machine room and continue to operate.

Overview

Apache Pulsar has built-in support for replication between clusters in different regions. Geo-replication means that clusters located in different physical regions can, once configured, replicate data to one another.

Depending on whether messages are replicated synchronously or asynchronously, cross-region replication falls into the following two schemes:

  • Synchronous mode: if the disaster recovery (DR) requirements for the data are very high, use a synchronous cross-city deployment, in which copies of the data exist in every city. Network fluctuation between cities then has a significant impact on performance, because a write succeeds for the client only after the data has been written to all cities.

  • Asynchronous mode: if the DR requirements are less strict, use an asynchronous cross-city deployment. For example, with two independent data centers in Shanghai and Toronto, a message written to Shanghai is copied to Toronto asynchronously, which does not affect the performance of the main write path and reduces the storage cost.
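
To make the trade-off between the two modes concrete, here is a minimal Python sketch of the acknowledgement latency a producer would observe in each mode. It is a toy model, not Pulsar code: the function names and the millisecond figures are assumptions chosen purely for illustration.

```python
def sync_ack_latency_ms(write_latencies_ms):
    """Synchronous mode: the client is acknowledged only after every
    city has stored the message, so the slowest city dominates."""
    return max(write_latencies_ms.values())


def async_ack_latency_ms(write_latencies_ms, local_city):
    """Asynchronous mode: the client is acknowledged after the local
    write; remote cities receive the message in the background."""
    return write_latencies_ms[local_city]


# Hypothetical write latencies as seen by a client in Shanghai.
latencies = {"shanghai": 5, "toronto": 180}

print(sync_ack_latency_ms(latencies))               # 180
print(async_ack_latency_ms(latencies, "shanghai"))  # 5
```

In this model, any jitter on the inter-city link shows up directly in the synchronous figure and not at all in the asynchronous one.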

Below we discuss Pulsar's cross-region replication in asynchronous mode.

Pulsar currently supports the following three asynchronous cross-region replication solutions:

  • Full mesh (fully connected)

  • One-way replication

  • Failover mode

Depending on whether configurationStoreServers (the global ZooKeeper) is configured, the asynchronous cross-region replication solutions fall into two groups:

1. With configurationStoreServers

  • Full mesh (fully connected)

2. Without configurationStoreServers

  • One-way replication

  • Failover mode

The core question in cross-region replication is whether the clusters can reach one another. Communication between them depends on the following configuration information:

  • cluster (cluster name)

  • zookeeper (local cluster ZooKeeper servers)

  • configuration-store (global ZooKeeper servers)

  • web-service-url

  • web-service-url-tls

  • broker-service-url

  • broker-service-url-tls

When initializing a Pulsar cluster, you can specify the following information:

bin/pulsar initialize-cluster-metadata \
  --cluster pulsar-cluster-1 \
  --zookeeper zk1.us-west.example.com:2181 \
  --configuration-store zk1.us-west.example.com:2181 \
  --web-service-url http://pulsar.us-west.example.com:8080 \
  --web-service-url-tls https://pulsar.us-west.example.com:8443 \
  --broker-service-url pulsar://pulsar.us-west.example.com:6650 \
  --broker-service-url-tls pulsar+ssl://pulsar.us-west.example.com:6651

Full mesh (fully connected)

Full-mesh mode allows data to be shared among multiple clusters, as shown below:

Concepts

  • configurationStoreServers: stores the configuration of every cluster, which is what lets the clusters learn each other's addresses. Tenant and namespace information is stored there as well, to simplify operations. When the information about one cluster is updated, the other clusters pick up the change through the global ZooKeeper.

  • Tenant: which clusters (--allowed-clusters) the tenant being created is allowed to operate on.

  • Namespace: which clusters the namespace being created replicates data between.
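
As an illustration of how the tenant and namespace settings fit together, the following Python sketch assembles the corresponding pulsar-admin command lines and prints them without executing anything. The tenant, namespace, and cluster names are hypothetical, and the subset check is a simplified model of the rule that a namespace replicates only between clusters its tenant is allowed to use.

```python
import shlex


def tenant_create_cmd(tenant, allowed_clusters):
    """Build the `pulsar-admin tenants create` call that fixes which
    clusters the tenant may use."""
    return ["bin/pulsar-admin", "tenants", "create", tenant,
            "--allowed-clusters", ",".join(allowed_clusters)]


def namespace_set_clusters_cmd(namespace, clusters, allowed_clusters):
    """Build the `pulsar-admin namespaces set-clusters` call, refusing
    clusters outside the tenant's allowed list."""
    not_allowed = sorted(set(clusters) - set(allowed_clusters))
    if not_allowed:
        raise ValueError(f"not allowed for this tenant: {not_allowed}")
    return ["bin/pulsar-admin", "namespaces", "set-clusters", namespace,
            "--clusters", ",".join(clusters)]


# Hypothetical names, for illustration only.
allowed = ["cluster-beijing", "cluster-shanghai"]
commands = [
    tenant_create_cmd("my-tenant", allowed),
    ["bin/pulsar-admin", "namespaces", "create", "my-tenant/my-namespace"],
    namespace_set_clusters_cmd("my-tenant/my-namespace", allowed, allowed),
]
for cmd in commands:
    print(shlex.join(cmd))  # printed only, nothing is executed
```

Building the commands as argument lists keeps the quoting unambiguous; the printed lines can be reviewed and then run by hand against a real cluster.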

How it works

Data replication among multiple clusters can be reduced to data replication between two clusters. On that basis, the principle of geo-replication is shown in the following figure:

Suppose two clusters are deployed, one in Beijing and one in Shanghai. When a user's producer sends data to the Beijing cluster, the data is first written to the local topic (topic1) in the Beijing machine room. At the same time a replication cursor is created on that topic; the cursor records how far replication has progressed. A replication producer reads the data from topic1 in Beijing and sends it to the Shanghai cluster. On receiving the request, the broker in Shanghai writes the data to the topic of the same name (topic1) in Shanghai. If a user in the Shanghai machine room then starts a consumer on topic1, it receives the messages produced in the Beijing machine room, and the same holds in the opposite direction.
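
The flow above can be sketched as a small Python model. This is a simplification under stated assumptions, not Pulsar's implementation: the `Topic` and `Replicator` classes are hypothetical, list indices stand in for message positions, and the cursor is just the index of the next message to copy.

```python
class Topic:
    """A toy append-only topic; list indices stand in for positions."""

    def __init__(self):
        self.messages = []

    def append(self, payload):
        self.messages.append(payload)


class Replicator:
    """Copies messages from a local topic to the remote topic of the
    same name, tracking progress with a replication cursor."""

    def __init__(self, local, remote):
        self.local = local
        self.remote = remote
        self.cursor = 0  # index of the next message to replicate

    def replicate(self, max_messages=None):
        """Copy pending messages (at most max_messages); return the count."""
        end = len(self.local.messages)
        if max_messages is not None:
            end = min(end, self.cursor + max_messages)
        copied = 0
        while self.cursor < end:
            self.remote.append(self.local.messages[self.cursor])
            self.cursor += 1  # advance only after the remote write
            copied += 1
        return copied

    def backlog(self):
        """Number of local messages not yet replicated."""
        return len(self.local.messages) - self.cursor


beijing_topic1, shanghai_topic1 = Topic(), Topic()
replicator = Replicator(beijing_topic1, shanghai_topic1)
for i in range(3):
    beijing_topic1.append(f"msg-{i}")

replicator.replicate(max_messages=2)
print(shanghai_topic1.messages, replicator.backlog())  # ['msg-0', 'msg-1'] 1
```

Because the cursor only moves after a message has been handed to the remote topic, the backlog figure shows how far replication is behind at any moment.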

The following issues need to be explained here:

  • In a fully connected setup, data in the Beijing machine room is replicated to the Shanghai cluster, and data in the Shanghai machine room is replicated to Beijing. Without a safeguard, a message replicated from Beijing to Shanghai would be replicated back to Beijing, forming an endless loop. Pulsar avoids this by recording the origin of each message: a producer knows which cluster it is connected to, and when the replication producer copies a message it tags it with replicated_from, which identifies the source cluster. A message that carries this tag is not replicated again.

  • In the geo-replication scenario, effectively-once semantics can also be guaranteed: at-least-once delivery plus broker-side deduplication based on producer name and sequence ID.

  • Replication delay depends on the network latency between the two machine rooms. If replication lags noticeably, check the network between the two machine rooms first.
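
The first two points can be illustrated with a toy Python model. It is a sketch under assumptions rather than Pulsar's code: the `Message` and `ClusterTopic` classes are hypothetical, the origin tag is modelled as a `replicated_from` field, and deduplication simply remembers the highest sequence ID stored per producer name.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Message:
    producer_name: str
    sequence_id: int
    payload: str
    replicated_from: Optional[str] = None  # set only on replicated copies


class ClusterTopic:
    def __init__(self, cluster):
        self.cluster = cluster
        self.log = []
        self.last_seq = {}  # producer name -> highest sequence ID stored

    def publish(self, msg):
        """Store msg unless it is a duplicate; return True if stored."""
        if msg.sequence_id <= self.last_seq.get(msg.producer_name, -1):
            return False
        self.last_seq[msg.producer_name] = msg.sequence_id
        self.log.append(msg)
        return True


def replicate(src, dst):
    """Send locally produced messages from src to dst; return how many
    dst actually stored."""
    stored = 0
    for msg in src.log:
        if msg.replicated_from is not None:
            continue  # arrived via replication: never send it back
        copy = Message(msg.producer_name, msg.sequence_id, msg.payload,
                       replicated_from=src.cluster)
        if dst.publish(copy):
            stored += 1
    return stored


beijing, shanghai = ClusterTopic("beijing"), ClusterTopic("shanghai")
beijing.publish(Message("producer-a", 0, "hello"))
shanghai.publish(Message("producer-b", 0, "hi"))

print(replicate(beijing, shanghai))  # 1
print(replicate(shanghai, beijing))  # 1, the copy of "hello" is skipped
print(replicate(beijing, shanghai))  # 0, the retry is deduplicated
```

The third call stands in for a retried replication attempt: the message is sent again (at-least-once), but the receiving side drops it because it has already stored that producer name and sequence ID.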

Once the global ZooKeeper is configured, data is replicated bidirectionally: all clusters registered under the same global ZooKeeper are connected to one another.

One-way replication

As mentioned above, once a global ZooKeeper is configured, one-way data replication is not possible. In many scenarios, however, the clusters do not all need to be fully connected, and one-way replication is a better fit. One-way replication does not require a separately deployed configurationStoreServers: simply set configurationStoreServers to the ZooKeeper address of the local cluster (the value of zookeeperServers).

How do you do cross-cluster replication without configuring Global ZooKeeper?

As mentioned above, the global ZooKeeper mainly stores the address information of the clusters and the corresponding namespace information; it holds no other metadata. So in the one-way replication scenario, each cluster has to be told explicitly about the clusters in the other machine rooms and about the namespaces that are replicated between them.
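
One way to hand a cluster the address of a remote cluster is the `pulsar-admin clusters create` command. The Python sketch below only builds and prints such a command; the cluster name pulsar-cluster-2 and its us-east URLs are placeholders invented for this example, and the exact sequence of steps for a given deployment is an assumption that should be checked against the Pulsar documentation.

```python
import shlex


def register_remote_cluster_cmd(name, web_service_url, broker_service_url):
    """Build a `pulsar-admin clusters create` call. Run against the
    source cluster, it records how to reach the remote cluster."""
    return ["bin/pulsar-admin", "clusters", "create",
            "--url", web_service_url,
            "--broker-url", broker_service_url,
            name]


# Hypothetical remote cluster; the name and URLs are placeholders.
cmd = register_remote_cluster_cmd(
    "pulsar-cluster-2",
    "http://pulsar.us-east.example.com:8080",
    "pulsar://pulsar.us-east.example.com:6650",
)
print(shlex.join(cmd))  # printed only, nothing is executed
```

For replication in one direction only, the idea is that the source cluster learns about the destination while the destination is not given the source as a replication target. The namespace being replicated still has to list the remote cluster among its replication clusters.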

Failover mode

Failover mode is a special case of one-way replication.

In failover mode, the cluster in the remote machine room is used only to back up data; no producers or consumers are attached to it. Only when the active cluster goes down do the producers and consumers switch to the standby cluster and carry on. Because a replicated subscription exists, the state of the subscription is also copied to the backup machine room.
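
The switching decision can be modelled in a few lines of Python. This is a hypothetical sketch, not a Pulsar client feature as written: the `FailoverSelector` class, the failure threshold, the standby URL, and the choice to switch back as soon as the active cluster answers again are all assumptions made for illustration.

```python
class FailoverSelector:
    """Chooses between an active and a standby service URL based on
    health probes of the active cluster."""

    def __init__(self, active_url, standby_url, failure_threshold=3):
        self.active_url = active_url
        self.standby_url = standby_url
        self.failure_threshold = failure_threshold
        self.consecutive_failures = 0
        self.current_url = active_url

    def report_probe(self, active_ok):
        """Record one probe result and return the URL clients should use."""
        if active_ok:
            self.consecutive_failures = 0
            self.current_url = self.active_url  # assumption: switch back at once
        else:
            self.consecutive_failures += 1
            if self.consecutive_failures >= self.failure_threshold:
                self.current_url = self.standby_url
        return self.current_url


selector = FailoverSelector(
    "pulsar://pulsar.us-west.example.com:6650",
    "pulsar://pulsar.us-east.example.com:6650",  # hypothetical standby
    failure_threshold=2,
)
for ok in [True, False, False, True]:
    print(selector.report_probe(ok))
```

Requiring several consecutive failed probes before switching keeps a single dropped health check from moving every producer and consumer to the standby cluster.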

Current problems with Pulsar Geo-replication

  • Pulsar guarantees message order only within a single machine room; it cannot guarantee a global order across machine rooms.

  • Because cursor snapshots are taken periodically, the replicated subscription position is not exact and lags somewhat.

  • Currently, only the mark-delete position is synchronized. Individually acknowledged messages are not synchronized.

  • A cursor snapshot can be taken only when all participating clusters are available.

  • When cursor snapshots are used, some caching is involved, which can affect subsequent calculations that involve the backlog.
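
The mark-delete limitation in the list above can be shown with a short Python sketch. It is a simplified model with hypothetical function names: positions are plain integers, and the mark-delete position is taken to be the end of the contiguous run of acknowledged messages starting at the first position.

```python
def mark_delete_position(acked, first_position=0):
    """Return the last position of the contiguous acknowledged prefix,
    or first_position - 1 if the first message is not acknowledged."""
    pos = first_position - 1
    while pos + 1 in acked:
        pos += 1
    return pos


def duplicates_after_failover(acked, first_position=0):
    """Acknowledged positions beyond the mark-delete position. If only
    the mark-delete position is replicated, the standby cluster does
    not know about these and delivers them again."""
    md = mark_delete_position(acked, first_position)
    return sorted(p for p in acked if p > md)


# Messages 0, 1, 2 were acknowledged in order; 4 and 6 individually.
acked = {0, 1, 2, 4, 6}
print(mark_delete_position(acked))       # 2
print(duplicates_after_failover(acked))  # [4, 6]
```

In this model a consumer that fails over resumes after position 2, so messages 4 and 6 are seen a second time. Consumers that acknowledge out of order should therefore be prepared to handle duplicates after a switch.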

Recommended reading

You may not understand Apache Pulsar’s Message Storage Model

To be or not to be: Pulsar’s Message retention and Expiration Strategies

3 times Pulsar Client Performance in one day!