0x00 Abstract

Horovod is an easy-to-use, high-performance distributed training framework released by Uber in 2017 that has been widely used in the industry.

This series takes you through the source code analysis of Horovod. This article, the thirteenth in a series, looks at Driver roles in the elastic implementation of Horovod.

This section corresponds to the Driver Main section in the architecture diagram. Because this part is strongly related to Host Discovery, the two are presented together. Since the Driver is the main body of elastic training, this article analyzes the calling code together with the Driver.

Links to other articles in this series are as follows:

Horovod (1) — Basics

Horovod (2) — A distributed training framework for deep learning — from the user’s perspective

Horovod (3) — What’s behind Horovodrun

Horovod (4) — Network Basics & Driver

Horovod (5) — fusion framework

Horovod (6) — background thread architecture

Horovod (7) — DistributedOptimizer

Horovod (8) — on Spark

Horovod (9) — Start on Spark

Horovod (10) — Run on Spark

Horovod (11) — on Spark — GLOO scheme

Horovod (12) — overall architecture of elastic training

0x01 Roles

The first thing we need to review is the set of roles in elastic training.

1.1 Role Setting

Horovod's elastic training involves two roles: the driver process and the worker processes. The driver process runs on a CPU node, while worker processes can run on CPU or GPU nodes.

These roles are very similar to Spark's Driver and Executor. The Driver process can be seen as the Driver or master node in Spark, while the workers are similar to Spark's Executors.

The details are as follows:

+---------------------------------------------+
|                                             |
|                   Driver                    |
|                                             |
+------+---------------+----------------+-----+
       ^               ^                ^
       |               |                |
       |               |                |
       v               v                v
+------+------+  +-----+-------+  +-----+------+
|   Worker    |  |   Worker    |  |   Worker   |
|             |  |             |  |            |
|   host1     |  |   host2     |  |   host3    |
+-------------+  +-------------+  +------------+

1.2 Responsibilities

The role’s responsibilities are as follows:

Master (control node) responsibilities:

  • Responsible for detecting, in real time, whether an existing worker (worker node) has changed or dropped out;
  • Responsible for monitoring host changes in real time through the discovery script;
  • Responsible for assigning tasks to the live workers (worker nodes);
  • When a process fails because an AllReduce call fails, the master uses the blacklist mechanism to organize the remaining live processes and construct a new ring;
  • If a new host is added, new workers are started on the new host, and the new and old workers together construct a new communication ring.

Worker (worker node) responsibilities:

  • Responsible for reporting its status (i.e., whether training has completed); in practice this is passive, as there is no active reporting mechanism;

  • Responsible for performing training on the data it is assigned.

0x02 Calling Code

Let's first look at the calling code; the elastic invocation goes from the general to the specific, one level deeper at a time.

2.1 _run

The entry point of the horovod program is the _run function. As you can see, it chooses different paths depending on whether this is elastic training. In this article we start from _run_elastic.

def _run(args):
    # if hosts are not specified, either parse from hostfile, or default as
    # localhost
    if not args.hosts and not args.host_discovery_script:
        if args.hostfile:
            args.hosts = hosts.parse_host_files(args.hostfile)
        else:
            # Set hosts to localhost if not specified
            args.hosts = 'localhost:{np}'.format(np=args.np)

    # Convert nics into set
    args.nics = set(args.nics.split(',')) if args.nics else None

    if _is_elastic(args):
        return _run_elastic(args)  # This article follows this path
    else:
        return _run_static(args)

2.2 _run_elastic

The logic of this part is as follows:

  • First, if a host discovery script is configured in the arguments, call discovery.HostDiscoveryScript to get a discovery object (at this point it is only an object; the host information is obtained later when the ElasticDriver is built). Otherwise, read the fixed host configuration directly;
  • Second, build ElasticSettings from the host configuration and other information;
  • Finally, call gloo_run_elastic to perform elastic training;

The code is as follows:

def _run_elastic(args):
    # construct host discovery component
    if args.host_discovery_script:
        discover_hosts = discovery.HostDiscoveryScript(args.host_discovery_script, args.slots)
    elif args.hosts:
        _, available_host_slots = hosts.parse_hosts_and_slots(args.hosts)
        discover_hosts = discovery.FixedHosts(available_host_slots)
    ......

    # horovodrun has to finish all the checks before this timeout runs out.
    settings = elastic_settings.ElasticSettings(discovery=discover_hosts,
                                                min_np=args.min_np or args.np,
                                                max_np=args.max_np,
                                                elastic_timeout=args.elastic_timeout,
                                                reset_limit=args.reset_limit,
                                                num_proc=args.np,
                                                verbose=2 if args.verbose else 0,
                                                ssh_port=args.ssh_port,
                                                ssh_identity_file=args.ssh_identity_file,
                                                extra_mpi_args=args.mpi_args,
                                                key=secret.make_secret_key(),
                                                start_timeout=tmout,
                                                output_filename=args.output_filename,
                                                run_func_mode=args.run_func is not None,
                                                nics=args.nics,...)

    env = os.environ.copy()
    config_parser.set_env_from_args(env, args)
    gloo_run_elastic(settings, env, args.command)

2.3 gloo_run_elastic

This part corresponds to the first part of the architecture diagram.

The main work is as follows:

  • Define get_common_interfaces, a function that obtains network routing information and the hosts' interfaces. Note that it waits for the minimum number of required nodes before starting to probe network routes;
  • Obtain _exec_command_fn, which we described earlier; it provides the ability to run a command, i.e., an environment in which to run it;
  • Establish a RendezvousServer, which will hold the various host information;
  • Run launch_gloo_elastic with these arguments plus the command argument, which is something like python train.py;
def gloo_run_elastic(settings, env, command):

    def get_common_interfaces(driver):
        # Host-to-host common interface detection requires at least 2 hosts in an elastic job.
        min_hosts = _get_min_start_hosts(settings)
        current_hosts = driver.wait_for_available_slots(settings.num_proc, min_hosts=min_hosts)
        return driver_service.get_common_interfaces(settings, current_hosts.host_assignment_order)

    exec_command = _exec_command_fn(settings)
    rendezvous = RendezvousServer(settings.verbose)
    launch_gloo_elastic(command, exec_command, settings, env, get_common_interfaces, rendezvous)

2.4 get_common_interfaces

get_common_interfaces obtains network routing information and the hosts' network interfaces.

  • If remote hosts are configured, probes are executed on each host in a distributed fashion to obtain every host's NIC and routing information;
  • Otherwise, the local NIC information is obtained.

The specific function is in: runner/driver/driver_service.py

def get_common_interfaces(settings, all_host_names, remote_host_names=None, fn_cache=None):
    if remote_host_names is None:
        remote_host_names = network.filter_local_addresses(all_host_names)

    if len(remote_host_names) > 0:
        if settings.nics:
            # If args.nics is provided, we will use those interfaces. All the workers
            # must have at least one of those interfaces available.
            nics = settings.nics
        else:
            # Find the set of common, routed interfaces on all the hosts (remote
            # and local) and specify it in the args to be used by NCCL. It is
            # expected that the following function will find at least one interface
            # otherwise, it will raise an exception.
            local_host_names = set(all_host_names) - set(remote_host_names)
            nics = _driver_fn(all_host_names, local_host_names, settings, fn_cache=fn_cache)

    else:
        nics = get_local_interfaces(settings)
    return nics

get_local_interfaces obtains the NIC information of the local host.

# net_if_addrs is provided by psutil, AF_INET by socket
from socket import AF_INET
from psutil import net_if_addrs

def get_local_interfaces(settings):
    if settings.verbose >= 2:
        print('All hosts are local, finding the interfaces '
              'with the address 127.0.0.1')
    # If all the given hosts are local, find the interfaces with address
    # 127.0.0.1
    nics = set()
    for iface, addrs in net_if_addrs().items():
        if settings.nics and iface not in settings.nics:
            continue
        for addr in addrs:
            if addr.family == AF_INET and addr.address == '127.0.0.1':
                nics.add(iface)
                break

    return nics

2.5 Obtaining Remote NIC Information

This information has been described in the previous article, but we will simplify it as follows:

The _driver_fn function executes the probe in a distributed fashion:

  • Start the driver's "service service";
  • Get the address of the driver service via driver.addresses() (implemented by self._addresses = self._get_local_addresses());
  • Use _launch_task_servers to start a task service on each worker (passing in the driver service's addresses); each task service registers itself with the driver service;
  • Because it is a ring, each worker probes all network interfaces of worker index + 1;
  • Finally, _run_probe returns the intersection of the routed interfaces across all the workers;
@cache.use_cache()
def _driver_fn(all_host_names, local_host_names, settings):
    """
    launches the service service, launches the task service on each worker and
    have them register with the service service. Each worker probes all the
    interfaces of the worker index + 1 (in a ring manner) and only keeps the
    routed interfaces. Function returns the intersection of the set of all the
    routed interfaces on all the workers.
    :param all_host_names: list of addresses. For example,
        ['worker-0','worker-1']
        ['10.11.11.11', '10.11.11.12']
    :type all_host_names: list(string)
    :param local_host_names: host names that resolve into a local addresses.
    :type local_host_names: set
    :param settings: the object that contains the setting for running horovod
    :type settings: horovod.runner.common.util.settings.Settings
    :return: example: ['eth0', 'eth1']
    :rtype: list[string]
    """
    # Launch a TCP server called service service on the host running horovod
    num_hosts = len(all_host_names)
    driver = HorovodRunDriverService(num_hosts, settings.key, settings.nics)

    # Have all the workers register themselves with the service service.
    _launch_task_servers(all_host_names, local_host_names,
                         driver.addresses(), settings)
    try:
        return _run_probe(driver, settings, num_hosts)
    finally:
        driver.shutdown()

2.6 launch_gloo_elastic

Here the Gloo elastic logic is formally invoked: it creates the Driver-related components and establishes the workers for elastic training.

During a run there is only one RendezvousServer, and launch_gloo_elastic runs only once.

The logic is as follows:

  • If an output file is configured, create the output directory;
  • Create the ElasticDriver, passing in the host discovery object;
  • Start the RendezvousServer, using create_rendezvous_handler as the handler;
  • Use driver.wait_for_available_slots to wait for the minimum number of required slots;
  • Once available, call get_common_interfaces to obtain the network routes and so on, and from them the server IP;
  • Register the shutdown event;
  • Use get_run_command to get the command to run;
  • Use _create_elastic_worker_fn to create the elastic training worker;

The simple code is as follows:

def launch_gloo_elastic(command, exec_command, settings, env, get_common_interfaces, rendezvous):
    # Make the output directory if it does not exist
    if settings.output_filename:
        _mkdir_p(settings.output_filename)

    # Create ElasticDriver with the discovery object as parameter
    driver = ElasticDriver(rendezvous, settings.discovery,
                           settings.min_np, settings.max_np,
                           timeout=settings.elastic_timeout,
                           reset_limit=settings.reset_limit,
                           verbose=settings.verbose)

    handler = create_rendezvous_handler(driver)
    global_rendezv_port = rendezvous.start(handler)  # start RendezvousServer
    driver.wait_for_available_slots(settings.num_proc)

    nics = get_common_interfaces(driver)  # Obtain network routes / interfaces
    server_ip = network.get_driver_ip(nics)

    event = register_shutdown_event()
    run_command = get_run_command(command, server_ip, nics, global_rendezv_port, elastic=True)

    # Build the elastic training workers
    create_worker = _create_elastic_worker_fn(exec_command, run_command, env, event)

    driver.start(settings.num_proc, create_worker)
    res = driver.get_results()
    driver.stop()

    for name, value in sorted(res.worker_results.items(), key=lambda item: item[1][1]):
        exit_code, timestamp = value

It’s complicated, so we have to go through it one by one.

First, let's look at get_run_command, which was introduced earlier in the Spark Gloo article and is only briefly reviewed here.

It calls create_run_env_vars to get the information Gloo needs, and builds run_command based on it. The result looks like this:

HOROVOD_GLOO_RENDEZVOUS_ADDR=1.1.1.1 HOROVOD_GLOO_RENDEZVOUS_PORT=2222 HOROVOD_CPU_OPERATIONS=gloo HOROVOD_GLOO_IFACE=lo HOROVOD_CONTROLLER=gloo python train.py
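
To make the structure of that command concrete, here is a minimal sketch of how such a run command could be assembled. The helper name sketch_get_run_command and the exact variable set (for example the HOROVOD_ELASTIC flag) are assumptions for illustration, not a quotation of the Horovod source:

# Minimal sketch (assumption, not the verbatim Horovod source) of assembling
# the run command from the Gloo rendezvous environment variables shown above.
def sketch_get_run_command(command, server_ip, nics, port, elastic=True):
    env_vars = {
        'HOROVOD_GLOO_RENDEZVOUS_ADDR': server_ip,  # where workers reach the RendezvousServer
        'HOROVOD_GLOO_RENDEZVOUS_PORT': port,
        'HOROVOD_CONTROLLER': 'gloo',
        'HOROVOD_CPU_OPERATIONS': 'gloo',
        'HOROVOD_GLOO_IFACE': list(nics)[0],        # network interface used by Gloo
    }
    if elastic:
        env_vars['HOROVOD_ELASTIC'] = '1'
    env_string = ' '.join('{}={}'.format(k, v) for k, v in env_vars.items())
    # e.g. "HOROVOD_GLOO_RENDEZVOUS_ADDR=1.1.1.1 ... python train.py"
    return '{} {}'.format(env_string, ' '.join(command))


# Usage: the resulting string is what each worker slot eventually executes.
print(sketch_get_run_command(['python', 'train.py'], '1.1.1.1', {'lo'}, 2222))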

As you can see, the elastic version is similar to the Spark Gloo version: both use a RendezvousServer to perform some of the master's control functions.

Second, let's look at the Driver itself.

0x03 Driver Main

3.1 ElasticDriver

Defined as follows, the basic members are:

  • _rendezvous: when the set of running nodes changes, the driver rebuilds a RendezvousServer based on the currently running nodes. The rendezvous stores the address of each worker and the rank assigned to it in the logical communication ring;

  • _host_manager: a HostManager responsible for discovering and managing the various hosts;

  • _worker_registry: a WorkerStateRegistry, which records the success/failure states of the workers;

  • _discovery_thread: a background thread that periodically discovers hosts; the actual discovery is delegated to _host_manager;

  • _worker_clients: WorkerNotificationClients, one per worker;

  • _host_assignments: host assignments;

  • _rank_assignments: rank assignments. A rank can be thought of as a training process in the distributed job. Rank 0 usually has a special meaning in Horovod: it is the device responsible for the synchronization;

  • _world_size: the total number of processes. Training starts only when all world_size processes are ready;

  • _wait_hosts_cond: a threading.Condition whose purpose is to wait until the minimum number of hosts required for training is available;

Specific definitions are as follows:

class ElasticDriver(object):
    def __init__(self, rendezvous, discovery, min_np, max_np, timeout=None, reset_limit=None, verbose=0):
        self._rendezvous = rendezvous
        self._host_manager = HostManager(discovery)

        self._host_assignments = {}
        self._rank_assignments = {}
        self._world_size = 0

        self._wait_hosts_cond = threading.Condition()

        self._create_worker_fn = None
        self._worker_clients = {}

        self._worker_registry = WorkerStateRegistry(self, self._host_manager, reset_limit=reset_limit)
        self._results = ResultsRecorder()
        self._shutdown = threading.Event()

        self._discovery_thread = threading.Thread(target=self._discover_hosts)
        self._discovery_thread.daemon = True
        self._discovery_thread.start()
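
The __init__ above starts a background thread whose target is self._discover_hosts, which is not shown here. Below is a minimal sketch of what such a discovery loop does; the method body, the update_available_hosts name and the polling interval are assumptions for illustration, and the actual Host Discovery implementation is analyzed in a separate article.

    # Minimal sketch (assumption, not the verbatim Horovod source) of the
    # background discovery loop started in __init__ above.
    def _discover_hosts(self):
        while not self._shutdown.is_set():
            self._wait_hosts_cond.acquire()
            try:
                # Delegate to HostManager: re-run the discovery script and
                # compare the result with the previously known hosts.
                if self._host_manager.update_available_hosts():
                    # Wake up wait_for_available_slots() so that a new
                    # rendezvous can be constructed with the updated hosts.
                    self._wait_hosts_cond.notify_all()
            finally:
                self._wait_hosts_cond.release()
            # Poll periodically; the interval here is illustrative only.
            self._shutdown.wait(1.0)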

After creating the Driver, the main actions are:

  • Wait for the minimum number of hosts;
  • Configure the worker;
  • Start the driver, which in turn starts the workers internally;
  • The Driver waits for the workers' running results.

Let’s do it step by step.

3.2 Waiting for the minimum number of hosts

After startup, driver.wait_for_available_slots(settings.num_proc) is called to wait for the minimum number of hosts.

As you can see, this is a loop that can wait indefinitely: it returns only when avail_slots >= min_np and avail_hosts >= min_hosts, i.e., when current_hosts provides at least the minimum required number of hosts and slots.

def wait_for_available_slots(self, min_np, min_hosts=1):
    tmout = timeout.Timeout(self._timeout, message=' ')
    self._wait_hosts_cond.acquire()

    try:
        while True:  # Wait in a loop until enough slots are available
            current_hosts = self._host_manager.current_hosts

            avail_slots = current_hosts.count_available_slots()
            avail_hosts = len(current_hosts.available_hosts)

            if avail_slots >= min_np and avail_hosts >= min_hosts:
                return current_hosts
            if self._shutdown.is_set():
                raise RuntimeError('Job has been shutdown, see above error messages for details.')
            self._wait_hosts_cond.wait(tmout.remaining())
            tmout.check_time_out_for('minimum number of slots to become available')
    finally:
        self._wait_hosts_cond.release()

The logic is as follows:

      launch_gloo_elastic
               +
               |
               |
               |
               |                                +----------------+
               v                                |   HostManager  |
+--------------+------------------+   wait      |                |
| driver.wait_for_available_slots | +---------> |                |
+---------------------------------+             |  current_hosts |
                                                +----------------+

3.3 Configuring the worker

The configuration process is completed by _create_elastic_worker_fn.

_create_elastic_worker_fn has two parts:

  • _slot_info_to_command_fn builds slot_info_to_command by combining various environment variables with the run_command, much as in the previous article; the result is a command string that can be run on "a certain host and slot";
  • It returns create_worker.
    • create_worker is a function built from exec_command and the command string.
    • exec_command, described earlier, provides the ability to run a command, i.e., an environment in which to run it;
    • So create_worker provides the ability to run a given command in that environment;

The relationship between these concepts is sketched below.
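
Here is a minimal sketch of how these pieces fit together, assuming the shape described above; it is an illustration rather than the verbatim Horovod implementation (_slot_info_to_command_fn is the helper named in the list above):

# Minimal sketch (an assumption based on the description above, not the
# verbatim Horovod source) of how the worker-creation function is composed.
def _create_elastic_worker_fn(exec_command, run_command, env, event):
    # Turn slot_info + env + run_command into a command string that can be
    # run on "a certain host and slot".
    get_command_with_env = _slot_info_to_command_fn(run_command, env)

    def create_worker(slot_info, events):
        command = get_command_with_env(slot_info)
        events = [event] + (events or [])
        # exec_command provides the environment in which the command runs;
        # it returns (exit_code, timestamp).
        return exec_command(command, slot_info, events)

    return create_worker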

3.4 Starting the driver

driver.start(settings.num_proc, create_worker)

The specific startup goes through the following steps.

3.4.1 start

def start(self, np, create_worker_fn):
    self._create_worker_fn = create_worker_fn
    self._activate_workers(np)

3.4.2 _activate_workers

Both the resume and start functions of ElasticDriver call _activate_workers, which is defined as follows. You can see that if the discovery script has found new nodes, _update_host_assignments returns pending_slots, the slots on which new workers can be started, and _start_worker_processes is then called:

def _activate_workers(self, min_np):
    current_hosts = self.wait_for_available_slots(min_np)
    pending_slots = self._update_host_assignments(current_hosts)
    self._worker_registry.reset(self.world_size())
    self._start_worker_processes(pending_slots)

3.4.3 _start_worker_processes

Each worker is started in a thread via run_worker, and self._results.expect(thread) then puts the worker thread into the ResultsRecorder. This is the key to waiting for the results.

def _start_worker_processes(self, pending_slots):
    for slot_info in pending_slots:
        self._start_worker_process(slot_info)

def _start_worker_process(self, slot_info):
    create_worker_fn = self._create_worker_fn
    shutdown_event = self._shutdown
    host_event = self._host_manager.get_host_event(slot_info.hostname)

    def run_worker():
        res = create_worker_fn(slot_info, [shutdown_event, host_event])
        exit_code, timestamp = res
        self._handle_worker_exit(slot_info, exit_code, timestamp)

    thread = threading.Thread(target=run_worker)  # Start the training thread
    thread.daemon = True
    thread.start()
    self._results.expect(thread)  # Wait for the result of the run

3.5 Waiting for the Running Result

The Driver uses the following to obtain the result.

def get_results(self):
    return self._results.get_results()

_results is of type ResultsRecorder, so we need to look at its implementation.

3.5.1 ResultsRecorder

Several functions are as follows:

  • expect, to wait for threads: expect calls self._worker_threads.put(worker_thread), so the recorder knows which threads to wait for;
  • add_result, to add a result: _handle_worker_exit calls self._results.add_result(name, (exit_code, timestamp)) to record a worker's result;
  • get_results: the driver calls this function to obtain the results; it joins each worker thread before returning.
class ResultsRecorder(object):
    def __init__(self):
        self._error_message = None
        self._worker_results = {}
        self._worker_threads = queue.Queue()

    def expect(self, worker_thread):
        self._worker_threads.put(worker_thread)

    def add_result(self, key, value):
        if key in self._worker_results:
            return
        self._worker_results[key] = value

    def get_results(self):
        while not self._worker_threads.empty():
            worker_thread = self._worker_threads.get()
            worker_thread.join()
        return Results(self._error_message, self._worker_results)

3.5.2 Worker exit

The Driver uses _handle_worker_exit to handle the end of a specific worker; the worker's exit code determines what happens next.

_handle_worker_exit runs in the worker thread. When it runs, it registers information with the ResultsRecorder via self._results.add_result.

def _handle_worker_exit(self, slot_info, exit_code, timestamp):
    if not self.has_rank_assignment(slot_info.hostname, slot_info.local_rank):
        # Ignore hosts that are not assigned a rank
        return

    if exit_code == 0:  # Record success
        rendezvous_id = self._worker_registry.record_success(slot_info.hostname, slot_info.local_rank)
    else:  # Otherwise record failure
        rendezvous_id = self._worker_registry.record_failure(slot_info.hostname, slot_info.local_rank)

    if self.finished() and self._worker_registry.last_rendezvous() == rendezvous_id:
        name = '{} ({})'.format(slot_info.hostname, slot_info.local_rank)
        self._results.add_result(name, (exit_code, timestamp))  # Register information with ResultsRecorder

The details are as follows:

+-----------------------------+
| ElasticDriver               |
|                             |
|        start                |
|          +                  |
|          | 1                |
|          |                  |
|          v                  |
|  _activate_workers          |
|          +                  |                 +-------------------------+
|          |                  |                 | Thread                  |
|          | 2                |                 |                         |
|          v                  |                 |        run_worker       |
| _start_worker_processes     |                 |            +            |
|          +                  |                 |            |            |
|          |                  |                 |            | 7          |
|          | 3                |                 |            v            |
|          v                  |                 |     create_worker_fn    |
|  _start_worker_process      |                 |            +            |
|          +                  |                 |            |            |
|          |                  |                 |            | 8          |
|          | 4                |                 |            v            |  results.add_result
|          v                  | thread.start()  |   _handle_worker_exit +---------------------------------+
|      run_worker +---------------------------> |                         |                9              |
|          +                  |      5          +-------------------------+                               |
|          |                  |                                                                           |
|          |                  |                 +------------------------------------------------------+  |
|          v                  | expect(thread)  | ResultsRecorder                                      |  |
|     self._results  +------------------------> |                                                      |  |
|                             |      6          | _worker_threads = [thread]                           |  |
|                             |                 |                                                      |  |
|                             |                 | _worker_results = [name : (exit_code, timestamp)] <-----+
+-----------------------------+                 |                                                      |
                                                +------------------------------------------------------+


In subsequent articles we will look at the other parts of elastic training in detail.

Because the Driver is the main framework of elastic training, some of this text will inevitably appear in other articles as well; please bear with the repetition.

0xEE Personal information

Thoughts on life and technology

Wechat public account: Rosie’s Thinking

If you would like timely updates on my articles, or to see my recommended technical material, please follow this account.
