0x00 Summary

In previous articles, we studied PyTorch's basic distributed modules and walked through some official examples. Next we turn to PyTorch's elastic training. In this second article we focus on how elastic training is started, to get a sense of the system's overall architecture.

The elastic training series so far:

PyTorch distributed elastic training (1) — the general idea

0x01 Important Concepts

Because they are so important (and will come up again later in this article), let's first summarize the two key concepts of TE (TorchElastic): Agent and Rendezvous.

  • Agent: the agent is an independent background process running on each node, which can be thought of as a worker manager or process supervisor. It is responsible for starting workers, monitoring their execution, catching worker failures, using rendezvous to let workers discover each other (for example, by reporting state to a KVStore), synchronizing with the other nodes through rendezvous when membership changes, and so on.
  • Rendezvous: to achieve elastic training, there must be a mechanism for nodes/processes to discover each other. Rendezvous is that discovery mechanism, or synchronization component. All workers rendezvous to establish a new process group when the system starts or when membership changes.


0x02 Distributed Running

2.1 Mode Change

2.1.1 Original mode

As we know, PET (PyTorch Elastic) was merged into PyTorch in v1.9, and the way distributed training is launched changed substantially to accommodate elastic training.

Before v1.9, training was launched with torch/distributed/launch.py, for example:

python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE \
           --nnodes=2 --node_rank=0 --master_addr="192.168.1.1" \
           --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)

The parameters have the following meanings:

  • nnodes: the number of nodes participating in training.
  • nproc_per_node: the number of processes running on each node.
  • node_rank: the rank (id) of the current node.
  • master_addr and master_port: the address and port on which the master listens.

When run, torch.distributed.launch sets a number of environment variables, including WORLD_SIZE, MASTER_ADDR and MASTER_PORT, and then creates nproc_per_node processes on the current machine, which form a local group. If a total of NODE_SIZE machines participate in training, there are NODE_SIZE * TRAINERS_PER_NODE processes in total. To start a distributed training job, the command has to be executed on every machine.
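
To make the old flow concrete, here is a minimal sketch of what YOUR_TRAINING_SCRIPT.py might look like under the legacy launcher. The --local_rank argument and the env:// initialization follow torch.distributed.launch's convention; the backend choice and the print statement are illustrative assumptions only.

import argparse
import torch.distributed as dist

def main():
    parser = argparse.ArgumentParser()
    # torch.distributed.launch passes --local_rank to each process it spawns
    parser.add_argument("--local_rank", type=int, default=0)
    args = parser.parse_args()

    # "env://" reads MASTER_ADDR, MASTER_PORT, RANK and WORLD_SIZE from the
    # environment variables exported by the launcher
    dist.init_process_group(backend="gloo", init_method="env://")

    print(f"rank {dist.get_rank()} / world_size {dist.get_world_size()}, "
          f"local_rank {args.local_rank}")

    dist.destroy_process_group()

if __name__ == "__main__":
    main()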

2.1.2 Current mode

Since PyTorch 1.9 the entry point is torch/distributed/run.py. If torch/distributed/launch.py is still used, it internally forwards to run.py, as the code shows:

def main(args=None):
    logger.warn(
        "The module torch.distributed.launch is deprecated "
        "and going to be removed in future."
        "Migrate to torch.distributed.run"
    )
    args = parse_args(args)
    run(args)

torch.distributed.run is a superset of the previous torch.distributed.launch and provides the following new features:

  • Fault tolerance: Worker faults can be handled gracefully by restarting all workers.
  • Automatic: the worker's RANK and WORLD_SIZE are assigned automatically.
  • Elasticity: Allows you to change the number of nodes between the minimum and maximum (elasticity).

To use elastic training, the user code also needs some changes. If the training script already supports torch.distributed.launch, only a few changes are needed to use torch.distributed.run:

  • There is no need to pass RANK, WORLD_SIZE, MASTER_ADDR and MASTER_PORT manually.
  • You must provide rdzv_backend and rdzv_endpoint. For most users this is simply "c10d" (see "Rendezvous"). This replaces the previous MASTER_ADDR and MASTER_PORT.
  • The use_env argument has been removed. Get LOCAL_RANK from the LOCAL_RANK environment variable (for example, os.environ["LOCAL_RANK"]).
  • The user needs to make sure the script contains load_checkpoint(path) and save_checkpoint(path) logic, i.e. manual checkpointing. When a worker fails, all workers are restarted and the most recent checkpoint is used to restore the state.

Here is an example of a training script that saves a checkpoint at the end of each epoch, so at worst it loses one epoch of training on failure.

  def main():
       args = parse_args(sys.argv[1:])
       state = load_checkpoint(args.checkpoint_path)
       initialize(state)

       # torch.distributed.run ensures that this will work
       # by exporting all the env vars needed to initialize the process group
       torch.distributed.init_process_group(backend=args.backend)

       for i in range(state.epoch, state.total_num_epochs):
            for batch in iter(state.dataset):
                train(batch, state.model)

            state.epoch += 1
            save_checkpoint(state)
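
The example above leaves load_checkpoint / save_checkpoint to the user. Below is a minimal sketch of what they could look like; the State fields, the checkpoint path, and the extra state argument to load_checkpoint are assumptions made for illustration and are not part of TorchElastic.

import os
import torch

class State:
    """Illustrative container; the real fields depend on your training code."""
    def __init__(self, model, dataset, total_num_epochs):
        self.model = model
        self.dataset = dataset
        self.total_num_epochs = total_num_epochs
        self.epoch = 0

def save_checkpoint(state, path="/tmp/checkpoint.pt"):
    # write to a temporary file first, then rename, so that a crash during
    # the save does not corrupt the last good checkpoint
    tmp = path + ".tmp"
    torch.save({"epoch": state.epoch, "model": state.model.state_dict()}, tmp)
    os.replace(tmp, path)

def load_checkpoint(path, state):
    # on the very first run there is no checkpoint yet; keep epoch = 0
    if os.path.exists(path):
        ckpt = torch.load(path, map_location="cpu")
        state.epoch = ckpt["epoch"]
        state.model.load_state_dict(ckpt["model"])
    return state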

So, let’s take a look at distributed booting under the new model.

2.2 the deployment

The deployment mode is as follows.

  1. Start the rendezvous backend server and obtain its endpoint (to be passed to the launcher script as --rdzv_endpoint). This is not required for the c10d backend.
  2. Single node, multiple workers: start the launcher on the host; it starts the agent process, which creates and monitors the local worker group.
  3. Multiple nodes, multiple workers: start the launcher with the same arguments on all the nodes participating in training.

When using a job/cluster manager, the entry-point command for a multi-node job should be this launcher.

2.3 Examples

Let’s start with a few examples of how to start distributed training.

2.3.1 Startup of Multiple Workers on a Node

Multiple workers on a single node are started as follows. This is the Standalone mode, a special case of the distributed mode that provides some convenient defaults for single-node multi-worker jobs.

python -m torch.distributed.run \
        --standalone \
        --nnodes=1 \
        --nproc_per_node=$NUM_TRAINERS \
        YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)

2.3.2 Fault tolerant startup mode

The following starts training in fault-tolerant mode, with a fixed number of workers and no elastic scaling. --nproc_per_node=$NUM_TRAINERS is typically the number of GPUs on a single node.

python -m torch.distributed.run \
        --nnodes=$NUM_NODES \
        --nproc_per_node=$NUM_TRAINERS \
        --rdzv_id=$JOB_ID \
        --rdzv_backend=c10d \
        --rdzv_endpoint=$HOST_NODE_ADDR \
        YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)

HOST_NODE_ADDR, in the form <host>[:<port>], specifies the node and port on which the C10d rendezvous backend is instantiated and hosted. It can be any node in the training cluster, but ideally one with high bandwidth.

2.3.3 Elastic Startup Mode

The following starts elastic training, with an elastic range of (min=1, max=4). Specifying the rdzv parameters enables multi-node training with both fault tolerance and elasticity.

The number of nodes may vary between the minimum MIN_SIZE and the maximum MAX_SIZE (1 and 4 below). The rendezvous backend (c10d here; an etcd service can also be used) provides consistency and information synchronization.

python -m torch.distributed.run \
        --nnodes=1:4 \
        --nproc_per_node=$NUM_TRAINERS \
        --rdzv_id=$JOB_ID \
        --rdzv_backend=c10d \
        --rdzv_endpoint=$HOST_NODE_ADDR \
        YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)

HOST_NODE_ADDR, in the form <host>[:<port>], specifies the node and port on which the C10d rendezvous backend is instantiated and hosted. It can be any node in the training cluster, but ideally one with high bandwidth.

A few notes regarding Rendezvous Backend:

For multi-node training, you need to specify:

  • --rdzv_id: A unique JOB ID that is shared among all nodes that participate in the job.
  • --rdzv_backend: an implementation of torch.distributed.elastic.rendezvous.RendezvousHandler. (The default backend is static, which supports neither fault tolerance nor elastic scaling.)
  • --rdzv_endpoint: the endpoint on which the rendezvous backend is running, in the form host:port. This replaces the previous master address/port settings.

Currently the following backends can be used out of the box: c10d (recommended), etcd-v2, and etcd (legacy). To use etcd-v2 or etcd, you need to set up an etcd server with the v2 API enabled (i.e. --enable-v2).

0x03 Start Script

Since all of the launches above go through torch/distributed/run.py, let's take a closer look at this script, which provides three capabilities:

  • Rely on “restart all workers” to handle worker failure;

  • Automatically assign worker’s RANK and WORLD_SIZE;

  • Elastic training, where the number of nodes is allowed to change between the minimum and maximum;

3.1 Parameter Definition

The startup script defines the following concepts and parameters (a small sketch that reads the corresponding environment variables follows the list):

  • Node – a physical instance or a container; maps to the unit that the job manager works with.
  • Worker – a worker in the distributed training environment.
  • WorkerGroup – a group of workers that perform the same function (e.g. trainers).
  • LocalWorkerGroup – the subset of workers in a WorkerGroup that run on the same node.
    • A node runs LOCAL_WORLD_SIZE workers, and these workers make up its LocalWorkerGroup.
    • The LocalWorkerGroups on all nodes together make up the WorkerGroup.
  • RANK – the rank of a worker within the WorkerGroup; a global rank, which can be thought of as an index into a global list of GPU resources.
    • Ranks are not stable: local workers may be assigned different ranks after a restart, so never make any assumption about the stability of RANK or LOCAL_RANK in your code.
    • When a rendezvous completes, all its members agree on the job membership and on everyone's role in it. This role is represented by an integer between 0 and world size and is called a rank.
  • LOCAL_RANK – the rank of a worker within its local worker group, which can be thought of as an index into the GPU resources of the current node.
  • GROUP_RANK – the rank of a worker group; a number between 0 and the number of nodes. If each node runs a single worker group, GROUP_RANK is the rank of that node.
  • ROLE_RANK – the rank shared among workers that have the same role; the role is specified in the WorkerSpec.
  • WORLD_SIZE – the total number of workers in the WorkerGroup. Because nodes can join/leave, WORLD_SIZE can change; do not rely on its stability in your code.
  • LOCAL_WORLD_SIZE – the size of the local worker group, i.e. the number of workers running locally; equals the --nproc_per_node passed to torch.distributed.run. Currently torch/distributed/run.py only supports a homogeneous LOCAL_WORLD_SIZE, that is, it assumes all nodes run the same number of local workers (per role).
  • ROLE_WORLD_SIZE – the total number of workers that have the same role, as specified in the WorkerSpec.
  • rdzv_id – a user-defined id that uniquely identifies the worker group of a job. This id is used by each node to join a specific worker group.
  • rdzv_backend – the backend of rendezvous (for example, c10d). This is usually a strongly consistent key-value store.
  • rdzv_endpoint – the rendezvous backend endpoint, usually in the form <host>:<port>.
  • run_id – a user-defined id that uniquely identifies an instance of the distributed application. It usually maps to the job id and is used to let nodes join the right distributed application.
  • TORCHELASTIC_RUN_ID – equal to the rendezvous run_id, i.e. the unique job id.
  • TORCHELASTIC_RESTART_COUNT – the number of worker group restarts so far.
  • TORCHELASTIC_MAX_RESTARTS – the configured maximum number of restarts.
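
To make these concrete, the following small sketch (not part of PyTorch) simply prints the identity-related environment variables that torch.distributed.run exports into every worker process; the variable names correspond to the list above.

import os

def describe_worker():
    keys = [
        "RANK", "WORLD_SIZE",               # global identity
        "LOCAL_RANK", "LOCAL_WORLD_SIZE",   # identity within the node
        "GROUP_RANK", "ROLE_RANK", "ROLE_WORLD_SIZE",
        "MASTER_ADDR", "MASTER_PORT",
        "TORCHELASTIC_RUN_ID",
        "TORCHELASTIC_RESTART_COUNT", "TORCHELASTIC_MAX_RESTARTS",
    ]
    return {k: os.environ.get(k) for k in keys}

if __name__ == "__main__":
    # typical usage: select the GPU from LOCAL_RANK and never hard-code RANK,
    # because ranks may change after a restart
    print(describe_worker())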

3.2 Related functions/variables

To better understand the above parameters, let’s take a look at some of the related functions/variables.

World_size, rank

These two variables are dynamically generated, so they are taken from state.

rank, world_size = self._get_world()

def _get_world(self) -> Tuple[int, int]:
    state = self._state_holder.state
    return state.participants[self._this_node], len(state.participants)

_pg_group_ranks

This global variable stores mapping information from global rank to Local Rank for each group.

# Process group's global rank to local rank mapping
_pg_group_ranks: Dict[ProcessGroup, Dict[int, int]] = {}

An example of its assignment is as follows:

# Create the global rank to group rank mapping
_pg_group_ranks[pg] = {
    global_rank: group_rank
    for group_rank, global_rank in enumerate(ranks)
}
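
As a toy illustration (the ranks are made up for this example): if a new group is built from global ranks [3, 7, 9], the comprehension above produces {3: 0, 7: 1, 9: 2}, i.e. global rank 7 has group (local) rank 1 inside that process group.

ranks = [3, 7, 9]   # global ranks that form the new group
mapping = {global_rank: group_rank for group_rank, global_rank in enumerate(ranks)}
assert mapping == {3: 0, 7: 1, 9: 2}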

group_rank

We can use global rank to extract local ranks from _pg_group_ranks.

def _get_group_rank(group: ProcessGroup, rank):
    """ Helper that gets a given group's local rank in the group from a given global rank. """
    if group is GroupMember.WORLD:
        raise RuntimeError("group.WORLD does not have local rank to global "
                           "rank mapping")
    if group not in _pg_group_ranks:
        raise RuntimeError("The given group does not exist")
    try:
        group_rank = _pg_group_ranks[group][rank]
    except KeyError:
        raise RuntimeError(f"The global rank {rank} is not part of the group {group}") from None
    return group_rank

global_rank

We can use the local (group) rank within a group to obtain its global rank.

def _get_global_rank(group, group_rank):
    """ Helper that gets a given group's global rank from a given local rank in the group. """
    if group is GroupMember.WORLD:
        raise RuntimeError("group.WORLD does not have local rank to global "
                           "rank mapping")
    group_rank_map = _pg_group_ranks[group]
    for rank, grp_rank in group_rank_map.items():
        if grp_rank == group_rank:
            return rank
    raise RuntimeError("The group rank is not part of the group")

group_size

We can get the size of a group by _get_group_size.

def _get_group_size(group):
    """ Helper that gets a given group's world size. """
    if group is GroupMember.WORLD or group is None:
        default_pg = _get_default_group()
        return default_pg.size()
    if group not in _pg_group_ranks:
        raise RuntimeError("The given group does not exist")
    return len(_pg_group_ranks[group])

nproc_per_node

This function determines how many worker processes should run on each node.

def determine_local_world_size(nproc_per_node: str):
    try:
        logging.info(f"Using nproc_per_node={nproc_per_node}.")
        return int(nproc_per_node)
    except ValueError:
        if nproc_per_node == "cpu":
            num_proc = os.cpu_count()
            device_type = "cpu"
        elif nproc_per_node == "gpu":
            if not torch.cuda.is_available():
                raise ValueError("Cuda is not available.")
            device_type = "gpu"
            num_proc = torch.cuda.device_count()
        elif nproc_per_node == "auto":
            if torch.cuda.is_available():
                num_proc = torch.cuda.device_count()
                device_type = "gpu"
            else:
                num_proc = os.cpu_count()
                device_type = "cpu"
        else:
            raise ValueError(f"Unsupported nproc_per_node value: {nproc_per_node}"))return num_proc
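
For example (assuming a machine with 8 visible GPUs), determine_local_world_size("4") returns 4, while determine_local_world_size("gpu") and determine_local_world_size("auto") both return 8; determine_local_world_size("cpu") falls back to os.cpu_count().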

3.3 Script Entry

As the code below shows, the entry point calls elastic_launch to do the actual work; we will look at that function in the next chapter.

from torch.distributed.launcher.api import LaunchConfig, elastic_launch

def run(args):
    if args.standalone:  # two modes: standalone and distributed
        args.rdzv_backend = "c10d"
        args.rdzv_endpoint = "localhost:29400"
        args.rdzv_id = str(uuid.uuid4())
        log.info(
            f"\n**************************************\n"
            f"Rendezvous info:\n"
            f"--rdzv_backend={args.rdzv_backend} "
            f"--rdzv_endpoint={args.rdzv_endpoint} "
            f"--rdzv_id={args.rdzv_id}\n"
            f"**************************************\n"
        )

    config, cmd, cmd_args = config_from_args(args)
    elastic_launch(
        config=config,
        entrypoint=cmd,
    )(*cmd_args)


def main(args=None):
    args = parse_args(args)
    run(args)


if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO, format="[%(levelname)s] %(asctime)s %(module)s: %(message)s"
    )
    main()

0x04 Single-Node Flow

Let's start with elastic_launch and see how it runs on a single node. First, a general schematic: there are two nodes, each with one agent; under each agent is a worker group, and under the group are four workers.

4.1 A Small Example

Let's look at an example based on the source code; here we only set up two workers.

import uuid
import torch
from torch.distributed.launcher.api import LaunchConfig, elastic_launch

def worker_fn(t1, t2):
    return torch.add(t1, t2)

def main():
    t1 = torch.rand((3, 3), requires_grad=True)
    t2 = torch.rand((3, 3), requires_grad=True)

    config = LaunchConfig(
        min_nodes=2,
        max_nodes=4,
        nproc_per_node=1,
        run_id=str(uuid.uuid4()),
        role="trainer",
        rdzv_endpoint="localhost:29400",
        rdzv_backend="c10d",
        max_restarts=1,
        monitor_interval=1,
        start_method="spawn",
    )

    outputs = elastic_launch(config, worker_fn)(t1, t2)

if __name__ == '__main__':
    main()


The output is as follows. You can see that there are two worker processes and one agent process.

{"name": "torchelastic.worker.status.SUCCEEDED"."source": "WORKER"."timestamp": 0."metadata": {"run_id": "7fbf85fe-b8b3-462e-887e-8121e3062e0b"."global_rank": 0."group_rank": 0."worker_id": "12172"."role": "trainer"."hostname": "DESKTOP-0GO3RPO"."state": "SUCCEEDED"."total_run_time": 31."rdzv_backend": "c10d"."raw_error": null, "metadata": "{\"group_world_size\": 1, \"entry_point\": \"worker_fn\", \"local_rank\": [0], \"role_rank\": [0], \"role_world_size\": [2]}"."agent_restarts": 0}}

{"name": "torchelastic.worker.status.SUCCEEDED"."source": "WORKER"."timestamp": 0."metadata": {"run_id": "7fbf85fe-b8b3-462e-887e-8121e3062e0b"."global_rank": 1."group_rank": 0."worker_id": "3276"."role": "trainer"."hostname": "DESKTOP-0GO3RPO"."state": "SUCCEEDED"."total_run_time": 31."rdzv_backend": "c10d"."raw_error": null, "metadata": "{\"group_world_size\": 1, \"entry_point\": \"worker_fn\", \"local_rank\": [1], \"role_rank\": [1], \"role_world_size\": [2]}"."agent_restarts": 0}}

{"name": "torchelastic.worker.status.SUCCEEDED"."source": "AGENT"."timestamp": 0."metadata": {"run_id": "7fbf85fe-b8b3-462e-887e-8121e3062e0b"."global_rank": null, "group_rank": 0."worker_id": null, "role": "trainer"."hostname": "DESKTOP-0GO3RPO"."state": "SUCCEEDED"."total_run_time": 31."rdzv_backend": "c10d"."raw_error": null, "metadata": "{\"group_world_size\": 1, \"entry_point\": \"worker_fn\"}"."agent_restarts": 0}}
Copy the code

4.2 Entry Point

Let's dig a little deeper into the code. elastic_launch starts a torchelastic agent and then invokes the user entry point through this agent. The agent starts the workers for training and manages their life cycle.

class elastic_launch:
    """
    Launches a torchelastic agent on the container that invoked the entrypoint.

    1. Pass the ``entrypoint`` arguments as non ``kwargs`` (e.g. no named parameters);
       ``entrypoint`` can be a function or a command.
    2. The return value is a map of each worker's output, keyed by its global rank.
    """

    def __init__(
        self,
        config: LaunchConfig,
        entrypoint: Union[Callable, str, None],
    ):
        self._config = config
        self._entrypoint = entrypoint

    def __call__(self, *args, **kwargs):
        return launch_agent(self._config, self._entrypoint, list(args))  # the user program is invoked internally

4.3 Starting the Agent

Launch_agent starts a LocalElasticAgent and invokes its run method.

@record
def launch_agent(
    config: LaunchConfig,
    entrypoint: Union[Callable, str, None],
    args: List[Any],
) -> Dict[int, Any]:
    if not config.run_id:
        run_id = str(uuid.uuid4().int)
        config.run_id = run_id

    entrypoint_name = _get_entrypoint_name(entrypoint, args)

    rdzv_parameters = RendezvousParameters(
        backend=config.rdzv_backend,
        endpoint=config.rdzv_endpoint,
        run_id=config.run_id,
        min_nodes=config.min_nodes,
        max_nodes=config.max_nodes,
        **config.rdzv_configs,
    )

    agent = None
    rdzv_handler = rdzv_registry.get_rendezvous_handler(rdzv_parameters)
    master_addr, master_port = _get_addr_and_port(rdzv_parameters)
    try:
        spec = WorkerSpec( # 1. Get specs
            role=config.role,
            local_world_size=config.nproc_per_node,
            entrypoint=entrypoint,
            args=tuple(args),
            rdzv_handler=rdzv_handler, # RendezvousHandler
            max_restarts=config.max_restarts,
            monitor_interval=config.monitor_interval,
            redirects=config.redirects,
            tee=config.tee,
            master_addr=master_addr,
            master_port=master_port,
        )

        cfg = metrics.MetricsConfig(config.metrics_cfg) if config.metrics_cfg else None
        metrics.initialize_metrics(cfg)

        agent = LocalElasticAgent( # 2. Build the agent
            spec=spec, start_method=config.start_method, log_dir=config.log_dir
        )

        result = agent.run() # 3. Start the agent
        events.record(agent.get_agent_status_event(WorkerState.SUCCEEDED))
        if result.is_failed():
            # ChildFailedError is treated specially by @record
            # if the error files for the failed children exist
            # @record will copy the first error (root cause)
            # to the error file of the launcher process.
            raise ChildFailedError(
                name=entrypoint_name,
                failures=result.failures,
            )
        else:
            return result.return_values
    except ChildFailedError:
        raise
    except Exception:
        if agent:
            events.record(agent.get_agent_status_event(WorkerState.FAILED))
        else:
            events.record(_construct_event(config))
        raise
    finally:
        rdzv_handler.shutdown()

Here are a few key points:

4.3.1 WorkerSpec

WorkerSpec: the configuration information; it contains the global information the agent needs, such as the RendezvousHandler, the role, and the entry point (the user function).

spec = {WorkerSpec} 
   args = {tuple: 2} (tensor, tensor)
   fn = {NoneType} None
   local_world_size = {int} 1
   master_addr = {NoneType} None
   master_port = {NoneType} None
   max_restarts = {int} 1
   monitor_interval = {int} 1
   rdzv_handler = {DynamicRendezvousHandler}
   redirects = {Std} Std.NONE
   role = {str} 'trainer'
   tee = {Std} Std.NONE
   entry = worker_fn

The agent extracts all the required information from here. For example, _start_workers gets a store from it.

use_agent_store = spec.rdzv_handler.get_backend() == "static"

The logic is as follows:

+--------------------------+      +---------------------------------------------------+
|LocalElasticAgent         |      | WorkerSpec                                        |
|                          |      |                                                   |
|  WorkerSpec +-----------------> |      rdzv_handler = {DynamicRendezvousHandler} --------+
|                          |      |                                                   |    |
|  rdzv_run_id             |      |      entry = worker_fn                            |    |
|                          |      |                                                   |    |
|  store                   |      |      role = {str} 'trainer'                       |    |
|                          |      |                                                   |    |
|                          |      +---------------------------------------------------+    |
|                          |                                                               |
|                          |                                                               |
|                          |                                                               |
|                          |                                                               |
|                          |               +-----------------------------------------+     |
+--------------------------+               |DynamicRendezvousHandler                 |     |
                                           |                                         |     |
                                           |                                         |     |
                                           |   _settings: RendezvousSettings         | <---+
                                           |                                         |
                                           |   _store: Store                         |
                                           |                                         |
                                           |   _state_holder: _RendezvousStateHolder |
                                           |                                         |
                                           |   _op_executor: _RendezvousOpExecutor   |
                                           |                                         |
                                           +-----------------------------------------+

4.3.2 WorkerGroup

A WorkerGroup represents a worker group; the agent manages the workers in it as a single unit.

class WorkerGroup:
    """ Represents the set of ``Worker`` instances for the given ``WorkerSpec`` managed by ``ElasticAgent``. Whether the worker group contains cross instance workers or not depends on the implementation of the agent. """

    __slots__ = ["spec"."workers"."store"."group_rank"."group_world_size"."state"]

    def __init__(self, spec: WorkerSpec) :
        self.spec = spec
        self.workers = [Worker(local_rank=i) for i in range(self.spec.local_world_size)]

        # assigned after rdzv
        self.store = None
        self.group_rank = None
        self.group_world_size = None

        self.state = WorkerState.INIT

During the SimpleElasticAgent initialization, a WorkerGroup is created.

class SimpleElasticAgent(ElasticAgent) :
    """ An ``ElasticAgent`` that manages workers (``WorkerGroup``) for a single ``WorkerSpec`` (e.g. one particular type of worker role). """

    def __init__(self, spec: WorkerSpec, exit_barrier_timeout: float = 300):
        self._worker_group = WorkerGroup(spec)
        self._remaining_restarts = self._worker_group.spec.max_restarts
        self._store = None
        self._exit_barrier_timeout = exit_barrier_timeout
        self._total_execution_time = 0

Details are as follows:

+-----------------------------+      +------------------------------------------------+
| LocalElasticAgent          |      | WorkerSpec                                     |
|                             |      |                                                |
| +------------------------+  |      | rdzv_handler = {DynamicRendezvousHandler} ---------+
| |WorkerGroup             |  |      |                                                |   |
| |  spec +------------------------> | entry = worker_fn                             |   |
| |  workers                |  |      |                                                |   |
| |  store                  |  |      | role = {str} 'trainer'                        |   |
| |            group_rank  |  |      |                                                |   |
| |       group_world_size |  |      +------------------------------------------------+   |
| |                        |  |                                                           |
| +------------------------+  |                                                           |
|                             |                                                           |
| rdzv_run_id                 |                                                           |
| store                       |            +-----------------------------------------+    |
|                             |            |DynamicRendezvousHandler                 |    |
+-----------------------------+            |                                         |    |
                                           |                                         |    |
                                           |   _settings: RendezvousSettings         | <--+
                                           |                                         |
                                           |   _store: Store                         |
                                           |                                         |
                                           |   _state_holder: _RendezvousStateHolder |
                                           |                                         |
                                           |   _op_executor: _RendezvousOpExecutor   |
                                           |                                         |
                                           +-----------------------------------------+

4.4 Agent Running

SimpleElasticAgent is the base class of LocalElasticAgent, so execution first enters SimpleElasticAgent.run, which in turn calls _invoke_run.

    @prof
    def run(self, role: str = DEFAULT_ROLE) -> RunResult:
        start_time = time.monotonic()
        try:
            result = self._invoke_run(role) # call
            self._total_execution_time = int(time.monotonic() - start_time)
            self._record_metrics(result)
            self._record_worker_events(result)
            return result
        finally:
            # record the execution time in case there were any exceptions during run.
            self._total_execution_time = int(time.monotonic() - start_time)
            self._shutdown()

4.5 Agent Main Loop

The agent does the following in _invoke_run:

  • Call _initialize_workers, which uses _rendezvous to set up the rendezvous and then calls _start_workers to start the workers.
  • Enter a while True loop, in which:
    • _monitor_workers periodically checks the workers' running status, obtains the result of the worker processes, and acts accordingly:
      • If the program finished normally, return.
      • If the program failed, retry, i.e. restart all workers. If the retry count is exhausted and it still fails, terminate all workers.
      • If node membership changes (for example, scale up: a new node is waiting), restart all workers.
    def _invoke_run(self, role: str = DEFAULT_ROLE) -> RunResult:
        # NOTE: currently only works for a single role

        spec = self._worker_group.spec
        role = spec.role

        self._initialize_workers(self._worker_group) # start the worker
        monitor_interval = spec.monitor_interval
        rdzv_handler = spec.rdzv_handler

        while True:
            assert self._worker_group.state != WorkerState.INIT  # regular monitoring
            time.sleep(monitor_interval)
            # monitor the workers' running status
            run_result = self._monitor_workers(self._worker_group)  # get the results of the worker processes
            state = run_result.state
            self._worker_group.state = state

            put_metric(f"workers.{role}.remaining_restarts", self._remaining_restarts)
            put_metric(f"workers.{role}.{state.name.lower()}".1)

            if state == WorkerState.SUCCEEDED:
                # the program finished normally
                self._exit_barrier()
                return run_result
            elif state in {WorkerState.UNHEALTHY, WorkerState.FAILED}:
                # program error
                if self._remaining_restarts > 0: # retry
                    self._remaining_restarts -= 1
                    self._restart_workers(self._worker_group)
                else:
                    self._stop_workers(self._worker_group)  # retry limit reached, stop the workers
                    self._worker_group.state = WorkerState.FAILED
                    self._exit_barrier()
                    return run_result
            elif state == WorkerState.HEALTHY:
                # if membership changed (e.g. scale up), a new node will be waiting
                # membership changes do not count as retries
                num_nodes_waiting = rdzv_handler.num_nodes_waiting()
                group_rank = self._worker_group.group_rank
                # restart all workers if a new node is waiting
                if num_nodes_waiting > 0:
                    self._restart_workers(self._worker_group)
            else:
                raise Exception(f"[{role}] Worker group in {state.name} state")


The final logic is as follows:

+----------------------------------------------+
| LocalElasticAgent                            |
|                                              |    +---------------------------------------------------+
|  rdzv_run_id                                 |    | WorkerSpec                                        |
|                                              |    |                                                   |
|  store           +------------------------+  |    |      rdzv_handler = {DynamicRendezvousHandler} +-------+
|                  |WorkerGroup             |  |    |                                                   |    |
|  _pcontext       |            spec +------------> |      entry = worker_fn                            |    |
|                  |            workers     |  |    |                                                   |    |
|                  |            store       |  |    |      role = {str} 'trainer'                       |    |
|                  |            group_rank  |  |    |                                                   |    |
|                  |       group_world_size |  |    +---------------------------------------------------+    |
|                  |                        |  |                                                             |
|                  +------------------------+  |                                                             |
|  +----------------------------------------+  |                                                             |
|  | _invoke_run                            |  |                                                             |
|  |                                        |  |             +-----------------------------------------+     |
|  |   _initialize_workers +------------------------+        |DynamicRendezvousHandler                 |     |
|  |                                        |  |    |        |                                         |     |
|  |                                        |  |    |        |                                         |     |
|  |   while True:                       |  |    |        |   _settings: RendezvousSettings         | <---+
|  |     _monitor_workers(_worker_group) |  |    |        |                                         |
|  |              +                      |  |    |        |   _store: Store                         |
|  |              | _pcontext.wait       |  |    |        |                                         |
|  |              |                      |  |    |        |   _state_holder: _RendezvousStateHolder |
|  +----------------------------------------+    |        |                                         |
|                 |                            |  |       |   _op_executor: _RendezvousOpExecutor   |
+----------------------------------------------+  |       |                                         |
                  |                                |       +-----------------------------------------+
                  |                                |
                  v                                v
  +-------------------------------------------------+
  | +------------+   +------------+  +------------+ |
  | |Process     |   |Process     |  |Process     | |
  | |            |   |            |  |            | |
  | |  work_fn   |   |  work_fn   |  |  work_fn   | |
  | |            |   |            |  |            | |
  | +------------+   +------------+  +------------+ |
  +-------------------------------------------------+


Now that we have analyzed how the script starts and how the single-node flow works, we will look at the agent in detail in the next article.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF Reference

Reading the PyTorch Elastic source code