Comprehensive RDD data operations

  • Requirements:

    Count the five most frequent customer surnames

  • Data source: customers.csv

    Upload the file to HDFS: /input/customer

We need to count the top five most frequent surnames in the third column.

1. Build the Spark project in IDEA

  • Create a new Maven project
  • Rename the java folder under main to scala
  • Add the Scala SDK: File -> Project Structure -> Libraries -> +

Add the dependencies to pom.xml:


      
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>day05RDD</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <repositories>
        <repository>
            <id>ali-maven</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
                <updatePolicy>always</updatePolicy>
                <checksumPolicy>fail</checksumPolicy>
            </snapshots>
        </repository>
        <repository>
            <id>central</id>
            <name>Maven Repository Switchboard</name>
            <layout>default</layout>
            <url>http://repo1.maven.org/maven2</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <!-- streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <!-- MySQL driver -->
        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.18</version>
        </dependency>
    </dependencies>

</project>

The demo test

package cn.com.chinahitech


import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession


/**
 * @author SJ
 * @date 2021/2/1
 */
object Hello {
  def main(args: Array[String]): Unit = {
    // Set the log level to filter out noisy logs
    Logger.getLogger("org").setLevel(Level.ERROR)
    // 1 Create the SparkSession
    val spark = SparkSession.builder().master("local[2]").appName("Hello").getOrCreate() // get a SparkSession object
    val sc = spark.sparkContext
    // 2 Create an RDD dataset
    val rdd = sc.parallelize(1 to 9)
    // 3 Perform a transformation and an action
    rdd.map(data => data * 2).collect().foreach(println(_))
  }

}


2. Programming implementation

Count the top five surnames and store the final result in MySQL.

The HDFS port used for the dataset

9870 is the web UI address configured in hdfs-site.xml (the default port is 50070; we changed it to 9870 ourselves). Note that the hdfs:// URI in the code below uses the NameNode RPC port, 9000, not the web UI port.

package cn.com.chinahitech

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

/**
 * @author SJ
 * @date 2021/2/1
 */

object CustomerAnalysis {
  def main(args: Array[String]): Unit = {
    // Set the log level to filter out noisy logs
    Logger.getLogger("org").setLevel(Level.ERROR)
    // 1 Create the SparkSession
    val spark = SparkSession.builder().master("local[2]").appName("Hello").getOrCreate() // get a SparkSession object
    val sc = spark.sparkContext

    // 2 Load the data from HDFS
    val customerRDD = sc.textFile("hdfs://192.168.0.104:9000/input/customer/customers.csv")

    // Check to see if it works
    customerRDD.take(3).foreach(println(_))
  }

}


The data prints correctly, which confirms that the file can be read from HDFS.

Each row of the data is a comma-separated string. We need to extract the third column, the surname column; since every field is wrapped in double quotation marks, we also have to strip the quotes.

    val tempRDD = customerRDD.map(line => line.split(","))   // each line of string becomes an Array[String]
    // extract the third value of the array (index 2)
    tempRDD.map(arr => arr(2))

Putting it all together (the operators are placed vertically for readability):

    //3 Process the source data set
    val tempRDD = customerRDD
      .map(line => line.split(","))   // each line of string becomes an Array[String]
      .map(arr => arr(2))             // Array[String] => surname

    // have a look
    tempRDD.take(3).foreach(println(_))

Get rid of the double quotes

    //3 Process the source data set
    val tempRDD = customerRDD
      .map(line => line.split(","))             // each line of string becomes an Array[String]
      .map(arr => arr(2).replace("\"", ""))     // Array[String] => surname, with the double quotes removed

Then we map each surname to a pair and aggregate:

    //3 Process the source data set
    val tempRDD = customerRDD
      .map(line => line.split(","))             // each line of string becomes an Array[String]
      .map(arr => arr(2).replace("\"", ""))     // Array[String] => surname, with the double quotes removed
      .map(word => (word, 1))                   // surname => (surname, 1)
      .reduceByKey(_ + _)                       // count occurrences per surname

    //4 Swap the pair elements and sort
    val resultRDD = tempRDD
      .map(data => data.swap)   // (surname, count) => (count, surname)
      .sortByKey(false)         // descending order
      .map(data => data.swap)   // (count, surname) => (surname, count)
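
The requirement only asks for the five most frequent surnames, while the code above keeps the whole sorted result. A minimal sketch (an addition, not part of the original snippets) for keeping just the top five could look like this:

    // hypothetical helper, not in the original article: keep only the top five surnames
    // take(5) brings the first five (surname, count) pairs of the descending-sorted RDD to the driver
    val top5 = resultRDD.take(5)            // Array[(String, Int)]
    val top5RDD = sc.parallelize(top5)      // back to an RDD if the later steps should stay RDD-based
    top5RDD.collect().foreach(println(_))   // quick check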

CREATE TABLE `customer` (
  `name` varchar(255) NOT NULL,
  `count` int DEFAULT NULL,
  PRIMARY KEY (`name`)
) ENGINE=MyISAM AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;

The statement above creates the MySQL table that will store the results.

A problem that comes up:

A case class definition such as case class People(name: String, age: Int) needs to be placed outside the method body (i.e. at the position of a Java member variable).

Blog.csdn.net/zhao8974261…
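
The sample (case) class used in the conversion below is not shown in the article's snippets; a minimal sketch of what it can look like follows. The class name Customer matches the call Customer(data._1, data._2); the field names name and count are assumptions chosen here to line up with the columns of the MySQL table.

    // placed outside main, at the top level of the file (next to object CustomerAnalysis)
    // the field names are assumed; any names matching the (String, Int) pairs will work
    case class Customer(name: String, count: Int)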

    //5 Convert the RDD to a DataFrame (data + schema): DataFrame = RDD + Schema
    // first import the implicit conversions
    import spark.implicits._
    // the RDD currently holds (String, Int) pairs; we want to give the data a schema
    // we use a case class, whose purpose is to describe (encapsulate) the data format
    // [the case class is defined outside the method]

    val resultDF = resultRDD.map(data => Customer(data._1, data._2)).toDF()
    // view the schema of the dataset
    resultDF.printSchema()
    // view the data; 20 rows are displayed by default
    resultDF.show()

Next, write a member method that saves the DataFrame to MySQL:

  /**
   * @param df        the DataFrame to store
   * @param tableName the target table name
   * @param msg       the message printed after the write is executed
   */
  def saveToMySQL(df: DataFrame, tableName: String, msg: String): Unit = {
    val url = "jdbc:mysql://192.168.0.104:3306/customerdb?characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=false"
    val props = new java.util.Properties()
    props.put("user", "root")
    props.put("password", "ROOTroot_1")
    props.put("driver", "com.mysql.cj.jdbc.Driver")

    // perform the write
    // the available modes include overwrite and append
    df.repartition(1).write.mode("overwrite").jdbc(url, tableName, props)
    // display a prompt message
    println(msg)
  }

Call it in main

    //6 Store the result in MySQL
    saveToMySQL(resultDF, "customer", "Write successful!")
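
For reference, the snippets above assemble into roughly the following complete object. This is a consolidated sketch of this article's steps, not an authoritative listing; the HDFS/MySQL addresses, database name, and credentials are the ones used in this environment.

package cn.com.chinahitech

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SparkSession}

// the case class is defined outside the method
case class Customer(name: String, count: Int)

object CustomerAnalysis {
  def main(args: Array[String]): Unit = {
    // set the log level to filter out noisy logs
    Logger.getLogger("org").setLevel(Level.ERROR)
    // 1 create the SparkSession
    val spark = SparkSession.builder().master("local[2]").appName("Hello").getOrCreate()
    val sc = spark.sparkContext

    // 2 load the data from HDFS
    val customerRDD = sc.textFile("hdfs://192.168.0.104:9000/input/customer/customers.csv")

    // 3 process the source data set
    val tempRDD = customerRDD
      .map(line => line.split(","))             // each line becomes an Array[String]
      .map(arr => arr(2).replace("\"", ""))     // third column => surname, quotes removed
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    // 4 swap, sort in descending order, swap back
    val resultRDD = tempRDD
      .map(data => data.swap)
      .sortByKey(false)
      .map(data => data.swap)

    // 5 convert the RDD to a DataFrame
    import spark.implicits._
    val resultDF = resultRDD.map(data => Customer(data._1, data._2)).toDF()
    resultDF.printSchema()
    resultDF.show()

    // 6 store the result in MySQL
    saveToMySQL(resultDF, "customer", "Write successful!")
  }

  def saveToMySQL(df: DataFrame, tableName: String, msg: String): Unit = {
    val url = "jdbc:mysql://192.168.0.104:3306/customerdb?characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=false"
    val props = new java.util.Properties()
    props.put("user", "root")
    props.put("password", "ROOTroot_1")
    props.put("driver", "com.mysql.cj.jdbc.Driver")
    df.repartition(1).write.mode("overwrite").jdbc(url, tableName, props)
    println(msg)
  }
}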

Package the JAR and upload it to Linux

Delete the table created above.

1. Be clear about which dependencies are unnecessary and keep only this one.

3. Compile the project to generate the bytecode.

4. Package the job.

Upload the file and execute it

[hadoop@hadoop01 ~]$ cd /opt/data/
[hadoop@hadoop01 data]$ ll
total 37820
drwxrwxr-x 4 hadoop hadoop      173 Jan 26 14:09 1_22
-rw-rw-r-- 1 hadoop hadoop 34723590 Jan 22 22:22 bjmarket-0.0.1-SNAPSHOT.jar
-rw-rw-r-- 1 hadoop hadoop  1177355 Jan 31       customers.csv
-rw-rw-r-- 1 hadoop hadoop  2358298 Feb  1 21:34 day05rdd.jar
drwxrwxr-x 2 hadoop hadoop       87 Jan 26 20:55 film_rating
drwxrwxr-x 2 hadoop hadoop      202 Jan 26 13:26 hive_dir
-rw-rw-r-- 1 hadoop hadoop     8857 Jan 22 13:39 market-.sql
-rw------- 1 hadoop hadoop     7257 Jan 22 22:56 nohup.out
-rw-rw-r-- 1 hadoop hadoop   439966 Jan 21 21:01 world.sql
[hadoop@hadoop01 data]$ spark-submit day05rdd.jar
21/02/01 21:38:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[hadoop@hadoop01 data]$

You can also specify the main class, CPU cores, and memory when calling spark-submit, as the instructor does.

If an error appears at this point, it is a configuration problem.

Just reboot the machine once you are done.