In the previous article, we saw how the Master sends commands to the remote worker over gRPC: each method of the GrpcRemoteWorker class initiates an asynchronous gRPC call via IssueRequest(). Two of those requests concern us here: RegisterGraphAsync and RunGraphAsync. Let’s see how GrpcWorkerService handles them.

This article again draws on the work of two experts:

  • [TensorFlow Internals](github.com/horance-liu…): friends interested in TF's internal implementation mechanisms should read it; you will certainly gain from it.
  • home.cnblogs.com/u/deep-lear…: covers not only TensorFlow but also many other areas at the forefront of the industry.

The other articles in this series are:

Heterogeneous Distributed Learning based on the TensorFlow distributed paper [translation]

Implementation of Control Flow in TensorFlow

TensorFlow Distributed environment (1) — overall architecture

TensorFlow distributed environment (2) — Master static logic

TensorFlow distributed environment (3) — Worker static logic

TensorFlow distributed environment (4) — WorkerCache

TensorFlow distributed environment (5) — Session

TensorFlow distributed environment (6) — Master dynamic logic

1. Overview

1.1 Review

Let’s start by reviewing the relationship between the various concepts so far.

  • The Client builds the FullGraph, but the full graph cannot be executed in parallel as-is, so it must be partitioned and optimized.
  • The Master processes the complete computation graph, for example by pruning it, to generate a ClientGraph (the minimal dependent subgraph that can be executed). It then divides the ClientGraph into multiple PartitionGraphs based on Worker information and registers these PartitionGraphs with the respective Workers.
  • After receiving the registration request, each Worker further splits its PartitionGraph into multiple PartitionGraphs based on the local device set, and starts an Executor on each device to execute that device's PartitionGraph.

1.2 What's new

Let’s look at an overview of the Worker flow. When the flow reaches a specific Worker node, the Worker receives a RegisterGraphRequest. The message carries the session_handle assigned by the MasterSession and a subgraph in GraphDef form (graph_def). GraphDef is the result of TensorFlow serializing the graph built by the Client using Protocol Buffers, and it contains all the metadata of the computation graph. It can be converted to a Graph by the ConvertGraphDefToGraph method; a Graph holds not only the graph metadata but also the other information needed at runtime.
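
As a quick orientation, the deserialization pattern (which appears again later in InitItem) can be sketched as follows. Treat the include paths as assumptions, since header locations differ across TF releases:

#include "tensorflow/core/common_runtime/graph_constructor.h"  // graph/ in older TF
#include "tensorflow/core/framework/graph.pb.h"  // GraphDef
#include "tensorflow/core/graph/graph.h"         // Graph

// Usage: tensorflow::Graph graph(tensorflow::OpRegistry::Global());
//        TF_CHECK_OK(DeserializeToGraph(gdef, &graph));
tensorflow::Status DeserializeToGraph(const tensorflow::GraphDef& gdef,
                                      tensorflow::Graph* graph) {
  tensorflow::GraphConstructorOptions opts;
  opts.allow_internal_ops = true;  // worker graphs contain internal ops such as _Send/_Recv
  opts.expect_device_spec = true;  // nodes arrive with device names already assigned
  // Fills "graph" with nodes, edges, and the extra runtime bookkeeping.
  return tensorflow::ConvertGraphDefToGraph(opts, gdef, graph);
}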

The Worker continues to split the graph into multiple PartitionGraphs based on the set of local devices, assigns a PartitionGraph to each device, starts an Executor on each device, and then waits for subsequent commands before executing. The Executor class is TensorFlow's abstraction of a graph executor: it provides a virtual RunAsync method that executes a local graph asynchronously, plus a Run method that is a synchronous wrapper around RunAsync.

When the Worker node receives RunGraphAsync, each device starts executing. The WorkerSession calls session->graph_mgr()->ExecuteAsync, which in turn calls StartParallelExecutors; this starts an ExecutorBarrier. Each time a computing device finishes executing its assigned PartitionGraph, the ExecutorBarrier records one more completion, and once all devices have finished their PartitionGraphs the barrier is released and the blocked wait returns.
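
The counting idea is easy to see in isolation. Below is a minimal stand-in of my own (not TF's actual ExecutorBarrier class, which delivers a done-callback rather than exposing a wait): each per-device executor reports completion, and the last one to finish releases the waiter.

#include <condition_variable>
#include <mutex>

class CountingBarrier {
 public:
  explicit CountingBarrier(int num_executors) : pending_(num_executors) {}

  // Called from each executor's done-callback.
  void OneDone() {
    std::lock_guard<std::mutex> l(mu_);
    if (--pending_ == 0) cv_.notify_all();
  }

  // Blocks until every executor has reported completion.
  void Wait() {
    std::unique_lock<std::mutex> l(mu_);
    cv_.wait(l, [this] { return pending_ == 0; });
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  int pending_;
};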

Let’s walk through the process step by step.

2. Register subgraphs

When the worker node receives a RegisterGraphRequest, the request first arrives at GrpcWorkerService; the gRPC method actually invoked is "/tensorflow.WorkerService/RegisterGraph", which is dispatched to RegisterGraphHandler. The handler is generated by the HANDLE_CALL macro:

#define HANDLE_CALL(method, may_block_on_compute_pool)                        \
  void method##Handler(WorkerCall<method##Request, method##Response>* call) { \
    auto closure = [this, call]() {                                           \
      Status s = worker_->method(&call->request, &call->response);            \
      if (!s.ok()) {                                                          \
        VLOG(3) << "Bad response from " << #method << ": " << s;              \
      }                                                                       \
      call->SendResponse(ToGrpcStatus(s));                                    \
    };                                                                        \
    if ((may_block_on_compute_pool)) {                                        \
      worker_->env()->env->SchedClosure(std::move(closure));                  \
    } else {                                                                  \
      worker_->env()->compute_pool->Schedule(std::move(closure));             \
    }                                                                         \
    ENQUEUE_REQUEST(method, false);                                           \
  }

HANDLE_CALL(RegisterGraph, false);

2.1 GrpcWorker

RegisterGraph actually calls a WorkerInterface method, which internally forwards to the asynchronous RegisterGraphAsync method.

Status WorkerInterface::RegisterGraph(const RegisterGraphRequest* request,
                     RegisterGraphResponse* response) {
  return CallAndWait(&ME::RegisterGraphAsync, request, response);
}
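
CallAndWait turns the asynchronous call into a blocking one by waiting until the async method's callback delivers a Status. A minimal sketch of that sync-over-async pattern (my own simplification; the real helper blocks on a Notification) looks like this:

#include <condition_variable>
#include <functional>
#include <mutex>

struct Status { bool ok = true; };                   // stand-in for TF Status
using StatusCallback = std::function<void(Status)>;

// Fires an async call that reports completion through a callback, then
// blocks the caller until that callback has delivered the Status.
Status CallAndWaitSketch(const std::function<void(StatusCallback)>& async_call) {
  std::mutex mu;
  std::condition_variable cv;
  bool done = false;
  Status result;
  async_call([&](Status s) {
    std::lock_guard<std::mutex> l(mu);
    result = s;
    done = true;
    cv.notify_one();
  });
  std::unique_lock<std::mutex> l(mu);
  cv.wait(l, [&] { return done; });
  return result;
}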

RegisterGraphAsync finally reaches the Worker’s implementation, which first finds the WorkerSession according to session_handle and then calls GraphMgr.

GraphMgr* WorkerSession::graph_mgr() const { return graph_mgr_.get(); }

RegisterGraphAsync is as follows:

void Worker::RegisterGraphAsync(const RegisterGraphRequest* request,
                                RegisterGraphResponse* response,
                                StatusCallback done) {
  std::shared_ptr<WorkerSession> session;
  Status s;
  if (request->create_worker_session_called()) {
    s = env_->session_mgr->WorkerSessionForSession(request->session_handle(),
                                                   &session);
  } else {
    session = env_->session_mgr->LegacySession();
  }
  if (s.ok()) {
    s = session->graph_mgr()->Register(
        request->session_handle(), request->graph_def(), session.get(),
        request->graph_options(), request->debug_options(),
        request->config_proto(), request->collective_graph_key(),
        session->cluster_flr(), response->mutable_graph_handle());
  }
  done(s);
}

2.2 GraphMgr

GraphMgr is responsible for tracking a set of graphs registered with the TensorFlow worker. Each registered graph is identified by the graph_handle generated by GraphMgr and returned to the caller. After successful registration, the caller executes a graph using the graph handle. Each execution is distinguished from the others by a globally unique ID “step_id” generated by the caller. As long as the “step_id” used is different, multiple executions can use the same graph independently at the same time, and multiple threads can call the GraphMgr method concurrently.
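
The bookkeeping described above boils down to a lock-protected map from generated handles to items. Here is a minimal sketch of that idea (my own illustration; the real GraphMgr::Register shown below also reference-counts the items):

#include <cstdio>
#include <mutex>
#include <string>
#include <unordered_map>

struct Item;  // stands in for GraphMgr::Item

class HandleTable {
 public:
  // Generates a fresh handle under the lock and records the item, mirroring
  // the strings::Printf("%016llx", ++next_id_) pattern in GraphMgr::Register.
  std::string Register(Item* item) {
    std::lock_guard<std::mutex> l(mu_);
    char buf[32];
    std::snprintf(buf, sizeof(buf), "%016llx", ++next_id_);
    table_[buf] = item;
    return buf;
  }

  // Each execution looks its graph up by handle.
  Item* Find(const std::string& handle) {
    std::lock_guard<std::mutex> l(mu_);
    auto it = table_.find(handle);
    return it == table_.end() ? nullptr : it->second;
  }

 private:
  std::mutex mu_;
  unsigned long long next_id_ = 0;
  std::unordered_map<std::string, Item*> table_;
};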

2.2.1 Definition

GraphMgr is defined as follows:

class GraphMgr {
 private:
  typedef GraphMgr ME;

  struct ExecutionUnit {
    std::unique_ptr<Graph> graph = nullptr;
    Device* device = nullptr;               // not owned.
    Executor* root = nullptr;               // not owned.
    FunctionLibraryRuntime* lib = nullptr;  // not owned.
    // Build the cost model if this value is strictly positive.
    int64_t build_cost_model = 0;
  };

  struct Item : public core::RefCounted {
    ~Item() override;

    // Session handle.
    string session;

    // Graph handle.
    string handle;

    std::unique_ptr<FunctionLibraryDefinition> lib_def;
    // Owns the FunctionLibraryRuntime objects needed to execute functions, one
    // per device.
    std::unique_ptr<ProcessFunctionLibraryRuntime> proc_flr;
    // A graph is partitioned over multiple devices. Each partition
    // has a root executor which may call into the runtime library.
    std::vector<ExecutionUnit> units;

    // Used to deregister a cost model when cost model is required in graph
    // manager.
    GraphMgr* graph_mgr;

    int64_t collective_graph_key;
  };

  const WorkerEnv* worker_env_;  // Not owned.
  const DeviceMgr* device_mgr_;

  CostModelManager cost_model_manager_;

  // Owned.
  mutex mu_;
  int64_t next_id_ TF_GUARDED_BY(mu_) = 0;

  // If true, blocks until device has finished all queued operations in a step.
  bool sync_on_finish_ = true;

  // Table mapping graph handles to registered graphs.
  //
  // TODO(zhifengc): If the client does not call Deregister, we'll
  // lose memory over time. We should implement a timeout-based
  // mechanism to gc these graphs.
  std::unordered_map<string, Item*> table_;

  TF_DISALLOW_COPY_AND_ASSIGN(GraphMgr);
};

The relationship among these classes is as follows: registering a graph inserts a new Item into GraphMgr's table_ member, and running a graph looks up and executes a specific Item.

2.2.2 Registering a graph

The registration code is as follows. It mostly delegates to InitItem, so we will move on to InitItem afterwards.

Status GraphMgr::Register(
    const string& handle, const GraphDef& gdef, WorkerSession* session,
    const GraphOptions& graph_options, const DebugOptions& debug_options,
    const ConfigProto& config_proto, int64_t collective_graph_key,
    DistributedFunctionLibraryRuntime* cluster_flr, string* graph_handle) {
  Item* item = new Item;
  Status s = InitItem(handle, gdef, session, graph_options, debug_options,
                      config_proto, collective_graph_key, cluster_flr, item);
  if (!s.ok()) {
    item->Unref();
    return s;
  }

  // Inserts one item into table_.
  {
    mutex_lock l(mu_);
    *graph_handle =
        strings::Printf("%016llx", static_cast<long long>(++next_id_));
    item->handle = *graph_handle;
    CHECK(table_.insert({*graph_handle, item}).second);
  }
  return Status::OK();
}

InitItem’s main functions are:

  • Given a graph definition "gdef" for a graph of a "session", create the executors.

  • If a node in "gdef" is shared by other graphs in "session", the same op kernel is reused. For example, typically a params node is shared by multiple graphs in a session.

  • If "gdef" is assigned to more than one device, extra nodes (for example, send/recv nodes) may be added. The names of the extra nodes are generated by calling "new_name(old_name)".

  • On success, "executors" is filled with one executor per device, and the caller takes ownership of the returned executors.

// Creates executors given a graph definition "gdef" of a "session".
// If a node in "gdef" is shared by other graphs in "session", the
// same op kernel is reused. E.g., typically a params node is shared
// by multiple graphs in a session.
//
// If "gdef" is assigned to multiple devices, extra nodes (e.g.,
// send/recv nodes) maybe added. The extra nodes' name are generated
// by calling "new_name(old_name)".
//
// "executors" are filled with one executor per device if success and
// the caller takes the ownership of returned executors.
Status GraphMgr::InitItem(
    const string& handle, const GraphDef& gdef, WorkerSession* session,
    const GraphOptions& graph_options, const DebugOptions& debug_options,
    const ConfigProto& config_proto, int64_t collective_graph_key,
    DistributedFunctionLibraryRuntime* cluster_flr, Item* item) {
  item->session = handle;
  item->collective_graph_key = collective_graph_key;
  item->lib_def.reset(
      new FunctionLibraryDefinition(OpRegistry::Global(), gdef.library()));

  TF_RETURN_IF_ERROR(ValidateGraphDefForDevices(gdef));

  // We don't explicitly Validate the graph def because ConvertGraphDefToGraph
  // does that below.
  item->proc_flr.reset(new ProcessFunctionLibraryRuntime(
      device_mgr_, worker_env_->env, /*config=*/&config_proto,
      gdef.versions().producer(), item->lib_def.get(),
      graph_options.optimizer_options(), worker_env_->compute_pool, cluster_flr,
      /*session_metadata=*/nullptr,
      Rendezvous::Factory{
          [this, session](const int64_t step_id, const DeviceMgr*,
                          Rendezvous** r) -> Status {
            auto* remote_r = this->worker_env_->rendezvous_mgr->Find(step_id);
            TF_RETURN_IF_ERROR(remote_r->Initialize(session));
            *r = remote_r;
            return Status::OK();
          },
          [this](const int64_t step_id) {
            this->worker_env_->rendezvous_mgr->Cleanup(step_id);
            return Status::OK();
          }}));

  // Constructs the graph out of "gdef".
  Graph graph(OpRegistry::Global());
  GraphConstructorOptions opts;
  opts.allow_internal_ops = true;
  opts.expect_device_spec = true;
  opts.validate_nodes = true;
  TF_RETURN_IF_ERROR(ConvertGraphDefToGraph(opts, gdef, &graph));

  // Splits "graph" into multiple subgraphs by device names.
  std::unordered_map<string, GraphDef> partitions;
  PartitionOptions popts;
  popts.node_to_loc = SplitByDevice;  // the second split, by device
  popts.new_name = [this] (const string& prefix) {
    mutex_lock l(mu_);
    return strings::StrCat(prefix, "_G", next_id_++);
  };
  popts.get_incarnation = [this] (const string& name) -> int64 {
    Device* device = nullptr;
    Status s = device_mgr_->LookupDevice(name, &device);
    if (s.ok()) {
      return device->attributes().incarnation();
    } else {
      return PartitionOptions::kIllegalIncarnation;
    }
  };
  popts.flib_def = item->lib_def.get();
  popts.control_flow_added = true;
  popts.scheduling_for_recvs = graph_options.enable_recv_scheduling();
  TF_RETURN_IF_ERROR(Partition(popts, &graph, &partitions));
  if (popts.scheduling_for_recvs) {
    TF_RETURN_IF_ERROR(AddControlEdges(popts, &partitions));
  }

  std::unordered_map<string, std::unique_ptr<Graph>> partition_graphs;
  // Graph conversion for each partition
  for (auto& partition : partitions) {
    std::unique_ptr<Graph> device_graph(new Graph(OpRegistry::Global()));
    GraphConstructorOptions device_opts;
    // There are internal operations (e.g., send/recv) that we now allow.
    device_opts.allow_internal_ops = true;
    device_opts.expect_device_spec = true;
    TF_RETURN_IF_ERROR(ConvertGraphDefToGraph(
        device_opts, std::move(partition.second), device_graph.get()));
    partition_graphs.emplace(partition.first, std::move(device_graph));
  }

  GraphOptimizationPassOptions optimization_options;
  optimization_options.flib_def = item->lib_def.get();
  optimization_options.partition_graphs = &partition_graphs;
  TF_RETURN_IF_ERROR(OptimizationPassRegistry::Global()->RunGrouping(
      OptimizationPassRegistry::POST_PARTITIONING, optimization_options));

  LocalExecutorParams params;

  item->units.reserve(partitions.size());
  item->graph_mgr = this;
  const auto& optimizer_opts = graph_options.optimizer_options();
  GraphOptimizer optimizer(optimizer_opts);
  for (auto& p : partition_graphs) {
    const string& device_name = p.first;
    std::unique_ptr<Graph>& subgraph = p.second;
    item->units.resize(item->units.size() + 1);
    ExecutionUnit* unit = &(item->units.back());

    // Find the device.
    Status s = device_mgr_->LookupDevice(device_name, &unit->device);
    if (!s.ok()) {
      // Remove the empty unit from the item as the item destructor wants all
      // units to have valid devices.
      item->units.pop_back();
      return s;
    }

    // Check whether the graph needs to be rewritten.
    // Give the device an opportunity to rewrite its subgraph.
    TF_RETURN_IF_ERROR(unit->device->MaybeRewriteGraph(&subgraph));

    // Top-level nodes in the graph uses the op segment to cache
    // kernels. Therefore, as long as the executor is alive, we need
    // to ensure the kernels cached for the session are alive.
    auto opseg = unit->device->op_segment();
    opseg->AddHold(handle);

    // Function library runtime.
    FunctionLibraryRuntime* lib = item->proc_flr->GetFLR(unit->device->name());

    // Create the executor.
    // Construct the root executor for the subgraph.
    params.device = unit->device;
    params.function_library = lib;
    params.create_kernel =
        [handle, lib, opseg](const std::shared_ptr<const NodeProperties>& props,
                             OpKernel** kernel) {
          // NOTE(mrry): We must not share function kernels (implemented
          // using `CallOp`) between subgraphs, because `CallOp::handle_`
          // is tied to a particular subgraph. Even if the function itself
          // is stateful, the `CallOp` that invokes it is not.
          if (!OpSegment::ShouldOwnKernel(lib, props->node_def.op())) {
            return lib->CreateKernel(props, kernel);
          }
          auto create_fn = [lib, &props](OpKernel** kernel) {
            return lib->CreateKernel(props, kernel);
          };
          // Kernels created for subgraph nodes need to be cached. On
          // cache miss, create_fn() is invoked to create a kernel based
          // on the function library here + global op registry.
          return opseg->FindOrCreate(handle, props->node_def.name(), kernel,
                                     create_fn);
        };
    params.delete_kernel = [lib](OpKernel* kernel) {
      if (kernel && !OpSegment::ShouldOwnKernel(lib, kernel->type_string())) {
        delete kernel;
      }
    };

    // Optimize the graph.
    optimizer.Optimize(lib, worker_env_->env, params.device, &subgraph,
                       GraphOptimizer::Options());

    TF_RETURN_IF_ERROR(
        EnsureMemoryTypes(DeviceType(unit->device->device_type()),
                          unit->device->name(), subgraph.get()));
    unit->graph = std::move(subgraph);
    unit->build_cost_model = graph_options.build_cost_model();
    if (unit->build_cost_model > 0) {
      skip_cost_models_ = false;
    }
    TF_RETURN_IF_ERROR(NewLocalExecutor(params, *unit->graph, &unit->root));
  }
  return Status::OK();
}

One thing to note above is the use of SplitByDevice for the second partitioning of the graph, this time by device.

// NOTE: node->device_name() is not set by GraphConstructor.  We
// expects that NodeDef in GraphDef given to workers fully specifies
// device names.
static string SplitByDevice(const Node* node) {
  return node->assigned_device_name();
}

inline const std::string& Node::assigned_device_name() const {
  return graph_->get_assigned_device_name(*this);
}

The result of graph registration is roughly as follows: an Item is generated from the various pieces of information sent by the Master and registered into GraphMgr, a graph_handle is generated to identify it, and ExecutionUnits (one per device) are generated inside the Item.

After the subgraph is registered, you can run the subgraph later.

3. Run the subgraph

The Master uses a RunGraphRequest to execute all subgraphs registered under a graph_handle. The Master generates a globally unique step_id to distinguish the different steps in which the graph runs; subgraphs can communicate with each other (for example, via send/recv operations), using step_id to distinguish tensors produced by different runs.

In the RunGraphRequest message, send designates the subgraph's input tensors, and recv_key designates the subgraph's output tensors; the RunGraphResponse returns a tensor for each recv_key.
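
Purely as orientation, the shape of such a call can be sketched like this (all names below are my own stand-ins, not the proto definitions):

#include <string>
#include <unordered_map>
#include <vector>

using FakeTensor = std::vector<float>;  // stand-in for tensorflow::Tensor
using NamedTensors = std::unordered_map<std::string, FakeTensor>;

// A hypothetical view of one RunGraph round trip.
struct RunGraphCallSketch {
  std::string graph_handle;            // which registered subgraph to run
  long long step_id;                   // globally unique per execution step
  NamedTensors send;                   // inputs, keyed by rendezvous key
  std::vector<std::string> recv_keys;  // outputs the caller wants back
  NamedTensors recv;                   // filled in from the RunGraphResponse
};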

3.1 Service

The request first arrives at GrpcWorkerService, where the call to "/tensorflow.WorkerService/RunGraph" is handled by the following code:

void RunGraphHandler(WorkerCall<RunGraphRequest, RunGraphResponse>* call) {
  // Use Schedule to place computation tasks in the thread pool queue
  Schedule([this, call]() {
    CallOptions* call_opts = new CallOptions;
    ProtoRunGraphRequest* wrapped_request =
        new ProtoRunGraphRequest(&call->request);
    NonOwnedProtoRunGraphResponse* wrapped_response =
        new NonOwnedProtoRunGraphResponse(&call->response);
    call->SetCancelCallback([call_opts]() { call_opts->StartCancel(); });
    worker_->RunGraphAsync(call_opts, wrapped_request, wrapped_response,
                           [call, call_opts, wrapped_request,
                            wrapped_response](const Status& s) {
                             call->ClearCancelCallback();
                             delete call_opts;
                             delete wrapped_request;
                             delete wrapped_response;
                             call->SendResponse(ToGrpcStatus(s));
                           });
  });
  ENQUEUE_REQUEST(RunGraph, true);
}

In this case, the computation is put into a thread pool queue, with the business logic in the Worker::RunGraphAsync function.

void Schedule(std::function<void()> f) {
  worker_->env()->compute_pool->Schedule(std::move(f));
}

3.2 GrpcWorker

Within RunGraphAsync there are two execution paths; we pick DoRunGraph for analysis.

void Worker::RunGraphAsync(CallOptions* opts, RunGraphRequestWrapper* request,
                           MutableRunGraphResponseWrapper* response,
                           StatusCallback done) {
  if (request->store_errors_in_response_body()) {
    done = [response, done](const Status& status) {
      response->set_status(status);
      done(Status::OK());
    };
  }
  if (request->is_partial()) {
    DoPartialRunGraph(opts, request, response, std::move(done)); // If you are interested, you can go further
  } else {
    DoRunGraph(opts, request, response, std::move(done));  // Analyzed here
  }
}

DoRunGraph mainly calls session->graph_mgr()->ExecuteAsync to execute the computation graph.

void Worker::DoRunGraph(CallOptions* opts, RunGraphRequestWrapper* request,
                        MutableRunGraphResponseWrapper* response,
                        StatusCallback done) {
  const int64_t step_id = request->step_id();
  Status s = recent_request_ids_.TrackUnique(request->request_id(),
                                             "RunGraph (Worker)", request);
  if (!s.ok()) {
    done(s);
    return;
  }

  std::shared_ptr<WorkerSession> session;
  if (request->create_worker_session_called()) {
    s = env_->session_mgr->WorkerSessionForSession(request->session_handle(),
                                                   &session);
  } else {
    session = env_->session_mgr->LegacySession();
  }
  if (!s.ok()) {
    done(s);
    return;
  }
  GraphMgr::NamedTensors in;
  GraphMgr::NamedTensors* out = new GraphMgr::NamedTensors;
  s = PrepareRunGraph(request, &in, out);
  if (!s.ok()) {
    delete out;
    done(s);
    return;
  }
  StepStatsCollector* collector = nullptr;
  if (request->exec_opts().report_tensor_allocations_upon_oom() ||
      request->exec_opts().record_timeline() ||
      request->exec_opts().record_costs()) {
    collector = new StepStatsCollector(response->mutable_step_stats());
  }
  DeviceProfilerSession* device_profiler_session = nullptr;
  if (collector && request->exec_opts().record_timeline()) {
    // If timeline was requested, assume we want hardware level tracing.
    device_profiler_session = DeviceProfilerSession::Create().release();
  }
  CancellationManager* cm = new CancellationManager;
  opts->SetCancelCallback([this, cm, step_id]() {
    cm->StartCancel();
    AbortStep(step_id);
  });
  CancellationToken token;
  token = cancellation_manager_.get_cancellation_token();
  bool already_cancelled = !cancellation_manager_.RegisterCallback(
      token, [cm]() { cm->StartCancel(); });
  if (already_cancelled) {
    opts->ClearCancelCallback();
    delete cm;
    delete collector;
    delete device_profiler_session;
    delete out;
    done(errors::Aborted("Call was aborted"));
    return;
  }
  session->graph_mgr()->ExecuteAsync(
      request->graph_handle(), step_id, session.get(), request->exec_opts(),
      collector, response, cm, in,
      [this, step_id, response, session, cm, out, token, collector,
       device_profiler_session, opts, done](const Status& status) {
        Status s = status;
        if (s.ok()) {
          // Receive the output tensors
          s = session->graph_mgr()->RecvOutputs(step_id, out);
        }

        opts->ClearCancelCallback();
        cancellation_manager_.DeregisterCallback(token);
        delete cm;

        if (device_profiler_session) {
          device_profiler_session->CollectData(response->mutable_step_stats())
              .IgnoreError();
        }
        if (s.ok()) {
          for (const auto& p : *out) {
            const string& key = p.first;
            const Tensor& val = p.second;
            response->AddRecv(key, val);
          }
        }
        if (collector) collector->Finalize();
        delete collector;
        delete device_profiler_session;
        delete out;
        done(s);
      });
}

3.3 GraphMgr

ExecuteAsync calls StartParallelExecutors to perform the parallel computation. The logic is as follows:

  • Find the subgraph by handle;
  • Compute the subgraph's cost;
  • Obtain a rendezvous and initialize it with this session, so that subsequent communication goes through this session;
  • Send the input tensors to the rendezvous;
  • Call StartParallelExecutors to execute the subgraph.

void GraphMgr::ExecuteAsync(const string& handle, const int64_t step_id,
                            WorkerSession* session, const ExecutorOpts& opts,
                            StepStatsCollector* collector,
                            MutableRunGraphResponseWrapper* response,
                            CancellationManager* cancellation_manager,
                            const NamedTensors& in, StatusCallback done) {
  const uint64 start_time_usecs = Env::Default()->NowMicros();
  profiler::TraceMeProducer activity(
      // To TraceMeConsumers in ExecutorState::Process/Finish or RunGraphDone.
      [step_id] {
        return profiler::TraceMeEncode(
            "RunGraph", {{"id", step_id}, {"_r".1} /*root_event*/});
      },
      profiler::ContextType::kTfExecutor, step_id,
      profiler::TraceMeLevel::kInfo);
  
  // Lookup an item. Holds one ref while executing.
  // Find a subgraph
  Item* item = nullptr;
  {
    mutex_lock l(mu_);
    auto iter = table_.find(handle);
    if (iter != table_.end()) {
      item = iter->second;
      item->Ref();
    }
  }
 
  // Calculate the cost
  CostGraphDef* cost_graph = nullptr;
  if (response != nullptr) {
    cost_graph = response->mutable_cost_graph();
    if (opts.record_partition_graphs()) {
      for (const ExecutionUnit& unit : item->units) {
        GraphDef graph_def;
        unit.graph->ToGraphDef(&graph_def);
        response->AddPartitionGraph(graph_def);
      }
    }
  }

  // Generate a rendezvous
  RemoteRendezvous* rendezvous = worker_env_->rendezvous_mgr->Find(step_id);
  // The rendezvous is initialized with this session; subsequent communication uses this session.
  Status s = rendezvous->Initialize(session);
  CollectiveExecutor::Handle* ce_handle =
      item->collective_graph_key != BuildGraphOptions::kNoCollectiveGraphKey
          ? new CollectiveExecutor::Handle(
                worker_env_->collective_executor_mgr->FindOrCreate(step_id),
                true)
          : nullptr;
  // Sends values specified by the caller.
  // Send the tensor to Rendezvous
  size_t input_size = 0;
  if (s.ok()) {
    std::vector<string> keys;
    std::vector<Tensor> tensors_to_send;
    keys.reserve(in.size());
    tensors_to_send.reserve(in.size());
    for (auto& p : in) {
      keys.push_back(p.first);
      tensors_to_send.push_back(p.second);
      input_size += p.second.AllocatedBytes();
    }
    // Send the tensors
    s = SendTensorsToRendezvous(rendezvous, nullptr, {}, keys, tensors_to_send);
  }

  if (!s.ok()) {
    done(s);
    delete ce_handle;
    item->Unref();
    rendezvous->Unref();
    return;
  }

  // Execute the subgraph
  StartParallelExecutors(
      handle, step_id, item, rendezvous, ce_handle, collector, cost_graph,
      cancellation_manager, session, start_time_usecs,
      [item, rendezvous, ce_handle, done, start_time_usecs, input_size,
       step_id](const Status& s) {
        profiler::TraceMeConsumer activity(
            // From TraceMeProducer in GraphMgr::ExecuteAsync.
            [step_id] {
              return profiler::TraceMeEncode("RunGraphDone", {{"id", step_id}});
            },
            profiler::ContextType::kTfExecutor, step_id,
            profiler::TraceMeLevel::kInfo);
        done(s);
        metrics::RecordGraphInputTensors(input_size);
        metrics::UpdateGraphExecTime(Env::Default() - >NowMicros() -
                                     start_time_usecs);
        rendezvous->Unref();
        item->Unref();
        delete ce_handle;
      });
}

ExecuteAsync uses the handle to find the Item, and through it the graph. The session is used for communication and execution, and the step_id scopes the communication; see the code above for details.
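
To make the role of step_id concrete, here is a minimal illustration of my own (not TF's RemoteRendezvous, which blocks or completes asynchronously rather than returning false): tensors from different steps live in different mailboxes, so concurrent runs of the same graph never see each other's values.

#include <map>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

using FakeTensor = std::vector<float>;  // stand-in for tensorflow::Tensor

class StepScopedMailbox {
 public:
  // Producers (e.g. send ops) deposit a tensor under (step_id, key).
  void Send(long long step_id, const std::string& key, FakeTensor t) {
    std::lock_guard<std::mutex> l(mu_);
    boxes_[step_id][key] = std::move(t);
  }

  // Consumers (e.g. recv ops) look up the same (step_id, key).
  bool Recv(long long step_id, const std::string& key, FakeTensor* out) {
    std::lock_guard<std::mutex> l(mu_);
    auto& box = boxes_[step_id];
    auto it = box.find(key);
    if (it == box.end()) return false;  // the real rendezvous would wait
    *out = std::move(it->second);
    box.erase(it);
    return true;
  }

  // Mirrors rendezvous_mgr->Cleanup(step_id) after a step finishes.
  void Cleanup(long long step_id) {
    std::lock_guard<std::mutex> l(mu_);
    boxes_.erase(step_id);
  }

 private:
  std::mutex mu_;
  std::map<long long, std::map<std::string, FakeTensor>> boxes_;
};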

StartParallelExecutors starts an ExecutorBarrier. Each time a computing device finishes executing its assigned PartitionGraph, the ExecutorBarrier records one more completion; once all devices have finished their PartitionGraphs, the barrier is released and its done callback runs.

void GraphMgr::StartParallelExecutors(
    const string& handle, int64_t step_id, Item* item, Rendezvous* rendezvous,
    CollectiveExecutor::Handle* ce_handle, StepStatsCollector* collector,
    CostGraphDef* cost_graph, CancellationManager* cancellation_manager,
    WorkerSession* session, int64_t start_time_usecs, StatusCallback done) {
  const int num_units = item->units.size();
  ScopedStepContainer* step_container = new ScopedStepContainer(
      step_id,
      [this] (const string& name) { device_mgr_->ClearContainers({name}); });

  ExecutorBarrier* barrier =
      new ExecutorBarrier(num_units, rendezvous,
                          [this, item, collector, cost_graph, step_container,
                           done](const Status& s) {
                            BuildCostModel(item, collector, cost_graph);
                            done(s);
                            delete step_container;
                          });
  Executor::Args args;
  args.step_id = step_id;
  args.rendezvous = rendezvous;
  args.collective_executor = ce_handle ? ce_handle->get() : nullptr;
  args.cancellation_manager = cancellation_manager;
  args.stats_collector = collector;
  args.step_container = step_container;
  args.sync_on_finish = sync_on_finish_;
  args.start_time_usecs = start_time_usecs;
  if (LogMemory::IsEnabled()) {
    LogMemory::RecordStep(args.step_id, handle);
  }
  thread::ThreadPool* pool = worker_env_->compute_pool;
  using std::placeholders::_1;
  // Line below is equivalent to this code, but does one less indirect call:
  //   args.runner = [pool](std::function<void()> fn) { pool->Schedule(fn); };
  auto default_runner = std::bind(&thread::ThreadPool::Schedule, pool, _1);
  for (const auto& unit : item->units) {
    thread::ThreadPool* device_thread_pool =
        unit.device->tensorflow_device_thread_pool();
    if (!device_thread_pool) {
      args.runner = default_runner;
    } else {
      args.runner =
          std::bind(&thread::ThreadPool::Schedule, device_thread_pool, _1);
    }
    unit.root->RunAsync(args, barrier->Get());
  }
}

3.4 Summary

We summarize subgraph registration and execution with a diagram.

Figure 1 Register/run subgraph

4. Summary

We use a diagram to summarize the entire distributed computing process as follows:

Figure 2 distributed computing flow

0xEE Personal information

★★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF Reference