About Apache Pulsar

Apache Pulsar is a top-level project of the Apache Software Foundation. It is a next-generation, cloud-native distributed messaging and streaming platform that integrates messaging, storage, and lightweight function-based computing. Its architecture separates compute from storage, and it supports multi-tenancy, persistent storage, and cross-region data replication across multiple data centers. It offers strong consistency, high throughput, low latency, and high scalability. GitHub address: github.com/apache/puls…

The author of this article is Jiang Moujing, a senior R&D engineer at Yipin Fresh. He led the design and development of the data pipeline system, which uses Apache Pulsar as its data synchronization tool and meets the incremental data synchronization requirements of various application scenarios. He plans to further develop the data pipeline into a visual platform and to add support for more database types.

Background

A data pipeline transfers data from one place to another over some transmission medium, synchronizing or replicating it to meet application requirements. As our business and data volumes grew dramatically, our existing microservices needed to be split further.

How can a system be split without users noticing? At launch, a traffic-splitting strategy diverts some users to the new services. The old and new systems must run in parallel for a period of time to support trial operation of the new services until they are fully rolled out, which minimizes production failures. To keep the data in the new services consistent in real time with the data in the old services, the data must be synchronized. To speed up queries, data is also copied to ElasticSearch.

Both open source data synchronization products and commercial data channel tools are available, and they can synchronize and replicate data between two sides without human intervention. However, a system refactoring may change table structures or table objects, which commercial data synchronization tools cannot handle without developers stepping in. We therefore built our own solution on Maxwell + Pulsar: Maxwell reads the binlog, and Pulsar transports the data. Maxwell handles data reading upstream, and the data synchronization logic runs on the downstream (consumer) side. This supports, for example, data synchronization for split services, read/write separation, and replication to a search engine such as ElasticSearch.
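As a rough illustration of what "synchronization logic on the downstream side" can look like, the sketch below applies JSON change events to an in-memory target. It assumes events shaped like Maxwell's JSON output (database, table, type, and data fields) and a primary key column named id. The function name, the sample database, and the dictionary standing in for the target store are hypothetical and are not taken from the author's system.

```python
import json


def apply_change(target, raw_event):
    """Apply one Maxwell-style JSON change event to an in-memory target table."""
    event = json.loads(raw_event)
    row = event["data"]
    # Assumption: every table has a primary key column called "id".
    key = (event["database"], event["table"], row["id"])
    if event["type"] in ("insert", "update"):
        target[key] = row          # upsert the latest row image
    elif event["type"] == "delete":
        target.pop(key, None)      # ignore deletes for rows we never saw
    return target


# Two changes to the same row, in the order they were committed.
events = [
    '{"database": "shop", "table": "orders", "type": "insert", "data": {"id": 1, "status": "new"}}',
    '{"database": "shop", "table": "orders", "type": "update", "data": {"id": 1, "status": "paid"}}',
]
target = {}
for raw in events:
    apply_change(target, raw)
```

A real consumer would write to MySQL or ElasticSearch instead of a dictionary. The point is only that each event carries enough information to be replayed against the target, which is also why the order of replay matters.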

Why Pulsar?

In the system refactoring of the data pipeline, we chose Apache Pulsar for the following reasons:

  • Stateless. In a microservices architecture, middleware is best kept stateless: it starts quickly, can be replaced at any time, and scales elastically without disruption. Kafka brokers are not stateless. Each broker stores the complete logs of the partitions it hosts, so if a broker fails, an arbitrary broker cannot simply take over its partitions. In the Pulsar architecture, data is moved out of the broker and kept in a shared storage layer. The upper layer consists of stateless brokers, which are responsible for message distribution and serving (compute); the lower layer is the persistent storage layer (bookies). Compute and storage are therefore independent of each other, which allows each to scale independently and enables fast recovery.
  • Pulsar supports both stream processing and traditional message queuing, greatly increasing subscription flexibility.
  • Pulsar's cloud-native architecture facilitates horizontal elastic scaling and supports geo-replication.
  • Pulsar supports partitioning, high throughput and low latency.
  • The open source community is active, and its technical support is responsive and helpful.

How does Pulsar ensure order in distributed consumption?

First, let’s look at Pulsar’s subscription model. Pulsar has four subscription modes: Exclusive, Failover, Shared, and Key_Shared. In Exclusive mode, a single consumer receives all messages from a topic.

(Pulsar Exclusive mode)

In Failover mode, only one consumer is active at a time; the remaining consumers stand by and take over when the active consumer becomes unavailable. (This mode suits scenarios with a small data volume that need to avoid a single point of failure.)

In Shared mode, multiple consumers can attach to the same subscription. Messages are distributed among the consumers in round-robin fashion, and any given message is delivered to only one consumer. We initially adopted Shared mode because it offers distributed consumption and high consumption speed. In production, however, the data in the synchronization targets (ElasticSearch and MySQL) frequently deviated from the source database. Investigation showed that messages were being consumed out of order: when frequent operations on a single record generated multiple messages, the consumers in Shared mode processed those messages in parallel, so the changes could be applied in a different order from the one in which they were made.
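The ordering problem can be seen in a small simulation. The sketch below is not Pulsar code; it only imitates round-robin distribution with hypothetical consumer names to show that successive updates to one record end up on different consumers, after which nothing guarantees the order in which they are applied.

```python
from itertools import cycle


def round_robin(messages, consumers):
    """Assign messages to consumers in turn, roughly as a Shared subscription does."""
    assignment = {name: [] for name in consumers}
    for message, name in zip(messages, cycle(consumers)):
        assignment[name].append(message)
    return assignment


# Three successive updates to the same row (key "order-1"), in commit order.
messages = [
    ("order-1", "status=new"),
    ("order-1", "status=paid"),
    ("order-1", "status=shipped"),
]
assignment = round_robin(messages, ["consumer-a", "consumer-b"])

# consumer-a holds the 1st and 3rd update, consumer-b holds the 2nd.
# If consumer-b is slower, "status=paid" may be written after "status=shipped".
for name, batch in assignment.items():
    print(name, batch)
```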

(Pulsar Shared mode consumption strategy)

Pulsar introduced Key_Shared mode, which builds on Shared mode, in version 2.4.0. In Key_Shared mode, multiple consumers can attach to the same subscription. Messages are distributed among the consumers, but messages with the same key or ordering key are delivered to only one consumer, and they go to that same consumer no matter how many times a message is redelivered. When a consumer connects or disconnects, the consumer that serves some message keys changes. Key_Shared mode thus keeps the concurrency of Shared mode while preserving order for messages that share a key.

(Pulsar Key_Shared mode consumption strategy)

Data synchronization scenarios place strict requirements on message order. When a user repeatedly updates a piece of data, the corresponding record in the database table is updated repeatedly as well. Under high data volume and high concurrency, the order of the messages generated by data changes must match the order of the operations themselves. Otherwise, the synchronized data may diverge from the source data and cause system faults.

Ordering is a common problem in distributed consumption. To ensure ordered consumption on the client side, we use the Key_Shared subscription mode, an extension of the Shared subscription mode. Several consumers can consume messages from a partition in parallel, but messages with the same key are routed to only one consumer. The target consumer is determined by hashing the key: each consumer serves a fixed range of hash values, and together the consumers' ranges cover the entire hash range. The key is specified when the message is produced (as shown below), which closes the loop: messages are stored in order in the designated partition and consumed in order. Please refer to the Pulsar website for the detailed principles and usage.

Key: {"database": "you_db_name", "table": "you_table_name", "pk.id": "you_table_Primary key"}
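The hash-range routing described above can be sketched in a few lines. This is a simplified model, not Pulsar's implementation: the hash space size, the use of CRC32 as a stand-in hash function, and the function and consumer names are all assumptions made for illustration.

```python
import json
import zlib

HASH_RANGE = 65536  # size of the hash space in this sketch (an assumption)


def message_key(database, table, pk):
    """Build a key string with the same fields as the key shown above."""
    return json.dumps({"database": database, "table": table, "pk.id": pk}, sort_keys=True)


def assign_ranges(consumers):
    """Split the hash space into one contiguous, fixed range per consumer."""
    size = HASH_RANGE // len(consumers)
    ranges = []
    for i, name in enumerate(consumers):
        # The last consumer absorbs the remainder so the whole space is covered.
        end = HASH_RANGE if i == len(consumers) - 1 else (i + 1) * size
        ranges.append((i * size, end, name))
    return ranges


def route(key, ranges):
    """Return the consumer whose range contains the hash slot of the key."""
    slot = zlib.crc32(key.encode("utf-8")) % HASH_RANGE  # CRC32 is a stand-in hash
    for start, end, name in ranges:
        if start <= slot < end:
            return name
    raise ValueError("hash slot not covered by any consumer")


ranges = assign_ranges(["consumer-a", "consumer-b", "consumer-c"])
key = message_key("shop", "orders", 1)
# Every change to the same row carries the same key, so it reaches the same consumer.
owner = route(key, ranges)
```

Because the key is derived from the database, table, and primary key, all changes to one row share a key and are handled by one consumer in order, while changes to different rows still spread across consumers.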

How do I filter duplicate messages?

There are three delivery guarantees for message transmission: at least once, at most once, and exactly once.

  • At least once: Each message is transmitted successfully at least once, so it may be delivered more than once.
  • At most once: Each message is transmitted at most once and may be lost.
  • Exactly once: Each message is transmitted exactly once; it is neither lost nor duplicated.

In data synchronization scenarios, we use Maxwell in at-least-once mode to maximize message reachability and ensure, as far as possible, that every message is transmitted. When the network is unreliable, a message may already have been delivered to the target, but if the response times out or fails, Pulsar delivers it again, producing a “duplicate message.”

To solve the duplicate-message problem, we added a filter to the data link model of the data pipeline; it filters out duplicate, invalid, and retried messages. (To keep the focus on the core function, other components of the microservice architecture, such as logging and link tracing, are not shown in the figure.)
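The article does not describe how its filter is implemented. One common approach, shown here only as a minimal sketch, is to remember the identifiers of recently processed messages and drop any message whose identifier has already been seen. The class name, the capacity parameter, and the idea that each message carries a unique identifier are assumptions.

```python
from collections import OrderedDict


class DuplicateFilter:
    """Remember recently seen message ids and reject repeats (bounded memory)."""

    def __init__(self, capacity=10000):
        self.capacity = capacity
        self._seen = OrderedDict()

    def accept(self, message_id):
        """Return True the first time an id is seen, False for a repeat."""
        if message_id in self._seen:
            self._seen.move_to_end(message_id)  # keep recently repeated ids longer
            return False
        self._seen[message_id] = True
        if len(self._seen) > self.capacity:
            self._seen.popitem(last=False)      # evict the oldest id
        return True


dedup = DuplicateFilter(capacity=3)
delivered = ["m1", "m2", "m2", "m3", "m1"]  # "m2" and "m1" are redelivered
accepted = [m for m in delivered if dedup.accept(m)]
```

A bounded window only catches duplicates that arrive while the original id is still remembered, so the capacity has to be sized against the longest expected redelivery delay. Making the downstream write idempotent is a complementary safeguard.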

Conclusion

When a large amount of incremental data needs to be synchronized, we use our self-developed Maxwell + Pulsar solution. Pulsar’s Key_Shared subscription mode effectively solves the ordering problem in distributed message consumption, and a filter added to the data link of the data pipeline ensures that messages are neither duplicated nor missed.

In the future, we plan to build on the existing solution and make full use of Pulsar’s features to turn the data pipeline into a visual data synchronization platform, add support for more databases, and complete the monitoring and logging systems.