0x00 Summary

Horovod is an easy-to-use, high-performance distributed training framework released by Uber in 2017 that has been widely used in the industry.

This series takes you through the source code analysis of Horovod. This is the tenth article in a series to see how Horovod runs on Spark.

Horovod on Spark has two underlying implementations: MPI and GLOO. Due to space constraints, this article covers the MPI implementation, and the next one covers the GLOO implementation.

The other articles in this series are as follows:

Horovod (1) — Basics

Horovod (2) — A distributed training framework for deep learning — from the user’s perspective

Horovod (3) — What’s behind Horovodrun

Horovod (4) — Network Basics & Driver

Horovod (5) — fusion framework

Horovod (6) — background architecture

Horovod (6), a distributed training framework for deep learning, is implemented using threads

Horovod (7) — DistributedOptimizer

Horovod (8) — on Spark

Horovod (9) — Start on Spark

0x01 Review

1.1 Overall sequence diagram

To begin with, let’s review the overall sequence diagram of Horovod on Spark, so that we have a big picture in mind.

1.2 Overall Logic

Overall, the logic of Horovod on Spark is divided into the following phases:

  • Start SparkDriverService and use _make_spark_thread to start the Spark tasks. Horovod then waits for all of them to start.
  • Multiple threads start Spark tasks in the Spark Executors, and each task runs a SparkTaskService. Each SparkTaskService registers itself with the SparkDriverService in the Horovod main process and waits for the command to start the next step.
  • After all tasks have registered, Horovod notifies each task to proceed to the next stage.
  • Horovod calls mpi_run (which in turn uses mpirun_rsh.py) to start the MPI cluster by launching orted (here via SparkTaskService) on each Spark Executor;
  • orted runs the training code on each executor;

Having already analyzed the first three stages, this article continues the analysis of the next two.

1.3 The problem

Combined with the flow chart above, there is one point that can be confusing.

Horovod is supposed to call mpirun directly to start orted remotely (orted is an MPI executable; mpirun is an alias of orterun, and orterun eventually invokes orted). So why does the flow chart call mpirun_rsh.py instead, and then start orted via SparkTaskService, rather than starting orted directly?

The reason should be:

  • Normally MPI connects to hosts over SSH, but SSH cannot launch the Python function inside the Spark Executor.
  • orted needs to run inside the Spark Executor, but when mpirun starts it does not know the Spark Executor's IP:PORT combination, so it cannot start orted directly.
  • So MPI uses RPC to start the user code:
    • The IP:PORT combination can only be obtained through the interaction between SparkDriverService and SparkTaskService.
    • horovod.spark.driver.mpirun_rsh is used to connect to each Executor, i.e., to "remote shell" onto the executors.
    • SparkTaskService then starts orted directly.

0x02 Stage 4: Start the Job

Let’s look at the fourth stage, which is how to run a training job.

2.1 _launch_job

The _launch_job is simple:

  • First, driver.get_common_interfaces obtains the network interface (routing) information;
  • Then run_controller is called to start the job;
def _launch_job(use_mpi, use_gloo, settings, driver, env, stdout=None, stderr=None):
    nics = driver.get_common_interfaces()
    run_controller(use_gloo, lambda: gloo_run(settings, nics, driver, env, stdout, stderr),
                   use_mpi, lambda: mpi_run(settings, nics, driver, env, stdout, stderr),
                   False, lambda: None,
                   settings.verbose)

2.2 Obtaining Routing Information

get_common_interfaces here differs from the normal-mode version. At this point, the SparkTaskService information from the Spark Executors has already been stored in the Driver, so it can be read directly.

def get_common_interfaces(self):
    if self._nics is not None:
        return self._nics

    nics = None
    if len(self._task_addresses_for_tasks) > 0:
        # in Elastic Horovod on Spark with auto-scaling
        # keys in task_addresses are in range(max_np or proc_num)
        # but not all keys may exist, so we don't do for index in range(proc_num)
        indices = list(self._task_addresses_for_tasks.keys())
        nics = set(self._task_addresses_for_tasks[indices[0]].keys()) # Direct access
        for index in indices[1:]:
            nics.intersection_update(self._task_addresses_for_tasks[index].keys())

    return nics

2.3 run_controller

Based on the configuration and on which backends were compiled in, it chooses gloo, jsrun, or MPI.

def run_controller(use_gloo, gloo_run, use_mpi, mpi_run, use_jsrun, js_run, verbosity):
    if use_gloo:
        gloo_run()
    elif use_mpi:
        mpi_run()
    elif use_jsrun:
        js_run()
    else:
        if mpi_built(verbose=verbose):
            if lsf.LSFUtils.using_lsf() and is_jsrun_installed():
                js_run()
            else:
                mpi_run()
        elif gloo_built(verbose=verbose):
            gloo_run()


With this, the job is started. We analyze the MPI and GLOO cases separately; the rest of this article covers MPI.

0x03 MPI experiment

We are going to do some MPI experiments first because:

  • There are some strange-looking behaviors, or tricks, in the MPI invocation.
  • These tricks are helpful for the MPI-based implementation of “Horovod on Spark”, but are a huge distraction from understanding the code.
  • We don’t have the time or energy to look at how the MPI source code is implemented because it’s beyond the scope of this article.

So, for these strange behaviors, we can only make assumptions and educated guesses about the MPI implementation mechanism, and then verify the hypothesis with a simple experiment.

3.1 Key points

We execute an mpirun command of the following format, which is designed to simulate how Horovod invokes MPI:

mpirun --allow-run-as-root -n 4 --hostfile ./remote_hosts -mca plm_rsh_agent "python rsh.py" python user_function.py

Here are the questions:

  • What does plm_rsh_agent “python rsh.py” do?
  • What are the tricks in rsh.py? How to call a remote MPI program?
  • Does python user_function.py run after rsh.py?

3.2 Terminology

3.2.1 orterun & orted

When you first see this command, it is quite confusing, because orted is not mentioned anywhere in the code.

In fact, orted is an MPI executable.

mpirun is an alias of orterun, and orterun eventually invokes orted.

A detailed explanation follows; the source of the information is cn.voidcc.com/question/p-…

mpirun and mpiexec are essentially the same thing: the name of the process launcher in many MPI implementations. The MPI standard says nothing about how to start and control the ranks, but it recommends (though does not require) that, if a launcher of any kind exists, it should be named mpiexec. Some MPI implementations started with mpirun and later added mpiexec for compatibility; other implementations did the opposite. In the end, most implementations provide their launcher under both names. In practice, there should be no difference between what mpirun and mpiexec do.

Different MPI implementations have different ways of starting and controlling processes. MPICH started with an infrastructure called MPD (Multi-Purpose Daemon) and later switched to the newer Hydra process manager. Because Hydra works differently from MPD, the Hydra-based mpiexec takes different command-line arguments than the MPD-based one; to let the user explicitly choose the Hydra-based launcher, it is also provided as mpiexec.hydra, while the old one is called mpiexec.mpd. A newer MPICH-based MPI library may only provide the Hydra launcher, in which case mpiexec and mpiexec.hydra are the same executable. Intel MPI is based on MPICH, and its newer versions use the Hydra process manager.

Open MPI is built on the Open Run-Time Environment (ORTE), and its own process launcher is called orterun. For compatibility, orterun is also symlinked as mpirun and mpiexec.

Conclusion:

  • mpiexec.something is the launcher of a specific version of a given MPI implementation
  • mpiexec and mpirun are generic names, usually provided as symbolic links to, or copies of, the actual launcher
  • both mpiexec and mpirun should do the same thing
  • Some implementations name their launcher mpiexec, some name it mpirun, and some provide both names; this is often a source of confusion when multiple MPI implementations are on the system path simultaneously (for example, when installed from a distribution)
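On a machine with Open MPI installed you can check this yourself. Below is a minimal sketch (an illustration, assuming mpirun is on PATH; with Open MPI it normally resolves to orterun):

import os
import shutil

# Resolve what `mpirun` actually points to; with Open MPI this is usually orterun.
mpirun_path = shutil.which('mpirun')
if mpirun_path:
    print(mpirun_path, '->', os.path.realpath(mpirun_path))
else:
    print('mpirun not found on PATH')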

3.2.2 orterun source code

The orterun source code is shown below; the key step is the call to orte_submit_job, which submits the job.

int orterun(int argc, char *argv[])
{
    orte_submit_status_t launchst, completest;

    /* orte_submit_init() will also check if the user is running as root (and may issue a warning/exit). */
    if (ORTE_SUCCESS != orte_submit_init(argc, argv, NULL)) {
        exit(1);
    }

    /* setup to listen for commands sent specifically to me, even though I would probably * be the one sending them! Unfortunately, since I am a participating daemon, * there are times I need to send a command to "all daemons", and that means *I* have * to receive it too */
    orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON,
                            ORTE_RML_PERSISTENT, orte_daemon_recv, NULL);

    /* if the user just wants us to terminate a DVM, then do so */
    if (orte_cmd_options.terminate_dvm) {
        // Omit part of the code
    } else {
        /* spawn the job and its daemons */
        memset(&launchst, 0, sizeof(launchst));
        memset(&completest, 0, sizeof(completest));
        launchst.active = true;
        completest.active = true;
      
        // Submit the job here
        if (ORTE_SUCCESS != orte_submit_job(argv, NULL,
                                            launched, &launchst,
                                            completed, &completest)) {
            ORTE_UPDATE_EXIT_STATUS(1);
            goto DONE;
        }
    }

    // wait for response and unpack the status, jobid
    // Omit part of the code
}

3.3 Experimental Design

3.3.1 Components

There are several components whose functions are as follows:

  • The host file. It specifies which hosts this run uses and how many MPI processes run on each host.
  • rsh.py. It acts as the rsh agent that issues commands to the remote servers.
    • MPI users can also send commands to remote machines in other ways.
    • Users can use a remote shell (ssh or rsh) to reach the hosts without logging in. By default, mpirun uses ssh.
    • If mpirun runs into problems with ssh, you can add the --mca plm_rsh_agent rsh option to the mpirun command to connect with the rsh command instead.
  • user_function.py. The function the user wants to execute.

3.3.2 Host file remote_hosts

The contents of the remote_hosts file are as follows:

1.1.1.1:2
2.2.2.2:2

The meaning is:

  • 1.1.1.1 This IP address runs two slots, that is, two MPI processes.
  • 2.2.2.2 This IP address runs two slots, namely, two MPI processes.
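As a side note, here is a small, purely illustrative sketch of parsing this "host:slots" format (the parse_hostfile helper below is hypothetical; the real mpirun hostfile syntax also accepts other forms, such as "host slots=N"):

def parse_hostfile(path):
    """Parse lines of the form 'host:slots' into a dict (illustrative only)."""
    hosts = {}
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            host, _, slots = line.partition(':')
            hosts[host] = int(slots) if slots else 1
    return hosts

# For the remote_hosts file above this yields {'1.1.1.1': 2, '2.2.2.2': 2}.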

3.3.3 rsh.py

rsh.py prints the command passed in by MPI, and then runs the new command, which starts the MPI process on the remote host:

import os
import sys
import subprocess

if __name__ == '__main__':
  command = " ".join(sys.argv[0:])
  print(command)
  new_command = " ".join(sys.argv[2:])
  print(new_command)
  subprocess.Popen(new_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

3.3.4 user_function.py

Its content is as follows; for testing purposes it simply prints a statement.

print('hello world')

3.4 Experimental Results

We run the mpirun command on 1.1.1.1:

mpirun --allow-run-as-root -n 4 --hostfile ./remote_hosts -mca plm_rsh_agent "python rsh.py" python user_function.py

Here are the results:

# The command received by rsh.py; note that it still contains plm_rsh_agent "python rsh.py"
rsh.py 1.1.1.1 orted -mca ess "env" -mca ess_base_jobid "41114481152" -mca ess_base_vpid 1 -mca ess_base_num_procs "4" -mca orted_node_regex "ip-[2]-1-1-1,[2]2.2.2@0(2)" -mca orted_hnp_uri "41114481152.0;tcp://1.1.1.1:53405" -mca plm "rsh" --tree-spawn -mca orte_parent_uri "41114481152.0;tcp://1.1.1.1:53405" -mca plm_rsh_agent "python rsh.py" -mca pmix "^s1,s2,cray,isolated"

# The new command extracted by rsh.py; it also still contains plm_rsh_agent "python rsh.py"
orted -mca ess "env" -mca ess_base_jobid "41114481152" -mca ess_base_vpid 1 -mca ess_base_num_procs "4" -mca orted_node_regex "ip-[2]-1-1-1,[2]2.2.2@0(2)" -mca orted_hnp_uri "41114481152.0;tcp://1.1.1.1:53405" -mca plm "rsh" --tree-spawn -mca orte_parent_uri "41114481152.0;tcp://1.1.1.1:53405" -mca plm_rsh_agent "python rsh.py" -mca pmix "^s1,s2,cray,isolated"

# Here's what user_function.py executes
hello world

So we know that:

  • plm_rsh_agent "python rsh.py" is used to start the MPI orted on the remote end.
  • python user_function.py is run after rsh, inside the remote orted.
  • During the execution of rsh.py, the command content it receives contains something strange.

3.5 Running Process

The operation process is as follows:

  1. mpirun runs mpirun -mca plm_rsh_agent "python rsh.py" python user_function.py; an MPI daemon is started on the remote end to handle the request.
  2. mpirun calls rsh.py;
  3. rsh.py uses subprocess(orted -mca plm_rsh_agent "python rsh.py") to start orted remotely (which communicates with the daemon) and run the user code;

The details are as follows:

                                                    1.1.1.1          |          2.2.2.2
                                                                     |
                                                                     |   1     +---------------+
 mpirun -mca plm_rsh_agent "python rsh.py" python user_function.py +----------> |  MPI daemon   |
              +                                                      |          +-------+-------+
              |                                                      |                  |
              | 2                                                    |                  | 3
              v                                                      |                  v
 +------------+---------------------------------------------+       |          +-------+-----------------------+
 | rsh.py                                                    |       |          | orted                         |
 |                                                           |       |          |                               |
 |   rsh.py 1.1.1.1 orted -mca plm_rsh_agent "python rsh.py" |       |          |   +-----------------------+   |
 |                                                           |   3   |          |   | user_function.py      |   |
 |   subprocess(orted -mca plm_rsh_agent "python rsh.py")  +-------------------->   |                       |   |
 |                                                           |       |          |   |  print('hello world') |   |
 +-----------------------------------------------------------+       |          |   +-----------------------+   |
                                                                     |          +-------------------------------+


3.6 Analysis of the strange behavior

We found a couple of weird things:

  • mpirun runs mpirun -mca plm_rsh_agent "python rsh.py" python user_function.py;
  • mpirun calls rsh.py, but the argv received by rsh.py still contains plm_rsh_agent "python rsh.py". This parameter should not be needed at this point, since rsh.py has already been invoked and no further rsh agent should be required;
  • rsh.py starts the remote MPI with orted -mca plm_rsh_agent "python rsh.py", which again carries the plm_rsh_agent "python rsh.py" parameter. This also looks unnecessary, because orted is already running on the remote side; why pass it an rsh agent parameter that is meant for remote launching?

That is, the plm_rsh_agent "python rsh.py" parameter is passed along by MPI into every stage, whether it is the rsh agent itself or the remote MPI.

The rsh agent is the trick here. It is not clear why MPI passes plm_rsh_agent "python rsh.py" into every phase; perhaps it is for better control.

When orted -mca plm_rsh_agent "python rsh.py" is run, the remote MPI daemon does not invoke the rsh agent again.

We will see this plm_rsh_agent "python rsh.py" parameter again later, in the Spark task.

0x04 MPI implementation

In general, Horovod on Spark is run in MPI mode, so let’s focus on that.

4.1 mpi_run in Spark

The code is located in horovod/spark/mpi_run.py, and its logic is:

  • Generate the remote shell agent based on the various configurations;
  • Generate the executable command based on the various configurations;
  • Call hr_mpi_run (i.e., horovod.runner.mpi_run, the mpi_run of normal mode) to run the command.

For example, rsh_agent looks like this:

("/usr/bin/python", "-m", "horovod.spark.driver.mpirun_rsh", "xxxxx", "yyy")

The command will look something like this:

("/usr/bin/python", "-m", "horovod.spark.task.mpirun_exec_fn", "xxxxx", "yyy")

The specific code is as follows:

from horovod.runner.mpi_run import mpi_run as hr_mpi_run

def mpi_run(settings, nics, driver, env, stdout=None, stderr=None):
    """ Runs mpirun. :param settings: Settings for running MPI. Note: settings.num_proc and settings.hosts must not be None. :param nics: Interfaces to include by MPI. :param driver: The Spark driver service that tasks are connected to. :param env: Environment dictionary to use for running MPI. Can be None. :param stdout: Stdout of the mpi process. Only used when settings.run_func_mode is True. :param stderr: Stderr of the mpi process. Only used when settings.run_func_mode is True. """
    env = {} if env is None else copy.copy(env)  # copy env so we do not leak env modifications

    # Pass secret key through the environment variables.
    env[secret.HOROVOD_SECRET_KEY] = codec.dumps_base64(settings.key)
    # we don't want the key to be serialized along with settings from here on
    settings.key = None

    # splice rsh_agent
    rsh_agent = (sys.executable,
                 '-m', 'horovod.spark.driver.mpirun_rsh',
                 codec.dumps_base64(driver.addresses()),
                 codec.dumps_base64(settings))
    settings.extra_mpi_args = ('{extra_mpi_args} -x NCCL_DEBUG=INFO -mca plm_rsh_agent "{rsh_agent}"'
                               .format(extra_mpi_args=settings.extra_mpi_args if settings.extra_mpi_args else ' ',
                                       rsh_agent=' '.join(rsh_agent)))
    # Splice command
    command = (sys.executable,
               '-m', 'horovod.spark.task.mpirun_exec_fn',
               codec.dumps_base64(driver.addresses()),
               codec.dumps_base64(settings))
    hr_mpi_run(settings, nics, env, command, stdout=stdout, stderr=stderr)

4.2 mpi_run in normal mode

Next comes horovod.runner.mpi_run, i.e., mpi_run in normal mode.

horovod.runner.mpi_run first builds all the parameters of the mpirun command based on the various configurations and arguments, such as the SSH parameters, MPI parameters, NCCL parameters, and so on.

The resulting command looks something like this:

mpirun --allow-run-as-root --map-by slot -x SSH_CONNECTION -mca pls_rsh_agent "/usr/bin/python -m horovod.spark.driver.mpirun_rsh xxxxx" /usr/bin/python -m horovod.spark.task.mpirun_exec_fn xxxxx

The specific code is as follows:

def mpi_run(settings, nics, env, command, stdout=None, stderr=None):
    """ Runs mpi_run. Args: settings: Settings for running MPI. Note: settings.num_proc and settings.hosts must not be None. nics: Interfaces to include by MPI. env: Environment dictionary to use for running command. command: Command and arguments to run as a list of string. stdout: Stdout of the mpi process. Only used when settings.run_func_mode is True. stderr: Stderr of the mpi process. Only used when settings.run_func_mode is True. """

    # Get mpI related configuration
    mpi_impl_flags, impl_binding_args, mpi = _get_mpi_implementation_flags(settings.tcp_flag, env=env)
    impi = _IMPI_IMPL == mpi

    # Obtain SSH configuration
    ssh_args = []
    if settings.ssh_port:
        ssh_args += [f'-p {settings.ssh_port}']
    if settings.ssh_identity_file:
        ssh_args += [f'-i {settings.ssh_identity_file}']

    mpi_ssh_args = ' '
    if ssh_args:
        joined_ssh_args = ' '.join(ssh_args)
        mpi_ssh_args = f'-bootstrap=ssh -bootstrap-exec-args \"{joined_ssh_args}\"' if impi else f'-mca plm_rsh_args \"{joined_ssh_args}\"'

    # Network card information
    tcp_intf_arg = '-mca btl_tcp_if_include {nics}'.format(
        nics=', '.join(nics)) if nics and not impi else ' '
    nccl_socket_intf_arg = '-{opt} NCCL_SOCKET_IFNAME={nics}'.format(
        opt='genv' if impi else 'x',
        nics=', '.join(nics)) if nics else ' '

    # On large cluster runs (e.g. Summit), we need extra settings to work around OpenMPI issues
    host_names, host_to_slots = hosts.parse_hosts_and_slots(settings.hosts)
    if not impi and host_names and len(host_names) >= _LARGE_CLUSTER_THRESHOLD:
        mpi_impl_flags.append('-mca plm_rsh_no_tree_spawn true')
        mpi_impl_flags.append('-mca plm_rsh_num_concurrent {}'.format(len(host_names)))

    # if user does not specify any hosts, mpirun by default uses local host.
    # There is no need to specify localhost.
    hosts_arg = '-{opt} {hosts}'.format(opt='hosts' if impi else 'H',
                hosts=', '.join(host_names) if host_names and impi else settings.hosts)

    ppn_arg = ' '
    if host_to_slots and impi:
        ppn = host_to_slots[host_names[0]]
        for h_name in host_names[1:]:
            if ppn != host_to_slots[h_name]:
                raise Exception('''Different slots in -hosts parameter are not supported in Intel(R) MPI.
                                   Use -machinefile for this purpose.''')
        ppn_arg = ' -ppn {} '.format(ppn)

    if settings.prefix_output_with_timestamp and not impi:
        mpi_impl_flags.append('--timestamp-output')

    binding_args = settings.binding_args if settings.binding_args and not impi else ' '.join(impl_binding_args)

    basic_args = '-l' if impi else '--allow-run-as-root --tag-output'

    output = []
    if settings.output_filename:
        output.append('-outfile-pattern' if impi else '--output-filename')
        output.append(settings.output_filename)

    env_list = ' ' if impi else ' '.join(
                    '-x %s' % key for key in sorted(env.keys()) if env_util.is_exportable(key))

    # Pass all the env variables to the mpirun command.
    mpirun_command = (
        'mpirun {basic_args} '
        '-np {num_proc}{ppn_arg}{hosts_arg} '
        '{binding_args} '
        '{mpi_args} '
        '{mpi_ssh_args} '
        '{tcp_intf_arg} '
        '{nccl_socket_intf_arg} '
        '{output_filename_arg} '
        '{env} {extra_mpi_args} {command}'  # expect a lot of environment variables
        .format(basic_args=basic_args,
                num_proc=settings.num_proc,
                ppn_arg=ppn_arg,
                hosts_arg=hosts_arg,
                binding_args=binding_args,
                mpi_args=' '.join(mpi_impl_flags),
                tcp_intf_arg=tcp_intf_arg,
                nccl_socket_intf_arg=nccl_socket_intf_arg,
                mpi_ssh_args=mpi_ssh_args,
                output_filename_arg=' '.join(output),
                env=env_list,
                extra_mpi_args=settings.extra_mpi_args if settings.extra_mpi_args else ' ',
                command=' '.join(quote(par) for par in command))
    )

    # we need the driver's PATH and PYTHONPATH in env to run mpirun,
    # env for mpirun is different to env encoded in mpirun_command
    for var in ['PATH', 'PYTHONPATH']:
        if var not in env and var in os.environ:
            # copy env so we do not leak env modifications
            env = copy.copy(env)
            # copy var over from os.environ
            env[var] = os.environ[var]

    # Execute the mpirun command.
    if settings.run_func_mode:
        exit_code = safe_shell_exec.execute(mpirun_command, env=env, stdout=stdout, stderr=stderr)
    else:
        os.execve('/bin/sh', ['/bin/sh', '-c', mpirun_command], env)

4.3 Executing Commands

The command at this point is:

mpirun --allow-run-as-root --map-by slot -x SSH_CONNECTION -mca pls_rsh_agent "/usr/bin/python -m horovod.spark.driver.mpirun_rsh xxxxx" /usr/bin/python -m horovod.spark.task.mpirun_exec_fn xxxxx

So let’s do the analysis.

When mpi_run has the command ready, it calls safe_shell_exec.execute or /bin/sh to execute it. For safe_shell_exec.execute, the command it needs to execute is:

mpirun --allow-run-as-root --map-by slot -x SSH_CONNECTION -mca pls_rsh_agent "/usr/bin/python -m horovod.spark.driver.mpirun_rsh xxxxx" /usr/bin/python -m horovod.spark.task.mpirun_exec_fn xxxxx

In this way, safe_shell_exec.execute or /bin/sh first executes "/usr/bin/python -m horovod.spark.driver.mpirun_rsh xxxxx", which then leads to horovod.spark.task.mpirun_exec_fn xxxxx being executed.
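The two execution paths at the end of mpi_run differ in an important way: safe_shell_exec.execute runs the command as a child process (so Horovod keeps running, can capture stdout/stderr, and gets an exit code back), while os.execve replaces the current Python process with the shell. A minimal sketch of the two patterns (this is not Horovod's safe_shell_exec implementation, just the underlying idea, using a placeholder command):

import os
import subprocess

# Placeholder standing in for the real mpirun command string built above.
mpirun_command = 'echo hello from a placeholder command'

# run_func_mode=True path: run as a child process, capture output, get an exit code back.
proc = subprocess.Popen(['/bin/sh', '-c', mpirun_command],
                        env=dict(os.environ),
                        stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
out, _ = proc.communicate()
print(out.decode(), 'exit code:', proc.returncode)

# run_func_mode=False path: os.execve replaces the current process with the shell,
# so nothing after the call would run (kept commented out here for that reason).
# os.execve('/bin/sh', ['/bin/sh', '-c', mpirun_command], dict(os.environ))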

4.3.1 The MPI parameters

For mpirun, the --mca pls_rsh_agent parameter tells it which rsh agent to use when communicating with the nodes.

From this we know that horovod.spark.driver.mpirun_rsh is the script executed first when the nodes communicate.

That is, when mpirun wants to run a program (horovod.spark.task.mpirun_exec_fn) on different nodes, it first runs horovod.spark.driver.mpirun_rsh to start an orted on each node, and then runs horovod.spark.task.mpirun_exec_fn on those orted processes.

4.3.3 mpirun_rsh.py

So horovod.spark.driver.mpirun_rsh runs first; let's take a look at it (the lower part of the diagram above).

The function of mpirun_rsh.py, as stated in the comment, is to be called by MPI in order to connect to a host and execute the specified command.

The command, usually orted, is used to create an MPI cluster. The Orted process is then used to start the remote process (the Python method of the Horovod user). The Orted process will run on the task with the lowest index, and other tasks on the same host will execute no-op and wait for the Orted task to finish.

Method run by MPI to connect to a host hash and execute the given command.

The command is usually `orted` to setup the MPI cluster. That `orted` process
is then used to spin-up the actual remote process, the Horovod user's Python method.
The `orted` process will run on the lowest task index and all other tasks with the same
host hash are expected to no-op (see `horovod.spark._task_fn`) and wait for the first
task to terminate.

The code itself is quite simple: it just calls rsh directly, so we need to keep digging.

if len(sys.argv) < 5:
    print('Usage: %s <service addresses> <settings> <host hash> '
          '<command...>' % sys.argv[0])
    sys.exit(1)

addresses = codec.loads_base64(sys.argv[1])
key = codec.loads_base64(os.environ.get(secret.HOROVOD_SECRET_KEY))
settings = codec.loads_base64(sys.argv[2])
host_hash = sys.argv[3]
command = " ".join(sys.argv[4:])
env = {}  # orted does not need any env vars, the target training code gets env from mpirun

# Since tasks with the same host hash have shared memory,
# we will run only one orted process on the first task.
rsh(addresses, key, host_hash, command, env, 0, settings.verbose) # Direct call

4.3.4 rsh

This is where the above logic is actually implemented. What rsh does is:

  • Interacts with SparkDriverService running on the SparkDriver and obtains information about tasks to be run from SparkDriverService.
  • Interact with SparkTaskService in Spark Executor and run command.

The logic of the code is:

  • Use driver_client.task_host_hash_indices(host_hash) to obtain all tasks on a host from SparkDriverService running on the SparkDriver.
  • task_indices[local_rank] is used to obtain the corresponding task.
  • driver_client.all_task_addresses(task_index) is used to obtain the addresses of that task.
  • task_service.SparkTaskClient.run_command is used to run the command;

The following is an example of the command, as transformed by mpirun:

/usr/local/bin/orted -mca ess "env" -mca ess_base_num_procs "2" -mca orte_hnp_uri "xxxx" -mca pls_rsh_agent "/usr/bin/python -m horovod.spark.driver.mpirun_rsh xxxxx"

The specific code is:

def rsh(driver_addresses, key, host_hash, command, env, local_rank, verbose,
        stdout=None, stderr=None, prefix_output_with_timestamp=False,
        background=True, events=None):
    """ Method to run a command remotely given a host hash, local rank and driver addresses. This method connects to the SparkDriverService running on the Spark driver, retrieves all information required to connect to the task with given local rank of that host hash and invoke the command  there. The method returns immediately after launching the command if background is True (default). When background is set to False, this method waits for command termination and returns command's result. If there is an exception while waiting for the result (i.e. connection reset) it returns -1. :param driver_addresses: driver's addresses :param key: used for encryption of parameters passed across the hosts :param host_hash: host hash to connect to :param command: command and arguments to invoke :param env: environment to use :param local_rank: local rank on the host of task to run the command in :param verbose: verbosity level :param stdout: Task stdout is redirected to this stream. :param stderr: Task stderr is redirected to this stream. :param prefix_output_with_timestamp: shows timestamp in stdout/stderr forwarding on the driver if True :param background: run command in background if True, returns command result otherwise :param events: events to abort the command, only if background is True :return exit code if background is False """
    driver_client = driver_service.SparkDriverClient(driver_addresses, key, verbose=verbose)
    task_indices = driver_client.task_host_hash_indices(host_hash)
    task_index = task_indices[local_rank]
    task_addresses = driver_client.all_task_addresses(task_index)
    task_client = task_service.SparkTaskClient(task_index, task_addresses, key, verbose=verbose)
    task_client.stream_command_output(stdout, stderr)
    task_client.run_command(command, env,
                            capture_stdout=stdout is not None,
                            capture_stderr=stderr is not None,
                            prefix_output_with_timestamp=prefix_output_with_timestamp)

    if not background:
        events = events or []
        stop = threading.Event()
        for event in events:
            on_event(event, task_client.abort_command, stop=stop)

        try:
            exit_code = task_client.wait_for_command_exit_code()
            return exit_code
        except:
            traceback.print_exc()
            return -1
        finally:
            stop.set()

4.3.5 Sending commands

run_command sends a RunCommandRequest to SparkTaskService.

class BasicTaskClient(network.BasicClient):

    def run_command(self, command, env,
                    capture_stdout=False, capture_stderr=False,
                    prefix_output_with_timestamp=False):
        self._send(RunCommandRequest(command, env,
                                     capture_stdout, capture_stderr,
                                     prefix_output_with_timestamp))
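The request object itself is just a plain data holder that gets serialized by _send and shipped over the socket. A simplified sketch of the shape implied by the call above (the field names here are assumptions for illustration, not necessarily Horovod's exact class):

class RunCommandRequest(object):
    """Plain data object meaning: 'please run this command with this environment'."""
    def __init__(self, command, env, capture_stdout, capture_stderr,
                 prefix_output_with_timestamp):
        self.command = command
        self.env = env
        self.capture_stdout = capture_stdout
        self.capture_stderr = capture_stderr
        self.prefix_output_with_timestamp = prefix_output_with_timestamp

On the receiving side, SparkTaskService's _handle (shown in 4.4.1 below) matches on this request type and spawns the command in a thread.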

The details are shown in the following logic:

Compare this to the previous test code:

                                               Our test code         |         Horovod on Spark
                                                                     |
 mpirun -mca plm_rsh_agent "python rsh.py" python user_function.py   |   mpirun -mca pls_rsh_agent "python mpirun_rsh" python -m mpirun_exec_fn
          +                                                          |           +
          |                                                          |           |
          |  rsh.py 1.1.1.1 orted -mca plm_rsh_agent "python rsh.py" |           |  orted -mca pls_rsh_agent "python -m mpirun_rsh"
          |                                                          |           |
          v                                                          |           v
+---------+------------------------------------------------------+  |   +-------+--------------------------------------------------+
| rsh.py (via SSH)                                                |  |   | mpirun_rsh                                               |
|                                                                 |  |   |                                                          |
|    rsh.py 1.1.1.1 orted -mca plm_rsh_agent "python rsh.py"      |  |   +-------+--------------------------------------------------+
|                                                                 |  |           |
|                                                                 |  |           v
|                                                                 |  |   +-------+--------------------------------------------------+
|                                                                 |  |   | rsh (via RPC)                                            |
|                                                                 |  |   |                                                          |
|    subprocess(orted -mca plm_rsh_agent "python rsh.py")         |  |   |  task_client = task_service.SparkTaskClient              |
|                                                                 |  |   |                                                          |
|                                                                 |  |   |  task_client.run_command(                                |
|                                                                 |  |   |      orted -mca pls_rsh_agent "python -m mpirun_rsh")    |
+---------+-------------------------------------------------------+ |   +-------+--------------------------------------------------+
          |                                                          |           |
          v                                                          |           v
+---------+-------------------------------------------------------+ |   +-------+--------------------------------------------------+
| user_function.py                                                 | |   | mpirun_exec_fn.py                                        |
|                                                                  | |   |                                                          |
|    print('hello world')                                          | |   |     task_exec +--------> user_function                  |
+------------------------------------------------------------------+ |   +----------------------------------------------------------+


The next step therefore takes place in the Spark Executor.

4.4 Run in Spark Executor

Again, notice that this is already a remote Spark Executor.

As mentioned above, the system uses task_service.SparkTaskClient.run_command to run the command.

The following is an example of the command, as transformed by mpirun:

/usr/local/bin/orted -mca ess "env" -mca ess_base_num_procs "2" -mca orte_hnp_uri "xxxx" -mca pls_rsh_agent "/usr/bin/python -m horovod.spark.driver.mpirun_rsh xxxxx"

Note that after the command is executed on the Spark Executor, mpirun_exec_fn is run.

Let’s look at how to handle RunCommandRequest. This is done in the BasicTaskService.

4.4.1 RunCommandRequest

As you can see, once the message is received, _run_command is called to do the work.

def _handle(self, req, client_address):
    if isinstance(req, RunCommandRequest):
        self._wait_cond.acquire()
        try:
            if self._command_thread is None:
                # we add req.env to _command_env and make this available to the executed command
                if self._command_env:
                    env = self._command_env.copy()
                    self._add_envs(env, req.env)
                    req.env = env

                # We only permit executing exactly one command, so this is idempotent.
                self._command_abort = threading.Event()
                self._command_stdout = Pipe() if req.capture_stdout else None
                self._command_stderr = Pipe() if req.capture_stderr else None
                args = (req.command, req.env, self._command_abort,
                        self._command_stdout, self._command_stderr,
                        self._index,
                        req.prefix_output_with_timestamp)
                self._command_thread = in_thread(self._run_command, args)
        finally:
            self._wait_cond.notify_all()
            self._wait_cond.release()
        return network.AckResponse()

4.4.2 _run_command

_run_command simply calls safe_shell_exec.execute to run the command directly.

def _run_command(self, command, env, event,
                 stdout=None, stderr=None, index=None,
                 prefix_output_with_timestamp=False):
    self._command_exit_code = safe_shell_exec.execute(
        command,
        env=env,
        stdout=stdout, stderr=stderr,
        index=index,
        prefix_output_with_timestamp=prefix_output_with_timestamp,
        events=[event])
    if stdout:
        stdout.close()
    if stderr:
        stderr.close()

So the next step is that the following command starts executing inside the Spark Executor:

/usr/local/bin/orted -mca ess "env" -mca ess_base_num_procs "2" -mca orte_hnp_uri "xxxx" -mca pls_rsh_agent "/usr/bin/python -m horovod.spark.driver.mpirun_rsh xxxxx"

Note that this is in the Spark Executor, so the next behavior will be different.

4.4.3 mpirun_rsh

mpirun_rsh again calls rsh.

addresses = codec.loads_base64(sys.argv[1])
key = codec.loads_base64(os.environ.get(secret.HOROVOD_SECRET_KEY))
settings = codec.loads_base64(sys.argv[2])
host_hash = sys.argv[3]
command = " ".join(sys.argv[4:])
env = {}  # orted does not need any env vars, the target training code gets env from mpirun

# Since tasks with the same host hash have shared memory,
# we will run only one orted process on the first task.
rsh(addresses, key, host_hash, command, env, 0, settings.verbose)

4.4.4 rsh

The code is as follows:

def rsh(driver_addresses, key, host_hash, command, env, local_rank, verbose,
        stdout=None, stderr=None, prefix_output_with_timestamp=False,
        background=True, events=None):
    driver_client = driver_service.SparkDriverClient(driver_addresses, key, verbose=verbose)
    task_indices = driver_client.task_host_hash_indices(host_hash)
    task_index = task_indices[local_rank]
    task_addresses = driver_client.all_task_addresses(task_index)
    task_client = task_service.SparkTaskClient(task_index, task_addresses, key, verbose=verbose)
    task_client.stream_command_output(stdout, stderr)
    task_client.run_command(command, env,
                            capture_stdout=stdout is not None,
                            capture_stderr=stderr is not None,
                            prefix_output_with_timestamp=prefix_output_with_timestamp)

But now the run is different.

The command passed in this time, inside the Spark Executor, is still:

/usr/local/bin/orted -mca ess "env" -mca ess_base_num_procs "2" -mca orte_hnp_uri "xxxx" -mca pls_rsh_agent "/usr/bin/python -m horovod.spark.driver.mpirun_rsh xxxxx"

Recall the results of the MPI experiment in 0x03: we know that the pls_rsh_agent "/usr/bin/python -m horovod.spark.driver.mpirun_rsh xxxxx" part actually has no practical effect on the remote side. The remote orted simply goes on to run the mpirun_exec_fn that was passed in.

If any reader has a deep understanding of MPI, advice and corrections are welcome.

4.4.5 mpirun_exec_fn

The code is located in horovod/spark/task/mpirun_exec_fn.py.

It eventually calls task_exec.

def main(driver_addresses, settings):
    # prepend HOROVOD_SPARK_PYTHONPATH to PYTHONPATH
    if 'HOROVOD_SPARK_PYTHONPATH' in os.environ:
        ppath = os.environ['HOROVOD_SPARK_PYTHONPATH']

        # add injected HOROVOD_SPARK_PYTHONPATH to sys.path
        for p in reversed(ppath.split(os.pathsep)):
            sys.path.insert(1, p)  # don't put it in front which is usually .

        if 'PYTHONPATH' in os.environ:
            ppath = os.pathsep.join([ppath, os.environ['PYTHONPATH']])
        os.environ['PYTHONPATH'] = ppath

    # change current working dir to where the Spark worker runs
    # because orted runs this script where mpirun was executed
    # this env var is injected by the Spark task service
    work_dir = os.environ.get('HOROVOD_SPARK_WORK_DIR')
    if work_dir:
        os.chdir(work_dir)

    task_exec(driver_addresses, settings, 'OMPI_COMM_WORLD_RANK', 'OMPI_COMM_WORLD_LOCAL_RANK')

0x05 Stage 5: Run user code

5.1 task_exec

task_exec is used to run the user code.

As you can see, the previously stored code is pulled from the Driver and run.

def task_exec(driver_addresses, settings, rank_env, local_rank_env):
    # Die if parent process terminates
    in_thread(target=_parent_process_monitor, args=(os.getppid(),))

    key = codec.loads_base64(os.environ[secret.HOROVOD_SECRET_KEY])
    rank = int(os.environ[rank_env])
    local_rank = int(os.environ[local_rank_env])
    driver_client = driver_service.SparkDriverClient(driver_addresses, key,
                                                     verbose=settings.verbose)

    # tell driver about local rank and rank
    # in elastic mode the driver already knows this mapping
    # for simplicity we keep code paths the same for elastic and static mode
    host_hash = os.environ['HOROVOD_HOSTNAME']
    task_index = driver_client.set_local_rank_to_rank(host_hash, local_rank, rank)

    # gather available resources from task service
    task_addresses = driver_client.all_task_addresses(task_index)
    task_client = task_service.SparkTaskClient(task_index, task_addresses, key,
                                               verbose=settings.verbose)
    task_info.set_resources(task_client.resources())

    fn, args, kwargs = driver_client.code()
    result = fn(*args, **kwargs)
    task_client.register_code_result(result)

5.2 Obtaining the training code

In MapReduce, Jar packages (binary libraries) are distributed to nodes, which then execute the corresponding code in the Jar packages. It’s actually very inconvenient.

Spark solves this problem with function serialization, which is one of Spark's contributions to distributed programming. The Spark framework automatically serializes the custom functions you write (your business logic) and distributes them to the nodes for execution. Function serialization also gives Spark the following benefit: users can write distributed code directly in spark-shell on the command line, run it in real time, and get real-time results.

For example, initialization/coordination takes place in the Driver program, but the actual code execution takes place in the Executors on the Worker nodes. When an Executor needs a class that is defined on the Driver side, the Driver has to serialize that class and send it to the Executor, so the class must implement the Serializable interface.

Horovod on Spark transfers the Python training code itself, which can be executed directly on the remote side thanks to Python's scripting nature.
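To make this concrete, here is a minimal sketch of the idea: a Python function is serialized on the driver side and rebuilt on the executor side. The sketch uses cloudpickle and base64 directly; it only illustrates the pattern that Horovod's codec helpers wrap, and is not Horovod's actual codec module:

import base64
import cloudpickle

def train_fn(lr=0.01):
    # Stand-in for the user's training function.
    return 'training with lr={}'.format(lr)

# Driver side: serialize the function (including its closure) into a base64 string.
payload = base64.b64encode(cloudpickle.dumps(train_fn)).decode('ascii')

# Executor side: rebuild the function object from the string and call it.
restored_fn = cloudpickle.loads(base64.b64decode(payload))
print(restored_fn(lr=0.1))  # -> training with lr=0.1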

In the SparkDriverClient class, it sends a CodeRequest request to the Driver:

def code(self):
    resp = self._send(CodeRequest())
    return resp.fn, resp.args, resp.kwargs

In SparkDriverService, the CodeRequest is handled after it is received:

if isinstance(req, CodeRequest):
    return CodeResponse(self._fn, self._args, self._kwargs)

SparkDriverService sends the training code _fn and its parameters to SparkTaskService.

class CodeResponse(object):
    def __init__(self, fn, args, kwargs):
        self.fn = fn
        """Function."""

        self.args = args
        """Function args."""

        self.kwargs = kwargs
        """Function kwargs."""

The final logic is something like this:

+---------------------------------+                        +---------------------------------+
| Horovod Main thread             |                        | Spark Executor                  |
|                                 |                        |                                 |
|  +-------------------------+    |      1 register        |     +----------------------+    |
|  |   SparkDriverService    | <------------------------------+  |   SparkTaskService   |    |
|  |                         |    |                        |     |                      |    |
|  |                         |    |      2 notify start    |     |                      |    |
|  |                         | +------------------------------>  |                      |    |
|  |                         |    |                        |     |                      |    |
|  |                         |    |  3 RunCommandRequest   |     |                      |    |
|  |                         | +------------------------------>  |  orted  mpirun_rsh   |    |
|  |                         |    |                        |     |        +             |    |
|  |                         |    |                        |     |        | 4           |    |
|  |                         |    |                        |     |        v             |    |
|  |                         |    |                        |     |     task_exec        |    |
|  |                         |    |                        |     |        +             |    |
|  |                         |    |                        |     |        | 5           |    |
|  |                         |    |                        |     |        v             |    |
|  |                         |    | 6 set_local_rank_to_rank     |   SparkTaskClient    |    |
|  |                         | +------------------------------>  |                      |    |
|  |                         |    |                        |     |                      |    |
|  |                         |    |      7 code()          |     |                      |    |
|  |                         | +------------------------------>  |      8 fn()          |    |
|  |                         |    |                        |     |                      |    |
|  +-------------------------+    |                        |     +----------------------+    |
+---------------------------------+                        +---------------------------------+


With this, the analysis of Horovod on Spark over MPI is complete. The next article covers Spark on GLOO.

0xEE Personal information

Thoughts on life and technology

Wechat public account: Rosie’s Thinking

If you want timely news of follow-up articles, or want to see the technical materials I recommend, please follow this account.

0xFF Reference

What are the differences and relationships between mpirun, mpiexec and mpiexec.hydra?