0x00 Abstract

Horovod is an easy-to-use, high-performance distributed training framework released by Uber in 2017 that has been widely used in the industry.

This series walks through the Horovod source code. This article, the third in the series, enters the world of Horovod from the Python side to see what horovodrun actually does.

Links to the first two are as follows:

Horovod (1) — Basics

Horovod (2) — A distributed training framework for deep learning — from the user’s perspective

0x01 Background information

First, some background.

1.1 Distributed System

When designing parallel computers, the most straightforward approach is to have multiple processing units share a single memory. Shared-memory programming has great advantages in data exchange and access, and programs are simpler to write, but scalability is a big bottleneck.

Another approach is distributed memory: each processing unit has its own memory, and data is exchanged between units over the network. This architecture is much more portable and scalable, but message passing becomes a programming challenge.

The combination of these two is the architecture of a distributed shared memory parallel computer, which is the most commonly used architecture today.

1.2 Parallel task communication

Parallel task communication is generally divided into P2P (point-to-point communication) and Collective communication.

  • P2P (point-to-point) communication has exactly one sender and one receiver.
  • Collective communication has multiple senders and multiple receivers.

Collective communication includes some common primitives:

  • broadcast
  • reduce, allreduce
  • scatter, reduce-scatter
  • gather, allgather
  • ring-base collectives
  • ring-allreduce

Traditional collective communication assumes that the topology formed by the communication nodes is a fat tree, which gives the highest communication efficiency. In practice the topology may be complex and not a fat tree at all, so ring-based collective communication is generally used instead.
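To make the ring idea concrete, below is a minimal single-process simulation of ring-allreduce (illustrative only, not Horovod or Gloo code): each worker's vector is split into chunks, a reduce-scatter phase circulates and accumulates the chunks around the ring, and an allgather phase circulates the reduced chunks so every worker ends up with the full sum.

# Minimal single-process simulation of ring-allreduce (illustrative only).
# Real implementations (Gloo, NCCL) do this with sends/receives between processes.

def ring_allreduce(vectors):
    """Every 'worker' ends up with the element-wise sum of all input vectors."""
    n = len(vectors)                                   # number of workers in the ring
    size = len(vectors[0]) // n                        # assume length divisible by n for simplicity
    chunks = [[v[i * size:(i + 1) * size] for i in range(n)] for v in vectors]

    # Phase 1: reduce-scatter. After n-1 steps, worker i holds the fully
    # reduced chunk (i + 1) % n.
    for step in range(n - 1):
        for i in range(n):
            dst = (i + 1) % n
            c = (i - step) % n                         # chunk that worker i passes on
            chunks[dst][c] = [a + b for a, b in zip(chunks[dst][c], chunks[i][c])]

    # Phase 2: allgather. The reduced chunks travel around the ring so every
    # worker collects all of them.
    for step in range(n - 1):
        for i in range(n):
            dst = (i + 1) % n
            c = (i + 1 - step) % n                     # reduced chunk being forwarded
            chunks[dst][c] = list(chunks[i][c])

    return [[x for chunk in c for x in chunk] for c in chunks]


if __name__ == "__main__":
    data = [[1, 2, 3, 4], [10, 20, 30, 40]]            # two workers, 4 elements each
    print(ring_allreduce(data))                        # both rows become [11, 22, 33, 44]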

1.3 MPI

Message Passing Interface (MPI) is a communication protocol that supports both point-to-point and collective (broadcast) communication. There are many libraries that implement MPI, including Open MPI, Intel MPI, and so on.

MPI is a message-passing programming model. Message passing means that users must explicitly send and receive messages to exchange data between processors. In this kind of parallel programming, each control flow has its own independent address space; different control flows cannot access each other's address space directly and must communicate through explicit message passing. This programming model is the main one used on massively parallel processors (MPP) and clusters. Message-passing program design requires users to decompose the problem well and organize the data exchange between control flows; since the parallelism is coarse-grained, it is especially suitable for large-scale, scalable parallel algorithms.

MPI is a process-based parallel environment. Processes have separate virtual address spaces and are scheduled independently of each other. MPI is designed to support cluster systems connected over a network, where communication happens through message passing, which is the most basic feature of MPI.
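As a concrete illustration of this message-passing model, here is a minimal sketch using mpi4py (an assumption: mpi4py and an MPI implementation are installed; run with something like mpirun -np 2 python mpi_demo.py):

# Minimal MPI message-passing sketch using mpi4py.
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# Point-to-point: rank 0 sends a Python object, rank 1 receives it.
if rank == 0:
    comm.send({"step": 1, "loss": 0.5}, dest=1, tag=11)
elif rank == 1:
    msg = comm.recv(source=0, tag=11)
    print(f"rank 1 received {msg}")

# Collective: every rank contributes its rank number; all ranks get the sum.
total = comm.allreduce(rank, op=MPI.SUM)
print(f"rank {rank}/{size} sees allreduce sum = {total}")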

1.4 Open MPI

Open MPI is a high-performance message-passing library. It was originally a convergence of technology and resources from several other projects (FT-MPI, LA-MPI, LAM/MPI, and PACX-MPI) and is an open-source implementation of the MPI-2 standard, developed and maintained by a number of research institutions and companies. As a result, Open MPI can draw on expertise, technology, and resources from across the high-performance computing community to build the best MPI library available. Open MPI offers advantages to system and software vendors, program developers, and researchers; it is easy to use and runs on a wide variety of operating systems, network interconnects, and batch/scheduling systems.

1.5 MPI usage Problems

Because MPI is distributed-memory programming, and data and programs live on multiple nodes, information must be exchanged between nodes when commands are executed.

In specific use, there are two problems:

  • How do the machines in a multi-machine Open MPI job discover each other and establish connections?
  • In multi-machine, multi-GPU training, how the transmission ring is built determines training efficiency, so how does Open MPI do it?

On the first question:

Passwordless SSH login is set up so that no password needs to be entered. After a private/public key pair is generated for each node, the keys need to be authorized: the public key of each child node is sent to the primary node and appended to the primary node's authorized keys file, so the primary node can log in to every child node without a password; the authorization file is then distributed back to every child node so that each node can log in to the others without a password.

When Open MPI starts, you can use --hostfile or --host to specify the IP addresses or hostnames on which the tasks will run. Open MPI then attempts to connect to each host over passwordless SSH and executes a series of commands, which synchronize environment variables and the working directory and launch the actual command.

Of course, users can also send commands to remote machines in other ways, which can be specified through the environment variable OMPI_MCA_plm_rsh_agent.
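For example, here is a minimal sketch of overriding the launch agent from Python (the wrapper path and the mpirun invocation are purely illustrative):

# Sketch: point Open MPI at a custom remote-launch agent instead of plain ssh.
# The wrapper path below is hypothetical; any executable with ssh/rsh-like
# calling semantics can be used.
import os
import subprocess

env = dict(os.environ)
env["OMPI_MCA_plm_rsh_agent"] = "/usr/local/bin/my_rsh_wrapper"  # hypothetical agent

subprocess.run(["mpirun", "-np", "4", "python", "train.py"], env=env, check=True)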

On the second question:

Once all the machines are connected and ready to compute, Open MPI uses an integrated component called hwloc to communicate as efficiently as possible. This component builds the topology of the hardware resources on each node and then constructs shortest-path communication.

0x02 Entry point

Many machine learning frameworks follow the same recipe: shell scripts (optional), Python, and C++.

  • The shell script is the entry point for starting a run; it parses arguments, validates them, and invokes the training program;
  • Python is the user interface; it imports the C++ library, wraps the API, and handles the interaction between the runtime and the low-level C++ code;
  • C++ implements the low-level training logic.

So let's look at the horovodrun script first.

2.1 How To Run

One of the official examples of running Horovod is as follows:

horovodrun -np 2 -H localhost:4 --gloo python /horovod/examples/tensorflow2/tensorflow2_mnist.py

Here -np specifies the number of processes, and localhost:4 indicates four GPUs (four slots) on the localhost node.

Note that if the machine has only one core and you want to force parallelism, you can still use the -np argument: it splits the core into multiple logical processors, and each distributed process occupies one slot.
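For reference, here is a minimal sketch of what such a launched training script can query from Horovod once it starts (the TensorFlow binding used in the example above is assumed to be installed; the PyTorch binding behaves the same way):

# Minimal sketch of a script launched by horovodrun: each launched process
# (one per slot) initializes Horovod and can then ask for its own identity.
import horovod.tensorflow as hvd

hvd.init()
print(f"rank={hvd.rank()} local_rank={hvd.local_rank()} size={hvd.size()}")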

So, we can start with the command horovodrun.

2.2 horovodrun

As can be seen from setup.py, the entry point is mapped to horovod.runner.launch:run_commandline.

entry_points={
    'console_scripts': [
        'horovodrun = horovod.runner.launch:run_commandline']}

So let’s look at run_commandline

2.3 run_commandline

This function is located in horovod-master/horovod/runner/launch.py; we extract the important parts.

def run_commandline() :
    args = parse_args()
    _run(args)

So we move on to the _run function. As you can see, Horovod chooses a different path depending on whether the training is elastic. In this series we first examine non-elastic training, _run_static.

def _run(args) :
    # if hosts are not specified, either parse from hostfile, or default as
    # localhost
    if not args.hosts and not args.host_discovery_script:
        if args.hostfile:
            args.hosts = hosts.parse_host_files(args.hostfile)
        else:
            # Set hosts to localhost if not specified
            args.hosts = 'localhost:{np}'.format(np=args.np)

    # Convert nics into set
    args.nics = set(args.nics.split(',')) if args.nics else None

    if _is_elastic(args):
        return _run_elastic(args)
    else:
        return _run_static(args) # Let's look here first

2.4 Non-elastic training: _run_static

We did the following in _run_static:

  • First parse the parameters to get Settings.
  • Then driver_service.get_common_interfaces is called to obtain information about network interfaces and the other hosts, and slots are allocated based on this information. This part is complicated, and we will cover the details in a dedicated article (the next one).
  • Here is a question: why do we need the relationship between host, slot, and rank? For engineering reasons, ranks play different roles in the underlying C++ world: rank 0 is the master and rank n is a worker, so this information must be determined and passed down to the C++ world.
  • By default no run function is given, so _launch_job is executed to start the training job.

The specific code is as follows:

def _run_static(args) :

    settings = hvd_settings.Settings(verbose=2 if args.verbose else 0,
                                     ssh_port=args.ssh_port,
                                     ssh_identity_file=args.ssh_identity_file,
                                     extra_mpi_args=args.mpi_args,
                                     tcp_flag=args.tcp_flag,
                                     binding_args=args.binding_args,
                                     key=secret.make_secret_key(),
                                     start_timeout=tmout,
                                     num_proc=args.np,
                                     hosts=args.hosts,
                                     output_filename=args.output_filename,
                                     run_func_mode=args.run_func is not None,
                                     nics=args.nics,...)
	  # First parse the parameters and get Settings
    fn_cache = None
    if not args.disable_cache:
        params = ' '
        if args.np:
            params += str(args.np) + ' '
        if args.hosts:
            params += str(args.hosts) + ' '
        if args.ssh_port:
            params += str(args.ssh_port)
        if args.ssh_identity_file:
            params += args.ssh_identity_file
        parameters_hash = hashlib.md5(params.encode('utf-8')).hexdigest()
        fn_cache = cache.Cache(CACHE_FOLDER, CACHE_STALENESS_THRESHOLD_MINUTES,
                               parameters_hash)

    # Obtain information about network adapters and other hosts, and allocate slots according to this information
    all_host_names, _ = hosts.parse_hosts_and_slots(args.hosts)
    remote_host_names = network.filter_local_addresses(all_host_names)

    nics = driver_service.get_common_interfaces(settings, all_host_names,
                                                remote_host_names, fn_cache)

    if args.run_func:
        # get the driver IPv4 address
        driver_ip = network.get_driver_ip(nics)
        run_func_server = KVStoreServer(verbose=settings.verbose) # Start the internal KV server
        run_func_server_port = run_func_server.start_server()
        put_data_into_kvstore(driver_ip, run_func_server_port,
                              'runfunc', 'func', args.run_func)  # store args.run_func under key 'func' in scope 'runfunc'

        command = [sys.executable, '-m', 'horovod.runner.run_task', str(driver_ip), str(run_func_server_port)]

        try:
            _launch_job(args, settings, nics, command)
            results = [None] * args.np
            for i in range(args.np):
                results[i] = read_data_from_kvstore(driver_ip, run_func_server_port, 'runfunc_result', str(i))
            return results
        finally:
            run_func_server.shutdown_server()
    else:
        command = args.command
        _launch_job(args, settings, nics, command) # We focus here
        return None

The current logic is as follows:

              +-----------+
              |horovodrun |
              +-----+-----+
                    |
                    |
                    v
           +--------+--------+
           | run_commandline |
           +----+------+-----+
                |      |
      +---------+      +--------+
      |                         |
      |                         |
      v                         v
+-----+--------+           +----+--------+
| _run_elastic |           | _run_static |
|              |           |             |
+--------------+           +-------------+

Now that we’ve analyzed the entry to horovod, we’ll look at how to start the Job.

0x03 Running a training Job

3.1 _launch_job

_launch_job is called next; based on configuration and what is installed, there are three possibilities: Gloo, MPI, and jsrun.

Documentation on jsrun is scarce, so let's focus on Gloo and MPI.

def _launch_job(args, settings, nics, command) :
    env = os.environ.copy()
    config_parser.set_env_from_args(env, args)

    def gloo_run_fn() :
        driver_ip = network.get_driver_ip(nics)
        gloo_run(settings, nics, env, driver_ip, command)

    def mpi_run_fn() :
        mpi_run(settings, nics, env, command)

    def js_run_fn() :
        js_run(settings, nics, env, command)

    run_controller(args.use_gloo, gloo_run_fn,
                   args.use_mpi, mpi_run_fn,
                   args.use_jsrun, js_run_fn,
                   args.verbose)

3.2 run_controller

run_controller is another dispatch function; it decides whether to use gloo or MPI.

def run_controller(use_gloo, gloo_run, use_mpi, mpi_run, use_jsrun, js_run, verbosity) :
    if use_gloo:
        gloo_run()
    elif use_mpi:
        mpi_run()
    elif use_jsrun:
        js_run()
    else:
        if mpi_built(verbose=verbose):
            if lsf.LSFUtils.using_lsf() and is_jsrun_installed():
                js_run()
            else:
                mpi_run()
        elif gloo_built(verbose=verbose):
            gloo_run()

The current logic is as follows:

              +-----------+
              |horovodrun |
              +-----+-----+
                    |
                    |
                    v
           +--------+--------+
           | run_commandline |
           +----+------+-----+
                |      |
      +---------+      +--------+
      |                         |
      |                         |
      v                         v
+-----+--------+           +----+--------+
| _run_elastic |           | _run_static |
|              |           |             |
+--------------+           +------+------+
                                  |
                                  |
                                  v
                           +------+------+
                           | _launch_job |
                           |             |
                           +------+------+
                                  |
                                  |
                                  v
                        +---------+--------+
                        |  run_controller  |
                        |                  |
                        +----+----+-----+--+
                             |    |     |
               +-------------+    |     +--------+
               |                  |              |
               |                  |              |
               v                  v              v
        +------+---+       +------+----+     +---+-----+
        | gloo_run |       |   mpi_run |     | js_run  |
        |          |       |           |     |         |
        +----------+       +-----------+     +---------+


So the flow splits into two branches: Gloo & MPI.

0x04 Gloo implementation

4.1 Introduction to Gloo

Gloo is an MPI-like collective communication library created by Facebook (github.com/facebookinc…

The main characteristic of a collective communication library is that it generally implements the interfaces specified by MPI, including point-to-point communication (send, recv, etc.) and collective communication (reduce, broadcast, allreduce, etc.), and then adapts the underlying implementation to the needs of its own hardware or system to guarantee the stability and performance of those interfaces.

Gloo provides optimized implementations of collective communication routines for both CPUs and GPUs. It is particularly suitable for GPUs because, with GPUDirect, it can communicate without first copying data into CPU memory. It can also use NCCL for fast intra-node communication while implementing its own inter-node routines. You don't need to think about memory copies; you only need to implement the logic.

Gloo supports collective communication and optimizes it. Using the Gloo backend with GPUs is faster because data can be exchanged directly between GPUs without going through the CPU or host memory.

Why did Horovod choose Gloo? In addition to its versatility and performance, one highlight is that it can be extended, for example the Rendezvous functionality below, which Horovod reused for elastic training (which we will discuss later).

Gloo and MPI play similar roles:

  • On the one hand, Horovod integrates a Gloo-based AllReduce, which, like NCCL, is used for gradient reduction;

  • On the other hand, Gloo can be used to start multiple processes (represented in Horovod by ranks) for parallel computation;

The details are as follows:

   +-----------------------+   +-----------------------+  +------------------------+
   |  gloo_run      slot 1 |   | gloo_run     slot 2   |  |  gloo_run  slot 3      |
   |                       |   |                       |  |                        |
   | +-------------------+ |   | +------------------+  |  | +------------------+   |
   | | python train.py   | |   | |  python train.py |  |  | | python train.py  |   |
+----+                   +<------+                  +<------+                  +<------+
|  | |                   | |   | |                  |  |  | |                  |   |   |
|  | +-------------------+ |   | +------------------+  |  | +------------------+   |   |
|  |                       |   |                       |  |                        |   |
|  +-----------------------+   +-----------------------+  +------------------------+   |
|                                                                                      |
|                                                                                      |
|                                                                                      |
v-------------------------------------------------------------------------------------->
                                     Ring Allreduce on Gloo

4.2 Rendezvous function

4.2.1 Rendezvous concept

In the Gloo document, it says:

The rendezvous process needs to happen exactly once per Gloo context.
It makes participating Gloo processes exchange details for setting up their communication channels. For example, when the TCP transport is used, processes exchange IP address and port number details of listening sockets.

Rendezvous can be executed by accessing a key/value store that is accessible by all participating processes. Every process is responsible for setting a number of keys and will wait until their peers have set their keys. The values stored against these keys hold
the information that is passed to the transport layer.

It roughly means:

Gloo has a Rendezvous process within each Gloo context, which Gloo uses to exchange details needed for communication.

A Rendezvous implementation can rely on access to a key/value store; the details are exchanged through that KVStore.

Take Horovod as an example:

  • Horovod starts a driver process in addition to the worker process when conducting fault-tolerant AllReduce training. This driver process is used to help the worker call gloo to construct the AllReduce communication ring.
  • A RendezvousServer with a KVStore is created in the driver process, and the driver stores information such as the IP addresses of the workers involved in the communication into the KVStore.
  • The worker can then call gloo to access the RendezvousServer to construct the communication ring.

4.2.2 RendezvousServer

As you can see from the following code, a RendezvousHTTPServer is started (it is an extension of HTTPServer):

class RendezvousServer:
    def __init__(self, verbose=0) :
        self._httpd = None
        self._listen_thread = None
        self._verbose = verbose

    # Rendezvous function finds a available port, create http socket,
    # and start listening loop to handle request
    # self.httpd.init needs to be called after server start
    def start(self, handler_cls=RendezvousHandler) : # It will be introduced soon
        self._httpd, port = find_port(
            lambda addr: RendezvousHTTPServer(
                addr, handler_cls, self._verbose))

        # start the listening loop
        self._listen_thread = in_thread(target=self._httpd.serve_forever)

        return port

    def init(self, host_alloc_plan) :
        self._httpd.init(host_alloc_plan)

    def stop(self) :
        self._httpd.shutdown()
        self._listen_thread.join()

4.2.3 KVStore

The KVStore is embodied in KVStoreHandler, from which RendezvousHandler inherits; RendezvousHandler is then used as the handler of the RendezvousServer.

Simplified KVStoreHandler code is as follows:

class KVStoreHandler(SimpleHTTPRequestHandler) :

    # Override PUT handler
    def do_PUT(self) :
        paths = self.path.split('/')
        _, scope, key = paths

        # Get body length
        content_length = int(self.headers['Content-Length'])
        value = self.rfile.read(content_length)
        self._put_value(scope, key, value)
        self.send_status_code(OK)

    def _put_value(self, scope, key, value) :
        with self.server.cache_lock:
            scope_dict = self.server.cache.setdefault(scope, {})
            scope_dict[key] = value
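To see how a participant could talk to such a KV store, here is an illustrative client-side sketch using only the standard library. The scope/key names and the GET path are assumptions for illustration (the handler above only shows PUT), not Horovod's exact wire protocol.

# Illustrative client for a KVStore-style HTTP server like the one sketched above.
# Scope/key names are made up for illustration.
import http.client


def put_value(host, port, scope, key, value: bytes) -> int:
    conn = http.client.HTTPConnection(host, port)
    conn.request("PUT", f"/{scope}/{key}", body=value)
    status = conn.getresponse().status
    conn.close()
    return status


def get_value(host, port, scope, key) -> bytes:
    conn = http.client.HTTPConnection(host, port)
    conn.request("GET", f"/{scope}/{key}")
    data = conn.getresponse().read()
    conn.close()
    return data


# A worker could publish its own listening address, then look up a peer's.
put_value("10.0.0.1", 2222, "rendezvous", "rank_1_addr", b"10.0.0.2:31337")
peer = get_value("10.0.0.1", 2222, "rendezvous", "rank_0_addr")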

4.2.4 Underlying usage

How do you use Rendezvous? Briefly:

  • The Python world constructs a RendezvousServer whose address is configured in an environment variable (or otherwise).
  • The C++ world — see for example horovod/common/gloo/gloo_context.h and horovod/common/gloo/gloo_context.cc — reads the address, port, and so on of the RendezvousServer configured by Python and then builds the context that gloo needs.
#define HOROVOD_HOSTNAME "HOROVOD_HOSTNAME"
#define HOROVOD_RANK "HOROVOD_RANK"
#define HOROVOD_SIZE "HOROVOD_SIZE"
#define HOROVOD_LOCAL_RANK "HOROVOD_LOCAL_RANK"
#define HOROVOD_LOCAL_SIZE "HOROVOD_LOCAL_SIZE"
#define HOROVOD_CROSS_RANK "HOROVOD_CROSS_RANK"
#define HOROVOD_CROSS_SIZE "HOROVOD_CROSS_SIZE"
#define HOROVOD_ELASTIC "HOROVOD_ELASTIC"

  ctx = Rendezvous(HOROVOD_GLOO_GLOBAL_PREFIX,
                   rendezvous_addr_env, rendezvous_port,
                   rank, size, dev, timeout);

  local_ctx = Rendezvous(HOROVOD_GLOO_LOCAL_PREFIX + hostname,
                         rendezvous_addr_env, rendezvous_port,
                         local_rank, local_size, dev, timeout);

  cross_ctx = Rendezvous(HOROVOD_GLOO_CROSS_PREFIX + std::to_string(local_rank),
                         rendezvous_addr_env, rendezvous_port,
                         cross_rank, cross_size, dev, timeout);

The C++ world gets the IP and port of the RendezvousServer from the Python world:

          +--------------------->  System Env  +------------------+
          |  addr, port, ...                     addr, port, ...  |
          |                            +                          |
          |                            |                          |
          |                            |                          |
          |                            |                          |
          |                            |                          |
          |                            |                          |
          |    Python                  |              C++         |
          |                            |                          |
          |                            |                          |
          |                            |                          |
          |                            |                          v
+---------+---------------+            |             +------------+--------+
| RendezvousServer        |            |             |GlooContext          |
|                         |            |             |                     |
|                         |            |             |                     |
|                         |            |             |                     |
|    RendezvousHandler    |            |             |      Rendezvous     |
|                         |            |             |                     |
+-------------------------+            |             +---------------------+
                                       |
                                       +

4.3 Horovod's Gloo entry

gloo_run is the entry point of the Gloo module in Horovod.

The comments are clear: each thread will use SSH to start a training job on a remote host.

def gloo_run(settings, nics, env, server_ip, command) :
    # Each thread will use ssh command to launch the job on each remote host. If an
    # error occurs in one thread, entire process will be terminated. Otherwise,
    # threads will keep running and ssh session.
    exec_command = _exec_command_fn(settings)
    launch_gloo(command, exec_command, settings, nics, env, server_ip)

exec_command is then handed to launch_gloo, which runs each command through it.

The command argument is like “[‘python’, ‘train.py’]”.

4.4 Building an Executable Environment

The first part of gloo_run is exec_command = _exec_command_fn(settings), which builds, from the various configuration options, an environment in which commands will be executed. For remote hosts, the corresponding remotely runnable command environment has to be generated (including switching directories, remote execution over SSH, and so on).

4.4.1 _exec_command_fn

Specifically, it can be divided into two parts:

  • get_remote_command is used to generate the remote runnable environment, for example prefixing the training script with 'ssh -o PasswordAuthentication=no -o StrictHostKeyChecking=no';
  • Input and output are adjusted, and safe_shell_exec.execute is used to execute the command safely.

The details are as follows:

def _exec_command_fn(settings) :
    """ executes the jobs defined by run command on hosts. :param hosts_alloc: list of dict indicating the allocating info. For example, [{'Hostname':'worker-0', 'Rank': 0, 'Local_rank': 0, 'Cross_rank':0, 'Size':2, 'Local_size':1, 'Cross_size':2}, {'Hostname':'worker-1', 'Rank': 1, 'Local_rank': 0, 'Cross_rank':1, 'Size':2, 'Local_size':1, 'Cross_size':2} ] :type hosts_alloc: list(dict) :param remote_host_names: names that are resolved to one of the addresses of remote hosts interfaces. :param _run_command: command to execute """
    def _exec_command(command, slot_info, events) :
        index = slot_info.rank
        host_name = slot_info.hostname
        host_address = network.resolve_host_address(host_name)
        local_addresses = network.get_local_host_addresses()
        # Need to build remote commands
        if host_address not in local_addresses:
            local_command = quote('cd {pwd} > /dev/null 2>&1 ; {command}'
                                  .format(pwd=os.getcwd(), command=command))
            command = get_remote_command(local_command,
                                         host=host_name,
                                         port=settings.ssh_port,
                                         identity_file=settings.ssh_identity_file)

        # Redirect output if requested
        # Adjust input and output; safe_shell_exec.execute is used to run the command safely
        stdout = stderr = None
        stdout_file = stderr_file = None
        if settings.output_filename:
            padded_rank = _pad_rank(index, settings.num_proc)
            output_dir_rank = os.path.join(settings.output_filename, 'rank.{rank}'.format(rank=padded_rank))
            if not os.path.exists(output_dir_rank):
                os.mkdir(output_dir_rank)

            stdout_file = open(os.path.join(output_dir_rank, 'stdout'), 'w')
            stderr_file = open(os.path.join(output_dir_rank, 'stderr'), 'w')

            stdout = MultiFile([sys.stdout, stdout_file])
            stderr = MultiFile([sys.stderr, stderr_file])

        # Execute the command safely
        exit_code = safe_shell_exec.execute(command,
                                            index=index,
                                            stdout=stdout,
                                            stderr=stderr,
                                            events=events,...)

        return exit_code, time.time()

    return _exec_command

4.4.2 get_remote_command

This function determines how to run the command on a remote host. It is a relatively recent addition and is related to the Kubeflow MPI Operator, which will be analyzed later.

SSH_COMMAND_PREFIX = 'ssh -o PasswordAuthentication=no -o StrictHostKeyChecking=no'

def get_ssh_command(local_command, host, port=None, identity_file=None, timeout_s=None) :
    port_arg = f'-p {port}' if port is not None else ' '
    identity_file_arg = f'-i {identity_file}' if identity_file is not None else ' '
    timeout_arg = f'-o ConnectTimeout={timeout_s}' if timeout_s is not None else ' '
    return f'{SSH_COMMAND_PREFIX} {host} {port_arg} {identity_file_arg} {timeout_arg} {local_command}'


def get_remote_command(local_command, host, port=None, identity_file=None, timeout_s=None) :
    return f'{env_util.KUBEFLOW_MPI_EXEC} {host} {local_command}' if env_util.is_kubeflow_mpi() \
        else get_ssh_command(local_command, host, port, identity_file, timeout_s)

The logic is as follows:

command : python train.py
              +
              |
              v
  +-----------+-----------+
  |  get_remote_command   |
  +-----------+-----------+
              |
              v
  ssh -o ... python train.py
              +
              |
              v
  +-----------+------------+
  | safe_shell_exec.execute|
  +------------------------+

4.5 Using gloo to Execute commands

Once you have the executable environment exec_command and the command command, you can use gloo to execute the command.

Each command is executed by exec_command.

launch_gloo receives the command, the various configuration settings, the network-interface information (nics, such as {'lo'}), the host information, and so on, and then starts running, i.e. it starts running our training code. Concretely it:

  • Establishes the RendezvousServer, which will be used by the underlying Gloo C++ environment;
  • host_alloc_plan = get_host_assignments allocates slots per host, i.e. decides which Horovod rank should run on which slot of which host;
  • get_run_command obtains the executable command;
  • _slot_info_to_command_fn produces the slot command that can be executed on each slot;
  • args_list is built from slot_info_to_command_fn; each arg in the list is a slot command;
  • The args are executed in multiple threads, each arg (slot command) running on top of exec_command;

The code is as follows:

def launch_gloo(command, exec_command, settings, nics, env, server_ip) :
    """ Launches the given command multiple times using gloo. Each command is launched via exec_command. :param command: command to launch :param exec_command: means to execute a single command :param settings: settings for the distribution :param nics: common interfaces :param env: environment to use :param server_ip: ip to use for rendezvous server """
    # Make the output directory if it does not exist
    if settings.output_filename:
        _mkdir_p(settings.output_filename)

    # start global rendezvous server and get port that it is listening on
    # Set up the RendezvousServer, which will be used by the underlying Gloo C++ environment
    rendezvous = RendezvousServer(settings.verbose)

    # allocate processes into slots
    # Allocate slots per host, i.e. decide which Horovod rank runs on which slot of which host
    hosts = parse_hosts(settings.hosts)
    host_alloc_plan = get_host_assignments(hosts, settings.num_proc)

    # start global rendezvous server and get port that it is listening on
    global_rendezv_port = rendezvous.start()
    rendezvous.init(host_alloc_plan)
    # Obtain the executable command
    run_command = get_run_command(command, server_ip, nics, global_rendezv_port)

    # Get a slot command that can be executed on the slot
    slot_info_to_command = _slot_info_to_command_fn(run_command, env)
    event = register_shutdown_event()
    # Build args_list based on slot_info_to_command_fn, where each ARG is a slot command
    args_list = [[slot_info_to_command(slot_info), slot_info, [event]]
                 for slot_info in host_alloc_plan]

    # If an error occurs in one thread, entire process will be terminated.
    # Otherwise, threads will keep running.
    # Multi-threaded execution, executing each ARG (slot command) on each exec_command
    res = threads.execute_function_multithreaded(exec_command,
                                                 args_list,
                                                 block_until_all_done=True)

    for name, value in sorted(res.items(), key=lambda item: item[1][1]):
        exit_code, timestamp = value

4.5.1 Slot Allocation Scheme

As mentioned above, Horovod runs tasks on top of slots, so we need to look at how slots are allocated.

4.5.1.1 Parsing from input Parameters

As you can see from the following code, slots are parsed automatically from the hosts string by parse_hosts.

def parse_hosts(hosts_string) :
    """Parse a string of comma-separated hostname:slots mappings into a list of HostItem objects. :param hosts_string: list of addresses and number of processes on each host. For example: - 'worker - 0-2, worker - 1:2' - '10.11.11.11:4,10.11. 11.12:4: return: a list of HostInfo objects describing host to slot mappings :rtype: list[HostInfo] """
    return [HostInfo.from_string(host_string) for host_string in hosts_string.split(', ')]
Copy the code

HostInfo.from_string is as follows:

class HostInfo:
    def __init__(self, hostname, slots) :
        self.hostname = hostname
        self.slots = slots

    @staticmethod
    def from_string(host_string) :
        hostname, slots = host_string.strip().split(':')
        return HostInfo(hostname, int(slots))
4.5.1.2 Allocation Scheme

get_host_assignments assigns Horovod processes according to the hosts and their process capacities (slots), i.e. it establishes the correspondence between a Horovod rank and a slot. The number of processes is given by np, so that many slots are used.

The given allocation scheme is similar to the following, so that we know which rank corresponds to which slot on which host:

[
  SlotInfo(hostname='h1', rank=0, local_rank=0, cross_rank=0, size=2, local_size=2, cross_size=1),
  SlotInfo(hostname='h2', rank=1, local_rank=0, cross_rank=0, size=2, local_size=2, cross_size=1),
]

The code is as follows:

def get_host_assignments(hosts, min_np, max_np=None) :
    """Assign hosts with process capacities (slots) to ranks in the Horovod process. This function will try to allocate as many as possible processes on the same host to leverage local network. :param hosts: list of HostInfo objects describing host and slot capacity :type hosts: list[HostInfo] :param min_np: minimum number of processes to be allocated :param max_np: (optional) maximum number of processes to be allocated :return: a list of the allocation of process on hosts in a `SlotInfo` object. :rtype: list[SlotInfo] """
    host_ranks = []
    cross_ranks = collections.defaultdict(dict)
    rank = 0
    # Build rank, local rank, cross Rank (hierarchical AllReduce needed) based on hosts information
    for host_info in hosts:
        ranks = []
        for local_rank in range(host_info.slots):
            if rank == max_np:
                break

            ranks.append(rank)
            rank += 1

            cross_ranks_at_local = cross_ranks[local_rank]
            cross_ranks_at_local[host_info.hostname] = len(cross_ranks_at_local)

        host_ranks.append((host_info, ranks))

    world_size = rank

    # Map each Horovod rank to a slot and return alloc_list; each SlotInfo carries the various rank values
    alloc_list = []
    for host_info, ranks in host_ranks:
        local_size = len(ranks)
        for local_rank, rank in enumerate(ranks):
            cross_ranks_at_local = cross_ranks[local_rank]
            cross_rank = cross_ranks_at_local[host_info.hostname]
            cross_size = len(cross_ranks_at_local)

            alloc_list.append(
                SlotInfo(
                    hostname=host_info.hostname,
                    rank=rank,
                    local_rank=local_rank,
                    cross_rank=cross_rank,
                    size=world_size,
                    local_size=local_size,
                    cross_size=cross_size))

    return alloc_list
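A quick usage sketch of these two helpers, reproducing an allocation like the one above (the import path horovod.runner.util.hosts is an assumption and may differ between Horovod versions):

# Sketch: parse a hosts string and print the resulting rank-to-slot assignment.
# Import path is assumed; it may differ between Horovod versions.
from horovod.runner.util.hosts import parse_hosts, get_host_assignments

hosts = parse_hosts('h1:2,h2:2')
for slot in get_host_assignments(hosts, 4):
    print(slot.hostname, slot.rank, slot.local_rank, slot.cross_rank,
          slot.size, slot.local_size, slot.cross_size)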

4.5.2 Obtaining the run command

get_run_command builds the Gloo-related environment variables and prepends them to the command. After this step, you get a command similar to the following:

HOROVOD_GLOO_RENDEZVOUS_ADDR=1.11.1. HOROVOD_GLOO_RENDEZVOUS_PORT=2222 HOROVOD_CPU_OPERATIONS=gloo HOROVOD_GLOO_IFACE=lo HOROVOD_CONTROLLER=gloo python train.py

This can be abbreviated to: {horovod_gloo_env} command.

The code is:

def create_run_env_vars(server_ip, nics, port, elastic=False) :
    # Build the Gloo-related environment variables
    run_envs = {
        'HOROVOD_GLOO_RENDEZVOUS_ADDR': server_ip,
        'HOROVOD_GLOO_RENDEZVOUS_PORT': port,
        'HOROVOD_CONTROLLER': "gloo".'HOROVOD_CPU_OPERATIONS': "gloo".'HOROVOD_GLOO_IFACE': list(nics)[0].# TODO: add multiple ifaces in future
        'NCCL_SOCKET_IFNAME': ', '.join(nics),
    }
    if elastic:
        run_envs["HOROVOD_ELASTIC"] = "1"
    return run_envs

def get_run_command(command, server_ip, nics, port, elastic=False) :
    env_vars = create_run_env_vars(server_ip, nics, port, elastic)
    env_string = "".join(
        [f"{k}={str(v)}" for k, v in env_vars.items()])
    run_command = (
        '{env_string} '
        '{command}'  # expect a lot of environment variables
        .format(env_string=env_string,
                command=' '.join(quote(par) for par in command)))
    return run_command

4.5.3 Obtaining the slot run command

Given the run command, the Horovod slot environment, the process environment, and the slot allocation are combined further to suit running under gloo; the result is a command that can run on a specific slot.

This format can be abbreviated as {horovod_slot_env} {env} {run_command}, where run_command already contains {horovod_gloo_env}.

After this step, you get something like this:

HOROVOD_HOSTNAME=1.11.1. HOROVOD_RANK=1 HOROVOD_SIZE=2 HOROVOD_LOCAL_RANK=1 
SHELL=/bin/bash PATH=XXXX USER=xxx PWD=xxx SSH_CONNECTION="1.1.1.1 11 2.2.2.2 22" HOME=xxx SSH_CLIENZT=xxxx
HOROVOD_GLOO_IFACE=lo NCCL_SOCKET_IFNAME=lo 
HOROVOD_GLOO_RENDEZVOUS_ADDR=1.11.1. HOROVOD_GLOO_RENDEZVOUS_PORT=2222 HOROVOD_CPU_OPERATIONS=gloo HOROVOD_GLOO_IFACE=lo HOROVOD_CONTROLLER=gloo python train.py

The specific code is as follows:

def _slot_info_to_command_fn(run_command, env) :
    # TODO: Workaround for over-buffered outputs. Investigate how mpirun avoids this problem.
    env = copy.copy(env)  # copy env so we do not leak env modifications
    env['PYTHONUNBUFFERED'] = '1'

    def slot_info_to_command(slot_info) :
        """ Given a slot_info, creates a command used by gloo to launch a single job. :param slot_info: host and slot to execute the run command on :return: """
        env_vars = create_slot_env_vars(slot_info)
        horovod_rendez_env = "".join(
            [f"{k}={str(v)}" for k, v in env_vars.items()])

        return '{horovod_env} {env} {run_command}' .format(
            horovod_env=horovod_rendez_env,
            env=' '.join(['%s=%s' % (key, quote(value)) for key, value in env.items()
                          if env_util.is_exportable(key)]),
            run_command=run_command)

    return slot_info_to_command

4.5.4 Multithreaded Invocation Commands

This is where multiple threads are started to make the calls. The comment in gloo_run says it clearly: when execute_function_multithreaded is invoked, each thread uses SSH to start the training job on a remote host.

Recall from "Building an Executable Environment" above: get_remote_command generates the remote runnable environment, e.g. prefixing the training script with 'ssh -o PasswordAuthentication=no -o StrictHostKeyChecking=no'. That is how the command ends up being executed on the remote side.

When run locally, the command is roughly:

cd <code directory> > /dev/null 2>&1
HOROVOD_HOSTNAME=1.11.1. HOROVOD_RANK=1 HOROVOD_SIZE=2 HOROVOD_LOCAL_RANK=1 
SHELL=/bin/bash PATH=XXXX USER=xxx PWD=xxx SSH_CONNECTION="1.1.1.1 11 2.2.2.2 22" HOME=xxx SSH_CLIENZT=xxxx
HOROVOD_GLOO_IFACE=lo NCCL_SOCKET_IFNAME=lo 
HOROVOD_GLOO_RENDEZVOUS_ADDR=1.11.1. HOROVOD_GLOO_RENDEZVOUS_PORT=2222 HOROVOD_CPU_OPERATIONS=gloo HOROVOD_GLOO_IFACE=lo HOROVOD_CONTROLLER=gloo python train.py

If you run the command remotely, you need to add SSH information, which is roughly as follows:

ssh -o PasswordAuthentication=no -o StrictHostKeyChecking=no 1.11.1.
cd <code directory> > /dev/null 2>&1
HOROVOD_HOSTNAME=1.11.1. HOROVOD_RANK=1 HOROVOD_SIZE=2 HOROVOD_LOCAL_RANK=1 
SHELL=/bin/bash PATH=XXXX USER=xxx PWD=xxx SSH_CONNECTION="1.1.1.1 11 2.2.2.2 22" HOME=xxx SSH_CLIENZT=xxxx
HOROVOD_GLOO_IFACE=lo NCCL_SOCKET_IFNAME=lo 
HOROVOD_GLOO_RENDEZVOUS_ADDR=1.11.1. HOROVOD_GLOO_RENDEZVOUS_PORT=2222 HOROVOD_CPU_OPERATIONS=gloo HOROVOD_GLOO_IFACE=lo HOROVOD_CONTROLLER=gloo python train.py

In execute_function_multithreaded:

  • fn is the command execution environment exec_command mentioned earlier;
  • fn(*arg[:-1]) runs slot_info_to_command inside exec_command.

def execute_function_multithreaded(fn,
                                   args_list,
                                   block_until_all_done=True,
                                   max_concurrent_executions=1000) :
    """ Executes fn in multiple threads each with one set of the args in the args_list. :param fn: function to be executed :type fn: :param args_list: :type args_list: list(list) :param block_until_all_done: if is True, function will block until all the threads are done and will return the results of each thread's execution. :type block_until_all_done: bool :param max_concurrent_executions: :type max_concurrent_executions: int :return: If block_until_all_done is False, returns None. If block_until_all_done is True, function returns the dict of results. { index: execution result of fn with args_list[index] } :rtype: dict """
    result_queue = queue.Queue()
    worker_queue = queue.Queue()

    for i, arg in enumerate(args_list):
        arg.append(i)
        worker_queue.put(arg)

    def fn_execute() :
        while True:
            try:
                arg = worker_queue.get(block=False)
            except queue.Empty:
                return
            exec_index = arg[-1]
            # fn is the program runtime environment exec_command
            # fn(*arg[:-1]) runs slot_info_to_command in exec_command
            res = fn(*arg[:-1]) 
            result_queue.put((exec_index, res))

    threads = []
    number_of_threads = min(max_concurrent_executions, len(args_list))

    # Execute fn_execute in multiple threads
    for _ in range(number_of_threads):
        thread = in_thread(target=fn_execute, daemon=not block_until_all_done)
        threads.append(thread)

    # Returns the results only if block_until_all_done is set.
    # If block_until_all_done is set, block and wait for all threads to finish
    results = None
    if block_until_all_done:

        # Because join() cannot be interrupted by signal, a single join()
        # needs to be separated into join()s with timeout in a while loop.
        have_alive_child = True
        while have_alive_child:
            have_alive_child = False
            for t in threads:
                t.join(0.1)
                if t.is_alive():
                    have_alive_child = True

        results = {}
        while not result_queue.empty():
            item = result_queue.get()
            results[item[0]] = item[1]

    return results
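A toy illustration of the calling convention (assuming the function defined above is in scope): fn receives everything in an args_list entry except the trailing index that the helper appends; the fake command and slot dictionaries below are made up.

# Toy illustration of the args_list / fn contract used by the helper above.
import time


def fake_exec_command(command, slot_info, events):
    # Pretend to run `command` for the given slot and report success.
    print(f"would run: {command!r} for {slot_info}")
    return 0, time.time()


args_list = [["python train.py", {"rank": 0}, []],
             ["python train.py", {"rank": 1}, []]]

results = execute_function_multithreaded(fake_exec_command, args_list,
                                         block_until_all_done=True)
print(results)   # e.g. {0: (0, ts0), 1: (0, ts1)}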

python train.py then enters our training code.

As you can see, after combining various information, you build a result that can be executed, and then execute it on multiple hosts:

  • On the left side of the figure, information such as host is obtained from parameters, and then the slot information is parsed out.
  • On the right, python train.py is a command to run, generating an executable command environment based on various configurations. If it is remote, you need to generate the relevant remote runnable command environment (including switching directories, remote execution, etc.);
  • In the middle of the figure, the command to run is python train.py, with env information added, and gloo information added. Then, after combining the slot information on the left with the executable command environment on the right, you get a command that can run on multiple threads, and therefore run in multiple slots.
 args: '10.11.11.11:4,10.11.11.12:4'    command: python train.py           command: python train.py
              +                                     +                                  +
              |                                     |                                  |
              v                                     v                                  v
   +----------+---------+               +-----------+----------+           +-----------+-----------+
   |     parse_hosts    |               |    get_run_command   |           |  get_remote_command   |
   +----------+---------+               +-----------+----------+           +-----------+-----------+
              |                                     |                                  |
              v                                     v                                  v
 +------------+-----------+             gloo ... python train.py           ssh -o ... python train.py
 |  get_host_assignments  |                         |                                  |
 +------------+-----------+                         |                                  v
              |                                     |                     +------------+------------+
              v                                     v                     | safe_shell_exec.execute |
 SlotInfo(hostname='h2', rank=1)     +--------------+-------------+       +------------+------------+
              |                      |  _slot_info_to_command_fn  |                    |
              +--------------------> +--------------+-------------+                    |
                                                    |                                  |
                                                    v                                  |
                              HOROVOD_CONTROLLER=gloo python train.py                  |
                                                    |                                  |
                                                    v                                  |
                             +----------------------+------------------+               |
                             |  execute_function_multithreaded         | <-------------+
                             +-----------------------------------------+


4.6 A C++ example

Let's look at some of the underlying code to see what Gloo does.

This is the place in Horovod where rank 0 finally sends the constructed response to the other ranks.

void GlooController::SendFinalTensors(ResponseList& response_list) {
  // Notify all nodes which tensors we'd like to reduce at this step.
  std::string encoded_response;
  ResponseList::SerializeToString(response_list, encoded_response);

  // Boardcast the response length
  int encoded_response_length = (int)encoded_response.length() + 1;
  {
    gloo::BroadcastOptions opts(gloo_context_.ctx);
    opts.setOutput(&encoded_response_length, 1);
    opts.setRoot(RANK_ZERO);
    gloo::broadcast(opts); // Broadcast to other rank
  }

  // Boardcast the response
  {
    gloo::BroadcastOptions opts(gloo_context_.ctx);
    opts.setOutput((uint8_t*)(encoded_response.c_str()),
                   encoded_response_length);
    opts.setRoot(RANK_ZERO);
    gloo::broadcast(opts); // Broadcast to other ranks
  }
}

0x05 MPI implementation

5.1 The Open MPI library

Horovod relies heavily on Open MPI here.

  • MPI (Message Passing Interface): a cross-language communication protocol for programming parallel computers, supporting point-to-point and broadcast communication. MPI is a message-passing application interface that includes protocol and semantic specifications of how its features must behave in any implementation. MPI's goals are high performance, scalability, and portability.
  • Open MPI (Open Message Passing Interface): an implementation of MPI, i.e. a library project.

MPI plays a somewhat special role in Horovod:

  • On the one hand, Horovod integrates an MPI-based AllReduce, which, like NCCL, is used for gradient reduction;

  • On the other hand, MPI can be used to start multiple processes (represented in Horovod by ranks) on all machines for parallel computation;

5.2 mpi_run function

This part of the code is located in: horovod/runner/mpi_run.py.

First, the key code is excerpted below. You can see that its core is running the mpirun command.

# This is the key part of the larger block of code further below
mpirun_command = (
        'mpirun {basic_args} '
        '-np {num_proc}{ppn_arg}{hosts_arg} '
        '{binding_args} '
        '{mpi_args} '
        '{mpi_ssh_args} '
        '{tcp_intf_arg} '
        '{nccl_socket_intf_arg} '
        '{output_filename_arg} '
        '{env} {extra_mpi_args} {command}'  
        .format(basic_args=basic_args,
                num_proc=settings.num_proc,
                ppn_arg=ppn_arg,
                hosts_arg=hosts_arg,
                binding_args=binding_args,
                mpi_args=' '.join(mpi_impl_flags),
                tcp_intf_arg=tcp_intf_arg,
                nccl_socket_intf_arg=nccl_socket_intf_arg,
                mpi_ssh_args=mpi_ssh_args,
                output_filename_arg=' '.join(output),
                env=env_list,
                extra_mpi_args=settings.extra_mpi_args if settings.extra_mpi_args else ' ',
                command=' '.join(quote(par) for par in command))
    )

    # Execute the mpirun command.
    if settings.run_func_mode:
        exit_code = safe_shell_exec.execute(mpirun_command, env=env, stdout=stdout, stderr=stderr)
    else:
        os.execve('/bin/sh', ['/bin/sh', '-c', mpirun_command], env)

All arguments of the mpirun command are built from the various configurations and parameters, such as the SSH settings, MPI settings, NCCL settings, and so on.

The following is an example of the mpirun command:

mpirun --allow-run-as-root --np 2 -bind-to none -map-by slot \
    -x NCCL_DEBUG=INFO -x LD_LIBRARY_PATH -x PATH \
    -mca pml ob1 -mca btl ^openib \
    python train.py

The specific code is as follows:

# The snippet above is an excerpt of the following function
def mpi_run(settings, nics, env, command, stdout=None, stderr=None) :
    """ Runs mpi_run. Args: settings: Settings for running MPI. Note: settings.num_proc and settings.hosts must not be None. nics: Interfaces to include by MPI. env: Environment dictionary to use for running command. command: Command and arguments to run as a list of string. stdout: Stdout of the mpi process. Only used when settings.run_func_mode is True. stderr: Stderr of the mpi process. Only used when settings.run_func_mode is True. """
    # Get various configurations
    mpi_impl_flags, impl_binding_args, mpi = _get_mpi_implementation_flags(settings.tcp_flag, env=env)
    impi = _IMPI_IMPL == mpi

    # Process SSH parameters
    ssh_args = []
    if settings.ssh_port:
        ssh_args += [f'-p {settings.ssh_port}']
    if settings.ssh_identity_file:
        ssh_args += [f'-i {settings.ssh_identity_file}']

    mpi_ssh_args = ' '
    if ssh_args:
        joined_ssh_args = ' '.join(ssh_args)
        mpi_ssh_args = f'-bootstrap=ssh -bootstrap-exec-args \"{joined_ssh_args}\"' if impi else f'-mca plm_rsh_args \"{joined_ssh_args}\"'

    # Process network configuration, network card information, etc
    tcp_intf_arg = '-mca btl_tcp_if_include {nics}'.format(
        nics=','.join(nics)) if nics and not impi else ''
    nccl_socket_intf_arg = '-{opt} NCCL_SOCKET_IFNAME={nics}'.format(
        opt='genv' if impi else 'x',
        nics=','.join(nics)) if nics else ''

    # Process host information
    # On large cluster runs (e.g. Summit), we need extra settings to work around OpenMPI issues
    host_names, host_to_slots = hosts.parse_hosts_and_slots(settings.hosts)
    if not impi and host_names and len(host_names) >= _LARGE_CLUSTER_THRESHOLD:
        mpi_impl_flags.append('-mca plm_rsh_no_tree_spawn true')
        mpi_impl_flags.append('-mca plm_rsh_num_concurrent {}'.format(len(host_names)))

    # if user does not specify any hosts, mpirun by default uses local host.
    # There is no need to specify localhost.
    hosts_arg = '-{opt} {hosts}'.format(opt='hosts' if impi else 'H',
                hosts=','.join(host_names) if host_names and impi else settings.hosts)

    # Handle PPN (processes per node) configuration
    ppn_arg = ' '
    if host_to_slots and impi:
        ppn = host_to_slots[host_names[0]]
        for h_name in host_names[1:]:
            # All hosts must declare the same number of slots when -ppn is used
            if ppn != host_to_slots[h_name]:
                raise Exception('Different slots in -hosts configuration. Use the -ppn parameter to set processes per node')
        ppn_arg = ' -ppn {} '.format(ppn)

    # Handle timeout configuration
    if settings.prefix_output_with_timestamp and not impi:
        mpi_impl_flags.append('--timestamp-output')

    binding_args = settings.binding_args if settings.binding_args and not impi else ' '.join(impl_binding_args)

    # The configuration allows running as root
    basic_args = '-l' if impi else '--allow-run-as-root --tag-output'

    output = []
    if settings.output_filename:
        output.append('-outfile-pattern' if impi else '--output-filename')
        output.append(settings.output_filename)

    # Build environment information list
    env_list = ' ' if impi else ' '.join(
                    '-x %s' % key for key in sorted(env.keys()) if env_util.is_exportable(key))

    # Build the final MPI command
    # Pass all the env variables to the mpirun command.
    mpirun_command = (
        'mpirun {basic_args} '
        '-np {num_proc}{ppn_arg}{hosts_arg} '
        '{binding_args} '
        '{mpi_args} '
        '{mpi_ssh_args} '
        '{tcp_intf_arg} '
        '{nccl_socket_intf_arg} '
        '{output_filename_arg} '
        '{env} {extra_mpi_args} {command}'  # expect a lot of environment variables
        .format(basic_args=basic_args,
                num_proc=settings.num_proc,
                ppn_arg=ppn_arg,
                hosts_arg=hosts_arg,
                binding_args=binding_args,
                mpi_args=' '.join(mpi_impl_flags),
                tcp_intf_arg=tcp_intf_arg,
                nccl_socket_intf_arg=nccl_socket_intf_arg,
                mpi_ssh_args=mpi_ssh_args,
                output_filename_arg=' '.join(output),
                env=env_list,
                extra_mpi_args=settings.extra_mpi_args if settings.extra_mpi_args else ' ',
                command=' '.join(quote(par) for par in command))
    )

    # we need the driver's PATH and PYTHONPATH in env to run mpirun,
    # env for mpirun is different to env encoded in mpirun_command
    for var in ['PATH', 'PYTHONPATH']:
        if var not in env and var in os.environ:
            # copy env so we do not leak env modifications
            env = copy.copy(env)
            # copy var over from os.environ
            env[var] = os.environ[var]

    # Execute the mpirun command.
    if settings.run_func_mode:
        exit_code = safe_shell_exec.execute(mpirun_command, env=env, stdout=stdout, stderr=stderr)
    else:
        os.execve('/bin/sh', ['/bin/sh', '-c', mpirun_command], env)

5.3 mpirun command

Since mpi_run is run using the mpirun command, let’s take a look.

mpirun is the startup script for MPI programs. It simplifies the process of launching parallel processes and hides the low-level implementation details as much as possible, providing users with a uniform mechanism for launching MPI parallel jobs.

When running a parallel program with the mpirun command, the -np argument specifies the number of processes to run in parallel. mpirun first starts a process on the local node and then starts one process on each host listed in the machines file (e.g. /usr/local/share/machines.LINUX). If the number of processes is greater than the number of available parallel nodes, the remaining processes wrap around according to the same rule. After processes are allocated this way, each one is given a fixed label, a bit like an ID card, which is used in subsequent message passing.

One thing to note here is that what actually runs is orterun (the Open MPI SPMD/MPMD launcher); mpirun/mpiexec are just symbolic links to it.

The following command is an example:

mpirun -np 4 \
    -bind-to none -map-by slot \
    -x NCCL_DEBUG=INFO -x LD_LIBRARY_PATH -x PATH \
    -mca pml ob1 -mca btl ^openib \
    python train.py

0x06 Summary

Comparing the Gloo and MPI implementations, we can see clear differences.

6.1 Gloo

Gloo is just a library that needs Horovod for command distribution.

Gloo requires Horovod to handle both local and remote launching itself; that is, the get_remote_command function builds the 'ssh -o PasswordAuthentication=no -o StrictHostKeyChecking=no' prefix.

Gloo also requires Horovod to implement a RendezvousServer, which the underlying layer uses to exchange communication details.

6.2 MPI

MPI is much more powerful. As long as the command is wrapped by mpirun, Open MPI can distribute and execute it by itself. In the end Horovod runs as an mpirun program, and even the TensorFlow processes become MPI programs that can interact with one another.

0xEE Personal information

Thoughts on life and technology

Wechat public account: Rosie’s Thinking

If you want timely updates about new articles, or to see the technical material I recommend, please follow this account.

0xFF References

Horovod elastic training

MPI, OpenMPI and deep learning

Introduction to MPI and parallel computing (part 1) : a preliminary knowledge of parallel computing

Set up an MPI cluster environment

Horovod- Based on the TensorFlow distributed deep learning framework

Horovod source analysis

How to understand Nvidia multi-GPU multi-card communication framework NCCL?