After the computation graph is partitioned across devices, the partitioned subgraphs on different devices may have data dependencies on one another, so TF inserts Send/Recv nodes between them to carry out the data exchange. In distributed mode, Send/Recv exchanges data via RpcRemoteRendezvous, so we first need to look at Rendezvous, the data exchange mechanism in TF.

We have met the term Rendezvous several times in distributed machine learning, mostly in the context of elasticity and communication. Its meaning differs slightly from place to place, but the basic sense is the same. It comes from the French word rendez-vous: a meeting, a gathering, an appointment. In TensorFlow, Rendezvous is the communication component and exchange mechanism for message transport.

This article again draws on two excellent sources:

  • [TensorFlow Internals] (github.com/horance-liu…). Although it does not analyze the latest code, readers interested in the internal implementation of TF are encouraged to read it; they will certainly gain a lot.
  • Home.cnblogs.com/u/deep-lear… This author covers not only TensorFlow but also many other areas at the forefront of the industry.

The other articles in this series are:

[Translation] TensorFlow distributed paper: Heterogeneous Distributed Learning based on TensorFlow

Implementation of Control Flow in TensorFlow

TensorFlow distributed environment (1): overall architecture

TensorFlow distributed environment (2): Master static logic

TensorFlow distributed environment (3): Worker static logic

TensorFlow distributed environment (4): WorkerCache

TensorFlow distributed environment (5): Session

TensorFlow distributed environment (6): Master dynamic logic

TensorFlow distributed environment (7): Worker dynamic logic

1. Mechanism

In distributed mode, edges across devices are split, and Send nodes and Recv nodes are inserted at the sending and receiving ends of the edges, respectively.

  • Intra-process Send and Recv nodes implement data exchange via IntraProcessRendezvous.
  • Send and Recv nodes in different processes exchange data via RpcRemoteRendezvous.

We assume that Worker 0 has two GPUs. After the Send and Recv nodes are inserted, the result is as shown in the figure below: data sent between Worker 0 and Worker 1 crosses process boundaries and is exchanged through RpcRemoteRendezvous. Dotted arrows between the two GPUs in Worker 0 represent intra-process data exchange via IntraProcessRendezvous, and solid arrows between workers represent data exchange over RPC.

When executing a step, if two workers need to exchange data, then:

  • The producer (sender) generates a tensor and puts it into its local table.
  • The consumer (receiver) sends a RecvTensorRequest message carrying the pair (step_id, rendezvous_key) to the producer.
  • The Worker on the producer side takes the corresponding tensor from its local table and returns it via RecvTensorResponse.

The data transmission of Send/Recv is carried out through a derived class of WorkerInterface, which is built on the underlying gRPC communication library.
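
The three steps above can be sketched as a small in-memory model. This is only an illustration under simplifying assumptions: ProducerTable, RecvTensorRequestSketch and TensorSketch are hypothetical names, a tensor is reduced to a std::vector<float>, and the RPC is replaced by a direct function call. The sketch returns an empty result when nothing has been produced yet, whereas the mechanism described in this article lets the consumer wait.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a tensor.
using TensorSketch = std::vector<float>;

// Hypothetical request: the consumer names the tensor it wants by the
// pair (step_id, rendezvous_key).
struct RecvTensorRequestSketch {
  int64_t step_id;
  std::string rendezvous_key;
};

// Hypothetical producer-side table: tensors wait here until requested.
class ProducerTable {
 public:
  // Step 1: the producer stores the tensor in its local table.
  void Put(int64_t step_id, const std::string& key, TensorSketch t) {
    table_[std::make_pair(step_id, key)] = std::move(t);
  }

  // Step 3: the producer-side worker looks the tensor up and hands it back.
  // Returns std::nullopt if nothing is stored under that key.
  std::optional<TensorSketch> Take(const RecvTensorRequestSketch& req) {
    auto it = table_.find(std::make_pair(req.step_id, req.rendezvous_key));
    if (it == table_.end()) return std::nullopt;
    TensorSketch t = std::move(it->second);
    table_.erase(it);
    return t;
  }

 private:
  std::map<std::pair<int64_t, std::string>, TensorSketch> table_;
};
```

Keying the table on both step_id and rendezvous_key keeps tensors from different steps apart even when they travel along the same edge.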

Figure 1 Send/receive

1.1 Message Identifier

When we studied PyTorch distributed, we saw that each distributed communication needs a globally unique identifier, for example:

  • autogradMessageId represents a pair of send/recv autograd functions. Each send-recv pair is assigned a globally unique autograd_message_id that identifies the pair. This is useful for finding the corresponding function on the remote node during backward propagation.
  • The distributed autograd container is also responsible for maintaining the globally unique message IDs used to associate send/recv autograd function pairs. The format is a 64-bit integer: the first 16 bits are the worker ID, and the last 48 bits are an integer that is auto-incremented inside the worker.
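
The 16/48-bit layout described above can be illustrated with a few lines of bit arithmetic. This is a sketch, not PyTorch's code: the function names are hypothetical, and an unsigned 64-bit integer is assumed for simplicity.

```cpp
#include <cassert>
#include <cstdint>

// Layout taken from the description above: the top 16 bits hold the
// worker ID and the low 48 bits hold a per-worker counter.
constexpr int kCounterBits = 48;
constexpr uint64_t kCounterMask = (uint64_t{1} << kCounterBits) - 1;

// Packs a worker ID and a local counter into one 64-bit message ID.
constexpr uint64_t MakeMessageId(uint16_t worker_id, uint64_t counter) {
  return (static_cast<uint64_t>(worker_id) << kCounterBits) |
         (counter & kCounterMask);
}

// Extracts the worker ID from the top 16 bits.
constexpr uint16_t WorkerIdOf(uint64_t id) {
  return static_cast<uint16_t>(id >> kCounterBits);
}

// Extracts the per-worker counter from the low 48 bits.
constexpr uint64_t CounterOf(uint64_t id) { return id & kCounterMask; }
```

Because the worker ID occupies its own bit range, two workers can increment their counters independently without ever producing the same ID.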

Similarly, TF needs a unique identifier for each Send/Recv pair so that messages are not mixed up when several sets of messages are sent in parallel. This identifier is ParsedKey.

1.1.1 Definition

Its definition is as follows:

  • src_device: the sending device.
  • src: the same information as src_device, but represented as a parsed structure.
  • src_incarnation: used for debugging. When a worker restarts, this value changes, so a worker that died can be told apart from its replacement.
  • dst_device: the receiving device.
  • dst: the same information as dst_device, but represented as a parsed structure.
  • edge_name: the edge name, which can be a tensor name or a string with some special meaning.

// Parses the key constructed by CreateKey and parse src/dst device
// names into structures respectively.
struct ParsedKey {
  StringPiece src_device;
  DeviceNameUtils::ParsedName src;
  uint64 src_incarnation = 0;
  StringPiece dst_device;
  DeviceNameUtils::ParsedName dst;
  StringPiece edge_name;

  ParsedKey() {}
  ParsedKey(const ParsedKey& b) { *this = b; }

  ParsedKey& operator= (const ParsedKey& b);
  StringPiece FullKey() const { return buf_; }

 private:
  friend class Rendezvous;
  friend class SendOp;
  friend class RecvOp;
  std::string buf_;
};

1.1.2 Creation

The key string is generated as follows:

src_device ; HexString(src_incarnation) ; dst_device ; name ; frame_iter.frame_id : frame_iter.iter_id

The specific code is as follows:

/* static */
string Rendezvous::CreateKey(const string& src_device, uint64 src_incarnation,
                             const string& dst_device, const string& name,
                             const FrameAndIter& frame_iter) {
  // NOTE: ';' is not used in the device name's job name.
  //
  // We include both sender and receiver in the key to facilitate
  // debugging. For correctness, we only need to encode the receiver.
  //
  // "src_incarnation" is used to distinguish a worker when it
  // restarts.
  char buf[strings::kFastToBufferSize];
  return strings::StrCat(
      src_device, ";", strings::Uint64ToHexString(src_incarnation, buf), ";",
      dst_device, ";", name, ";", frame_iter.frame_id, ":", frame_iter.iter_id);
}

The system then uses the ParseKey method to parse the key and produce a ParsedKey. ParseKey maps only the first four fields of the input key; the fifth field, frame_iter.frame_id:frame_iter.iter_id, is checked for presence but not stored separately. The field names correspond literally, except that edge_name corresponds to name.

/* static */
Status Rendezvous::ParseKey(StringPiece key, ParsedKey* out) {
  if (key.data() == out->buf_.data()) {
    // Caller used our buf_ string directly, so we don't need to copy. (The
    // SendOp and RecvOp implementations do this, for example).
    DCHECK_EQ(key.size(), out->buf_.size());
  } else {
    // Make a copy that our StringPieces can point at a copy that will persist
    // for the lifetime of the ParsedKey object.
    out->buf_.assign(key.data(), key.size());
  }
  StringPiece s(out->buf_);
  StringPiece parts[5];
  for (int i = 0; i < 5; i++) {
    parts[i] = ConsumeNextPart(&s, ';');
  }
  if (s.empty() &&          // Consumed the whole string
      !parts[4].empty() &&  // Exactly five parts
      DeviceNameUtils::ParseFullName(parts[0], &out->src) &&
      strings::HexStringToUint64(parts[1], &out->src_incarnation) &&
      DeviceNameUtils::ParseFullName(parts[2], &out->dst) &&
      !parts[3].empty()) {
    out->src_device = StringPiece(parts[0].data(), parts[0].size());
    out->dst_device = StringPiece(parts[2].data(), parts[2].size());
    out->edge_name = StringPiece(parts[3].data(), parts[3].size());
    return Status::OK();
  }
  return errors::InvalidArgument("Invalid rendezvous key: ", key);
}
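
The key format and its parsing can be tried out in isolation. The following is a minimal sketch rather than TensorFlow's code: CreateKeySketch, ParseKeySketch and ParsedKeySketch are hypothetical names, plain std::string replaces StringPiece, and device names are not validated the way DeviceNameUtils::ParseFullName would validate them.

```cpp
#include <cassert>
#include <cstdint>
#include <exception>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical, simplified counterpart of ParsedKey: it owns its strings
// instead of pointing into a shared buffer.
struct ParsedKeySketch {
  std::string src_device;
  uint64_t src_incarnation = 0;
  std::string dst_device;
  std::string edge_name;
};

// Builds "src;hex(incarnation);dst;name;frame_id:iter_id".
std::string CreateKeySketch(const std::string& src_device,
                            uint64_t src_incarnation,
                            const std::string& dst_device,
                            const std::string& name, uint64_t frame_id,
                            int64_t iter_id) {
  std::ostringstream os;
  os << src_device << ';' << std::hex << src_incarnation << std::dec << ';'
     << dst_device << ';' << name << ';' << frame_id << ':' << iter_id;
  return os.str();
}

// Splits the key on ';' and fills `out` from the first four parts.
bool ParseKeySketch(const std::string& key, ParsedKeySketch* out) {
  std::vector<std::string> parts;
  std::size_t start = 0;
  while (true) {
    const std::size_t pos = key.find(';', start);
    if (pos == std::string::npos) {
      parts.push_back(key.substr(start));
      break;
    }
    parts.push_back(key.substr(start, pos - start));
    start = pos + 1;
  }
  // Exactly five parts, with a non-empty edge name and frame/iteration part.
  if (parts.size() != 5 || parts[3].empty() || parts[4].empty()) return false;
  try {
    std::size_t used = 0;
    out->src_incarnation = std::stoull(parts[1], &used, 16);
    if (used != parts[1].size()) return false;  // Trailing non-hex characters.
  } catch (const std::exception&) {
    return false;  // Empty or out-of-range incarnation field.
  }
  out->src_device = parts[0];
  out->dst_device = parts[2];
  out->edge_name = parts[3];
  return true;
}
```

As in the original, the frame and iteration part only has to be present; it is not copied into the parsed result.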

1.2 Rendezvous

Rendezvous is an abstraction for passing tensors from producers to consumers. A rendezvous is a table of channels. Each channel is identified by a rendezvous key. The key encodes a <producer, consumer> pair, where producer and consumer are TensorFlow devices.

The producer calls the Send() method to send a tensor on a named channel. The consumer calls the Recv() method to receive a tensor from a specified channel. A sequence of tensors can be passed from producer to consumer, and the consumer receives them in the order the producer sent them.

Consumers can safely request tensors before or after they are generated. The consumer can choose to make blocking calls or provide callbacks: in either case, the consumer will receive the tensor as it becomes available. The producer never blocks.
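
These semantics (a table of channels, a producer that never blocks, and a consumer that may arrive before or after the value) can be sketched in a few dozen lines. MiniRendezvous is a hypothetical single-process illustration, not TensorFlow's LocalRendezvous; a "tensor" is just an int, and error handling and aborts are left out.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <map>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical single-process channel table.
class MiniRendezvous {
 public:
  using DoneCallback = std::function<void(int value)>;

  // Never blocks: either hands the value to a waiting consumer or queues it.
  void Send(const std::string& key, int value) {
    DoneCallback waiter;
    {
      std::lock_guard<std::mutex> lock(mu_);
      Channel& ch = table_[key];
      if (ch.waiters.empty()) {
        ch.values.push_back(value);
        return;
      }
      waiter = std::move(ch.waiters.front());
      ch.waiters.pop_front();
    }
    waiter(value);  // Run the callback outside the lock.
  }

  // Runs `done` right away if a value is queued, otherwise stores it
  // until a matching Send() arrives.
  void RecvAsync(const std::string& key, DoneCallback done) {
    int value = 0;
    {
      std::lock_guard<std::mutex> lock(mu_);
      Channel& ch = table_[key];
      if (ch.values.empty()) {
        ch.waiters.push_back(std::move(done));
        return;
      }
      value = ch.values.front();
      ch.values.pop_front();
    }
    done(value);
  }

 private:
  struct Channel {
    std::deque<int> values;            // Sent but not yet received.
    std::deque<DoneCallback> waiters;  // Receivers that arrived first.
  };
  std::mutex mu_;
  std::map<std::string, Channel> table_;
};
```

Both queues are first-in, first-out, which is what gives the consumer the values in the order the producer sent them.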

1.2.1 Interface class

RendezvousInterface is the interface class that defines the virtual functions. ParsedKey is also defined here (we omit this part of the code).

class RendezvousInterface {
 public:
  struct Args {
    DeviceContext* device_context = nullptr;
    AllocatorAttributes alloc_attrs;
    CancellationManager* cancellation_manager = nullptr;  // not owned.
  };

  // The caller is a tensor producer and it sends a message (a tensor
  // "val" and a bool "is_dead") under the given "key".
  //
  // {val, is_dead} is bundled as a message sent and received.
  // Typically, is_dead is set by some control flow nodes
  // (e.g., a not-taken branch). args is passed by Send to the
  // Recv function to communicate any information that the Recv
  // function might need. This is typically only necessary for
  // Send/Recv on the same worker.
  //
  // Send() never blocks.
  virtual Status Send(const ParsedKey& key, const Args& args, const Tensor& val,
                      const bool is_dead) = 0;

  // Callback provided by a tensor consumer waiting on the rendezvous.
  // It will be invoked when the tensor is available, or when a non-OK
  // status arises in the production of that tensor. It also gets
  // two Rendezvous::Args, one provided by the sender, the other by the
  // receiver, which may be needed when a non-CPU device is in use
  // by either side.
  typedef std::function<void(const Status&, const Args&, const Args&,
                             const Tensor&, const bool)>
      DoneCallback;

  virtual void RecvAsync(const ParsedKey& key, const Args& args,
                         DoneCallback done) = 0;

  // Synchronous wrapper for RecvAsync.
  Status Recv(const ParsedKey& key, const Args& args, Tensor* val,
              bool* is_dead, int64_t timeout_ms);
  Status Recv(const ParsedKey& key, const Args& args, Tensor* val,
              bool* is_dead);

  // Aborts all pending and future Send/Recv with the given "status".
  // StartAbort() does not wait for ongoing calls to finish.
  // REQUIRES: !status.ok()
  virtual void StartAbort(const Status& status) = 0;

 protected:
  virtual ~RendezvousInterface();

  virtual bool is_cross_process() { return false; }
  friend class ProcessFunctionLibraryRuntime;
};

1.2.2 Basic Implementation of Rendezvous

The Rendezvous class provides the most basic implementations of Send, Recv, and RecvAsync, as well as ParseKey functionality.

// A reference-counted implementation of RendezvousInterface.
//
// This class is used in cases where a rendezvous may be shared between multiple
// threads with no clear owner.
class Rendezvous : public RendezvousInterface, public core::RefCounted {
 public:
  class Factory {
   public:
    // Default to a factory that evaluates to false.
    Factory() : valid_(false) {}

    Factory(std::function<Status(const int64_t, const DeviceMgr*, Rendezvous**)>
                create_fn,
            std::function<Status(const int64_t)> cleanup_fn)
        : valid_(true),
          create_fn_(std::move(create_fn)),
          cleanup_fn_(std::move(cleanup_fn)) {}

    // If no clean up fn is provided, just put in a dummy.
    // For backwards compatibility.
    explicit Factory(
        std::function<Status(const int64_t, const DeviceMgr*, Rendezvous**)>
            create_fn)
        : valid_(true),
          create_fn_(std::move(create_fn)),
          cleanup_fn_([](const int64_t step_id) { return Status::OK(); }) {}

    explicit operator bool() const { return valid_; }

    Status operator()(const int64_t step_id, const DeviceMgr* device_mgr,
                      Rendezvous** rendez) const {
      return create_fn_(step_id, device_mgr, rendez);
    }

    Status CleanUp(const int64_t step_id) const { return cleanup_fn_(step_id); }

   private:
    bool valid_;
    std::function<Status(const int64_t, const DeviceMgr*, Rendezvous**)>
        create_fn_;
    std::function<Status(const int64_t)> cleanup_fn_;
  };

  // Constructs a rendezvous key for the tensor of "name" sent from
  // "src_device" to "dst_device". The tensor is generated in the frame
  // and iteration specified by "frame_iter".
  static std::string CreateKey(const std::string& src_device,
                               uint64 src_incarnation,
                               const std::string& dst_device,
                               const std::string& name,
                               const FrameAndIter& frame_iter);

  static Status ParseKey(StringPiece key, ParsedKey* out);
};

1.2.3 Cross-process RemoteRendezvous

RemoteRendezvous inherits from Rendezvous and adds only one pure virtual method, Initialize. Every derived class that communicates across processes must override it, because initialization needs the WorkerSession.

RemoteRendezvous handles the case where the producer or the consumer lives in a remote process, adding the ability to coordinate with remote workers. It follows a two-stage initialization strategy: first the object is constructed, and later it is initialized. Clients of RendezvousMgrInterface must ensure that Initialize is eventually called on the returned RemoteRendezvous.

// RemoteRendezvous follow a 2-part initialization. First the objects are
// constructed. Eventually, they will be initialized. Clients of the
// RendezvousMgrInterface must guarantee to call Initialize on the returned
// RemoteRendezvous eventually.
//
// Partially initialized RemoteRendezvous must respect the Rendezvous interface
// (i.e. Send() must never block), however implementations are not expected to
// actually perform the underlying operations until after the RemoteRendezvous
// has been Initialize'd.
class RemoteRendezvous : public Rendezvous {
 public:
  // Fully construct the RemoteRendezvous.
  virtual Status Initialize(WorkerSession* session) = 0;

 protected:
  bool is_cross_process() override { return true; }
};

1.2.4 BaseRemoteRendezvous

Because cross-process communication can use different protocols, each concrete Rendezvous has to be implemented for its own protocol. TF therefore adds an intermediate layer, BaseRemoteRendezvous, between RemoteRendezvous and the protocol-specific Rendezvous classes. It bridges the two and provides common Send and Recv methods, so that as much code as possible can be reused.

The main member variable of BaseRemoteRendezvous is Rendezvous* local_, and the code makes extensive use of BaseRecvTensorCall as a parameter, which abstracts a single communication call.

// RemoteRendezvous is a Rendezvous which can handle either
// the producer or consumer being in a remote process.
//
// Buffering of Tensor values is delegated to a "local" Rendezvous
// obtained from NewLocalRendezvous(). This class just adds
// functionality to coordinate with remote workers.
class BaseRemoteRendezvous : public RemoteRendezvous {
 public:
  BaseRemoteRendezvous(const WorkerEnv* env, int64_t step_id);

  // Upgrades the BaseRemoteRendezvous to full initialization.
  Status Initialize(WorkerSession* session) override;

  // Forwards to local_, where the Tensor "val" will be buffered and
  // any waiting callback stored.
  Status Send(const ParsedKey& key, const Rendezvous::Args& args,
              const Tensor& val, const bool is_dead) override;

  // This method is called only by the RecvOp. It tests to see
  // whether the value will be produced by a local or remote device
  // and handles accordingly. In the local case it forwards to
  // local_, in the remote case it initiates an RPC request.
  void RecvAsync(const ParsedKey& key, const Rendezvous::Args& args,
                 DoneCallback done) override;

  void StartAbort(const Status& status) override;

  // This method is called only by the local Worker, forwarded through
  // the same method on RendezvousMgr. This occurs when the Worker
  // has received a RecvTensor request, either locally or over the
  // network. In either case it needs to retrieve a locally buffered
  // value from local_, and give it to its caller.
  //
  // Runs "done" as soon as the tensor for "parsed" is available or an error
  // is detected.
  //
  // REQUIRES: "parsed" is one that will be Saved into the local rendezvous.
  void RecvLocalAsync(const ParsedKey& parsed, DoneCallback done);

 protected:
  virtual void RecvFromRemoteAsync(const Rendezvous::ParsedKey& parsed,
                                   const Rendezvous::Args& args,
                                   DoneCallback done) = 0;

  // Returns true if "src" and "dst" are located in the same worker,
  // and hence may use a local rendezvous.
  virtual bool IsSameWorker(DeviceNameUtils::ParsedName src, DeviceNameUtils::ParsedName dst);

  // If aborted, aborts "call". Otherwise, adds "call" into active_.
  void RegisterCall(BaseRecvTensorCall* call, const Rendezvous::Args& args);

  // Removes "call" from active_ if "call" is in active_.
  void DeregisterCall(BaseRecvTensorCall* call);

  WorkerSession* session();

  bool is_initialized();

  ~BaseRemoteRendezvous() override;

  const WorkerEnv* const env_;  // Not owned.
  const int64_t step_id_;

 private:
  Rendezvous* local_;  // Owns a Ref on this object.

  mutable mutex mu_;

  // Status given by StartAbort() if any.
  Status status_ TF_GUARDED_BY(mu_);

  WorkerSession* session_ TF_GUARDED_BY(mu_);  // Not owned.

  // Data structures to handle calls when partially initialized.
  struct DeferredCall {
    const ParsedKey parsed;
    DoneCallback done;

    DeferredCall(const ParsedKey& parsed, DoneCallback done);
  };
  std::vector<DeferredCall> deferred_calls_ TF_GUARDED_BY(mu_);

  typedef std::function<void()> InactiveCallback;

  std::unordered_map<BaseRecvTensorCall*, InactiveCallback> active_
      TF_GUARDED_BY(mu_);

  bool is_initialized_locked() TF_SHARED_LOCKS_REQUIRED(mu_) {
    return session_ != nullptr;
  }

  // If "is_src" is true, checks that the rendezvous key "parsed"'s
  // source is in this process. If "is_src" is false, checks that the
  // rendezvous key "parsed"'s destination is in this process.
  Status ValidateDevices(const Rendezvous::ParsedKey& parsed, bool is_src);

  // Callback handling the case when a rendezvous has been
  // accomplished in local_ and the consumer is local to this process.
  // Tensor "in" will be copied into "out". The key "parsed" encodes
  // the src and dst devices.
  void SameWorkerRecvDone(const Rendezvous::ParsedKey& parsed,
                          const Rendezvous::Args& in_args,
                          const Rendezvous::Args& out_args, const Tensor& in,
                          Tensor* out, StatusCallback done);

  // Must be called only if fully initialized.
  void RecvLocalAsyncInternal(const ParsedKey& parsed, DoneCallback done);

  TF_DISALLOW_COPY_AND_ASSIGN(BaseRemoteRendezvous);
};

class BaseRecvTensorCall {
 public:
  BaseRecvTensorCall() {}
  virtual ~BaseRecvTensorCall() {}
  virtual void Start(std::function<void()> recv_done) = 0;
  virtual void StartAbort(const Status& s) = 0;
  virtual Status status() const = 0;
 private:
  TF_DISALLOW_COPY_AND_ASSIGN(BaseRecvTensorCall);
};
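
The comment on RecvAsync says that it first tests whether the value will be produced by a local or a remote device and then either forwards to local_ or starts an RPC. A rough sketch of such a test follows. It assumes device names of the form /job:<job>/replica:<r>/task:<t>/device:<type>:<id> and compares them as plain strings; WorkerPrefix, IsSameWorkerSketch and ChooseRecvPath are hypothetical names, and the real code works on DeviceNameUtils::ParsedName structures instead.

```cpp
#include <cassert>
#include <string>

// Returns the "/job:.../replica:.../task:..." part of a full device name,
// or the whole string if there is no "/device:" component.
std::string WorkerPrefix(const std::string& device) {
  const std::size_t pos = device.find("/device:");
  return pos == std::string::npos ? device : device.substr(0, pos);
}

// Two devices are treated as being on the same worker when job, replica
// and task all match. (Assumption: plain string comparison is enough here.)
bool IsSameWorkerSketch(const std::string& src, const std::string& dst) {
  return WorkerPrefix(src) == WorkerPrefix(dst);
}

enum class RecvPath { kLocal, kRemote };

// Mirrors the decision described for RecvAsync: read from the local
// buffer, or issue a request to the remote producer.
RecvPath ChooseRecvPath(const std::string& src, const std::string& dst) {
  return IsSameWorkerSketch(src, dst) ? RecvPath::kLocal : RecvPath::kRemote;
}
```

Two GPUs on the same task therefore take the local path, while devices on different tasks take the remote one.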

The constructor creates a local Rendezvous, which carries out the basic operations.

BaseRemoteRendezvous::BaseRemoteRendezvous(const WorkerEnv* env,
                                           int64_t step_id)
    : env_(env),
      step_id_(step_id),
      local_(NewLocalRendezvous()),
      session_(nullptr) {}

Rendezvous* NewLocalRendezvous() { return new LocalRendezvousWrapper; }

LocalRendezvousWrapper is defined as follows:

class LocalRendezvousWrapper : public Rendezvous {
 public:
  LocalRendezvousWrapper() : impl_(this) {}

  Status Send(const ParsedKey& key, const Args& send_args, const Tensor& val,
              const bool is_dead) override {
    return impl_.Send(key, send_args, val, is_dead);
  }

  void RecvAsync(const ParsedKey& key, const Args& recv_args,
                 DoneCallback done) override {
    impl_.RecvAsync(key, recv_args, std::move(done));
  }

  void StartAbort(const Status& status) override { impl_.StartAbort(status); }

 private:
  LocalRendezvous impl_;

  TF_DISALLOW_COPY_AND_ASSIGN(LocalRendezvousWrapper);
};

Let’s look at the BaseRemoteRendezvous initialization method, which does the basic configuration, such as setting up the session.

Status BaseRemoteRendezvous::Initialize(WorkerSession* session) {
  std::vector<DeferredCall> deferred_calls;
  {
    mutex_lock l(mu_);
    if (session_ != nullptr) {
      if (session_->worker_name() == session->worker_name()) {
        return Status::OK();
      }
      Status s = errors::Internal(
          "Double init! Worker names would have changed from: ",
          session_->worker_name(), " -> ", session->worker_name());
      return s;
    }
    session_ = session;
    std::swap(deferred_calls, deferred_calls_);
  }
  for (auto& call : deferred_calls) {
    RecvLocalAsyncInternal(call.parsed, std::move(call.done));
  }
  return Status::OK();
}
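
The pattern used by Initialize (park the calls that arrive too early, then replay them once the session is set) can be shown on its own. TwoPhaseSketch is a hypothetical class written for this illustration. It replaces the WorkerSession pointer with a boolean flag and treats a repeated Initialize as a no-op, which is simpler than the worker-name check in the real method.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical object with two-phase initialization: requests that arrive
// before Initialize() are parked and replayed once initialization is done.
class TwoPhaseSketch {
 public:
  using Call = std::function<void()>;

  // Runs `call` now if initialized, otherwise defers it.
  void Request(Call call) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      if (!initialized_) {
        deferred_.push_back(std::move(call));
        return;
      }
    }
    call();
  }

  // Marks the object as initialized and replays the deferred calls.
  void Initialize() {
    std::vector<Call> pending;
    {
      std::lock_guard<std::mutex> lock(mu_);
      if (initialized_) return;  // Repeated initialization is a no-op here.
      initialized_ = true;
      std::swap(pending, deferred_);
    }
    for (auto& call : pending) call();  // Replay outside the lock.
  }

 private:
  std::mutex mu_;
  bool initialized_ = false;
  std::vector<Call> deferred_;
};
```

Swapping the deferred list into a local vector before replaying it keeps the lock from being held while the callbacks run, which is the same reason the original swaps deferred_calls_ inside the mutex scope and iterates afterwards.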

1.2.5 RpcRemoteRendezvous

RpcRemoteRendezvous is the gRPC protocol implementation of RemoteRendezvous.

class RpcRemoteRendezvous : public BaseRemoteRendezvous {
 public:
  RpcRemoteRendezvous(const WorkerEnv* env, int64_t step_id)
      : BaseRemoteRendezvous(env, step_id) {}

 protected:
  void RecvFromRemoteAsync(const Rendezvous::ParsedKey& parsed,
                           const Rendezvous::Args& args,
                           DoneCallback done) override;

 private:
  ~RpcRemoteRendezvous() override {}

  TF_DISALLOW_COPY_AND_ASSIGN(RpcRemoteRendezvous);
};

The corresponding derived class of BaseRecvTensorCall is RpcRecvTensorCall.

// Used only to retrieve tensors from remote processes.
class RpcRecvTensorCall : public BaseRecvTensorCall {
 public:
  RpcRecvTensorCall() : wi_(nullptr), dst_device_(nullptr) {}

  void Init(WorkerInterface* wi, int64_t step_id, StringPiece key,
            AllocatorAttributes alloc_attrs, Device* dst_device,
            const Rendezvous::Args& recv_args, Rendezvous::DoneCallback done) {
    wi_ = wi;
    alloc_attrs_ = alloc_attrs;
    dst_device_ = dst_device;
    recv_args_ = recv_args;
    done_ = std::move(done);
    req_.set_step_id(step_id);
    req_.set_rendezvous_key(key.data(), key.size());
    req_.set_request_id(GetUniqueRequestId());
  }

  void Reset() {
    // The RpcRemoteRendezvous using this object is responsible for calling
    // ReleaseWorker() before Reset().

    alloc_attrs_ = AllocatorAttributes();
    dst_device_ = nullptr;
    // We don't clear opts_ and assume that Init will set up the state for
    // opts_ appropriately.
    req_.Clear();
    resp_.Clear();
    {
      mutex_lock l(mu_);
      status_ = Status::OK();
    }
    done_ = nullptr;
  }

  ~RpcRecvTensorCall() override {
    // Since only the RpcRecvTensorFreeList will delete an
    // RpcRecvTensorCall, we require that ReleaseWorker() has been called before
    // the user releases a Call object to the free list.
    CHECK_EQ(static_cast<WorkerInterface*>(nullptr), wi_)
        << "Leaking WorkerInterface in RpcRecvTensorCall destructor.";
  }

  void Start(std::function<void()> recv_done) override {
    StartRTCall(std::move(recv_done));
  }

  void StartAbort(const Status& s) override {
    {
      mutex_lock l(mu_);
      status_.Update(s);
    }
    opts_.StartCancel();
  }

  Status status() const override {
    mutex_lock l(mu_);
    return status_;
  }

  void ReleaseWorker(WorkerCacheInterface* worker_cache) {
    DCHECK_NE(static_cast<WorkerInterface*>(nullptr), wi_)
        << "RpcRecvTensorCall::ReleaseWorker() called twice.";
    worker_cache->ReleaseWorker(src_worker_, wi_);
    wi_ = nullptr;
  }

  const Tensor& tensor() const { return resp_.tensor(); }
  bool is_dead() const { return resp_.metadata().is_dead(); }
  Device* dst_device() const { return dst_device_; }
  const Rendezvous::Args& recv_args() const { return recv_args_; }
  const Rendezvous::DoneCallback& done() const { return done_; }

 private:
  friend class RpcRemoteRendezvous;

  // Start the main RecvTensor call, checking for an async abort.
  void StartRTCall(std::function<void()> recv_done) {
    resp_.InitAlloc(dst_device_, alloc_attrs_);
    auto abort_checked = std::make_shared<Notification>();
    auto cb = [this, abort_checked,
               recv_done = std::move(recv_done)](const Status& s) {
      // Make sure the Rendezvous abort checking is finished before running the
      // callback, which might destroy the current call object.
      abort_checked->WaitForNotification();
      if (!s.ok()) {
        mutex_lock l(mu_);
        status_.Update(s);
      }
      recv_done();
    };
    wi_->RecvTensorAsync(&opts_, &req_, &resp_, std::move(cb));

    // NOTE: Check if the rendezvous was aborted after sending out the RPC. The
    // ordering is important because StartAbort could be called right before
    // the RecvTensorAsync request registers its RPC cancellation to opts_.
    // In that case, the previous StartAbort would not trigger the
    // cancellation of this call.
    Status s;
    {
      mutex_lock l(mu_);
      s = status_;
    }
    if (!s.ok()) {
      opts_.StartCancel();
    }
    // Notify that the abort check has finished.
    abort_checked->Notify();
  }

  string src_worker_;
  string src_rel_device_;
  WorkerInterface* wi_;  // Not owned.
  AllocatorAttributes alloc_attrs_;
  Device* dst_device_;
  CallOptions opts_;
  RecvTensorRequest req_;
  TensorResponse resp_;
  Rendezvous::Args recv_args_;
  Rendezvous::DoneCallback done_;

  mutable mutex mu_;
  Status status_ TF_GUARDED_BY(mu_);

  TF_DISALLOW_COPY_AND_ASSIGN(RpcRecvTensorCall);
};

The current logical relationship is as follows:

Figure 2 Rendezvous logical relationships

1.3 Management class

RendezvousMgr is mainly responsible for creating and destroying RemoteRendezvous instances. It keeps track of a set of local Rendezvous instances, and all tensors sent by this worker are buffered in the RendezvousMgr until they are received. Each globally unique step_id corresponds to one local Rendezvous instance managed by the RendezvousMgr.

1.3.1 Interface

RendezvousMgrInterface is the interface class.

// RendezvousMgr keeps track of a set of local rendezvous instances.
// All tensors sent by this worker are buffered in a RendezvousMgr
// until the tensor is received. Each global unique "step_id"
// corresponds to one local rendezvous instance managed by a
// RendezvousMgr.
//
// E.g.,
// Rendezvous* rendez = worker_env->rendezvous_mgr->Find(0x8935);
// fork execution of an graph executor using "rendez" on thread 1;
// fork execution of another graph executor using "rendez" on thread 2;
// ...
// join threads 1 and 2;
//
// In the example above, execution in thread 1 and 2 communicates with
// each other by send/recv operations through the "rend".
//
// Tensors sent and recved through rendezvous managed by this
// RendezvousMgr must have keys generated by Rendezvous::CreateKey.
class RendezvousMgrInterface {
 public:
  RendezvousMgrInterface() {}
  virtual ~RendezvousMgrInterface() {}

  // Returns Rendezvous supporting send and recv among workers in the
  // "step_id". The caller takes ownership of one reference on the
  // returned Rendezvous instance.
  //
  // Note: the caller must guarantee to eventually call Initialize on the
  // returned RemoteRendezvous
  virtual RemoteRendezvous* Find(int64_t step_id) = 0;

  // Finds the local rendezvous instance for the "step_id". Runs
  // "done" when the tensor for "key" is produced or an error occurs.
  //
  // This method is used by the rpc handler of RecvTensor.
  virtual void RecvLocalAsync(int64_t step_id,
                              const Rendezvous::ParsedKey& parsed,
                              Rendezvous::DoneCallback done) = 0;

  // Synchronous wrapper for RecvLocalAsync.
  virtual Status RecvLocal(int64_t step_id, const Rendezvous::ParsedKey& parsed,
                           Tensor* val, bool* is_dead) = 0;

  // Removes rendezvous for "step_id".
  //
  // TODO(zhifengc): Have a background thread in worker that
  // periodically calls CleanupAll().
  virtual void Cleanup(int64_t step_id) = 0;
};

1.3.2 BaseRendezvousMgr

BaseRendezvousMgr implements basic functions, such as finding Rendezvous according to step_id.

class BaseRendezvousMgr : public RendezvousMgrInterface {
 public:
  explicit BaseRendezvousMgr(const WorkerEnv* worker_env);

  ~BaseRendezvousMgr() override;

  // Returns Rendezvous supporting send and recv among workers in the
  // "step_id". The caller takes ownership of one reference on the
  // returned Rendezvous instance.
  //
  // Note: the caller must guarantee to eventually call Initialize on the
  // returned RemoteRendezvous
  RemoteRendezvous* Find(int64_t step_id) override;

  // Finds the local rendezvous instance for the "step_id". Runs
  // "done" when the tensor for "key" is produced or an error occurs.
  //
  // This method is used by the rpc handler of RecvTensor.
  void RecvLocalAsync(int64_t step_id, const Rendezvous::ParsedKey& parsed,
                      Rendezvous::DoneCallback done) override;

  // Synchronous wrapper for RecvLocalAsync.
  Status RecvLocal(int64_t step_id, const Rendezvous::ParsedKey& parsed,
                   Tensor* val, bool* is_dead) override;

  // Removes rendezvous for "step_id".
  void Cleanup(int64_t step_id) override;

 protected:
  virtual BaseRemoteRendezvous* Create(int64_t step_id,
                                       const WorkerEnv* worker_env) = 0;

 private:
  // Maps step_id to rendezvous.
  typedef absl::flat_hash_map<int64_t, BaseRemoteRendezvous*> Table;

  // Not owned.
  const WorkerEnv* const worker_env_;

  mutex mu_;
  Table table_ TF_GUARDED_BY(mu_);

  BaseRemoteRendezvous* FindOrCreate(int64_t step_id);

  TF_DISALLOW_COPY_AND_ASSIGN(BaseRendezvousMgr);
};
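
The idea of mapping each step_id to one rendezvous, created on first use and dropped on cleanup, can be sketched as follows. RendezvousMgrSketch and StepRendezvousSketch are hypothetical names, and std::shared_ptr stands in for the intrusive reference counting (core::RefCounted) that the real classes use.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <mutex>
#include <unordered_map>

// Hypothetical per-step object; the real one would buffer tensors.
struct StepRendezvousSketch {
  explicit StepRendezvousSketch(int64_t id) : step_id(id) {}
  const int64_t step_id;
};

// Hypothetical manager: one rendezvous per step_id, created on first use.
class RendezvousMgrSketch {
 public:
  // Returns the rendezvous for `step_id`, creating it if necessary.
  // The caller shares ownership of the returned object.
  std::shared_ptr<StepRendezvousSketch> Find(int64_t step_id) {
    std::lock_guard<std::mutex> lock(mu_);
    auto& slot = table_[step_id];
    if (!slot) slot = std::make_shared<StepRendezvousSketch>(step_id);
    return slot;
  }

  // Drops the manager's reference; callers that still hold one keep the
  // object alive until they release it.
  void Cleanup(int64_t step_id) {
    std::lock_guard<std::mutex> lock(mu_);
    table_.erase(step_id);
  }

 private:
  std::mutex mu_;
  std::unordered_map<int64_t, std::shared_ptr<StepRendezvousSketch>> table_;
};
```

Every caller that asks for the same step_id gets the same object, which is what lets the executors of one step communicate through a shared rendezvous.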

2. Usage

We already saw some uses of Rendezvous earlier when looking at how computations are executed, so let's walk through a few scenarios.

2.1 Worker receiving

Let's first look at the worker on the receiving side.

2.1.1 DoRunGraph

The Worker receives tensors in the DoRunGraph method.

void Worker::DoRunGraph(CallOptions* opts, RunGraphRequestWrapper* request,
                        MutableRunGraphResponseWrapper* response,
                        StatusCallback done) {

  session->graph_mgr()->ExecuteAsync(
      request->graph_handle(), step_id, session.get(), request->exec_opts(),
      collector, response, cm, in,
      [this, step_id, response, session, cm, out, token, collector,
       device_profiler_session, opts, done](const Status& status) {
        Status s = status;
        if (s.ok()) {
          // Receive tensors
          s = session->graph_mgr()->RecvOutputs(step_id, out);
        }
      });
}

RecvOutputs then looks up the Rendezvous by step_id.

Status GraphMgr::RecvOutputs(const int64_t step_id, NamedTensors* out) {
  Rendezvous* rendezvous = worker_env_->rendezvous_mgr->Find(step_id);
  Status s = RecvOutputsFromRendezvous(rendezvous, out, Rendezvous::Args());
  rendezvous->Unref();
  size_t output_size = 0;
  for (auto& p : *out) {
    output_size += p.second.AllocatedBytes();
  }
  return s;
}

The details are shown in the figure below. The numbers in the figure give the order of the steps; step 3 returns the Rendezvous, and RecvOutputsFromRendezvous is a global function.

2.1.2 DoPartialRunGraph

DoPartialRunGraph calls RecvOutputsAsync to receive the tensors.

void Worker::DoPartialRunGraph(CallOptions* opts,
                               RunGraphRequestWrapper* request,
                               MutableRunGraphResponseWrapper* response,
                               StatusCallback done) {
  const int64_t step_id = request->step_id();
  const string& graph_handle = request->graph_handle();

  Status s = recent_request_ids_.TrackUnique(
      request->request_id(), "PartialRunGraph (Worker)", request);

  std::shared_ptr<WorkerSession> session;
  if (request->create_worker_session_called()) {
    s = env_->session_mgr->WorkerSessionForSession(request->session_handle(),
                                                   &session);
  } else {
    session = env_->session_mgr->LegacySession();
  }

  GraphMgr::NamedTensors in;
  GraphMgr::NamedTensors* out = new GraphMgr::NamedTensors;
  s = PrepareRunGraph(request, &in, out);
  auto finish = [done, out, opts](const Status& s) {
    opts->ClearCancelCallback();
    delete out;
    done(s);
  };

  CancellationManager* cm = nullptr;
  bool is_new_partial_run = partial_run_mgr_.FindOrCreate(step_id, &cm);

  // Before we start doing anything, we set the RPC cancellation.
  opts->SetCancelCallback([this, cm, step_id]() {
    cm->StartCancel();
    AbortStep(step_id);
  });

  // If this is a new partial run request, the request will need to start the
  // executors.
  if (is_new_partial_run) {
    CancellationToken token;
    token = cancellation_manager_.get_cancellation_token();
    cancellation_manager_.RegisterCallback(token,
                                           [cm]() { cm->StartCancel(); });
    session->graph_mgr()->ExecuteAsync(
        graph_handle, step_id, session.get(), request->exec_opts(),
        nullptr /* collector */, nullptr /* response */, cm, in,
        [this, token, step_id, session](Status s) {
          cancellation_manager_.DeregisterCallback(token);
          partial_run_mgr_.ExecutorDone(step_id, s);
        });
  } else {
    // Send the partial run's new inputs.
    s = session->graph_mgr()->SendInputs(step_id, in);
  }

  // RecvOutputsAsync is called to receive the tensors.
  session->graph_mgr()->RecvOutputsAsync(
      step_id, out, [this, out, request, response, step_id, finish](Status s) {
        if (s.ok()) {
          // Construct and return the resp.
          for (const auto& p : *out) {
            const string& key = p.first;
            const Tensor& val = p.second;
            response->AddRecv(key, val);
          }
        }
        if (request->is_last_partial_run()) {
          partial_run_mgr_.PartialRunDone(step_id, finish, s);
        } else {
          finish(s);
        }
      });
}

RecvOutputsAsync in turn calls RecvOutputsFromRendezvousAsync.

void GraphMgr::RecvOutputsAsync(const int64_t step_id, NamedTensors* out,
                                StatusCallback done) {
  Rendezvous* rendezvous = worker_env_->rendezvous_mgr->Find(step_id);
  std::vector<string> keys;
  std::vector<Tensor>* received_keys = new std::vector<Tensor>;
  keys.reserve(out->size());
  received_keys->reserve(out->size());
  for (const auto& p : *out) {
    keys.push_back(p.first);
    received_keys->push_back(p.second);
  }
  RecvOutputsFromRendezvousAsync(
      rendezvous, nullptr, {}, keys, received_keys,
      [done, rendezvous, received_keys, out, keys](const Status s) {
        rendezvous->Unref();
        size_t output_size = 0;
        for (int i = 0, end = keys.size(); i < end; ++i) {
          (*out)[keys[i]] = (*received_keys)[i];
          output_size += (*out)[keys[i]].AllocatedBytes();
        }
        metrics::RecordGraphOutputTensors(output_size);
        delete received_keys;
        done(s);
      });
}

The details are shown in the figure below, where the numbers give the order of the steps. Step 3 returns the Rendezvous, and RecvOutputsFromRendezvousAsync is a global function.

2.2 GraphMgr send

GraphMgr sends the input tensors in ExecuteAsync.

void GraphMgr::ExecuteAsync(const string& handle, const int64_t step_id,
                            WorkerSession* session, const ExecutorOpts& opts,
                            StepStatsCollector* collector,
                            MutableRunGraphResponseWrapper* response,
                            CancellationManager* cancellation_manager,
                            const NamedTensors& in, StatusCallback done) {

  if (s.ok()) {
    // Send the tensor
    s = SendTensorsToRendezvous(rendezvous, nullptr, {}, keys, tensors_to_send);
  }

  // Execute the subgraph
  StartParallelExecutors(
      handle, step_id, item, rendezvous, ce_handle, collector, cost_graph,
      cancellation_manager, session, start_time_usecs,
      [item, rendezvous, ce_handle, done, start_time_usecs, input_size,
       step_id](const Status& s) {
      });
}

SendTensorsToRendezvous is as follows:

Status SendTensorsToRendezvous(
    RendezvousInterface* rendezvous, DeviceContext* device_context,
    const std::vector<AllocatorAttributes>& alloc_attrs,
    const std::vector<string>& keys, gtl::ArraySlice<Tensor> tensors_to_send) {

  Rendezvous::ParsedKey parsed;
  for (int i = 0; i < keys.size(); ++i) {
    Rendezvous::Args rendez_args;
    rendez_args.device_context = device_context;
    if (!alloc_attrs.empty()) {
      rendez_args.alloc_attrs = alloc_attrs[i];
    }
    TF_RETURN_IF_ERROR(Rendezvous::ParseKey(keys[i], &parsed));
    TF_RETURN_IF_ERROR(
        rendezvous->Send(parsed, rendez_args, tensors_to_send[i], false));
  }
  return Status::OK();
}
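
The keys handed to Rendezvous::ParseKey are plain strings. The sketch below assumes the layout src_device;src_incarnation;dst_device;edge_name;frame_and_iter. That layout is an assumption here and is not shown in this article, and SimpleParsedKey and ParseSimpleKey are hypothetical names that do not reproduce the real parser.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical, simplified counterpart of Rendezvous::ParsedKey.
struct SimpleParsedKey {
  std::string src_device;
  std::string src_incarnation;  // kept as text in this sketch
  std::string dst_device;
  std::string edge_name;
  std::string frame_and_iter;
};

// Splits `key` on ';' and fills `out`.
// Returns false unless the key has exactly five fields (assumed layout).
bool ParseSimpleKey(const std::string& key, SimpleParsedKey* out) {
  std::vector<std::string> parts;
  std::string::size_type start = 0;
  while (true) {
    const std::string::size_type pos = key.find(';', start);
    if (pos == std::string::npos) {
      parts.push_back(key.substr(start));
      break;
    }
    parts.push_back(key.substr(start, pos - start));
    start = pos + 1;
  }
  if (parts.size() != 5) return false;
  out->src_device = parts[0];
  out->src_incarnation = parts[1];
  out->dst_device = parts[2];
  out->edge_name = parts[3];
  out->frame_and_iter = parts[4];
  return true;
}
```

Because the key names both the source and the destination device, the same string is enough for Send to validate that the source is local and for Recv to decide where to fetch from.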

Let’s take a closer look at how to receive and send.

3. Send

Let us first look at the sending process. Send involves no cross-process transfer, so it behaves exactly like Send in the local scenario: it only puts the tensor into the Worker's local Table. Since nothing crosses the network, Send is non-blocking.

3.1 BaseRemoteRendezvous

The Send method delegates to local_->Send to do the actual work.

Status BaseRemoteRendezvous::Send(const Rendezvous::ParsedKey& parsed,
                                  const Rendezvous::Args& args,
                                  const Tensor& val, const bool is_dead) {

  WorkerSession* sess = nullptr;
  {
    tf_shared_lock l(mu_);
    if (!status_.ok()) return status_;
    sess = session_;
  }

  if (!IsLocalDevice(sess->worker_name(), parsed.src_device)) {
    return errors::InvalidArgument(
        "Invalid rendezvous key (src): ", parsed.FullKey(), "@",
        sess->worker_name());
  }

  // Buffers "val" and "device_context" in local_.
  return local_->Send(parsed, args, val, is_dead);
}

3.2 LocalRendezvous

LocalRendezvous::Send inserts the tensor into the local table.

Status LocalRendezvous::Send(const Rendezvous::ParsedKey& key,
                             const Rendezvous::Args& send_args,
                             const Tensor& val, const bool is_dead) {
  uint64 key_hash = KeyHash(key.FullKey());

  if (is_dead) {
    static auto* rendezvous_dead_values_sent = monitoring::Counter<2>::New(
        "/tensorflow/core/rendezvous_dead_values_sent",
        "The number of dead values sent between a pair of devices.",
        "send_device", "recv_device");
    rendezvous_dead_values_sent
        ->GetCell(string(key.src_device), string(key.dst_device))
        ->IncrementBy(1);
  }

  mu_.lock();
  if (!status_.ok()) {
    // Rendezvous has been aborted.
    Status s = status_;
    mu_.unlock();
    return s;
  }

  ItemQueue* queue = &table_[key_hash];
  if (queue->head == nullptr || queue->head->type == Item::kSend) {
    // There is no waiter for this message. Append the message
    // into the queue. The waiter will pick it up when arrives.
    // Only send-related fields need to be filled.
    queue->push_back(new Item(send_args, val, is_dead));
    mu_.unlock();
    return Status::OK();
  }

  // There is an earliest waiter to consume this message.
  Item* item = queue->head;

  // Delete the queue when the last element has been consumed.
  if (item->next == nullptr) {
    table_.erase(key_hash);
  } else {
    queue->head = item->next;
  }
  mu_.unlock();

  // Notify the waiter by invoking its done closure, outside the
  // lock.
  DCHECK_EQ(item->type, Item::kRecv);
  (*item->recv_state.waiter)(Status::OK(), send_args, item->args, val, is_dead);
  delete item;
  return Status::OK();
}

The logic is shown in the figure below. Here Worker 0 refers to a worker role, not the Worker class.

Figure 3. Send logic
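
The matching rule inside the local table can be condensed into a small sketch. This is a simplified model under stated assumptions, not the LocalRendezvous code: MiniRendezvous is a hypothetical class, a tensor is reduced to an int, the key is used directly instead of its hash, and abort and cancellation handling are left out.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical miniature of the table inside a local rendezvous.
// Invariant: a queue holds either buffered values or waiting receivers,
// never both at the same time.
class MiniRendezvous {
 public:
  using Waiter = std::function<void(int)>;

  // Never blocks: hands the value to a waiting receiver, or buffers it.
  void Send(const std::string& key, int value) {
    Waiter waiter;
    {
      std::lock_guard<std::mutex> lock(mu_);
      Queue& q = table_[key];
      if (q.waiters.empty()) {
        q.values.push_back(value);  // no receiver yet, buffer the value
        return;
      }
      waiter = std::move(q.waiters.front());
      q.waiters.pop_front();
      if (q.waiters.empty()) table_.erase(key);
    }
    waiter(value);  // run the receiver's callback outside the lock
  }

  // Consumes a buffered value if one exists, otherwise registers `done`.
  void RecvAsync(const std::string& key, Waiter done) {
    int value = 0;
    {
      std::lock_guard<std::mutex> lock(mu_);
      Queue& q = table_[key];
      if (q.values.empty()) {
        q.waiters.push_back(std::move(done));  // sender has not arrived yet
        return;
      }
      value = q.values.front();
      q.values.pop_front();
      if (q.values.empty()) table_.erase(key);
    }
    done(value);
  }

 private:
  struct Queue {
    std::deque<int> values;
    std::deque<Waiter> waiters;
  };
  std::mutex mu_;
  std::unordered_map<std::string, Queue> table_;
};
```

Whichever side arrives first leaves an entry in the table, and the side that arrives second completes the exchange, so neither call has to block.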

4. Receive

The sender has now put the prepared tensor into its local table. The receiver must fetch the tensor from that table on the sender, and this is where cross-process transport comes in. The receive process works as follows:

  • Recv is the Client. It assembles the ParsedKey for the tensor it needs and sends a Request carrying that ParsedKey to Send.
  • Send is the Server. On receiving the Request, it looks up the requested tensor in its local Table, wraps it in a Response, and sends it back to Recv.

Note that the data transfer is initiated by Recv, which sends a request to Send to trigger the communication. This pull model is the reverse of the usual pattern, in which the sender pushes the data. The Worker offers both synchronous and asynchronous calls; here we follow the asynchronous path. The figure below gives an overview of the whole send and receive process in advance. The dotted line represents the returned tensor.

Figure 4. Overall send and receive logic

4.1 Client

The client logic is as follows:

4.1.1 RecvOutputsFromRendezvousAsync

The global function RecvOutputsFromRendezvousAsync calls rendezvous->RecvAsync.

void RecvOutputsFromRendezvousAsync(
    RendezvousInterface* rendezvous, DeviceContext* device_context,
    const std::vector<AllocatorAttributes>& alloc_attrs,
    const std::vector<string>& keys, std::vector<Tensor>* received_tensors,
    StatusCallback done) {
  if (keys.empty()) {
    done(Status::OK());
    return;
  }

  received_tensors->reserve(keys.size());
  std::vector<
      std::tuple<string, Tensor*, Rendezvous::ParsedKey, AllocatorAttributes>>
      arguments;
  for (int i = 0; i < keys.size(); ++i) {
    Rendezvous::ParsedKey parsed;
    Status s = Rendezvous::ParseKey(keys[i], &parsed);
    received_tensors->push_back(Tensor());
    if (!s.ok()) {
      done(s);
      return;
    }
    AllocatorAttributes alloc_attr;
    if (!alloc_attrs.empty()) {
      alloc_attr = alloc_attrs[i];
    }
    arguments.emplace_back(keys[i], &((*received_tensors)[i]), parsed,
                           alloc_attr);
  }

  auto status_cb = new ReffedStatusCallback(std::move(done));
  for (auto& p : arguments) {
    const string& key = std::get<0>(p);
    Tensor* val = std::get<1>(p);
    Rendezvous::ParsedKey parsed = std::get<2>(p);
    Rendezvous::Args rendez_args;
    rendez_args.device_context = device_context;
    rendez_args.alloc_attrs = std::get<3>(p);
    status_cb->Ref();
    rendezvous->RecvAsync(
        parsed, rendez_args,
        [val, key, status_cb](const Status& s,
                              const Rendezvous::Args& send_args,
                              const Rendezvous::Args& recv_args,
                              const Tensor& v, const bool is_dead) {
          Status status = s;
          if (status.ok()) {
            *val = v;
            if (is_dead) {
              status = errors::InvalidArgument("The tensor returned for ", key,
                                               " was not valid.");
            }
          }
          status_cb->UpdateStatus(status);
          status_cb->Unref();
        });
  }
  status_cb->Unref();
}
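
The function above issues one RecvAsync per key but must call done only once, after all of them have finished. The reference-counted callback that achieves this can be sketched as follows. FanInCallback is a hypothetical class written for illustration, and a std::string (empty meaning OK) stands in for Status.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical fan-in helper: runs `done` exactly once, when the last
// reference is dropped, and passes it the first error that was recorded.
class FanInCallback {
 public:
  explicit FanInCallback(std::function<void(const std::string&)> done)
      : done_(std::move(done)) {}

  void Ref() { refs_.fetch_add(1); }

  // Records the first non-empty error. An empty string means OK.
  void UpdateStatus(const std::string& error) {
    std::lock_guard<std::mutex> lock(mu_);
    if (first_error_.empty()) first_error_ = error;
  }

  // Drops one reference. The last Unref runs the callback and frees the
  // object, so the object must have been allocated with new.
  void Unref() {
    if (refs_.fetch_sub(1) == 1) {
      done_(first_error_);
      delete this;
    }
  }

 private:
  std::atomic<int> refs_{1};  // the creator holds the initial reference
  std::mutex mu_;
  std::string first_error_;
  std::function<void(const std::string&)> done_;
};
```

The creator takes one extra reference per outstanding receive and drops its own reference at the end, so the callback cannot fire before every receive has been issued.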

4.1.2 BaseRemoteRendezvous

Because the source and destination are not in the same worker process, RecvFromRemoteAsync is called.

void BaseRemoteRendezvous::RecvAsync(const ParsedKey& parsed,
                                     const Rendezvous::Args& recv_args,
                                     DoneCallback done) {
  Status s = ValidateDevices(parsed, false /*!is_src*/);

  profiler::ScopedMemoryDebugAnnotation op_annotation("RecvAsync", step_id_);
  // Are src and dst in the same worker?
  if (IsSameWorker(parsed.src, parsed.dst)) { // In the same worker
    // Recv the tensor from local_.
    local_->RecvAsync(
        parsed, recv_args,
        [this, parsed, done](
            const Status& status, const Rendezvous::Args& send_args,
            const Rendezvous::Args& recv_args, const Tensor& in, bool is_dead) {

          Tensor* out = new Tensor;
          StatusCallback final_callback = [done, send_args, recv_args, out,
                                           is_dead](const Status& s) {
            done(s, send_args, recv_args, *out, is_dead);
            delete out;
          };

          if (status.ok()) {
            SameWorkerRecvDone(parsed, send_args, recv_args, in, out,
                               std::move(final_callback));
          } else {
            final_callback(status);
          }
        });
    return;
  } else {  // Not in the same worker
    RecvFromRemoteAsync(parsed, recv_args, std::move(done));
  }
}
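
The local-versus-remote decision depends only on the two device names in the key. The sketch below assumes full device names of the form /job:.../replica:.../task:.../device:..., and treats everything before "/device:" as the worker. WorkerOf and ChooseRecvPath are hypothetical helpers, not the functions TensorFlow uses for this check.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper:
// "/job:w/replica:0/task:1/device:GPU:0" -> "/job:w/replica:0/task:1".
// If "/device:" is not found, the whole name is returned.
std::string WorkerOf(const std::string& device_name) {
  const std::string::size_type pos = device_name.find("/device:");
  return pos == std::string::npos ? device_name : device_name.substr(0, pos);
}

enum class RecvPath { kLocal, kRemote };

// Chooses the local table when both devices belong to the same worker,
// and the remote (RPC) path otherwise.
RecvPath ChooseRecvPath(const std::string& src_device,
                        const std::string& dst_device) {
  return WorkerOf(src_device) == WorkerOf(dst_device) ? RecvPath::kLocal
                                                      : RecvPath::kRemote;
}
```

For example, a CPU and a GPU on the same task take the local path, while devices on task 0 and task 1 take the remote path.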

4.1.3 RpcRemoteRendezvous

RecvFromRemoteAsync creates an RpcRecvTensorCall and calls its Start() method, which in turn calls StartRTCall(). RpcRecvTensorCall inherits from the abstract base class BaseRecvTensorCall. It is an abstraction of one gRPC call and encapsulates the complex chain of calls that follows. The key step is to initialize the RpcRecvTensorCall with the corresponding Worker:

WorkerInterface* rwi = worker_cache->GetOrCreateWorker(call->src_worker_);

call->Init(rwi, step_id_, parsed.FullKey(), recv_args.alloc_attrs, dst_device,
             recv_args, std::move(done));

The complete code is as follows:

void RpcRemoteRendezvous::RecvFromRemoteAsync(
    const Rendezvous::ParsedKey& parsed, const Rendezvous::Args& recv_args,
    DoneCallback done) {
  CHECK(is_initialized());
  Status s;

  // Prepare a RecvTensor call that can handle being aborted.
  // Generate a Call
  RpcRecvTensorCall* call = get_call_freelist()->New();

  // key.src_device identifies a remote device.
  if (!DeviceNameUtils::SplitDeviceName(parsed.src_device, &call->src_worker_,
                                        &call->src_rel_device_)) {
    s = errors::Internal(parsed.src_device,
                         " is invalid remote source device.");
  }
  WorkerSession* sess = session();
  std::shared_ptr<WorkerCacheInterface> worker_cache =
      sess->GetSharedWorkerCache();
  // The worker will be released in a subsequent call to
  // sess->worker_cache()->ReleaseWorker() (if the call has not yet been
  // initialized) or call->ReleaseWorker() (if it has been initialized).

  // Get the corresponding Worker
  WorkerInterface* rwi = worker_cache->GetOrCreateWorker(call->src_worker_);

  Device* dst_device;
  if (s.ok()) {
    s = sess->device_mgr()->LookupDevice(parsed.dst_device, &dst_device);
  }
  if (!s.ok()) {
    if (rwi != nullptr) {
      sess->worker_cache()->ReleaseWorker(call->src_worker_, rwi);
    }
    get_call_freelist()->Release(call);
    done(s, Args(), recv_args, Tensor{}, false);
    return;
  }

  // Initialize with Worker
  call->Init(rwi, step_id_, parsed.FullKey(), recv_args.alloc_attrs, dst_device,
             recv_args, std::move(done));

  // Record "call" in active_ so that it can be aborted cleanly.
  RegisterCall(call, recv_args);

  // Start "call".
  Ref();
  call->Start([this, call, worker_cache]() {
    // Removes "call" from active_. Prevent StartAbort().
    DeregisterCall(call);
    // If StartAbort was called prior to DeregisterCall, then the
    // current status should be bad.
    Status s = call->status();
    // NOTE: *session() can potentially be deleted before we return from
    // call->done()(...), so we must release the worker before calling the
    // callback.
    call->ReleaseWorker(session()->worker_cache());
    call->done()(s, Args(), call->recv_args(), call->tensor(), call->is_dead());
    get_call_freelist()->Release(call);
    Unref();
  });
}

4.1.4 RpcRecvTensorCall

The Start method of RpcRecvTensorCall is shown below; it simply calls StartRTCall.

void RpcRecvTensorCall::Start(std::function<void()> recv_done) {
  StartRTCall(std::move(recv_done));
}

RpcRecvTensorCall::StartRTCall calls the Worker's RecvTensorAsync to carry out the transfer; the concrete implementation is GrpcRemoteWorker::RecvTensorAsync.

// Start the main RecvTensor call, checking for an async abort.
void RpcRecvTensorCall::StartRTCall(std::function<void()> recv_done) {
  resp_.InitAlloc(dst_device_, alloc_attrs_);
  auto abort_checked = std::make_shared<Notification>();
  auto cb = [this, abort_checked,
             recv_done = std::move(recv_done)](const Status& s) {
    // Make sure the Rendezvous abort checking is finished before running the
    // callback, which might destroy the current call object.
    abort_checked->WaitForNotification();
    if (!s.ok()) {
      mutex_lock l(mu_);
      status_.Update(s);
    }
    recv_done();
  };
  wi_->RecvTensorAsync(&opts_, &req_, &resp_, std::move(cb));

  // NOTE: Check if the rendezvous was aborted after sending out the RPC. The
  // ordering is important because StartAbort could be called right before
  // the RecvTensorAsync request registers its RPC cancellation to opts_.
  // In that case, the previous StartAbort would not trigger the
  // cancellation of this call.
  Status s;
  {
    mutex_lock l(mu_);
    s = status_;
  }
  if (!s.ok()) {
    opts_.StartCancel();
  }
  // Notify that the abort check has finished.
  abort_checked->Notify();
}

4.1.5 GrpcRemoteWorker

A simplified version of the RecvTensorAsync method is shown below. It brings us back to the familiar Worker flow.

void GrpcRemoteWorker::RecvTensorAsync(CallOptions* call_opts,
                                       const RecvTensorRequest* request,
                                       TensorResponse* response,
                                       StatusCallback done) {
  // `callback` wraps `done`; its construction is omitted in this simplified
  // version.
  IssueRequest(request, response, recvtensor_, callback, call_opts);
}

At this point we have covered the right half of the figure above, the part marked by the circle.

4.2 Server

We now move to the Server side, which is the tensor sender. After it receives a RecvTensorRequest, the logic is as follows:

4.2.1 GrpcWorkerService

GrpcWorkerServiceThread::HandleRPCsLoop uses a for loop to enqueue handlers for GrpcWorkerMethod::kRecvTensor (1000 by default) by calling EnqueueRecvTensorRequestRaw(). The handlers are enqueued in advance to speed up processing. After a RecvTensor message has been handled, EnqueueRecvTensorRequestRaw is called again, and it uses EnqueueRequestForMethod to enqueue a replacement handler.

void GrpcWorkerServiceThread::HandleRPCsLoop() {
  // TODO(ncteisen): This may require performance engineering. We can
  // change the number of threads, the number of handlers per thread,
  // or even decide to specialize certain threads to certain methods.
  SETUP_FOR_REQUEST(GetStatus, 1, false);
  SETUP_FOR_REQUEST(CreateWorkerSession, 1, false);
  SETUP_FOR_REQUEST(DeleteWorkerSession, 1, false);
  SETUP_FOR_REQUEST(CleanupAll, 1, false);
  SETUP_FOR_REQUEST(RegisterGraph, 1, false);
  SETUP_FOR_REQUEST(DeregisterGraph, 1, false);
  SETUP_FOR_REQUEST(Logging, 1, false);
  SETUP_FOR_REQUEST(Tracing, 1, false);
  SETUP_FOR_REQUEST(CompleteGroup, 10, true);
  SETUP_FOR_REQUEST(CompleteInstance, 10, true);
  SETUP_FOR_REQUEST(GetStepSequence, 10, true);
  SETUP_FOR_REQUEST(RecvBuf, 500, true);
  SETUP_FOR_REQUEST(RunGraph, 100, true);
  SETUP_FOR_REQUEST(CleanupGraph, 100, false);
  SETUP_FOR_REQUEST(MarkRecvFinished, 10, false);

  // TODO(ncteisen): Determine a better policy for enqueuing the
  // appropriate number of each request type.
  for (int i = 0;
       i < gtl::FindWithDefault(
               queue_depth_, static_cast<int>(GrpcWorkerMethod::kRecvTensor),
               1000);
       ++i) {
    EnqueueRecvTensorRequestRaw();  // Set up a handler
  }

  void* tag;
  bool ok;

  while (cq_->Next(&tag, &ok)) {
    UntypedCall<GrpcWorkerServiceThread>::Tag* callback_tag =
        static_cast<UntypedCall<GrpcWorkerServiceThread>::Tag*>(tag);
    CHECK(callback_tag);
    callback_tag->OnCompleted(this, ok);
  }
}

EnqueueRecvTensorRequestRaw enqueues one more handler and specifies that GrpcWorkerServiceThread::RecvTensorHandlerRaw will process the next GrpcWorkerMethod::kRecvTensor request.

void EnqueueRecvTensorRequestRaw() {
  mutex_lock l(shutdown_mu_);
  if (!is_shutdown_) {
    Call<GrpcWorkerServiceThread, grpc::WorkerService::AsyncService,
         RecvTensorRequest, ::grpc::ByteBuffer>::
        EnqueueRequestForMethod(
            worker_service_, cq_.get(),
            static_cast<int>(GrpcWorkerMethod::kRecvTensor),
            &GrpcWorkerServiceThread::RecvTensorHandlerRaw,
            true /* supports cancel*/);
  }
}
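
The idea of enqueuing handlers in advance and replacing each one as it is consumed can be modeled in a few lines. This is a toy model under stated assumptions, not the gRPC API: MiniCompletionQueue and MiniServiceThread are hypothetical classes, and a request is reduced to an int.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical stand-in for a completion queue: a request can only be
// accepted if a handler was enqueued for it beforehand.
class MiniCompletionQueue {
 public:
  using Handler = std::function<void(int)>;

  void EnqueueHandler(Handler handler) { handlers_.push(std::move(handler)); }

  // Hands `request` to the oldest waiting handler.
  // Returns false if no handler is waiting.
  bool Deliver(int request) {
    if (handlers_.empty()) return false;
    Handler handler = std::move(handlers_.front());
    handlers_.pop();
    handler(request);
    return true;
  }

  std::size_t waiting() const { return handlers_.size(); }

 private:
  std::queue<Handler> handlers_;
};

// Hypothetical service thread that keeps the number of waiting handlers
// constant by enqueuing a replacement each time one is consumed.
class MiniServiceThread {
 public:
  MiniServiceThread(MiniCompletionQueue* cq, int depth) : cq_(cq) {
    for (int i = 0; i < depth; ++i) EnqueueRecvTensorHandler();
  }

  const std::vector<int>& handled() const { return handled_; }

 private:
  void EnqueueRecvTensorHandler() {
    cq_->EnqueueHandler([this](int request) {
      handled_.push_back(request);  // "process" the request
      EnqueueRecvTensorHandler();   // replace the handler just consumed
    });
  }

  MiniCompletionQueue* cq_;
  std::vector<int> handled_;
};
```

With a depth of 2, the queue still accepts any number of requests, because every delivery leaves the same number of handlers waiting as before.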

4.2.2 GrpcWorkerServiceThread

GrpcWorkerServiceThread is the thread class the server uses to process requests. Here it calls into GrpcWorker to continue processing, passing a WorkerCall as the parameter. WorkerCall is an alias for the Call type that represents one gRPC request and its response on the server side.

using WorkerCall =
    Call<GrpcWorkerServiceThread, grpc::WorkerService::AsyncService,
         RequestMessage, ResponseMessage>;

The code is as follows:

void GrpcWorkerServiceThread::RecvTensorHandlerRaw(
    WorkerCall<RecvTensorRequest, ::grpc::ByteBuffer>* call) {
  Schedule([this, call]() {
    CallOptions* call_opts = new CallOptions;
    call->SetCancelCallback([call_opts]() { call_opts->StartCancel(); });

    worker_->GrpcRecvTensorAsync(
        call_opts, &call->request, &call->response,
        [call, call_opts](const Status& s) {
          call->ClearCancelCallback();
          delete call_opts;
          if (!s.ok()) {
            VLOG(3) << "Bad response from RecvTensor:" << s;
          }
          call->SendResponse(ToGrpcStatus(s));
        });
  });
  EnqueueRecvTensorRequestRaw();
}

4.2.3 GrpcWorker

GrpcWorker is the Worker that actually handles the request logic; it is the server-side counterpart of GrpcRemoteWorker. The logic of GrpcWorker::GrpcRecvTensorAsync is:

  • Obtain the Rendezvous and use rendezvous_mgr->RecvLocalAsync to fetch the tensor that the client needs from the local Table.
  • If the tensor resides on a GPU, call CopyDeviceToHost in the callback to copy it from the GPU to the CPU.
  • Call grpc::EncodeTensorToByteBuffer(is_dead, tensor, cache_enabled, response) to encode the tensor.
  • Finally, send the response back to the client over gRPC.

// GrpcRecvTensorAsync: unlike the other Worker methods, which use protocol
// buffers for a response object, to avoid extra protocol buffer serialization
// overhead we generate our response directly into a ::grpc::ByteBuffer object
void GrpcWorker::GrpcRecvTensorAsync(CallOptions* opts,
                                     const RecvTensorRequest* request,
                                     ::grpc::ByteBuffer* response,
                                     StatusCallback done) {

  const int64_t request_id = request->request_id();
  const int64_t step_id = request->step_id();

  bool cache_enabled = (response_cache_ != nullptr && request_id != 0);

  auto do_response = [response, done, cache_enabled](const Tensor& tensor,
                                                     bool is_dead,
                                                     const Status& status) {
    if (status.ok()) {
      grpc::EncodeTensorToByteBuffer(is_dead, tensor, cache_enabled, response);
    }
    done(status);
  };

  // If response cache is enabled and the response cache already contains the
  // request, we delegate this retry request to the response cache. Otherwise,
  // we add the request to the response cache and start the computation to
  // retrieve the requested data.
  if (cache_enabled &&
      response_cache_->QueueRequest(request_id, step_id, do_response)) {
    return;
  }

  auto rendezvous_done = [this, request_id, do_response, cache_enabled](
                             const Tensor& tensor, bool is_dead,
                             const Status& status) {
    if (cache_enabled) {
      // Data is ready. Process all pending requests in the response cache.
      response_cache_->OnRequestFinished(request_id, tensor, is_dead, status);
    } else {
      do_response(tensor, is_dead, status);
    }
  };

  auto fail = [&rendezvous_done](const Status& status) {
    rendezvous_done(Tensor(), false, status);
  };

  Status s = recent_request_ids_.TrackUnique(
      request_id, "RecvTensor (GrpcWorker)", *request);

  const string& key = request->rendezvous_key();
  Rendezvous::ParsedKey parsed;
  s = Rendezvous::ParseKey(key, &parsed);
  Device* src_dev = nullptr;
  if (s.ok()) {
    s = PrepareRecvTensor(parsed, &src_dev);
  }

  // Request the tensor associated with the rendezvous key.
  // Note that we log the cancellation here but do not abort the current step.
  // gRPC can generate cancellations in response to transient network failures,
  // and aborting the step eliminates the opportunity for client side retries.
  // Repeated client failures will eventually cause the step to be aborted by
  // the client.
  opts->SetCancelCallback(
      [step_id]() { LOG(WARNING) << "RecvTensor cancelled for " << step_id; });
  env_->rendezvous_mgr->RecvLocalAsync(
      step_id, parsed,
      [opts, rendezvous_done, src_dev, request](
          const Status& status, const Rendezvous::Args& send_args,
          const Rendezvous::Args& recv_args, const Tensor& val,
          const bool is_dead) {
        opts->ClearCancelCallback();
        if (status.ok()) {
          // DMA can only be used for Tensors that do not fall into
          // the following three odd edge cases: 1) a zero-size
          // buffer, 2) a dead tensor which has an uninit value, and
          // 3) the tensor has the on_host allocation attribute,
          // i.e. it's in CPU RAM *independent of its assigned
          // device type*.
          const bool on_host = send_args.alloc_attrs.on_host();
          {
            // Non-DMA cases.
            if (src_dev->tensorflow_gpu_device_info() && (!on_host)) {
              DeviceContext* send_dev_context = send_args.device_context;
              AllocatorAttributes alloc_attrs;
              alloc_attrs.set_gpu_compatible(true);
              alloc_attrs.set_on_host(true);
              Allocator* alloc = src_dev->GetAllocator(alloc_attrs);
              Tensor* copy = new Tensor(alloc, val.dtype(), val.shape());
              // "val" is on an accelerator device. Uses the device_context to
              // fill the copy on host.
              StatusCallback copy_ready = [rendezvous_done, copy,
                                           is_dead](const Status& s) {
                // The value is now ready to be returned on the wire.
                rendezvous_done(*copy, is_dead, s);
                delete copy;
              };

              CopyDeviceToHost(&val, alloc, alloc, request->rendezvous_key(),
                               src_dev, copy, send_dev_context, copy_ready);
              return;
            }
          }
        }
        rendezvous_done(val, is_dead, status);
      });
}
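
The response cache used in GrpcRecvTensorAsync deduplicates retries of the same request_id: only the first request triggers the tensor lookup, and later ones wait for or reuse its result. The sketch below models that behaviour under stated assumptions. MiniResponseCache is a hypothetical class, the tensor is reduced to an int, the step_id argument is dropped, and locking and eviction are omitted, so it is not thread-safe.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical miniature of a response cache keyed by request_id.
class MiniResponseCache {
 public:
  using Callback = std::function<void(int)>;

  // Returns false for the first request with this id: the caller must then
  // compute the result and report it through OnRequestFinished.
  // Returns true for a retry: the callback is answered from the cache,
  // either immediately or once the first request finishes.
  bool QueueRequest(int64_t request_id, Callback cb) {
    auto it = entries_.find(request_id);
    if (it == entries_.end()) {
      entries_[request_id].pending.push_back(std::move(cb));
      return false;
    }
    if (it->second.finished) {
      cb(it->second.value);
    } else {
      it->second.pending.push_back(std::move(cb));
    }
    return true;
  }

  // Stores the result and answers every callback queued so far.
  void OnRequestFinished(int64_t request_id, int value) {
    Entry& entry = entries_[request_id];
    entry.finished = true;
    entry.value = value;
    std::vector<Callback> pending;
    pending.swap(entry.pending);
    for (auto& cb : pending) cb(value);
  }

 private:
  struct Entry {
    bool finished = false;
    int value = 0;
    std::vector<Callback> pending;
  };
  std::unordered_map<int64_t, Entry> entries_;
};
```

This mirrors the early return in the code above: when QueueRequest reports that the request is already known, the handler returns without touching the rendezvous a second time.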

4.2.4 BaseRendezvousMgr

BaseRendezvousMgr::RecvLocalAsync looks up the tensor in the local Table.

void BaseRendezvousMgr::RecvLocalAsync(int64_t step_id,
                                       const Rendezvous::ParsedKey& parsed,
                                       Rendezvous::DoneCallback done) {
  auto rendez = FindOrCreate(step_id);
  auto done_cb = [rendez, done = std::move(done)](
                     const Status& s, const Rendezvous::Args& send_args,
                     const Rendezvous::Args& recv_args, const Tensor& v,
                     bool dead) {
    rendez->Unref();
    done(s, send_args, recv_args, v, dead);
  };
  rendez->RecvLocalAsync(parsed, std::move(done_cb));
}

4.2.5 BaseRemoteRendezvous

RecvLocalAsync eventually calls RecvLocalAsyncInternal, whose key step is local_->RecvAsync.

void BaseRemoteRendezvous::RecvLocalAsync(const ParsedKey& parsed,
                                          DoneCallback done) {
  // Test whether the rendezvous is initialized using a shared lock, to avoid
  // the need for exclusive access in the common case.
  if (TF_PREDICT_FALSE(!is_initialized())) {
    mutex_lock l(mu_);
    if (!is_initialized_locked()) {
      // RecvLocalAsync can be called (due to an incoming RecvTensor RPC from a
      // remote worker) before the RunStep (or PartialRunStep) RPC from the
      // master arrives. RecvLocalAsync thus buffers the arguments until after
      // the RemoteRendezvous is Initialize()'d, when it completes the
      // rendezvous logic. At some point after Initialize() is called, a Tensor
      // is produced locally that will then be sent in response to the incoming
      // RPC.
      DeferredCall call(parsed, std::move(done));
      deferred_calls_.push_back(call);
      return;
    }
  }
  RecvLocalAsyncInternal(parsed, std::move(done));
}

void BaseRemoteRendezvous::RecvLocalAsyncInternal(const ParsedKey& parsed,
                                                  DoneCallback done) {
  Status s = ValidateDevices(parsed, true /* is_src */);
  if (!s.ok()) {
    done(s, Args(), Args(), Tensor(), false);
    return;
  }
  local_->RecvAsync(parsed, Args(), std::move(done));
}
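
The deferred-call logic in RecvLocalAsync handles a RecvTensor request that arrives before the rendezvous has been initialized. A minimal sketch of that buffering pattern follows. DeferringRendezvous is a hypothetical class, and the internal receive is replaced by a stub that answers immediately with a string.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical sketch: requests that arrive before Initialize() are
// buffered and replayed once initialization has completed.
class DeferringRendezvous {
 public:
  using Done = std::function<void(const std::string&)>;

  void RecvLocalAsync(const std::string& key, Done done) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      if (!initialized_) {
        deferred_.push_back({key, std::move(done)});  // too early, buffer it
        return;
      }
    }
    RecvLocalAsyncInternal(key, std::move(done));
  }

  // Marks the rendezvous as ready and replays every buffered call.
  void Initialize() {
    std::vector<DeferredCall> calls;
    {
      std::lock_guard<std::mutex> lock(mu_);
      initialized_ = true;
      calls.swap(deferred_);
    }
    for (auto& call : calls) {
      RecvLocalAsyncInternal(call.key, std::move(call.done));
    }
  }

 private:
  struct DeferredCall {
    std::string key;
    Done done;
  };

  // Stub for the real lookup in the local table.
  void RecvLocalAsyncInternal(const std::string& key, Done done) {
    done("tensor for " + key);
  }

  std::mutex mu_;
  bool initialized_ = false;
  std::vector<DeferredCall> deferred_;
};
```

The buffered calls are replayed outside the lock, so a callback that issues another receive does not deadlock.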

4.2.6 LocalRendezvous

LocalRendezvous::RecvAsync performs the actual read of the tensor from the local table.

void LocalRendezvous::RecvAsync(const Rendezvous::ParsedKey& key,
                                const Rendezvous::Args& recv_args,
                                Rendezvous::DoneCallback done) {
  uint64 key_hash = KeyHash(key.FullKey());

  mu_.lock();
  if (!status_.ok()) {
    // Rendezvous has been aborted.
    Status s = status_;
    mu_.unlock();
    done(s, Rendezvous::Args(), recv_args, Tensor(), false);
    return;
  }

  ItemQueue* queue = &table_[key_hash];
  if (queue->head == nullptr || queue->head->type == Item::kRecv) {
    // There is no message to pick up.
    // Only recv-related fields need to be filled.
    CancellationManager* cm = recv_args.cancellation_manager;
    CancellationToken token = CancellationManager::kInvalidToken;
    bool already_cancelled = false;
    if (cm != nullptr) {
      // Increment the refcount when cancellation manager is present, to make
      // sure the rendezvous outlives the recv and its cancel callbacks.
      // This refcount is dropped in exactly one of the following cases:
      // (1) Recv registers cancellation callback to cm, and then cm is
      // cancelled, unref in the cancellation callback;
      // (2) Recv registers cancellation callback to cm, but cm is already
      // cancelled, unref in the already_cancelled check;
      // (3) Recv is successful, and item done callback finishes deregistering
      // the cancellation callback, unref in the item done callback;
      // (4) Recv is successful, but the item done callback fails to deregister
      // the cancellation callback because cm already StartCancel, in this
      // case the cancellation callback will be invoked by the cm anyway,
      // unref in the cancellation callback.
      if (rc_owner_) rc_owner_->Ref();
      token = cm->get_cancellation_token();
      already_cancelled = !cm->RegisterCallback(token, [this, token, key_hash] {
        Item* item = nullptr;
        {
          mutex_lock l(mu_);
          ItemQueue* queue = &table_[key_hash];
          // Find an item in the queue with a cancellation token that matches
          // token, and remove it.
          if (queue->head != nullptr && queue->head->type == Item::kRecv) {
            for (Item *prev = nullptr, *curr = queue->head; curr != nullptr;
                 prev = curr, curr = curr->next) {
              if (curr->recv_state.cancellation_token == token) {
                item = curr;
                if (queue->head->next == nullptr) {
                  // We have a single-element queue, so we can erase it from
                  // the table.
                  table_.erase(key_hash);
                } else {
                  // Remove the current item from the queue.
                  if (curr == queue->head) {
                    DCHECK_EQ(prev, nullptr);
                    queue->head = curr->next;
                  } else {
                    DCHECK_NE(prev, nullptr);
                    prev->next = curr->next;
                  }
                  if (queue->tail == curr) {
                    queue->tail = prev;
                  }
                }
                break;
              }
            }
          }
        }

        if (item != nullptr) {
          (*item->recv_state.waiter)(
              StatusGroup::MakeDerived(
                  errors::Cancelled("RecvAsync is cancelled.")),
              Rendezvous::Args(), item->args, Tensor(), /*is_dead=*/false);
          delete item;
        }
        // Unref case (1) and (4)
        if (rc_owner_) rc_owner_->Unref();
      });
    }
    if (already_cancelled) {
      mu_.unlock();
      // Unref case (2)
      if (rc_owner_) rc_owner_->Unref();
      done(StatusGroup::MakeDerived(
               errors::Cancelled("RecvAsync is cancelled.")),
           Rendezvous::Args(), recv_args, Tensor(), /*is_dead=*/false);
      return;
    }

    // TODO(b/143786186): Investigate moving the allocation of Item outside
    // the lock.
    if (cm != nullptr) {
      // NOTE(mrry): We must wrap done with code that deregisters the
      // cancellation callback before calling the done callback, because the
      // cancellation manager may no longer be live after done is called.
      queue->push_back(new Item(
          recv_args,
          [this, cm, token, done = std::move(done)](
              const Status& s, const Rendezvous::Args& send_args,
              const Rendezvous::Args& recv_args, const Tensor& v, bool dead) {
            // TryDeregisterCallback returns true when the cancellation callback
            // is successfully deregistered. If it fails because the CM already
            // StartAbort, Unref will happen inside the cancellation callback
            // when called by the CM.
            if (cm->TryDeregisterCallback(token)) {
              // Unref case (3)
              if (this->rc_owner_) this->rc_owner_->Unref();
            }
            done(s, send_args, recv_args, v, dead);
          },
          token));
    } else {
      queue->push_back(new Item(recv_args, std::move(done), token));
    }

    mu_.unlock();
    return;
  }

  // A message has already arrived and is queued in the table under
  // this key. Consumes the message and invokes the done closure.
  Item* item = queue->head;

  // Delete the queue when the last element has been consumed.
  if (item->next == nullptr) {
    table_.erase(key_hash);
  } else {
    queue->head = item->next;
  }
  mu_.unlock();

  // Invoke done() without holding the table lock.
  DCHECK_EQ(item->type, Item::kSend);
  done(Status::OK(), item->args, recv_args, *item->send_state.value,
       item->send_state.is_dead);
  delete item;
}
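The pairing logic in RecvAsync can be hard to see through the cancellation handling, so here is a minimal sketch of just the table-and-queue mechanism. It is not TensorFlow code: MiniRendezvous, its int payload, the string key, the holds_sends flag and the RecvThenSend helper are all hypothetical names chosen for illustration, and cancellation, abort status and reference counting are deliberately left out.

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical, simplified stand-in for LocalRendezvous. Values are ints
// instead of Tensors; there is no cancellation, abort, or refcounting.
class MiniRendezvous {
 public:
  using DoneCallback = std::function<void(int value, bool is_dead)>;

  // If a receiver is already waiting on `key`, hand the value to it;
  // otherwise park the value until a matching RecvAsync arrives.
  void Send(const std::string& key, int value, bool is_dead) {
    DoneCallback waiter;
    {
      std::lock_guard<std::mutex> l(mu_);
      Queue& q = table_[key];
      if (q.items.empty() || q.holds_sends) {
        q.holds_sends = true;
        q.items.push_back(Item{value, is_dead, nullptr});
        return;
      }
      waiter = std::move(q.items.front().waiter);
      q.items.pop_front();
      // Delete the queue when the last element has been consumed.
      if (q.items.empty()) table_.erase(key);
    }
    // Invoke the waiter without holding the table lock.
    waiter(value, is_dead);
  }

  // If a value is already queued under `key`, consume it and call `done`;
  // otherwise park `done` as a waiter until a matching Send arrives.
  void RecvAsync(const std::string& key, DoneCallback done) {
    Item item{0, false, nullptr};
    {
      std::lock_guard<std::mutex> l(mu_);
      Queue& q = table_[key];
      if (q.items.empty() || !q.holds_sends) {
        q.holds_sends = false;
        q.items.push_back(Item{0, false, std::move(done)});
        return;
      }
      item = std::move(q.items.front());
      q.items.pop_front();
      if (q.items.empty()) table_.erase(key);
    }
    done(item.value, item.is_dead);
  }

  // Number of keys that still have parked sends or waiting receivers.
  std::size_t pending_keys() {
    std::lock_guard<std::mutex> l(mu_);
    return table_.size();
  }

 private:
  struct Item {
    int value;
    bool is_dead;
    DoneCallback waiter;  // Only set for parked receivers.
  };
  struct Queue {
    bool holds_sends = false;  // A queue holds only sends or only recvs.
    std::deque<Item> items;
  };
  std::mutex mu_;
  std::unordered_map<std::string, Queue> table_;
};

// Usage sketch: the receiver arrives first, so its callback is parked and
// fires only when the matching Send shows up.
inline int RecvThenSend() {
  MiniRendezvous r;
  int got = -1;
  r.RecvAsync("edge_1", [&got](int v, bool) { got = v; });
  r.Send("edge_1", 42, /*is_dead=*/false);
  return got;
}
```

As in the real function, a queue under one key holds either pending sends or pending receives, never both, and the callback runs outside the lock. The real implementation adds the cancellation callback and the four Unref cases on top of this skeleton.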

This completes all the logic shown in the earlier diagram. We can also look at it from another angle, as shown below:

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

WeChat official account: Rosie’s Thoughts
