0x00 Abstract

To better introduce Paracel's data loading, we are temporarily inserting two articles on PyTorch data loading (split in two because of their length), mainly from a distributed perspective. This article is just an appetizer; a full series on PyTorch distributed will follow.

Other articles in the Parameter server series are as follows:

Machine learning parameter server PS-Lite (1) —– PostOffice

Machine learning parameter server PS-Lite (2) —– communication module Van

Machine learning parameter server PS-Lite (3) —– agent Customer

Machine learning parameter server PS-Lite (4) —– application node implementation

Machine learning parameter server Paracel (1)—–

Machine learning parameter server Paracel (2)—–SSP implementation

0x01 Data Loading

1.1 Acceleration path

In distributed training, there are three levels at which work is needed to speed up training:

  • Data loading layer
  • Multi-machine communication layer
  • The code level

At the data level, multi-process parallel loading can be used to speed up data preprocessing, and GPU features can be exploited as well. For example, NVIDIA DALI removes the CPU bottleneck by moving data preprocessing onto the GPU.

At the level of multi-machine communication, there are various collective communication libraries available, such as NCCL, OpenMPI, Gloo, etc.

At the code level, you can use the distributed APIs provided by the framework, or use Horovod to adapt single-machine code to distributed training.

Now let’s look at how the data level can be accelerated.

1.2 Parallel Processing

Data processing in an AI framework is mainly parallelized as follows:

  • Data loading/preprocessing runs on the CPU.
  • Training runs on the GPU.

Ideally, the CPU should have finished loading and have the training data ready before each iteration, so that training can proceed seamlessly from one iteration to the next.

However, GPU compute power roughly doubles every year while the CPU falls further and further behind, so the CPU becomes the laggard. This is not only a matter of insufficient CPU power, but also of insufficient data read speed from storage.

Therefore, machine learning places higher and higher demands on data loading and preprocessing: data preparation for the next iteration must be completed within the GPU's compute time, and the GPU must never sit idle waiting for training data.
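To make this concrete, here is a minimal sketch (not from the original text; the dataset and sizes are made up) of how PyTorch's DataLoader lets several CPU worker processes prepare batches in the background while the GPU trains:

import torch
from torch.utils.data import DataLoader, TensorDataset

# fake dataset: 1024 "images" with labels (shapes are made up for the sketch)
dataset = TensorDataset(torch.randn(1024, 3, 224, 224), torch.randint(0, 10, (1024,)))

loader = DataLoader(
    dataset,
    batch_size=32,
    num_workers=4,       # 4 worker processes decode/preprocess batches in the background
    pin_memory=True,     # workers place batches in page-locked (pinned) host memory
    prefetch_factor=2,   # each worker keeps 2 batches ready ahead of time
)

device = "cuda" if torch.cuda.is_available() else "cpu"
for images, labels in loader:
    # non_blocking only pays off when the source tensors are already pinned
    images = images.to(device, non_blocking=True)
    labels = labels.to(device, non_blocking=True)
    # forward/backward/step would run here while the workers prepare the next batches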

1.3 Pipeline

For machine learning training, loading data can be divided into three steps (sketched in code right after the list):

  • Load data from disk or distributed storage to the host (CPU).
  • Transfer data from host pageable memory to host pinned memory.
  • Transfer data from host pinned memory to the GPU.
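As referenced above, the three steps can be sketched roughly as follows (illustrative tensors; assumes a CUDA build of PyTorch for steps 2 and 3):

import torch

# 1. load data from disk / distributed storage into ordinary (pageable) host memory
batch = torch.randn(32, 3, 224, 224)          # stand-in for decoded samples

if torch.cuda.is_available():
    # 2. copy from pageable host memory into page-locked (pinned) host memory
    batch_pinned = batch.pin_memory()
    # 3. copy from pinned host memory to GPU memory; non_blocking=True lets the
    #    copy run asynchronously on a CUDA stream
    batch_gpu = batch_pinned.to("cuda", non_blocking=True)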

Therefore, popular deep learning frameworks pipeline these loading steps across the heterogeneous hardware in order to improve data-processing throughput.

The pipeline generally consists of multiple operators, each with a buffer implemented as a data queue. When an upstream operator finishes processing, it passes its output to the downstream operator. In this way the operators are independent of each other, each can be accelerated with fine-grained multi-threading or multi-processing, and each can control its own processing speed and memory usage to meet the throughput needs of different networks.

If the data queue inside the operator is not empty, the model will get data continuously and there will be no bottleneck due to waiting for training data.

Here is the serial processing logic:

+------+            +-----------+           +---------------------------+
|      |            |           |           |                           |
| Data +----------> | Load Data +---------> | Transfer to Pinned Memory |
|      |            |           |           |                           |
+------+            +-----------+           +---------------------------+

Here is the parallel pipeline logic:

                    +------------+
+--------+          |            |
|        |          | Process 1  |
| Data 1 +--------> |            +------+
|        |          | Load Data  |      |
+--------+          |            |      |
                    +------------+      |
                                        |
                                        |
                                        |
                    +------------+      |        +-----------------------------------+
+--------+          |            |      |        |                                   |
|        |          | Process 2  |      +------> | Pin-memory process                |
| Data 2 +--------> |            |               |                                   |
|        |          | Load Data  +-------------> |                                   |
+--------+          |            |               |        Transfer to Pinned Memory  |
                    +------------+       +-----> |                                   |
                                         |       |                                   |
                                         |       +-----------------------------------+
                                         |
+--------+          +------------+       |
|        |          |            |       |
| Data 3 +--------> | Process 3  +-------+
|        |          |            |
+--------+          | Load Data  |
                    |            |
                    +------------+
​
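Below is a minimal sketch of the parallel pipeline above. It is not the article's code: it uses threads and bounded queues for simplicity (the article notes that either threads or processes can be used as operator workers), and the tensor shapes are made up.

import queue
import threading
import torch

def load_worker(out_q, num_items):
    # upstream operator: "Load Data"
    for _ in range(num_items):
        out_q.put(torch.randn(3, 224, 224))   # stand-in for disk read + decode
    out_q.put(None)                           # sentinel: this producer is done

def pin_worker(in_q, out_q, num_producers):
    # downstream operator: "Transfer to Pinned Memory"
    done = 0
    while done < num_producers:
        sample = in_q.get()
        if sample is None:
            done += 1
            continue
        if torch.cuda.is_available():
            sample = sample.pin_memory()
        out_q.put(sample)
    out_q.put(None)

loaded, pinned = queue.Queue(maxsize=8), queue.Queue(maxsize=8)   # per-operator buffers
producers = [threading.Thread(target=load_worker, args=(loaded, 4)) for _ in range(3)]
consumer = threading.Thread(target=pin_worker, args=(loaded, pinned, len(producers)))
for t in producers + [consumer]:
    t.start()
while (item := pinned.get()) is not None:
    pass                                      # training would consume `item` here
for t in producers + [consumer]:
    t.join()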

1.4 The GPU side

So far we have covered data movement on the CPU side: loading data from disk and copying it from pageable to pinned memory.

However, the transfer from pinned memory to the GPU (tensor.cuda()) can also be pipelined using CUDA streams.
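As a rough sketch of this idea (assuming a CUDA device; names such as start_copy are made up), the next batch can be copied on a side stream while the current batch is computed on the default stream. Production code should additionally use Tensor.record_stream when buffers are reused across streams.

import torch

copy_stream = torch.cuda.Stream()

def start_copy(cpu_batch):
    # the source must be in pinned memory for the copy to be truly asynchronous
    with torch.cuda.stream(copy_stream):
        return cpu_batch.pin_memory().to("cuda", non_blocking=True)

batches = [torch.randn(32, 3, 224, 224) for _ in range(4)]
current = start_copy(batches[0])
for cpu_batch in batches[1:]:
    torch.cuda.current_stream().wait_stream(copy_stream)   # make sure `current` has arrived
    nxt = start_copy(cpu_batch)        # the next copy overlaps with the compute below
    loss = current.mean()              # stand-in for forward/backward on `current`
    current = nxt
torch.cuda.current_stream().wait_stream(copy_stream)
loss = current.mean()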

In addition, deep learning applications require complex multi-stage data processing pipelines, including loading, decoding, cropping, resizing, and many other augmentations. These pipelines, currently executed on the CPU, have become a bottleneck that limits the performance and scalability of training and inference.

Nvidia DALI solves CPU bottlenecks by transferring data preprocessing to GPU processing. Users can build GPU-based pipelines or CPU-based pipelines based on the characteristics of their models.

Next we’ll look at PyTorch’s data loading, mostly from a distributed perspective.

0x02 PyTorch Distributed Loading

2.1 DDP

PyTorch offers a variety of options for distributed data training. As applications move from simple to complex and from prototype to production, a common development trajectory is:

  • If the data and model fit on a single GPU and training speed is not a concern, train on a single device.
  • If there are multiple GPUs on one server and you want to speed up training with minimal code changes, use single-machine multi-GPU DataParallel.
  • If you want to speed up training further and are willing to write a bit more code to set it up, use single-machine multi-GPU DistributedDataParallel.
  • If the application needs to scale across machine boundaries, use multi-machine DistributedDataParallel and a launch script.
  • Use TorchElastic to start distributed training if errors are expected (such as OOM) or resources may join and leave dynamically during training.

The most relevant part for this article is DDP. Distributed Data-Parallel training (DDP) is a widely used single-program multiple-data training paradigm. With DDP, the model is replicated on every process, and each replica is fed a different subset of the input samples. DDP handles the gradient communication that keeps the replicas in sync and overlaps it with the gradient computation to speed up training.
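For reference, a minimal single-node DDP sketch might look like the following (hedged: this assumes a recent PyTorch launched with torchrun, e.g. `torchrun --nproc_per_node=2 train.py`, and uses a toy model):

import os
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

def main():
    dist.init_process_group(backend="nccl")           # torchrun provides rank/world size via env vars
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)

    model = torch.nn.Linear(10, 10).cuda(local_rank)
    ddp_model = DDP(model, device_ids=[local_rank])   # one model replica per process

    optimizer = torch.optim.SGD(ddp_model.parameters(), lr=0.01)
    loss = ddp_model(torch.randn(32, 10).cuda(local_rank)).sum()
    loss.backward()                                   # gradients are all-reduced across replicas here
    optimizer.step()
    dist.destroy_process_group()

if __name__ == "__main__":
    main()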

2.2 Distributed Loading

Let’s first look at the overall structure of distributed loading.

Given the sample code, you can see that the three main entities used are DataSet, DistributedSampler, and DataLoader.

sampler = DistributedSampler(dataset) if is_distributed else None
loader = DataLoader(dataset, shuffle=(sampler is None), sampler=sampler)
for epoch in range(start_epoch, n_epochs):
    if is_distributed:
        sampler.set_epoch(epoch)
    train(loader)

The logical relationship between these three concepts is as follows:

  • Dataset: the dataset. It wraps the raw training data into a data structure that Python can access; a Dataset subclass must provide an interface for retrieving a single sample.
  • Sampler: the sampler. It implements an extraction/sampling strategy and produces data indices from the Dataset for the DataLoader to use. You can think of the Sampler as the commander who decides where the battle is fought.
  • DataLoader: responsible for loading data from the dataset according to the indices. It supports map-style and iterable-style datasets as well as single-process and multi-process loading. The Loader is the soldier who carries out the Sampler's orders.

The details are as follows:

  1. The DataSet provides its length to the DistributedSampler.
  2. The Sampler sends data indices to the Loader according to some strategy.
  3. The Loader loads data based on the indices.
  4. The Loader sends the data to the model for training.
+------------------------+                  +--------------+
| DistributedSampler    |                  | DataLoader   |
|                        |    2 indices     |              |
|     Some strategy +---------------------> |              |
|                        |                  |              | 4 data +-------+
+------------+-----------+                  |              +------->| train |
             ^                              |              |        +-------+
           1 | length                       |              |
             |                              |              |
+------------+-----------+                  |              |
| DataSet                |                  |              |
|    +----------+        |   3 Load         |              |
|    |   Data   +-------------------------> |              |
|    +----------+        |                  |              |
|                        |                  |              |
+------------------------+                  +--------------+
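To make the three roles concrete, here is a minimal map-style Dataset (illustrative, not from the article); the sampler produces indices and the DataLoader calls __getitem__ with them:

import torch
from torch.utils.data import Dataset, DataLoader

class SquaresDataset(Dataset):
    def __init__(self, n):
        self.data = [(i, i * i) for i in range(n)]     # (feature, label) pairs

    def __len__(self):
        return len(self.data)                          # the sampler uses this length

    def __getitem__(self, index):
        x, y = self.data[index]                        # called with sampler-provided indices
        return torch.tensor([float(x)]), torch.tensor([float(y)])

loader = DataLoader(SquaresDataset(8), batch_size=4, shuffle=True)
for features, labels in loader:
    print(features.shape, labels.shape)                # torch.Size([4, 1]) torch.Size([4, 1])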

Because data sets are not the focus of distributed training, the rest of this article focuses on Sampler.

The focus of the Sampler is how to make each worker load only its own portion of the dataset, so that the dataset is partitioned among workers without any overlap.

0x03 DistributedSampler

For data-parallel, distributed training, DistributedSampler is responsible for the data sampling task.

DistributedSampler is a subclass of Sampler. When DistributedDataParallel is used, each parallel process gets its own DistributedSampler instance, which sends indices to the DataLoader, and the DataLoader loads the corresponding data.

Each DistributedSampler's loading policy exposes only a subset of the dataset, and the subsets handed to different processes neither overlap nor cross.

3.1 Initialization

The __init__ code mainly records information about this worker, such as the dataset, the rank (global GPU number) and num_replicas, and computes the number of samples num_samples and total_size.

Several parameters are as follows:

  • dataset: the dataset to sample from.
  • num_replicas: the number of processes participating in distributed training. If not set, world_size is obtained from the default process group.
  • rank: the rank of the current process. If not set, it is obtained from the default process group.
  • shuffle: whether to shuffle the indices when sampling.
  • seed: the random seed used when shuffling.
  • drop_last: if the data cannot be divided evenly, whether to drop the tail that cannot be distributed.
  • epoch: the dataset is reshuffled every epoch; the epoch is what keeps the shuffle consistent across processes (see Section 3.3).

The code is as follows, omitting exception handling.

class DistributedSampler(Sampler[T_co]):

    def __init__(self, dataset: Dataset, num_replicas: Optional[int] = None,
                 rank: Optional[int] = None, shuffle: bool = True,
                 seed: int = 0, drop_last: bool = False) -> None:
        self.dataset = dataset
        self.num_replicas = num_replicas
        self.rank = rank
        self.epoch = 0
        self.drop_last = drop_last
        # If the dataset length is evenly divisible by # of replicas, then there
        # is no need to drop any data, since the dataset will be split equally.
        if self.drop_last and len(self.dataset) % self.num_replicas != 0:  # type: ignore[arg-type]
            # Split to nearest available length that is evenly divisible.
            # This is to ensure each rank receives the same amount of data when
            # using this Sampler.
            self.num_samples = math.ceil(
                # `type:ignore` is required because Dataset cannot provide a default __len__
                # see NOTE in pytorch/torch/utils/data/sampler.py
                (len(self.dataset) - self.num_replicas) / self.num_replicas  # type: ignore[arg-type]
            )
        else:
            self.num_samples = math.ceil(len(self.dataset) / self.num_replicas)  # type: ignore[arg-type]
        self.total_size = self.num_samples * self.num_replicas
        self.shuffle = shuffle
        self.seed = seed
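As a quick worked example of this arithmetic (hypothetical sizes):

import math

length, num_replicas = 10, 3

# drop_last=False: pad the index list so every rank gets the same amount
num_samples = math.ceil(length / num_replicas)                         # ceil(10/3) = 4
total_size = num_samples * num_replicas                                # 4 * 3 = 12, so 2 indices get repeated

# drop_last=True (and 10 % 3 != 0): shrink instead of padding
num_samples_drop = math.ceil((length - num_replicas) / num_replicas)   # ceil(7/3) = 3
total_size_drop = num_samples_drop * num_replicas                      # 3 * 3 = 9, so 1 index is dropped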

3.2 Iterative method

DistributedSampler is implemented as an iterable, so the following Python magic methods are used:

  • __len__(self): defines the behavior of a len() call; it usually returns the number of elements in the iterator.
  • __iter__(self): defines the behavior of iterating over the container's elements. It returns an iterator (often the object itself); the result of each iteration is used as the starting point of the next.

A technical detail of the __iter__ code is:

indices = indices[self.rank:self.total_size:self.num_replicas]

When a list slice contains two colons, as in list[start:end:step], it means:

  • start: the start position
  • end: the end position
  • step: the step size

Let’s use an example like this:

a = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15]
print(a[0:15:3])
print(a[1:15:3])
print(a[2:15:3])

The output is:

[1, 4, 7, 10, 13]
[2, 5, 8, 11, 14]
[3, 6, 9, 12, 15]

In indices[self.rank:self.total_size:self.num_replicas], num_replicas is the total number of ranks, so each worker returns exactly the indices that correspond to its own rank.

To summarize the DistributedSampler's distribution scheme: each consecutive block of num_replicas indices is dealt out one per process, so the data is assigned to the num_replicas processes according to each worker's rank. This achieves a non-overlapping, non-crossing split, but note that the indices each process receives are not contiguous.

The specific code is as follows:

class DistributedSampler(Sampler[T_co]):

    def __iter__(self) -> Iterator[T_co]:
        if self.shuffle:
            # deterministically shuffle based on epoch and seed
            g = torch.Generator()
            g.manual_seed(self.seed + self.epoch)
            indices = torch.randperm(len(self.dataset), generator=g).tolist()  # type: ignore[arg-type]
        else:
            indices = list(range(len(self.dataset)))  # type: ignore[arg-type]

        if not self.drop_last:
            # add extra samples to make it evenly divisible
            padding_size = self.total_size - len(indices)
            if padding_size <= len(indices):
                indices += indices[:padding_size]
            else:
                indices += (indices * math.ceil(padding_size / len(indices)))[:padding_size]
        else:
            # remove tail of data to make it evenly divisible.
            indices = indices[:self.total_size]
        assert len(indices) == self.total_size

        # subsample
        indices = indices[self.rank:self.total_size:self.num_replicas]
        assert len(indices) == self.num_samples

        return iter(indices)

    def __len__(self) -> int:
        return self.num_samples

    def set_epoch(self, epoch: int) -> None:
        r"""
        Sets the epoch for this sampler. When :attr:`shuffle=True`, this ensures all replicas
        use a different random ordering for each epoch. Otherwise, the next iteration of this
        sampler will yield the same ordering.

        Args:
            epoch (int): Epoch number.
        """
        self.epoch = epoch
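A small check (illustrative) shows that two ranks receive disjoint, interleaved index sets that together cover the dataset; note that passing explicit num_replicas/rank arguments lets us construct the samplers without initializing a process group:

import torch
from torch.utils.data import TensorDataset
from torch.utils.data.distributed import DistributedSampler

dataset = TensorDataset(torch.arange(10))

s0 = DistributedSampler(dataset, num_replicas=2, rank=0, shuffle=False)
s1 = DistributedSampler(dataset, num_replicas=2, rank=1, shuffle=False)

idx0, idx1 = list(s0), list(s1)
print(idx0)   # [0, 2, 4, 6, 8]
print(idx1)   # [1, 3, 5, 7, 9]
assert set(idx0).isdisjoint(idx1)     # no overlap between workers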

The logic between internal variables is as follows:

  1. Get the length of the dataset;
  2. Get num_replicas (how many processes there are) and its own rank from the configuration;
  3. Compute num_samples and total_size from the dataset length and num_replicas;
  4. Select the indices for this rank: indices = indices[rank: total_size: num_replicas];
  5. Return the indices to the DataLoader.
+---------------------------------------------------------+
| DistributedSampler                                      |
|                                                         |
|    2              2                                     |
|  rank        num_replicas                               |
|    +              +                                     |
|    |              | 3                                   |
|    |              v                                     |
|    |   num_samples = ceil(len(dataset) / num_replicas)  |
|    |              +                                     |
|    |              | 3                                   |
|    |              v                                     |
|    |   total_size = num_samples * num_replicas          |
|    |              +                                     |
|    | 4            | 4                                   |
|    v              v                                     |      +------------+
|  indices = indices[rank : total_size : num_replicas] +-------->| DataLoader |
|                   ^                                     |  5   +------------+
+-------------------+-------------------------------------+
                    |
                  1 | length
            +-------+--------+
            |    DataSet     |
            +----------------+

3.3 Shuffle Data Set

Each epoch will shuffle the dataset, but how do different processes keep the dataset consistent after shuffle?

DistributedSampler mixes the current epoch into the random seed, and this is configured before the indices are computed, so different processes use the same seed and obtain the same shuffled order.
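A small illustration (hypothetical sizes and seed): within one epoch all ranks see the same permutation and split it without overlap, and calling set_epoch changes the permutation for the next epoch on every rank at once.

import torch
from torch.utils.data import TensorDataset
from torch.utils.data.distributed import DistributedSampler

dataset = TensorDataset(torch.arange(8))
s0 = DistributedSampler(dataset, num_replicas=2, rank=0, shuffle=True, seed=42)
s1 = DistributedSampler(dataset, num_replicas=2, rank=1, shuffle=True, seed=42)

s0.set_epoch(0); s1.set_epoch(0)
r0, r1 = list(s0), list(s1)
assert set(r0).isdisjoint(r1)                 # the two ranks never overlap
assert set(r0) | set(r1) == set(range(8))     # together they cover the whole dataset

s0.set_epoch(1)
print(r0, list(s0))                           # the order changes from epoch 0 to epoch 1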

3.3.1 Usage

As the following code shows, for distributed training you must set the epoch on the sampler.

sampler = DistributedSampler(dataset) if is_distributed else None
loader = DataLoader(dataset, shuffle=(sampler is None), ..., sampler=sampler)
for epoch in range(start_epoch, n_epochs):
    if is_distributed:
        sampler.set_epoch(epoch)  # the epoch is set here
    train(loader)

3.3.2 Python

This corresponds to the Python implementation of DistributedSampler.

Setting up the epoch is simple, just configure it.

    def set_epoch(self, epoch: int) -> None:
        r"""
        Sets the epoch for this sampler. When :attr:`shuffle=True`, this ensures all replicas
        use a different random ordering for each epoch. Otherwise, the next iteration of this
        sampler will yield the same ordering.
​
        Args:
            epoch (int): Epoch number.
        """
        self.epoch = epoch

The random seed is actually applied in the iteration function:

def __iter__(self) -> Iterator[T_co]:
    if self.shuffle:
        # deterministically shuffle based on epoch and seed
        g = torch.Generator()
        g.manual_seed(self.seed + self.epoch)  # the epoch is mixed into the seed here
        indices = torch.randperm(len(self.dataset), generator=g).tolist()  # type: ignore[arg-type]
    else:
        indices = list(range(len(self.dataset)))  # type: ignore[arg-type]

3.3.3 C++

We can also take a look ahead at DistributedRandomSampler in the C++ code; the C++ API provides the same functionality as the Python version.

We can see how the seed is set and the shuffle is performed as follows:

void DistributedRandomSampler::reset(optional<size_t> new_size) {
  size_ = new_size.value_or(size_);
  populate_indices();
​
  std::mt19937 rand(epoch_);
  std::shuffle(all_indices_.begin(), all_indices_.end(), rand);
  sample_index_ = begin_index_;
}

3.3.4 summary

We extend the current logic as follows:

  1. Get the length of the dataset;
  2. Get num_replicas (how many ranks there are), its own rank, and the epoch from the configuration;
  3. Use the epoch to set the random seed;
  4. Use the random seed to shuffle the dataset indices; this shuffled index list is used from then on;
  5. Compute num_samples and total_size from the dataset length and num_replicas;
  6. Select the indices for this rank: indices = indices[rank: total_size: num_replicas];
  7. Return the indices to the DataLoader.
+--------------------------------------------------------------------+
| DistributedSampler                                                 |
|                                                                    |
|    2                    3                                          |
|  epoch +------>  manual_seed(seed + epoch) +---------> indices     |
|                                                           +        |
|    2              2                                       |        |
|  rank        num_replicas                                 |        |
|    +              +                                       |        |
|    |              | 5                                      |        |
|    |              v                                       | 4      |
|    |   num_samples = ceil(len(dataset) / num_replicas)    |        |
|    |              +                                       |        |
|    |              | 5                                      |        |
|    |              v                                       |        |
|    |   total_size = num_samples * num_replicas            |        |
|    |              +                                       |        |
|    | 6            | 6                                      |        |
|    v              v                                       v        |
|  indices = indices[rank : total_size : num_replicas] <----+        |
|                   ^                          +                     |
+-------------------+--------------------------+---------------------+
                    |                          |
                  1 | length                 7 | indices
            +-------+--------+            +----+--------+
            |    DataSet     |            | DataLoader  |
            +----------------+            +-------------+

3.4 Sampler in C++

Since some companies develop in C++ and still need to use PyTorch, PyTorch also provides a C++ API; let's see how it is implemented.

3.4.1 Definition

The class is defined in torch/csrc/api/include/torch/data/samplers/distributed.h.

As we can see, DistributedSampler is the base class; its main member variables are:

  • size_t size_: the dataset size
  • size_t num_replicas_: the number of replicas
  • size_t rank_: which process or GPU this sampler corresponds to
  • size_t epoch_: the current training epoch
  • bool allow_duplicates_: whether padding with duplicate samples is allowed

Followed by two derived classes: DistributedRandomSampler and DistributedSequentialSampler.

/// A `Sampler` that selects a subset of indices to sample from and defines a
/// sampling behavior. In a distributed setting, this selects a subset of the
/// indices depending on the provided num_replicas and rank parameters. The
/// `Sampler` performs a rounding operation based on the `allow_duplicates`
/// parameter to decide the local sample count.
template <typename BatchRequest = std::vector<size_t>>
class DistributedSampler : public Sampler<BatchRequest> {
 public:
  DistributedSampler(
      size_t size,
      size_t num_replicas = 1,
      size_t rank = 0,
      bool allow_duplicates = true)
      : size_(size),
        num_replicas_(num_replicas),
        rank_(rank),
        epoch_(0),
        allow_duplicates_(allow_duplicates) {}
​
  /// Set the epoch for the current enumeration. This can be used to alter the
  /// sample selection and shuffling behavior.
  void set_epoch(size_t epoch) {
    epoch_ = epoch;
  }
​
  size_t epoch() const {
    return epoch_;
  }
​
 protected:
  size_t local_sample_count() {
    if (allow_duplicates_) {
      return (size_ + num_replicas_ - 1) / num_replicas_;
    } else {
      return size_ / num_replicas_;
    }
  }
​
  size_t size_;
  size_t num_replicas_;
  size_t rank_;
  size_t epoch_;
  bool allow_duplicates_;
};
​
​
/// Select samples randomly. The sampling order is shuffled at each `reset()`
/// call.
class TORCH_API DistributedRandomSampler : public DistributedSampler<> {
 public:
  DistributedRandomSampler(
      size_t size,
      size_t num_replicas = 1,
      size_t rank = 0,
      bool allow_duplicates = true);
​
  /// Resets the `DistributedRandomSampler` to a new set of indices.
  void reset(optional<size_t> new_size = nullopt) override;
​
  /// Returns the next batch of indices.
  optional<std::vector<size_t>> next(size_t batch_size) override;
​
  /// Serializes the `DistributedRandomSampler` to the `archive`.
  void save(serialize::OutputArchive& archive) const override;
​
  /// Deserializes the `DistributedRandomSampler` from the `archive`.
  void load(serialize::InputArchive& archive) override;
​
  /// Returns the current index of the `DistributedRandomSampler`.
  size_t index() const noexcept;
​
 private:
  void populate_indices();
​
  size_t begin_index_;
  size_t end_index_;
  size_t sample_index_;
  std::vector<size_t> all_indices_;
};
​
/// Select samples sequentially.
class TORCH_API DistributedSequentialSampler : public DistributedSampler<> {
 public:
  DistributedSequentialSampler(
      size_t size,
      size_t num_replicas = 1,
      size_t rank = 0,
      bool allow_duplicates = true);
​
  /// Resets the `DistributedSequentialSampler` to a new set of indices.
  void reset(optional<size_t> new_size = nullopt) override;
​
  /// Returns the next batch of indices.
  optional<std::vector<size_t>> next(size_t batch_size) override;
​
  /// Serializes the `DistributedSequentialSampler` to the `archive`.
  void save(serialize::OutputArchive& archive) const override;
​
  /// Deserializes the `DistributedSequentialSampler` from the `archive`.
  void load(serialize::InputArchive& archive) override;
​
  /// Returns the current index of the `DistributedSequentialSampler`.
  size_t index() const noexcept;
​
 private:
  void populate_indices();
​
  size_t begin_index_;
  size_t end_index_;
  size_t sample_index_;
  std::vector<size_t> all_indices_;
};

3.4.2 implementation

The implementation is located at torch/csrc/api/src/data/samplers/distributed.cpp.

3.4.2.1 DistributedRandomSampler

Let's first look at DistributedRandomSampler.

Its job is to obtain shuffled indices according to the worker's rank. We will go through its functions in logical order.

  • reset(size_) is called in the constructor to perform the first shuffle.

  • The reset function points the sampler at a new set of indices:

    • populate_indices() sets the start and end index for this rank.
    • Inside populate_indices, all_indices_ is filled and the rank's start and end indices are configured.
    • Then all_indices_ is shuffled.
  • The next function is comparatively simple: the heavy lifting was done in reset and the data has already been shuffled, so it only has to find the current position and return the requested number of indices.

Because the code below uses std::iota, which may be unfamiliar to some readers, here is what iota does:

std::vector<int> test;
test.resize(10);
std::iota(test.begin(), test.end(), 5);
// test now holds 10 increasing values starting from 5: 5 6 7 8 9 10 11 12 13 14

The specific code is as follows:

DistributedRandomSampler::DistributedRandomSampler(
    size_t size,
    size_t num_replicas,
    size_t rank,
    bool allow_duplicates)
    : DistributedSampler(size, num_replicas, rank, allow_duplicates),
      begin_index_(0),
      end_index_(0),
      sample_index_(0) {
  // shuffle first time.
  reset(size_);
}

// reset is called every time a new epoch is loaded
void DistributedRandomSampler::reset(optional<size_t> new_size) {
  size_ = new_size.value_or(size_);
  populate_indices();

  std::mt19937 rand(epoch_);
  // shuffle
  std::shuffle(all_indices_.begin(), all_indices_.end(), rand);
  sample_index_ = begin_index_;
}

void DistributedRandomSampler::populate_indices() {
  size_t num_local_samples = local_sample_count();
  size_t sample_count =
      num_replicas_ == 1 ? size_ : num_local_samples * num_replicas_;
  all_indices_.resize(sample_count);
  std::iota(std::begin(all_indices_), std::end(all_indices_), 0);
  // If sample_count is greater than size_, assign additional values to the indices.
  for (size_t i = size_; i < sample_count; ++i) {
    // we may have added duplicate samples to make all
    // replicas to have the same number of samples.
    all_indices_[i] = i - size_;
  }
  begin_index_ = rank_ * num_local_samples;       // start index for this rank
  end_index_ = begin_index_ + num_local_samples;  // end index for this rank
  sample_index_ = begin_index_;
}

size_t DistributedRandomSampler::index() const noexcept {
  return sample_index_;
}

// Note that reset is called every time a new epoch is loaded, so next has
// very little work left to do.
optional<std::vector<size_t>> DistributedRandomSampler::next(
    size_t batch_size) {
  if (sample_index_ == end_index_) {
    return nullopt;
  }

  size_t end = sample_index_ + batch_size;
  if (end > end_index_) {
    end = end_index_;
  }

  auto iter = all_indices_.begin();
  std::vector<size_t> res(iter + sample_index_, iter + end);  // extract the next rows
  sample_index_ = end;
  return res;
}
3.4.2.2 DistributedSequentialSampler

Next, let's look at DistributedSequentialSampler.

Its job is to return indices in order according to the worker's rank_. We will go through its functions in logical order.

  • The reset function is much simpler: it uses populate_indices to set the indices sequentially.
  • The next function does a bit more: it returns indices in order and records the start position for the next call.
DistributedSequentialSampler::DistributedSequentialSampler(
    size_t size,
    size_t num_replicas,
    size_t rank,
    bool allow_duplicates)
    : DistributedSampler(size, num_replicas, rank, allow_duplicates),
      begin_index_(0),
      end_index_(0),
      sample_index_(0) {
  populate_indices();  // sets the start position corresponding to this rank
}

void DistributedSequentialSampler::reset(optional<size_t> new_size) {
  size_t size = new_size.value_or(size_);
  if (size != size_) {
    size_ = size;
    populate_indices();
  } else {
    sample_index_ = begin_index_;
  }
}

void DistributedSequentialSampler::populate_indices() {
  begin_index_ = rank_ * local_sample_count();
  end_index_ = begin_index_ + local_sample_count();
  sample_index_ = begin_index_;
}

size_t DistributedSequentialSampler::index() const noexcept {
  return sample_index_;
}

optional<std::vector<size_t>> DistributedSequentialSampler::next(
    size_t batch_size) {
  if (sample_index_ == end_index_) {
    return nullopt;
  }

  size_t end = sample_index_ + batch_size;
  if (end > end_index_) {
    end = end_index_;
  }

  // fill res with values increasing from sample_index_, i.e. return indices in order
  std::vector<size_t> res(end - sample_index_);
  std::iota(std::begin(res), std::end(res), sample_index_);
  if (end >= size_) {
    // wrap around if the index runs past the dataset size
    for (size_t& index : res) {
      index = index % size_;
    }
  }
  sample_index_ = end;  // record the start position for the next call
  return res;
}

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF Reference

One Weird Trick for Parallelizing Convolutional Neural Networks

Challenges and solutions of data processing in AI framework

PyTorch source code interpretation of torch.utils.data: parsing the data processing flow

What is your understanding of the field of large-scale machine learning?

Nvidia-dali goes from giving up to getting started

Pytorch (distributed) data parallel personal practice – DataParallel/DistributedDataParallel