From building a big data environment to running WordCount: the pits along the way

[TOC]

Background

Recently (20 December 2020) I set out to understand the architecture and technology stack around big data.

Strictly speaking, just getting an overview does not require building an environment and running the corresponding jobs by hand.

But technology is built on dogged effort, accumulated bit by bit, so the hands-on work still has to be done.

So I started from the environment (based on Docker) and worked my way through to successfully running a WordCount job scheduled by YARN.

Along the way I hit quite a few pits and filled them in one by one; it took about 10 hours.

I hope sharing the lessons here saves those who need them some time.

Note: my local environment is macOS Big Sur.

Big data environment based on docker-compose

The environment is set up by following "Docker-Hadoop-Spark-Hive: quickly build your big data environment", with a few parameters adjusted for macOS.

There are five main files:

.
├── copy-jar.sh         # Spark-on-YARN support
├── docker-compose.yml  # docker-compose file
├── hadoop-hive.env     # environment variable configuration
├── run.sh              # start script
└── stop.sh             # stop script

Note: Docker on macOS has a pit: containers cannot be reached from the host by their IPs. I worked around it with method four from "Network Problems and Solutions of Docker for Mac (New method 4)".

Note: the host also needs /etc/hosts entries mapping each Docker container name to its IP. This is what guarantees that the job can be submitted successfully and that redirects between the web UIs keep working when they are accessed from the host. The pit is deep; watch your step.

# switch_local

172.21.0.3 namenode
172.21.0.8 resourcemanager
172.21.0.9 nodemanager
172.21.0.10 historyserver
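
With those entries in place, the containers should be reachable from the host by name. A quick sanity check once the stack is up (a sketch; the second command assumes the namenode web UI is on its default port 50070 and should print 200):

ping -c 1 namenode
curl -s -o /dev/null -w "%{http_code}\n" http://namenode:50070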

docker-compose.yml

version: '2'
services:
  namenode:
    image: bde2020/hadoop-namenode:1.1.0-hadoop2.8-java8
    container_name: namenode
    volumes:
      - ~/data/namenode:/hadoop/dfs/name
    environment:
      - CLUSTER_NAME=test
    env_file:
      - ./hadoop-hive.env
    ports:
      - "50070:50070"
      - "8020:8020"
  resourcemanager:
    image: bde2020/hadoop-resourcemanager:1.1.0-hadoop2.8-java8
    container_name: resourcemanager
    environment:
      - CLUSTER_NAME=test
    env_file:
      - ./hadoop-hive.env
    ports:
      - "8088:8088"
  historyserver:
    image: bde2020/hadoop-historyserver:1.1.0-hadoop2.8-java8
    container_name: historyserver
    environment:
      - CLUSTER_NAME=test
    env_file:
      - ./hadoop-hive.env
    ports:
      - "8188:8188"
  datanode:
    image: bde2020/hadoop-datanode:1.1.0-hadoop2.8-java8
    depends_on:
      - namenode
    volumes:
      - ~/data/datanode:/hadoop/dfs/data
    env_file:
      - ./hadoop-hive.env
    ports:
      - "50075:50075"
  datanode2:
    image: bde2020/hadoop-datanode:1.1.0-hadoop2.8-java8
    depends_on:
      - namenode
    volumes:
      - ~/data/datanode2:/hadoop/dfs/data
    env_file:
      - ./hadoop-hive.env
    ports:
      - "50076:50075"
  datanode3:
    image: bde2020/hadoop-datanode:1.1.0-hadoop2.8-java8
    depends_on:
      - namenode
    volumes:
      - ~/data/datanode3:/hadoop/dfs/data
    env_file:
      - ./hadoop-hive.env
    ports:
      - "50077:50075"
  nodemanager:
    image: bde2020/hadoop-nodemanager:1.1.0-hadoop2.8-java8
    container_name: nodemanager
    hostname: nodemanager
    environment:
      - CLUSTER_NAME=test
    env_file:
      - ./hadoop-hive.env
    ports:
      - "8042:8042"
  hive-server:
    image: bde2020/hive:2.1.0-postgresql-metastore
    container_name: hive-server
    env_file:
      - ./hadoop-hive.env
    environment:
      - "HIVE_CORE_CONF_javax_jdo_option_ConnectionURL=jdbc:postgresql://hive-metastore/metastore"
    ports:
      - "10000:10000"
  hive-metastore:
    image: bde2020/hive:2.1.0-postgresql-metastore
    container_name: hive-metastore
    env_file:
      - ./hadoop-hive.env
    command: /opt/hive/bin/hive --service metastore
    ports:
      - "9083:9083"
  hive-metastore-postgresql:
    image: bde2020/hive-metastore-postgresql:2.1.0
    ports:
      - "5432:5432"
    volumes:
      - ~/data/postgresql/:/var/lib/postgresql/data
  spark-master:
    image: bde2020/spark-master:2.1.0-hadoop2.8-hive-java8
    container_name: spark-master
    hostname: spark-master
    volumes:
      - ./copy-jar.sh:/copy-jar.sh
    ports:
      - "18080:8080"
      - "7077:7077"
    env_file:
      - ./hadoop-hive.env
  spark-worker:
    image: bde2020/spark-worker:2.1.0-hadoop2.8-hive-java8
    depends_on:
      - spark-master
    environment:
      - SPARK_MASTER=spark://spark-master:7077
    ports:
      - "18081:8081"
    env_file:
      - ./hadoop-hive.env
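
The IPs written into /etc/hosts above have to match the addresses Docker actually assigns to these containers. One way to list them once the stack is up (a docker inspect one-liner; services without an explicit container_name show up with generated names):

docker inspect -f '{{.Name}} {{range .NetworkSettings.Networks}}{{.IPAddress}} {{end}}' $(docker ps -q)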

hadoop-hive.env

HIVE_SITE_CONF_javax_jdo_option_ConnectionURL=jdbc:postgresql://hive-metastore-postgresql/metastore
HIVE_SITE_CONF_javax_jdo_option_ConnectionDriverName=org.postgresql.Driver
HIVE_SITE_CONF_javax_jdo_option_ConnectionUserName=hive
HIVE_SITE_CONF_javax_jdo_option_ConnectionPassword=hive
HIVE_SITE_CONF_datanucleus_autoCreateSchema=false
HIVE_SITE_CONF_hive_metastore_uris=thrift://hive-metastore:9083
HIVE_SITE_CONF_hive_metastore_warehouse_dir=hdfs://namenode:8020/user/hive/warehouse

CORE_CONF_fs_defaultFS=hdfs://namenode:8020
CORE_CONF_fs_default_name=hdfs://namenode:8020
CORE_CONF_hadoop_http_staticuser_user=root
CORE_CONF_hadoop_proxyuser_hue_hosts=*
CORE_CONF_hadoop_proxyuser_hue_groups=*

HDFS_CONF_dfs_webhdfs_enabled=true
HDFS_CONF_dfs_permissions_enabled=false

YARN_CONF_yarn_log___aggregation___enable=true
YARN_CONF_yarn_resourcemanager_recovery_enabled=true
YARN_CONF_yarn_resourcemanager_store_class=org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore
YARN_CONF_yarn_resourcemanager_fs_state___store_uri=/rmstate
YARN_CONF_yarn_nodemanager_remote___app___log___dir=/app-logs
YARN_CONF_yarn_log_server_url=http://historyserver:8188/applicationhistory/logs/
YARN_CONF_yarn_timeline___service_enabled=true
YARN_CONF_yarn_timeline___service_generic___application___history_enabled=true
YARN_CONF_yarn_resourcemanager_system___metrics___publisher_enabled=true
YARN_CONF_yarn_resourcemanager_hostname=resourcemanager
YARN_CONF_yarn_timeline___service_hostname=historyserver
YARN_CONF_yarn_resourcemanager_address=resourcemanager:8032
YARN_CONF_yarn_resourcemanager_scheduler_address=resourcemanager:8030
YARN_CONF_yarn_resourcemanager_resource__tracker_address=resourcemanager:8031
YARN_CONF_yarn_nodemanager_aux___services=mapreduce_shuffle
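
These variables are not read by Hadoop directly: the bde2020 images rewrite them into the matching XML config files when a container starts (CORE_CONF_* into core-site.xml, HDFS_CONF_* into hdfs-site.xml, YARN_CONF_* into yarn-site.xml, HIVE_SITE_CONF_* into hive-site.xml). A quick way to confirm the result, assuming the config directory inside these images is /etc/hadoop:

docker exec namenode grep -A 1 "fs.defaultFS" /etc/hadoop/core-site.xml
docker exec resourcemanager grep -A 1 "yarn.resourcemanager.hostname" /etc/hadoop/yarn-site.xml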

run.sh

#!/bin/bash

# Start the containers
docker-compose -f docker-compose.yml up -d namenode hive-metastore-postgresql
docker-compose -f docker-compose.yml up -d datanode datanode2 datanode3 hive-metastore
docker-compose -f docker-compose.yml up -d resourcemanager
docker-compose -f docker-compose.yml up -d nodemanager
docker-compose -f docker-compose.yml up -d historyserver
sleep 5
docker-compose -f docker-compose.yml up -d hive-server
docker-compose -f docker-compose.yml up -d spark-master spark-worker

# Get the host IP address and print the web UI URLs to the console
my_ip=`ifconfig | grep 'inet.*netmask.*broadcast' |  awk '{print $2; exit}'`
echo "Namenode: http://${my_ip}: 50070"
echo "Datanode: http://${my_ip}: 50075"
echo "Spark-master: http://${my_ip}: 18080"

# Execute script, spark YARN support
docker-compose exec spark-master bash -c "./copy-jar.sh && exit"
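
Usage is simple: make the scripts executable once, then start the stack; the copy-jar.sh step at the end only matters when Spark jobs are later submitted to YARN:

chmod +x run.sh stop.sh copy-jar.sh
./run.sh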

copy-jar.sh

#!/bin/bash

# Copy the Jersey 1.9 jars from Hadoop's YARN libs into Spark's jars and drop the
# conflicting Jersey 2 client, so Spark jobs can be submitted to YARN.
cd /opt/hadoop-2.8.0/share/hadoop/yarn/lib/ && cp jersey-core-1.9.jar jersey-client-1.9.jar /spark/jars/ && rm -rf /spark/jars/jersey-client-2.22.2.jar

stop.sh

#!/bin/bash
docker-compose stop
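
stop.sh only stops the containers. A full teardown that removes them (the HDFS and Postgres data stays on the host under ~/data) would be:

./stop.sh
# or remove the containers entirely:
docker-compose -f docker-compose.yml down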

Submitting MapReduce to YARN from IDEA

References:

  1. Submitting MapReduce jobs to a Hadoop cluster from IDEA
  2. A Java demo for uploading and downloading files on Hadoop HDFS
  3. ClassNotFoundException: Mapper when a MapReduce job is submitted from IDEA to remote Linux

Note: before submitting to YARN, package the code into a jar, otherwise a ClassNotFoundException is thrown. For details, see reference 3 above; a minimal packaging sketch follows.
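
For reference only: with the Maven setup below, a plain mvn package also produces a usable job jar (the mapred.jar path set in WordCountRunner points at an IDEA artifact on my machine; adjust it to wherever your build writes the jar):

# Build the job jar before submitting; point conf.set("mapred.jar", ...) at the result
mvn clean package -DskipTests
ls target/hadoop-test-1.0.0.jar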

pom.xml



<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.switchvov</groupId>
    <artifactId>hadoop-test</artifactId>
    <version>1.0.0</version>

    <name>hadoop-test</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.8.0</version>
        </dependency>
    </dependencies>
</project>

log4j.properties

log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%m%n

words.txt

this is a tests
this is a tests
this is a tests
this is a tests
this is a tests
this is a tests
this is a tests
this is a tests
this is a tests
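
The HdfsDemo helper below uploads this file through the HDFS Java API; if you only want to get the input into place, a command-line sketch via the namenode container works too (paths are illustrative):

docker cp words.txt namenode:/tmp/words.txt
docker exec namenode hdfs dfs -mkdir -p /share
docker exec namenode hdfs dfs -put -f /tmp/words.txt /share/words.txt
docker exec namenode hdfs dfs -ls /share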

HdfsDemo.java

package com.switchvov.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.io.InputStream;

/**
 * @author switch
 * @since 2020/12/18
 */
public class HdfsDemo {
    /**
     * Hadoop FS configuration
     */
    private static final Configuration CONFIGURATION = new Configuration();

    static {
        // Specify the address of the Hadoop FS
        CONFIGURATION.set("fs.default.name", "hdfs://namenode:8020");
    }

    /**
     * Upload a local file (filePath) to the given path (dst) on the HDFS server
     */
    public static void uploadFileToHDFS(String filePath, String dst) throws Exception {
        // Create a file system handle
        FileSystem fs = FileSystem.get(CONFIGURATION);
        Path srcPath = new Path(filePath);
        Path dstPath = new Path(dst);
        long start = System.currentTimeMillis();
        fs.copyFromLocalFile(false, srcPath, dstPath);
        System.out.println("Time: " + (System.currentTimeMillis() - start));
        System.out.println("Uploaded file to " + CONFIGURATION.get("fs.default.name") + " successfully");
        fs.close();
    }

    /**
     * Download a file and print it
     */
    public static void downLoadFileFromHDFS(String src) throws Exception {
        FileSystem fs = FileSystem.get(CONFIGURATION);
        Path srcPath = new Path(src);
        InputStream in = fs.open(srcPath);
        try {
            // Copy the file to standard output (i.e. the console)
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
            fs.close();
        }
    }

    public static void main(String[] args) throws Exception {
        String filename = "words.txt";
//        uploadFileToHDFS(
//                "/Users/switch/projects/OtherProjects/bigdata-enviroment/hadoop-test/data/" + filename,
//                "/share/" + filename
//        );
        downLoadFileFromHDFS("/share/output12/" + filename + "/part-r-00000");
    }
}

WordCountRunner.java

package com.switchvov.hadoop.mapreduce.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


/**
 * @author switch
 * @since 2020/12/17
 */
public class WordCountRunner {

    /**
     * LongWritable  input key type (line offset)
     * Text          input value type
     * Text          output key type
     * IntWritable   output value type
     *
     * @author switch
     * @since 2020/12/17
     */
    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        /**
         * @param key     the line offset
         * @param value   one line of input, e.g. "this is a tests"
         * @param context the output
         * @throws IOException          exception
         * @throws InterruptedException exception
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            // Split the line into words on spaces
            String[] words = line.split(" ");
            for (String word : words) {
                context.write(new Text(word), new IntWritable(1));
            }
        }
    }

    /**
     * Text         input key type
     * IntWritable  input value type
     * Text         output key type
     * IntWritable  output value type
     *
     * @author switch
     * @since 2020/12/17
     */
    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        /**
         * @param key     the key emitted by the map phase
         * @param values  the values emitted by the map phase
         * @param context the output
         * @throws IOException          exception
         * @throws InterruptedException exception
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }
            context.write(key, new IntWritable(count));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Cross-platform, so the MR job can also be submitted from Windows
        conf.set("mapreduce.app-submission.cross-platform", "true");
        // Use YARN scheduling
        conf.set("mapreduce.framework.name", "yarn");
        // Set the ResourceManager host name
        conf.set("yarn.resourcemanager.hostname", "resourcemanager");
        // Configure the default namenode access address
        conf.set("fs.defaultFS", "hdfs://namenode:8020");
        conf.set("fs.default.name", "hdfs://namenode:8020");
        // Point at the job jar, otherwise a ClassNotFoundException can occur; reference: https://blog.csdn.net/qq_19648191/article/details/56684268
        conf.set("mapred.jar", "/Users/switch/projects/OtherProjects/bigdata-enviroment/hadoop-test/out/artifacts/hadoop/hadoop.jar");
        // Job name
        Job job = Job.getInstance(conf, "word count");
        // Main class
        job.setJarByClass(WordCountRunner.class);
        // Specify the Mapper class
        job.setMapperClass(WordCountMapper.class);
        // Specify the Combiner class; its logic is the same as the reduce logic
        job.setCombinerClass(WordCountReducer.class);
        // Specify the Reducer class
        job.setReducerClass(WordCountReducer.class);
        // Specify the output KEY type
        job.setOutputKeyClass(Text.class);
        // Specify the output VALUE type
        job.setOutputValueClass(IntWritable.class);
        // Set the number of Reducers; the default is 1
        job.setNumReduceTasks(1);
        // The Mapper output types must match the last two type parameters of the inherited Mapper class
        String filename = "words.txt";
        String args0 = "hdfs://namenode:8020/share/" + filename;
        String args1 = "hdfs://namenode:8020/share/output12/" + filename;
        // Input path
        FileInputFormat.addInputPath(job, new Path(args0));
        // Output path
        FileOutputFormat.setOutputPath(job, new Path(args1));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
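
Once the job completes, the result can be read back either with downLoadFileFromHDFS above or straight from the namenode container; with the nine identical lines in words.txt, every word should come out with a count of 9:

docker exec namenode hdfs dfs -cat /share/output12/words.txt/part-r-00000
# expected output:
# a       9
# is      9
# tests   9
# this    9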

Sharing and recording what I learn and see.