0x00 Overview

In previous articles, we studied the basic distributed modules of PyTorch and walked through some official examples. This fourth installment covers elastic training in PyTorch, examining the structure and overall logic of Rendezvous.

The elastic training series so far:

PyTorch Distributed Elastic Training (1) — The general idea

PyTorch Distributed Elastic Training (2) — Startup & single-node flow

PyTorch Distributed Elastic Training (3) — Agent

0x01 General Background

TE (TorchElastic) combines multiple elastic Agents on top of Rendezvous; the two embody a separation of concerns. Let's compare them.

  • The Agent focuses on the logic of a specific node.
    • The Agent is responsible for node-local business operations, such as starting the processes that run the user program, monitoring their running status, and notifying Rendezvous of anomalies.
    • The Agent is a worker manager: it starts and manages worker processes, forms a worker group, monitors the workers' running status, and captures failed workers. If there is a failure or a new worker arrives, it restarts the worker group.
    • The Agent is responsible for maintaining WORLD_SIZE and RANK information; users do not need to provide these manually, as the Agent handles them automatically.
    • The Agent is an independent background process on a specific node. Agents alone cannot achieve overall elastic training, so a mechanism is needed for workers to discover each other and synchronize changes (WORLD_SIZE and RANK also require multi-node synchronization); that mechanism is the Rendezvous concept below.
  • Rendezvous is responsible for the cluster logic that ensures a strong consensus among nodes on "which nodes participate in training."
    • Each Agent contains an internal Rendezvous handler; together these handlers constitute a Rendezvous cluster and hence an Agent cluster.
    • When Rendezvous completes, a shared key-value store is created that implements the torch.distributed.Store API. This store, shared only by the members that completed Rendezvous, lets Torch Distributed Elastic exchange control and data information during initialization.
    • Rendezvous is responsible for maintaining all relevant information about the current group on each agent. Each agent has a rendezvous handler; the handlers communicate with each other and jointly maintain a common set of information, kept in the store mentioned above.
    • Rendezvous is responsible for cluster logic such as adding new nodes, removing nodes, and assigning ranks.

0x02 Basic Concepts

In the Torch Distributed Elastic context, the term rendezvous refers to one specific piece of functionality: a distributed synchronization primitive combined with peer discovery.

It can be understood as a distributed governance process: Rendezvous is used by Torch Distributed Elastic to gather the participants (nodes) of a training job, so that the participants can negotiate the list of participants and each participant's role, and make a consistent collective decision on when training should start or resume. In other words, rendezvous is used to reach consensus among participants, assign each participant a rank and a local rank, and announce the world size; a re-rendezvous is performed whenever elastic scaling is needed or a failure occurs.

To achieve elastic training, there must be a mechanism for nodes/processes to discover each other. In TorchElastic, Rendezvous is this discovery mechanism, or synchronization component: a distributed synchronization (governance) mechanism for peer discovery that collects the information of all workers, including the node list and each node's worker roles. The Agents can then jointly decide on the start, end, and recovery of training.

(The figures in the PyTorch/TE source illustrate this with three nodes, each running an agent and a rendezvous handler.)

Rendezvous provides the following features.

2.1 Barrier

All nodes performing a rendezvous block until the rendezvous is considered complete, which happens once at least min nodes (for the same job) have joined the barrier. This also implies that the barrier is not necessarily of fixed size.

The rendezvous is not declared complete immediately once min nodes is reached; instead it waits a short additional time, to ensure it does not complete "too quickly" and miss nodes that joined only slightly later. The rendezvous completes immediately, however, if max nodes are gathered at the barrier.

In addition, there is a total timeout configuration: the rendezvous fails if min nodes is not reached within the timeout. This is a simple fail-safe that helps free up the allocated job resources and prevent waste.
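These knobs surface as rendezvous parameters. A minimal sketch, assuming the c10d backend and the join_timeout/last_call_timeout configuration keys described later in this article:

     from torch.distributed.elastic.rendezvous import RendezvousParameters

     params = RendezvousParameters(
         backend="c10d",
         endpoint="localhost:29400",
         run_id="my_run_id",
         min_nodes=2,             # barrier opens once at least 2 nodes have joined
         max_nodes=4,             # ... or immediately once 4 nodes are at the barrier
         join_timeout="600",      # total timeout: fail if min_nodes is not reached in time
         last_call_timeout="30",  # extra grace period after min_nodes is reached
     )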

2.2 Exclusivity

A simple distributed barrier is not enough: we must also ensure that only one group of nodes exists at any given time (for a given job). In other words, new nodes (i.e., nodes that join late) cannot form a new, parallel, independent worker group for the same job.

Torch Distributed Elastic ensures that once a group of nodes has completed Rendezvous (and may already be training), any "late" nodes that attempt to join are placed in a waiting state, and must wait until the existing Rendezvous ends.

2.3 Consistency

When a rendezvous completes, all of its members agree on the job membership and on each member's role within it. The role is represented by an integer, called a rank, between 0 and world size.

Note that ranks are not stable: the same node may be assigned a different rank at the next (re-)rendezvous.
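Why can every node compute the same ranks independently? Because sorted() is deterministic, enumerating the sorted participants yields an identical assignment everywhere; a toy illustration (mirroring _mark_rendezvous_complete shown in section 4.4.2.5, with hypothetical node descriptors):

     participants = {"node_b_77_0": None, "node_a_42_0": None}  # hypothetical node descriptors
     ranks = {node: rank for rank, node in enumerate(sorted(participants))}
     # every node computes {'node_a_42_0': 0, 'node_b_77_0': 1}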

2.4 Fault-tolerance

Torch Distributed Elastic Rendezvous is fault-tolerant during the rendezvous process:

  • Between the start of joining a rendezvous and its completion, if a process crashes (or loses network connectivity, etc.), a re-rendezvous is automatically initiated and the remaining healthy nodes automatically reorganize.

  • A node may also fail after a rendezvous completes (or be observed by other nodes to have failed); this scenario is handled by the Torch Distributed Elastic train_loop, which also triggers a re-rendezvous, without interrupting training.

2.5 Shared Key-value Storage

When a rendezvous completes, a shared key-value store is created and returned to the node. This store implements the torch.distributed.Store API (see https://pytorch.org/docs/stable/distributed.html).

This store is shared only by the members that completed the rendezvous and is used by Torch Distributed Elastic to exchange the information necessary to initialize the job's control and data planes.
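For example, a sketch assuming an rdzv_handler like the one built in section 2.7 (note that Store.set takes strings while Store.get returns bytes):

     store, rank, world_size = rdzv_handler.next_rendezvous()

     # e.g. rank 0 publishes a value; every member can then read it
     if rank == 0:
         store.set("MASTER_ADDR", "10.0.0.1")  # hypothetical address
     addr = store.get("MASTER_ADDR")           # b'10.0.0.1' on every member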

2.6 Waiting Workers and Rendezvous Closing

Torch Distributed Elastic's rendezvous handler provides two additional features:

  • Query how many workers arrived late at the barrier; they can participate in the next rendezvous. A sketch of a monitoring loop built on this query follows the list below.
  • Set the rendezvous closed, to notify all nodes not to participate in the next rendezvous.
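A minimal sketch of such a periodic check (the 30-second interval is an arbitrary choice for illustration):

     import time

     while True:
         if rdzv_handler.num_nodes_waiting() > 0:
             # new nodes are waiting at the barrier: re-rendezvous to admit them
             store, rank, world_size = rdzv_handler.next_rendezvous()
         time.sleep(30)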

2.7 DynamicRendezvousHandler

Torch Distributed Elastic provides the DynamicRendezvousHandler class, which implements the rendezvous mechanism described above.

This class requires us to specify a RendezvousBackend at construction time. Users can implement the backend themselves or use one of the following PyTorch implementations:

  • C10dRendezvousBackend, which uses a c10d store (TCPStore by default) as the rendezvous backend. Its advantage is that it does not depend on third-party software such as etcd.
  • EtcdRendezvousBackend, which accomplishes rendezvous based on etcd. Its disadvantage is that an etcd cluster must be set up.

For example:

     store = TCPStore("localhost")
     backend = C10dRendezvousBackend(store, "my_run_id")
     rdzv_handler = DynamicRendezvousHandler.from_backend(
         run_id="my_run_id",
         store=store,
         backend=backend,
         min_nodes=2,
         max_nodes=4
     )

2.8 Problems & Design

Knowing what we need to implement, we can think about which internal modules Rendezvous should have to meet those requirements.

  • There needs to be a node concept, to describe the system.
  • There needs to be a state concept, i.e., the state of a node.
  • There needs to be an overall static class that keeps nodes, states, and other information together.
  • There needs to be a shared key-value store that holds the above information centrally and can be used to exchange information and reach consensus.
  • There needs to be a dynamic server, or handler, that provides a set of APIs for external access.

Let’s look at the static structure first, and then the dynamic logic.

0x03 Static Structure

First, the supporting systems. Note that the Rendezvous here lives inside elastic and is different from the original rendezvous in torch.distributed: the original distributed rendezvous is just a simple KV store, while the elastic Rendezvous is much more complex.

Let's take a closer look at the Rendezvous supporting system.

3.1 Startup Parameters

RendezvousParameters holds the parameters required to build a RendezvousHandler.

  • backend: backend name.
  • endpoint: the endpoint, in the format <hostname>[:<port>].
  • run_id: the rendezvous ID.
  • min_nodes: the minimum number of rendezvous nodes.
  • max_nodes: the maximum number of rendezvous nodes.
  • kwargs: additional parameters for the backend.
class RendezvousParameters:
    """Holds the parameters to construct a :py:class:`RendezvousHandler`. Args: backend: The name of the backend to use to handle the rendezvous. endpoint: The endpoint of the rendezvous, usually in form 
      
       [:
       
        ]. run_id: The id of the rendezvous. min_nodes: The minimum number of nodes to admit to the rendezvous. max_nodes: The maximum number of nodes to admit to the rendezvous. **kwargs: Additional parameters for the specified backend. """
       
      

    def __init__(
        self,
        backend: str,
        endpoint: str,
        run_id: str,
        min_nodes: int,
        max_nodes: int,
        **kwargs,
    ):
        if not backend:
            raise ValueError("The rendezvous backend name must be a non-empty string.")

        if min_nodes < 1:
            raise ValueError(
                f"The minimum number of rendezvous nodes ({min_nodes}) must be greater than zero."
            )
        if max_nodes < min_nodes:
            raise ValueError(
                f"The maximum number of rendezvous nodes ({max_nodes}) must be greater than or "
                f"equal to the minimum number of rendezvous nodes ({min_nodes})."
            )

        self.backend = backend
        self.endpoint = endpoint
        self.run_id = run_id
        self.min_nodes = min_nodes
        self.max_nodes = max_nodes
        self.config = kwargs

3.2 Configuration

The RendezvousSettings class is used to store the rendezvous configuration. You can think of it as static meta information.

  • run_id: the rendezvous ID.
  • min_nodes: the minimum number of rendezvous nodes.
  • max_nodes: the maximum number of rendezvous nodes.
  • timeout: the timeout configuration.
  • keep_alive_interval: the amount of time a node waits between heartbeats.
  • keep_alive_max_attempt: the maximum number of failed heartbeats after which a node is considered dead.
@dataclass(repr=False, eq=False, frozen=True)
class RendezvousSettings:
    """Holds the settings of the rendezvous. Attributes: run_id: The run id of the rendezvous. min_nodes: The minimum number of nodes to admit to the rendezvous. max_nodes: The maximum number of nodes to admit to the rendezvous. timeout: The timeout configuration of the rendezvous. keep_alive_interval: The amount of time a node waits before sending a heartbeat to keep it alive in the rendezvous. keep_alive_max_attempt: The maximum number of failed heartbeat attempts after which a node is considered dead. """

    run_id: str
    min_nodes: int
    max_nodes: int
    timeout: RendezvousTimeout
    keep_alive_interval: timedelta
    keep_alive_max_attempt: int

3.3 State

_RendezvousState is the state of the rendezvous. It is dynamic information; each node maintains a local copy.

  • round: the current rendezvous round.

  • complete: a boolean indicating whether the current round of the rendezvous is complete.

  • deadline: the time at which the current round is considered complete if it is still waiting for nodes to join.

  • closed: a boolean indicating whether the rendezvous is closed.

  • participants: a dictionary of the participants and their corresponding ranks.

  • wait_list: a set of nodes waiting to participate in the next round of the rendezvous.

  • last_heartbeats: a dictionary containing each node's last heartbeat time.

class _RendezvousState:
    """Holds the state of a rendezvous. Attributes: round: The current round of the rendezvous. complete: A boolean value indicating whether the current round of the rendezvous is complete. deadline: The time at which the current round of the rendezvous will be considered complete if it is still waiting for nodes to join. closed: A boolean value indicating whether the rendezvous is closed. participants: A dictionary of the participants and their corresponding ranks. wait_list: A set of nodes that are waiting to participate in the next round of the rendezvous. last_heartbeats: A dictionary containing each node's last heartbeat time. """

    round: int
    complete: bool
    deadline: Optional[datetime]
    closed: bool
    participants: Dict[_NodeDesc, int]
    wait_list: Set[_NodeDesc]
    last_heartbeats: Dict[_NodeDesc, datetime]

    def __init__(self) -> None:
        self.round = 0
        self.complete = False
        self.deadline = None
        self.closed = False
        self.participants = {}
        self.wait_list = set()
        self.last_heartbeats = {}


3.4 Node

_NodeDesc describes a node of the rendezvous.

@dataclass(eq=True, order=True, frozen=True)
class _NodeDesc:
    """Describes a node in the rendezvous. Attributes: fqdn: The FQDN of the node. pid: The id of the process in which the rendezvous handler runs. local_id: A process-wide unique id. """

    fqdn: str
    pid: int
    local_id: int

    def __repr__(self) -> str:
        return f"{self.fqdn}_{self.pid}_{self.local_id}"

3.5 Backend

In PyTorch, the term backend usually refers to the communication backend used by the current process; the commonly supported communication backends are Gloo, MPI, and NCCL, with NCCL recommended. That is a different notion from the rendezvous backend discussed here.
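As a quick reminder of that other meaning, a sketch (init_method="env://" assumes MASTER_ADDR, MASTER_PORT, RANK, and WORLD_SIZE are set in the environment):

     import torch.distributed as dist

     # "nccl" here is the communication backend, not the rendezvous backend
     dist.init_process_group(backend="nccl", init_method="env://")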

In elastic training, DynamicRendezvousHandler requires us to specify the RendezvousBackend at construction time. Users can implement the backend themselves or use one of the following PyTorch implementations:

  • C10dRendezvousBackend, which uses a c10d store (TCPStore by default) as the rendezvous backend. Its advantage is that it does not depend on third-party software such as etcd.
  • EtcdRendezvousBackend, which accomplishes rendezvous based on etcd.

Because EtcdRendezvousBackend depends on etcd and requires an etcd cluster to be installed, the c10d backend is recommended for better usability. Let's focus on the c10d backend.

The c10d backend is based on a TCPStore and synchronizes data over TCP. TCPStore, which we introduced in previous articles, is a TCP-based distributed key-value store implementation (similar to Redis) with a typical client-server architecture: the server stores the data, and clients connect to it over TCP to perform operations such as set() to insert a key-value pair and get() to retrieve one.

Therefore, for the c10d backend, one of the agents hosts the TCPStore master, which listens on a port and serves the API; the rendezvous synchronization operations of all agents connect to this centralized TCPStore master.
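A minimal TCPStore round-trip; in practice the two ends live in different processes, but for brevity this sketch creates both in one process:

     from datetime import timedelta
     from torch.distributed import TCPStore

     # the hosting side (e.g. the agent that runs the TCPStore master)
     server = TCPStore("localhost", 29500, 2, is_master=True, timeout=timedelta(seconds=30))
     # the client side (another agent)
     client = TCPStore("localhost", 29500, 2, is_master=False, timeout=timedelta(seconds=30))

     client.set("key", "value")
     print(server.get("key"))  # b'value'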

(A figure from Zhihu user BobLiu illustrates this layout in detail.)

3.5.1 Usage

The following snippet shows how to configure the backend:

     store = TCPStore("localhost")
     backend = C10dRendezvousBackend(store, "my_run_id")  # the backend is configured

     rdzv_handler = DynamicRendezvousHandler.from_backend(
         run_id="my_run_id",
         store=store,
         backend=backend,
         min_nodes=2,
         max_nodes=4
     )

3.5.2 Base Class

Let's first look at the base class RendezvousBackend. It is an abstract class whose main function is to get and set the state.

class RendezvousBackend(ABC):
    """Represents a backend that holds the rendezvous state."""

    @property
    @abstractmethod
    def name(self) -> str:
        """Gets the name of the backend."""

    @abstractmethod
    def get_state(self) -> Optional[Tuple[bytes, Token]]:
        """Gets the rendezvous state.

        Returns:
            A tuple of the encoded rendezvous state and its fencing token or
            ``None`` if no state is found in the backend.
        """

    @abstractmethod
    def set_state(
        self, state: bytes, token: Optional[Token] = None
    ) -> Optional[Tuple[bytes, Token, bool]]:
        """Sets the rendezvous state.

        The new rendezvous state is set conditionally:

          - If the specified ``token`` matches the fencing token stored in the
            backend, the state will be updated. The new state will be returned
            to the caller along with its fencing token.
          - If the specified ``token`` does not match the fencing token stored
            in the backend, the state won't be updated; instead the existing
            state along with its fencing token will be returned to the caller.
          - If the specified ``token`` is ``None``, the new state will be set
            only if there is no existing state in the backend. Either the new
            state or the existing state along with its fencing token will be
            returned to the caller.

        Args:
            state: The encoded rendezvous state.
            token: An optional fencing token that was retrieved by a previous
                call to :py:meth:`get_state` or ``set_state()``.

        Returns:
            A tuple of the serialized rendezvous state, its fencing token, and
            a boolean value indicating whether our set attempt succeeded.

        Raises:
            RendezvousConnectionError: The connection to the backend has failed.
            RendezvousStateError: The rendezvous state is corrupt.
        """
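The fencing token enables optimistic concurrency. A sketch of the intended calling pattern (the pickle encoding matches what _BackendRendezvousStateHolder, shown below, actually uses; the state content here is made up):

     import pickle

     result = backend.get_state()
     state, token = result if result is not None else (None, None)

     new_state = pickle.dumps({"round": 1})  # hypothetical encoded state
     result = backend.set_state(new_state, token)
     if result is not None:
         state, token, has_set = result      # has_set: whether our write won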

3.5.3 Creation

Here is how the backend is created: a TCPStore is generated, then C10dRendezvousBackend is constructed on top of it.

def create_backend(params: RendezvousParameters) -> Tuple[C10dRendezvousBackend, Store]:
    """Creates a new :py:class:`C10dRendezvousBackend` from the specified parameters.

    +--------------+-----------------------------------------------------------+
    | Parameter    | Description                                               |
    +==============+===========================================================+
    | store_type   | The type of the C10d store. As of today the only          |
    |              | supported type is "tcp" which corresponds to              |
    |              | :py:class:`torch.distributed.TCPStore`. Defaults to "tcp".|
    +--------------+-----------------------------------------------------------+
    | read_timeout | The read timeout, in seconds, for store operations.       |
    |              | Defaults to 60 seconds.                                   |
    +--------------+-----------------------------------------------------------+
    | is_host      | A boolean value indicating whether this backend instance  |
    |              | will host the C10d store. If not specified it will be     |
    |              | inferred heuristically by matching the hostname or the IP |
    |              | address of this machine against the specified rendezvous  |
    |              | endpoint. Defaults to ``None``.                           |
    |              |                                                           |
    |              | Note that this configuration option only applies to       |
    |              | :py:class:`torch.distributed.TCPStore`. In normal         |
    |              | circumstances you can safely skip it; the only time when  |
    |              | it is needed is if its value cannot be correctly          |
    |              | determined (e.g. the rendezvous endpoint has a CNAME as   |
    |              | the hostname or does not match the FQDN of the machine).  |
    +--------------+-----------------------------------------------------------+
    """
    # As of today we only support TCPStore. Other store types do not have the
    # required functionality (e.g. compare_set) yet.
    store_type = params.get("store_type", "tcp").strip().lower()
    if store_type != "tcp":
        raise ValueError("The store type must be 'tcp'. Other store types are not supported yet.")

    store = _create_tcp_store(params)

    return C10dRendezvousBackend(store, params.run_id), store
3.5.3.1 TCPStore

_create_tcp_store Creates a TCPStore.

def _create_tcp_store(params: RendezvousParameters) -> TCPStore:
    host, port = parse_rendezvous_endpoint(params.endpoint, default_port=29400)

    cfg_is_host = params.get_as_bool("is_host")  # read the configuration, if any
    # If the user has explicitly specified whether our process should host the
    # the store, respect it.
    if cfg_is_host is not None:  # if explicitly configured, respect it
        is_host = cfg_is_host
    # Otherwise try to determine whether we are the host based on our hostname
    # and IP address.
    else:  # otherwise determine heuristically whether we are the host
        is_host = _matches_machine_hostname(host) 

    # The timeout
    read_timeout = cast(int, params.get_as_int("read_timeout", 60))
    if read_timeout <= 0:
        raise ValueError("The read timeout must be a positive integer.")

    # In specific cases we attempt to instantiate the store twice. For details
    # see the explanation in the except clause below.
    for is_server in [is_host, False]:
        try:
            store = TCPStore(  # type: ignore[call-arg]
                host, port, is_master=is_server, timeout=timedelta(seconds=read_timeout)
            )

            if is_server:
                log.info(
                    f"Process {os.getpid()} hosts the TCP store for the C10d rendezvous backend."
                )

            break
        except (ValueError, RuntimeError) as exc:
            # If we heuristically inferred the value of is_host as True and our
            # first attempt to instantiate the TCP store has failed, try it one
            # more time with is_host set to False. As an edge case there can be
            # more than one process that is part of the same rendezvous on this
            # machine and only one of them will eventually host the store.

            if not is_server or cfg_is_host is not None:
                raise RendezvousConnectionError(
                    "The connection to the C10d store has failed. See inner exception for details."
                ) from exc

    return store
3.5.3.2 C10dRendezvousBackend

As you can see, the core of C10dRendezvousBackend is a Store into which relevant information is written and read via set_state and get_state. The code below is simplified.

class C10dRendezvousBackend(RendezvousBackend):
    """Represents a C10d-backed rendezvous backend.

    Args:
        store: The :py:class:`torch.distributed.Store` instance to use to
            communicate with the C10d store.
        run_id: The run id of the rendezvous.
    """

    # See the explanation in the __init__ method.
    _NULL_SENTINEL = "Y2FuaW1hZGFt"

    _store: Store
    _key: str

    def __init__(self, store: Store, run_id: str) -> None:
        if not run_id:
            raise ValueError("The run id must be a non-empty string.")

        self._store = store
        self._key = "torch.rendezvous." + run_id

        # The read operation of a store blocks the caller until the specified
        # key becomes available. This behavior makes it tricky to use a store
        # as a regular key-value dictionary.
        #
        # As a workaround we initially set a sentinel value as the rendezvous
        # state. Whenever this value gets returned we treat it as a None.
        self._call_store("compare_set", self._key, "", self._NULL_SENTINEL)

    @property
    def name(self) -> str:
        """See base class."""
        return "c10d"

    def get_state(self) -> Optional[Tuple[bytes, Token]]:
        """See base class."""
        # read the data from the store
        base64_state: bytes = self._call_store("get", self._key)
        return self._decode_state(base64_state)

    def set_state(
        self, state: bytes, token: Optional[Token] = None
    ) -> Optional[Tuple[bytes, Token, bool]]:
        """See base class."""
        base64_state_str: str = b64encode(state).decode()

        if token:
            # Shortcut if we know for sure that the token is not valid.
            if not isinstance(token, bytes):
                result = self.get_state()
                if result is not None:
                    tmp = *result, False
                    # Python 3.6 does not support tuple unpacking in return
                    # statements.
                    return tmp
                return None

            token = token.decode()
        else:
            token = self._NULL_SENTINEL

        # insert the data into the store
        base64_state: bytes = self._call_store("compare_set", self._key, token, base64_state_str)

        state_token_pair = self._decode_state(base64_state)
        if state_token_pair is None:
            return None

        new_state, new_token = state_token_pair

        # C10d Store's compare_set method does not offer an easy way to find out
        # whether our write attempt was successful. As a brute-force solution we
        # perform a bitwise comparison of our local state and the remote state.
        return new_state, new_token, new_state == state

    def _call_store(self, store_op: str, *args, **kwargs) -> Any:
        return getattr(self._store, store_op)(*args, **kwargs)

    def _decode_state(self, base64_state: bytes) -> Optional[Tuple[bytes, Token]]:
        if base64_state == self._NULL_SENTINEL.encode():
            return None
        state = b64decode(base64_state)
        return state, base64_state


3.6 StateHolder

3.6.1 _RendezvousStateHolder

The purpose of this class is to hold the rendezvous state that is synchronized with other nodes; a derived class must implement the actual functionality.

class _RendezvousStateHolder(ABC):
    """Holds the shared rendezvous state synced with other nodes."""

    @property
    @abstractmethod
    def state(self) -> _RendezvousState:
        """Gets the local state."""

    @abstractmethod
    def sync(self) -> Optional[bool]:
        """Reads or writes the latest state.

        Returns:
            A boolean value indicating whether the local state, in case marked
            as dirty, was successfully synced with other nodes.
        """

    @abstractmethod
    def mark_dirty(self) -> None:
        """Marks the local state as dirty."""


3.6.2 _BackendRendezvousStateHolder

_BackendRendezvousStateHolder extends _RendezvousStateHolder. Its sync method calls the internal backend, which reads from and writes to the store.

class _BackendRendezvousStateHolder(_RendezvousStateHolder):
    """Holds the rendezvous state synced with other nodes via a backend.

    Args:
        backend: The rendezvous backend to use.
        settings: The rendezvous settings.
        cache_duration: The amount of time, in seconds, to cache the last
            rendezvous state before requesting it from the backend again.
    """

    _backend: RendezvousBackend
    _state: _RendezvousState
    _settings: RendezvousSettings
    _cache_duration: int
    _token: Token
    _dirty: bool
    _last_sync_time: float
    _dead_nodes: List[_NodeDesc]

    def __init__(
        self, backend: RendezvousBackend, settings: RendezvousSettings, cache_duration: int = 1
    ) -> None:
        self._backend = backend
        self._state = _RendezvousState()
        self._settings = settings
        self._cache_duration = cache_duration
        self._token = None
        self._dirty = False
        self._last_sync_time = -1
        self._dead_nodes = []

    @property
    def state(self) -> _RendezvousState:
        """See base class."""
        return self._state

    def sync(self) -> Optional[bool]:
        """See base class."""
        state_bits: Optional[bytes] = None
        token = None
        has_set: Optional[bool]

        if self._dirty:
            has_set = False

            state_bits = pickle.dumps(self._state)

            # the state is written into the backend here
            set_response = self._backend.set_state(state_bits, self._token)
            if set_response is not None:
                state_bits, token, has_set = set_response
        else:
            has_set = None

            if self._cache_duration > 0:
                # Avoid overloading the backend if we are asked to retrieve the
                # state repeatedly. Try to serve the cached state.
                if self._last_sync_time >= max(time.monotonic() - self._cache_duration, 0):
                    return None

            get_response = self._backend.get_state()
            if get_response is not None:
                state_bits, token = get_response

        if state_bits is not None:
            try:
                self._state = pickle.loads(state_bits)
            except pickle.PickleError as exc:
                raise RendezvousStateError(
                    "The rendezvous state is corrupt. See inner exception for details."
                ) from exc
        else:
            self._state = _RendezvousState()

        if has_set and self._dead_nodes and log.isEnabledFor(logging.DEBUG):
            node_list = ",".join(f"'{dead_node}'" for dead_node in self._dead_nodes)

        self._token = token
        self._dirty = False
        self._last_sync_time = time.monotonic()
        self._sanitize()

        return has_set

    def _sanitize(self) -> None:
        expire_time = datetime.utcnow() - (
            self._settings.keep_alive_interval * self._settings.keep_alive_max_attempt
        )

        # Filter out the dead nodes.
        self._dead_nodes = [
            node
            for node, last_heartbeat in self._state.last_heartbeats.items()
            if last_heartbeat < expire_time
        ]

        for dead_node in self._dead_nodes:
            del self._state.last_heartbeats[dead_node]

            try:
                del self._state.participants[dead_node]
            except KeyError:
                pass

            try:
                self._state.wait_list.remove(dead_node)
            except KeyError:
                pass

    def mark_dirty(self) -> None:
        """See base class.

        If the local rendezvous state is dirty, the next sync call will try to
        write the changes back to the backend. However this attempt might fail
        if another node, which had the same state, also made changes and wrote
        them before us.
        """
        self._dirty = True

3.6.3 Usage

Here is how the StateHolder is used inside _DistributedRendezvousOpExecutor (code abridged):

  • Synchronize the various states via _state_holder.sync(), since the latest state may have been written by another node in the rendezvous.
  • Get the latest state via self._state_holder.state.
  • Perform the business logic.
  • Synchronize the local state back to the other nodes via _state_holder.mark_dirty().
def run(
    self, state_handler: Callable[[_RendezvousContext, float], _Action], deadline: float
) -> None:
    """See base class."""
    action = None

    while action != _Action.FINISH:
        # Reads or writes the latest rendezvous state shared by all nodes in
        # the rendezvous. Note that our local changes might get overridden
        # by another node if that node synced its changes before us.
        
        has_set = self._state_holder.sync()  # sync the state here; the latest state may be on another node

        self._state = self._state_holder.state  # get the latest state
        ctx = _RendezvousContext(self._node, self._state, self._settings)

        # Determine the next action to take based on the current state of
        # the rendezvous.
        action = state_handler(ctx, deadline) 

        # omit some code

        if action == _Action.SYNC:
            # Delay the execution by one second to avoid overloading the
            # backend if we are asked to poll for state changes.
            _delay(seconds=1)
        else:
            if action == _Action.KEEP_ALIVE:
                self._keep_alive()
            elif action == _Action.ADD_TO_PARTICIPANTS:
                self._add_to_participants()
            elif action == _Action.ADD_TO_WAIT_LIST:
                self._add_to_wait_list()
            elif action == _Action.REMOVE_FROM_PARTICIPANTS:
                self._remove_from_participants()
            elif action == _Action.REMOVE_FROM_WAIT_LIST:
                self._remove_from_wait_list()
            elif action == _Action.MARK_RENDEZVOUS_COMPLETE:
                self._mark_rendezvous_complete()
            elif action == _Action.MARK_RENDEZVOUS_CLOSED:
                self._mark_rendezvous_closed()

            # Attempt to sync our changes back to other nodes.
            self._state_holder.mark_dirty()  # sync our state to the other nodes

3.7 Summary

The diagram below summarizes the current logic: two _BackendRendezvousStateHolder instances exchange information through a TCPStore.

+--------------------------------+                +--------------------------------+
| _BackendRendezvousStateHolder  |                | _BackendRendezvousStateHolder  |
|                                |                |                                |
|   _settings                    |                |   _settings                    |
|       +                        |                |       +                        |
|       v                        |                |       v                        |
|   RendezvousSettings           |                |   RendezvousSettings           |
|                                |                |                                |
|   _state                       |                |   _state                       |
|       +                        |                |       +                        |
|       v                        |                |       v                        |
|   _RendezvousState             |                |   _RendezvousState            |
|                                |                |                                |
|   _backend                     |                |   _backend                     |
|       +                        |                |       +                        |
|       v                        |                |       v                        |
|   C10dRendezvousBackend        |                |   C10dRendezvousBackend        |
|       +                        |                |       +                        |
+-------|------------------------+                +-------|------------------------+
        | _store                                          | _store
        |                      +----------+               |
        |  sync / set_state    |          |  sync / set_state
        +--------------------> | TCPStore | <-------------+
                               +----------+
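A minimal sketch of this interaction, assuming node_a, backend, and settings were constructed as in the earlier sections:

     holder_a = _BackendRendezvousStateHolder(backend, settings)
     holder_b = _BackendRendezvousStateHolder(backend, settings)

     holder_a.state.wait_list.add(node_a)  # mutate the local state
     holder_a.mark_dirty()
     holder_a.sync()  # dirty, so sync() writes the state through the backend

     holder_b.sync()  # not dirty, so sync() reads; holder_b.state now contains node_a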


0x04 Dynamic Logic

4.1 Entry Point

We’ll first look at how Rendezvous is used.

launch_agent starts a LocalElasticAgent and invokes its run method. Before run is called, rdzv_handler is generated and set on the WorkerSpec.

import torch.distributed.elastic.rendezvous.registry as rdzv_registry

@record
def launch_agent(
    config: LaunchConfig,
    entrypoint: Union[Callable, str, None],
    args: List[Any],
) -> Dict[int, Any]:

    rdzv_parameters = RendezvousParameters(
        backend=config.rdzv_backend,
        endpoint=config.rdzv_endpoint,
        run_id=config.run_id,
        min_nodes=config.min_nodes,
        max_nodes=config.max_nodes,
        **config.rdzv_configs,
    )

    # rdzv_handler is built
    rdzv_handler = rdzv_registry.get_rendezvous_handler(rdzv_parameters)

    try:
        spec = WorkerSpec(
            role=config.role,
            local_world_size=config.nproc_per_node,
            entrypoint=entrypoint,
            args=tuple(args),
            rdzv_handler=rdzv_handler,  # rdzv_handler is set here
            max_restarts=config.max_restarts,
            monitor_interval=config.monitor_interval,
            redirects=config.redirects,
            tee=config.tee,
            master_addr=master_addr,
            master_port=master_port,
        )

        agent = LocalElasticAgent(  # build the agent
            spec=spec, start_method=config.start_method, log_dir=config.log_dir
        )

        result = agent.run() # start agent
    except ChildFailedError:
        # ... error handling omitted

self._rendezvous(worker_group) is eventually called from the run function, and the _rendezvous method calls next_rendezvous() to handle membership changes.

    @prof
    def _rendezvous(self, worker_group: WorkerGroup) -> None:
        r"""Runs rendezvous for the workers specified by worker spec.

        Assigns workers a new global rank and world size.
        Updates the rendezvous store for the worker group.
        """

        spec = worker_group.spec
        store, group_rank, group_world_size = spec.rdzv_handler.next_rendezvous()
        
        # subsequent code omitted

In this flow, rdzv_registry.get_rendezvous_handler(rdzv_parameters) is where everything starts, so let's look at get_rendezvous_handler. It returns a RendezvousHandler, so RendezvousHandler and rendezvous_handler_registry are what matter next.

from .api import rendezvous_handler_registry as handler_registry

def get_rendezvous_handler(params: RendezvousParameters) -> RendezvousHandler:
    """ This method is used to obtain a reference to a :py:class`RendezvousHandler`. Custom rendezvous handlers can be registered by :: from torch.distributed.elastid.rendezvous import rendezvous_handler_registry from torch.distributed.elastic.rendezvous.registry import get_rendezvous_handler def create_my_rdzv(params: RendezvousParameters): return MyCustomRdzv(params) rendezvous_handler_registry.register("my_rdzv_backend_name", create_my_rdzv) my_rdzv_handler = get_rendezvous_handler("my_rdzv_backend_name", RendezvousParameters) """
    return handler_registry.create_handler(params)

We'll examine RendezvousHandler and rendezvous_handler_registry in turn.

4.2 RendezvousHandler Base Class

RendezvousHandler is the interface used to execute the business logic; its abstract methods include:

  • next_rendezvous: the main entry point into the rendezvous barrier; new nodes block here until the current rendezvous completes, a timeout occurs, or the current rendezvous is marked closed.
  • is_closed: whether the rendezvous is closed; a closed rendezvous means all attempts to re-rendezvous will fail.
  • num_nodes_waiting: returns the number of nodes that arrived late at the rendezvous barrier and hence are not part of the current worker group. Callers should invoke this method periodically to check whether new nodes are waiting to join, and if so, admit them by calling next_rendezvous() (i.e., re-rendezvous).

The specific code is as follows:

class RendezvousHandler(ABC):
    """Main rendezvous interface. Note: Distributed Torch users normally **do not** need to implement their own ``RendezvousHandler``. An implementation based on C10d Store is already provided, and is recommended for most users. """

    # get the name of the rendezvous backend
    @abstractmethod
    def get_backend(self) -> str:
        """Returns the name of the rendezvous backend."""

    # The main entry point into the rendezvous barrier: new nodes wait here until
    # the current rendezvous completes, times out, or is marked closed.
    @abstractmethod
    def next_rendezvous(
        self,
    ) -> Tuple[Store, int, int]:
        """Main entry-point into the rendezvous barrier. Blocks until the rendezvous is complete and the current process is included in the formed worker group, or a timeout occurs, or the rendezvous was marked closed. Returns: A tuple of :py:class:`torch.distributed.Store`, ``rank``, and ``world size``. """

    # whether the rendezvous has ended; if so, all attempts to re-rendezvous will fail
    @abstractmethod
    def is_closed(self) -> bool:
        """Checks whether the rendezvous has been closed. A closed rendezvous means all future attempts to re-rendezvous within same job will fail. """

    @abstractmethod
    def set_closed(self):
        """Marks the rendezvous as closed."""

    # Returns the number of nodes waiting at the rendezvous barrier that are not part of the
    # current worker group. Callers should call this method periodically to check for new
    # waiting nodes and, if any, admit them by calling 'next_rendezvous()' (re-rendezvous).
    @abstractmethod
    def num_nodes_waiting(self) -> int:
        """Returns the number of nodes who arrived late at the rendezvous barrier, hence were not included in the current worker group. Callers should periodically call this method to check whether new nodes are waiting to join the job and if so admit them by calling :py:meth:`next_rendezvous()` (re-rendezvous). """

    @abstractmethod
    def get_run_id(self) -> str:
        """Returns the run id of the rendezvous. The run id is a user-defined id that uniquely identifies an instance of a distributed application. It typically maps to a job id and is used to allow nodes to join the correct distributed application. """

    def shutdown(self) -> bool:
        """Closes all resources that were open for the rendezvous. """

4.3 Registration

Let’s look at rendezvous_handler_registry next.

torch/distributed/elastic/rendezvous/api.py contains the following code:

# The default global registry instance used by launcher scripts to instantiate
# rendezvous handlers.
rendezvous_handler_registry = RendezvousHandlerRegistry()

This brings us to RendezvousHandlerRegistry.

4.3.1 RendezvousHandlerRegistry

RendezvousHandlerRegistry is a factory class responsible for creating RendezvousHandler instances.

  • register simply adds the corresponding creator to the internal dictionary.
  • create_handler fetches the corresponding creator based on the key and builds a handler with it.
  • rendezvous_handler_registry is the global registry.
class RendezvousHandlerRegistry:
    """Represents a registry of :py:class:`RendezvousHandler` backends."""

    _registry: Dict[str, RendezvousHandlerCreator]

    def __init__(self) -> None:
        self._registry = {}

    def register(self, backend: str, creator: RendezvousHandlerCreator) -> None:
        """Registers a new rendezvous backend.

        Args:
            backend: The name of the backend.
            creator: The callback to invoke to construct the :py:class:`RendezvousHandler`.
        """
        current_creator: Optional[RendezvousHandlerCreator]
        current_creator = self._registry.get(backend)  # use .get() so a new backend does not raise KeyError
        self._registry[backend] = creator

    def create_handler(self, params: RendezvousParameters) -> RendezvousHandler:
        """Creates a new :py:class:`RendezvousHandler`."""

        creator = self._registry[params.backend]
        handler = creator(params)
        return handler

4.3.2 Global Registry

The system creates a global registry, which is the rendezvous_handler_registry seen earlier.

# The default global registry instance used by launcher scripts to instantiate
# rendezvous handlers.
rendezvous_handler_registry = RendezvousHandlerRegistry()

Several handlers are registered here to provide the creators. Rendezvous ships the following implementations: etcd, etcd-v2, c10d, and static.

from .api import rendezvous_handler_registry as handler_registry

def _register_default_handlers() -> None:
    handler_registry.register("etcd", _create_etcd_handler)
    handler_registry.register("etcd-v2", _create_etcd_v2_handler)
    handler_registry.register("c10d", _create_c10d_handler)
    handler_registry.register("static", _create_static_handler)

At runtime it looks like this:

rendezvous_handler_registry = 
  _registry = {dict: 4} 
   'etcd' = {function} <function _create_etcd_handler at 0x7ff657e12d08>
   'etcd-v2' = {function} <function _create_etcd_v2_handler at 0x7ff657e12d90>
   'c10d' = {function} <function _create_c10d_handler at 0x7ff657e12e18>
   'static' = {function} <function _create_static_handler at 0x7ff657b9d2f0>
   __len__ = {int} 4

_create_etcd_handler creates an etcd handler, and so on.

4.4 Creation

Now that we know the creation path, let's see how the handlers are created. Rendezvous provides the following implementations: etcd, etcd-v2, c10d, and static. We use static and c10d as examples.

4.4.1 Static RendezvousHandler

Let's use _create_static_handler as an example of how a static handler is created, starting with _create_static_handler itself.

4.4.1.1 _create_static_handler
def _create_static_handler(params: RendezvousParameters) -> RendezvousHandler:
  
    from . import static_tcp_rendezvous

    return static_tcp_rendezvous.create_rdzv_handler(params)

This brings us to torch/distributed/elastic/rendezvous/static_tcp_rendezvous.py, where create_rdzv_handler builds a StaticTCPRendezvous.

def create_rdzv_handler(params: RendezvousParameters) -> RendezvousHandler:
    endpoint = params.endpoint.strip()
    master_addr, master_port = parse_rendezvous_endpoint(endpoint, -1)
    world_size = params.max_nodes
    rank = cast(int, params.config.get("rank"))
    run_id = params.run_id
    if "timeout" in params.config:
        timeout = int(params.config["timeout"])
    else:
        timeout = _default_timeout_seconds
        
    return StaticTCPRendezvous(
        master_addr, master_port, rank, world_size, run_id, timeout
    )
4.4.1.2 StaticTCPRendezvous subclass

StaticTCPRendezvous extends RendezvousHandler and is defined below; its primary logic is to create a TCPStore with the listener on the agent with group_rank 0 and then wrap it in a PrefixStore.

class StaticTCPRendezvous(RendezvousHandler):
    """Static rendezvous that is a wrapper around the TCPStore.

    Creates a TCPStore based on the input parameters, with the listener
    on the agent with group_rank=0.
    """

    def __init__(
        self,
        master_addr: str,
        master_port: int,
        rank: int,
        world_size: int,
        run_id: str,
        timeout: int,
    ):
        self.master_addr = master_addr
        self.master_port = master_port
        self.rank = rank
        self.world_size = world_size
        self.run_id = run_id
        self.timeout = datetime.timedelta(seconds=timeout)
        self._store: Optional[Store] = None

    def get_backend(self) -> str:
        return "static"

    def next_rendezvous(self) -> Tuple[Store, int, int]:
        if not self._store:
            is_master = self.rank == 0
            self._store = TCPStore(
                self.master_addr,
                self.master_port,
                self.world_size,
                is_master,
                self.timeout,
            )
        store = PrefixStore(self.run_id, self._store)
        return store, self.rank, self.world_size

The key function is next_rendezvous:

def next_rendezvous(self) -> Tuple[Store, int, int]:
    log.info("Creating TCPStore as the c10d::Store implementation")
    if not self._store:
        is_master = self.rank == 0
        self._store = TCPStore(
            self.master_addr,
            self.master_port,
            self.world_size,
            is_master,
            self.timeout,
        )
    store = PrefixStore(self.run_id, self._store)
    return store, self.rank, self.world_size

4.4.2 Dynamic RendezvousHandler

Now let's look at how a DynamicRendezvousHandler is built.

4.4.2.1 _create_c10d_handler

_create_c10d_handler returns a DynamicRendezvousHandler.

def _create_c10d_handler(params: RendezvousParameters) -> RendezvousHandler:
    from .c10d_rendezvous_backend import create_backend

    backend, store = create_backend(params)
    return create_handler(store, backend, params)

create_handler returns the DynamicRendezvousHandler:

def create_handler(
    store: Store, backend: RendezvousBackend, params: RendezvousParameters
) -> DynamicRendezvousHandler:
    """Creates a new :py:class:`DynamicRendezvousHandler` from the specified parameters.

    Args:
        store: The C10d store to return as part of the rendezvous.
        backend: The backend to use to hold the rendezvous state.

    +-------------------+------------------------------------------------------+
    | Parameter         | Description                                          |
    +===================+======================================================+
    | join_timeout      | The total time, in seconds, within which the         |
    |                   | rendezvous is expected to complete. Defaults to 600  |
    |                   | seconds.                                             |
    +-------------------+------------------------------------------------------+
    | last_call_timeout | An additional wait amount, in seconds, before        |
    |                   | completing the rendezvous once the minimum number of |
    |                   | nodes has been reached. Defaults to 30 seconds.      |
    +-------------------+------------------------------------------------------+
    | close_timeout     | The time, in seconds, within which the rendezvous is |
    |                   | expected to close after a call to                    |
    |                   | :py:meth:`RendezvousHandler.set_closed` or           |
    |                   | :py:meth:`RendezvousHandler.shutdown`. Defaults to   |
    |                   | 30 seconds.                                          |
    +-------------------+------------------------------------------------------+
    """
    timeout = RendezvousTimeout(
        _get_timeout(params, "join"),
        _get_timeout(params, "last_call"),
        _get_timeout(params, "close"),
    )

    return DynamicRendezvousHandler.from_backend(
        params.run_id,
        store,
        backend,
        params.min_nodes,
        params.max_nodes,
        timeout,
    )
4.4.2.2 from_backend

from_backend is the method that actually creates a DynamicRendezvousHandler; it works as a factory.

It generates the node descriptor, the RendezvousSettings, and the _BackendRendezvousStateHolder, and then constructs the DynamicRendezvousHandler.

@classmethod
def from_backend(
    cls,
    run_id: str,
    store: Store,
    backend: RendezvousBackend,
    min_nodes: int,
    max_nodes: int,
    timeout: Optional[RendezvousTimeout] = None,
):
    """Creates a new :py:class:`DynamicRendezvousHandler`.

    Args:
        run_id: The run id of the rendezvous.
        store: The C10d store to return as part of the rendezvous.
        backend: The backend to use to hold the rendezvous state.
        min_nodes: The minimum number of nodes to admit to the rendezvous.
        max_nodes: The maximum number of nodes to admit to the rendezvous.
        timeout: The timeout configuration of the rendezvous.
    """
    # We associate each handler instance with a unique node descriptor.
    node = cls._node_desc_generator.generate()

    settings = RendezvousSettings(
        run_id,
        min_nodes,
        max_nodes,
        timeout or RendezvousTimeout(),
        keep_alive_interval=timedelta(seconds=5),
        keep_alive_max_attempt=3,
    )

    state_holder = _BackendRendezvousStateHolder(backend, settings)

    return cls(node, settings, backend.name, store, state_holder)
4.4.2.3 DynamicRendezvousHandler
Torch Distributed Elastic comes with the :py:class:`.DynamicRendezvousHandler` class that implements the rendezvous mechanism described above. It is a backend-agnostic type that expects a particular :py:class:`.RendezvousBackend` instance to be specified during construction. Torch distributed users can either implement their own backend type or use one of the following implementations that come with PyTorch:

DynamicRendezvousHandler extends RendezvousHandler and is defined below. It sets up a rendezvous among a set of nodes; with the c10d backend, the underlying TCPStore is hosted by one of the agents, and the store handed back to callers is wrapped in a PrefixStore.

The most important member variables are:

  • _BackendRendezvousStateHolder, responsible for synchronizing rendezvous information between nodes.
  • _DistributedRendezvousOpExecutor, responsible for the specific business logic.
  • _store, which stores information (in a distributed key-value store).
class DynamicRendezvousHandler(RendezvousHandler):
    """Represents a handler that sets up a rendezvous among a set of nodes."""

    # Static
    _node_desc_generator = _NodeDescGenerator()
    _this_node: _NodeDesc
    _settings: RendezvousSettings
    _backend_name: str
    _store: Store
    _state_holder: _RendezvousStateHolder
    _op_executor: _RendezvousOpExecutor
    _heartbeat_lock: threading.Lock
    _keep_alive_timer: Optional[_PeriodicTimer]

    @classmethod
    def from_backend(
        cls,
        run_id: str,
        store: Store,
        backend: RendezvousBackend,
        min_nodes: int,
        max_nodes: int,
        timeout: Optional[RendezvousTimeout] = None,
    ):
        """Creates a new :py:class:`DynamicRendezvousHandler`.

        Args:
            run_id: The run id of the rendezvous.
            store: The C10d store to return as part of the rendezvous.
            backend: The backend to use to hold the rendezvous state.
            min_nodes: The minimum number of nodes to admit to the rendezvous.
            max_nodes: The maximum number of nodes to admit to the rendezvous.
            timeout: The timeout configuration of the rendezvous.
        """
        # We associate each handler instance with a unique node descriptor.
        node = cls._node_desc_generator.generate()

        settings = RendezvousSettings(
            run_id,
            min_nodes,
            max_nodes,
            timeout or RendezvousTimeout(),
            keep_alive_interval=timedelta(seconds=5),
            keep_alive_max_attempt=3,
        )

        state_holder = _BackendRendezvousStateHolder(backend, settings)

        return cls(node, settings, backend.name, store, state_holder)

    def __init__(
        self,
        node: _NodeDesc,
        settings: RendezvousSettings,
        backend_name: str,
        store: Store,
        state_holder: _RendezvousStateHolder,
    ) -> None:
        self._this_node = node
        self._settings = settings
        self._backend_name = backend_name
        self._store = store
        self._state_holder = state_holder
        self._op_executor = _DistributedRendezvousOpExecutor(
            self._this_node, self._state_holder, self._settings
        )
        self._heartbeat_lock = threading.Lock()
        self._keep_alive_timer = None

We can also create a DynamicRendezvousHandler directly, as follows.

 store = TCPStore("localhost")
 backend = C10dRendezvousBackend(store, "my_run_id")
 rdzv_handler = DynamicRendezvousHandler.from_backend(
     run_id="my_run_id",
     store=store,
     backend=backend,
     min_nodes=2,
     max_nodes=4
 )
4.4.2.4 next_rendezvous

This call blocks until the required number of workers has been reached. It is invoked when a worker is initialized or restarted. When it returns, each worker in the group gets a rank as a unique identifier. Its internal logic is:

  • First use _RendezvousExitOp to make the node exit.
  • Then use _RendezvousJoinOp to make the node rejoin.
  • Finally, start the heartbeat and return the world size, store, etc. At this point, all participating nodes are in participants.
def next_rendezvous(self) -> Tuple[Store, int, int]:
    """See base class."""

    self._stop_heartbeats()

    # Delay the execution for a small random amount of time if this is our
    # first run. This will slightly skew the rendezvous attempts across the
    # nodes and reduce the load on the backend.
    if self._state_holder.state.round == 0:
        _delay(seconds=(0, 0.3))

    exit_op = _RendezvousExitOp()
    join_op = _RendezvousJoinOp()

    deadline = self._get_deadline(self._settings.timeout.join)

    self._op_executor.run(exit_op, deadline)
    self._op_executor.run(join_op, deadline)

    self._start_heartbeats()

    rank, world_size = self._get_world()
    store = self._get_store()

    return store, rank, world_size # return the rank of worker group
4.4.2.5 _get_world

The code above uses _get_world, so let's examine it. The rank and world_size are generated dynamically, so they are retrieved from the state. Moreover, because participants are synchronized across all nodes, every node ends up with exactly the same participants.

rank, world_size = self._get_world()
    
def _get_world(self) -> Tuple[int, int]:
    state = self._state_holder.state
    return state.participants[self._this_node], len(state.participants)

Where do the participants come from? When the rendezvous completes, ranks are assigned. Because ranks are computed with the same sorting algorithm on every node, the assignment is identical on every node, and each node is guaranteed a rank different from all the others.

def _mark_rendezvous_complete(self) -> None:
    state = self._state
    state.complete = True
    state.deadline = None

    # Assign the ranks.
    for rank, node in enumerate(sorted(state.participants)):
        state.participants[node] = rank

4.5 Fault Tolerance

As mentioned earlier, between the start of joining a rendezvous and its completion, if a process crashes (or loses network connectivity, etc.), a re-rendezvous is automatically initiated and the remaining healthy nodes automatically reorganize.

Torch Distributed Elastic rendezvous is designed to tolerate node failures during the rendezvous process. Should a process crash (or lose network connectivity, etc), between joining the rendezvous and it being completed, then a re-rendezvous with remaining healthy nodes will happen automatically.

4.5.1 ETCD

This fault-tolerance mechanism is especially visible in EtcdRendezvousHandler.

The next_rendezvous method calls rendezvous_barrier.

def next_rendezvous(self):
    rdzv_version, rank, world_size = self._rdzv_impl.rendezvous_barrier()

    log.info("Creating EtcdStore as the c10d::Store implementation")
    store = self._rdzv_impl.setup_kv_store(rdzv_version)

    return store, rank, world_size

Within rendezvous_barrier, if the lower layers throw an exception, it is caught, and init_phase is called to attempt the rendezvous again, until the deadline expires.

def rendezvous_barrier(self):
    """
    Main entry point for next rendezvous.
    This method is blocking until rendezvous succeeds or a timeout occurs.

    Returns:
        ``(rdzv_version, rank, world_size)``

    Raises:
        RendezvousTimeoutError - timeout waiting for rendezvous
        RendezvousClosedError - rendezvous is or was closed while waiting
        RendezvousError - other persistent errors that render the rendezvous non-retryable
    """
    self._rendezvous_deadline = time.time() + self._timeout
    while True:
        if time.time() > self._rendezvous_deadline:
            raise RendezvousTimeoutError()

        log.info("Attempting to join next rendezvous")
        try:
            # Dis-own our lease in the previous rendezvous, if exists
            if self._lease_this_rank_stop is not None:
                self._lease_this_rank_stop.set()

            return self.init_phase()

        except EtcdRendezvousRetryImmediately:
            # The type of failure suggests we can retry without delay
            pass

        except EtcdRendezvousRetryableFailure:
            # In case of retryable failure, wait a small delay
            # to avoid spamming etcd
            time.sleep(1)

        except RendezvousTimeoutError:
            log.info("Rendezvous timeout occured in EtcdRendezvousHandler")
            raise

        except RendezvousClosedError:
            log.info(
                f"Rendezvous for run_id={self._run_id} was observed to be closed"
            )
            raise

        except RendezvousError:
            raise

        except Exception as e:
            # In case of a general exception, wait a small delay
            # to avoid spamming etcd
            # FIXME: there are a few things that fall under this like
            # etcd.EtcdKeyNotFound, etc, which could be handled more explicitly.
            log.info("Rendezvous attempt failed, will retry. Reason: " + str(e))
            time.sleep(1)
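The shape of this loop is a generic retry-until-deadline pattern. Below is a simplified standalone sketch of that pattern (the names are illustrative; this is not the actual implementation):

import time

class RendezvousTimeoutError(Exception):
    pass

def retry_until_deadline(attempt, timeout, retry_delay=1.0):
    # Keep calling `attempt` until it succeeds or the deadline passes;
    # transient failures are retried after a short delay.
    deadline = time.time() + timeout
    while True:
        if time.time() > deadline:
            raise RendezvousTimeoutError()
        try:
            return attempt()
        except Exception as e:
            print(f"attempt failed ({e}), retrying")
            time.sleep(retry_delay)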

init_phase then initiates a new rendezvous:

def init_phase(self):
    """
    Initially, the rendezvous state is expected to be one of:
    1. empty (non-existent) - in this case we try to create a new one.
    2. joinable - we try to join it.
    3. final - we announce ourselves as waiting, and go into monitoring mode

    Any other state is considered transitional, and will be retried after
    a short delay.

    Returns:
        ``(rdzv_version, rank, world_size)``

    Raises:
        RendezvousClosedError - current rendezvous was/is closed
        EtcdRendezvousRetryableFailure - observed some intermediate state,
            which is best handled by retrying later
    """
    try:
        active_version = self.try_create_rendezvous()  # Launch a rendezvous
        state = json.loads(active_version.value)
        log.info("New rendezvous state created: " + str(state))
    except etcd.EtcdAlreadyExist:
        active_version, state = self.get_rdzv_state()
        # Note: it is possible for above query to fail (etcd.EtcdKeyNotFound),
        # but this is ok for us - just means we'll restart from beginning.
        log.info("Observed existing rendezvous state: " + str(state))

    if state["status"] == "closed":
        raise RendezvousClosedError()

    if state["status"] == "joinable":
        return self.join_phase(state["version"])

    if state["status"] == "final":
        self.handle_existing_rendezvous(state["version"])
        raise EtcdRendezvousRetryImmediately()

    self.try_wait_for_state_change(etcd_index=active_version.etcd_index + 1)
    raise EtcdRendezvousRetryableFailure()
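To make the three-way status dispatch concrete, here is a toy sketch; the "status" and "version" fields mirror what init_phase reads above, while the return strings are only descriptions of the actions taken (illustrative, not PyTorch code):

# Toy sketch of init_phase's status dispatch.
def dispatch(state):
    if state["status"] == "closed":
        raise RuntimeError("rendezvous closed")
    if state["status"] == "joinable":
        return f"join rendezvous version {state['version']}"
    if state["status"] == "final":
        return "announce ourselves as waiting, then monitor"
    return "transitional state, retry after a short delay"

print(dispatch({"status": "joinable", "version": "7"}))  # join rendezvous version 7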

4.5.2 DynamicRendezvousHandler

Fault tolerance in DynamicRendezvousHandler is less obvious. Presumably because DynamicRendezvousHandler was developed after the etcd handler, many of its functions are still incomplete and evolving.

This series is primarily based on PyTorch 1.9, so the next_rendezvous code shown earlier contains no error handling. Error handling has been added in the latest code (as of December 2021) and should continue to improve.

def next_rendezvous(self) -> Tuple[Store, int, int]:
    """See base class."""
    msg = (
        f"The node '{self._this_node}' attempts to join the next round of the rendezvous "
        f"'{self._settings.run_id}'."
    )
    self._record(message=msg)

    try:  # error handling has been added
        self._stop_heartbeats()

        # Delay the execution for a small random amount of time if this is our
        # first run. This will slightly skew the rendezvous attempts across the
        # nodes and reduce the load on the backend.
        if self._state_holder.state.round == 0:
            _delay(seconds=(0, 0.3))

        exit_op = _RendezvousExitOp()
        join_op = _RendezvousJoinOp()

        deadline = self._get_deadline(self._settings.timeout.join)

        self._op_executor.run(exit_op, deadline)
        self._op_executor.run(join_op, deadline)

        self._start_heartbeats()

        rank, world_size = self._get_world()
        store = self._get_store()

    except Exception as e:  # error handling was added, but the next rendezvous is not launched
        self._record(
            message=f"{type(e).__name__}: {str(e)}",
            node_state=NodeState.FAILED,
        )
        raise

    msg = (
        f"The node '{self._this_node}' has joined round {self._state_holder.state.round} of "
        f"the rendezvous '{self._settings.run_id}' as rank {rank} in a world of size "
        f"{world_size}."
    )
    self._record(message=msg, rank=rank)

    return store, rank, world_size

4.6 Summary

The logical relationship between Rendezvous and Agent is summarized below. Each startup script (node) has its own copy of these mechanisms, and the mechanisms on the different nodes are interconnected.

+-----------------------------+      +------------------------------------------------+
| LocalElasticAgent           |      | WorkerSpec                                     |
|                             |      |                                                |
| +------------------------+  |      |   rdzv_handler = {DynamicRendezvousHandler} -------+
| |WorkerGroup             |  |      |                                                |   |
| |            spec +--------------> |   entry = worker_fn                            |   |
| |            workers     |  |      |                                                |   |
| |            store       |  |      |   role = {str} 'trainer'                       |   |
| |            group_rank  |  |      |                                                |   |
| |       group_world_size |  |      +------------------------------------------------+   |
| |                        |  |                                                           |
| +------------------------+  |                                                           |
|                             |                                                           |
| rdzv_run_id                 |                                                           |
| store                       |            +-----------------------------------------+    |
|                             |            |DynamicRendezvousHandler                 |    |
+-----------------------------+            |                                         |    |
                                           |                                         |    |
                                           |   _settings: RendezvousSettings         | <--+
                                           |                                         |
                                           |   _store: Store                         |
                                           |                                         |
                                           |   _state_holder: _RendezvousStateHolder |
                                           |                                         |
                                           |   _op_executor: _RendezvousOpExecutor   |
                                           |                                         |
                                           +-----------------------------------------+

Or look at it in conjunction with the previous static logic.

(The original figure here is a two-node variant of the diagram above: NODE 1 and NODE 2 each run a LocalElasticAgent with its WorkerSpec, WorkerGroup, and DynamicRendezvousHandler; each handler holds its own RendezvousSettings, _RendezvousState, and _BackendRendezvousStateHolder; and both state holders reference a C10dRendezvousBackend backed by the same TCPStore, through which they sync and set_state to keep the two nodes consistent.)




0x05 Summary

We have now analyzed the static structure and dynamic logic of Rendezvous. You should have a basic understanding of its mechanism through the following concepts:

  • The node concept: _NodeDesc represents a node in the system.
  • The state concept: _RendezvousState is the state of the rendezvous. It is dynamic information, and each node maintains a local copy.
  • The overall static class: _BackendRendezvousStateHolder keeps the node, state, backend, and other information unified.
  • A shared key-value store, such as TCPStore, which stores the information above centrally and can also be used to exchange information to reach consensus.
  • The dynamic engine or handler: RendezvousHandler provides a set of APIs for external access.

In the next installment we will look at the internal business logic: how the Rendezvous engine is implemented.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts
