Flink + Hologres: Best Practices for Real-Time Exact UV Deduplication at 100-Million-User Scale

Depending on business requirements, UV and PV calculations usually fall into two scenarios:

• Offline computing scenario: mainly T+1, computing over historical data
• Real-time computing scenario: newly added data of the day is computed in real time, with user tags deduplicated

For offline computing scenarios, Hologres provides ultra-high-cardinality UV calculation based on RoaringBitmap: only one pre-aggregation at the finest granularity is needed, producing a single finest-grained pre-aggregation result table, which enables sub-second queries. For details, see the previous article on how Hologres supports ultra-high-cardinality UV computing (implemented based on RoaringBitmap).

For real-time computing scenarios, Flink + Hologres can be used to deduplicate user tags in real time based on RoaringBitmap. In this way, fine-grained UV and PV data are available in real time, and the minimum statistical window (for example, UV over the last 5 minutes) can be adjusted on demand, enabling near-real-time monitoring and better BI display on dashboards. Compared with deduplication by day, week, or month, this is better suited to fine-grained statistics during campaign periods, while results for larger time units can still be obtained through simple aggregation.

Main idea

  1. Flink converts the streaming data into a Table, JOINs it with the Hologres dimension table, and then converts the result back into a stream. In this way, the insertIfNotExists feature of the Hologres dimension table, combined with the auto-incrementing SERIAL field, achieves efficient UID mapping.
  2. Flink processes the joined data by time window, aggregates it with RoaringBitmap per query dimension, and stores the query dimensions together with the aggregated UIDs in the aggregation result table, where the aggregated UIDs are written into the RoaringBitmap field in Hologres.
  3. At query time, similar to the offline mode, the aggregation result table is queried directly by the query conditions; OR-ing the RoaringBitmap fields of the matching keys and computing the cardinality yields the corresponding number of users.
  4. The processing flow is shown in the figure below.

Solution best practices

1. Create related base tables

1) Create the table UID_MAPPING as the UID mapping table, used to map UIDs to 32-bit int values.

• The RoaringBitmap type requires the user ID to be a 32-bit int, and the denser the IDs the better (i.e., ideally the user IDs are contiguous). However, user IDs in common business systems or tracking events are often strings or Long values, so the UID_MAPPING table is needed to build a mapping. The mapping table uses the Hologres SERIAL type (auto-incrementing 32-bit int) to manage the mapping automatically and keep it stable. A small illustration of this behavior follows the DDL below.

• Since the data is ingested in real time, this table is set as a row-oriented (row store) table to improve the QPS of Flink's real-time dimension table JOIN.

BEGIN;
CREATE TABLE public.uid_mapping (
    uid text NOT NULL,
    uid_int32 serial,
    PRIMARY KEY (uid)
);
CALL set_table_property('public.uid_mapping', 'clustering_key', 'uid');
CALL set_table_property('public.uid_mapping', 'distribution_key', 'uid');
CALL set_table_property('public.uid_mapping', 'orientation', 'row');
COMMIT;
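
To make the behavior of the SERIAL column concrete, here is a small, purely illustrative example (the uid values and the assigned uid_int32 results below are hypothetical; in the actual pipeline rows are inserted by the Flink dimension table via insertIfNotExists rather than by hand):

INSERT INTO public.uid_mapping (uid) VALUES ('device_a8f3'), ('device_07bd');
SELECT uid, uid_int32 FROM public.uid_mapping;
-- uid          | uid_int32
-- device_a8f3  | 1
-- device_07bd  | 2

Looking up an existing uid always returns the same uid_int32, which is what keeps the mapping stable.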

2) Create the table DWS_APP as the basic aggregation table, which is used to store the aggregated results on the basic dimension.

• The RoaringBitmap extension needs to be created before RoaringBitmap can be used, and the Hologres instance must be version 0.10 or later. A quick sanity-check query is shown after the statement below.

CREATE EXTENSION IF NOT EXISTS roaringbitmap;
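
To verify that the extension is available, a query like the following can be run (a minimal sketch; rb_build and rb_cardinality are functions provided by the RoaringBitmap extension). It should return 3, since the duplicate value is deduplicated:

SELECT rb_cardinality(rb_build(ARRAY[1, 2, 3, 3]));  -- duplicates count once, so the result is 3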

• For better performance, it is recommended to set the shard count reasonably according to the data volume of the basic aggregation table, but the shard count should not exceed the number of cores of the compute resources. It is recommended to set the shard count through a Table Group, as follows:

-- Create a new Table Group with a shard count of 16; tg16 serves as its sentinel table
BEGIN;
CREATE TABLE tg16 (a int);
CALL set_table_property('tg16', 'shard_count', '16');
COMMIT;

• Compared with the offline result table, this result table adds a timestamp field so that statistics can be produced per Flink window period. The result table DDL is as follows:

BEGIN;
CREATE TABLE public.dws_app (
    country text,
    prov text,
    city text,
    ymd text NOT NULL,          -- date field
    timetz TIMESTAMPTZ,         -- statistics timestamp, enables statistics per Flink window period
    uid32_bitmap roaringbitmap, -- RoaringBitmap used to record UV
    PRIMARY KEY (country, prov, city, ymd, timetz) -- query dimensions and time as primary key to prevent duplicate inserts
);
CALL set_table_property('public.dws_app', 'orientation', 'column');
-- the date field is set as clustering_key and event_time_column to facilitate filtering
CALL set_table_property('public.dws_app', 'clustering_key', 'ymd');
CALL set_table_property('public.dws_app', 'event_time_column', 'ymd');
-- place the table in the Table Group with 16 shards
CALL set_table_property('public.dws_app', 'colocate_with', 'tg16');
-- GROUP BY fields are set as distribution_key
CALL set_table_property('public.dws_app', 'distribution_key', 'country,prov,city');
COMMIT;

2. Flink reads the data in real time and updates the DWS_APP basic aggregation table

See the alibabacloud-hologres-connectors examples for the complete source code.

1) Flink reads the data source as a stream (DataStream) and converts it into a source table (Table)

DataStreamSource odsStream = env.createInput(csvInput, typeInfo);
// Joining with a dimension table requires adding a proctime field; see https://help.aliyun.com/document_detail/62506.html
Table odsTable =
        tableEnv.fromDataStream(
                odsStream,
                $("uid"),
                $("country"),
                $("prov"),
                $("city"),
                $("ymd"),
                $("proctime").proctime());
// Register the table in the catalog environment
tableEnv.createTemporaryView("odsTable", odsTable);

2) Associate the source table with the Hologres dimension table (UID_Mapping)

The dimension table is configured with the insertIfNotExists parameter, which means that when a key cannot be found, the row is inserted automatically, and the uid_int32 field is then generated automatically by the Hologres SERIAL type.

// Create the Hologres dimension table; insertIfNotExists means a row is inserted when the key is not found
String createUidMappingTable =
        String.format(
                "create table uid_mapping_dim("
                        + "  uid string,"
                        + "  uid_int32 INT"
                        + ") with ("
                        + "  'connector'='hologres',"
                        + "  'dbname' = '%s',"     // Hologres DB name
                        + "  'tablename' = '%s',"  // Hologres table name
                        + "  'username' = '%s',"   // access id of the current account
                        + "  'password' = '%s',"   // access key of the current account
                        + "  'endpoint' = '%s',"   // Hologres endpoint
                        + "  'insertifnotexists'='true'"
                        + ")",
                database, dimTableName, username, password, endpoint);
tableEnv.executeSql(createUidMappingTable);

// Join the source table with the dimension table
String odsJoinDim =
        "SELECT ods.country, ods.prov, ods.city, ods.ymd, dim.uid_int32"
                + "  FROM odsTable AS ods JOIN uid_mapping_dim FOR SYSTEM_TIME AS OF ods.proctime AS dim"
                + "  ON ods.uid = dim.uid";
Table joinRes = tableEnv.sqlQuery(odsJoinDim);

3) The join result is converted to a DataStream, processed through a Flink time window, and aggregated with RoaringBitmap

DataStream<Tuple6<String, String, String, String, Timestamp, byte[]>> processedSource =
        source
                // Key by the dimensions to be counted (country, prov, city, ymd)
                .keyBy(0, 1, 2, 3)
                // Tumbling time window; ProcessingTime is used here because reading a CSV file simulates the input stream,
                // EventTime can be used in real scenarios
                .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
                // Trigger, so that aggregated results can be obtained before the window ends
                .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1)))
                .aggregate(
                        // Aggregate function: aggregate by the dimensions selected in keyBy
                        new AggregateFunction<
                                Tuple5<String, String, String, String, Integer>,
                                RoaringBitmap,
                                RoaringBitmap>() {
                            @Override
                            public RoaringBitmap createAccumulator() {
                                return new RoaringBitmap();
                            }

                            @Override
                            public RoaringBitmap add(
                                    Tuple5<String, String, String, String, Integer> in,
                                    RoaringBitmap acc) {
                                // Add the 32-bit UID to the RoaringBitmap for deduplication
                                acc.add(in.f4);
                                return acc;
                            }

                            @Override
                            public RoaringBitmap getResult(RoaringBitmap acc) {
                                return acc;
                            }

                            @Override
                            public RoaringBitmap merge(RoaringBitmap acc1, RoaringBitmap acc2) {
                                return RoaringBitmap.or(acc1, acc2);
                            }
                        },
                        // Window function: output the aggregated result
                        new WindowFunction<
                                RoaringBitmap,
                                Tuple6<String, String, String, String, Timestamp, byte[]>,
                                Tuple,
                                TimeWindow>() {
                            @Override
                            public void apply(
                                    Tuple keys,
                                    TimeWindow timeWindow,
                                    Iterable<RoaringBitmap> iterable,
                                    Collector<Tuple6<String, String, String, String, Timestamp, byte[]>> out)
                                    throws Exception {
                                RoaringBitmap result = iterable.iterator().next();
                                // Optimize the RoaringBitmap
                                result.runOptimize();
                                // Serialize the RoaringBitmap into a byte array so it can be stored in Hologres
                                byte[] byteArray = new byte[result.serializedSizeInBytes()];
                                result.serialize(ByteBuffer.wrap(byteArray));
                                // Tuple6.f4 (Timestamp) uses the window end, i.e. the window length is the statistics period
                                out.collect(
                                        new Tuple6<>(
                                                keys.getField(0),
                                                keys.getField(1),
                                                keys.getField(2),
                                                keys.getField(3),
                                                new Timestamp(timeWindow.getEnd() / 1000 * 1000),
                                                byteArray));
                            }
                        });

4) Write the result table

Note that the RoaringBitmap result is written to Hologres as a byte array (the uid32_bitmap column is declared as BYTES in the Flink sink table).

// Convert the computed result into a Table
Table resTable =
        tableEnv.fromDataStream(
                processedSource,
                $("country"),
                $("prov"),
                $("city"),
                $("ymd"),
                $("timest"),
                $("uid32_bitmap"));

// Create the Hologres result table
String createHologresTable =
        String.format(
                "create table sink("
                        + "  country string,"
                        + "  prov string,"
                        + "  city string,"
                        + "  ymd string,"
                        + "  timetz timestamp,"
                        + "  uid32_bitmap BYTES"
                        + ") with ("
                        + "  'connector'='hologres',"
                        + "  'dbname' = '%s',"
                        + "  'tablename' = '%s',"
                        + "  'username' = '%s',"
                        + "  'password' = '%s',"
                        + "  'endpoint' = '%s',"
                        + "  'connectionSize' = '%s',"
                        + "  'mutatetype' = 'insertOrReplace'"
                        + ")",
                database, dwsTableName, username, password, endpoint, connectionSize);
tableEnv.executeSql(createHologresTable);

// Write the computed result into the result table
tableEnv.executeSql("insert into sink select * from " + resTable);
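
After the job has been running for a few window periods, a quick check in Hologres (a hedged example; the actual rows depend on your data) confirms that the byte arrays written by Flink are readable as RoaringBitmap values:

SELECT country, prov, city, ymd, timetz,
       RB_CARDINALITY(uid32_bitmap) AS window_uv
FROM   dws_app
ORDER BY timetz DESC
LIMIT  10;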

3. Data query

At query time, an aggregation is performed on the basic aggregation table (DWS_APP) according to the query dimensions, the bitmap cardinality is computed, and the number of users under each GROUP BY condition is obtained.

• Check the UV of each city on a certain day

-- Run the RB_AGG query below; turning off the three-stage aggregation switch first gives better performance
set hg_experimental_enable_force_three_stage_agg = off;

SELECT  country
        ,prov
        ,city
        ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
FROM    dws_app
WHERE   ymd = '20210329'
GROUP BY country
        ,prov
        ,city;

• Check the UV of each province in a certain period

-- Run the RB_AGG query below; turning off the three-stage aggregation switch first gives better performance
set hg_experimental_enable_force_three_stage_agg = off;

SELECT  country
        ,prov
        ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
FROM    dws_app
WHERE   timetz > '2021-04-19 18:00:00+08' AND timetz < '2021-04-19 19:00:00+08'
GROUP BY country
        ,prov;
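
As noted at the beginning, results for larger time units can be obtained from the same table by simple aggregation. For example, a month-level UV per country could be computed as follows (a sketch only; the date range is hypothetical):

SELECT  country
        ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
FROM    dws_app
WHERE   ymd >= '20210301' AND ymd <= '20210331'
GROUP BY country;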

This article is original content from Alibaba Cloud (Aliyun) and may not be reproduced without permission.