Tencent Angel PowerFL federated learning platform

Federated learning, a foundational technology for the new generation of artificial intelligence, will reshape fields such as finance, healthcare, and urban security by solving the problems of data privacy and data silos.

Tencent Angel PowerFL is a federated learning platform built on the Angel machine learning platform. It uses Angel-PS to support trillion-scale model training and moves much of the computation from the Workers to the PS (parameter server) side. Angel PowerFL provides basic operation interfaces for federated learning algorithms, covering computation, encryption, storage, and state synchronization. The process scheduling module coordinates the task execution status of the participants, and the communication module carries all data transmitted during task training. Angel PowerFL has already been put into use in Tencent Financial Cloud, Tencent advertising joint modeling, and other businesses, with initial results.

Angel Machine Learning platform: github.com/Angel-ML

Angel PowerFL’s requirements for federated communication services

During training tasks, the Angel PowerFL federated learning platform places very high demands on message communication between participants: the messaging system must be stable and reliable, deliver high performance, and guarantee data security. During training, a large amount of encrypted data is transmitted between participants through the communication module. Angel PowerFL has the following requirements for communication services:

➡️ Stable and reliable

Angel PowerFL training tasks last from a few minutes to a few hours. The algorithms require highly accurate data, and different algorithms have different data transmission peaks, so the communication service must be stable enough that no data is lost.

➡️ High-performance transmission

Angel PowerFL runs its computation on Spark. Executors running concurrently generate a large amount of intermediate data that has to be transmitted. The communication module must deliver the encrypted data to the other party promptly, which requires low latency and high throughput from the communication service.

➡️ Data security

Although all Angel PowerFL data is encrypted by an encryption module, the businesses participating in federated learning may be spread across different companies. Because data then travels across the public network, the communication module must be secure and resistant to attack.

Why Pulsar

During the preliminary technical research for the federated communication service, we considered three solutions: RPC direct connection, HDFS synchronization, and MQ synchronization. Given the high requirements on security and performance, we ruled out RPC direct connection and HDFS synchronization and adopted MQ synchronization.

There are many MQ services to choose from, such as Pulsar, Kafka, RabbitMQ, TubeMQ, etc. Given Angel PowerFL’s high requirements for stability, reliability, high-performance transmission and data security, we consulted the Tencent Data Platform MQ team and they recommended Pulsar to us.

We then studied Pulsar in depth and found that many of its built-in features were exactly what we needed from a messaging system. Pulsar brokers and bookies form a layered architecture that separates computing from storage, which ensures data stability and good performance. Pulsar supports geo-replication, which solves the problem of synchronizing messages between the parties of a PowerFL federation. Pulsar’s authentication and authorization mechanisms also ensure transport security.

Cloud-native computing and storage layered architecture

Apache Pulsar is a next-generation cloud-native distributed messaging and event streaming platform with a layered architecture that separates computing from storage: Pub/Sub-related computation is performed on brokers, and data is stored on Apache BookKeeper.

This architecture has obvious advantages over traditional messaging platforms such as Kafka:

  • Brokers and bookies are independent of each other, so each layer can be scaled and can tolerate faults independently, which improves system availability.
  • Segment-based storage is not limited by the storage capacity of a single node, and data is distributed more evenly.
  • BookKeeper storage is secure and reliable, ensuring that messages are not lost. In addition, BookKeeper supports batch disk flushing to achieve higher throughput.

Pulsar geo-replication

Pulsar natively supports geo-replication, which replicates data synchronously or asynchronously across multiple Pulsar clusters in multiple data centers. You can also control, message by message, which clusters a message is replicated to through setReplicationClusters.

In the figure above, whenever producers P1, P2, and P3 publish messages to topic T1 in Cluster A, Cluster B, and Cluster C respectively, the messages are immediately replicated to the other clusters. Once replication is complete, consumers C1 and C2 can consume the messages from their own clusters.

Horizontal scaling

Pulsar’s storage design is segment-based: Pulsar divides a topic partition into smaller chunks called segments. Each segment is stored as an Apache BookKeeper ledger, so the set of segments that make up a partition is spread across the Apache BookKeeper cluster. This design makes it easier to manage capacity, scale horizontally, and meet high throughput requirements.

  • **Simple capacity management:** The capacity of a topic partition can grow to the capacity of the entire BookKeeper cluster and is not limited by the capacity of a single node.
  • **Simple expansion:** Expansion does not require data rebalancing or copying. When a new storage node is added, it is used only for new segments or their replicas, and Pulsar automatically balances segment distribution and traffic in the cluster.
  • **High throughput:** Write traffic is distributed across the storage tier, so partition writes do not contend for the resources of a single node.
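The expansion behaviour described in the list above can be illustrated with a toy model. This is a sketch only, not BookKeeper's actual placement policy: the `Cluster` class, the "least-loaded bookies first" rule, the replica count, and the bookie names are all assumptions made for illustration. It shows that segments (shards) already written stay where they are when a storage node is added, and that only new segments land on the new node.

```python
from collections import Counter


class Cluster:
    """Toy model of segment placement; NOT BookKeeper's real policy."""

    def __init__(self, bookies, replicas=2):
        self.bookies = list(bookies)   # hypothetical storage node names
        self.replicas = replicas       # copies kept of every segment
        self.placement = {}            # segment id -> tuple of bookies
        self.load = Counter()          # bookie -> number of segment copies held

    def add_bookie(self, name):
        # Adding a node does not touch existing placements (no rebalancing).
        self.bookies.append(name)

    def write_segment(self, segment_id):
        # Assumed rule: put each new segment on the least-loaded bookies,
        # breaking ties by name so the result is deterministic.
        ranked = sorted(self.bookies, key=lambda b: (self.load[b], b))
        chosen = tuple(ranked[: self.replicas])
        self.placement[segment_id] = chosen
        for bookie in chosen:
            self.load[bookie] += 1
        return chosen


cluster = Cluster(["bookie-1", "bookie-2", "bookie-3"])
for seg in range(6):
    cluster.write_segment(seg)
before = dict(cluster.placement)

cluster.add_bookie("bookie-4")
for seg in range(6, 12):
    cluster.write_segment(seg)

# Old segments did not move; the new bookie only holds segments written later.
unchanged = all(cluster.placement[s] == before[s] for s in before)
on_new_bookie = [s for s, nodes in cluster.placement.items() if "bookie-4" in nodes]
print(unchanged, on_new_bookie)
```

In this model the new node catches up simply because it is the least loaded, which is one plausible way to read "automatically balances"; the real system may use a different strategy.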

After in-depth research, we decided to use Apache Pulsar on the Tencent Angel PowerFL federated learning platform.

Federated communication scheme based on Apache Pulsar

The participants in federated learning (Angel PowerFL calls each of them a Party, and every Party has its own ID, such as 10000 or 20000) may sit in different departments of the same company, with no network isolation, or in different companies, across the public network. The Parties synchronize data with one another through Pulsar’s geo-replication. The overall design is as follows:

Each federated learning training task connects to its own Party’s Pulsar cluster through a message producer and consumer. Clusters are named fl-pulsar-[partyID]. When a training task produces intermediate data that needs to be transmitted, the producer sends the data to the local Pulsar cluster.

After receiving the data, the Pulsar cluster sends it to the consuming Party over the synchronization and replication channel established through the Pulsar Proxy. The consumer on that Party keeps listening on the topic that corresponds to the training task and, when data arrives, consumes it directly for the next step of the computation.

When Angel PowerFL performs a training task, the driver and each partition create a variable of type channel, which corresponds to the specific topic in Pulsar. The data to be exchanged will be sent to the topic by the producer.

Angel PowerFL supports multi-party federation, so two or more Pulsar clusters may need to replicate data to one another. Each federated learning task names its participants through its own parties task parameter, and the producer calls the setReplicationClusters interface when sending messages so that data is transmitted only between the participating Parties.
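As a minimal sketch of the idea in the previous paragraph, the helper below turns a task's list of party IDs into the list of cluster names that would be handed to setReplicationClusters. It assumes the cluster naming pattern fl-pulsar-[partyID]; the function names are hypothetical and are not part of Angel PowerFL or Pulsar.

```python
def cluster_name(party_id):
    """Assumed cluster naming convention: fl-pulsar-[partyID]."""
    return f"fl-pulsar-{party_id}"


def replication_clusters(parties):
    """Clusters a task's messages may be replicated to (hypothetical helper).

    `parties` is the list of party IDs taking part in one training task.
    Duplicates are dropped and the original order is kept.
    """
    names = []
    for party_id in parties:
        name = cluster_name(party_id)
        if name not in names:
            names.append(name)
    return names


# A task between Party 10000 and Party 20000 only names their two clusters,
# even if a third Party (say 30000) is also registered for geo-replication.
clusters = replication_clusters([10000, 20000, 10000])
print(clusters)
```

Restricting the list per message, rather than per namespace, is what keeps a two-party task's data away from clusters that are registered but not taking part.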

In the communication module of Angel PowerFL, we make full use of Pulsar features such as geo-replication, topic rate limiting, and token authentication. The following sections describe in detail how Pulsar is used in the Angel PowerFL federated learning platform.

Geo-replication removes the Global ZooKeeper dependency

Deploying a complete Pulsar installation on the Angel PowerFL federated learning platform relies on two ZooKeeper clusters: Local ZooKeeper and Global ZooKeeper. Local ZooKeeper plays the same role as ZooKeeper in Kafka and stores metadata. Global ZooKeeper shares configuration information across multiple Pulsar clusters.

In the Angel PowerFL scenario, every Party would have to deploy a Global ZooKeeper child node before joining, or share a single public ZooKeeper across companies or regions. This not only makes deployment harder but also increases the risk of attack, and it discourages new Parties from joining.

Global ZooKeeper stores metadata, including cluster names, service addresses, and namespace permissions. Pulsar supports creating and joining new clusters. To remove the dependency on Global ZooKeeper, register Pulsar cluster information with local ZooKeeper by following two steps:

**Step 1:** Register the Pulsar cluster of the newly joined Party

# OTHER_CLUSTER_NAME: name of the Pulsar cluster of the Party to be registered
# OTHER_CLUSTER_HTTP_URL: HTTP service address of that cluster
# OTHER_CLUSTER_BROKER_URL: broker address of that cluster
./bin/pulsar-admin clusters create ${OTHER_CLUSTER_NAME} \
  --url http://${OTHER_CLUSTER_HTTP_URL} \
  --broker-url pulsar://${OTHER_CLUSTER_BROKER_URL}

**Step 2:** Grant the namespace used for training access to the clusters

./bin/pulsar-admin namespaces set-clusters fl-tenant/${namespace} \
  --clusters ${LOCAL_CLUSTER_NAME},${OTHER_CLUSTER_NAME}

A newly joined Party can be registered simply by providing the cluster name and service address of its Pulsar cluster; geo-replication then synchronizes data based on that registration information.
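When several Parties join, the two registration steps can be generated rather than typed by hand. The sketch below builds the two pulsar-admin invocations as argument lists. The function name, host names, ports, and namespace are hypothetical values chosen for illustration; only the subcommands and flags shown in the steps above are used.

```python
import shlex


def registration_commands(local_cluster, other_cluster, http_url, broker_url,
                          namespace, tenant="fl-tenant"):
    """Build the two pulsar-admin invocations for registering a peer cluster."""
    create = [
        "./bin/pulsar-admin", "clusters", "create", other_cluster,
        "--url", f"http://{http_url}",
        "--broker-url", f"pulsar://{broker_url}",
    ]
    set_clusters = [
        "./bin/pulsar-admin", "namespaces", "set-clusters",
        f"{tenant}/{namespace}",
        "--clusters", f"{local_cluster},{other_cluster}",
    ]
    return [create, set_clusters]


# Hypothetical values for illustration only.
commands = registration_commands(
    local_cluster="fl-pulsar-10000",
    other_cluster="fl-pulsar-20000",
    http_url="party-20000.example.com:8080",
    broker_url="party-20000.example.com:6650",
    namespace="task-namespace",
)
for argv in commands:
    # shlex.join quotes each argument so the line can be pasted into a shell.
    print(shlex.join(argv))
```

Keeping the commands as argument lists means they can also be passed to `subprocess.run` without going through a shell.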

Adding token authentication for clients

As first deployed as the communication module of Angel PowerFL, Pulsar had no user-level access control. To further secure the data that clients produce and consume, we added token authentication, following the Pulsar documentation "Client authentication using tokens based on JSON Web Tokens". In addition to the service address used by the current Party, an Angel PowerFL training task must also be configured with the admin token.

pulsar.apache.org/doc…

Because the entire Angel PowerFL system is deployed on Kubernetes, we generate the public/private keys and other files needed by the Pulsar cluster in a container and register them as a K8S Secret.

# Generate the key pair fl-private.key and fl-public.key
docker run --rm -v "$(pwd)":/tmp apachepulsar/pulsar-all:2.5.2 \
  /pulsar/bin/pulsar tokens create-key-pair \
  --output-private-key /tmp/fl-private.key \
  --output-public-key /tmp/fl-public.key

# Generate the admin token and save it to admin-token.txt
echo -n "$(docker run --rm -v "$(pwd)":/tmp apachepulsar/pulsar-all:2.5.2 \
  /pulsar/bin/pulsar tokens create \
  --private-key file:///tmp/fl-private.key \
  --subject admin)" > admin-token.txt

# Register the authentication files as a K8S Secret
kubectl create secret generic token-symmetric-key \
  --from-file=TOKEN=admin-token.txt \
  --from-file=PUBLICKEY=fl-public.key -n ${PARTY_NAME}
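To make the role of the subject concrete, here is a minimal sketch of how a JSON Web Token carries and protects it. It is not Pulsar's implementation, and it deliberately swaps in a different signing scheme: the commands above use an asymmetric key pair, whereas this example uses an HMAC shared secret (HS256) so that it needs only the Python standard library. The function names and the secret are hypothetical.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url without padding, as used by JWT."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign(signing_input: str, secret: bytes) -> str:
    digest = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    return b64url(digest)


def make_token(subject: str, secret: bytes) -> str:
    """Create an HS256-signed token whose `sub` claim is the subject."""
    header = b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = b64url(json.dumps({"sub": subject}).encode())
    return f"{header}.{payload}.{sign(header + '.' + payload, secret)}"


def verify_token(token: str, secret: bytes):
    """Return the subject if the signature is valid, otherwise None."""
    parts = token.split(".")
    if len(parts) != 3:
        return None
    header, payload, signature = parts
    if not hmac.compare_digest(signature, sign(header + "." + payload, secret)):
        return None
    return json.loads(b64url_decode(payload))["sub"]


secret = b"demo-secret-not-for-production"   # hypothetical shared secret
token = make_token("admin", secret)
print(verify_token(token, secret))           # subject of a valid token
print(verify_token(token, b"wrong-secret"))  # signature check fails
```

With a key pair, the private key signs the token and the public key verifies it, so brokers never need to hold anything that could mint new tokens.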

Enabling automatic topic reclamation across clusters

Once geo-replication is enabled in a Pulsar cluster, used topics can no longer be deleted directly with commands. Angel PowerFL training tasks, however, use their topics only once: after a task finishes, its topics are useless, and if they are not deleted in time, a large number of topics accumulate.

For topics with geo-replication enabled, automatic topic reclamation can be turned on through the brokerDeleteInactiveTopicsEnabled parameter. A useless topic is reclaimed automatically only when all of the following conditions are met:

  • No producer or consumer is connected to the topic
  • The topic has no subscriptions
  • The topic has no messages that need to be retained

The Pulsar clusters deployed for Angel PowerFL enable automatic topic reclamation through brokerDeleteInactiveTopicsEnabled. While training tasks run, each topic is reclaimed after use once it meets the conditions above. We also set brokerDeleteInactiveTopicsFrequencySeconds so that reclamation runs every 3 hours.
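The reclamation rule can be written down as a small predicate. This is a sketch of the conditions listed above, not broker code: the `TopicState` fields are hypothetical names for the state the broker inspects, and the constant simply converts the 3-hour interval into the seconds a frequency setting would expect.

```python
from dataclasses import dataclass


@dataclass
class TopicState:
    """Hypothetical snapshot of a topic, for illustration only."""
    producers: int          # connected producers
    consumers: int          # connected consumers
    subscriptions: int      # existing subscriptions
    retained_messages: int  # messages that still have to be kept


def is_reclaimable(topic: TopicState) -> bool:
    """All listed conditions must hold before a topic is reclaimed."""
    return (
        topic.producers == 0
        and topic.consumers == 0
        and topic.subscriptions == 0
        and topic.retained_messages == 0
    )


# 3 hours expressed in seconds, the unit used by the frequency setting.
RECLAIM_FREQUENCY_SECONDS = 3 * 60 * 60

finished = TopicState(producers=0, consumers=0, subscriptions=0, retained_messages=0)
in_use = TopicState(producers=1, consumers=1, subscriptions=1, retained_messages=12)
print(is_reclaimable(finished), is_reclaimable(in_use), RECLAIM_FREQUENCY_SECONDS)
```

Because every condition must hold, a topic that still has a subscription survives each 3-hour pass until that subscription is removed.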

Optimizing topic rate limiting

Training tasks in Angel PowerFL produce data with different traffic peaks depending on the data set, the algorithm, and the phase of execution. In production, the largest single task moves more than 200 GB per hour. If the Pulsar connection breaks during training, or if production or consumption fails, the entire training task has to be restarted.

To avoid the risk of a single training task overwhelming the Pulsar cluster, we use Pulsar’s rate limiting. Pulsar supports two publish rate limiting policies, message-rate and byte-rate, which limit the number of messages produced per second and the number of bytes produced per second respectively. Angel PowerFL splits its data into multiple 4 MB messages and limits production through message-rate. In Angel PowerFL, we limit each namespace to 30 messages per second (that is, at most 30 × 4 MB = 120 MB/s):

./bin/pulsar-admin namespaces set-publish-rate fl-tenant/${namespace} -m 30
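The arithmetic behind this limit can be checked with a short sketch. It assumes "4M" means 4 MiB per message; the function names are hypothetical, and the splitting shown here is a plain byte slice rather than Angel PowerFL's actual serialization.

```python
CHUNK_BYTES = 4 * 1024 * 1024   # size of one message, assumed to be 4 MiB
MSG_RATE_LIMIT = 30             # messages per second allowed per namespace


def split_into_messages(payload: bytes, chunk_bytes: int = CHUNK_BYTES):
    """Cut a payload into consecutive chunks of at most chunk_bytes."""
    return [payload[i:i + chunk_bytes] for i in range(0, len(payload), chunk_bytes)]


def max_throughput_mb_per_s(msg_rate: int, chunk_bytes: int = CHUNK_BYTES) -> float:
    """Upper bound on throughput implied by a message-rate limit."""
    return msg_rate * chunk_bytes / (1024 * 1024)


def min_seconds_to_publish(total_bytes: int, msg_rate: int = MSG_RATE_LIMIT,
                           chunk_bytes: int = CHUNK_BYTES) -> float:
    """Shortest time in which total_bytes can be published under the limit."""
    messages = -(-total_bytes // chunk_bytes)  # ceiling division
    return messages / msg_rate


chunks = split_into_messages(bytes(10 * 1024 * 1024))  # a 10 MiB payload
sizes_mb = [len(c) // (1024 * 1024) for c in chunks]
ceiling = max_throughput_mb_per_s(MSG_RATE_LIMIT)
one_hour_of_peak = min_seconds_to_publish(200 * 1024 ** 3)  # 200 GiB
print(sizes_mb, ceiling, one_hour_of_peak)
```

Under these assumptions, 200 GB per hour averages out to roughly 57 MB/s, so the 120 MB/s ceiling leaves headroom for bursts while still capping what a single namespace can push.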



When we first tested message-rate limiting, publishing sometimes went unrestricted (the rate limit setting had no effect). Colleagues on the Tencent Data Platform MQ team who work on Pulsar helped us investigate, and we found that the limit did not take effect properly once the topicPublisherThrottlingTickTimeMillis parameter was set.

We therefore added a way to enable precise topic publish rate limiting on the broker side, which improved the rate limiting feature, and contributed it back to the community. See PR-7078: Introduce precise topic publish rate limiting. github.com/apache/pul….

Optimizing the topic unloading configuration

Pulsar dynamically assigns topics to brokers based on the load of the broker cluster. If the broker that owns a topic goes down or becomes overloaded, the topic is immediately reassigned to another broker. This reassignment is called topic unloading: the topic is closed and its ownership is released.

In theory, topic unloading is an adjustment made by load balancing, and clients experience only minimal latency jitter, usually about 10 ms. However, when Angel PowerFL first ran training tasks, the logs reported a large number of connection errors caused by topic unloading. The logs showed topic unloading being retried again and again without success:

[sub] Could not get connection to broker: topic is temporarily unavailable -- Will try again in 0.1s

Let us first look at the relationship between brokers, namespaces, bundles, and topics. A bundle is Pulsar’s sharding mechanism for a namespace. A namespace is sharded into a list of bundles, each covering a portion of the namespace’s overall hash range. Topics are not assigned to brokers directly; instead, each topic is assigned to a specific bundle by computing the hash code of the topic, and each bundle is assigned to a broker independently.
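The mapping from topic to bundle can be sketched as follows. This is an illustration of hash-range sharding, not Pulsar's code: it assumes a 32-bit hash space split into equal ranges, uses `zlib.crc32` as a stand-in hash function, and the topic names are hypothetical.

```python
import bisect
import zlib
from collections import Counter

HASH_SPACE = 1 << 32  # assumed 32-bit hash range covered by a namespace


def bundle_boundaries(num_bundles: int):
    """Split the hash range into num_bundles equal, contiguous ranges."""
    return [i * HASH_SPACE // num_bundles for i in range(num_bundles + 1)]


def bundle_for_topic(topic: str, boundaries) -> int:
    """Index of the bundle whose range [low, high) contains the topic hash."""
    topic_hash = zlib.crc32(topic.encode("utf-8"))  # stand-in hash function
    return bisect.bisect_right(boundaries, topic_hash) - 1


boundaries = bundle_boundaries(64)

# Hypothetical topic names: one training task creating 2000 topics.
topics = [f"persistent://fl-tenant/task-namespace/channel-{i}" for i in range(2000)]
per_bundle = Counter(bundle_for_topic(t, boundaries) for t in topics)

print(len(per_bundle), min(per_bundle.values()), max(per_bundle.values()))
```

Since the broker owns bundles rather than individual topics, unloading one bundle moves every topic that hashes into it, which is why a burst of short-lived topics can trigger frequent unloading.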

In its early days, Angel PowerFL did not reuse topics across tasks. A single LR training task created more than 2,000 topics, and each topic carried a different data load. We concluded that the disconnections occurred because topics were created and used in large numbers within a short time (the smallest task finishes within 10 minutes) while multiple tasks ran at once. The load became unbalanced, and topics were unloaded frequently. To reduce the frequency of topic unloading, we adjusted the bundle-related parameters of Pulsar:

# Increase the maximum number of topics that can be assigned to a broker
loadBalancerBrokerMaxTopics=500000
# Enable automatic splitting of namespace bundles
loadBalancerAutoBundleSplitEnabled=true
# Increase the number of topics that triggers a bundle split
loadBalancerNamespaceBundleMaxTopics=10000
# Message rate that triggers a bundle split
loadBalancerNamespaceBundleMaxMsgRate=10000

In addition, when creating a namespace, we set the number of bundles to 64 by default.

./bin/pulsar-admin namespaces create fl-tenant/${namespace} --bundles 64



After these adjustments, Angel PowerFL no longer suffered disconnections caused by topic unloading during task execution.

Pulsar on Kubernetes

All of Angel PowerFL’s services are deployed on Kubernetes via Helm. As one of the charts, Pulsar can make good use of K8S features such as resource isolation and rapid scaling out and in. From deploying Pulsar with Helm in Angel PowerFL, we learned the following:

🎙️ Use Local Persistent Volumes as storage

Pulsar is an I/O-sensitive service, the Bookie component in particular, and SSDs or dedicated disks are recommended in production. When Angel PowerFL ran tasks on large data sets, Pulsar often threw the "No Bookies Available" exception, and disk I/O usage was high during those periods.

We use Local Persistent Volumes to mount Bookie, ZooKeeper, and other components on separate disks, which reduces disk I/O contention. We also tested Ceph and NFS as the PV storage for Pulsar, and neither performed as well as using Local Persistent Volumes directly.

🎙️ Use NodeSelector

During geo-replication, brokers need to access the Pulsar Proxy container of the other Party. Angel PowerFL labels the gateway machines separately and uses NodeSelector to schedule the brokers onto gateway machines that have external network access.

🎙️ Configure useHostNameAsBookieID

Bookie is a stateful component. So that a Bookie Pod keeps serving normally after it is rebuilt, configure useHostNameAsBookieID to ensure that the ID registered with ZooKeeper is the hostname of the Pod.

Future plans

Angel PowerFL has been using Pulsar for almost a year now, with the longest stable cluster running for more than half a year. There are two main plans for using Pulsar in the future.

👍 Upgrade Pulsar to 2.6.x

We are currently using Pulsar 2.5.2. In the near future we will use Pulsar’s Key_Shared feature for Angel-PS disaster recovery, and version 2.6.0 happens to enhance the Key_Shared subscription mode, so we expect to upgrade to Pulsar 2.6.x within the next month. github.com/apache/pul….

👍 Support mounting multiple disks for Pulsar on K8S

All Angel PowerFL services run on Kubernetes (except the YARN computing resources used by tasks). Pulsar is deployed alongside the other services as a chart and uses Local Persistent Volumes as storage. However, Bookie currently supports mounting only one disk (directory), so machines with multiple disks are not fully utilized. We plan to add this feature.

Conclusion

We have described how Pulsar is used as the communication module of Angel PowerFL in an AI application scenario. While implementing the solution, we made full use of many built-in features of Pulsar and made optimizations to suit our own requirements: removing the Global ZooKeeper dependency for geo-replication, adding token authentication for clients, enabling automatic topic reclamation across clusters, and optimizing topic rate limiting and the topic unloading configuration.

As a next-generation cloud-native distributed messaging and streaming platform, Pulsar has a number of attractive features and is widely used in live streaming and short video, retail and e-commerce, media, finance, and other industries. We look forward to new cases of Pulsar in different application scenarios.

Acknowledgments

Special thanks to the MQ team of the Tencent Data Platform department for their technical guidance while we adopted Pulsar on the Angel PowerFL platform. The team has years of expertise in Apache Pulsar and TubeMQ and actively contributes to the Pulsar community. The Pulsar community is very active and growing rapidly. We will continue to follow and work closely with the Apache Pulsar community, contribute our optimizations back to it, and work with other users to further improve Pulsar’s features, so as to build a stronger and more complete Pulsar community.

About the author

Zhang Chao is a senior engineer in the Data Platform department of Tencent, responsible for Angel PowerFL federated communication, PowerFL on K8S, and related work. Together with the MQ team of the Tencent Data Platform department, he introduced Apache Pulsar into the PowerFL federated learning platform and began applying Pulsar to machine learning.