1. Introduction

The aggregation analysis of big data is very useful in enterprises. Those with experience in developing big data know that ES and Mongo provide special aggregation schemes to solve this problem. However, real-time aggregation of a large amount of data has always been a pain point in business implementation. ES and Mongo are naturally friendly to distribution and tend to store massive data in different fragments. Go language is born for parallelism, and data aggregation can often divide data into blocks. This section combines the parallel computing features of Go language to achieve one-second aggregation of tens of millions of Mongo document data. The author does not have an in-depth study on big data and hopes that experienced readers can provide criticism and more suggestions. Mongo million efficient document aggregation

2. Common aggregation methods of Mongo database

Mongo does not have the same normal form constraints as mysql. It can store complex types, such as arrays, objects and other document structures that mysql is not good at dealing with. At the same time, the operation of aggregation is much more complex than mysql.

Mongo provides three ways to perform document data aggregation. This section summarizes the differences between the three ways:

  • Aggregate Pipeline
  • Converged Computing Model (MapReduce)
  • Individual aggregation commands (group, distinct, count)

2.1 Separate aggregation commands

A single aggregation command has lower performance than Aggregate and flexibility than Mapreduce. Simple to use.

  • Group: can be used for small amount of data document aggregation operation, used to provide more abundant statistical requirements than count, DISTINCT, can use JS functions to control the accounting logic.

Before 2.2, the group operation could only return 10000 group records. However, after 2.2 and 2.4, mongodb has been optimized to return 20000 group records. If the number of group records is greater than 20000, you may need to use other methods to count the group records. Such as converged pipes or MapReduce

  • Count: db.collection.count() is the same as db.collection.find().count(). It is not applicable to the distributed environment

  • Distinct: An index can be used. The syntax is very simple: db.collection.distinct(field,query),field is a deduplication field (single or nested field name). Query is a query condition

2.2 Aggregate Pipeline

The aggregate aggregation framework is based on the data processing pipeline model. The aggregate pipeline aggregation scheme uses the built-in aggregate operation of mongodb, which is relatively more efficient. Aggregate is preferentially recommended in the data aggregation operation of mongodb.

Aggregate can improve performance through indexes, and there are specific tricks to pipe performance (aggregate pipe operations are done in memory, have memory limits, and process limited data sets).

The aggregate pipe operation is similar to the pipe operation in Unix /Linux system. The current document is entered into the first pipe node after processing, and the data is discarded to the next pipe node until the final processing is completed.

The aggregate limit

  1. The aggregate command returns an error when a single document in the result set returned by aggregate is larger than 16MB. (Using aggregate without specifying cursor options or storing the results in the set, the Aggregate command returns a Bson file containing the fields in the result set. This command will generate an error if the total size of the result set exceeds the bson file size limit (16MB);)
  2. The maximum memory limit for pipeline processing cannot exceed 100MB. If the limit is exceeded, an error will be reported. AllowDiskUse can be enabled to handle larger data sets, and pipe operations can be written to temporary files. The application scenario of aggregate is applicable to the scenario (index and combinatorial optimization) that requires certain performance of aggregate response.

2.3 Aggregation calculation model MapReduce

The power of MapReduce lies in its ability to perform complex aggregation logic in parallel across multiple servers. MapReduce is a computing model that simply decomposes a large amount of work (data) (MAP) and executes it, and then merges the results into a final result (REDUCE). MapReduce uses conventional javascript operations to perform Map and Reduce operations. Therefore, MapReduce is more flexible and complex than Aggregate Pipeline, and consumes more performance than Aggregate Pipeline. MapReduce typically uses disk to store preprocessed data, while Pipeline always processes data in memory.

MapReduce is used to process large data result sets. With flexible javascript, MapReduce can handle complex aggregation requirements

3. Implementation principle and common syntax of Aggregate Pipeline

Code using

List<AggregationOperation> operations = new ArrayList<>();
operations.add(Aggregation.group("name").sum("score").as("totleScore"));
Aggregation aggregation = Aggregation.newAggregation(operations);
mongoTemplate.aggregate(aggregation, getCollectionName(), clazz);
Copy the code

Aggregate pipeline in MongoDB uses aggregate(), and its syntax is as follows:

db.COLLECTION_NAME.aggregate(AGGREGATE_OPERATION)

The following is an analogy between the aggregate() method and mysql aggregation

Mongo aggregation operation SQL operations (functions) instructions
$match where Perform a conditional search on the data
$group group by Group aggregation of data
$having having Filter and filter the aggregated data
$project select Select data field
$sort order by Sort the data
$limit limit Limit the amount of data returned
$sum The sum (), count () Aggregate statistics fields

$match and $group operations in aggregate are called stages in pipeline. They provide a wealth of methods to screen the aggregated data, provides $$match gt (>), $lt (<) and $(in), in $nin (not in), $gte (> =), $lte (< =), and so on selection operator.

$group groups documents according to the specified expression and outputs the documents for each different grouping to the next stage. The output document contains an _ID field that contains different groups for the key. The output document can also contain calculated fields that hold the values of some Accumulator expressions grouped by the _id field of $group. The $group does not output specific documentation, only statistics. Grammar:

