Business

Requirements: count PV per hour

Data collection

  • HDFS
  • Hive

Data Cleaning (ETL)

Used to describe the process of extracting, transforming, and loading data from the source to the destination

  • Field filtering

    • “31/Aug/2015:00:04:37 +0800”
    • “GET /course/view.php?id=27 HTTP/1.1”
  • Field completion

    • User information, commodity information: pulled in from the RDBMS
  • Field formatting

    • 20150831000437 → 2015-08-31 00:04:37 (see the sketch after this list)
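As a hedged illustration of the field-formatting step, the same conversion can be done with Hive's built-in unix_timestamp/from_unixtime functions (the table and column names tracklog_raw and track_time are made up for the example):

-- sketch: reformat 20150831000437 into 2015-08-31 00:04:37
select from_unixtime(unix_timestamp(track_time, 'yyyyMMddHHmmss'),
                     'yyyy-MM-dd HH:mm:ss') as formatted_time
from tracklog_raw;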

Data analysis

  • MapReduce
  • Hive
  • Spark

Data export

Introduction to Hive

An open-source project from Facebook for running statistics over large volumes of structured log data

Essence: Convert HQL to a MapReduce program

Hive data is actually stored as directories and files on HDFS

Hive installation mode

  • The embedded mode

    The metadata information is stored in Hive's own embedded Derby database

    Only one connection is allowed to be created

    Mostly used for demos

  • Local mode

    The metadata information is stored in the MySQL database

    The MySQL database runs on the same physical machine as Hive

    Mostly used for development and testing

  • Remote mode

    The metadata information is stored in the MySQL database

    The MySQL database runs on a different physical machine than Hive

    Used for real production environments

Linux MySQL installation

1) Uninstall the preinstalled MySQL

$ rpm -qa | grep mysql

$ sudo rpm -e mysql-libs-5.1.71-1.el6.x86_64 --nodeps

2) installation

$ sudo cp -r /opt/software/x86_64/ /var/cache/yum/

$ sudo yum install -y mysql-server mysql mysql-devel

3) Start the MySQL service

$ sudo service mysqld start

4) Set a password

$ /usr/bin/mysqladmin -u root password '<new password>'

5) Enable starting on boot

$ sudo chkconfig mysqld on

6) Authorize root privileges and set remote login

Log in

$ mysql -u root -p

Grant privileges

grant all privileges on *.* to 'root'@'%' identified by 'root';
grant all privileges on *.* to 'root'@'linux01' identified by 'root';  -- the '%' entry is required; % covers all hosts

all privileges: every privilege

*.*: all tables in all databases

'root'@'%': log in as root from any host

'root'@'linux03.ibf.com': log in as root from the host linux03.ibf.com

identified by 'root': use 'root' as the password

7) Refresh the authorization

mysql> flush privileges;

8) Test whether you can log in from Windows

 mysql -h linux03.ibf.com -u root -p

Hive Environment Setup: Local mode

You must first install HDFS and YARN

1) installation:

$ tar -zxvf /opt/software/apache-hive-0.13.1-bin.tar.gz -C /opt/modules/

Rename the Hive folder

$ cd /opt/modules

$ mv apache-hive-0.13.1-bin/ hive-0.13.1/

2) Create the /tmp directory and the Hive warehouse directory on HDFS

$ bin/hdfs dfs -mkdir -p /user/hive/warehouse

$ bin/hdfs dfs -mkdir /tmp    # may already exist

$ bin/hdfs dfs -chmod g+w /user/hive/warehouse

$ bin/hdfs dfs -chmod g+w /tmp

3) Modify configuration

$ cd hive-0.13.1/

$ cp conf/hive-default.xml.template conf/hive-site.xml

$ cp conf/hive-log4j.properties.template conf/hive-log4j.properties

$ cp conf/hive-env.sh.template conf/hive-env.sh

3-1) Modify hive-env.sh

JAVA_HOME=/opt/modules/jdk1.7.0_67
HADOOP_HOME=/opt/modules/hadoop-2.5.0
export HIVE_CONF_DIR=/opt/modules/hive-0.13.1/conf

3-2) Modify hive-site.xml

<property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://linux01:3306/metastore?createDatabaseIfNotExist=true</value>
</property>
<property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
</property>
<property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>root</value>
</property>
<property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>123456</value>
</property>

3-3) Modify the log configuration hive-log4j.properties

hive.log.dir=/opt/modules/hive-0.13.1/logs

3-4) Copy the JDBC driver to the Hive lib directory

$ cp /opt/software/mysql-connector-java-5.1.34-bin.jar /opt/modules/hive-0.13.1/lib/

4) Make sure YARN and HDFS are started

$ jps

6468 ResourceManager

6911 Jps

6300 RunJar

6757 NodeManager

2029 NameNode

2153 DataNode

Use bin/hive to get into hive

Hive startup and basic use

Enter the hive directory

$ cd /opt/modules/hive-0.13.1/

Enter Hive

bin/hive

Basic commands

show databases;

create database mydb;

use mydb;

show tables;

Create tables and load data

create table student (
id int comment 'id of student',
name string comment 'name of student',
age int comment 'age of student',
gender string comment 'sex of student',
addr string
)    
comment 'this is a demo'
row format delimited fields terminated by '\t';

By default, tables are created in /user/hive/warehouse

This location is configured via hive.metastore.warehouse.dir

  • See the table

    desc student; -- view the table's fields

    or

    desc formatted student; -- also shows the table's metadata

  • At this point the table shows up in MySQL's metastore database

    mysql> select * from TBLS;

    +--------+-------------+-------+------------------+-------+-----------+-------+----------+---------------+--------------------+--------------------+
    | TBL_ID | CREATE_TIME | DB_ID | LAST_ACCESS_TIME | OWNER | RETENTION | SD_ID | TBL_NAME | TBL_TYPE      | VIEW_EXPANDED_TEXT | VIEW_ORIGINAL_TEXT |
    +--------+-------------+-------+------------------+-------+-----------+-------+----------+---------------+--------------------+--------------------+
    |      1 |  1556132119 |     6 |                0 | chen  |         0 |     1 | student  | MANAGED_TABLE | NULL               | NULL               |
    +--------+-------------+-------+------------------+-------+-----------+-------+----------+---------------+--------------------+--------------------+
    1 row in set (0.00 sec)

    mysql> select * from COLUMNS_V2;

    +-------+-----------------+-------------+-----------+-------------+
    | CD_ID | COMMENT         | COLUMN_NAME | TYPE_NAME | INTEGER_IDX |
    +-------+-----------------+-------------+-----------+-------------+
    |     1 | NULL            | addr        | string    |           4 |
    |     1 | age of student  | age         | int       |           2 |
    |     1 | sex of student  | gender      | string    |           3 |
    |     1 | id of student   | id          | int       |           0 |
    |     1 | name of student | name        | string    |           1 |
    +-------+-----------------+-------------+-----------+-------------+
    5 rows in set (0.00 sec)
  • Load data (create student.log in your home directory)
load data local inpath '/home/hadoop/student.log' into table student;
  • Load from HDFS (the file on HDFS is moved into the table's directory)

    load data inpath '/input/student.data' into table student;

Setting configuration from the command line

These settings do not survive a restart

set hive.cli.print.header=true; # print column names in query results

set hive.cli.print.current.db=true; # show the current database name in the prompt

reset; # restore the default settings

Settings in hive-site.xml survive a restart

<property>
    <name>hive.cli.print.header</name>
    <value>true</value>
</property>

<property>
    <name>hive.cli.print.current.db</name>
    <value>true</value>
</property>

Interacting with Linux (shell commands from the Hive CLI)

! ls

! pwd

Interact with hadoop

dfs -ls /

dfs -mkdir /hive

Hive command-line options

-e  execute a SQL string

-f  execute a SQL file

-S  silent mode

hive -e

$ bin/hive -e "select * from test_db.emp_p"

hive -f

$ bin/hive -S -f /home/hadoop/emp.sql > ~/result.txt

Delete table

drop table user;

Clear the table

truncate table user;

Table type

Internal table (managed table, MANAGED_TABLE)

create table emp(
empId int,
empString string,
job string,
salary float,
deptId int
)
row format delimited fields terminated by '\t';
load data inpath '/input/dept.txt' into table dept;
# load data local inpath '/home/hadoop/dept.txt' into table dept;

External table (EXTERNAL_TABLE)

create external table emp_ex (
empId int,
empName string,
job string,
salary float,
deptId int
)
row format delimited fields terminated by '\t'
location '/hive/table/emp';

Moves the data to the table location

hive (mydb)> dfs -mv /input/emp.txt /hive/table/emp/emp.txt 

Server load

hive (mydb)> load data local inpath '/home/hadoop/emp.data' into table emp;

Or use the DFS command directly to move the data into the Hive table directory

hive (mydb)> dfs -put /home/hadoop/emp.data  /hive/table/emp;

Differences between internal and external tables

  1. Create a table

    When creating an external table, the EXTERNAL keyword is required

  2. Delete table

    When an external table is deleted, it will delete only the table’s metadata information, not the table’s data.

    When an internal table is deleted, both the metadata information and the table data are deleted

  3. Internal table data is managed by Hive itself and external table data is managed by HDFS
  4. Internal table data is stored under hive.metastore.warehouse.dir (default: /user/hive/warehouse); an external table's storage location is specified by the user (see the sketch below).
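A minimal sketch that makes the difference visible from the Hive CLI, using the emp and emp_ex tables above (database and paths are illustrative):

-- dropping the internal (managed) table removes both the metadata and the data
drop table emp;
dfs -ls /user/hive/warehouse/mydb.db/;   -- the emp directory is gone

-- dropping the external table removes only the metadata
drop table emp_ex;
dfs -ls /hive/table/emp;                 -- the data files are still there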

The partition table

Create partition table

create table emp_part(
empno int,
empname string,
empjob string,
mgrno int,
birthday string,
salary float,
bonus float,
deptno  int
)
partitioned by (province string)
row format delimited fields terminated by '\t';

Load data to the partitioned table

Specify the partition value explicitly

load data local inpath '/home/user01/emp.txt' into table emp_part partition (province='CHICAGO');

Managing partitions

Check the partition

show partitions emp_part;

Add the partition

alter table emp_part add partition (province='shanghai');

Deleted partitions

alter table emp_part drop partition (province='shanghai');

Add data to the partition

load data local inpath '/home/user01/emp.txt' into table emp_part partition (province='shanghai');

Query partition data

select * from emp_part where province='henan';

Secondary partition

Create a secondary partition

create table emp_second(
id int ,
name string,
job string,
salary float,
dept int
)
partitioned by (day string,hour string)
row format delimited fields terminated by '\t';

Add the partition

alter table emp_second add partition (day='20180125',hour='16');

Deleted partitions

alter table emp_second drop partition (day='20180125');

Specify the partition when adding data (no partition will be created)

load data local inpath '/home/hadoop/emp.log' into table emp_second partition (day='20180125',hour='17');

Bucketed tables

Joining two tables that are bucketed on the same column can use a map-side join

Make sampling more efficient

set hive.enforce.bucketing=true;

create table bucketed_users(id int, name string)
clustered by (id) into 4 buckets;

The bucket a row is stored in is determined by taking the hash value of the specified column mod the number of buckets
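A hedged sketch of filling and sampling the table above; the source table users_src is assumed to already exist with matching columns:

set hive.enforce.bucketing=true;

-- the data must go in through insert ... select so that hash(id) % 4 bucketing is applied
insert overwrite table bucketed_users
select id, name from users_src;

-- sampling then only reads one of the four buckets
select * from bucketed_users tablesample(bucket 1 out of 4 on id);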

Summary of import mode

Local import

load data local inpath 'local path' into table tablename;

or upload the file directly into the table's directory with bin/hdfs dfs -put

HDFS import

load data inpath 'HDFS path' into table tablename;

Overwrite on load

load data inpath 'HDFS path' overwrite into table tablename;

load data local inpath 'local path' overwrite into table tablename;

Insert the results of a SELECT into a table with an INSERT statement

insert into table test_tb select * from emp_p;

Load the data when the table is created

create external table test_tb (

    id int,

    name string

)

row format delimited fields terminated by '\t'

location "/hive/test_tb";

Sqoop way

Export way

Hive script

hive -e

bin/hive -e "use test_db; select * from emp_p" > /home/hadoop/result.txt

hive -f executes a SQL file

bin/hive -f /home/hadoop/emp.sql >> /home/hadoop/result.txt

hive>

  1. Export to the local filesystem (the default delimiter is ASCII 001, i.e. ^A)

    insert overwrite local directory '/home/hadoop/data' select * from emp_p;

    insert overwrite local directory '/home/hadoop/data' row format delimited fields terminated by '^' select * from emp_p;

  2. Export to HDFS

    hive > insert overwrite directory '/data' select * from emp_p;

  3. Export and import (HDFS)

    hive > export table emp_p to '/input/export';

    hive > import table emp_imp from 'hdfs_path';

HQL

http://hive.apache.org/

Common grammar

Use the * wildcard or specify the fields

select id,name from emp;

where

Conditions of the query

select * from emp_p where salary > 10000;

between and

select * from emp_p where sal between 10000 and 15000;

is null| is not null

select * from user where email is not null;

in () | not in ()

select * from emp_p where did in (1,2,3);

Aggregation function

count max min sum avg

select count(1) personOfDept from emp_p group by job;

select sum(sal) from emp_p;

distinct

select distinct id from emp_part;

select distinct name, province from emp_part;

The subquery

select eid, ename, salary, did from emp where emp.did in (select did from dept where dname='Personnel Department');

Table joins

emp.eid  emp.ename  emp.salary  emp.did
1001     Jack       10000.0     1
1002     Tom        2000.0      2
1003     Lily       20000.0     3
1004     aObama     10000.0     5
1005     Yang       10000.0     6

dept.did  dept.dname             dept.dtel
1         Personnel Department   021-456
2         Finance Department     021-234
3         Technology Department  021-345
4         BI Department          021-31
5         Product Department     021-232

Cartesian product

select * from dept, emp;

select * from emp, dept where emp.did=dept.did;

join

select t1.eid, t1.ename, t1.salary,t2.did ,t2.dname from emp t1 join dept t2 on t1.did=t2.did;

Outer joins

left join

select eid,ename, salary,t2.did, t2.dname from emp t1 left join dept t2 on t1.did = t2.did;

right join

select eid,ename, salary,t2.did, t2.dname from emp t1 right join dept t2 on t1.did = t2.did;

Full join

select eid,ename, salary,t2.did, t2.dname from emp t1 full join dept t2 on t1.did = t2.did;

The four sort clauses in HQL

order by: global sort over all the data

select * from emp_part order by salary;

Even if the number of reducers is set to 3, order by still produces a single output file

set mapreduce.job.reduces=3;

sort by: sorts within each reducer

Under the hood the sorting is completed before the reduce function runs

Set the number of reducers

set mapreduce.job.reduces=2;

insert overwrite local directory '/home/hadoop/result' select * from emp_part sort by salary; # unlike order by, each reducer writes its own sorted output file

Partition sort (distribute by sets partitions and sort by sets intra-partition sort)

set mapreduce.job.reduces=3;

Here the data is distributed by deptno and sorted by salary within each partition

insert overwrite local directory '/home/hadoop/result' select * from emp_part distribute by deptno sort by salary;

cluster by (equivalent to distribute by + sort by when both use the same column)
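For example, as a sketch against the emp_part table used above, these two queries distribute and sort identically:

select * from emp_part distribute by deptno sort by deptno;

-- equivalent when the distribute by and sort by columns are the same
select * from emp_part cluster by deptno;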

Connecting with HiveServer2 and Beeline

configuration

Modify hive-site.xml

<property>
<name>hive.server2.long.polling.timeout</name>
<value>5000</value>
</property>

<property>
<name>hive.server2.thrift.port</name>
<value>10000</value>
</property>

<property>
<name>hive.server2.thrift.bind.host</name>
<value>bigdata.ibf.com</value>
</property>

Create a user in MySQL and grant it privileges on the metastore database

CREATE USER 'hadoop'@'centos01.bigdata.com' IDENTIFIED BY '123456';
GRANT ALL ON metastore.* TO 'hadoop'@'centos01.bigdata.com' IDENTIFIED BY '123456';
GRANT ALL ON metastore.* TO 'hadoop'@'%' IDENTIFIED BY '123456';
flush privileges;

beeline

Start the service

$ bin/hiveserver2 &
# or
$ bin/hive --service hiveserver2 &

The connection

$ bin/beeline
beeline> !connect jdbc:hive2://bigdata.ibf.com:10000
# then enter the user name and the password

Introduction to SQOOP, installation and deployment

Function: for data import and export between HDFS and RDBMS

All imports and exports are based on HDFS

Data analysis process

Data acquisition: logs, RDBMS; use Sqoop to collect the data that needs to be analyzed into HDFS
Data cleaning: field filtering, field completion, field formatting; import the fields that need to be analyzed into HDFS
Data analysis: store the analysis results on HDFS
Export: move the result data from HDFS to MySQL
Data display: read the data from the RDBMS

SQOOP supports: HDFS, Hive, HBase

How Sqoop works underneath

The sqoop command is used with different parameters for different requirements -> Sqoop parses the parameters and fills in an underlying MapReduce template -> the generated MapReduce job is packaged into a JAR and submitted to YARN for execution -> this MapReduce job has only map tasks, no reduce tasks

version

Sqoop1 and Sqoop2; Sqoop2 adds a security mechanism on top of Sqoop1

Install the deployment

Download and unpack

$ tar -zxvf /opt/software/sqoop-1.4.5-cdh5.3.6.tar.gz -C /opt/cdh-5.3.6/

Modifying configuration files

$ pwd
/opt/cdh-5.3.6/sqoop-1.4.5-cdh5.3.6
$ cp conf/sqoop-env-template.sh conf/sqoop-env.sh

Modify sqoop-env.sh

export HADOOP_COMMON_HOME=/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6

#Set path to where hadoop-*-core.jar is available
export HADOOP_MAPRED_HOME=/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6

#Set the path to where bin/hive is available
export HIVE_HOME=/opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6

Place the MySQL connection driver in the SQOOP lib directory

$ cp /opt/software/mysql-connector-java-5.1.34-bin.jar /opt/cdh-5.3.6/sqoop-1.4.5-cdh5.3.6/lib/

Use the test

View command information

[hadoop@linux03 sqoop-1.4.5-cdh5.3.6]$ bin/sqoop help

View the database

[hadoop@linux03 sqoop-1.4.5-cdh5.3.6]$ bin/sqoop list-databases \
--connect jdbc:mysql://linux03.ibf.com:3306/ \
--username root \
--password 123456

Fixing an error with sqoop-1.4.6-cdh5.14.2

In Sqoop 1.4.6 you need to add the java-json package

$ cp /opt/software/java-json.jar /opt/cdh5.14.2/sqoop-1.4.6-cdh5.14.2/lib/

Solved an issue where the Hive warehouse could not be found

$ cp ${HIVE_HOME}/conf/hive-site.xml ${SQOOP_HOME}/conf/

Append Hive dependencies to HADOOP_CLASSPATH

$ sudo vi /etc/profile
#HADOOP_CLASSPATH
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/cdh5.14.2/hive-1.1.0-cdh5.14.2/lib/*
$ source /etc/profile

Importing with Sqoop

bin/sqoop import --help    # view the command help

Imported into HDFS

Source: a table in MySQL

Target: a path on HDFS

Create a test table in MySQL

Add data to MySQL

use test_db;
create table user(
  id int primary key,
  name varchar(20) not null,
  salary float
) charset=utf8;
insert into user values (1, 'user1', 9500);
insert into user values (2, 'user2', 10000);
insert into user values (3, 'user3', 6000);

Example 1: mysql-> HDFS (import to default path)

Import the user table of test_db in MySQL into HDFS; by default the output goes to hdfs://linux01:8020/user/hadoop/

[hadoop@linux03 sqoop-1.4.5-cdh5.3.6]$ bin/sqoop import \
> --connect jdbc:mysql://linux03.ibf.com:3306/mydb \
> --username root \
> --password 123456 \
> --table user

With no reduce tasks, there are as many output files as map tasks

Example 2: mysql-> HDFS to set path and map number

--target-dir specifies the HDFS output directory; -m specifies the number of map tasks

[hadoop@linux03 sqoop-1.4.5-cdh5.3.6]$ bin/sqoop import \
> --connect jdbc:mysql://linux03.ibf.com:3306/test_db \
> --username root \
> --password root \
> --table user \
> --target-dir /toHdfs \
> -m 1

Case 3:

--fields-terminated-by changes the output field delimiter; --direct makes the import faster; --delete-target-dir removes the output directory ahead of time

[hadoop@linux03 sqoop-1.4.5-cdh5.3.6]$ bin/sqoop import \
> --connect jdbc:mysql://linux03.ibf.com:3306/test_db \
> --username root \
> --password root \
> --table toHdfs \
> --target-dir /toHdfs \
> --direct \
> --delete-target-dir \
> --fields-terminated-by '\t' \
> -m 1

Case 4: import only the specified columns: --columns

[hadoop@linux03 sqoop-1.4.5-cdh5.3.6]$ bin/sqoop import \
> --connect jdbc:mysql://linux03.ibf.com:3306/mydb \
> --username root \
> --password 123456 \
> --table user \
> --columns name,salary \
> --fields-terminated-by '-' \
> --target-dir /sqoop \
> --delete-target-dir \
> --direct \
> -m 1

Import the result of a SQL query with -e / --query

bin/sqoop import \
--connect jdbc:mysql://bigdata01.com:3306/test \
--username root \
--password 123456 \
-e 'select * from user where salary>9000 and $CONDITIONS' \
--target-dir /toHdfs \
--delete-target-dir \
-m 1

The -e query above must include $CONDITIONS in its WHERE clause, for example:

'select * from user where salary>9000 and $CONDITIONS'

You can use a password file instead (replace --password with --password-file)

Sqoop reads the entire password file, including spaces and newlines, so generate it with echo -n, e.g. echo -n "secret" > password-file

$ echo -n 'root' > /home/hadoop/mysqlpasswd && chmod 400 /home/hadoop/mysqlpasswd
bin/sqoop import \
--connect jdbc:mysql://bigdata01.com:3306/test \
--username root \
--password-file  file:///home/hadoop/mysqlpasswd \
-e 'select * from toHdfs where $CONDITIONS' \
--target-dir /sqoop \
--delete-target-dir \
-m 1

Importing into Hive

The Hive table is created if it does not already exist in the target database

bin/sqoop import \
--connect jdbc:mysql://linux03.ibf.com:3306/mydb \
--username root \
-P \
--table user \
--fields-terminated-by '\t' \
--delete-target-dir \
-m 1 \
--hive-import \
--hive-database test_db \
--hive-table user

Incremental import

Process: the MapReduce job first imports the data into the user's home directory on HDFS, then loads it from there into the Hive table.

Incremental import (append): appends new rows based on the last imported value of a chosen column:

--check-column <column>      Source column to check for incremental change
--incremental <import-type>  Define an incremental import of type 'append' or 'lastmodified'
--last-value <value>         Last imported value in the incremental check column

The target path is created on HDFS if it does not already exist

bin/sqoop import \
--connect jdbc:mysql://linux03.ibf.com:3306/mydb \
--username root \
--password 123456 \
--table user \
--fields-terminated-by '\t' \
--target-dir /sqoop/incremental \
-m 1 \
--direct \
--check-column id \
--incremental append \
--last-value 3

sqoop job

Create a Sqoop job so the incremental import runs automatically.

There are two SQOOP job-related commands:

bin/sqoop job

bin/sqoop-job

You can use either one

Create a job: --create

Delete a job: --delete

Execute a job: --exec

Show a job's definition: --show

List jobs: --list

Create a job

bin/sqoop-job \
--create your-sync-job \
-- import \
--connect jdbc:mysql://linux03.ibf.com:3306/mydb \
--username root \
-P \
--table user \
-m 1 \
--target-dir /hive/incremental \
--incremental append \
--check-column id \
--last-value 1 

Look at the job

bin/sqoop-job --show your-sync-job

bin/sqoop job --show your-sync-job

bin/sqoop job --exec your-sync-job

bin/sqoop job --list

bin/sqoop job --delete my-sync-job

The export of sqoop

Export data from Hive (i.e. files and directories on HDFS) or from plain HDFS paths to MySQL

use mydb;
create table user_export(
id int primary key,
name varchar(20) not null,
salary float
); 

The target table must already exist in the MySQL database

bin/sqoop export \
--connect jdbc:mysql://linux03.ibf.com:3306/mydb \
--username root \
-P \
--table user_export \
--export-dir /hive/incremental \
--input-fields-terminated-by ',' \
-m 1  

Using sqoop --options-file

  1. Create the file with vi sqoopScript (one option or value per line)

Edit the file sqoopScript

export
--connect 
jdbc:mysql://linux03.ibf.com:3306/test_db
--username
root
-P
--table
emp
-m
1
--export-dir
/input/export
--fields-terminated-by
"\t"
  2. Execute the export

bin/sqoop --options-file ~/sqoopScript

content

Hive simple case requirements analysis and results export

The introduction and use of dynamic partition

Load dynamically into Hive tables using scripts

Hive function

Simple log traffic example

1 Demand and analysis

demand

Analyze and count PV and UV counts per hour per day

Analysis of the

Create data source table

Create partition table (day, hour)/ load data

Data cleaning

Create a hive table

Field filtering

Fields kept: id, url, guid; field completion: none; field formatting: none

Data analysis

pv: count(url)    uv: count(distinct guid)

Save the result

date (day), hour, pv, uv

Export the results

Export to MySQL

2. Specific Implementation

The original data table

1) create table

create database if not exists hive_db;

use hive_db;

create table tracklogs(

id string,

url string,

referer string,

keyword string,

type string,

guid string,

pageId string,

moduleId string,

linkId string,

attachedInfo string,

sessionId string,

trackerU string,

trackerType string,

ip string,

trackerSrc string,

cookie string,

orderCode string,

trackTime string,

endUserId string,

firstLink string,

sessionViewNo string,

productId string,

curMerchantId string,

provinceId string,

cityId string,

fee string,

edmActivity string,

edmEmail string,

edmJobId string,

ieVersion string,

platform string,

internalKeyword string,

resultSum string,

currentPage string,

linkPosition string,

buttonPosition string

)

partitioned by (date string,hour string)

row format delimited fields terminated by '\t';

2) Load data

load data local inpath '/opt/datas/2015082818' into table tracklogs partition(date='20150828',hour='18');

load data local inpath '/opt/datas/2015082819' into table tracklogs partition(date='20150828',hour='19');

Analysis of the

1) Establish data analysis tables

create table clear (

id string,

url string,

guid string

)

partitioned by (date string, hour string)

row format delimited fields terminated by '\t';

2) Filter the data

insert into table clear partition(date='20150828',hour='18') select id,url,guid from tracklogs where date='20150828' and hour='18';

insert into table clear partition(date='20150828',hour='19') select id,url,guid from tracklogs where date='20150828' and hour='19';

3) Indicator analysis

pv : select date,hour,count(url) as pv from clear group by date,hour;

uv: select date,hour, count(distinct guid) as uv from clear group by date,hour;

Save the result to result

create table result as select date,hour, count(url) pv, count(distinct guid) as uv from clear group by date,hour;

When a table is created without specifying a delimiter, the default delimiter is \001 (^A). Export the result to MySQL:

# to create table

create table result(

day varchar(30),

hour varchar(30),

pv varchar(30) not null,

uv varchar(30) not null,

primary key(day,hour)

);

# export data

[hadoop@linux03 sqoop-1.4.5-cdh5.3.6]$ bin/sqoop export \

--connect jdbc:mysql://linux03.ibf.com:3306/mydb \

--username root \

--password root \

--table result \

--export-dir /user/hive/warehouse/hive_db.db/result \

--input-fields-terminated-by '\001' \

-m 1

Dynamic partition table

Enable dynamic partitions

set hive.exec.dynamic.partition=true;

set hive.exec.dynamic.partition.mode=nonstrict;

When dynamic partitioning is enabled, strict and nonstrict mode are optional. Strict requires at least one static partition column, but nonstrict does not.

Create a table

create table clear_dynamic (

id string,

url string,

guid string

)

partitioned by (date string, hour string)

row format delimited fields terminated by '\t';

Dynamically loaded data

Load 20180129 directly for all hour data

insert into table clear_dynamic partition(date='20180129',hour) select id,url,guid,hour from tracklogs where date='20180129';

Auto partition according to hour

Previously this had to be written one partition at a time:

insert into table clear partition(date='20150828',hour='18') select id,url,guid from tracklogs where date='20150828' and hour='18';

insert into table clear partition(date='20150828',hour='19') select id,url,guid from tracklogs where date='20150828' and hour='19';

Load dynamically into Hive tables using scripts

20180129/

    2018012900

    2018012901

    2018012902

    2018012903

    2018012904

    2018012905

1) Write a shell script around bin/hive -e ""; a sketch follows below
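A minimal sketch of such a script, assuming the hour files are laid out as shown above under /opt/datas/20180129 (the paths, database name, and day variable are assumptions to adapt):

#!/bin/bash
# load every hour file of one day into the matching partition of hive_db.tracklogs
HIVE_HOME=/opt/modules/hive-0.13.1
LOG_DIR=/opt/datas
day=20180129

for f in ${LOG_DIR}/${day}/*; do
    # file names look like 2018012900, 2018012901, ...; the last two characters are the hour
    hour=${f: -2}
    ${HIVE_HOME}/bin/hive -e "
        load data local inpath '${f}'
        into table hive_db.tracklogs
        partition (date='${day}', hour='${hour}');
    "
done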

2) Test scripts

show partitions tracklogs; # View partition

alter table tracklogs drop partition(date='20150828',hour='18'); # drop the partitions

alter table tracklogs drop partition(date='20150828',hour='19');

select count(1) from tracklogs; # Check the number of records

3) Use shell script (bin/hive -f)

4) test

show partitions tracklogs; # view the partitions

alter table tracklogs drop partition(date='20150828',hour='18'); # drop the partitions

alter table tracklogs drop partition(date='20150828',hour='19');

select count(1) from tracklogs; # Check the number of records

Hive function

User-defined functions implement business logic that Hive's built-in functions cannot handle

Type:

UDF: one row in, one row out

UDAF: many rows in, one row out (e.g. sum, count)

UDTF: one row in, many rows out (column-to-rows conversion)

Write the UDF:

A UDF class must extend Hive's UDF class

It must implement at least one evaluate method

The evaluate method must declare a return type (it may return NULL)

Hadoop writable types (e.g. Text) are recommended for arguments and return values

Requirement: date conversion

31/Aug/2015:00:04:37 +0800 --> 2015-08-31 00:04:37

Implementation steps

1) Write a custom class that extends the UDF class

2) Do not specify the main class for packaging

3) Add to Hive

Add the Hadoop and Hive dependencies in Maven

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>1.2.2</version>
</dependency>

Examples of concrete implementation

package com.myudf;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class DateFormate extends UDF {

    SimpleDateFormat inputDate = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
    SimpleDateFormat outDate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    // 31/Aug/2015:00:04:37 +0800 --> 2015-08-31 00:04:37
    public Text evaluate(Text str) {
        if (str == null) {
            return null;
        }
        if (StringUtils.isBlank(str.toString())) {
            return null;
        }
        Date date = null;
        String val = null;
        try {
            date = inputDate.parse(str.toString());
            val = outDate.format(date);
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return new Text(val);
    }

    public static void main(String[] args) {
        Text val = new DateFormate().evaluate(new Text("31/Aug/2015:00:04:37 +0800"));
        System.out.println(val);
    }
}

hive (test_db)>add jar /home/hadoop/DDD.jar;

hive (test_db)> CREATE TEMPORARY FUNCTION removequote as 'com.myudf.date.RemoveQuoteUDF';

hive (test_db)> show functions;

Hive Compression Format

Compressed format

Bzip2, gzip, lzo, snappy, etc

Compression ratio: bzip2 > gzip > lzo (bzip2 compresses the most)

Compression/decompression speed: lzo > gzip > bzip2 (lzo is the fastest)

The compressed format supported by Hadoop

bin/hadoop checknative  -a

http://google.github.io/snappy/

Configuration of compression

  • Compile Hadoop source code:
mvn package -Pdist,native,docs -DskipTests -Dtar  -Drequire.snappy
  • Replace the $HADOOP_HOME/lib/native

Shut down Hadoop related processes

Unzip cdh5.xx-snappy-lib-native.tar.gz to $HADOOP_HOME/lib

$ tar -zxvf native-hadoop-cdh5.14.2.tar.gz -C /opt/modules/hadoop-2.6.0-cdh5.14.2/lib
  • Check the support again after replacing the native libraries
Running $ bin/hadoop checknative -a should now show snappy as supported
  • Configure Hadoop (JobHistory can view all configuration information)

mapred-site.xml configuration

<property>
    <name>mapreduce.map.output.compress</name>
    <value>true</value>
</property>

<property>
    <name>mapreduce.map.output.compress.codec</name>
    <value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>    
  • test

Run the Pi example: $ bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.0-cdh5.3.6.jar pi 1 2

Check the compression settings in this job's Configuration page in the JobHistory UI (host:19888)

  • Configure the compression format through Hive

Enable compression for the shuffle phase (map output)

set hive.exec.compress.output=true;
set mapred.output.compress=true;
set mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;

Compress the job's final output files

set mapreduce.output.fileoutputformat.compress=true;
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;

File storage formats in Hive

format

create table (
...
)
row format delimited fields terminated by ''
STORED AS file_format

The file format is as follows

TEXTFILE

RCFILE

ORC

PARQUET

AVRO

INPUTFORMAT

Common file formats (the default is TEXTFILE)

ORC (note: available in Hive 0.11.0 and later)

PARQUET: based on the data model and algorithms of Dremel; very commonly used

Data storage layout

Row-oriented storage

Fast writes

Column-oriented storage

Fast reads

  1. You can skip data that doesn’t qualify and read only the data you need, reducing the amount of IO data.
  2. Compression coding reduces disk storage space. Since the data types of the same column are the same, you can use more efficient compression encoding to further save storage space.
  3. Read only the required columns, support vector operation, can obtain better scan performance.

Verify storage format and compression

Using the given log file (18.1MB)

Use different storage formats to store the same data and determine the file size

Enable compression during the shuffle phase of MapReduce (compressing the intermediate data reduces the amount of data transferred between map and reduce tasks; for IO-bound jobs this speeds things up)

set hive.exec.compress.intermediate=true;
set mapred.map.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec; 

Compress the output

set hive.exec.compress.output=true;
set mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;

Create table FILE_TEXT and load the data

create table if not exists file_text(
t_time string,
t_url string,
t_uuid string,
t_refered_url string,
t_ip string,
t_user string,
t_city string
)
row format delimited fields terminated by '\t'
stored  as  textfile;
load data local inpath '/home/hadoop/page_views.data' into table file_text;

Compare the default format with the file_orc_snappy data size comparison

create table if not exists file_orc_snappy(
t_time string,
t_url string,
t_uuid string,
t_refered_url string,
t_ip string,
t_user string,
t_city string
)
row format delimited fields terminated by '\t'
stored as orc tblproperties("orc.compression"="Snappy");

insert into table file_orc_snappy select * from file_text;

Load is just an HDFS put, so it cannot apply the compression; the data has to go through MapReduce (insert ... select) for the compression to take effect.

Compare the default format with the data size comparison in Parquet format

create table if not exists file_parquet(
t_time string,
t_url string,
t_uuid string,
t_refered_url string,
t_ip string,
t_user string,
t_city string
)
row format delimited fields terminated by '\t'
stored as parquet;

insert into table file_parquet select * from file_text;

Snappy compresses data size by comparing the default format with the Parquet format

create table if not exists file_parquet_snappy(
t_time string,
t_url string,
t_uuid string,
t_refered_url string,
t_ip string,
t_user string,
t_city string
)
row format delimited fields terminated by '\t'
stored as parquet
tblproperties("parquet.compression"="Snappy");

insert into table file_parquet_snappy select * from file_text;
hive (mydb)> dfs -du -s -h /user/hive/warehouse/mydb.db/file_parquet_snappy;
hive (mydb)> dfs -du -s -h /user/hive/warehouse/mydb.db/file_parquet;       

Regular loading data is used in Hive

Loading complex format log files by regular matching

1 regular

2 Load data from the log

The log

"27.38.5.159" "-" "31/Aug/2015:00:04:53 +0800" "GET /course/view.php?id=27 HTTP/1.1" "200" "7877" "http://www.ibf.com/user.php?act=mycourse&testsession=1637" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36" "-" ""

Create a table

CREATE TABLE apachelog (
  remote_addr string,
  remote_user string,
  time_local string,
  request string,
  status string,
  body_bytes_set string,
  request_body string,
  http_referer string,
  http_user_agent string,
  http_x_forwarded_for string,
  host string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
  "input.regex" = "(\"[^ ]*\") (\"-|[^ ]*\") (\"[^\]]*\") (\"[^\]]*\") (\"[0-9]*\") (\"[0-9]*\") (-|[^ ]*) (\"[^ ]*\") (\"[^\"]*\") (\"-|[^ ]*\") (\"[^ ]*\")"
)
STORED AS TEXTFILE;
load data local inpath '/home/hadoop/moodle.ibf.access.log' into table apachelog;

Hive optimization

Split large tables into small tables

External table + partitioned table

Storage Format & Data Compression

SQL optimization

Parallel execution

//Whether to execute jobs in parallel

set hive.exec.parallel=true;

//How many jobs at most can be executed in parallel

set hive.exec.parallel.thread.number=8; # can be increased to improve parallelism efficiency

mapreduce

  • The number of reducers

set mapreduce.job.reduces=1

  • JVM reuse

mapreduce.job.jvm.numtasks=1   # default is 1

  • Speculative execution

Hive setting, defaults to true

set hive.mapred.reduce.tasks.speculative.execution=true;

hadoop

mapreduce.map.speculative true

mapreduce.reduce.speculative true

Set the output file merge

Size of merged files at the end of the job

Merge small files to avoid reducing HDFS performance by storing large numbers of small files

set hive.merge.size.per.task=256000000;

Strict mode

set hive.mapred.mode=strict; # the default is nonstrict

In strict mode:

For a partitioned table, a filter on the partition column is required

For ORDER BY, you must use LIMIT

Cartesian-product queries (joins written with WHERE instead of ON) are restricted
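For instance, a strict-mode-compliant query on the partitioned emp_part table above looks something like this (the partition value is the one loaded earlier):

set hive.mapred.mode=strict;

-- both the partition filter and the LIMIT are required here
select * from emp_part
where province = 'CHICAGO'
order by salary
limit 10;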

hive join

map join

If the smaller of two joined tables fits, a map join loads the small table into memory

hive.mapjoin.smalltable.filesize=25000000   # default size threshold for the small table

hive.auto.convert.join=true   # enabled by default

If the automatic map join is not enabled, mark the small table with a MAPJOIN hint:

select /*+ MAPJOIN(time_dim) */ count(1) from

store_sales join time_dim on (ss_sold_time_sk = t_time_sk)

reduce join

Used to join two large tables

Rows are grouped by the join key and joined on the reduce side

smb join

Sort-Merge-Bucket join

Solve the problem of slow join between large tables

The bucket a row goes into is determined by taking the hash of the bucketing column modulo the number of buckets

set hive.enforce.bucketing=true;

create table table_name(

    ... fields ...

)

clustered by (id) into N buckets;

Such as

create table student(

id int,

age int,

name string

)

clustered by (id) into 4 buckets

row format delimited fields terminated by ',';
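A hedged sketch of how an SMB join would then be used, assuming both tables are created clustered by (id) sorted by (id) into the same number of buckets (the table student_score and its columns are made up for the example):

set hive.enforce.bucketing=true;
set hive.optimize.bucketmapjoin=true;
set hive.optimize.bucketmapjoin.sortedmerge=true;
set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;

-- both sides bucketed and sorted on the join key, with the same bucket count
select s.id, s.name, sc.score
from student s
join student_score sc on s.id = sc.id;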

Small file processing

  1. Recreate the table, using fewer reducers when writing it
  2. Set the number of map/reduce by adjusting the parameters

// Maximum input size per Map (this determines the number of merged files)

set mapred.max.split.size=256000000;

// The minimum size of a split on a node (this value determines whether files on multiple DataNodes need to be merged).

set mapred.min.split.size.per.node=100000000;

// The minimum size of a split on a switch (this value determines whether files on multiple switches need to be merged)

set mapred.min.split.size.per.rack=100000000;

// Merge small files before executing the Map

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

// Set the map-side output to merge. Default is true

set hive.merge.mapfiles = true

// Set the reduce side output to merge. Default is false

set hive.merge.mapredfiles = true

// Set the size of the merged file

set hive.merge.size.per.task = 256000000

// When the average output file size is less than the set value, start the merge operation. This setting will only be valid for the operation if hive.merge.mapfiles or hive.merge.mapredfiles is set to true.

set hive.merge.smallfiles.avgsize=16000000

Data skew

Root cause: keys are unevenly distributed

Partial aggregation on the map side, equivalent to a Combiner

hive.map.aggr=true

Load balancing is performed when there is data skew

hive.groupby.skewindata=true

When set to true, the generated query plan contains two MR jobs. In the first job, the map output is distributed randomly to the reducers, and each reducer performs a partial aggregation; rows with the same group-by key may land on different reducers, which balances the load. The second job then distributes the pre-aggregated results to the reducers by the group-by key (guaranteeing that rows with the same key go to the same reducer) and completes the final aggregation.
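As a sketch, for a skewed aggregation over the tracklogs table the two settings above are simply switched on before the query:

set hive.map.aggr=true;            -- partial aggregation on the map side
set hive.groupby.skewindata=true;  -- two-stage job to balance skewed group-by keys

select url, count(*) as pv
from tracklogs
group by url;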

Hive Case Study: Log Analysis

1) UV: count(distinct guid). A client that visits the site counts as one visitor; the same client is counted only once between 00:00 and 24:00.

2) PV (page view): count(url). The number of page views or clicks; every refresh counts once more.

3) Number of logged-in visitors: visitors who log in to the site (members); the count of records with a non-empty endUserId.

4) Number of guests: visitors who do not log in; the count of records whose endUserId is empty.

5) Average visit time: the average time visitors spend on the site, computed per visitor from trackTime as max - min.

6) Two-hop rate: page views of users with pv > 1 / total page views

Users viewing 2 or more pages (pv > 1) / total number of users (count(distinct guid))

A "second hop" is the first click a user makes on a page after it has opened; the number of such clicks is the second-hop count, and the ratio of the second-hop count to the number of page views is the page's two-hop rate.

count(distinct case when pv >= 2 then guid else null end) / count(distinct guid)

7) Independent IPs: count(distinct ip). The number of distinct IP addresses that visited the site. Because this is relatively easy to compute and hard to fake, it is an important traffic indicator for most organizations. For example, with ADSL dial-up you are automatically assigned an IP when you dial; visiting the site counts one IP. If you disconnect without clearing cookies, dial again, get a new IP, and visit the site again, another IP is counted, but the UV (unique visitors) does not change, because both visits came from the same client.

Result columns: date, uv, pv, number of logged-in visitors, number of guests, average visit time, two-hop rate, number of independent IPs (a sketch of the query follows)
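A hedged sketch of how the PV, UV, and two-hop-rate columns could be computed from the cleaned table clear(id, url, guid) used earlier; the other indicators need columns (endUserId, trackTime, ip) that clear does not keep, and the alias names are illustrative:

select
    t.date,
    t.hour,
    sum(t.pv)                                              as pv,
    count(distinct t.guid)                                 as uv,
    count(distinct case when t.pv >= 2 then t.guid end)
        / count(distinct t.guid)                           as two_hop_rate
from (
    select date, hour, guid, count(url) as pv
    from clear
    group by date, hour, guid
) t
group by t.date, t.hour;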

Window functions and analytic functions

The OVER clause

Prepare test data

hive (db_analogs)> create database ts;
hive (db_analogs)> use ts;
hive (ts)> create table testscore(gender string,satscore int, idnum int) row format delimited fields terminated by '\t';
hive (ts)> load data local inpath '/opt/datas/TESTSCORES.csv' into table testscore;

Over with standard aggregates: COUNT, SUM, MIN/MAX, AVG

Requirement 1:

Within each gender group, sort by satscore in descending order and show the group's highest score in the last column

Female  1000    37070397        1590
Female  970     60714297        1590
Female  910     30834797        1590
Male    1600    39196697        1600
Male    1360    44327297        1600
Male    1340    55983497        1600

The answer to SQL:

hive (ts)>  select gender,satscore,idnum,max(satscore) over(partition by gender order by satscore desc) maxs  from testscore;

Note:

Partition by is used for grouping

Analytic functions

Computing top N

Group by gender, sort satscore in descending order, and show the rank within the group in the last column

Requirement 1:

Equal scores get different ranks; the rank increases with the row number

Female  1590    23573597        1
Female  1520    40177297        2
Female  1520    73461797        3
Female  1490    9589297         4
Female  1390    99108497        5
Female  1380    23048597        6
Female  1380    81994397        7

Requirement 2:

The score is the same, the rank is the same, and the rank increases according to the number of rows

Female  1590    23573597        1
Female  1520    40177297        2
Female  1520    73461797        2
Female  1490    9589297         4
Female  1390    99108497        5
Female  1380    23048597        6
Female  1380    81994397        6

Requirement 3:

Equal scores get the same rank, and the ranks increase consecutively without gaps

Female  1590    23573597        1
Female  1520    40177297        2
Female  1520    73461797        2
Female  1490    9589297         3
Female  1390    99108497        4
Female  1380    23048597        5
Female  1380    81994397        5

SQL

-- sql1: ROW_NUMBER() starts at 1 and generates the sequence of records within the group, in order
hive (ts)> select gender,satscore,idnum,row_number() over(partition by gender order by satscore desc) maxs from testscore;

-- sql2: RANK() generates the rank within the group; equal ranks leave gaps in the numbering
select gender,satscore,idnum,rank() over(partition by gender order by satscore desc) maxs from testscore;

-- sql3: DENSE_RANK() generates the rank within the group; equal ranks leave no gaps
select gender,satscore,idnum,dense_rank() over(partition by gender order by satscore desc) maxs from testscore;

Window clauses

When there is an ORDER BY clause but no window clause, the default window is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.

When both the ORDER BY clause and the window clause are absent, the default window is ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING (from start to end).

UNBOUNDED PRECEDING

UNBOUNDED FOLLOWING

1 PRECEDING

1 FOLLOWING

CURRENT ROW

Window contrast

select gender,satscore,idnum,sum(satscore) over(partition by gender order by satscore desc RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) sums  from testscore;
select gender,satscore,idnum,sum(satscore) over(partition by gender order by satscore desc RANGE BETWEEN UNBOUNDED PRECEDING AND unbounded following) sums  from testscore;
select gender,satscore,idnum,sum(satscore) over(partition by gender order by satscore desc RANGE BETWEEN 1 PRECEDING AND  CURRENT ROW) sums from testscore;

With RANGE, 1 PRECEDING is interpreted by value: the window covers rows whose satscore is within 1 of the current row's value, up to the current row

LAG

LAG looks back n rows; if no offset is given it defaults to the previous row (data is displayed top-down, so the lagged value is the one shown before the current row)

Scenario: Analyze the user’s page browsing order

sql

hive (ts)> select gender,satscore,idnum, lag(satscore) over(partition by gender order by satscore desc) as lastvalue from testscore;

The results of

gender  satscore  idnum     lastvalue
Female  1590      23573597  NULL
Female  1520      40177297  1590
Female  1520      73461797  1520
Female  1490      9589297   1520
Female  1390      99108497  1490
...
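Applied to the page-browsing scenario mentioned above, a hedged sketch against the tracklogs columns used earlier (guid, url, trackTime):

-- previous page viewed by the same visitor, in time order
select guid,
       trackTime,
       url,
       lag(url) over (partition by guid order by trackTime) as prev_url
from tracklogs;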

LEAD

LEAD is the opposite of LAG: it looks ahead n rows; if no offset is given it defaults to the next row (data is displayed top-down, so the lead value is the one shown after the current row)

sql

hive (ts)> select gender,satscore,idnum, lead(satscore, 1, 0) over(partition by gender order by satscore desc) as nextvalue from testscore;

The results of

gender  satscore  idnum     nextvalue
...
Female  1060      59149297  1060
Female  1060      46028397  1000
Female  1000      37070397  970
Female  970       60714297  910
Female  910       30834797  0