This article describes how to write Spark data to ES using ES-Hadoop.

I. Development environment

1. Component versions

  • CDH cluster version: 6.0.1
  • Spark version: 2.2.0
  • Kafka version: 1.0.1
  • ES version: 6.5.1

2. Maven dependencies

<!-- Scala -->
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.11.8</version>
</dependency>

<!-- Spark core dependencies -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- Spark Streaming -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- Spark Streaming Kafka -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- ZooKeeper dependencies -->
<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.4.5-cdh6.0.1</version>
</dependency>

<!-- elasticsearch-spark dependencies -->
<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-spark-20_2.11</artifactId>
  <version>6.5.4</version>
</dependency>

<!-- HTTP transport component required by elasticsearch-spark -->
<dependency>
  <groupId>commons-httpclient</groupId>
  <artifactId>commons-httpclient</artifactId>
  <version>3.1</version>
</dependency>

3. Precautions

If the CDH build of Spark is used, the following error occurs during debugging and actual deployment:

java.lang.ClassNotFoundException: org.apache.commons.httpclient.protocol.Protocol

The commons-httpclient classes are missing from the classpath, so commons-httpclient-3.1 must be declared explicitly in the Maven POM, as shown in the dependency list above.

II. ES-Hadoop

1. Introduction

ES-Hadoop enables data interaction between the Hadoop ecosystem (Hive, Spark, Pig, Storm, etc.) and Elasticsearch. With this component, data from the Hadoop ecosystem can be written into ES, where it can then be quickly searched, filtered, and aggregated, and further visualized through Kibana.

At the same time, ES can serve as the data storage layer (similar to the Stage or ODS layer of a data warehouse), from which the data processing tools of the Hadoop ecosystem (Hive, MR, Spark, etc.) read data, process it, and write the results to HDFS.

Using ES as the storage layer for raw data works well for data deduplication and data quality analysis, and it can also provide some real-time data services, such as trend display and summary analysis.

2. Components

ES-Hadoop is an integrated component that encapsulates the APIs each part of the Hadoop ecosystem uses to interact with ES. If you only need part of the functionality, you can use the individual sub-components instead:

  • elasticsearch-hadoop-mr
  • elasticsearch-hadoop-hive
  • elasticsearch-hadoop-pig
  • elasticsearch-spark-20_2.10
  • elasticsearch-hadoop-cascading
  • elasticsearch-storm

III. elasticsearch-spark

1. Configuration

At its core, ES-Hadoop interacts with ES through the RESTful interface that ES provides. The following are some important configuration items; refer to the official documentation for the full list:

  • es.nodes: ES nodes to connect to (you do not need to list every node, because other available nodes are discovered automatically by default).
  • es.port: HTTP communication port of the node.
  • es.nodes.discovery: The default value is true, indicating that available nodes in the cluster are automatically discovered.
  • es.nodes.wan.only: Defaults to false. When set to true, automatic node discovery is turned off and only the nodes declared in es.nodes are used for data reads and writes. If you need to access ES through a domain name, set this option to true; otherwise, be sure to set it to false.
  • es.index.auto.create: Whether to automatically create indexes that do not exist. The default value is true.
  • es.net.http.auth.user: User name for Basic authentication.
  • es.net.http.auth.pass: Password for Basic authentication.
val conf = new SparkConf().setIfMissing("spark.app.name", "rt-data-loader").setIfMissing("spark.master", "local[5]")
conf.set(ConfigurationOptions.ES_NODES, esNodes)
conf.set(ConfigurationOptions.ES_PORT, esPort)
conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")
conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true")
conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, "false")
conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, esUser)
conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, esPwd)
conf.set("es.write.rest.error.handlers", "ignoreConflict")
conf.set("es.write.rest.error.handler.ignoreConflict", "com.jointsky.bigdata.handler.IgnoreConflictsHandler")

The configuration item that needs special attention is es.nodes.wan.only. In a cloud-server environment, the configuration file usually lists the internal (intranet) addresses of the ES nodes, while local debugging connects over external addresses. If the option is left at false, the nodes cannot be found, because the client tries to connect using the internal addresses reported by the nodes:

org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: No data nodes with HTTP-enabled available; 
node discovery is disabled and none of nodes specified fit the criterion [xxx.xx.x.xx:9200]

In that case, set es.nodes.wan.only to true. During test and development it is recommended to access ES through a domain name with this option set to true; in a cluster deployment, set it back to false.
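
For example, a minimal sketch that toggles the option depending on where the code runs (the isLocalDebug flag below is a hypothetical variable, not an ES-Hadoop setting):

// Hypothetical flag: true when debugging locally over external addresses / a domain name.
val isLocalDebug = sys.env.get("LOCAL_DEBUG").contains("true")
conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, if (isLocalDebug) "true" else "false")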

2. Masking write conflicts

When duplicate data is written to ES, write conflicts often occur. There are two ways to handle this.

Method 1: Set es.write.operation to upsert: existing documents are updated, and missing ones are inserted. The default value of this option is index.
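
A minimal sketch of method 1 (itemRdd and the myTmpId field reuse the names from the example in section 3 below; upsert matches existing documents by es.mapping.id):

// Either set it globally on the SparkConf ...
conf.set("es.write.operation", "upsert")
// ... or pass it per write together with the id mapping.
EsSpark.saveToEs(itemRdd, Map("es.write.operation" -> "upsert", "es.mapping.id" -> "myTmpId"))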

Method 2: Register a custom error-handler class that handles write errors, for example by ignoring conflicts:

public class IgnoreConflictsHandler extends BulkWriteErrorHandler {
    public HandlerResult onError(BulkWriteFailure entry, DelayableErrorCollector<byte[]> collector) throws Exception {
        // 409 means a version conflict: treat it as handled and skip the stale data.
        if (entry.getResponseCode() == 409) {
            StaticLog.warn("Encountered conflict response. Ignoring old data.");
            return HandlerResult.HANDLED;
        }
        // Anything else is passed on to the next handler in the chain.
        return collector.pass("Not a conflict response code.");
    }
}

Method 2 can mask version conflicts, for example when the version being written is lower than the one already stored. Note that the handler class must be registered through es.write.rest.error.handlers and es.write.rest.error.handler.ignoreConflict, as shown in the configuration example above.

3. Writing an RDD to ES

EsSpark provides two main methods for data writing:

  • saveToEs: the RDD content is Seq[Map], a collection of Map objects, where each Map corresponds to one document;
  • saveJsonToEs: the RDD content is Seq[String], a collection of strings, where each String is a JSON string representing one record (the _source of the ES document); a minimal sketch follows after this list.
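
A minimal saveJsonToEs sketch (the index name, document content, and the sc SparkContext are placeholders for illustration):

import org.elasticsearch.spark.rdd.EsSpark

// Each string becomes the _source of one document; es.mapping.id points at a field inside the JSON.
val jsonRdd = sc.makeRDD(Seq(
  """{"id": "1", "name": "foo"}""",
  """{"id": "2", "name": "bar"}"""
))
EsSpark.saveJsonToEs(jsonRdd, "my_index/doc", Map("es.mapping.id" -> "id"))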

Many configuration options can be specified when writing data, for example:

  • es.resource: Sets the index and type to write to; both the index and type names support dynamic variables;
  • es.mapping.id: Sets the field name corresponding to the document _id.
  • es.mapping.exclude: Sets the fields to be ignored when writing. Wildcard characters are supported.
val itemRdd = rdd.flatMap(line => {
    val topic = line.topic()
    println("In process:" + topic + "-" + line.partition() + ":" + line.offset())
    val jsonArray = JSON.parseArray(line.value()).toJavaList(classOf[JSONObject]).asScala
    val resultMap = jsonArray.map(jsonObj =>{
      var tmpId = "xxx"
      var tmpIndex = "xxxxxx"
      jsonObj.put("myTmpId", tmpId)
      jsonObj.put("myTmpIndex", tmpIndex)
      jsonObj.getInnerMap
    })
    resultMap
})
val mapConf = Map(
    ("es.resource", "{myTmpIndex}/doc"),
    ("es.write.operation", "upsert"),
    ("es.mapping.id", "myTmpId"),
    ("es.mapping.exclude", "myTmp*")
)
EsSpark.saveToEs(itemRdd, mapConf)

Note that es.mapping.exclude is only supported by saveToEs; if the RDD is a collection of JSON strings (saveJsonToEs), an unsupported-operation error is reported. This option is very useful: for example, myTmpId serves as the document _id, so it does not need to be stored again in _source, and this setting excludes it from _source.

