Copyright Notice: This set of technical column is the author (Qin Kaixin) usually work summary and sublimation, through extracting cases from real business environment to summarize and share, and give business application tuning suggestions and cluster environment capacity planning and other content, please continue to pay attention to this set of blog. Copyright notice: No reprint, welcome to learn. QQ email address: [email protected], if you have any questions, you can contact at any time.

Flink oxknife pilot series catalogue

  • Flink -Flink cluster operation principle and deployment, and Yarn operation mode
  • Flink knife test -Flink Window type and the use of principle case combat
  • Flink Broadcast and Accumulators application case
  • Flink and SparkStreaming Accumulators
  • Flink -Flink Distributed Cache application case
  • Flink -Flink state management and checkPoint data fault tolerance mechanism in-depth analysis
  • Flink test -Flink Window analysis and Watermark in-depth analysis of out-of-order data mechanism
  • Flink Restart Strategies -Flink Restart Strategies
  • Flink CheckPoint state point recovery and savePoint mechanism comparison and analysis
  • Let’s go to watch the 2018 Chinese Super League
  • Flink -Flink based on kafka-connector data stream fault-tolerant playback mechanism and code case combat
  • [Flink test -Flink DataStreamAPI and DataSetAPI application case]
  • [Flink test -Flink Parallel and A Deep analysis of the principle of Slots relationship]
  • Flink cluster HA Configuration and high availability mechanism in-depth Analysis
  • [Flink test -Flink batch processing and stream processing case in-depth analysis]
  • [Flink knife Test -Flink comprehensive application case practice and in-depth analysis of vertical business]

1 Kafka-connector again intimate hand in hand Flink

  • The partition mechanism in Kafka is deeply combined with the parallelism mechanism of Flink to achieve data recovery.
  • Kafka can serve as the source and sink of Flink, the cow is here.
  • If the task fails, restore the application by setting kafka’s offset

Review Spark Streaming using techniques for Kafka

// set the checkpoint directory ssc.checkpoint("./streaming_checkpoint") ConfigurationManager custom method) val broker_list = ConfigurationManager. Config. Get string (kafka. Broker. "the list") val switchable viewer = ConfigurationManager. Config. Get string (" kafka. Switchable viewer ") / / kafka consumer configuration val kafkaParam = Map (" the bootstrap. The servers "- > Broker_list,// Used to initialize address for link to cluster "key.deserializer" -> classOf[StringDeserializer], "Value. Deserializer" -> classOf[StringDeserializer], // Is used to identify which consumer group the customer belongs to. "Latest" is automatically reset to the latest offset "auto-.offset. reset" -> "latest", // If true, The offset of the consumer is automatically submitted in the background "enable.auto.mit" -> (false: Java.lang.Boolean) // Create a DStream and return the input data received // LocationStrategies: According to the given topic and the cluster address create consumer / / LocationStrategies PreferConsistent: continuous distribution partition between all Executor / / ConsumerStrategies: Select how to create and configure the Driver and Executor Kafka Consumer / / ConsumerStrategies Subscribe: Subscribe to a series of theme val adRealTimeLogDStream = KafkaUtils. CreateDirectStream [String, String] (SSC, LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](Array(topics),kafkaParam))Copy the code

3. Flink Kafka Consumer

3.1 Theoretical time

  • SetStartFromGroupOffsets ()

    By default, the last offset saved is read. If the application is started for the first time and the last offset cannot be read, the data is consumed based on the value of auto-.offset. Reset

  • SetStartFromEarliest () starts consumption from the earliest data, ignoring the stored offset information

  • SetStartFromLatest () consumes from the latest data, ignoring the stored offset information

  • setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long>)

  • When checkpoint is enabled, KafkaConsumer periodically stores the Kafka offset information along with the status information of other operators. When a job fails and restarts, Flink recovers data from the last checkpoint and consumes data in Kafka again.

  • Checkpoint env.enablecheckPointing (5000); // Checkpoint every 5s

  • Kafka Consumers Offset auto-commit has the following two methods, which can be differentiated according to whether job checkpoint is enabled:

    (1) Checkpoint: Set the following two parameters

    enable.auto.commit

    auto.commit.interval.ms

    (2) Checkpoint enabled: Offset is saved only when Checkpoint is executed. This ensures that kafka’s offset is consistent with its Checkpoint status offset. You can set it with this parameter

    setCommitOffsetsOnCheckpoints(boolean)

    This parameter defaults to true. The offset is committed at checkpoint, at which point the automatic commit mechanism in Kafka is ignored

3.2 Actual Case

