Business background

With the rapid development of mobile cloud, more and more customers are raising requirements for cloud-native messaging middleware so that they can focus on their applications. The main requirements are as follows:

  • Rapid elastic scaling: computing and storage resources can be expanded on demand to handle different traffic peaks and storage specifications, and online expansion requires no data rebalancing
  • High security protection: identity authentication and authorization mechanisms ensure data security
  • Accurate, real-time problem discovery: monitoring of instance health, throughput, message backlog and other dimensions
  • Support for IPv4 and IPv6 dual-stack environments to meet the requirements of different network environments
  • Tenant resource isolation at the instance level for finer-grained security protection
  • Support for cross-region replication to keep data synchronized between clusters in a stable and real-time manner

To meet these demands, and to unify its public cloud and private cloud architectures, mobile cloud chose Apache Pulsar and Kubernetes to build a cloud-native messaging system with excellent performance, security and stability, elastic scaling, and easy operation and maintenance.

Overall architecture

Based on Apache Pulsar's cloud-native architecture of separated computing and storage, we physically separate the Kubernetes cluster used for computing from the BookKeeper cluster used for storage, as follows:

For simplicity, let's take a shared ZooKeeper as an example (it can also be deployed as a ZooKeeper cluster in Kubernetes), and directly use NodePort service exposure to provide the Proxy service to clients (you can also choose an appropriate LB cloud service or an open-source LB such as MetalLB: metallb.universe.tf).
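
As an illustration of this starting point, a minimal NodePort Service for the Proxy might look like the sketch below (the Service name, labels, and port numbers are illustrative assumptions, not the exact manifests we use):

apiVersion: v1
kind: Service
metadata:
  name: proxy
  namespace: pulsardev
spec:
  type: NodePort
  selector:
    app: pulsar
    component: proxy
  ports:
  - name: pulsar
    port: 6650
    targetPort: 6650
    nodePort: 30650   # pre-chosen NodePort for the binary protocol (illustrative)
  - name: http
    port: 8080
    targetPort: 8080
    nodePort: 30080   # pre-chosen NodePort for the HTTP/admin endpoint (illustrative)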

Implementation practice

🔧 How to share Bookie resources

We expect multiple Pulsar instances in Kubernetes to be able to share the same underlying Bookie storage resources, so that the separation of computing and storage can be realized more quickly. Prior to version 2.6.0, Pulsar did not support setting a chroot path when initializing cluster metadata; it only supported a fixed ledger path and could not reuse an existing BookKeeper cluster. To address this, we optimized the initialize-cluster-metadata command to support setting a chroot path, and added the bookkeeperMetadataServiceUri parameter to the broker configuration to specify the connection information of the BookKeeper cluster (see the following PRs):

  • PR-4502: github.com/apache/puls…
  • PR-5935: github.com/apache/puls…
  • PR-6998: github.com/apache/puls…

This allows multiple Pulsar instances to share an existing BookKeeper cluster, with the following metadata structure:

[zk: localhost:2181(CONNECTED) 1] ls /pulsar
[pulsar1, pulsar2]
[zk: localhost:2181(CONNECTED) 2] ls /pulsar/pulsar1
[counters, stream, bookies, managed-ledgers, schemas, namespace, admin, loadbalance]
[zk: localhost:2181(CONNECTED) 3] ls /bookkeeper
[ledgers]
[zk: localhost:2181(CONNECTED) 4] ls /bookkeeper/ledgers
[00, idgen, LAYOUT, available, underreplication, INSTANCEID, cookies]
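
To make this more concrete, a minimal sketch of how one instance's broker configuration might point at its own chroot while sharing the existing BookKeeper ledger path (the ConfigMap name, ZooKeeper hosts, and exact connection-string form are illustrative assumptions; bookkeeperMetadataServiceUri is the parameter mentioned above):

apiVersion: v1
kind: ConfigMap
metadata:
  name: pulsar1-broker          # illustrative name
  namespace: pulsardev
data:
  # Each Pulsar instance keeps its metadata under its own chroot, e.g. /pulsar/pulsar1
  zookeeperServers: "zk-0.zk:2181,zk-1.zk:2181,zk-2.zk:2181/pulsar/pulsar1"
  configurationStoreServers: "zk-0.zk:2181,zk-1.zk:2181,zk-2.zk:2181/pulsar/pulsar1"
  # All instances share the existing BookKeeper cluster's ledger metadata path
  bookkeeperMetadataServiceUri: "zk+hierarchical://zk-0.zk:2181;zk-1.zk:2181;zk-2.zk:2181/bookkeeper/ledgers"

With bin/apply-config-from-env.py (mentioned later in this article), keys like these can be turned into environment variables (e.g. via envFrom) and written into conf/broker.conf at container startup.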

🔧 How services are exposed

Pulsar provides an optional Proxy component to address cases where clients cannot connect directly to Brokers, or where direct connection brings extra administrative overhead, such as when running in a cloud environment or in a Kubernetes cluster (see PIP-1: github.com/apache/puls…). In addition, YAML templates for each component are available on Pulsar's official website (see pulsar-helm-chart: github.com/apache/puls…), which makes it quick to build a Pulsar cluster on top of a Kubernetes cluster. We started with the following architecture:

During this period we ran into some minor problems; for example, the Proxy could not start properly (the initContainers condition in the Proxy's StatefulSet requires that at least one Broker is running), as follows:

14:33:06.894 [main-EventThread] INFO  org.apache.pulsar.zookeeper.ZooKeeperChildrenCache - reloadCache called in ZookeeperChildrenCache for path /loadbalance/brokers
14:33:36.900 [main-EventThread] WARN  org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader - Error updating broker info after broker list changed.
java.util.concurrent.TimeoutException: null
    at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771) ~[?:1.8.0_191]
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) ~[?:1.8.0_191]
    at org.apache.pulsar.zookeeper.ZooKeeperDataCache.get(ZooKeeperDataCache.java:97)
    at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.updateBrokerList(ZookeeperCacheLoader.java:118)
    at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.lambda$new$0(ZookeeperCacheLoader.java:82)
    at org.apache.pulsar.zookeeper.ZooKeeperChildrenCache.lambda$0(ZooKeeperChildrenCache.java:85)
    at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) ~[?:1.8.0_191]
    at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632) ~[?:1.8.0_191]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_191]
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_191]
    at org.apache.pulsar.zookeeper.ZooKeeperCache.lambda$22(ZooKeeperCache.java:434)
    at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:618)
    at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:510)

This problem can be worked around temporarily by changing podManagementPolicy from Parallel to OrderedReady (or by changing the Proxy's initContainers condition to require that the specified number of Broker replicas are running); see the sketch after the list below. The root cause is actually a Proxy deadlock problem, found and fixed by Masahiro Sakamoto (see PR-7690: github.com/apache/puls…), which will be released in versions 2.6.2 and 2.7.0. In addition, during continuous stress testing we occasionally observed the Proxy's memory growing until the Pod restarted (Error in writing to inbound channel. Closing java.nio.channels.ClosedChannelException: null). After some verification and evaluation, we decided to expose the Broker service in the Pod in another way, for the following reasons (FYI):

  • Proxy nodes consume additional CPU and memory resources
  • Forwarding business traffic through the Proxy to the Broker adds extra network overhead
  • If the production rate is too high, the Proxy can run out of memory (OOM) and cluster stability deteriorates
  • The Proxy itself has no load-balancing capability and is not friendly to elastic scaling of instances
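
As mentioned above, the temporary workaround for the startup problem only touches the StatefulSet spec; a minimal sketch of where the field lives (the resource name is a placeholder and the rest of the spec is omitted):

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: affected-statefulset          # placeholder for the StatefulSet being adjusted
spec:
  # OrderedReady starts Pods one at a time instead of in parallel
  # (changed from Parallel, as described above)
  podManagementPolicy: OrderedReady
  ...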

The client can be configured with multiple Proxy URLs (multi-hosts), and in practice only the proxies that are configured carry the traffic. You can perform the following steps to verify this:

1. 2 Brokers: broker3:6650, broker4:6650
2. 2 Proxies: proxy1:6650, proxy2:6650
3. Create multiple partitioned topics for production and consumption (ensure that the number of namespace bundles is an integer multiple of the number of Brokers)
4. With the client URL set to proxy1:6650, Proxy1 has load (TCP connections) but Proxy2 has no load
5. With the client URL set to proxy1:6650,proxy2:6650, both proxies have load

🔧 Can you connect directly to the Broker?

Since the Broker's service address registered in ZooKeeper is a Pod IP or Pod domain name, clients outside the Kubernetes cluster cannot reach it directly, so a service address that is visible to external clients is required. Pulsar 2.6.0 introduced PIP-61 (github.com/apache/puls…) to support advertising multiple listener addresses; for example, you can configure the following internal/external listeners:

advertisedListeners=internal:pulsar://broker-0.broker-headless.pulsardev.svc.cluster.local:6650,external:pulsar://10.192.6.23:38068

Here, we use the Pod domain name (broker-0.broker-headless.pulsardev.svc.cluster.local) as the address for communication between Brokers, and the actual Worker node IP of the Pod plus a pre-allocated NodePort as the external communication address.

The DNS name of each Pod in a StatefulSet has the format statefulSetName-{0..N-1}.serviceName.namespace.svc.cluster.local, where:

  • statefulSetName is the name of the StatefulSet
  • 0..N-1 is the ordinal of the Pod, starting from 0 up to N-1
  • serviceName is the name of the Headless Service
  • namespace is the namespace where the Service resides; the Headless Service and the StatefulSet must be in the same namespace
  • svc.cluster.local is the cluster domain

To enable clients outside the cluster to connect directly to the Broker Pods, we maintain a mapping from Worker node name to node IP and pre-allocated NodePort in a ConfigMap. In the StatefulSet's container startup script, the command bin/apply-config-from-env.py conf/broker.conf is used to write the actual Worker node IP to be exposed and the pre-allocated NodePort into the Broker's advertisedListeners configuration, so that the external address the Broker registers in ZooKeeper (external:pulsar://10.192.6.23:38068) is visible to clients outside the cluster. A key step here is to expose Pod information to the container through environment variables (see Pod-Info: kubernetes.io/docs/tasks/…); for example, add the following configuration to the Broker's StatefulSet YAML file:

env:
- name: PULSAR_NODE_NAME
  valueFrom:
    fieldRef:
      fieldPath: spec.nodeName
- name: PULSAR_POD_NAME
  valueFrom:
    fieldRef:
      fieldPath: metadata.name

In bin/apply-config-from-env.py we can then combine this information with the node and port mapping in the ConfigMap to work out the actual URL to expose, write it into the Broker configuration, and register it in ZooKeeper so that the real service address can be discovered.
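
For illustration, such a node-to-NodePort mapping ConfigMap might look like the sketch below (the ConfigMap name, key format, and the second entry's values are our own assumptions for this article, not a fixed convention):

apiVersion: v1
kind: ConfigMap
metadata:
  name: broker-external-mapping   # hypothetical name
  namespace: pulsardev
data:
  # worker node name -> "nodeIP:preAllocatedNodePort" (illustrative values)
  k8s-worker-1: "10.192.6.23:38068"
  k8s-worker-2: "10.192.6.24:38069"

The startup script can look up PULSAR_NODE_NAME in this mapping and write the resulting external address into advertisedListeners before the Broker starts.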

In addition, PIP-61 (github.com/apache/puls…) currently only supports service discovery for the TCP protocol, so advertisedListeners does not help when the HTTP service needs to be accessed from outside the cluster. To solve this, we customized a webServiceExternalUrl configuration in the Broker and, in a similar way to the above, registered the actual Worker node IP to be exposed and the pre-allocated NodePort (HTTP protocol) in ZooKeeper so that it is visible to Admin clients outside the cluster. In version 2.6.0, we fixed the Broker returning the wrong address to the client when advertisedListenerName is used (see PR-7737: github.com/apache/puls…). We also fixed a null-pointer issue when retrieving bundles and added support for specifying the listener name in the client shell (see the following PRs):

  • PR-7620: github.com/apache/puls…
  • PR-7621: github.com/apache/puls…

These fixes were released in version 2.6.1.

Note: when a Service forwards requests to a specific Broker Pod, its selector needs to include the label corresponding to the Pod name, for example:

>  selector:
>    app: pulsar
>    component: broker
>    statefulset.kubernetes.io/pod-name: broker-0
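
Putting the pieces together, a per-Broker NodePort Service might look like the following sketch (the Service name and port numbers are illustrative assumptions based on the listener configuration above):

apiVersion: v1
kind: Service
metadata:
  name: broker-0-external         # one Service per Broker Pod (illustrative name)
  namespace: pulsardev
spec:
  type: NodePort
  selector:
    app: pulsar
    component: broker
    statefulset.kubernetes.io/pod-name: broker-0
  ports:
  - name: pulsar
    port: 6650
    targetPort: 6650
    nodePort: 38068               # the pre-allocated NodePort registered in advertisedListeners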

The above is our exploration of connecting clients directly to the Broker. Compared with the Proxy approach, this scheme has the following advantages:

  • Reduces the extra overhead of computing resources
  • Improved throughput and stability of Pulsar clusters
  • It can support elastic scaling of Pulsar instances
  • Broker-side mechanisms can be used for cluster optimizations (for example, rate limiting on the Broker to avoid OOM)

🔧 How to support IPv4/IPv6 dual stack

In mobile cloud scenarios, more and more users expect cloud products to support IPv4/IPv6 dual stack to meet application requirements in various scenarios. Prior to version 2.6.0, Pulsar only supported IPv4 deployment, so we added IPv6 support (see PR-5713: github.com/apache/puls…). In addition, Kubernetes added dual-stack support for Pods and Services starting from version 1.16 (see Dual-Stack: kubernetes.io/docs/concep…). With these features, we only need to add an IPv6 Service for the Pulsar instance in Kubernetes (with spec.ipFamily set to IPv6), and then register the IPv6 service address visible to clients outside the cluster in ZooKeeper through a service exposure scheme similar to the one above, as follows:

advertisedListeners=internal:pulsar://broker-0.broker-headless.pulsardev.svc.cluster.local:6650,external:pulsar://10.192.6.23:38068,external-ipv6:pulsar://[fc66:5210:a152:12:0:101:bbbb:f027]:39021
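
For reference, a minimal sketch of the extra IPv6 Service mentioned above (names and the nodePort value are illustrative; spec.ipFamily is the dual-stack field introduced in Kubernetes 1.16+, later superseded by ipFamilies in newer releases):

apiVersion: v1
kind: Service
metadata:
  name: broker-0-external-ipv6    # illustrative name
  namespace: pulsardev
spec:
  type: NodePort
  ipFamily: IPv6                  # request an IPv6 address/NodePort for this Service
  selector:
    app: pulsar
    component: broker
    statefulset.kubernetes.io/pod-name: broker-0
  ports:
  - name: pulsar
    port: 6650
    targetPort: 6650
    nodePort: 39021               # registered as the external-ipv6 listener above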

Note that the system property java.net.preferIPv4Stack defaults to false. On dual-stack systems that support IPv6, Java sockets are created as IPv6 sockets by the underlying native implementation by default, and such sockets can communicate with both IPv4 and IPv6 hosts. If java.net.preferIPv4Stack is set to true on the TCP client, attempting to create a socket to an IPv6 host throws java.net.SocketException: Protocol family unavailable. Pulsar clients currently prefer IPv4 when connecting, and this property is set to true in the existing environment variables and scripts (see PR-209: github.com/apache/puls…). Therefore, to support IPv6 dual stack, java.net.preferIPv4Stack needs to be set to false in these scripts (pulsar, pulsar-admin, pulsar-client and pulsar-perf in the bin directory). The bin/pulsar script is also used when the Broker starts; make sure the Broker listens on both IPv4 and IPv6 ports, as follows:

[root@k8s1 ~]# kubectl exec -it broker-0 -n pulsardev -- /bin/bash
[pulsar@broker-0 pulsar]$ netstat -anp
Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name
tcp6       0      0 :::8080                 :::*                    LISTEN      1/java
tcp6       0      0 :::6650                 :::*                    LISTEN  

In the output above, :::* indicates that both IPv4 and IPv6 are being listened on; 0.0.0.0:* would mean only IPv4. During usage we also fixed and optimized some related issues, for example the client not supporting IPv6 addresses in multi-hosts (see PR-8120: github.com/apache/puls…).

🔧 How can I easily manage instances

In order to meet the requirements of mobile cloud users for simple and controllable management, we also customized some management functions, which are listed as follows:

  • PR-6456: Support disabling automatic subscription creation via Broker configuration (github.com/apache/puls…)
  • PR-6637: Support automatic subscription creation at the Namespace level (github.com/apache/puls…)
  • PR-6383: Support forcibly deleting a subscription (github.com/apache/puls…)
  • PR-7993 and PR-8103: Support forcibly deleting a Namespace or Tenant (github.com/apache/puls… and github.com/apache/puls…)
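
As a small illustration of the first item, disabling automatic subscription creation is a Broker-level switch; one way it might be carried in a Kubernetes deployment is via the Broker ConfigMap (the ConfigMap name is illustrative, and allowAutoSubscriptionCreation is the broker configuration key we assume here):

apiVersion: v1
kind: ConfigMap
metadata:
  name: broker-config             # illustrative name
  namespace: pulsardev
data:
  # Disable automatic subscription creation at the Broker level;
  # it can still be re-enabled per namespace (see PR-6637 above)
  allowAutoSubscriptionCreation: "false"

How this value reaches conf/broker.conf (environment variables plus bin/apply-config-from-env.py, or a mounted configuration file) depends on the deployment.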

Future plans

Pulsar, the mobile cloud message queue, has entered the public beta stage, and the follow-up plans are as follows:

  • Add support for the Connector ecosystem around mobile cloud
  • Add support for cross-region replication
  • Optimize the exposure of HTTP listeners
  • Optimize rate limiting at the Broker level
  • Add support and optimizations for traditional message queuing features
  • Optimize storage isolation when multiple Pulsar instances share Bookies
  • Publish more tech blogs

About the author

Sun Fangbin

Apache Pulsar Committer, Messaging Middleware Team Leader, China Mobile Cloud Capability Center.

Bonus: Click to apply for free mobile cloud Pulsar experience