0x00 Abstract

Horovod is a distributed training framework based on AllReduce. Thanks to its support for mainstream deep learning frameworks such as TensorFlow and PyTorch, as well as its communication optimizations, Horovod is widely used in data-parallel training.

Over the past dozen articles, we have analyzed every aspect of Horovod step by step. Next up, we tackle the mountain that is Horovod on K8s.

The purpose of this article and the following ones is to learn how Horovod on K8s works through analysis, to sort out the related concepts, and hopefully to discover the design ideas. The approach: collate and learn from many online articles, then add my own analysis of the source code. My deep thanks to the authors of those articles.

This article is the appetizer and prerequisite for Horovod on K8s; it introduces the relevant concepts and the KubeFlow community's TF-operator.

Links to other articles in this series are as follows:

Horovod (1) — Basics

Horovod (2) — From the user's perspective

Horovod (3) — What's behind horovodrun

Horovod (4) — Network basics & Driver

Horovod (5) — Fusion framework

Horovod (6) — Background thread architecture

Horovod (7) — DistributedOptimizer

Horovod (8) — on Spark

Horovod (9) — Start on Spark

Horovod (10) — Run on Spark

Horovod (11) — on Spark — GLOO scheme

Horovod (12) — Overall architecture of elastic training

Horovod (13) — Driver of elastic training

Horovod (14) — Elastic training discovery nodes & State

Horovod (15) — Broadcast & Notification

Horovod (16) — Worker lifecycle of elastic training

Horovod (17) — Fault tolerance of elastic training

0x01 Background information

1.1 Kubernetes

Kubernetes, K8s for short (the "8" replaces the eight letters "ubernete"), is an open-source system for managing containerized applications across multiple hosts in a cloud platform. Its goal is to make deploying containerized applications simple and efficient; Kubernetes provides mechanisms for application deployment, scheduling, updating, and maintenance.

Kubernetes is an increasingly popular choice for training deep neural networks because it offers the flexibility to use different machine learning frameworks via containers, along with the agility to scale on demand.

When faced with complex models or large datasets, a single machine's compute often cannot meet the demand. Using a distributed training framework such as Alibaba's AIACC or the community's Horovod, a single training task can be extended to distributed training with only a few lines of code changed.

Common options on Kubernetes are the KubeFlow community's TF-operator, which supports TensorFlow's PS mode, and the MPI-operator, which supports Horovod's MPI AllReduce mode.

1.2 Container as scheduling unit

Why use a container as the scheduling unit of a deep learning system? Because containers pull and start quickly, and their resource isolation is good. Abstractly, the container image can be scheduled as part of the job. Of course, containers do introduce some GPU, network, and other performance overhead.

For example, NVIDIA GPUs provide Docker support: nvidia-docker performs the create and run operations in place of Docker. Below is the nvidia-docker architecture.

1.3 Kubeflow

Kubeflow is an open-source, Kubernetes-native platform for developing, orchestrating, deploying, and running scalable, portable machine learning workloads. Kubeflow can run on any Kubernetes cluster.

Kubeflow is a machine learning toolkit: a technology stack running on K8s. This stack contains many components, and the relationships between them are relatively loose; we can use them together or use some of them individually.

Kubeflow asks Kubernetes which machines it plans to assign to run each process of a distributed job, and then tells each process the IP addresses and ports of all the other processes, so that the processes in a job know about each other.

Why do all processes need to know about each other? This is required by TensorFlow's PS-based distribution. TensorFlow 1.x's native distributed training has all processes in a job execute the TensorFlow 1.x runtime program; these processes communicate and coordinate to form a "distributed runtime" that interprets and executes the graph representing the deep learning computation. At the beginning of distributed training, the graph is broken down into subgraphs by the TensorFlow runtime, and each process is responsible for executing one subgraph; if any process fails (possibly because it was preempted by a higher-priority job), execution of the whole graph fails. So TensorFlow's native distributed training capability is not fault-tolerant. It is, however, possible to recover from errors: the TensorFlow API provides checkpointing; if a job fails, it can be restarted and resume execution from the latest checkpoint.
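
To make the checkpoint mechanism concrete, here is a minimal TF 1.x sketch (not tf-operator code; the checkpoint directory and the toy one-variable model are assumptions for illustration):

import tensorflow as tf

# Toy model: a single trainable variable; in practice this is your real graph.
global_step = tf.train.get_or_create_global_step()
w = tf.get_variable("w", shape=[10])
loss = tf.reduce_mean(tf.square(w))
train_op = tf.train.GradientDescentOptimizer(0.1).minimize(loss, global_step=global_step)

# MonitoredTrainingSession periodically saves checkpoints to checkpoint_dir and,
# on restart, automatically restores the latest one, so a restarted job resumes
# from the last checkpoint instead of starting from scratch.
with tf.train.MonitoredTrainingSession(
        checkpoint_dir="/mnt/ckpt",      # assumed shared volume
        save_checkpoint_secs=60) as sess:
    for _ in range(1000):
        sess.run(train_op)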

1.4 Tensorflow on Kubeflow

Kubeflow supports two different distributed training approaches for the TensorFlow framework.

  • The first is the native TensorFlow architecture, which relies on a centralized parameter server to coordinate the workers.
  • The second is a decentralized approach in which workers communicate directly with each other via the MPI AllReduce primitive, without a parameter server. NVIDIA's NCCL library already implements most MPI primitives efficiently on GPUs, and Uber's Horovod makes it easy to do multi-GPU, multi-node training with TensorFlow. The second approach uses bandwidth better and scales better than the parameter-server approach (a minimal Horovod sketch follows this list).
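
As a rough illustration of the second approach, a minimal Horovod TF 1.x sketch looks like this (the toy loss, optimizer, and learning-rate scaling are illustrative assumptions):

import tensorflow as tf
import horovod.tensorflow as hvd

hvd.init()  # one process per GPU; ranks are wired together by MPI/Gloo

# Pin each process to a single GPU.
config = tf.ConfigProto()
config.gpu_options.visible_device_list = str(hvd.local_rank())

# Toy model standing in for a real network.
w = tf.get_variable("w", shape=[10])
loss = tf.reduce_mean(tf.square(w))

opt = tf.train.AdagradOptimizer(0.01 * hvd.size())  # scale LR with worker count
opt = hvd.DistributedOptimizer(opt)                 # gradients are AllReduced
train_op = opt.minimize(loss)

# Rank 0 broadcasts the initial weights so all workers start consistently.
hooks = [hvd.BroadcastGlobalVariablesHook(0)]
with tf.train.MonitoredTrainingSession(hooks=hooks, config=config) as sess:
    for _ in range(100):
        sess.run(train_op)

Note that there is no parameter server here: each worker holds a full model replica, and only gradients travel over the network.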

1.5 Operator

An Operator is a Kubernetes concept used to package, deploy, and manage user tasks.

Operator can be simply understood as CRD + Controller.

  • A Custom Resource Definition (CRD) is a Kubernetes extension type used to define custom resources.
  • The Controller lets the user operate on the CRD.

In Java terms, the Operator is a Class, the CRD is the Class's member variables, and the Controller is the Class's member methods.

1.6 TF-Operator

While KubeFlow provides a host of components covering all aspects of machine learning, model training is certainly its most important feature. KubeFlow provides training capabilities for a wide variety of machine learning frameworks. The approach is to define various operators, which are mainly used to manage machine learning or deep learning tasks: how to manage and maintain a task's multiple nodes, how to manage the Pod and task lifecycle, how to provide fault tolerance, and so on.

TF-operator is an extension API built on K8s by the open-source community that provides TensorFlow training capability. As the name suggests, its implementation follows a Job-like approach, with these characteristics:

  • Provides TensorFlow's native PS-worker architecture for multi-machine training
  • Recommends starting PS and worker together
  • Uses Services for service discovery
  • Is the earliest Operator in the community

Since TF-operator is the earliest Operator in the community, it is worth examining first.

0x02 TensorFlow Distributed

Since TF-operator is designed to support TensorFlow's PS mode, we first introduce TensorFlow's distributed training.

2.1 Parameter Server architecture

In the Parameter Server architecture (PS architecture), the nodes in a cluster are divided into two categories: parameter servers and workers. Parameter servers store the model's parameters, and workers compute the gradients of those parameters. In each iteration, a worker fetches the parameters from the parameter servers and returns its computed gradients; the parameter servers aggregate the gradients returned from the workers, update the parameters, and broadcast the new parameters back to the workers.

Gradient updates in the PS-worker architecture come in two forms, synchronous and asynchronous:

In synchronous training, all worker devices train on different mini-batches of the same batch. After all devices finish computing gradients for this batch, the model updates its parameters according to all the gradients, and then PS delivers the updated model to each device.

In asynchronous training, no device waits for the gradient computation or parameter updates of any other device; all devices compute independently and push their gradient results to the central node (PS). Asynchronous training is generally much faster overall, but it suffers from a serious stale-gradients problem: all devices start training with the same parameters, but by the time a device finishes a step, the model parameters may already have been updated by other devices, so the gradient that device computed is out of date.
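
For reference, TF 1.x expresses the synchronous mode with tf.train.SyncReplicasOptimizer, which collects gradients from the workers before applying a single update. A minimal sketch, assuming 4 workers and a toy loss:

import tensorflow as tf

task_index = 0  # placeholder; normally parsed from FLAGS
global_step = tf.train.get_or_create_global_step()
w = tf.get_variable("w", shape=[10])
loss = tf.reduce_mean(tf.square(w))

opt = tf.train.GradientDescentOptimizer(0.01)
sync_opt = tf.train.SyncReplicasOptimizer(
    opt,
    replicas_to_aggregate=4,  # wait for gradients from 4 workers per step
    total_num_replicas=4)
train_op = sync_opt.minimize(loss, global_step=global_step)

# The chief runs extra initialization ops; every worker needs this hook.
hooks = [sync_opt.make_session_run_hook(is_chief=(task_index == 0))]

In asynchronous mode, each worker simply applies opt.minimize on its own, with no aggregation barrier.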

2.2 TensorFlow PS-Worker

2.2.1 Architecture

This is only a general introduction, mainly for comparison with TF-operator.

TF mainly divides job roles into Parameter Server and Worker (because TF versions differ, there are additional role definitions at various stages, such as master or chief).

  • Parameter Job: performs model-related work, including storing, distributing, aggregating, and updating model parameters; acts as the server side of distributed training, waiting for the compute terminals (supervisors) to connect.
  • Worker Job: called supervisors in TensorFlow's code comments; performs training-related work, including inference computation and gradient computation. If the number of parameters is too large for a single machine to handle, multiple Tasks may be required (dynamically understood, a Task is a process on a host; statically understood, a Task is the code we wrote).
  • Chief supervisor: one compute terminal must be chosen as the chief. It is the first compute terminal to start, and its job is to merge the parameters learned by each terminal after each round, then save or write them out.
  • Cluster: a collection of Jobs; it constitutes the whole cluster system.

Each role's network identity is unique, i.e., the roles are distributed across machines with different IPs (or on the same host but with different port numbers).

In actual operation, the network-building code of each role must be exactly the same. The distributed workflow of the PS-worker architecture is roughly as follows:

  • Pull: Each worker pulls the latest model parameters from PS according to the topological structure of the data flow graph
  • Feed: Each worker fills different batch data
  • Compute: Each worker computes the gradient based on the same model parameters and different batch data and obtains different gradient values
  • Push: each worker uploads its computed gradient to PS
  • Update: PS collects the gradient values of all workers, calculates the average value, and updates the model parameters.

2.2.2 Code

The specific logic is as follows:

  • A Task needs to know which hosts exist in the cluster and which ports they listen on; that is exactly what tf.train.ClusterSpec() describes.
  • This Cluster has two Jobs (worker and ps), and the worker Job has three Tasks (i.e., three Tasks execute TensorFlow op operations).
  • Pass the ClusterSpec as an argument to tf.train.Server(), and at the same time specify this Task's job_name and task_index.
  • Since the same code runs on different hosts, job_name and task_index are passed in to tell them apart; ps_hosts and worker_hosts are identical on every host and describe the cluster.
  • A tf.train.Server contains a set of local devices (GPUs, CPUs), connections to the other Tasks' ip:port (stored in the cluster), and a session target used to execute distributed operations. Most importantly, it creates a server that listens on its port; when data arrives, it executes it locally (starting the session target and calling local devices to run the operation) and returns the result to the caller.
  • To keep the ps server listening, we need server.join(), which blocks the process there. The reason the ps server joins right after creation is that the code below will hand parameters over to the ps server for safekeeping, so the ps server can simply listen quietly.
# To build a cluster with two ps jobs on hosts ps0 and ps1, and 3 worker
# jobs on hosts worker0, worker1 and worker2.
cluster_spec = {"ps": ["ps0:2222", "ps1:2222"],
                "worker": ["worker0:2222", "worker1:2222", "worker2:2222"]}

# Create a cluster from the parameter server and worker hosts.
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

# Create and start a server for the local task.
server = tf.train.Server(cluster,
                         job_name=FLAGS.job_name,
                         task_index=FLAGS.task_index)

if FLAGS.job_name == "ps":
    server.join()

The slightly more complete code looks like this:

def main(_):
    ps_hosts = FLAGS.ps_hosts.split(",")
    worker_hosts = FLAGS.worker_hosts.split(",")

    # Create a cluster from the parameter server and worker hosts.
    cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

    # Create and start a server for the local task.
    server = tf.train.Server(cluster,
                             job_name=FLAGS.job_name,
                             task_index=FLAGS.task_index)

    if FLAGS.job_name == "ps":
        server.join()
    elif FLAGS.job_name == "worker":
        # The worker whose task_index is 0 is the chief.
        is_chief = (FLAGS.task_index == 0)
        # Assign ops to the local worker by default.
        with tf.device(tf.train.replica_device_setter(
                worker_device="/job:worker/task:%d" % FLAGS.task_index,
                cluster=cluster)):
            # ... build the graph and compute ...

Running the following shows that we only need to write one program and pass in different parameters to run it on different hosts:

# On ps0.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=ps --task_index=0
# On ps1.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=ps --task_index=1
# On worker0.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=worker --task_index=0
# On worker1.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=worker --task_index=1

0x03 TF-Operator

3.1 Design ideas of the TF-operator

After understanding the general operation of distributed TF, let's take a look at the design ideas of TF-operator.

Design Doc TFJob K8s CRD

The goal is to make it easy to run TensorFlow training (especially distributed training) on Kubernetes (K8s). The proposed way to do this is to create a K8s Custom Resource Definition (CRD) and an associated controller; the CRD is responsible for managing the K8s resources required to run training jobs.

Kubernetes makes it easier to manage processes by providing a process-centric, rather than VM-centric, view of the world. Kubernetes also provides basic building blocks for complex distributed applications. For example, K8s provides built-in support for DNS, health check, log collection, measurement collection, storage, and more.

In K8s, controllers are responsible for ensuring that a set of Pods is running. A Pod is the basic building block of K8s, describing one or more co-located processes (sharing an IP). K8s comes with a number of built-in controllers, which can ensure that N Pods run according to a particular spec; the job controller can be used to run binaries to completion.

The built-in controllers are insufficient for running distributed TensorFlow jobs. TensorFlow is a stateful application: each parameter server and worker needs to be uniquely addressable to support all the different distributed training modes. K8s has StatefulSets, but they are intended for stateful services that run forever (such as in-memory sharded caches like Redis), not for jobs that run to completion.

Thus, running distributed TF jobs on K8s today means piecing together a solution from the built-in primitives. Typically, this means manually managing multiple resources. For example, a user could create one stateful set for the parameter servers, another for the workers, and a job for the master.

To address the limitations of the built-in resources, K8s supports custom resources (CRDs) and controllers. With a CRD, it is easy to create a controller with the desired semantics for a particular workload while hiding the implementation from users. The K8s community quickly adopted this pattern, contributing a large number of CRDs for all kinds of workloads.

The K8s team that developed CRD and the various controllers was of the opinion that most controllers use a non-distributed, multi-threaded design and that scalability is not an issue.

TFJob CRD defines TFJob resources for K8s.

The TFJob resource is a collection of TfReplicas. Each TfReplica corresponds to a set of TensorFlow processes that play a role in the job.

The design makes a conscious decision not to try to hide or replace the K8s abstractions. For example, each TfReplica contains a standard K8s PodTemplate to specify the processes (including TF) to run in each replica. This is because K8s already provides a widely adopted and well-understood API, so introducing new concepts in place of K8s concepts would be confusing. Moreover, exposing PodTemplate makes it easy for TFJob users to take advantage of K8s features; for example, a TFJob user can use K8s to attach a volume to their TF process, making it easy to use TF with any storage system K8s supports, such as PDs, NFS, and so on.

3.2 architecture diagram

The specific architecture diagram is as follows:

3.2.1 What is a Pod

Looking at the diagram, let's start with the Pod concept in the middle.

A Pod is the smallest scheduling unit in K8s. A Pod can be understood as a container group; it is equivalent to a logical host. Entering a Pod feels like entering a Linux host where the usual commands are all available, and this "host" contains many containers. By default, each container's filesystem is completely isolated from the others'. Each Pod has its own IP address, and the containers inside a Pod share the same IP and port space.

3.2.2 Why does the Service Exist

First, each Pod is assigned a separate IP address, and each Pod provides a separate Endpoint (Pod IP + ContainerPort) for clients to access. This access is limited to the internal IP address of the cluster.

Second, a Pod's lifetime is limited; if a Pod restarts, its IP will most likely change. When the controller replaces a failed Pod with a new one, the new Pod is assigned a new IP address. This raises a question: if a set of Pods provides a service externally (such as HTTP), and their IPs can change at any time, how do clients find and access the service?

Kubernetes’ solution is a Service.

A Service is an abstraction: a Kubernetes Service logically represents a set of Pods, selected by labels, and abstracts that set of Pods (which do the same work) into a single entry point. It can be loosely understood as service load balancing.

A Service has its own IP, and that IP is constant. Clients only need to access the Service's IP; Kubernetes is responsible for establishing and maintaining the mapping between the Service and the Pods. No matter how the backend Pods change, the client is unaffected, because the Service does not change. So Pods are usually accessed through a Service: core-dns assigns an internal virtual IP to the Service, and internal callers can reach the Pods via this IP or via the serviceName.

Let's look at an example of a Service from the source code.

apiVersion: v1
kind: Service
metadata:
  annotations:
    prometheus.io/path: /metrics
    prometheus.io/scrape: "true"
    prometheus.io/port: "8443"
  labels:
    app: tf-job-operator
  name: tf-job-operator
spec:
  ports:
  - name: monitoring-port
    port: 8443
    targetPort: 8443
  selector:
    name: tf-job-operator
  type: ClusterIP

Here we created a Service named tf-job-operator, which is assigned a Cluster IP. The Service continuously watches the Pods selected by its selector and updates the Pod information into an Endpoints object also named tf-job-operator, which is essentially the set of Pods described above.

3.2.3 What is a Controller

Because Kubernetes’ existing Resource types don’t fit our needs, we need to extend them through the Custom Resource Definition mechanism.

Everything in K8s is a resource, such as Deployment, Service, and so on.

Based on the CRD (CustomResourceDefinitions) feature, we can add new resources; for example, a custom Deployment-like resource providing different deployment strategies.

We know that resources can be CRUDed via the K8s RESTful API, and the same is true for resources created from a CRD.

A CRD only defines a resource; we still need to implement a controller, similar to the deployment controller and others, that listens for CRUD events on the corresponding resource and handles them accordingly, for example by deploying Pods.

In fact, TF-operator is mainly the implementation of such a Controller, which we explain below.

3.3 Spec

Let's start with a Job Spec so that you can follow along in the code. This example has 1 master, 2 workers, and 1 PS.

apiVersion: "kubeflow.org/v1alpha1"   # check available versions with kubectl api-versions
kind: "TFJob"                         # the role/type of the resource to create
metadata:
  name: "example-job"
spec:
  replicaSpecs:
    - replicas: 1                     # 1 master
      tfReplicaType: MASTER
      template:
        spec:
          containers:
            - image: gcr.io/tf-on-k8s-dogfood/tf_sample:dc944ff
              name: tensorflow
              args:
                - --log_dir=gs://my-job/log-dir
          restartPolicy: OnFailure
    - replicas: 2                     # 2 workers
      tfReplicaType: WORKER
      template:
        spec:
          containers:
            - image: gcr.io/tf-on-k8s-dogfood/tf_sample:dc944ff
              name: tensorflow
              args:
                - --log_dir=gs://my-job/log-dir
          restartPolicy: OnFailure
    - replicas: 1                     # 1 PS
      tfReplicaType: PS

Let’s move on to the world of code.

3.4 TFJob

First, let's look at the definition of TFJob, which roughly corresponds to the Spec above. Since the purpose of this article is to grasp the general idea, we analyze only this much.

// TFJob represents a TFJob resource.
type TFJob struct {
    // Standard Kubernetes type metadata.
    metav1.TypeMeta `json:",inline"`

    // Standard Kubernetes object's metadata.
    // +optional
    metav1.ObjectMeta `json:"metadata,omitempty"`

    // Specification of the desired state of the TFJob.
    // +optional
    Spec TFJobSpec `json:"spec,omitempty"`

    // Most recently observed status of the TFJob.
    // Populated by the system.
    // Read-only.
    // +optional
    Status commonv1.JobStatus `json:"status,omitempty"`
}

// TFJobSpec is a desired state description of the TFJob.
type TFJobSpec struct {
    // RunPolicy encapsulates various runtime policies of the distributed training
    // job, for example how to clean up resources and how long the job can stay
    // active.
    RunPolicy commonv1.RunPolicy `json:"runPolicy,inline"`

    // SuccessPolicy defines the policy to mark the TFJob as succeeded.
    // Default to "", using the default rules.
    // +optional
    SuccessPolicy *SuccessPolicy `json:"successPolicy,omitempty"`

    // A map of TFReplicaType (type) to ReplicaSpec (value). Specifies the TF cluster configuration.
    // For example,
    //   {
    //     "PS": ReplicaSpec,
    //     "Worker": ReplicaSpec,
    //   }
    TFReplicaSpecs map[commonv1.ReplicaType]*commonv1.ReplicaSpec `json:"tfReplicaSpecs"`

    // A switch to enable dynamic worker.
    EnableDynamicWorker bool `json:"enableDynamicWorker,omitempty"`
}

3.5 Roles

Next, let's look at how the TF roles are implemented in TF-operator.

3.5.1 Definition

First is the role definition. The roles here basically correspond to TensorFlow's roles, and several are kept for backward compatibility.

// setTypeNamesToCamelCase sets the name of all replica types from any case to correct case.
func setTypeNamesToCamelCase(tfJob *TFJob) {
    setTypeNameToCamelCase(tfJob, TFReplicaTypePS)
    setTypeNameToCamelCase(tfJob, TFReplicaTypeWorker)
    setTypeNameToCamelCase(tfJob, TFReplicaTypeChief)
    setTypeNameToCamelCase(tfJob, TFReplicaTypeMaster)
    setTypeNameToCamelCase(tfJob, TFReplicaTypeEval)
}

const (
    // TFReplicaTypePS is the type for parameter servers of distributed TensorFlow.
    TFReplicaTypePS commonv1.ReplicaType = "PS"

    // TFReplicaTypeWorker is the type for workers of distributed TensorFlow.
    // This is also used for non-distributed TensorFlow.
    TFReplicaTypeWorker commonv1.ReplicaType = "Worker"

    // TFReplicaTypeChief is the type for chief worker of distributed TensorFlow.
    // If there is "chief" replica type, it's the "chief worker".
    // Else, worker:0 is the chief worker.
    TFReplicaTypeChief commonv1.ReplicaType = "Chief"

    // TFReplicaTypeMaster is the type for master worker of distributed TensorFlow.
    // This is similar to chief, and kept just for backwards compatibility.
    TFReplicaTypeMaster commonv1.ReplicaType = "Master"

    // TFReplicaTypeEval is the type for evaluation replica in TensorFlow.
    TFReplicaTypeEval commonv1.ReplicaType = "Evaluator"
)

3.5.2 Creating a Role

The NewTFJobV2 function creates different roles depending on the configuration.

As you can see, when generating a job, it basically follows the corresponding fields of the spec.

apiVersion: "kubeflow.org/v1alpha1"
kind: "TFJob"
metadata:
  name: "example-job"
spec:
  replicaSpecs:

Here is the function definition.

func NewTFJobV2(worker, ps, master, cheif, evaluator int) *tfv1.TFJob {
    tfJob := &tfv1.TFJob{
        TypeMeta: metav1.TypeMeta{
            Kind: tfv1.Kind,
        },
        ObjectMeta: metav1.ObjectMeta{
            Name:      TestTFJobName,
            Namespace: metav1.NamespaceDefault,
        },
        Spec: tfv1.TFJobSpec{
            TFReplicaSpecs: make(map[commonv1.ReplicaType]*commonv1.ReplicaSpec),
        },
    }
    tfv1.SetObjectDefaults_TFJob(tfJob)

    if worker > 0 {
        worker := int32(worker)
        workerReplicaSpec := &commonv1.ReplicaSpec{
            Replicas: &worker,
            Template: NewTFReplicaSpecTemplate(),
        }
        tfJob.Spec.TFReplicaSpecs[tfv1.TFReplicaTypeWorker] = workerReplicaSpec
    }

    if ps > 0 {
        ps := int32(ps)
        psReplicaSpec := &commonv1.ReplicaSpec{
            Replicas: &ps,
            Template: NewTFReplicaSpecTemplate(),
        }
        tfJob.Spec.TFReplicaSpecs[tfv1.TFReplicaTypePS] = psReplicaSpec
    }

    if master > 0 {
        master := int32(master)
        masterReplicaSpec := &commonv1.ReplicaSpec{
            Replicas: &master,
            Template: NewTFReplicaSpecTemplate(),
        }
        tfJob.Spec.TFReplicaSpecs[tfv1.TFReplicaTypeMaster] = masterReplicaSpec
    }

    if cheif > 0 {
        cheif := int32(cheif)
        cheifReplicaSpec := &commonv1.ReplicaSpec{
            Replicas: &cheif,
            Template: NewTFReplicaSpecTemplate(),
        }
        tfJob.Spec.TFReplicaSpecs[tfv1.TFReplicaTypeChief] = cheifReplicaSpec
    }

    if evaluator > 0 {
        evaluator := int32(evaluator)
        evaluatorReplicaSpec := &commonv1.ReplicaSpec{
            Replicas: &evaluator,
            Template: NewTFReplicaSpecTemplate(),
        }
        // Fixed: the evaluator spec belongs under the Evaluator type, not Chief.
        tfJob.Spec.TFReplicaSpecs[tfv1.TFReplicaTypeEval] = evaluatorReplicaSpec
    }

    return tfJob
}

3.5.3 How to Distinguish the Master

The master is distinguished as follows.

func (tc *TFController) IsMasterRole(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, rtype commonv1.ReplicaType, index int) bool {
    if ContainChieforMasterSpec(replicas) {
        return rtype == tfv1.TFReplicaTypeChief || rtype == tfv1.TFReplicaTypeMaster
    }
    // else check if it is worker with index 0
    return rtype == tfv1.TFReplicaTypeWorker && index == 0
}

0x04 Controller

Now let's get to the main event and see how the Controller is implemented.

4.1 Key concepts of K8S CRD

First we need to look at some key concepts of K8S CRD.

  • informer: listens for changes to specific resources on the apiserver, stores them in a thread-safe local cache, and finally calls back our own event handlers.
  • local cache: the informer syncs data from the apiserver (etcd) into memory in near real time, reducing query pressure on the apiserver. The informer is not strictly real-time: local data lags the remote data but is eventually consistent with etcd, so you must decide whether to read from the local cache or query the apiserver directly.
  • lister: provides CRUD-style access to the local cache.
  • controller: a logical concept referring to the implementation that schedules a given resource; we develop it ourselves. A controller mainly does two things:
    1. Implements event handlers that process CRUD operations on the resource.
    2. In the event handlers, typically uses the workqueue library to deduplicate consecutive events on the same resource object and to retry events that failed during handling.
  • workqueue: a separate library, optional but usually used for the reasons above. In the event handler, we put the identifier of the changed resource into the workqueue for the processor below to consume.
  • clientset: by default, the clientset can only CRUD resource types known to K8s, such as deployments and daemonsets. Code generation produces a separate clientset for our custom resource (CRD), letting us CRUD the custom resource with structured code. In other words, use the K8s-provided clientset for built-in resources and the generated clientset for CRDs.
  • processor: a Go routine we implement that consumes events from the workqueue, which deduplicates by resource identifier.

4.2 Definition

TFController is defined as follows. Several member variables are noteworthy; they correspond to some of the components above.

// TFController is the type for TFJob Controller, which manages
// the lifecycle of TFJobs.
type TFController struct {
    common.JobController
​
    // tfJobClientSet is a clientset for CRD TFJob.
    tfJobClientSet tfjobclientset.Interface
​
    // To allow injection of sync functions for testing.
    syncHandler func(string) (bool, error)
​
    // tfJobInformer is a temporary field for unstructured informer support.
    tfJobInformer cache.SharedIndexInformer
​
    // Listers for TFJob, Pod and Service
    // tfJobLister can list/get tfjobs from the shared informer's store.
    tfJobLister tfjoblisters.TFJobLister
​
    // tfJobInformerSynced returns true if the tfjob store has been synced at least once.
    tfJobInformerSynced cache.InformerSynced
}

4.3 Entry point

The entry point of the TF-operator logic is runWorker, which is essentially a loop calling processNextWorkItem.

func (tc *TFController) runWorker() {
    for tc.processNextWorkItem() {
    }
}

ProcessNextWorkItem reads a single work item from the WorkQueue and attempts to process it by calling syncHandler.

// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (tc *TFController) processNextWorkItem() bool {
    obj, quit := tc.WorkQueue.Get()
    if quit {
        return false
    }
    key, ok := obj.(string)
    if !ok {
        tc.WorkQueue.Forget(obj)
        return true
    }
    tfJob, err := tc.getTFJobFromKey(key)
    // Sync TFJob to match the actual state to the desired state.
    forget, err := tc.syncHandler(key)
}

4.4 syncHandler

The job of syncHandler is to sync the job for a given key, that is, take a job from the WorkQueue and process it locally.

Since tc.syncHandler = tc.syncTFJob is set, we actually end up in syncTFJob.

  • If the tfjob's expectations have been fulfilled, syncTFJob syncs the tfjob with the given key, meaning it no longer expects more pods/services to be created or deleted;
  • EnableDynamicWorker is handled here depending on the type;
  • The job itself is then processed by calling ReconcileJobs.
// syncTFJob syncs the tfjob with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods/services created or deleted.
// This function is not meant to be invoked concurrently with the same key.
func (tc *TFController) syncTFJob(key string) (bool, error) {
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    sharedTFJob, err := tc.getTFJobFromName(namespace, name)
    tfjob := sharedTFJob.DeepCopy()

    // Sync tfjob every time if EnableDynamicWorker is true.
    tfjobNeedsSync := tfjob.Spec.EnableDynamicWorker || tc.satisfiedExpectations(tfjob)

    // Set default values for the new tfjob.
    scheme.Scheme.Default(tfjob)

    if tfjobNeedsSync && tfjob.DeletionTimestamp == nil {
        // Call ReconcileJobs to start the TFJob.
        reconcileTFJobsErr = tc.ReconcileJobs(tfjob, tfjob.Spec.TFReplicaSpecs, tfjob.Status, &tfjob.Spec.RunPolicy)
    }

    return true, err
}

4.5 ReconcileJobs

ReconcileJobs checks and updates the replicas of each given TFReplicaSpec and handles them accordingly; it can be considered the master logic.

  • If the job is finished, delete all pods and services.

  • If the TFJob exceeds the backoff limit or is past the active deadline, delete all pods and services and set the status to Failed.

  • Otherwise, iterate over the TFReplicaSpecs section of the configuration:

    • Start the corresponding pods for each type of node.
    • After starting a pod, also start a Service for it.
// ReconcileJobs checks and updates replicas for each given ReplicaSpec.
// It will requeue the job in case of an error while creating/deleting pods/services.
func (jc *JobController) ReconcileJobs(
    job interface{},
    replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec,
    jobStatus apiv1.JobStatus,
    runPolicy *apiv1.RunPolicy) error {

    metaObject, ok := job.(metav1.Object)
    jobName := metaObject.GetName()
    runtimeObject, ok := job.(runtime.Object)
    jobKey, err := KeyFunc(job)

    pods, err := jc.Controller.GetPodsForJob(job)
    services, err := jc.Controller.GetServicesForJob(job)

    oldStatus := jobStatus.DeepCopy()

    // If the TFJob is terminated, delete all pods and services.
    if commonutil.IsSucceeded(jobStatus) || commonutil.IsFailed(jobStatus) {
        // If the Job is succeeded or failed, delete all pods and services.
        jc.DeletePodsAndServices(runPolicy, job, pods)
        jc.CleanupJob(runPolicy, jobStatus, job)
    }

    // Retrieve the previous number of retries.
    previousRetry := jc.WorkQueue.NumRequeues(jobKey)

    activePods := k8sutil.FilterActivePods(pods)
    jc.recordAbnormalPods(activePods, runtimeObject)

    active := int32(len(activePods))
    failed := k8sutil.FilterPodCount(pods, v1.PodFailed)
    totalReplicas := k8sutil.GetTotalReplicas(replicas)
    prevReplicasFailedNum := k8sutil.GetTotalFailedReplicas(jobStatus.ReplicaStatuses)

    if jobExceedsLimit {
        // If the Job exceeds the backoff limit or is past the active deadline,
        // delete all pods and services, then set the status to failed.
        if err := jc.DeletePodsAndServices(runPolicy, job, pods); err != nil {
            return err
        }
        if err := jc.CleanupJob(runPolicy, jobStatus, job); err != nil {
            return err
        }
        jc.Recorder.Event(runtimeObject, v1.EventTypeNormal, commonutil.JobFailedReason, failureMessage)
        commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, commonutil.JobFailedReason, failureMessage)
        return jc.Controller.UpdateJobStatusInApiServer(job, &jobStatus)
    } else {
        // General cases which need to reconcile.
        if jc.Config.EnableGangScheduling {
            minAvailableReplicas := totalReplicas
            _, err := jc.SyncPodGroup(metaObject, minAvailableReplicas)
        }

        // Diff current active pods/services with replicas:
        // start the corresponding pods for each type of node,
        // and after starting a pod, start a service for it.
        for rtype, spec := range replicas {
            err := jc.Controller.ReconcilePods(metaObject, &jobStatus, pods, rtype, spec, replicas)
            err = jc.Controller.ReconcileServices(metaObject, services, rtype, spec)
        }
    }

    err = jc.Controller.UpdateJobStatus(job, replicas, &jobStatus)

    // No need to update the job status if the status hasn't changed since last time.
    if !reflect.DeepEqual(*oldStatus, jobStatus) {
        return jc.Controller.UpdateJobStatusInApiServer(job, &jobStatus)
    }
    return nil
}

The current logic is as follows:

             +------------+
             | runWorker  |
             +-----+------+
                   |
                   |
                   v
          +--------+------------+
          | processNextWorkItem |
          +--------+------------+
                   |
                   |
                   v
              +----+------+
              | syncTFJob |
              +----+------+
                   |
                   |
                   v
           +-------+--------+
           | ReconcileJobs  |
           +-------+--------+
                   |
                   |
                   v
          +--------+---------+
          |                  |
          |                  |
          v                  v
+---------+---------+  +-----+--------+
|                   |  |              |
| ReconcileServices |  |ReconcilePods |
|                   |  |              |
+-------------------+  +--------------+
​

Next, let's discuss handling Pods and Services separately.

4.6 Handling Pods

4.6.1 ReconcilePods

ReconcilePods checks and updates the pods for each given TFReplicaSpec.

For example:

  • Initialize the replica statuses;
  • If a master pod exists, select it as the master; otherwise, the first worker pod is selected as the master;
  • Call createNewPod to create new pods;
  • Or delete pods;
// ReconcilePods checks and updates pods for each given TFReplicaSpec.
// It will requeue the tfjob in case of an error while creating/deleting pods.
func (tc *TFController) ReconcilePods(
    job interface{},
    jobStatus *commonv1.JobStatus,
    pods []*v1.Pod,
    rtype commonv1.ReplicaType,
    spec *commonv1.ReplicaSpec,
    replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec,
) error {
    tfJob, ok := job.(*tfv1.TFJob)

    // Convert ReplicaType to lower string.
    rt := strings.ToLower(string(rtype))
    // Get all pods for the type rt.
    pods, err := tc.FilterPodsForReplicaType(pods, rt)
    numReplicas := int(*spec.Replicas)
    masterRole := false

    initializeReplicaStatuses(jobStatus, rtype)

    // GetPodSlices will return enough information here to make decision to add/remove/update resources.
    // For example, let's assume we have pods with replica-index 0, 1, 2
    // If replica is 4, return a slice with size 4. [[0],[1],[2],[]], a pod with replica-index 3 will be created.
    // If replica is 1, return a slice with size 3. [[0],[1],[2]], pod with replica-index 1 and 2 are out of range and will be deleted.
    podSlices := tc.GetPodSlices(pods, numReplicas, logger)
    for index, podSlice := range podSlices {
        if len(podSlice) > 1 {
            logger.Warningf("We have too many pods for %s %d", rt, index)
        } else if len(podSlice) == 0 {
            // If the master pod is present, it is selected as the master;
            // otherwise the first worker pod serves as the master.
            // Check if this replica is the master role.
            masterRole = tc.IsMasterRole(replicas, rtype, index)
            // TODO: [should change to CreateNewPod]
            err = tc.createNewPod(tfJob, rt, strconv.Itoa(index), spec, masterRole, replicas)
        } else {
            // Check the status of the current pod.
            pod := podSlice[0]
            // Check if the index is in the valid range; if not, we should kill the pod.
            if index < 0 || index >= numReplicas {
                err = tc.PodControl.DeletePod(pod.Namespace, pod.Name, tfJob)
            }
            // Check if the pod is retryable.
            if spec.RestartPolicy == commonv1.RestartPolicyExitCode {
                if pod.Status.Phase == v1.PodFailed && train_util.IsRetryableExitCode(exitCode) {
                    tc.Recorder.Event(tfJob, corev1.EventTypeWarning, tfJobRestartingReason, msg)
                    err := commonutil.UpdateJobConditions(jobStatus, commonv1.JobRestarting, tfJobRestartingReason, msg)
                    tfJobsRestartCount.Inc()
                }
            }
            updateJobReplicaStatuses(jobStatus, rtype, pod)
        }
    }
    return nil
}

4.6.2 createNewPod

createNewPod creates a new pod for the given index and type:

// createNewPod creates a new pod for the given index and type.
func (tc *TFController) createNewPod(tfjob *tfv1.TFJob, rt, index string, spec *commonv1.ReplicaSpec,
    masterRole bool, replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec) error {

    tfjobKey, err := KeyFunc(tfjob)
    expectationPodsKey := expectation.GenExpectationPodsKey(tfjobKey, rt)

    // Create OwnerReference.
    controllerRef := tc.GenOwnerReference(tfjob)

    // Set type and index for the worker.
    labels := tc.GenLabels(tfjob.Name)
    labels[tfReplicaTypeLabel] = rt
    labels[tfReplicaIndexLabel] = index

    podTemplate := spec.Template.DeepCopy()

    // Set name for the template.
    podTemplate.Name = common.GenGeneralName(tfjob.Name, rt, index)
    if podTemplate.Labels == nil {
        podTemplate.Labels = make(map[string]string)
    }
    for key, value := range labels {
        podTemplate.Labels[key] = value
    }

    // Generate the cluster configuration information and set it into the pod template.
    if err := tc.SetClusterSpec(tfjob, podTemplate, rt, index); err != nil {
        return err
    }

    // if gang-scheduling is enabled:
    // 1. if user has specified other scheduler, we report a warning without overriding any fields.
    // 2. if no SchedulerName is set for pods, then we set the SchedulerName to "kube-batch".
    if tc.Config.EnableGangScheduling {
        if isNonGangSchedulerSet(replicas) {
            tc.Recorder.Event(tfjob, v1.EventTypeWarning, podTemplateSchedulerNameReason, errMsg)
        } else {
            podTemplate.Spec.SchedulerName = gangSchedulerName
        }
        if podTemplate.Annotations == nil {
            podTemplate.Annotations = map[string]string{}
        }
        podTemplate.Annotations[gangSchedulingPodGroupAnnotation] = tfjob.GetName() // pod group name (value abridged in the original excerpt)
    }

    // Actually start the pod creation.
    err = tc.PodControl.CreatePodsWithControllerRef(tfjob.Namespace, podTemplate, tfjob, controllerRef)
    return nil
}

4.6.3 Generating Configuration Information

4.6.3.1 SetClusterSpec

The configuration information generated in the function above is important, so let's pull it out and examine it separately.

SetClusterSpec generates and sets TF_CONFIG for a given podTemplateSpec:

// SetClusterSpec generates and sets TF_CONFIG for the given podTemplateSpec.
func (tc *TFController) SetClusterSpec(job interface{}, podTemplate *v1.PodTemplateSpec, rtype, index string) error {
    tfjob, ok := job.(*tfv1.TFJob)

    // Generate TF_CONFIG JSON string.
    tfConfigStr, err := genTFConfigJSONStr(tfjob, rtype, index)

    // Add TF_CONFIG environment variable to tensorflow container in the pod.
    for i := range podTemplate.Spec.Containers {
        if podTemplate.Spec.Containers[i].Name == tfv1.DefaultContainerName {
            if len(podTemplate.Spec.Containers[i].Env) == 0 {
                podTemplate.Spec.Containers[i].Env = make([]v1.EnvVar, 0)
            }
            podTemplate.Spec.Containers[i].Env = append(podTemplate.Spec.Containers[i].Env, v1.EnvVar{
                Name:  tfConfig,
                Value: tfConfigStr,
            })
            break
        }
    }
    return nil
}
4.6.3.2 genTFConfigJSONStr

genTFConfigJSONStr generates the JSON data.

// genTFConfigJSONStr will generate the environment variable TF_CONFIG:
// {
//     "cluster": {
//         "ps": ["ps1:2222", "ps2:2222"],
//         "worker": ["worker1:2222", "worker2:2222", "worker3:2222"]
//     },
//     "task": {
//         "type": "ps",
//         "index": 1
//     }
// }
func genTFConfigJSONStr(tfjob *tfv1.TFJob, rtype, index string) (string, error) {
    // Configure the TF_CONFIG environment variable.
    i, err := strconv.ParseInt(index, 0, 32)
    if err != nil {
        return "", err
    }

    cluster, err := genClusterSpec(tfjob)
    if err != nil {
        return "", err
    }

    var tfConfigJSONByteSlice []byte
    if tfjob.Spec.EnableDynamicWorker {
        sparseCluster := convertClusterSpecToSparseClusterSpec(cluster, strings.ToLower(rtype), int32(i))
        sparseTFConfig := SparseTFConfig{
            Cluster: sparseCluster,
            Task: TaskSpec{
                Type:  strings.ToLower(rtype),
                Index: int(i),
            },
        }
        tfConfigJSONByteSlice, err = json.Marshal(sparseTFConfig)
    } else {
        tfConfig := TFConfig{
            Cluster: cluster,
            Task: TaskSpec{
                Type:  strings.ToLower(rtype),
                Index: int(i),
            },
            // We need to set environment to cloud otherwise it will default to local which isn't what we want.
            // Environment is used by tensorflow.contrib.learn.python.learn in versions <= 1.3
            // TODO(jlewi): I don't think it is used in versions TF >= 1.4. So we can eventually get rid of it.
            Environment: "cloud",
        }
        tfConfigJSONByteSlice, err = json.Marshal(tfConfig)
    }
    if err != nil {
        return "", err
    }
    return string(tfConfigJSONByteSlice), nil
}
4.6.3.3 genClusterSpec

Here, the cluster contents are built from the cluster information.

// genClusterSpec will generate ClusterSpec.
func genClusterSpec(tfjob *tfv1.TFJob) (ClusterSpec, error) {
    clusterSpec := make(ClusterSpec)

    for rtype, spec := range tfjob.Spec.TFReplicaSpecs {
        rt := strings.ToLower(string(rtype))
        replicaNames := make([]string, 0, *spec.Replicas)
        port, err := GetPortFromTFJob(tfjob, rtype)

        // Generate the service name for each replica.
        for i := int32(0); i < *spec.Replicas; i++ {
            // As described here: https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/#a-records.
            // Headless service assigned a DNS A record for a name of the form "my-svc.my-namespace.svc.cluster.local".
            // And the last part "svc.cluster.local" is called cluster domain,
            // which may be different between kubernetes clusters.
            hostName := common.GenGeneralName(tfjob.Name, rt, fmt.Sprintf("%d", i))
            svcName := hostName + "." + tfjob.Namespace + "." + "svc"
            clusterDomain := os.Getenv(EnvCustomClusterDomain)
            if len(clusterDomain) > 0 {
                svcName += "." + clusterDomain
            }
            endpoint := fmt.Sprintf("%s:%d", svcName, port)
            replicaNames = append(replicaNames, endpoint)
        }
        clusterSpec[rt] = replicaNames
    }
    return clusterSpec, nil
}

4.6.4 CreatePodsWithControllerRef

After obtaining the cluster configuration information, it is used to actually start creating the Pod:

func (r RealPodControl) CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error {
    return r.createPods("", namespace, template, object, nil)
}

func (r RealPodControl) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec,
    controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error {
    if err := ValidateControllerRef(controllerRef); err != nil {
        return err
    }
    return r.createPods("", namespace, template, controllerObject, controllerRef)
}

4.6.5 createPods

This is where the K8s API is actually called to create the pod:

func (r RealPodControl) createPods(nodeName, namespace string, template *v1.PodTemplateSpec,
    object runtime.Object, controllerRef *metav1.OwnerReference) error {
    pod, err := GetPodFromTemplate(template, object, controllerRef)
    if len(nodeName) != 0 {
        pod.Spec.NodeName = nodeName
    }
    if labels.Set(pod.Labels).AsSelectorPreValidated().Empty() {
        return fmt.Errorf("unable to create pods, no labels")
    }
    if newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(pod); err != nil {
        return err
    } else {
        accessor, err := meta.Accessor(object)
    }
    return nil
}

The logic is as follows:

             +------------+
             | runWorker  |
             +-----+------+
                   |
                   v
        +----------+----------+
        | processNextWorkItem |
        +----------+----------+
                   |
                   v
              +----+------+
              | syncTFJob |
              +----+------+
                   |
                   v
           +-------+--------+
           | ReconcileJobs  |
           +-------+--------+
                   |
          +--------+---------+
          |                  |
          v                  v
+---------+---------+  +-----+---------+
| ReconcileServices |  | ReconcilePods |
+-------------------+  +-----+---------+
                             |
                             v
                     +-------+-------+      +------------------------+
                     | createNewPod  +----->+ SetClusterSpec         |
                     +-------+-------+      |   genTFConfigJSONStr   |
                             |              |   genClusterSpec       |
                             v              +------------------------+
            +----------------+-------------+
            | CreatePodsWithControllerRef  |
            +----------------+-------------+
                             |
                             v
                      +------+------+
                      | createPods  |
                      +-------------+


4.7 Processing Services

4.7.1 ReconcileServices

ReconcileServices checks and updates the services for each given TFReplicaSpec, roughly as follows:

  • The TFJob is requeued if an error occurs while creating/deleting a service.

  • Obtain all services of type rt:

    • Either create new services;
    • Or delete old services; currently only scaling in the workers' services is allowed;
// ReconcileServices checks and updates services for each given ReplicaSpec.
// It will requeue the job in case of an error while creating/deleting services.
func (jc *JobController) ReconcileServices(
    job metav1.Object,
    services []*v1.Service,
    rtype apiv1.ReplicaType,
    spec *apiv1.ReplicaSpec) error {

    // Convert ReplicaType to lower string.
    rt := strings.ToLower(string(rtype))
    replicas := int(*spec.Replicas)

    // Get all services for the type rt.
    services, err := jc.FilterServicesForReplicaType(services, rt)

    // GetServiceSlices will return enough information here to make decision to add/remove/update resources.
    //
    // For example, let's assume we have services with replica-index 0, 1, 2
    // If replica is 4, return a slice with size 4. [[0],[1],[2],[]], a svc with replica-index 3 will be created.
    //
    // If replica is 1, return a slice with size 3. [[0],[1],[2]], svc with replica-index 1 and 2 are out of range and will be deleted.
    serviceSlices := jc.GetServiceSlices(services, replicas, commonutil.LoggerForReplica(job, rt))

    for index, serviceSlice := range serviceSlices {
        if len(serviceSlice) > 1 {
        } else if len(serviceSlice) == 0 {
            err = jc.CreateNewService(job, rtype, spec, strconv.Itoa(index))
        } else {
            // Check the status of the current svc.
            svc := serviceSlice[0]
            // Check if the index is in the valid range; if not, we should kill the svc.
            if index < 0 || index >= replicas {
                err = jc.ServiceControl.DeleteService(svc.Namespace, svc.Name, job.(runtime.Object))
            }
        }
    }
    return nil
}

4.7.2 CreateNewService

CreateNewService creates a new service for the given index and type:

// CreateNewService creates a new service for the given index and type.
func (jc *JobController) CreateNewService(job metav1.Object, rtype apiv1.ReplicaType,
    spec *apiv1.ReplicaSpec, index string) error {

    jobKey, err := KeyFunc(job)

    // Convert ReplicaType to lower string.
    rt := strings.ToLower(string(rtype))

    expectationServicesKey := expectation.GenExpectationServicesKey(jobKey, rt)
    err = jc.Expectations.ExpectCreations(expectationServicesKey, 1)
    if err != nil {
        return err
    }

    // Append ReplicaTypeLabel and ReplicaIndexLabel labels.
    labels := jc.GenLabels(job.GetName())
    labels[apiv1.ReplicaTypeLabel] = rt
    labels[apiv1.ReplicaIndexLabel] = index

    port, err := jc.GetPortFromJob(spec)
    if err != nil {
        return err
    }

    service := &v1.Service{
        Spec: v1.ServiceSpec{
            ClusterIP: "None",
            Selector:  labels,
            Ports:     []v1.ServicePort{},
        },
    }

    // Add service port to headless service only if port is set from controller implementation.
    if port != nil {
        svcPort := v1.ServicePort{Name: jc.Controller.GetDefaultContainerPortName(), Port: *port}
        service.Spec.Ports = append(service.Spec.Ports, svcPort)
    }

    service.Name = GenGeneralName(job.GetName(), rt, index)
    service.Labels = labels

    // Create OwnerReference.
    controllerRef := jc.GenOwnerReference(job)

    err = jc.ServiceControl.CreateServicesWithControllerRef(job.GetNamespace(), service, job.(runtime.Object), controllerRef)
    if err != nil && errors.IsTimeout(err) {
        succeededServiceCreationCount.Inc()
        return nil
    } else if err != nil {
        failedServiceCreationCount.Inc()
        return err
    }
    succeededServiceCreationCount.Inc()
    return nil
}

4.7.3 CreateServicesWithControllerRef

Using the cluster configuration information, the Service creation is actually started:

func (r RealServiceControl) CreateServicesWithControllerRef(namespace string, service *v1.Service,
    controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error {
    if err := ValidateControllerRef(controllerRef); err != nil {
        return err
    }
    return r.createServices(namespace, service, controllerObject, controllerRef)
}

4.7.4 createServices

The K8s API is called to create the service:

func (r RealServiceControl) createServices(namespace string, service *v1.Service, object runtime.Object, controllerRef *metav1.OwnerReference) error {
    if labels.Set(service.Labels).AsSelectorPreValidated().Empty() {
        return fmt.Errorf("unable to create Services, no labels")
    }
    serviceWithOwner, err := GetServiceFromTemplate(service, object, controllerRef)
    newService, err := r.KubeClient.CoreV1().Services(namespace).Create(serviceWithOwner)
    accessor, err := meta.Accessor(object)
}

At this point, the logic expands as follows:

             +------------+
             | runWorker  |
             +-----+------+
                   |
                   v
        +----------+----------+
        | processNextWorkItem |
        +----------+----------+
                   |
                   v
              +----+------+
              | syncTFJob |
              +----+------+
                   |
                   v
           +-------+--------+
           | ReconcileJobs  |
           +-------+--------+
                   |
          +--------+-----------+
          |                    |
          v                    v
+---------+---------+   +-----+---------+
| ReconcileServices |   | ReconcilePods |
+---------+---------+   +-----+---------+
          |                   |
          v                   v
+---------+---------+  +------+--------+      +------------------------+
| CreateNewService  |  | createNewPod  +----->+ SetClusterSpec         |
+---------+---------+  +------+--------+      |   genTFConfigJSONStr   |
          |                   |               |   genClusterSpec       |
          v                   v               +------------------------+
+---------+-----------------------+  +--------+---------------------+
| CreateServicesWithControllerRef |  | CreatePodsWithControllerRef  |
+---------+-----------------------+  +--------+---------------------+
          |                                   |
          v                                   v
+---------+--------+                   +------+------+
| createServices   |                   | createPods  |
+------------------+                   +-------------+


Therefore, we now roughly know that TF-operator essentially:

  • Uses a custom resource object to describe a distributed machine-learning training job;
  • Implements the TFJob Controller to control the lifecycle of the containers and manage the relationships between the processes for users.

0x05 Comparison with common Deployment

Having come this far, you may still be a little confused: what is the difference between TF on K8s and an ordinary deployment, and where is the advantage? Let's analyze it in detail.

5.1 Running

Let's first look at the Dockerfile in the source code.

FROM tensorflow/tensorflow:1.5.0
ADD . /var/tf_dist_mnist
ENTRYPOINT ["python", "/var/tf_dist_mnist/dist_mnist.py"]

Then look at the corresponding spec: there are 2 PS and 4 workers.

apiVersion: "kubeflow.org/v1" kind: "TFJob" metadata: name: "dist-mnist-for-e2e-test" spec: tfReplicaSpecs: PS: replicas: 2 restartPolicy: Never template: spec: containers: - name: tensorflow image: Kubeflow /tf-dist- Mnist-test :1.0 Worker: replicas: 4 restartPolicy: Never template: spec: containers: name: Tensorflow image: kubeflow/tf - dist - mnist - test: 1.0Copy the code

Then install the example and run a distributed MNIST training job.

cd ./examples/v1/dist-mnist
docker build -f Dockerfile -t kubeflow/tf-dist-mnist-test:1.0 .
kubectl create -f ./tf_job_mnist.yaml

5.2 Comparison

Let’s just look at the training code.

5.2.1 Ordinary TF

The configuration of the various hosts is set via script parameters. Below is the startup code that reads those parameters.

# Read the parameters.
ps_spec = FLAGS.ps_hosts.split(',')
worker_spec = FLAGS.worker_hosts.split(',')

# Create the cluster.
num_worker = len(worker_spec)
cluster = tf.train.ClusterSpec({'ps': ps_spec, 'worker': worker_spec})
server = tf.train.Server(cluster,
                         job_name=FLAGS.job_name,
                         task_index=FLAGS.task_index)

5.2.2 TF-Operator

First, dist_mnist.py obtains the cluster information in the following way.

# If not explicitly specified in the constructor and the TF_CONFIG
# environment variable is present, load cluster_spec from TF_CONFIG.
tf_config = json.loads(os.environ.get('TF_CONFIG') or '{}')
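
To show how this plugs into the plain-TF code above, here is a hedged sketch of turning TF_CONFIG into a ClusterSpec (the field names follow the TF_CONFIG format generated by genTFConfigJSONStr):

import json
import os

import tensorflow as tf

# TF_CONFIG is injected by tf-operator; the empty default keeps local runs working.
tf_config = json.loads(os.environ.get('TF_CONFIG') or '{}')
cluster_spec = tf_config.get('cluster', {})
task = tf_config.get('task', {})

if cluster_spec:
    cluster = tf.train.ClusterSpec(cluster_spec)
    server = tf.train.Server(cluster,
                             job_name=task['type'],
                             task_index=task['index'])
    if task['type'] == 'ps':
        server.join()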

Second, TF-operator contains the following, indicating that the cluster information is set from here:

tfConfig = "TF_CONFIG"

Then, in SetClusterSpec there is the following, which dynamically obtains the configuration via the K8s API:

// SetClusterSpec generates and sets TF_CONFIG for the given podTemplateSpec.
func (tc *TFController) SetClusterSpec(job interface{}, podTemplate *v1.PodTemplateSpec, rtype, index string) error {
    tfjob, ok := job.(*tfv1.TFJob)

    // Do not set TF_CONFIG for local training jobs.
    if !isDistributed(tfjob) {
        return nil
    }

    // Generate TF_CONFIG JSON string.
    tfConfigStr, err := genTFConfigJSONStr(tfjob, rtype, index)

    // Add TF_CONFIG environment variable to tensorflow container in the pod.
    for i := range podTemplate.Spec.Containers {
        if podTemplate.Spec.Containers[i].Name == tfv1.DefaultContainerName {
            if len(podTemplate.Spec.Containers[i].Env) == 0 {
                podTemplate.Spec.Containers[i].Env = make([]v1.EnvVar, 0)
            }
            podTemplate.Spec.Containers[i].Env = append(podTemplate.Spec.Containers[i].Env, v1.EnvVar{
                Name:  tfConfig,
                Value: tfConfigStr,
            })
            break
        }
    }
    return nil
}

So you can see that, from the user's point of view, only a tiny code change is needed; deployment, services, and the rest are taken over by K8s.

The user only needs to set the required numbers of workers and PS in the spec and can then focus on the model; DevOps handles everything else.

0x06 Summary

Based on the analysis above, we can summarize the following advantages of TF-operator:

  • TF-operator's custom resource object describes a distributed machine-learning training job;
  • Its TFJob Controller controls the lifecycle of the containers and manages the relationships between the processes for users;
  • For users, creating a TFJob custom resource object and configuring the relevant information in its Template is equivalent to describing the execution of a distributed training program;
  • The user can focus on the model; DevOps handles everything else;

Although KubeFlow/TF-operator works, it still has many shortcomings.

  • Kubeflow can launch jobs on Kubernetes based on TensorFlow's native distributed computing capability, but because the latter is not fault-tolerant, Kubeflow cannot create fault tolerance out of thin air. And no fault tolerance means no elastic scheduling.
  • When using kubeflow/tf-operator to run a distributed TensorFlow job, model iteration cannot begin until all requested processes have started. If cluster resources are insufficient to start all of them, the current job can only wait for other jobs to release resources. To reduce resource wait time, you can configure a dedicated resource pool for jobs.
  • Because such dedicated resources are not shared, cluster utilization is low. KubeFlow/TF-operator therefore struggles to balance development efficiency and cluster utilization.

And most importantly: it has no connection to Horovod and installs no software such as MPI, so next let's look at mpi-operator.

0xEE Personal information

Thoughts on life and technology

Wechat public account: Rosie’s Thinking

For timely notifications of new articles, or to see the technical materials I recommend, please follow this account.

0xFF References

Tensorflow Study Notes (XIX): Distributed TensorFlow

The Elastic Training Operator is an Elastic deep learning Training tool on Kubernetes

Build Kubeflow Pipelines on Ali Cloud

Develop your machine learning workflow

Build machine learning systems like Google 3 – Run ResNet101 with MPIJob

Discover the technical architecture behind Tencent’s multi-tenant training platform based on Kubeflow

Blog.csdn.net/weixin_4397…

[KubeFlow] An in-depth interpretation of the mpi-operator

Best practices for optimizing distributed deep learning performance on Amazon EKS

Acceleration and practice of cloud-native AI platform

One of the cloud native elastic AI training series: Elastic distributed training practices based on AllReduce

MPI on Kubernetes

Kubeflow/ tF-operator source analysis

MPI, OpenMPI and deep learning

Kubectl exec is executed through the shell and shell commands are executed in the corresponding POD container

K8s Series — CRD Custom Resource and Controller Implementation

TensorFlow Distributed Suite (Principles, deployments, Instances)

Kubernetes Operator best practices

ElasticDL: Ant Financial’s open source elastic distributed deep learning system based on TensorFlow

Distributed Deep learning systems – Container-based resource scheduling

Analysis of constructing AI platform based on K8S