Writing Kafka producers and consumers in Scala

Project description

An experimental project

Objective: compute and display real-time streaming data with Flink running in high-availability (HA) mode

Architecture: Flink + Kafka + ZooKeeper + Hadoop + Web visualization (to be determined)

The data format

{"ID": 978, "from": "Fujian Province", "to": "Qinghai Province", "time": "2021-06-21 14:52:47", "value": 1458}

ID is the sequence number of the generated record (cumulative), time is the generation time (by default, the moment the record is created), from and to are randomly chosen provinces, and value is a randomly generated number.
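For downstream processing (e.g. on the Flink side later), the record described above can be modeled as a plain Scala case class; a minimal sketch (TransferRecord and its field names are illustrative, not part of the project):

```scala
import java.text.SimpleDateFormat
import java.util.Date

// Illustrative model of one record: ID, from, to, time, value
case class TransferRecord(id: Int, from: String, to: String, time: String, value: Int)

object TransferRecordDemo {
  def main(args: Array[String]): Unit = {
    // time defaults to the moment the record is created
    val time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date)
    val record = TransferRecord(978, "Fujian Province", "Qinghai Province", time, 1458)
    println(record.id + ": " + record.from + " -> " + record.to + " = " + record.value)
  }
}
```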

Work in progress: the randomly generated provinces and cities have not yet been wired into the Flink pipeline.

For now, only single-broker mode is running, on the master node 10.0.20.181; a multi-node setup will be tried later.

process

Create a Maven project in IDEA

Install ZooKeeper and Kafka on the server and start ZK and Kafka

Create a topic in Kafka, start a console consumer, and run the Maven project. The broker listens on port 9092.
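The topic can also be created from code instead of the kafka-topics.sh script; a sketch using the AdminClient API from kafka-clients (broker address and topic name match the ones used below; this assumes a reachable broker, so it is not runnable standalone):

```scala
import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

object CreateTopic {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.20.181:9092")
    val admin = AdminClient.create(props)
    // one partition, replication factor 1 (single-broker mode)
    val topic = new NewTopic("test_1", 1, 1.toShort)
    admin.createTopics(Collections.singletonList(topic)).all().get() // block until created
    admin.close()
  }
}
```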

pom.xml

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.mn</groupId>
    <artifactId>projet_test_juin</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.12.3</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.12.3</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-nop</artifactId>
            <version>1.7.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.12.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>2.4.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.12.3</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <!-- This plugin compiles Scala code into class files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <!-- Bind the compile goal to Maven's compile phase -->
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.test.MainClassName</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

The pom.xml declares both Flink and Kafka dependencies. This program does not use Flink yet, so the Flink dependencies can be left out for now; only the Kafka dependencies are required.

producers

package kafka

import com.alibaba.fastjson.JSONObject
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import java.text.SimpleDateFormat
import java.util.{Date, Properties}
import scala.util.Random

object KafkaProducer {

  def main(args: Array[String]): Unit = {

    val props = new Properties()
    props.put("bootstrap.servers", "10.0.20.181:9092")  // broker address
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")   // key serializer
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") // value serializer

    val rand = new Random()
    val max_records = 1000 // send 1000 records

    val producer = new KafkaProducer[String, String](props) // create the Kafka producer
    val startTime = System.currentTimeMillis() // milliseconds

    val province = List("Beijing", "Tianjin", "Hebei Province", "Shanxi Province", "Inner Mongolia Autonomous Region", "Liaoning Province", "Jilin Province", "Heilongjiang Province", "Shanghai", "Jiangsu Province", "Zhejiang Province", "Anhui Province", "Fujian Province", "Jiangxi Province", "Shandong Province", "Henan Province", "Hubei Province", "Hunan Province", "Guangdong Province", "Guangxi Zhuang Autonomous Region", "Hainan Province", "Chongqing", "Sichuan Province", "Guizhou Province", "Yunnan Province", "Tibet Autonomous Region", "Shaanxi Province", "Gansu Province", "Qinghai Province", "Ningxia Hui Autonomous Region", "Xinjiang Uygur Autonomous Region", "Taiwan Province", "Hong Kong Special Administrative Region")

    for (i <- 1 to max_records) {

      // from: a randomly chosen province (cities to be added later)
      val from = province(rand.nextInt(province.length))
      // to: a randomly chosen province (cities to be added later)
      val to = province(rand.nextInt(province.length))
      val value = rand.nextInt(5000)
      // time: the record's generation time (yyyy, not YYYY, to avoid the week-year pitfall)
      val time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date)
      // assemble the record as a JSON string
      val message = new JSONObject()
      message.put("ID", i)
      message.put("from", from)
      message.put("to", to)
      message.put("time", time)
      message.put("value", value)
      Thread.sleep(100 + rand.nextInt(100))

      val data = new ProducerRecord[String, String]("test_1", message.toString())

      val future = producer.send(data)
      val metadata = future.get() // block until the broker acknowledges the send
      println("message has been sent to topic: " + metadata.topic())
    }
    println("records per second: " + max_records * 1000 / (System.currentTimeMillis() - startTime))
    producer.close()
  }
}
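The loop above blocks on future.get() for every record, which caps throughput at one in-flight request at a time. Kafka's send also accepts a Callback for asynchronous acknowledgement; a sketch using the same broker and topic as above (assumes a reachable broker, so it is not runnable standalone):

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

object AsyncProducerDemo {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "10.0.20.181:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    val record = new ProducerRecord[String, String]("test_1", "{\"ID\": 1}")
    // the callback fires when the broker acknowledges the send (or when it fails)
    producer.send(record, new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
        if (exception != null) exception.printStackTrace()
        else println("acked at offset " + metadata.offset())
      }
    })
    producer.close() // flushes outstanding sends before returning
  }
}
```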

consumers

package kafka

import java.time.Duration
import java.util.concurrent._
import java.util.{Collections, Properties}
import scala.collection.JavaConversions._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}

/** Entity class that consumes messages */
class KafkaProducerConsumer(val brokers: String, val groupId: String, val topic: String) {  // companion class
  val props = createConsumerConfig(brokers, groupId)
  val consumer = new KafkaConsumer[String, String](props)

  def shutdown() = {  // close the consumer
    if (consumer != null)
      consumer.close()
  }

  // build the consumer configuration
  def createConsumerConfig(brokers: String, groupId: String): Properties = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers) // Kafka cluster address
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") // enable automatic offset commit
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props
  }

  // poll the topic on a background thread; returns no value
  def run() = {
    consumer.subscribe(Collections.singletonList(this.topic)) // the list of topics to subscribe to

    Executors.newSingleThreadExecutor.execute(new Runnable {
      override def run(): Unit = {  // override the run method
        while (true) {
          val records = consumer.poll(Duration.ofSeconds(1)) // one poll per second
          for (record <- records) {
            println("Received message: (" + record.key() + "," + record.value() + ") at offset " + record.offset())
          }
        }
      }
    })
  }
}

object KafkaProducerConsumer {  // companion object
  def main(args: Array[String]): Unit = {
    val brokers = "10.0.20.181:9092"
    val groupId = "org.mn"
    val topic = "test_1"
    val test = new KafkaProducerConsumer(brokers, groupId, topic)
    test.run()
  }
}

Some helpful blog posts:

  • Blog.csdn.net/u014635374/…

  • Blog.csdn.net/xiaoxaoyu/a…

  • www.cnblogs.com/Springmoon-…

  • Blog.csdn.net/weixin_3939…