{ $group: { _id: <expression>, <field1>: { <accumulator1> : <expression1> }, … }}

  • The _ID field is mandatory; However, the _id value can be specified to null to calculate the cumulative value for the entire input document.
  • The remaining computed fields are optional and computed using operators.

Accumulator accumulator

The name of the describe SQL analogy
$avg Calculate the average AVG avg
$first Returns the first document in each group, sorted if any, or not stored in the default order. Limit 0, 1
$last Returns the last document in each group, sorted if any, or last document in the order not stored by default.
$max Obtain the maximum value corresponding to all documents in the collection according to the grouping. max
$min According to the grouping, get the minimum value corresponding to all documents in the collection. min
$sum Calculate the total sum
$push Adds the specified expression value to an array.

Db.collection.aggregate () is an aggregation channel based on data processing. Each document passes a channel consisting of multiple stages, and the channels at each stage can be grouped and filtered. After a series of processes, the corresponding results can be output. According to this figure, the process of Aggregate treatment can be understood:

The aggregation pipeline can detect if aggregation can be done using only a subset of the fields in the document. If so, the pipe can use only these necessary fields, thereby reducing the amount of data entering the pipe.

Here are a few common optimization tips:

  • 1.$match + $group sequence optimization Using $match to filter document data in $group thousand of pipeline can greatly reduce the number of documents returned by a single pipeline, thus improving efficiency
  • $group + $project; $group + $project; $group + $project
  • $skip + $limit if $skip is followed by $limit, the optimizer moves $limit to the front of $skip, adding the number of $skips to the value of the $limit.
  • 4. If $sort precedes $limit, the optimizer can merge $limit inside $sort. If a limit of n results is specified, then the sorting operation only needs to maintain the first N results, and MongoDB only needs to store N elements in memory

For more aggregation optimization tips, see Mongo Aggregation optimization.

4. Code implementation

4.1 Data Collation

This section of the code to demonstrate the need to use a large amount of data, you can use mysql stored procedures to generate massive data, and then imported into the Mongo database, generation method can refer to: mysql quickly generate millions of data

{
    "_id" : ObjectId("5e06de309d1f74e9badda0db"),
    "username" : "dvHPRGD1"."age" : 87,
    "sex" : 1,
    "salary": 3251} {"_id" : ObjectId("5e06de309d1f74e9badda0dc"),
    "username" : "rNx6NsK"."age" : 7,
    "sex" : 1,
    "salary": 7891}...Copy the code

The document is very simple, randomly generated name (username), age (age), sex (sex), salary (salary) four content (salary), among which age is 0-99 random number, gender only 0 and 1, salary is also a certain range of random number.

4.2 Goals and solutions

With data sources, our goal is simple: quickly get the total number and average salary of people of different ages and genders. In effect, we were asked to aggregate the ages and genders of those millions of data points, and then do the salary calculation.

With that goal in mind, we index the age and gender in the document to speed up our statistics. Considering that we need to get results quickly, we use Mongo’s aggregate pipe to aggregate. As mentioned before, the aggregate pipe has a limit on the size of memory and returned documents, and 10 million data will definitely exceed mongo’s limit on memory usage. In order to solve the problem, Developers often open the disk through allDiskUse parameter to complete data aggregation, but the computing efficiency of disk and memory is a hundred times different, which is bound to affect the aggregation efficiency and cannot ensure real-time performance.

Let’s change our thinking to solve this problem. Although we have a lot of document data, the age is limited, there are only 100 numbers between 0 and 99 years old, and there are only male and female. We can use go to open 100 Goroutines to aggregate document data between 0 and 99 respectively. After the aggregation is completed, the data can be integrated together to complete our aggregation work. Go is perfect for this kind of work because goroutine is cheap to turn on and distributed aggregation of data if it is distributed across different machines. The aggregation task is just right. It can be broken up into smaller tasks, providing a prerequisite for parallel computing in GO. Everything looks just right.

4.3 Code Interpretation

The author of the limited conditions, using their own computer local database to demonstrate, first create a mongo connection singleton (if the environment is distributed, you can use connection pool)

// mongoAggregate/mongoClient/mongoClient.go
package mongoClient

