0x00 Abstract

Horovod is an easy-to-use, high-performance distributed training framework released by Uber in 2017 that has been widely used in the industry.

This series takes you through the Horovod source code. This is the eleventh article in the series; it looks at how Horovod runs on Spark, as implemented with GLOO.

Horovod on Spark has two underlying implementations: MPI and GLOO. Due to space constraints, this article covers the GLOO implementation. To keep the article self-contained, it repeats some material from the previous one; we ask for your understanding.

The other articles in this series are as follows:

Horovod (1) — Basics

Horovod (2) — A distributed training framework for deep learning — from the user’s perspective

Horovod (3) — What’s behind Horovodrun

Horovod (4) — Network Basics & Driver

Horovod (5) — fusion framework

Horovod (6) — background architecture

Horovod (6) — How threads are used in this distributed training framework

Horovod (7) — DistributedOptimizer

Horovod (8) — on Spark

Horovod (9) — Start on Spark

Horovod (10) — Run on Spark

0x01 Review

1.1 Overall sequence diagram

Let’s first review the overall sequence diagram of Horovod on Spark. Note that the sequence diagram starts from mpi_run and therefore reflects the MPI implementation; this article covers GLOO, so the flow diverges from mpi_run onward.

1.2 Overall Logic

The overall logic of Horovod on Spark is divided into the following phases:

  • Start SparkDriverService and use _make_spark_thread to start the Spark tasks. Horovod then waits for the startup to finish.
  • The Spark tasks are started in the Spark Executors by multiple threads, and each task runs a SparkTaskService. Each SparkTaskService registers itself with the SparkDriverService in the Horovod main process and waits for the command to start the next step.
  • After all tasks report completion, Horovod notifies each task to proceed to the next stage.
  • Horovod calls mpi_run (which in turn uses mpirun_rsh.py) to start orted (here actually SparkTaskService) on each Spark Executor, thereby starting the MPI cluster;
  • Orted runs training code on top of each Spark Executor;

Having already analyzed the first three stages, this article continues the analysis of the next two.

0x02 Stage 4: Start the Job

Let’s look at the fourth stage, which is how to run a training job.

2.1 GLOO vs. MPI

The question for this article is how the Gloo implementation differs from the MPI one.

2.1.1 Trouble with MPI

MPI runs into trouble because:

  • MPI normally connects to hosts through SSH, but SSH cannot launch the Python function inside a Spark Executor.
  • orted needs to run inside the Spark Executor, but mpirun does not know the IP and PORT of the Spark Executors when it starts, so it cannot start them directly.
  • MPI therefore uses RPC to start the user code:
    • Start a SparkTaskService in each Spark Executor, and register each SparkTaskService’s IP:PORT with the SparkDriverService in the Horovod main process.
    • Use horovod.spark.driver.mpirun_rsh to connect to each Executor, i.e. to “remote shell” into the executors.
    • Have the SparkTaskService start orted directly.

2.1.2 Gloo Key points

Let’s look at the key points of Gloo. In normal mode, the Gloo scheme would:

  • Create a RendezvousServer with a KVStore, in which the driver stores information such as the IP of the workers involved in the communication into the KVStore. This information is used to help the worker call Gloo to construct an AllReduce communication ring.
  • The worker can then call gloo to access the RendezvousServer to construct the communication ring.
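
To make the KVStore idea concrete, here is a toy, self-contained sketch (not Horovod's actual code) of how a driver-side key-value store lets each worker look up its neighbors before the ring is built; the class name, addresses and ports are made up for illustration.

class ToyKVStore:
    """Toy stand-in for the rendezvous KVStore; Horovod's real store sits behind an HTTP server."""
    def __init__(self):
        self._store = {}

    def put(self, key, value):
        self._store[key] = value

    def get(self, key):
        return self._store[key]

# Driver side: publish where every worker listens.
kv = ToyKVStore()
workers = {0: ('10.0.0.1', 7000), 1: ('10.0.0.2', 7000), 2: ('10.0.0.3', 7000)}
for rank, addr in workers.items():
    kv.put('rank_%d' % rank, addr)

# Worker side: rank 1 looks up the next worker in the ring (3 workers in total).
size, my_rank = len(workers), 1
next_addr = kv.get('rank_%d' % ((my_rank + 1) % size))
print(next_addr)   # ('10.0.0.3', 7000) -- enough to open the connection to the next hop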

In Horovod on Spark, the key points are:

  • How is the RendezvousServer constructed, and how does the RendezvousServer know the IP:port of the Executor (or similar entity)?
  • How does SparkTaskService on Executor communicate with RendezvousServer to know its own network information and that of its neighbors?

Let’s look at the code for an answer.

2.2 Review the startup process

Let’s start by reviewing how we started.

The logic of horovod.spark.run is:

  • Handle various configurations such as timeout, nice… ;
  • Obtain Spark information, for example the SparkContext from PySpark;
  • Build the SparkDriverService (the Horovod driver service);
  • Use _make_spark_thread to start the Spark executors (one SparkTaskService per Spark Executor), so that a cluster is built;
  • Each SparkTaskService registers itself with the Horovod driver via driver_service.SparkDriverClient.register_task; this is how the RendezvousServer later learns each SparkTaskService’s IP:PORT;
  • Wait for all Spark tasks to finish starting, via _notify_and_register_task_addresses;
  • Use _launch_job to start training;
  • Use spark_thread.join to collect training results;

The key point is that SparkTaskService itself has an internal HTTP server that registers its IP:PORT information with the Driver.

2.3 _launch_job

Let’s start with _launch_job.

The _launch_job is simple:

  • First, driver.get_common_interfaces obtains the network interface (routing) information, which is recorded by the RendezvousServer and eventually used by the SparkTaskService on each Executor;
  • Then it calls run_controller to start the job;
def _launch_job(use_mpi, use_gloo, settings, driver, env, stdout=None, stderr=None):
    nics = driver.get_common_interfaces()
    # Network routing information is passed in when gloo_run is called.
    run_controller(use_gloo, lambda: gloo_run(settings, nics, driver, env, stdout, stderr),
                   use_mpi, lambda: mpi_run(settings, nics, driver, env, stdout, stderr),
                   False, lambda: None,
                   settings.verbose)

2.3 Obtaining Route Information

The Driver’s get_common_interfaces differs from get_common_interfaces in normal mode. By this point, the SparkTaskService information from the Spark Executors has already been saved in the Driver, so the interface information can be obtained directly from it.

def get_common_interfaces(self):
    if self._nics is not None:
        return self._nics

    nics = None
    if len(self._task_addresses_for_tasks) > 0:
        # in Elastic Horovod on Spark with auto-scaling
        # keys in task_addresses are in range(max_np or proc_num)
        # but not all keys may exist, so we don't do for index in range(proc_num)
        indices = list(self._task_addresses_for_tasks.keys())
        nics = set(self._task_addresses_for_tasks[indices[0]].keys())
        for index in indices[1:]:
            nics.intersection_update(self._task_addresses_for_tasks[index].keys())

    return nics
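As a small worked example of the intersection logic above (the interface names and addresses are made up): only the NICs that every task reports survive, so they can be used by all ring members.

# Made-up example of the intersection step in get_common_interfaces:
task_addresses_for_tasks = {
    0: {'eth0': [('10.0.0.1', 123)], 'lo': [('127.0.0.1', 123)]},
    1: {'eth0': [('10.0.0.2', 456)], 'docker0': [('172.17.0.1', 456)]},
}
indices = list(task_addresses_for_tasks.keys())
nics = set(task_addresses_for_tasks[indices[0]].keys())
for index in indices[1:]:
    nics.intersection_update(task_addresses_for_tasks[index].keys())
print(nics)   # {'eth0'} -- the only interface common to both tasks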

2.4 run_controller

Based on the configuration and on what was compiled, it chooses gloo, js, or mpi to run the job.

def run_controller(use_gloo, gloo_run, use_mpi, mpi_run, use_jsrun, js_run, verbosity):
    verbose = verbosity >= 2
    if use_gloo:
        gloo_run()
    elif use_mpi:
        mpi_run()
    elif use_jsrun:
        js_run()
    else:
        if mpi_built(verbose=verbose):
            if lsf.LSFUtils.using_lsf() and is_jsrun_installed():
                js_run()
            else:
                mpi_run()
        elif gloo_built(verbose=verbose):
            gloo_run()


So let’s start the job and analyze the GLOO path.

0x03 Gloo implementation

Compared to MPI, the Gloo part is clearer.

3.1 gloo_run

Go back to run_controller in section 2.4.

Based on the configuration and on what was compiled, it chooses gloo, js, or mpi:

def run_controller(use_gloo, gloo_run, use_mpi, mpi_run, use_jsrun, js_run, verbosity):
    verbose = verbosity >= 2
    if use_gloo:
        gloo_run()  # this is the path taken in this article
    elif use_mpi:
        mpi_run()   # mpi would be called here
    elif use_jsrun:
        js_run()
    else:
        if mpi_built(verbose=verbose):
            if lsf.LSFUtils.using_lsf() and is_jsrun_installed():
                js_run()
            else:
                mpi_run()   # mpi would be called here
        elif gloo_built(verbose=verbose):
            gloo_run()  # this is the path taken in this article


If gloo is configured, we use gloo_run:

def gloo_run(settings, nics, driver, env, stdout=None, stderr=None):
    """Run distributed gloo jobs.

    :param settings: Settings for running the distributed jobs.
                     Note: settings.num_proc and settings.hosts must not be None.
    :param nics: Interfaces to use by gloo.
    :param driver: The Spark driver service that tasks are connected to.
    :param env: Environment dictionary to use for running gloo jobs.  Can be None.
    :param stdout: Horovod stdout is redirected to this stream.
    :param stderr: Horovod stderr is redirected to this stream.
    """
    if env is None:
        env = {}

    # we don't want the key to be serialized along with settings from here on
    key = settings.key
    settings.key = None

    # Each thread will use SparkTaskClient to launch the job on each remote host. If an
    # error occurs in one thread, entire process will be terminated. Otherwise,
    # threads will keep running and ssh session.
    iface = list(nics)[0]
    server_ip = driver.addresses()[iface][0][0]
    # Build the command that needs to be executed
    command = (sys.executable,
               '-m', 'horovod.spark.task.gloo_exec_fn',  # this module runs inside the task
               codec.dumps_base64(driver.addresses()),
               codec.dumps_base64(settings))

    # think of _exec_command_fn as the ability to execute commands
    exec_command = _exec_command_fn(driver, key, settings, env,
                                    stdout, stderr, settings.prefix_output_with_timestamp)
    # Routing information is passed in
    launch_gloo(command, exec_command, settings, nics, {}, server_ip)

It is important to see what _exec_command_fn looks like; we can think of it as the capability to execute a command:

def _exec_command_fn(driver, key, settings, env, stdout, stderr, prefix_output_with_timestamp):
    def _exec_command(command, slot_info, events):
        host = slot_info.hostname          # the host name (host hash)
        local_rank = slot_info.local_rank  # local rank
        verbose = settings.verbose
        # The actual run capability is encapsulated in rsh
        result = rsh(driver.addresses(), key, host, command, env, local_rank, verbose,
                     stdout, stderr, prefix_output_with_timestamp, False, events)
        return result, time.time()
    return _exec_command

The rsh called here is imported from horovod.spark.driver.rsh; this is the key.

3.2 launch_gloo

Here are the main points:

  • First, notice the arguments:
    • command is roughly: 'python', '-m', 'horovod.spark.task.gloo_exec_fn';
    • exec_command is roughly rsh xxxx, i.e. a function that remotely runs python -m horovod.spark.task.gloo_exec_fn;
  • Establish the RendezvousServer;
  • Build slot_info_to_command, which binds the command to a specific slot;
  • Call execute_function_multithreaded to run the commands with multiple threads;
def launch_gloo(command, exec_command, settings, nics, env, server_ip):
    """Launches the given command multiple times using gloo.
    Each command is launched via exec_command.

    :param command: command to launch
    :param exec_command: means to execute a single command
    :param settings: settings for the distribution
    :param nics: common interfaces
    :param env: environment to use
    :param server_ip: ip to use for rendezvous server
    """
    # start global rendezvous server and get port that it is listening on
    # Set up the RendezvousServer, which will be used by the underlying Gloo C++ environment
    rendezvous = RendezvousServer(settings.verbose)

    # allocate processes into slots
    # Parse the hosts and work out which process (slot) runs on which host
    hosts = parse_hosts(settings.hosts)
    host_alloc_plan = get_host_assignments(hosts, settings.num_proc)

    # start global rendezvous server and get port that it is listening on
    global_rendezv_port = rendezvous.start()
    rendezvous.init(host_alloc_plan)
    # Obtain the executable command
    run_command = get_run_command(command, server_ip, nics, global_rendezv_port)

    # Get a per-slot command that can be executed on the slot
    slot_info_to_command = _slot_info_to_command_fn(run_command, env)
    event = register_shutdown_event()
    # Build args_list based on slot_info_to_command_fn; each arg is a slot command
    args_list = [[slot_info_to_command(slot_info), slot_info, [event]]
                 for slot_info in host_alloc_plan]

    # If an error occurs in one thread, entire process will be terminated.
    # Otherwise, threads will keep running.
    # Multithreaded execution: each arg (a slot command, carrying information such as
    # HOROVOD_GLOO_RENDEZVOUS_ADDR) is passed to exec_command
    res = threads.execute_function_multithreaded(exec_command,
                                                 args_list,
                                                 block_until_all_done=True)
    ...
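
As a side note on the allocation step above, the following is a hedged sketch of what parse_hosts / get_host_assignments produce. It assumes these helpers are importable from horovod.runner.util.hosts (the exact module path may differ between Horovod versions), and the host-hash strings are made up:

# Sketch: how the host string is turned into a per-slot allocation plan.
# Assumption: parse_hosts / get_host_assignments live in horovod.runner.util.hosts.
from horovod.runner.util.hosts import parse_hosts, get_host_assignments

hosts = parse_hosts('host-hash-1:2,host-hash-2:2')   # two host hashes, 2 slots each
host_alloc_plan = get_host_assignments(hosts, 4)     # 4 processes in total
for slot_info in host_alloc_plan:
    # slot_info carries hostname (the host hash), rank and local_rank,
    # which is exactly what _slot_info_to_command_fn and rsh consume later.
    print(slot_info.hostname, slot_info.rank, slot_info.local_rank)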

The details are shown in the figure below:

               launch_gloo( command = 'python', '-m', 'horovod.spark.task.gloo_exec_fn'
                    +       exec_command = rsh xxxx )
                    |
                    |
                    v
               RendezvousServer
                    +
                    |
                    |   get_run_command
                    |
                    v
 run_command  = HOROVOD_GLOO_RENDEZVOUS_ADDR=1.1.1.1 HOROVOD_GLOO_RENDEZVOUS_PORT=2222
                HOROVOD_CPU_OPERATIONS=gloo HOROVOD_GLOO_IFACE=lo ......
                python -m horovod.spark.task.gloo_exec_fn
 exec_command = rsh xxxx
                    +
                    |
                    |   _slot_info_to_command_fn
                    |
                    v
 slot_info_to_command = rank=0,local_rank=0,socket-ifname=eth0,cpu_operations=gloo......
                        HOROVOD_GLOO_RENDEZVOUS_ADDR=1.1.1.1 HOROVOD_GLOO_RENDEZVOUS_PORT=2222
                        HOROVOD_CPU_OPERATIONS=gloo HOROVOD_GLOO_IFACE=lo ......
                        python -m horovod.spark.task.gloo_exec_fn
 exec_command = rsh xxxx
                    +
                    |
                    v
               threads.execute_function_multithreaded


3.2.1 get_run_command

The key function used in launch_gloo is get_run_command; it calls create_run_env_vars to gather the information gloo needs and builds run_command from it. The result has the following format:

HOROVOD_GLOO_RENDEZVOUS_ADDR=1.1.1.1 HOROVOD_GLOO_RENDEZVOUS_PORT=2222 HOROVOD_CPU_OPERATIONS=gloo HOROVOD_GLOO_IFACE=lo HOROVOD_CONTROLLER=gloo python train.py



The code is as follows:

def get_run_command(command, server_ip, nics, port, elastic=False):
    env_vars = create_run_env_vars(server_ip, nics, port, elastic)
    env_string = " ".join(
        [f"{k}={str(v)}" for k, v in env_vars.items()])
    run_command = (
        '{env_string} '
        '{command}'  # expect a lot of environment variables
        .format(env_string=env_string,
                command=' '.join(quote(par) for par in command)))
    return run_command



3.2.2 create_run_env_vars

The create_run_env_vars function builds the information gloo needs at run time, which is then passed on to the Spark Executor.

def create_run_env_vars(server_ip, nics, port, elastic=False):
    run_envs = {
        'HOROVOD_GLOO_RENDEZVOUS_ADDR': server_ip,
        'HOROVOD_GLOO_RENDEZVOUS_PORT': port,
        'HOROVOD_CONTROLLER': "gloo",
        'HOROVOD_CPU_OPERATIONS': "gloo",
        'HOROVOD_GLOO_IFACE': list(nics)[0],   # TODO: add multiple ifaces in future
        'NCCL_SOCKET_IFNAME': ','.join(nics),  # the network interface info needed to build the ring
    }
    if elastic:
        run_envs["HOROVOD_ELASTIC"] = "1"
    return run_envs


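Putting the two functions together, here is a hedged usage sketch of what run_command ends up looking like. It assumes get_run_command is importable from horovod.runner.gloo_run (where launch_gloo also lives); the IP, port, interface and base64 placeholders are made-up example values.

# Assumption: get_run_command is importable from horovod.runner.gloo_run.
from horovod.runner.gloo_run import get_run_command

command = ['python', '-m', 'horovod.spark.task.gloo_exec_fn',
           '<base64 driver addresses>', '<base64 settings>']
print(get_run_command(command, server_ip='1.1.1.1', nics={'lo'}, port=2222))
# Roughly (the order of the environment variables may differ):
# HOROVOD_GLOO_RENDEZVOUS_ADDR=1.1.1.1 HOROVOD_GLOO_RENDEZVOUS_PORT=2222 HOROVOD_CONTROLLER=gloo
#   HOROVOD_CPU_OPERATIONS=gloo HOROVOD_GLOO_IFACE=lo NCCL_SOCKET_IFNAME=lo
#   python -m horovod.spark.task.gloo_exec_fn '<base64 driver addresses>' '<base64 settings>'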

3.3 rsh

In execute_function_multithreaded, rsh is invoked, and it eventually interacts with the Spark Executor.

Specifically, rsh will:

  • Obtain the driver handle;
  • Use the driver handle (SparkDriverClient) to obtain the task information;
  • Obtain the task handle;
  • Call SparkTaskClient’s run_command method to send a command to the Spark Executor; the command parameter here is roughly python -m horovod.spark.task.gloo_exec_fn;
  • Wait for the result of the command;

When rsh is called, the command carries settings such as HOROVOD_GLOO_RENDEZVOUS_ADDR=1.1.1.1 HOROVOD_GLOO_RENDEZVOUS_PORT=2222 HOROVOD_CPU_OPERATIONS=gloo HOROVOD_GLOO_IFACE=lo, so the SparkTaskService in the Spark Executor knows where the RendezvousServer is and thus how the ring should be constructed.

def rsh(driver_addresses, key, host_hash, command, env, local_rank, verbose,
        stdout=None, stderr=None, prefix_output_with_timestamp=False,
        background=True, events=None):
    """Method to run a command remotely given a host hash, local rank and driver addresses.

    This method connects to the SparkDriverService running on the Spark driver,
    retrieves all information required to connect to the task with given local rank
    of that host hash and invokes the command there.

    The method returns immediately after launching the command if background is True (default).
    When background is set to False, this method waits for command termination and returns
    command's result. If there is an exception while waiting for the result (i.e. connection reset)
    it returns -1.

    :param driver_addresses: driver's addresses
    :param key: used for encryption of parameters passed across the hosts
    :param host_hash: host hash to connect to
    :param command: command and arguments to invoke
    :param env: environment to use
    :param local_rank: local rank on the host of task to run the command in
    :param verbose: verbosity level
    :param stdout: Task stdout is redirected to this stream.
    :param stderr: Task stderr is redirected to this stream.
    :param prefix_output_with_timestamp: shows timestamp in stdout/stderr forwarding on the driver if True
    :param background: run command in background if True, returns command result otherwise
    :param events: events to abort the command, only if background is True
    :return exit code if background is False
    """
    if ':' in host_hash:
        raise Exception('Illegal host hash provided. Are you using Open MPI 4.0.0+?')

    # Obtain the driver handle
    driver_client = driver_service.SparkDriverClient(driver_addresses, key, verbose=verbose)
    # Use the configuration to determine which task to run
    task_indices = driver_client.task_host_hash_indices(host_hash)
    task_index = task_indices[local_rank]
    task_addresses = driver_client.all_task_addresses(task_index)
    # Get the task handle
    task_client = task_service.SparkTaskClient(task_index, task_addresses, key, verbose=verbose)
    task_client.stream_command_output(stdout, stderr)
    # command is roughly: python -m horovod.spark.task.gloo_exec_fn
    task_client.run_command(command, env,
                            capture_stdout=stdout is not None,
                            capture_stderr=stderr is not None,
                            prefix_output_with_timestamp=prefix_output_with_timestamp)

    if not background:
        events = events or []
        stop = threading.Event()
        for event in events:
            on_event(event, task_client.abort_command, stop=stop)
        try:
            exit_code = task_client.wait_for_command_exit_code()
            return exit_code
        except:
            traceback.print_exc()
            return -1
        finally:
            stop.set()

Therefore, the logic is as follows; in the end, python -m horovod.spark.task.gloo_exec_fn is run on the Spark Executor:

                                                               Horovod Job    +    Spark Host
                                                                              |
SparkDriverService          horovod.spark.run                                 |        SparkTaskService
        +                        +                                            |               +
        |                        |                                            |               |
        |                        v                                            |               |
        |                   launch_gloo( command = 'python', '-m',            |               |
        |                               'horovod.spark.task.gloo_exec_fn'     |               |
        |                        +       exec_command = rsh xxxx )            |               |
        |                        |                                            |               |
        |                        v                                            |               |
        |                   RendezvousServer                                  |               |
        |                        |                                            |               |
        |                        |  get_run_command                           |               |
        |                        v                                            |               |
        |     run_command  = HOROVOD_GLOO_RENDEZVOUS_ADDR=1.1.1.1             |               |
        |                    HOROVOD_GLOO_RENDEZVOUS_PORT=2222                |               |
        |                    HOROVOD_CPU_OPERATIONS=gloo                      |               |
        |                    HOROVOD_GLOO_IFACE=lo ......                     |               |
        |                    python -m horovod.spark.task.gloo_exec_fn        |               |
        |     exec_command = rsh xxxx                                         |               |
        |                        |                                            |               |
        |                        |  _slot_info_to_command_fn                  |               |
        |                        v                                            |               |
        |     slot_info_to_command = rank=0,local_rank=0,                     |               |
        |                            socket-ifname=eth0,                      |               |
        |                            cpu_operations=gloo......                |               |
        |                            HOROVOD_GLOO_RENDEZVOUS_ADDR=1.1.1.1     |               |
        |                            HOROVOD_GLOO_RENDEZVOUS_PORT=2222        |               |
        |                            HOROVOD_CPU_OPERATIONS=gloo              |               |
        |                            HOROVOD_GLOO_IFACE=lo ......             |               |
        |                            python -m horovod.spark.task.gloo_exec_fn|               |
        |     exec_command = rsh xxxx                                         |               |
        |                        |                                            |               |
        |                        v                                            |               |
        |          threads.execute_function_multithreaded                     |               |
        |                        |                                            |               |
        |                        v                                            |               |
        |                       rsh                                           |               |
        | <----------------------+   task_host_hash_indices                   |               |
        |                        |                                            |               |
        | <----------------------+   all_task_addresses                       |               |
        |                        |                                            |               |
        |                        |  run_command(command, env)                 |               |
        |                        |  RunCommandRequest                         |               |
        |                        +----------------------------------------------------------> |
        |                        |                                            |               |
        |                        |                                            |          run command
        v                        v                                            |               v


3.4 gloo_exec_fn

Note that from this point on, the code runs inside the Executor on the Spark host.

gloo_exec_fn corresponds to mpirun_exec_fn in the MPI version described earlier.

The Spark Executor runs horovod.spark.task.gloo_exec_fn.

The contents of horovod.spark.task.gloo_exec_fn are as follows:

import sys

from horovod.spark.task import task_exec
from horovod.runner.common.util import codec


def main(driver_addresses, settings):
    task_exec(driver_addresses, settings, 'HOROVOD_RANK', 'HOROVOD_LOCAL_RANK')


if __name__ == '__main__':
    if len(sys.argv) != 3:
        print('Usage: %s <driver addresses> <settings>' % sys.argv[0])
        sys.exit(1)
    main(codec.loads_base64(sys.argv[1]), codec.loads_base64(sys.argv[2]))


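The two positional arguments of gloo_exec_fn are produced by codec.dumps_base64 in gloo_run above. A minimal round-trip sketch (the payload is a made-up stand-in for driver.addresses()):

# Round-trip sketch of the serialization used to pass driver addresses and settings
# on the command line; codec lives in horovod.runner.common.util (imported above).
from horovod.runner.common.util import codec

payload = {'lo': [('1.1.1.1', 2222)]}        # made-up stand-in for driver.addresses()
encoded = codec.dumps_base64(payload)        # a plain string, safe to embed in a shell command
assert codec.loads_base64(encoded) == payload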

0x04 Stage 5: Run user code

The task_exec function runs user code to train.

task_exec is located in horovod/spark/task/__init__.py.

Specifically, it will:

  • Call SparkDriverClient to obtain the task information;
  • Call SparkTaskClient to obtain the available resources, and fetch the user code from the driver via SparkDriverClient.code();
  • Execute the user code and register the result.
def task_exec(driver_addresses, settings, rank_env, local_rank_env):
    # Die if parent process terminates
    in_thread(target=_parent_process_monitor, args=(os.getppid(),))

    key = codec.loads_base64(os.environ[secret.HOROVOD_SECRET_KEY])
    rank = int(os.environ[rank_env])
    local_rank = int(os.environ[local_rank_env])
    driver_client = driver_service.SparkDriverClient(driver_addresses, key,
                                                     verbose=settings.verbose)

    # tell driver about local rank and rank
    # in elastic mode the driver already knows this mapping
    # for simplicity we keep code paths the same for elastic and static mode
    host_hash = os.environ['HOROVOD_HOSTNAME']
    task_index = driver_client.set_local_rank_to_rank(host_hash, local_rank, rank)

    # gather available resources from task service
    task_addresses = driver_client.all_task_addresses(task_index)
    task_client = task_service.SparkTaskClient(task_index, task_addresses, key,
                                               verbose=settings.verbose)
    task_info.set_resources(task_client.resources())

    fn, args, kwargs = driver_client.code()
    result = fn(*args, **kwargs)
    task_client.register_code_result(result)


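To close the loop: the fn that task_exec fetches via driver_client.code() is simply the function the user passed to horovod.spark.run. Below is a minimal usage sketch, assuming a framework binding (here horovod.tensorflow.keras) is installed and that horovod.spark.run accepts num_proc and use_gloo as in the upstream API; the function body and parameters are illustrative only.

# Minimal sketch of the user-facing entry point; train() is a made-up example function.
import horovod.spark
import horovod.tensorflow.keras as hvd   # any Horovod framework binding works the same way

def train(learning_rate):
    hvd.init()          # joins the Gloo ring using the rendezvous env vars set up via rsh
    # ... build the model and run training here ...
    return hvd.rank()   # whatever is returned goes back through register_code_result

# Runs train(0.01) in 4 Spark tasks; the results are collected on the driver.
results = horovod.spark.run(train, args=(0.01,), num_proc=4, use_gloo=True)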

The overall flow ends up as follows:

                                                               Horovod Job    +    Spark Host
                                                                              |
SparkDriverService          horovod.spark.run                                 |        SparkTaskService
        +                        +                                            |               +
        |                        |                                            |               |
        |                        v                                            |               |
        |                   launch_gloo( command = 'python', '-m',            |               |
        |                               'horovod.spark.task.gloo_exec_fn'     |               |
        |                        +       exec_command = rsh xxxx )            |               |
        |                        |                                            |               |
        |                        v                                            |               |
        |                   RendezvousServer                                  |               |
        |                        |                                            |               |
        |                        |  get_run_command                           |               |
        |                        v                                            |               |
        |     run_command  = rendezvous_addr, rendezvous_port                 |               |
        |                    python -m horovod.spark.task.gloo_exec_fn        |               |
        |     exec_command = rsh xxxx                                         |               |
        |                        |                                            |               |
        |                        |  _slot_info_to_command_fn                  |               |
        |                        v                                            |               |
        |     slot_info_to_command = rank=0,local_rank=0,                     |               |
        |                            socket-ifname=eth0,cpu_operations=gloo...|               |
        |                            rendezvous_addr, rendezvous_port         |               |
        |                            python -m horovod.spark.task.gloo_exec_fn|               |
        |     exec_command = rsh xxxx                                         |               |
        |                        |                                            |               |
        |                        v                                            |               |
        |          threads.execute_function_multithreaded                     |               |
        |                        |                                            |               |
        |                        v                                            |               |
        |                       rsh                                           |               |
        | <----------------------+   task_host_hash_indices                   |               |
        | <----------------------+   all_task_addresses                       |               |
        |                        |                                            |               |
        |                        |  run_command(command, env)                 |               |
        |                        |  RunCommandRequest                         |               |
        |                        +----------------------------------------------------------> |
        |                        |                                            |          run command
        |                        |                                            |               +
        |                        |                                            |          gloo_exec_fn
        |                        |                                            |               +
        |                        |                                            |           task_exec
        |                        |                       code()               |               +
        | <------------------------------------------------------------------------------------+
        |                        |                                            |               |
        +------------------------------------------------------------------------------------>+
        |                        |         code (the user function)           |               |
        v                        v                                            |               v


0x05 Summary

In normal mode, the Gloo scheme will:

  • Create a RendezvousServer with a KVStore, in which the driver stores information such as the IP of the workers involved in the communication into the KVStore.
  • The worker can then call gloo to access the RendezvousServer to construct the communication ring.

In Horovod on Spark via GLOO, the key points are:

  • How is the RendezvousServer constructed? How does the RendezvousServer know the Executor’s IP:port?
    • The answer is:
      • In the driver of Horovod, RendezvousServer is created.
      • During initialization, each SparkTaskService registers itself with the Horovod driver via driver_service.SparkDriverClient.register_task; this is how the RendezvousServer learns each SparkTaskService’s IP:PORT.
  • How does SparkTaskService on Executor communicate with RendezvousServer to know its own network information and that of its neighbors?
    • The answer is:
      • In execute_function_multithreaded, rsh is invoked, which eventually interacts with the Spark Executor.
      • When rsh is called, information such as HOROVOD_GLOO_RENDEZVOUS_ADDR=1.1.1.1 HOROVOD_GLOO_RENDEZVOUS_PORT=2222 HOROVOD_CPU_OPERATIONS=gloo HOROVOD_GLOO_IFACE=lo is passed along, including the RendezvousServer’s address, so the SparkTaskService in the Spark Executor knows how to find the RendezvousServer and, in turn, how to build the ring.

This concludes the analysis of Horovod on Spark.

0xEE Personal information

Thoughts on life and technology

Wechat public account: Rosie’s Thinking

If you want to be notified when new articles are published, or to see the technical material I recommend, please follow this account.