0x00 Summary

Horovod is an easy-to-use, high-performance distributed training framework released by Uber in 2017 that has been widely used in the industry.

This series walks you through the Horovod source code. This article continues the sixth installment and looks at the Horovod background thread architecture.

This article follows directly on from the previous one: because of length limits, the background-thread analysis is split into two parts.

The previous links are as follows:

Horovod (1) — Basics

Horovod (2) — A distributed training framework for deep learning — from the user’s perspective

Horovod (3) — What’s behind Horovodrun

Horovod (4) — Network Basics & Driver

Horovod (5) — fusion framework

Horovod (6) — background architecture

0x04 Overall code

4.1 Background Threads

BackgroundThreadLoop is a background thread in the training process. It is mainly responsible for communicating with other nodes and processing the communication request from the front end. It will call RunLoopOnce and check whether there are any tensor in tensor_queue that needs to be communicated. If there are synchronization updates with other nodes, then perform communication operations.

You can see the basic logic in the BackgroundThreadLoop function:

  • Depending on the compilation configuration, decide how to initialize; for example, mpi_context.Initialize is called only when MPI support is compiled in.
  • Initialize the controller, i.e. create the controller corresponding to the loaded collective communication library (MPI or Gloo) for globalState.
  • Get various configuration values, such as local_rank;
  • Set the background thread affinity;
  • Set the GPU stream;
  • Set the timeline configuration;
  • Set the tensor fusion threshold, cycle time, response cache capacity, the flag for hierarchical allreduce, and so on;
  • Set auto-tuning and the chunk size;
  • Reset the operation manager;
  • Enter the key code, RunLoopOnce;

The abbreviated version of the code looks like this:

BackgroundThreadLoop(HorovodGlobalState& state) {
 ......

#if HAVE_MPI
  // Initialize mpi context
#if HAVE_DDL
  // If DDL is enabled, let DDL ops manage MPI environment.
  auto mpi_ctx_manager = DDL_MPIContextManager(ddl_context, gpu_context);
#else
  // Otherwise, let MPI ops be in charge.
  auto mpi_ctx_manager = MPIContextManager();
#endif
  // mpi_context creates MPI threads and some MPI ops based on information passed from the front end and on environment variables
  mpi_context.Initialize(state.controller->GetRanks(), mpi_ctx_manager);
#endif

  // Initialize controller: global_size, local_size, rank, and is_coordinator information of the different nodes will be synchronized
  state.controller->Initialize();

  int local_size = state.controller->GetLocalSize();
  int local_rank = state.controller->GetLocalRank();

  ......

  // Set op_manager, which registers the ops of the different collective communication libraries
  op_manager.reset(CreateOperationManager(state));

  // Signal that initialization is completed.
  state.initialization_done = true;

  // Iterate until shutdown.
  try {
    while (RunLoopOnce(state));
  } catch (const std::exception& ex) {
    LOG(ERROR) << "Horovod background loop uncaught exception: " << ex.what();
  }
}


4.2 Where to build a ring

Now, you might wonder: since Horovod uses ring allreduce, where exactly is the ring set up? Let's take a look at a few implementations. A detailed study would require going deep into MPI, Gloo, and so on, which is beyond the scope of this article, so we only give a general overview.

4.2.1 NCCL calls

Let’s look at NCCL first.

4.2.1.1 NCCL

NCCL is short for the NVIDIA Collective Communications Library. It is a collective communication (all-gather, all-reduce, broadcast) library for multiple GPUs. NVIDIA has done a lot of optimization to achieve high communication speed over PCIe, NVLink, and InfiniBand.

4.2.1.2 Horovod

In NCCLAllreduce::Execute we can see that ncclAllReduce, the NCCL API, is called, so we can infer that the argument *nccl_op_context_.nccl_comm_ is the key.

Status NCCLAllreduce::Execute(std::vector<TensorTableEntry>& entries,
                              const Response& response) {

  // Do allreduce.
  auto nccl_result = ncclAllReduce(fused_input_data, buffer_data,
                                   (size_t) num_elements,
                                   GetNCCLDataType(first_entry.tensor), ncclSum,
                                   *nccl_op_context_.nccl_comm_, *gpu_op_context_.stream);
}

nccl_op_context_ is of type NCCLOpContext, which is defined as follows:

class NCCLOpContext {
public:
  void InitNCCLComm(const std::vector<TensorTableEntry>& entries,
                    const std::vector<int32_t>& nccl_device_map);

  ncclComm_t* nccl_comm_;
};

So if we look at how the nccl_comm_ parameter is initialized, we can see that it calls ncclCommInitRank to initialize.

void NCCLOpContext::InitNCCLComm(const std::vector<TensorTableEntry>& entries,
                                 const std::vector<int32_t>& nccl_device_map) {
  // Ensure NCCL communicator is in the map before executing operation.
  ncclComm_t& nccl_comm = nccl_context_->nccl_comms[global_state_->current_nccl_stream][nccl_device_map];
  if (nccl_comm == nullptr) {
    auto& timeline = global_state_->timeline;
    timeline.ActivityStartAll(entries, INIT_NCCL);

    int nccl_rank, nccl_size;
    Communicator nccl_id_bcast_comm;
    // Get the rank information
    PopulateNCCLCommStrategy(nccl_rank, nccl_size, nccl_id_bcast_comm);

    ncclUniqueId nccl_id;
    global_state_->controller->Bcast((void*)&nccl_id, sizeof(nccl_id), 0,
                                         nccl_id_bcast_comm);

    ncclComm_t new_nccl_comm;
    // NCCL is called and the rank information is passed
    auto nccl_result = ncclCommInitRank(&new_nccl_comm, nccl_size, nccl_id, nccl_rank);
    nccl_context_->ErrorCheck("ncclCommInitRank", nccl_result, nccl_comm);
    nccl_comm = new_nccl_comm;

    // Barrier helps NCCL to synchronize after initialization and avoid
    // deadlock that we've been seeing without it.
    global_state_->controller->Barrier(Communicator::GLOBAL);
    timeline.ActivityEndAll(entries);
  }

  nccl_comm_ = &nccl_comm;
}

PopulateNCCLCommStrategy is just getting the rank information from the global state.

void NCCLOpContext::PopulateNCCLCommStrategy(int& nccl_rank, int& nccl_size,
                                             Communicator& nccl_id_bcast_comm) {
  if (communicator_type_ == Communicator::GLOBAL) {
    nccl_rank = global_state_->controller->GetRank();
    nccl_size = global_state_->controller->GetSize();
  } else if (communicator_type_ == Communicator::LOCAL) {
    nccl_rank = global_state_->controller->GetLocalRank();
    nccl_size = global_state_->controller->GetLocalSize();
  } else {
    throw std::logic_error("Communicator type " + std::to_string(communicator_type_) +
                            " is not supported in NCCL mode.");
  }
  nccl_id_bcast_comm = communicator_type_;
}
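To make this bootstrap flow concrete, the following standalone sketch shows the common NCCL initialization pattern that InitNCCLComm follows: rank 0 creates an ncclUniqueId, it is broadcast to everyone (here with plain MPI, whereas Horovod uses its controller's Bcast), and every process then calls ncclCommInitRank with the total size and its own rank. This is my own example, assuming one GPU per MPI process; it is not code taken from Horovod.

// Sketch only: assumes MPI, CUDA and NCCL are available and one GPU per MPI rank.
#include <mpi.h>
#include <nccl.h>
#include <cuda_runtime.h>
#include <cstdio>

int main(int argc, char* argv[]) {
  MPI_Init(&argc, &argv);
  int rank, size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  // One GPU per process, chosen by rank (a common convention, not a Horovod rule).
  cudaSetDevice(rank);

  // Rank 0 generates the unique id; everyone else receives it via broadcast.
  ncclUniqueId id;
  if (rank == 0) {
    ncclGetUniqueId(&id);
  }
  MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD);

  // Every rank joins the communicator; NCCL builds the ring(s) internally.
  ncclComm_t comm;
  ncclCommInitRank(&comm, size, id, rank);
  printf("rank %d joined a NCCL communicator of size %d\n", rank, size);

  ncclCommDestroy(comm);
  MPI_Finalize();
  return 0;
}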

So we have to look at the NCCL source code.

4.2.1.3 Inside NCCL

You can see this in init.cc

NCCL_API(ncclResult_t, ncclCommInitRank, ncclComm_t* newcomm, int nranks, ncclUniqueId commId, int myrank);
ncclResult_t ncclCommInitRank(ncclComm_t* newcomm, int nranks, ncclUniqueId commId, int myrank) {
  NVTX3_FUNC_RANGE_IN(nccl_domain);
  int cudaDev;
  CUDACHECK(cudaGetDevice(&cudaDev));
  // Initialize here
  NCCLCHECK(ncclCommInitRankDev(newcomm, nranks, commId, myrank, cudaDev));
  return ncclSuccess;
}

Next, ncclCommInitRankDev calls ncclAsyncInit (or ncclCommInitRankSync) to do the final initialization, passing in the total number of ranks and the process's own myrank.

static ncclResult_t ncclCommInitRankDev(ncclComm_t* newcomm, int nranks, ncclUniqueId commId, int myrank, int cudaDev) {
  ncclResult_t res;
  char* env = getenv("NCCL_COMM_ID");

  NCCLCHECKGOTO(ncclInit(), res, end);
  // Make sure the CUDA runtime is initialized.
  CUDACHECKGOTO(cudaFree(NULL), res, end);
  NCCLCHECKGOTO(PtrCheck(newcomm, "CommInitRank", "newcomm"), res, end);

  if (ncclAsyncMode()) {
    // ncclAsyncInit is called to complete the final initialization, passing in the total number of rank, the process's own myRank
    NCCLCHECKGOTO(ncclAsyncInit(ncclCommInitRankSync, newcomm, nranks, commId, myrank, cudaDev), res, end);
  } else {
    NCCLCHECKGOTO(ncclCommInitRankSync(newcomm, nranks, commId, myrank, cudaDev), res, end);
  }

end:
  if (ncclAsyncMode()) return ncclAsyncErrCheck(res);
  else return res;
}

ncclComm_t is a typedef for a pointer to struct ncclComm, so let's look at the ncclComm definition, which includes the total number of ranks and the rank of the process itself.

struct ncclComm {
  struct ncclChannel channels[MAXCHANNELS];
  ......

  // Bitmasks for ncclTransportP2pSetup
  int connect;
  uint32_t* connectSend;
  uint32_t* connectRecv;

  int rank;    // my rank in the communicator
  int nRanks;  // number of GPUs in communicator
  int cudaDev; // my cuda device index
  int64_t busId;   // my PCI bus ID in int format

  int node;
  int nNodes;
  int localRanks;

  // Intra-process sync
  int intraRank;
  int intraRanks;
  int* intraBarrier;
  int intraPhase;
  ......
};

So we can basically see that Horovod passes in the rank information, and NCCL builds the ring based on it.

4.2.2 Gloo

In GlooContext::Initialize, you can see that Horovod sends the rank information to the Rendezvous server via the Rendezvous mechanism.

Gloo builds the ring internally.

Here, cross_rank is needed by hierarchical allreduce.
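As a small illustration of how the three levels of rank relate, the sketch below derives local and cross coordinates from a global rank, assuming a homogeneous cluster where every node has the same number of GPUs and ranks are assigned node by node. This is only an arithmetic example of mine, not code from Gloo or Horovod.

#include <iostream>

int main() {
  // Assumption for this example: 2 nodes with 4 GPUs each, ranks assigned node by node.
  const int local_size = 4;                  // processes (GPUs) per node
  const int size = 8;                        // total number of processes
  const int cross_size = size / local_size;  // number of nodes

  for (int rank = 0; rank < size; ++rank) {
    int local_rank = rank % local_size;  // position inside the node
    int cross_rank = rank / local_size;  // which node we are on
    std::cout << "rank " << rank
              << " -> local_rank " << local_rank
              << ", cross_rank " << cross_rank
              << " (cross_size " << cross_size << ")\n";
  }
  return 0;
}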

void GlooContext::Initialize(const std::string& gloo_iface) {

  attr device_attr;
  device_attr.iface = gloo_iface;

  device_attr.ai_family = AF_UNSPEC;
  auto dev = CreateDevice(device_attr);
  auto timeout = GetTimeoutFromEnv();

  auto host_env = std::getenv(HOROVOD_HOSTNAME);
  std::string hostname = host_env != nullptr ? std::string(host_env) : std::string("localhost");

  int rank = GetIntEnvOrDefault(HOROVOD_RANK, 0);
  int size = GetIntEnvOrDefault(HOROVOD_SIZE, 1);
  int local_rank = GetIntEnvOrDefault(HOROVOD_LOCAL_RANK, 0);
  int local_size = GetIntEnvOrDefault(HOROVOD_LOCAL_SIZE, 1);
  int cross_rank = GetIntEnvOrDefault(HOROVOD_CROSS_RANK, 0);
  int cross_size = GetIntEnvOrDefault(HOROVOD_CROSS_SIZE, 1);

  auto rendezvous_addr_env = std::getenv(HOROVOD_GLOO_RENDEZVOUS_ADDR);
  auto rendezvous_port = GetIntEnvOrDefault(HOROVOD_GLOO_RENDEZVOUS_PORT, -1);

  bool elastic = GetBoolEnvOrDefault(HOROVOD_ELASTIC, false);
  if (elastic && reset_) {
    std::string server_addr = rendezvous_addr_env;
    std::string scope = HOROVOD_GLOO_GET_RANK_AND_SIZE;
    HTTPStore init_store(server_addr, rendezvous_port, scope, rank);

    auto key = hostname + ":" + std::to_string(local_rank);
    std::vector<char> result = init_store.get(key);
    std::string s(result.begin(), result.end());
    std::stringstream ss(s);

    int last_rank = rank;
    int last_size = size;
    int last_local_rank = local_rank;
    int last_local_size = local_size;
    int last_cross_rank = cross_rank;
    int last_cross_size = cross_size;

    rank = ParseNextInt(ss);
    size = ParseNextInt(ss);
    local_rank = ParseNextInt(ss);
    local_size = ParseNextInt(ss);
    cross_rank = ParseNextInt(ss);
    cross_size = ParseNextInt(ss);

    SetEnv(HOROVOD_RANK, std::to_string(rank).c_str());
    SetEnv(HOROVOD_SIZE, std::to_string(size).c_str());
    SetEnv(HOROVOD_LOCAL_RANK, std::to_string(local_rank).c_str());
    SetEnv(HOROVOD_LOCAL_SIZE, std::to_string(local_size).c_str());
    SetEnv(HOROVOD_CROSS_RANK, std::to_string(cross_rank).c_str());
    SetEnv(HOROVOD_CROSS_SIZE, std::to_string(cross_size).c_str());
  }

  // Set up different Rendezvous servers
  ctx = Rendezvous(HOROVOD_GLOO_GLOBAL_PREFIX,
                   rendezvous_addr_env, rendezvous_port,
                   rank, size, dev, timeout);

  local_ctx = Rendezvous(HOROVOD_GLOO_LOCAL_PREFIX + hostname,
                         rendezvous_addr_env, rendezvous_port,
                         local_rank, local_size, dev, timeout);

  cross_ctx = Rendezvous(HOROVOD_GLOO_CROSS_PREFIX + std::to_string(local_rank),
                         rendezvous_addr_env, rendezvous_port,
                         cross_rank, cross_size, dev, timeout);
}

4.2.3 MPI

In MPIContext::Initialize you can see how the various communicators and ranks are set up.

void MPIContext::Initialize(const std::vector<int>& ranks,
                            MPIContextManager& ctx_manager) {

  auto mpi_threads_disable = std::getenv(HOROVOD_MPI_THREADS_DISABLE);
  int required = MPI_THREAD_MULTIPLE;
  if (mpi_threads_disable != nullptr &&
      std::strtol(mpi_threads_disable, nullptr, 10) > 0) {
    required = MPI_THREAD_SINGLE;
  }
  int is_mpi_initialized = 0;
  MPI_Initialized(&is_mpi_initialized);
  if (is_mpi_initialized) {
    int provided;
    MPI_Query_thread(&provided);
  } else {
    // MPI environment has not been created, using manager to initialize.
    ctx_manager.EnvInitialize(required);
    should_finalize = true;
  }

  if (!ranks.empty()) {
    MPI_Group world_group;
    MPI_Comm_group(MPI_COMM_WORLD, &world_group);
    MPI_Group work_group;
    MPI_Group_incl(world_group, ranks.size(), ranks.data(), &work_group);
    MPI_Comm_create_group(MPI_COMM_WORLD, work_group, 0, &(mpi_comm));
    if (mpi_comm == MPI_COMM_NULL) {
      mpi_comm = MPI_COMM_WORLD;
    }
    MPI_Group_free(&world_group);
    MPI_Group_free(&work_group);
  } else if (!mpi_comm) {
    // No ranks were given and no communicator provided to horovod_init() so use
    // MPI_COMM_WORLD
    MPI_Comm_dup(MPI_COMM_WORLD, &mpi_comm);
  }

  // Create local comm, Determine local rank by querying the local communicator.
  MPI_Comm_split_type(mpi_comm, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL,
                      &local_comm);

  // Get local rank and world rank for cross comm establishment.
  int local_rank, world_rank;
  MPI_Comm_rank(mpi_comm, &world_rank);
  MPI_Comm_rank(local_comm, &local_rank);

  // Create cross node communicator.
  MPI_Comm_split(mpi_comm, local_rank, world_rank, &cross_comm);

  // Create custom MPI float16 data type.
  MPI_Type_contiguous(2, MPI_BYTE, &mpi_float16_t);
  MPI_Type_commit(&mpi_float16_t);

  // Create custom MPI float16 summation op.
  MPI_Op_create(&float16_sum, 1, &mpi_float16_sum);
}
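The communicator-splitting pattern above can be tried in isolation. The following standalone sketch (my own example, not Horovod code) builds a per-node communicator with MPI_Comm_split_type and a cross-node communicator with MPI_Comm_split, just as MPIContext::Initialize does, and prints the resulting coordinates.

#include <mpi.h>
#include <cstdio>

int main(int argc, char* argv[]) {
  MPI_Init(&argc, &argv);

  int world_rank, world_size;
  MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
  MPI_Comm_size(MPI_COMM_WORLD, &world_size);

  // Local communicator: all ranks that share the same node (shared-memory domain).
  MPI_Comm local_comm;
  MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL,
                      &local_comm);
  int local_rank, local_size;
  MPI_Comm_rank(local_comm, &local_rank);
  MPI_Comm_size(local_comm, &local_size);

  // Cross communicator: one member per node, grouped by local_rank.
  MPI_Comm cross_comm;
  MPI_Comm_split(MPI_COMM_WORLD, local_rank, world_rank, &cross_comm);
  int cross_rank, cross_size;
  MPI_Comm_rank(cross_comm, &cross_rank);
  MPI_Comm_size(cross_comm, &cross_size);

  printf("world %d/%d  local %d/%d  cross %d/%d\n",
         world_rank, world_size, local_rank, local_size, cross_rank, cross_size);

  MPI_Comm_free(&local_comm);
  MPI_Comm_free(&cross_comm);
  MPI_Finalize();
  return 0;
}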

0x05 Business logic

Let’s look at the business logic in detail.

5.1 RunLoopOnce overall business flow

RunLoopOnce is responsible for the overall business logic and has the following functions:

  • Work out whether the thread needs to sleep, that is, check whether a full cycle time has elapsed since the last cycle (a small timing sketch follows below);

  • Use ComputeResponseList to let rank 0 coordinate with the workers, obtain the Requests and compute the responses;

    Rank 0 then iterates through the response_list, executing each response one by one.

    The response_list is produced by rank 0; the response cache is handled by the other ranks.

  • PerformOperation then runs the collective operation for each response;

  • If auto-tuning is needed, synchronize the parameters;

We can see that Horovod’s workflow is largely a producer-consumer model, as described earlier. The controller’s job here is to coordinate: it will talk to each rank about which requests are ready, and it will perform collective actions for those requests.
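The "whether to sleep" check mentioned in the first bullet above is just a cycle-time throttle. A minimal sketch of the idea is shown below; it is only an illustration of mine, while the real cycle time in Horovod comes from the parameter manager / the HOROVOD_CYCLE_TIME setting.

#include <chrono>
#include <iostream>
#include <thread>

// If the last cycle finished faster than the configured cycle time, sleep for
// the remainder before starting the next one.
void ThrottleCycle(std::chrono::steady_clock::time_point& last_cycle_start,
                   std::chrono::microseconds cycle_time) {
  auto elapsed = std::chrono::steady_clock::now() - last_cycle_start;
  if (elapsed < cycle_time) {
    std::this_thread::sleep_for(cycle_time - elapsed);
  }
  last_cycle_start = std::chrono::steady_clock::now();
}

int main() {
  auto last = std::chrono::steady_clock::now();
  for (int i = 0; i < 3; ++i) {
    // ... a (fast) fake RunLoopOnce body would go here ...
    ThrottleCycle(last, std::chrono::milliseconds(5));
    std::cout << "cycle " << i << " done\n";
  }
  return 0;
}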

The abbreviated version of the code looks like this:

bool RunLoopOnce(HorovodGlobalState& state) {
  // This delay determines thread frequency and communication message latency
  ......

  // Let rank 0 coordinate with the workers to get the Requests and compute the responses
  auto response_list =
      state.controller->ComputeResponseList(horovod_global.shut_down, state);

  // Get tensor name and size data for autotuning.
  ......

  // Perform the collective operation. All nodes should end up performing
  // the same operation.
  // For each response, do the collective operation
  int rank = state.controller->GetRank();
  for (auto& response : response_list.responses()) {
    PerformOperation(response, horovod_global);
  }

  // If auto tuning is required, sync the parameters
  if (state.parameter_manager.IsAutoTuning()) {
    bool should_sync =
        state.parameter_manager.Update(tensor_names, total_tensor_size);

    if (should_sync) {
      state.controller->SynchronizeParameters();
    }
  }

  return !response_list.shutdown();
}

The process is as follows:

+---------------------------------+
|                                 |             +-----------------------------+
|  BackgroundThreadLoop           |             |                             |
|                                 |             | OperationManager            |
|   +--------------------------+  |             |                             |
|   |  RunLoopOnce             |  |             |                             |
|   |                          |  |             |                             |
|   |                          |  |             |                             |
|   |     ComputeResponseList  |  |    +----------> ExecuteOperation          |
|   |             +            |  |    |        |                             |
|   |             |            |  |    |        |                             |
|   |             |            |  |    |        |                             |
|   |             |            |  |    | 1      |                             |
|   |             v            |  |    |        |                             |
|   |                          |  |    |        |                             |
|   |      PerformOperation +----------+        |                             |
|   |                          |  |             |                             |
|   +--------------------------+  |             |                             |
|                                 |             |                             |
+---------------------------------+             +-----------------------------+

5.2 ComputeResponseList computes response

The single most important function call in a background thread is ComputeResponseList. ComputeResponseList implements the coordination process, that is, to coordinate rank 0 with the worker to obtain the Request and calculate the response.

Horovod also follows the Coordinator design, similar to Baidu's. The coordinators in both Baidu's design and Horovod are actors that coordinate the work of multiple processes. When actually performing the computation, Horovod also introduces a new abstraction, op_manager. To some extent, we can think of the controller as an abstraction of communication and coordination management, whereas op_manager is an abstraction of the actual computation.

5.2.1 General Ideas

The function of Controller::ComputeResponseList is as follows: the workers send Requests to rank 0; the coordinator then handles all the workers' requests, finds the tensors that are ready, fuses them, and finally sends the results to the other ranks:

  • PopMessagesFromQueue takes all the current Requests out of the GlobalState tensor_queue and, after a series of cache-related processing steps, stores them in message_queue_tmp;
  • Cache information is synchronized between the workers to obtain the response list that every worker has cached;
  • Determine whether further synchronization is required, for example whether the responses are all in the cache;
  • If synchronization is not required, then
    • all messages in the queue are cached and no additional coordination is needed, so the cached responses are simply fused and put into response_list; the next time slice will continue processing;
  • If synchronization is required, then
    • If this process is rank 0:

      • Since rank 0 also takes part in training, its own requests must be added to the message table as well;
      • Rank 0 accepts requests from the other ranks via RecvReadyTensors and adds them to ready_to_reduce; this blocks. The coordinator keeps receiving this information until the number of Done requests equals global_size;
      • It then traverses rank 1 ~ rank n, processing each rank's requests one by one;
      • Finally, the message table holds everything that can be reduced, and the responses come from the following three sources:
        • Source 1: response_cache_ on rank 0;
        • Source 2: ready_to_reduce, processed one entry at a time;
        • Source 3: join_response;
      • FuseResponses fuses the tensors: several tensors are merged into one big one before the collective operation is performed;
      • The coordinator finds all the tensors that are ready to reduce and returns them to all workers via SendFinalTensors(response_list), or returns an error if the information is inconsistent. A Done is also sent when the send completes.
    • For any other rank:

      • When the worker reaches the all_reduce call in the front end, it packs its requests into a message_list and uses SendReadyTensors to send them to the primary node (the coordinator, rank 0), indicating that it intends to reduce; it iterates over message_list to send the information of every tensor it wants to reduce, finishes with a Done request, and then blocks.
      • The worker uses RecvFinalTensors(response_list) to wait for the response information and receives the list of ready responses from rank 0, again blocking. When Done is received, it calls PerformOperation to do the reduction.
    • Both the coordinator and the workers turn the synchronized information into a list of Responses that is handed to the subsequent PerformOperation.

The coordinator and the corresponding workers block on matching calls:

  • SendReadyTensors and RecvReadyTensors both block on MPI_Gather;
  • SendFinalTensors and RecvFinalTensors both block on MPI_Bcast;

The coordinator is the sender in MPI_Bcast, while the workers are the senders in MPI_Gather. In each exchange the length of the message is synchronized first, and then the message itself.
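This "length first, then payload" protocol can be reproduced in a standalone program. In the sketch below (my own illustration, not Horovod code) every rank sends a variable-length byte string to rank 0 with MPI_Gather plus MPI_Gatherv, and rank 0 answers with an MPI_Bcast of a length followed by the payload, which is exactly the shape of SendReadyTensors/RecvReadyTensors and SendFinalTensors/RecvFinalTensors shown later.

#include <mpi.h>
#include <algorithm>
#include <cstdio>
#include <string>
#include <vector>

int main(int argc, char* argv[]) {
  MPI_Init(&argc, &argv);
  int rank, size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  // Each rank "serializes" a request of a different length.
  std::string request = "request-from-rank-" + std::string(rank + 1, 'x');
  int len = (int)request.size() + 1;  // include the trailing '\0'

  // Step 1: gather the message lengths on rank 0.
  std::vector<int> lengths(size, 0);
  MPI_Gather(&len, 1, MPI_INT, lengths.data(), 1, MPI_INT, 0, MPI_COMM_WORLD);

  // Step 2: gather the variable-length payloads with MPI_Gatherv.
  std::vector<int> displs(size, 0);
  int total = 0;
  if (rank == 0) {
    for (int i = 0; i < size; ++i) { displs[i] = total; total += lengths[i]; }
  }
  std::vector<char> all_requests(rank == 0 ? total : 1);
  MPI_Gatherv((void*)request.c_str(), len, MPI_BYTE,
              all_requests.data(), lengths.data(), displs.data(), MPI_BYTE,
              0, MPI_COMM_WORLD);

  // Step 3: rank 0 prepares a "response" and broadcasts its length, then its body.
  std::string response;
  if (rank == 0) {
    response = "ready";
    for (int i = 0; i < size; ++i) {
      printf("rank 0 received: %s\n", &all_requests[displs[i]]);
    }
  }
  int resp_len = (int)response.size() + 1;
  MPI_Bcast(&resp_len, 1, MPI_INT, 0, MPI_COMM_WORLD);

  std::vector<char> buffer(resp_len);
  if (rank == 0) {
    std::copy(response.c_str(), response.c_str() + resp_len, buffer.begin());
  }
  MPI_Bcast(buffer.data(), resp_len, MPI_BYTE, 0, MPI_COMM_WORLD);
  printf("rank %d got response: %s\n", rank, buffer.data());

  MPI_Finalize();
  return 0;
}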

The details are as follows:

                                                      +
                                                      |
                    ComputeResponseList in rank 0     |     ComputeResponseList in worker(rank n)
                                                      |
                                                      |
                       message_queue_tmp              |          message_queue_tmp
                                                      |
                               +                      |                +
                               |                      |                |
                               |PopMessagesFromQueue  |                | PopMessagesFromQueue
                               |                      |                |
                               |                      |                |
                               |           CoordinateCacheAndState     |
                               |                      |                |
                               |    <--------------------------------> |
                               |                      |                |
                               v                      |                v
                                                      |
RecvReadyTensors(ready_to_reduce, ready_list)  <------------->  SendReadyTensors(message_list)
                               +                      |                +
                               |                      |                |
                               |                      |                |
                               |                      |                |
                               |                      |                |
                               v                      |                |
                        message_table_                |                |
                               +                      |                |
                               |                      |                |
                               |                      |                |
                               v                      |                |
                         FuseResponses                |                |
                               +                      |                |
                               |                      |                |
                               |                      |                |
                               v                      |                v
           SendFinalTensors(response_list)  <---------------->  RecvFinalTensors(response_list)
                               +                      |                +
                               |                      |                |
                               |                      |                |
                               |                      |                |
                               v                      |                v
                        PerformOperation              |           PerformOperation
                                                      |
                                                      +



5.2.2 Detailed analysis

Below is a more detailed analysis; it draws on material found online, together with my own reading and understanding.

ResponseList Controller::ComputeResponseList(std::atomic_bool& shut_down,
                                             HorovodGlobalState& state) {
  // Update cache capacity if autotuning is active.
  if (parameter_manager_.IsAutoTuning()) {
    response_cache_.set_capacity((int)parameter_manager_.CacheEnabled() *
                                 cache_capacity_);
  }

  // Copy the data structures out from parameters.
  // However, don't keep the lock for the rest of the loop, so that
  // enqueued stream callbacks can continue.

  CacheCoordinator cache_coordinator(response_cache_.num_active_bits());

  // Take all the current requests from Tensor Quene and process them
  // message queue used only in this cycle
  std::deque<Request> message_queue_tmp;
  tensor_queue_.PopMessagesFromQueue(message_queue_tmp);
  for (auto& message : message_queue_tmp) {
    if (message.request_type() == Request::JOIN) {
      state.joined = true;
      // set_uncached_in_queue Indicates that there is no cache
      cache_coordinator.set_uncached_in_queue(true);
      continue;
    }

    // Cache is used to cache how many responses this rank has received.
    // Keep track of cache hits
    if (response_cache_.capacity() > 0) {
      // We need to see if the tensor has got the response. Why cache? All reduce is implemented immediately after all are ready.
      // The new tensor has the same parameters as the cached tensor, such as device, dtype, shape, and so on. If not, the cache is identified as INVALID. Does this change during deep learning training?
      auto cache_ = response_cache_.cached(message);
      if (cache_ == ResponseCache::CacheState::HIT) {
        uint32_t cache_bit = response_cache_.peek_cache_bit(message);
        cache_coordinator.record_hit(cache_bit);

        // Record initial time cached tensor is encountered in queue.
        stall_inspector_.RecordCachedTensorStart(message.tensor_name());

      } else {
        // If there is no cache
        if (cache_ == ResponseCache::CacheState::INVALID) {
          // Handle invalid cache records
          uint32_t cache_bit = response_cache_.peek_cache_bit(message);
          cache_coordinator.record_invalid_bit(cache_bit);
        }
        // If there is no cache, add to set_uncached_in_queue
        cache_coordinator.set_uncached_in_queue(true);

        // Remove from stall
        // Remove timing entry if uncached or marked invalid.
        stall_inspector_.RemoveCachedTensor(message.tensor_name());
      }
    }
  }

  if (state.joined && response_cache_.capacity() > 0) {
    for (uint32_t bit : response_cache_.list_all_bits()) {
      cache_coordinator.record_hit(bit);
    }
  }

  // Flag indicating that the background thread should shut down.
  bool should_shut_down = shut_down;

  / / processing stalled
  // Check for stalled tensors.
  if (stall_inspector_.ShouldPerformCheck()) {
    if (is_coordinator_) {
      should_shut_down |= stall_inspector_.CheckForStalledTensors(size_);
    }

    if (response_cache_.capacity() > 0) {
      stall_inspector_.InvalidateStalledCachedTensors(cache_coordinator);
    }
    stall_inspector_.UpdateCheckTime();
  }

  cache_coordinator.set_should_shut_down(should_shut_down);

  if (response_cache_.capacity() > 0) {
    // Why do we synchronize cache information with each other?
    // Obtain common cache hits and cache invalidations across workers. Also,
    // determine if any worker has uncached messages in queue or requests
    // a shutdown. This function removes any invalid cache entries, if they
    // exist.
    // This will synchronize and remove invalid from response_cache_.
    // The goal is to get the response list stored by each worker
    CoordinateCacheAndState(cache_coordinator);
      
    // Remove uncommon cached tensors from queue and replace to state
    // queue for next cycle. Skip adding common cached tensors to
    // queue as they are handled separately.
      
    // In this case, cache_coordinator is already a response list shared by all workers. Responses that are not in the common Response list need to be removed.
    // Why do some workers have no certain response?
    // Walk through the tensor request messages to see whether they are cached, and update tensor_queue_ accordingly.
    std::deque<Request> messages_to_replace;
    size_t num_messages = message_queue_tmp.size();
    for (size_t i = 0; i < num_messages; ++i) {
      auto& message = message_queue_tmp.front();
      if (response_cache_.cached(message) == ResponseCache::CacheState::HIT) {
        uint32_t cache_bit = response_cache_.peek_cache_bit(message);
        if (cache_coordinator.cache_hits().find(cache_bit) ==
            cache_coordinator.cache_hits().end()) {
          // Try to process again in next cycle.
          messages_to_replace.push_back(std::move(message));
        } else {
          // Remove timing entry for messages being handled this cycle.
          stall_inspector_.RemoveCachedTensor(message.tensor_name());
        }
      } else {
        // Remove timing entry for messages being handled this cycle.
        stall_inspector_.RemoveCachedTensor(message.tensor_name());
        message_queue_tmp.push_back(std::move(message));
      }
      message_queue_tmp.pop_front();
    }

    tensor_queue_.PushMessagesToQueue(messages_to_replace);
  } 
  // End of response_cache_.capacity()

  
  ResponseList response_list;
  response_list.set_shutdown(cache_coordinator.should_shut_down());

  bool need_communication = true;
  // Determine if further synchronization is required. For example, response is all in cache.
  if (response_cache_.capacity() > 0 &&
      !cache_coordinator.uncached_in_queue()) {
    // if cache is enabled and no uncached new message coming in, no need for
    // additional communications
    need_communication = false;

    // If no messages to send, we can simply return an empty response list;
    if (cache_coordinator.cache_hits().empty()) {
      return response_list;
    }
    // otherwise we need to add cached messages to response list.
  }

  if (!need_communication) {
    // All messages in the queue are cached; no additional coordination is required,
    // so simply merge the cached responses into response_list.
    // If all messages in queue have responses in cache, use fast path with
    // no additional coordination.

    // If group fusion is disabled, fuse tensors in groups separately
    if (state.disable_group_fusion && !group_table_.empty()) {
      // Note: need group order to be based on position in cache for global consistency
      std::vector<int> common_ready_groups;
      std::unordered_set<int> processed;
      for (auto bit : cache_coordinator.cache_hits()) {
        const auto& tensor_name = response_cache_.peek_response(bit).tensor_names()[0];
        int group_id = group_table_.GetGroupIDFromTensorName(tensor_name);
        if (group_id != NULL_GROUP_ID && processed.find(group_id) == processed.end()) {
          common_ready_groups.push_back(group_id);
          processed.insert(group_id);
        }
      }

      for (auto id : common_ready_groups) {
        std::deque<Response> responses;
        for (const auto &tensor_name : group_table_.GetGroupTensorNames(id)) {
          auto bit = response_cache_.peek_cache_bit(tensor_name);
          responses.push_back(response_cache_.get_response(bit));
          // Erase cache hit to avoid processing a second time.
          cache_coordinator.erase_hit(bit);
        }

        FuseResponses(responses, state, response_list);
      }
    }

    std::deque<Response> responses;
    // Convert cache hits to responses. Populate so that least
    // recently used responses get priority. All workers call the code
    // here so we use the get method here to consistently update the cache
    // order.
    for (auto bit : cache_coordinator.cache_hits()) {
      responses.push_back(response_cache_.get_response(bit));
    }

    // Fuse responses as normal.
    FuseResponses(responses, state, response_list);
    response_list.set_shutdown(cache_coordinator.should_shut_down());
  } else {
    // There are no cached messages coming in. We need to find out if these messages can be reduced.
    // There are uncached messages coming in, need communication to figure out
    // whether those are ready to be reduced.

    // Collect all tensors that are ready to be reduced. Record them in the
    // tensor count table (rank zero) or send them to rank zero to be
    // recorded (everyone else).
    std::vector<std::string> ready_to_reduce;

    if (is_coordinator_) {
      // I am a rank 0, and for the master process, I have a ready tensor.
      // Rank 0 will also participate in machine learning training, so you need to add the request from rank 0 to the message table.
      while (!message_queue_tmp.empty()) { // Note that the requests in message_queue_tmp come from rank 0 itself
        // Pop the first available message
        Request message = message_queue_tmp.front();
        message_queue_tmp.pop_front();

        if (message.request_type() == Request::JOIN) {
          state.joined_size++;
          continue;
        }

        bool reduce = IncrementTensorCount(message, state.joined_size);
        stall_inspector_.RecordUncachedTensorStart(
            message.tensor_name(), message.request_rank(), size_);
        if (reduce) {
          ready_to_reduce.push_back(message.tensor_name());
        }
      }

      // Accept requests from the other ranks and add their ready requests to message_table_.
      // This is where synchronization blocks
      // Receive ready tensors from other ranks
      std::vector<RequestList> ready_list;
      RecvReadyTensors(ready_to_reduce, ready_list);

      // Process all rank requests.
      // Process messages.
      // Pass through rank 0+1 ~ rank n, process response of each rank one by one
      for (int i = 1; i < size_; ++i) { // size_ is how many ranks there are
        
        // Response list for each rank.
        auto received_message_list = ready_list[i];
        for (auto& received_message : received_message_list.requests()) {
          auto& received_name = received_message.tensor_name();

          // A JOIN-type message indicates that a new rank has joined; Horovod supports elastic training.
          if (received_message.request_type() == Request::JOIN) {
            state.joined_size++;  // Add the number of ranks that the tensor is already ready for. If all the ranks are ready, then send the other rank
            continue;
          }

          bool reduce = IncrementTensorCount(received_message, state.joined_size);
          stall_inspector_.RecordUncachedTensorStart(
              received_message.tensor_name(), received_message.request_rank(),
              size_);
            
          // If the maximum value is reached, add the reduce function to ready_to_reduce.
          if (reduce) {
            ready_to_reduce.push_back(received_name);
          }
        }

        if (received_message_list.shutdown()) {
          // Received SHUTDOWN request from one of the workers.
          should_shut_down = true;
        }
      }

      // Check if tensors from previous ticks are ready to reduce after Joins.
      // Traverse message_table_ to see if the response processed in the last round can be reduced in this round
      if (state.joined_size > 0) {
        for (auto& table_iter : message_table_) {
          int count = (int)table_iter.second.size();
          if (count == (size_ - state.joined_size) &&
              std::find(ready_to_reduce.begin(), ready_to_reduce.end(),
                        table_iter.first) == ready_to_reduce.end()) {
            state.timeline.NegotiateEnd(table_iter.first);
            ready_to_reduce.push_back(table_iter.first);
          }
        }
      }

      // Fuse tensors in groups before processing others.
      if (state.disable_group_fusion && !group_table_.empty()) {

        // Extract set of common groups from coordinator tensor list and cache hits.
        std::vector<int> common_ready_groups;
        std::unordered_set<int> processed;

        for (const auto& tensor_name : ready_to_reduce) {
          int group_id = group_table_.GetGroupIDFromTensorName(tensor_name);
          if (group_id != NULL_GROUP_ID && processed.find(group_id) == processed.end()) {
            common_ready_groups.push_back(group_id);
            processed.insert(group_id);
            // Leaving name in list, to be skipped later.
          }
        }

        if (response_cache_.capacity() > 0) {
          for (auto bit : cache_coordinator.cache_hits()) {
            const auto& tensor_name = response_cache_.peek_response(bit).tensor_names()[0];
            int group_id = group_table_.GetGroupIDFromTensorName(tensor_name);
            if (group_id != NULL_GROUP_ID && processed.find(group_id) == processed.end()) {
              common_ready_groups.push_back(group_id);
              processed.insert(group_id);
            }
          }
        }

        // For each ready group, form and fuse response lists independently
        for (auto id : common_ready_groups) {
          std::deque<Response> responses;
          for (const auto &tensor_name : group_table_.GetGroupTensorNames(id)) {
            if (message_table_.find(tensor_name) != message_table_.end()) {
              // Uncached message
              Response response = ConstructResponse(tensor_name, state.joined_size);
              responses.push_back(std::move(response));

            } else {
              // Cached message
              auto bit = response_cache_.peek_cache_bit(tensor_name);
              responses.push_back(response_cache_.get_response(bit));
              // Erase cache hit to avoid processing a second time.
              cache_coordinator.erase_hit(bit);
            }
          }

          FuseResponses(responses, state, response_list);
        }
      }

      // At this point, the message table has all the reducible lists.
        
      // At this point, rank zero should have a fully updated tensor count
      // table and should know all the tensors that need to be reduced or
      // gathered, and everyone else should have sent all their information
      // to rank zero. We can now do reductions and gathers; rank zero will
      // choose which ones and in what order, and will notify the other ranks
      // before doing each reduction.
      std::deque<Response> responses;

      // The sources of responses are the following three parts
        
      // source 1, response_cache_ in rank 0
      if (response_cache_.capacity() > 0) {
        // Prepopulate response list with cached responses. Populate so that
        // least recently used responses get priority. Since only the
        // coordinator rank calls this code, use peek instead of get here to
        // preserve cache order across workers.
        // No need to do this when all ranks did Join.
        if (state.joined_size < size_) {
          for (auto bit : cache_coordinator.cache_hits()) {
            responses.push_back(response_cache_.peek_response(bit));
          }
        }
      }

      // Source 2: process ready_to_reduce one by one
      for (auto& tensor_name : ready_to_reduce) {
        // Skip tensors in group that were handled earlier.
        if (state.disable_group_fusion && !group_table_.empty() &&
            group_table_.GetGroupIDFromTensorName(tensor_name) != NULL_GROUP_ID) {
          continue;
        }

        Response response = ConstructResponse(tensor_name, state.joined_size);
        responses.push_back(std::move(response));
      }
        
      // Source 3, join_response
      if (state.joined_size == size_) {
        // All ranks did Join(). Send the response, reset joined size.
        Response join_response;
        join_response.set_response_type(Response::JOIN);
        join_response.add_tensor_name(JOIN_TENSOR_NAME);
        responses.push_back(std::move(join_response));
        state.joined_size = 0;
      }
        
      // Merge
      FuseResponses(responses, state, response_list);
      response_list.set_shutdown(should_shut_down);

      // Broadcast final results to other ranks.
      SendFinalTensors(response_list);

    } else {
      // If I am another rank and I am not the master, I will send my ready tensor to the master and receive my ready tensor list
      RequestList message_list;
      message_list.set_shutdown(should_shut_down);
      while (!message_queue_tmp.empty()) {
        message_list.add_request(message_queue_tmp.front());
        message_queue_tmp.pop_front();
      }

      // Send the Requests to rank 0; this blocks until the gather completes.
      // Send ready tensors to rank zero
      SendReadyTensors(message_list);

      // Receive a ready response list from Rank 0 and block synchronously
      // Receive final tensors to be processed from rank zero
      RecvFinalTensors(response_list);
    }
  }

  if (!response_list.responses().empty()) {
    std::string tensors_ready;
    for (const auto& r : response_list.responses()) {
      tensors_ready += r.tensor_names_string() + "; ";
    }
  }

  // If need_communication is false, meaning no uncached message coming in,
  // thus no need to update cache.
  if (need_communication && response_cache_.capacity() > 0) {
    // All workers add supported responses to cache. This updates the cache
    // order consistently across workers.
    for (auto& response : response_list.responses()) {
      if ((response.response_type() == Response::ResponseType::ALLREDUCE ||
           response.response_type() == Response::ResponseType::ADASUM ||
           response.response_type() == Response::ResponseType::ALLTOALL) &&
          (int)response.devices().size() == size_) {
        response_cache_.put(response, tensor_queue_, state.joined);
      }
    }
  }

  // Reassign cache bits based on current cache order.
  response_cache_.update_cache_bits();

  return response_list;
}


We’re going to focus on a couple of functions.

5.2.3 IncrementTensorCount

IncrementTensorCount checks whether all ranks are ready for a given tensor.

If count == (size_ - joined_size), that is, a request for the tensor has arrived from every active rank, then ready_to_reduce is true and we know the tensor can be allreduced.

bool Controller::IncrementTensorCount(const Request& msg, int joined_size) {
  auto& name = msg.tensor_name();
  auto table_iter = message_table_.find(name);
  if (table_iter == message_table_.end()) {
    std::vector<Request> messages = {msg};
    messages.reserve(static_cast<unsigned long>(size_));
    message_table_.emplace(name, std::move(messages));
    table_iter = message_table_.find(name);
  } else {
    std::vector<Request>& messages = table_iter->second;
    messages.push_back(msg);
  }

  std::vector<Request>& messages = table_iter->second;
  int count = (int)messages.size();
  bool ready_to_reduce = count == (size_ - joined_size); // Determine whether allreduce is possible

  return ready_to_reduce;
}

The actual call happens on rank 0, to decide whether a tensor can be allreduced.

IncrementTensorCount tells us whether the count is now complete, and along the way it adds the Request to message_table_.

    if (is_coordinator_) {

      while (!message_queue_tmp.empty()) {
        // Pop the first available message
        Request message = message_queue_tmp.front();
        message_queue_tmp.pop_front();

        if (message.request_type() == Request::JOIN) {
          state.joined_size++;
          continue;
        }

        // Call here
        bool reduce = IncrementTensorCount(message, state.joined_size);
        stall_inspector_.RecordUncachedTensorStart(
            message.tensor_name(), message.request_rank(), size_);
        if (reduce) {
          ready_to_reduce.push_back(message.tensor_name());
        }
      }
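To see the counting rule in isolation, here is a toy sketch with made-up types (ToyController, a simplified Request) of the message_table_ bookkeeping: a tensor becomes ready to reduce exactly when requests for it have been recorded from all participating ranks.

#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

// Toy stand-ins for Horovod's Request / message_table_, for illustration only.
struct Request { std::string tensor_name; int rank; };

class ToyController {
 public:
  explicit ToyController(int size) : size_(size) {}

  // Returns true once every participating rank has announced this tensor
  // (joined_size ranks have already called Join and are not expected to report).
  bool IncrementTensorCount(const Request& msg, int joined_size) {
    auto& messages = message_table_[msg.tensor_name];
    messages.push_back(msg);
    return (int)messages.size() == size_ - joined_size;
  }

 private:
  int size_;
  std::unordered_map<std::string, std::vector<Request>> message_table_;
};

int main() {
  ToyController controller(4);  // pretend there are 4 ranks, none joined
  for (int rank = 0; rank < 4; ++rank) {
    bool ready = controller.IncrementTensorCount({"gradient/layer1", rank}, 0);
    std::cout << "after rank " << rank << ": ready=" << std::boolalpha << ready << "\n";
  }
  return 0;
}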

5.2.4 RecvReadyTensors

This function is used to collect requests from other ranks.

  • Use MPI_Gather to determine the message length;
  • Collect messages using MPI_Gatherv;
  • Rank 0's own requests are not processed here because they have already been handled;
void MPIController::RecvReadyTensors(std::vector<std::string>& ready_to_reduce,
                                     std::vector<RequestList>& ready_list) {
  // Rank zero has put all its own tensors in the tensor count table.
  // Now, it should count all the tensors that are coming from other
  // ranks at this tick.

  // 1. Get message lengths from every rank.
  auto recvcounts = new int[size_];
  recvcounts[0] = 0;
  MPI_Gather(MPI_IN_PLACE, 1, MPI_INT, recvcounts, 1, MPI_INT, RANK_ZERO,
             mpi_ctx_.mpi_comm);

  // 2. Compute displacements.
  auto displcmnts = new int[size_];
  size_t total_size = 0;
  for (int i = 0; i < size_; ++i) {
    if (i == 0) {
      displcmnts[i] = 0;
    } else {
      displcmnts[i] = recvcounts[i - 1] + displcmnts[i - 1];
    }
    total_size += recvcounts[i];
  }

  // 3. Collect messages from every rank.
  auto buffer = new uint8_t[total_size];
  MPI_Gatherv(nullptr, 0, MPI_BYTE, buffer, recvcounts, displcmnts, MPI_BYTE,
              RANK_ZERO, mpi_ctx_.mpi_comm);

  // 4. Process messages.
  // create a dummy list for rank 0
  ready_list.emplace_back();
  for (int i = 1; i < size_; ++i) {
    auto rank_buffer_ptr = buffer + displcmnts[i];
    RequestList received_message_list;
    RequestList::ParseFromBytes(received_message_list, rank_buffer_ptr);
    ready_list.push_back(std::move(received_message_list));
  }

  // 5. Free buffers.
  delete[] recvcounts;
  delete[] displcmnts;
  delete[] buffer;
}

5.2.5 SendReadyTensors

This function synchronizes other rank requests to Rank 0.

  • Use MPI_Gather to determine the message length;
  • Collect messages using MPI_Gatherv;
void MPIController::SendReadyTensors(RequestList& message_list) {
  std::string encoded_message;
  RequestList::SerializeToString(message_list, encoded_message);
  int encoded_message_length = (int)encoded_message.length() + 1;
  int ret_code = MPI_Gather(&encoded_message_length, 1, MPI_INT, nullptr, 1,
                            MPI_INT, RANK_ZERO, mpi_ctx_.mpi_comm);

  ret_code = MPI_Gatherv((void*)encoded_message.c_str(), encoded_message_length,
                         MPI_BYTE, nullptr, nullptr, nullptr, MPI_BYTE,
                         RANK_ZERO, mpi_ctx_.mpi_comm);
}

5.2.6 SendFinalTensors

Rank 0 broadcasts the final results to the other ranks:

void MPIController::SendFinalTensors(ResponseList& response_list) {
  // Notify all nodes which tensors we'd like to reduce at this step.
  std::string encoded_response;
  ResponseList::SerializeToString(response_list, encoded_response);
  int encoded_response_length = (int)encoded_response.length() + 1;
  MPI_Bcast(&encoded_response_length, 1, MPI_INT, RANK_ZERO, mpi_ctx_.mpi_comm);

  MPI_Bcast((void*)encoded_response.c_str(), encoded_response_length, MPI_BYTE,
            RANK_ZERO, mpi_ctx_.mpi_comm);
}

5.2.7 RecvFinalTensors

The worker receives a ready response list from Rank 0 and blocks synchronously

void MPIController::RecvFinalTensors(ResponseList& response_list) {
  int msg_length;
  int ret_code =
      MPI_Bcast(&msg_length, 1, MPI_INT, RANK_ZERO, mpi_ctx_.mpi_comm);

  auto buffer = new uint8_t[msg_length];
  ret_code =
      MPI_Bcast(buffer, msg_length, MPI_BYTE, RANK_ZERO, mpi_ctx_.mpi_comm);
    
  ResponseList::ParseFromBytes(response_list, buffer);
  delete[] buffer;
}

5.3 Perform operations based on Response

We’re going to look at another important operation, PerformOperation, which is to perform an operation based on response.

They are called in the following order:

  • BackgroundThreadLoop calls RunLoopOnce;
  • RunLoopOnce obtains the response_list (coordinated by rank 0) and calls PerformOperation for each response;
  • PerformOperation then calls op_manager->ExecuteOperation, e.g. ExecuteAllreduce;

As you can see, ComputeResponseList returns response_list, i.e. the responses whose tensors are ready for allreduce. The code then goes through each response and calls PerformOperation.

 auto response_list =
   state.controller->ComputeResponseList(horovod_global.shut_down, state);
   
  int rank = state.controller->GetRank();

  for (auto& response : response_list.responses()) {
    PerformOperation(response, horovod_global);
  }  

5.3.1 PerformOperation

Continuing in RunLoopOnce after ComputeResponseList returns: each worker node goes through the returned response_list and calls PerformOperation for every response to complete the corresponding reduce work.

PerformOperation eventually calls status = op_manager->ExecuteOperation(entries, response). The details are as follows:

  • PerformOperation fetches the TensorEntry items corresponding to the response from horovod_global.tensor_queue via GetTensorEntriesFromResponse;

  • If the fusion buffer has not been initialized yet, horovod_global.fusion_buffer.InitializeBuffer is called to initialize it;

  • Then status = op_manager->ExecuteOperation(entries, response) calls the appropriate op->Execute(entries, response) to perform the reduce operation;

  • Finally, the callback of each entry is invoked; the callback is typically how the front end is notified to perform its follow-up work;

// Process a Response by doing a reduction, a gather, a broadcast, or
// raising an error.
void PerformOperation(Response response, HorovodGlobalState& state) {
  std::vector<TensorTableEntry> entries;
  auto& timeline = horovod_global.timeline;
  if (response.response_type() != Response::JOIN) {
    horovod_global.tensor_queue.GetTensorEntriesFromResponse(response, entries,
                                                             state.joined);

    if (entries.size() > 1) { // If there is more than one entry, fusion can be performed to increase throughput
      auto first_entry = entries[0];
      Status status = horovod_global.fusion_buffer.InitializeBuffer(
          horovod_global.controller->TensorFusionThresholdBytes(),
          first_entry.device, first_entry.context,
          horovod_global.current_nccl_stream,
          [&]() { timeline.ActivityStartAll(entries, INIT_FUSION_BUFFER); },
          [&]() { timeline.ActivityEndAll(entries); });
      if (!status.ok()) {
        for (auto& e : entries) {
          timeline.End(e.tensor_name, nullptr);
          // Callback can be null if the rank sent Join request.
          if (e.callback != nullptr) {
            e.callback(status);
          }
        }
        return;
      }
    }

    // On GPU data readiness is signalled by ready_event.
    // Even if a tensor can already be operated on, we still have to wait for its data to be synchronized to GPU memory.
    std::vector<TensorTableEntry> waiting_tensors;
    for (auto& e : entries) {
      if (e.ready_event != nullptr) {
        timeline.ActivityStart(e.tensor_name, WAIT_FOR_DATA);
        waiting_tensors.push_back(e);
      }
    }

    while (!waiting_tensors.empty()) {
      for (auto it = waiting_tensors.begin(); it != waiting_tensors.end();) {
        if (it->ready_event->Ready()) {
          timeline.ActivityEnd(it->tensor_name);
          timeline.ActivityStart(it->tensor_name, WAIT_FOR_OTHER_TENSOR_DATA);
          it = waiting_tensors.erase(it);
        } else {
          ++it;
        }
      }
      std::this_thread::sleep_for(std::chrono::nanoseconds(100));
    }
  }

  Status status;
  try {
    // Perform the collective operation
    status = op_manager->ExecuteOperation(entries, response);
  } catch (const std::exception& ex) {
    status = Status::UnknownError(ex.what());
  }

  ......
  // Call the callback function of each entry
}

5.3.2 ExecuteOperation

Then status = op_manager->ExecuteOperation(entries, response) calls a different op->Execute(entries, response) to perform the reduce operation.

This brings us to OperationManager.

Status OperationManager::ExecuteOperation(std::vector<TensorTableEntry>& entries,
                                          const Response& response) const {
  if (response.response_type() == Response::ALLREDUCE) {
    return ExecuteAllreduce(entries, response);
  } else if (response.response_type() == Response::ALLGATHER) {
    return ExecuteAllgather(entries, response);
  } else if (response.response_type() == Response::BROADCAST) {
    return ExecuteBroadcast(entries, response);
  } else if (response.response_type() == Response::ALLTOALL) {
    return ExecuteAlltoall(entries, response);
  } else if (response.response_type() == Response::JOIN) {
    return ExecuteJoin(entries, response);
  } else if (response.response_type() == Response::ADASUM) {
    return ExecuteAdasum(entries, response);
  } else if (response.response_type() == Response::ERROR) {
    return ExecuteError(entries, response);
  } else {
    throw std::logic_error("No operation found for response type provided"); }}Copy the code

5.3.3 ExecuteAllreduce

op->Execute(entries, response) is called; that is, something like MPIAllreduce::Execute is invoked.

Status OperationManager::ExecuteAllreduce(std::vector<TensorTableEntry>& entries,
                                          const Response& response) const {
  for (auto& op : allreduce_ops_) {
    if (op->Enabled(*param_manager_, entries, response)) {
      return op->Execute(entries, response);
    }
  }
}

Where does allreduce_ops_ come from? It is set in the OperationManager constructor.

allreduce_ops_(std::move(allreduce_ops)),

So let's look at allreduce_ops.

5.3.4 allreduce_ops

allreduce_ops is populated in CreateOperationManager.

As you can see, the types added are roughly as follows:

  • MPI_GPUAllreduce
  • NCCLHierarchicalAllreduce
  • NCCLAllreduce
  • DDLAllreduce
  • GlooAllreduce
  • CCLAllreduce
  • MPIAllreduce
  • .
OperationManager* CreateOperationManager(HorovodGlobalState& state) {
  // Order of these operations is very important. Operations will be checked
  // sequentially from the first to the last. The first 'Enabled' operation will
  // be executed.
  std::vector<std::shared_ptr<AllreduceOp>> allreduce_ops;
  std::vector<std::shared_ptr<AllgatherOp>> allgather_ops;
  std::vector<std::shared_ptr<BroadcastOp>> broadcast_ops;
  std::vector<std::shared_ptr<AllreduceOp>> adasum_ops;
  std::vector<std::shared_ptr<AlltoallOp>> alltoall_ops;

#if HAVE_MPI && HAVE_GPU // Add MPI_GPUAllreduce if an MPI is built
  if (mpi_context.IsEnabled()) {
#if HOROVOD_GPU_ALLREDUCE == 'M'
    allreduce_ops.push_back(std::shared_ptr<AllreduceOp>(
        new MPI_GPUAllreduce(&mpi_context, &gpu_context, &state)));

#elif HAVE_NCCL && HOROVOD_GPU_ALLREDUCE == 'N' // Add AdasumGpuAllreduceOp if NCCL is compiled
    adasum_ops.push_back(std::shared_ptr<AllreduceOp>(new AdasumGpuAllreduceOp(&mpi_context, &nccl_context, &gpu_context, &state)));

    allreduce_ops.push_back(
        std::shared_ptr<AllreduceOp>(new NCCLHierarchicalAllreduce(
            &nccl_context, &mpi_context, &gpu_context, &state)));

#elif HAVE_DDL && HOROVOD_GPU_ALLREDUCE == 'D'// Add DDLAllreduce if DDL is compiled
    allreduce_ops.push_back(std::shared_ptr<AllreduceOp>(
        new DDLAllreduce(&ddl_context, &gpu_context, &state)));
#endif

#if HAVE_NCCL && HOROVOD_GPU_ALLREDUCE == 'N'// Add NCCLAllreduce if NCCL is compiled
  allreduce_ops.push_back(std::shared_ptr<AllreduceOp>(
      new NCCLAllreduce(&nccl_context, &gpu_context, &state)));
#endif
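The order of the list matters because the manager simply walks the vector and runs the first op whose Enabled() returns true, as the comment in CreateOperationManager says. Here is a condensed sketch of that selection pattern with made-up toy classes, not the real Horovod hierarchy:

#include <iostream>
#include <memory>
#include <stdexcept>
#include <vector>

// Toy version of the "first enabled op wins" dispatch used by OperationManager.
struct ToyAllreduceOp {
  virtual ~ToyAllreduceOp() = default;
  virtual bool Enabled() const = 0;
  virtual void Execute() = 0;
};

struct ToyNcclAllreduce : ToyAllreduceOp {
  bool gpu_available;
  explicit ToyNcclAllreduce(bool gpu) : gpu_available(gpu) {}
  bool Enabled() const override { return gpu_available; }
  void Execute() override { std::cout << "NCCL-style allreduce\n"; }
};

struct ToyMpiAllreduce : ToyAllreduceOp {
  bool Enabled() const override { return true; }  // CPU fallback, always enabled
  void Execute() override { std::cout << "MPI-style allreduce\n"; }
};

void ExecuteAllreduce(const std::vector<std::unique_ptr<ToyAllreduceOp>>& ops) {
  for (const auto& op : ops) {
    if (op->Enabled()) {
      op->Execute();
      return;
    }
  }
  throw std::logic_error("No operation enabled");
}

int main() {
  std::vector<std::unique_ptr<ToyAllreduceOp>> ops;
  ops.push_back(std::unique_ptr<ToyAllreduceOp>(new ToyNcclAllreduce(/*gpu=*/false)));  // order matters:
  ops.push_back(std::unique_ptr<ToyAllreduceOp>(new ToyMpiAllreduce()));                // fallback goes last
  ExecuteAllreduce(ops);  // prints "MPI-style allreduce" because no GPU is available here
  return 0;
}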

5.3.5 MPIAllreduce

Since there are many kinds of allreduce ops, we take MPIAllreduce as an example:

class MPIAllreduce : public AllreduceOp {
public:
  MPIAllreduce(MPIContext* mpi_context, HorovodGlobalState* global_state);

  virtual ~MPIAllreduce() = default;

  Status Execute(std::vector<TensorTableEntry>& entries, const Response& response) override;

  bool Enabled(const ParameterManager& param_manager,
               const std::vector<TensorTableEntry>& entries,
               const Response& response) const override;

protected:
  MPIContext* mpi_context_;
};

MPIAllreduce::Execute uses MPI_Allreduce and also handles fusion, for example with MemcpyInFusionBuffer and MemcpyOutFusionBuffer.

#include "mpi_operations.h"

Status MPIAllreduce::Execute(std::vector<TensorTableEntry>& entries, const Response& response) {
  auto& first_entry = entries[0];

  const void* fused_input_data;
  void* buffer_data;
  size_t buffer_len;
  int64_t num_elements = NumElements(entries);

  // Copy memory into the fusion buffer.
  auto& timeline = global_state_->timeline;
  if (entries.size() > 1) {
    timeline.ActivityStartAll(entries, MEMCPY_IN_FUSION_BUFFER);
    MemcpyInFusionBuffer(entries, fused_input_data, buffer_data, buffer_len);
    timeline.ActivityEndAll(entries);
  } else {
    fused_input_data = first_entry.tensor->data();
    buffer_data = (void*) first_entry.output->data();
    buffer_len = (size_t) first_entry.output->size();
  }

  if (response.prescale_factor() != 1.0) {
    // Execute prescaling op
    ScaleBuffer(response.prescale_factor(), entries, fused_input_data, buffer_data, num_elements);
    fused_input_data = buffer_data; // for unfused, scale is done out of place
  }

  // Do allreduce.
  timeline.ActivityStartAll(entries, MPI_ALLREDUCE);
  const void* sendbuf = entries.size() > 1 || fused_input_data == buffer_data
                        ? MPI_IN_PLACE : fused_input_data;
  int op = MPI_Allreduce(sendbuf, buffer_data,
                         (int) num_elements,
                         mpi_context_->GetMPIDataType(first_entry.tensor),
                         mpi_context_->GetMPISumOp(first_entry.tensor->dtype()),
                         mpi_context_->GetMPICommunicator(Communicator::GLOBAL));
  timeline.ActivityEndAll(entries);

  if (response.postscale_factor() != 1.0) {
    // Execute postscaling op
    ScaleBuffer(response.postscale_factor(), entries, buffer_data, buffer_data, num_elements);
  }

  // Copy memory out of the fusion buffer.
  if (entries.size() > 1) {
    timeline.ActivityStartAll(entries, MEMCPY_OUT_FUSION_BUFFER);
    MemcpyOutFusionBuffer(buffer_data, entries);
    timeline.ActivityEndAll(entries);
  }

  return Status::OK();
}
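The fusion idea in MPIAllreduce::Execute can be reproduced in a few lines: pack several small arrays into one contiguous buffer, run a single MPI_Allreduce on it, and copy the reduced values back out. The sketch below is my own simplification with plain float vectors, not Horovod's fusion buffer implementation.

#include <mpi.h>
#include <algorithm>
#include <cstdio>
#include <vector>

int main(int argc, char* argv[]) {
  MPI_Init(&argc, &argv);
  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  // Two small "tensors" per rank (stand-ins for TensorTableEntry payloads).
  std::vector<float> t1 = {1.0f * rank, 2.0f * rank};
  std::vector<float> t2 = {3.0f * rank};

  // "MemcpyInFusionBuffer": concatenate into one contiguous buffer.
  std::vector<float> fusion_buffer;
  fusion_buffer.insert(fusion_buffer.end(), t1.begin(), t1.end());
  fusion_buffer.insert(fusion_buffer.end(), t2.begin(), t2.end());

  // One collective call for all fused tensors (in place, summed across ranks).
  MPI_Allreduce(MPI_IN_PLACE, fusion_buffer.data(), (int)fusion_buffer.size(),
                MPI_FLOAT, MPI_SUM, MPI_COMM_WORLD);

  // "MemcpyOutFusionBuffer": copy the reduced values back to each tensor.
  std::copy(fusion_buffer.begin(), fusion_buffer.begin() + t1.size(), t1.begin());
  std::copy(fusion_buffer.begin() + t1.size(), fusion_buffer.end(), t2.begin());

  printf("rank %d reduced t1=[%g, %g] t2=[%g]\n", rank, t1[0], t1[1], t2[0]);
  MPI_Finalize();
  return 0;
}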

The specific logic is as follows:

+---------------------------------+
|                                 |             +-----------------------+
|  BackgroundThreadLoop           |             |                       |
|                                 |             | OperationManager      |
|   +--------------------------+  |             |                       |
|   |  RunLoopOnce             |  |             |                       |
|   |                          |  |             |                       |
|   |                          |  |             |                       |      +-->  GPUAllreduce
|   |     ComputeResponseList  |  |    +----------> ExecuteOperation    |      |
|   |             +            |  |    |        |           +           |      |
|   |             |            |  |    |        |           |           |      +-->  NCCLHierarchicalAllreduce
|   |             |            |  |    |        |           |           |      |
|   |             |            |  |    | 1      |           | 2         |      |
|   |             |            |  |    |        |           |           |      +-->  NCCLAllreduce
|   |             v            |  |    |        |           v           |      |
|   |      PerformOperation +----------+        |    ExecuteAllreduce   |      |
|   |                          |  |             |           +           |      +-->  DDLAllreduce
|   +--------------------------+  |             |           |           |      |
|                                 |             |           |           |      |
+---------------------------------+             |           |           |      +-->  GlooAllreduce
                                                |    allreduce_ops      |      |
                                                |           +           |      |      +----------------+
                                                +-----------------------+      +-->   | MPIAllreduce   |
                                                            |                         |                |
                                                            +------------------------------>  Execute  |
                                                                          3            |                |
                                                                                       +----------------+



Now that the background thread architecture is clear, we need to go back and see how the optimizer is implemented in the next article.

0xEE Personal information

Thoughts on life and technology

Wechat public account: Rosie’s Thinking

If you want timely news of future articles, or want to read more of my technical recommendations, please follow this account.

0xFF References

This is enough for you to know about The Pytorch distributed training!

Using Horovod for distributed model training

Spark’s new vision: Make deep learning easier to use

Scaling model training in PyTorch using distributed data parallel

A developer-friendly guide to mixed precision training with PyTorch

It’s 2020, why isn’t deep learning 100% on the cloud yet?

Take you through the Horovod Distributed training framework

Using Horovod in Amazon SageMaker pipeline mode to implement multi-GPU distributed training

Kubernetes Training _ Distributed deep learning training using Horovod on Kubernetes

Horovod- Based on the TensorFlow distributed deep learning framework

This article explains the Tensorflow distributed training requirements

Horovod source analysis

Horovod Source Code Analysis (part 1)

MPI, OpenMPI and deep learning

Horovod communication strategy