Shanghai Feipeng Network Technology Co., LTD


abstract

There is a large file A (>20 GB). Some algorithm can use file A to generate another file B of the same size, and generating B also requires parts of B that have already been generated. That is, assuming both files have N nodes, node i of the new file depends on some nodes of the original file and on some of the new file's own nodes in [0, i-1].

Because of this data dependency, each newly generated node of file B must wait for its predecessor nodes to be generated first, which rules out generating different nodes of B in parallel. However, each node's dependencies are known before the node is generated. Therefore a producer-consumer model is adopted to prefetch the dependent data: by the time the consumer generates node i, its dependencies have already been prefetched into a queue, accelerating the generation process. The new file is generated through a lock-free queue.


solution

The overall logic

The difficulty of implementation is that generating the new file depends on two kinds of data: the old file, and the data already generated in the new file. The program therefore cannot be split cleanly into a reading stage and a computing stage; when reading runs ahead of computation, some required data will not have been generated yet, so the unfinished parts must be marked and loaded separately from the finished parts.

The algorithm is divided into two parts and implemented with three threads. The two parts are a pure reading part and a mixed read-and-compute part. Because reading data is slower than computing, multiple threads read data and mark the parts of the current layer that are still missing, while a single thread performs the computation and fetches the missing data according to the missing marks.

To improve efficiency, all data needed for a computation is gathered into contiguous memory; for complex operations, this greatly improves the performance of the program.
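As a minimal sketch (with hypothetical names and a hypothetical 32-byte node size), gathering the dependent nodes into one contiguous buffer before computing might look like this:

```rust
// Hypothetical fixed node size; the real algorithm's node size may differ.
const NODE_SIZE: usize = 32;

/// Copy each parent node's fixed-size label out of `src` into one
/// contiguous buffer, so the compute step reads sequential memory.
fn gather_parents(src: &[u8], parents: &[usize], dst: &mut Vec<u8>) {
    dst.clear();
    for &p in parents {
        let off = p * NODE_SIZE;
        dst.extend_from_slice(&src[off..off + NODE_SIZE]);
    }
}
```

The compute step then hashes or otherwise processes `dst` as one sequential run of memory instead of chasing scattered offsets.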

Preliminary knowledge

– Linux threads

– Spin lock. Definition: when a thread tries to acquire a lock that is already held (occupied) by another thread, it does not block; instead it waits briefly and then tries to acquire the lock again, in a loop. This acquire -> wait -> retry mechanism is called a spin lock.

– CAS (Compare And Swap). A CAS operation requires an expected value: if the variable's current value equals the expectation, no other thread has modified it, and the current thread may write the new value; this is the CAS operation succeeding. If the current value does not match the expectation, another thread has already modified the value, and the update fails. The thread can then re-read the variable and try again, or abandon the operation.
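Both primitives can be sketched in a few lines of safe Rust on top of the standard atomics (this is illustrative, not the article's production code):

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

/// A minimal spin lock: `lock` spins until a CAS flips the flag
/// from false to true.
pub struct SpinLock {
    locked: AtomicBool,
}

impl SpinLock {
    pub const fn new() -> Self {
        SpinLock { locked: AtomicBool::new(false) }
    }

    pub fn try_lock(&self) -> bool {
        // CAS: succeeds only if the current value equals the expectation `false`.
        self.locked
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
    }

    pub fn lock(&self) {
        while !self.try_lock() {
            std::hint::spin_loop(); // wait a moment, then retry: the "spin"
        }
    }

    pub fn unlock(&self) {
        self.locked.store(false, Ordering::Release);
    }
}

/// CAS retry loop on an integer: if the expectation no longer matches
/// (another thread won the race), re-read the value and try again.
pub fn cas_add(counter: &AtomicU64, delta: u64) -> u64 {
    let mut cur = counter.load(Ordering::Relaxed);
    loop {
        match counter.compare_exchange(cur, cur + delta, Ordering::SeqCst, Ordering::Relaxed) {
            Ok(prev) => return prev,     // expectation matched; update applied
            Err(actual) => cur = actual, // value changed underneath us; retry
        }
    }
}
```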

Data sharing

The implementation of shared memory

The three threads share two pieces of memory: the new-file data (layer_label) and the algorithm-parameter buffer (ring_buf). For layer_label, the main thread (consumer) has read-write access while the producers have read-only access. The parameter memory is filled by the child threads, first with original-file data and then with already-generated new-file data; data that is not yet available is marked as missing.

The child threads share the memory corresponding to the original file; they are only responsible for copying the corresponding data into the parameter memory shared with the main thread.

The UnsafeSlice and RingBuf structures are defined so that the three threads can share memory. The raw data held in the UnsafeCell can be reconstructed from the pointer (ptr) and length. Reconstructing RingBuf is similar to UnsafeSlice, except that the pointer must be obtained on each access.

    #[derive(Debug)]
    pub struct UnsafeSlice<'a, T> {
        /// Holds the data to ensure lifetime correctness.
        data: UnsafeCell<&'a mut [T]>,
        /// Pointer to the data.
        ptr: *mut T,
        /// Number of elements, not bytes.
        len: usize,
    }

    #[derive(Debug)]
    pub struct RingBuf {
        data: UnsafeCell<Box<[u8]>>,
        slot_size: usize,
        num_slots: usize,
    }

UnsafeSlice reconstruction method

    #[inline]
    pub unsafe fn as_mut_slice(&self) -> &'a mut [T] {
        slice::from_raw_parts_mut(self.ptr, self.len)
    }

RingBuf reconstruction method

    #[allow(clippy::mut_from_ref)]
    unsafe fn slice_mut(&self) -> &mut [u8] {
        slice::from_raw_parts_mut((*self.data.get()).as_mut_ptr(), self.len())
    }

The memory corresponding to the original file (exp_label) and the current layer's file (layer_label) are both of type UnsafeSlice.

The parameter memory (ring_buf) is of type RingBuf. Because its size is limited, it is a circular structure: once the buffer is full, filling restarts from the beginning, overwriting earlier data. Therefore the producers must not run ahead of the consumer by more than the specified lookahead.
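A hypothetical sketch of the wraparound arithmetic (names assumed, not taken from the real RingBuf implementation): node n's parameters live in slot n % num_slots, so new fills overwrite the oldest slot once the buffer is full. This is exactly why the producers may run at most num_slots (the lookahead) ahead of the consumer.

```rust
/// Map a node number to the byte range of its slot in the circular buffer.
/// Slot indices wrap modulo `num_slots`, so node `n` and node
/// `n + num_slots` share the same slot.
fn slot_range(node: u64, slot_size: usize, num_slots: usize) -> (usize, usize) {
    let slot = (node as usize) % num_slots;
    let start = slot * slot_size;
    (start, start + slot_size)
}
```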

Implementation of shared parameters

The main thread and the child threads share the main thread's progress (consumer) and the child threads' total progress (cur_producer). Reads and writes of these two variables must be atomic: while one thread is reading or modifying one of them, no other thread may access it. The child threads' progress must not run too far ahead of the main thread, otherwise the shared parameter memory runs out and the miss rate for this layer's data increases. Conversely, the main thread must wait until the child threads' progress exceeds its own before it can continue, since it needs the child threads to have read the data from the original file.

The child threads synchronize with each other through two atomic variables: the total progress completed by all child threads (cur_producer), and a placeholder variable used to tell the other child threads which segment is being processed (cur_awaiting). Throughout execution, the placeholder variable runs ahead of the total progress. A child thread first reads the placeholder variable to learn which segment it may work on, then atomically increments it; when another child thread next reads the variable, it knows the earlier segment has already been claimed and simply continues from there. After filling its data, a child thread cannot directly increase the total-progress variable: to prevent holes in the shared data memory, it must wait until the preceding segment is complete (i.e. until cur_producer has advanced past it) before advancing the total progress; otherwise it keeps waiting. This guarantees that every segment of data is filled and no gaps appear. cur_awaiting, cur_producer, and consumer are all of type AtomicU64, which satisfies the read-modify-write (RMW) rules for lock-free programs.
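The claiming step can be sketched with a fetch_add on the placeholder variable (a simplified sketch; names and the Option-based return are assumptions, not the article's exact code):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Claim the next segment of `stride` nodes from the placeholder variable.
/// `fetch_add` returns the previous value, so each producer obtains a
/// disjoint range [start, start + stride) and the next caller automatically
/// continues after it.
fn claim_segment(cur_awaiting: &AtomicU64, stride: u64, num_nodes: u64) -> Option<(u64, u64)> {
    let start = cur_awaiting.fetch_add(stride, Ordering::SeqCst);
    if start >= num_nodes {
        return None; // all work has already been assigned
    }
    Some((start, (start + stride).min(num_nodes)))
}
```

Because fetch_add is a single atomic RMW operation, two producers can never be handed overlapping segments.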

The code structure

Crossbeam is used to create the three threads. The producer threads are spawned, and then the consumer loop executes in the main thread until all the nodes are created.

    crossbeam::thread::scope(|s| {
        let mut runners = Vec::with_capacity(num_producers);

        for i in 0..num_producers {
            // sub-thread: producer
            runners.push(s.spawn(move |_| {
                // read data
            }));
        }

        // main thread: consumer
        // calculate

        for runner in runners {
            runner.join().expect("join failed");
        }
    }).expect("crossbeam scope failure");

– Shared parameters

layer_label: UnsafeSlice, the new file data

exp_label: UnsafeSlice, the original file data

base_parent_missing: UnsafeSlice, the data marked missing by the child threads

consumer: AtomicU64, the main thread's progress

cur_producer: AtomicU64, the total progress of the child threads

cur_awaiting: AtomicU64, the tasks already assigned among the child threads

ring_buf: RingBuf, shared memory for data transfer

The child thread

The child threads synchronize through these parameters and call the fill function fill_buffer() to fill data into ring_buf and record missing nodes in base_parent_missing.

The input parameters

ring_buf (algorithm parameter memory): parameter nodes filled in by the producers

cur_producer: the total progress of all producers

cur_awaiting: the progress currently assigned among the producers

layer_labels: memory corresponding to the new file data

exp_labels: memory corresponding to the source file

base_parent_missing: missing nodes marked by the producers

consumer: the current progress of the consumer

stride: the maximum stride length each child thread claims at a time

num_nodes: the total number of nodes

lookahead: the size of ring_buf, i.e. the number of nodes it can hold

Pseudocode

Each child thread executes a loop and exits when all tasks have been assigned, i.e. cur_awaiting >= num_nodes. The function has roughly two parts, synchronization and data filling; data is read by calling the fill function.

    loop {
        if cur_awaiting >= num_nodes then exit loop
        cur_awaiting += stride
        for node between the original cur_awaiting and the incremented cur_awaiting {
            if node - consumer > lookahead then sleep
            call the fill_buffer function
        }
        if the original cur_awaiting > cur_producer then sleep
    }

Thread synchronization

The child threads (producers) jointly maintain their overall progress variable (cur_producer) and the placeholder variable (cur_awaiting), through which they synchronize. Each child thread first claims its task nodes from the placeholder variable, adding a fixed length to it to inform the other child threads that those tasks have been assigned. Before filling the parameter memory ring_buf, it checks that the sequence number of the data to be filled does not run more than the reserved lookahead ahead of the main program; otherwise, unconsumed data in the parameter memory would be overwritten. Assuming the child thread's current position is p, it fills the shared memory only while p - consumer <= lookahead.

After filling the data, the child thread must update the cur_producer variable; the idea of CAS (Compare And Swap) is used here. A child thread cannot advance the total-progress variable cur_producer directly: it must first confirm that the parameters of the preceding nodes have been filled. Only then may it update cur_producer, telling all threads that the producers have completed this data and stored it in ring_buf. This prevents the situation where some data in ring_buf is not yet filled but other threads believe it is, and keeps the producers synchronized with the main thread.
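The in-order publication step can be sketched as a CAS loop (a simplified sketch with assumed names; a real implementation would likely yield or park instead of spinning indefinitely):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// In-order publication via CAS: only the producer whose segment starts
/// exactly at the current cur_producer may advance it. A producer that
/// finished a later segment spins until the earlier segment has been
/// published, so ring_buf never advertises a gap of unfilled data.
fn publish_segment(cur_producer: &AtomicU64, seg_start: u64, seg_end: u64) {
    while cur_producer
        .compare_exchange(seg_start, seg_end, Ordering::SeqCst, Ordering::SeqCst)
        .is_err()
    {
        std::hint::spin_loop(); // the preceding segment is not finished yet
    }
}
```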

The fill function

The fill_buffer() function first handles the node numbers required from the current layer. Since this data comes from the current layer and must be generated by the main program, some nodes may well not have been generated yet. By the design of the lock-free queue, the program does not wait here for those nodes to be generated; instead, the not-yet-generated nodes are marked in base_parent_missing, to be fetched later by the main program.

    fn fill_buffer(
        cur_node: u64,
        parents_cache: &CacheReader<u32>,
        mut cur_parent: &[u32],
        layer_labels: &UnsafeSlice<'_, u32>,
        exp_labels: Option<&UnsafeSlice<'_, u32>>,
        buf: &mut [u8],
        base_parent_missing: &mut BitMask,
    )

After handling this layer's data (copying each node into the buffer or marking it missing), the program copies the data of the nodes from the previous layer into the buffer; the previous layer's data has already been fully generated, so nothing there can be missing.
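The missing-marking step can be sketched as follows. Both the BitMask type and the comparison against the consumer's progress are simplified assumptions here, standing in for however the real fill_buffer() decides that a same-layer parent has not been generated yet:

```rust
/// Hypothetical one-word bitmask over a node's parents (the real BitMask
/// type may differ); bit k set means parent k must be filled in later.
pub struct BitMask(u64);

impl BitMask {
    pub fn new() -> Self {
        BitMask(0)
    }
    pub fn set(&mut self, i: usize) {
        self.0 |= 1 << i;
    }
    pub fn get(&self, i: usize) -> bool {
        (self.0 >> i) & 1 == 1
    }
}

/// Mark every same-layer parent at or beyond the consumer's progress:
/// its label does not exist yet, so defer it to the main thread instead
/// of waiting for it here.
fn mark_missing(parents: &[u64], consumer_progress: u64, missing: &mut BitMask) {
    for (k, &p) in parents.iter().enumerate() {
        if p >= consumer_progress {
            missing.set(k); // not generated yet: record, don't wait
        }
    }
}
```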

The main thread

The main function reads the already-prepared data from ring_buf, reads the missing data indicated by base_parent_missing and fills it in, then computes the current node of the new file.

The input parameters

ring_buf: parameter nodes filled in by the producers

cur_producer: the total progress of all producers

layer_labels: memory corresponding to the new file data

base_parent_missing (this layer's missing flags): missing nodes marked by the producers

consumer: the current progress of the consumer

num_nodes: the total number of nodes

lookahead: the size of ring_buf, i.e. the number of nodes it can hold

i: the sequence number of completed work, incremented until all nodes are computed, i.e. i >= num_nodes

Main thread logic

The main thread executes a bounded loop until all nodes are generated. For each node the child threads have prepared, it first fills in the nodes marked missing for this layer, then computes the node using the fully prepared data in the parameter memory.

    while i < num_nodes {
        if cur_producer < i then sleep

        get all the finished nodes

        for each known finished node {
            use base_parent_missing to fill ring_buf
            generate node[i] with ring_buf
            i += 1
            consumer += 1
        }
    }

The main thread executes the process

First, read cur_producer from the shared variable and wait until it exceeds the main thread's current sequence number i, i.e. cur_producer > i. Then compute using all the nodes the producers have already completed.

For every node known to be ready, the consumer does the following. First, the missing nodes of this layer are read according to base_parent_missing and filled into ring_buf, preparing for the computation. When the computation of a node completes, the consumer variable is incremented (telling all child threads that this node is done), along with any other variables needed to prepare for the next node. After all prepared nodes have been computed, the main thread checks again whether new nodes have become ready; if so, it takes them all and computes as above.
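The consumer loop above can be sketched like this (simplified: names assumed, and the per-node work of filling missing parents and hashing is abstracted into a callback):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Consumer-side sketch: process every node the producers have already
/// published, bumping `consumer` after each node so the producers may
/// reuse the corresponding ring_buf slot.
fn drain_ready(
    consumer: &AtomicU64,
    cur_producer: &AtomicU64,
    mut process: impl FnMut(u64),
) -> u64 {
    let ready = cur_producer.load(Ordering::SeqCst);
    let mut i = consumer.load(Ordering::SeqCst);
    while i < ready {
        process(i); // fill missing parents from base_parent_missing, then generate node[i]
        i += 1;
        consumer.store(i, Ordering::SeqCst);
    }
    i
}
```

The real main thread would call this in a loop, sleeping whenever no new nodes are ready, until i reaches num_nodes.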

The overall model

The producers and the consumer exchange data through ring_buf and synchronize through the shared atomic progress variables.

conclusion

Parallel access to the same block of memory is inherently unsafe: reading and writing the same address at the same time can easily crash the program, so the timing of reads and writes must be controlled. Under Rust's ownership rules, concurrent reads and writes of a variable require a Mutex or RwLock to guarantee safety, which introduces a certain performance overhead. For example, parallel reads and writes to different parts of one memory region do not interfere with each other in theory, but if the whole region is locked, access to all of it is serialized and efficiency drops. That is unsuitable for the scenario described in this article.

Here, UnsafeCell wraps the memory, and each access obtains a mutable reference from the raw pointer inside an unsafe block. Through the different atomic variables, the progress of producers and consumer is controlled so that only one thread can read or write the memory at a given address at any one time, which preserves memory safety.

Of course, handing control of memory safety to the developer is not consistent with Rust's design philosophy, and the lock-free queue implemented here is just a practice exercise for this specific algorithmic scenario. The source code address is attached.

The source code

    fn create_layer_labels(
        parents_cache: &CacheReader<u32>,
        replica_id: &[u8],
        layer_labels: &mut MmapMut,
        exp_labels: Option<&mut MmapMut>,
        num_nodes: u64,
        cur_layer: u32,
        core_group: Arc<Option<MutexGuard<'_, Vec<CoreIndex>>>>,
    )

The code is in fn create_layer_labels().