
Previous installments: juejin.cn/column/6994…

12. Processing business data
public class BaseDBApp {

   /**
    * Business data topic
    */
   private static final String TOPIC_BASE = "ods_base_db_m";
   private static final String BASE_GROUP_ID = "ods_dwd_base_log_db";

   public static void main(String[] args) throws Exception {
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setParallelism(4);

        //Checkpoint configuration omitted here
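        // A minimal sketch of what that setup might look like (assumed values, not the author's actual settings):
        // env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        // env.getCheckpointConfig().setCheckpointTimeout(60000L);
        // env.setStateBackend(new FsStateBackend("hdfs://hadoop3:8020/flink/checkpoints"));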

        //Get the business-data stream from Kafka
       DataStreamSource<String> dataStreamSource = env.addSource(KafkaUtil.ofSource(TOPIC_BASE, BASE_GROUP_ID));
        //Parse to JSON and filter out malformed records (ETL)
       SingleOutputStreamOperator<JSONObject> filteredDs = dataStreamSource
               .map(JSON::parseObject)
               .filter(new RichFilterFunction<JSONObject>() {
                   @Override
                   public boolean filter(JSONObject jsonObject) throws Exception {
                       return StringUtils.isNotBlank(jsonObject.getString("table"))
                               && jsonObject.getJSONObject("data") != null;
                   }
               });
        // Read the configuration stream with Flink CDC
       DataStreamSource<String> ruleSource = env.addSource(MySQLSource.<String>builder()
               .hostname("hadoop3")
               .port(3306)
               .username("root")
               .password("密码")
               .databaseList("tmall_realtime")
               .tableList("tmall_realtime.table_process")
               .deserializer(new DebeziumDeserializationSchema<String>() {
                   @Override
                   public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
                       Struct value = (Struct) sourceRecord.value();
                       /*
                        * Struct{
                        *      after=Struct{
                        *              source_table=111,
                        *              operate_type=tses,
                        *              sink_type=1,
                        *              sink_table=1111
                        *      },
                        *      source=Struct{
                        *              db=tmall_realtime,
                        *              table=table_process
                        *      },
                        *      op=c
                        *      }
                        */
                       Struct source = value.getStruct("source");

                       JSONObject jsonObject = new JSONObject();
                       jsonObject.put("database", source.getString("db"));
                       jsonObject.put("table", source.getString("table"));
                       jsonObject.put("type", CDCTypeEnum.of(value.getString("op")).toString().toLowerCase());
                       Struct after = value.getStruct("after");
                       JSONObject data = new JSONObject();
                       for (Field field : after.schema().fields()) {
                           data.put(field.name(), after.get(field.name()));
                       }
                       jsonObject.put("data", data);

                       collector.collect(jsonObject.toJSONString());
                   }

                   @Override
                   public TypeInformation<String> getProducedType() {
                       return BasicTypeInfo.STRING_TYPE_INFO;
                   }
               })
               .startupOptions(StartupOptions.initial())
               .build()
       );

        //State descriptor for the configuration stream
       MapStateDescriptor<String, TableProcess> mapStateDescriptor = new MapStateDescriptor<>("table_process", String.class, TableProcess.class);

        //Broadcast the configuration stream
       BroadcastStream<String> broadcast = ruleSource.broadcast(mapStateDescriptor);

        //Side-output tag for DIM (dimension) data
       OutputTag<JSONObject> dimTag = new OutputTag<JSONObject>("dim_tag") {
       };

        //Main processing
       SingleOutputStreamOperator<JSONObject> dwdDs = filteredDs
                //Connect with the broadcast configuration stream
               .connect(broadcast)
               .process(new BroadcastProcessFunction<JSONObject, String, JSONObject>() {

                   private Connection connection;

                   @Override
                   public void open(Configuration parameters) throws Exception {
                       Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
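                        // TmallConfig.PHOENIX_SERVER is a project constant not shown here;
                        // presumably the Phoenix JDBC URL, e.g. "jdbc:phoenix:hadoop3:2181" (assumed).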
                       connection = DriverManager.getConnection(TmallConfig.PHOENIX_SERVER);
                   }

                   @Override
                   public void close() throws Exception {
                       connection.close();
                   }

                   /**
                     * Process an element of the ODS data stream
                    * @param jsonObject
                    * @param readOnlyContext
                    * @param collector
                    * @throws Exception
                    */
                   @Override
                   public void processElement(JSONObject jsonObject, ReadOnlyContext readOnlyContext, Collector<JSONObject> collector) throws Exception {
                        //Get the read-only broadcast config state
                       ReadOnlyBroadcastState<String, TableProcess> broadcastState = readOnlyContext.getBroadcastState(mapStateDescriptor);

                       String table = jsonObject.getString("table");
                       String type = jsonObject.getString("type");
                        // Maxwell marks historical-data inserts as "bootstrap-insert"; normalize them to "insert"
                       if ("bootstrap-insert".equals(type)) {
                           type = "insert";
                           jsonObject.put("type", type);
                       }

                        //Look up the config for this table and operation
                       String key = table + ":" + type;
                       TableProcess tableProcess = broadcastState.get(key);

                       if (tableProcess != null) {
                            //Attach the target (sink) table
                           jsonObject.put("sink_table", tableProcess.getSinkTable());
                           jsonObject.put("sink_pk", tableProcess.getSinkPk());
                            //Keep only the configured sink columns
                           HashSet<String> columnSet = Sets.newHashSet(tableProcess.getSinkColumns().split(","));
                           jsonObject.getJSONObject("data").entrySet().removeIf(e -> !columnSet.contains(e.getKey()));
                            //Route by sink type
                           String sinkType = tableProcess.getSinkType();
                           if (TableProcess.SINK_TYPE_KAFKA.equals(sinkType)) {
                               collector.collect(jsonObject);
                           } else if (TableProcess.SINK_TYPE_HBASE.equals(sinkType)) {
                               readOnlyContext.output(dimTag, jsonObject);
                           }
                       } else {
                            //No matching config for this key
                            System.out.println("No matching key in table_process: " + key);
                       }
                   }

                   /**
                     * Process a configuration-stream record
                    * @param s
                    * @param context
                    * @param collector
                    * @throws Exception
                    */
                   @Override
                   public void processBroadcastElement(String s, Context context, Collector<JSONObject> collector) throws Exception {
                       JSONObject jsonObject = JSON.parseObject(s);
                       TableProcess tableProcess = jsonObject.getObject("data", TableProcess.class);
                       String sourceTable = tableProcess.getSourceTable();
                       String operateType = tableProcess.getOperateType();
                       String sinkType = tableProcess.getSinkType();
                       String sinkPk = StringUtils.defaultString(tableProcess.getSinkPk(), "id");
                       String sinkExt = StringUtils.defaultString(tableProcess.getSinkExtend());
                       String sinkTable = tableProcess.getSinkTable();
                       String sinkColumns = tableProcess.getSinkColumns();

                        //For dimension data, create the target table in Phoenix if needed
                       if (TableProcess.SINK_TYPE_HBASE.equals(sinkType) && CDCTypeEnum.INSERT.toString().toLowerCase().equals(operateType)) {
                           StringBuilder sql = new StringBuilder();
                           sql.append("create table if not exists ").append(TmallConfig.HBASE_SCHEMA).append(".").append(sinkTable).append(" ( ");
                           String[] columns = sinkColumns.split(",");
                           for (int i = 0; i < columns.length; i++) {
                               String column = columns[i];
                               if (sinkPk.equals(column)) {
                                   sql.append(column).append(" varchar primary key ");
                               } else {
                                   sql.append("info.").append(column).append(" varchar ");
                               }
                               if (i < columns.length - 1) {
                                   sql.append(" , ");
                               }
                           }
                           sql.append(" ) ")
                                   .append(sinkExt);
                           System.out.println(sql);
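                            // Example of the DDL this builds (hypothetical values):
                            //   create table if not exists <HBASE_SCHEMA>.dim_base_trademark (
                            //   id varchar primary key , info.tm_name varchar ) SALT_BUCKETS = 3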
                           try (PreparedStatement preparedStatement = connection.prepareStatement(sql.toString())) {
                               preparedStatement.execute();
                           }
                       }

                        //Store the config into broadcast state
                       BroadcastState<String, TableProcess> broadcastState = context.getBroadcastState(mapStateDescriptor);
                       broadcastState.put(sourceTable + ":" + operateType, tableProcess);
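                        // Note: this key (sourceTable + ":" + operateType) must match the lookup
                        // key built in processElement (table + ":" + type).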
                   }
               });

        //Sink the DIM side output into HBase via Phoenix
       dwdDs.getSideOutput(dimTag).addSink(new RichSinkFunction<JSONObject>() {

           private Connection connection;

           @Override
           public void open(Configuration parameters) throws Exception {
               Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
               connection = DriverManager.getConnection(TmallConfig.PHOENIX_SERVER);
           }

           @Override
           public void close() throws Exception {
               connection.close();
           }

           @Override
           public void invoke(JSONObject value, Context context) throws Exception {
               JSONObject data = value.getJSONObject("data");
               StringBuilder sql = new StringBuilder();
                //Target table name
               String sinkTable = value.getString("sink_table");
               sql.append("upsert into ").append(TmallConfig.HBASE_SCHEMA).append(".").append(sinkTable).append(" (")
                       .append(StringUtils.join(data.keySet(), ","))
                       .append(") ")
                       .append("values( '")
                       .append(StringUtils.join(data.values(), "','"))
                       .append("' ) ");
                //Execute the upsert
               try (PreparedStatement preparedStatement = connection.prepareStatement(sql.toString())) {
                   preparedStatement.execute();
                    // Phoenix connections don't auto-commit by default, so commit manually
                   connection.commit();
               }
                //Invalidate the dimension cache
               String type = value.getString("type");
               if ("update".equals(type) || "delete".equals(type)) {
                   String sinkPk = value.getString("sink_pk");
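                    // DimUtil is a project helper not shown here; it presumably evicts this row
                    // from a dimension cache (e.g. Redis) so stale data isn't served after changes.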
                   DimUtil.delDimCache(sinkTable, data.getString(sinkPk));
               }
           }
       });

        // Sink the DWD main stream to Kafka; each record is routed to the topic named by its "sink_table"
       dwdDs.addSink(KafkaUtil.ofSink(new KafkaSerializationSchema<JSONObject>() {
           @Override
           public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObject, @Nullable Long aLong) {
               return new ProducerRecord<>(jsonObject.getString("sink_table"), jsonObject.getJSONObject("data").toJSONString().getBytes(StandardCharsets.UTF_8));
           }
       }));

        //Run the job
       env.execute("db_ods_to_dwd");
   }
}
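
The TableProcess POJO referenced above maps a row of the table_process config table and is not shown in this installment. Below is a minimal sketch inferred from the getters used above and the field names visible in the Debezium record dump; the constant values are assumptions:

// A hedged sketch of TableProcess, inferred from its usage above.
public class TableProcess {

    // Assumed constant values; inferred from usage, not shown in the original.
    public static final String SINK_TYPE_KAFKA = "kafka";
    public static final String SINK_TYPE_HBASE = "hbase";

    private String sourceTable;  // source_table: source MySQL table
    private String operateType;  // operate_type: insert / update / delete
    private String sinkType;     // sink_type: kafka or hbase
    private String sinkTable;    // sink_table: target topic or Phoenix table
    private String sinkColumns;  // comma-separated list of columns to keep
    private String sinkPk;       // primary-key column (defaults to "id" above)
    private String sinkExtend;   // extra DDL clauses, e.g. salt buckets

    // getters and setters omitted
}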
13. Process log data
public class BaseLogApp {

    private static final String TOPIC_BASE = "ods_base_log";
    private static final String BASE_GROUP_ID = "ods_dwd_base_log_app";
    private static final String TOPIC_START = "dwd_start_log";
    private static final String TOPIC_PAGE = "dwd_page_log";
    private static final String TOPIC_DISPLAY = "dwd_display_log";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        DataStreamSource<String> dataStreamSource = env.addSource(KafkaUtil.ofSource(TOPIC_BASE, BASE_GROUP_ID));

        // Fix the is_new flag per device id (mid), in case the front end reports it incorrectly
        SingleOutputStreamOperator<JSONObject> midWithNewFlagDS = dataStreamSource
                .map(JSON::parseObject)
                .keyBy(j -> j.getJSONObject("common").getString("mid"))
                .map(new RichMapFunction<JSONObject, JSONObject>() {

                    private ValueState<String> newMidDateState;
                    private SimpleDateFormat yyyyMMdd;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        newMidDateState = getRuntimeContext().getState(new ValueStateDescriptor<String>("newMidDateState", String.class));
                        yyyyMMdd = new SimpleDateFormat("yyyyMMdd");
                    }

                    @Override
                    public JSONObject map(JSONObject jsonObject) throws Exception {
                        String isNew = jsonObject.getJSONObject("common").getString("is_new");
                        if ("1".equals(isNew)) {
                            String newMidDate = newMidDateState.value();
                            String ts = yyyyMMdd.format(new Date(jsonObject.getLong("ts")));
                            if (StringUtils.isEmpty(newMidDate)) {
                                //First visit for this mid: remember the date
                                newMidDateState.update(ts);
                            } else {
                                if (!newMidDate.equals(ts)) {
                                    //Already seen on an earlier date, so not actually new
                                    jsonObject.getJSONObject("common").put("is_new", "0");
                                }
                            }
                        }
                        return jsonObject;
                    }
                });

        //Side-output tags for start logs and display logs
        OutputTag<String> startTag = new OutputTag<String>("startTag") {
        };
        OutputTag<String> displayTag = new OutputTag<String>("displayTag") {
        };

        //Determine the log type and route each record to the proper stream
        SingleOutputStreamOperator<String> pageDStream = midWithNewFlagDS.process(new ProcessFunction<JSONObject, String>() {
            @Override
            public void processElement(JSONObject jsonObject, Context context, Collector<String> collector) throws Exception {
                JSONObject start = jsonObject.getJSONObject("start");
                if (start != null) {
                    //Start log
                    context.output(startTag, jsonObject.toJSONString());
                } else {
                    //Page log goes to the main stream
                    collector.collect(jsonObject.toJSONString());
                    JSONArray displays = jsonObject.getJSONArray("displays");
                    if (!CollectionUtil.isNullOrEmpty(displays)) {
                        for (int i = 0; i < displays.size(); i++) {
                            JSONObject displaysJsonObject = displays.getJSONObject(i);
                            String pageId = jsonObject.getJSONObject("page").getString("page_id");
                            displaysJsonObject.put("page_id", pageId);
                            context.output(displayTag, displaysJsonObject.toJSONString());
                        }
                    }
                }
            }
        });

        //Write each stream to its Kafka topic
        pageDStream.addSink(KafkaUtil.ofSink(TOPIC_PAGE));
        pageDStream.getSideOutput(startTag).addSink(KafkaUtil.ofSink(TOPIC_START));
        pageDStream.getSideOutput(displayTag).addSink(KafkaUtil.ofSink(TOPIC_DISPLAY));

        env.execute("log_ods_to_dwd");
    }
}
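
KafkaUtil is likewise not shown in this installment. Below is a minimal sketch consistent with the three call sites used in these two jobs (ofSource(topic, groupId), ofSink(topic), and ofSink(KafkaSerializationSchema)); the broker address, default topic, and delivery semantics are assumptions:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;

// A hedged sketch of KafkaUtil, inferred from the call sites above.
public class KafkaUtil {

    // Assumed broker address; not given in the original.
    private static final String KAFKA_SERVER = "hadoop3:9092";

    public static FlinkKafkaConsumer<String> ofSource(String topic, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", KAFKA_SERVER);
        props.setProperty("group.id", groupId);
        return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
    }

    public static FlinkKafkaProducer<String> ofSink(String topic) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", KAFKA_SERVER);
        return new FlinkKafkaProducer<>(topic, new SimpleStringSchema(), props);
    }

    public static <T> FlinkKafkaProducer<T> ofSink(KafkaSerializationSchema<T> schema) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", KAFKA_SERVER);
        // The schema picks the target topic per record, so the default topic is only a fallback.
        return new FlinkKafkaProducer<>("default_topic", schema, props, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
    }
}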

Up next: the DWM layer.
