As a real-time analysis and statistics system handling tens of billions of events, how could it be missing PV and UV, the two classic "Super Mario" metrics? Five hundred years ago they were already the ancestors of all indicators... cough, sorry, back to the main text. In the last article, big pig introduced the design and implementation of the small high-performance ETL program. By now our data has landed in HBase and the log time has been written to MySQL. Everything is in place, so next we will build out the indicators, starting with these two classics.

Program flow

Let’s first take a look at the calculation flow of the whole program; please see the big picture:

  1. The calculation starts from our Driver entry point.

  2. Before starting the calculation, check whether the Redis listener has received a program-exit notification; if it has, the program ends, otherwise proceed.

  3. Then query the average time point of the ETL LogHub log progress from our previous article.

  4. The switch decides whether the gap between the LogHub time and the indicator time we calculated last time is big enough. We generally push the LogHub time back by 3 minutes, because the LogHub time fluctuates a little.

  5. If it is not, sleep for 30 seconds; you can tune the sleep interval yourself.

  6. If it is, calculate the indicators for the range from the end time of the last indicator calculation to (LogHub time minus the 3-minute fluctuation window).

  7. After computing the results, update the indicators and the indicator calculation time, then go back to step 2.

Program implementation

Start at the DriverMain entry point:

// Listen for redis exit messages
while (appRunning) {
  val dbClient = new DBJdbc(props.getProperty("jdbcUrl"))
  // LogHub offset, pushed back by 3 minutes of log fluctuation
  val loghubTime = dbClient.query("loghub").toLocalDateTime.minusMinutes(3)
  // Offset of the last indicator calculation
  val indicatorTime = dbClient.query("indicator").toLocalDateTime
  // Gap between the two offsets
  val betweenTimeMinutes = Duration.between(indicatorTime, loghubTime).toMinutes
  val format = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")
  // If the gap is big enough, run the indicator calculation; otherwise sleep
  if (betweenTimeMinutes >= 1) {
    app.run(spark, indicatorTime, loghubTime)
    dbClient.upsert(Map("offsetName" -> "indicator"), Update(sets = Map("time" -> loghubTime.toString)), "offset")
  } else {
    TimeUnit.SECONDS.sleep(30)
  }
}

From the comments, the overall idea is quite clear.

Now let’s move on and see what interesting things the run method does:

conf.set(TableInputFormat.INPUT_TABLE, Tables.LOG_TABLE)
conf.set("TableInputFormat.SCAN_ROW_START", start)
conf.set("TableInputFormat.SCAN_ROW_STOP", end)
val logDS = sc.newAPIHadoopRDD(
    conf,
    classOf[TableInputFormat2],
    classOf[ImmutableBytesWritable],
    classOf[Result]
  )
  .map(tp2 => HbaseUtil.resultToMap(tp2._2))
  .map(map => {
    LogCase(
      // nested case class holding both forms of the event time
      dt = DT(
        map.get("time").toLocalDateTimeStr(),
        map.get("time").toLocalDate().toString
      ),
      `type` = map.get("type"),
      aid = map.get("aid"),
      uid = map.get("uid"),
      tid = map.get("tid"),
      ip = map.get("ip")
    )
  }).toDS()

logDS.cache()
logDS.createTempView("log")
new PV().run()
new UV().run()

start and end are the time range of the logs to be queried.

The HBase data within that time range is registered as a log table in Spark SQL.

This log table can then be used by both the UV and PV calculations.
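As for where start and end come from: here is a minimal sketch, assuming the HBase row keys begin with the log timestamp in the yyyyMMddHHmmssSSS format that appears in the driver. The actual row-key layout is not shown in this article, so treat this purely as an assumption for illustration.

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Assumed: scan bounds are just the two offsets from the driver, formatted as row-key prefixes.
val format = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")
val indicatorTime = LocalDateTime.of(2019, 3, 26, 12, 0, 0)   // example values only
val loghubTime    = indicatorTime.plusMinutes(5)

val start = indicatorTime.format(format)   // lower scan bound
val end   = loghubTime.format(format)      // upper scan bound
// conf.set("TableInputFormat.SCAN_ROW_START", start)
// conf.set("TableInputFormat.SCAN_ROW_STOP", end)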

Let’s take a look at what’s going on in these two classic indicators:

spark.sql(
      """ |SELECT | aid, | dt.date, | COUNT(1) as pv |FROM | log |GROUP BY | aid, | dt.date """.stripMargin)
      .rdd
      .foreachPartition(rows => {
        val props = PropsUtils.properties("db")
        val dbClient = new DBJdbc(props.getProperty("jdbcUrl"))
        rows.foreach(row => {
          dbClient.upsert(
            Map(
              "time" -> row.getAs[String]("date"),
              "aid" -> row.getAs[String]("aid")
            ),
            Update(incs = Map("pv" -> row.getAs[Long]("pv").toString)),
            "common_report"
          )
        })
        dbClient.close()
      })

Wow, at a glance: big brother, isn’t this written way too simply?

It is just an ordinary PV aggregation, plus a foreachPartition operation that upserts each row of the aggregated result into our common_report indicator table.

GROUP BY is followed by the dimensions to aggregate on; the SQL above counts each article’s PV per day.

In common_report the unique key is time + aid, and pv defaults to 0.

The DDL of the table looks like this:

create table common_report
(
	id bigint auto_increment primary key,
	aid bigint not null,
	pv int default 0 null,
	uv int default 0 null,
	time date not null,
	constraint common_report_aid_time_uindex unique (aid, time)
);

Nothing wrong so far.

What does dbClient.upsert actually execute underneath? Essentially an SQL statement like this:

INSERT INTO common_report (time, aid, pv)
VALUES ('2019-03-26', '10000', 1)
ON DUPLICATE KEY UPDATE pv = pv + 1;
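The project’s real DBJdbc and Update classes are not shown in the article, but a minimal sketch of how such an upsert could be assembled from the key map and the Update(sets/incs) arguments might look like this (all names here are assumptions, not the project’s actual code):

// Hypothetical sketch only -- not the project's real DBJdbc/Update implementation.
case class Update(sets: Map[String, String] = Map.empty, incs: Map[String, String] = Map.empty)

def buildUpsert(keys: Map[String, String], update: Update, table: String): String = {
  // Column order: key columns, then plain SET columns, then incremented columns
  val columns = (keys.keys ++ update.sets.keys ++ update.incs.keys).toSeq
  val values  = (keys.values ++ update.sets.values ++ update.incs.values).map(v => s"'$v'").toSeq
  val updates =
    update.sets.keys.map(c => s"$c = VALUES($c)") ++
      update.incs.keys.map(c => s"$c = $c + VALUES($c)")
  s"""INSERT INTO $table (${columns.mkString(", ")})
     |VALUES (${values.mkString(", ")})
     |ON DUPLICATE KEY UPDATE ${updates.mkString(", ")}""".stripMargin
}

// buildUpsert(Map("time" -> "2019-03-26", "aid" -> "10000"), Update(incs = Map("pv" -> "1")), "common_report")
// produces roughly:
// INSERT INTO common_report (time, aid, pv) VALUES ('2019-03-26', '10000', '1')
// ON DUPLICATE KEY UPDATE pv = pv + VALUES(pv)

A production version would of course use prepared statements rather than string interpolation.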

Big pig, then how is UV implemented? Once a user has come in for the first time today, later visits must not be counted again.

That’s simple. You could use Redis for deduplication, but we are already using HBase, so why not use it for this too?
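For contrast, the Redis route mentioned above would roughly look like this (just a sketch with an assumed Jedis client and key layout; it is not what this project does):

import redis.clients.jedis.Jedis

// SADD returns 1 only the first time a member is added to the set,
// so it doubles as a "first visit today" check; the key expires much like the HBase TTL below.
def isFirstVisitToday(jedis: Jedis, date: String, aid: String, uid: String): Boolean = {
  val key = s"uv:$date:$aid"
  val first = jedis.sadd(key, uid) == 1L
  jedis.expire(key, 3 * 24 * 3600)
  first
}

Now let’s take a look at how UV is actually implemented here with HBase: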

val logDS = spark.table("log").as(ExpressionEncoder[LogCase])
    import spark.implicits._
    logDS
      .mapPartitions(partitionT => {
        val hbaseClient = DBHbaseHelper.getDBHbase(Tables.CACHE_TABLE)
        val md5 = (log: LogCase) => MD5Hash.getMD5AsHex(s"${log.dt.date}|${log.aid}|${log.uid}|uv".getBytes)
        partitionT
          .grouped(Consts.BATCH_MAPPARTITIONS)
          .flatMap { tList =>
            tList
              .zip(hbaseClient.incrments(tList.map(md5)))
              .map(tp2 => {
                val log = tp2._1
                log.copy(ext = EXT(tp2._2))
              })
          }
      }).createTempView("uvTable")

    spark.sql(
      """ |SELECT | aid, | dt.date, | COUNT(1) as uv |FROM | uvTable |WHERE | ext.render = 1 |GROUP BY | aid, | dt.date """.stripMargin)
      .rdd
      .foreachPartition(rows => {
        val props = PropsUtils.properties("db")
        val dbClient = new DBJdbc(props.getProperty("jdbcUrl"))
        rows.foreach(row => {
          dbClient.upsert(
            Map(
              "time" -> row.getAs[String]("date"),
              "aid" -> row.getAs[String]("aid")
            ),
            Update(incs = Map("uv" -> row.getAs[Long]("uv").toString)),
            "common_report"
          )
        })
        dbClient.close()
      })

Compared with PV, the spark.sql here only adds an ext.render = 1 condition; everything else is the same as the PV calculation.

CACHE_TABLE is an intermediate HBase table used to store users’ UV markers. It can be configured with a TTL of, say, 2 or 3 days:

create 'CACHE_FOR_TEST', {NAME => 'info', TTL => '3 DAYS', CONFIGURATION => {'SPLIT_POLICY' => 'org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy', 'KeyPrefixRegionSplitPolicy.prefix_length' => '2'}, COMPRESSION => 'SNAPPY'}, SPLITS => ['20', '40', '60', '80', 'a0', 'c0', 'e0']
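The SPLITS and the 2-character prefix policy line up with the keys produced in the UV code: the md5 helper yields a 32-character hex string, and (as the incrments method further down shows) its first 8 characters become the row key, so the first two hex characters spread writes evenly across the pre-split regions. A quick illustration using the same MD5Hash utility:

import org.apache.hadoop.hbase.util.MD5Hash

val key = MD5Hash.getMD5AsHex("2019-03-26|10000|u1|uv".getBytes)
println(key.length)   // 32: first 8 chars become the row key, the remaining 24 the column qualifier
println(key.take(2))  // two hex chars, which is what the split points (20, 40, ..., e0) partition on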

Anything you still don’t follow?

Don’t panic, don’t panic, big pig will explain it slowly:

val logDS = spark.table("log").as(ExpressionEncoder[LogCase])

This pulls the log table we registered earlier back out as a typed Dataset of LogCase.

The following mapPartitions is the interesting part:

partitionT
    .grouped(1000)
        .flatMap { tList =>
          tList
            .zip(hbaseClient.incrments(tList.map(md5)))
            .map(tp2 => {
              val log = tp2._1
              log.copy(ext = EXT(tp2._2))
            })
        }

What it does is process each partition’s data, converting every record: for each record we issue one increment against HBase, and the value that comes back becomes render, i.e. how many times this user has been counted today.
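To make render concrete, here is a tiny local simulation of that counter behaviour (a plain in-memory map standing in for the HBase client; it is not the real incrments):

import scala.collection.mutable

// In-memory stand-in for hbaseClient.incrments: each key's counter goes up by one per call.
val counters = mutable.Map.empty[String, Long].withDefaultValue(0L)
def fakeIncrments(keys: Seq[String]): Seq[Long] = keys.map { k =>
  counters(k) += 1
  counters(k)
}

// Three log records: the same user twice on the same day, plus a different user.
val keys = Seq("day1|aid1|u1", "day1|aid1|u1", "day1|aid1|u2")
println(fakeIncrments(keys))   // List(1, 2, 1)  -> render values; only render == 1 counts for UV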

What’s the use of that? Couldn’t I just GET the record from HBase, check whether the user already exists, and if not treat it as the first visit today and PUT a marker into HBase? So easy.

In fact, that is what we did at first. Later we found it is better to keep the business logic together in SQL, which is easier to maintain, and increment brings a big benefit: the update is atomic on the server side, so it is safe even when modified concurrently.

Besides, as you may have noticed, GET and PUT are two separate requests, so together they cannot be atomic. Out of tens of millions of records the indicator ended up missing a handful of rows, and you have no idea how hard those were to track down.

A render of 1 means this is the user’s first visit today, which is exactly what UV counts; a render of 2 or more means it is not the first visit, and those rows are filtered out by the ext.render = 1 condition.

Let’s see what the incrments method does:

def incrments(incs: Seq[String], family: String = "info", amount: Int = 1): Seq[Long] = {
    if (incs.isEmpty) {
      Seq[Long]()
    } else {
      require(incs.head.length == 32, "pk require 32 length")
      // Split each 32-char MD5: first 8 chars -> row key, last 24 chars -> column qualifier
      val convertIncs = incs map { pk => new Increment(Bytes.toBytes(pk.take(8))).addColumn(Bytes.toBytes(family), Bytes.toBytes(pk.takeRight(24)), amount) }
      // One batch round trip to HBase for the whole chunk
      val results = new Array[Object](convertIncs.length)
      table.batch(convertIncs.asJava, results)
      // Read back the post-increment counter for each key, preserving the input order
      results.array.indices.map(
        ind =>
          Bytes.toLong(
            results(ind)
              .asInstanceOf[Result]
              .getValue(
                Bytes.toBytes(family),
                Bytes.toBytes(incs(ind).takeRight(24))
              )
          )
      )
    }
  }

This method batches the increment calls. In our online production tests, batching was hundreds of times faster than issuing the increments one at a time, which is why it sits inside mapPartitions: only there can the data be converted and sent in batches. foreachPartition is likewise a batch-style operation, while foreach and map work one record at a time and cannot be batched; we used foreachPartition above when writing the report out to MySQL.
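In sketch form, the difference looks like this (incrment on a single key is hypothetical here; only the batched incrments appears in the article):

// One HBase round trip per record -- the shape we want to avoid:
//   logDS.map(log => hbaseClient.incrment(md5(log)))   // hypothetical single-key call
//
// One round trip per chunk of up to 1000 records -- what mapPartitions + grouped gives us:
//   logDS.mapPartitions(_.grouped(1000).flatMap { chunk =>
//     chunk.zip(hbaseClient.incrments(chunk.map(md5)))
//       .map { case (log, render) => log.copy(ext = EXT(render)) }
//   })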

Without noticing, big pig has already written quite a long article.

To stop the computation, just publish a stop message to Redis:

RedisUtil().getResource.publish("computeListenerMessage", "stop")
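The listening side in the driver is not shown in the article; a minimal sketch of what it might look like with Jedis (assumed names, with appRunning as the volatile flag the while loop checks) is:

import redis.clients.jedis.{Jedis, JedisPubSub}

object StopListener {
  @volatile var appRunning = true

  // Runs in a background thread: flip the flag when a "stop" message arrives,
  // so the while (appRunning) loop in DriverMain exits after the current round.
  def start(): Unit = new Thread(() => {
    new Jedis("localhost").subscribe(new JedisPubSub {
      override def onMessage(channel: String, message: String): Unit =
        if (message == "stop") appRunning = false
    }, "computeListenerMessage")
  }).start()
}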

No more pasting code, or the whole article will look like it is propped up by nothing but code.

Bonus: the complete project source code.