0x00 Summary

In the previous article we introduced the dynamic logic of the backward propagation engine. Because the actual backward computation is carried out in the device worker threads, we devote a separate article to explaining it.

Other articles in this series are as follows:

Automatic Differentiation of Deep Learning Tools (1)

Automatic Differentiation of Deep Learning Tools (2)

Automatic differentiation of Deep Learning Tools (3) — Interpretation of examples

PyTorch implements forward propagation (1) — Base class (1)

PyTorch implements forward propagation (2) — Base class (2)

PyTorch how to implement forward propagation (3) – implementation

How to implement back propagation (1)—- call engine

PyTorch how to implement backward propagation (2)—- engine static structure

PyTorch how to implement backward propagation (3)—- engine dynamic logic

0x01 Worker thread body

thread_main is the main function of a worker thread. Its main logic is a while loop around a ReadyQueue: the worker thread blocks on ReadyQueue->pop(); when pop() returns a NodeTask, the worker thread processes it, completing one piece of the backward computation, and, if needed, inserts new NodeTasks into some ReadyQueue, which drives the engine to carry on with the rest of the backward computation.
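
The following is a minimal, self-contained sketch (not PyTorch code) of this blocking-queue plus worker-loop pattern; the ReadyQueue, NodeTask and shutdown convention below are simplified stand-ins:

#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

// Simplified stand-ins for the engine's NodeTask and ReadyQueue.
struct NodeTask {
  std::function<void()> fn;  // an empty fn plays the role of a dummy/shutdown task
};

class ReadyQueue {
 public:
  void push(NodeTask task) {
    { std::lock_guard<std::mutex> lock(mutex_); queue_.push(std::move(task)); }
    not_empty_.notify_one();
  }
  NodeTask pop() {  // blocks until a task is available, like ReadyQueue::pop
    std::unique_lock<std::mutex> lock(mutex_);
    not_empty_.wait(lock, [this] { return !queue_.empty(); });
    NodeTask task = std::move(queue_.front());
    queue_.pop();
    return task;
  }
 private:
  std::mutex mutex_;
  std::condition_variable not_empty_;
  std::queue<NodeTask> queue_;
};

int main() {
  ReadyQueue ready_queue;

  // Worker loop in the spirit of thread_main: block on pop(), run the task,
  // exit when a task with an empty fn (our "shutdown" marker) arrives.
  std::thread worker([&ready_queue] {
    while (true) {
      NodeTask task = ready_queue.pop();
      if (!task.fn) break;   // dummy / shutdown task
      task.fn();             // analogous to evaluate_function(...)
    }
  });

  ready_queue.push({[] { std::cout << "run one piece of backward\n"; }});
  ready_queue.push({nullptr});  // tell the worker to exit
  worker.join();
  return 0;
}

In the real engine the dummy and shutdown roles are played by NodeTasks whose fn_ is nullptr and by dedicated shutdown tasks, as the actual thread_main shown below makes clear.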

thread_main is called from:

  1. The autograd threads of devices such as CUDA and XLA.
  2. The thread on the CPU that owns the backward call (synchronous mode).
  3. Reentrant backward calls invoked from either of the first two cases.

1.1 Thread body code

The worker thread's computation starts from the GraphRoot node of the dynamic graph. Backward propagation uses the edges of each Node as links and computes backwards layer by layer until it reaches the leaf nodes, which completes the backward computation. The flow is as follows:

  • local_graph_task represents the graph_task we retrieved from the queue. The outer graph_task represents the overall (possibly reentrant) graph_task that we need to execute.

  • Pop a NodeTask instance from its own ReadyQueue and call evaluate_function with local_graph_task as a parameter.

  • Decrement outstanding_tasks_ by 1.

  • If this local_graph_task has finished (reentrant backward propagation runs multiple GraphTasks), that is, completed() returns true:

    • Run the follow-up work in exec_post_processing and then call future_result_->markCompleted.
    • If the task comes from another worker thread, i.e. worker_device != base_owner, send a dummy function task to that worker thread's queue.

The specific code is as follows:

// thread_main is used by:
// 1). autograd threads for devices (i.e. CUDA, XLA)
// 2). the caller/owning thread of the backward call on CPU (sync mode)
// 3). Reentrant backward that invoked by either 1) or 2)
// The exit conditions are different for the above three cases.
// For 1), we are spinning on running the thread_main on device autograd
//         threads throughout the Engine lifetime, thread_main will get
//         terminated during Engine destruction by pushing shutdown tasks
// For 2), the owning thread of the backward call drives the thread_main
//         synchronously until the graph_task of that owning thread is
//         completed and exit the thread_main to continue executing the
//         result of caller's code.
// For 3), the reentrant backward that invokes thread_main, either from 1)
//         or 2), will not spin and will exit as long as graph_task is
//         completed and notify the owning thread as needed.
auto Engine::thread_main(const std::shared_ptr<GraphTask>& graph_task) -> void {
  // When graph_task is nullptr, this is a long running thread that processes
  // tasks (ex: device threads). When graph_task is non-null (ex: reentrant
  // backwards, user thread), this function is expected to exit once that
  // graph_task complete.

  // local_ready_queue should already been initialized when we get into thread_main
  while (graph_task == nullptr || !graph_task->future_result_->completed()) {
    // local_graph_task represents the graph_task we retrieve from the queue.
    // The outer graph_task represents the overall graph_task we need to execute
    // for reentrant execution.
    std::shared_ptr<GraphTask> local_graph_task;
    {
      // Scope this block of execution since NodeTask is not needed after this
      // block and can be deallocated (release any references to grad tensors
      // as part of inputs_).
      NodeTask task = local_ready_queue->pop();
      // This will only work if the worker is running a non backward task
      // TODO Needs to be fixed this to work in all cases
      if (task.isShutdownTask_) {
        break;
      }

      if (!(local_graph_task = task.base_.lock())) {
        // GraphTask for function is no longer valid, skipping further
        // execution.
        continue;
      }

      if (task.fn_ && !local_graph_task->has_error_.load()) {
        // Use grad_mode_ to configure AutoGradMode; during the whole backward
        // computation GradMode::is_enabled() reflects this setting
        AutoGradMode grad_mode(local_graph_task->grad_mode_);
        try {
          // The guard sets the thread_local current_graph_task on construction
          // and restores it on exit. The current_graph_task variable helps
          // queue_callback() to find the target GraphTask to append final
          // callbacks.
          GraphTaskGuard guard(local_graph_task);
          NodeGuard ndguard(task.fn_);
          evaluate_function(local_graph_task, task.fn_.get(), task.inputs_,
                            local_graph_task->cpu_ready_queue_);
        } catch (std::exception& e) {
          thread_on_exception(local_graph_task, task.fn_, e);
        }
      }
    }

    // Decrement the outstanding tasks.
    --local_graph_task->outstanding_tasks_;

    // Check if we've completed execution.
    if (local_graph_task->completed()) {
      local_graph_task->mark_as_completed_and_run_post_processing();

      auto base_owner = local_graph_task->owner_;
      // The current worker thread finish the graph_task, but the owning thread
      // of the graph_task might be sleeping on pop() if it does not have work.
      // So we need to send a dummy function task to the owning thread just to
      // ensure that it's not sleeping, so that we can exit the thread_main.
      // If it has work, it might see that graph_task->outstanding_tasks_ == 0
      // before it gets to the task, but it's a no-op anyway.
      //
      // NB: This is not necessary if the current thread is the owning thread.
      if (worker_device != base_owner) {
        // Synchronize outstanding_tasks_ with queue mutex
        std::atomic_thread_fence(std::memory_order_release);
        ready_queue_by_index(local_graph_task->cpu_ready_queue_, base_owner)
            ->push(NodeTask(local_graph_task, nullptr, InputBuffer(0)));
      }
    }
  }
}

1.2 Using the Ready Queue

At the end of the above code, use ready_queue_by_index to get the queue for subsequent work.

ready_queue_by_index(local_graph_task->cpu_ready_queue_, base_owner)
    ->push(NodeTask(local_graph_task, nullptr, InputBuffer(0)));

How do I get the Ready Queue? Specific strategies are as follows:

  • If the next device to execute is the CPU, select cpu_ready_queue.
  • Otherwise, select the corresponding GPU ReadyQueue from device_ready_queues_.

The code is as follows:

auto Engine::ready_queue_by_index(std::shared_ptr<ReadyQueue> cpu_ready_queue, int device_index) -> std::shared_ptr<ReadyQueue> {
  if (device_index == CPU_DEVICE) {
    // return the cpu ready queue passed in
    TORCH_INTERNAL_ASSERT(cpu_ready_queue);
    return cpu_ready_queue;
  } else {
    // Static cast is ok here as the number of device should never overflow an int.
    TORCH_INTERNAL_ASSERT(
        0 <= device_index &&
        device_index < static_cast<int>(device_ready_queues_.size()));
    // See Note [Allocating GPUs to autograd threads]
    // NB: This function would become obsolete if we truly allocated a CPU thread
    // per device, rather than colocate.
    return device_ready_queues_.at(device_index);
  }
}

The logic is as follows:

+---------------------------------------------------------------------+
| Main Thread                                                          |
|                                                                      |
|            push(NodeTask) +--------------+                           |
|                                          |                           |
+---------------------------------------------------------------------+
                                           |
                                           v
                                    +------+-----+
                                    |            |
                                    | ReadyQueue |
                                    |            |
                                    +------+-----+
                                           |
                                           |
+---------------------------------------------------------------------+
| Worker Thread 1                          |                           |
|                                          |                           |
|  thread_main{                            |                           |
|                                          v                           |
|     NodeTask task = local_ready_queue->pop()                         |
|                                                                      |
|     evaluate_function(task.fn_.get(), task.inputs_)                  |
|  }                                                                   |
+---------------------------------------------------------------------+

0x02 Overall logic of the backward computation

The evaluate_function method performs the backward computation. Its overall logic is as follows:

  • Preparation: if exec_info_ needs processing, process captured_vars_.

  • Backward computation: call call_function(graph_task, func, inputs) to do the backward propagation:

    • Call the pre hooks.
    • Call fn to do the computation.
    • Call the post hooks.
  • Cleanup:

    • If keep_graph is not needed, call fn.release_variables();
    • From the outputs of call_function, compute num_outputs = outputs.size(), the number of outputs (which equals the number of elements in the list returned by fn's next_edges()).
  • Prepare for the next step, i.e. find the subsequent NodeTasks to be computed; num_outputs is used here. This part is more complicated.

The overall code is as follows:

void Engine::evaluate_function(
    std::shared_ptr<GraphTask>& graph_task,
    Node* func, // the derivative computation function of this node
    InputBuffer& inputs, // the input gradients of the current Node
    const std::shared_ptr<ReadyQueue>& cpu_ready_queue) {
    
  // Preparation work
  // If exec_info_ is not empty, we have to instrument the execution
  auto& exec_info_ = graph_task->exec_info_;
  if (!exec_info_.empty()) {
    auto& fn_info = exec_info_.at(func); // fetch the ExecInfo of the current Node for processing
    if (auto* capture_vec = fn_info.captures_.get()) {
      // Lock mutex for writing to graph_task->captured_vars_.
      std::lock_guard<std::mutex> lock(graph_task->mutex_);
      for (const auto& capture : *capture_vec) {
        // captured_grad is just temporary storage; it is updated on every node computation and finally returned to the caller, acting as a reference
        // 1. captured_grad refers to captured_vars_[capture.output_idx_],
        auto& captured_grad = graph_task->captured_vars_[capture.output_idx_];
        // 2. assign inputs[capture.input_idx_] to captured_vars_[capture.output_idx_]
        captured_grad = inputs[capture.input_idx_];
        // Iterate over the hooks and call them in a chain; captured_grad keeps flowing through the pipeline as both input and output,
        // i.e. captured_vars_[capture.output_idx_] is computed on repeatedly, and the final result still ends up in captured_vars_[capture.output_idx_].
        for (auto& hook : capture.hooks_) {
          captured_grad = (*hook)(captured_grad);
        }
      }
    }
    if (!fn_info.needed_) {
      // Skip execution if we don't need to execute the function.
      return;
    }
  }

  // Set the ThreadLocalState before calling the function.
  // NB: The ThreadLocalStateGuard doesn't set the grad_mode because GraphTask
  // always saves ThreadLocalState without grad_mode.
  at::ThreadLocalStateGuard tls_guard(graph_task->thread_locals_);

  // Switches to a function's CUDA stream (if applicable) before calling it
  const auto opt_parent_stream = (*func).stream(c10::DeviceType::CUDA);
  c10::OptionalStreamGuard parent_stream_guard{opt_parent_stream};

  // Perform the backward computation
  auto outputs = call_function(graph_task, func, inputs);

  // If the computation graph does not need to be kept, this node releases its variables
  auto& fn = *func;
  if (!graph_task->keep_graph_) {
    fn.release_variables();
  }

  // Get num_outputs, the number of outputs (equal to the number of elements in the list returned by this fn's next_edges()); it is used later when iterating over this node's outputs
  int num_outputs = outputs.size();
  if (num_outputs == 0) { // Note: doesn't acquire the mutex
    // Records leaf stream (if applicable)
    // See note "Streaming backwards"
    if (opt_parent_stream) {
      std::lock_guard<std::mutex> lock(graph_task->mutex_);
      graph_task->leaf_streams.emplace(*opt_parent_stream);
    }
    return;
  }

  if (AnomalyMode::is_enabled()) {
    AutoGradMode grad_mode(false);
    for (int i = 0; i < num_outputs; ++i) {
      auto& output = outputs[i];
      at::OptionalDeviceGuard guard(device_of(output));
      if (output.defined() && isnan(output).any().item<uint8_t>()) {
        std::stringstream ss;
      }
    }
  }

  // Prepare for the next step
  // Lock mutex for the accesses to GraphTask dependencies_, not_ready_ and cpu_ready_queue_ below
  std::lock_guard<std::mutex> lock(graph_task->mutex_);
  for (int i = 0; i < num_outputs; ++i) {
    auto& output = outputs[i];
    const auto& next = fn.next_edge(i); // next_edge(i) is an input of this node in the forward graph, i.e. an output of this node in backward, so next is the next node that may be computed

    if (!next.is_valid()) continue;

    // Check if the next function is ready to be computed
    bool is_ready = false;
    auto& dependencies = graph_task->dependencies_;
    auto it = dependencies.find(next.function.get()); // find the dependency count of the next node

    if (it == dependencies.end()) {
      auto name = next.function->name();
      throw std::runtime_error(std::string("dependency not found for ") + name);
    } else if (--it->second == 0) {
      dependencies.erase(it);
      is_ready = true; // the next node has no remaining in-degree, i.e. all the other gradients its own gradient computation depends on are done
    }

    // Check not_ready to see whether a buffer has already been stored for it
    auto& not_ready = graph_task->not_ready_;
    auto not_ready_it = not_ready.find(next.function.get());
    if (not_ready_it == not_ready.end()) {
      // The gradient of the next node has not been computed yet
      // Skip functions that aren't supposed to be executed
      // Skip nodes that do not need to be computed
      if (!exec_info_.empty()) {
        auto it = exec_info_.find(next.function.get());
        if (it == exec_info_.end() || !it->second.should_execute()) {
          continue;
        }
      }
      // No buffers have been allocated for the function
      InputBuffer input_buffer(next.function->num_inputs()); // buffer for the incoming gradients of the next node, i.e. the next node's input gradients

      // Accumulates into buffer
      // The next node's input gradient is the current node's output, so it is copied over
      const auto opt_next_stream = next.function->stream(c10::DeviceType::CUDA);
      input_buffer.add(next.input_nr,
                       std::move(output),
                       opt_parent_stream,
                       opt_next_stream);

      if (is_ready) {
        auto queue = ready_queue(cpu_ready_queue, input_buffer.device());
        // All dependencies are satisfied, so push it into the ReadyQueue
        queue->push(
            NodeTask(graph_task, next.function, std::move(input_buffer)));
      } else {
        // The next node's input dependencies are not complete yet, so put it into not_ready.
        not_ready.emplace(next.function.get(), std::move(input_buffer));
      }
    } else {
      // If the next node has started receiving inputs but is not finished yet (some dependent gradients remain), it is already in not_ready
      // The function already has a buffer
      auto &input_buffer = not_ready_it->second;

      // Accumulates into buffer
      const auto opt_next_stream = next.function->stream(c10::DeviceType::CUDA);
      input_buffer.add(next.input_nr,
                       std::move(output),
                       opt_parent_stream,
                       opt_next_stream);
        
      // In the graph, the output of each node (fn) is the input of the next node (fn); the following lines turn the previous fn's output into the next fn's input
      if (is_ready) {
        // If there are no remaining input dependencies, push a new NodeTask, i.e. the next NodeTask whose gradient needs to be computed
        auto queue = ready_queue(cpu_ready_queue, input_buffer.device());
        queue->push(
            NodeTask(graph_task, next.function, std::move(input_buffer)));
        // The incoming gradients of the next node are complete, so remove the corresponding buffer from not_ready
        not_ready.erase(not_ready_it);
      }
    }
  }
}

Because this part of the code is quite complex, let’s go through it one by one.

0x03 Preparations

First of all, let’s look at the preparations, as follows:

  • Extract the ExecInfo for the current Node.

  • Take its captures_ and iterate over each Capture.

  • Iterate over the hooks of each Capture.

    • captured_grad keeps flowing through the pipeline as both input and output; the computation repeatedly targets captured_vars_[capture.output_idx_].
    • The final result is saved in captured_vars_[capture.output_idx_].

One detail to note: captured_grad is only temporary storage; it is updated on every node computation and, being a reference into captured_vars_, its final value is what is returned to the caller.

void Engine::evaluate_function(
    std::shared_ptr<GraphTask>& graph_task,
    Node* func,
    InputBuffer& inputs,
    const std::shared_ptr<ReadyQueue>& cpu_ready_queue) {

  // Preparation work
  // If exec_info_ is not empty, we have to instrument the execution
  auto& exec_info_ = graph_task->exec_info_;
  if (!exec_info_.empty()) {
    auto& fn_info = exec_info_.at(func);
    if (auto* capture_vec = fn_info.captures_.get()) {
      // Lock mutex for writing to graph_task->captured_vars_.
      std::lock_guard<std::mutex> lock(graph_task->mutex_);
      for (const auto& capture : *capture_vec) {
        // captured_grad is just temporary storage, updated on every node
        // computation and finally returned to the caller; it acts as a reference.
        // 1. captured_grad refers to captured_vars_[capture.output_idx_]
        auto& captured_grad = graph_task->captured_vars_[capture.output_idx_];
        // 2. assign inputs[capture.input_idx_] to captured_vars_[capture.output_idx_]
        captured_grad = inputs[capture.input_idx_];
        // Call the hooks in a chain; captured_grad keeps flowing through the
        // pipeline as both input and output, i.e. captured_vars_[capture.output_idx_]
        // is computed on repeatedly and the final result stays in
        // captured_vars_[capture.output_idx_].
        for (auto& hook : capture.hooks_) {
          captured_grad = (*hook)(captured_grad);
        }
      }
    }
    if (!fn_info.needed_) {
      // Skip execution if we don't need to execute the function.
      return;
    }
  }
  // ...
}
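
To make the hook pipeline concrete, here is a minimal, self-contained sketch (not PyTorch code) of how a captured gradient flows through a chain of hooks. The Grad type and the Hook signature are simplified stand-ins for Variable and the capture hook objects:

#include <functional>
#include <iostream>
#include <vector>

// A stand-in for a gradient tensor (assumption: a single double instead of a Variable).
using Grad = double;
using Hook = std::function<Grad(const Grad&)>;

int main() {
  // captured_vars_ plays the role of GraphTask::captured_vars_.
  std::vector<Grad> captured_vars_(1);
  Grad input = 3.0;            // inputs[capture.input_idx_]
  std::vector<Hook> hooks = {  // capture.hooks_
      [](const Grad& g) { return g * 2.0; },
      [](const Grad& g) { return g + 1.0; },
  };

  // Same pattern as in evaluate_function: captured_grad is a reference into
  // captured_vars_, so every hook call updates captured_vars_[0] in place.
  Grad& captured_grad = captured_vars_[0];
  captured_grad = input;
  for (auto& hook : hooks) {
    captured_grad = hook(captured_grad);
  }
  std::cout << captured_vars_[0] << "\n";  // prints 7 (= 3*2 + 1)
  return 0;
}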

0x04 Core Logic

call_function is the core computation logic of backward propagation.

  • Call the pre hooks registered on this Node;

  • Call the Node itself, such as MeanBackward0, MulBackward0, etc.

    • The input is InputBuffer::variables(std::move(inputBuffer)), a set of Variable instances. When the dynamic graph is first evaluated backwards, the engine executes the graph_root node first, and its input is task.inputs_, i.e. InputBuffer(0).
    • The call is fn's apply(). apply() is polymorphic and is dispatched to the apply implementation of the specific operation.
    • The outputs, outputs = fn(std::move(inputs_copy)), are also a set of Variable instances and become the input of the next fn.
  • Call the post hooks registered on the Node.

  • Return the gradients of the current node, which is a variable_list.

The specific code is as follows:

static variable_list call_function(
    std::shared_ptr<GraphTask>& graph_task,
    Node* func,
    InputBuffer& inputBuffer) {
  CheckpointValidGuard cpvguard(graph_task);
  auto& fn = *func;
  auto inputs =
      call_pre_hooks(fn, InputBuffer::variables(std::move(inputBuffer)));

  if (!graph_task->keep_graph_) {
    fn.will_release_variables();
  }

  const auto has_post_hooks = !fn.post_hooks().empty();
  variable_list outputs;

  if (has_post_hooks) {
    // In functions/accumulate_grad.cpp, there is some logic to check the
    // conditions under which the incoming gradient can be stolen directly
    // (which elides a deep copy) instead of cloned. One of these conditions
    // is that the incoming gradient's refcount must be 1 (nothing else is
    // referencing the same data). Stashing inputs_copy here bumps the
    // refcount, so if post hooks are employed, it's actually still ok for
    // accumulate_grad.cpp to steal the gradient if the refcount is 2.
    //
    // "new_grad.use_count() <= 1 + !post_hooks().empty()" in
    // accumulate_grad.cpp accounts for this, but also creates a silent
    // dependency between engine.cpp (ie, this particular engine
    // implementation) and accumulate_grad.cpp.
    //
    // If you change the logic here, make sure it's compatible with
    // accumulate_grad.cpp.
    auto inputs_copy = inputs;
    outputs = fn(std::move(inputs_copy));
  } else {
    outputs = fn(std::move(inputs));
  }

  validate_outputs(fn.next_edges(), outputs, [&](const std::string& msg) {
    std::ostringstream ss;
    return ss.str();
  });

  if (has_post_hooks) {
    return call_post_hooks(fn, std::move(outputs), inputs);
  }
  return outputs;
}
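
To illustrate the polymorphic dispatch through apply(), here is a minimal, self-contained sketch; Node, MulBackward0 and variable_list below are simplified stand-ins for the real classes, and the real engine invokes the node through operator(), which forwards to apply():

#include <iostream>
#include <memory>
#include <vector>

using variable_list = std::vector<double>;  // stand-in for a list of Variables

// Simplified stand-in for torch::autograd::Node.
struct Node {
  virtual ~Node() = default;
  virtual variable_list apply(variable_list&& grads) = 0;
};

// Backward of y = a * b: dL/da = dL/dy * b, dL/db = dL/dy * a.
struct MulBackward0 : Node {
  double saved_a, saved_b;
  MulBackward0(double a, double b) : saved_a(a), saved_b(b) {}
  variable_list apply(variable_list&& grads) override {
    double grad_output = grads[0];
    return {grad_output * saved_b, grad_output * saved_a};
  }
};

int main() {
  std::unique_ptr<Node> fn = std::make_unique<MulBackward0>(2.0, 5.0);
  // The engine only sees a Node*; the virtual call dispatches to MulBackward0::apply.
  variable_list outputs = fn->apply({1.0});
  std::cout << outputs[0] << " " << outputs[1] << "\n";  // prints 5 2
  return 0;
}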

0x05 Preparing for the Next step

This is where part of the complexity of backward propagation lies.

We have now called call_function and obtained the outputs of the backward computation, recorded in outputs.

auto outputs = call_function(graph_task, func, inputs);

Therefore, the latter part is to find the subsequent nodes that can be calculated from the outputs.

The general idea is to traverse the output nodes of the backward propagation (that is, the nodes that this node connects to via edges in the forward computation graph) and examine these output nodes one by one. The traversal loop is divided into two pieces of code, which do the following for each output node:

  • The first piece checks the node against the dependency information and determines whether the node is ready. The key is whether the output node's count in GraphTask's dependencies_ has dropped to zero.

    • Zero means the node is ready: its gradient no longer depends on any gradient that has not been computed yet.
    • Non-zero means the node has multiple inputs, that is, it is connected to by several nodes, and some of them have not computed their gradients yet.
  • The second piece handles the node according to whether it is ready, for example deciding which queue to put it into.

5.1 Check nodes based on dependencies

The first piece of code checks the node against its dependencies and determines whether it is ready, as follows:

  • For the output node, we get the corresponding edges and iterate over these output edges.

    • Each output edge in turn is recorded as next; func is the function carried by the current NodeTask.

    • Use the dependencies_ information to determine whether next can be computed. dependencies_ records the dependency counts of all nodes in the graph.

    • Find next's dependency count in dependencies_ and decrement it by one (the count is usually greater than one because a node can have multiple inputs).

      • If --it->second == 0, all the gradients that this downstream node's own gradient depends on have been computed. Then:

        • Remove the entry corresponding to this node from GraphTask's dependencies_ (it will later also be removed from GraphTask's not_ready_ member).
        • Set is_ready to true; subsequent handling depends on this value.
    • Look up the input buffer corresponding to next in not_ready_ (this is what the member below is for);

      • std::unordered_map<Node*, InputBuffer> not_ready_;

The code is as follows:

for (int i = 0; i < num_outputs; ++i) {
  auto& output = outputs[i];
  const auto& next = fn.next_edge(i); // get one output edge/node

  if (!next.is_valid()) continue;

  // Check if the next function is ready to be computed
  bool is_ready = false;
  auto& dependencies = graph_task->dependencies_;
  auto it = dependencies.find(next.function.get()); // find the dependency count of the next node

  if (it == dependencies.end()) {
    auto name = next.function->name();
    throw std::runtime_error(std::string("dependency not found for ") + name);
  } else if (--it->second == 0) {
    dependencies.erase(it);
    is_ready = true; // all gradients the next node depends on have been computed
  }

  auto& not_ready = graph_task->not_ready_;
  auto not_ready_it = not_ready.find(next.function.get()); // look up the input buffer

At this point we have found an output node, we know whether it is ready to be computed (by whether it still has outstanding dependencies), and we have looked up its input buffer in the not_ready map (if one exists).
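
As an aside, the dependency-counting trick can be illustrated with a minimal, self-contained sketch (not the real GraphTask); the Node struct and the check_ready helper below are made up for illustration:

#include <iostream>
#include <stdexcept>
#include <string>
#include <unordered_map>

// Minimal stand-in for a graph node.
struct Node { std::string name; };

int main() {
  Node a{"MulBackward0"}, b{"AccumulateGrad"};

  // dependencies plays the role of GraphTask::dependencies_:
  // how many gradient contributions each node is still waiting for.
  std::unordered_map<Node*, int> dependencies{{&a, 2}, {&b, 1}};

  // Simulate the current node producing an output that flows into `next`.
  auto check_ready = [&](Node* next) {
    bool is_ready = false;
    auto it = dependencies.find(next);
    if (it == dependencies.end()) {
      throw std::runtime_error("dependency not found for " + next->name);
    } else if (--it->second == 0) {
      dependencies.erase(it);
      is_ready = true;  // every gradient `next` depends on has arrived
    }
    return is_ready;
  };

  std::cout << check_ready(&a) << "\n";  // 0: `a` still waits for one more input
  std::cout << check_ready(&a) << "\n";  // 1: second contribution arrived, `a` is ready
  std::cout << check_ready(&b) << "\n";  // 1: `b` needed only one contribution
  return 0;
}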

5.2 Handling this node

The second piece of code processes the node according to whether it is ready, for example deciding which queue it should go into: a ReadyQueue, or the not-ready map? The core logic is:

  • If it is ready, it is wrapped in a new NodeTask and placed into the appropriate ReadyQueue for processing.
  • If it is not ready, its input buffer is stored in GraphTask's not_ready_ for later processing. Note that the new NodeTask is created inside the worker thread.
  • How is the ReadyQueue found? By looking at this Node's input_buffer.device(): the new NodeTask is sent to the ReadyQueue corresponding to input_buffer.device().

Let's take a look at how not_ready is operated on based on the value of is_ready.

  • If no element corresponding to next_edge is found in not_ready, then:

    • If exec_info_ is not empty, look up the element corresponding to next_edge in exec_info_; if it is not found, or it indicates that the node should not be executed, skip to the next iteration of the for loop.
    • Construct an input_buffer using next_edge's stream, input_nr, and so on.
    • If is_ready is true, build a NodeTask from this GraphTask, next.function and input_buffer, and push it into the ReadyQueue (obtained via input_buffer.device()). This wakes up the next worker thread.
    • If is_ready is false, this usually means the node has multiple inputs (it is connected to by several nodes, as reflected by num_inputs()) and this is the first of its inputs being processed; next_edge will be needed again later. So the pair (next.function, input_buffer), where input_buffer holds the input needed for next_edge's later execution, is put into not_ready.
  • If an element corresponding to next_edge is found in not_ready, then:

    • Take the element's input_buffer and accumulate the new information into it (what is accumulated this time is another of the node's inputs): input_buffer.add(next.input_nr, std::move(output), opt_parent_stream, opt_next_stream). next.input_nr indicates which input slot of the node (next) this gradient flows into during backward propagation.
    • If is_ready is true, build a NodeTask from this GraphTask, next.function and input_buffer, and push it into the ReadyQueue. This wakes up the next worker thread.
    • Then remove this element from not_ready (its entry in dependencies_ was already erased earlier).

The code is as follows:

if (not_ready_it == not_ready.end()) {
  // Skip functions that aren't supposed to be executed
  if (!exec_info_.empty()) {
    auto it = exec_info_.find(next.function.get());
    if (it == exec_info_.end() || !it->second.should_execute()) {
      continue;
    }
  }
  // No buffers have been allocated for the function
  InputBuffer input_buffer(next.function->num_inputs());

  // Accumulates into buffer
  const auto opt_next_stream = next.function->stream(c10::DeviceType::CUDA);
  input_buffer.add(next.input_nr,
                   std::move(output),
                   opt_parent_stream,
                   opt_next_stream);

  if (is_ready) {
    auto queue = ready_queue(cpu_ready_queue, input_buffer.device());
    queue->push(
        NodeTask(graph_task, next.function, std::move(input_buffer)));
  } else {
    not_ready.emplace(next.function.get(), std::move(input_buffer));
  }
} else {
  // The function already has a buffer
  auto& input_buffer = not_ready_it->second;

  // Accumulates into buffer
  const auto opt_next_stream = next.function->stream(c10::DeviceType::CUDA);
  input_buffer.add(next.input_nr,
                   std::move(output),
                   opt_parent_stream,
                   opt_next_stream);

  if (is_ready) {
    auto queue = ready_queue(cpu_ready_queue, input_buffer.device());
    queue->push(
        NodeTask(graph_task, next.function, std::move(input_buffer)));
    not_ready.erase(not_ready_it);
  }
}
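
For intuition about what "accumulates into buffer" means, here is a minimal sketch (not the real InputBuffer): each slot corresponds to one input_nr, and a second contribution arriving at the same slot is summed in. The plain summation below is a simplification of what InputBuffer::add does for tensors:

#include <iostream>
#include <vector>

// Simplified stand-in for torch::autograd::InputBuffer: one slot per input_nr,
// gradients arriving at the same slot are accumulated by summation.
struct InputBuffer {
  explicit InputBuffer(size_t num_inputs) : slots(num_inputs, 0.0) {}
  void add(size_t input_nr, double grad) { slots[input_nr] += grad; }
  std::vector<double> slots;
};

int main() {
  InputBuffer buffer(2);     // next.function->num_inputs() == 2
  buffer.add(0, 1.5);        // gradient flowing into input slot 0
  buffer.add(1, 0.5);        // gradient flowing into input slot 1
  buffer.add(0, 2.0);        // another path contributes to slot 0: summed in
  std::cout << buffer.slots[0] << " " << buffer.slots[1] << "\n";  // prints 3.5 0.5
  return 0;
}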

The specific logic diagram is as follows:

  1. func points to the Node currently performing the backward computation.
  2. func calls its own apply method to compute and obtains outputs. Suppose there are three outputs; as we traverse them we pick the third one as output.
  3. func's edges are its next_edges_ member variable; as we traverse them we pick the third edge as next.
  4. Use next and GraphTask's dependencies_ to determine whether next is ready.
  5. If it is ready, build output into an input_buffer, then generate a NodeTask and insert it into the corresponding ReadyQueue.
  6. If it is not ready, build output into an input_buffer and put it, together with next, into GraphTask's not_ready_ for later use.
   func
     +
     | 1
     v
+----+------------------+
|         Node          |
|                       |    2
|   apply()  +----------+--> outputs +---> output +---> input_buffer +--+
|                       |                                               |
|   next_edges_  +------+--> ...  3                                     |
|                       |     +                                         |
+-----------------------+     |                                         |
                              v                                         |
                             next                                       |
                              +                                         |
+-----------------------+     | 4                                       | 5
|       GraphTask       |     v                                         v
|                       |                      5                 +------------+
|   dependencies_  +----+--> Ready? +---YES--> push(NodeTask) +-> | ReadyQueue |
|    map<Node*, int>    |      +                                  +------------+
|                       |      |
|                       |      | NO   6
|   not_ready_  <-------+------+----> emplace(next.function, input_buffer)
|    map<Node*,         |
|        InputBuffer>   |
+-----------------------+


0x06 Cleanup operations

In thread_main, if the GraphTask has finished, the following is done.

auto Engine::thread_main(const std::shared_ptr<GraphTask>& graph_task) -> void {

  // ... (the NodeTask processing shown earlier is omitted here)

  // Check if we've completed execution.
  if (local_graph_task->completed()) {
    local_graph_task->mark_as_completed_and_run_post_processing();

    auto base_owner = local_graph_task->owner_;
    // The current worker thread finish the graph_task, but the owning thread
    // of the graph_task might be sleeping on pop() if it does not have work.
    // So we need to send a dummy function task to the owning thread just to
    // ensure that it's not sleeping, so that we can exit the thread_main.
    // If it has work, it might see that graph_task->outstanding_tasks_ == 0
    // before it gets to the task, but it's a no-op anyway.
    //
    // NB: This is not necessary if the current thread is the owning thread.
    if (worker_device != base_owner) {
      // Synchronize outstanding_tasks_ with queue mutex
      std::atomic_thread_fence(std::memory_order_release);
      ready_queue_by_index(local_graph_task->cpu_ready_queue_, base_owner)
          ->push(NodeTask(local_graph_task, nullptr, InputBuffer(0)));
    }
  }
}

Let's look at this cleanup work next. Note that this is the cleanup inside thread_main.

6.1 Determining completion

The following code determines whether this GraphTask has finished, that is, whether there are still NodeTasks belonging to it waiting to run in a ReadyQueue.

outstanding_tasks_ is the number of NodeTasks that still need to be processed; it is used to determine whether the GraphTask still needs to run. Its value is incremented and decremented dynamically:

  • This value is 0 when the GraphTask is created.
  • Whenever a NodeTask is pushed into a ReadyQueue, outstanding_tasks_ is incremented by 1.
  • After the worker thread has executed evaluate_function(task) once, outstanding_tasks_ is decremented by 1.
  • If this number is not zero, the GraphTask still needs to run.
bool GraphTask::completed() {
  // outstanding_tasks_ may be changed in evaluate_function
  return outstanding_tasks_.load() == 0 ||
      (exit_on_error_ && has_error_.load());
}
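
A minimal sketch of the outstanding_tasks_ lifecycle (not the real GraphTask; the struct below is made up for illustration):

#include <atomic>
#include <iostream>

// The counter goes up whenever a NodeTask is pushed to a ReadyQueue and down
// after the worker thread finishes evaluate_function for that task.
struct GraphTaskSketch {
  std::atomic<int> outstanding_tasks_{0};
  bool completed() const { return outstanding_tasks_.load() == 0; }
};

int main() {
  GraphTaskSketch gt;
  ++gt.outstanding_tasks_;                 // push(NodeTask) for the graph root
  ++gt.outstanding_tasks_;                 // push(NodeTask) for a newly ready node
  --gt.outstanding_tasks_;                 // worker finished evaluate_function for the root
  std::cout << gt.completed() << "\n";     // 0: one task is still outstanding
  --gt.outstanding_tasks_;                 // worker finished the second task
  std::cout << gt.completed() << "\n";     // 1: the GraphTask is done
  return 0;
}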

6.2 Follow-up & Notification

mark_as_completed_and_run_post_processing performs the follow-up processing.

It runs the follow-up operation exec_post_processing and then notifies the main thread via future_result_->markCompleted.

void GraphTask::mark_as_completed_and_run_post_processing() {
  // Allow only one thread one attempt to process this logic.
  if (future_completed_.exchange(true)) {
    // Future is already marked complete, or being marked as such.
    // In case the marking complete is only in progress, we add a
    // wait() to guarantee the future is marked complete on exit.
    future_result_->wait();
    return;
  }

  try {
    // Run post processing, before marking the future as complete.
    // Drop lock prior to completing, to avoid holding across callbacks.
    std::unique_lock<std::mutex> lock(mutex_);

    exec_post_processing();
    std::vector<Variable> vars = std::move(captured_vars_);

    // Need to unlock before we call markCompleted to avoid holding locks
    // when the callbacks are called.
    lock.unlock();
    future_result_->markCompleted(std::move(vars));
  } catch (std::exception& e) {
    future_result_->setErrorIfNeeded(std::current_exception());
  }
}

6.2.1 Subsequent Operations

Next, if a callback was previously registered, the callback is called. Stream synchronization is also performed.

void GraphTask::exec_post_processing() {
  if (!not_ready_.empty()) {
    throw std::runtime_error("could not compute gradients for some functions");
  }

  // set the thread_local current_graph_task_ as more callbacks can be installed
  // by existing final callbacks.
  GraphTaskGuard guard(shared_from_this());
  // Lock mutex during each iteration for accessing final_callbacks.size()
  // Unlocking is necessary, because the callback can register
  // more callbacks (or they can be registered from other threads
  // while it's waiting.
  std::unique_lock<std::mutex> cb_lock(final_callbacks_lock_);
  // WARNING: Don't use a range-for loop here because more callbacks may be
  // added in between callback calls, so iterators may become invalidated.
  for (size_t i = 0; i < final_callbacks_.size(); ++i) {
    cb_lock.unlock();
    final_callbacks_[i]();
    cb_lock.lock();
  }

  // Syncs leaf streams with default streams (if necessary)
  // See note "Streaming backwards"
  for (const auto& leaf_stream : leaf_streams) {
    const auto guard = c10::impl::VirtualGuardImpl{c10::DeviceType::CUDA};
    const auto default_stream = guard.getDefaultStream(leaf_stream.device());
    if (leaf_stream != default_stream) {
      auto event = c10::Event{c10::DeviceType::CUDA};
      event.record(leaf_stream);
      default_stream.wait(event);
    }
  }
}

6.2.2 Notify the main thread

Recall that in execute, fut->wait() is used to wait for the GraphTask to complete. Some of the code is omitted below.

auto Engine::execute(const edge_list& roots,
                     const variable_list& inputs,
                     bool keep_graph,
                     bool create_graph,
                     bool accumulate_grad,
                     const edge_list& outputs) -> variable_list {
  
  // Queue the root
  if (skip_dummy_node) {
    execute_with_graph_task(graph_task, graph_root, std::move(input_buffer));
  } else {
    execute_with_graph_task(graph_task, graph_root, InputBuffer(variable_list()));
  }
  auto& fut = graph_task->future_result_;
  fut->wait();
  return fut->value().toTensorVector();
}

The main thread is notified in mark_as_completed_and_run_post_processing with the following code.

future_result_->markCompleted(std::move(vars)); // Notify the main thread
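
The wait/notify relationship between the caller and the worker can be sketched with a plain std::promise / std::future pair; the real engine uses its own Future type (future_result_), which this sketch only approximates:

#include <future>
#include <iostream>
#include <thread>
#include <vector>

int main() {
  // future_result_ in GraphTask plays a role similar to this promise/future pair.
  std::promise<std::vector<double>> promise;
  std::future<std::vector<double>> fut = promise.get_future();

  // The worker thread finishes the backward pass and publishes the result.
  std::thread worker([&promise] {
    std::vector<double> captured_vars{0.5, 1.5};  // the captured gradients
    promise.set_value(std::move(captured_vars));  // analogous to markCompleted(vars)
  });

  // The calling thread blocks here, like fut->wait() in Engine::execute.
  std::vector<double> grads = fut.get();
  std::cout << grads[0] << " " << grads[1] << "\n";  // prints 0.5 1.5

  worker.join();
  return 0;
}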

6.3 Notifying Other threads

If the task comes from another worker thread, that is, worker_device != base_owner, a dummy function task is sent to that worker thread's queue.

local_graph_task represents the graph_task we retrieved from the queue. The outer graph_task represents the overall (possibly reentrant) graph_task that we need to execute.

In thread_main there is a workaround: the current worker thread has completed the graph_task, but at this point the thread that owns the graph_task may be sleeping on pop(). Therefore we need to send a dummy function task to the owning thread to wake it up, so that it can exit thread_main.

This occurs in the case of reentrant back propagation.

// If worker_device is any devices (i.e. CPU, CUDA): this is a re-entrant
//    backward call from that device.
graph_task->owner_ = worker_device;

The specific code is as follows:

// Check if we've completed execution.
if (local_graph_task->completed()) {
  local_graph_task->mark_as_completed_and_run_post_processing();

  auto base_owner = local_graph_task->owner_;
  // If the task comes from another worker thread, notify that thread
  if (worker_device != base_owner) {
    // Synchronize outstanding_tasks_ with queue mutex
    std::atomic_thread_fence(std::memory_order_release);
    ready_queue_by_index(local_graph_task->cpu_ready_queue_, base_owner)
        ->push(NodeTask(local_graph_task, nullptr, InputBuffer(0))); // dummy task
  }
}

The other thread receives the dummy task but does not process it, because its function is nullptr; it then calls local_ready_queue->pop() to read the next task from its own queue.

Details are as follows:

  1. The main thread waits.
  2. If the worker thread finds that GraphTask has ended, it notifies the main thread.
  3. If you need to wake up another thread, insert a NodeTask into its queue.
  4. The corresponding thread takes out the NodeTask and executes it.
+--------------------------------------------------+
| Worker Thread 1                                   |
|                                                   |
|  thread_main{                                     |
|                                                   |
|    mark_as_completed_and_run_post_processing      |      2 markCompleted()
|    {                                   +----------+----------------------+
|    }                                              |                      |
|                                                   |                      |
|    push(NodeTask) +------+                        |                      |
|  }                       |                        |                      |
+--------------------------+------------------------+                      |
                           |                                               |
                           | 3                                             v
                           v               +---------------+      +--------+-------+
                    +------+-----+         |  Main Thread  |      |                |
                    |            |         |               |  1   |                |
                    | ReadyQueue |         |    wait() +---+----> | future_result_ |
                    |            |         |               |      |                |
                    +------+-----+         +---------------+      +----------------+
                           |
                           |  4  pop(NodeTask)
                           v
                    +------+-----------------------+
                    | Worker Thread 2              |
                    |                              |
                    +------------------------------+

At this point, the analysis of backward propagation is complete. Starting from the next article, we will formally move on to PyTorch distributed training.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF Reference

www.zhihu.com/column/gemf…

PyTorch: Talk about the code behind backward

Pytorch Note (Calculation diagram + Autograd)-Node(1)

Explain the network construction in Pytorch

PyTorch’s optimizer

Distribution of PyTorch

PyTorch’s Tensor

PyTorch’s Tensor

PyTorch’s Tensor

PyTorch dynamic diagram (part 2)

PyTorch dynamic diagram (part 1)

PyTorch Internals 5: Implementation of Autograd

A GENTLE INTRODUCTION TO TORCH.AUTOGRAD

PyTorch Learning Notes (12) — Introduction to The Autograd mechanism in PyTorch

The PyTorch Autograd