0x00 Abstract

In this series of articles, we introduce HugeCTR, an industry-oriented recommendation-system training framework optimized for large-scale CTR models, which uses model-parallel embeddings and data-parallel dense networks.

This series draws on the excellent write-up "HugeCTR source code reading"; many thanks to its author.

Other articles in this series are as follows:

NVIDIA HugeCTR, GPU version parameter server — (1)

NVIDIA HugeCTR, GPU version parameter server — (2)

NVIDIA HugeCTR, GPU version parameter server — (3)

NVIDIA HugeCTR, GPU version parameter server — (4)

NVIDIA HugeCTR, GPU version parameter server — (5) Embedding hash table

NVIDIA HugeCTR, GPU version parameter server — (6) Distributed hash table

0x01 Previous review

So far, the logic is as follows:

Now that we know the basic structure of DistributedSlotSparseEmbeddingHash, let's look at its forward propagation.

To better illustrate, we give a concrete example. Assume there are two slots (User ID and Item ID), each slot holds at most four elements, and the dense vector length embedding_vec_size is 8. In the CSR data below, each row is a slot, so the four rows make up two samples, with two rows per sample. Assume batch size = 2, so the two samples are trained together.

*   40,50,10,20
*   30,50,10
*   30,20
*   10
* Will be converted to the form of:
* row offset: 0,4,7,9,10
* value: 40,50,10,20,30,50,10,30,20,10

The first sample includes:

40,50,10,20 # slot 1
30,50,10 # slot 2

The second sample is:

30,20 # slot 1
10 # slot 2
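To make the CSR layout concrete, the following is a minimal CPU-only sketch (not HugeCTR code; the array contents come from the example above) that walks the row-offset array and prints the keys of every slot of every sample:

#include <cstdio>
#include <vector>

int main() {
  const int batch_size = 2, slot_num = 2;
  std::vector<int> row_offset = {0, 4, 7, 9, 10};
  std::vector<int> value = {40, 50, 10, 20, 30, 50, 10, 30, 20, 10};

  for (int sample = 0; sample < batch_size; ++sample) {
    for (int slot = 0; slot < slot_num; ++slot) {
      int row = sample * slot_num + slot;  // one CSR row per slot
      printf("sample %d, slot %d:", sample, slot);
      for (int j = row_offset[row]; j < row_offset[row + 1]; ++j) {
        printf(" %d", value[j]);  // keys belonging to this slot of this sample
      }
      printf("\n");
    }
  }
  return 0;
}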

0x02 General logic

The overall functionality of forward propagation is: the embedding lookup runs on the local GPU and computes the embedding vectors, i.e. user input -> embedding vector. In what follows we only consider the variables whose names contain *train* and ignore those containing *evaluate*; that is, we only look at the training logic.

2.1 Comments & Ideas

Comments in the source code are as follows:

/**
 * All the CUDA kernel functions used by embedding layer are defined in this file, including
 * forward propagation, backward propagation. The functions are defined by propagation type
 * and combiner type(sum or mean) as below:
 *   1) forward
 *        sum: calling forward_sum_kernel()
 *        mean: calling foward_sum_kernel() + forward_scale_kernel()
 *   2) backward:
 *        calculating wgrad:
 *          sum: calling backward_sum_kernel()
 *          mean: calling backward_mean_kernel()
 *        update embedding table: including several steps as below,
 *          step1: expand sample IDs, calling sample_id_expand_kernel()
 *          step2: get value_index by key (will call hash_table->get_insert() in nv_hashtable lib)
 *          step3: sort by value_index (will call cub::DeviceRadixSort::SortPairs in cub lib)
 *          step4: count the number for each unduplicated value_index, calling value_count_kernel()
 *          step5: use optimizer method to compute deltaw, and record corresponding,
 *                 including three types of optimizer:
 *                   Adam: calling opt_adam_kernel()
 *                   Momentum sgd: calling opt_momentum_sgd_kernel()
 *                   Nesterov: calling opt_nesterov_kernel()
 *          step6: update embedding table by deltaw, calling update_kernel()
 */

Read data from input_buffers_ -> look up -> write to output_tensors

  • Read data from input_buffers_. Specifically, filter_keys_per_gpu is called to complete a series of configurations of embedding_data_.
  • The embedding lookup is performed from the hash table of the local GPU by calling functors_.forward_per_gpu.
    • From the characteristics of DistributedSlotSparseEmbeddingHash we know that the keys assigned to the current GPU already reside on that GPU, so no inter-node communication is needed at this point.
    • Here hash_tables_[i], hash_table_value_tensors_[i] and hash_value_index_tensors_[i] form the hash table combination corresponding to the i-th local GPU.
    • embedding_data_.get_value_tensors(is_train)[i] is the input training data extracted from the GPU sparse input mentioned earlier.
    • A local reduction (the combiner) is performed within each slot.
  • Perform reduce-scatter. Each GPU holds a whole batch, but each slot of each sample contains only part of the keys. After the reduce-scatter the data is complete, and each GPU receives its own portion of the complete data.
  • Write the result to output_tensors.

Some specific definitions of member variables need to be recalled.

  • hash_value_index_tensors_: the row indices into the embedding vector table, i.e. row offsets of the lower-dimensional matrix.
    • Note that its type is Tensors2, i.e. std::vector<Tensor2>, so each local GPU has one element in the vector.
    • Its number of rows is related to the number of values.
    • Its content is the hash table value_index (the row index of the embedding).
  • hash_table_value_tensors_: the values of the embedding vector table, i.e. the lower-dimensional matrix itself.
    • Note that its type is Tensors2, i.e. std::vector<Tensor2>, so each local GPU has one element in the vector.
    • Its content is the embedding vectors.
    • An embedding vector is found here using the result stored in hash_value_index_tensors_.

In what follows, we again simplify and ignore the case of multiple workers and multiple GPUs.

2.2 Overall Code

The overall forward propagation code is as follows:

  • Forward propagation runs on multiple local GPUs in parallel; each thread corresponds to one GPU.
  • The filter_keys_per_gpu call completes the EmbeddingData configuration (i is the GPU index) and obtains the input data corresponding to that GPU.
  • forward_per_gpu is called to look up from the hash table of the local GPU.
  • After the reduce-scatter, the data is complete and each GPU holds its own part of the complete data.
  • If combiner=mean, an all_reduce of the row offsets follows.
  • forward_scale then performs the averaging (a sketch of this scaling step follows the code below).
  /** The forward propagation of embedding layer. */
  void forward(bool is_train, int eval_batch = -1) override {
    // Read data from input_buffers_ -> look up -> write to output_tensors

#pragma omp parallel num_threads(embedding_data_.get_resource_manager().get_local_gpu_count())
    { // Local multiple Gpus parallel forward propagation
      // Each thread corresponds to one GPU
      size_t i = omp_get_thread_num();  // Get the thread id
      CudaDeviceContext context(embedding_data_.get_local_gpu(i).get_device_id());
      
      if (embedding_data_.embedding_params_.is_data_parallel) {
        // EmbeddingData is configured, where I is the GPU index
        filter_keys_per_gpu(is_train, i, embedding_data_.get_local_gpu(i).get_global_id(),
                            embedding_data_.get_resource_manager().get_global_gpu_count());
      }
      // Perform lookup from the hashMap of the gpu
      // hash_tables_[i], hash_table_value_tensors_[i] and hash_value_index_tensors_[i] are the hash table data for this GPU
      functors_.forward_per_gpu(embedding_data_.embedding_params_.get_batch_size(is_train),
                                embedding_data_.embedding_params_.slot_num,
                                embedding_data_.embedding_params_.embedding_vec_size, 0, is_train,
                                embedding_data_.get_row_offsets_tensors(is_train)[i],
                                embedding_data_.get_value_tensors(is_train)[i],
                                *embedding_data_.get_nnz_array(is_train)[i], *hash_tables_[i],
                                hash_table_value_tensors_[i], hash_value_index_tensors_[i],
                                embedding_feature_tensors_[i],
                                embedding_data_.get_local_gpu(i).get_stream());
    }

    // do reduce scatter
    // Once done, the data is complete, and a portion of the complete data is assigned to each GPU
    size_t recv_count = embedding_data_.get_batch_size_per_gpu(is_train) *
                        embedding_data_.embedding_params_.slot_num *
                        embedding_data_.embedding_params_.embedding_vec_size;
    functors_.reduce_scatter(recv_count, embedding_feature_tensors_,
                             embedding_data_.get_output_tensors(is_train),
                             embedding_data_.get_resource_manager());

    // scale for combiner=mean after reduction
    if (embedding_data_.embedding_params_.combiner == 1) {
      size_t send_count = embedding_data_.embedding_params_.get_batch_size(is_train) *
                              embedding_data_.embedding_params_.slot_num +
                          1;
      functors_.all_reduce(send_count, embedding_data_.get_row_offsets_tensors(is_train),
                           row_offset_allreduce_tensors_, embedding_data_.get_resource_manager());

      // do average
      functors_.forward_scale(
          embedding_data_.embedding_params_.get_batch_size(is_train),
          embedding_data_.embedding_params_.slot_num,
          embedding_data_.embedding_params_.embedding_vec_size, row_offset_allreduce_tensors_,
          embedding_data_.get_output_tensors(is_train), embedding_data_.get_resource_manager());
    }

    return;
  }
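For combiner=mean, the row offsets are first all-reduced so every GPU knows how many keys each (sample, slot) row contains, and forward_scale then divides each output row by that count. Below is a minimal CPU sketch of only the scaling step (an illustration under that assumption, not the HugeCTR kernel):

#include <vector>

// Divide each (sample, slot) row of the output by its key count (sum -> mean).
void forward_scale_cpu(int batch_size, int slot_num, int embedding_vec_size,
                       const std::vector<int> &row_offset,  // all-reduced, size batch_size * slot_num + 1
                       std::vector<float> &output) {        // size batch_size * slot_num * embedding_vec_size
  for (int row = 0; row < batch_size * slot_num; ++row) {
    int feature_num = row_offset[row + 1] - row_offset[row];  // number of keys in this row
    if (feature_num == 0) continue;                           // avoid dividing by zero
    for (int t = 0; t < embedding_vec_size; ++t) {
      output[row * embedding_vec_size + t] /= feature_num;
    }
  }
}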

The specific process is as follows:

0x03 Configuring Data

Previously, when EmbeddingData was initialized, only its member variable train_keys_ was configured; train_keys_ is the sparse_input, i.e. the sparse tensor in CSR format.

template <typename TypeKey, typename TypeEmbeddingComp>
class EmbeddingData {
 public:
  const Embedding_t embedding_type_;
  SparseEmbeddingHashParams embedding_params_; /**< Sparse embedding hash params. */

  std::vector<std::shared_ptr<GeneralBuffer2<CudaAllocator>>>
      bufs_;                                         /**< The buffer for storing output tensors. */
  Tensors2<TypeEmbeddingComp> train_output_tensors_; /**< The output tensors. */
  Tensors2<TypeEmbeddingComp> evaluate_output_tensors_; /**< The output tensors. */
  Tensors2<TypeKey> train_row_offsets_tensors_; /**< The row_offsets tensors of the input data. */
  Tensors2<TypeKey> train_value_tensors_;       /**< The value tensors of the input data. */
  std::vector<std::shared_ptr<size_t>> train_nnz_array_;
  Tensors2<TypeKey>
      evaluate_row_offsets_tensors_;         /**< The row_offsets tensors of the input data. */
  Tensors2<TypeKey> evaluate_value_tensors_; /**< The value tensors of the input data. */
  std::vector<std::shared_ptr<size_t>> evaluate_nnz_array_;

  std::shared_ptr<ResourceManager> resource_manager_; /**< The GPU device resources. */

  SparseTensors<TypeKey> train_keys_;
  SparseTensors<TypeKey> evaluate_keys_;
  Tensors2<TypeKey> embedding_offsets_;
}

At this point the data is as follows. embedding_offsets_ and train_output_tensors_ are both pre-allocated, and we assume the CSR values are 40,50,10,20,30,50,10,30,20,10 with CSR row offsets 0,4,7,9,10.

3.1 cub functions

We first introduce a few methods from the cub library, NVIDIA's collection of reusable CUDA primitives. Common operations such as array prefix sums can be parallelized with CUDA and therefore run at high speed. The URL is docs.nvidia.com/cuda/cub/in… .

3.1.1 cub::DeviceScan::InclusiveSum

This function calculates the inclusive prefix sum using the GPU.

The following is an example:

 * int  num_items;      // e.g., 7
 * int  *d_in;          // e.g., [8, 6, 7, 5, 3, 0, 9]
 * int  *d_out;         // e.g., [ , , , , , , ]
 * ...
 * // Determine temporary device storage requirements for inclusive prefix sum
 * void     *d_temp_storage = NULL;
 * size_t   temp_storage_bytes = 0;
 * cub::DeviceScan::InclusiveSum(d_temp_storage, temp_storage_bytes, d_in, d_out, num_items);
 *
 * // Allocate temporary storage for inclusive prefix sum
 * cudaMalloc(&d_temp_storage, temp_storage_bytes);
 *
 * // Run inclusive prefix sum
 * cub::DeviceScan::InclusiveSum(d_temp_storage, temp_storage_bytes, d_in, d_out, num_items);
 *
 * // d_out <-- [8, 14, 21, 26, 29, 29, 38]

The function is implemented as:

/**
 * \brief Computes a device-wide inclusive prefix sum.
 *
 * \par
 * - Supports non-commutative sum operators.
 * - Provides "run-to-run" determinism for pseudo-associative reduction
 *   (e.g., addition of floating point types) on the same GPU device.
 *   However, results for pseudo-associative reduction may be inconsistent
 *   from one device to another device of a different compute-capability
 *   because CUB can employ different tile-sizing for different architectures.
 * - \devicestorage
 */
template <
    typename            InputIteratorT,
    typename            OutputIteratorT>
CUB_RUNTIME_FUNCTION
static cudaError_t InclusiveSum(
    void*               d_temp_storage,                 ///< [in] %Device-accessible allocation of temporary storage. When NULL, the required allocation size is written to \p temp_storage_bytes and no work is done.
    size_t&             temp_storage_bytes,             ///< [in,out] Reference to size in bytes of \p d_temp_storage allocation
    InputIteratorT      d_in,                           ///< [in] Pointer to the input sequence of data items
    OutputIteratorT     d_out,                          ///< [out] Pointer to the output sequence of data items
    int                 num_items,                      ///< [in] Total number of input items (i.e., the length of \p d_in)
    cudaStream_t        stream             = 0,              ///< [in] [optional] CUDA stream to launch kernels within. Default is stream0.
    bool                debug_synchronous  = false)     ///< [in] [optional] Whether or not to synchronize the stream after every kernel launch to check for errors. May cause significant slowdown. Default is \p false.
{
    // Signed integer type for global offsets
    typedef int OffsetT;

    return DispatchScan<InputIteratorT, OutputIteratorT, Sum, NullType, OffsetT>::Dispatch(
        d_temp_storage,
        temp_storage_bytes,
        d_in,
        d_out,
        Sum(),
        NullType(),
        num_items,
        stream,
        debug_synchronous);
}

If you want to study further, see nvlabs.github.io/cub/structc… .

3.1.2 cub::DeviceSelect::If

Using the select_op functor, this method splits the corresponding items from d_in into a partitioned sequence d_out. The total number of items copied into the first partition is written to d_num_selected_out.

In this example, the items less than 7 are placed in the first partition, and the number of elements in the partition is 5.

 * // Functor type for selecting values less than some criteria
 * struct LessThan
 * {
 *     int compare;
 *
 *     CUB_RUNTIME_FUNCTION __forceinline__
 *     LessThan(int compare) : compare(compare) {}
 *
 *     CUB_RUNTIME_FUNCTION __forceinline__
 *     bool operator()(const int &a) const { return (a < compare); }
 * };
 *
 * // Declare, allocate, and initialize device-accessible pointers for input and output
 * int      num_items;              // e.g., 8
 * int      *d_in;                  // e.g., [0, 2, 3, 9, 5, 2, 81, 8]
 * int      *d_out;                 // e.g., [ , , , , , , , ]
 * int      *d_num_selected_out;    // e.g., [ ]
 * LessThan select_op(7);
 * ...
 * // Determine temporary device storage requirements
 * void     *d_temp_storage = NULL;
 * size_t   temp_storage_bytes = 0;
 * cub::DeviceSelect::If(d_temp_storage, temp_storage_bytes, d_in, d_out, d_num_selected_out, num_items, select_op);
 *
 * // Allocate temporary storage
 * cudaMalloc(&d_temp_storage, temp_storage_bytes);
 *
 * // Run selection
 * cub::DeviceSelect::If(d_temp_storage, temp_storage_bytes, d_in, d_out, d_num_selected_out, num_items, select_op);
 *
 * // d_out <-- [0, 2, 3, 5, 2, 8, 81, 9]
 * // d_num_selected_out <-- [5]

The function implementation is

    /**
     * \brief Uses the \p select_op functor to split the corresponding items from \p d_in into
     * a partitioned sequence \p d_out. The total number of items copied into the first partition
     * is written to \p d_num_selected_out.
     */
    template <
        typename                    InputIteratorT,
        typename                    OutputIteratorT,
        typename                    NumSelectedIteratorT,
        typename                    SelectOp>
    CUB_RUNTIME_FUNCTION __forceinline__
    static cudaError_t If(
        void*               d_temp_storage,                ///< [in] %Device-accessible allocation of temporary storage. When NULL, the required allocation size is written to \p temp_storage_bytes and no work is done.
        size_t                      &temp_storage_bytes,            ///< [in,out] Reference to size in bytes of \p d_temp_storage allocation
        InputIteratorT              d_in,                           ///< [in] Pointer to the input sequence of data items
        OutputIteratorT             d_out,                          ///< [out] Pointer to the output sequence of partitioned data items
        NumSelectedIteratorT        d_num_selected_out,             ///< [out] Pointer to the output total number of items selected (i.e., the offset of the unselected partition)
        int                         num_items,                      ///< [in] Total number of items to select from
        SelectOp                    select_op,                      ///< [in] Unary selection operator
        cudaStream_t                stream             = 0,             ///< [in] [optional] CUDA stream to launch kernels within. Default is stream0.
        bool                        debug_synchronous  = false)     ///< [in] [optional] Whether or not to synchronize the stream after every kernel launch to check for errors. May cause significant slowdown. Default is \p false.
    {
        typedef int                     OffsetT;         // Signed integer type for global offsets
        typedef NullType*               FlagIterator;   // FlagT iterator type (not used)
        typedef NullType                EqualityOp;     // Equality operator (not used)

        return DispatchSelectIf<InputIteratorT, FlagIterator, OutputIteratorT, NumSelectedIteratorT, SelectOp, EqualityOp, OffsetT, true>::Dispatch(
            d_temp_storage,
            temp_storage_bytes,
            d_in,
            NULL,
            d_out,
            d_num_selected_out,
            select_op,
            EqualityOp(),
            num_items,
            stream,
            debug_synchronous);
    }
};

If you want to study further, see nvlabs.github.io/cub/structc… .

3.1.3 Temporary Storage

The cub methods above need a temporary storage area, so DistributedSlotSparseEmbeddingHash uses a DistributedFilterKeyStorage to serve this purpose.

template <typename TypeHashKey, typename TypeEmbeddingComp>
class DistributedSlotSparseEmbeddingHash : public IEmbedding {
  using NvHashTable = HashTable<TypeHashKey, size_t>;

  std::vector<DistributedFilterKeyStorage<TypeHashKey>> filter_keys_storage_;
}

DistributedFilterKeyStorage is defined as follows:

template <typename TypeHashKey>
struct DistributedFilterKeyStorage {
  Tensor2<size_t> value_select_num;
  Tensor2<void> temp_value_select_storage;

  Tensor2<TypeHashKey> rowoffset_select;
  Tensor2<void> temp_rowoffset_select_scan_storage;

  DistributedFilterKeyStorage(const std::shared_ptr<GeneralBuffer2<CudaAllocator>> &buf,
                              size_t max_nnz, size_t rowoffset_count, size_t global_id,
                              size_t global_num);
};

The specific construction method is as follows:

template <typename TypeHashKey>
DistributedFilterKeyStorage<TypeHashKey>::DistributedFilterKeyStorage(
    const std::shared_ptr<GeneralBuffer2<CudaAllocator>> &buf, size_t max_nnz,
    size_t rowoffset_count, size_t global_id, size_t global_num) {
  buf->reserve({1}, &value_select_num);
  // select value
  {
    distributed_embedding_kernels::HashOp<TypeHashKey> select_op{global_id, global_num};
    size_t size_in_bytes = 0;
    cub::DeviceSelect::If(nullptr, size_in_bytes, (TypeHashKey *)nullptr, (TypeHashKey *)nullptr,
                          (size_t *)nullptr, max_nnz, select_op);
    buf->reserve({size_in_bytes}, &temp_value_select_storage);
  }

  // count rowoffset
  {
    size_t size_in_bytes = 0;
    cub::DeviceScan::InclusiveSum(nullptr, size_in_bytes, (TypeHashKey *)nullptr,
                                  (TypeHashKey *)nullptr, rowoffset_count);
    buf->reserve({size_in_bytes}, &temp_rowoffset_select_scan_storage);
  }
  buf->reserve({rowoffset_count}, &rowoffset_select);
}
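The constructor above follows the standard two-phase cub calling convention: calling the cub API with a null temporary-storage pointer only computes the required byte count, which is then reserved on the buffer, and the real call happens later in filter_keys_per_gpu using that reserved storage. A minimal sketch of the same pattern with a plain cudaMalloc (an illustration, not HugeCTR code) looks like this:

#include <cub/cub.cuh>

// Two-phase cub pattern: first query the temporary-storage size, then allocate, then run.
void inclusive_sum(const int *d_in, int *d_out, int num_items, cudaStream_t stream) {
  void *d_temp_storage = nullptr;
  size_t temp_storage_bytes = 0;
  // Phase 1: d_temp_storage == nullptr, so only temp_storage_bytes is written
  cub::DeviceScan::InclusiveSum(d_temp_storage, temp_storage_bytes, d_in, d_out, num_items, stream);
  cudaMalloc(&d_temp_storage, temp_storage_bytes);
  // Phase 2: the actual prefix sum
  cub::DeviceScan::InclusiveSum(d_temp_storage, temp_storage_bytes, d_in, d_out, num_items, stream);
  cudaFree(d_temp_storage);
}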

3.2 Configuring Data

In forward propagation, the first step is to use train_keys_ inside filter_keys_per_gpu to configure the other member variables; the goal is to obtain the input data that corresponds to this GPU's DistributedSlotSparseEmbeddingHash. Recall that the EmbeddingData members get_output_tensors, get_input_keys, get_row_offsets_tensors, get_value_tensors and get_nnz_array all return references, which means that most member variables can be modified directly. The specific configuration code is as follows:

template <typename TypeHashKey, typename TypeEmbeddingComp>
void DistributedSlotSparseEmbeddingHash<TypeHashKey, TypeEmbeddingComp>::filter_keys_per_gpu(
    bool is_train, size_t id, size_t global_id, size_t global_num) {
  
  // Configure the current GPU
  
  // Get train_keys_ and use it to configure row offsets and values
  const SparseTensor<TypeHashKey> &all_gather_key = embedding_data_.get_input_keys(is_train)[id];
  // rowoffset_tensor refers to embedding_data_.train_row_offsets_tensors_
  Tensor2<TypeHashKey> rowoffset_tensor = embedding_data_.get_row_offsets_tensors(is_train)[id];
  // value_tensor refers to embedding_data_.train_value_tensors_
  Tensor2<TypeHashKey> value_tensor = embedding_data_.get_value_tensors(is_train)[id];
  std::shared_ptr<size_t> nnz_ptr = embedding_data_.get_nnz_array(is_train)[id];
  auto &filter_keys_storage = filter_keys_storage_[id];
  auto &stream = embedding_data_.get_local_gpu(id).get_stream();
  size_t batch_size = embedding_data_.embedding_params_.get_batch_size(is_train);
  size_t slot_num = (all_gather_key.rowoffset_count() - 1) / batch_size;
  size_t rowoffset_num = batch_size * slot_num + 1;
  size_t rowoffset_num_without_zero = rowoffset_num - 1;

  // select value
  {
    distributed_embedding_kernels::HashOp<TypeHashKey> select_op{global_id, global_num};

    size_t size_in_bytes = filter_keys_storage.temp_value_select_storage.get_size_in_bytes();
    // Configure embedding_data_.train_value_tensors_
    cub::DeviceSelect::If(filter_keys_storage.temp_value_select_storage.get_ptr(), size_in_bytes,
                          all_gather_key.get_value_ptr(), value_tensor.get_ptr(),
                          filter_keys_storage.value_select_num.get_ptr(), all_gather_key.nnz(),
                          select_op, stream);
  }

  // select rowoffset
  {
    cudaMemsetAsync(filter_keys_storage.rowoffset_select.get_ptr(), 0,
                    filter_keys_storage.rowoffset_select.get_size_in_bytes(), stream);
    {
      constexpr int block_size = 512;
      int grid_size = (rowoffset_num_without_zero - 1) / block_size + 1;
      distributed_embedding_kernels::select_rowoffset<<<grid_size, block_size, 0, stream>>>(
          all_gather_key.get_rowoffset_ptr(), rowoffset_num_without_zero,
          all_gather_key.get_value_ptr(), filter_keys_storage.rowoffset_select.get_ptr(), global_id,
          global_num);
    }
    {
      size_t size_in_bytes =
          filter_keys_storage.temp_rowoffset_select_scan_storage.get_size_in_bytes();
      // Configure rowoffset and copy it to rowoffset_tensor
      cub::DeviceScan::InclusiveSum(
          filter_keys_storage.temp_rowoffset_select_scan_storage.get_ptr(), size_in_bytes,
          filter_keys_storage.rowoffset_select.get_ptr(), rowoffset_tensor.get_ptr(), rowoffset_num,
          stream);
    }

    // select nnz
    // Just copy it
    cudaMemcpyAsync(nnz_ptr.get(), filter_keys_storage.value_select_num.get_ptr(), sizeof(size_t),
                    cudaMemcpyDeviceToHost, stream);
  }
}

train_value_tensors_ corresponds to the CSR values and train_row_offsets_tensors_ corresponds to the CSR row offsets; both are copied from the SparseTensor into EmbeddingData.
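Conceptually, filter_keys_per_gpu keeps only the keys that belong to the current GPU and rebuilds the per-GPU row offsets with a prefix sum. Below is a CPU-only sketch of this filtering (an illustration, not HugeCTR code; it assumes that HashOp assigns a key to a GPU by key % global_num == global_id, which is how DistributedSlotSparseEmbeddingHash distributes keys):

#include <cstddef>
#include <vector>

// CPU analogue of filter_keys_per_gpu for one GPU.
void filter_keys_cpu(const std::vector<long long> &all_values,      // CSR values of the whole batch
                     const std::vector<long long> &all_row_offset,  // CSR row offsets of the whole batch
                     size_t global_id, size_t global_num,
                     std::vector<long long> &values,      // selected values (cub::DeviceSelect::If)
                     std::vector<long long> &row_offset,  // per-GPU row offsets (InclusiveSum)
                     size_t &nnz) {
  values.clear();
  row_offset.assign(1, 0);  // row offsets always start with 0
  for (size_t row = 0; row + 1 < all_row_offset.size(); ++row) {
    long long count = 0;
    for (long long j = all_row_offset[row]; j < all_row_offset[row + 1]; ++j) {
      if (static_cast<size_t>(all_values[j]) % global_num == global_id) {
        values.push_back(all_values[j]);  // this key belongs to the current GPU
        ++count;
      }
    }
    row_offset.push_back(row_offset.back() + count);  // running prefix sum of per-row counts
  }
  nnz = values.size();
}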

Combined with our example, the final forward propagation input training data is:

0x04 Lookup operation

This part completes the embedding table lookup. EmbeddingData has now received its various configurations, i.e. the sparse input, so it can be used as the keys to obtain the embedding vectors. This is done inside forward_per_gpu.

  functors_.forward_per_gpu(embedding_data_.embedding_params_.get_batch_size(is_train),
                            embedding_data_.embedding_params_.slot_num,
                            embedding_data_.embedding_params_.embedding_vec_size, 0, is_train,
                            embedding_data_.get_row_offsets_tensors(is_train)[i],
                            embedding_data_.get_value_tensors(is_train)[i],
                            *embedding_data_.get_nnz_array(is_train)[i], *hash_tables_[i],
                            hash_table_value_tensors_[i], hash_value_index_tensors_[i],
                            embedding_feature_tensors_[i],
                            embedding_data_.get_local_gpu(i).get_stream());
}

4.1 Data Extraction

Methods such as get_row_offsets_tensors extract the input data from embedding_data_. The code below corresponds to reading data from input_buffers_: the input is read from the GPU sparse input (CSR data) and used as keys for the subsequent hash table lookup:

Tensors2<TypeKey>& get_value_tensors(bool is_train) {
  if (is_train) {
    return train_value_tensors_;
  } else {
    return evaluate_value_tensors_;
  }
}

The code that reads the row offsets from the CSR is as follows:

Tensors2<TypeKey>& get_row_offsets_tensors(bool is_train) {
  if (is_train) {
    return train_row_offsets_tensors_;
  } else {
    return evaluate_row_offsets_tensors_;
  }
}

Since the input space has tens of millions of possible keys, of which only a few hundred may actually appear, the hash table performs a first mapping of this huge key space and saves a lot of memory.

4.2 Lookup

Here’s where the code comes in, using a hash table to get the corresponding embedding vector from the input CSR.

forward_per_gpu is divided into two parts: the lookup and the intra-slot reduction.

4.2.1 Search operator

The forward_per_gpu function performs the actual lookup. Its usage can be seen from its comments, which we analyzed earlier.

@param row_offset row_offset (CSR format of input sparse tensors)
@param hash_key value (CSR format of input sparse tensors)
@param nnz non-zero feature number per batch
@param hash_table hash table, pairs of <key, value_index>
@param hash_table_value hash table value, which represents embedding vector
@param hash_value_index hash table value_index(row index of embedding)

The parameters here are all references, and you can modify the external data.

  • First, the hash_key values (the CSR values of the input sparse tensors) are used to call get_insert to look up the hash table. The result is hash_value_index, i.e. the row index into the low-dimensional embedding table. This part of the code revolves around hash_table.get_insert. Note that the return value of get_insert is not used directly; rather, the hash_key values are inserted into the hash table to obtain a mapping, and the CSR row offsets are used to walk through them.

  • hash_table.get_insert returns the existing value if the key is found in the hash table's internal data structure; otherwise it inserts a new, monotonically increasing value. Either way the result is written into hash_value_index.

  • The embedding vectors are then read from hash_table_value using hash_value_index as the index and reduced within each slot. This part of the code is forward_sum and forward_mean.

So when are hash_table_value_tensors_[i] and hash_value_index_tensors_[i] set? In fact, this is done in forward_per_gpu. The specific logic is shown in the following figure:

The specific code is:

/**
 * forward propagation on each GPU for LocalizedSlotSparseEmbeddingHash
 * @param batch_size batch size for the current mini-batch computation.
 * @param slot_num the number of slots for current GPU
 * @param embedding_vec_size embedding vector size.
 * @param combiner 0-sum; 1-mean
 * @param row_offset row_offset (CSR format of input sparse tensors)
 * @param hash_key value (CSR format of input sparse tensors)
 * @param nnz non-zero feature number per batch
 * @param hash_table hash table, pairs of <key, value_index>
 * @param hash_table_value hash table value, which represents embedding vector
 * @param hash_value_index hash table value_index(row index of embedding)
 * @param embedding_feature embedding feature (output)
 * @param stream cuda stream
 */
template <typename TypeHashKey, typename TypeEmbeddingComp>
void SparseEmbeddingFunctors::forward_per_gpu(
    size_t batch_size, size_t slot_num, size_t embedding_vec_size, int combiner, bool train,
    const Tensor2<TypeHashKey> &row_offset, const Tensor2<TypeHashKey> &hash_key, size_t nnz,
    HashTable<TypeHashKey, size_t> &hash_table, const Tensor2<float> &hash_table_value,
    Tensor2<size_t> &hash_value_index, Tensor2<TypeEmbeddingComp> &embedding_feature,
    cudaStream_t stream) {
  try {
    if (train) { // The training will come here
      // The lookup result (the row indices) is written into hash_value_index
      // Insert the hash_key value into the hash table using CSR row offset
      hash_table.get_insert(hash_key.get_ptr(), hash_value_index.get_ptr(), nnz, stream);
    } else {
      hash_table.get_mark(hash_key.get_ptr(), hash_value_index.get_ptr(), nnz, stream);
    }

    // do sum reduction
    if (combiner == 0) { // 0-sum; 1-mean
      // Then use hash_value_index to get the value from hash_table_value
      forward_sum(batch_size, slot_num, embedding_vec_size, row_offset.get_ptr(),
                  hash_value_index.get_ptr(), hash_table_value.get_ptr(),
                  embedding_feature.get_ptr(), stream);
    } else if (combiner == 1) {
      // Then use hash_value_index to get the value from hash_table_value
      forward_mean(batch_size, slot_num, embedding_vec_size, row_offset.get_ptr(),
                   hash_value_index.get_ptr(), hash_table_value.get_ptr(),
                   embedding_feature.get_ptr(), stream);
    } else {
      CK_THROW_(Error_t::WrongInput, "Invalid combiner type ");
    }
  } catch (const std::runtime_error &rt_err) {
    std::cerr << rt_err.what() << std::endl;
    throw;
  }

  return;
}

Internally, the operator is divided into get_insert, which handles the hash table, and the combiner processing. Let's look at each in turn.

4.2.2 get_insert

The hash table is accessed through a combined get-and-insert: there is no need to give the hash table initial values before training; instead, get_insert dynamically inserts keys into the hash table during training.

Let’s remember how it works.

Let’s say there are 100 million words, and 40 is the 40th word. If you want to indicate that 10,30,40,50,20 are valid in 100 million words, the most common way is to make a 100 million length array and set the five positions 40,50,20,30,10 to 1 and the rest to 0. The corresponding embedding matrix is also a high dimensional matrix, such as a 100 million x 64 dimensional matrix.

If you want to save space, you can build a small data structure (a low-dimensional matrix) to store only these meaningful values, and create a hash function m_hf that converts from the high-dimensional index space to the low-dimensional one, i.e. 10 --> ?, 20 --> ?, and so on.

If we pick the tens digit as the key, then for our example:

m_hf(10) = 1
m_hf(20) = 2
m_hf(30) = 3
m_hf(40) = 4
m_hf(50) = 5

1, 2, 3, 4 and 5 are the internal hash values (hash_value in the code below), and the corresponding internal storage array is hashtbl_values. Because the table is divided into buckets, inside the hash table the values are placed into hashtbl_values (we simplify here and assume hashtbl_values[i] = i).

hashtbl_values[1] = 1, hashtbl_values[2] = 2, hashtbl_values[3] = 3

The above describes the hash table itself. Returning to DistributedSlotSparseEmbeddingHash: the values 1, 2, 3 (the array contents, not the array indices; with our simplification they happen to be equal) are exactly what DistributedSlotSparseEmbeddingHash wants for 10, 20, 30, namely that the data of 10 sits in the first row of the low-dimensional embedding table, the data of 20 in the second row, and so on; they are the row offsets of the low-dimensional matrix. That is, the contents of hash_value_index are [1,2,3,4,5], which are the indices of the original input keys 10, 20, 30, 40 and 50 in hash_table_value. Therefore 10 corresponds to hash_table_value[1], 20 to hash_table_value[2], and so on.
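As a conceptual illustration (not the GPU code, and note that the real table assigns indices in insertion order through an atomic counter rather than by the tens digit used in the simplified example above), the behaviour of get_insert can be mimicked on the CPU with an unordered_map:

#include <cstdio>
#include <unordered_map>
#include <vector>

// Conceptual CPU analogue of get_insert: look each key up; if it is missing,
// assign it the next free row of the low-dimensional embedding table.
int main() {
  std::unordered_map<long long, size_t> table;  // pairs of <key, value_index>
  size_t counter = 0;                           // plays the role of atomicAdd(value_counter, 1)

  std::vector<long long> keys = {40, 50, 10, 20, 30, 50, 10, 30, 20, 10};  // CSR values
  std::vector<size_t> hash_value_index;

  for (auto k : keys) {
    auto it = table.find(k);
    if (it == table.end()) it = table.emplace(k, counter++).first;  // insert on miss
    hash_value_index.push_back(it->second);  // row index into hash_table_value
  }

  for (auto v : hash_value_index) printf("%zu ", v);  // prints: 0 1 2 3 4 1 2 4 3 2
  printf("\n");
  return 0;
}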

To return to the hash table, the get_insert code for NvHashTable is as follows.

template <typename KeyType, typename ValType>
void NvHashTable<KeyType, ValType>::get_insert(const void *d_keys, void *d_vals, size_t len, cudaStream_t stream) {
    const KeyType *_d_keys = reinterpret_cast<const KeyType*>(d_keys);
    ValType *_d_vals = reinterpret_cast<ValType*>(d_vals);
    return hashtable_.get_insert(_d_keys, _d_vals, len, stream);
}

The get_insert of HashTable is located in sparse_operation_kit/kit_cc/kit_cc_infra/src/hashtable/nv_hashtable.cu. Here the values are extracted through a parallel operation on the GPU.

template <typename Table>
__global__ void get_insert_kernel(Table* table, const typename Table::key_type* const keys,
                                  typename Table::mapped_type* const vals, size_t len,
                                  size_t* d_counter) {
  ReplaceOp<typename Table::mapped_type> op;
  const size_t i = blockIdx.x * blockDim.x + threadIdx.x;
  if (i < len) {
    auto it = table->get_insert(keys[i], op, d_counter);
    vals[i] = it->second;  // The external hash_value_index is set here
  }
}

template <typename KeyType, typename ValType>
void HashTable<KeyType, ValType>::get_insert(const KeyType* d_keys, ValType* d_vals, size_t len,
                                             cudaStream_t stream) {
  if (len == 0) {
    return;
  }
  const int grid_size = (len - 1) / BLOCK_SIZE_ + 1;
  get_insert_kernel<<<grid_size, BLOCK_SIZE_, 0, stream>>>(container_, d_keys, d_vals, len,
                                                           d_counter_);
}

Finally we arrive at HugeCTR/include/hashtable/cudf/concurrent_unordered_map.cuh. If the key has no value yet, a new value is generated.

// __forceinline__ means to compile as an inline function
// __host__ __device__ indicates that this function is compiled for both hosts and devices
template <typename aggregation_type, typename counter_type, class comparison_type = key_equal,
          typename hash_value_type = typename Hasher::result_type>
__forceinline__ __device__ iterator get_insert(const key_type& k, aggregation_type op,
                                               counter_type* value_counter,
                                               comparison_type keys_equal = key_equal(),
                                               bool precomputed_hash = false,
                                               hash_value_type precomputed_hash_value = 0) {
  const size_type hashtbl_size = m_hashtbl_size;
  value_type* hashtbl_values = m_hashtbl_values;

  hash_value_type hash_value{0};

  // If a precomputed hash value has been passed in, then use it to determine
  // the write location of the new key
  if (true == precomputed_hash) {
    hash_value = precomputed_hash_value;
  }
  // Otherwise, compute the hash value from the new key
  else {
    hash_value = m_hf(k);  // Use the key k (e.g. 3356) to compute a hash_value
  }

  size_type current_index = hash_value % hashtbl_size; // Find the location
  value_type* current_hash_bucket = &(hashtbl_values[current_index]); // Find the bucket at that location
  const key_type insert_key = k;
  bool insert_success = false;
  size_type counter = 0;

  while (false == insert_success) {
    // Situation %5: No slot: All slot in the hashtable is occupied by other key, both get and
    // insert fail. Return empty iterator
    // The hash table is full
    if (counter++ >= hashtbl_size) {
      return end();
    }

    key_type& existing_key = current_hash_bucket->first;  // This is the table key
    volatile mapped_type& existing_value = current_hash_bucket->second; // This is the table value

    // If existing_key == unused_key, the current hash position is empty, so existing_key is updated from atomicCAS to insert_key.
    // If existing_key == insert_key, this position is already inserted into the key.
    // In either case, the atomic aggregation of existing_value and insert_value is performed. Since the hash table is initialized with the identifier value of the aggregate operation, it is safe to perform this operation while existing_value still has its initial value
    // Try and set the existing_key for the current hash bucket to insert_key
    const key_type old_key = atomicCAS(&existing_key, unused_key, insert_key);

    // If old_key == unused_key, the current hash bucket was empty
    // and existing_key was updated to insert_key by the atomicCAS.
    // If old_key == insert_key, this key has already been inserted.
    // In either case, perform the atomic aggregation of existing_value and insert_value
    // Because the hash table is initialized with the identity value of the aggregation
    // operation, it is safe to perform the operation when the existing_value still
    // has its initial value
    // TODO: Use template specialization to make use of native atomic functions
    // TODO: How to handle data types less than 32 bits?

    // Situation #1: Empty slot: this key never exist in the table, ready to insert.
    if (keys_equal(unused_key, old_key)) { // If no hash key is found
      existing_value = (mapped_type)(atomicAdd(value_counter, 1)); // Hash value is incremented
      break;

    }  // Situation #2+#3: Target slot: This slot is the slot for this key
    else if (keys_equal(insert_key, old_key)) {
      while (existing_value == m_unused_element) {
        // Situation #2: This slot is inserting by another CUDA thread and the value is not yet
        // ready, just wait
      }
      // Situation #3: This slot is already ready, get successfully and return (iterator of) the
      // value
      break;
    }
    // Situation 4: Wrong slot: This slot is occupied by other key, get fail, do nothing and
    // linear probing to next slot.

    // This position is already occupied by another key and can only be traversed backwards
    current_index = (current_index + 1) % hashtbl_size;
    current_hash_bucket = &(hashtbl_values[current_index]);
  }

  return iterator(m_hashtbl_values, m_hashtbl_values + hashtbl_size, current_hash_bucket);
}

The specific logical evolution is as follows:

4.3 combiner

Once we have multiple vectors we need to aggregate them. Because this step is fairly involved, we pull it out separately and discuss it at the same level as the lookup; this is only for presentation, so don't be misled.

4.3.1 Why aggregation

In the CTR field, multiple embedding vectors are merged into one vector, namely pooling. Let’s say the user reads 3 art books and 2 sports books, so reading habits = 3 * art + 2 * sports. Such aggregation often uses weighted pooling rather than concat. Although concat is better, pooling is faster, and the advantage of pooling is that a new tensor of the same length can be generated even if the vectors are of different lengths. For example, the embeddingSize of a feature is 10, and now the number of all fields is 50, five of which are features of sequence form (40 for the upper limit of sequence length). There are two ways you can handle this:

  • Mean /sum pooling: The number of parameters is 10 * 50 = 500

  • Concat: The number of embedding parameters is 10*(50-5) + 40 * 10* 5 = 2450

If concat is used, the number of embedding parameters directly increases by about 4 times. The largest number of embedding parameters in the actual CTR model is generally the layer of embedding -> MLP, so concat will directly slow down the inference on the line.

4.3.2 Design criteria

Recall the design principle mentioned earlier: an embedding table can be split into multiple slots (feature fields), and different embedding layer implementations can be chosen to get the best performance for different embeddings.

  • LocalizedSlotEmbeddingHash: features in the same slot (field) are stored on one GPU, which is why it is called a "localized" slot; according to the slot index, different slots may be stored on different GPUs.

  • DistributedSlotEmbeddingHash: all features, whatever field/slot they belong to and whatever the slot index is, are distributed to different GPUs according to the feature index. This means that features in the same slot may be stored on different GPUs, which is why it is called a "distributed slot". Since a global reduction is required, DistributedSlotEmbedding is suitable when the embedding is larger than the memory of a single GPU, at the cost of more memory exchange between GPUs.

It is important to note that the difference between LocalizedSlotEmbeddingHash and DistributedSlotEmbeddingHash is whether the features of the same slot (field) are stored on the same GPU. For example, suppose there are 2 GPUs and 4 slots:

  • Local mode: GPU0 stores slot0 and slot1, GPU1 stores slot2 and slot3.
  • Distribute mode: each GPU stores part of the parameters of every slot, and a hash function decides which GPU a given parameter is assigned to.

During the embedding lookup, the sparse feature inputs belonging to the same slot are converted to the corresponding dense embedding vectors and then reduced into a single embedding vector. The embedding vectors from different slots are then concatenated. This is the combiner operation mentioned earlier.

4.3.3 Combiner code

Now that we have the embedding table index, we need to see how we can get the embedding vector and simply manipulate it.

// do sum reduction
if (combiner == 0) { // 0-sum; 1-mean Here is the combiner type
  // Then use hash_value_index to get the value from hash_table_value
  forward_sum(batch_size, slot_num, embedding_vec_size, row_offset.get_ptr(),
              hash_value_index.get_ptr(), hash_table_value.get_ptr(),
              embedding_feature.get_ptr(), stream);
} else if (combiner == 1) {
  // Then use hash_value_index to get the value from hash_table_value
  forward_mean(batch_size, slot_num, embedding_vec_size, row_offset.get_ptr(),
               hash_value_index.get_ptr(), hash_table_value.get_ptr(),
               embedding_feature.get_ptr(), stream);
}

This is done by forward_sum and forward_mean, so let’s use forward_sum.

// do sum reduction
template <typename TypeHashKey>
void forward_sum(size_t batch_size, size_t slot_num, size_t embedding_vec_size,
                 const TypeHashKey *row_offset, const size_t *hash_value_index,
                 const float *hash_table_value, __half *embedding_feature, cudaStream_t stream) {
  
  const size_t grid_size = batch_size;  // each block corresponds to a sample
  if (embedding_vec_size % 2 == 0) {
    const size_t block_size = embedding_vec_size / 2;
    forward_sum_align2_kernel<<<grid_size, block_size, 0, stream>>>(
        batch_size, slot_num, embedding_vec_size / 2, row_offset, hash_value_index,
        hash_table_value, embedding_feature);
  } else {
    const size_t block_size =
        embedding_vec_size;  // each thread corresponds to one element in an embedding vector
    forward_sum_kernel<<<grid_size, block_size, 0, stream>>>(
        batch_size, slot_num, embedding_vec_size, row_offset, hash_value_index, hash_table_value,
        embedding_feature);
  }
}

There are two comments to note in the code above:

  • grid_size = batch_size; // each block corresponds to a sample
  • const size_t block_size = embedding_vec_size; // each thread corresponds to one element in an embedding vector
4.3.3.1 example

Recall our example:

*   40,50,10,20
*   30,50,10
*   30,20
*   10
* Will be converted to the form of:
* row offset: 0,4,7,9,10
* value: 40,50,10,20,30,50,10,30,20,10

The first sample includes:

40,50,10,20 # slot 1
30,50,10 # slot 2

The second sample is:

30,20 # slot 1
10 # slot 2

So there should be 10 dense vectors in total; for example, 40 has its own dense vector, and 50 has its own dense vector.

How do we know which row of the low-dimensional embedding table 40 corresponds to? Through the hash table. If the hash function picks the tens digit as the key, then:

m_hf(40) = 4

So, we know that 40 should be embedded in row 4 of the lower-dimensional table (we’ve simplified the hash table).

4.3.3.2 Key points

The code of forward_sum_kernel is shown below; it is a bit puzzling and needs to be analyzed carefully together with the comments.

The first point is to recall the use of hash_value_index_tensors_ :

Careful readers may wonder: if the hash table can already map from a high-dimensional offset to a low-dimensional one, isn't hash_value_index_tensors_ redundant? Here is the explanation:

  • In fact, because of decoupling, hash_value_index_tensors_ should not need to know how far the hash table compresses the high-dimensional matrix into the low-dimensional one, so its size should not change with that.
  • Therefore, the size of hash_value_index_tensors_ is fixed at batch_size * nnz_per_slot, which can be thought of as the number of elements in the CSR. hash_value_index_tensors_ records, for each CSR element, its offset in the low-dimensional matrix, and its entries correspond one-to-one with the elements of the CSR.
  • Therefore, during the final embedding table lookup, the CSR row offsets are used to locate each element of the CSR, which also gives its index into hash_value_index_tensors_ and hence its offset in the low-dimensional matrix.
  • For our example, the value of hash_value_index_tensors_ is 4,5,1,2,3,5,1,3,2,1.

The remaining points are:

  • bid is the sample index.
  • tid is the index of an element within the embedding vector; each thread processes one element of the embedding vector.
  • hash_value_index is a pointer to the table of low-dimensional row offsets.
    • This table is hash_value_index_tensors_.
  • row_offset is the CSR offset array, for example 0,4,7,9,10; for the second sample the relevant row offsets are 7 and 9.
  • hash_table_value can be thought of as an array in which the low-dimensional embedding matrix is stored; hash_table_value[value_index * embedding_vec_size] is the start of the corresponding dense vector.
4.3.3.3 Annotated code

// forward kernel funcion: for both combiner=sum and combiner=mean
template <typename TypeKey, typename TypeEmbeddingComp>
__global__ void forward_sum_kernel(int batch_size, int slot_num, int embedding_vec_size,
                                   const TypeKey *row_offset, const size_t *hash_value_index,
                                   const float *hash_table_value,
                                   TypeEmbeddingComp *embedding_feature) {
  
  // if the bid is 1, it is the second sample
  int bid = blockIdx.x;   // each block corresponding to one sample
  // Tid is ultimately the number of elements in the embedded vector. Each thread processes one element of the embedded vector
  int tid = threadIdx.x;  // each thread corresponding to one element in the embedding vector

  if (bid < batch_size && tid < embedding_vec_size) { // batch_size = 2
    for (int i = 0; i < slot_num; i++) { // slot_num = 2
      // find the position of the current row in the row offset, such as 2 or 3, from 0,4,7,9,10
      int feature_row_index = bid * slot_num + i; // feature_row_index has the range 2,3
      // Get the element offset of the current row within the CSR, row 0, row 1 is the first sample, row 2, and row 3 is the second sample
      TypeKey value_offset = row_offset[feature_row_index]; // Line 2 has a value_offset of 7 and line 3 of 9
      // The number of elements in each row is 9-7=2 and 10-9=1
      TypeKey feature_num = 
          row_offset[feature_row_index + 1] - value_offset;  // number of hash values in one slot

      float sum = 0.0 f;

      // reduce in a slot
      for (int j = 0; j < feature_num; j++) { // The number of elements in the row, row 2 is 2, row 3 is 1
        // Take hash_value_index[value_offset + j]; for row 2 these values are 3 and 2
        size_t value_index = hash_value_index[value_offset + j];
        // Fetch the values of the 3rd and 2nd elements of hash_table_value
        // value_index is the starting position of the CSR user ID in the hash_table_value. That is, hash_value_index records the CSR user ID in the hash_table_value line
        // hash_table_value[value_index * embedding_vec_size] is the dense vector corresponding to the CSR user ID
        // hash_table_value[value_index * embedding_vec_size + tid] is the tid-th element of the dense vector corresponding to the CSR user ID
        sum += (value_index != std::numeric_limits<size_t>::max())
                   ? hash_table_value[value_index * embedding_vec_size + tid]
                   : 0.0f;
      }

      // store the embedding vector
      // there are 2,3 rows of slots for a sample. The two slots of a sample are placed together in order. The final output for each element of the dense vector is the sum of all the elements of the dense vector in the sample
      embedding_feature[feature_row_index * embedding_vec_size + tid] =
          TypeConvertFunc<TypeEmbeddingComp, float>::convert(sum);
    }
  }
}

4.3.3.4 Parallel Operations

For parallel operations, note the following:

  • bid is the sample index.

  • tid is the index of an element within the embedding vector; each thread processes one element of the embedding vector.

  • hash_table_value[value_index * embedding_vec_size] is the start of the dense vector corresponding to the CSR key.

  • hash_table_value[value_index * embedding_vec_size + tid] is the tid-th element of the dense vector corresponding to the CSR key.

As said before, there are 10 elements in total (40,50,10,20,30,50,10,30,20,10), so the two samples correspond to 10 dense vectors. On the GPU, however, embedding_vec_size threads run in parallel, the reduction is performed within each slot, and the result is stored in embedding_feature. The GPU parallelism generates all elements of a dense output vector simultaneously: each sample produces slot_num dense vectors, and each element of such a dense vector is computed from the corresponding elements of the sample's keys.

For example, the first sample is:

40,50,10,20 # slot 1
30,50,10 # slot 2
  • Slot 1 should output a dense vector for 40 + a dense vector for 50 + a dense vector for 10 + a dense vector for 20.

  • Slot 2 should output a dense vector of 30 + a dense vector of 50 + a dense vector of 10.

However, after combiner, sample 1 outputs two dense vectors corresponding to two slots, assuming that the length of each dense vector is 8, and the calculation method is as follows:

  • dense vector 1 = dense vector of 40 + dense vector of 50 + dense vector of 10 + dense vector of 20

  • dense vector 2 = dense vector of 30 + dense vector of 50 + dense vector of 10

Each of the 8 elements of dense vector 1 is the sum of the elements at the same position in the dense vectors of 40, 50, 10 and 20; that is, dense_vector_1[0] = sum(dense_vector_40[0], dense_vector_50[0], dense_vector_10[0], dense_vector_20[0]). So the keys are indeed transformed into an embedding vector, but instead of a one-hot matrix multiplication the embedding uses its own lookup-and-sum mechanism, as shown in the following figure:
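To double-check the indexing, here is a small CPU re-implementation of the sum combiner (a sketch, not HugeCTR code). It uses the simplified mapping hash_value_index = [4,5,1,2,3,5,1,3,2,1] from the running example and fills row r of the low-dimensional table with the constant 0.1 * r so the result is easy to verify by hand:

#include <cstdio>
#include <vector>

int main() {
  const int batch_size = 2, slot_num = 2, embedding_vec_size = 8;
  std::vector<int>    row_offset       = {0, 4, 7, 9, 10};
  std::vector<size_t> hash_value_index = {4, 5, 1, 2, 3, 5, 1, 3, 2, 1};  // simplified mapping

  // 6 x 8 low-dimensional table; row r holds the constant 0.1 * r
  std::vector<float> hash_table_value(6 * embedding_vec_size);
  for (int r = 0; r < 6; ++r)
    for (int t = 0; t < embedding_vec_size; ++t) hash_table_value[r * embedding_vec_size + t] = 0.1f * r;

  std::vector<float> embedding_feature(batch_size * slot_num * embedding_vec_size, 0.0f);

  for (int bid = 0; bid < batch_size; ++bid) {            // plays the role of blockIdx.x
    for (int tid = 0; tid < embedding_vec_size; ++tid) {  // plays the role of threadIdx.x
      for (int i = 0; i < slot_num; ++i) {
        int feature_row_index = bid * slot_num + i;
        int value_offset = row_offset[feature_row_index];
        int feature_num  = row_offset[feature_row_index + 1] - value_offset;
        float sum = 0.0f;
        for (int j = 0; j < feature_num; ++j) {
          size_t value_index = hash_value_index[value_offset + j];
          sum += hash_table_value[value_index * embedding_vec_size + tid];
        }
        embedding_feature[feature_row_index * embedding_vec_size + tid] = sum;
      }
    }
  }

  // slot 1 of sample 0 sums rows 4,5,1,2 of the table, so every element is 1.2
  printf("%f\n", embedding_feature[0]);
  return 0;
}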

4.3.4 Embedding table size

We already know that the hash table reduces the size of the embedding table, and now we know that the combiner simplifies it further, so let's ask a few questions about the result.

  • How big is hash_table_value? How big is the weight matrix?
  • How big is embedding_feature (the forward-propagation output of the embedding layer)? In other words, how big is the logically dense matrix after the reduction?
  • How is each element of the embedding_feature calculated?
  • How big is the actual matrix?

Let's answer them one by one.

  • The first question is how big is hash_table_value?

The size of hash_table_value is: max_vocabulary_size_per_gpu_ = embedding_data_.embedding_params_.max_vocabulary_size_per_gpu;

In fact, you can roughly think of the size of hash_table_value as (number of values in the CSR) * (embedding_vec_size).

hash_table_value is initialized randomly. Each original CSR key corresponds to embedding_vec_size elements; together with hash_value_index and row_offset, the embedding_vec_size elements corresponding to each original CSR key can be found.

  • Second question: how big is the embedding_feature? How big is a logically dense matrix? As you can see from the code,
  embedding_feature[feature_row_index * embedding_vec_size + tid] =
      TypeConvertFunc<TypeEmbeddingComp, float>::convert(sum);

As can be seen, the size of embedding_feature is (number of rows in the CSR) * (embedding_vec_size). So for embedding_feature_tensors_ we can abstract: the input is assumed to be four rows in CSR format, and the output is four rows of dense vectors.

  • Third question: How is each element of the embedding_feature calculated?

The slots, and the elements within each slot, are iterated over:

sum += (value_index != std::numeric_limits<size_t>::max())
           ? hash_table_value[value_index * embedding_vec_size + tid]
           : 0.0f;
  • Fourth question: how big is the actual embedding matrix, or the dense matrix in engineering?

Per sample it is slot_num * embedding_vec_size; the number of rows is slot_num, as can be seen from the output below.

In the case of deep_data, slot num is 26, embedding_vec_size = 16, and the final output sample size is [26 x 16].

model.add(hugectr.Input(label_dim = 1, label_name = label,
                        dense_dim = 13, dense_name = dense,
                        data_reader_sparse_param_array = 
                        [hugectr.DataReaderSparseParam(wide_data, 30, True, 1),
                        hugectr.DataReaderSparseParam(deep_data, 2, False, 26)]))
model.add(hugectr.SparseEmbedding(embedding_type = hugectr.Embedding_t.DistributedSlotSparseEmbeddingHash, 
                            workspace_size_per_gpu_in_mb = 23,
                            embedding_vec_size = 1,
                            combiner = sum,
                            sparse_embedding_name = sparse_embedding2,
                            bottom_name = wide_data,
                            optimizer = optimizer))
model.add(hugectr.SparseEmbedding(embedding_type = hugectr.Embedding_t.DistributedSlotSparseEmbeddingHash, 
                            workspace_size_per_gpu_in_mb = 358,
                            embedding_vec_size = 16,
                            combiner = sum,
                            sparse_embedding_name = sparse_embedding1,
                            bottom_name = deep_data,
                            optimizer = optimizer))

Output:

"------------------------------------------------------------------------------------------------------------------\n"."Layer Type Input Name Output Name Output Shape \n"."------------------------------------------------------------------------------------------------------------------\n"."DistributedSlotSparseEmbeddingHash wide_data sparse_embedding2 (None, 1, 1) \n"."DistributedSlotSparseEmbeddingHash deep_data sparse_embedding1 (None, 26, 16)Copy the code

0x05 Reduce Scatter

Each GPU now has the dense vectors corresponding to its own samples, stored in embedding_feature_tensors_. Each GPU holds a whole batch, and for every sample it has slot_num dense vectors. Let's recall:

DistributedSlotEmbeddingHash: all features, whatever field/slot they belong to and whatever the slot index is, are distributed to different GPUs according to the feature index. This means that features in the same slot may be stored on different GPUs, which is why it is called a "distributed slot". Since a global reduction is required, DistributedSlotEmbedding is suitable when the embedding is larger than the memory of a single GPU, at the cost of more memory exchange between GPUs.

Each slot of each sample on a GPU holds only part of that slot's keys. For example, assume there are two GPUs in total, the batch size is 2, and there are three slots. Take the first sample: slot 1 has two keys, key 1 on GPU 1 and key 7 on GPU 2, so the partial results for these two keys need to be merged. The details are as follows:

Each slot of each sample contains only part of the keys, because features in the same slot may be stored on different GPUs (features are distributed to GPUs according to their index). Therefore the partial results on GPU 1 and GPU 2 need to be combined. After the reduce-scatter the data is complete, and each GPU is assigned only its own part of the complete data.

5.1 Background

For the principle of Reduce-Scatter, see docs.nvidia.com/deeplearnin…

The Reduce operation reduces values across all compute nodes and stores the reduced result only on the root node. It is the same operation as AllReduce, except that the result is kept only on root.

The ReduceScatter operation performs the same reduction as Reduce, except that the result is scattered in equal-sized blocks across the ranks: each rank obtains one block according to its rank index, i.e. each rank receives only a part of the reduced result.

In other words, ReduceScatter first scatters, splitting the data into equal-sized blocks, and then each rank reduces the block it is responsible for according to its rank index. This resembles an all-gather, except that the blocks are not simply concatenated: a reduction operation (for example a sum or a max) is applied instead.

Or see the figure below, from the NVIDIA slides at images.nvidia.cn/events/sc15…
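Since NCCL is a black box here, a small CPU simulation may help make the semantics concrete (this sketches the semantics only, not how NCCL or HugeCTR implements it):

#include <cstddef>
#include <iostream>
#include <vector>

// CPU simulation of ReduceScatter(sum): sendbufs[r] holds nranks * recv_count
// elements; the returned recvbufs[r] holds recv_count elements, equal to the
// elementwise sum of block r taken from every rank's send buffer.
std::vector<std::vector<float>> reduce_scatter_sum(
    const std::vector<std::vector<float>>& sendbufs, std::size_t recv_count) {
  const std::size_t nranks = sendbufs.size();
  std::vector<std::vector<float>> recvbufs(nranks, std::vector<float>(recv_count, 0.0f));
  for (std::size_t r = 0; r < nranks; ++r)           // destination rank / block index
    for (std::size_t src = 0; src < nranks; ++src)   // contribution from every rank
      for (std::size_t i = 0; i < recv_count; ++i)
        recvbufs[r][i] += sendbufs[src][r * recv_count + i];
  return recvbufs;
}

int main() {
  // 2 ranks, 2 elements per block: rank r keeps the reduced block r.
  auto out = reduce_scatter_sum({{1, 2, 3, 4}, {10, 20, 30, 40}}, 2);
  std::cout << out[0][0] << ' ' << out[0][1] << '\n';  // 11 22  (block 0 -> rank 0)
  std::cout << out[1][0] << ' ' << out[1][1] << '\n';  // 33 44  (block 1 -> rank 1)
}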

5.2 Code

The code below performs a reduce-scatter on embedding_feature_tensors_, and the result is placed in embedding_data_.get_output_tensors(is_train).

    // do reduce scatter
    // Once done, the data is complete, and a portion of the complete data is assigned to each GPU
    size_t recv_count = embedding_data_.get_batch_size_per_gpu(is_train) *
                        embedding_data_.embedding_params_.slot_num *
                        embedding_data_.embedding_params_.embedding_vec_size;
    functors_.reduce_scatter(recv_count, embedding_feature_tensors_,
                             embedding_data_.get_output_tensors(is_train),
                             embedding_data_.get_resource_manager());
Copy the code
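As a sanity check on recv_count with the running example (2 slots, embedding_vec_size = 8, batch size 2; assuming the batch is spread over 2 GPUs, so batch_size_per_gpu = 1), each GPU receives 1 * 2 * 8 = 16 floats, i.e. the complete dense vectors of exactly one sample. A minimal sketch of the arithmetic:

#include <cstddef>
#include <iostream>

int main() {
  // Values taken from the running example; in real code they come from embedding_data_.
  const std::size_t batch_size_per_gpu = 1;   // batch size 2 spread over 2 GPUs
  const std::size_t slot_num = 2;
  const std::size_t embedding_vec_size = 8;
  const std::size_t recv_count = batch_size_per_gpu * slot_num * embedding_vec_size;
  std::cout << "recv_count per GPU = " << recv_count << '\n';  // 16
}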

The code of the reduce_scatter operator is as follows; the reduction performed here is a sum:

template void SparseEmbeddingFunctors::reduce_scatter<float>(
    size_t recv_count, const Tensors2<float> &send_tensors, Tensors2<float> &recv_tensors,
    const ResourceManager &resource_manager);

template <typename TypeEmbeddingComp>
void SparseEmbeddingFunctors::reduce_scatter(size_t recv_count,
                                             const Tensors2<TypeEmbeddingComp> &send_tensors,
                                             Tensors2<TypeEmbeddingComp> &recv_tensors,
                                             const ResourceManager &resource_manager) {
  size_t local_gpu_count = resource_manager.get_local_gpu_count();
  size_t total_gpu_count = resource_manager.get_global_gpu_count();

  // need to know the type of TypeHashKey here
  ncclDataType_t type;
  switch (sizeof(TypeEmbeddingComp)) {
    case 2:
      type = ncclHalf;
      break;
    case 4:
      type = ncclFloat;
      break;
    default:
      CK_THROW_(Error_t::WrongInput, "Error: TypeHashKey not support by now");
  }

  // for multi GPUs, use NCCL to do Reduce-Scatter(supporting multi-node GPU servers)
  if (total_gpu_count > 1) {
    CK_NCCL_THROW_(ncclGroupStart());
    for (size_t id = 0; id < local_gpu_count; id++) {
      const auto &local_gpu = resource_manager.get_local_gpu(id);
      CK_NCCL_THROW_(ncclReduceScatter(send_tensors[id].get_ptr(),  // send buf
                                       recv_tensors[id].get_ptr(),  // recv buff
                                       recv_count, type, ncclSum, local_gpu->get_nccl(),
                                       local_gpu->get_stream()));
    }
    CK_NCCL_THROW_(ncclGroupEnd());
  }
  // for single GPU, just do memcpyD2D
  else {  // total_gpu_count == 1
    const auto &local_gpu = resource_manager.get_local_gpu(0);
    CudaDeviceContext context(local_gpu->get_device_id());
    CK_CUDA_THROW_(cudaMemcpyAsync(recv_tensors[0].get_ptr(), send_tensors[0].get_ptr(),
                                   recv_count * sizeof(TypeEmbeddingComp), cudaMemcpyDeviceToDevice,
                                   local_gpu->get_stream()));
  }

  return;
}
Copy the code

We use a diagram to show the current process. To understand it better, Reduce-Scatter can be thought of in two steps.

  • Reduce: similar to AllReduce; after the reduction, the complete (summed) data for all samples is available.

  • Scatter: the samples are then distributed by rank, so sample 1 ends up on GPU 1 and sample 2 on GPU 2.

We finally summarize the whole as follows:

0x06 Combiner

If mean pooling is required, two operations need to be performed.

 *   1) forward
 *        sum: calling forward_sum_kernel()
 *        mean: calling foward_sum_kernel() + forward_scale_kernel()
Copy the code

The first operation is an AllReduce over the CSR row offsets, which produces global offsets, i.e. the total number of keys in each (sample, slot) row.

The second operation is Forward Scale, which divides each embedding value by that key count, yielding the average.

    // scale for combiner=mean after reduction
    if (embedding_data_.embedding_params_.combiner == 1) {
      size_t send_count = embedding_data_.embedding_params_.get_batch_size(is_train) *
                              embedding_data_.embedding_params_.slot_num +
                          1;
      functors_.all_reduce(send_count, embedding_data_.get_row_offsets_tensors(is_train),
                           row_offset_allreduce_tensors_, embedding_data_.get_resource_manager());

      // do average
      functors_.forward_scale(
          embedding_data_.embedding_params_.get_batch_size(is_train),
          embedding_data_.embedding_params_.slot_num,
          embedding_data_.embedding_params_.embedding_vec_size, row_offset_allreduce_tensors_,
          embedding_data_.get_output_tensors(is_train), embedding_data_.get_resource_manager());
    }

Copy the code
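One detail worth spelling out is the + 1 in send_count: a CSR row_offset array covering batch_size * slot_num rows always carries one extra terminating offset. A quick check with the running example (2 samples x 2 slots = 4 rows, hence 5 offsets), as a standalone sketch:

#include <cstddef>
#include <iostream>

int main() {
  // Running example: batch_size = 2, slot_num = 2  ->  4 CSR rows on this node.
  const std::size_t batch_size = 2, slot_num = 2;
  const std::size_t row_offset[] = {0, 4, 7, 9, 10};          // 5 entries
  const std::size_t send_count = batch_size * slot_num + 1;   // 5
  std::cout << send_count << " == "
            << sizeof(row_offset) / sizeof(row_offset[0]) << '\n';
}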

6.1 AllReduce

The results of AllReduce are as follows:

Recall the CSR example.

*   40,50,10,20
*   30,50,10
*   30,20
*   10
* Will be convert to the form of:
* row offset: 0,4,7,9,10
* value: 40,50,10,20,30,50,10,30,20,10
Copy the code

The row_offset array means: the first row starts at offset 0, the second row starts at 4, the third row starts at 7, and so on. Let's assume this CSR is on Node 1.

Suppose the row_offset on Node 2 is 0,5,7,10,11: there, the first row starts at 0, the second at 5, the third at 7, and so on. The corresponding CSR is:

*   40,50,10,20,30
*   30,50
*   30,20,40
*   10
* Will be convert to the form of:
* row offset: 0,5,7,10,11
* value: 40,50,10,20,30,30,50,30,20,40,10
Copy the code

After the AllReduce we get 0,9,14,19,21. So the total number of keys in the first row is 9 - 0 = 9, and in the second row it is 14 - 9 = 5.
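Because the AllReduce is just an elementwise sum of the two row_offset arrays, the numbers above can be verified with a few lines (a standalone sketch using the offsets of Node 1 and Node 2):

#include <cstddef>
#include <iostream>

int main() {
  // row_offset on Node 1 and Node 2 from the example above.
  const std::size_t node1[] = {0, 4, 7, 9, 10};
  const std::size_t node2[] = {0, 5, 7, 10, 11};
  const std::size_t n = 5;

  std::size_t all_reduced[n];
  for (std::size_t i = 0; i < n; ++i) all_reduced[i] = node1[i] + node2[i];  // ncclSum
  // all_reduced = 0, 9, 14, 19, 21

  for (std::size_t i = 0; i + 1 < n; ++i)
    std::cout << "keys in row " << i << ": "
              << all_reduced[i + 1] - all_reduced[i] << '\n';  // 9, 5, 5, 2
}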

The specific operator is as follows:

/**
 * collection communication: all_reduce.
 * @param send_count the count of elements will be sent.
 * @param send_tensors the send tensors of multi GPUs.
 * @param recv_tensors the recv tensors of multi GPUs.
 * @param device_resources all gpus device resources.
 * @param context gpu device context, for switching device.
 */
template <typename TypeHashKey>
void SparseEmbeddingFunctors::all_reduce(size_t send_count,
                                         const Tensors2<TypeHashKey> &send_tensors,
                                         Tensors2<TypeHashKey> &recv_tensors,
                                         const ResourceManager &resource_manager) {
  size_t local_gpu_count = resource_manager.get_local_gpu_count();
  size_t total_gpu_count = resource_manager.get_global_gpu_count();

  // need to know the type of Type here
  ncclDataType_t type;
  switch (sizeof(TypeHashKey)) {
    case 4:
      type = ncclUint32;
      break;
    case 8:
      type = ncclUint64;
      break;
    default:
      CK_THROW_(Error_t::WrongInput, "Error: Type not support by now");
  }

  // for multi GPUs, use NCCL to do all_reduce (supporting multi-node GPU servers)
  if (total_gpu_count > 1) {
    CK_NCCL_THROW_(ncclGroupStart());
    for (size_t id = 0; id < local_gpu_count; id++) {
      const auto &local_gpu = resource_manager.get_local_gpu(id);
      // AllReduce operation
      CK_NCCL_THROW_(ncclAllReduce(send_tensors[id].get_ptr(), recv_tensors[id].get_ptr(),
                                   send_count, type, ncclSum, local_gpu->get_nccl(),
                                   local_gpu->get_stream()));
    }
    CK_NCCL_THROW_(ncclGroupEnd());
  }
  // for single GPU, just do memcpyD2D
  else {  // total_gpu_count == 1
    const auto &local_gpu = resource_manager.get_local_gpu(0);
    CudaDeviceContext context(local_gpu->get_device_id());
    CK_CUDA_THROW_(cudaMemcpyAsync(recv_tensors[0].get_ptr(), send_tensors[0].get_ptr(),
                                   send_count * sizeof(TypeHashKey), cudaMemcpyDeviceToDevice,
                                   local_gpu->get_stream()));
  }

  return;
}
Copy the code

6.2 Forward Scale

The last step is to do a Forward Scale operation.

  // do average
  functors_.forward_scale(
      embedding_data_.embedding_params_.get_batch_size(is_train),
      embedding_data_.embedding_params_.slot_num,
      embedding_data_.embedding_params_.embedding_vec_size, row_offset_allreduce_tensors_,
      embedding_data_.get_output_tensors(is_train), embedding_data_.get_resource_manager());
Copy the code

After the earlier AllReduce, row_offset_allreduce_tensors_ holds 0,9,14,19,21, so the total number of keys in the first row is 9 and in the second row is 14 - 9 = 5. To compute the mean, each element of embedding_data_.get_output_tensors(is_train) is divided by the number of keys in its own (sample, slot) row.
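To make the scaling concrete, here is a small CPU sketch with hypothetical values (embedding_vec_size shortened to 4): the summed feature of a row holding 9 keys is multiplied by 1/9, which is what forward_scale_kernel below does, one vector component per thread:

#include <cstddef>
#include <iostream>
#include <vector>

int main() {
  // Allreduced row offsets from the example: row 0 holds 9 keys in total.
  const std::size_t row_offset[] = {0, 9, 14, 19, 21};
  const std::size_t feature_row_index = 0;
  const std::size_t feature_num =
      row_offset[feature_row_index + 1] - row_offset[feature_row_index];  // 9

  // Hypothetical summed embedding for that row (embedding_vec_size = 4 here).
  std::vector<float> embedding_feature = {9.0f, 18.0f, 27.0f, 36.0f};
  const float scaler = (feature_num > 1) ? 1.0f / static_cast<float>(feature_num) : 1.0f;
  for (float& v : embedding_feature) v *= scaler;

  for (float v : embedding_feature) std::cout << v << ' ';  // 1 2 3 4
  std::cout << '\n';
}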

The operator is as follows:

// forward kernel function: this is an additional function for combiner=mean (only for Distributed
// Embedding)
template <typename TypeKey, typename TypeEmbeddingComp>
__global__ void forward_scale_kernel(int batch_size, int slot_num, int embedding_vec_size,
                                     const TypeKey *row_offset,
                                     TypeEmbeddingComp *embedding_feature) {
  int bid = blockIdx.x;
  int tid = threadIdx.x;

  if (bid < batch_size && tid < embedding_vec_size) {
    for (int i = 0; i < slot_num; i++) {
      size_t feature_row_index = bid * slot_num + i;
      // Total number of slot elements
      int feature_num = row_offset[feature_row_index + 1] - row_offset[feature_row_index];
      // Output matrix row offset
      size_t feature_index = feature_row_index * embedding_vec_size + tid;
      float feature =
          TypeConvertFunc<float, TypeEmbeddingComp>::convert(embedding_feature[feature_index]);
      float scaler = 1.0f;
      if (feature_num > 1) {
        scaler = 1.0f / (float)feature_num;  // divide by the number of keys in this slot
      }

      embedding_feature[feature_index] =  // write the averaged value back
          TypeConvertFunc<TypeEmbeddingComp, float>::convert(feature * scaler);
    }
  }
}

template <typename TypeKey, typename TypeEmbeddingComp>
void do_forward_scale(size_t batchsize_per_gpu, size_t slot_num, size_t embedding_vec_size,
                      const TypeKey *row_offset, TypeEmbeddingComp *embedding_feature,
                      cudaStream_t stream) {
  const size_t grid_size = batchsize_per_gpu;
  const size_t block_size = embedding_vec_size;
  forward_scale_kernel<<<grid_size, block_size, 0, stream>>>(
      batchsize_per_gpu, slot_num, embedding_vec_size, row_offset, embedding_feature);
};
Copy the code

0x07 Summary

So the end result is as follows. The diagram makes a few simplifications; for example, hash_table_value_tensors_ should be a vector of vectors, which is simplified here to a single vector.

The embedding vector values are also illustrative. The final output of the embedding layer is stored in the EmbeddingData member variable train_output_tensors_.

Or look at it from the perspective of the underlying data.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF Reference

Faster than HugeCTR: Easy implementation of a large recommendation system engine with OneFlow

nvlabs.github.io/cub/annotat…

developer.nvidia.com/blog/introd…

developer.nvidia.com/blog/announ…

developer.nvidia.com/blog/accele…

Read HugeCTR source code

How does embedding propagate back

web.eecs.umich.edu/~justincj/t…

Sparse matrix storage format summary + storage efficiency comparison :COO,CSR,DIA,ELL,HYB

Out of Thin Air: On the Embedding idea in recommendation algorithm

Principle of the tf.nn.embedding_lookup function

Tensorflow's embedding_lookup interface explained

How does embedding do in the recommended scene of big factory

In CTR estimation, should sequence feature embeddings be concatenated and fed into the MLP, or pooled?

Depth matching model in recommendation system

Local processing: How is Embedding realized?

Deep matching models in search (Part 2): the asymmetric two-tower model

Depth feature fast ox strategy about high and low layer feature fusion

Deep learning introduction to DeepFM with Pytorch code explanation

deepFM in pytorch

Recommended algorithm 7 — DeepFM model

DeepFM Parameter Understanding (2)

Recommendation system meets deep learning (III)– Theory and Practice of DeepFM model

docs.nvidia.com/deeplearnin…

Introduce you to the key algorithm of large model training: distributed training Allreduce algorithm