Before detailing TensorFlow's various distribution strategies, we first need to look at what they are built on: the distributed environment. Only with a solid foundation can we clear away as many obstacles as possible in the later analysis and get twice the result with half the effort.

Some of the APIs used in this article are not the latest. Our goal is to understand the design, and the older APIs show it much more clearly. Many companies in the industry also still run older versions of TensorFlow, so the older APIs remain well worth analyzing.

Here are two highly recommended resources by experts in the field:

  • [TensorFlow Internals] (github.com/horance-liu…): anyone interested in how TF is implemented internally should read it. You will definitely learn a lot.
  • home.cnblogs.com/u/deep-lear…: covers not only TensorFlow but also many other areas at the forefront of the industry.

The other articles in this series are:

Heterogeneous Distributed Learning Based on TensorFlow: The Distributed Paper [Translation]

Implementation of Control Flow in TensorFlow

1. Overall Architecture

We break the distributed model down from several angles. The division is not absolute: these angles are not orthogonal and partly overlap. Looking at the model from these angles simply makes it easier to understand.

1.1 Cluster Perspective

1.1.1 Concept

Let's first break it down in terms of the cluster and its business logic, using the following terms:

  • Cluster: the definition of a TensorFlow cluster.

    • A TensorFlow cluster contains one or more TensorFlow servers. A cluster typically pursues a relatively high-level goal, such as training a neural network on multiple machines in parallel.
    • Training is divided into a series of jobs, and each job is responsible for a series of tasks. When a cluster has multiple tasks, use tf.train.ClusterSpec to specify the machine for each task.
  • Job: A job consists of a series of tasks that serve the same goal. Tasks in a job usually run on different machines. There are two kinds of jobs:

    • PS job: PS stands for Parameter Server, which stores and updates variables.
    • Worker job: hosts compute-intensive, stateless nodes and is responsible for the actual computation.
  • Task: A task performs one specific piece of work and usually corresponds to a single process on a TensorFlow server.

    • A task belongs to a specific job and has a unique index, task_index, in that job's task list.
    • A task is usually associated with a specific tf.train.Server and runs in its own process.
    • One or more tasks can run on a single machine, for example a single machine with multiple GPUs.
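To make the hierarchy concrete, here is a small plain-Python sketch that needs no TensorFlow. It flattens a {job_name: [address, ...]} mapping, the same shape of dictionary passed to tf.train.ClusterSpec, into one task per process. It then names each task in TensorFlow's /job:<name>/task:<index> style. The Task type, the helper functions and the localhost addresses are hypothetical illustrations, not TensorFlow APIs.

```python
# Toy model of the Cluster -> Job -> Task hierarchy (plain Python, not TF code).
from typing import Dict, List, NamedTuple


class Task(NamedTuple):
    job: str      # job name, e.g. "ps" or "worker"
    index: int    # task_index: position of the task within its job
    address: str  # "host:port" of the process that runs this task


def enumerate_tasks(cluster: Dict[str, List[str]]) -> List[Task]:
    """Flatten {job_name: [address, ...]} into one Task per process."""
    return [Task(job, index, address)
            for job in sorted(cluster)
            for index, address in enumerate(cluster[job])]


def task_name(task: Task) -> str:
    """Name a task in the /job:<name>/task:<index> style of TF device strings."""
    return f"/job:{task.job}/task:{task.index}"


if __name__ == "__main__":
    cluster = {"ps": ["localhost:2222"],
               "worker": ["localhost:2223", "localhost:2224"]}
    for task in enumerate_tasks(cluster):
        print(task_name(task), "->", task.address)
```

Each printed line corresponds to one process that has to be launched. A task's index is simply its position in the job's address list.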

1.1.2 Schematic

A cluster contains multiple jobs, and a job contains one or more tasks:

Figure 1 Relationships between roles

For the two job roles, here is a typical parameter-server diagram. The PS cluster runs at the top of the figure, and four workers run in the middle.

Figure 2 Parameter server.

Source: “A Survey on Distributed Machine Learning”

1.1.3 Creation

Let's look at how distributed training can be implemented with the low-level APIs.

1.1.3.1 Creating a Cluster

We first create a cluster with two roles: the parameter server (PS) job has three tasks and the worker job has two tasks. Here each task runs on its own machine, but several tasks can also run on the same machine (for example, with each task controlling a different GPU device).

ClusterSpec is organized as jobs and specifies how tasks are deployed in a cluster. Because a Task corresponds to a process, ClusterSpec also describes how processes are distributed in the TensorFlow distributed runtime.

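import tensorflow as tf  # TF 1.x-style API, as used throughout this article

ps_hosts = ["1.1.1.1:11", "2.2.2.2:22", "3.3.3.3:33"]   # 3 ps tasks
worker_hosts = ["4.4.4.4:44", "5.5.5.5:55"]             # 2 worker tasks
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})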
1.1.3.2 Creating a Task

Next, the tasks are started. The user script must be run on each machine, five times in total (3 ps, 2 workers). Every task must use the same tf.train.ClusterSpec so that it knows about all the tasks in the cluster. Each task then starts a tf.distribute.Server (tf.train.Server in the TF 1.x API used below).

A tf.distribute.Server instance encapsulates a set of devices and a tf.compat.v1.Session target that can participate in distributed training. A server belongs to a cluster (specified by a tf.train.ClusterSpec) and corresponds to a particular task in a named job. It can communicate with any other server in the same cluster.

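tf.app.flags.DEFINE_string("job_name", "", "Either 'ps' or 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of the task within its job")
FLAGS = tf.app.flags.FLAGS
server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)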
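Every process runs the same script, so the only thing that distinguishes a ps task from a worker task is the (job_name, task_index) pair it is launched with. The sketch below uses argparse in place of tf.app.flags. It adds a hypothetical resolve_task() helper that checks the pair against the cluster before any server is created. This matches the ServerDef requirement, shown later, that the cluster contain a job with that name and a task at that index. The addresses are placeholders.

```python
# Sketch: one launch script, role chosen per process (argparse replaces tf.app.flags).
import argparse
from typing import Dict, List, Optional, Tuple


def parse_task_flags(argv: Optional[List[str]] = None) -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument("--job_name", choices=["ps", "worker"], required=True)
    parser.add_argument("--task_index", type=int, default=0)
    return parser.parse_args(argv)


def resolve_task(cluster: Dict[str, List[str]], job_name: str,
                 task_index: int) -> Tuple[str, int, str]:
    """Hypothetical check: return (job, index, address) or fail before starting a server."""
    hosts = cluster.get(job_name)
    if hosts is None:
        raise ValueError(f"job {job_name!r} is not in the cluster")
    if not 0 <= task_index < len(hosts):
        raise ValueError(f"task_index {task_index} is out of range for job {job_name!r}")
    return job_name, task_index, hosts[task_index]


if __name__ == "__main__":
    cluster = {"ps": ["localhost:2222"],
               "worker": ["localhost:2223", "localhost:2224"]}
    flags = parse_task_flags(["--job_name", "worker", "--task_index", "1"])
    print(resolve_task(cluster, flags.job_name, flags.task_index))
    # The real script would now build tf.train.Server(cluster, job_name=..., task_index=...).
```

Each machine would launch the same script with its own arguments, for example `--job_name=ps --task_index=0` or `--job_name=worker --task_index=1`.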
1.1.3.3 Specifying a device

Once the Server has started, each task runs different logic. The code uses the command-line arguments passed to the script to decide which role this Server plays.

  • If FLAGS.job_name == "ps", the program calls server.join(). The parameter server only provides the parameter-update service and just needs to wait for the worker nodes to submit updated parameters.
  • If FLAGS.job_name == "worker", the subsequent computation is performed. TensorFlow lets you separate computation from parameters: compute nodes and parameters can each be assigned to devices. In a distributed environment, the tf.device() function is still used to place nodes/operations on the current task. The tf.train.replica_device_setter function automatically places variables on the PS tasks and all other operations on the given worker device.
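if FLAGS.job_name == "ps":
    server.join()  # block and serve variables to the workers
elif FLAGS.job_name == "worker":
    with tf.device(tf.train.replica_device_setter(
            worker_device="/job:worker/task:%d" % FLAGS.task_index,
            cluster=cluster)):
        # Build the model here: variables are placed on the ps tasks,
        # all other ops on this worker.
        pass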
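The placement rule behind tf.train.replica_device_setter can be approximated in a few lines. The sketch below is a simplified model, assuming the default behaviour: variable ops go to PS tasks in round-robin order, and every other op stays on worker_device. The set of op types and the helper names are illustrative assumptions. The real function also offers more options, such as a custom ps_strategy.

```python
# Simplified model of replica_device_setter's default placement (not TF code).
from itertools import count
from typing import Callable, List, Tuple

# Illustrative subset of op types treated as variables.
VARIABLE_OP_TYPES = {"Variable", "VariableV2", "VarHandleOp"}


def make_device_setter(num_ps_tasks: int, worker_device: str) -> Callable[[str], str]:
    ps_counter = count()

    def choose_device(op_type: str) -> str:
        if num_ps_tasks > 0 and op_type in VARIABLE_OP_TYPES:
            # Round-robin: 0, 1, ..., num_ps_tasks - 1, 0, 1, ...
            return f"/job:ps/task:{next(ps_counter) % num_ps_tasks}"
        return worker_device

    return choose_device


def place(ops: List[Tuple[str, str]], setter: Callable[[str], str]) -> List[Tuple[str, str]]:
    """Map (op_name, op_type) pairs to (op_name, device)."""
    return [(name, setter(op_type)) for name, op_type in ops]


if __name__ == "__main__":
    setter = make_device_setter(num_ps_tasks=2, worker_device="/job:worker/task:0")
    ops = [("w1", "VariableV2"), ("b1", "VariableV2"),
           ("w2", "VariableV2"), ("logits", "MatMul")]
    for name, device in place(ops, setter):
        print(name, device)
```

With two PS tasks, w1 and w2 land on /job:ps/task:0, b1 lands on /job:ps/task:1, and logits stays on the worker. Spreading variables across PS tasks lets several parameter servers share the storage and update load.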

1.2 Distributed Perspective

1.2.1 Concept

Let's look at it more closely from the perspective of distributed business logic and architecture. The master-worker architecture is a very common way to organize distributed systems. For example, GFS has Master and ChunkServer, Spanner has Zonemaster and Spanserver, Spark has Driver and Executor, and Flink has JobManager and TaskManager. In this architecture, the Master usually maintains cluster metadata and schedules tasks, while Workers perform the actual computation or maintain specific data shards.

Distributed TensorFlow also adopts the master-worker architecture. To illustrate, here is the official architecture diagram of distributed TensorFlow, in which all three roles are viewed from a logical perspective.

  • Client: All of the concepts above serve to build a distributed environment in which the client performs computation. A client is usually a program that constructs a TensorFlow graph. Typically, the client repeatedly issues RPC calls asking the master to perform iterative computation (such as training).
  • Master: After receiving a command to execute the computation graph, the Master coordinates scheduling. It prunes and optimizes the graph and splits it into multiple subgraphs. Each subgraph is assigned and registered to a different worker, and each worker is then triggered to execute its subgraph concurrently.
  • Worker: Responsible for actually computing the subgraph it receives. After its subgraph is registered, the Worker partitions it a second time according to its local computing devices and assigns each resulting piece to a device. It then starts the devices to execute those pieces concurrently. Workers may exchange data through inter-process communication. There are two workers in the figure. The lower worker's job role is parameter server, responsible for maintaining and updating parameters. The upper worker sends gradients to the parameter server for parameter updates.

1.2.2 Schematic

The cluster in the figure consists of three nodes, each running a TensorFlow Server. Here, both Master and Worker are TensorFlow servers.

Figure 3 cluster, from TensorFlow

1.3 System Perspective

1.3.1 Concept

Next, we analyze it from the perspective of specific software implementation, which can be decomposed into the following concepts:

  • TensorFlow Server: A Server is a process that runs a tf.train.Server instance and is a member of a cluster. A Server usually includes a Master Service and a Worker Service, and it can communicate with the other servers in the cluster.

  • Master Service: A gRPC service that interacts with a set of remote distributed devices and coordinates the scheduling of multiple worker services.

    • The Master Service corresponds to "//tensorflow/core/protobuf/master_service.proto", which defines interfaces such as CreateSession and RunStep. All TensorFlow servers implement the Master Service.
    • Clients interact with the Master Service to perform distributed TensorFlow computation. Generally, a client keeps an interactive computation going with one Master over RPC. The client establishes a client session and connects to a Master, and the Master creates a master session.
    • A Master Service contains multiple "master sessions" and maintains their state. Each session encapsulates a graph and its associated state, and typically corresponds to a single "client session" (for example, a tensorflow::Session instance).
  • Master Session: A Master Session is responsible for the following tasks.

    • Act as a bridge between the client and the back-end runtime, for example sending GraphDef in Protobuf format to the distributed Master.
    • Each node is assigned to a device (local or remote) using a placement algorithm. The placement algorithm might make a decision based on statistics collected from the worker in the system (for example, memory usage, bandwidth consumption, and so on).
    • To support data flow and resource management across devices and processes, sessions insert intermediate nodes and edges into the compute graph.
    • Issue a command to the worker to execute the subgraph associated with this worker.
  • Worker Session: A worker uses a WorkerSession to identify one execution sequence (registering computation graphs, executing commands). Each WorkerSession belongs to a MasterSession.

  • Worker Service: A gRPC service that executes dataflow graphs on a set of local devices on behalf of the MasterService. A Worker Service keeps track of multiple subgraphs of the client's computation graph. These correspond to the nodes that should execute on this worker, plus any additional nodes needed for inter-process communication. The Worker Service corresponds to worker_service.proto. All TensorFlow servers also implement the Worker Service.

1.3.2 Schematic

We now know that every Server runs both a MasterService and a WorkerService, which means a Server may play the roles of both Master and Worker. In the cluster figure above, each of the three nodes runs a TensorFlow Server, and each Server hosts both kinds of service (MasterService and WorkerService). In this particular system, however, only the MasterService on the Master and the WorkerServices on the two Workers actually play a role. These are underlined in the figure.

Figure 4 service

Let’s move on to some other possibilities.

  • If a client connects to Server A in the cluster, Server A plays the role of Master and the other servers in the cluster act as Workers. Server A can also act as a Worker at the same time.
  • The Client and Master can reside in the same process. In this case, they can interact directly through function calls, avoiding RPC overhead.
  • The Master and Worker can also be in the same process, in which case they too interact directly through function calls, avoiding RPC overhead.
  • Multiple clients can connect to a cluster at the same time. For example, two servers in the cluster can play the role of Master/Worker, while two others play only the role of Worker:

Figure 5 Multiple Client access

1.4 Graph Operation Perspective

At its core, distributed execution is still about how to run the computation graph, but the work is split across three roles: Client, Master and Worker. The Client constructs the graph and the Worker performs the computation, but how does the Worker know what to compute? TensorFlow inserts a Master role between the two for coordination and scheduling.

In distributed mode, the computation graph is first split and then executed.

  • From the splitting perspective, TF splits the computation graph at two levels:
    • MasterSession builds the ClientGraph. SplitByWorker then performs the first-level split to produce multiple partition graphs, and the resulting PartitionGraph list is registered with the workers.
    • WorkerSession uses SplitByDevice to split the graph it receives a second time and assigns each resulting PartitionGraph to a device.
  • From the execution perspective, the computation graph is actually executed only on Workers.
    • The Master starts each Worker to execute its PartitionGraph list concurrently.
    • Each Worker starts an Executor on each device to execute the corresponding PartitionGraph.
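The two levels of splitting can be illustrated with a toy model in which each node already carries a full device name. The first split groups nodes by the task part of that name, giving one partition per worker. The second split groups a worker's nodes by the full device name. Any edge whose endpoints land in different partitions is where a Send/Recv pair is needed. worker_of(), split_by() and cross_edges() are hypothetical helpers written for this sketch, not TensorFlow's graph-partitioning code.

```python
# Toy model of two-level graph splitting (not TensorFlow's partitioner).
from collections import defaultdict
from typing import Callable, Dict, List, Tuple

Edge = Tuple[str, str]


def worker_of(device: str) -> str:
    """Task part of a device name, e.g. '/job:worker/replica:0/task:0'."""
    return device.split("/device:")[0]


def split_by(placement: Dict[str, str], key: Callable[[str], str]) -> Dict[str, List[str]]:
    """Group node names into partitions according to key(device)."""
    parts: Dict[str, List[str]] = defaultdict(list)
    for node, device in placement.items():
        parts[key(device)].append(node)
    return dict(parts)


def cross_edges(edges: List[Edge], placement: Dict[str, str],
                key: Callable[[str], str]) -> List[Edge]:
    """Edges whose endpoints fall into different partitions need Send/Recv."""
    return [(s, d) for s, d in edges if key(placement[s]) != key(placement[d])]


if __name__ == "__main__":
    placement = {
        "a": "/job:worker/replica:0/task:0/device:GPU:0",
        "b": "/job:worker/replica:0/task:0/device:GPU:1",
        "c": "/job:worker/replica:0/task:1/device:CPU:0",
    }
    edges = [("a", "b"), ("b", "c")]
    print(split_by(placement, worker_of))              # level 1: one partition per worker
    print(cross_edges(edges, placement, worker_of))    # [('b', 'c')]
    print(cross_edges(edges, placement, lambda d: d))  # level 2: both edges cross devices
```

Edge (a, b) stays inside worker 0 but crosses GPUs, so it corresponds to intra-process exchange. Edge (b, c) crosses workers and corresponds to inter-process exchange.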

Since execution follows the partitions, we only illustrate the splitting here:

Figure 6 Splitting the computation graph

1.5 Communication Perspective

Finally, we analyze the distributed mode from the perspective of communication. TF's message-transport component is called Rendezvous, an abstraction for delivering tensors from producers to consumers. A Rendezvous is a table of channels. The producer calls Send() to send a tensor on a named channel, and the consumer calls Recv() to receive a tensor from a specified channel.

In distributed mode, edges across devices are split, and Send nodes and Recv nodes are inserted at the sending and receiving ends of the edges, respectively.

  • Intra-process Send and Recv nodes implement data exchange via IntraProcessRendezvous.
  • The Send and Recv nodes between processes exchange data using GrpcRemoteRendezvous.
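Rendezvous can be pictured as a thread-safe dictionary of named channels. The sketch below is a simplified in-process model in the spirit of IntraProcessRendezvous: send() stores a tensor under a key, and recv() blocks until that key appears. The ToyRendezvous class and the key string are illustrative assumptions. TensorFlow's real keys and its asynchronous, callback-based receive path are more involved.

```python
# Toy in-process rendezvous: a table of named channels (not TensorFlow code).
import threading
from typing import Any, Dict


class ToyRendezvous:
    def __init__(self) -> None:
        self._cond = threading.Condition()
        self._table: Dict[str, Any] = {}

    def send(self, key: str, value: Any) -> None:
        """Producer side: deposit a value on the named channel."""
        with self._cond:
            if key in self._table:
                raise KeyError(f"duplicate send on {key!r}")
            self._table[key] = value
            self._cond.notify_all()

    def recv(self, key: str, timeout: float = 5.0) -> Any:
        """Consumer side: block until the channel has a value, then take it."""
        with self._cond:
            if not self._cond.wait_for(lambda: key in self._table, timeout):
                raise TimeoutError(key)
            return self._table.pop(key)


if __name__ == "__main__":
    rdv = ToyRendezvous()
    # Hypothetical key: source device, destination device, edge name.
    key = "/job:worker/task:1/device:CPU:0;/job:worker/task:0/device:GPU:0;edge_b_c"
    received = []
    consumer = threading.Thread(target=lambda: received.append(rdv.recv(key)))
    consumer.start()            # the Recv side blocks until the value arrives
    rdv.send(key, [1.0, 2.0])   # the Send side deposits the tensor
    consumer.join()
    print(received)             # [[1.0, 2.0]]
```

Across processes, the same send/recv contract has to be carried over the network, which is the job GrpcRemoteRendezvous takes on.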

In the example below, the left side is the original computation graph and the right side is the graph after splitting, with its five nodes assigned to two workers.

Figure 7 Computation graph after splitting

Assume Worker 0 has two GPUs. After the Send and Recv nodes are inserted, the result looks as follows. The edge between Worker 1 and Worker 0 represents inter-process data exchange via GrpcRemoteRendezvous. The dotted arrows between the two GPUs in Worker 0 represent intra-process data exchange via IntraProcessRendezvous.

Figure 8 Communication perspective

Let’s take a look at the Server overview.

2. Server

2.1 Interface

The Server interface is defined in tensorflow/core/protobuf/tensorflow_server.proto, as follows:

// Defines the configuration of a single TensorFlow server.
message ServerDef {
 // The cluster of which this server is a member.
 ClusterDef cluster = 1;

 // The name of the job of which this server is a member.
 //
 // NOTE(mrry): The cluster field must contain a JobDef with a name field
 // that matches this name.
 string job_name = 2;

 // The task index of this server in its job.
 //
 // NOTE: The cluster field must contain a JobDef with a matching name
 // and a mapping in its tasks field for this index.
 int32 task_index = 3;

 // The default configuration for sessions that run on this server.
 ConfigProto default_session_config = 4;

 // The protocol to be used by this server.
 //
 // Acceptable values include: "grpc", "grpc+verbs".
 string protocol = 5;

 // The server port. If not set, then we identify the port from the job_name.
 int32 port = 6;

 // Device filters for remote tasks in the cluster.
 // NOTE: This is an experimental feature and only effective in TensorFlow 2.x.
 ClusterDeviceFilters cluster_device_filters = 7;
}

2.2 Python Definition

Server can be viewed from several perspectives.

  • First, a Server is a member of a cluster and manages its own set of local devices.
  • Second, the Server is a gRPC-based server that can communicate with the other servers in the cluster.
  • Third, the Server is the process that runs a tf.train.Server instance. It exposes two external interfaces, the Master and Worker "services", so a Server can play both roles.
  • Fourth, the Server is implemented by GrpcServer.
    • GrpcServer has a member variable server_ of type ::grpc::Server, which is the gRPC communication server. server_ listens for messages and forwards commands to the corresponding internal service, MasterService or WorkerService. That service processes them through callback functions.
    • In the Master role, the external service is MasterService. MasterService starts a MasterSession for each Client. A MasterSession is identified by a globally unique session_handle, which is passed back to the Client. A Master can serve multiple Clients, but a Client deals with only one Master.
    • In the Worker role, a Server can serve multiple Masters. Its external service is WorkerService, which creates a WorkerSession instance for each MasterSession. The MasterSession directs its WorkerSession to register graphs and execute commands.

Figure 9 GrpcServer structure
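The session bookkeeping described above can be sketched as two dictionaries. The MasterService maps each globally unique session_handle to a MasterSession. Each WorkerService keeps one WorkerSession per MasterSession handle, so a single worker can serve several masters. ToyMasterService, ToyWorkerService, and the use of uuid4 for handles are assumptions made for this illustration only.

```python
# Toy model of MasterSession/WorkerSession bookkeeping (not TensorFlow code).
import uuid
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class WorkerSession:
    master_handle: str                               # owning MasterSession
    graphs: List[str] = field(default_factory=list)  # registered subgraph names


class ToyWorkerService:
    def __init__(self) -> None:
        self.sessions: Dict[str, WorkerSession] = {}

    def create_worker_session(self, master_handle: str) -> WorkerSession:
        # At most one WorkerSession per MasterSession, keyed by its handle.
        if master_handle not in self.sessions:
            self.sessions[master_handle] = WorkerSession(master_handle)
        return self.sessions[master_handle]

    def register_graph(self, master_handle: str, graph_name: str) -> None:
        self.sessions[master_handle].graphs.append(graph_name)


class ToyMasterService:
    def __init__(self, workers: List[ToyWorkerService]) -> None:
        self.workers = workers
        self.sessions: Dict[str, List[WorkerSession]] = {}

    def create_session(self) -> str:
        # One MasterSession per client; the handle is returned to the client.
        handle = uuid.uuid4().hex
        self.sessions[handle] = [w.create_worker_session(handle) for w in self.workers]
        return handle


if __name__ == "__main__":
    workers = [ToyWorkerService(), ToyWorkerService()]
    master_a, master_b = ToyMasterService(workers), ToyMasterService(workers)
    h1, h2 = master_a.create_session(), master_b.create_session()
    workers[0].register_graph(h1, "partition_0")
    print(len(workers[0].sessions), workers[0].sessions[h1].graphs)
```

Keying WorkerSessions by the master's handle keeps graphs registered by different masters apart, even when the masters share the same worker processes.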

The Python interface is defined in tensorflow/python/training/server_lib.py.

@tf_export("distribute.Server", v1=["distribute.Server", "train.Server"])
@deprecation.deprecated_endpoints("train.Server")
class Server(object):
 """An in-process TensorFlow server, for use in distributed training.

 A tf.distribute.Server instance encapsulates a set of devices and a
 tf.compat.v1.Session target that can participate in distributed training.
 A server belongs to a cluster (specified by a tf.train.ClusterSpec), and
 corresponds to a particular task in a named job. The server can communicate
 with any other server in the same cluster.
 """

 def __init__(self,
              server_or_cluster_def,
              job_name=None,
              task_index=None,
              protocol=None,
              config=None,
              start=True):
   """Creates a new server with the given definition.

   The job_name, task_index, and protocol arguments are optional, and
   override any information provided in server_or_cluster_def.

   Args:
     server_or_cluster_def: A tf.train.ServerDef or tf.train.ClusterDef
       protocol buffer, or a tf.train.ClusterSpec object, describing the
       server to be created and/or the cluster of which it is a member.
     job_name: (Optional.) Specifies the name of the job of which the server is
       a member. Defaults to the value in server_or_cluster_def, if
       specified.
     task_index: (Optional.) Specifies the task index of the server in its job.
       Defaults to the value in server_or_cluster_def, if specified.
       Otherwise defaults to 0 if the server's job has only one task.
     protocol: (Optional.) Specifies the protocol to be used by the server.
       Acceptable values include "grpc", "grpc+verbs". Defaults to the value
       in server_or_cluster_def, if specified. Otherwise defaults to
       "grpc".
     config: (Options.) A tf.compat.v1.ConfigProto that specifies default
       configuration options for all sessions that run on this server.
     start: (Optional.) Boolean, indicating whether to start the server after
       creating it. Defaults to True.

   Raises:
     tf.errors.OpError: Or one of its subclasses if an error occurs while
       creating the TensorFlow server.
   """
   self._server_def = _make_server_def(server_or_cluster_def, job_name,
                                       task_index, protocol, config)
   self._server = c_api.TF_NewServer(self._server_def.SerializeToString())
   if start:
     self.start()
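The override rules in the docstring above can be expressed as a small pure function. This is a hedged sketch of those documented rules only, using plain dicts in place of the ServerDef/ClusterDef protos. make_server_def() and its defaults parameter are hypothetical. The real _make_server_def in server_lib.py may differ in details, such as how it handles a missing job_name.

```python
# Hedged sketch of the documented override rules (plain dicts, not protos).
from typing import Dict, List, Optional


def make_server_def(cluster: Dict[str, List[str]],
                    job_name: Optional[str] = None,
                    task_index: Optional[int] = None,
                    protocol: Optional[str] = None,
                    defaults: Optional[dict] = None) -> dict:
    """Explicit arguments win over values already present in `defaults`."""
    defaults = defaults or {}
    job_name = job_name if job_name is not None else defaults.get("job_name")
    task_index = task_index if task_index is not None else defaults.get("task_index")
    protocol = protocol if protocol is not None else defaults.get("protocol")

    if job_name not in cluster:
        raise ValueError(f"job_name {job_name!r} is not a job in the cluster")
    if task_index is None:
        # Documented fallback: index 0 only if the job has exactly one task.
        if len(cluster[job_name]) != 1:
            raise ValueError("task_index is required for a job with several tasks")
        task_index = 0
    return {"cluster": cluster, "job_name": job_name,
            "task_index": task_index, "protocol": protocol or "grpc"}


if __name__ == "__main__":
    cluster = {"ps": ["localhost:2222"],
               "worker": ["localhost:2223", "localhost:2224"]}
    # task_index falls back to 0 and protocol to "grpc".
    print(make_server_def(cluster, job_name="ps"))
```

Serializing such a definition and handing the bytes to C++ is exactly the role of self._server_def.SerializeToString() in the constructor above.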

TF_NewServer crosses from Python into the C++ world and calls tensorflow::NewServer to create the C++ Server object.

TF_Server* TF_NewServer(const void* proto, size_t proto_len,
                       TF_Status* status) {
#if defined(IS_MOBILE_PLATFORM) || defined(IS_SLIM_BUILD)
 status->status = tensorflow::errors::Unimplemented(
     "Server functionality is not supported on mobile");
 return nullptr;
#else
 tensorflow::ServerDef server_def;
 if (!server_def.ParseFromArray(proto, static_cast<int>(proto_len))) {
   status->status = InvalidArgument(
       "Could not parse provided bytes into a ServerDef protocol buffer");
   return nullptr;
 }

 std::unique_ptr<tensorflow::ServerInterface> out_server;
 status->status = tensorflow::NewServer(server_def, &out_server);
 if (!status->status.ok()) return nullptr;

 return new TF_Server(std::move(out_server));
#endif  // defined(IS_MOBILE_PLATFORM) || defined(IS_SLIM_BUILD)
}

The following code is then used to select which Server to build.

// Creates a server based on the given server_def, and stores it in
// *out_server. Returns OK on success, otherwise returns an error.
Status NewServer(const ServerDef& server_def,
                std::unique_ptr<ServerInterface>* out_server) {
 ServerFactory* factory;
 TF_RETURN_IF_ERROR(ServerFactory::GetFactory(server_def, &factory));
 return factory->NewServer(server_def, ServerFactory::Options(), out_server);
}

GrpcServer is registered with the system in advance, and GrpcServerFactory is its factory class. If the protocol is "grpc", the factory creates a GrpcServer.

class GrpcServerFactory : public ServerFactory {
public:
 bool AcceptsOptions(const ServerDef& server_def) override {
   return server_def.protocol() == "grpc";
 }

 Status NewServer(const ServerDef& server_def, const Options& options,
                  std::unique_ptr<ServerInterface>* out_server) override {
   return GrpcServer::Create(server_def, Env::Default(),
                             options.local_device_mgr, out_server);
 }
};

So, let’s look at GrpcServer next.

2.3 ServerInterface

ServerInterface is the base interface for a TensorFlow server that exports Master and Worker services. It is defined in tensorflow/core/distributed_runtime/server_lib.h. The library creates TensorFlow server objects through a registration/factory mechanism. Each server implementation must have a matching ServerFactory and must create a static "Registrar" object that calls ServerFactory::Register() with an instance of the factory class. The details are as follows:

class ServerInterface {
public:
 ServerInterface() {}
 virtual ~ServerInterface() {}

 // Starts the server running asynchronously. Returns OK on success, otherwise
 // returns an error.
 virtual Status Start() = 0;

 // Stops the server asynchronously. Returns OK on success, otherwise returns
 // an error.
 //
 // After calling Stop(), the caller may call Join() to block until the
 // server has stopped.
 virtual Status Stop() = 0;

 // Blocks until the server has stopped. Returns OK on success, otherwise
 // returns an error.
 virtual Status Join() = 0;

 // Returns a target string that can be used to connect to this server using
 // tensorflow::NewSession().
 virtual const string target() const = 0;

 virtual WorkerEnv* worker_env() = 0;
 virtual MasterEnv* master_env() = 0;

 // Update the set of workers that can be reached by the server
 virtual Status UpdateServerDef(const ServerDef& server_def) = 0;

 // Functions to operate on service-specific properties.
 //
 // Add master eager context to local eager service in order to handle enqueue
 // requests from remote workers.
 virtual Status AddMasterEagerContextToEagerService(
     const tensorflow::uint64 context_id, EagerContext* context) = 0;
 // Set coordination service agent instance to coordination service RPC handler
 virtual Status SetCoordinationServiceAgentInstance( CoordinationServiceAgent* agent) = 0;

private:
 TF_DISALLOW_COPY_AND_ASSIGN(ServerInterface);
};

The factory class is defined as follows:

class ServerFactory {
public:
 struct Options {
   // Local DeviceMgr to use.
   tensorflow::DeviceMgr* local_device_mgr;
 };
 // Creates a new server based on the given server_def, and stores
 // it in *out_server. Returns OK on success, otherwise returns an
 // error.
 virtual Status NewServer(const ServerDef& server_def, const Options& options,
                          std::unique_ptr<ServerInterface>* out_server) = 0;

 // Returns true if and only if this factory can create a server
 // based on the given server_def.
 virtual bool AcceptsOptions(const ServerDef& server_def) = 0;

 virtual ~ServerFactory() {}

 // For each ServerFactory subclass, an instance of that class must
 // be registered by calling this method.
 //
 // The server_type must be unique to the server factory.
 static void Register(const string& server_type, ServerFactory* factory);

 // Looks up a factory that can create a server based on the given
 // server_def, and stores it in *out_factory. Returns OK on
 // success, otherwise returns an error.
 static Status GetFactory(const ServerDef& server_def,
                          ServerFactory** out_factory);
};
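The registration/factory mechanism can be mimicked in Python. In this sketch, ServerFactory keeps a class-level registry, and a module-level register() call plays the role of the static C++ Registrar object. get_factory() returns the first registered factory whose accepts_options() is true, mirroring how NewServer() ends up with GrpcServerFactory when the protocol is "grpc". All class names, the "GRPC_SERVER" key, and the dict standing in for a server object are illustrative assumptions.

```python
# Toy Python model of the C++ ServerFactory registry (not TensorFlow code).
from typing import Dict


class ServerFactory:
    _registry: Dict[str, "ServerFactory"] = {}

    def accepts_options(self, server_def: dict) -> bool:
        raise NotImplementedError

    def new_server(self, server_def: dict) -> dict:
        raise NotImplementedError

    @classmethod
    def register(cls, server_type: str, factory: "ServerFactory") -> None:
        # server_type must be unique, as the C++ comment requires.
        if server_type in cls._registry:
            raise ValueError(f"duplicate server type {server_type!r}")
        cls._registry[server_type] = factory

    @classmethod
    def get_factory(cls, server_def: dict) -> "ServerFactory":
        for factory in cls._registry.values():
            if factory.accepts_options(server_def):
                return factory
        raise LookupError(f"no factory accepts protocol {server_def.get('protocol')!r}")


class ToyGrpcServerFactory(ServerFactory):
    def accepts_options(self, server_def: dict) -> bool:
        return server_def.get("protocol") == "grpc"

    def new_server(self, server_def: dict) -> dict:
        return {"impl": "GrpcServer", "server_def": server_def}  # stand-in object


# Plays the role of the static Registrar object in the C++ code.
ServerFactory.register("GRPC_SERVER", ToyGrpcServerFactory())


def new_server(server_def: dict) -> dict:
    """Mirrors NewServer(): find a matching factory, then delegate to it."""
    return ServerFactory.get_factory(server_def).new_server(server_def)


if __name__ == "__main__":
    print(new_server({"protocol": "grpc", "job_name": "worker", "task_index": 0})["impl"])
```

The benefit of this design is that NewServer() never names a concrete server class. A new transport only needs its own factory and a registration call.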

2.4 GrpcServer

2.4.1 Definition

GrpcServer manages the Master and Worker services in the current process. Start(), Stop() and Join() drive the state machine shown in the comment below.

  • In the NEW state, the grpc::Server has already been started, but no external service is provided yet.
  • In the STARTED state, the MasterService and WorkerService RPC services are running.
  • In the STOPPED state, MasterService and WorkerService have been stopped.
 // Represents the current state of the server, which changes as follows:
 //
 //                 Join()            Join()
 //                  ___               ___
 //      Start()     \ /    Stop()     \ /
 // NEW ---------> STARTED --------> STOPPED
 //   \                            /
 //    \__________________________/
 //          Stop(), Join()
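The lifecycle diagram can be written down as a transition table. The sketch below encodes only the arrows drawn in the comment above, and treats every undrawn call as an error. That strictness is an assumption of this sketch; the real GrpcServer may handle some of those calls differently, for example by logging. It also records Join() on STARTED as a self-loop, whereas the real Join() blocks until the server stops.

```python
# Toy model of the GrpcServer lifecycle diagram (not TensorFlow code).
from enum import Enum


class State(Enum):
    NEW = "NEW"
    STARTED = "STARTED"
    STOPPED = "STOPPED"


# Only the transitions drawn in the diagram; anything else is rejected here.
TRANSITIONS = {
    (State.NEW, "start"): State.STARTED,
    (State.NEW, "stop"): State.STOPPED,
    (State.NEW, "join"): State.STOPPED,
    (State.STARTED, "stop"): State.STOPPED,
    (State.STARTED, "join"): State.STARTED,  # real Join() blocks until stopped
    (State.STOPPED, "join"): State.STOPPED,
}


class ToyServerLifecycle:
    def __init__(self) -> None:
        self.state = State.NEW

    def _apply(self, call: str) -> State:
        key = (self.state, call)
        if key not in TRANSITIONS:
            raise RuntimeError(f"{call}() is not drawn for state {self.state.name}")
        self.state = TRANSITIONS[key]
        return self.state

    def start(self) -> State:
        return self._apply("start")

    def stop(self) -> State:
        return self._apply("stop")

    def join(self) -> State:
        return self._apply("join")


if __name__ == "__main__":
    server = ToyServerLifecycle()
    print(server.start().name, server.stop().name, server.join().name)
    # STARTED STOPPED STOPPED
```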

Its main member variables are:

  • master_env_ (MasterEnv): the environment used by the Master. It holds pointers to objects it does not own.
  • worker_env_ (WorkerEnv): the environment in which the Worker runs.
  • master_impl_: the Master instance that performs the actual service operations.
  • worker_impl_: the GrpcWorker that performs the actual service operations.
  • master_service_: the GrpcMasterService instance.
  • worker_service_: the GrpcWorkerService instance.
  • master_thread_: the thread MasterService uses for RPC polling.
  • worker_thread_: the thread WorkerService uses for RPC polling.
  • server_ (std::unique_ptr<::grpc::Server>): the gRPC communication server.

Specifically, several threads are started to execute GrpcMasterService, GrpcWorkerService, and GrpcEagerServiceImpl respectively.

class GrpcServer : public ServerInterface {

private:
 Env* env_;

 // The port to which this server is bound.
 int bound_port_ = 0;

 // The host name of this server
 string host_name_;

 // Guards server configuration, server, and state.
 mutex mu_;

 enum State { NEW, STARTED, STOPPED };
 State state_ TF_GUARDED_BY(mu_);

 // Implementation of a TensorFlow master, and RPC polling thread.
 MasterEnv master_env_;
 std::unique_ptr<Master> master_impl_;
 AsyncServiceInterface* master_service_ = nullptr;
 std::unique_ptr<Thread> master_thread_ TF_GUARDED_BY(mu_);

 std::map<std::string, AsyncServiceInterface*> extra_services_;
 std::vector<std::unique_ptr<Thread>> extra_service_threads_
     TF_GUARDED_BY(mu_);

 // Implementation of a TensorFlow worker, and RPC polling thread.
 WorkerEnv worker_env_;
 std::unique_ptr<const DeviceMgr> owned_device_manager_;
 std::unique_ptr<GrpcWorker> worker_impl_;
 AsyncServiceInterface* worker_service_ = nullptr;
 std::unique_ptr<Thread> worker_thread_ TF_GUARDED_BY(mu_);
 std::unique_ptr<GrpcWorkerEnv> grpc_worker_env_;

 // TensorFlow Eager implementation, and RPC polling thread.
 AsyncServiceInterface* eager_service_ = nullptr;
 std::unique_ptr<Thread> eager_thread_ TF_GUARDED_BY(mu_);
 std::shared_ptr<WorkerSession> worker_session_;

 // TensorFlow profiler service implementation.
 std::unique_ptr<grpc::ProfilerService::Service> profiler_service_ = nullptr;

 // The overall server configuration.
 ServerDef server_def_ TF_GUARDED_BY(mu_);

 std::unique_ptr<::grpc::Server> server_ TF_GUARDED_BY(mu_);
};

2.4.2 Initialization

The initialization logic is as follows:

  • Initialize MasterEnv and WorkerEnv.

  • Create Device Manager;

  • Build a device list;

  • Create RpcRendezvousMgr;

  • Set up the necessary server settings.

  • Create the Master and the corresponding GrpcMasterService. GrpcMasterService is the entity that provides the external service. The message handler function will be called when the message arrives. The specific business is provided by Master.

  • Create a GrpcWorker and the corresponding GrpcWorkerService. GrpcWorkerService is an entity that provides services to the outside world. The message handler function will be called when the message arrives. Specific services are provided by GrpcWorker.

  • Call builder.BuildAndStart() to start the gRPC communication server, grpc::Server. Even after grpc::Server is started, GrpcServer remains in the NEW state and provides no external service until the state machine moves to the STARTED state.

  • Establish the environment required by GRPC;

  • Create WorkerCache;

  • Create a SessionMgr and then create a WorkerSession in the SessionMgr;

  • Set the MasterSession factory. It is called to create a MasterSession only when one is needed, since some tasks (such as PS) do not need a MasterSession;

  • Register the LocalMaster;
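One detail in the initialization is device naming. Every local device gets the name prefix "/job:<job_name>/replica:0/task:<task_index>", built with StrCat in the code below. A device name can then be split back into its task part and its device part, which is what the SplitDeviceName call does to derive default_worker_name. The helpers here are hypothetical, simplified stand-ins. The real DeviceNameUtils accepts more name formats. The negative-index check echoes the code comment about task_index=-1.

```python
# Toy helpers for TensorFlow-style device names (not DeviceNameUtils itself).
from typing import Tuple


def task_prefix(job_name: str, task_index: int) -> str:
    """Same shape as the StrCat in GrpcServer::Init: /job:<job>/replica:0/task:<index>."""
    if task_index < 0:
        raise ValueError("task_index must be non-negative")
    return f"/job:{job_name}/replica:0/task:{task_index}"


def device_name(prefix: str, device_type: str, device_id: int) -> str:
    return f"{prefix}/device:{device_type}:{device_id}"


def split_device_name(name: str) -> Tuple[str, str]:
    """Simplified split into (task part, device part)."""
    task, sep, device = name.partition("/device:")
    if not sep or not task.startswith("/job:"):
        raise ValueError(f"cannot parse device name {name!r}")
    return task, device


if __name__ == "__main__":
    prefix = task_prefix("worker", 1)
    cpu0 = device_name(prefix, "CPU", 0)
    print(cpu0)                     # /job:worker/replica:0/task:1/device:CPU:0
    print(split_device_name(cpu0))  # ('/job:worker/replica:0/task:1', 'CPU:0')
```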

Status GrpcServer::Init(const GrpcServerOptions& opts) {
 mutex_lock l(mu_);
 master_env_.env = env_;
 worker_env_.env = env_;

 // Check parameters before DeviceFactory::AddDevices,
 // otherwise if 'task_index=-1' the program will abort.

 int requested_port;
 TF_RETURN_IF_ERROR(GetHostAndPort(server_def_, &host_name_, &requested_port));

 SessionOptions sess_opts;
 ConfigProto config = server_def_.default_session_config();
 sess_opts.config = config;

 // Configure shared devices between master and worker.
 string name_prefix =
     strings::StrCat("/job:", server_def_.job_name(), "/replica:0", "/task:",
                     server_def_.task_index());
 
 // Create Device Manager
 if (opts.local_device_mgr == nullptr) {
   std::vector<std::unique_ptr<Device>> devices;
   TF_RETURN_IF_ERROR(
       DeviceFactory::AddDevices(sess_opts, name_prefix, &devices));
   worker_env_.device_mgr = new DynamicDeviceMgr(std::move(devices));
   owned_device_manager_.reset(worker_env_.device_mgr);
 } else {
   worker_env_.device_mgr = opts.local_device_mgr;
   owned_device_manager_.reset(nullptr);
 }
 // Build a list of devices
 worker_env_.local_devices = worker_env_.device_mgr->ListDevices();
 master_env_.local_devices = worker_env_.device_mgr->ListDevices();

 // Create the RpcRendezvousMgr
 worker_env_.rendezvous_mgr = opts.rendezvous_mgr_func == nullptr
                                  ? new RpcRendezvousMgr(&worker_env_)
                                  : opts.rendezvous_mgr_func(&worker_env_);
 string unused;
 string default_worker_name;
 if (!DeviceNameUtils::SplitDeviceName(master_env_.local_devices[0]->name(),
                                       &default_worker_name, &unused)) {
   return errors::Internal("Could not parse worker name.");
 }

 // Set up the necessary server settings
 ::grpc::ServerBuilder builder;
 builder.AddListeningPort(strings::StrCat("0.0.0.0:", requested_port),
                          GetServerCredentials(server_def_), &bound_port_);
 builder.SetMaxMessageSize(std::numeric_limits<int32>::max());
 bool reuse_port = false;
 const Status status =
     ReadBoolFromEnvVar("TF_GRPC_REUSE_PORT", false, &reuse_port);
 auto server_build_option =
     reuse_port
         ? std::unique_ptr<::grpc::ServerBuilderOption>(new ReusePortOption)
         : std::unique_ptr<::grpc::ServerBuilderOption>(new NoReusePortOption);
 builder.SetOption(std::move(server_build_option));

 // Allow subclasses to specify more args to pass to the gRPC server.
 MaybeMutateBuilder(&builder, requested_port);

 // Create Master and GrpcMasterService
 master_impl_ = CreateMaster(&master_env_);
 master_service_ = NewGrpcMasterService(master_impl_.get(), config, &builder);
 // Create GrpcWorker and GrpcWorkerService
 worker_impl_ = opts.worker_func ? opts.worker_func(&worker_env_, config)
                                 : NewGrpcWorker(&worker_env_, config);
 worker_service_ = NewGrpcWorkerService(worker_impl_.get(), &builder,
                                        opts.worker_service_options)
                       .release();
 eager_service_ = new eager::GrpcEagerServiceImpl(&worker_env_, &builder);

 profiler_service_ = profiler::CreateProfilerService();
 builder.RegisterService(profiler_service_.get());

 // Add any extra services to be started.
 extra_services_ = ExtraServices(&builder);

 // Extra services supplied through the options:
 if (opts.service_func != nullptr) {
   opts.service_func(&worker_env_, &builder);
 }
 // Start the gRPC communication server
 server_ = builder.BuildAndStart();

 // Create the execution environment for the gRPC workers cache.
 // Create the environment required by GRPC
 grpc_worker_env_.reset(CreateGrpcWorkerEnv());

 // Create the WorkerCache
 WorkerCacheInterface* worker_cache;
 WorkerCacheFactoryOptions worker_cache_factory_options(server_def_);
 TF_RETURN_IF_ERROR(
     WorkerCacheFactory(worker_cache_factory_options, &worker_cache));
 CHECK_NE(nullptr, worker_cache);

 if (opts.collective_mgr_func) {
   worker_env_.collective_executor_mgr.reset(
       opts.collective_mgr_func(config, &worker_env_, worker_cache));
 } else {
   worker_env_.collective_executor_mgr = CreateProdRpcCollectiveExecutorMgr(
       config, worker_env_.device_mgr, MaybeCreateNcclCommunicator(),
       worker_cache, default_worker_name);
 }

 // Set up worker environment.
 // Create a SessionMgr, and then create a WorkerSession in that SessionMgr
 worker_env_.session_mgr = new SessionMgr(
     &worker_env_, SessionMgr::WorkerNameFromServerDef(server_def_),
     std::unique_ptr<WorkerCacheInterface>(worker_cache),
     [this] (const ServerDef& server_def, WorkerCacheInterface** worker_cache) {
       WorkerCacheFactoryOptions options(server_def);
       return WorkerCacheFactory(options, worker_cache);
     });
 worker_env_.compute_pool = ComputePool(sess_opts);

 // Finish setting up master environment.
 master_env_.ops = OpRegistry::Global();
 master_env_.worker_cache = worker_cache;
 master_env_.collective_executor_mgr =
     worker_env_.collective_executor_mgr.get();
 StatsPublisherFactory stats_factory = opts.stats_factory;

 // Set the MasterSession factory. MasterSessions are created only when
 // needed, because some tasks, such as ps, never need one.
 master_env_.master_session_factory =
     [config, stats_factory](
         SessionOptions options, const MasterEnv* env,
         std::unique_ptr<std::vector<std::unique_ptr<Device>>> remote_devs,
         std::unique_ptr<WorkerCacheInterface> worker_cache,
         std::unique_ptr<DeviceSet> device_set,
         std::vector<string> filtered_worker_list) {
       options.config.MergeFrom(config);
       return new MasterSession(options, env, std::move(remote_devs),
                                std::move(worker_cache), std::move(device_set),
                                std::move(filtered_worker_list),
                                stats_factory);
     };
 master_env_.worker_cache_factory =
     [this] (const WorkerCacheFactoryOptions& options,
            WorkerCacheInterface** worker_cache) {
       return WorkerCacheFactory(options, worker_cache);
     };

 // Provide direct access to the master from in-process clients.
 // Register LocalMaster
 LocalMaster::Register(target(), master_impl_.get(),
                       config.operation_timeout_in_ms());

 return Status::OK();
}
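To make the naming in Init concrete, the following minimal sketch shows how the device name prefix /job:<job>/replica:0/task:<index> is assembled and how a full device name can be split into a worker name and a device part. MakeNamePrefix and SplitDeviceNameSketch are hypothetical helpers written for illustration only; they are not TensorFlow's strings::StrCat or DeviceNameUtils::SplitDeviceName, and the real parser accepts many more name forms than this one.

```cpp
#include <string>

// Hypothetical helper: builds "/job:<job>/replica:0/task:<index>",
// the same prefix that GrpcServer::Init assembles with strings::StrCat.
std::string MakeNamePrefix(const std::string& job_name, int task_index) {
  return "/job:" + job_name + "/replica:0/task:" + std::to_string(task_index);
}

// Hypothetical, simplified splitter. It only understands names of the form
// "/job:J/replica:R/task:T/device:TYPE:ID" and splits them at "/device:".
// Returns false when there is no device part.
bool SplitDeviceNameSketch(const std::string& full_name,
                           std::string* worker_name,
                           std::string* device_part) {
  const std::string marker = "/device:";
  const std::string::size_type pos = full_name.find(marker);
  if (pos == std::string::npos) return false;
  *worker_name = full_name.substr(0, pos);               // "/job:J/replica:R/task:T"
  *device_part = full_name.substr(pos + marker.size());  // "TYPE:ID"
  return true;
}
```

In Init, the real SplitDeviceName is applied to the first local device to obtain default_worker_name, which is then passed to CreateProdRpcCollectiveExecutorMgr.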

Master

The Master is the object that actually implements the master's business logic. In the initialization code above, the statements that create and register the master are:

master_impl_ = CreateMaster(&master_env_);

LocalMaster::Register(target(), master_impl_.get(),
                       config.operation_timeout_in_ms());

As the following code shows, GrpcServer::CreateMaster simply constructs a new Master from the MasterEnv.

std::unique_ptr<Master> GrpcServer::CreateMaster(MasterEnv* master_env) {
 return std::unique_ptr<Master>(new Master(master_env, 0.0));
}

The master's target is a URL of the form grpc://<host>:<port>, where the port is the one the server actually bound:

const string GrpcServer::target() const {
 return strings::StrCat("grpc://", host_name_, ":", bound_port_);
}

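LocalMaster::Register then records the Master under this target. As the comment in the code says, this gives in-process clients direct access to the master.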

// Provide direct access to the master from in-process clients.
LocalMaster::Register(target(), master_impl_.get(),
                     config.operation_timeout_in_ms());
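The target string and the in-process registration fit together as a lookup table: the server formats grpc://<host>:<port> and registers its Master under that key, so a client in the same process that asks for the same target can reach the Master without going through the network. The sketch below models that idea with a hypothetical MasterRegistrySketch and a placeholder FakeMaster type. It is an illustration under these assumptions, not TensorFlow's LocalMaster implementation, and the "fall back to gRPC" behavior noted in the comments is the sketch's assumption about how a client would react to a missing entry.

```cpp
#include <cstdint>
#include <map>
#include <mutex>
#include <string>

// Placeholder for TensorFlow's Master class.
struct FakeMaster {
  std::string name;
};

// Same format as GrpcServer::target(): "grpc://" + host + ":" + port.
std::string MakeTarget(const std::string& host, int port) {
  return "grpc://" + host + ":" + std::to_string(port);
}

// Hypothetical process-wide registry keyed by target string.
class MasterRegistrySketch {
 public:
  struct Entry {
    FakeMaster* master = nullptr;    // not owned
    std::int64_t timeout_in_ms = 0;  // default operation timeout
  };

  static void Register(const std::string& target, FakeMaster* master,
                       std::int64_t timeout_in_ms) {
    std::lock_guard<std::mutex> lock(Mu());
    Entry entry;
    entry.master = master;
    entry.timeout_in_ms = timeout_in_ms;
    Table()[target] = entry;
  }

  // Copies the entry into *out and returns true if `target` is registered;
  // otherwise returns false (a client would then use a gRPC channel).
  static bool Lookup(const std::string& target, Entry* out) {
    std::lock_guard<std::mutex> lock(Mu());
    auto it = Table().find(target);
    if (it == Table().end()) return false;
    *out = it->second;
    return true;
  }

 private:
  static std::mutex& Mu() {
    static std::mutex mu;
    return mu;
  }
  static std::map<std::string, Entry>& Table() {
    static std::map<std::string, Entry> table;
    return table;
  }
};
```

Lookup copies the entry out under the lock, which keeps the sketch thread-safe even if the same target is registered again later.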

Worker

In the initialization code, the following statement creates the worker. Unless a custom worker_func is supplied in the options, NewGrpcWorker is called to create a GrpcWorker, the object that implements the worker's business logic.

 worker_impl_ = opts.worker_func ? opts.worker_func(&worker_env_, config)
                                 : NewGrpcWorker(&worker_env_, config);
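The expression opts.worker_func ? opts.worker_func(...) : NewGrpcWorker(...) is an optional-factory pattern that Init uses several times (rendezvous_mgr_func and collective_mgr_func follow the same shape): if the caller supplied a factory, use it; otherwise fall back to the built-in default. A minimal sketch, assuming hypothetical WorkerSketch, GrpcWorkerSketch, and Options types in place of the real TensorFlow classes:

```cpp
#include <functional>
#include <memory>
#include <string>

// Hypothetical stand-ins for Worker and GrpcWorker.
struct WorkerSketch {
  virtual ~WorkerSketch() = default;
  virtual std::string Kind() const = 0;
};

struct GrpcWorkerSketch : WorkerSketch {
  std::string Kind() const override { return "grpc"; }
};

// Hypothetical options: an empty std::function means "use the default".
struct Options {
  std::function<std::unique_ptr<WorkerSketch>()> worker_func;
};

std::unique_ptr<WorkerSketch> NewGrpcWorkerSketch() {
  return std::unique_ptr<WorkerSketch>(new GrpcWorkerSketch());
}

// Same shape as: opts.worker_func ? opts.worker_func(...) : NewGrpcWorker(...)
std::unique_ptr<WorkerSketch> CreateWorker(const Options& opts) {
  return opts.worker_func ? opts.worker_func() : NewGrpcWorkerSketch();
}
```

An empty std::function converts to false, which is what makes the ternary test work without a separate "is set" flag.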

2.4.3 Env

WorkerEnv

WorkerEnv aggregates the configuration and shared objects that the Worker uses, so it can be thought of as the Worker's runtime context. WorkerEnv has the same life cycle as the Server and is visible for as long as the Worker runs. Its main members are:

  • Env* env: cross-platform API interface.

  • SessionMgr* session_mgr: manages the set of WorkerSessions.

  • std::vector<Device*> local_devices: the set of local devices.

  • DeviceMgr* device_mgr: manages the local devices (CPU and GPU).

  • RendezvousMgrInterface* rendezvous_mgr: manages the set of Rendezvous instances, keyed by step id.

  • std::unique_ptr<CollectiveExecutorMgrInterface> collective_executor_mgr: generates per-step CollectiveExecutors for collective operations.

  • thread::ThreadPool* compute_pool: a thread pool used to schedule compute work, such as operator execution.

// The worker environment class, which holds a bag of pointers to
// per-worker singletons.
//
// WorkerEnv does not own its member pointers.
struct WorkerEnv {
 Env* env = nullptr;

 // session_mgr encapsulates state for each session.
 SessionMgr* session_mgr = nullptr;

 // The local devices of this worker. Devices are owned by the device_mgr.
 //
 // REQUIRES: ! local_devices.empty().
 std::vector<Device*> local_devices;

 // device_mgr manages local devices (cpu and gpu). The WorkerService
 // is the network interface for managed devices.
 //
 // Note: Please use the device_mgr associated with your session if appropriate
 // instead of this one. Using this device_mgr does not support ClusterSpec
 // propagated sessions.
 DeviceMgr* device_mgr = nullptr;

 // A set of rendezvous keyed by step ids.
 RendezvousMgrInterface* rendezvous_mgr = nullptr;

 // Generates per-step CollectiveExecutors and has access to utilities
 // supporting collective operations.
 std::unique_ptr<CollectiveExecutorMgrInterface> collective_executor_mgr;

 // A pool of threads for scheduling compute work.
 thread::ThreadPool* compute_pool = nullptr;

 // Coordination service.
 CoordinationServiceInterface* coord_service;
};
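WorkerEnv only borrows its pointers; ownership sits elsewhere. The device manager code in Init shows the usual way this is arranged: when no local_device_mgr is passed in, the server creates one, stores the raw pointer in the env, and keeps ownership in owned_device_manager_; when one is passed in, the server only borrows it. A minimal sketch of that own-or-borrow pattern, with hypothetical DeviceMgrSketch, EnvSketch, and ServerSketch types:

```cpp
#include <memory>

// Hypothetical stand-in for DeviceMgr.
struct DeviceMgrSketch {
  int num_devices = 0;
};

// Like WorkerEnv: a bag of non-owning pointers.
struct EnvSketch {
  DeviceMgrSketch* device_mgr = nullptr;
};

class ServerSketch {
 public:
  // If `external` is null, create and own a manager; otherwise borrow it.
  explicit ServerSketch(DeviceMgrSketch* external) {
    if (external == nullptr) {
      env_.device_mgr = new DeviceMgrSketch{2};
      owned_device_mgr_.reset(env_.device_mgr);
    } else {
      env_.device_mgr = external;
      owned_device_mgr_.reset(nullptr);
    }
  }

  const EnvSketch& env() const { return env_; }
  bool owns_device_mgr() const { return owned_device_mgr_ != nullptr; }

 private:
  EnvSketch env_;
  // Frees the manager only when this server created it.
  std::unique_ptr<DeviceMgrSketch> owned_device_mgr_;
};
```

Keeping the env as plain pointers lets many components share it cheaply, while a single unique_ptr on the server decides whether anything needs freeing.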

Several of WorkerEnv's manager members are important. For example, SessionMgr manages the Worker's sessions: it creates and destroys them and maintains the current Worker's mapping from session handles to WorkerSession objects.

class SessionMgr {
 public:
   Status CreateSession(...);
   Status DeleteSession(...);
 private:
   const WorkerEnv* const worker_env_;
   const WorkerCacheFactory worker_cache_factory_;
   std::map<string, std::unique_ptr<WorkerSession>> sessions_ GUARDED_BY(mu_);
};
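At its core, SessionMgr is a mutex-guarded map from session handle to an owned WorkerSession. The following minimal sketch assumes a placeholder WorkerSessionSketch and simplified bool-returning signatures; the real CreateSession and DeleteSession take more arguments and return Status. Rejecting a duplicate handle is a policy chosen for this sketch, not a claim about TensorFlow's behavior.

```cpp
#include <map>
#include <memory>
#include <mutex>
#include <string>

// Placeholder for WorkerSession.
struct WorkerSessionSketch {
  std::string worker_name;
};

class SessionMgrSketch {
 public:
  // Returns false if the handle is already in use.
  bool CreateSession(const std::string& handle, const std::string& worker_name) {
    std::lock_guard<std::mutex> lock(mu_);
    if (sessions_.count(handle) > 0) return false;
    sessions_[handle] = std::unique_ptr<WorkerSessionSketch>(
        new WorkerSessionSketch{worker_name});
    return true;
  }

  // Erasing the map entry destroys the owned session.
  bool DeleteSession(const std::string& handle) {
    std::lock_guard<std::mutex> lock(mu_);
    return sessions_.erase(handle) > 0;
  }

  // Returns nullptr for an unknown handle.
  WorkerSessionSketch* Find(const std::string& handle) {
    std::lock_guard<std::mutex> lock(mu_);
    auto it = sessions_.find(handle);
    return it == sessions_.end() ? nullptr : it->second.get();
  }

 private:
  std::mutex mu_;
  std::map<std::string, std::unique_ptr<WorkerSessionSketch>> sessions_;  // guarded by mu_
};
```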

MasterEnv

MasterEnv gathers the configuration and shared objects used by the Master, so it can be thought of as the Master's runtime context; it remains visible throughout the Master's life cycle. Its main members are:

  • Env* env: cross-platform API interface;
  • std::vector<Device*> local_devices: the set of local devices;
  • worker_cache_factory: factory function that creates WorkerCacheInterface instances;
  • master_session_factory: factory function that creates MasterSession instances;
  • WorkerCacheInterface* worker_cache: the object from which WorkerInterface instances are obtained, used to call remote WorkerService instances;
  • OpRegistryInterface* ops: queries the metadata of a specific OP;
  • CollectiveExecutorMgrInterface* collective_executor_mgr: provides access to collective operations.

// The master environment class, which holds a bag of pointers to
// per-master state.
//
// MasterEnv does not own its member pointers.
struct MasterEnv {
 Env* env = nullptr;

 // Object from which WorkerInterface instances can be obtained. Not owned.
 WorkerCacheInterface* worker_cache = nullptr;

 // The operation definitions to use. Must be filled before use.
 const OpRegistryInterface* ops = nullptr;

 // Local devices co-located with this master. Devices are not owned
 // by the master service.
 //
 // REQUIRES: ! local_devices.empty().
 std::vector<Device*> local_devices;

 // Factory for creating master sessions, given session options and a
 // vector of devices.
 //
 // The caller of the function takes ownership of the returned
 // MasterSession, which may not be null. Ownership of the
 // MasterEnv* is retained by the caller.
 std::function<MasterSession*(
     SessionOptions, MasterEnv*,
     std::unique_ptr<std::vector<std::unique_ptr<Device>>>,
     std::unique_ptr<WorkerCacheInterface>,
     std::unique_ptr<DeviceSet> device_set,
     std::vector<string> filtered_worker_list)>
     master_session_factory;

 std::function<Status(const WorkerCacheFactoryOptions&,
                      WorkerCacheInterface**)>
     worker_cache_factory;

 // Generates per-step CollectiveExecutors and has access to utilities
 // supporting collective operations. Not owned.
 CollectiveExecutorMgrInterface* collective_executor_mgr = nullptr;
};
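MasterEnv stores its factories as std::function members, and Init fills master_session_factory with a lambda that captures server-wide state (config and stats_factory) and merges that config into each session's options. The sketch below reproduces the shape with hypothetical ConfigSketch, SessionSketch, and MasterEnvSketch types. ConfigSketch models a ConfigProto as key/value pairs, and its MergeFrom only approximates protobuf's behavior for singular fields: a field set in the source overwrites the same field in the destination.

```cpp
#include <functional>
#include <map>
#include <string>

// Hypothetical config: key/value pairs standing in for ConfigProto fields.
struct ConfigSketch {
  std::map<std::string, std::string> fields;

  // Approximates protobuf MergeFrom for singular fields:
  // every field set in `other` overwrites the same field here.
  void MergeFrom(const ConfigSketch& other) {
    for (const auto& kv : other.fields) fields[kv.first] = kv.second;
  }
};

struct SessionSketch {
  ConfigSketch config;
};

struct MasterEnvSketch {
  // The caller takes ownership of the returned session.
  std::function<SessionSketch*(ConfigSketch)> master_session_factory;
};

// Mirrors the lambda in GrpcServer::Init: capture the server config by
// value and merge it into every session's options.
void InstallFactory(MasterEnvSketch* env, const ConfigSketch& server_config) {
  env->master_session_factory = [server_config](ConfigSketch options) {
    options.MergeFrom(server_config);
    return new SessionSketch{options};
  };
}
```

Capturing config by value means the factory stays valid after Init returns, which matters because sessions are created long after initialization.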

2.5 Start

At the end of the Python Server constructor, the start method is called when start=True, which is the default.

@tf_export("distribute.Server", v1=["distribute.Server", "train.Server"])
@deprecation.deprecated_endpoints("train.Server")
class Server(object):
 def __init__(self,
              server_or_cluster_def,
              job_name=None,
              task_index=None,
              protocol=None,
              config=None,
              start=True):
   self._server_def = _make_server_def(server_or_cluster_def, job_name,
                                       task_index, protocol, config)
   self._server = c_api.TF_NewServer(self._server_def.SerializeToString())
   if start:
     self.start()

Before the call, the server is in the NEW state. Calling start moves GrpcServer from NEW to STARTED. Start() launches three dedicated threads that run the message-handling loops (HandleRPCsLoop) of MasterService, WorkerService, and EagerService, plus one thread for each extra service. From this point on, GrpcServer provides MasterService and WorkerService.

Status GrpcServer::Start() {
 mutex_lock l(mu_);
 switch (state_) {
   case NEW: {
     master_thread_.reset(
         env_->StartThread(ThreadOptions(), "TF_master_service",
                           [this] { master_service_->HandleRPCsLoop(); }));
     worker_thread_.reset(
         env_->StartThread(ThreadOptions(), "TF_worker_service",
                           [this] { worker_service_->HandleRPCsLoop(); }));
     eager_thread_.reset(
         env_->StartThread(ThreadOptions(), "TF_eager_service",
                           [this] { eager_service_->HandleRPCsLoop(); }));
     for (const auto& kv : extra_services_) {
       const std::string& service_name = kv.first;
       AsyncServiceInterface* service = kv.second;
       std::unique_ptr<Thread> extra_service_thread;
       extra_service_thread.reset(env_->StartThread(
           ThreadOptions(), service_name,
           [service = service] { service->HandleRPCsLoop(); }));
       extra_service_threads_.push_back(std::move(extra_service_thread));
     }

     state_ = STARTED;
     return Status::OK();
   }
   case STARTED:
     return Status::OK();
   case STOPPED:
     return errors::FailedPrecondition("Server has stopped.");
   default:
     LOG(FATAL);
 }
}
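Start() is a small state machine guarded by mu_: only the NEW state launches threads, a second Start() is a no-op, and a stopped server refuses to start again. The sketch below models just those transitions, together with the state effect of Join() on a server that was never started. ServerLifecycleSketch and its string-based error result are hypothetical, and thread creation is replaced by recording thread names so the behavior is deterministic.

```cpp
#include <mutex>
#include <string>
#include <vector>

enum class ServerState { kNew, kStarted, kStopped };

// Hypothetical model of the state handling in GrpcServer::Start() and
// GrpcServer::Join(). Launching a thread is replaced by recording its name.
class ServerLifecycleSketch {
 public:
  // Returns "" on success or an error message (stand-in for Status).
  std::string Start() {
    std::lock_guard<std::mutex> lock(mu_);
    switch (state_) {
      case ServerState::kNew:
        // Real code: one thread per service, each running HandleRPCsLoop().
        threads_ = {"TF_master_service", "TF_worker_service",
                    "TF_eager_service"};
        state_ = ServerState::kStarted;
        return "";
      case ServerState::kStarted:
        return "";  // starting twice is a no-op
      case ServerState::kStopped:
        return "Server has stopped.";
    }
    return "unknown state";
  }

  void Join() {
    std::lock_guard<std::mutex> lock(mu_);
    if (state_ == ServerState::kNew) {
      state_ = ServerState::kStopped;  // prevent a later Start()
      return;
    }
    threads_.clear();  // real code: reset() each Thread, which joins it
  }

  ServerState state() {
    std::lock_guard<std::mutex> lock(mu_);
    return state_;
  }

  int num_threads() {
    std::lock_guard<std::mutex> lock(mu_);
    return static_cast<int>(threads_.size());
  }

 private:
  std::mutex mu_;
  ServerState state_ = ServerState::kNew;
  std::vector<std::string> threads_;
};
```

As in the real code, Join() on a started server does not change its state; it only waits for (here: discards) the service threads.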

2.6 Waiting to Terminate the Service

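After startup, the caller invokes Join(), which blocks the calling (main) thread until the service threads terminate; this keeps MasterService and WorkerService available for as long as the server runs. Join() does not call a join function explicitly: resetting each std::unique_ptr<Thread> destroys the Thread object, and a TensorFlow Thread's destructor blocks until the thread stops running. If Join() is called on a server that was never started, it simply marks the server STOPPED so that it cannot be started later.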

Status GrpcServer::Join() {
 mutex_lock l(mu_);
 switch (state_) {
   case NEW:
     // Prevent the server from being started subsequently.
     state_ = STOPPED;
     return Status::OK();
   case STARTED:
   case STOPPED:
     master_thread_.reset();
     worker_thread_.reset();
     eager_thread_.reset();
     for (auto& thread : extra_service_threads_) {
       thread.reset();
     }
     return Status::OK();
   default:
     LOG(FATAL);
 }
}
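Join() relies on RAII rather than an explicit join call: destroying a TensorFlow Thread blocks until that thread finishes, so master_thread_.reset() waits for the master service loop to return. The sketch below reproduces the idiom with a hypothetical JoiningThread wrapper around std::thread and a toy service loop that exits when a flag is set. The flag stands in for whatever makes the real HandleRPCsLoop return, which is an assumption of this sketch.

```cpp
#include <atomic>
#include <chrono>
#include <functional>
#include <memory>
#include <thread>
#include <utility>

// Hypothetical RAII wrapper: like TensorFlow's Thread, destroying it
// blocks until the thread of control has finished.
class JoiningThread {
 public:
  explicit JoiningThread(std::function<void()> fn) : t_(std::move(fn)) {}
  ~JoiningThread() {
    if (t_.joinable()) t_.join();
  }
  JoiningThread(const JoiningThread&) = delete;
  JoiningThread& operator=(const JoiningThread&) = delete;

 private:
  std::thread t_;
};

// Toy "RPC loop": polls until asked to stop, then records that it ended.
void ServiceLoop(std::atomic<bool>* stop, std::atomic<bool>* finished) {
  while (!stop->load()) {
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
  finished->store(true);
}
```

Holding the wrapper in a std::unique_ptr, as GrpcServer does with its Thread members, means reset() is the point where the caller blocks.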

This completes the overview of the TF distributed environment.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

WeChat official account: Rosie’s Thoughts

0xFF References

TensorFlow Internals

TensorFlow Architecture and Design: Overview

TensorFlow kernel analysis

TensorFlow Architecture and Design: OP Essentialism

TensorFlow whitepaper

Tensorflow Developer Summit 2017

Jcf94.com/2018/02/28/…

Unpacking TensorFlow (Part 5): Distributed

TensorFlow Architecture

Tensorflow (Tensorflow)

What are In-graph replication and between-graph replication?

TensorFlow (1): create a session

05tensorflow Distributed session

Section 8, configure distributed TensorFlow

TensorFlow Distributed TensorFlow

Distributed_runtime for tensorflow source code parsing

Distributed TensorFlow: A Gentle Introduction

This article explains the essential knowledge of Tensorflow distributed training

Placer, the Placement heuristic algorithm module in TensorFlow

Graph Partitioner for TensorFlow

A communication mechanism for TensorFlow

TensorFlow distributed pitfalls

TensorFlow: Distributed execution of model optimization

Tensorflow architecture process