0x00 Abstract

Horovod is an easy-to-use, high-performance distributed training framework released by Uber in 2017 that has been widely used in the industry.

This series takes you through the source code of Horovod. This is the twelfth article in the series, and it looks at how Horovod implements elastic training.

Elastic training allows Horovod to scale the number of workers dynamically at runtime, without restarting the job or resuming from checkpoints saved to persistent storage.

Links to other articles in this series are as follows:

Horovod (1) — Basics

Horovod (2) — A distributed training framework for deep learning — from the user’s perspective

Horovod (3) — What’s behind Horovodrun

Horovod (4) — Network Basics & Driver

Horovod (5) — fusion framework

Horovod (6) — background architecture

Horovod (6), a distributed training framework for deep learning, is implemented using threads

Horovod (7) — DistributedOptimizer

Horovod (8) — on Spark

Horovod (9) — Start on Spark

Horovod (10) — Run on Spark

Horovod (11) — on Spark — GLOO scheme

0x01 Overview

1.1 Pain points

Let's first think about the limitations of Horovod as it stands today.

  • No automatic scaling.
    • Since compute resources may be scheduled elastically, what happens if the cluster shrinks? What if it grows? Ideally, the number of workers could be increased or decreased automatically during training, and a change in the number of workers would not interrupt the training job, giving a smooth transition.
    • Horovod cannot start with only part of the requested resources. If 100 GPUs are needed in total but only 40 are currently available, Horovod has to wait; it cannot start training on a smaller number of processes with the existing 40 GPUs to begin model iteration quickly.
    • Horovod cannot automatically add processes to speed up training when resources become abundant. In the example above, ideally Horovod would start training by building a ring with the 40 available GPUs, and dynamically expand once the other 60 GPUs come online, so that the next epoch trains on a ring of 100 GPUs.
  • No fault tolerance. Currently, if a node fails, the entire training job fails, and the user can only start training from scratch. If auto-scaling were supported and combined with previously saved checkpoints, Horovod could restart the worker on a healthy node, or build a ring with the remaining nodes and continue training.
  • The scheduling mechanism is not flexible.
    • Machine learning training tasks generally run for a long time and occupy a large amount of compute. Horovod tasks lack flexibility: they do not support dynamic reconfiguration of workers and do not support preemption by high-priority instances. Therefore, when resources are tight, they cannot release resources on demand for other high-priority services; one can only wait for the task to finish or fail on its own.

To solve these problems, a number of more specific technical questions and details need to be considered. Let's list them first:

  • When should a checkpoint be built? At which stage is it appropriate? Automatically after each epoch? Or controlled by the user (which is better)?
  • How do we recover from a checkpoint?
  • What does a checkpoint need to store, i.e., which parts of Horovod's state are essential?
  • How do we monitor the workers? How do we tell whether a machine has actually failed, and what if the symptom is only occasional network congestion?
  • How do we build a notification mechanism?
  • How do we know the spare resources of a cluster? How do we find available nodes?
  • How do we build a new communication ring?
  • If a new ring is built, is it coordinated by a master, or by a protocol such as Gossip?
  • Should scheduling priorities be configurable so that idle resources in a shared cluster are fully used?
  • How is a new worker synchronized?
  • What happens to the originally active worker nodes?
  • What can be done about a faulty worker node?
  • How does rank 0 broadcast?

We will attempt to answer these questions in this and subsequent analyses.

Note: Horovod’s current scheduling mechanism is still inflexible and does not support preemption.

1.2 Roles

Horovod uses NCCL for communication among multiple GPUs on a single machine, and the Ring-AllReduce algorithm for communication between machines (CPU or GPU). Horovod's elastic training is multi-machine elastic training.

Horovod elastic training has two roles: driver and worker. The driver process runs on a CPU node, while worker processes can run on CPU or GPU nodes.

The Driver process is used to:

  • Call Gloo to help the workers construct an AllReduce communication ring (a communication domain). The driver does not take part in building the ring itself; it provides auxiliary information so that the workers can establish the ring.
    • The driver process needs to create a RendezvousServer with a KVStore for Gloo. The KVStore stores information such as the host of each node in the communication domain and the rank assigned to it in the logical communication ring.
    • This RendezvousServer runs inside Horovod's driver process. After the driver obtains the addresses and GPU card information of all worker nodes, it writes them into the RendezvousServer's KVStore. A worker can then call Gloo, which accesses the RendezvousServer to construct the communication ring.
  • Start/restart worker processes on the worker nodes.
  • Monitor the overall system status.

Workers are responsible for training and model iteration.

  • Each worker node constructs a communication ring by requesting RendezvousServer to obtain information about its neighbor nodes.
  • In this communication ring, each worker node has a left neighbor and a right neighbor. In the communication process, each worker only sends data to its right neighbor and receives data from its left neighbor.

The networking mechanism is as follows:

+-------------------------------+
|            Driver             |
|                               |
|  +-------------------------+  |
|  |    RendezvousServer     |  |
|  |                         |  |
|  |  host1, host2, host3    |  |
|  +-------------------------+  |
+-------------------------------+
     ^           ^           ^
     |           |           |
     v           v           v
+----+-----+  +--+-------+  +--+-------+
|  Worker  |  |  Worker  |  |  Worker  |
|  host1   +->|  host2   +->|  host3   +--+
+----------+  +----------+  +----------+  |
     ^                                    |
     +------------------------------------+
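
To make the networking mechanism concrete, the following is a minimal sketch (illustration only, not Horovod code; the host list, rank assignment, and helper names are made up) of the kind of host/rank information the RendezvousServer's KVStore provides and how each worker derives its ring neighbors from its rank:

hosts = ["host1", "host2", "host3"]                # hypothetical hosts, one worker per host
ranks = {host: i for i, host in enumerate(hosts)}  # rank assigned to each worker

def ring_neighbors(rank, world_size):
    """Each worker receives from its left neighbor and sends to its right neighbor."""
    left = (rank - 1) % world_size
    right = (rank + 1) % world_size
    return left, right

for host, rank in ranks.items():
    left, right = ring_neighbors(rank, len(hosts))
    print(f"{host} (rank {rank}): receives from rank {left}, sends to rank {right}")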

Let’s look at each part in detail.

1.3 Fault tolerance mechanism

Horovod's fault-tolerance mechanism is based on Gloo, and it can be thought of as a passive response to errors.

Gloo itself does not support fault tolerance. When many workers are performing a collective operation on tensors and one worker fails, Gloo does not handle the exception; it simply throws an exception and exits, so every worker reports an abnormal exit.

In order not to let the failure of a single worker cause the whole training to quit, Horovod needs to do two things:

  • Keep exceptions from killing the existing job. Horovod has to catch the exceptions thrown by Gloo, so it builds a Python layer for handling them.
    • After catching an exception, the worker passes it to the corresponding Python API for processing, and the API decides whether training can continue by examining the exception type.
    • If the exception message contains the keyword "HorovodAllreduce", "HorovodAllgather", or "HorovodBroadcast", it indicates that the communication failure was probably caused by the death of some worker, and Horovod treats the exception as recoverable.
  • Abandon the failed worker and continue training with the remaining available workers.
    • The other surviving workers stop their current training and record the iteration step the model has reached.
    • At this point the Gloo runtime is broken: the communication ring is torn down, and the remaining workers can no longer perform AllReduce operations with it.
    • To continue training, the Horovod driver reinitializes Gloo, starts a new RendezvousServer, and collects information about the surviving workers, which are used to form a new communication ring.
    • When the new communication ring has been built successfully, the rank 0 worker broadcasts its model to all other workers, so that all workers resume training on the same basis, continuing from the iteration at which they stopped.
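
For TensorFlow, the broadcast in the last step amounts to something like the sketch below. hvd.broadcast_variables is Horovod's standard TensorFlow API; in elastic training this synchronization is performed internally by the state object rather than called by hand, so the function here is purely illustrative:

import horovod.tensorflow as hvd

def broadcast_from_rank_0(model, optimizer):
    # Rank 0 pushes its model and optimizer variables to every other worker,
    # so that all workers resume training from the same state.
    hvd.broadcast_variables(model.variables, root_rank=0)
    hvd.broadcast_variables(optimizer.variables(), root_rank=0)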

1.4 Monitoring mechanism

The fault-tolerance mechanism is passive; the monitoring mechanism is active.

Elasticity means that the state of the distributed cluster can change at any time. Horovod itself has no built-in connection to the cluster scheduler, so there needs to be an external way for Horovod to keep track of the state of the cluster.

The external approach is that the user provides a host discovery script in the Horovod startup command. This script is written by the user and is responsible for discovering the topology of the currently available worker nodes.

Once running, the driver periodically calls this script to monitor the cluster. When the set of workers changes, the discovery script returns the latest worker state, and the driver obtains the worker node information from its output:

  • If the driver finds that a worker has failed, it catches the exception, updates the node information in the RendezvousServer KVStore according to the surviving workers, and has the workers re-establish a communication ring and continue training.
  • If the driver finds that a new worker node has joined the cluster, it updates the node information in the RendezvousServer KVStore with all current workers and has the workers build a communication ring again. After an existing worker receives the notification, it pauses its current training, records the current iteration step, and calls shutdown and init to rebuild the communication ring. The driver also starts a worker on the new node, expanding the number of processes.
  • When the new communication ring has been built successfully, the rank 0 worker broadcasts its model to all other workers, so that all workers resume training on the same basis, continuing from the iteration at which they stopped.

In this way, training continues even as the number of workers changes.
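
A heavily simplified sketch of this polling loop is shown below. It is not Horovod's actual implementation: the script path, the polling interval, and the rebuild step are placeholders, and the real driver performs the diffing and notification through its own internal components:

import subprocess
import time

def discover_hosts(script="./discover_hosts.sh"):
    """Run the user-provided discovery script and parse 'host:slots' lines."""
    output = subprocess.check_output(["sh", script]).decode()
    hosts = {}
    for line in output.splitlines():
        if line.strip():
            host, slots = line.rsplit(":", 1)
            hosts[host] = int(slots)
    return hosts

current = discover_hosts()
while True:
    time.sleep(10)                       # hypothetical polling interval
    latest = discover_hosts()
    if latest != current:
        # Hosts were added or removed: update the RendezvousServer KVStore,
        # notify the existing workers, and have them rebuild the communication ring.
        current = latest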

1.5 Official Architecture diagram

An official architecture diagram is shown below, and we will explain the parts of the diagram step by step in the following articles:

0x02 Example code

2.1 Python code

Let's take a look at the TensorFlow v2 example code from the official documentation. The key change is to wrap the training loop with @hvd.elastic.run and pass in a TensorFlowKerasState.

import tensorflow as tf
import horovod.tensorflow as hvd

hvd.init()

gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

dataset = ...
model = ...

optimizer = tf.optimizers.Adam(lr * hvd.size())

@tf.function
def train_one_batch(data, target, allreduce=True):
    with tf.GradientTape() as tape:
        probs = model(data, training=True)
        loss = tf.losses.categorical_crossentropy(target, probs)

    if allreduce:
        tape = hvd.DistributedGradientTape(tape)

    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

# Initialize model and optimizer state so we can synchronize across workers
data, target = get_random_batch()
train_one_batch(data, target, allreduce=False)

# Wrap train with @hvd.elastic.run
@hvd.elastic.run
def train(state):
    for state.epoch in range(state.epoch, epochs):
        for state.batch in range(state.batch, batches_per_epoch):
            data, target = get_random_batch()
            train_one_batch(data, target)
            if state.batch % batches_per_commit == 0:
                state.commit()
        state.batch = 0

def on_state_reset():
    optimizer.lr.assign(lr * hvd.size())

# Here is the new modification, passing in a TensorFlowKerasState
state = hvd.elastic.TensorFlowKerasState(model, optimizer, batch=0, epoch=0)
state.register_reset_callbacks([on_state_reset])
train(state)

2.2 Script Execution

Elastic training still uses the command-line tool horovodrun. Unlike normal distributed training, elastic training does not explicitly specify the list of hosts in the launch command; instead, it discovers hosts at runtime through a discovery mechanism. The usual practice is to provide a discovery script when starting the job:

horovodrun -np 18 --host-discovery-script discover_hosts.sh python train.py

This script provides real-time information about the currently available hosts and the number of slots on each host. (In the following, discover_hosts.sh refers to this script, but it does not have to be named discover_hosts.sh.)

The discover_hosts.sh script must be executable. When run, it returns the list of available nodes:

$ sh ./discover_hosts.sh    # Run the script to print the node information
host-1:4
host-2:4
host-3:4
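
The script can be written in any language, as long as it prints one host:slots pair per line. As an illustration only, a hypothetical Python version with a hard-coded host list (a real script would query the cluster scheduler or resource manager) could look like this:

#!/usr/bin/env python
# Hypothetical discovery script: the host list is hard-coded purely for illustration;
# in practice it would be obtained from the cluster scheduler or resource manager.
hosts = {"host-1": 4, "host-2": 4, "host-3": 4}
for host, slots in hosts.items():
    print(f"{host}:{slots}")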

If the discovery script fails to run (for example, it is not executable) or returns a non-zero exit code, the training process either fails immediately or keeps retrying until it times out (when the returned slot list does not meet the minimum number of slots required to run).

The elastic training job will not start until the minimum number of required slots (-np) are available. The user can specify the minimum and maximum number of slots with --min-np and --max-np. For example:

horovodrun -np 8 --min-np 4 --max-np 12 --host-discovery-script discover_hosts.sh python train.py

If the number of available slots drops below the number specified by --min-np (for example, because some nodes fail or the task is preempted), the job is paused, waiting for more nodes to become active, until HOROVOD_ELASTIC_TIMEOUT (600 seconds by default) is reached. If --min-np is not specified, the minimum number of slots defaults to the value of -np.

The reason --max-np is needed is to limit the number of processes (to prevent over-consuming available resources), and also to serve as a reference point for the learning rate and for data partitioning (cases that require a fixed reference configuration). If this parameter is not specified, it likewise defaults to -np.

0x03 Logical process

3.1 Logical Process

Let's first analyze the logical flow of elastic training. (To support elasticity, Horovod Elastic makes some changes to Horovod's architecture and implementation.) The biggest difference is that elastic training needs to track and synchronize the workers' state as workers are added or removed. The specific changes are as follows.

  1. Collective operations need to be defined inside a function decorated with hvd.elastic.run.
    1. Wrap your main training code (everything after initialization) in a function (call it train_func for now) and decorate it with the decorator hvd.elastic.run.
    2. The first argument of the decorated train_func must be an hvd.elastic.State instance. Because newly added workers may be in an undetermined state, this state object is synchronized across all workers before the decorated train_func runs, which guarantees that all workers reach a consistent state.
    3. Do not use Horovod collective operations (such as broadcast, allreduce, allgather) before this synchronization, because the synchronization itself uses collective communication and the active workers have not been reset yet.
  2. Each worker has its own state.
    1. Put all variables that need to be kept in sync between workers (such as model parameters, optimizer state, current epoch, batch progress, etc.) into the hvd.elastic.State object.
    2. For TensorFlow, Keras, and PyTorch, a standard default state implementation is already provided. However, if special types need to be broadcast in certain scenarios, the user can override hvd.elastic.State with a custom state object.
    3. When the hvd.elastic.run-decorated function runs, this state object is synchronized once across all workers to guarantee consistency.
  3. Periodically call state.commit() to back up the state in memory.
    1. Regular backups are very useful. If some worker fails unexpectedly, a regular backup prevents the state from being corrupted in a way that cannot be restored when training resumes. For example, if a worker fails in the middle of a parameter update, some gradients may be fully applied and others only half applied; this state is irreversible and cannot be continued from. Therefore, when this happens a HorovodInternalError is raised, and when hvd.elastic.run catches it, all state is restored from the most recent commit.
    2. Because committing state is expensive (for example, a large number of parameters makes it slow), you need to balance the extra processing time per batch against how far training has to roll back when something goes wrong. For example, committing every 10 batches reduces the copy overhead by a factor of 10, but when an error occurs you have to roll back to the state of up to 10 batches ago.
    3. Elastic Horovod can avoid these rollbacks by performing what is called "graceful removal" of workers. If the driver process discovers that a host has become available or has been marked for removal, it pushes a notification to all workers. Then, the next time state.commit() or the much cheaper state.check_host_updates() is called, a HostsUpdatedInterrupt exception is raised. This exception is handled similarly to a HorovodInternalError, except that the parameter state is not restored from the previous commit but recovered from the current live parameters.
    4. In general, if your hardware is reliable and stable and your orchestration system gives adequate warning when task nodes are removed, you can call state.commit() at a low frequency and only call the relatively inexpensive state.check_host_updates() at the end of each batch to check for node changes (see the sketch after this list).
  4. Register callback functions on the hvd.elastic.State object to respond when the worker membership changes.
    1. For example, such callbacks can handle the following cases:
      1. When the number of workers changes, the learning rate should be adjusted according to the new world size.
      2. The dataset should be repartitioned.
    2. These callbacks are invoked between "Horovod is restarted" and "the state is synchronized across nodes".
  5. Adding or removing a worker triggers a reset event on the other workers. The reset event triggers the following operations (depending on the situation, not necessarily all of them):
    1. Determine whether this worker can continue to run.
    2. If a worker host has been added to the blacklist, that host will not be used the next time the ring is formed.
    3. Start worker processes on the new hosts.
    4. Update the rank information of each worker.
  6. After the reset, the state of each worker is synchronized.
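
Putting points 3 and 4 together, a sketch of how these APIs can be combined is shown below. It reuses names from the earlier example (model, optimizer, train_one_batch, get_random_batch, state, batches_per_epoch, batches_per_commit); base_lr and full_dataset are assumed to be defined by the user and are not part of the official example:

@hvd.elastic.run
def train(state):
    for state.batch in range(state.batch, batches_per_epoch):
        data, target = get_random_batch()
        train_one_batch(data, target)
        if state.batch % batches_per_commit == 0:
            state.commit()                 # expensive: full in-memory backup
        else:
            state.check_host_updates()     # cheap: only checks for host changes

def on_reset():
    # Called between "Horovod is restarted" and "state is synchronized":
    # adapt the learning rate and the data partition to the new world size.
    optimizer.lr.assign(base_lr * hvd.size())               # base_lr: assumed constant
    global dataset
    dataset = full_dataset.shard(hvd.size(), hvd.rank())    # full_dataset: assumed

state.register_reset_callbacks([on_reset])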

3.2 Entry point

The following code shows that hvd.elastic.run is the run function in horovod/tensorflow/elastic.py.

import horovod.tensorflow as hvd
@hvd.elastic.run

So we go to that file.

def run(func):
    from tensorflow.python.framework.errors_impl import UnknownError

    def wrapper(state, *args, **kwargs):
        try:
            return func(state, *args, **kwargs)
        except UnknownError as e:
            if 'HorovodAllreduce' in e.message or \
                    'HorovodAllgather' in e.message or \
                    'HorovodBroadcast' in e.message:
                raise HorovodInternalError(e)
    return run_fn(wrapper, _reset)

3.3 Main logic

The run_fn function mainly wraps the user code logic; it is located in horovod/common/elastic.py.

The main logic is:

  • Initialize the notification_manager;
  • Register the state with the notification_manager;
  • Run func, which is the user's training code train;
  • When a worker is added or removed, a HorovodInternalError or HostsUpdatedInterrupt is raised in the worker process; these two errors are caught and reset() is called for fault-tolerant handling.
def run_fn(func, reset):
    @functools.wraps(func)
    def wrapper(state, *args, **kwargs):
        notification_manager.init()
        notification_manager.register_listener(state)
        skip_sync = False

        try:
            while True:
                if not skip_sync:
                    state.sync()

                try:
                    return func(state, *args, **kwargs)
                except HorovodInternalError:
                    state.restore()
                    skip_sync = False
                except HostsUpdatedInterrupt as e:
                    skip_sync = e.skip_sync

                reset()
                state.on_reset()
        finally:
            notification_manager.remove_listener(state)
    return wrapper

3.4 Error Handling

When a HorovodInternalError or HostsUpdatedInterrupt occurs in a worker process, Horovod executes the following recovery process:

  1. The two errors above are caught by the hvd.elastic.run decorator;
  2. If a HorovodInternalError was thrown, the state is restored from the last commit;
  3. The Horovod context is re-initialized and a new round of rendezvous is started. During rendezvous, an old worker is preferentially elected as the new rank 0, because old workers hold the latest state from the previous training;
  4. The new rank 0 worker synchronizes its state to the other workers;
  5. Training continues.

So far, we have analyzed the basic architecture of Horovod elastic training. In the next article, we will analyze its most important component: the Driver.

0xEE Personal information

Thoughts on life and technology

Wechat public account: Rosie’s Thinking

If you want to get timely updates on my articles, or see the technical material I recommend, please follow this account.

0xFF References

Horovod for ElasticDL in Kubernetes

Kubernetes Training _ Distributed deep learning training using Horovod on Kubernetes

The Elastic Training Operator is an Elastic deep learning Training tool on Kubernetes

Elastic and Fault-tolerant Distributed Training for ElasticHorovod

Horovod elastic training

The four-fold realm of large-scale machine learning frameworks