0x00 Abstract

Horovod is an easy-to-use, high-performance distributed training framework that Uber released in 2017 and that has been widely used in industry.

This series walks you through the Horovod source code. This article, the fifth in the series, looks at how Horovod integrates with the various machine learning frameworks.

The previous links are as follows:

Horovod (1) — Basics

Horovod (2) — A distributed training framework for deep learning — from the user’s perspective

Horovod (3) — What’s behind Horovodrun

Horovod (4) — Network Basics & Driver

We need some questions to guide the analysis:

  • Horovod does not depend on any particular framework; instead, it uses MPI to build a distributed system that performs collective communication operations such as AllReduce and AllGather. But how does it realize a unified distributed communication framework?
  • Horovod is a library. How is it embedded into the various deep learning frameworks? For example, how is it embedded into TensorFlow, PyTorch, MXNet, and Keras?
  • Because Horovod needs to be compatible with so many learning frameworks, it should have its own OP operations, with adaptation layers built on top of them to achieve compatibility.
  • How can gradient synchronization communication be completely abstracted into a framework-independent architecture?
  • How can communication be separated from the computing framework, so that the computing framework only needs to call an HVD interface directly, such as HorovodAllreduceOp, to do the gradient averaging?

Next, let's look at how Horovod achieves this integration.

0x01 Architecture diagram

Let’s look at it through an architecture diagram.

The following architecture diagram of the Horovod distributed training framework comes from a fellow engineer online. To keep a consistent style, I have redrawn it as follows:

The layering idea is as follows:

  • Unified layer: Used to integrate the various framework layers. After HVD separates the communication and computing framework, the computing framework only needs to directly call the HVD interface, such as HorovodAllreduceOp, to do the gradient averaging.
  • Framework layer: Supports four popular deep learning frameworks, Tensorflow, PyTorch, MXNet and Keras. Training support for many popular frameworks is one of Horovod’s strengths.
  • Multi-card communication layer (collective communication layer): mainly integrates communication frameworks such as NCCL, MPI, Gloo, and CCL, chiefly to complete the AllReduce process mentioned above.
  • Network communication layer: Optimizes network communication and improves the communication efficiency between clusters.

MPI's role in Horovod is unique:

  • On the one hand, Horovod integrates MPI-based AllReduce, which, like NCCL, is used for gradient reduction (a minimal standalone MPI_Allreduce example follows this list).
  • On the other hand, MPI can be used to start multiple processes (represented as ranks in Horovod) to implement parallel computing.
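
To make the first point concrete, here is a minimal, standalone MPI_Allreduce program. This is plain MPI rather than Horovod code, and the buffer contents are invented for illustration: every rank contributes a local buffer and receives the element-wise sum, which is exactly the primitive that data-parallel gradient averaging builds on.

#include <mpi.h>
#include <cstdio>
#include <vector>

int main(int argc, char** argv) {
  MPI_Init(&argc, &argv);

  int rank = 0, size = 0;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  // Each rank holds a local "gradient" of 4 floats.
  std::vector<float> local(4, static_cast<float>(rank + 1));
  std::vector<float> global(4, 0.0f);

  // Element-wise sum across all ranks; dividing by size afterwards would give
  // the average, which is what data-parallel gradient averaging needs.
  MPI_Allreduce(local.data(), global.data(), static_cast<int>(local.size()),
                MPI_FLOAT, MPI_SUM, MPI_COMM_WORLD);

  if (rank == 0) {
    std::printf("sum of first element across %d ranks: %f\n", size, global[0]);
  }

  MPI_Finalize();
  return 0;
}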

0x02 Unified layer

We now know that Horovod internally implements (encapsulates) AllReduce functionality to perform gradient reduction.

But how does hvd.allreduce dispatch to the different communication libraries? Horovod uses a unified layer to do this.

First, let’s take a look at how each rank node works, so we know what factors to consider when implementing the unified layer:

  • Each rank has two threads: the Execution thread and the Background thread.
  • An Execution thread is used to do machine learning calculations.
  • Background Threads are used for communication and allreduce.
    • The background thread has a message queue that receives requests from the OP such as AllReduce, AllGather, and Broadcast.
    • The background thread polls the message queue periodically; once it gets a batch of ops, the tensors in those ops are fused and processed accordingly.
    • If a tensor is in GPU memory, it is handled with the NCCL library; if it is in host memory, MPI or Gloo is used. A simplified sketch of this producer/consumer pattern follows this list.
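
The following is a minimal, simplified sketch of that producer/consumer pattern. It is not Horovod's actual code, and the names (CollectiveRequest, MessageQueue, BackgroundLoop) are invented for illustration: the execution thread pushes requests onto a mutex-protected queue, and the background thread periodically drains the queue in batches.

#include <atomic>
#include <chrono>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

// A request enqueued by the execution thread (hypothetical type for illustration).
struct CollectiveRequest {
  std::string tensor_name;
  std::string op_type;  // "AllReduce", "AllGather", "Broadcast", ...
};

class MessageQueue {
public:
  void Push(CollectiveRequest req) {
    std::lock_guard<std::mutex> guard(mutex_);
    queue_.push(std::move(req));
  }

  // Drain everything currently queued as one batch.
  std::vector<CollectiveRequest> DrainAll() {
    std::lock_guard<std::mutex> guard(mutex_);
    std::vector<CollectiveRequest> batch;
    while (!queue_.empty()) {
      batch.push_back(std::move(queue_.front()));
      queue_.pop();
    }
    return batch;
  }

private:
  std::mutex mutex_;
  std::queue<CollectiveRequest> queue_;
};

// Background thread: poll the queue, take a batch, run the collective.
void BackgroundLoop(MessageQueue& queue, std::atomic<bool>& shutdown) {
  while (!shutdown.load()) {
    std::vector<CollectiveRequest> batch = queue.DrainAll();
    if (!batch.empty()) {
      // In Horovod, this is where the batch's tensors are fused and the
      // NCCL / MPI / Gloo implementation is selected and executed.
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(5));
  }
}

int main() {
  MessageQueue queue;
  std::atomic<bool> shutdown{false};
  std::thread background(BackgroundLoop, std::ref(queue), std::ref(shutdown));

  // Execution thread: enqueue one request per gradient tensor.
  queue.Push({"conv1/kernel/grad", "AllReduce"});
  queue.Push({"conv1/bias/grad", "AllReduce"});

  std::this_thread::sleep_for(std::chrono::milliseconds(20));
  shutdown = true;
  background.join();
  return 0;
}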

Secondly, the implementation of the unified layer is:

  • Construct an Operation class system: first define the base class HorovodOp, then define the subclass AllreduceOp on top of it, and from there extend a number of collective OPs based on the different communication libraries (that is, the adaptation layer), such as GlooAllreduce and MPIAllreduce.
  • Build a message queue. All adaptation layers end up sending Op + Tensor messages to this queue, and background initialization creates a background thread to consume the queue. So there is a message synchronization step, which works roughly like: once "some tensor" is ready on all nodes, the computation can start.
  • The HVD OP set defined by Horovod is independent of any specific deep learning framework, and Horovod defines different HVD OP implementations for each framework. For example, when using TensorFlow, the HVD OPs cannot be directly inserted into the TF graph, so corresponding TF OPs need to be registered.

Let’s analyze these aspects one by one.

0x03 Horovod OP class system

The Horovod OP class system is as follows:

  • First define the base class HVD OP;
  • Then define the subclass AllreduceOp on this basis;
  • From this, we can extend several collective OP based on different communication libraries, such as GlooAllreduce and MPIAllReduce.

The logic is as follows:

HorovodOp
  +-- AlltoallOp
  +-- JoinOp
  +-- AllreduceOp
  |     +-- MPIAllreduce
  |     +-- GPUAllreduce
  |     +-- AdasumMPIAllreduceOp
  |     +-- GlooAllreduce
  +-- AllgatherOp
  +-- BroadcastOp


3.1 HorovodOp base class

HorovodOp is the base class for all classes and does the following:

  • It holds a HorovodGlobalState pointer, so the overall state can be accessed at any time;
  • The NumElements function is responsible for getting the total number of tensor elements this OP processes;
  • A virtual function, Execute, to be implemented by derived classes; it is the algorithmic operation that each derived class needs to provide.
class HorovodOp {
public:
  HorovodOp::HorovodOp(HorovodGlobalState* global_state)
      : global_state_(global_state) {}

  int64_t HorovodOp::NumElements(std::vector<TensorTableEntry>& entries) {
    int64_t num_elements = 0;
    for (auto& e : entries) {
      num_elements += e.tensor->shape().num_elements();
    }
    return num_elements;
  }

  virtual Status Execute(std::vector<TensorTableEntry>& entries,
                         const Response& response) = 0;

protected:
  HorovodGlobalState* global_state_;
};

3.2 Derived class AllreduceOp

There are several derived classes of HorovodOp, which have similar functions, such as AllreduceOp, AllgatherOp, BroadcastOp, AlltoallOp, and JoinOp (used for elastic training).

Taking AllreduceOp as an example, its definition is as follows. The main functions are:

  • Execute must be implemented by derived classes; it is the specific algorithmic operation;
  • Enabled must be implemented by derived classes;
  • MemcpyInFusionBuffer: used to copy multiple entries' inputs into the fusion buffer;
  • MemcpyOutFusionBuffer: used to copy multiple entries' outputs out of the fusion buffer;
  • MemcpyEntryInFusionBuffer: used to copy a single entry's input tensor into the fusion buffer;
  • MemcpyEntryOutFusionBuffer: used to copy a single entry's output tensor out of the fusion buffer;
class AllreduceOp : public HorovodOp {
public:
  virtual Status Execute(std::vector<TensorTableEntry>& entries,
                         const Response& response) = 0;

  virtual bool Enabled(const ParameterManager& param_manager,
                       const std::vector<TensorTableEntry>& entries,
                       const Response& response) const = 0;
protected:  
  virtual void
  MemcpyInFusionBuffer(const std::vector<TensorTableEntry>& entries,
                       const void*& fused_input_data, void*& buffer_data,
                       size_t& buffer_len);
  ...
};

3.3 Adaptation layer: MPIAllreduce

Next come the concrete implementation classes, which are tied to specific communication frameworks, such as MPIAllreduce, GPUAllreduce, AdasumMPIAllreduceOp, and GlooAllreduce. In common/ops you can see the concrete classes for NCCL, Gloo, MPI, and so on.

These ops are managed by op_manager, which finds which op can be used based on priority, for example:

  • For MPI, MPI_Allreduce is used; the concrete scatter-gather and all-gather implementations come from OpenMPI.
  • NCCL calls ncclAllReduce directly; newer NCCL versions also support AllReduce across nodes, so there is no need to build an extra layer on top of it.

MPIAllreduce is used as an example. Its definition is as follows:

class MPIAllreduce : public AllreduceOp {
public:
  MPIAllreduce(MPIContext* mpi_context, HorovodGlobalState* global_state);

  Status Execute(std::vector<TensorTableEntry>& entries, const Response& response) override;

  bool Enabled(const ParameterManager& param_manager,
               const std::vector<TensorTableEntry>& entries,
               const Response& response) const override;

protected:
  MPIContext* mpi_context_;
};

Execute calls MPI_Allreduce to complete the operation, i.e.:

  • Copy the data from memory into the fusion buffer;
  • Call MPI_Allreduce to perform the reduction;
  • Copy the result out of the fusion buffer.
Status MPIAllreduce::Execute(std::vector<TensorTableEntry>& entries, const Response& response) {
  // Copy memory into the fusion buffer.
  ...
  MemcpyInFusionBuffer(entries, fused_input_data, buffer_data, buffer_len);
  ...

  // Do allreduce.
  timeline.ActivityStartAll(entries, MPI_ALLREDUCE);
  const void* sendbuf = entries.size() > 1 || fused_input_data == buffer_data
                        ? MPI_IN_PLACE : fused_input_data;
  int op = MPI_Allreduce(sendbuf, buffer_data,
                         (int) num_elements,
                         mpi_context_->GetMPIDataType(first_entry.tensor),
                         mpi_context_->GetMPISumOp(first_entry.tensor->dtype()),
                         mpi_context_->GetMPICommunicator(Communicator::GLOBAL));

  // Copy memory out of the fusion buffer.
  ...
  MemcpyOutFusionBuffer(buffer_data, entries);
  ...
}

3.4 How the background thread uses these OPs

Since most of Horovod's work is done by the background thread, let's look at how the Horovod OPs are invoked in the background thread.

Horovod’s workflow is simple:

  • HorovodGlobalState contains a message queue to receive requests from the Op such as AllReduce, AllGather, and Broadcast.
  • A background thread polls the message queue periodically; once it gets a batch of ops, the tensors in those ops are fused and processed accordingly.
  • If a tensor is in GPU memory, it is handled with the NCCL library; if it is in host memory, MPI or Gloo is used.

3.4.1 Collective Operations

Once the Horovod background thread gets the tensors that need to be fused, it calls PerformOperation to carry out the specific collective operation. Inside PerformOperation there is a call to op_manager->ExecuteOperation:

void PerformOperation(Response response, HorovodGlobalState& state) {
  ...
  Status status;
  try {
    // Perform the collective operation
    status = op_manager->ExecuteOperation(entries, response);
  } catch (const std::exception& ex) {
    status = Status::UnknownError(ex.what());
  }
  ...
}

The logic is as follows:

+---------------------------------+
|                                 |             +-----------------------------+
|  BackgroundThreadLoop           |             |                             |
|                                 |             | OperationManager            |
|   +--------------------------+  |             |                             |
|   |  RunLoopOnce             |  |             |                             |
|   |                          |  |             |                             |
|   |                          |  |             |                             |
|   |     ComputeResponseList  |  |    +----------> ExecuteOperation          |
|   |             +            |  |    |        |                             |
|   |             |            |  |    |        |                             |
|   |             |            |  |    |        |                             |
|   |             |            |  |    | 1      |                             |
|   |             v            |  |    |        |                             |
|   |                          |  |    |        |                             |
|   |      PerformOperation +----------+        |                             |
|   |                          |  |             |                             |
|   +--------------------------+  |             |                             |
|                                 |             |                             |
+---------------------------------+             +-----------------------------+

3.4.2 Calling Different Types of OP

Then status = op_manager->ExecuteOperation(entries, response) calls the appropriate op->Execute(entries, response) according to the response type to perform the actual operation.

For example, ALLREDUCE calls ExecuteAllreduce(entries, response).

Status OperationManager::ExecuteOperation(std::vector<TensorTableEntry>& entries,
                                          const Response& response) const {
  if (response.response_type() == Response::ALLREDUCE) {
    return ExecuteAllreduce(entries, response); // here
  } else if (response.response_type() == Response::ALLGATHER) {
    return ExecuteAllgather(entries, response);
  } else if (response.response_type() == Response::BROADCAST) {
    return ExecuteBroadcast(entries, response);
  } else if (response.response_type() == Response::ALLTOALL) {
    return ExecuteAlltoall(entries, response);
  } else if (response.response_type() == Response::JOIN) {
    return ExecuteJoin(entries, response);
  }
  ...
}

The logic is as follows:

+---------------------------------+
|                                 |             +-----------------------+
|  BackgroundThreadLoop           |             |                       |
|                                 |             | OperationManager      |
|   +--------------------------+  |             |                       |
|   |  RunLoopOnce             |  |             |                       |
|   |                          |  |             |                       |
|   |                          |  |             |                       |
|   |     ComputeResponseList  |  |    +----------> ExecuteOperation    |
|   |             +            |  |    |        |           +           |
|   |             |            |  |    |        |           |           |
|   |             |            |  |    |        |           |           |
|   |             |            |  |    | 1      |           |  2        |
|   |             v            |  |    |        |           |           |
|   |                          |  |    |        |           |           |
|   |      PerformOperation +----------+        |           v           |
|   |                          |  |             |   ExecuteAllreduce    |
|   +--------------------------+  |             |                       |
|                                 |             |                       |
+---------------------------------+             +-----------------------+

3.4.3 Selecting an Adaptation Layer

Specifically, select an appropriate OP from allreduce_ops_ and invoke its Execute.

Status OperationManager::ExecuteAllreduce(std::vector<TensorTableEntry>& entries,
                                          const Response& response) const {
  for (auto& op : allreduce_ops_) {
    if (op->Enabled(*param_manager_, entries, response)) {
      return op->Execute(entries, response);
    }
  }
}

Where does allreduce_ops_ come from? It is set in the OperationManager constructor.

allreduce_ops_(std::move(allreduce_ops)),

So let's look at how allreduce_ops is built.

3.4.4 Adaptation Layer construction

allreduce_ops is assembled in CreateOperationManager.

As you can see, the types added are roughly as follows:

  • MPI_GPUAllreduce
  • NCCLHierarchicalAllreduce
  • NCCLAllreduce
  • DDLAllreduce
  • GlooAllreduce
  • GPUAllreduce
  • MPIAllreduce
  • ...
OperationManager* CreateOperationManager(HorovodGlobalState& state) {
  // Order of these operations is very important. Operations will be checked
  // sequentially from the first to the last. The first 'Enabled' operation will
  // be executed.
  std::vector<std::shared_ptr<AllreduceOp>> allreduce_ops;
  std::vector<std::shared_ptr<AllgatherOp>> allgather_ops;
  std::vector<std::shared_ptr<BroadcastOp>> broadcast_ops;
  std::vector<std::shared_ptr<AllreduceOp>> adasum_ops;
  std::vector<std::shared_ptr<AlltoallOp>> alltoall_ops;

#if HAVE_MPI && HAVE_GPU // If MPI is configured
  if (mpi_context.IsEnabled()) { 
#if HOROVOD_GPU_ALLREDUCE == 'M'
    allreduce_ops.push_back(std::shared_ptr<AllreduceOp>(
        new MPI_GPUAllreduce(&mpi_context, &gpu_context, &state)));

    allreduce_ops.push_back(
        std::shared_ptr<AllreduceOp>(new NCCLHierarchicalAllreduce(
            &nccl_context, &mpi_context, &gpu_context, &state)));

#elif HAVE_DDL && HOROVOD_GPU_ALLREDUCE == 'D'  // If DDL is configured
    allreduce_ops.push_back(std::shared_ptr<AllreduceOp>(
        new DDLAllreduce(&ddl_context, &gpu_context, &state)));
#endif

#if HAVE_NCCL && HOROVOD_GPU_ALLREDUCE == 'N' // If NCCL is configured
  allreduce_ops.push_back(std::shared_ptr<AllreduceOp>(
      new NCCLAllreduce(&nccl_context, &gpu_context, &state)));
#endif
  ...

At this point we know how these operations are constructed and used.

The process is as follows:

+---------------------------------+
|                                 |             +-----------------------+
|  BackgroundThreadLoop           |             |                       |
|                                 |             | OperationManager      |
|   +--------------------------+  |             |                       |
|   |  RunLoopOnce             |  |             |                       |
|   |                          |  |             |                       |
|   |                          |  |             |                       |      +-->  GPUAllreduce
|   |     ComputeResponseList  |  |    +----------> ExecuteOperation    |      |
|   |             +            |  |    |        |           +           |      |
|   |             |            |  |    |        |           |           |      +-->  NCCLHierarchicalAllreduce
|   |             |            |  |    |        |           |           |      |
|   |             |            |  |    | 1      |           |  2        |      |
|   |             v            |  |    |        |           |           |      +-->  NCCLAllreduce
|   |                          |  |    |        |           |           |      |
|   |      PerformOperation +----------+        |           v           |      |
|   |                          |  |             |   ExecuteAllreduce    |      +-->  DDLAllreduce
|   +--------------------------+  |             |           +           |      |
|                                 |             |           |           |      |
+---------------------------------+             |           |           |      +-->  GlooAllreduce
                                                 |           |           |      |
                                                 |           v           |      |      +----------------+
                                                 |    allreduce_ops +-----------+      |  MPIAllreduce  |
                                                 |                       |      +----> |                |
                                                 +-----------------------+         3   |    Execute     |
                                                                                        +----------------+



Each rank node has two threads:

  • An Execution thread is used to do machine learning calculations.
  • Background Thread is responsible for communication and AllReduce.

So far, we have actually covered the second point: the background thread is responsible for communication and AllReduce.

Next, let's look at the first point: how a framework such as TensorFlow sends tensors & ops to the background thread.

0x04 Integrating with deep learning frameworks

The HVD OP set defined by Horovod is independent of any specific deep learning framework. For example, when using TensorFlow, these OPs cannot be directly inserted into the TF graph, so corresponding TF OPs need to be registered.

Horovod defines different implementations for each framework.

For distributed training of TensorFlow models, Horovod developed TensorFlow OPs to implement AllReduce on TensorFlow tensors. Moreover, these OPs can be integrated into TensorFlow's computation graph, and the TensorFlow graph runtime can then overlap computation and communication, improving communication efficiency.

Taking AllReduce-based distributed training of a TensorFlow model as an example, Horovod developed AllReduce OPs that are embedded in TensorFlow's backward computation graph, so that the gradients produced by the backward pass can be obtained and aggregated. The AllReduce OPs perform this aggregation by calling the AllReduce API provided by, for example, Gloo.

For example, horovod/tensorflow/mpi_ops.cc defines HorovodAllreduceOp for TensorFlow.

4.1 Defining a TensorFlow custom Op

For TensorFlow, you can customize operations, that is, if an existing library does not cover the operations you want, you can customize one yourself.

To make the custom Op compatible with the original library, you must do the following (a minimal example follows this list):

  • Register the new Op in a C++ file. The registration of an Op is independent of its implementation: the registration describes how the Op should be invoked, e.g. it defines the Op's name and specifies its inputs and outputs.
  • Each implementation is called a “kernel”, and multiple kernels can exist to accommodate different architectures (CPU, GPU, etc.) or different input/output types.
  • Create a Python wrapper. The wrapper is the public API for creating the Op. A default wrapper is automatically generated when the Op is registered. You can either use the default wrapper directly or add a new wrapper.
  • (Optional) Write a function to calculate the gradient of Op.
  • (Optional) write a function that describes the Op input and output shapes. This function allows you to infer shape from Op.
  • Test the Op, usually using Python. If you have defined a gradient, you can use Python's GradientChecker to test it.
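
To make these three steps concrete, here is a minimal sketch of a custom op, closely following TensorFlow's official "Adding a New Op" guide (ZeroOut is the example used there: it keeps the first element of its input and zeroes the rest). It is a generic illustration, not Horovod code.

#include "tensorflow/core/framework/op.h"
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/framework/shape_inference.h"

using namespace tensorflow;

// 1. Register the Op interface (name, input, output, shape function).
REGISTER_OP("ZeroOut")
    .Input("to_zero: int32")
    .Output("zeroed: int32")
    .SetShapeFn([](shape_inference::InferenceContext* c) {
      c->set_output(0, c->input(0));
      return Status::OK();
    });

// 2. Implement a kernel for the Op.
class ZeroOutOp : public OpKernel {
public:
  explicit ZeroOutOp(OpKernelConstruction* context) : OpKernel(context) {}

  void Compute(OpKernelContext* context) override {
    // Grab the input tensor.
    const Tensor& input_tensor = context->input(0);
    auto input = input_tensor.flat<int32>();

    // Create an output tensor with the same shape as the input.
    Tensor* output_tensor = nullptr;
    OP_REQUIRES_OK(context, context->allocate_output(0, input_tensor.shape(),
                                                     &output_tensor));
    auto output_flat = output_tensor->flat<int32>();

    // Keep the first element and zero all the others.
    const int N = input.size();
    for (int i = 1; i < N; i++) {
      output_flat(i) = 0;
    }
    if (N > 0) output_flat(0) = input(0);
  }
};

// 3. Register the kernel with the TensorFlow system, constrained to the CPU device.
REGISTER_KERNEL_BUILDER(Name("ZeroOut").Device(DEVICE_CPU), ZeroOutOp);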

4.2 Horovod implementation — HorovodAllreduceOp

HorovodAllreduceOp is a TF async OP whose internal implementation calls the HVD OP, which is a clever use of composition. Because HorovodAllreduceOp inherits from the TF async OP class, it can be inserted into the TF graph and executed normally.

Adding a new OP requires three steps, so let’s see.

4.2.1 Define the Op interface

The first step is to define the Op’s interface, which is registered with the TensorFlow system using REGISTER_OP(). This Op is called HorovodAllreduceOp.

// 1. Define the Op interface
// REGISTER_OP() registers with the TensorFlow system to define the interface of the Op, which here is HorovodAllreduce.
// At registration time we specify:
//   the name of the Op: REGISTER_OP("HorovodAllreduce")
//   its input (type and name): Input("tensor: T")
//   its output (type and name): Output("sum: T")
//   and documentation for any attributes: Doc(R"doc(...)doc");
//
// The input "tensor" is a tensor of type T, where T can be {int32, int64, float16, float32, float64}
// The output "sum" is a tensor of type T that is the sum of "tensor" across all MPI processes

REGISTER_OP("HorovodAllreduce")
    .Attr("T: {int32, int64, float16, float32, float64}")
    .Attr("reduce_op: int")
    .Attr("prescale_factor: float")
    .Attr("postscale_factor: float")
    .Attr("ignore_name_scope: bool = False")
    .Input("tensor: T")
    .Output("sum: T")
    .SetShapeFn([](shape_inference::InferenceContext* c) {
      c->set_output(0, c->input(0));
      return Status::OK();
    });

4.2.2 Implement kernel for Op

The second step is to implement a kernel for the Op. After the interface is defined, each implementation is called a "kernel"; multiple kernels can exist, for example for different architectures (CPU, GPU, etc.) or different input/output types.

The HorovodAllreduceOp class inherits from the AsyncOpKernel and overrides its ComputeAsync() method. The ComputeAsync() method provides a context argument of type OpKernelContext*, which is used to access some useful information, such as an input and output tensor.

In ComputeAsync, the AllReduce request is put on a queue. As you can see, Horovod's TensorFlow support is similar in implementation to Baidu's: both are custom AllReduce Ops whose requests are enqueued.

// 2. Implement kernel for Op.
// After the interface is defined, each implementation is called a "kernel", providing one or more Op implementations, i.e. multiple kernels can exist.
// Create a corresponding class for each of these kernels, inheriting AsyncOpKernel and overriding the ComputeAsync method.
// The ComputeAsync method provides a context of type OpKernelContext*, which is used to access some useful information, such as an input and output tensor

class HorovodAllreduceOp : public AsyncOpKernel {
public:
  // Prevent implicit conversions by the class constructor; it can only be called explicitly
  explicit HorovodAllreduceOp(OpKernelConstruction* context)
      : AsyncOpKernel(context) {
    OP_REQUIRES_OK(context, context->GetAttr("reduce_op", &reduce_op_));
    OP_REQUIRES_OK(context, context->GetAttr("prescale_factor", &prescale_factor_));
    OP_REQUIRES_OK(context, context->GetAttr("postscale_factor", &postscale_factor_));
    OP_REQUIRES_OK(context, context->GetAttr("ignore_name_scope", &ignore_name_scope_));
  }

  // Override the ComputeAsync() method
  void ComputeAsync(OpKernelContext* context, DoneCallback done) override {
    OP_REQUIRES_OK_ASYNC(context, ConvertStatus(common::CheckInitialized()),
                         done);

    auto node_name = name();
    if (ignore_name_scope_) {
      auto pos = node_name.find_last_of('/');
      if (pos != std::string::npos) {
        node_name = node_name.substr(pos + 1);
      }
    }
    auto device = GetDeviceID(context);
    auto tensor = context->input(0);
    horovod::common::ReduceOp reduce_op = static_cast<horovod::common::ReduceOp>(reduce_op_);
    Tensor* output;
    OP_REQUIRES_OK_ASYNC(
        context, context->allocate_output(0, tensor.shape(), &output), done);
    // ReadyEvent makes sure input tensor is ready, and output is allocated.
    // shared_ptr is a standard shared-ownership smart pointer that allows multiple pointers to the same object
    auto ready_event = std::shared_ptr<common::ReadyEvent>(RecordReadyEvent(context));
    // The template function std::make_shared returns a std::shared_ptr of the specified type
    auto hvd_context = std::make_shared<TFOpContext>(context);
    auto hvd_tensor = std::make_shared<TFTensor>(tensor);
    auto hvd_output = std::make_shared<TFTensor>(*output);
      
    // Queue the Allreduce operation OP of the tensor
    auto enqueue_result = EnqueueTensorAllreduce(
        hvd_context, hvd_tensor, hvd_output, ready_event, node_name, device,
        [context, done](const common::Status& status) {
          context->SetStatus(ConvertStatus(status));
          done();
        }, reduce_op, (double) prescale_factor_, (double) postscale_factor_);
    OP_REQUIRES_OK_ASYNC(context, ConvertStatus(enqueue_result), done);
  }

private:
  int reduce_op_;
  // Using float since TF does not support double OP attributes
  float prescale_factor_;
  float postscale_factor_;
  bool ignore_name_scope_;
};


4.2.3 Registering OP with TensorFlow

The third step is to register the OP into the TensorFlow system.

// 3. Register OP with TensorFlow system
// You can specify multiple constraints for the kernel runtime during registration. For example, you can specify one kernel to run on the CPU and another on the GPU
REGISTER_KERNEL_BUILDER(Name("HorovodAllreduce").Device(DEVICE_CPU),
                        HorovodAllreduceOp);
// If the GPU is executed
#if HOROVOD_GPU_ALLREDUCE
REGISTER_KERNEL_BUILDER(Name("HorovodAllreduce").Device(DEVICE_GPU),
                        HorovodAllreduceOp);
#endif

4.2.4 Note

For details, refer to TensorFlow's "Adding a New Op" documentation, which describes how to implement a custom TensorFlow operator.

Note that the generated Python function gets a snake_case name (to conform to PEP 8). So if the operation is named ZeroOut in the C++ file, the Python function will be called zero_out.

The C++ definitions use CamelCase and the generated Python functions use lowercase with underscores, so the final correspondence is shown below. The Op adaptation layer lives in the horovod/tensorflow directory.

C++                 Python
HorovodAllgather    horovod_allgather
HorovodAllreduce    horovod_allreduce
HorovodBroadcast    horovod_broadcast

So, in the Python world, when _DistributedOptimizer calls compute_gradients to optimize, it goes through _allreduce to MPI_LIB.horovod_allreduce, and that is what invokes HorovodAllreduceOp.

The details of how _DistributedOptimizer calls _allReduce will be explained in a future article.

def _allreduce(tensor, name=None, op=Sum):
    if name is None and not _executing_eagerly():
        name = 'HorovodAllreduce_%s' % _normalize_name(tensor.name)
    return MPI_LIB.horovod_allreduce(tensor, name=name, reduce_op=op)

4.3 How the OP is used

4.3.1 EnqueueTensorAllreduce

The HorovodAllreduceOp class calls the EnqueueTensorAllreduce() method to add the tensor's AllReduce operation to HorovodGlobalState's queue.

EnqueueTensorAllreduce is located in horovod/common/operations.cc.

Contexts, callbacks, and other supporting data are created, and then EnqueueTensorAllreduces is called.

// Contexts and controller must be initialized and the background thread
// must be running before this function is called.
Status EnqueueTensorAllreduce(std::shared_ptr<OpContext> context,
                              std::shared_ptr<Tensor> tensor,
                              std::shared_ptr<Tensor> output,
                              std::shared_ptr<ReadyEvent> ready_event,
                              std::string name, const int device,
                              StatusCallback callback,
                              ReduceOp reduce_op,
                              double prescale_factor,
                              double postscale_factor) {
  // Wrap inputs in std::vector and pass onto multi tensor implementation
  std::vector<std::shared_ptr<OpContext>> contexts;
  std::vector<std::shared_ptr<Tensor>> tensors;
  std::vector<std::shared_ptr<Tensor>> outputs;
  std::vector<std::shared_ptr<ReadyEvent>> ready_events;
  std::vector<std::string> names;
  std::vector<StatusCallback> callbacks;

  contexts.emplace_back(std::move(context));
  tensors.emplace_back(std::move(tensor));
  outputs.emplace_back(std::move(output));
  ready_events.emplace_back(std::move(ready_event));
  names.emplace_back(std::move(name));
  callbacks.emplace_back(std::move(callback));

  return EnqueueTensorAllreduces(contexts, tensors, outputs, ready_events,
                                 names, device, callbacks, reduce_op,
                                 prescale_factor, postscale_factor);
}


4.3.2 Submitting commands

EnqueueTensorAllreduces essentially calls AddToTensorQueueMulti to submit the operations to the TensorQueue. The process is roughly:

  • The tensors that need to be reduced are assembled into Requests.
  • For each tensor, a corresponding TensorTableEntry is created to hold the tensor's data; the Request mainly carries metadata.
  • The Requests and TensorTableEntries are pushed into GlobalState's tensor_queue, a queue maintained by a global object shared within the process.
  • Then we wait for the background thread to read these AllReduce requests. The background thread keeps executing a loop called RunLoopOnce, in which it uses the MPIController to process the enqueued requests. The MPIController can be understood as the object that coordinates the requests of the different rank processes. This abstraction does not exist in Baidu's implementation; it mainly exists to support other collective communication libraries such as Facebook's Gloo, which is why Horovod also has implementations like GlooController.

The specific code is as follows:

Status EnqueueTensorAllreduces(std::vector<std::shared_ptr<OpContext>>& contexts,
                               std::vector<std::shared_ptr<Tensor>>& tensors,
                               std::vector<std::shared_ptr<Tensor>>& outputs,
                               std::vector<std::shared_ptr<ReadyEvent>>& ready_events,
                               std::vector<std::string>& names,
                               const int device,
                               std::vector<StatusCallback>& callbacks,
                               ReduceOp reduce_op,
                               double prescale_factor,
                               double postscale_factor) {
  Status status;
  ...
  std::vector<Request> messages;
  std::vector<TensorTableEntry> entries;
  messages.reserve(tensors.size());
  entries.reserve(tensors.size());

  for (int n = 0; n < tensors.size(); ++n) { // Iterate over the tensors that need to be reduced
    // Assemble a Request with tensor
    Request message;
    message.set_request_rank(horovod_global.controller->GetRank());
    message.set_tensor_name(names[n]);
    message.set_tensor_type(tensors[n]->dtype());
    message.set_device(device);
    message.set_prescale_factor(prescale_factor);
    message.set_postscale_factor(postscale_factor);

    if (reduce_op == ReduceOp::ADASUM) {
      message.set_request_type(Request::ADASUM);
    } else {
      message.set_request_type(Request::ALLREDUCE);
    }

    message.set_tensor_shape(tensors[n]->shape().to_vector());
    messages.push_back(std::move(message));

    TensorTableEntry e;
    e.tensor_name = names[n];
    e.context = std::move(contexts[n]);
    // input and output can be the same, only move when safe
    if (tensors[n] != outputs[n]) {
      e.tensor = std::move(tensors[n]);
      e.output = std::move(outputs[n]);
    } else {
      e.tensor = tensors[n];
      e.output = outputs[n];
    }
    e.ready_event = std::move(ready_events[n]);
    e.device = device;
    e.callback = std::move(callbacks[n]);

    // For each tensor, we create a corresponding TensorTableEntry to hold the weights of the tensor. The message is mainly some meta information metadata
    entries.push_back(std::move(e));
  }

  std::string tensors_enqueued;
  for (const auto& n : names) {
    tensors_enqueued += n + "; ";
  }

  // Only create groups larger than 1 tensor, unless disable_group_fusion is requested.
  // In that case, even single tensor groups are created to enforce disabling fusion.
  if (tensors.size() > 1 || horovod_global.disable_group_fusion) {
    auto group_id = horovod_global.group_table.RegisterGroup(std::move(names));
    for (auto& message : messages) {
      message.set_group_id(group_id);
    }
  }

  // Add to GlobalState's tensor_queue
  status = horovod_global.tensor_queue.AddToTensorQueueMulti(entries, messages);

  return status;
}

4.3.3 TensorQueue

The tensor and op are ultimately added to the TensorQueue with a call like the following:

status = horovod_global.tensor_queue.AddToTensorQueueMulti(entries, messages);

The AddToTensorQueue function is similar to AddToTensorQueueMulti, except that AddToTensorQueueMulti handles multiple messages. Their logic is roughly:

  • Add the MPI Request message to horovod_global.message_queue;
  • Add TensorTableEntry e to horovod_global.tensor_table;
// Add a TensorTableEntry as well as its message to the queue.
Status TensorQueue::AddToTensorQueue(TensorTableEntry& e, Request& message) {
  std::lock_guard<std::mutex> guard(mutex_);
  if (tensor_table_.find(e.tensor_name) != tensor_table_.end()) {
    return DUPLICATE_NAME_ERROR;
  }
  tensor_table_.emplace(e.tensor_name, std::move(e));
  message_queue_.push(std::move(message));
  return Status::OK();
}

Status TensorQueue::AddToTensorQueueMulti(std::vector<TensorTableEntry>& entries,
                                          std::vector<Request>& messages) {
  std::lock_guard<std::mutex> guard(mutex_);

  for (int i = 0; i < entries.size(); ++i) {
    if (tensor_table_.find(entries[i].tensor_name) != tensor_table_.end()) {
      return DUPLICATE_NAME_ERROR;
    }
    tensor_table_.emplace(entries[i].tensor_name, std::move(entries[i]));
    message_queue_.push(std::move(messages[i]));
  }
  return Status::OK();
}

At this point the request has been added to the message queue, and this chain of logic is complete.

0x05 Summary

Horovod's gradient synchronization, update, and AllReduce operations can be summarized as follows:

  • First, HVD defines TF asynchronous AllReduce OPs and inserts them into the TF execution graph by wrapping the optimizer.
  • Such an OP mainly packages the information required for AllReduce into a Request and sends it to the coordinator (rank 0).
  • Rank 0 coordinates the Requests of all ranks, and once every rank is ready, it sends a Response to trigger the AllReduce operation.

The details are as follows:

 Execution Thread                                   Background Communication Thread

 Computation Graph
         |
         v
    TF Async Op
         |
         v
 HorovodAllreduceOp --EnqueueTensorAllreduce(tensor, op)--> HorovodGlobalState
                                                              +-- message_queue
                                                              +-- tensor_table
                                                                    |
                                                                    v
                                                                HorovodOp
                                                                  +-- AlltoallOp
                                                                  +-- JoinOp
                                                                  +-- AllgatherOp
                                                                  +-- BroadcastOp
                                                                  +-- AllreduceOp
                                                                        +-- MPIAllreduce
                                                                        +-- GPUAllreduce
                                                                        +-- AdasumMPIAllreduceOp
                                                                        +-- GlooAllreduce


0xEE Personal information

Thoughts on life and technology

Wechat public account: Rosie’s Thinking

If you want timely updates on my articles, or want to see the technical materials I recommend, please follow this account.

0xFF References

This is enough for you to know about PyTorch distributed training!

Horovod usage: distributed model training with Horovod

Spark’s new vision: Make deep learning easier to use

Scaling model training in PyTorch using distributed data parallel

A developer-friendly guide to mixed precision training with PyTorch

It's 2020, why isn't deep learning 100% on the cloud yet?

Take you through the Horovod Distributed training framework

Using Horovod in Amazon SageMaker pipeline mode to implement multi-GPU distributed training

Distributed deep learning training using Horovod on Kubernetes

Horovod: a distributed deep learning framework based on TensorFlow