Introduction

Spark is the most popular distributed batch processing framework for big data. With Spark, SQL operations over hundreds of gigabytes or even terabytes of data can be implemented easily, such as single-row feature computation or multi-table joins.

OpenMLDB is an open-source database project optimized for AI scenarios, which guarantees consistency of data and computation between the offline MPP scenario and the online OLTP scenario. In fact, the MPP engine can be built on Spark, and its performance can be improved several times over by extending the Spark source code.

Spark itself is already very efficient: it performs lexical and syntax analysis of standard ANSI SQL with Antlr, applies a large amount of static SQL optimization in the Catalyst module, and then converts the plan into distributed RDD computation. Its underlying data structure is UnsafeRow, which uses the Java Unsafe API to customize memory layout, and it relies on the Janino JIT compiler to dynamically generate optimized JVM bytecode for compute methods. However, there is still room for improvement in extensibility, especially in machine learning scenarios where the requirements can be met but not efficiently. This article takes LastJoin as an example to show how OpenMLDB achieves several-fold or even tens-of-fold performance improvements by extending the Spark source code.

LastJoin in machine learning scenarios

LastJoin is a special join type introduced for AI scenarios and is a variant of LeftJoin: if the join condition is met, each row of the left table is joined only with the last matching row of the right table. This semantics guarantees that the output has exactly the same number of rows as the input left table. In machine learning scenarios it keeps the number of rows in the input sample table unchanged, so the final sample count will not grow or shrink because of join operations. This property is friendly to online serving and better matches the modeling needs of data scientists.
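
To make the semantics concrete, here is a minimal, Spark-free sketch on plain Scala collections; the tables, keys, and values are made up purely for illustration and are not OpenMLDB code.

// Plain-Scala illustration of LastJoin semantics (hypothetical data)
object LastJoinSemantics {
  def main(args: Array[String]): Unit = {
    // left table: one row per sample (sampleId, key)
    val left = Seq((1, "a"), (2, "b"), (3, "c"))
    // right table: several rows may share a key; assume row order is the ordering used by "last"
    val right = Seq(("a", 10), ("a", 11), ("b", 20))

    val joined = left.map { case (sampleId, key) =>
      // keep only the LAST matching right row; None plays the role of the null padding
      val lastMatch = right.filter(_._1 == key).lastOption.map(_._2)
      (sampleId, key, lastMatch)
    }

    // Exactly one output row per left row:
    // (1,a,Some(11)), (2,b,Some(20)), (3,c,None)
    joined.foreach(println)
  }
}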

From the perspective of technology protection, the design and implementation of LastJoin are covered by a patent of Fourth Paradigm (Beijing) Technology Co., Ltd., with publication number 111611245A and publication date 2020-09-01. The OpenMLDB project code that includes LastJoin is open-sourced on GitHub under the Apache 2.0 license and is available to all users.

LastJoin implementation based on Spark

LastJoin is not a standard join type in ANSI SQL, so it is not implemented in mainstream computing platforms such as SparkSQL; users can only approximate it with lower-level DataFrame or RDD operators. The idea of implementing LastJoin with Spark operators is to first add an index column to the left table, then perform a standard LeftOuterJoin, and finally deduplicate the joined result by removing the extra index rows. Although this achieves the LastJoin semantics, performance remains a serious bottleneck.

Beyond the SQL functions and syntax it supports, Spark lets users implement numerical logic that standard SQL cannot express through interfaces such as map, reduce, and groupBy, as well as custom UDFs. For joins, however, users cannot extend the implementation through the DataFrame or RDD API, because joins are implemented inside Spark Catalyst physical nodes: they involve merging internal rows of multiple tables after shuffle and generating Java source strings for JIT compilation. Depending on the size of the input tables, Spark selects BroadcastHashJoin, SortMergeJoin, or ShuffleHashJoin, and ordinary users cannot extend these join algorithms through the RDD API.

You can view the full Spark LastJoin implementation in the OpenMLDB project at github.com/4paradigm/O… .

The first step is to add an index column to the left table; any method works as long as each row gets a unique ID.

// Add the index column for Spark DataFrame
def addIndexColumn(spark: SparkSession, df: DataFrame, indexColName: String, method: String): DataFrame = {
  logger.info("Add the indexColName(%s) to Spark DataFrame(%s)".format(indexColName, df.toString()))

  method.toLowerCase() match {
    case "zipwithuniqueid" | "zip_withunique_id" => addColumnByZipWithUniqueId(spark, df, indexColName)
    case "zipwithindex" | "zip_with_index" => addColumnByZipWithIndex(spark, df, indexColName)
    case "monotonicallyincreasingid" | "monotonically_increasing_id" =>
      addColumnByMonotonicallyIncreasingId(spark, df, indexColName)
    case _ => throw new HybridSeException("Unsupported add index column method: " + method)
  }
}

def addColumnByZipWithUniqueId(spark: SparkSession, df: DataFrame, indexColName: String = null): DataFrame = {
  logger.info("Use zipWithUniqueId to generate index column")
  val indexedRDD = df.rdd.zipWithUniqueId().map {
    case (row, id) => Row.fromSeq(row.toSeq :+ id)
  }
  spark.createDataFrame(indexedRDD, df.schema.add(indexColName, LongType))
}

def addColumnByZipWithIndex(spark: SparkSession, df: DataFrame, indexColName: String = null): DataFrame = {
  logger.info("Use zipWithIndex to generate index column")
  val indexedRDD = df.rdd.zipWithIndex().map {
    case (row, id) => Row.fromSeq(row.toSeq :+ id)
  }
  spark.createDataFrame(indexedRDD, df.schema.add(indexColName, LongType))
}

def addColumnByMonotonicallyIncreasingId(spark: SparkSession, df: DataFrame, indexColName: String = null): DataFrame = {
  logger.info("Use monotonicallyIncreasingId to generate index column")
  df.withColumn(indexColName, monotonically_increasing_id())
}
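
As a quick illustration, the helper above might be invoked as follows; the SparkSession, input DataFrame, and index column name here are hypothetical placeholders.

// Hypothetical usage of addIndexColumn (names are illustrative only)
val indexColName = "_lastjoin_index"
val indexedLeftDf = addIndexColumn(spark, leftDf, indexColName, "zipwithuniqueid")
// the schema is the original left-table schema plus a trailing LongType index column
indexedLeftDf.printSchema()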

The second step is the standard LeftOuterJoin. Since OpenMLDB is implemented in C++, the multiple join condition expressions have to be converted into Spark expressions (encapsulated as Spark Column objects), after which the Spark DataFrame join function is called with the join type "left" or "left_outer".

val joined = leftDf.join(rightDf, joinConditions.reduce(_ && _),  "left")
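
For completeness, here is a sketch of how the joinConditions sequence used above might be assembled from equality key pairs; the column names are hypothetical, and OpenMLDB itself produces these Column objects by translating its C++ expressions.

import org.apache.spark.sql.Column

// Hypothetical join keys: (left column name, right column name)
val joinKeys = Seq(("user_id", "user_id"), ("merchant_id", "merchant_id"))
val joinConditions: Seq[Column] = joinKeys.map {
  case (leftKey, rightKey) => leftDf(leftKey) === rightDf(rightKey)
}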

The third step is to deduplicate the joined table. Because LeftOuterJoin may expand the input data (a 1:N mapping), and every added row carries the unique ID of the index column introduced in the first step, it is enough to reduce on that unique ID. The Spark DataFrame groupByKey and mapGroups interfaces are used (note that these APIs are only available from Spark 2.0 onward), and if an additional sort field is available, the maximum or minimum value of each group can be taken.

val distinct = joined
  .groupByKey {
    row => row.getLong(indexColIdx)
  }
  .mapGroups {
    case (_, iter) =>
      val timeExtractor = SparkRowUtil.createOrderKeyExtractor(
        timeIdxInJoined, timeColType, nullable=false)

      if (isAsc) {
        iter.maxBy(row => {
          if (row.isNullAt(timeIdxInJoined)) {
            Long.MinValue
          } else {
            timeExtractor.apply(row)
          }
        })
      } else {
        iter.minBy(row => {
          if (row.isNullAt(timeIdxInJoined)) {
            Long.MaxValue
          } else {
            timeExtractor.apply(row)
          }
        })
      }
  }(RowEncoder(joined.schema))
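
For reference, here is a sketch of how the positions used above might be resolved from the joined schema; the order-by column name is hypothetical, and SparkRowUtil.createOrderKeyExtractor is an OpenMLDB helper rather than a Spark API.

// Resolve column positions from the joined DataFrame (illustrative names)
val indexColIdx = joined.schema.fieldIndex(indexColName)        // index column added in step one
val timeIdxInJoined = joined.schema.fieldIndex("event_time")    // hypothetical order-by column
val timeColType = joined.schema.fields(timeIdxInJoined).dataType
val isAsc = true  // ascending order key: keep the row with the largest key, i.e. the "last" one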

The last step is simply to drop the index column, which can be done by the index column name specified earlier.

distinct.drop(indexName)

This operator-based LastJoin scheme is currently the most efficient implementation achievable through the Spark programming interface. For earlier versions such as Spark 1.6, interfaces like mapPartitions are needed to emulate mapGroups. Because it is built on LeftOuterJoin, its performance is even worse than LeftOuterJoin itself although the final output is smaller, and the overall memory consumption is still very high when many rows of the left and right tables satisfy the join condition. The next section therefore looks at a native LastJoin implementation, built by modifying the Spark source code, that avoids these problems.

LastJoin implementation by extending the Spark source code

The native LastJoin implementation builds the LastJoin feature directly into the Spark source code rather than on top of Spark DataFrame and LeftOuterJoin, which is a huge improvement in both performance and memory consumption. OpenMLDB uses a customized Spark distribution that depends on the Spark source code open-sourced on GitHub at 4Paradigm/Spark (v3.0.0-OpenMLDB).

To support LastJoin, first add the new "last" join type to JoinType. Spark's Antlr-based SQL parsing converts the SQL join type directly into a JoinType, so only the JoinType.scala file needs to be modified.

object JoinType {
  def apply(typ: String): JoinType = typ.toLowerCase(Locale.ROOT).replace("_", "") match {
    case "inner" => Inner
    case "outer" | "full" | "fullouter" => FullOuter
    case "leftouter" | "left" => LeftOuter
    // Add by 4Paradigm
    case "last" => LastJoinType
    case "rightouter" | "right" => RightOuter
    case "leftsemi" | "semi" => LeftSemi
    case "leftanti" | "anti" => LeftAnti
    case "cross" => Cross
    case _ =>
      val supported = Seq(
        "inner",
        "outer", "full", "fullouter", "full_outer",
        "last", "leftouter", "left", "left_outer",
        "rightouter", "right", "right_outer",
        "leftsemi", "left_semi", "semi",
        "leftanti", "left_anti", "anti",
        "cross")

      throw new IllegalArgumentException(s"Unsupported join type '$typ'. " +
        "Supported join types include: " + supported.mkString("'", "', '", "'") + ".")
  }
}

The LastJoinType is implemented as follows.

// Add by 4Paradigm
case object LastJoinType extends JoinType {
  override def sql: String = "LAST"
}

The Spark source code also contains analysis and optimizer classes that check which join types are supported internally. Therefore Analyzer.scala, Optimizer.scala, basicLogicalOperators.scala, and SparkStrategies.scala each need a small change: the new join type has to be added to the Scala match/case enumerations so that neither parsing nor runtime rejects it. These changes are straightforward, so we do not describe them one by one; the snippet below shows a typical example.

// the output list looks like: join keys, columns from left, columns from right
val projectList = joinType match {
  case LeftOuter =>
    leftKeys ++ lUniqueOutput ++ rUniqueOutput.map(_.withNullability(true))
  // Add by 4Paradigm
  case LastJoinType =>
    leftKeys ++ lUniqueOutput ++ rUniqueOutput.map(_.withNullability(true))
  case LeftExistence(_) =>
    leftKeys ++ lUniqueOutput
  case RightOuter =>
    rightKeys ++ lUniqueOutput.map(_.withNullability(true)) ++ rUniqueOutput
  case FullOuter =>
    // in full outer join, joinCols should be non-null if there is.
    val joinedCols = joinPairs.map { case (l, r) => Alias(Coalesce(Seq(l, r)), l.name)() }
    joinedCols ++
      lUniqueOutput.map(_.withNullability(true)) ++
      rUniqueOutput.map(_.withNullability(true))
  case _ : InnerLike =>
    leftKeys ++ lUniqueOutput ++ rUniqueOutput
  case _ =>
    sys.error("Unsupported natural join type " + joinType)
}

After the syntax parsing and the data structures support the new join type, the key step is to modify the implementation code of the three Spark join physical operators. If the right table is small enough, Spark automatically optimizes the join into BroadcastHashJoin: the right table is copied into the memory of every executor via broadcast, and traversing it finds all rows that match the join condition. If no right-table row satisfies the condition, the left-table row is kept and the right-table fields are filled with null; if one or more rows qualify, the matching internal rows are merged into output internal rows. The code lives in BroadcastHashJoinExec.scala. Because a new join type enumeration value has been added, we modify the relevant methods to support it, using an extra parameter to distinguish it from the existing join type implementations.

  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
    joinType match {
      case _: InnerLike => codegenInner(ctx, input)
      case LeftOuter | RightOuter => codegenOuter(ctx, input)
      // Add by 4Paradigm
      case LastJoinType => codegenOuter(ctx, input, true)
      case LeftSemi => codegenSemi(ctx, input)
      case LeftAnti => codegenAnti(ctx, input)
      case j: ExistenceJoin => codegenExistence(ctx, input)
      case x =>
        throw new IllegalArgumentException(
          s"BroadcastHashJoin should not take $x as the JoinType")
    }
  }

The core implementation of BroadcastHashJoin is also generated through JIT, so we need to modify the codegen logic that produces the Java source string. In the codegenOuter function the original LeftOuterJoin implementation is kept, and the parameter added earlier decides whether the new join type implementation is used. The change itself is very simple: because the new join type only needs one matching right-table row to be returned, there is no need to loop over the candidate set of the right table with a while.

// Add by 4Paradigm
if (isLastJoin) {
  s"""
     |// generate join key for stream side
     |${keyEv.code}
     |// find matches from HashRelation
     |$iteratorCls $matches = $anyNull ? null : ($iteratorCls)$relationTerm.get(${keyEv.value});
     |boolean $found = false;
     |// the last iteration of this loop is to emit an empty row if there is no matched rows.
     |if ($matches != null && $matches.hasNext() || !$found) {
     |  UnsafeRow $matched = $matches != null && $matches.hasNext() ?
     |    (UnsafeRow) $matches.next() : null;
     |  ${checkCondition.trim}
     |  if ($conditionPassed) {
     |    $found = true;
     |    $numOutput.add(1);
     |    ${consume(ctx, resultVars)}
     |  }
     |}
   """.stripMargin
}

Next, modify the SortMergeJoin implementation to support the new join type. If the right table is too large to be broadcast, Spark will most likely use SortMergeJoin. The modification follows the same principle as before, except that this path is not JIT-generated, so the buffering logic can be changed directly to guarantee that as soon as one row satisfies the condition it is joined and returned.

private def bufferMatchingRows(): Unit = {
  assert(streamedRowKey != null)
  assert(!streamedRowKey.anyNull)
  assert(bufferedRowKey != null)
  assert(!bufferedRowKey.anyNull)
  assert(keyOrdering.compare(streamedRowKey, bufferedRowKey) == 0)
  // This join key may have been produced by a mutable projection, so we need to make a copy:
  matchJoinKey = streamedRowKey.copy()
  bufferedMatches.clear()

  // Add by 4Paradigm
  if (isLastJoin) {
    bufferedMatches.add(bufferedRow.asInstanceOf[UnsafeRow])
    advancedBufferedToRowWithNullFreeJoinKey()
  } else {
    do {
      bufferedMatches.add(bufferedRow.asInstanceOf[UnsafeRow])
      advancedBufferedToRowWithNullFreeJoinKey()
    } while (bufferedRow != null && keyOrdering.compare(streamedRowKey, bufferedRowKey) == 0)
  }
}

Finally, when the outerJoin function is called to traverse the stream table, the core traversal logic is modified to guarantee that the left-table row is kept and padded with null when no right-table row can be joined, and that the iteration returns as soon as one matching row is found.

private def outerJoin(
    streamedIter: Iterator[InternalRow],
    hashedRelation: HashedRelation,
    isLastJoin: Boolean = false): Iterator[InternalRow] = {
  val joinedRow = new JoinedRow()
  val keyGenerator = streamSideKeyGenerator()
  val nullRow = new GenericInternalRow(buildPlan.output.length)

  streamedIter.flatMap { currentRow =>
    val rowKey = keyGenerator(currentRow)
    joinedRow.withLeft(currentRow)
    val buildIter = hashedRelation.get(rowKey)
    new RowIterator {
      private var found = false
      override def advanceNext(): Boolean = {
        // Add by 4Paradigm to support last join
        if (isLastJoin && found) {
          return false
        }

        // Add by 4Paradigm to support last join
        if (isLastJoin) {
          if (buildIter != null && buildIter.hasNext) {
            val nextBuildRow = buildIter.next()
            if (boundCondition(joinedRow.withRight(nextBuildRow))) {
              found = true
              return true
            }
          }
        } else {
          while (buildIter != null && buildIter.hasNext) {
            val nextBuildRow = buildIter.next()
            if (boundCondition(joinedRow.withRight(nextBuildRow))) {
              found = true
              return true
            }
          }
        }

        if (!found) {
          joinedRow.withRight(nullRow)
          found = true
          return true
        }
        false
      }

      override def getRow: InternalRow = joinedRow
    }.toScala
  }
}

By modifying JoinType and the three join physical nodes, users can use the SQL or DataFrame interface with the new join type just like the other built-in join types. The new join type guarantees that the number of output rows equals that of the left table, and the result is identical to the earlier scheme based on LeftOuterJoin + dropDuplicated.
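
As a sketch of what this looks like from the user side on the customized distribution, the new join type can be passed to the DataFrame join call just like the built-in types; the table and column names below are illustrative.

// Using the native "last" join type through the DataFrame API (illustrative names)
val lastJoined = leftDf.join(rightDf, leftDf("user_id") === rightDf("user_id"), "last")
// the output row count is guaranteed to match the left table
assert(lastJoined.count() == leftDf.count())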

Performance comparison of the LastJoin implementations

Now that the new join algorithm is implemented, let's compare the performance of the two schemes. The former is based directly on the latest open-source Spark 3.0 release and relies on its built-in broadcast join optimization for small data without any modification to the Spark optimizer; the latter uses the distribution compiled from the modified Spark source code, which also applies the broadcast join optimization for small data.

First, consider the case where the join condition matches multiple right-table rows. Because LeftOuterJoin joins every matching row, the output table of its first stage becomes much larger and the dropDuplication in the second stage becomes much more time-consuming. LastJoin, on the other hand, returns at most a single row during shuffle, so its performance does not degrade when multiple rows match.

The results show an obvious performance gap. Since the right table is relatively small, Spark optimizes all three data groups into the broadcast join implementation. LeftOuterJoin joins multiple rows, so it is much slower than the new LastJoin. As the data volume grows, the LeftOuterJoin result table explodes and its performance degrades exponentially, differing from LastJoin by tens to hundreds of times and eventually failing with OOM, while LastJoin does not degrade significantly as the data volume increases.

This comparison is unfair to the LeftOuterJoin + dropDuplicated scheme, so we add a test scenario that guarantees each left-table row joins at most one right-table row. In this case the results of LeftOuterJoin and LastJoin are exactly the same, and the performance comparison is more meaningful.
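
A minimal sketch of how such a comparison might be timed is shown below; runLeftOuterJoinScheme and runNativeLastJoin are placeholder functions standing in for the two schemes, not the actual OpenMLDB benchmark code, and the numbers discussed in this article do not come from this snippet.

// Wall-clock timing helper for comparing the two schemes (placeholder functions)
def time[T](label: String)(block: => T): T = {
  val start = System.nanoTime()
  val result = block
  println(s"$label took ${(System.nanoTime() - start) / 1e9} s")
  result
}

time("LeftOuterJoin + dropDuplicated") { runLeftOuterJoinScheme(leftDf, rightDf).count() }
time("native LastJoin") { runNativeLastJoin(leftDf, rightDf).count() }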

The results show that the performance gap is no longer as dramatic, but LastJoin is still nearly twice as fast as the earlier scheme. In the first two groups the right table is relatively small and Spark optimizes the join into the broadcast join implementation, while the last group is not optimized and falls back to the Sort Merge Join implementation. As can be seen from the code generated by BroadcastHashJoin and SortMergeJoin, when only one right-table row joins successfully the execution logic of LeftOuterJoin and LastJoin is basically the same, so the performance difference mainly comes from the extra dropDuplicated computation required by the former scheme. Although the computational complexity of that stage is not high, it takes a relatively large share of the time at small data scales. In either test scenario, modifying the Spark source code remains the best-performing implementation for this special join scenario.

Technical summary

In summary, by understanding and modifying the Spark source code, the OpenMLDB project implements new join algorithm logic tailored to its business scenarios, and achieves far better performance than the implementation built on the stock Spark interfaces. The Spark source code covers SQL syntax parsing, Catalyst logical plan optimization, JIT code generation, and more; with these foundations, Spark's functionality and performance can be extended even further. OpenMLDB will continue to share more technical details of its Spark optimizations in the future.

We also welcome more developers to follow and contribute to the OpenMLDB open-source project.