0x00 Summary

In the previous articles we studied the basic modules of PyTorch distributed and walked through some official examples. This fifth installment of the elastic training series looks at the internal engine of Rendezvous: how nodes join, leave, wait, send heartbeats, and so on.

The elastic training series is as follows:

PyTorch distributed elastic training (1) — the general idea

PyTorch Distributed Elastic Training (2)– Startup & Single node flow

PyTorch Distributed elastic training (3)– proxy

PyTorch Distributed Elastic Training (4)– Rendezvous Architecture and logic

0x01 Preface

1.1 Overall System

Elastic training can be understood as a running system built on top of Rendezvous.

  • The Agent focuses on the logic of a specific node

    • The Agent is responsible for the operations related to the concrete business logic on its node, such as starting the processes that run the user program, monitoring their running status, and notifying Rendezvous of anomalies.
    • The Agent is a worker manager: it starts/manages the worker processes, forms a worker group, monitors the running status of the workers, and captures failed workers. If there is a failure or a new worker arrives, it restarts the worker group.
    • The Agent maintains WORLD_SIZE and RANK information, so users do not need to provide them manually; the Agent handles them automatically.
    • The Agent is an independent background process on a specific node. Agents alone cannot achieve elastic training as a whole, so a mechanism is needed for mutual discovery and change synchronization among workers (WORLD_SIZE and RANK also require synchronization across nodes); this is the Rendezvous concept described next.
  • Rendezvous is responsible for cluster logic, ensuring that all nodes reach a strong consensus on “which nodes participate in the training”.

    • Each Agent contains an internal Rendezvous handler, which collectively constitutes a Rendezvous cluster and thus an Agent cluster.
    • When a Rendezvous completes, a shared key-value store is created that implements the torch.distributed.Store API. This store, shared only by the members that completed the Rendezvous, is used by Torch Distributed Elastic to exchange control and data information during initialization; a minimal sketch of such a store follows this list.
    • Rendezvous is responsible for maintaining all relevant information about the current group on each agent. Each agent will have a rendezvous, which will communicate with each other and generally maintain a set of information stored in the aforementioned stores.
    • Rendezvous is responsible for cluster logic, such as adding new nodes, removing nodes, assigning ranks, and so on.
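
To make the role of this store concrete, here is a minimal sketch of the kind of shared key-value store the agents end up using. It is illustrative only: the address, port and keys are made-up values, and a real job creates the store through the rendezvous machinery rather than by hand.

import torch.distributed as dist
from datetime import timedelta

# A TCPStore hosted on the "master" node; other nodes would connect with is_master=False.
store = dist.TCPStore("127.0.0.1", 29500, world_size=1, is_master=True,
                      timeout=timedelta(seconds=30))

store.set("control/node0", "ready")    # publish a piece of control information
print(store.get("control/node0"))      # b'ready' -- visible to every member of the store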

1.2 Rendezvous

Rendezvous provides the dynamic logic; within it, _RendezvousStateHolder stores the meta-information (the static structure). There is one more component we have not looked at yet, _RendezvousOpExecutor, the runtime engine, so in this article we take a close look at _RendezvousOpExecutor.

+-----------------------------+      +------------------------------------------------+
| LocalElasticAgent           |      | WorkerSpec                                     |
|                             |      |                                                |
| +------------------------+  |      |   rdzv_handler = {DynamicRendezvousHandler} -------+
| |WorkerGroup             |  |      |                                                |   |
| |  spec +------------------------> |   entry = worker_fn                            |   |
| |            workers     |  |      |                                                |   |
| |            store       |  |      |   role = {str} 'trainer'                       |   |
| |            group_rank  |  |      |                                                |   |
| |       group_world_size |  |      +------------------------------------------------+   |
| |                        |  |                                                           |
| +------------------------+  |                                                           |
|                             |                                                           |
| rdzv_run_id                 |                                                           |
| store                       |            +-----------------------------------------+    |
|                             |            |DynamicRendezvousHandler                 |    |
+-----------------------------+            |                                         |    |
                                           |                                         |    |
                                           |   _settings: RendezvousSettings         | <--+
                                           |                                         |
                                           |   _store: Store                         |
                                           |                                         |
                                           |   _state_holder: _RendezvousStateHolder |
                                           |                                         |
                                           |   _op_executor: _RendezvousOpExecutor   |
                                           |                                         |
                                           +-----------------------------------------+

1.3 Decoupling

_RendezvousOpExecutor divides and decouples the functionality as follows:

  • Business logic is abstracted into a series of operators, such as _RendezvousJoinOp.
  • Rendezvous internally maintains a set of business functions that make up a state machine, such as _add_to_participants, which adds a participant.
  • The _RendezvousOpExecutor engine executes the operators: running an operator yields an Action, and the Action is then used to call the business function that performs the actual operation (see the sketch below).
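
A stripped-down sketch of this decoupling (illustrative only; the names and the state representation here are invented and far simpler than the real classes): an operator maps the current state to an Action, and the executor loops, dispatching each Action to a business function until it obtains FINISH.

from enum import Enum

class Action(Enum):
    ADD_TO_PARTICIPANTS = 1
    FINISH = 2

class JoinOp:                                    # "operator": maps the state to the next Action
    def __call__(self, state):
        if state["me"] in state["participants"]:
            return Action.FINISH
        return Action.ADD_TO_PARTICIPANTS

class Executor:                                  # "engine": dispatches Actions to business functions
    def __init__(self, state):
        self.state = state

    def run(self, op):
        action = op(self.state)
        while action != Action.FINISH:
            if action == Action.ADD_TO_PARTICIPANTS:
                self._add_to_participants()      # business function
            action = op(self.state)              # ask the operator again after the state changed

    def _add_to_participants(self):
        self.state["participants"].add(self.state["me"])

state = {"me": "node0", "participants": set()}
Executor(state).run(JoinOp())
print(state["participants"])                     # {'node0'}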

This article introduces the Rendezvous engine corresponding to the C10d backend.
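
For orientation, this is roughly how a c10d rendezvous handler is obtained through the elastic rendezvous API. This is a sketch only: the endpoint, run id and node counts are made-up values, and next_rendezvous would block until enough nodes join, so it is left commented out.

from torch.distributed.elastic.rendezvous import RendezvousParameters
from torch.distributed.elastic.rendezvous.registry import get_rendezvous_handler

params = RendezvousParameters(
    backend="c10d",                 # the backend discussed in this article
    endpoint="localhost:29400",     # node that hosts the underlying TCPStore
    run_id="my_elastic_job",        # id shared by every node of the job
    min_nodes=1,
    max_nodes=2,
)
handler = get_rendezvous_handler(params)    # a DynamicRendezvousHandler under the hood
# store, rank, world_size = handler.next_rendezvous()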

0x02 Engine implementation

2.1 Base class

_RendezvousOpExecutor is the base class of the engine; it defines only the abstract run method.

class _RendezvousOpExecutor(ABC):
    """Executes rendezvous operations."""

    @abstractmethod
    def run(
        self, state_handler: Callable[[_RendezvousContext, float], _Action], deadline: float
    ) -> None:
        """Executes a rendezvous operation inside a state machine that is expected
        to transition the rendezvous from one state to another.

        Args:
            state_handler: A callable that returns the next state transition action
                based on the current state of the rendezvous.
            deadline: The time, in seconds, at which the operation is considered timed out.
        """

The engine works with _RendezvousContext, which encapsulates the rendezvous information and hands it to the operators; this is where _RendezvousState and RendezvousSettings come in.

class _RendezvousContext:
    """Holds the context of the rendezvous.

    Attributes:
        node: The node descriptor associated with the current rendezvous handler instance.
        state: The current state of the rendezvous.
        settings: The rendezvous settings.
    """

    node: _NodeDesc
    state: _RendezvousState
    settings: RendezvousSettings

    def __init__(
        self, node: _NodeDesc, state: _RendezvousState, settings: RendezvousSettings
    ) -> None:
        self.node = node
        self.state = state
        self.settings = settings

2.2 Distributed Operation Engine

_DistributedRendezvousOpExecutor extends _RendezvousOpExecutor and is the actual workhorse of TorchElastic. It is similar to a Looper: it is responsible for message dispatching, invoking the business logic, and maintaining state.

2.2.1 Definition

Compared with its base class, _DistributedRendezvousOpExecutor adds member variables such as the node information, the state, and the settings.

class _DistributedRendezvousOpExecutor(_RendezvousOpExecutor):
    """Executes rendezvous operations using a shared state.

    Args:
        node: The node descriptor associated with the current rendezvous handler instance.
        state_holder: The ``RendezvousStateHolder`` to use to sync the rendezvous state with other nodes.
        settings: The rendezvous settings.
    """

    _node: _NodeDesc
    _state: _RendezvousState
    _state_holder: _RendezvousStateHolder
    _settings: RendezvousSettings

    def __init__(
        self, node: _NodeDesc, state_holder: _RendezvousStateHolder, settings: RendezvousSettings
    ) -> None:
        self._node = node
        self._state_holder = state_holder
        self._settings = settings

The logic is as follows:

+---------------------------------------------------------------+
| _DistributedRendezvousOpExecutor                               |
|                                                                |
|                     +------------------------+                |
|     _state +------> | _RendezvousState       |                |
|                     |                        |                |
|                     |    participants        |                |
|                     |    wait_list           |                |
|                     |    last_heartbeats     |                |
|                     |    deadline            |                |
|                     +------------------------+                |
|                                                                |
|                     +-------------------------+               |
|     _settings +---> | RendezvousSettings      |               |
|                     +-------------------------+               |
|                                                                |
|                     +--------------------------------------+  |
| _state_holder +---> | _BackendRendezvousStateHolder        |  |
|                     |                                      |  |
|                     |   _backend: RendezvousBackend        |  |
|                     |   _state: _RendezvousState           |  |
|                     |   _settings: RendezvousSettings      |  |
|                     |                                      |  |
|                     +--------------------------------------+  |
|                     +--------------------------------------+  |
|                     | _NodeDesc                            |  |
|     _node +-------> |              fqdn: str               |  |
|                     |              pid: int                |  |
|                     |              local_id: int           |  |
|                     |                                      |  |
|                     +--------------------------------------+  |
+---------------------------------------------------------------+

2.2.2 Invocation

Let’s give several examples of how to call the engine. You can see that the operator is set first and then the engine’s run function is called.

2.2.2.1 _RendezvousKeepAliveOp
def _keep_alive(self) -> None:
    self._heartbeat_lock.acquire()
    op = _RendezvousKeepAliveOp()  # set the operator
    deadline = self._get_deadline(self._settings.timeout.heartbeat)
    self._op_executor.run(op, deadline)  # call the engine
2.2.2.2 _RendezvousCloseOp
def _close(self) -> None:
    op = _RendezvousCloseOp()  # set the operator
    deadline = self._get_deadline(self._settings.timeout.close)
    self._op_executor.run(op, deadline)  # call the engine
2.2.2.3 _RendezvousJoinOp
def next_rendezvous(self) -> Tuple[Store, int, int]:
    """See base class."""

    self._stop_heartbeats()

    # Delay the execution for a small random amount of time if this is our
    # first run. This will slightly skew the rendezvous attempts across the
    # nodes and reduce the load on the backend.
    if self._state_holder.state.round == 0:
        _delay(seconds=(0, 0.3))

    exit_op = _RendezvousExitOp() # set operator
    join_op = _RendezvousJoinOp() # set operator

    deadline = self._get_deadline(self._settings.timeout.join)

    self._op_executor.run(exit_op, deadline) # this is where the call is made
    self._op_executor.run(join_op, deadline) # call

    self._start_heartbeats()

    rank, world_size = self._get_world()
    store = self._get_store()

    return store, rank, world_size

2.2.3 Functionality

The run function of _DistributedRendezvousOpExecutor implements the basic logic: it dispatches to the various operations based on the action type.

2.2.3.1 Main loop

The specific code of run is as follows:

    def run(
        self, state_handler: Callable[[_RendezvousContext, float], _Action], deadline: float
    ) -> None:
        """See base class."""
        action = None

        while action != _Action.FINISH:  # Loop until a FINISH action is obtained
            # Reads or writes the latest rendezvous state shared by all nodes in
            # the rendezvous. Note that our local changes might get overridden
            # by another node if that node synced its changes before us.
            
            # It is important to synchronize information among all the nodes,
            # because the latest state lives on the rendezvous backend.
            has_set = self._state_holder.sync()

            self._state = self._state_holder.state

            ctx = _RendezvousContext(self._node, self._state, self._settings)

            # Determine the next action to take based on the current state of
            # the rendezvous.
            action = state_handler(ctx, deadline) # decide the next operation, state_handler is the operator

            if action == _Action.FINISH:
                continue

            if action == _Action.ERROR_CLOSED:
                raise RendezvousClosedError()

            if action == _Action.ERROR_TIMEOUT:
                raise RendezvousTimeoutError()

            if action == _Action.SYNC:
                # Delay the execution by one second to avoid overloading the
                # backend if we are asked to poll for state changes.
                _delay(seconds=1)
            else:
                if action == _Action.KEEP_ALIVE:
                    self._keep_alive()
                elif action == _Action.ADD_TO_PARTICIPANTS:
                    self._add_to_participants()
                elif action == _Action.ADD_TO_WAIT_LIST:
                    self._add_to_wait_list()
                elif action == _Action.REMOVE_FROM_PARTICIPANTS:
                    self._remove_from_participants()
                elif action == _Action.REMOVE_FROM_WAIT_LIST:
                    self._remove_from_wait_list()
                elif action == _Action.MARK_RENDEZVOUS_COMPLETE:
                    self._mark_rendezvous_complete()
                elif action == _Action.MARK_RENDEZVOUS_CLOSED:
                    self._mark_rendezvous_closed()

                # Attempt to sync our changes back to other nodes.
                self._state_holder.mark_dirty()

The details are shown below.

+-------------------------------------------+          +---------------------------------------------------------------+
|DynamicRendezvousHandler                   |          | _DistributedRendezvousOpExecutor                               |
|                                           |          |                                                                |
|   _settings: RendezvousSettings           |          |                     +------------------------+                |
|                                           |          |     _state +------> | _RendezvousState       |                |
|   _store: Store                           |          |                     |                        |                |
|                                           |          |                     |    participants        |                |
|   _state_holder: _RendezvousStateHolder   |          |                     |    wait_list           |                |
|                                           |          |                     |    last_heartbeats     |                |
|                 run(_RendezvousJoinOp())  |          |                     |    deadline            |                |
|   _op_executor +-----------------------------------> |                     +------------------------+                |
|                                           |          |                                                                |
+-------------------------------------------+          |                     +-------------------------+               |
                                                        |     _settings +---> | RendezvousSettings      |               |
                                                        |                     +-------------------------+               |
                                                        |                                                                |
                                                        |                     +--------------------------------------+  |
                                                        | _state_holder +---> | _BackendRendezvousStateHolder        |  |
                                                        |                     |                                      |  |
                                                        |                     |   _backend: RendezvousBackend        |  |
                                                        |                     |   _state: _RendezvousState           |  |
                                                        |                     |   _settings: RendezvousSettings      |  |
                                                        |                     |                                      |  |
                                                        |                     +--------------------------------------+  |
                                                        |                     +--------------------------------------+  |
                                                        |                     | _NodeDesc                            |  |
                                                        |     _node +-------> |              fqdn: str               |  |
                                                        |                     |              pid: int                |  |
                                                        |                     |              local_id: int           |  |
                                                        |                     |                                      |  |
                                                        |                     +--------------------------------------+  |
                                                        +---------------------------------------------------------------+


2.2.3.2 Synchronization

Note that in the run function, before any operator logic is executed, self._state_holder.sync() is called to synchronize the state among all workers and reach a consensus.

def sync(self) -> Optional[bool]:
    """See base class."""
    state_bits: Optional[bytes] = None
    token = None
    has_set: Optional[bool]

    if self._dirty:  # If this node's own state has changed
        has_set = False
        state_bits = pickle.dumps(self._state)
        # Set your state to Backend
        set_response = self._backend.set_state(state_bits, self._token)
        if set_response is not None:
            state_bits, token, has_set = set_response
    else:  # Otherwise we can only fetch the state from the backend
        has_set = None
        if self._cache_duration > 0:
            # Avoid overloading the backend if we are asked to retrieve the
            # state repeatedly. Try to serve the cached state.
            if self._last_sync_time >= max(time.monotonic() - self._cache_duration, 0):
                return None
        get_response = self._backend.get_state() # Obtain the latest status of other nodes from Backend
        if get_response is not None:
            state_bits, token = get_response

    if state_bits is not None:
        try:
            self._state = pickle.loads(state_bits)  # Update our own state with the state from the backend
        except pickle.PickleError as exc:
            raise RendezvousStateError(
                "The rendezvous state is corrupt. See inner exception for details."
            ) from exc
    else:
        self._state = _RendezvousState()

    if has_set and self._dead_nodes and log.isEnabledFor(logging.DEBUG):
        node_list = ",".join(f"'{dead_node}'" for dead_node in self._dead_nodes)
        msg = (
            f"As part of the sync operation the node(s) {node_list} have been removed from the "
            f"rendezvous '{self._settings.run_id}' since they had no heartbeat."
        )
        self._record(message=msg)

    self._token = token
    self._dirty = False
    self._last_sync_time = time.monotonic()
    self._sanitize()

    return has_set
The backend

The corresponding backend code lives in torch/distributed/elastic/rendezvous/c10d_rendezvous_backend.py.

In this backend, a Store is used as the centralized storage and acts as the master. Every node is a client that updates its own state on the master and fetches the state of the other nodes from it, which lets all nodes communicate with each other and reach a consensus. Clients that do not update their metadata are also removed periodically.

get_state simply reads the state from the store.

def get_state(self) -> Optional[Tuple[bytes, Token]]:
    """See base class."""
    base64_state: bytes = self._call_store("get", self._key)

    return self._decode_state(base64_state)

set_state performs a compare-and-set; it returns the new state and whether our state was actually written.

def set_state(
    self, state: bytes, token: Optional[Token] = None
) -> Optional[Tuple[bytes, Token, bool]]:
    """See base class."""
    base64_state_str: str = b64encode(state).decode()

    if token:
        # Shortcut if we know for sure that the token is not valid.
        if not isinstance(token, bytes):
            result = self.get_state()
            if result is not None:
                tmp = *result, False
                # Python 3.6 does not support tuple unpacking in return
                # statements.
                return tmp
            return None

        token = token.decode()
    else:
        token = self._NULL_SENTINEL

    base64_state: bytes = self._call_store("compare_set", self._key, token, base64_state_str)

    state_token_pair = self._decode_state(base64_state)
    if state_token_pair is None:
        return None

    new_state, new_token = state_token_pair

    # C10d Store's compare_set method does not offer an easy way to find out
    # whether our write attempt was successful. As a brute-force solution we
    # perform a bitwise comparison of our local state and the remote state.
    return new_state, new_token, new_state == state
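
The boolean in the returned tuple is what lets a caller build an optimistic read-modify-write loop on top of get_state/set_state. The sketch below shows the pattern only; it is not the actual _BackendRendezvousStateHolder code, where the retry effectively happens through the engine's run loop calling sync() again on the next iteration.

import pickle

def update_shared_state(backend, mutate):
    """Optimistic concurrency on the rendezvous backend: read, modify, compare-set, retry."""
    response = backend.get_state()
    state, token = (pickle.loads(response[0]), response[1]) if response else ({}, None)
    while True:
        result = backend.set_state(pickle.dumps(mutate(state)), token)
        if result is None:                  # nothing usable came back; start from scratch
            state, token = {}, None
            continue
        state_bits, token, has_set = result
        state = pickle.loads(state_bits)    # whatever is stored now, ours or another node's
        if has_set:                         # our write won the race
            return state
        # otherwise another node wrote first: retry on top of its state
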
_sanitize

The _sanitize method acts on information from the other nodes, for example cleaning up failed nodes: nodes whose last heartbeat is older than a certain threshold are collected into _dead_nodes and removed from the participants and from the wait list.

def _sanitize(self) -> None:
    state = self._state

    expire_time = datetime.utcnow() - (
        self._settings.keep_alive_interval * self._settings.keep_alive_max_attempt
    )

    # Filter out the dead nodes.
    self._dead_nodes = [
        node
        for node, last_heartbeat in state.last_heartbeats.items()
        if last_heartbeat < expire_time
    ]

    participant_removed = False

    for dead_node in self._dead_nodes:
        del state.last_heartbeats[dead_node] # Remove the faulty node

        try:
            del state.participants[dead_node] # Remove the faulty node

            participant_removed = True
        except KeyError:
            pass

        try:
            state.wait_list.remove(dead_node) # Remove the faulty node
        except KeyError:
            pass

    if participant_removed:
        # Common epilogue shared with the _remove_from_participants()
        # function of _DistributedRendezvousOpExecutor.
        _remove_participant_epilogue(state, self._settings)
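
As a quick numeric illustration of the expiry threshold above (the interval and attempt count are assumptions for the example; the real values come from RendezvousSettings):

from datetime import datetime, timedelta

keep_alive_interval = timedelta(seconds=5)   # assumed heartbeat period
keep_alive_max_attempt = 3                   # assumed number of tolerated misses

expire_time = datetime.utcnow() - keep_alive_interval * keep_alive_max_attempt
# Any node whose last heartbeat is older than expire_time, i.e. that has been
# silent for more than 15 seconds, is treated as dead and dropped by _sanitize().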

After explaining how the engine runs, let's look at the concrete operators.

0x03 Operators

The business logic of the _RendezvousOpExecutor engine is divided into two layers: user operations and internal business logic. User operations are decoupled from internal business mechanisms.

  • User operations are divided into various operators: heartbeat, join, close, and exit. For example, the join operator is _RendezvousJoinOp.

  • The internal business logic is divided into various business functions, such as the _add_to_participants method that removes a node from the wait list and adds it to the participants.

  • Operators and internal business functions do not correspond one to one; a state-machine-like mechanism is needed to tie them together.

    • For example, the result of a heartbeat operator might be timeout, keep-alive, or normal end, so different internal business functions have to be called depending on this result. This correspondence is expressed through Actions.
    • The operators together form a state machine.
    • Inside an operator, the various Actions are generated; the Action determines the next step of the state machine.
  • Inside the engine, the concrete business logic is executed according to the Action; in other words, the engine and the business logic are decoupled through Actions.

The engine can be logically divided into three layers: the operator layer at the top, the Action layer in the middle, and the business function layer below.

+------------------------------------------------------------------------------------------+
|                                                                                          |
|  _RendezvousKeepAliveOp    _RendezvousCloseOp    _RendezvousExitOp    _RendezvousJoinOp  |
|                                                                                          |
+------------------------------------------------------------------------------------------+
             |                       |                     |                    |
             |                       |                     |                    |
             v                       v                     v                    v
+------------------------------------------------------------------------------------------+
|                                                                                          |
|  KEEP_ALIVE   ADD_TO_PARTICIPANTS   ADD_TO_WAIT_LIST   REMOVE_FROM_WAIT_LIST   ......    |
|                                                                                          |
+------------------------------------------------------------------------------------------+
       |                 |                   |                     |               |
       |                 |                   |                     |               |
       v                 v                   v                     v               v
+------------------------------------------------------------------------------------------+
|                                                                                          |
|   _add_to_participants     _remove_from_participants     _add_to_wait_list     ......    |
|                                                                                          |
+------------------------------------------------------------------------------------------+

Let’s go through them one by one.

3.1 Actions

Let's first look at the mid-level Actions and see what kinds there are. Based on the rendezvous state, the engine's actions are as follows. The code is located in torch/distributed/elastic/rendezvous/dynamic_rendezvous.py.

class _Action(Enum):
    """Specifies the possible actions based on the state of the rendezvous."""

    KEEP_ALIVE = 1
    ADD_TO_PARTICIPANTS = 2
    ADD_TO_WAIT_LIST = 3
    REMOVE_FROM_PARTICIPANTS = 4
    REMOVE_FROM_WAIT_LIST = 5
    MARK_RENDEZVOUS_COMPLETE = 6
    MARK_RENDEZVOUS_CLOSED = 7
    SYNC = 8
    ERROR_CLOSED = 9
    ERROR_TIMEOUT = 10
    FINISH = 11

3.2 Operators

The engine works with several operators, basically one operator per operation. Below we give examples of these operators; each decides the next Action based on the rendezvous state.

3.2.1 Heartbeat

3.2.1.1 Heartbeat Check

_RendezvousKeepAliveOp determines the next Action based on the current state and time; it is run periodically to keep the local node's heartbeat up to date.

class _RendezvousKeepAliveOp:
    """Represents a rendezvous keep-alive update operation."""

    def __call__(self, ctx: _RendezvousContext, deadline: float) -> _Action:
        if _should_keep_alive(ctx):
            if time.monotonic() > deadline:
                return _Action.ERROR_TIMEOUT
            return _Action.KEEP_ALIVE
        return _Action.FINISH

The _should_keep_alive method is:

def _should_keep_alive(ctx: _RendezvousContext) -> bool:
    """Determines whether a keep-alive heartbeat should be sent."""
    try:
        last_heartbeat = ctx.state.last_heartbeats[ctx.node]
    except KeyError:
        return False

    return last_heartbeat <= datetime.utcnow() - ctx.settings.keep_alive_interval
3.2.1.2 Periodic invocation

The important thing to note here is that sync is called before any operator runs, and sync synchronizes the state between nodes; this also happens periodically, because the heartbeat is periodic.

DynamicRendezvousHandler starts a timer that periodically calls the _keep_alive_weak method.

def _start_heartbeats(self) -> None:
    self._keep_alive_timer = _PeriodicTimer(
        self._settings.keep_alive_interval, self._keep_alive_weak, weakref.ref(self)
    )

    self._keep_alive_timer.set_name(f"RendezvousKeepAliveTimer_{self._this_node.local_id}")
    self._keep_alive_timer.start()

Second, _keep_alive_weak calls self._keep_alive().

@staticmethod
def _keep_alive_weak(weak_self) -> None:
    self = weak_self()
    if self is not None:
        self._keep_alive()
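
The weakref.ref(self) passed to the timer is a deliberate design choice: the timer only holds a weak reference to the handler, so the callback does not keep the handler alive, and once the handler is garbage collected the periodic call quietly becomes a no-op. A minimal sketch of the pattern (not the real classes):

import weakref

class Handler:
    def _keep_alive(self):
        print("heartbeat")

    @staticmethod
    def _keep_alive_weak(weak_self):
        self = weak_self()           # try to resolve the weak reference
        if self is not None:         # the handler may already be gone
            self._keep_alive()

h = Handler()
ref = weakref.ref(h)
Handler._keep_alive_weak(ref)        # prints "heartbeat"
del h
Handler._keep_alive_weak(ref)        # does nothing: the referent has been collected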

_keep_alive will call _RendezvousKeepAliveOp.

def _keep_alive(self) -> None:
    self._heartbeat_lock.acquire()
    op = _RendezvousKeepAliveOp()
    deadline = self._get_deadline(self._settings.timeout.heartbeat)

    try:
        self._op_executor.run(op, deadline)
        msg = (
            f"The node '{self._this_node}' has sent a keep-alive heartbeat to the rendezvous "
            f"'{self._settings.run_id}'."
        )
        self._record(message=msg)
        log.debug(msg)
    except RendezvousError as ex:
        msg = (
            f"The node '{self._this_node}' has failed to send a keep-alive heartbeat to the "
            f"rendezvous '{self._settings.run_id}' due to an error of type {type(ex).__name__}."
        )
        self._record(message=msg, node_state=NodeState.FAILED)
    finally:
        self._heartbeat_lock.release()
3.2.1.3 The internal _keep_alive

In addition, _DistributedRendezvousOpExecutor has a _keep_alive method with the same name, which implements the internal business logic; we will look at it later.

3.2.2 Close

_RendezvousCloseOp determines the next Action based on the current state and time.

class _RendezvousCloseOp:
    """Represents a rendezvous close operation."""

    def __call__(self, ctx: _RendezvousContext, deadline: float) -> _Action:
        if ctx.state.closed:
            return _Action.FINISH
        if time.monotonic() > deadline:
            return _Action.ERROR_TIMEOUT
        return _Action.MARK_RENDEZVOUS_CLOSED

3.2.3 Exit

_RendezvousExitOp determines the next Action based on the current state and time. If the node is not among the participants, nothing needs to be done. Otherwise it returns the Action that removes the node from the participants list; on timeout it returns the corresponding error Action.

class _RendezvousExitOp:
    """Represents a rendezvous exit operation."""

    def __call__(self, ctx: _RendezvousContext, deadline: float) -> _Action:
        if ctx.node in ctx.state.participants:
            if time.monotonic() > deadline:
                return _Action.ERROR_TIMEOUT
            return _Action.REMOVE_FROM_PARTICIPANTS
        return _Action.FINISH

3.2.4 Join

_RendezvousJoinOp processes the node according to the current rendezvous state, for example trying to add it to the participants, putting it on the wait list, or simply waiting. See the code comments for details.

  • Extract the _RendezvousState from the context and store it in state.
  • If the state is closed, the rendezvous has already ended, so _Action.ERROR_CLOSED is returned.
  • Check whether this node is a participant and store the result in is_participant.
  • If the state is complete and the node is already a participant, the rendezvous is done, so _Action.FINISH is returned.
  • Get the current time now.
  • If now > deadline, a timeout has occurred.
    • If there is still time to roll back, the node has to be returned to its previous state.
      • If this node is already a participant, the total number of nodes did not reach min in time; even though it is a participant, it has to be removed from the participants, so _Action.REMOVE_FROM_PARTICIPANTS is returned.
      • If this node is on the wait list (the rendezvous was already full, i.e. max was reached), it could not wait until the next round, so it has to be removed from the wait list and _Action.REMOVE_FROM_WAIT_LIST is returned.
    • Otherwise, _Action.ERROR_TIMEOUT is returned.
  • Otherwise there is no timeout and processing continues.
    • If state.complete and the node is not a participant (the participant case was handled above), the rendezvous has already completed. If the maximum number of nodes has not been reached and the current node is not yet on the wait list, it is added to the wait list so that it can be promoted to a participant when the next round starts, so _Action.ADD_TO_WAIT_LIST is returned.
    • If the node is a participant and the state is not complete, and the minimum number of nodes has been reached and the last-call deadline has passed, the rendezvous can be finished, so _Action.MARK_RENDEZVOUS_COMPLETE is returned.
    • Otherwise the rendezvous is not complete and this node is not a participant, so it is added to the participants directly and _Action.ADD_TO_PARTICIPANTS is returned.
  • If the heartbeat needs to be refreshed, _Action.KEEP_ALIVE is returned.
  • Otherwise, _Action.SYNC is returned.
class _RendezvousJoinOp:
    """Represents a rendezvous join operation."""

    def __call__(self, ctx: _RendezvousContext, deadline: float) -> _Action:
        state = ctx.state  # Extract the _RendezvousState from the context

        # A closed rendezvous means that it no longer accepts new nodes.
        if state.closed:
            return _Action.ERROR_CLOSED  # The rendezvous is closed; report it

        is_participant = ctx.node in state.participants # See if you're a participant

        # If we are part of the rendezvous and it is already complete there is
        # no further action to take.
        if state.complete and is_participant:  # We are a participant and the rendezvous is complete: return _Action.FINISH
            return _Action.FINISH

        now = time.monotonic()
        if now > deadline:  # We have timed out
            rollback_period = 5  # 5 seconds

            # If we still have time to rollback (a short period on top of the
            # operation deadline), try to remove ourself from the rendezvous.
            # It is okay if we can't though as our keep-alive will eventually
            # expire.
            if now <= deadline + rollback_period: # If there is still time to rollback
                # If we are part of the rendezvous, it means we couldn't find
                # enough participants to complete it on time.
                if is_participant:  # min was not reached in time; although we are already a participant, we must be removed
                    return _Action.REMOVE_FROM_PARTICIPANTS  # Remove ourself from the participant list
                # If we are in the wait list, it means we couldn't wait till the
                # next round of the rendezvous.
                if ctx.node in state.wait_list:  # The rendezvous was full (max reached); we must leave the wait list
                    return _Action.REMOVE_FROM_WAIT_LIST  # Remove ourself from the wait list
            return _Action.ERROR_TIMEOUT # return timeout

        if state.complete: # If rendezvous has ended
            # If we are here, it means we are not part of the rendezvous. In
            # case the rendezvous has capacity for additional participants add
            # ourself to the wait list for the next round.
            if len(state.participants) < ctx.settings.max_nodes: # If the maximum number of nodes has not been reached
                if ctx.node not in state.wait_list: # If the current node is not in the wait list
                    return _Action.ADD_TO_WAIT_LIST  # Join the wait list for the next round
        elif is_participant: # if already in the participant list
            # If the rendezvous has enough number of participants including us,
            # check whether we have passed the rendezvous deadline. If yes,
            # complete it.
            if len(state.participants) >= ctx.settings.min_nodes: # If the minimum number of nodes is reached
                if cast(datetime, state.deadline) < datetime.utcnow(): # if the timeout is reached
                    return _Action.MARK_RENDEZVOUS_COMPLETE # mark rendezvous has ended
        else: # Otherwise join the participants directly
            # The rendezvous is not complete yet and we are not part of it. Try
            # to join.
            return _Action.ADD_TO_PARTICIPANTS

        if _should_keep_alive(ctx):  # Return _Action.KEEP_ALIVE if the heartbeat needs to be refreshed
            return _Action.KEEP_ALIVE

        # At this point either the rendezvous is not complete, but we are part
        # of it, which means we have to wait for other participants to join; or
        # the rendezvous is complete, but we are not part of it, which means we
        # have to wait for the next round.
        return _Action.SYNC  # Otherwise return _Action.SYNC to synchronize the state

The specific logic is as follows:

                        |  state.closed
                        |
                        +-------------------------->   _Action.ERROR_CLOSED
                        |
                        |  complete & participant
                        |
                        +-------------------------->   _Action.FINISH
                        |
                        |  timeout & participant
                        |
                        +-------------------------->   _Action.REMOVE_FROM_PARTICIPANTS
                        |
                        |  timeout & wait
                        |
                        +-------------------------->   _Action.REMOVE_FROM_WAIT_LIST
+-------------------+   |
|                   |   |  timeout
| _RendezvousJoinOp +---+-------------------------->   _Action.ERROR_TIMEOUT
|                   |   |
+-------------------+   |  complete & <max & not wait
                        |
                        +-------------------------->   _Action.ADD_TO_WAIT_LIST
                        |
                        |  not complete & participant & > min & deadline
                        |
                        +-------------------------->   _Action.MARK_RENDEZVOUS_COMPLETE
                        |
                        |  not complete & not participant
                        |
                        +-------------------------->   _Action.ADD_TO_PARTICIPANTS
                        |
                        |  _should_keep_alive
                        |
                        +-------------------------->   _Action.KEEP_ALIVE
                        |
                        |  else
                        |
                        +-------------------------->   _Action.SYNC


For reference, the source code contains a state diagram of the etcd backend rendezvous, which we can compare with the c10d states.

Etcd backend Join can be divided into four stages:

  • In the setup phase, a value is written to a fixed directory. This write acts as an exclusive lock: if it fails, a rendezvous process is currently already in progress.
  • Join (Joinable) phase. If the write value succeeds, the join phase is entered. If the waiting time ends or the number of nodes participating in the training reaches the maximum value, the frozen phase is entered.
  • Frozen (Confirm) phase. All nodes need to be confirmed to enter the final stage.
  • The final stage. Ranks are assigned; the instance with RANK 0 becomes the master.

Following the figure above, we extend C10D as follows.

     +
     |
     |
     v
+----+-------+
|            |
|   closed   +--------------->  ERROR_CLOSED
|            |
+----+-------+
     |
     |
     v
+----+-------+  is_participant
|            |
|  complete  +--------------->  FINISH
|            |
+----+-------+
     |                                                               is_participant
     |                                                                   +------>  REMOVE_FROM_PARTICIPANTS
     v                                                                   |
+----+--------+  now > deadline   +-----------+  now < rollback   +------+----+
|             |                   |           |                   |           |
|    join     +-----------------> |  timeout  +------------------>+  rollback |
|             |                   |           |                   |           |
+----+--------+                   +----+------+                   +------+----+
     |                                 |    now > rollback               |
     |  now < deadline                 |                                 |  in state.wait_list
     |                                 +---------->  ERROR_TIMEOUT       |
     |                                                                   +------>  REMOVE_FROM_WAIT_LIST
     |
     |   complete && not is_participant && < max && not in state.wait_list
     |
     +------------------------------------------------------------------>  ADD_TO_WAIT_LIST
     |
     |   not complete && is_participant && > min && > deadline
     |
     +------------------------------------------------------------------>  MARK_RENDEZVOUS_COMPLETE
     |
     |   not complete && not is_participant
     |
     +----------------------------------------->  ADD_TO_PARTICIPANTS
     |
     |   _should_keep_alive
     |
     +--------------------------->  KEEP_ALIVE
     |
     |
     v
    SYNC


0x04 Business operations

Inside _DistributedRendezvousOpExecutor, run selects different business functions to execute based on the action.

            if action == _Action.KEEP_ALIVE:
                self._keep_alive()
            elif action == _Action.ADD_TO_PARTICIPANTS:
                self._add_to_participants()
            elif action == _Action.ADD_TO_WAIT_LIST:
                self._add_to_wait_list()
            elif action == _Action.REMOVE_FROM_PARTICIPANTS:
                self._remove_from_participants()
            elif action == _Action.REMOVE_FROM_WAIT_LIST:
                self._remove_from_wait_list()
            elif action == _Action.MARK_RENDEZVOUS_COMPLETE:
                self._mark_rendezvous_complete()
            elif action == _Action.MARK_RENDEZVOUS_CLOSED:
                self._mark_rendezvous_closed()

Let’s look at the internal function logic in detail.

4.1 Adding Participants

After receiving ADD_TO_PARTICIPANTS, call _add_to_participants to remove the node from the wait list and add the node to participants.

    def _add_to_participants(self) -> None:

        state = self._state

        try:
            state.wait_list.remove(self._node)
        except KeyError:
            pass

        # The ranks of the participants will be set once the rendezvous is
        # complete.
        state.participants[self._node] = 0

        self._keep_alive()

        if len(state.participants) == self._settings.min_nodes:
            state.deadline = datetime.utcnow() + self._settings.timeout.last_call

        if len(state.participants) == self._settings.max_nodes:
            self._mark_rendezvous_complete()

4.2 Removing A Participant

After receiving REMOVE_FROM_PARTICIPANTS, call _remove_from_participants to remove the node from participants and last_heartbeats.

    def _remove_from_participants(self) -> None:

        state = self._state
        del state.participants[self._node]
        del state.last_heartbeats[self._node]

        if state.complete:
            # If we do not have any participants left, move to the next round.
            if not state.participants:
                state.complete = False
                state.round += 1
        else:
            if len(state.participants) < self._settings.min_nodes:
                state.deadline = None

4.3 Adding to the wait list

After receiving ADD_TO_WAIT_LIST, call _add_to_wait_list to add the node to wait_list.

    def _add_to_wait_list(self) -> None:
        self._state.wait_list.add(self._node)
        self._keep_alive()

4.4 Removing from the wait list

After receiving REMOVE_FROM_WAIT_LIST, call _remove_from_wait_list to remove the node from wait_list.

    def _remove_from_wait_list(self) -> None:
        self._state.wait_list.remove(self._node)
        del self._state.last_heartbeats[self._node]

4.5 Marking the rendezvous complete

After receiving MARK_RENDEZVOUS_COMPLETE, the rendezvous is marked as complete and a rank is assigned to each participant.

The participants are sorted with the same algorithm on every node, so the resulting ranks are identical across nodes.

    def _mark_rendezvous_complete(self) -> None:
        state = self._state

        state.complete = True
        state.deadline = None

        # Assign the ranks.
        for rank, node in enumerate(sorted(state.participants)):
            state.participants[node] = rank

    def _mark_rendezvous_closed(self) -> None:
        self._state.closed = True

4.6 Heartbeat

After receiving a KEEP_ALIVE action, _keep_alive is called to maintain the heartbeat (note that _keep_alive is also called from _add_to_participants and other methods). It updates last_heartbeats in the local state, and last_heartbeats is written to the key-value store during the next sync, which lets the other nodes learn the status of this node. On the receiving side, last_heartbeats is processed in _sanitize, as mentioned earlier.

def _keep_alive(self) -> None:
    msg = (
        f"The node '{self._node}' updated its keep-alive heartbeat time for the rendezvous "
        f"'{self._settings.run_id}'. Pending sync."
    )
    self._record(message=msg)
    self._state.last_heartbeats[self._node] = datetime.utcnow()

_record method is as follows:

def _record(self, message: str, node_state: NodeState = NodeState.RUNNING) -> None:
    construct_and_record_rdzv_event(
        name=f"{self.__class__.__name__}.{get_method_name()}",
        run_id=self._settings.run_id,
        message=message,
        node_state=node_state,
        hostname=self._node.fqdn,
        pid=self._node.pid,
        local_id=self._node.local_id,
    )

This in turn calls the following code to log the event.

def record_rdzv_event(event: RdzvEvent) -> None:
    _get_or_create_logger("dynamic_rendezvous").info(event.serialize())

def construct_and_record_rdzv_event(
    run_id: str,
    message: str,
    node_state: NodeState,
    name: str = "",
    hostname: str = "",
    pid: Optional[int] = None,
    master_endpoint: str = "",
    local_id: Optional[int] = None,
    rank: Optional[int] = None,
) -> None:
    # We don't want to perform an extra computation if not needed.
    if isinstance(get_logging_handler("dynamic_rendezvous"), logging.NullHandler):
        return

    # Set up parameters.
    if not hostname:
        hostname = socket.getfqdn()
    if not pid:
        pid = os.getpid()

    # Determines which file called this function.
    callstack = inspect.stack()
    filename = "no_file"
    if len(callstack) > 1:
        stack_depth_1 = callstack[1]
        filename = os.path.basename(stack_depth_1.filename)
        if not name:
            name = stack_depth_1.function

    # Delete the callstack variable. If kept, this can mess with python's
    # garbage collector as we are holding on to stack frame information in
    # the inspect module.
    del callstack

    # Set up error trace if this is an exception
    if node_state == NodeState.FAILED:
        error_trace = traceback.format_exc()
    else:
        error_trace = ""

    # Initialize event object
    event = RdzvEvent(
        name=f"{filename}:{name}",
        run_id=run_id,
        message=message,
        hostname=hostname,
        pid=pid,
        node_state=node_state,
        master_endpoint=master_endpoint,
        rank=rank,
        local_id=local_id,
        error_trace=error_trace,
    )

    # Finally, record the event.
    record_rdzv_event(event)

So far we have analyzed the engine. In the next article we will try to pull everything together from an overall perspective.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF Reference

PyTorch distributed elastic training (1) — the general idea

PyTorch Distributed Elastic Training (2)– Startup & Single node flow

PyTorch Distributed elastic training (3)– proxy

PyTorch Distributed Elastic Training (4)– Rendezvous Architecture and logic