This article will detail the use of ES and Hive direct data interaction; Using Hive external tables, you can quickly map ES index data to Hive and use Hive SQL to further process data.

I. Development environment

1. Component versions

  • CDH cluster version: 6.0.1
  • ES version: 6.5.1
  • Hive version: 2.1.1
  • Es-hadoop version: 6.5.1

2. Hive Overview

Hive acts as a data warehouse in the Hadoop ecosystem, facilitating data aggregation, AD hoc queries, and analysis of large data sets stored in Hadoop file systems.

Hive abstracts data on Hadoop using SQL like language (HSQL), so that users can define, organize, manipulate, and analyze data using SQL statements. In Hive, data sets are defined by tables that define data types. Users can load, query, and convert data using built-in operators or user-defined functions (UDFs).

3. Hive install ES-Hadoop

Official recommended installation methods:

useadd jar

add jar /path/elasticsearch-hadoop.jar
Copy the code

usehive.aux.jars.path

$ bin/hive --auxpath=/path/elasticsearch-hadoop.jar
Copy the code

Modifying the configuration (hive-site.xml)

<property>
  <name>hive.aux.jars.path</name>
  <value>/path/elasticsearch-hadoop.jar</value>
  <description>A comma separated list (with no spaces) of the jar files</description>
</property>
Copy the code

Cdh6.x Recommended installation method

Copy elasticSearch-hadoop. jar to Hive’s auxlib directory and restart Hive.

cp elasticsearch-hadoop.jar /opt/cloudera/parcels/CDH/lib/hive/auxlib/
Copy the code

Data interaction between Hive and ElasticSearch

1. Comparison table of data types

Note that the type in ES is the data type in index/_mapping, not the data type in _source.

Hive type Elasticsearch type
void null
boolean boolean
tinyint byte
smallint short
int int
bigint long
double double
float float
string string
binary binary
timestamp date
struct map
map map
array array
union not supported (yet)
decimal string
date date
varchar string
char string

2. Create Hive external tables

CREATE EXTERNAL TABLE default.surface(
    water_type STRING,
    water_level STRING,
    monitor_time TIMESTAMP,
    sitecode STRING,
    p492 DOUBLE,
    p311 DOUBLE,
    status STRING
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
    'es.resource'='ods_data_day_surface*/doc'.'es.query'='? q=status:001'
    'es.nodes'='sky-01'.'es.port'='9200'.'es.net.http.auth.user'='sky'.'es.net.http.auth.pass'='jointsky'.'es.date.format'='yyyy-MM-dd HH:mm:ss'.'es.ser.reader.value.class'='com.jointsky.bigdata.hive.EsValueReader'
    'es.mapping.names'='waterType:water_type,monitortime:monitor_time'
);
Copy the code

3. Description of configuration items

es.resource

Es. Resource Is used to set the location of es resources. By default, this configuration item sets both read and write indexes.

  • es.resource.read: Sets the read position;
  • es.resource.write: Sets the write location.

es.query

Es. Query Sets query filtering conditions. Currently, uri Query, Query DSL, and External Resource are supported.

# uri (or parameter) queryes.query = ? q=costinl# query dsl
es.query = { "query" : { "term" : { "user" : "costinl"}}}# external resource
es.query = org/mypackage/myquery.json
Copy the code

es.mapping.names

Es. Mapping. names Is used to set the field mapping relationship between Hive and ES. If this parameter is not set, the default field names (the field names defined in the Data type area) remain unchanged. In addition, this part is used to define data mapping types from Hive to ES.

'es.mapping.names' = 'date:@timestamp , url:url_123 ')
Copy the code

For descriptions of other common fields, see article: Write Spark Streaming Data to ES using ES-Hadoop

4. Custom date type resolution

Currently, when mapping the date type of ES to the TIMESTAMP type of Hive, the ES-Hadoop component can only recognize the date string in the TIMESTAMP format or the standard XSD format:

@Override
protected Object parseDate(Long value, boolean richDate) {
    return (richDate ? new TimestampWritable(new Timestamp(value)) : processLong(value));
}

@Override
protected Object parseDate(String value, boolean richDate) {
    return (richDate ? new TimestampWritable(new Timestamp(DatatypeConverter.parseDateTime(value).getTimeInMillis())) : parseString(value));
}
Copy the code