Dependency introduction: < the dependency > < groupId > org. Apache. Flink < / groupId > < artifactId > flink - statebackend - rocksdb_2. 11 < / artifactId > < version > 1.6.1 < / version > < / dependency > < the dependency > < groupId > org. Apache. Flink < / groupId > < artifactId > flink connector - kafka - 0.11 _2. 11 < / artifactId > < version > 1.6.1 < / version > < / dependency > < the dependency > < the groupId > org. Apache. Kafka < / groupId > < artifactId > kafka - clients < / artifactId > < version > 0.11.0.3 < / version > < / dependency > Case study: Public class StreamingKafkaSource {public static void main(String[] args) throws Exception {// Obtain the operating environment of Flink StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); / / checkpoint configuration env. EnableCheckpointing (5000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); env.getCheckpointConfig().setCheckpointTimeout(60000); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELL ATION); / / set statebackend / / env. SetStateBackend (new RocksDBStateBackend (" HDFS: / / hadoop100:9000 / flink/checkpoints ", true)); String topic = "kafkaConsumer"; Properties prop = new Properties(); prop.setProperty("bootstrap.servers","SparkMaster:9092"); prop.setProperty("group.id","kafkaConsumerGroup"); FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), prop); myConsumer.setStartFromGroupOffsets(); // Default consumer policy DataStreamSource<String> text = env.addsource (myConsumer); text.print().setParallelism(1); env.execute("StreamingFromCollection"); }}Copy the code

4. Flink Kafka Producer

4.1 Theoretical time

  • Kafka Producer fault tolerance -Kafka 0.9 and 0.10

  • FlinkKafkaProducer09 and FlinkKafkaProducer010 provide at-least-once semantics. FlinkKafkaProducer09 and FlinkKafkaProducer010 provide at-least-once semantics.

    setLogFailuresOnly(false)

    setFlushOnCheckpoint(true)

  • Note: It is recommended to change the number of retries of kafka producers.

  • Kafka 0.11 is fault-tolerant of KafkaProducer. If Flink checkpoint is enabled, it can provide exactly-once semantics for FlinkKafkaProducer011, but specific semantics need to be selected

    Semantic.NONE

    AT_LEAST_ONCE [default]

    Semantic.EXACTLY_ONCE

4.2 KafkaSink case actual combat

Public class StreamingKafkaSink {public static void main(String[] args) throws Exception {// Obtain the Flink operating environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); / / checkpoint configuration env. EnableCheckpointing (5000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); env.getCheckpointConfig().setCheckpointTimeout(60000); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELL ATION); / / set statebackend / / env. SetStateBackend (new RocksDBStateBackend (" HDFS: / / SparkMaster: 9000 / flink/checkpoints ", true)); DataStreamSource<String> text = env.socketTextStream("SparkMaster", 9001, "\n"); String brokerList = "SparkMaster:9092"; String topic = "kafkaProducer"; Properties prop = new Properties(); prop.setProperty("bootstrap.servers",brokerList); // Prop.setProperty ("transaction.timeout.ms",60000*15+""); The second solution is to set the maximum transaction timeout for Kafka, mainly kafka configuration file Settings. //FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(brokerList, topic, new SimpleStringSchema()); Kafkaproducer011 <String> myProducer = new FlinkKafkaProducer011<>(topic, new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()), prop, FlinkKafkaProducer011.Semantic.EXACTLY_ONCE); text.addSink(myProducer); env.execute("StreamingFromCollection"); }}Copy the code

5 conclusion

Kafka is indispensable, there is a lot more to say about Kafka, please refer to my Kafka business environment combat series.

Copyright Notice: This set of technical column is the author (Qin Kaixin) usually work summary and sublimation, through extracting cases from real business environment to summarize and share, and give business application tuning suggestions and cluster environment capacity planning and other content, please continue to pay attention to this set of blog. Copyright notice: No reprint, welcome to learn. QQ email address: [email protected], if you have any questions, you can contact at any time.

  • Kafka business environment combat – Kafka production environment planning
  • Kafka Business Environment in action – Kafka producer and consumer throughput test
  • Kafka business environment combat – Kafka Producer parameter setting and parameter tuning suggestions
  • Kafka business environment combat – Kafka cluster management important operation instructions operation and maintenance book
  • Kafka cluster Broker parameter Settings and tuning guidelines
  • Kafka Business Environment – Kafka Producer synchronous and asynchronous message sending and transaction idempotency case application
  • Kafka Polling mechanism and consumer group rebalanced zoning strategy analysis
  • Kafka Rebalance and Consumer Rebalance
  • Kafka business environment – Kafka cluster message format V1 version to V2 version smooth transition details
  • Kafka ISR design and an in-depth analysis of the replica synchronization mechanism between kafka ISR and Leader Epoch
  • Kafka log index storage and Compact mechanism in-depth analysis
  • Kafka business environment combat – Kafka precise semantic EOS principle in-depth analysis]
  • Kafka message idempotence and transaction support mechanism
  • [Kafka business environment combat – Kafka cluster Controller election and responsibility design ideas architecture detail]
  • Kafka Cluster log file System design and retention mechanism and Compact in-depth study
  • [Kafka Business Environment combat – Kafka cluster Consumer Group state machine and Coordinaor management in-depth analysis]
  • [Kafka Commercial Environment practice – Kafka tuning process in throughput, persistence, low latency, availability and other indicators of compromise study]

Qin Kaixin in Shenzhen 20181127023