An earlier version of this post contained errors and was split into two articles. This is the complete, corrected version.

0x00 Abstract

Alink is a new-generation machine learning algorithm platform developed by Alibaba on top of the real-time computing engine Flink. It is the first machine learning platform in the industry that supports both batch and streaming algorithms. This article analyzes how the AllReduce communication model is implemented in Alink. AllReduce is used widely in Alink, for example in KMeans, LDA, Word2Vec, GD, LBFGS, Newton Method, OWLQN, SGD, GBDT and Random Forest.

Because little public information about Alink is available, much of what follows is my own inference, and there are bound to be omissions. Please point them out and I will update the article.

0x01 What is MPI

Message Passing Interface (MPI) is a cross-language communication protocol for writing parallel programs. It supports point-to-point and broadcast communication.

MPI aims for high performance, large scale, and portability, and it is still the dominant model for high-performance computing today.

Its main characteristics are:

  • A partitioned address space. Each thread can read non-local data only by calling the API, and every interaction with non-local memory has to be coordinated (a handshake).

  • Only explicit parallelization is supported. Users must specify how messages are delivered.

AllReduce is a basic primitive provided by MPI. To understand AllReduce, we first need to understand Reduce.

  • Reduction function MPI_Reduce: reduction is a classic concept from functional programming. It takes the same variable from every process in the communicator, combines the values with a reduction operation, and delivers the result to one specified process. For example, a function can shrink a batch of data into a smaller batch, or the elements of an array can be reduced to a single number by an addition function.
  • Reduce-and-broadcast function MPI_Allreduce: it performs the same reduction and then distributes the result to every process, so that all parallel processes know the reduced value.

The difference between the two is that MPI_Reduce sends the final result only to the specified destination process (dest_process), while MPI_Allreduce sends the result to all processes. The prototype of MPI_Allreduce therefore does not need a target process number.
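The difference between the two primitives can be illustrated with a small single-JVM simulation. This is only a sketch: it does not use a real MPI binding, the class `ReduceVsAllReduce` and its methods are hypothetical names, and each row of the input array stands in for one process's local buffer.

```java
import java.util.Arrays;

// Hypothetical single-JVM simulation; each row of `buffers` plays the role of one process's local data.
final class ReduceVsAllReduce {

    // Element-wise sum over all processes: the reduction itself.
    static double[] sum(double[][] buffers) {
        double[] result = new double[buffers[0].length];
        for (double[] buf : buffers) {
            for (int i = 0; i < buf.length; i++) {
                result[i] += buf[i];
            }
        }
        return result;
    }

    // Reduce: only the process with index `root` receives the result; the others get null.
    static double[][] reduce(double[][] buffers, int root) {
        double[][] out = new double[buffers.length][];
        out[root] = sum(buffers);
        return out;
    }

    // AllReduce: every process receives its own copy of the result.
    static double[][] allReduce(double[][] buffers) {
        double[] total = sum(buffers);
        double[][] out = new double[buffers.length][];
        for (int p = 0; p < buffers.length; p++) {
            out[p] = total.clone();
        }
        return out;
    }

    public static void main(String[] args) {
        double[][] buffers = {{1, 2}, {3, 4}, {5, 6}};
        // prints [[9.0, 12.0], null, null]
        System.out.println(Arrays.deepToString(reduce(buffers, 0)));
        // prints [[9.0, 12.0], [9.0, 12.0], [9.0, 12.0]]
        System.out.println(Arrays.deepToString(allReduce(buffers)));
    }
}
```

Note that `allReduce` takes no root argument, which mirrors the remark about the MPI_Allreduce prototype.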

0x02 Alink implements the idea of MPI

As noted above, AllReduce is used widely in Alink, for example in KMeans, LDA, Word2Vec, GD, LBFGS, Newton Method, OWLQN, SGD, GBDT and Random Forest.

AllReduce plays a key role in these algorithms: it forces tasks that were running in parallel to stop at a synchronization point, aggregates and redistributes their results, and then lets them continue. It is a bit like the barrier familiar from concurrent programming.
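The barrier analogy can be made concrete with the JDK's `CyclicBarrier`. The sketch below is an illustration only, not how Alink works internally: the class `BarrierAllReduceDemo` is a hypothetical name, threads stand in for parallel subtasks, and the barrier action plays the part of "aggregate, then let everyone continue".

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical illustration of the barrier analogy; threads stand in for parallel subtasks.
final class BarrierAllReduceDemo {

    // Each worker contributes (id + 1); returns the total that each worker observed after the barrier.
    static long[] run(int workers) {
        long[] partial = new long[workers];
        long[] total = new long[1];
        long[] seen = new long[workers];

        // The barrier action runs once, after every worker has arrived and before any is released.
        CyclicBarrier barrier = new CyclicBarrier(workers, () -> {
            long sum = 0;
            for (long p : partial) {
                sum += p;
            }
            total[0] = sum;
        });

        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            final int id = i;
            threads[i] = new Thread(() -> {
                partial[id] = id + 1;          // parallel phase: compute a partial result
                try {
                    barrier.await();           // synchronization point
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
                seen[id] = total[0];           // every worker continues with the same total
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        // With 4 workers each observed total is 1 + 2 + 3 + 4 = 10.
        System.out.println(java.util.Arrays.toString(run(4)));
    }
}
```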

Compared with the KMeans example that ships with Flink, AllReduce plays the role of groupBy(0).reduce: the grouped reduce cannot complete until all upstream data has been produced.

	DataSet<Centroid> newCentroids = points
		// compute closest centroid for each point
		.map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
		// count and sum point coordinates for each centroid
		.map(new CountAppender())
        // In Alink, this step is done by AllReduce
		.groupBy(0).reduce(new CentroidAccumulator())
		// compute new centroids from point counts and coordinate sums
		.map(new CentroidAverager());

The Javadoc of AllReduce shows clearly how Alink approaches implementing MPI.

/**
 * An implement of {@link CommunicateFunction} that do the AllReduce.
 *
 * AllReduce is a communication primitive widely used in MPI. In our implementation, all workers do reduce on a partition of the whole data and they all get the final reduce result.
 *
 * There're mainly three stages:
 * 1. All workers send the there partial data to other workers for reduce.
 * 2. All workers do reduce on all data it received and then send partial results to others.
 * 3. All workers merge partial results into final result and put it into session context with pre-defined object name.
 */

In other words:

All workers perform reduce on part of the data, and all of them obtain the final reduce result. There are three stages:
1. All workers send the partial data that needs to be reduced to the other workers.
2. All workers reduce the data they receive, and then send these partial reduce results to the other workers.
3. All workers merge the partial reduce results into the final result and put it into a predefined variable in the session context.
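The three stages can be sketched as a single-JVM simulation. This is an assumption-laden illustration, not Alink's code: `ThreeStageAllReduce` and `start` are hypothetical names, the "network" is just array access, and the slicing rule used here is a simple even split rather than the one Alink uses.

```java
import java.util.Arrays;

// Hypothetical single-JVM simulation of the three stages; not Alink's actual code.
final class ThreeStageAllReduce {

    // Start index of the slice owned by `worker` when `len` elements are split over `n` workers.
    static int start(int worker, int n, int len) {
        return (int) ((long) worker * len / n);
    }

    static double[][] allReduce(double[][] local) {
        int n = local.length;
        int len = local[0].length;

        // Stages 1 and 2: worker w "receives" slice w from every worker and sums it.
        double[][] partial = new double[n][];
        for (int w = 0; w < n; w++) {
            int from = start(w, n, len);
            int to = start(w + 1, n, len);
            partial[w] = new double[to - from];
            for (double[] sender : local) {
                for (int i = from; i < to; i++) {
                    partial[w][i - from] += sender[i];
                }
            }
        }

        // Stage 3: every worker collects all reduced slices and merges them into a full buffer.
        double[][] result = new double[n][len];
        for (int receiver = 0; receiver < n; receiver++) {
            for (int w = 0; w < n; w++) {
                System.arraycopy(partial[w], 0, result[receiver], start(w, n, len), partial[w].length);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        double[][] local = {
            {1, 2, 3, 4, 5},
            {10, 20, 30, 40, 50},
            {100, 200, 300, 400, 500}
        };
        // Every worker ends up with [111.0, 222.0, 333.0, 444.0, 555.0].
        System.out.println(Arrays.deepToString(allReduce(local)));
    }
}
```

The point of splitting the buffer is that each worker only reduces its own slice, so the summing work is spread across workers instead of being concentrated on one root.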

“What is learned from paper always feels shallow; to truly understand something, you must practice it yourself.”

Alink does a lot of work behind the scenes to implement AllReduce. We will analyze it piece by piece below.

0x03 How sharing is implemented

Sharing comes first when implementing AllReduce: the merge/broadcast process needs metadata as well as inputs and outputs, and all of this is much simpler with shared variables. Let’s look at how Alink implements sharing inside the TaskManager.

1. Task related concepts

  • Task: a Task is the collection of subtasks that perform the same function within one stage, similar to a TaskSet in Spark.
  • SubTask: a subtask is the smallest unit of execution in Flink. It is an instance of a Java class whose fields and methods carry out the actual computation logic.
  • Chain optimization: in principle, each parallel instance of an operator is a subtask. That alone would cause problems, because Flink’s TaskManager runs each task in its own thread, and a large number of threads means heavy context-switching overhead and lower throughput. To mitigate this, Flink chains subtasks together; the chained result is scheduled and executed in a single thread as one unit.
  • Operator Chains: Flink merges multiple subtasks into a single Task; this process is called operator chaining. Each Task is executed by one thread. It is similar to pipelining in Spark.
  • Slot: the unit of resource isolation in Flink. One slot can run several subtasks, but they must come from different stages of the same application. As a result, each slot can execute an entire pipeline of the job.

Programs in Flink are inherently parallel. During execution, each operator (transformation) has one or more operator subtasks. The subtasks are independent of each other and run in different threads, possibly on different machines or containers.

Subtasks of different tasks of the same application can run in the same slot. Subtasks of the same task cannot share a slot; they are spread over different slots. In particular, the parallel instances of AllReduceSend cannot run in the same slot.

2. TaskManager

Each TaskManager in Flink is a JVM process that may execute one or more subtasks in separate threads. The TaskManager acts as a worker (slave) node in the cluster: it executes the actual tasks and requests and manages the resources each task needs on its node.

To isolate resources and increase the number of tasks that can run, the TaskManager introduces the concept of a slot. Slots isolate memory only, and memory is divided evenly. A TaskManager has at least one slot. If a TaskManager has N slots, each slot is allocated 1/N of the TaskManager’s memory. The slots within a TaskManager are isolated only in memory; the CPU is shared.

The client compiles and packages the Flink application and submits it to the JobManager. The JobManager then assigns tasks to TaskManager nodes that have free resources, based on the resources the TaskManagers have registered with it, and the tasks are started and run.

The TaskManager receives the tasks to deploy from the JobManager, starts them using slot resources, establishes network connections for data exchange, receives data, and begins processing. TaskManagers interact with each other through data streams.

Flink tasks run in a multi-threaded manner: one TaskManager executes multiple tasks concurrently in multiple threads. This is very different from the multi-JVM approach of MapReduce. By letting tasks share system resources through task slots, Flink can greatly improve CPU efficiency. Each TaskManager manages a pool of task slots to manage its resources effectively.

For our purposes this means: multiple parallel AllReduceSend instances running in the same TaskManager share all static variables in that TaskManager.
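The consequence for static variables can be seen in a few lines of plain Java. This is a minimal sketch with hypothetical names (`StaticSharingDemo`, `SHARED`); threads stand in for subtasks running in one TaskManager JVM, and it assumes all threads load the class through the same class loader, since a static field exists once per class loader.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo: threads in one JVM see the same static map, like subtasks in one TaskManager.
final class StaticSharingDemo {

    // One instance per JVM (more precisely, per class loader), visible to every thread.
    static final Map<String, Integer> SHARED = new ConcurrentHashMap<>();

    // Starts `parallelism` threads; each one writes an entry keyed by its own index.
    static void runSubtasks(int parallelism) {
        Thread[] threads = new Thread[parallelism];
        for (int i = 0; i < parallelism; i++) {
            final int taskId = i;
            threads[i] = new Thread(() -> SHARED.put("subtask-" + taskId, taskId));
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    public static void main(String[] args) {
        runSubtasks(4);
        // All four writes landed in the same map.
        System.out.println(SHARED.size()); // prints 4
    }
}
```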

3. State sharing

Alink implements variable sharing with static variables in the TaskManager. Several main classes and concepts are involved, and they are fairly intricate. Going from the top level down, each level needs more identifiers and state.

3.1 Conceptual Analysis

From the top down the levels are as follows:

Algorithm perspective: ComContext

User code calls context.getObj(bufferName). This is ideal for the user, who only needs to know the variable name to access the variable through the context.

ComContext itself needs to know more, such as its own sessionId and taskId, as explained below.

ComContext calls down like this: SessionSharedObjs.put(objName, sessionId, taskId, obj);

Framework perspective: IterativeComQueue

IterativeComQueue is a framework concept. Taking KMeans as an example, the KMeans algorithm corresponds to several IterativeComQueue instances.

An IterativeComQueue holds multiple compute/communicate functions. Each function needs to know which IterativeComQueue it belongs to and how to communicate with the other functions on that queue, without being confused with functions on any other queue. So a concept is needed to represent the queue, which leads to the concept of a session.

Session perspective: SessionSharedObjs

The concept of a session was created to distinguish one IterativeComQueue from another. All compute/communicate functions on an IterativeComQueue are bound to the same session ID, so all functions on the same IterativeComQueue can communicate with each other.

An IterativeComQueue corresponds to one session, so <variable name, sessionId> identifies a variable that the session can access.

SessionSharedObjs contains static member variables:

  • int sessionId = 0; an incrementing counter used to distinguish sessions.
  • HashMap<Tuple2<String, Integer>, Long> key2Handle; a mapping from a variable name within a session to a variable handle.

Normally a variable name would map to exactly one handle, that is, one variable name in a session corresponds to one variable handle. In Flink, however, multiple subtasks run in parallel, so another concept is needed to identify the variable that belongs to each subtask, and it has to be tied to taskId. This leads to the concept of state below.

SessionSharedObjs calls down like this: IterTaskObjKeeper.put(handle, taskId, obj);

Subtask perspective: IterTaskObjKeeper

This is where static variables are used to implement sharing. It is a shared instance that all tasks (threads) in a TaskManager can access.

IterTaskObjKeeper contains static member variables:

  • long handle = 0L; an incrementing identifier used to distinguish states.
  • Map<Tuple2<Long, Integer>, Object> states; a mapping in which handle identifies which variable the state belongs to, and <handle, taskId> identifies the state instance that belongs to a particular task within that variable. This is the subdivision per subtask.

In Flink, an algorithm is executed by multiple subtasks in parallel. If there were only one object per handle, all subtasks would access it at once, leading to the familiar multithreading problems. So Alink splits each handle into multiple states. From the perspective of a subtask, each state is uniquely identified by <handle, taskId>.

To sum up, for the same variable name, the shared state corresponding to each subtask is actually independent and does not interfere with each other. Sharing is essentially sharing between operators running on the subtask.
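The two-level lookup summarized above can be condensed into a short sketch. It is a simplification under stated assumptions, not Alink's classes: `TwoLevelSharedState` is a hypothetical name, string keys replace the `Tuple2` keys, and a single `synchronized` lock replaces the read/write lock.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified sketch of the two-level lookup; not Alink's actual classes.
final class TwoLevelSharedState {

    // Level 1: "name@sessionId" -> handle (Alink keys this map with a Tuple2<String, Integer>).
    private static final Map<String, Long> KEY_TO_HANDLE = new HashMap<>();
    // Level 2: "handle#taskId" -> state (Alink keys this map with a Tuple2<Long, Integer>).
    private static final Map<String, Object> STATES = new HashMap<>();
    private static long nextHandle = 0L;

    static synchronized void put(String name, int sessionId, int taskId, Object obj) {
        String key = name + "@" + sessionId;
        Long handle = KEY_TO_HANDLE.get(key);
        if (handle == null) {
            handle = nextHandle++;           // first use of this name in this session
            KEY_TO_HANDLE.put(key, handle);
        }
        STATES.put(handle + "#" + taskId, obj);
    }

    static synchronized Object get(String name, int sessionId, int taskId) {
        Long handle = KEY_TO_HANDLE.get(name + "@" + sessionId);
        return handle == null ? null : STATES.get(handle + "#" + taskId);
    }

    public static void main(String[] args) {
        put("centroidAllReduce", 0, 0, new double[] {1.0});
        put("centroidAllReduce", 0, 1, new double[] {2.0});
        // Same name and session, different subtask: two independent states.
        System.out.println(((double[]) get("centroidAllReduce", 0, 0))[0]); // prints 1.0
        System.out.println(((double[]) get("centroidAllReduce", 0, 1))[0]); // prints 2.0
        // A different session does not see the variable at all.
        System.out.println(get("centroidAllReduce", 1, 0));                 // prints null
    }
}
```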

3.2 Analysis of variable instances

We can get a clearer idea from the variables actually executed.

// In session 0, centroidAllReduce corresponds to handle 7
SessionSharedObjs.key2Handle = {HashMap@10480}  size = 9
 {Tuple2@10492} "(initCentroid,0)" -> {Long@10493} 1
 {Tuple2@10494} "(statistics,0)" -> {Long@10495} 2
 {Tuple2@10496} "(362158a2-588b-429f-b848-c901a1e15e17,0)" -> {Long@10497} 8
 {Tuple2@10498} "(k,0)" -> {Long@10499} 6
 {Tuple2@10500} "(centroidAllReduce,0)" -> {Long@10501} 7 // This is what it says
 {Tuple2@10502} "(trainData,0)" -> {Long@10503} 0
 {Tuple2@10504} "(vectorSize,0)" -> {Long@10505} 3
 {Tuple2@10506} "(centroid2, 0)" -> {Long@10507} 5
 {Tuple2@10508} "(centroid1, 0)" -> {Long@10509} 4

// As can be seen below, the variable handle 7 has 4 subtasks, so it is subdivided into 4 states.
 com.alibaba.alink.common.comqueue.IterTaskObjKeeper.states = {HashMap@10520}  size = 36
 {Tuple2@10571} "(7, 0)" -> {double[15]@10572} 
 {Tuple2@10573} "(7, 1)" -> {double[15]@10574} 
 {Tuple2@10577} "(7, 2)" -> {double[15]@10578} 
 {Tuple2@10581} "(7, 3)" -> {double[15]@10582} 

 {Tuple2@10575} "(5, 0)" -> {Tuple2@10576} "(10,com.alibaba.alink.operator.common.distance.FastDistanceMatrixData@29a72fbb)"
 {Tuple2@10579} "(5, 1)" -> {Tuple2@10580} "(10,com.alibaba.alink.operator.common.distance.FastDistanceMatrixData@26c52354)"
 {Tuple2@10585} "(5, 2)" -> {Tuple2@10586} "(10,com.alibaba.alink.operator.common.distance.FastDistanceMatrixData@7c6ed779)"
 {Tuple2@10588} "(5, 3)" -> {Tuple2@10589} "(10,com.alibaba.alink.operator.common.distance.FastDistanceMatrixData@154b8a4d)"

Let’s go through the code and parse each of the classes.

3.3 ComContext

ComContext is the top-level class; it gives access to runtime information and shared variables. All compute/communicate functions on an IterativeComQueue (BaseComQueue) access shared variables through ComContext. For example:

public class BaseComQueue<Q extends BaseComQueue<Q>> implements Serializable {

    // Each BaseComQueue gets a unique sessionId.
    private final int sessionId = SessionSharedObjs.getNewSessionId();

    int taskId = getRuntimeContext().getIndexOfThisSubtask();

    public void mapPartition(Iterable<byte[]> values, Collector<byte[]> out) {
        // Obtain a ComContext
        ComContext context = new ComContext(sessionId, getIterationRuntimeContext());
        if (getIterationRuntimeContext().getSuperstepNumber() == maxIter || criterion) {
            // Continue to access shared variables through the ComContext
            List<Row> model = completeResult.calc(context);
        }
    }
}

// The user calls something like this
double[] sendBuf = context.getObj(bufferName);

As you can see, ComContext is the top-level context that the user sees. taskId and sessionId are the keys to how it works.

  • sessionId comes from a static member variable defined in SessionSharedObjs that is incremented automatically. Each BaseComQueue gets a unique sessionId, that is, each queue maintains its own session. All ComContext instances created in one BaseComQueue therefore have the same sessionId.
  • taskId is obtained from the runtime context.
/**
 * Encapsulates task-specific information: name, index of subtask, parallelism and attempt number.
 */
@Internal
public class TaskInfo {
	/**
	 * Gets the number of this parallel subtask. The numbering starts from 0 and goes up to
	 * parallelism-1 (parallelism as returned by {@link #getNumberOfParallelSubtasks()}).
	 *
	 * @return The index of the parallel subtask.
	 */
	public int getIndexOfThisSubtask() {
		return this.indexOfSubtask; // taskId is obtained here
	}
}

The ComContext class itself is defined as follows:

/** * Context used in BaseComQueue to access basic runtime information and shared objects. */
public class ComContext {
	private final int taskId;
	private final int numTask;
	private final int stepNo;
	private final int sessionId;

	public ComContext(int sessionId, IterationRuntimeContext runtimeContext) {
		this.sessionId = sessionId;
		this.numTask = runtimeContext.getNumberOfParallelSubtasks();
		this.taskId = runtimeContext.getIndexOfThisSubtask();
		this.stepNo = runtimeContext.getSuperstepNumber();
	}
    
	/**
	 * Put an object into shared objects for access of other QueueItem of the same taskId.
	 *
	 * @param objName object name
	 * @param obj     object itself.
	 */
	public void putObj(String objName, Object obj) {
		SessionSharedObjs.put(objName, sessionId, taskId, obj);
	}
}

// For example, the fields of one instance at runtime
this = {ComContext@10578} 
 taskId = 4
 numTask = 8
 stepNo = 1
 sessionId = 0

3.4 SessionSharedObjs

SessionSharedObjs is the class one level down. It maintains the shared objects of a session, keyed by session ID.

SessionSharedObjs maintains a static class variable sessionId to distinguish sessions.

The core of SessionSharedObjs is HashMap<Tuple2<String, Integer>, Long> key2Handle, a mapping of <variable name, sessionId> to <variable handle>.

An IterativeComQueue corresponds to one session, so <variable name, sessionId> identifies a variable that the IterativeComQueue can access. Normally one handle per variable would be enough.

However, since an IterativeComQueue is executed by several subtasks in parallel, each handle is subdivided into several states so that subtasks stay separate and do not interfere. Each state is uniquely identified by <handle, taskId>, as described below.

/**
 * An static class that manage shared objects for {@link BaseComQueue}s.
 */
class SessionSharedObjs implements Serializable {
	private static HashMap<Tuple2<String, Integer>, Long> key2Handle = new HashMap<>();
	private static int sessionId = 0;
	private static ReadWriteLock rwlock = new ReentrantReadWriteLock();
    
	/**
	 * Get a new session id.
	 * All access operation should bind with a session id. This id is usually shared among compute/communicate function of an {@link IterativeComQueue}.
	 *
	 * @return new session id.
	 */
	synchronized static int getNewSessionId() {
		return sessionId++;
	}    
    
	static void put(String objName, int session, int taskId, Object obj) {
		rwlock.writeLock().lock();
		try {
			Long handle = key2Handle.get(Tuple2.of(objName, session));
			if (handle == null) {
				handle = IterTaskObjKeeper.getNewHandle();
				key2Handle.put(Tuple2.of(objName, session), handle);
			}
			// The call goes down here; taskId is also part of the key.
			IterTaskObjKeeper.put(handle, taskId, obj);
		} finally {
			rwlock.writeLock().unlock();
		}
	}
}

3.5 IterTaskObjKeeper

This is the lowest-level shared class. Its state lives in static fields on the heap of the TaskManager process, so all tasks (threads) in that TaskManager can share it.

IterTaskObjKeeper shares data across the whole JVM through a static variable called states. Each entry is identified by handle and taskId.

IterTaskObjKeeper maintains an incrementing handle that identifies which variable a state belongs to.

<handle, taskId> is the unique identifier of a state. The state is the object shared in the heap memory of the TaskManager process.

That is, the state under <handle, taskId> is the instance of this variable that belongs to the given task; it is the subdivision per task.

/**
 * A 'state' is an object in the heap memory of task manager process,
 * shared across all tasks (threads) in the task manager.
 * Note that the 'state' is shared by all tasks on the same task manager,
 * users should guarantee that no two tasks modify a 'state' at the same time.
 * A 'state' is identified by 'handle' and 'taskId'.
 */
public class IterTaskObjKeeper implements Serializable {
	private static Map <Tuple2 <Long, Integer>, Object> states;

	/** * A 'handle' is a unique identifier of a state. */
	private static long handle = 0L;

	private static ReadWriteLock rwlock = new ReentrantReadWriteLock();

	static {
		states = new HashMap <>();
	}

	/**
	 * @note Should get a new handle on the client side and pass it to transformers.
	 */
	synchronized public static long getNewHandle() {
		return handle++;
	}

	public static void put(long handle, int taskId, Object state) {
		rwlock.writeLock().lock();
		try {
			states.put(Tuple2.of(handle, taskId), state);
		} finally {
			rwlock.writeLock().unlock();
		}
	}
}

0x04 Sample code

Our sample code is still the KMeans example, as follows.

KMeansTrainBatchOp call

	static DataSet <Row> iterateICQ(...omitted...) {
		return new IterativeComQueue()
			.initWithPartitionedData(TRAIN_DATA, data)
			.initWithBroadcastData(INIT_CENTROID, initCentroid)
			.initWithBroadcastData(KMEANS_STATISTICS, statistics)
			.add(new KMeansPreallocateCentroid())
			.add(new KMeansAssignCluster(distance))
			.add(new AllReduce(CENTROID_ALL_REDUCE))
			.add(new KMeansUpdateCentroids(distance))
			.setCompareCriterionOfNode0(new KMeansIterTermination(distance, tol))
			.closeWith(new KMeansOutputModel(distanceType, vectorColName, latitudeColName, longitudeColName))
			.setMaxIter(maxIter)
			.exec();
	}

AllReduce implementation

The main code excerpt of Alink’s AllReduce is as follows:

public static <T> DataSet <T> allReduce(
    return input
		.mapPartition(new AllReduceSend <T>(bufferName, lengthName, transferBufferName, sessionId))
		.withBroadcastSet(input, "barrier")
		.returns(
			new TupleTypeInfo <>(Types.INT, Types.INT, PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO))
		.name("AllReduceSend")
		.partitionCustom(new Partitioner <Integer>() {
			@Override
			public int partition(Integer key, int numPartitions) {
				return key;
			}
		}, 0)
		.name("AllReduceBroadcastRaw")
		.mapPartition(new AllReduceSum(bufferName, lengthName, sessionId, op))
		.returns(
			new TupleTypeInfo <>(Types.INT, Types.INT, PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO))
		.name("AllReduceSum")
		.partitionCustom(new Partitioner <Integer>() {
			@Override
			public int partition(Integer key, int numPartitions) {
				return key;
			}
		}, 0)
		.name("AllReduceBroadcastSum")
		.mapPartition(new AllReduceRecv <T>(bufferName, lengthName, sessionId))
		.returns(input.getType())
		.name("AllReduceRecv");
}

0x05 AllReduce implementation

Based on the code above, the overall flow of AllReduce can be summarized as follows:

  • KMeansAssignCluster: finds the closest cluster for every point and calculates the sums of the points belonging to the same cluster. It then writes the computed cluster sums to the shared variable CENTROID_ALL_REDUCE in its own TaskManager.

  • Each AllReduceSend takes the previously stored cluster sums out of CENTROID_ALL_REDUCE in its own TaskManager (each AllReduceSend sees only its own copy) and sends them to the downstream tasks. Which part of the variable goes to which downstream task is calculated from that task’s index and the amount of data. The same calculation (partly carried in the code, partly sent along with the data as meta information) is reused later in AllReduceRecv.

  • Each AllReduceSum receives the cluster sums sent by the AllReduceSend instances, adds them up, and sends the result to every downstream task.

  • Each AllReduceRecv receives the summed results sent by all AllReduceSum instances and stores them in the shared variable CENTROID_ALL_REDUCE. Where each piece is stored follows the same calculation as in AllReduceSend, so the writes do not conflict with each other. For example, with five AllReduceSum instances, each sends its data to the five AllReduceRecv instances. After receiving the five pieces, each AllReduceRecv writes them into the state that corresponds to its own subtask index. The position of each piece within the state is given by the metadata that travels with the data, so there is no write conflict, and in the end every AllReduceRecv holds all five pieces.

  • KMeansUpdateCentroids: takes out the CENTROID_ALL_REDUCE variable and updates the centroids based on the sum of points and the number of points belonging to the same cluster.

1. KMeansAssignCluster

The purpose of this class is to find the nearest cluster center for each point, and to count the points and sum their coordinates for each cluster center.

KMeansAssignCluster stores its result under CENTROID_ALL_REDUCE through ComContext for the subsequent AllReduce. If there are five KMeansAssignCluster instances, their results generally differ. Although they all store under the same variable name CENTROID_ALL_REDUCE, the state each one writes is different.

Since the five KMeansAssignCluster instances run as five different subtasks, their <handle, taskId> keys in the shared variables are different and map to different states, so the results are stored separately.

// Find the closest cluster for every point and calculate the sums of the points belonging to the same cluster.
public class KMeansAssignCluster extends ComputeFunction {
        // Access shared variables
        double[] sumMatrixData = context.getObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE);
        if (sumMatrixData == null) {
            sumMatrixData = new double[k * (vectorSize + 1)];
            context.putObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE, sumMatrixData);
        }  
    
        for (FastDistanceVectorData sample : trainData) {
            // Find the closest centroid from centroids for sample, and add the sample to sumMatrix.
            KMeansUtil.updateSumMatrix(sample, 1, stepNumCentroids.f1, vectorSize, sumMatrixData, k, fastDistance, distanceMatrix);
        }
}

// The variables at runtime are as follows

sample = {FastDistanceVectorData@13274} 
 vector = {DenseVector@13281} "6.3 2.5 4.9 1.5"
 label = {DenseVector@13282} "72.2"
 rows = {Row[1] @13283} 

// This is the shared variable. Per cluster: a 4-dimensional vector + 1 weight, all of them sums over the samples.
sumMatrixData = {double[15] @10574} 
 0 = 23.6
 1 = 14.9
 2 = 8.7
 3 = 1.7000000000000002
 4 = 5.0
 5 = 52.400000000000006
 6 = 25.1
 7 = 39.699999999999996
 8 = 13.299999999999999
 9 = 9.0
 10 = 33.0
 11 = 16.9
 12 = 28.900000000000002
 13 = 11.4
 14 = 5.0
     
trainData = {ArrayList@10580}  size = 19
 0 = {FastDistanceVectorData@10590} 
  vector = {DenseVector@10595} "7.7 3.8 6.7 2.2"
   data = {double[4] @10601} 
    0 = 7.7
    1 = 3.8
    2 = 6.7
    3 = 2.2
  label = {DenseVector@10596} "123.46000000000001"
  rows = {Row[1] @10597} 
 1 = {FastDistanceVectorData@10603} 
  vector = {DenseVector@10623} "5.7 2.8 4.1 1.3"
  label = {DenseVector@10624} "58.83"
  rows = {Row[1] @10625} 
 2 = {FastDistanceVectorData@10604} 
 3 = {FastDistanceVectorData@10605} 
 ...
 17 = {FastDistanceVectorData@10619} 
 18 = {FastDistanceVectorData@10620} 
  vector = {DenseVector@10654} "6.5 3.0 5.2 2.0"
  label = {DenseVector@10655} "82.29"
  rows = {Row[1] @10656}      
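The layout of sumMatrixData in the dump above (for each cluster, vectorSize coordinate sums followed by one count, giving k * (vectorSize + 1) entries) can be illustrated with a small sketch. This is not KMeansUtil.updateSumMatrix: `SumMatrixSketch` and `addSample` are hypothetical names, plain squared Euclidean distance is assumed, and every sample has weight 1.

```java
import java.util.Arrays;

// Hypothetical sketch of accumulating samples into the sum matrix; squared Euclidean distance is assumed.
final class SumMatrixSketch {

    // centroids: k rows of vectorSize coordinates.
    // sumMatrix layout per cluster c, starting at c * (vectorSize + 1): [coordinate sums..., count].
    // Returns the index of the cluster the sample was assigned to.
    static int addSample(double[] sample, double[][] centroids, double[] sumMatrix) {
        int vectorSize = sample.length;
        int best = 0;
        double bestDist = Double.MAX_VALUE;
        for (int c = 0; c < centroids.length; c++) {
            double dist = 0;
            for (int i = 0; i < vectorSize; i++) {
                double diff = sample[i] - centroids[c][i];
                dist += diff * diff;
            }
            if (dist < bestDist) {
                bestDist = dist;
                best = c;
            }
        }
        int offset = best * (vectorSize + 1);
        for (int i = 0; i < vectorSize; i++) {
            sumMatrix[offset + i] += sample[i];   // running coordinate sums
        }
        sumMatrix[offset + vectorSize] += 1.0;    // number of samples assigned to this cluster
        return best;
    }

    public static void main(String[] args) {
        double[][] centroids = {{0, 0}, {10, 10}};
        double[] sumMatrix = new double[centroids.length * (2 + 1)];
        addSample(new double[] {1, 1}, centroids, sumMatrix);
        addSample(new double[] {9, 9}, centroids, sumMatrix);
        addSample(new double[] {11, 11}, centroids, sumMatrix);
        // prints [1.0, 1.0, 1.0, 20.0, 20.0, 2.0]
        System.out.println(Arrays.toString(sumMatrix));
    }
}
```

Because the buffer holds only sums and counts, buffers from different subtasks can be added element by element, which is exactly what the following AllReduce does; dividing each coordinate sum by its count afterwards yields the new centroid.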

2. AllReduceSend

This code needs to be quoted once more, mainly because of withBroadcastSet. Its role is:

  • It can be understood as a shared variable. We can broadcast a DataSet, and then the different tasks on a node can all read it. Each node holds only one copy of the data.
  • Without broadcast, every task on a node would need its own copy of the DataSet, which wastes memory (one node may then hold several copies of the same DataSet).
		return input
			.mapPartition(new AllReduceSend <T>(bufferName, lengthName, transferBufferName, sessionId))
			.withBroadcastSet(input, "barrier")

KMeansAssignCluster writes data into the context variable centroidAllReduce, so AllReduce is in effect waiting for this variable.

The first step of AllReduce is to take the shared variable out of the context and send it. This is done by AllReduceSend.

For every AllReduceSend task, bufferName is centroidAllReduce.

Because each AllReduceSend instance runs as a different subtask, each reads a different centroidAllReduce, so the sendBuf obtained by each task is different. Each takes out the “centroidAllReduce” state that corresponds to its own <handle, taskId> and sends it downstream.

When sending downstream, AllReduceSend iterates over all subtasks: the shared variable obtained in this task is split and sent to the downstream tasks, and which part of the variable goes to which task is calculated from that task’s index and the amount of data. If the amount of data is small, it may be sent to only one or a few tasks.

In what follows, taskId means the subtask index.

How much to send to which task is calculated in DefaultDistributedInfo, and it needs to be read together with the pieces function. Note that AllReduceSend sends according to this scheme and AllReduceRecv receives according to the same scheme, which is what allows AllReduceRecv to merge the data.

// AllReduceSend sends in this way, and AllReduceRecv receives following the same scheme.
// pieces is the number of slices the send buffer is divided into, for example 50. Each slice holds TRANSFER_BUFFER_SIZE elements.
int pieces = pieces(sendLen);

// With 8 subtasks, data is sent to each of them
for (int i = 0; i < numOfSubTasks; ++i) {
      // For the fifth subtask (i = 4) with 50 pieces and 8 subtasks, the start position is 6 * 4 + 2 = 26
      int startPos = (int) distributedInfo.startPos(i, numOfSubTasks, pieces);
      // How many slices to send to this subtask (6 for the fifth subtask in the same example)
      int cnt = (int) distributedInfo.localRowCnt(i, numOfSubTasks, pieces);
}

The specific code is as follows:

	private static int pieces(int len) {
		int div = len / TRANSFER_BUFFER_SIZE; // Number of full pieces; each piece holds TRANSFER_BUFFER_SIZE elements
		int mod = len % TRANSFER_BUFFER_SIZE;

		return mod == 0 ? div : div + 1;
	}

public class DefaultDistributedInfo implements DistributedInfo {

	public long startPos(long taskId, long parallelism, long globalRowCnt) {
		long div = globalRowCnt / parallelism;
		long mod = globalRowCnt % parallelism;

		if (mod == 0) {
			return div * taskId;
		} else if (taskId >= mod) {
			return div * taskId + mod;
		} else {
			return div * taskId + taskId;
		}
	}

	public long localRowCnt(long taskId, long parallelism, long globalRowCnt) {
		long div = globalRowCnt / parallelism;
		long mod = globalRowCnt % parallelism;

		if (mod == 0) {
			return div;
		} else if (taskId >= mod) {
			return div;
		} else {
			return div + 1;
		}
	}
}
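A worked example makes the split easier to see. The sketch below re-implements the startPos and localRowCnt logic shown above in condensed but equivalent form; the class name `PieceAssignment` is hypothetical, and TRANSFER_BUFFER_SIZE = 4096 is an assumption taken from a comment in the AllReduceSend excerpt.

```java
// Hypothetical worked example; condensed but equivalent to the startPos/localRowCnt logic above.
final class PieceAssignment {

    static final int TRANSFER_BUFFER_SIZE = 4096; // assumed value

    // Number of pieces needed for `len` elements (ceiling division).
    static int pieces(int len) {
        return (len + TRANSFER_BUFFER_SIZE - 1) / TRANSFER_BUFFER_SIZE;
    }

    // The first `mod` tasks get one extra piece, so each task's start shifts by min(taskId, mod).
    static long startPos(long taskId, long parallelism, long globalRowCnt) {
        long div = globalRowCnt / parallelism;
        long mod = globalRowCnt % parallelism;
        return div * taskId + Math.min(taskId, mod);
    }

    static long localRowCnt(long taskId, long parallelism, long globalRowCnt) {
        long div = globalRowCnt / parallelism;
        long mod = globalRowCnt % parallelism;
        return taskId < mod ? div + 1 : div;
    }

    public static void main(String[] args) {
        // 50 pieces over 8 subtasks: tasks 0 and 1 get 7 pieces, tasks 2..7 get 6.
        for (int i = 0; i < 8; i++) {
            System.out.println("task " + i + ": start=" + startPos(i, 8, 50) + " cnt=" + localRowCnt(i, 8, 50));
        }
        // A 15-element buffer fits into a single piece, so with 4 subtasks only task 0 receives data.
        int p = pieces(15);
        for (int i = 0; i < 4; i++) {
            System.out.println("task " + i + ": cnt=" + localRowCnt(i, 4, p));
        }
    }
}
```

The ranges tile [0, pieces) without gaps or overlap, which is why sender and receiver can both derive the same positions independently. The second case matches the 15-element centroidAllReduce buffer in the dumps: with so little data, only one downstream task receives anything.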

The AllReduceSend code is as follows; details are in the comments.

// Here is the variable name definition.
public static final String CENTROID_ALL_REDUCE = "centroidAllReduce";

private static class AllReduceSend<T> extends RichMapPartitionFunction <T, Tuple3 <Integer, Integer, double[]>> {
        
    	int numOfSubTasks = getRuntimeContext().getNumberOfParallelSubtasks();
		// Related to parallelism, each task performs the same operation
		// bufferName is centroidAllReduce, and sendBuf obtained by each task is different
    
        // Calculate how to send the required data structure
    	int pieces = pieces(sendLen);
    	DistributedInfo distributedInfo = new DefaultDistributedInfo();

        // Get the data to be passed from the context
		double[] sendBuf = context.getObj(bufferName);
        
			int agg = 0;
    		// Send the required data to each task. How much is sent depends on the size of the data: if the volume is small, it may go to only one or a few tasks.
			for (int i = 0; i < numOfSubTasks; ++i) {
                // startPos: which part of the variable to send, determined by the task index.
                // cnt: how many pieces are sent to downstream task i. If 0, nothing is sent to task i.
				int startPos = (int) distributedInfo.startPos(i, numOfSubTasks, pieces);
				int cnt = (int) distributedInfo.localRowCnt(i, numOfSubTasks, pieces);

				for (int j = 0; j < cnt; ++j) {
                    // Which part to send
					int bufStart = (startPos + j) * TRANSFER_BUFFER_SIZE;
					// The last piece may be shorter than a full transfer buffer.
					if (startPos + j == pieces - 1) {
						System.arraycopy(sendBuf, bufStart, transBuf, 0, lastLen(sendLen));
					} else {
						System.arraycopy(sendBuf, bufStart, transBuf, 0, TRANSFER_BUFFER_SIZE);
					}
					agg++;
                    
					// i is the index of the target subtask and startPos + j is the piece's position in the buffer.
					// The subsequent partitioning is done by i, so AllReduceSend sends to all numOfSubTasks tasks.
					out.collect(Tuple3.of(i, startPos + j, transBuf));
				}
			}
}

private static int pieces(int len) {
		int div = len / TRANSFER_BUFFER_SIZE; // 4096
		int mod = len % TRANSFER_BUFFER_SIZE;
		return mod == 0 ? div : div + 1;
	}

sendBuf = {double[15] @10602} 
 0 = 40.3
 1 = 18.200000000000003
 2 = 33.6
 3 = 12.5
 4 = 6.0
 5 = 45.3
 6 = 30.599999999999998
 7 = 12.4
 8 = 2.0
 9 = 9.0
 10 = 24.0
 11 = 10.4
 12 = 17.1
 13 = 5.199999999999999
 14 = 4.0

this = {AllReduce$AllReduceSend@10598} 
 bufferName = "centroidAllReduce"
 lengthName = null
 transferBufferName = "3dfb2aae-683d-4497-91fc-30b8d6853bce"
 sessionId = 0
 runtimeContext = {AbstractIterativeTask$IterativeRuntimeUdfContext@10606}       
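The piece arithmetic is easier to follow with numbers. The sketch below is a standalone illustration, not Alink code: `PiecePlanDemo` and `plan` are hypothetical names, the buffer size is passed as a parameter, and `lastLen` is my assumption of what the helper called in AllReduceSend does (return the length of the final, possibly shorter, piece). The subtask counts in `main` are also assumed.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of how a send buffer is cut into fixed-size pieces and assigned to subtasks. */
final class PiecePlanDemo {

    /** Number of pieces needed to cover len elements (same formula as pieces() above). */
    static int pieces(int len, int bufSize) {
        int div = len / bufSize;
        int mod = len % bufSize;
        return mod == 0 ? div : div + 1;
    }

    /** Assumed helper: length of the final piece; a full buffer when len divides evenly. */
    static int lastLen(int len, int bufSize) {
        int mod = len % bufSize;
        return mod == 0 ? bufSize : mod;
    }

    /** Returns one line per piece: "piece p -> task t, offset o, length l". */
    static List<String> plan(int sendLen, int numOfSubTasks, int bufSize) {
        int pieces = pieces(sendLen, bufSize);
        int div = pieces / numOfSubTasks;
        int mod = pieces % numOfSubTasks;
        List<String> lines = new ArrayList<>();
        int piece = 0;
        for (int task = 0; task < numOfSubTasks; task++) {
            int cnt = task < mod ? div + 1 : div; // same split as localRowCnt
            for (int j = 0; j < cnt; j++, piece++) {
                int len = piece == pieces - 1 ? lastLen(sendLen, bufSize) : bufSize;
                lines.add("piece " + piece + " -> task " + task
                    + ", offset " + piece * bufSize + ", length " + len);
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        // The 15-element buffer from the dump above, with an assumed 4 subtasks.
        plan(15, 4, 4096).forEach(System.out::println);
        // A larger, hypothetical buffer split across 2 subtasks.
        plan(10000, 2, 4096).forEach(System.out::println);
    }
}
```

For the 15-element buffer there is a single piece, so only task 0 receives data and the other tasks get a count of 0. For the 10000-element buffer there are three pieces: two full ones for task 0 and a final piece of 1808 elements for task 1.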

3. AllReduceBroadcastRaw

AllReduceSend uses a custom partitioner (partitionCustom) when sending data downstream, with the subtask index as the partition key. This corresponds to the out.collect call in AllReduceSend.

			.partitionCustom(new Partitioner <Integer>() {
				@Override
				public int partition(Integer key, int numPartitions) {
					return key;
				}
			}, 0)
			.name("AllReduceBroadcastRaw")
               
// Call stack to the partition function
                
partition:102, AllReduce$2 (com.alibaba.alink.common.comqueue.communication)
partition:99, AllReduce$2 (com.alibaba.alink.common.comqueue.communication)
customPartition:235, OutputEmitter (org.apache.flink.runtime.operators.shipping)
selectChannel:149, OutputEmitter (org.apache.flink.runtime.operators.shipping)
selectChannel:36, OutputEmitter (org.apache.flink.runtime.operators.shipping)
emit:120, RecordWriter (org.apache.flink.runtime.io.network.api.writer)
collect:65, OutputCollector (org.apache.flink.runtime.operators.shipping)
collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
mapPartition:257, AllReduce$AllReduceSend (com.alibaba.alink.common.comqueue.communication)
run:103, MapPartitionDriver (org.apache.flink.runtime.operators)
run:504, BatchTask (org.apache.flink.runtime.operators)
run:157, AbstractIterativeTask (org.apache.flink.runtime.iterative.task)
run:107, IterationIntermediateTask (org.apache.flink.runtime.iterative.task)
invoke:369, BatchTask (org.apache.flink.runtime.operators)
doRun:705, Task (org.apache.flink.runtime.taskmanager)
run:530, Task (org.apache.flink.runtime.taskmanager)
run:745, Thread (java.lang)                  
                
                 
 // The call starts here, in AllReduceSend.mapPartition
 for (int i = 0; i < numOfSubTasks; ++i) {   
     // i is the index of the target subtask, and the subsequent partitioning is done by this index. This AllReduceSend sends to all numOfSubTasks tasks.
	 out.collect(Tuple3.of(i, startPos + j, transBuf));     
 }
                
 // As the call sequence shows, the subtask index is used as the partition key.

// The record is sent here

 public class CountingCollector<OUT> implements Collector<OUT> {
	public void collect(OUT record) {
		this.numRecordsOut.inc();
		this.collector.collect(record);
	}     
 }
             
 record = {Tuple3@10586} "(0,0,[40.50000000000001, 18.7, 33.300000000000004, 12.8, 6.0, 29.7, 21.0, 8.4, 1.7, 6.0, 48.1, 22.1999999999999999996, 36.0, 12.200000000000001, 8.0, 0.0,"
 f0 = {Integer@10583} 0
 f1 = {Integer@10583} 0
 f2 = {double[4096] @10598}                
       
// Start partitioning here

public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T>> {
	private int customPartition(T record, int numberOfChannels) {
		if (extractedKeys == null) {
			extractedKeys = new Object[1];
		}

		if (comparator.extractKeys(record, extractedKeys, 0) == 1) {
			// So key is 0
			final Object key = extractedKeys[0];
			return partitioner.partition(key, numberOfChannels);
		}
	}
}

public final class TupleComparator<T extends Tuple> extends TupleComparatorBase<T> {
	public int extractKeys(Object record, Object[] target, int index) {
		int localIndex = index;
		for(int i = 0; i < comparators.length; i++) {
			localIndex += comparators[i].extractKeys(((Tuple) record).getField(keyPositions[i]), target, localIndex);
		}
		return localIndex - index;
	}
}

// Retrieve the first field value

key = {Integer@10583} 0
 value = 0
    
extractedKeys = {Object[1] @10587} 
 0 = {Integer@10583} 0
  value = 0
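The effect of this identity partitioner can be sketched without Flink. The example below is a plain-Java simulation under my own assumptions: `IdentityPartitionDemo`, `Piece` and `route` are hypothetical names, and the lists stand in for Flink's output channels. Only the `partition(key, numPartitions)` shape is taken from the code above.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of identity partitioning: the key itself is the index of the target channel. */
final class IdentityPartitionDemo {

    /** Hypothetical stand-in for a (targetTask, pieceIndex, data) tuple. */
    static final class Piece {
        final int targetTask;
        final int pieceIndex;
        final double[] data;

        Piece(int targetTask, int pieceIndex, double[] data) {
            this.targetTask = targetTask;
            this.pieceIndex = pieceIndex;
            this.data = data;
        }
    }

    /** Mirrors the partition method shown above: return the key unchanged. */
    static int partition(int key, int numPartitions) {
        return key;
    }

    /** Routes every record to the channel selected by its first field. */
    static List<List<Piece>> route(List<Piece> records, int numChannels) {
        List<List<Piece>> channels = new ArrayList<>();
        for (int c = 0; c < numChannels; c++) {
            channels.add(new ArrayList<>());
        }
        for (Piece p : records) {
            channels.get(partition(p.targetTask, numChannels)).add(p);
        }
        return channels;
    }

    public static void main(String[] args) {
        List<Piece> records = new ArrayList<>();
        records.add(new Piece(0, 0, new double[] {1.0}));
        records.add(new Piece(2, 1, new double[] {2.0}));
        records.add(new Piece(0, 0, new double[] {3.0}));
        List<List<Piece>> channels = route(records, 3);
        for (int c = 0; c < channels.size(); c++) {
            System.out.println("channel " + c + " received " + channels.get(c).size() + " record(s)");
        }
    }
}
```

Because the sender writes the target subtask index into the first tuple field, the sender alone decides which subtask receives each piece; the partitioner adds no logic of its own.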

4. AllReduceSum

All workers do a partial reduce on the data they receive and then send the partial results to the other workers.

The results are partial because each task receives a different part of the data: the upstream computes positions from the task index and sends each piece to the matching task.

The results computed by AllReduceSum, however, are sent to every downstream task.

private static class AllReduceSum extends RichMapPartitionFunction <Tuple3 <Integer, Integer, double[]>, Tuple3 <Integer, Integer, double[]>> {
    
    	public void mapPartition(Iterable <Tuple3 <Integer, Integer, double[]>> values,Collector <Tuple3 <Integer, Integer, double[]>> out) {
            
            // The context is also used to retrieve sendBuf, but only to get its length.
    		int taskId = getRuntimeContext().getIndexOfThisSubtask();
			int numOfSubTasks = getRuntimeContext().getNumberOfParallelSubtasks();

			double[] sendBuf = context.getObj(bufferName);
			int sendLen = lengthName != null ? context.getObj(lengthName) : sendBuf.length;
			int pieces = pieces(sendLen);
			DistributedInfo distributedInfo = new DefaultDistributedInfo();

            // startPos: index of the first piece of the original data that this task receives; determined by the task index.
            // cnt: number of pieces this task is responsible for.
			int startPos = (int) distributedInfo.startPos(taskId, numOfSubTasks, pieces);
			int cnt = (int) distributedInfo.localRowCnt(taskId, numOfSubTasks, pieces);
    
    		// Reduce SUM is done here
			double[][] sum = new double[cnt][];
			double[] agg = new double[cnt];
			do {
				Tuple3 <Integer, Integer, double[]> val = it.next();
				int localPos = val.f1 - startPos;
				if (sum[localPos] == null) {
					sum[localPos] = val.f2;
					agg[localPos]++;
				} else {
					op.accept(sum[localPos], val.f2);
				}
			} while (it.hasNext());
    
    		// Subtask index is still used as the partition key.
            // Note that the result is sent to all downstream tasks.
			for (int i = 0; i < numOfSubTasks; ++i) {
				for (int j = 0; j < cnt; ++j) {
          // startPos is the position at which the data sent by this task starts.
          // The same data is sent to every task i. startPos + j matters because it tells downstream task i where to store the received data in the predefined variable.
					out.collect(Tuple3.of(i, startPos + j, sum[j]));
				}
			}   
        }
}

sum = {double[1][]@10605} 
 0 = {double[4096] @10613} 
  0 = 118.50000000000001
  1 = 77.7
  2 = 37.2
  3 = 5.9
  4 = 25.0
  5 = 621.1000000000001
  6 = 284.7
  7 = 487.59999999999997
  8 = 166.5
  9 = 99.0
  10 = 136.9
  11 = 95.7
  12 = 39.0
  13 = 7.4
  14 = 26.0
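The accumulation step can be isolated in a few lines. This is a sketch under stated assumptions rather than the Alink implementation: `PartialReduceDemo` and `reduce` are hypothetical names, `SUM` stands in for the `op` used above (assumed to be an element-wise sum), and the arrays `pieceIdx` and `data` replace the stream of Tuple3 records. Unlike the excerpt, the sketch clones the first record instead of reusing it, so the inputs stay untouched.

```java
import java.util.Arrays;
import java.util.function.BiConsumer;

/** Sketch of the partial reduce one subtask performs on the pieces it owns. */
final class PartialReduceDemo {

    /** Element-wise sum into the first argument; stands in for the `op` used above. */
    static final BiConsumer<double[], double[]> SUM = (acc, in) -> {
        for (int i = 0; i < acc.length; i++) {
            acc[i] += in[i];
        }
    };

    /**
     * @param pieceIdx global piece index of each incoming record (the f1 field)
     * @param data     payload of each incoming record (the f2 field)
     * @param startPos first global piece index owned by this subtask
     * @param cnt      number of pieces owned by this subtask
     */
    static double[][] reduce(int[] pieceIdx, double[][] data, int startPos, int cnt) {
        double[][] sum = new double[cnt][];
        for (int r = 0; r < pieceIdx.length; r++) {
            int localPos = pieceIdx[r] - startPos;
            if (sum[localPos] == null) {
                sum[localPos] = data[r].clone(); // copy so the input record is not modified
            } else {
                SUM.accept(sum[localPos], data[r]);
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        // Three upstream senders each contribute their copy of piece 2 and piece 3.
        int[] pieceIdx = {2, 3, 2, 3, 2, 3};
        double[][] data = {{1, 1}, {10, 10}, {2, 2}, {20, 20}, {3, 3}, {30, 30}};
        double[][] sum = reduce(pieceIdx, data, 2, 2);
        System.out.println(Arrays.toString(sum[0])); // [6.0, 6.0]
        System.out.println(Arrays.toString(sum[1])); // [60.0, 60.0]
    }
}
```

Subtracting startPos turns the global piece index into a local slot, so a subtask only needs an array as large as the number of pieces it owns.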

5. AllReduceBroadcastSum

AllReduceSum also uses a custom partitioner (partitionCustom) when sending data downstream, with the subtask index as the partition key.

It works the same way as the partitionCustom call described earlier.

6. AllReduceRecv

All workers merge partial results into final result and put it into session context with pre-defined object name.

Each downstream AllReduceRecv receives the summed cluster data sent by every upstream AllReduceSum and stores each piece in a different part of the predefined variable held in its Task Manager (the part is computed from val.f1 of the received data).

As shown above, AllReduceSend and AllReduceRecv compute data positions in the shared variable with the same routine, which is why AllReduceRecv can merge the pieces.

In this way, all workers merge the partial reduce sum results into the final result and put it into the predefined context variable.

	private static class AllReduceRecv<T> extends RichMapPartitionFunction <Tuple3 <Integer, Integer, double[]>, T> {
		private final String bufferName;
		private final String lengthName;
		private final int sessionId;

		@Override
		public void mapPartition(Iterable <Tuple3 <Integer, Integer, double[]>> values, Collector <T> out) throws Exception {
			ComContext context = new ComContext(sessionId, getIterationRuntimeContext());
			Iterator <Tuple3 <Integer, Integer, double[]>> it = values.iterator();
			if (!it.hasNext()) {
				return;
			}
			double[] recvBuf = context.getObj(bufferName);
			int recvLen = lengthName != null ? context.getObj(lengthName) : recvBuf.length;
			int pieces = pieces(recvLen); // Same routine as AllReduceSend, used to work out where data belongs in the shared variable.
			do {
				Tuple3 <Integer, Integer, double[]> val = it.next();
				if (val.f1 == pieces - 1) {
					System.arraycopy(val.f2, 0, recvBuf, val.f1 * TRANSFER_BUFFER_SIZE, lastLen(recvLen));
				} else {
					// Copy into the matching part of the shared variable. val.f1 was set upstream and gives the starting offset for the merge.
					System.arraycopy(val.f2, 0, recvBuf, val.f1 * TRANSFER_BUFFER_SIZE, TRANSFER_BUFFER_SIZE);
				}
			} while (it.hasNext());
		}
	}

val = {Tuple3@10672} "(3,0,[335.3, 150.89999999999998, 277.5, 99.79999999999998, 50.0, 290.9, 136.3, 213.1, 67.8, 50.0, 250.3, 170.89999999999998, 73.2, 12.2, 50.0, 0.0....."
 f0 = {Integer@10682} 3
  value = 3
 f1 = {Integer@10638} 0
  value = 0
 f2 = {double[4096] @10674} 
  0 = 335.3
  1 = 150.89999999999998
  2 = 277.5
  3 = 99.79999999999998
  4 = 50.0
  5 = 290.9
  6 = 136.3
  7 = 213.1
  8 = 67.8
  9 = 50.0
  10 = 250.3
  11 = 170.89999999999998
  12 = 73.2
  13 = 12.2
  14 = 50.0
  15 = 0.0

// Each task received the reduce sum result.
recvBuf = {double[15] @10666} 
 0 = 404.3
 1 = 183.1
 2 = 329.3
 3 = 117.2
 4 = 61.0
 5 = 250.3
 6 = 170.89999999999998
 7 = 73.20000000000002
 8 = 12.2
 9 = 50.0
 10 = 221.89999999999998
 11 = 104.1
 12 = 161.29999999999998
 13 = 50.4
 14 = 39.0      
      
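Putting the send, sum and receive steps together, the following single-process simulation shows why every worker ends up with the same buffer. It is only a sketch: `AllReduceSimulation` is a hypothetical class, `BUF` is a tiny stand-in for TRANSFER_BUFFER_SIZE so that a 6-element buffer spans two pieces, and plain arrays replace Flink's network shuffles.

```java
import java.util.Arrays;

/** Single-process simulation of the send -> sum -> receive steps, for illustration only. */
final class AllReduceSimulation {

    static final int BUF = 4; // tiny stand-in for TRANSFER_BUFFER_SIZE (4096 in the dumps above)

    static int pieces(int len) {
        return (len + BUF - 1) / BUF;
    }

    static int startPos(int task, int n, int total) {
        int div = total / n;
        int mod = total % n;
        return task >= mod ? div * task + mod : div * task + task;
    }

    static int localCnt(int task, int n, int total) {
        return task < total % n ? total / n + 1 : total / n;
    }

    /** Takes one buffer per worker and returns what each worker holds after the all-reduce. */
    static double[][] allReduceSum(double[][] workerBufs) {
        int n = workerBufs.length;
        int len = workerBufs[0].length;
        int pieces = pieces(len);

        // Send + partial sum: the owner of each piece sums that piece from every worker.
        double[][] reduced = new double[pieces][BUF];
        for (int owner = 0; owner < n; owner++) {
            int start = startPos(owner, n, pieces);
            int cnt = localCnt(owner, n, pieces);
            for (int p = start; p < start + cnt; p++) {
                int pieceLen = Math.min(BUF, len - p * BUF); // the last piece may be shorter
                for (double[] buf : workerBufs) {
                    for (int i = 0; i < pieceLen; i++) {
                        reduced[p][i] += buf[p * BUF + i];
                    }
                }
            }
        }

        // Receive: every worker copies every reduced piece back to offset p * BUF.
        double[][] result = new double[n][len];
        for (int w = 0; w < n; w++) {
            for (int p = 0; p < pieces; p++) {
                int pieceLen = Math.min(BUF, len - p * BUF);
                System.arraycopy(reduced[p], 0, result[w], p * BUF, pieceLen);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        double[][] bufs = {
            {1, 2, 3, 4, 5, 6},
            {10, 20, 30, 40, 50, 60},
            {100, 200, 300, 400, 500, 600}
        };
        for (double[] r : allReduceSum(bufs)) {
            System.out.println(Arrays.toString(r));
        }
    }
}
```

With three workers and two pieces, workers 0 and 1 each own one piece and worker 2 owns none, yet all three finish with [111.0, 222.0, 333.0, 444.0, 555.0, 666.0]. The piece index alone fixes the offset on both the sending and the receiving side.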

7. KMeansUpdateCentroids

The new cluster centers are computed from the point counts and coordinate sums. The input is the shared variable CENTROID_ALL_REDUCE, which AllReduce stored in the Task Manager.

/**
 * Update the centroids based on the sum of points and point number belonging to the same cluster.
 */
public class KMeansUpdateCentroids extends ComputeFunction {
    public void calc(ComContext context) {

        Integer vectorSize = context.getObj(KMeansTrainBatchOp.VECTOR_SIZE);
        Integer k = context.getObj(KMeansTrainBatchOp.K);

        // Here is the shared variable stored by AllReduce
        double[] sumMatrixData = context.getObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE);

        Tuple2<Integer, FastDistanceMatrixData> stepNumCentroids;
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        }
        stepNumCentroids.f0 = context.getStepNo();
        context.putObj(KMeansTrainBatchOp.K,
            updateCentroids(stepNumCentroids.f1, k, vectorSize, sumMatrixData, distance));
    }
}
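The arithmetic behind the update can be sketched as follows. This is not Alink's `updateCentroids`: the class `CentroidUpdateDemo`, its signature, and the handling of empty clusters are my assumptions. The buffer layout (vectorSize coordinate sums followed by one point count per cluster) is inferred from the 15-element sendBuf dump earlier, which would fit k = 3 and vectorSize = 4 with counts 6.0, 9.0 and 4.0, but the source does not confirm it.

```java
import java.util.Arrays;

/** Sketch of turning all-reduced sums into centroids; the buffer layout is an assumption. */
final class CentroidUpdateDemo {

    /**
     * Assumed layout of sumMatrixData: for each of the k clusters, vectorSize coordinate
     * sums followed by one point count, i.e. (vectorSize + 1) * k values in total.
     */
    static double[][] updateCentroids(double[][] previous, int k, int vectorSize, double[] sumMatrixData) {
        double[][] updated = new double[k][];
        for (int c = 0; c < k; c++) {
            int base = c * (vectorSize + 1);
            double count = sumMatrixData[base + vectorSize];
            if (count == 0) {
                // Assumption: an empty cluster keeps its previous centroid.
                updated[c] = previous[c].clone();
                continue;
            }
            updated[c] = new double[vectorSize];
            for (int d = 0; d < vectorSize; d++) {
                updated[c][d] = sumMatrixData[base + d] / count; // mean of the assigned points
            }
        }
        return updated;
    }

    public static void main(String[] args) {
        double[][] previous = {{0, 0}, {5, 5}, {9, 9}};
        // k = 3 clusters, vectorSize = 2: {sumX, sumY, count} per cluster.
        double[] sums = {2, 4, 2, 9, 3, 3, 0, 0, 0};
        double[][] centroids = updateCentroids(previous, 3, 2, sums);
        System.out.println(Arrays.deepToString(centroids)); // [[1.0, 2.0], [3.0, 1.0], [9.0, 9.0]]
    }
}
```

Since every worker holds the same all-reduced buffer, each one can run this computation locally and arrive at identical centroids without further communication.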

0xEE Personal information


Wechat official account: Rosie’s Thoughts

