1. Install the Scala plug-in

Flink provides APIs for both Java and Scala. If you want to develop Flink programs in Scala, you can install the Scala plug-in in IDEA to get syntax hints, code highlighting and other features. In IDEA, click File => Settings => Plugins to open the plug-in installation page, then search for the Scala plug-in and install it. After the installation is complete, restart IDEA for it to take effect.

2. Flink Project Initialization

2.1 Build with the Official Script

Flink supports building Java-based projects with Maven or Gradle, and Scala-based projects with SBT or Maven. Maven is used as the example here because it supports both Java and Scala projects. Note that Flink 1.9 requires Maven 3.0.4 or later. Once Maven is installed, you can build a project in either of the following ways:

1. Build directly from Maven Archetype

Run the following mvn command, then follow the interactive prompts to enter the groupId, artifactId, package name and other information, and wait for initialization to complete:

$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.9.0

Note: If you want to create a Scala-based project, simply replace flink-quickstart-java with flink-quickstart-scala. The same applies to the commands below.

2. Build quickly with the official script

To make project initialization easier, the Flink project provides a quick build script, which can be invoked directly with the following command:

$ curl https://flink.apache.org/q/quickstart.sh | bash -s 1.9.0

This method also initializes the project by running the Maven archetype command. The script content is as follows:

PACKAGE=quickstart

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=${1:-1.8.0} \
    -DgroupId=org.myorg.quickstart \
    -DartifactId=$PACKAGE \
    -Dversion=0.1 \
    -Dpackage=org.myorg.quickstart \
    -DinteractiveMode=false

As you can see, compared with the first method, this script already specifies the groupId, artifactId, version and other information, so no interactive input is needed.

2.2 Build with IDEA

If you are using IDEA, you can go to the project creation page and select the Maven Flink Archetype to initialize the project.

If your IDEA does not list this Archetype, you can add it by clicking “Add Archetype” in the upper right corner and filling in the required information, all of which can be taken from the archetype:generate command above. Click OK to save the Archetype; it will then remain available in your IDEA, and when you create a project later you only need to select it.

Select the Flink Archetype and click the Next button. The remaining steps are the same as for a normal Maven project.

3. Project Structure

3.1 Project Structure

After creation, the auto-generated project contains two sample classes, BatchJob and StreamingJob. BatchJob is the sample code for batch processing; its source code is as follows:

import org.apache.flink.api.scala._

object BatchJob {
  def main(args: Array[String]) {
    // Obtain the batch execution environment (local or cluster, depending on where the program runs)
    val env = ExecutionEnvironment.getExecutionEnvironment
      ....
    env.execute("Flink Batch Scala API Skeleton")
  }
}

getExecutionEnvironment returns the execution environment for batch processing. If the program is run locally, it returns the local execution environment; if it is run on a cluster, it returns the cluster's execution environment. To obtain the stream processing execution environment, you only need to replace ExecutionEnvironment with StreamExecutionEnvironment. The corresponding sample code is in StreamingJob:

import org.apache.flink.streaming.api.scala._

object StreamingJob {
  def main(args: Array[String]) {
    // Obtain the stream processing execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
      ...
    env.execute("Flink Streaming Scala API Skeleton")
  }
}

Note that env.execute() is required for stream processing programs; without it, the stream processing job is never executed. For batch programs it is optional when the job ends with an operation such as print(), which triggers execution itself.
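The reason execute() matters is that operations declared on an environment are only recorded, not run, until execution is triggered. The following is a toy model of that idea in plain Scala. MiniEnv and addStep are hypothetical names invented for this illustration; they are not part of Flink's API, and Flink's real implementation is far more involved.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical miniature "environment"; NOT Flink's real implementation.
class MiniEnv {
  private val pending = ArrayBuffer.empty[() => Unit]

  // Declaring a step only records it; nothing runs yet.
  def addStep(step: () => Unit): Unit = pending += step

  // Runs every recorded step in order and returns how many were run.
  def execute(): Int = {
    val n = pending.size
    pending.foreach(step => step())
    pending.clear()
    n
  }
}

object DeferredExecutionSketch {
  def main(args: Array[String]): Unit = {
    val env = new MiniEnv
    val log = ArrayBuffer.empty[String]
    env.addStep(() => log += "read")
    env.addStep(() => log += "print")
    println(log.isEmpty) // true: the steps were declared but have not run
    env.execute()
    println(log.toList)  // List(read, print)
  }
}
```

If the call to execute() is left out of this sketch, the log stays empty, which mirrors what happens to a streaming job that never calls env.execute().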

3.2 Main Dependencies

Projects created from the Maven archetype provide the following core dependencies by default: flink-scala supports the development of batch programs; flink-streaming-scala supports the development of stream processing programs; scala-library provides the class libraries required by the Scala language. If Java is selected when creating the project from the Maven archetype, the flink-java and flink-streaming-java dependencies are provided by default instead.

<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<!-- Scala Library, provided by Flink as well. -->
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
    <scope>provided</scope>
</dependency>

Note that the scope of all the above dependencies is set to provided, which means that none of them will be packaged into the final JAR. This is because these dependencies are already provided by the Flink installation package: the flink-dist_*.jar file in its lib directory contains all of Flink's core classes and dependencies.

Because the scope is provided, starting the project in IDEA throws a ClassNotFoundException. For this reason, the following profile configuration is also generated automatically when the project is created with IDEA:

<!-- This profile helps to make things run out of the box in IntelliJ -->
<!-- It adds Flink's core classes to the runtime class path. -->
<!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
<profiles>
    <profile>
        <id>add-dependencies-for-IDEA</id>

        <activation>
            <property>
                <name>idea.version</name>
            </property>
        </activation>

        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>compile</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>compile</scope>
            </dependency>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
                <scope>compile</scope>
            </dependency>
        </dependencies>
    </profile>
</profiles>

In the profile with the id add-dependencies-for-IDEA, all the core dependencies are marked as compile. You do not need to change any code; just check this profile in the Maven panel of IDEA, and you can run the Flink project directly in IDEA.

4. Word Count Example

After the project is created, you can write a simple word count program to try running a Flink project. The following examples use Scala to show a batch program and a stream processing program in turn:

4.1 Batch Processing Example

import org.apache.flink.api.scala._

object WordCountBatch {

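  def main(args: Array[String]): Unit = {
    // Obtain the batch execution environment
    val benv = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = benv.readTextFile("D:\\wordcount.txt")
    dataSet.flatMap { _.toLowerCase.split(",") } // split each line into words
      .filter(_.nonEmpty)                        // drop empty tokens
      .map { (_, 1) }                            // pair each word with a count of 1
      .groupBy(0)                                // group by the word (tuple field 0)
      .sum(1)                                    // sum the counts (tuple field 1)
      .print()                                   // print() triggers execution of the batch job
  }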
}

The contents of wordcount.txt are as follows:

a,a,a,a,a
b,b,b
c,c
d,d

No additional Flink environment needs to be configured on the local machine; simply run the main method and the word counts are printed to the console.
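As a cross-check of what the batch job should report for the sample file, the same counting logic can be sketched with plain Scala collections. This is only an illustration under stated assumptions: WordCountLocal is a hypothetical name, the input lines are copied from the wordcount.txt content above, and no Flink classes are involved.

```scala
// Plain Scala collections only; no Flink dependency is needed to run this.
object WordCountLocal {

  // Mirrors the flatMap / filter / map / groupBy / sum steps of the batch job.
  def count(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.toLowerCase.split(",")) // split each line into words
      .filter(_.nonEmpty)                // drop empty tokens
      .groupBy(identity)                 // group identical words
      .map { case (word, occurrences) => (word, occurrences.size) }

  def main(args: Array[String]): Unit = {
    // Same content as the wordcount.txt file shown above.
    val lines = Seq("a,a,a,a,a", "b,b,b", "c,c", "d,d")
    // Sorted here only to make the output stable.
    count(lines).toSeq.sortBy(_._1).foreach(println)
    // (a,5)
    // (b,3)
    // (c,2)
    // (d,2)
  }
}
```

The Flink job should report the same four pairs, although the order in which it prints them is not guaranteed.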

4.2 Stream Processing Example

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WordCountStreaming {

  def main(args: Array[String]): Unit = {

    val senv = StreamExecutionEnvironment.getExecutionEnvironment

    // Read lines from a socket: host, port and line delimiter
    val dataStream: DataStream[String] = senv.socketTextStream("192.168.0.229", 9999, '\n')
    dataStream.flatMap { line => line.toLowerCase.split(",") }
              .filter(_.nonEmpty)
              .map { word => (word, 1) }
              .keyBy(0)
              .timeWindow(Time.seconds(3))
              .sum(1)
              .print()
    // A streaming job only starts running when execute() is called
    senv.execute("Streaming WordCount")
  }
}

The program reads its input from a socket, so start a service listening on that port on the target host with the following command:

nc -lk 9999

You can then type in test data and observe how the stream processing program handles it.
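To make the effect of the 3-second window easier to picture, here is a simplified model in plain Scala. It is a sketch under assumptions rather than a description of Flink internals: Event, its timestampMs field and countPerWindow are hypothetical names, and each line carries an explicit timestamp, whereas the job above relies on the time at which data arrives.

```scala
object TumblingWindowSketch {

  // Hypothetical input record: arrival time in milliseconds plus the raw line.
  final case class Event(timestampMs: Long, line: String)

  // Assigns each event to a fixed, non-overlapping window of `windowMs`
  // (timestamps are assumed to be non-negative) and counts the words
  // inside every window separately.
  def countPerWindow(events: Seq[Event], windowMs: Long): Map[Long, Map[String, Int]] =
    events
      .groupBy(e => e.timestampMs / windowMs * windowMs) // key = window start time
      .map { case (windowStart, inWindow) =>
        val counts = inWindow
          .flatMap(_.line.toLowerCase.split(","))
          .filter(_.nonEmpty)
          .groupBy(identity)
          .map { case (word, ws) => (word, ws.size) }
        (windowStart, counts)
      }

  def main(args: Array[String]): Unit = {
    val events = Seq(Event(500L, "a,b"), Event(2900L, "a"), Event(3100L, "a,c"))
    countPerWindow(events, 3000L).toSeq.sortBy(_._1).foreach(println)
  }
}
```

With this input, the first two lines fall into the window starting at 0 and the third into the window starting at 3000, so "a" is counted twice in the first window and once in the second. Counts are never carried over from one window to the next.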

5. Use the Scala Shell

For everyday demo projects, if you do not want to launch IDEA every time just to see test results, you can run programs directly in the Scala Shell, as you would with Spark. This is more intuitive and saves time in daily learning. The Flink installation package can be downloaded from:

https://flink.apache.org/downloads.html

Most versions of Flink are available as both Scala 2.11 and Scala 2.12 installation packages.

After downloading the installation package, decompress it. The Scala Shell is in the bin directory of the installation directory. Run the following command there to start the Scala Shell in local mode:

./start-scala-shell.sh local

Once the shell has started, it already provides execution environments for batch processing (benv and btenv) and stream processing (senv and stenv), so you can run Scala Flink programs directly.
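For instance, a batch word count could be typed at the shell prompt roughly as follows. This is a sketch that assumes the shell was started in local mode as above and relies on the pre-bound benv. The input strings are made up for illustration, and the snippet only works inside the Scala Shell, not as a standalone program.

```scala
// Typed line by line at the Scala Shell prompt; benv is pre-bound by the shell.
val text = benv.fromElements("a,a,a,a,a", "b,b,b", "c,c", "d,d")
val counts = text.flatMap { _.toLowerCase.split(",") }.filter { _.nonEmpty }.map { (_, 1) }.groupBy(0).sum(1)
counts.print()
```

Each statement is kept on a single line so that the shell evaluates it as one expression.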

Finally, a note on a common exception. The Flink version I am using here is 1.9.1, and it throws the exception shown below on startup. According to the official documentation, none of the Scala 2.12 installation packages currently support the Scala Shell, so if you want to use the Scala Shell you have to choose a Scala 2.11 installation package.

[root@hadoop001 bin]# ./start-scala-shell.sh local
Error: Could not find or load main class org.apache.flink.api.scala.FlinkShell