0x00 Abstract

Horovod is an easy-to-use, high-performance distributed training framework released by Uber in 2017 and widely used in industry.

This series walks through the Horovod source code. This 16th article looks at the life cycle of a worker in Horovod elastic training.

Let’s start with a logical diagram to give a rough overview. The left side of the diagram is the driver, and the right side is a worker.

Links to the other articles in this series:

Horovod (1) — Basics

Horovod (2) — A distributed training framework for deep learning — from the user’s perspective

Horovod (3) — What’s behind Horovodrun

Horovod (4) — Network Basics & Driver

Horovod (5) — Fusion framework

Horovod (6) — Background architecture

Horovod (6) — Implementation using threads

Horovod (7) — DistributedOptimizer

Horovod (8) — on Spark

Horovod (9) — Start on Spark

Horovod (10) — Run on Spark

Horovod (11) — on Spark — GLOO scheme

Horovod (12) — Overall architecture of elastic training

Horovod (13) — Driver for elastic training

Horovod (14) — Elastic training: node discovery & State

Horovod (15) — Broadcast & Notification

0x01 What is a Worker

First, let’s look at what a worker is. So that this article can be read on its own, this chapter recaps a lot of earlier material; readers who have seen the previous articles can skip it.

1.1 Roles

Training is the process of iteratively optimizing the parameters of a neural network with gradient descent and finally outputting the trained model.

Let’s first look at the roles in elastic training.

Horovod’s elastic training has two roles: the driver process and the worker process. The driver process runs on a CPU node, while worker processes can run on CPU or GPU nodes. In Horovod, training processes are equal peers: each worker process is responsible both for synchronizing gradients with the others and for computing its own gradients.

These roles closely resemble Spark’s Driver and Executor: the driver process can be thought of as Spark’s driver (master) node, and the workers as Spark’s executors.

The details are as follows:

               +------------------------------+
               |                              |
               |            Driver            |
               |                              |
               +-----+----------+---------+---+
                     ^          ^         ^
                     |          |         |
        +------------+          |         +------------+
        |                       |                      |
        v                       v                      v
+-------+------+        +-------+------+       +-------+------+
|    Worker    |        |    Worker    |       |    Worker    |
|              |        |              |       |              |
|    host1     |        |    host2     |       |    host3     |
+--------------+        +--------------+       +--------------+

1.2 Responsibilities

The responsibilities of each role are as follows:

Master (master node):

  • Detecting in real time whether active workers change or drop out;
  • Monitoring host changes in real time;
  • Assigning tasks to the live workers;
  • When AllReduce fails because a process fails, organizing the remaining live processes into a new ring through the blacklist mechanism;
  • When a new host is added, spawning a new worker on it, so that the new and old workers form a new ring together.

Worker responsibilities:

  • Reporting the status of the current worker, i.e. whether its training has completed (this is actually passive; there is no active reporting mechanism);
  • Performing training on the data the worker is responsible for.

1.3 Networking Mechanism

Horovod uses NCCL for communication among multiple GPUs on a single machine, and the ring-based AllReduce algorithm for communication across machines.

Horovod’s elastic training is multi-machine elastic training. In multi-machine ring-based communication, each worker node has a left neighbor and a right neighbor; each worker sends data only to its right neighbor and receives data only from its left neighbor.
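To make the left/right neighbor pattern concrete, here is a small pure-Python simulation of the classic ring-allreduce: a reduce-scatter phase followed by an all-gather phase. It is a teaching sketch that assumes each worker’s buffer is already split into one chunk per worker (one number per chunk). It is not Horovod’s or Gloo’s actual implementation, which moves real tensors over the network.

```python
from typing import List, Tuple


def ring_neighbors(rank: int, size: int) -> Tuple[int, int]:
    """Return the (left, right) neighbor ranks of `rank` in a ring of `size` workers."""
    return (rank - 1) % size, (rank + 1) % size


def ring_allreduce(data: List[List[float]]) -> List[List[float]]:
    """Simulate a sum ring-allreduce.

    data[r] is worker r's buffer, split into len(data) chunks.
    """
    n = len(data)
    buf = [list(chunks) for chunks in data]  # each worker's local copy

    # Phase 1: reduce-scatter. In step s, worker r sends chunk (r - s) % n
    # to its right neighbor, which adds it to its own copy of that chunk.
    for step in range(n - 1):
        msgs = [(r, (r - step) % n) for r in range(n)]
        payloads = [buf[r][c] for r, c in msgs]  # all sends happen "at once"
        for (r, c), value in zip(msgs, payloads):
            _, right = ring_neighbors(r, n)
            buf[right][c] += value

    # Now worker r holds the fully reduced chunk (r + 1) % n.
    # Phase 2: all-gather. Each worker forwards its newest complete chunk
    # to the right neighbor, which overwrites its stale copy.
    for step in range(n - 1):
        msgs = [(r, (r + 1 - step) % n) for r in range(n)]
        payloads = [buf[r][c] for r, c in msgs]
        for (r, c), value in zip(msgs, payloads):
            _, right = ring_neighbors(r, n)
            buf[right][c] = value
    return buf
```

For example, `ring_allreduce([[1, 2, 3], [10, 20, 30], [100, 200, 300]])` leaves every worker with `[111, 222, 333]`. Each worker only ever talks to its right neighbor, which is the property the text describes.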

1.3.1 Ring construction

The driver process helps the workers call Gloo to construct the AllReduce communication ring.

When Horovod calls Gloo to construct the communication domain, it needs to create a RendezvousServer with a KVStore for Gloo. The KVStore stores information such as the host address of each node in the communication domain and the rank assigned to that node in the logical communication ring.

The build process is as follows:

  • The driver process creates a RendezvousServer with a KVStore; that is, the RendezvousServer runs inside Horovod’s driver process.
  • After the driver process obtains the IP addresses and GPU card numbers of all worker nodes, it writes them into the RendezvousServer’s KVStore.
  • Each worker node, by invoking Gloo, requests its neighbors’ information (IP, port, …) from the RendezvousServer and uses it to construct the communication domain.
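The driver-writes / worker-reads pattern can be sketched with a toy in-memory store. The class, the `"global"` scope and the `rank_<n>` key names below are hypothetical illustrations. The real RendezvousServer runs as a server inside the driver process, and Gloo performs the lookups from C++.

```python
from typing import Dict, List, Tuple


class ToyKVStore:
    """Hypothetical in-memory stand-in for the KVStore inside RendezvousServer."""

    def __init__(self) -> None:
        self._data: Dict[Tuple[str, str], str] = {}

    def put(self, scope: str, key: str, value: str) -> None:
        self._data[(scope, key)] = value

    def get(self, scope: str, key: str) -> str:
        return self._data[(scope, key)]


def driver_publish(store: ToyKVStore, endpoints: List[str]) -> None:
    """Driver side: write the world size and each rank's address into the store."""
    store.put("global", "size", str(len(endpoints)))
    for rank, addr in enumerate(endpoints):
        store.put("global", f"rank_{rank}", addr)


def worker_lookup_neighbors(store: ToyKVStore, rank: int) -> Tuple[str, str]:
    """Worker side: ask the store for its (left, right) neighbors in the ring."""
    size = int(store.get("global", "size"))
    left = store.get("global", f"rank_{(rank - 1) % size}")
    right = store.get("global", f"rank_{(rank + 1) % size}")
    return left, right
```

After `driver_publish(store, ["10.0.0.1:5000", "10.0.0.2:5000", "10.0.0.3:5000"])`, worker 0 finds `10.0.0.3:5000` on its left and `10.0.0.2:5000` on its right. Keeping the addresses in one central store is what lets the driver re-publish a new layout when hosts change.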

1.3.2 Elastic rebuilding

When a worker fails or a new worker joins the training, every worker stops the current training, records the iteration step the model has reached, and tries to re-initialize the AllReduce communication domain.

1.3.2.1 Driver monitoring

Because the driver process continuously monitors the status of the workers and hosts:

  • When hosts change: once the driver process finds, through the host discovery script, that a node has been added or removed, it sends a notification to all workers. The next time state.commit() or the lighter-weight state.check_host_updates() is called, a HostsUpdateInterrupt exception is raised.
  • When any worker fails: the driver re-obtains the hosts of the live workers.

1.3.2.2 Driver rebuilding

To keep the other worker processes from exiting, Horovod catches the exceptions raised by Gloo and passes them up to the wrapping Python API. The driver then reconfigures the RendezvousServer so that the worker nodes can rebuild the communication domain. This is what makes Horovod fault-tolerant.

When hosts change, the worker processes use this rendezvous to construct a new communication domain. Once the new domain has been constructed successfully, the worker with rank 0 broadcasts its model to the other workers, and training resumes from the iteration step where it stopped.
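The notify, check, interrupt and resume cycle described in this section can be sketched in a single process. ToyState, run_elastic and the reset callback are hypothetical simplifications that only borrow the names commit, check_host_updates and HostsUpdateInterrupt from the text. The real state objects also save model and optimizer state, and rank 0 broadcasts that state after the rebuild.

```python
import threading


class HostsUpdateInterrupt(Exception):
    """Toy stand-in for Horovod's exception of the same name."""


class ToyState:
    """Hypothetical, much-simplified elastic training state."""

    def __init__(self) -> None:
        self.step = 0
        self.committed_step = 0
        # Set when the "driver" notifies this worker that the host set changed.
        self._hosts_updated = threading.Event()

    def notify_hosts_updated(self) -> None:
        self._hosts_updated.set()

    def check_host_updates(self) -> None:
        # Lightweight check: only looks for a pending notification.
        if self._hosts_updated.is_set():
            self._hosts_updated.clear()
            raise HostsUpdateInterrupt()

    def commit(self) -> None:
        # Record progress first, then check whether the hosts changed.
        self.committed_step = self.step
        self.check_host_updates()


def run_elastic(train_fn, state, reset_fn):
    """Call train_fn until it finishes, recovering from host updates."""
    while True:
        try:
            return train_fn(state)
        except HostsUpdateInterrupt:
            reset_fn()  # e.g. rebuild the communication domain
            state.step = state.committed_step  # resume from the recorded step
```

Because commit() records the step before checking for updates, an interrupt never loses completed steps. Training simply re-enters train_fn at the committed step.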

The networking mechanism is as follows:

              +-------------------------------+
              | Driver                        |
              |                               |
              |  +-------------------------+  |
              |  |    RendezvousServer     |  |
              |  |                         |  |
              |  |   host1, host2, host3   |  |
              |  +-------------------------+  |
              +-----+-----------+---------+---+
                    ^           ^         ^
                    |           |         |
       +------------+           |         +------------+
       |                        |                      |
       v                        v                      v
+------+------+          +------+------+        +------+------+
|   Worker    |          |   Worker    |        |   Worker    |
|             +--------->+             +------->+             |
|   host1     |          |   host2     |        |   host3     |
+------+------+          +-------------+        +------+------+
       ^                                               |
       |                                               |
       +-----------------------------------------------+

With this background, this article walks through the whole life cycle of a worker.

0x02 Overall life cycle

In the driver’s launch_gloo_elastic, the following code is responsible for starting the workers:

  • command is the executable command passed in, such as python train.py. get_run_command turns it into something like env python train.py: it prepends the environment variables, and the command is ready to run.
  • exec_command is created by exec_command = _exec_command_fn(settings); it builds an environment for executing commands from the various settings.

run_command = get_run_command(command, server_ip, nics, global_rendezv_port, elastic=True)

create_worker = _create_elastic_worker_fn(exec_command, run_command, env, event)

driver.start(settings.num_proc, create_worker)

res = driver.get_results()

Three processes are clearly visible here (training itself happens inside the worker, so this analysis of the driver does not cover it):

  • _create_elastic_worker_fn is the configuration process;
  • driver.start is the startup process;
  • driver.get_results is the registration and result-collection process.

We will analyze these three processes in detail.

0x03 Configuration process

The configuration process is done by _create_elastic_worker_fn, which provides the ability to run a command in a given environment.

_create_elastic_worker_fn has two parts:

  • _slot_info_to_command_fn creates slot_info_to_command, which, as in the previous article, combines the various Horovod environment variables with run_command to produce command text that can be run on a given host and slot;

  • It returns create_worker:

    • create_worker is a function built from exec_command and the command text;
    • exec_command, discussed earlier, provides the ability to run commands, i.e. the environment in which commands run;
    • so create_worker can run a command in a given environment.
from shlex import quote

# Excerpt from Horovod's Gloo launcher code; env_util and
# create_slot_env_vars are Horovod helpers not shown here.
def _slot_info_to_command_fn(run_command, env):
    def slot_info_to_command(slot_info):
        """
        Given a slot_info, creates a command used by gloo to launch a single job.

        :param slot_info: host and slot to execute the run command on
        """
        env_vars = create_slot_env_vars(slot_info)
        horovod_rendez_env = " ".join(
            [f"{k}={str(v)}" for k, v in env_vars.items()])

        return '{horovod_env} {env} {run_command}'.format(
            horovod_env=horovod_rendez_env,
            env=' '.join(['%s=%s' % (key, quote(value)) for key, value in env.items()
                          if env_util.is_exportable(key)]),
            run_command=run_command)

    return slot_info_to_command


def _create_elastic_worker_fn(exec_command, run_command, env, event):
    get_command_with_env = _slot_info_to_command_fn(run_command, env)

    def create_worker(slot_info, events):
        command = get_command_with_env(slot_info)
        events = [event] + (events or [])
        return exec_command(command, slot_info, events)

    return create_worker
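To see what the assembled command text looks like, here is a self-contained sketch of the same `'{horovod_env} {env} {run_command}'` composition. The SlotInfo namedtuple and the HOROVOD_* variable names produced by `toy_slot_env_vars` are assumptions made for illustration, since the real create_slot_env_vars is not shown in this article. The is_exportable filter is also omitted.

```python
from collections import namedtuple
from shlex import quote

# Hypothetical slot record with the same fields as the SlotInfo used in this article.
SlotInfo = namedtuple(
    "SlotInfo",
    "hostname rank local_rank cross_rank size local_size cross_size")


def toy_slot_env_vars(slot: SlotInfo) -> dict:
    # Assumed variable names; the real create_slot_env_vars may differ or set more.
    return {
        "HOROVOD_HOSTNAME": slot.hostname,
        "HOROVOD_RANK": slot.rank,
        "HOROVOD_SIZE": slot.size,
        "HOROVOD_LOCAL_RANK": slot.local_rank,
        "HOROVOD_LOCAL_SIZE": slot.local_size,
        "HOROVOD_CROSS_RANK": slot.cross_rank,
        "HOROVOD_CROSS_SIZE": slot.cross_size,
    }


def toy_slot_info_to_command(run_command: str, env: dict, slot: SlotInfo) -> str:
    """Build '{horovod_env} {env} {run_command}' for one slot."""
    horovod_env = " ".join(f"{k}={v}" for k, v in toy_slot_env_vars(slot).items())
    # User environment values are shell-quoted, as in the excerpt above.
    user_env = " ".join(f"{k}={quote(str(v))}" for k, v in env.items())
    return f"{horovod_env} {user_env} {run_command}"
```

With `SlotInfo("h1", 0, 0, 0, 2, 1, 2)`, `{"MSG": "hello world"}` and `"python train.py"`, the result ends in `MSG='hello world' python train.py`. Each worker therefore learns its rank and sizes purely from environment variables on its own command line.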

So, the final create_worker is:

                 command (python train.py)
                            +
                            |
            get_run_command |
                            |
                            v
                 run_command (env python train.py)
                            +
                            |
   _slot_info_to_command_fn |
                            |
                            v
               {horovod_env} {env} {run_command}         # roughly: horovod_env env python train.py
                            +
                            |
               exec_command |
                            |
                            v
create_worker = exec_command({horovod_env} {env} {run_command})   # can run a command in a given environment

This allows create_worker to run the training code directly.

0x04 Start Process

create_worker, produced by _create_elastic_worker_fn, can run a command in a given environment. Because create_worker wraps both the command and the environment it runs in, simply calling create_worker starts training. The driver uses this ability to start the workers.

The startup process is mostly done in the start method of the ElasticDriver class.

4.1 Overall Logic

The following logic runs in ElasticDriver:

  • First, the create_worker generated above is assigned to self._create_worker_fn.

  • Second, _activate_workers is called to start multiple workers, which involves:

    • Using wait_for_available_slots to wait until min_np slots are available. We analyzed this function before: it loops indefinitely and returns once avail_slots >= min_np and avail_hosts >= min_hosts;
    • Using _update_host_assignments to get the slots;
    • Using _start_worker_processes to start multiple workers.

def _activate_workers(self, min_np):
    current_hosts = self.wait_for_available_slots(min_np)
    pending_slots = self._update_host_assignments(current_hosts)
    self._worker_registry.reset(self.world_size())
    self._start_worker_processes(pending_slots)

def start(self, np, create_worker_fn):
    self._create_worker_fn = create_worker_fn
    self._activate_workers(np)
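The wait loop described above can be sketched as a polling function. `toy_wait_for_available_slots` and its `discover` callback (returning `{hostname: slots}`) are hypothetical. Unlike the loop described in the text, this sketch gives up after a timeout so that it cannot hang forever.

```python
import time
from typing import Callable, Dict


def toy_wait_for_available_slots(discover: Callable[[], Dict[str, int]],
                                 min_np: int,
                                 min_hosts: int = 1,
                                 poll_interval: float = 0.01,
                                 timeout: float = 5.0) -> Dict[str, int]:
    """Poll host discovery until enough slots and hosts are available."""
    deadline = time.monotonic() + timeout
    while True:
        hosts = discover()
        avail_slots = sum(hosts.values())
        avail_hosts = len(hosts)
        # Same exit condition as the text: enough slots AND enough hosts.
        if avail_slots >= min_np and avail_hosts >= min_hosts:
            return hosts
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"only {avail_slots} slots on {avail_hosts} hosts after {timeout}s")
        time.sleep(poll_interval)
```

If discovery first reports `{"h1": 1}`, then `{"h1": 2}`, then `{"h1": 2, "h2": 2}`, a call with `min_np=4, min_hosts=2` returns on the third poll.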

Let’s look at each of them.

4.2 Assignment

The first step assigns the create_worker generated above to self._create_worker_fn.

                 command (python train.py)
                            +
                            |
                            |
            get_run_command |
                            |
                            v
                 run_command(env python train.py)
                            +
                            |
                            |
   _slot_info_to_command_fn |
                            |                                          +-----------------------------+
                            v                                          |                             |
                                                                       |                             |
               {horovod_env} {env} {run_command}                       |     ElasticDriver           |
                            +                                          |                             |
                            |                                          |                             |
                            |                                          |                             |
               exec_command |                                          |                             |
                            |                                          |                             |
                            |                                          |                             |
                            v                                       1  |                             |
+---------------------------+------------------------------------+     |                             |
| create_worker = exec_command({horovod_env} {env} {run_command})| <------------+  _create_worker_fn |
+----------------------------------------------------------------+     |                             |
                                                                       |                             |
                                                                       +-----------------------------+

4.3 Obtaining Host Information

Next, _update_host_assignments obtains the slots in two steps:

First, it builds the assignment between hosts and ranks.

Second, it works out which of the assigned slots do not yet have a running worker (the pending slots).

4.3.1 Updating hosts and ranks

_update_host_assignments rebuilds the rendezvous from the latest host information, via:

self._rendezvous.init(host_assignments_list)

The specific logic is:

  • Get the active slots: active_slots;
  • Get the host assignments;
  • Ensure that at least one previously active host is still assigned, so that there is a source for passing the state on to the new workers when the new ring is formed;
  • Rebuild the rendezvous by calling self._rendezvous.init;
  • Build the mapping between rank and slot;
  • Return pending_slots: the assigned slots that are not in the active slot list active_slots. These are the slots on which new workers will be started next.
def _update_host_assignments(self, current_hosts):
    # Determine the slots that are already filled so we do not respawn these processes
    active_slots = set([(host, slot_info.local_rank)
                        for host, slots in self._host_assignments.items()
                        for slot_info in slots])

    # Adjust the host assignments to account for added / removed hosts
    host_assignments, host_assignments_list = self._get_host_assignments(current_hosts)

    if len(self._host_assignments) > 0:
        # Ensure that at least one previously active host is still assigned, otherwise there is no
        # way to sync the state to the new workers
        prev_hosts = self._host_assignments.keys()
        next_hosts = host_assignments.keys()
        if not prev_hosts & next_hosts:
            raise RuntimeError('No hosts from previous set remaining, unable to broadcast state.')

    self._host_assignments = host_assignments
    self._world_size = len(host_assignments_list)
    # Rebuild the rendezvous with the new assignments
    self._rendezvous.init(host_assignments_list)

    # Rank assignments map from world rank to slot info
    rank_assignments = {}
    for slot_info in host_assignments_list:
        rank_assignments[slot_info.rank] = slot_info
    self._rank_assignments = rank_assignments

    # Get the newly assigned slots that need to be started
    pending_slots = [slot_info
                     for host, slots in self._host_assignments.items()
                     for slot_info in slots
                     if (host, slot_info.local_rank) not in active_slots]
    return pending_slots
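The slot bookkeeping at the end of `_update_host_assignments` can be tried out in isolation. The sketch below is a simplified, hypothetical version that represents slots as `(hostname, local_rank)` pairs instead of SlotInfo objects. It keeps the same two rules as the excerpt: slots that already have a running worker are not respawned, and at least one previous host must survive.

```python
from typing import Dict, List, Tuple

Slot = Tuple[str, int]  # (hostname, local_rank)


def toy_pending_slots(prev: Dict[str, List[int]],
                      new: Dict[str, List[int]]) -> List[Slot]:
    """Return the slots in `new` that have no running worker yet.

    prev/new map hostname -> local ranks assigned on that host.
    """
    active_slots = {(host, lr) for host, lrs in prev.items() for lr in lrs}
    if prev and not (prev.keys() & new.keys()):
        # Someone from the old set must survive to broadcast the state.
        raise RuntimeError("No hosts from previous set remaining, unable to broadcast state.")
    return [(host, lr) for host, lrs in new.items() for lr in lrs
            if (host, lr) not in active_slots]
```

When `h2` joins an existing `h1`, only `h2`'s slots come back as pending. On the very first start (`prev` empty), every slot is pending.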

4.3.2 Obtaining a Host and a Rank

Host information is done with _get_host_Assignments

def _get_host_assignments(self, current_hosts):
    # Adjust the host assignments to account for added / removed hosts
    host_list = [hosts.HostInfo(host, current_hosts.get_slots(host))
                 for host in current_hosts.host_assignment_order]
    host_assignments_list = hosts.get_host_assignments(host_list, self._min_np, self._max_np)
    host_assignments = defaultdict(list)
    for slot_info in host_assignments_list:
        host_assignments[slot_info.hostname].append(slot_info)
    return host_assignments, host_assignments_list

_get_host_assignments calls get_host_assignments to do the actual work.

get_host_assignments assigns Horovod processes according to the hosts and their process slots, i.e. it gives the mapping between each Horovod rank and a slot. When max_np is given, the number of ranks handed out is capped at max_np.

For example, with two hosts h1 and h2 that each offer one slot, the allocation looks like the following, telling us which slot on which host each rank corresponds to:

[
    SlotInfo(hostname='h1', rank=0, local_rank=0, cross_rank=0, size=2, local_size=1, cross_size=2),
    SlotInfo(hostname='h2', rank=1, local_rank=0, cross_rank=1, size=2, local_size=1, cross_size=2),
]

The code is as follows:

import collections

# HostInfo and SlotInfo are Horovod types defined elsewhere in Horovod.
def get_host_assignments(hosts, min_np, max_np=None):
    """Assign hosts with process capacities (slots) to ranks in the Horovod process.

    This function will try to allocate as many as possible processes on the same host to leverage
    local network.

    :param hosts: list of HostInfo objects describing host and slot capacity
    :type hosts: list[HostInfo]
    :param min_np: minimum number of processes to be allocated
    :param max_np: (optional) maximum number of processes to be allocated
    :return: a list of the allocation of process on hosts in a `SlotInfo` object.
    :rtype: list[SlotInfo]
    """
    host_ranks = []
    cross_ranks = collections.defaultdict(dict)
    rank = 0
    # Compute each host's ranks, plus local rank and cross rank
    # (cross ranks are needed by hierarchical AllReduce)
    for host_info in hosts:
        ranks = []
        for local_rank in range(host_info.slots):
            if rank == max_np:
                break
            ranks.append(rank)
            rank += 1
            cross_ranks_at_local = cross_ranks[local_rank]
            cross_ranks_at_local[host_info.hostname] = len(cross_ranks_at_local)
        host_ranks.append((host_info, ranks))

    world_size = rank

    # Build the mapping from Horovod rank to slot
    alloc_list = []
    for host_info, ranks in host_ranks:
        local_size = len(ranks)
        for local_rank, rank in enumerate(ranks):
            cross_ranks_at_local = cross_ranks[local_rank]
            cross_rank = cross_ranks_at_local[host_info.hostname]
            cross_size = len(cross_ranks_at_local)
            alloc_list.append(
                SlotInfo(
                    hostname=host_info.hostname,
                    rank=rank,
                    local_rank=local_rank,
                    cross_rank=cross_rank,
                    size=world_size,
                    local_size=local_size,
                    cross_size=cross_size))
    return alloc_list

4.3.3 Updated logic

After steps 4.2 and 4.3, several ElasticDriver attributes have been assigned. With exec_command simplified, the figure becomes the following (step 1 is shown in the previous figure):

                                                           From Host discovery
                                                                   +
+------------------------------+                                   |
| ElasticDriver                |                                   |
|                              |                                   |  2
|                              |       wait_for_available_slots    |
|    wait_for_available_slots  |                                   |
|                              |                                   v
|   _update_host_assignments   |                              current_hosts
|                              |                                   +
|          _rendezvous         |                                   |
|                              |       _update_host_assignments    |
|         _host_assignments    |                                   |  3
|                              |          self._rendezvous.init    |
|         _world_size          |                                   |
|                              |                                   |
|         _rank_assignments    |                                   |
|                              |                                   v
|         _create_worker_fn+---------+                        pending_slots
|                              |     |
|                              |     |
+------------------------------+     |
                                     |
                                     |
                                     v
 +-----------------------------------+------------+
 | exec_command({horovod_env} {env} {run_command})|
 +------------------------------------------------+



4.4 Starting the workers

_start_worker_processes completes the startup. The logic builds up as follows:

  • create_worker = exec_command({horovod_env} {env} {run_command});
  • run_worker() runs the training code by executing create_worker_fn(slot_info, [shutdown_event, host_event]);
  • threading.Thread(target=run_worker) runs the training code in a thread;
  • _start_worker_processes therefore runs multiple copies of the training code, one per thread.

def _start_worker_processes(self, pending_slots):
    for slot_info in pending_slots:
        self._start_worker_process(slot_info)

def _start_worker_process(self, slot_info):
    create_worker_fn = self._create_worker_fn
    shutdown_event = self._shutdown
    host_event = self._host_manager.get_host_event(slot_info.hostname)

    def run_worker():
        res = create_worker_fn(slot_info, [shutdown_event, host_event])
        exit_code, timestamp = res
        self._handle_worker_exit(slot_info, exit_code, timestamp)

    thread = threading.Thread(target=run_worker)
    thread.daemon = True
    thread.start()
    self._results.expect(thread)

After step 4, one thread has been started per pending slot_info, and each thread runs _create_worker_fn for its own slot:

+------------------------------+                               From Host discovery
| ElasticDriver                |                                        +
|                              |                                        |
|   _start_worker_processes    |               wait_for_available_slots |  2
|   wait_for_available_slots   |                                        v
|   _update_host_assignments   |                                  current_hosts
|   _rendezvous                |                                        +
|   _host_assignments          |               _update_host_assignments |  3
|   _world_size                |                  self._rendezvous.init |
|   _rank_assignments          |                                        v
|   _worker_registry           |                                  pending_slots
|   _create_worker_fn +---+    |                                        +
|                         |    |                _start_worker_processes |  4
+-------------------------+----+                                        |
                          |                                             |
                          v                                             |
+-------------------------+-----------------------+                     |
| exec_command({horovod_env} {env} {run_command}) |                     |
+-------------------------------------------------+                     |
                                                                        |
             +----------------------------+-----------------------------+
             |                            |                             |
             v                            v                             v
  +----------+----------+      +----------+----------+       +----------+----------+
  | Thread 1            |      | Thread 2            |       | Thread n            |
  |                     |      |                     |       |                     |
  |  run_worker         |      |  run_worker         |  ...  |  run_worker         |
  |  create_worker_fn(  |      |  create_worker_fn(  |       |  create_worker_fn(  |
  |      slot 1)        |      |      slot 2)        |       |      slot n)        |
  +---------------------+      +---------------------+       +---------------------+

0x05 Running process

exec_command({horovod_env} {env} {run_command}) is what invokes the user’s training code, inside the thread started here:

thread = threading.Thread(target=run_worker)
thread.daemon = True
thread.start()
self._results.expect(thread)

self._results = ResultsRecorder() records every running thread:

import queue

class ResultsRecorder(object):
    def __init__(self):
        self._error_message = None
        self._worker_results = {}
        self._worker_threads = queue.Queue()

    def expect(self, worker_thread):
        self._worker_threads.put(worker_thread)
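The thread-per-slot pattern of `_start_worker_process`, combined with a recorder that keeps every thread, can be sketched in plain Python. `ToyResultsRecorder`, its `add_result`/`get_results` methods and `start_workers` are hypothetical simplifications. Here `create_worker_fn` simply returns an exit code instead of launching `python train.py`.

```python
import queue
import threading
from typing import Callable, Dict, List, Tuple

Slot = Tuple[str, int]  # (hostname, local_rank)


class ToyResultsRecorder:
    """Simplified, hypothetical take on ResultsRecorder."""

    def __init__(self) -> None:
        self._worker_threads = queue.Queue()  # every thread we expect to finish
        self._worker_results: Dict[str, int] = {}
        self._lock = threading.Lock()

    def expect(self, worker_thread: threading.Thread) -> None:
        self._worker_threads.put(worker_thread)

    def add_result(self, key: str, exit_code: int) -> None:
        with self._lock:
            self._worker_results[key] = exit_code

    def get_results(self) -> Dict[str, int]:
        # Join every expected thread, then return the collected exit codes.
        while not self._worker_threads.empty():
            self._worker_threads.get().join()
        with self._lock:
            return dict(self._worker_results)


def start_workers(slots: List[Slot],
                  create_worker_fn: Callable[[Slot], int],
                  results: ToyResultsRecorder) -> None:
    """Start one daemon thread per slot, like _start_worker_process."""
    for slot in slots:
        def run_worker(slot: Slot = slot) -> None:  # bind this iteration's slot
            exit_code = create_worker_fn(slot)
            results.add_result(f"{slot[0]}[{slot[1]}]", exit_code)

        thread = threading.Thread(target=run_worker, daemon=True)
        thread.start()
        results.expect(thread)
```

Binding `slot` as a default argument gives each closure its own slot. The Horovod code gets the same effect by defining `run_worker` inside the per-slot method `_start_worker_process`.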

The logic is therefore as follows, with self._results recording all the threads:

+------------------------------+                               From Host discovery
| ElasticDriver                |                                        +
|                              |                                        |
|   _start_worker_processes    |               wait_for_available_slots |  2
|   wait_for_available_slots   |                                        v
|   _update_host_assignments   |                                  current_hosts
|   _rendezvous                |                                        +
|   _host_assignments          |               _update_host_assignments |  3
|   _world_size                |                  self._rendezvous.init |
|   _rank_assignments          |                                        v
|   _worker_registry           |                                  pending_slots
|   _results                   |                                        +
|   _create_worker_fn +---+    |                _start_worker_processes |  4
|                         |    |                                        |
+-------------------------+----+                                        |
                          |                                             |
                          v                                             |
+-------------------------+-----------------------+                     |
| exec_command({horovod_env} {env} {run_command}) |                     |
+-------------------------------------------------+                     |
                                                                        |
             +----------------------------+-----------------------------+
             |                            |                             |
             v                            v                             v
  +----------+----------+      +----------+----------+       +----------+----------+
  | Thread 1            |      | Thread 2            |       | Thread n            |
  |                     |      |                     |       |                     |
  |  run_worker         |      |  run_worker         |  ...  |  run_worker         |
  |  create_worker_fn(  |      |  create_worker_fn(  |       |  create_worker_fn(  |
  |      slot 1)        |      |      slot 2)        |       |      slot n)        |
  +----------+----------+      +----------+----------+       +----------+----------+
             ^                            ^                             ^
             |                            |                             |
             +----------------------------+-----------------------------+
             |
             +-----> recorded in ElasticDriver._results via self._results.expect(thread)

0x06 Registration, Results & Coordination

This section mainly corresponds to the following three parts of the architecture diagram.

Logically, this section sits above the fault-tolerance mechanism described later. Fault tolerance lives inside the worker, while the logic here sits one level above the worker.

6.1 Logical hierarchy of workers

A worker sits one level above the training script: it runs the training script with python train.py.

So we can simplify the figure as follows:

+------------------------------+                               From Host discovery
| ElasticDriver                |                                        +
|                              |                                        |
|   _start_worker_processes    |               wait_for_available_slots |  2
|   wait_for_available_slots   |                                        v
|   _update_host_assignments   |                                  current_hosts
|   _rendezvous                |                                        +
|   _host_assignments          |               _update_host_assignments |  3
|   _world_size                |                  self._rendezvous.init |
|   _rank_assignments          |                                        v
|   _worker_registry           |                                  pending_slots
|   _results                   |                                        +
|   _create_worker_fn          |                _start_worker_processes |  4
|                              |                                        |
+------------------------------+                                        |
                                                                        |
             +----------------------------+-----------------------------+
             |                            |                             |
             v                            v                             v
  +----------+----------+      +----------+----------+       +----------+----------+
  | Thread 1, slot 1    |      | Thread 2, slot 2    |       | Thread n, slot n    |
  |                     |      |                     |       |                     |
  |  +---------------+  |      |  +---------------+  |       |  +---------------+  |
  |  | worker 1      |  |      |  | worker 2      |  |  ...  |  | worker n      |  |
  |  |               |  |      |  |               |  |       |  |               |  |
  |  | python        |  |      |  | python        |  |       |  | python        |  |
  |  |   train.py    |  |      |  |   train.py    |  |       |  |   train.py    |  |
  |  +---------------+  |      |  +---------------+  |       |  +---------------+  |
  +----------+----------+      +----------+----------+       +----------+----------+
             ^                            ^                             ^
             |                            |                             |
             +----------------------------+-----------------------------+
             |
             +-----> recorded in ElasticDriver._results

6.2 Worker run phases

This raises some new questions:

  • How does a worker move through its phases, and how many phases (states) are there?
  • On what basis does the driver record a worker’s result?

The source code shows that a worker has the following three states:

READY = 'READY'
SUCCESS = 'SUCCESS'
FAILURE = 'FAILURE'

A worker’s life can therefore be divided into four stages. RUNNING is my own addition: it is the stage in which the training script runs. Horovod has no such official state, but I think including it makes the picture clearer. SUCCESS and FAILURE indicate whether the script finished successfully or failed.

          Worker
            +
            |
            |
            |
            v
          READY
            +
            |
            |
            v
   +--------+---------+
   | RUNNING          |
   |                  |
   |  Python train.py |
   |                  |
   +--------+---------+
            |
            |
            |
            v
   +--------+--------+
   |                 |
   |                 |
   v                 v
SUCCESS           FAILURE
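The lifecycle drawn above can be written down as a tiny state machine. This is a minimal sketch, not Horovod code: the `WorkerState` enum, its `RUNNING` member (the author's conceptual addition, which Horovod does not define), and the `can_transition` / `terminal_state` helpers are hypothetical names used only for illustration.

```python
from enum import Enum


class WorkerState(Enum):
    READY = "READY"
    RUNNING = "RUNNING"  # conceptual only: Horovod itself defines no RUNNING state
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"


# Allowed transitions in the conceptual lifecycle: READY -> RUNNING -> SUCCESS | FAILURE.
TRANSITIONS = {
    WorkerState.READY: {WorkerState.RUNNING},
    WorkerState.RUNNING: {WorkerState.SUCCESS, WorkerState.FAILURE},
    WorkerState.SUCCESS: set(),  # terminal
    WorkerState.FAILURE: set(),  # terminal
}


def can_transition(src: WorkerState, dst: WorkerState) -> bool:
    """Return True if the lifecycle allows moving from src to dst."""
    return dst in TRANSITIONS[src]


def terminal_state(exit_code: int) -> WorkerState:
    """Map the exit code of train.py to a terminal state (0 means success)."""
    return WorkerState.SUCCESS if exit_code == 0 else WorkerState.FAILURE
```

The exit-code mapping mirrors how the driver later decides between recording success and failure when a worker process ends.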

Let’s look at the run phase next.

6.2.1 Entering the C++ world

hvd.init is called whenever workers are initialized or resumed by the Driver (for example, after a HostsUpdatedInterrupt is received).

The chain of events that leads into hvd.init runs in the following logical order (1, 2, 3):

  1. Everything hinges on WorkerStateRegistry._barrier: only after all workers have reported does further processing happen. There are three ways to trigger the barrier:

    • Way 1: a worker starts and calls hvd.init, which enters Gloo in C++ and contacts the rendezvous server; the rendezvous server notifies the driver, which sets the worker's state to READY in the WorkerStateRegistry. If min_np is reached, _barrier is triggered.
    • Way 2: when a new host is discovered, a HostsUpdatedInterrupt is raised. A worker that catches this exception resets, and the reset calls hvd.init, which, as above, triggers _barrier.
    • Way 3: when a worker fails, the driver calls _handle_worker_exit, which sets that worker's status in the WorkerStateRegistry to FAILURE; this also triggers _barrier.
  2. Once _barrier is released, it calls the action set when it was constructed, the _action function, which calls _on_workers_recorded, which in turn calls self._driver.resume().

  3. The resume function calls self._activate_workers(self._min_np), which eventually restarts the workers (possibly only some of them, as determined by pending_slots).
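The chain in steps 1 to 3 relies on Python's `threading.Barrier`, whose optional `action` callable is run exactly once, by one of the waiting threads, when the last party arrives. A minimal sketch, assuming heavily simplified stand-ins: `StubDriver`, `MiniRegistry`, and `simulate` are hypothetical names, and the real registry checks SUCCESS/FAILURE counts before resuming.

```python
import threading


class StubDriver:
    """Hypothetical stand-in for ElasticDriver; it only counts resume() calls."""

    def __init__(self):
        self.resume_calls = 0

    def resume(self):
        self.resume_calls += 1


class MiniRegistry:
    """Hypothetical, stripped-down WorkerStateRegistry showing the callback chain."""

    def __init__(self, driver, size):
        self._driver = driver
        # The action runs once, in one waiting thread, when the last of `size` parties arrives.
        self._barrier = threading.Barrier(parties=size, action=self._action)

    def _action(self):
        self._on_workers_recorded()

    def _on_workers_recorded(self):
        # The real method inspects SUCCESS/FAILURE counts first; here we just resume.
        self._driver.resume()

    def record(self):
        # Each recorded worker blocks here until all parties have arrived.
        self._barrier.wait()


def simulate(num_workers):
    driver = StubDriver()
    registry = MiniRegistry(driver, size=num_workers)
    threads = [threading.Thread(target=registry.record) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return driver.resume_calls
```

However many workers report, `resume` is called once per round, because the barrier fires its action only when the final party arrives.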

6.2.2 Building Gloo

As mentioned earlier, calling hvd.init in the Python world takes you into the C++ world, and when Horovod is compiled with Gloo, a GlooContext is created.

namespace {

// All the Horovod state that must be stored globally per-process.
HorovodGlobalState horovod_global;

#if HAVE_GLOO
GlooContext gloo_context;
#endif

GlooContext has an interface to communicate with RendezvousServer.

6.2.2.1 Fetching information from Rendezvous

GlooContext::Initialize needs a good deal of configuration information, the most important of which is the rank information.

This information is held by the RendezvousServer, so GlooContext fetches it from the RendezvousServer over HTTP. The code below is fairly straightforward.

#define HOROVOD_GLOO_GET_RANK_AND_SIZE "rank_and_size"

void GlooContext::Initialize(const std::string& gloo_iface) {
  // Create a device for communication
  // TODO(sihan): Add support for multiple interfaces:
  //  https://github.com/facebookincubator/gloo/issues/190
  attr device_attr;
  device_attr.iface = gloo_iface;
  device_attr.ai_family = AF_UNSPEC;
  auto dev = CreateDevice(device_attr);
  auto timeout = GetTimeoutFromEnv();

  auto host_env = std::getenv(HOROVOD_HOSTNAME);
  std::string hostname = host_env != nullptr ? std::string(host_env) : std::string("localhost");

  int rank = GetIntEnvOrDefault(HOROVOD_RANK, 0);
  int size = GetIntEnvOrDefault(HOROVOD_SIZE, 1);
  int local_rank = GetIntEnvOrDefault(HOROVOD_LOCAL_RANK, 0);
  int local_size = GetIntEnvOrDefault(HOROVOD_LOCAL_SIZE, 1);
  int cross_rank = GetIntEnvOrDefault(HOROVOD_CROSS_RANK, 0);
  int cross_size = GetIntEnvOrDefault(HOROVOD_CROSS_SIZE, 1);

  auto rendezvous_addr_env = std::getenv(HOROVOD_GLOO_RENDEZVOUS_ADDR);
  auto rendezvous_port = GetIntEnvOrDefault(HOROVOD_GLOO_RENDEZVOUS_PORT, -1);

  bool elastic = GetBoolEnvOrDefault(HOROVOD_ELASTIC, false);
  if (elastic && reset_) {
    std::string server_addr = rendezvous_addr_env;
    std::string scope = HOROVOD_GLOO_GET_RANK_AND_SIZE;
    HTTPStore init_store(server_addr, rendezvous_port, scope, rank);

    auto key = hostname + ":" + std::to_string(local_rank);
    std::vector<char> result = init_store.get(key);
    std::string s(result.begin(), result.end());
    std::stringstream ss(s);

    int last_rank = rank;
    int last_size = size;
    int last_local_rank = local_rank;
    int last_local_size = local_size;
    int last_cross_rank = cross_rank;
    int last_cross_size = cross_size;

    rank = ParseNextInt(ss);
    size = ParseNextInt(ss);
    local_rank = ParseNextInt(ss);
    local_size = ParseNextInt(ss);
    cross_rank = ParseNextInt(ss);
    cross_size = ParseNextInt(ss);

    SetEnv(HOROVOD_RANK, std::to_string(rank).c_str());
    SetEnv(HOROVOD_SIZE, std::to_string(size).c_str());
    SetEnv(HOROVOD_LOCAL_RANK, std::to_string(local_rank).c_str());
    SetEnv(HOROVOD_LOCAL_SIZE, std::to_string(local_size).c_str());
    SetEnv(HOROVOD_CROSS_RANK, std::to_string(cross_rank).c_str());
    SetEnv(HOROVOD_CROSS_SIZE, std::to_string(cross_size).c_str());
  }

  ctx = Rendezvous(HOROVOD_GLOO_GLOBAL_PREFIX,
                   rendezvous_addr_env, rendezvous_port,
                   rank, size, dev, timeout);

  local_ctx = Rendezvous(HOROVOD_GLOO_LOCAL_PREFIX + hostname,
                         rendezvous_addr_env, rendezvous_port,
                         local_rank, local_size, dev, timeout);

  cross_ctx = Rendezvous(HOROVOD_GLOO_CROSS_PREFIX + std::to_string(local_rank),
                         rendezvous_addr_env, rendezvous_port,
                         cross_rank, cross_size, dev, timeout);
}
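On the worker side, the elastic branch boils down to: build the key `hostname:local_rank`, fetch a string from the `rank_and_size` scope, read six integers in a fixed order, and export them as environment variables. The Python sketch below mirrors that logic under stated assumptions: the payload is assumed to be comma-separated, `fetch` is a hypothetical callable standing in for `HTTPStore::get`, and the environment variable names are assumed to match the names of the `HOROVOD_*` constants.

```python
# Same order as the six ParseNextInt calls in GlooContext::Initialize.
FIELDS = ("rank", "size", "local_rank", "local_size", "cross_rank", "cross_size")


def refresh_rank_info(hostname, local_rank, fetch, env):
    """Fetch new rank information for this worker and write it into env.

    fetch: hypothetical callable (scope, key) -> bytes, standing in for HTTPStore::get.
    env:   a dict standing in for the process environment.
    """
    key = f"{hostname}:{local_rank}"
    payload = fetch("rank_and_size", key).decode("ascii")
    values = [int(tok) for tok in payload.split(",")]  # assumed comma-separated format
    if len(values) != len(FIELDS):
        raise ValueError(f"expected {len(FIELDS)} integers, got {payload!r}")
    info = dict(zip(FIELDS, values))
    for name, value in info.items():
        env["HOROVOD_" + name.upper()] = str(value)  # e.g. HOROVOD_LOCAL_RANK
    return info
```

Because the new values overwrite the environment, any later lookup of the rank in this process sees the reassigned rank rather than the one the worker was launched with.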
6.2.2.2 RendezvousServer

RendezvousServer exposes the following GET method:

# GET methods
GET_RANK_AND_SIZE = 'rank_and_size'

def _get_value(self, scope, key):
    if scope == GET_RANK_AND_SIZE:
        host, local_rank = key.split(':')
        return self._get_rank_and_size(host, int(local_rank))

    return super(RendezvousHandler, self)._get_value(scope, key)

ElasticRendezvousHandler is the RendezvousServer's request handler; its _get_rank_and_size function is:

def _get_rank_and_size(self, host, local_rank):
    driver.record_ready(host, local_rank)
    slot_info = driver.get_slot_info(host, local_rank)
    return slot_info.to_response_string().encode('ascii')
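The server side of the same exchange can be sketched with plain objects. This is a hypothetical model, not Horovod's code: `SlotInfo` here is a simplified stand-in dataclass, `FakeDriver` only imitates the two driver calls made by `_get_rank_and_size`, and the comma-joined response format is an assumption chosen to follow the order in which `GlooContext::Initialize` parses the six values.

```python
from dataclasses import dataclass


@dataclass
class SlotInfo:
    """Hypothetical stand-in for Horovod's SlotInfo."""
    hostname: str
    rank: int
    local_rank: int
    cross_rank: int
    size: int
    local_size: int
    cross_size: int

    def to_response_string(self):
        # Assumed format: six integers, comma-separated, in the order the worker parses them.
        return ",".join(str(v) for v in (self.rank, self.size, self.local_rank,
                                         self.local_size, self.cross_rank, self.cross_size))


class FakeDriver:
    """Hypothetical driver: remembers READY workers and serves slot assignments."""

    def __init__(self, assignments):
        self._assignments = assignments  # {(host, local_rank): SlotInfo}
        self.ready = set()

    def record_ready(self, host, local_rank):
        self.ready.add((host, local_rank))

    def get_slot_info(self, host, local_rank):
        return self._assignments[(host, local_rank)]


def get_value(driver, scope, key):
    """Mimics _get_value plus _get_rank_and_size for the rank_and_size scope."""
    if scope != "rank_and_size":
        raise KeyError(scope)
    host, local_rank = key.split(":")
    driver.record_ready(host, int(local_rank))       # the worker is now READY
    slot_info = driver.get_slot_info(host, int(local_rank))
    return slot_info.to_response_string().encode("ascii")
```

Note the side effect: answering a plain GET also registers the worker as READY, which is how the driver learns that a worker has reached hvd.init.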

record_ready is called to tell the driver that the worker is in the READY state.

def record_ready(self, host, slot):
    self._worker_registry.record_ready(host, slot)

6.2.3 Entering the READY State

Once hvd.init has been called and the GlooContext established, the GlooContext communicates with the RendezvousServer, after which the Worker enters the READY state.

We need to dig deeper and see what happens between when a worker starts running and when it is in the READY state.

  1. The Worker starts running Python train.py;
  2. In train.py, hvd.init() is called, which dives into the C++ world to create the GlooContext;
  3. The GlooContext reads the IP and port of the Rendezvous Server from environment variables and then calls init_store to create an HTTPStore;
  4. It calls init_store.get(hostname + ":" + std::to_string(local_rank)) to ask the Rendezvous Server for the configuration that corresponds to this worker (rank, local_rank, cross_rank, ...), because the Rendezvous Server may have been reinitialized for a reallocation;
  5. ElasticRendezvousHandler, the response handler, calls driver.record_ready(host, local_rank), which records the worker in the READY set of the WorkerStateRegistry;
  6. driver.get_slot_info(host, local_rank) is called to obtain the slot info from the driver;
  7. At this point the Worker is READY (strictly speaking, the Worker itself has no such state; it is the WorkerStateRegistry that records it);
  8. ElasticRendezvousHandler returns the slot info to the worker's C++ world;
  9. Execution continues in the worker's C++ world, where the slot info is passed to the GlooContext for the various settings.

The detailed logic diagram is as follows:

[Figure: detailed logic from train.py to READY, split into a Python side and a C++ side. Python side: ElasticDriver (_rendezvous pointing to the RendezvousServer and its handler_cls; _worker_registry pointing to the WorkerStateRegistry with _host_manager, _driver, _barrier, and _workers holding READY[], SUCCESS[], FAILURE[]) and ElasticRendezvousHandler (_get_rank_and_size, driver). C++ side: the Worker runs Python train.py, hvd.init(), then run_fn(train_function), ending in SUCCESS or FAILURE; hvd.init() creates a GlooContext (init_store / HTTPStore, gloo::Context(rank), gloo::Context(local_rank), gloo::Context(cross_rank)). The numbered arrows 1 to 9 follow the steps listed above: 4 get(rank), 5 record_ready, 6 get_slot_info, 7 READY, 8 slot_info (rank, local_rank, ...) returned to the HTTPStore.]


At this point, the Worker is ready to run.

6.3 WorkerStateRegistry

The WorkerStateRegistry records the results of each run and then coordinates the next steps accordingly.

Its main member variables are:

  • _driver: a link back to the Driver, because the Driver is called to do the actual processing;

  • _host_manager: used to discover hosts;

  • _workers: records the workers in each state. States include 'READY', 'SUCCESS', and 'FAILURE'. The number of workers in a given state is obtained with:

      def count(self, state):
          return len(self._workers[state])

  • _states: records the state of each worker;

  • _barrier: once all workers have reported, further processing takes place.

Specific definitions are as follows:

class WorkerStateRegistry(object):
    def __init__(self, driver, host_manager, reset_limit=None, verbose=False):
        self._driver = driver
        self._host_manager = host_manager
        self._reset_limit = reset_limit
        self._reset_count = 0
        self._lock = threading.Lock()
        self._states = {}
        self._workers = defaultdict(set)
        self._barrier = None
        self._rendezvous_id = 0
        self._verbose = verbose
        self._size = 0

6.3.1 Initialization

The WorkerStateRegistry is created in the Driver and stored as a member variable of the Driver, so the Driver can call it conveniently:

self._worker_registry = WorkerStateRegistry(self, self._host_manager, reset_limit=reset_limit)

6.3.2 Start

The Driver calls reset before it starts the workers.

def _activate_workers(self, min_np):
    current_hosts = self.wait_for_available_slots(min_np)
    pending_slots = self._update_host_assignments(current_hosts)
    self._worker_registry.reset(self.world_size())  # reset the registry first
    self._start_worker_processes(pending_slots)

The reset function is short, but the reasoning behind it is subtle.

Two questions arise:

  • Why is _barrier needed? Most machine learning algorithms require all workers (or a certain number of workers) to finish before the next processing step, so the driver has to wait.

    • Here, the barrier's parties argument is self.world_size(), so self._action fires only when the barrier's internal count reaches self.world_size().
    • When each worker finishes, _handle_worker_exit is called, and it eventually calls self._barrier.wait().
    • As a result, once all workers have finished, the barrier fires self._action.
  • What does _action do? Based on the results of this round of training, it exercises further control and decides what to do next.

The code is as follows:

def reset(self, size):
    with self._lock:
        self._states.clear()
        self._workers.clear()
        self._barrier = threading.Barrier(parties=size, action=self._action)
        self._rendezvous_id += 1
        self._size = size
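Because reset builds a brand-new Barrier every time, each rendezvous round gets its own party count and its own rendezvous_id, and counts from the previous round cannot leak into the new one. The sketch below keeps just that bookkeeping (plus count) from WorkerStateRegistry; `RoundRegistry` and `_noop_action` are hypothetical names, and the real class also holds the driver, the host manager, and the reset limit.

```python
import threading
from collections import defaultdict


class RoundRegistry:
    """Hypothetical subset of WorkerStateRegistry: per-round state and barrier."""

    def __init__(self):
        self._lock = threading.Lock()
        self._states = {}
        self._workers = defaultdict(set)
        self._barrier = None
        self._rendezvous_id = 0
        self._size = 0

    def _noop_action(self):
        pass  # the real registry calls _on_workers_recorded() here

    def reset(self, size):
        with self._lock:
            self._states.clear()
            self._workers.clear()
            # A fresh barrier sized to the new world size for the new round.
            self._barrier = threading.Barrier(parties=size, action=self._noop_action)
            self._rendezvous_id += 1
            self._size = size

    def count(self, state):
        return len(self._workers[state])
```

After a scale-up from two to three slots, for example, the second reset gives a barrier with three parties and bumps the round id to 2.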

6.3.3 Worker exit

When a worker finishes, control returns to the _handle_worker_exit callback set up by the Driver. The exit_code determines whether record_success or record_failure is called.

def _handle_worker_exit(self, slot_info, exit_code, timestamp):
    if not self.has_rank_assignment(slot_info.hostname, slot_info.local_rank):
        return

    if exit_code == 0:
        rendezvous_id = self._worker_registry.record_success(slot_info.hostname, slot_info.local_rank)
    else:
        rendezvous_id = self._worker_registry.record_failure(slot_info.hostname, slot_info.local_rank)

    if self.finished() and self._worker_registry.last_rendezvous() == rendezvous_id:
        name = '{}[{}]'.format(slot_info.hostname, slot_info.local_rank)
        self._results.add_result(name, (exit_code, timestamp))
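One guard in `_handle_worker_exit` is easy to miss: a result is kept only when the driver has finished and the rendezvous round returned by the registry is still the latest one, so exits that belong to a stale round are not reported as final results. A minimal sketch with hypothetical stubs (`StubRegistry`, a set of assigned slots, a `finished` callable, and a plain dict for results) in place of the real driver objects:

```python
class StubRegistry:
    """Hypothetical registry: records outcomes and returns the current round id."""

    def __init__(self, rendezvous_id):
        self.rendezvous_id = rendezvous_id
        self.outcomes = {}

    def record_success(self, host, slot):
        self.outcomes[(host, slot)] = "SUCCESS"
        return self.rendezvous_id

    def record_failure(self, host, slot):
        self.outcomes[(host, slot)] = "FAILURE"
        return self.rendezvous_id

    def last_rendezvous(self):
        return self.rendezvous_id


def handle_worker_exit(registry, assigned, finished, results, host, local_rank, exit_code, timestamp):
    """Mirrors the control flow of ElasticDriver._handle_worker_exit."""
    if (host, local_rank) not in assigned:
        return  # this slot no longer has a rank assignment: ignore the exit
    if exit_code == 0:
        rendezvous_id = registry.record_success(host, local_rank)
    else:
        rendezvous_id = registry.record_failure(host, local_rank)
    # `finished` is a callable so it is evaluated after recording, as in the original.
    if finished() and registry.last_rendezvous() == rendezvous_id:
        results[f"{host}[{local_rank}]"] = (exit_code, timestamp)
```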

These calls go into the WorkerStateRegistry:

def record_ready(self, host, slot):
    return self._record_state(host, slot, READY)

def record_success(self, host, slot):
    return self._record_state(host, slot, SUCCESS)

def record_failure(self, host, slot):
    return self._record_state(host, slot, FAILURE)

The _record_state function uses self._workers[state].add(key) to record the state and calls _wait.

def _record_state(self, host, slot, state):
    if self._driver.finished():
        return self._rendezvous_id
    if self._host_manager.is_blacklisted(host):
        return self._rendezvous_id

    key = (host, slot)
    with self._lock:
        if key in self._states:
            if state == FAILURE:
                self._barrier.reset()

        if key not in self._states or state == FAILURE:
            self._states[key] = state
            self._workers[state].add(key)

        rendezvous_id = self._rendezvous_id

    rendezvous_id = self._wait(key, state, rendezvous_id)
    return rendezvous_id
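The locked block in `_record_state` encodes a small rule: within one round, the first state recorded for a `(host, slot)` key wins, except that FAILURE always overrides an earlier state (and, in the real code, also resets the barrier so that the waiters re-synchronize). A pure-function sketch of just that rule, without the lock or the barrier; `apply_state` is a hypothetical name.

```python
from collections import defaultdict

READY, SUCCESS, FAILURE = "READY", "SUCCESS", "FAILURE"


def apply_state(states, workers, key, state):
    """Apply one state report; return True where the real code would reset the barrier."""
    reset_barrier = key in states and state == FAILURE
    if key not in states or state == FAILURE:
        states[key] = state       # first report wins, unless this one is a FAILURE
        workers[state].add(key)
    return reset_barrier
```

Note that after a FAILURE overrides READY, the key is still present in `workers[READY]`; only `states` holds the latest value.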

_wait calls self._barrier.wait() to wait for the other workers, so that all of them can be processed together.

def _wait(self, key, state, rendezvous_id):
    while True:
        try:
            self._barrier.wait()
            return rendezvous_id
        except threading.BrokenBarrierError:
            if self._barrier.broken:
                # Timeout or other non-recoverable error, so exit
                raise

            # Barrier has been reset
            with self._lock:
                # Check to make sure the reset was not caused by a change of state for this key
                rendezvous_id = self._rendezvous_id
                saved_state = self._states.get(key, state)
                if saved_state != state:
                    # This worker changed its state, so do not attempt to wait again to avoid double-counting
                    raise RuntimeError('State {} overridden by {}'.format(state, saved_state))
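The retry loop in `_wait` relies on a documented property of Python's `threading.Barrier`: `reset()` wakes the current waiters with `BrokenBarrierError` but leaves the barrier usable (`broken` is False), whereas a timeout or `abort()` leaves it in the broken state. The sketch below demonstrates that distinction; `wait_with_retry`, `demo_reset`, and `demo_abort` are hypothetical helpers, and the state-override check of the real `_wait` is omitted.

```python
import threading
import time


def wait_with_retry(barrier, attempts):
    """Wait on barrier; retry after reset(), give up if the barrier is broken."""
    while True:
        attempts.append(1)
        try:
            barrier.wait()
            return "passed"
        except threading.BrokenBarrierError:
            if barrier.broken:
                return "broken"  # timeout/abort: unrecoverable, stop waiting
            # reset(): the barrier is usable again, so loop and wait once more


def demo_reset():
    barrier = threading.Barrier(2)
    attempts, result = [], []
    t = threading.Thread(target=lambda: result.append(wait_with_retry(barrier, attempts)))
    t.start()
    while barrier.n_waiting == 0:   # let the worker thread block on the barrier
        time.sleep(0.01)
    barrier.reset()                 # waiter gets BrokenBarrierError, barrier stays usable
    while barrier.n_waiting == 0:   # the waiter re-enters wait()
        time.sleep(0.01)
    barrier.wait()                  # second party arrives: both threads pass
    t.join()
    return result[0], len(attempts)


def demo_abort():
    barrier = threading.Barrier(2)
    barrier.abort()                 # puts the barrier into the broken state
    return wait_with_retry(barrier, [])
```

This is why `_record_state` can call `self._barrier.reset()` on a FAILURE without killing the other waiters: they simply re-check their state and wait again.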

6.3.4 Further control

Once all workers have finished, the _action function evaluates their results and decides what to do next.

def _action(self):
    self._on_workers_recorded()

The _on_workers_recorded function performs the control logic.

  • Check whether any worker succeeded. If one did, shut down the other processes and end training. Since all workers have finished running at this point, a single success is enough to stop.
  • If all workers failed, end training;
  • Add the hosts of the failed workers to the blacklist;
  • If all hosts are blacklisted, end training;
  • If the maximum number of resets has been reached, end training;
  • Otherwise, call _driver.resume() to restart training. Because the state has been committed, training resumes automatically from the last commit.

The specific code is as follows:

def _on_workers_recorded(self):
    # Check for success state, if any process succeeded, shutdown all other processes
    if self.count(SUCCESS) > 0:
        self._driver.stop()
        return

    # Check that all processes failed, indicating that processing should stop
    if self.count(FAILURE) == self._size:
        self._driver.stop()
        return

    # Check for failures, and add them to the blacklisted hosts list
    failures = self.get(FAILURE)
    for host, slot in failures:
        self._host_manager.blacklist(host)

    # If every active host is blacklisted, then treat this as job failure
    if all([self._host_manager.is_blacklisted(host) for host, slot in self.get_recorded_slots()]):
        self._driver.stop()
        return

    # Check that we have already reset the maximum number of allowed times
    if self._reset_limit is not None and self._reset_count >= self._reset_limit:
        self._driver.stop(error_message=constants.RESET_LIMIT_EXCEEDED_MESSAGE.format(self._reset_limit))
        return

    try:
        self._reset_count += 1
        self._driver.resume()
    except Exception:
        self._driver.stop()
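The order of checks in `_on_workers_recorded` matters: any success wins, then total failure, then blacklisting, then the reset limit, and only then a resume. The pure function below reproduces that order so the outcomes can be checked in isolation. It is a hypothetical sketch: `decide` and its arguments are illustrative names, blacklisting is modeled as a plain set, and the case where `resume()` itself raises is not modeled.

```python
READY, SUCCESS, FAILURE = "READY", "SUCCESS", "FAILURE"


def decide(workers, size, recorded_slots, blacklist, reset_count, reset_limit):
    """Return ("stop", reason) or ("resume", new_reset_count), mimicking _on_workers_recorded.

    workers:        {state: set of (host, slot)}
    recorded_slots: all (host, slot) keys recorded in this round
    blacklist:      set of blacklisted hosts; mutated as the host manager would be
    """
    if len(workers.get(SUCCESS, ())) > 0:
        return ("stop", "a worker succeeded")
    if len(workers.get(FAILURE, ())) == size:
        return ("stop", "all workers failed")
    for host, _slot in workers.get(FAILURE, ()):
        blacklist.add(host)
    if all(host in blacklist for host, _slot in recorded_slots):
        return ("stop", "every active host is blacklisted")
    if reset_limit is not None and reset_count >= reset_limit:
        return ("stop", "reset limit exceeded")
    return ("resume", reset_count + 1)
```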

6.4 The Driver.resume scenario

resume is all about starting over: it simply activates the workers again.

def resume(self):
    self._activate_workers(self._min_np)

Earlier we analyzed what happens between the moment a worker starts running and the moment it reaches the READY state.

Now let's add a scenario in which the Driver discovers a new node during resume and starts a new worker, Worker 3.

  1. Worker 2 starts running Python train.py;

  2. In train.py, hvd.init() is called, which dives into the C++ world to create the GlooContext;

  3. The GlooContext reads the IP and port of the Rendezvous Server from environment variables and then calls init_store to create an HTTPStore;

  4. It calls init_store.get(hostname + ":" + std::to_string(local_rank)) to ask the Rendezvous Server for the configuration that corresponds to this worker (rank, local_rank, cross_rank, ...), because the Rendezvous Server may have been reinitialized for a reallocation;

  5. ElasticRendezvousHandler, the response handler, calls driver.record_ready(host, local_rank), which records the worker in the READY set of the WorkerStateRegistry; Worker 2 is now READY;

  6. driver.get_slot_info(host, local_rank) is called to obtain the slot info from the driver;

  7. The slot info is returned to the HTTPStore in the worker;

  8. Execution continues in Worker 2, and the slot info is handed to the GlooContext for the various settings;

  9. Going back to step 5: inside record_ready, rendezvous_id = self._wait(key, state, rendezvous_id) is called, which waits on WorkerStateRegistry._barrier, created as Barrier(parties=size, action=self._action);

  10. When the number of READY workers reaches min_np (Horovod's minimum number of workers needed to start), _barrier is released and execution continues;

  11. As _barrier is released, it calls the action set at construction time, the _action function, which calls _on_workers_recorded, which in turn calls self._driver.resume();

  12. ElasticDriver's resume function calls _activate_workers, defined below. If the discovery script has already found the new node, _update_host_assignments returns pending_slots, the slots on which new workers can be started, so _start_worker_processes is called on them:

      def _activate_workers(self, min_np):
          current_hosts = self.wait_for_available_slots(min_np)
          pending_slots = self._update_host_assignments(current_hosts)
          self._worker_registry.reset(self.world_size())
          self._start_worker_processes(pending_slots)

  13. _start_worker_processes starts a new worker: Worker 3;

  14. Worker 3 also runs Python train.py, and the new worker is up;

  15. Back in Worker 2: when training ends, SUCCESS or FAILURE is reported to the Driver depending on the result;

  16. The Driver calls _handle_worker_exit to process the training result.
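The ordering inside `_activate_workers` is what lets step 12 work: the registry is reset (creating a new barrier sized to the new world size) before any new worker is started, presumably so that a freshly launched worker's `record_ready` lands in the new round. The sketch below logs the call order with a hypothetical `RecordingDriver`; `reset_registry` stands in for `self._worker_registry.reset`, the host names are made up, and treating pending slots as "slots without a worker yet" is a simplifying assumption.

```python
class RecordingDriver:
    """Hypothetical driver that only logs the order of the _activate_workers steps."""

    def __init__(self, hosts, slots_per_host):
        self.calls = []
        self.hosts = list(hosts)           # what host discovery currently reports
        self._slots_per_host = slots_per_host
        self._assigned = set()             # slots that already have a worker

    def wait_for_available_slots(self, min_np):
        # The real method blocks until at least min_np slots are available.
        self.calls.append("wait_for_available_slots")
        return list(self.hosts)

    def _update_host_assignments(self, current_hosts):
        self.calls.append("_update_host_assignments")
        all_slots = {(h, s) for h in current_hosts for s in range(self._slots_per_host)}
        pending = sorted(all_slots - self._assigned)  # simplification: slots without a worker
        self._assigned = all_slots
        return pending

    def world_size(self):
        return len(self._assigned)

    def reset_registry(self, size):
        # Stands in for self._worker_registry.reset(size): new barrier, new round.
        self.calls.append(f"reset({size})")

    def _start_worker_processes(self, pending_slots):
        self.calls.append(f"start{pending_slots}")

    def _activate_workers(self, min_np):
        current_hosts = self.wait_for_available_slots(min_np)
        pending_slots = self._update_host_assignments(current_hosts)
        self.reset_registry(self.world_size())  # reset before launching new workers
        self._start_worker_processes(pending_slots)
```

Running it once with two hosts and then again after a third host appears reproduces the scenario above: the second activation resets to a world size of 3 and starts a worker only on the new slot.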

At this point, the new logic is complete.

[Figure: the resume scenario across three hosts. Host 1 runs the ElasticDriver (_rendezvous, _worker_registry, _activate_workers, _handle_worker_exit), the WorkerStateRegistry (_driver, _barrier, _workers with SUCCESS[], READY[], FAILURE[]), the RendezvousServer (handler_cls), and the ElasticRendezvousHandler (_get_rank_and_size, get_slot_info). Host 2 runs Worker 2 on slot 2: Python train.py, hvd.init() with its GlooContext and HTTPStore, hvd.elastic.run(state), run_fn(train_function), ending in SUCCESS or FAILURE. Host 3 runs the newly started Worker 3 on slot 3 (Python train.py). The numbered arrows 1 to 16 follow the steps listed above, including 9 wait, 10 broken, 11 resume, 12 init, 13 _start_worker_processes(pending_slots), 14 Worker 3 starting, and 15/16 the exit path through _handle_worker_exit.]


To see things more clearly, we can zoom in on just the Driver and one Worker:

[Figure: the Driver plus one Worker. Host 1 holds the ElasticDriver (start/resume, _activate_workers), the RendezvousServer (_get_rank_and_size, get_slot_info), and the WorkerStateRegistry (_driver, _barrier, _workers with SUCCESS[], READY[], FAILURE[]). Host 2 holds Worker 2 on slot 2: Python train.py, hvd.init(), hvd.elastic.run(state), run_fn(train_function), ending in SUCCESS or FAILURE. Numbered arrows: 1 start/resume, 2 hvd.init() calling _get_rank_and_size, 3 record_ready, 4 and 5 the worker exiting back to _handle_worker_exit, 6 record_success, 7 record_failure, 8 wait on _barrier, 9 broken, 10 resume.]


That completes the analysis of the worker; in the next article we'll look at how errors are handled.

0xEE Personal information

Thoughts on life and technology

Wechat public account: Rosie’s Thinking

If you'd like to receive updates on my articles or see my technical recommendations, please follow the account.