import (
	"context"
	"fmt"
	"time"

	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

type MongoClient struct {
	Client *mongo.Client
	Collection *mongo.Collection
}

var (
	GMongo *MongoClient
)

func InitMongodb() { var( ctx context.Context opts *options.ClientOptions client *mongo.Client err error collection *mongo.Collection ) CTX = context.withTimeout (context.background (), 10* time.second) // CTX opts = options.client ().applYuri ()"Mongo: / / 127.0.0.1:27017")  // opts
	ifclient, err = mongo.Connect(ctx,opts); err ! = nil{ fmt.Println(err)return} // Connect Database and table collection = client.database ("screen_data_stat").Collection("test"GMongo = &mongoClient {Client: Client, Collection: Collection,}}...... // Initialize Mongo connection func in main.goinit() {
	mongoClient.InitMongodb()
}
Copy the code

Aggregate functions are implemented in aggregate packets:

package aggregate

import (
	"context"
	"log"
	"mongoAggregate/mongoClient"
	"sync"
	"time"

	bson2 "go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

func genPipeline(age int) (bson2.D, bson2.D, bson2.D) {
	matchStage := bson2.D{
		{"$match", bson2.D{
			{"age",
				bson2.D{
					{"$eq", age},
				}},
		}},
	}
	groupStage := bson2.D{
		{"$group", bson2.D{
			{"_id", bson2.D{
				{"age"."$age"},
				{"sex"."$sex"}}, {},"age", bson2.D{
				{"$first"."$age"}}, {},"sex", bson2.D{
				{"$first"."$sex"}}, {},"total", bson2.D{
				{"$sum", 1}}}, {"avgSalary", bson2.D{
				{"$avg"."$salary"},
			}},
		}},
	}
	projectStage := bson2.D{
		{"$project", bson2.D{
			{"_id", 0},
			{"age", 1},
			{"sex", 1},
			{"total", 1},
			{"avgSalary", 1}}}},return matchStage, groupStage, projectStage
}

func DataAggregate(age int, resultChan chan bson2.M, wg *sync.WaitGroup) {
	matchStage, groupStage, projectStage := genPipeline(age)
	opts := options.Aggregate().SetMaxTime(15 * time.Second)
	cursor, err := mongoClient.GMongo.Collection.Aggregate(context.TODO(), mongo.Pipeline{matchStage, groupStage, projectStage}, opts)
	iferr ! Var results [] bson2.m = nil {log.fatal (err)} // Prints the document contentsiferr = cursor.All(context.TODO(), &results); err ! = nil { log.Fatal(err) }for _, result := range results {
		resultChan <- result
	}
	wg.Done()
}
Copy the code

GenPipeline method is used to generate each stage of monGO aggregation pipeline. Because go language can return multiple values, multi-value receiving is used in DataAggregate to transmit the aggregated results through channel resultChan to complete aggregation. Sync.waitgroup is set to control the exit of the main function before other Goroutines, which is used to control concurrency.

Since we’re using multiple goroutines running concurrently, we get results that actually depend on how long it takes the slowest goroutine to complete the task. We process the results as follows: sort them, format them as JSON, and then define the output as follows:

// Resultslice. go package output/ / Sort by person. Age from largest to smallesttype OutPut struct {
	Age int32 `json:"age"`
	Sex int32 `json:"sex"`
	Total int32 `json:"total"`
	AvgSalary float64 `json:"avg_salary"`}typeResultSlice [] OutPut func (a ResultSlice) Len() int {// Override Len() methodreturnLen (a)} func (a ResultSlice) Swap(I, j int) {// Rewrite the Swap() method a[I], a[j] = a[j], a[I]} func (a ResultSlice) Less(I, J int) bool {// rewrite Less() to sort from largest to smallestreturn a[j].Age < a[i].Age
}
Copy the code

With the sorting function interface implemented above, we can achieve the output results sorted by age.

Then it becomes clear what the main function does:

func main() {
	dataStatResult := make(chan bson2.M)
	var output output2.ResultSlice
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go aggregate.DataAggregate(i, dataStatResult, &wg)
	}

	for value := range dataStatResult {
		output = append(output, output2.OutPut{
			Age:       value["age"].(int32),
			Sex:       value["sex"].(int32),
			Total:     value["total"].(int32),
			AvgSalary: value["avgSalary"]. (float64),})if len(output) == 200 {
			break}} wg.wait () // sort sort (output)for _, v := range output {
		result, err := json.Marshal(&v)
		iferr ! = nil { fmt.Printf("json.marshal failed, err:", err)
			return
		}
		fmt.Println(string(result))
	}
}
Copy the code

First, define a channel for the main Goroutine to communicate with other concurrent Goroutines and to accept the results calculated by other Goroutines. In this example, 100 goroutines are enabled for group aggregation and the aggregated results are received through dataStatResult channel. After all the work is completed, the results are sorted according to age and formatted into JSON Output. That’s the logic behind concurrent aggregation of massive amounts of data. Here are the results of the author’s aggregation of 0-20 years old (about 2 million data, and the aggregation was completed in 200ms):

{"age": 19,"sex": 0."total": 49773,"avg_salary": 5346.04197054628} {"age": 19,"sex": 1,"total": 49985,"avg_salary": 4677.7744523357005} {"age": 18."sex": 0."total": 48912,"avg_salary": 5335.430671409879} {"age": 18."sex": 1,"total": 50136,"avg_salary": 4540.624461464816} {"age": 17."sex": 0."total": 49609,"avg_salary": 5372.679755689492}...Copy the code

5. Summary

This paper mainly describes the application in the statistical scene of big data aggregation. In fact, no matter whether the scene requires real-time performance, there is the existence of block aggregation idea. The MapReduce polymerization of Mongo and bucketing of ES are both batches of big data aggregation into small tasks to complete one by one and finally complete the target. They are not very efficient.

The original 6.

Juejin. Cn/post / 684490…