Introduction to Pulsar

Apache Pulsar, a top-level project of the Apache Software Foundation, is a next-generation, cloud-native, distributed messaging and streaming platform that integrates messaging, storage, and lightweight function computing. It uses an architecture that separates compute from storage, and it supports multi-tenancy, persistent storage, cross-region replication, strong consistency, high throughput, low latency, and highly scalable streaming data storage.

Pulsar was started at Yahoo in 2012 to consolidate Yahoo's other messaging systems into a unified logical messaging platform that supports large clusters and multiple regions. Other messaging systems of the time, including Kafka, could not meet Yahoo's needs, such as multi-tenancy in large clusters, reliable IO quality of service, millions of topics, and cross-region replication.

The key features of Pulsar are as follows:

  • A single Pulsar instance natively supports multiple clusters and seamlessly replicates messages between clusters across data centers.
  • Very low publish latency and end-to-end latency.
  • Seamlessly scales to over a million topics.
  • Simple client API with support for Java, Go, Python, and C++.
  • Multiple subscription types for topics (exclusive, shared, and failover).
  • Guaranteed message delivery through the persistent message storage provided by Apache BookKeeper.
  • Stream-native data processing through Pulsar Functions, a lightweight serverless computing framework.
  • Pulsar IO, a serverless connector framework built on Pulsar Functions, makes it easier to move data into and out of Apache Pulsar.
  • Tiered storage offloads data from hot storage to cold/long-term storage (such as S3 and GCS) as the data ages.

Community:

Apache Pulsar currently has more than 10K stars on GitHub and over 470 contributors. It is updated frequently, and the community is very active.

Concepts

Producer

The source and publisher of messages, responsible for sending messages to a topic.

Consumer

The consumer of messages, responsible for subscribing to topics and consuming their messages.

Topic

The carrier of message data. In Pulsar, a topic can be split into multiple partitions. If no partitions are specified, the topic has a single partition.

Broker

The broker is a stateless component that receives messages from producers and delivers them to consumers.

BookKeeper

A distributed write-ahead log (WAL) system that provides storage for the messaging system and replicates data across machines.

Bookie

A bookie is an Apache BookKeeper server that provides persistent storage for messages.

Cluster

A set of Pulsar brokers and BookKeeper bookies, coordinated by ZooKeeper. A Pulsar instance consists of one or more clusters.

Cloud Native Architecture

Apache Pulsar uses an architecture that decouples compute from storage. This lets each layer scale independently and recover quickly. As cloud native has developed, architectures that separate compute from storage have appeared in more and more systems. Pulsar's broker layer is a stateless compute layer that receives and dispatches messages. The storage layer consists of bookie nodes that store and serve messages.

Separating compute from storage lets Pulsar scale out horizontally with virtually no limit. If the system has many producers and consumers, you can add brokers to the compute layer directly, without being constrained by data consistency. Without this separation, both compute and stored data shift during scale-out, so expansion is easily limited by data consistency. In addition, compute-layer logic is complex and error-prone, while storage-layer logic is relatively simple and less likely to fail. In this architecture, a failure in the compute layer can be recovered on its own, without affecting the storage layer.

Pulsar also supports tiered storage. Older messages can be moved to cheaper storage while the most recent messages stay on SSDs. This reduces costs and makes better use of resources.

Cluster architecture

A Pulsar instance consists of one or more Pulsar clusters. Each cluster includes:

  • One or more brokers that receive and dispatch messages
  • A ZooKeeper ensemble that handles cluster-level configuration and coordination
  • A BookKeeper cluster of bookies that persists messages

Clusters within an instance synchronize messages with each other through cross-region (geo-) replication.

Design principles

Pulsar uses the publish-subscribe (pub-sub) pattern. Producers publish messages to topics. Consumers subscribe to those topics, process the messages, and send an acknowledgment (ack) to the broker when processing is complete.

Producer

Send modes

Producers can publish messages to the broker in two modes: synchronous (sync) or asynchronous (async).

  • In sync mode, the producer waits for the broker to acknowledge each message it sends. The message counts as sent successfully only after the broker confirms it. If no acknowledgment arrives, the send is considered to have failed.

    MessageId messageId = producer.send("Synchronously sent message".getBytes(StandardCharsets.UTF_8));
  • In async mode, the producer puts the message into a blocking queue and returns immediately, without waiting for the broker's acknowledgment. The client library then sends the message to the broker in the background.

    CompletableFuture<MessageId> messageIdFuture = producer.sendAsync("Asynchronously sent message".getBytes(StandardCharsets.UTF_8));
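
The relationship between the two calls can be shown without a broker. The sketch below is a simplified in-memory model, not the Pulsar client. The class SimulatedProducer is hypothetical, and a single background thread stands in for the broker, acknowledging each message with an increasing ID. In this model, sendAsync returns a CompletableFuture at once, and send is simply sendAsync followed by a blocking wait. With the real client, you would attach callbacks such as thenAccept to the future returned by producer.sendAsync(...) in the same way.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a producer (not the Pulsar client). A single
// background thread plays the broker: it acknowledges each message with an
// increasing ID. The payload itself is ignored in this model.
class SimulatedProducer implements AutoCloseable {
    private final ExecutorService broker = Executors.newSingleThreadExecutor();
    private final AtomicLong nextId = new AtomicLong();

    // Async mode: hand the message to the background thread and return at once.
    CompletableFuture<Long> sendAsync(byte[] payload) {
        return CompletableFuture.supplyAsync(nextId::getAndIncrement, broker);
    }

    // Sync mode: the same send, but block until the acknowledgment arrives.
    long send(byte[] payload) {
        return sendAsync(payload).join();
    }

    @Override
    public void close() {
        broker.shutdown();
    }
}

class SendModesDemo {
    public static void main(String[] args) {
        try (SimulatedProducer producer = new SimulatedProducer()) {
            long id = producer.send("sync".getBytes(StandardCharsets.UTF_8));
            System.out.println("sync send acknowledged, id " + id);

            producer.sendAsync("async".getBytes(StandardCharsets.UTF_8))
                    .thenAccept(ackId -> System.out.println("async send acknowledged, id " + ackId))
                    .exceptionally(ex -> {
                        System.err.println("async send failed: " + ex);
                        return null;
                    })
                    .join(); // wait only so the demo does not exit before the callback runs
        }
    }
}
```

Because send here is just sendAsync plus a wait, sync mode keeps at most one message in flight. Async mode lets the application keep sending while earlier acknowledgments are still pending.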

Access mode

Pulsar provides several topic access modes for producers:

  • Shared

    The default mode: multiple producers can publish messages to the same topic.

  • Exclusive

    Only one producer can publish to the topic. If the topic already has a producer, other producers fail to connect with an error such as:

    Topic has an existing exclusive producer: standalone-0-12

  • WaitForExclusive

    If a producer is already connected to the topic, the new producer's creation is suspended (rather than failing) until it can obtain exclusive access.

The access mode is set on the producer builder:

Producer<byte[]> producer = pulsarClient.newProducer().accessMode(ProducerAccessMode.Shared).topic("test-topic-1").create();
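
To make the three rules concrete, here is a minimal in-memory sketch of how a topic could arbitrate them. It is a model built on assumptions, not the broker's implementation. The class TopicProducerGate and its Mode enum are hypothetical. A failed CompletableFuture stands in for a rejected producer, and a still-pending future stands in for a WaitForExclusive producer whose creation is suspended.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of producer access-mode arbitration on one topic.
// The mode names mirror ProducerAccessMode, but this is not Pulsar's code.
class TopicProducerGate {
    enum Mode { SHARED, EXCLUSIVE, WAIT_FOR_EXCLUSIVE }

    private int sharedProducers = 0;
    private boolean exclusiveHeld = false;
    private final Deque<CompletableFuture<Void>> waiting = new ArrayDeque<>();

    synchronized CompletableFuture<Void> connect(Mode mode) {
        boolean free = !exclusiveHeld && sharedProducers == 0;
        switch (mode) {
            case SHARED:
                if (exclusiveHeld) return failed("Topic has an existing exclusive producer");
                sharedProducers++;
                return CompletableFuture.completedFuture(null);
            case EXCLUSIVE:
                if (!free) return failed("Topic already has a connected producer");
                exclusiveHeld = true;
                return CompletableFuture.completedFuture(null);
            default: // WAIT_FOR_EXCLUSIVE: stay pending instead of failing
                if (free) {
                    exclusiveHeld = true;
                    return CompletableFuture.completedFuture(null);
                }
                CompletableFuture<Void> pending = new CompletableFuture<>();
                waiting.addLast(pending);
                return pending;
        }
    }

    synchronized void disconnect(Mode mode) {
        if (mode == Mode.SHARED) sharedProducers--; else exclusiveHeld = false;
        // Once the topic is free, hand exclusive access to the oldest waiter.
        if (!exclusiveHeld && sharedProducers == 0 && !waiting.isEmpty()) {
            exclusiveHeld = true;
            waiting.pollFirst().complete(null);
        }
    }

    private static CompletableFuture<Void> failed(String reason) {
        CompletableFuture<Void> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException(reason));
        return f;
    }
}
```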

Compression

Pulsar can compress the messages that producers send. The following compression types are supported:

  • LZ4

    LZ4 is a lossless compression algorithm that compresses at more than 500 MB/s per core and scales with multi-core CPUs. Its decoder is extremely fast, reaching multiple GB/s per core and often hitting RAM speed limits on multi-core systems.

  • ZLIB

    zlib is designed to be a free, general-purpose, legally unencumbered (that is, not covered by any patents) lossless data-compression library for use on virtually any computer hardware and operating system. The zlib data format is itself portable across platforms.

  • ZSTD

    Zstandard is a fast compression algorithm that provides high compression ratios. It also offers a special mode for small data, called dictionary compression. The reference library offers a very wide range of speed/compression trade-offs and is backed by an extremely fast decoder.

  • Snappy

    Snappy is a compression/decompression library. It does not aim for maximum compression or for compatibility with any other compression library. Instead, it aims for very high speed and reasonable compression.
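
The JDK does not ship LZ4, ZSTD, or Snappy codecs. It does include zlib through java.util.zip, which is enough to illustrate what producer-side compression does. The sketch below is a standalone illustration, not Pulsar code, and the class ZlibRoundTrip is hypothetical. With the real Java client, you would instead choose a codec on the producer builder (for example .compressionType(CompressionType.LZ4)), and consumers decompress transparently.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Hypothetical helper: round-trips bytes through the JDK's zlib codec.
// It only illustrates what a producer-side codec does; it is not Pulsar code.
class ZlibRoundTrip {
    static byte[] compress(byte[] input) {
        Deflater deflater = new Deflater(); // zlib format, default compression level
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        while (!deflater.finished()) {
            int n = deflater.deflate(buffer);
            out.write(buffer, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    static byte[] decompress(byte[] compressed) throws DataFormatException {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        while (!inflater.finished()) {
            int n = inflater.inflate(buffer);
            if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                throw new DataFormatException("truncated or dictionary-compressed input");
            }
            out.write(buffer, 0, n);
        }
        inflater.end();
        return out.toByteArray();
    }
}
```

Highly repetitive payloads, such as the same short record repeated many times, shrink dramatically. Already-compressed or random data shrinks little, so choosing a codec is a trade-off between CPU time and network or storage savings.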

Batching

Producers can send multiple messages in a single request (batching, which is enabled by default in the Java client). A batch is tracked as a single unit, so by default it counts as acknowledged only after consumers have acknowledged every message in it. An unexpected failure, a negative acknowledgment, or an acknowledgment timeout can therefore cause every message in the batch to be redelivered, including messages that were already acknowledged.

To avoid this problem, Pulsar 2.6.0 introduced batch index acknowledgment. The broker maintains the acknowledgment state of each index in the batch and avoids dispatching already-acknowledged messages to consumers. The batch is deleted once all of its message indexes have been acknowledged. Batch index acknowledgment is disabled by default. To enable it, set acknowledgmentAtBatchIndexLevelEnabled=true on the broker.
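
The difference between the two behaviors can be modeled with one BitSet per batch. The sketch below is a simplified, hypothetical model (BatchAckTracker is not a Pulsar class). It shows which messages would be resent after a failure, with and without per-index acknowledgment state, and when the batch becomes deletable.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.List;

// Hypothetical model of one stored batch (not Pulsar's broker code).
// Without batch index acknowledgment the batch is tracked as one unit, so a
// redelivery resends every message in it. With it, a per-index bit records
// which messages were already acknowledged, and only the rest are resent.
class BatchAckTracker {
    private final List<String> messages;
    private final BitSet acked;
    private final boolean batchIndexAckEnabled;

    BatchAckTracker(List<String> messages, boolean batchIndexAckEnabled) {
        this.messages = new ArrayList<>(messages);
        this.acked = new BitSet(messages.size());
        this.batchIndexAckEnabled = batchIndexAckEnabled;
    }

    void ack(int index) {
        acked.set(index);
    }

    // The batch can be deleted only when every index has been acknowledged.
    boolean deletable() {
        return acked.cardinality() == messages.size();
    }

    // Messages that would be sent again after a failure such as a consumer crash.
    List<String> redeliver() {
        if (deletable()) return Collections.emptyList();
        List<String> out = new ArrayList<>();
        for (int i = 0; i < messages.size(); i++) {
            if (!batchIndexAckEnabled || !acked.get(i)) out.add(messages.get(i));
        }
        return out;
    }
}
```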

Chunking

Note the following when enabling chunking (chunkingEnabled=true):

  • Batching and chunking cannot be enabled at the same time. To enable chunking, you must disable batching first.
  • Chunking is supported only for persistent topics.
  • Chunking is supported only for exclusive and failover subscription types.

When chunking is enabled and a message exceeds the maximum payload size, the producer splits the original message into chunks and sends each chunk to the broker separately. The chunks are stored in the broker like normal messages. When a consumer receives a chunk, it recognizes it as part of a chunked message and buffers it. After collecting all the chunks of a message, the consumer reassembles them into the original message and puts it into the receiver queue for the application to consume. If the producer fails to publish all the chunks, the consumer expires the incomplete chunks. The default expiration time is one hour.

Figure: chunked messages with one producer and one ordered consumer.

Figure: chunked messages with multiple producers and one ordered consumer.
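
The split-and-reassemble flow described above can be sketched with plain byte arrays. This is a hypothetical model, not Pulsar's wire format or client code. ChunkingSketch, Chunk, and Reassembler are illustrative names, chunks are keyed by a string message ID, and expiry of incomplete chunks is omitted.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of chunking (not Pulsar's implementation). The producer
// side splits an oversized payload into chunks of at most maxChunkSize bytes.
// The consumer side buffers chunks per message ID until all of them have
// arrived, then reassembles the payload. Expiry of incomplete chunks is omitted.
class ChunkingSketch {
    static final class Chunk {
        final String messageId;
        final int chunkIndex;
        final int totalChunks;
        final byte[] data;

        Chunk(String messageId, int chunkIndex, int totalChunks, byte[] data) {
            this.messageId = messageId;
            this.chunkIndex = chunkIndex;
            this.totalChunks = totalChunks;
            this.data = data;
        }
    }

    static List<Chunk> split(String messageId, byte[] payload, int maxChunkSize) {
        int total = Math.max(1, (payload.length + maxChunkSize - 1) / maxChunkSize);
        List<Chunk> chunks = new ArrayList<>();
        for (int i = 0; i < total; i++) {
            int from = i * maxChunkSize;
            int to = Math.min(payload.length, from + maxChunkSize);
            chunks.add(new Chunk(messageId, i, total, Arrays.copyOfRange(payload, from, to)));
        }
        return chunks;
    }

    static final class Reassembler {
        private final Map<String, byte[][]> buffered = new HashMap<>();
        private final Map<String, Integer> counts = new HashMap<>();

        // Returns the full payload once the last missing chunk arrives, otherwise null.
        byte[] accept(Chunk chunk) {
            byte[][] parts = buffered.computeIfAbsent(chunk.messageId, id -> new byte[chunk.totalChunks][]);
            if (parts[chunk.chunkIndex] == null) { // ignore duplicate chunks
                parts[chunk.chunkIndex] = chunk.data;
                counts.merge(chunk.messageId, 1, Integer::sum);
            }
            if (counts.get(chunk.messageId) < chunk.totalChunks) return null;
            buffered.remove(chunk.messageId);
            counts.remove(chunk.messageId);
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            for (byte[] part : parts) out.write(part, 0, part.length);
            return out.toByteArray();
        }
    }
}
```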

Consumer

A consumer receives messages from the broker by subscribing to a specified topic.

The consumer sends a flow request to the broker to fetch messages. On the consumer side, a receiver queue buffers the messages pushed by the broker. Its size is set with the receiverQueueSize parameter (default 1000). Each call to consumer.receive() takes one message from this buffer.
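
The flow request and receiver queue can be modeled as a permit counter. The sketch below is hypothetical (FlowControlSketch is not a Pulsar class), and its refill rule is an assumption chosen for the sketch. The consumer first grants permits equal to its queue size. The broker pushes at most that many messages. Once the application has taken half a queue's worth, the consumer grants that many permits again, so the buffer never holds more than receiverQueueSize messages.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of broker-to-consumer flow control (not the client's code).
// The consumer grants permits equal to its receiver queue size, and the broker
// pushes at most that many messages. The half-queue refill rule used below is
// an assumption of this sketch.
class FlowControlSketch {
    private final int refillThreshold;
    private final Deque<Integer> brokerBacklog = new ArrayDeque<>();
    private final Deque<Integer> receiverQueue = new ArrayDeque<>();
    private int permits = 0;
    private int takenSinceLastFlow = 0;

    FlowControlSketch(int receiverQueueSize, int backlogSize) {
        this.refillThreshold = Math.max(1, receiverQueueSize / 2);
        for (int i = 0; i < backlogSize; i++) brokerBacklog.addLast(i);
        flow(receiverQueueSize); // initial flow request when the consumer subscribes
    }

    // A flow request adds permits; the broker pushes messages while it has permits.
    private void flow(int newPermits) {
        permits += newPermits;
        while (permits > 0 && !brokerBacklog.isEmpty()) {
            receiverQueue.addLast(brokerBacklog.pollFirst());
            permits--;
        }
    }

    // Like consumer.receive(), but returns null instead of blocking when empty.
    Integer receive() {
        Integer message = receiverQueue.pollFirst();
        if (message != null && ++takenSinceLastFlow >= refillThreshold) {
            flow(takenSinceLastFlow);
            takenSinceLastFlow = 0;
        }
        return message;
    }

    int buffered() {
        return receiverQueue.size();
    }
}
```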

Receiving mode

Messages can be received from the broker synchronously or asynchronously. They can also be delivered through a MessageListener: after a message is received, the client calls back the user's MessageListener.

  • Synchronous receive blocks until a message is available.

    Message<byte[]> message = consumer.receive();
    System.out.println("Received message content: " + new String(message.getData()));
    consumer.acknowledge(message); // acknowledge successful consumption
  • Asynchronous receive returns a CompletableFuture immediately. Call receiveAsync() again only after the returned CompletableFuture has completed with a message. Otherwise, receive requests pile up as a backlog in the application.

    A returned future can be cancelled before it completes by calling cancel(false) (CompletableFuture.cancel(boolean)), which removes it from the backlog of receive requests.

    CompletableFuture<Message<byte[]>> messageFuture = consumer.receiveAsync();
    Message<byte[]> message = messageFuture.get(); // blocks here only to keep the example short
    System.out.println("Received message content: " + new String(message.getData()));
    consumer.acknowledge(message); // acknowledge successful consumption
  • The client libraries provide listener implementations for consumers. For example, the Java client provides a MessageListener interface whose received method is called each time a new message arrives.

    pulsarClient.newConsumer()
            .topic("test-topic-1")
            .messageListener((MessageListener<byte[]>) (consumer, msg) -> {
                System.out.println("Received message content: " + new String(msg.getData()));
                try {
                    consumer.acknowledge(msg); // acknowledge successful consumption
                } catch (PulsarClientException e) {
                    consumer.negativeAcknowledge(msg); // consumption failed: request redelivery
                }
            })
            .subscriptionName("test-subscription-1")
            .subscribe();

Consumer acknowledgment

Acknowledgment:

When a consumer has successfully consumed a message, it sends an acknowledgment to the broker. A message is deleted only after all subscriptions have acknowledged it. To keep messages that have already been acknowledged, configure a message retention policy. Otherwise, Pulsar deletes acknowledged messages immediately.

Messages can be acknowledged in either of the following ways:

  • Individual acknowledgment: the consumer acknowledges each message separately, sending an acknowledgment request to the broker for each one.
  • Cumulative acknowledgment: the consumer acknowledges only the last message it received. No message in the stream up to and including that one is redelivered to the consumer. Cumulative acknowledgment cannot be used with the Shared or Key_Shared subscription types.
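
The two acknowledgment styles can be modeled with a single position plus a set of individual acks. The sketch below is loosely modeled on the idea of a mark-delete position, and it simplifies message IDs to increasing longs. AckStateSketch is a hypothetical class, not Pulsar's cursor implementation. A message counts as acknowledged if it is at or below the mark-delete position or was acknowledged individually. Contiguous individual acks are folded into the position.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical model of a subscription's acknowledgment state (not Pulsar's
// cursor code). Message IDs are simplified to increasing longs.
class AckStateSketch {
    private long markDeletePosition = -1; // every ID <= this is acknowledged
    private final NavigableSet<Long> individuallyAcked = new TreeSet<>();

    // Individual acknowledgment of a single message.
    void acknowledge(long id) {
        if (id > markDeletePosition) individuallyAcked.add(id);
        advance();
    }

    // Cumulative acknowledgment: everything up to and including id.
    void acknowledgeCumulative(long id) {
        markDeletePosition = Math.max(markDeletePosition, id);
        individuallyAcked.headSet(markDeletePosition, true).clear();
        advance();
    }

    // Fold contiguous individual acks into the mark-delete position.
    private void advance() {
        while (individuallyAcked.remove(markDeletePosition + 1)) markDeletePosition++;
    }

    boolean isAcknowledged(long id) {
        return id <= markDeletePosition || individuallyAcked.contains(id);
    }

    long markDeletePosition() {
        return markDeletePosition;
    }
}
```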

Negative acknowledgment:

When a consumer fails to process a message and wants to consume it again, it sends a negative acknowledgment to the broker. This indicates that the message was not processed successfully, and the broker redelivers it. Whether messages are negatively acknowledged individually or cumulatively depends on the subscription type:

  • In the Exclusive and Failover subscription types, consumers negatively acknowledge only the last message they received.
  • In the Shared and Key_Shared subscription types, messages can be negatively acknowledged individually.

Acknowledgment timeout:

If a message is not consumed successfully, the acknowledgment timeout mechanism can trigger redelivery automatically. The client tracks unacknowledged messages, and after the configured period (ackTimeout) it asks the broker to redeliver them. Negative acknowledgment is recommended over acknowledgment timeout because it controls the redelivery of individual messages more precisely.

Dead-letter queue

Apache Pulsar has a built-in dead letter queue (DLQ) feature. When message processing fails and the consumer sends a negative acknowledgment, Pulsar automatically redelivers the message. Once the number of redeliveries exceeds the configured maximum, the message is placed in a dead letter queue so that new messages can continue to be processed.

// topic and maxRedeliveryCount are assumed to be defined elsewhere
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
              .topic(topic)
              .subscriptionName("my-subscription")
              .subscriptionType(SubscriptionType.Shared)
              .deadLetterPolicy(DeadLetterPolicy.builder()
                    .maxRedeliverCount(maxRedeliveryCount)
                    .build())
              .subscribe();
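
The retry-then-dead-letter flow can be traced with a small model. The sketch below is hypothetical (DeadLetterSketch is not a Pulsar class) and omits redelivery delays. It assumes that a message is first delivered with a redelivery count of 0, that each negative acknowledgment redelivers it with the count increased by one, and that a message which fails again after maxRedeliverCount redeliveries is moved to the dead letter list. The exact point at which the real client gives up is an assumption here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of negative acknowledgment plus a dead letter policy
// (not the Pulsar client's implementation). Redelivery delays are omitted.
class DeadLetterSketch {
    private static final class Delivery {
        final String payload;
        final int redeliveryCount;

        Delivery(String payload, int redeliveryCount) {
            this.payload = payload;
            this.redeliveryCount = redeliveryCount;
        }
    }

    // handler returns true to acknowledge, false to negatively acknowledge.
    // Returns the messages that ended up in the dead letter queue.
    static List<String> consume(List<String> messages, Predicate<String> handler,
                                int maxRedeliverCount, List<String> acknowledged) {
        Deque<Delivery> pending = new ArrayDeque<>();
        for (String m : messages) pending.addLast(new Delivery(m, 0));
        List<String> deadLetters = new ArrayList<>();
        while (!pending.isEmpty()) {
            Delivery d = pending.pollFirst();
            if (handler.test(d.payload)) {
                acknowledged.add(d.payload);
            } else if (d.redeliveryCount >= maxRedeliverCount) {
                deadLetters.add(d.payload); // retries exhausted: move to the DLQ
            } else {
                pending.addLast(new Delivery(d.payload, d.redeliveryCount + 1)); // redeliver
            }
        }
        return deadLetters;
    }
}
```

With maxRedeliverCount set to 3, a message that always fails is delivered four times (the original delivery plus three redeliveries) before it lands in the dead letter list. Other messages keep flowing in the meantime.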

Consumption model

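Apache Pulsar unifies the queue model and the stream model. A topic stores only one copy of the data, and the same data can be consumed multiple times. Pulsar has four subscription types: Exclusive, Shared, Failover, and Key_Shared. These types are described below:

  • Exclusive: only one consumer can attach to the subscription.
  • Failover: multiple consumers can attach, but only one active consumer receives messages. If it disconnects, another consumer takes over.
  • Shared: messages are distributed across the attached consumers, and each message is delivered to only one of them.
  • Key_Shared: multiple consumers can attach, and messages with the same key are always delivered to the same consumer.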
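
How each subscription type picks a consumer for a message can be sketched as a small dispatcher. This is a hypothetical model, not Pulsar's dispatcher code. Consumers are plain indexes, consumer 0 stands for the single or currently active consumer, and String.hashCode stands in for the key hashing that Key_Shared actually uses.

```java
// Hypothetical sketch of consumer selection per subscription type (not
// Pulsar's dispatchers). Consumers are indexes 0..n-1. String.hashCode is a
// stand-in for Pulsar's real key hashing.
class SubscriptionDispatchSketch {
    enum Type { EXCLUSIVE, FAILOVER, SHARED, KEY_SHARED }

    private final Type type;
    private final int consumers;
    private int roundRobin = 0;

    SubscriptionDispatchSketch(Type type, int consumers) {
        if (type == Type.EXCLUSIVE && consumers > 1) {
            throw new IllegalStateException("Exclusive subscription allows only one consumer");
        }
        this.type = type;
        this.consumers = consumers;
    }

    int pick(String key) {
        switch (type) {
            case EXCLUSIVE:
            case FAILOVER:
                return 0; // the single (or currently active) consumer gets everything
            case SHARED:
                return roundRobin++ % consumers; // spread messages, no per-key ordering
            default: // KEY_SHARED: the same key always maps to the same consumer
                return Math.floorMod(key.hashCode(), consumers);
        }
    }
}
```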

Topic

Topic naming

In Pulsar, a topic carries messages from producers to consumers. A topic name is a URL with a well-defined structure:

{persistent|non-persistent}://tenant/namespace/topic
  • persistent / non-persistent

    The topic type, either persistent or non-persistent. The default is persistent. Persistent topics store messages on disk, while non-persistent topics do not.

  • tenant

    The tenant of the topic within the Pulsar instance. Tenants are essential to multi-tenancy in Pulsar and can span multiple clusters.

  • namespace

    A group of related topics. The namespace is the basic administrative unit for topics, and each tenant can have one or more namespaces.

  • topic

    Topics in Pulsar are named channels and are used to transfer messages from producers to consumers.

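Topics do not need to be created explicitly in Pulsar. If a client tries to write messages to, or receive messages from, a topic that does not exist yet, Pulsar automatically creates that topic under the namespace given in the topic name. If no tenant or namespace is specified, the topic is created in the default tenant (public) and namespace (default).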
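
The naming rules above can be captured in a short parser. The sketch below is a hypothetical helper, not Pulsar's own TopicName class. It expands a bare name to the default public/default location, treats a name without a type prefix as persistent, and rejects names that are not in tenant/namespace/topic form. Legacy name formats are not handled.

```java
// Hypothetical helper (not Pulsar's TopicName class) that expands a topic name
// into {persistent|non-persistent}://tenant/namespace/topic. A bare name uses
// the default tenant "public" and namespace "default", and a missing type
// prefix means persistent. Legacy name formats are not handled.
class TopicNameSketch {
    final String domain;
    final String tenant;
    final String namespace;
    final String localName;

    private TopicNameSketch(String domain, String tenant, String namespace, String localName) {
        this.domain = domain;
        this.tenant = tenant;
        this.namespace = namespace;
        this.localName = localName;
    }

    static TopicNameSketch parse(String name) {
        String domain = "persistent";
        String rest = name;
        int sep = name.indexOf("://");
        if (sep >= 0) {
            domain = name.substring(0, sep);
            rest = name.substring(sep + 3);
            if (!domain.equals("persistent") && !domain.equals("non-persistent")) {
                throw new IllegalArgumentException("Unknown topic type: " + domain);
            }
        }
        String[] parts = rest.split("/", -1);
        if (parts.length == 1 && !parts[0].isEmpty()) {
            return new TopicNameSketch(domain, "public", "default", parts[0]);
        }
        if (parts.length == 3 && !parts[0].isEmpty() && !parts[1].isEmpty() && !parts[2].isEmpty()) {
            return new TopicNameSketch(domain, parts[0], parts[1], parts[2]);
        }
        throw new IllegalArgumentException("Expected tenant/namespace/topic: " + name);
    }

    @Override
    public String toString() {
        return domain + "://" + tenant + "/" + namespace + "/" + localName;
    }
}
```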

Partitioned topics

A normal topic is served by a single broker, which limits its throughput. A topic can instead be split into multiple partitions that are served by different brokers, which greatly increases its throughput.

As shown in the figure above, Topic1 is split into five partitions, P0 to P4, which are distributed across three brokers (Broker1, Broker2, and Broker3). Because there are more partitions than brokers, the first two brokers each handle two partitions and the third handles one. Pulsar manages this distribution of partitions automatically.

When publishing to a partitioned topic, you must specify a routing mode. The routing mode determines which partition each message is published to.

Three MessageRoutingMode options are available:

  • RoundRobinPartition

    If no key is provided, the producer publishes messages across all partitions in round-robin fashion to achieve maximum throughput. Round-robin is not applied to each individual message. Instead, the producer moves to the next partition at the same boundary as the batching delay, so that batching remains effective. If a key is specified on a message, the partitioned producer hashes the key and assigns the message to a particular partition.

  • SinglePartition

    If no key is provided, the partitioned producer randomly selects one partition and publishes all messages to it. If a key is specified on a message, the partitioned producer hashes the key and assigns the message to a particular partition.

  • CustomPartition

    Uses a custom message router implementation, which is called to determine the partition for each message.
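
The two built-in routing rules can be sketched as follows. This is a simplified, hypothetical model (PartitionRouterSketch is not a Pulsar class). String.hashCode stands in for the client's key hash, and round-robin advances on every message here, whereas the real producer switches partitions at batch boundaries. CustomPartition would replace choosePartition with user code; in the Java client, that code is supplied as a MessageRouter implementation.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of partition selection for the built-in routing modes.
// String.hashCode is a stand-in for the client's key hash, and round-robin
// advances per message instead of at batch boundaries.
class PartitionRouterSketch {
    enum Mode { ROUND_ROBIN, SINGLE_PARTITION }

    private final Mode mode;
    private final int numPartitions;
    private final int fixedPartition;
    private int next = 0;

    PartitionRouterSketch(Mode mode, int numPartitions) {
        this.mode = mode;
        this.numPartitions = numPartitions;
        // SinglePartition: one partition chosen at random when the producer is created.
        this.fixedPartition = ThreadLocalRandom.current().nextInt(numPartitions);
    }

    int choosePartition(String key) {
        if (key != null) {
            return Math.floorMod(key.hashCode(), numPartitions); // same key, same partition
        }
        if (mode == Mode.SINGLE_PARTITION) {
            return fixedPartition;
        }
        int partition = next;
        next = (next + 1) % numPartitions;
        return partition;
    }
}
```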

Multi-tenancy

Apache Pulsar's multi-tenancy meets the management needs of enterprises. Tenants and namespaces are the two core concepts Pulsar uses to support multi-tenancy.

  • At the tenant level, Pulsar reserves appropriate storage space for specific tenants and applies authentication and authorization mechanisms to them.
  • At the namespace level, Pulsar provides a range of configuration policies, including storage quotas, flow control, message expiration, and isolation policies between namespaces.

Cross-region replication

Cross-region (geo-) replication provides redundancy across multiple data centers (DCs) in a large distributed system, so that service continues when a data center fails. It also provides the foundation for producing and consuming across regions.

Tiered storage

Pulsar's tiered storage feature moves older backlog data from BookKeeper to longer-term, cheaper storage. This reduces storage costs while clients can still access the backlog as if nothing had changed. You can configure a size-threshold policy on a namespace so that data is offloaded to long-term storage automatically once the threshold is exceeded.

Components

Pulsar Schema Registry

The Schema Registry lets producers and consumers agree on the data structure of a topic through the broker, without an external coordination mechanism. This avoids problems such as serialization and deserialization mismatches.

Pulsar Functions

Pulsar Functions is a lightweight computing framework that provides a FaaS (Function as a Service) platform with simple deployment, simple operations, and a simple API. Its goal is to let users build processing logic of varying complexity without deploying a separate computing system.

Pulsar IO

Pulsar IO lets Apache Pulsar interact with external systems, such as databases (for example, Apache Cassandra) and other messaging systems. Users do not need to deal with implementation details and can run a connector quickly with a single command.

A source imports data from an external system into Apache Pulsar. A sink exports data from Apache Pulsar to an external system.

Pulsar SQL

Pulsar SQL is a query layer built on top of Apache Pulsar that lets users dynamically query all historical data streams stored in Pulsar. Users can clean, transform, and query streams while data is being ingested into the same system, which greatly simplifies the data pipeline.

Quick start

Binary installation

Here is a simple Pulsar demo that installs a single Pulsar server. First, download Pulsar with the following command:

$ wget https://archive.apache.org/dist/pulsar/pulsar-2.8.1/apache-pulsar-2.8.1-bin.tar.gz

After the package is downloaded, decompress apache-pulsar-2.8.1-bin.tar.gz:

$ tar xvfz apache-pulsar-2.8.1-bin.tar.gz

Then cd into apache-pulsar-2.8.1, which contains the following directories:

  • bin: Pulsar's command-line tools, such as [pulsar](<https://pulsar.apache.org/docs/en/reference-cli-tools#pulsar>) and [pulsar-admin](<https://pulsar.apache.org/tools/pulsar-admin/>).
  • conf: Pulsar configuration files, including the broker and ZooKeeper configuration.
  • examples: a Java JAR file containing example Pulsar Functions.
  • lib: the JAR files used by Pulsar.
  • licenses: license files, in .txt form, for the various components of the Pulsar codebase.

Start Pulsar in standalone mode

Once Pulsar is installed locally, you can start a local cluster with the [pulsar](<https://pulsar.apache.org/docs/en/reference-cli-tools#pulsar>) command in the bin directory, specifying standalone mode:

$ bin/pulsar standalone

If Pulsar has started successfully, you will see INFO-level log messages like the following:

2017-06-01 14:46:29.192 - INFO - [main:WebSocketService@95] - Configuration Store Cache started
2017-06-01 14:46:29.192 - INFO - [main:AuthenticationService@61] - Authentication is disabled
2017-06-01 14:46:29.192 - INFO - [main:WebSocketService@108] - Pulsar WebSocket Service started

A send/receive example

import java.nio.charset.StandardCharsets;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

public class PulsarDemo {

    private static PulsarClient PULSAR_CLIENT = null;

    static {
        try {
            // Create the Pulsar client
            PULSAR_CLIENT = PulsarClient.builder()
                    .serviceUrl("pulsar://127.0.0.1:6650")
                    .build();
        } catch (PulsarClientException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws PulsarClientException {
        // Subscribe before sending: a new subscription starts at the latest position
        // by default, so a message published before it exists would not be delivered
        Consumer<byte[]> consumer = PULSAR_CLIENT.newConsumer()
                .topic("test-topic-1")
                .subscriptionName("test-subscription-1")
                .subscribe();

        // Create a producer and send a message synchronously
        Producer<byte[]> producer = PULSAR_CLIENT.newProducer()
                .topic("test-topic-1")
                .create();
        MessageId messageId = producer.send("Synchronously sent message".getBytes(StandardCharsets.UTF_8));
        System.out.println("Message sent successfully, messageId: " + messageId);

        // Receive the message (blocks until one is available)
        Message<byte[]> message = consumer.receive();
        System.out.println("Received message content: " + new String(message.getData()));
        // Acknowledge successful consumption so that Pulsar can delete the message
        consumer.acknowledge(message);

        // Close the producer, the consumer, and the client
        producer.close();
        consumer.close();
        PULSAR_CLIENT.close();
    }
}

Output:

Message sent successfully, messageId: 66655:0:-1:0
Received message content: Synchronously sent message
