# Introduction

Apache Kylin is an open source distributed analytics engine that provides a SQL query interface and multi-dimensional analysis (OLAP) capabilities on top of Hadoop to support extremely large scale data. It can query huge data sets in sub-seconds.

The TopN measure was added in Kylin 1.5 and has not changed much through Kylin 3.x. If you want to learn about the TopN implementation principle before Kylin 4, please refer to the following article:


In September 2020, the Apache Kylin community released Kylin 4.0.0-alpha. In this article, we will introduce the TopN implementation in Apache Kylin 4.0.0-alpha in detail.


Let’s start with a typical TopN application scenario. When doing data analysis on an e-commerce platform, we often need to check which sellers rank among the top 100 by sales. An example SQL query is as follows:

```sql
SELECT kylin_sales.part_dt, kylin_sales.seller_id
FROM kylin_sales
GROUP BY kylin_sales.part_dt, kylin_sales.seller_id
ORDER BY SUM(kylin_sales.price) DESC
LIMIT 100;
```

When the data volume is large, answering such a TopN query naively is very expensive: we first group the rows, then compute SUM(price) for every group, and finally sort all the groups by SUM(price).
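The naive plan described above can be sketched in plain Scala, using an in-memory collection as an illustrative stand-in for the `kylin_sales` table (the `Sale` case class and `naiveTopN` helper are hypothetical names, not Kylin code):

```scala
// Naive TopN: aggregate everything, then sort. The cost grows with the
// number of distinct (part_dt, seller_id) groups, not with N.
case class Sale(partDt: String, sellerId: Long, price: BigDecimal)

def naiveTopN(sales: Seq[Sale], n: Int): Seq[((String, Long), BigDecimal)] =
  sales
    .groupBy(s => (s.partDt, s.sellerId))                   // GROUP BY part_dt, seller_id
    .map { case (k, rows) => k -> rows.map(_.price).sum }   // SUM(price) per group
    .toSeq
    .sortBy { case (_, sum) => sum }(Ordering[BigDecimal].reverse) // ORDER BY SUM(price) DESC
    .take(n)                                                // LIMIT n
```

Every group must be fully aggregated and sorted before the first result row can be returned, which is exactly the cost the TopN measure is designed to avoid.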

## TopN introduction

From the TopN implementation principle of Kylin 3.x, we know that Kylin 3 and earlier versions use the Space-Saving algorithm, with some optimizations on top of it; the code is in `org.apache.kylin.measure.topn.TopNCounter`. Kylin 4.0 continues to use the Space-Saving algorithm and further optimizes the Kylin 3.x TopNCounter. However, the current TopN result still carries some error, which will be explained in detail later.
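The core idea of the Space-Saving algorithm can be sketched in a few lines of plain Scala. This is a simplified, unoptimized illustration, not the real `org.apache.kylin.measure.topn.TopNCounter`; the key property shown is that eviction makes counts overestimates, never underestimates:

```scala
import scala.collection.mutable

// Simplified Space-Saving counter: keeps at most `capacity` items. When a
// new item arrives and the counter is full, it evicts the item with the
// smallest count and the newcomer inherits that count.
class SpaceSaving(capacity: Int) {
  val counts = mutable.Map.empty[String, Double]

  def offer(item: String, incr: Double = 1.0): Unit =
    counts.get(item) match {
      case Some(c)                        => counts(item) = c + incr
      case None if counts.size < capacity => counts(item) = incr
      case None =>
        val (minItem, minCount) = counts.minBy(_._2)
        counts.remove(minItem)
        counts(item) = minCount + incr   // inherit the evicted count (overestimate)
    }

  def topN(n: Int): Seq[(String, Double)] =
    counts.toSeq.sortBy(-_._2).take(n)
}
```

The real TopNCounter arranges its entries so that finding and replacing the minimum is cheap, but the accounting is the same.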

## TopN implementation

The current Kylin 4 TopN UDAF is registered in `org.apache.kylin.engine.spark.job.CuboidAggregator#aggInternal`; the code is as follows:

```scala
def aggInternal(ss: SparkSession, dataSet: DataFrame,
                dimensions: util.Set[Integer],
                measures: util.Map[Integer, FunctionDesc],
                isSparkSql: Boolean): DataFrame = {
  // ... omitted ...
  measure.expression.toUpperCase(Locale.ROOT) match {
    // ... omitted ...
    case "TOP_N" =>
      // Uses new TopN aggregate function, located in
      // kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/udaf/TopN.scala
      val schema = StructType(measure.pra.map { col =>
        val dateType = col.dataType
        if (col == measure) {
          StructField(s"MEASURE_${col.columnName}", dateType)
        } else {
          StructField(s"DIMENSION_${col.columnName}", dateType)
        }
      })
      if (reuseLayout) {
        new Column(ReuseTopN(measure.returnType.precision, schema, columns.head.expr)
          .toAggregateExpression()).as(id.toString)
      } else {
        new Column(EncodeTopN(measure.returnType.precision, schema, columns.head.expr,
          columns.drop(1).map(_.expr))
          .toAggregateExpression()).as(id.toString)
      }
  }.toSeq
  // ... omitted ...
  if (reuseLayout) {
    val columns = NSparkCubingUtil.getColumns(dimensions) ++
      measureColumns(dataSet.schema, measures)
    df.select(columns: _*)
  } else {
    df
  }
}
```

Actually, the initial TopN implementation was in `org.apache.kylin.engine.spark.job.TopNUDAF`, but as you can see the current TopN implementation is in `org.apache.spark.sql.udaf.BaseTopN`. The latest implementation mainly fixes performance issues of the older one, as detailed in KYLIN-4760.

TopN in Kylin 4.0 is implemented as a Spark UDAF. The following shows the relationship between the implementation classes. The final implementation is BaseTopN, which extends TypedImperativeAggregate. BaseTopN has two subclasses, EncodeTopN and ReuseTopN. When the first layer of cuboids is built from the flat table, the TopN measure does not yet exist in the flat table, so EncodeTopN is called; ReuseTopN is called when the next layer of cuboids is built from an existing cuboid, to avoid double counting. The class diagram is as follows:

TopN is implemented by extending TypedImperativeAggregate rather than UserDefinedAggregateFunction mainly because UserDefinedAggregateFunction converts Catalyst's internal InternalRow to Row before calling the user's update method, whereas TypedImperativeAggregate only serializes and deserializes the aggregation buffer at shuffle boundaries, so one layer of conversion is avoided.
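The contract behind this choice can be illustrated without Spark. Below is a hypothetical, stripped-down analogue of TypedImperativeAggregate (the real trait lives in Spark's `org.apache.spark.sql.catalyst.expressions.aggregate` package): the buffer is an arbitrary Scala object, updated per row with no Row conversion, and serialized only when it crosses an executor boundary.

```scala
// Hypothetical analogue of Spark's TypedImperativeAggregate contract.
trait TypedAgg[B, IN, OUT] {
  def createAggregationBuffer(): B        // fresh buffer per partition
  def update(buffer: B, input: IN): B     // called per row, no Row conversion
  def merge(buffer: B, other: B): B       // combine partial buffers
  def eval(buffer: B): OUT                // final result
  def serialize(buffer: B): Array[Byte]   // only at shuffle boundaries
  def deserialize(bytes: Array[Byte]): B
}

// A toy SUM implemented against this contract.
object SumAgg extends TypedAgg[Long, Long, Long] {
  def createAggregationBuffer(): Long = 0L
  def update(b: Long, in: Long): Long = b + in
  def merge(b: Long, o: Long): Long = b + o
  def eval(b: Long): Long = b
  def serialize(b: Long): Array[Byte] = b.toString.getBytes("UTF-8")
  def deserialize(bytes: Array[Byte]): Long = new String(bytes, "UTF-8").toLong
}
```

In the real BaseTopN, the buffer plays the role that TopNCounter plays in the next section.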

## TopNCounter introduction

As mentioned above, the Space-Saving algorithm is implemented in TopNCounter; here we briefly introduce it. When a BaseTopN object is initialized, a TopNCounter object is created and used to save the rows that satisfy the TopN condition during computation; it corresponds to the Aggregation Buffer concept of a Spark UDAF. update, merge, and eval all operate on the TopNCounter. A TopNCounter must be initialized with a size; the recommended size is N * TopNCounter.EXTRA_SPACE_RATE, where N is the size defined in the topn measure. So if we define topn(10, 4) and EXTRA_SPACE_RATE is 10, the TopNCounter will be initialized with a capacity of 10 * 10 = 100.

The TopN processing flow can be seen in the following figure:

update() inserts a row into the TopNCounter object via TopNCounter.offer(). merge() merges two TopNCounter objects via TopNCounter.merge(). Finally, eval() calls TopNCounter.sortAndRetain() to sort the entries and trim the TopNCounter to its target size, producing the final aggregation result.
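The three phases can be sketched with a minimal stand-in counter (the `MiniCounter` class is illustrative only; the real methods are TopNCounter.offer, TopNCounter.merge, and TopNCounter.sortAndRetain, and the real counter applies Space-Saving eviction rather than keeping every key):

```scala
import scala.collection.mutable

// Minimal stand-in for TopNCounter, enough to show the three UDAF phases.
class MiniCounter(val capacity: Int) {
  val entries = mutable.Map.empty[String, Double]

  def offer(key: String, measure: Double): Unit =   // used by update()
    entries(key) = entries.getOrElse(key, 0.0) + measure

  def merge(other: MiniCounter): Unit =             // used by merge()
    other.entries.foreach { case (k, v) => offer(k, v) }

  def sortAndRetain(): Seq[(String, Double)] = {    // used by eval()
    val kept = entries.toSeq.sortBy(-_._2).take(capacity)
    entries.clear(); entries ++= kept               // trim to capacity
    kept
  }
}
```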


Kylin 4.0 currently uses Parquet for storage. Suppose we define topn(10, 4) and TopNCounter.EXTRA_SPACE_RATE is set to 1. The mapping between dimensions and measures in the cuboid is as follows:

0 -> seller_id

1 -> item_id

2 -> id

3 -> price

4 -> Count

5 -> TopN

Here is a cuboid with TopN and SUM only:

Note that in the second row the Count is 11, but the TopN column only stores 10 values. This is because the TopNCounter has a capacity of only 10 * EXTRA_SPACE_RATE = 10; anything beyond 10 entries is not stored, which is the reason for the error in the current TopN results. As you can see, TopN stores the measured value together with the GROUP BY dimension values as an array.
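The stored layout described above can be mimicked in plain Scala (the `TopNEntry` case class and field names are hypothetical; the real column is a binary blob produced by serializing the TopNCounter into Parquet):

```scala
// The TopN column conceptually stores an array of (measure, dimension)
// pairs, sorted by measure descending and truncated to the counter capacity.
case class TopNEntry(sumPrice: Double, sellerId: Long)

def storedTopN(entries: Seq[TopNEntry], capacity: Int): Seq[TopNEntry] =
  entries.sortBy(-_.sumPrice).take(capacity)
```

With 11 distinct sellers and a capacity of 10, one entry is necessarily dropped, which is exactly the discrepancy between the Count of 11 and the 10 stored TopN values.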

For the SUM measure, Kylin directly stores the aggregated sum value.