About the XSD (XML Schema Date/Time Datatypes) available reference articles: www.w3schools.com/xml/schema_…

To be compatible with custom date formats, we need to write a custom date-reading class:


import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.hive.HiveValueReader;

import java.sql.Timestamp;
import java.text.ParseException;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.Date;

public class EsValueReader extends HiveValueReader {
    private String dateFormat;
    private static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
    private static final String DEFAULT_DATE_FORMAT_MIN = "yyyy-MM-dd HH:mm";
    private static final String DEFAULT_DATE_FORMAT_HOUR = "yyyy-MM-dd HH";
    private static final String DEFAULT_DATE_FORMAT_DAY = "yyyy-MM-dd";

    @Override
    public void setSettings(Settings settings) {
        super.setSettings(settings);
        dateFormat = settings.getProperty("es.date.format");
    }

    @Override
    protected Object parseDate(String value, boolean richDate) {
        if(value ! = null && value.trim().length() > 0 && DEFAULT_DATE_FORMAT.equalsIgnoreCase(dateFormat)) {if (richDate){
                if (value.length() == 16){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_MIN).getTime()));
                }
                if (value.length() == 13){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_HOUR).getTime()));
                }
                if (value.length() == 10){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_DAY).getTime()));
                }
                return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT).getTime()));
            }
            return parseString(value);
        }
        returnsuper.parseDate(value, richDate); } /** ** Parses the date according to the specified format.<br> * Returns null * @param stringDate date string * @param format Date format * @returnPrivate static Date parseDate(String stringDate, String format) {private static Date parseDate(String stringDate, String format) {if (stringDate == null) {
            return null;
        }
        try {
            return parseDate(stringDate, new String[] { format });
        } catch (ParseException e) {
            return null;
        }
    }

    public static Date parseDate(String str, String... parsePatterns) throws ParseException {
        return parseDateWithLeniency(str, parsePatterns, true);
    }

    private static Date parseDateWithLeniency(
            String str, String[] parsePatterns, boolean lenient) throws ParseException {
        if (str == null || parsePatterns == null) {
            throw new IllegalArgumentException("Date and Patterns must not be null");
        }

        SimpleDateFormat parser = new SimpleDateFormat();
        parser.setLenient(lenient);
        ParsePosition pos = new ParsePosition(0);
        for (String parsePattern : parsePatterns) {
            String pattern = parsePattern;
            if (parsePattern.endsWith("ZZ")) {
                pattern = pattern.substring(0, pattern.length() - 1);
            }
            parser.applyPattern(pattern);
            pos.setIndex(0);
            String str2 = str;
            if (parsePattern.endsWith("ZZ")) {
                str2 = str.replaceAll("(+ [-] [0-9] [0-9]) : ([0-9] [0-9]) $"."The $1$2");
            }
            Date date = parser.parse(str2, pos);
            if(date ! = null && pos.getIndex() == str2.length()) {return date;
            }
        }
        throw new ParseException("Unable to parse the date: "+ str, -1); }}Copy the code

Maven dependencies for the above code

<dependencies> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> </version> <scope>provided</scope> </dependency> <dependency> <groupId> org.elasticSearch </groupId> < artifactId > elasticsearch - hadoop < / artifactId > < version > 6.5.4 < / version > < scope > provided < / scope > < / dependency > </dependencies>Copy the code

Deployment of custom date resolution packages

After the code is written, package the code, place the jar package in Hive’s Auxlib directory, and restart Hive. This step is the same as the installation step of ES-Hadoop.

When you write Spark to read data from Hive, you need to add the dependency on this package as well as the dependency on ES-Hadoop.

Third, summary

HSQL insert into table XXX select * from XXXXX; HSQL insert into table XXX select * from XXXXX; Read data from ES and write it to HDFS; Of course, data can be processed with more complex HSQL and written back to ES or stored in HDFS.

By making full use of the query, filtering and aggregation of ES, ETL processes such as data standardization, data cleaning and data distribution can be well served.


Any Code, Code Any!

Scan code to pay attention to “AnyCode”, programming road, together forward.