The Spark Streaming program described previously needs to write the data it receives from Kafka back out in two ways: to Kafka and to HDFS. The Kafka part has already been summarized; this article summarizes the HDFS part. It is a bit long, but I have tried to make it complete. Note: the versions used in this article are Spark 2.2.1 and Hadoop 2.6.0-CDH5.11.0.


Background

At work, data received from Kafka needs to be processed and then stored both in Kafka and in HDFS for downstream consumers. The part that writes to HDFS from Spark Streaming went through several revisions as the business requirements evolved before reaching its final form.

The most basic way – use saveAsTextFile directly

Spark provides a ready-made operator, saveAsTextFile, for storing data directly to HDFS or local files.

def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit

saveAsTextFile saves the RDD to the file system as a text file.

However, there are a number of issues with using saveAsTextFile directly. First, saveAsTextFile requires that the target directory does not already exist, otherwise an error is reported, so it is best to check for the directory (and delete it if appropriate) before saving. The code examples below omit this check and would need to be adjusted accordingly; a minimal sketch of such a pre-check follows.
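The sketch below uses Hadoop's FileSystem API; the method name and wrapper are illustrative assumptions, not code from the original program:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD

// Remove the target directory if it already exists, then save the RDD as text.
def saveOverwriting(rdd: RDD[String], savePath: String): Unit = {
  val fs = FileSystem.get(new URI(savePath), new Configuration())
  val target = new Path(savePath)
  if (fs.exists(target)) {
    fs.delete(target, true) // recursive = true removes the directory and its contents
  }
  rdd.saveAsTextFile(savePath)
}

Beyond the missing directory check, there are several other problems with using saveAsTextFile directly: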

  1. File contents get overwritten. If you use saveAsTextFile for the first time while learning Spark, you may be surprised to find file contents being overwritten. A first attempt might look like this:

    savedStream.foreachRDD(rdd => {
      // An empty batch produces an RDD with no partitions, so check rdd.partitions.isEmpty
      if (rdd.partitions.isEmpty) {
        logInfo("No Data in this batchInterval --------")
      } else {
        val start_time = System.currentTimeMillis()
        rdd.saveAsTextFile("hdfs://cdh5hdfs/savepath")
        competeTime(start_time, "Processed data write to hdfs")
      }
    })

    Once you actually test it, you will find that the files under the savepath directory are overwritten on every batch, so only the output of the last saveAsTextFile call is kept. This is because saveAsTextFile inside foreachRDD always writes files with the default names part-00000, part-00001, and so on, so each subsequent batch writes files with the same names to the same path and overwrites the previous ones. To avoid this, you can append a timestamp to the save path.

    savedStream.foreachRDD(rdd => {
      // An empty batch produces an RDD with no partitions, so check rdd.partitions.isEmpty
      if (rdd.partitions.isEmpty) {
        logInfo("No Data in this batchInterval --------")
      } else {
        val start_time = System.currentTimeMillis()
        val curDay = new Date(start_time)
        val date: String = dateFormat.format(curDay)
        rdd.saveAsTextFile("hdfs://cdh5hdfs/savepath/" + date + "/" + start_time)
        competeTime(start_time, "Processed data write to hdfs")
      }
    })

    That is, with rdd.saveAsTextFile("hdfs://cdh5hdfs/savepath/" + date + "/" + start_time) the save path is generated from the current time. Assuming the batch runs at 18:30:20 on 2018-06-30, the files are stored under hdfs://cdh5hdfs/savepath/2018-06-30/1530354620, as shown below:

    dcos@d8pccdsj3[~]$hadoop fs -ls /savepath/2018-06-30/1530354620
    Found 8 items
    -rw-r--r--   3 test cgroup          0 2018-06-30 18:30 /savepath/2018-06-30/1530354620/_SUCCESS
    -rw-r--r--   3 test cgroup 2201736217 2018-06-30 18:30 /savepath/2018-06-30/1530354620/part-00000
    -rw-r--r--   3 test cgroup 2201037065 2018-06-30 18:30 /savepath/2018-06-30/1530354620/part-00001
    -rw-r--r--   3 test cgroup 2202157942 2018-06-30 18:30 /savepath/2018-06-30/1530354620/part-00002
    -rw-r--r--   3 test cgroup 2202523100 2018-06-30 18:30 /savepath/2018-06-30/1530354620/part-00003
    -rw-r--r--   3 test cgroup 2202310836 2018-06-30 18:30 /savepath/2018-06-30/1530354620/part-00004
    -rw-r--r--   3 test cgroup 2202639458 2018-06-30 18:30 /savepath/2018-06-30/1530354620/part-00005
    -rw-r--r--   3 test cgroup 2201906597 2018-06-30 18:30 /savepath/2018-06-30/1530354620/part-00006

    This solves the overwriting problem. However, we now find that under the date-named folder 2018-06-30, a timestamp-named directory is generated for every batch, and the files inside are still named part-0000N. That is not good enough, and this approach has other issues to watch out for.

  2. Too many directories and too many small files. After the adjustment above, files are no longer overwritten, but when you actually run the Spark Streaming program you will find that this approach generates one timestamp-named directory per batch, so there are far too many directories, and each directory contains many files named part-00000, part-00001, and so on. Files that are much smaller (or much larger) than the HDFS block size waste block space, so ideally file sizes should be close to the block size. The reason so many files are generated is that Spark writes one file per task: the data is split into many partitions, each partition is handled by one task, and when saveAsTextFile is called each partition is saved as one part-0000N file. You can therefore use the coalesce or repartition operator to reduce the output to one file or a few files. To produce a single file, call coalesce(1, true).saveAsTextFile() on the RDD, which means the data is collected into a single partition after the computation and only then saved; Spark then starts only one task for the save, so only one file is produced. Alternatively, call repartition(1), which is a wrapper around coalesce with the second argument defaulting to true. That is:

    rdd.coalesce(1, true).saveAsTextFile("hdfs://cdh5hdfs/savepath/" + date + "/" + start_time)
    // or
    // rdd.repartition(1).saveAsTextFile("hdfs://cdh5hdfs/savepath/" + date + "/" + start_time)

    However, this approach is dangerous when the data volume is large. Spark generally faces large amounts of data and executes in parallel; forcing everything into a single partition at the end inevitably causes heavy disk and network I/O, the memory of the node performing the write is put under great pressure (possibly leading to out-of-memory errors), and a single node writing everything is inefficient. A better option is to merge the files on HDFS afterwards, i.e. the hadoop fs -getmerge operation (a programmatic sketch using FileUtil.copyMerge is shown after this list):

    # hadoop fs -getmerge <source path> <destination path>
    hadoop fs -getmerge /hdfs/output /hdfs2/output.txt
    # or: cat >
  3. File names cannot be customized. With the preceding adjustments, data can be written to HDFS and the number of files is no longer excessive. However, to prevent file contents from being overwritten we used a timestamp, which effectively creates a different directory per batch: in Spark Streaming this produces one timestamp-named directory for each batch, and inside it the files are still named part-0000N, a name that cannot be customized. So we wondered whether we could start from the file name instead and customize the file name rather than the directory name.
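As referenced in item 2 above, the merge can also be done programmatically instead of from the shell. Below is a minimal sketch using Hadoop's FileUtil.copyMerge (available in Hadoop 2.x); the source and destination paths reuse the illustrative ones from the shell example:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val conf = new Configuration()
val fs = FileSystem.get(conf)
// Merge all part files under the source directory into one destination file.
// deleteSource = false keeps the original part files; addString = null adds nothing between files.
FileUtil.copyMerge(fs, new Path("/hdfs/output"), fs, new Path("/hdfs2/output.txt"), false, conf, null)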

Use the HDFS API append directly (for testing, not production)

To customize file names, reduce the directory depth, and append to existing files, one option suggested in various materials is to call the HDFS append API directly. According to the documentation, append has been available in the API since Hadoop 1.0.4, but it is not recommended for production; still, we can test it. To use it, you need to enable the feature by setting dfs.support.append to true in the configuration; otherwise the client reports an error when writing the file:

Exception in thread "main" org.apache.hadoop.ipc.RemoteException: java.io.IOException: Append to hdfs not supported. Please refer to dfs.support.append configuration parameter.

Modify hdfs-site.xml on the NameNode:

  <property>  
       <name>dfs.support.append</name>  
       <value>true</value>  
  </property>

Or, you can set it directly in the program code:

private val conf = new Configuration()
conf.setBoolean("dfs.support.append", true)
// var fs = FileSystem.get(uri, conf)

Note that append requires the file to already exist; if it does not, you have to create it first. A naive first attempt might look like this:

val path = new Path(strpath)
val uri = new URI(strpath)
var fs = FileSystem.get(uri, conf)
if (!fs.exists(path)) {
  fs.create(path)
}
val out = fs.append(path)
out.write(record_sum1.getBytes())

In this case, the following error will be reported:

Exception in thread "main" org.apache.hadoop.ipc.RemoteException: org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException: Failed to create file /hdfs/testfile/file for DFSClient_-14565853217 on client 132.90.130.101 because current leaseholder is trying to recreate file.

One workaround is to close the FileSystem after creating the file and obtain a new one:

val path = new Path(strpath)
val uri = new URI(strpath)
var fs = FileSystem.get(uri, conf)
if (!fs.exists(path)) {
  fs.create(path)
  fs.close()
  fs = FileSystem.get(uri, conf)
}
val out = fs.append(path)
out.write(record_sum1.getBytes())

In practice, even this version may still throw org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException. The cause is that FileSystem.create(Path f) returns an output stream, and that stream must be closed after creation. The fix is to close the FSDataOutputStream returned by create, so try the following:

var fs: FileSystem = FileSystem.get(conf)
println("append to file:" + dirPath)
if (!fs.exists(path)) {
  println("path not exist")
  // create(Path f) returns an output stream, so close it immediately after creating the file
  fs.create(path).close()
}
// fs.append
val out: FSDataOutputStream = fs.append(path)

For details, see the article "Common operations and precautions for Hadoop HDFS files (updated)", problem 2: an HDFS write exception is thrown the first time a file is appended.

(Assume the test data is an Iterator[(String, String)] and that the second field of each tuple needs to be written.) In the Spark job the records are buffered and flushed to HDFS once every cache records (the cache parameter below):

def writeToHDFS(strpath: String, iter: Iterator[(String, String)], cache: Int): Try[Unit] = Try {
  var record_sum1 = ""
  var count_sum1 = 0
  var record = ""
  val path = new Path(strpath)
  val uri = new URI(strpath)
  var fs = FileSystem.get(uri, conf)
  if (!fs.exists(path)) {
    fs.create(path)
    fs.close()
    fs = FileSystem.get(uri, conf)
  }
  val out = fs.append(path)
  while (iter.hasNext) {
    record = iter.next()._2
    record_sum1 += record + "\n"
    count_sum1 = count_sum1 + 1
    if (count_sum1 == cache) {
      out.write(record_sum1.getBytes())
      record_sum1 = ""
      count_sum1 = 0
    }
  }
  if (!record_sum1.isEmpty) {
    out.write(record_sum1.getBytes())
  }
  out.flush()
  out.close()
  fs.close()
}

In the Spark program, you can directly use the RDD as follows:

if (!rdd.isEmpty()) {
  // No return value is needed here, so it is better to use foreachPartition to write the data out directly.
  // foreachPartition is an action, while mapPartitions is a transformation.
  val start_time = System.currentTimeMillis()
  val curDay = new Date(start_time)
  val date: String = dateFormat.format(curDay)
  rdd.foreachPartition(iter => {
    val strpath = "hdfs://cdh5hdfs/savepath/" + date + "/" + start_time
    writeToHDFS(strpath, iter, 300)
  })
  // rdd.repartition(1).mapPartitions(iter => {
  //   val str = List[String]()
  //   val strpath = "hdfs://cdh5hdfs/savepath/" + date + "/" + start_time
  //   writeToHDFS(strpath, iter, 300)
  //   str.iterator
  // }).collect()
}

With this, the directory hierarchy becomes "hdfs://cdh5hdfs/savepath/<current date>/<timestamp file>": content is appended directly into a file named after the timestamp, which reduces the directory depth and allows the file name to be customized. However, this approach opens one HDFS connection per partition and runs very slowly. Something like the connection-pool approach used for Kafka might improve efficiency, but I did not experiment with it; after all, append is considered experimental and is not recommended for production.
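For reference only, the connection-reuse idea mentioned above could look roughly like the sketch below: a per-executor singleton that lazily creates and caches the FileSystem so that each partition does not open a fresh connection. This is an untested sketch of the idea, with illustrative names, not code from the original program:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

object HdfsClientHolder {
  // Created once per executor JVM and shared by all tasks running on it.
  private val conf = new Configuration()
  private var fs: FileSystem = _

  def get(uri: String): FileSystem = synchronized {
    if (fs == null) {
      fs = FileSystem.get(new URI(uri), conf)
    }
    fs
  }
}

// Inside foreachPartition, reuse the cached FileSystem instead of calling
// FileSystem.get and fs.close for every partition:
// val fs = HdfsClientHolder.get("hdfs://cdh5hdfs")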

Requirement change

With the approaches above, data is written straight into a file without any differentiation. Then the requirement changed. The data we process comes from upstream, and each record carries its generation time; the time we actually receive a record may be later than that. The new requirement is therefore not only to customize the file name, but also to decide, based on the record's content (its generation-time field), which file in which folder the record should be written to. For example, the data looks like this:

0|18610000000|460010000000000|2018|07|28|16-21-35|41003|22002|35004007800300|0000|||||0|||2018-07-28 16:21:35

Therefore, according to our requirements, the generated directory format needs to be:

dcos@d8pccdsj3[~]$hadoop fs -ls /savepath/2018-07-28
Found 11 items
drwxr-xr-x   - user1 cgroup          0 2018-07-28 10:42 /savepath/2018-07-28/00
drwxr-xr-x   - user1 cgroup          0 2018-07-28 09:48 /savepath/2018-07-28/01
drwxr-xr-x   - user1 cgroup          0 2018-07-28 09:40 /savepath/2018-07-28/02
drwxr-xr-x   - user1 cgroup          0 2018-07-28 08:54 /savepath/2018-07-28/03
drwxr-xr-x   - user1 cgroup          0 2018-07-28 10:26 /savepath/2018-07-28/04
drwxr-xr-x   - user1 cgroup          0 2018-07-28 10:30 /savepath/2018-07-28/05
drwxr-xr-x   - user1 cgroup          0 2018-07-28 10:38 /savepath/2018-07-28/06
drwxr-xr-x   - user1 cgroup          0 2018-07-28 10:26 /savepath/2018-07-28/07
drwxr-xr-x   - user1 cgroup          0 2018-07-28 10:42 /savepath/2018-07-28/08
drwxr-xr-x   - user1 cgroup          0 2018-07-28 10:42 /savepath/2018-07-28/09
drwxr-xr-x   - user1 cgroup          0 2018-07-28 11:42 /savepath/2018-07-28/10
...
drwxr-xr-x   - user1 cgroup          0 2018-07-28 16:40 /savepath/2018-07-28/16
...
drwxr-xr-x   - user1 cgroup          0 2018-07-29 00:30 /savepath/2018-07-28/23

That is, the directory structure must be /savepath/<date>/<hour>/<file name>. The file name should carry the time at which the file is written (i.e. the system time), while the date and hour in the directory name are determined by the service time inside the data, i.e. the 18th field of the example record above. That record therefore needs to be stored in a file under /savepath/2018-07-28/16 (e.g. 16-16-06-00). Given this requirement, saveAsTextFile cannot be used directly, because the file name has to be customized. Using append directly is also very troublesome: the target directory and file must be chosen per record according to its content, and data may arrive late (for example, at 10 o'clock a few scattered records from 3 o'clock may still come in). When we tried append, we processed each record's date and related information into triples and then used writeToHDFS, extracting the time to decide the directory and file name, but it remained complicated and the processing and writing took too long, hurting real-time performance; besides, append is experimental and not suitable for production. That eventually led to the final method below.

The final method – saveAsHadoopFile + a custom RDDMultipleTextOutputFormat

Analysis

To meet the requirement above, another solution was needed. Starting from Spark's own operators, take a look at the source code of saveAsTextFile:

/**
   * Save this RDD as a text file, using string representations of elements.
   */
  def saveAsTextFile(path: String): Unit = withScope {
    // https://issues.apache.org/jira/browse/SPARK-2075
    // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit
    // Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]`
    // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an
    // Ordering for `NullWritable`. That's why the compiler will generate different anonymous
    // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+. Therefore, here we provide an
    // explicit Ordering `null` to make sure the compiler generate same bytecodes for `saveAsTextFile`.
    val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
    val textClassTag = implicitly[ClassTag[Text]]
    val r = this.mapPartitions { iter =>
      val text = new Text()
      iter.map { x =>
        text.set(x.toString)
        (NullWritable.get(), text)
      }
    }
    RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
  }
  /**
   * Save this RDD as a compressed text file, using string representations of elements.
   */
  def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = withScope {
    // https://issues.apache.org/jira/browse/SPARK-2075
    val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
    val textClassTag = implicitly[ClassTag[Text]]
    val r = this.mapPartitions { iter =>
      val text = new Text()
      iter.map { x =>
        text.set(x.toString)
        (NullWritable.get(), text)
      }
    }
    RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
  }

saveAsTextFile is built on saveAsHadoopFile, which works on a pair RDD. Inside saveAsTextFile, rddToPairRDDFunctions converts the RDD into an RDD of (NullWritable, Text) pairs, and saveAsHadoopFile then performs the actual write.
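In other words, calling saveAsTextFile is roughly equivalent to doing the conversion and the saveAsHadoopFile call yourself, along these lines (a simplified sketch; the real implementation reuses a single Text instance per partition):

import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapred.TextOutputFormat

// given rdd: RDD[String]
rdd.map(x => (NullWritable.get(), new Text(x)))
  .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]]("hdfs://cdh5hdfs/savepath")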

Reference: [SparkJavaAPI] Action(6) — saveAsTextFile, saveAsObjectFile.

Looking at how saveAsTextFile calls saveAsHadoopFile, you can see that Spark writes files through Hadoop internally, using TextOutputFormat[NullWritable, Text] by default. TextOutputFormat is also the default output format of Hadoop MapReduce, which is why the default output files are named part-r-00000, part-r-00001, and so on. A MapReduce job therefore produces a single set of output files, which does not match our requirement: we need to split the output by record content into different files or groups of files (for example, routing log lines belonging to different business lines into separate outputs, or, as in our case, choosing the destination file based on the data itself). In Hadoop, this kind of multi-file output, i.e. writing records of different kinds to different files according to their key or value, is achieved by replacing TextOutputFormat with MultipleOutputFormat or MultipleOutputs.

References: articles on Hadoop multi-file output with MultipleOutputFormat and MultipleOutputs (parts 1 and 2).

Therefore, since Spark writes files through Hadoop internally, we can call saveAsHadoopFile ourselves and supply a custom OutputFormat class based on MultipleOutputFormat.

An aside: saveAsTextFile uses TextOutputFormat by default to write files to HDFS, yet its output files are named part-00000, part-00001, while Hadoop's default output is part-r-00000. I wanted to know where the file name changes, so I traced through the Spark source code; I will write that analysis up in a separate article when I have time.

The code

Now let's implement multi-file output with Spark: splitting the output into files according to the data content, with customized file names.

References: "Spark Streaming: customizing file names and appending file content based on the data", and "Spark multi-file output with MultipleOutputFormat".

The saveAsHadoopFile operator

First look at the official description of the saveAsHadoopFile operator:

/**
 * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
 * supporting the key and value types K and V in this RDD.
 */
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit

/**
 * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
 * supporting the key and value types K and V in this RDD. Compress the result with the
 * supplied codec.
 */
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String, codec: Class[_ <: CompressionCodec])(implicit fm: ClassTag[F]): Unit

/**
 * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
 * supporting the key and value types K and V in this RDD. Compress with the supplied codec.
 */
def saveAsHadoopFile(
    path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[_ <: OutputFormat[_, _]],
    codec: Class[_ <: CompressionCodec]): Unit

/**
 * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
 * supporting the key and value types K and V in this RDD.
 * @note We should make sure our tasks are idempotent when speculation is enabled, i.e. do
 * not use output committer that writes data directly. There is an example in
 * https://issues.apache.org/jira/browse/SPARK-10063 to show the bad result of using direct
 * output committer with speculation enabled.
 */
def saveAsHadoopFile(
    path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[_ <: OutputFormat[_, _]],
    conf: JobConf = new JobConf(self.context.hadoopConfiguration),
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit

The variant we use is the last one:

def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf = new JobConf(self.context.hadoopConfiguration), codec: Option[Class[_ <: CompressionCodec]] = None): Unit

The parameters to pass are, in order, the file path, the key class, the value class, and the OutputFormat class. Note that saveAsTextFile, used earlier, is defined on org.apache.spark.rdd.RDD, while saveAsHadoopFile belongs to org.apache.spark.rdd.PairRDDFunctions and therefore requires a pair RDD, so the original RDD must first be mapped into (key, value) form; this is shown again with the full code at the end. For the key and value classes we pass classOf[String] and classOf[String], and for the OutputFormat we pass the custom MultipleOutputFormat subclass that we are about to write.

MultipleOutputFormat analysis

Let’s take a look at the MultipleOutputFormat source code:

/**
 * This abstract class extends the FileOutputFormat, allowing to write the
 * output data to different output files. There are three basic use cases for
 * this class. 
 * Case one: This class is used for a map reduce job with at least one reducer.
 * The reducer wants to write data to different files depending on the actual
 * keys. It is assumed that a key (or value) encodes the actual key (value)
 * and the desired location for the actual key (value).
 * Case two: This class is used for a map only job. The job wants to use an
 * output file name that is either a part of the input file name of the input
 * data, or some derivation of it.
 * Case three: This class is used for a map only job. The job wants to use an
 * output file name that depends on both the keys and the input file name,
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class MultipleOutputFormat<K, V>
extends FileOutputFormat<K, V> {
  /**
   * Create a composite record writer that can write key/value data to different
   * output files
   * @param fs
   *          the file system to use
   * @param job
   *          the job conf for the job
   * @param name
   *          the leaf file name for the output file (such as part-00000")
   * @param arg3
   *          a progressable for reporting progress.
   * @return a composite record writer
   * @throws IOException
   */
  public RecordWriter<K, V> getRecordWriter(FileSystem fs, JobConf job,
      String name, Progressable arg3) throws IOException {

    final FileSystem myFS = fs;
    final String myName = generateLeafFileName(name);
    final JobConf myJob = job;
    final Progressable myProgressable = arg3;

    return new RecordWriter<K, V>() {

      // a cache storing the record writers for different output files.
      TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap<String, RecordWriter<K, V>>();

      public void write(K key, V value) throws IOException {

        // get the file name based on the key
        String keyBasedPath = generateFileNameForKeyValue(key, value, myName);

        // get the file name based on the input file name
        String finalPath = getInputFileBasedOutputFileName(myJob, keyBasedPath);

        // get the actual key
        K actualKey = generateActualKey(key, value);
        V actualValue = generateActualValue(key, value);

        RecordWriter<K, V> rw = this.recordWriters.get(finalPath);
        if (rw == null) {
          // if we don't have the record writer yet for the final path, create
          // one
          // and add it to the cache
          rw = getBaseRecordWriter(myFS, myJob, finalPath, myProgressable);
          this.recordWriters.put(finalPath, rw);
        }
        rw.write(actualKey, actualValue);
      };

      public void close(Reporter reporter) throws IOException {
        Iterator<String> keys = this.recordWriters.keySet().iterator();
        while (keys.hasNext()) {
          RecordWriter<K, V> rw = this.recordWriters.get(keys.next());
          rw.close(reporter);
        }
        this.recordWriters.clear();
      };
    };
  }
  /**
   * Generate the leaf name for the output file name. The default behavior does not change the leaf file name (such as part-00000) 
   * @param name
   *          the leaf file name for the output file
   * @return the given leaf file name
   */
  protected String generateLeafFileName(String name) {
    return name;
  }
  /**
   * Generate the file output file name based on the given key and the leaf file
   * name. The default behavior is that the file name does not depend on the
   * key. 
   * @param key
   *          the key of the output data
   * @param name
   *          the leaf file name
   * @return generated file name
   */
  protected String generateFileNameForKeyValue(K key, V value, String name) {
    return name;
  }
  /**
   * Generate the actual key from the given key/value. The default behavior is that
   * the actual key is equal to the given key 
   * @param key
   *          the key of the output data
   * @param value
   *          the value of the output data
   * @return the actual key derived from the given key/value
   */
  protected K generateActualKey(K key, V value) {
    return key;
  }
  /**
   * Generate the actual value from the given key and value. The default behavior is that
   * the actual value is equal to the given value 
   * @param key
   *          the key of the output data
   * @param value
   *          the value of the output data
   * @return the actual value derived from the given key/value
   */
  protected V generateActualValue(K key, V value) {
    return value;
  }
  /**
   * Generate the outfile name based on a given anme and the input file name. If
   * the {@link JobContext#MAP_INPUT_FILE} does not exists (i.e. this is not for a map only job),
   * the given name is returned unchanged. If the config value for
   * "num.of.trailing.legs.to.use" is not set, or set 0 or negative, the given
   * name is returned unchanged. Otherwise, return a file name consisting of the
   * N trailing legs of the input file name where N is the config value for
   * "num.of.trailing.legs.to.use". 
   * @param job
   *          the job config
   * @param name
   *          the output file name
   * @return the outfile name based on a given anme and the input file name.
   */
  protected String getInputFileBasedOutputFileName(JobConf job, String name) {
    String infilepath = job.get(MRJobConfig.MAP_INPUT_FILE);
    if (infilepath == null) {
      // if the {@link JobContext#MAP_INPUT_FILE} does not exists,
      // then return the given name
      return name;
    }
    int numOfTrailingLegsToUse = job.getInt("mapred.outputformat.numOfTrailingLegs", 0);
    if (numOfTrailingLegsToUse <= 0) {
      return name;
    }
    Path infile = new Path(infilepath);
    Path parent = infile.getParent();
    String midName = infile.getName();
    Path outPath = new Path(midName);
    for (int i = 1; i < numOfTrailingLegsToUse; i++) {
      if (parent == null) break;
      midName = parent.getName();
      if (midName.length() == 0) break;
      parent = parent.getParent();
      outPath = new Path(midName, outPath);
    }
    return outPath.toString();
  }
  /**
   * @param fs
   *          the file system to use
   * @param job
   *          a job conf object
   * @param name
   *          the name of the file over which a record writer object will be
   *          constructed
   * @param arg3
   *          a progressable object
   * @return A RecordWriter object over the given file
   * @throws IOException
   */
  abstract protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs,
      JobConf job, String name, Progressable arg3) throws IOException;
}

The class comment at the beginning describes MultipleOutputFormat<K, V>: it allows writing output data to different files, so records of the same kind end up in the same file. Before each record is written, MultipleOutputFormat calls generateFileNameForKeyValue to determine the name of the file the record should go to.

In getRecordWriter, generateLeafFileName is called first with name to produce the leaf name of the file (myName). (Where this default name value comes from can be traced in the parent class FileOutputFormat or in MultipleOutputs, or in the Spark source mentioned above; it is not explained here.) The resulting myName is then passed to generateFileNameForKeyValue, which takes the key, the value, and the name, and produces keyBasedPath, a file name derived from the key. keyBasedPath is in turn passed to getInputFileBasedOutputFileName to produce finalPath. By default, both generateLeafFileName and generateFileNameForKeyValue simply "return name", so name == myName == keyBasedPath and the output file name remains part-00000.

So, to decide the output file name from the record content: generateLeafFileName depends only on name (it is tested in a later example), while generateFileNameForKeyValue depends on the key, the value, and the name. We therefore override generateFileNameForKeyValue in our own class.

Since what we write is plain text, the normal approach is to extend MultipleTextOutputFormat directly and only implement generateFileNameForKeyValue to return the output file name for each key/value pair. MultipleTextOutputFormat itself extends MultipleOutputFormat, as the official documentation and the class hierarchy of MultipleTextOutputFormat show.

The official description of MultipleTextOutputFormat is as follows:

/**
 * This class extends the MultipleOutputFormat, allowing to write the output
 * data to different output files in Text output format.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MultipleTextOutputFormat<K, V> extends MultipleOutputFormat<K, V> {
  private TextOutputFormat<K, V> theTextOutputFormat = null;

  @Override
  protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job,
      String name, Progressable arg3) throws IOException {
    if (theTextOutputFormat == null) {
      theTextOutputFormat = new TextOutputFormat<K, V>();
    }
    return theTextOutputFormat.getRecordWriter(fs, job, name, arg3);
  }
}  

By default it writes text-format data to different output files. We do output text, but we need to control which directory and file each record is written to, so we simply define our own class that extends MultipleTextOutputFormat.

Test generateLeafFileName (supplementary, can be skipped)

As noted above, MultipleOutputFormat has two methods that determine the file name: generateLeafFileName and generateFileNameForKeyValue. Before writing the final code, let's first test generateLeafFileName against the local file system; feel free to skip this part.

  1. Define a custom OutputFormat named TestMultipleTextOutputFormat.

    package com.ileaf.test
    import java.text.SimpleDateFormat
    import java.util.Date
    import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
    class TestMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
      private val HOURFORMAT = new SimpleDateFormat("HH-mm-ss")
      // Note: "yyyy-mm-dd" uses mm (minutes) rather than MM (months), which is why the
      // directory in the test output below appears as "2018-24-28"; it does not matter here.
      private val YMDFORMAT = new SimpleDateFormat("yyyy-mm-dd")
      private val start_time = System.currentTimeMillis()
      private val curDay = new Date(start_time)
      private val fileName = HOURFORMAT.format(curDay)
      private val dirName = YMDFORMAT.format(curDay)
      override def generateLeafFileName(name: String): String = {
        val filename = dirName + "/" + fileName + "_" + name
        filename
      }
    }
  2. Used in the Spark program

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD

    object TestSparkSave {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local").setAppName("flatMap Demo")
        val sc = new SparkContext(sparkConf)
        val rdd = sc.parallelize(List(
          "0|18610000000|460010000000000|2018|07|21|16-21-35|41003|22002|35004007800300|0000|||||0|||2018-07-21 16:21:35",
          "0|18610000001|460010000000001|2018|07|21|15-21-35|41000|22000|35004007800301|0000|||||0|||2018-07-21 15:21:35",
          "0|18610000002|460010000000002|2018|07|21|15-20-35|41000|22001|35004007800302|0000|||||0|||2018-07-21 15:20:35",
          "0|18610000003|460010000000003|2018|07|21|17-21-35|41001|22002|35004007800303|0000|||||0|||2018-07-21 17:21:35"))
        val rdd1: RDD[(String, String)] = rdd.map(x => (x, ""))
        rdd1.repartition(2).saveAsHadoopFile("/Users/yeziming/test/saveDir",
          classOf[String], classOf[String], classOf[TestMultipleTextOutputFormat])
      }
    }

Then run the program and look at the generated files (the odd "2018-24-28" directory name comes from the wrong date pattern noted above and does not matter for this test):

yezm:saveDir yeziming$ ls
2018-24-28    _SUCCESS
yezm:saveDir yeziming$ cd 2018-24-28/
yezm:2018-24-28 yeziming$ ls -l
total 16
-rw-r--r--  1 yeziming  staff  222  7 28 11:24 11-24-44_part-00000
-rw-r--r--  1 yeziming  staff  222  7 28 11:24 11-24-44_part-00001
yezm:2018-24-28 yeziming$ cat 11-24-44_part-00000
0|18610000000|460010000000000|2018|07|21|16-21-35|41003|22002|35004007800300|0000|||||0|||2018-07-21 16:21:35
0|18610000002|460010000000002|2018|07|21|15-20-35|41000|22001|35004007800302|0000|||||0|||2018-07-21 15:20:35
yezm:2018-24-28 yeziming$ cat 11-24-44_part-00001
0|18610000001|460010000000001|2018|07|21|15-21-35|41000|22000|35004007800301|0000|||||0|||2018-07-21 15:21:35
0|18610000003|460010000000003|2018|07|21|17-21-35|41001|22002|35004007800303|0000|||||0|||2018-07-21 17:21:35

That is, with val filename = dirName + "/" + fileName + "_" + name, the generated leaf file name keeps the default name (part-0000N) as a suffix, and its trailing number is the partition index, so the default name can be used to tell different partitions apart.

Formal coding

Next, let's write the final code for the business requirement; after the analysis above it is straightforward (for the business requirement, see the "Requirement change" section above).

  1. Use the saveAsHadoopFile operator in Spark Streaming. Since the data at this point is a plain RDD and saveAsHadoopFile needs a pair RDD, first map the data so that the record content becomes the key and an empty string "" becomes the value; the custom RDDMultipleTextOutputFormat defined next generates the corresponding file name from the key. Then pass the file path, the key class, the value class, and our custom RDDMultipleTextOutputFormat into saveAsHadoopFile:

    // ...
    val savedStream: DStream[String] = writedStream.repartition(numHdfsFile_Repartition)
    savedStream.foreachRDD(rdd => {
      // Runs on the driver side; broadcast variables are initialized and updated here.
      // The data of one batch is handled here once per interval.
      val start_time = System.currentTimeMillis()
      if (rdd.isEmpty) {
        logInfo("No Data in this batchInterval --------")
      } else {
        // saveAsHadoopFile needs a pair RDD, so convert with map first
        val a: RDD[(String, String)] = rdd.map(x => (x, ""))
        a.saveAsHadoopFile(hdfsPath + "/", classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat])
        competeTime(start_time, "Processed data write to hdfs")
      }
    }) // foreachRDD
    // ...
  2. Define RDDMultipleTextOutputFormat extending MultipleTextOutputFormat[Any, Any]. Because we inherit from MultipleTextOutputFormat, getRecordWriter is already implemented for us, so it is very simple: we only override generateFileNameForKeyValue to split records into directories and files according to their content.

    import java.text.SimpleDateFormat
    import java.util.Date
    import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

    class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
      private val pre_flag = "\\|"
      private val HOURFORMAT = new SimpleDateFormat("HH-mm-ss")
      private val start_time = System.currentTimeMillis()
      private val curDay = new Date(start_time)
      private val fileName = HOURFORMAT.format(curDay)

      override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = {
        // 0|18610000000|460010000000000|2018|07|21|16-21-35|41003|22002|35004007800300|0000|||||0|||2018-07-21 16:21:35
        val line = key.toString
        val split = line.split(pre_flag)
        // If the time field is empty this will fail; handling that is left for later.
        val time = split(18)
        val ymd = time.substring(0, time.indexOf(" "))
        val hour = time.substring(time.indexOf(" ") + 1, time.indexOf(":"))
        val service_date = ymd + "/" + hour + "/" + fileName + "-" + name.substring(name.length() - 2) //.split("-")(0)
        service_date
      }
    }

    In generateFileNameForKeyValue we rely on the map transformation above, rdd.map(x => (x, "")), so the key is the record content. The key is split, and the date in its 18th field is used to build the directory. We also take the last two digits of the default name as the partition number, so the final file path is "<base path passed to saveAsHadoopFile (the hdfsPath above)>/<service date>/<service hour>/<file write time>-<partition number>". A short worked example follows.
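A quick worked example of this rule, with illustrative values (the key is the sample record from the requirement; the system time and partition name are assumed):

// key  = "0|18610000000|...|0|||2018-07-28 16:21:35"   (18th field = service time)
// split(18)           -> "2018-07-28 16:21:35"
// ymd                 -> "2018-07-28"
// hour                -> "16"
// fileName (HH-mm-ss) -> "16-16-06"     (system time when the OutputFormat was created)
// name                -> "part-00003"   -> last two digits "03"
// service_date        -> "2018-07-28/16/16-16-06-03"
// final HDFS path     -> hdfsPath + "/2018-07-28/16/16-16-06-03"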

Therefore, the final display style can be as follows:

// Example with delayed data:
dcos@d8pccdsj3[~]$hadoop fs -ls /encrypt_data/4g_info_c60/2018-07-28/12
Found 40 items
-rw-r--r--   3 user1 cgroup    6541003 2018-07-28 12:12 /encrypt_data/4g_info_c60/2018-07-28/12/12-12-05-00
-rw-r--r--   3 user1 cgroup    6555677 2018-07-28 12:12 /encrypt_data/4g_info_c60/2018-07-28/12/12-12-05-01
-rw-r--r--   3 user1 cgroup    6538441 2018-07-28 12:12 /encrypt_data/4g_info_c60/2018-07-28/12/12-12-05-02
-rw-r--r--   3 user1 cgroup    6567709 2018-07-28 12:12 /encrypt_data/4g_info_c60/2018-07-28/12/12-12-05-03
-rw-r--r--   3 user1 cgroup    6570481 2018-07-28 12:12 /encrypt_data/4g_info_c60/2018-07-28/12/12-12-05-04
-rw-r--r--   3 user1 cgroup   29486262 2018-07-28 12:12 /encrypt_data/4g_info_c60/2018-07-28/12/12-12-22-00
-rw-r--r--   3 user1 cgroup   29477123 2018-07-28 12:12 /encrypt_data/4g_info_c60/2018-07-28/12/12-12-22-01
-rw-r--r--   3 user1 cgroup   29476448 2018-07-28 12:12 /encrypt_data/4g_info_c60/2018-07-28/12/12-12-22-02
-rw-r--r--   3 user1 cgroup   29425074 2018-07-28 12:12 /encrypt_data/4g_info_c60/2018-07-28/12/12-12-22-03
-rw-r--r--   3 user1 cgroup   29435407 2018-07-28 12:12 /encrypt_data/4g_info_c60/2018-07-28/12/12-12-22-04
-rw-r--r--   3 user1 cgroup   66757368 2018-07-28 12:12 /encrypt_data/4g_info_c60/2018-07-28/12/12-12-43-00
-rw-r--r--   3 user1 cgroup   66862913 2018-07-28 12:12 /encrypt_data/4g_info_c60/2018-07-28/12/12-12-43-01
-rw-r--r--   3 user1 cgroup   66854597 2018-07-28 12:12 /encrypt_data/4g_info_c60/2018-07-28/12/12-12-43-02
-rw-r--r--   3 user1 cgroup   66809042 2018-07-28 12:12 /encrypt_data/4g_info_c60/2018-07-28/12/12-12-43-03
-rw-r--r--   3 user1 cgroup   66777279 2018-07-28 12:12 /encrypt_data/4g_info_c60/2018-07-28/12/12-12-43-04
-rw-r--r--   3 user1 cgroup   73288280 2018-07-28 12:13 /encrypt_data/4g_info_c60/2018-07-28/12/12-13-03-00
-rw-r--r--   3 user1 cgroup   73285173 2018-07-28 12:13 /encrypt_data/4g_info_c60/2018-07-28/12/12-13-03-01
-rw-r--r--   3 user1 cgroup   73355198 2018-07-28 12:13 /encrypt_data/4g_info_c60/2018-07-28/12/12-13-03-02
-rw-r--r--   3 user1 cgroup   73337561 2018-07-28 12:13 /encrypt_data/4g_info_c60/2018-07-28/12/12-13-03-03
-rw-r--r--   3 user1 cgroup   73320851 2018-07-28 12:13 /encrypt_data/4g_info_c60/2018-07-28/12/12-13-03-04

And we’re done!

Supplement – requirement 2 implementation and the validateOutputSpecs parameter – 2018-09-03

Requirement description

Data arrives from many interfaces, and the first field of each record identifies the source interface. A Spark Streaming program must write the data in real time into the corresponding file under the corresponding HDFS directory according to that interface identifier. The data looks like this:

60,2018-08-14 10:16:50.806221199,0,2018-08-14 10:17:28.652398109,0,6,...
61,2018-08-14 10:16:47.809515595,4,2018-08-14 10:17:21.435997962,0,52,...
62,2018-08-14 10:17:05.845776104,9,2018-08-14 10:17:28.077723979,0,6,...
...

Solution

The solution is the same as for the problem above: write our own RDDMultipleTextOutputFormat. A code example follows.

The rewritten RDDMultipleTextOutputFormat

import java.text.SimpleDateFormat
import java.util.Date
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RDDMultipleTextOutputFormat  extends MultipleTextOutputFormat[Any, Any]{
  private final val PREFLAG=","
  private final val YMDFORMAT = new SimpleDateFormat("yyyyMMdd")
  private final val YMDHMFORMAT=new SimpleDateFormat("yyyyMMddHHmm")
  private val cur_time = System.currentTimeMillis()
  private val curDay=new Date(cur_time)
  private val dirName=YMDFORMAT.format(curDay)
  private val fileName=YMDHMFORMAT.format(curDay)
  private val filePrefix="xx-xxxxx-events-"
  private val fileSuffix=".txt"

  override def generateFileNameForKeyValue(key: Any, value: Any, name: String):String ={
    val line = key.toString
    val split=line.split(PREFLAG)
    val filePath=dirName+"/"+filePrefix+split(0)+"-"+fileName+name.substring(name.length()-2)+fileSuffix
    filePath
  }
}

Local test call

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 * Created by Liv on 2018/8/31.
 */
object TestXXOriFile {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local").setAppName("flatMap Demo")
    /*
     * Without the setting below, a second run fails with:
     *   Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException:
     *   Output directory file:/Users/yeziming/test/xxdata already exists
     * Meaning of the parameter: if set to true, saveAsHadoopFile validates the output
     * specification, i.e. it checks whether the output directory already exists. Setting it to
     * false silences that exception, but the recommended approach is to delete the output
     * directory manually via the Hadoop FileSystem API. The parameter is ignored when using
     * Spark Streaming's StreamingContext, because existing files may need to be overwritten
     * during checkpoint recovery.
     */
    sparkConf.set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(sparkConf)
    val fileName = "/Users/yeziming/IdeaProjects/SparkAclKafka/resource/table_data.properties"
    val rdd: RDD[String] = sc.textFile(fileName)
    val pairRdd: RDD[(String, Null)] = rdd.map(x => (x, null))
    pairRdd.repartition(2).saveAsHadoopFile("/Users/yeziming/test/xxdata" + "/",
      classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat])
  }
}

To verify that RDDMultipleTextOutputFormat writes files with the correct directory format, I tested the Spark Core program above on my own machine. The first time, spark.hadoop.validateOutputSpecs was not set, so every subsequent run failed with the following error unless the previous output was deleted first:

Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/Users/yeziming/test/xxdata already exists
	at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:132)

Because Spark saves files through Hadoop MapReduce machinery, org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs checks whether the output directory is valid: if no directory is specified it throws InvalidJobConfException, and if the directory already exists it throws FileAlreadyExistsException. Since the directory already exists on the second run, we get the FileAlreadyExistsException above. Spark Streaming does not hit this error. When people ask how to fix it, most answers say to simply delete the directory. Later, while going through Spark's configuration options, I found the spark.hadoop.validateOutputSpecs parameter, which is explained as follows:

spark.hadoop.validateOutputSpecs defaults to true. If set to true, saveAsHadoopFile validates the output specification, e.g. it checks whether the output directory already exists. Setting it to false silences exceptions caused by pre-existing output directories, but the recommended approach is to delete the output directory by hand via Hadoop's FileSystem API. The setting is ignored for jobs generated through Spark Streaming's StreamingContext, since data may need to be rewritten to pre-existing output directories during checkpoint recovery. The official description:

If set to true, validates the output specification (e.g. checking if the output directory already exists) used in saveAsHadoopFile and other variants. This can be disabled to silence exceptions due to pre-existing output directories. We recommend that users do not disable this except if trying to achieve compatibility with previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand. This setting is ignored for jobs generated through Spark Streaming's StreamingContext, since data may need to be rewritten to pre-existing output directories during checkpoint recovery.

Spark Streaming does not run into this problem. If you hit it in a Spark Core program and you do intend to overwrite the directory, you can set this parameter. Note that whether individual files are actually overwritten still depends on the file names. After setting the parameter and running the program above again, the files in the directory look like this:

yezm:20180903 yeziming$ ls
xx-xxxxx-events-60-20180903132300.txt
xx-xxxxx-events-60-20180903132301.txt
xx-xxxxx-events-60-20180903132400.txt
xx-xxxxx-events-60-20180903132401.txt
xx-xxxxx-events-60-20180903134300.txt
xx-xxxxx-events-60-20180903134301.txt
xx-xxxxx-events-61-20180903132300.txt
xx-xxxxx-events-61-20180903132301.txt
xx-xxxxx-events-61-20180903132400.txt
xx-xxxxx-events-61-20180903132401.txt
xx-xxxxx-events-61-20180903134300.txt
xx-xxxxx-events-61-20180903134301.txt
...

That is, since my output files are named down to the minute, running the program several times in different minutes produces several files in the directory and nothing is overwritten. Even so, it is better not to rely on this parameter, because the old files in HDFS then need to be cleaned up. For deleting HDFS directories from Spark, see "Deleting HDFS files in Spark".
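For reference, a minimal sketch of the recommended alternative, deleting the existing output directory through the FileSystem API before saving; the path reuses the illustrative test path above and sc is the SparkContext from the test program:

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
val outPath = new Path("/Users/yeziming/test/xxdata")
if (fs.exists(outPath)) {
  fs.delete(outPath, true) // recursive = true removes the directory and its contents
}
// then call saveAsHadoopFile as before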


With that, this article is complete. Work on it started on 2018-07-02 and finished on 2018-07-28; it took almost a month of intermittent note-taking, which was very rewarding. Updated on 2018-09-03 with section 6, the supplementary requirement 2.