At the TGIP-CN live event on April 11, we invited StreamNative engineer Yunze Xu (徐昀泽) to share a preview of the new features in KoP 2.8.0. The following is a concise text version of the shared video for your reference.

Today’s topic is “KoP (Kafka on Pulsar) 2.8.0 New Feature Preview”. First of all, a brief self-introduction: I work at StreamNative as a contributor to Apache Pulsar and the main maintainer of KoP.

About the KoP version number specification

First, let’s talk about the KoP version number.

Apache Pulsar has major releases. Because KoP’s early version management was somewhat chaotic, since 2.6.2.0 the KoP version number has followed the Pulsar version it is based on. The KoP master branch updates its Pulsar dependency from time to time, so if you need some new functionality, you can submit a PR to Pulsar and KoP can pick it up through this mechanism. KoP 2.8.0 is a release that is ready for production.

Today I will walk you through the following four main points:

  • Why KoP
  • The basic implementation of KoP
  • Recent progress in the KoP 2.8.0-Snapshot release
  • Recent plans and future prospects

Kafka vs. Pulsar

First of all, I would like to compare the two systems, Kafka and Pulsar. Setting aside miscellaneous features, the two systems are very similar; the biggest difference is their storage model.

Kafka’s Broker handles both compute and storage. Compute here refers to abstracting the data sent by clients into different topics and partitions, possibly with some processing such as schema handling. After processing, messages are written to storage: Kafka’s Broker writes directly to the file system on its own machine, while Pulsar writes to a Bookie cluster, where all Bookie nodes are peers.

This layered architecture brings many benefits: if you want to increase throughput, add a Broker; if you want to increase disk capacity, add a Bookie. Since all nodes are peers, there is no need to rebalance, and there are no leader/follower replicas as in Kafka. While that’s not the point of this talk, the biggest architectural difference is that Kafka writes to local files while Pulsar writes to Bookies.

That said, I don’t think either system is absolutely superior; people are free to choose. I’m sure there are plenty of scenarios where Pulsar could be used instead of Kafka.

Migrate from Kafka to Pulsar

If I want to migrate from Kafka to Pulsar because I like some of the advantages of Pulsar, what problems will I encounter?

  1. Persuade the business teams to replace their clients?
  • The business teams say it’s too much trouble to change.
  • Pulsar adaptors? (Pulsar provides a Kafka adaptor: you only change the Maven dependency, the Kafka code does not need to change.)
  • It looks good, but I’m not using a Java client.
  • I wouldn’t mind switching, but I only know PHP.

  2. What if users directly use Kafka connectors (there are hundreds of them) to connect to external systems?

  3. What if users use external systems’ connectors to connect to Kafka?

KoP (Kafka on Pulsar)

The KoP (Kafka on Pulsar) project was born in the face of these problems with migrating from Kafka to Pulsar. KoP introduces a Kafka protocol handler plug-in into the Pulsar Broker, enabling Apache Pulsar to support the native Apache Kafka protocol. With KoP, users can migrate existing Kafka applications and services to Pulsar without changing their code, and take advantage of Pulsar’s strengths. For the background of the KoP project, see the related KoP materials; I won’t go into it here.

As shown above, Pulsar 2.5.0 introduced the Protocol Handler, which runs on top of the Broker’s services. The built-in Pulsar protocol handler is really just a concept: it is what communicates with Pulsar clients. The Kafka protocol handler, by contrast, is a dynamically loaded plug-in layer, enabled through configuration, that communicates with Kafka clients.

Using KoP is as simple as putting the Protocol Handler’s NAR package into the protocols subdirectory under the Pulsar directory and adding the configuration to broker.conf or standalone.conf. By default KoP listens on port 9092, the same as Kafka.
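
For example, a minimal configuration sketch might look like this (the option names below are as I recall them from the KoP documentation; verify against your version’s README):

messagingProtocols=kafka
protocolHandlerDirectory=./protocols
# KoP listens on Kafka's conventional port by default
kafkaListeners=PLAINTEXT://0.0.0.0:9092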

Currently, KoP supports:

  • Java: Kafka clients >= 1.0
  • C/C++: librdkafka
  • Golang: sarama
  • NodeJS:
  • Other clients based on rdkafka
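
To make this concrete, here is a minimal sketch of an unmodified Kafka Java producer talking to a KoP-enabled broker (the broker address and topic name are assumptions):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class KopProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Point the Kafka client at the Pulsar broker's KoP listener
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("my-topic", "key", "hello KoP"));
                producer.flush();
            }
        }
    }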

Protocol Handler

The Protocol Handler is actually an interface, and we can implement our own. The Broker startup process is as follows:

The Broker loads the protocol handlers from the directory, loads their classes, verifies them using the accept() and protocolName() methods, and then goes through three steps:

  • initialize()
  • start()
  • newChannelInitializer()

The first step loads the configuration of the Protocol Handler. The Protocol Handler shares its configuration with the Broker, so ServiceConfiguration is used here as well. The start step is the most important, because it passes in the BrokerService parameter.
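
As a rough sketch of what implementing this interface looks like (method signatures paraphrased from the Pulsar source and may differ slightly between versions):

    import java.net.InetSocketAddress;
    import java.util.Collections;
    import java.util.Map;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.socket.SocketChannel;
    import org.apache.pulsar.broker.ServiceConfiguration;
    import org.apache.pulsar.broker.protocol.ProtocolHandler;
    import org.apache.pulsar.broker.service.BrokerService;

    public class MyProtocolHandler implements ProtocolHandler {
        @Override
        public String protocolName() { return "my-protocol"; }

        @Override
        public boolean accept(String protocol) { return "my-protocol".equals(protocol); }

        @Override
        public void initialize(ServiceConfiguration conf) throws Exception {
            // Step 1: read this handler's settings from the shared broker configuration.
        }

        @Override
        public String getProtocolDataToAdvertise() { return ""; }

        @Override
        public void start(BrokerService service) {
            // Step 2: the key step -- BrokerService gives access to topics,
            // managed ledgers, and the built-in admin and client.
        }

        @Override
        public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers() {
            // Step 3: return Netty channel initializers that speak the custom
            // protocol, keyed by listen address.
            return Collections.emptyMap();
        }

        @Override
        public void close() {}
    }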

BrokerService controls all resources for each Broker:

  • Connected producers and subscriptions
  • Owned topics and their corresponding managed ledgers
  • The built-in admin and client

The implementation of KoP

Topic & Partition

Kafka is similar to Pulsar in many ways. A TopicPartition in Kafka is just a string and an int; Pulsar’s is a little more complicated, with the following parts:

  • Persistent or not
  • The tenant
  • The namespace
  • The topic name
  • The partition number

There are three configurations in KoP:

  • Default tenant: kafkaTenant=public
  • Default namespace: kafkaNamespace=default
  • Disallow automatic creation of non-partitioned topics: allowAutoTopicCreationType=partitioned

Why have a configuration that prevents automatic creation of non-partitioned topics? Because Kafka only has the concept of partitioned topics, not non-partitioned topics. If a Pulsar client automatically creates a non-partitioned topic, a Kafka client may not be able to access it. KoP does some simple work internally to map Kafka topics into the default tenant and namespace.
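
For illustration, the default name mapping amounts to something like the following helper (a hypothetical sketch, not KoP’s actual code; the -partition-N suffix is Pulsar’s partition naming convention):

    // A Kafka TopicPartition ("my-topic", 0) under the default tenant and
    // namespace becomes persistent://public/default/my-topic-partition-0.
    static String toPulsarTopicName(String kafkaTopic, int partition,
                                    String tenant, String namespace) {
        return String.format("persistent://%s/%s/%s-partition-%d",
                tenant, namespace, kafkaTopic, partition);
    }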

Produce & Fetch requests

The PRODUCE request:

  • Find the PersistentTopic object (containing the ManagedLedger) by topic name.
  • Transform the message format.
  • Asynchronously write messages to Bookie.

The FETCH request:

  • Find the PersistentTopic object by topic name.
  • Find the corresponding ManagedCursor using Offset.
  • Read an Entry from the position of the ManagedCursor.
  • Transform the Entry format and return the message to the client.

Group Coordinator

The Group Coordinator is used to perform rebalancing and to determine the mapping between partitions and group members. Because a group has multiple consumers, the Group Coordinator determines which partitions each consumer accesses.

When a consumer joins (subscribes to) a group:

  • A JoinGroup request is sent to notify the Broker that a new consumer has joined.
  • A SyncGroup request is sent for partition allocation.

The assignment is also sent back to the client: the consumer makes a further request to retrieve the information assigned by the Broker. The Group Coordinator writes group-related information to a special topic.
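
All of this is triggered by a completely ordinary Kafka consumer, as in the minimal sketch below (broker address, group and topic names are assumptions):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class KopConsumerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "my-group"); // subscribing sends JoinGroup, then SyncGroup
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my-topic"));
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            }
        }
    }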

This special topic that stores the group information lives in a default namespace, with a default of 8 partitions. A Kafka group is roughly equivalent to a Pulsar failover subscription. If you want a Kafka offset to be recognized by Pulsar clients, you need to acknowledge (ACK) the MessageId corresponding to the offset. So KoP has a component called OffsetAcker that maintains a set of Pulsar consumers; whenever the Group Coordinator needs to ACK for a group-partition pair, a consumer is created to perform the acknowledgement.
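
Conceptually, what OffsetAcker does for one group-partition pair resembles the following Pulsar client calls (a simplified sketch with assumed topic and group names, not KoP’s actual code):

    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.MessageId;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.SubscriptionType;

    // Given: a PulsarClient and the MessageId that a committed Kafka offset maps to.
    static void ackOffset(PulsarClient client, MessageId messageId) throws Exception {
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://public/default/my-topic-partition-0")
                .subscriptionName("my-group") // the Kafka group, as a failover subscription
                .subscriptionType(SubscriptionType.Failover)
                .subscribe();
        // Cumulatively acknowledge everything up to the mapped MessageId.
        consumer.acknowledgeCumulative(messageId);
    }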

The concept of a “namespace bundle” comes in here, because the Group Coordinator maintains the mapping between consumers and partitions.

In Apache Pulsar, each broker owns a set of bundle ranges (as shown in the example above). A topic is hashed by name to one of the bundle ranges, and the bundle’s owner broker is also the owner broker of the topic, so subscribing to a topic connects you to that broker. A bundle may split (which you can configure to prevent), and a broker may go down, either of which changes the bundle’s ownership. To handle these two situations, KoP registers a listener that senses changes in bundle ownership; once the bundle ownership changes, the Group Coordinator invokes a handler to deal with it.

Kafka Offset

First, let me introduce Kafka offsets and Pulsar MessageIds. A Kafka offset is a 64-bit integer that identifies the location where a message is stored. Kafka messages are stored on the local machine, so a single integer can number them. Pulsar stores messages on Bookies, which may be distributed across multiple machines, so it uses a ledger ID and an entry ID to indicate a message’s location. A ledger is roughly analogous to a segment in Kafka, and an entry ID is roughly equivalent to a Kafka offset. However, an Entry in Pulsar does not correspond to a single message but to a batch of messages, which gives rise to the batch index. Therefore, the ledger ID, entry ID and batch index fields together identify a Pulsar message.

Therefore, a Kafka offset cannot simply be mapped to a Pulsar MessageId; such naive processing could cause Pulsar messages to be lost. Before KoP 2.8.0, the Kafka offset was pieced together by assigning 20 bits, 32 bits and 12 bits to the Pulsar ledger ID, entry ID and batch index respectively (as shown in the figure above; see also the sketch after this list). This allocation strategy was feasible in most cases and the ordering of Kafka offsets could be guaranteed, but it is hard to come up with a truly “appropriate” allocation scheme when splitting up a MessageId. There are several problems:

  • For example, with 20 bits assigned to the ledger ID, it runs out at 2^20 ledgers, and the 12 bits for the batch index are exhausted even more easily.
  • Entries can only be read from the cursor one at a time, otherwise the “Maximum offset delta exceeded” problem occurs.
  • Some third-party components (such as Spark) rely on offsets being continuous.
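
As an illustration of the pre-2.8.0 packing scheme referenced above, the 20/32/12-bit allocation amounts to something like this (a sketch for intuition, not KoP’s actual code):

    // Pack ledger ID (20 bits), entry ID (32 bits) and batch index (12 bits)
    // into a single 64-bit Kafka offset.
    static long toKafkaOffset(long ledgerId, long entryId, int batchIndex) {
        return (ledgerId << 44) | ((entryId & 0xFFFFFFFFL) << 12) | (batchIndex & 0xFFF);
    }

    // Recover the three components from a packed offset.
    static long[] fromKafkaOffset(long offset) {
        return new long[]{offset >>> 44, (offset >>> 12) & 0xFFFFFFFFL, offset & 0xFFF};
    }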

In view of the above problems with Kafka offsets, engineers from StreamNative and Tencent jointly proposed an optimization based on broker entry metadata: PIP 70 (Introduce lightweight broker entry metadata). The new scheme is shown on the right side of the figure below.

On the left side of the figure, a Pulsar message is composed of Metadata and Payload. The payload is the written data; the metadata holds information such as the publish timestamp. The Pulsar Broker sends the message to the client and stores the message in a Bookie.

On the right side of the figure are the improvements proposed by PIP 70. In the new scheme, the Broker still sends ordinary messages to clients, but what it writes to the Bookie is a raw message. What is a raw message? It is the original message with BrokerEntryMetadata added. As the figure above shows, clients cannot see raw messages; only brokers can. As mentioned earlier, a protocol handler has full Broker privileges, so it can also access raw messages. With the offset stored in Pulsar this way, KoP can retrieve it.

What we did is this: there are two fields in the protocol buffer definition, and the key one is the second field, index, which corresponds to the Kafka offset. This effectively implements Kafka offsets inside Pulsar. Two interceptors are involved. The first is ManagedLedgerInterceptor:

    private boolean beforeAddEntry(OpAddEntry addOperation) {
        // If there is no interceptor, just return true to make sure
        // addOperation will be initiated.
        if (managedLedgerInterceptor == null) {
            return true;
        }
        try {
            managedLedgerInterceptor.beforeAddEntry(addOperation,
                    addOperation.getNumberOfMessages());
            return true;
        } catch (Exception e) {
            // On interceptor failure, fail the add operation instead of writing.
            addOperation.failed(new ManagedLedgerException(e));
            return false;
        }
    }

The second is BrokerEntryMetadataInterceptor:

    public OpAddEntry beforeAddEntry(OpAddEntry op, int numberOfMessages) {
        if (op == null || numberOfMessages <= 0) {
            return op;
        }
        op.setData(Commands.addBrokerEntryMetadata(op.getData(),
                brokerEntryMetadataInterceptors, numberOfMessages));
        return op;
    }

The addOperation contains the bytes sent from the producer and the number of messages, so the interceptor intercepts all produced messages. The role of Commands.addBrokerEntryMetadata is to prepend a BrokerEntryMetadata to the OpAddEntry’s data. The reason for putting it in front is to make parsing easier: if the first field read is the magic number, a BrokerEntryMetadata follows and can be read first; otherwise the normal Metadata is parsed with the normal protocol. BrokerEntryMetadataInterceptor is, in effect, an interceptor on the Broker.
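
In pseudocode, the parse order just described looks roughly like this (the magic-number constant and its value here are placeholders for illustration, not Pulsar’s actual identifiers):

    import io.netty.buffer.ByteBuf;

    static final short BROKER_ENTRY_METADATA_MAGIC = (short) 0x0e02; // placeholder value

    static void parseEntry(ByteBuf entry) {
        // Peek at the first two bytes without consuming them.
        if (entry.getShort(entry.readerIndex()) == BROKER_ENTRY_METADATA_MAGIC) {
            // Magic number found: read the BrokerEntryMetadata first (it carries
            // the index, i.e. the Kafka offset), then the normal metadata + payload.
        } else {
            // No broker entry metadata: parse the normal Metadata directly.
        }
    }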

Therefore, it is easy to implement consecutive offsets based on BrokerEntryMetadata in KoP:

  • FETCH request: just read from the Bookie and parse the BrokerEntryMetadata.
  • PRODUCE request: pass the ManagedLedger into the context of the asynchronous Bookie write and get the offset from the ManagedLedger’s interceptor.
  • COMMIT_OFFSET request: the write to the __consumer_offsets topic is unchanged; for Pulsar’s cumulative acknowledgement, a binary search on the ManagedLedger finds the position corresponding to the offset.

Because of these changes, the following configuration must be set in KoP 2.8.0 for offset operations to work properly:

brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor

Encoding and decoding of messages

This is also an important part of the KoP 2.8.0 improvements.

Before KoP 2.8.0, messages produced and consumed through KoP had to be converted between formats, including batch decompression and re-encoding, resulting in serious latency. We also asked ourselves: why should KoP be compatible with Pulsar clients? If you migrate from Kafka to Pulsar, in most cases only Kafka clients will exist, and interaction between Kafka clients and Pulsar clients is unlikely, making the codec step unnecessary. So when producing messages, the ByteBuffer inside MemoryRecords is now written directly to the Bookie.

Message consumption is somewhat different: we use a ManagedCursor to read, and still need to convert several entries into a ByteBuf. However, this method still had high overhead in practice. Further investigation found that the overhead came from appendWithOffset recalculating the checksum for each message: with large batches, too many computations were performed, incurring unnecessary cost. In response, members of the BIGO team submitted a simplified version of appendWithOffset (shown below) that removes the unnecessary work, building on the continuous-offset improvement submitted earlier.

Performance Testing (WIP)

Performance testing is still a work in progress (WIP), and some problems have been found. First, at the peak in the graph below, the end-to-end latency is 6 ms versus 4 ms, which is within the acceptable range. However, in the follow-up investigation I found that full GCs could take as long as 600 ms, and latency could climb even higher. We are investigating this problem.

The following figures show the monitoring of HandleProduceRequest, ProduceEncode, MessageQueuedLatency and MessagePublish respectively. From the monitoring, HandleProduceRequest (from the start of processing a PRODUCE request to the time all messages of that request are successfully written to the Bookie) takes about 4 ms, similar to the Pulsar client but with one less network round trip.

The main thing to look at is ProduceEncode, the time to encode Kafka messages. My test used the Kafka entry format, and you can see it takes less than 0.1 ms. If Pulsar’s entry format is used, the monitored results range from a few tenths of a millisecond to a few milliseconds.

There is a slight problem with this implementation: because we still use a queue, we have the MessageQueuedLatency metric shown in the following figure. MessageQueuedLatency measures the time from when a message enters the per-partition queue to when it is ready to be sent asynchronously. We doubted whether the queue was causing the performance deterioration, but from the monitoring, a delay of 0.1 ms has little effect.

Finally, MessagePublish is the Bookie write latency, that is, the time from when a single partition’s message is sent asynchronously to when it is successfully written to the Bookie. The monitoring results here are good, so we will focus on finding the source of the GC problem in the near future.

KoP Authentication

Prior to version 2.8.0

In a real production environment, authentication must be supported if you want to deploy to the cloud. Prior to 2.8.0, KoP’s support for authentication was relatively simple, limited to SASL/PLAIN, which is based on Pulsar’s JSON Web Token authentication. In addition to the Broker’s basic authentication configuration, saslAllowedMechanisms=PLAIN is required as an extra setting. The client needs to supply the tenant/namespace as the JAAS username and the token as the password:

security.protocol=SASL_PLAINTEXT    # or security.protocol=SASL_SSL if SSL connection is used
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
    required username="public/default" password="token:xxx";

OAuth 2.0 support in 2.8.0

Recently, KoP 2.8.0 added support for OAuth 2.0 authentication, i.e. the SASL/OAUTHBEARER mechanism. Simply put, it relies on a third-party service. First, the client obtains an authorization grant from the resource owner; the grant can be an authorization code, similar to a WeChat official-account login, or granted in advance by a real person. The client then takes the grant to the authorization server (the OAuth 2 server) and exchanges it for an access token, which it uses to access the resource server, namely the Pulsar Broker. The Broker then validates the token. Since the token is obtained through third-party authentication, this is relatively secure.

KoP’s default handler behaves the same as Kafka’s. Like Kafka, KoP needs a server callback handler configured on the broker side for token validation:

  • kopOauth2AuthenticateCallbackHandler: the handler class
  • kopOauth2ConfigFile: the path of the configuration file

This uses the JAAS approach, with a separate configuration file. KoP provides an implementation class that performs validation based on the AuthenticationProvider configured for the Pulsar Broker. Because KoP holds the BrokerService, it has all of the Broker’s permissions and can call the authentication method of whatever provider the Broker has configured. Therefore, you only need to set auth.validate.method= in the configuration file, which corresponds to the return value of the provider’s getAuthMethodName() method. If JWT authentication is used, the method is “token”; with OAuth 2 authentication the method may be different.

The client side

For Kafka clients, KoP provides an implementation of the login callback handler. OAuth 2.0 authentication for the Kafka Java client looks like this:

sasl.login.callback.handler.class=io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler
security.protocol=SASL_PLAINTEXT    # or security.protocol=SASL_SSL if SSL connection is used
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule \
    required oauth.issuer.url="https://accounts.google.com" \
    oauth.credentials.url="file:///path/to/credentials_file.json" \
    oauth.audience="https://broker.example.com";

The server callback is used to authenticate the client’s token, while the login callback handler obtains the third-party token from the OAuth 2 service. My implementation is based on Pulsar’s and is configured according to Kafka’s JAAS conventions. Three options need to be configured: issuerUrl, credentialsUrl, and audience. They have the same meaning as in Pulsar’s Java client authentication, so refer to Pulsar’s documentation. For comparison, Pulsar Java client OAuth 2.0 authentication:

URL issuerUrl = new URL("https://dev-kt-aa9ne.us.auth0.com");
URL credentialsUrl = new URL("file:///path/to/KeyFile.json");
String audience = "https://dev-kt-aa9ne.us.auth0.com/api/v2/";
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://broker.example.com:6650/")
    .authentication(
        AuthenticationFactoryOAuth2.clientCredentials(issuerUrl, credentialsUrl, audience))
    .build();

So KoP’s support for OAuth 2 consists of providing a client-side handler and a default server-side callback handler. With native Kafka, using OAuth 2 validation means writing your own handler; with KoP, as with Pulsar, there is no handler to write. It works out of the box.

Other developments in KoP 2.8.0

  • The Kafka Transaction Coordinator has been ported over. To enable transactions, add the following configuration (a transactional producer sketch follows this list):
enableTransactionCoordinator=true
brokerid=<id>
  • Added KoP custom metrics based on PrometheusRawMetricsProvider. This feature was contributed by BIGO’s Hang Chen; it powers the monitoring shown just now.
  • Advertised listeners are now exposed, to support Envoy Kafka filter proxies. An unfriendly aspect of earlier KoP was that the configured listener had to be the same as the Broker’s advertised listener. In this release we separate the listener from the advertised listener, which supports proxies such as Envoy deployed in the cloud.
  • Improved support for Kafka AdminClient. This had been overlooked before. Some users are used to the Kafka AdminClient, and some have deployed components with the AdminClient built in; not supporting this part of the protocol would affect them.
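
As a quick illustration of the transaction support mentioned in the first item above, an ordinary Kafka transactional producer should work unchanged against KoP. A sketch (broker address, topic and transactional.id are assumptions):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class KopTransactionExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("transactional.id", "my-txn-id"); // handled by the Transaction Coordinator
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.initTransactions();
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("my-topic", "in-a-transaction"));
                producer.commitTransaction();
            }
        }
    }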

Recent plans

Pulsar 2.8.0 is slated for release at the end of April. Before the official release, we plan to address some performance-testing issues:

  1. Add more detailed metrics.
  2. Troubleshoot memory growth and full GC during stress testing.
  3. Conduct more systematic performance testing.
  4. Deal with recent feedback from the community.

Related reading

  • Pulsar Function Mesh
  • Zhai Jia, Apache Pulsar PMC: Outlook and Planning for Pulsar 2021
