The data set

Suppose we have two streaming datasets in two different Kafka topics (source and target), and we need to measure the data quality of the target dataset against the source dataset.

For simplicity, assume that the data in both topics are JSON strings, as shown below:

{"id": 1."name": "Apple"."color": "red"."time": "The 2018-09-12 _06:00:00"}
{"id": 2."name": "Banana"."color": "yellow"."time": "The 2018-09-12 _06:01:00"}...Copy the code

Environment preparation

Prepare the environment for the Apache Griffin measurement module, including the following components:

  • JDK (1.8+)
  • Hadoop (2.6.0+)
  • Spark (2.2.1+)
  • Kafka (0.8.x)
  • Zookeeper (3.5+)

For details on how to configure these components, see griffin/griffin-doc/deploy in the source tree. This article assumes that the above environment has already been configured. For information on version matching, see github.com/apache/grif…
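Before building anything, it is worth a quick sanity check that the installed versions match the list above. A minimal sketch (assuming the components are on the PATH and $KAFKA_HOME points at your Kafka installation; adjust to however your cluster is set up):

# print the versions of the main components
java -version                  # expect 1.8+
hadoop version                 # prints the Hadoop distribution version
spark-submit --version         # expect 2.2.1+
# Kafka 0.8 shell tools have no version flag; inspect the installed libraries instead
ls $KAFKA_HOME/libs | grep kafka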

Build the Apache Griffin measurement module

1. Download the Apache Griffin source package here.

2. Decompress the source package:

unzip griffin-0.4.0-source-release.zip
cd griffin-0.4.0-source-release

3. Build the Apache Griffin JAR

mvn clean install
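If the full test suite makes the build slow, the tests can optionally be skipped with Maven's standard flag:

mvn clean install -DskipTests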

4. Move the built Apache Griffin JAR package to your work path:

mv measure/target/measure-0.4.0.jar <work path>/griffin-measure.jar

Data preparation

To get started quickly, we use the Kafka shell scripts to create two Kafka topics (source and target) and generate data for them in JSON string format.

# create topics
# Note: this only works for Kafka 0.8
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic source
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic target
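To confirm the topics were created, you can list and describe them with the same shell tools:

# verify that both topics exist
kafka-topics.sh --list --zookeeper localhost:2181
kafka-topics.sh --describe --zookeeper localhost:2181 --topic source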

The data format looks something like this:

{"id": 1."name": "Apple"."color": "red"."time": "The 2018-09-12 _06:00:00"}
{"id": 2."name": "Banana"."color": "yellow"."time": "The 2018-09-12 _06:01:00"}

The source and target topics may each contain some records that the other does not. You can download the demo data and run the ./streaming-data.sh script to generate the JSON string data files and produce them into the two Kafka topics.
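If you prefer to generate a few records by hand instead of using the demo script, here is a minimal sketch with the standard console producer (the broker address localhost:9092 is an assumption; use your own):

# produce two sample records into the source topic
kafka-console-producer.sh --broker-list localhost:9092 --topic source <<EOF
{"id": 1, "name": "Apple", "color": "red", "time": "2018-09-12_06:00:00"}
{"id": 2, "name": "Banana", "color": "yellow", "time": "2018-09-12_06:01:00"}
EOF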

Define data quality metrics

Apache Griffin environment configuration file: env.json

{
  "spark": {
    "log.level": "WARN",
    "checkpoint.dir": "hdfs:///griffin/checkpoint",
    "batch.interval": "20s",
    "process.interval": "1m",
    "init.clear": true,
    "config": {
      "spark.default.parallelism": 4,
      "spark.task.maxFailures": 5,
      "spark.streaming.kafkaMaxRatePerPartition": 1000,
      "spark.streaming.concurrentJobs": 4,
      "spark.yarn.maxAppAttempts": 5,
      "spark.yarn.am.attemptFailuresValidityInterval": "1h",
      "spark.yarn.max.executor.failures": 120,
      "spark.yarn.executor.failuresValidityInterval": "1h",
      "spark.hadoop.fs.hdfs.impl.disable.cache": true
    }
  },
  "sinks": [
    {
      "type": "console"
    },
    {
      "type": "hdfs",
      "config": {
        "path": "hdfs:///griffin/persist"
      }
    },
    {
      "type": "elasticsearch",
      "config": {
        "method": "post",
        "api": "http://es:9200/griffin/accuracy"
      }
    }
  ],
  "griffin.checkpoint": [
    {
      "type": "zk",
      "config": {
        "hosts": "zk:2181",
        "namespace": "griffin/infocache",
        "lock.path": "lock",
        "mode": "persist",
        "init.clear": true,
        "close.clear": false
      }
    }
  ]
}

Define the Griffin data quality (DQ) configuration file: dq.json

{
  "name": "streaming_accu",
  "process.type": "streaming",
  "data.sources": [
    {
      "name": "src",
      "baseline": true,
      "connectors": [
        {
          "type": "kafka",
          "version": "0.8",
          "config": {
            "kafka.config": {
              "bootstrap.servers": "kafka:9092",
              "group.id": "griffin",
              "auto.offset.reset": "largest",
              "auto.commit.enable": "false"
            },
            "topics": "source",
            "key.type": "java.lang.String",
            "value.type": "java.lang.String"
          },
          "pre.proc": [
            {
              "dsl.type": "df-opr",
              "rule": "from_json"
            }
          ]
        }
      ],
      "checkpoint": {
        "type": "json",
        "file.path": "hdfs:///griffin/streaming/dump/source",
        "info.path": "source",
        "ready.time.interval": "10s",
        "ready.time.delay": "0",
        "time.range": ["-5m", "0"],
        "updatable": true
      }
    },
    {
      "name": "tgt",
      "connectors": [
        {
          "type": "kafka",
          "version": "0.8",
          "config": {
            "kafka.config": {
              "bootstrap.servers": "kafka:9092",
              "group.id": "griffin",
              "auto.offset.reset": "largest",
              "auto.commit.enable": "false"
            },
            "topics": "target",
            "key.type": "java.lang.String",
            "value.type": "java.lang.String"
          },
          "pre.proc": [
            {
              "dsl.type": "df-opr",
              "rule": "from_json"
            }
          ]
        }
      ],
      "checkpoint": {
        "type": "json",
        "file.path": "hdfs:///griffin/streaming/dump/target",
        "info.path": "target",
        "ready.time.interval": "10s",
        "ready.time.delay": "0",
        "time.range": ["-1m", "0"]
      }
    }
  ],
  "evaluate.rule": {
    "rules": [
      {
        "dsl.type": "griffin-dsl",
        "dq.type": "accuracy",
        "out.dataframe.name": "accu",
        "rule": "src.id = tgt.id AND src.name = tgt.name AND src.color = tgt.color AND src.time = tgt.time",
        "details": {
          "source": "src",
          "target": "tgt",
          "miss": "miss_count",
          "total": "total_count",
          "matched": "matched_count"
        },
        "out": [
          {
            "type": "metric",
            "name": "accu"
          },
          {
            "type": "record",
            "name": "missRecords"
          }
        ]
      }
    ]
  },
  "sinks": ["CONSOLE", "HDFS"]
}

Measure data quality

Submit the measurement job to Spark, with the two configuration file paths as arguments.

spark-submit --class org.apache.griffin.measure.Application --master yarn --deploy-mode client --queue default \
--driver-memory 1g --executor-memory 1g --num-executors 3 \
<path>/griffin-measure.jar \
<path>/env.json <path>/dq.json
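Because the job runs in YARN client mode, the driver log is printed to your terminal; executor logs can be fetched with the usual YARN commands once you know the application ID reported by spark-submit:

# list running applications and fetch the logs of the measurement job
yarn application -list
yarn logs -applicationId <application id>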

Report data quality metrics

The calculation log is printed to the console, and the result metrics are emitted every minute while the job runs. The results are also persisted in HDFS under hdfs:///griffin/persist/, in separate directories named after the timestamp of each calculation.
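To inspect the persisted results, browse the persist path configured in env.json (the exact directory layout may vary slightly by Griffin version):

# browse the persisted metrics and missRecords in HDFS
hdfs dfs -ls -R /griffin/persist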

Optimize data quality reporting

The data quality measurement can be further refined based on these results and on actual business needs.

For details about the measurement parameters, see griffin/griffin-doc/measure.