Doris practice

This is the seventh day of my participation in the August More Text Challenge. For details, see: August More Text Challenge;


Prerequisites: CentOS 7.1 or later / Ubuntu 16.04 or later, Java 1.8 or later, Python 2.7 or later.
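For reference, a quick way to check the Java and Python versions in your environment (assuming both are already installed):

java -version
python --version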

Doris deployment

1 Compiling with the Docker Development Image


1.1 Pulling the Image

docker pull apachedoris/doris-dev:build-env-1.3

1.2 Running an Image

docker run -it --name doris apachedoris/doris-dev:build-env-1.3

1.3 Checking Whether the Container Is Running

docker ps -a

As shown in the figure, the container is running. Next, enter the container.
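For example, if you have exited the container started above (named doris), you can get back into it with:

docker start doris        # only needed if the container has stopped
docker exec -it doris /bin/bash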

1.4 Creating the doris Directory

mkdir doris

1.5 Downloading the Doris Source

wget https://mirrors.bfsu.edu.cn/apache/incubator/doris/0.13.0-incubating/apache-doris-0.13.0-incubating-src.tar.gz

1.6 Extracting the Archive

tar -zxvf apache-doris-0.13.0-incubating-src.tar.gz

1.7 Build Preparation

You need to modify the Maven configuration before compiling

Enter the Maven configuration directory

cd /usr/share/maven/conf

Change the mirror repository addresses in settings.xml

<mirror>
        <id>alimaven</id>
        <name>aliyun maven</name>
        <url>https://maven.aliyun.com/repository/public</url>
        <mirrorOf>central</mirrorOf>
    </mirror>
    <mirror>
        <id>alimaven-central</id>
        <name>aliyun maven central</name>
        <url>https://maven.aliyun.com/repository/central</url>
        <mirrorOf>central</mirrorOf>
    </mirror>
    <mirror>
        <id>alimaven-spring</id>
        <name>aliyun maven-spring</name>
        <url>https://maven.aliyun.com/repository/spring</url>
        <mirrorOf>central</mirrorOf>
    </mirror>
    <mirror>
        <id>central</id>
        <name>Maven Repository Switchboard</name>
        <url>https://repo1.maven.org/maven2/</url>
        <mirrorOf>central</mirrorOf>
    </mirror>
    <mirror>
        <id>jboss-public-repository-group</id>
        <mirrorOf>central</mirrorOf>
        <name>JBoss Public Repository Group</name>
        <url>http://repository.jboss.org/nexus/content/groups/public</url>
    </mirror>
    <mirror>
        <id>spring-snapshots</id>
        <mirrorOf>central</mirrorOf>
        <name>Spring Snapshots</name>
        <url>https://repo.spring.io/snapshot</url>
    </mirror>
    <mirror>
        <id>spring-milestones</id>
        <mirrorOf>central</mirrorOf>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/milestone</url>
    </mirror>

Modify the FE configuration file

Before compiling, first enter the fe directory of the extracted source, doris/apache-doris-0.13.0-incubating-src/fe

cd /root/doris/apache-doris-0.13.0-incubating-src/fe

Modify the pom.xml file

<!-- for java-cup -->
<repository>
    <!-- change the original two lines:
         <id>cloudera-thirdparty</id>
         <url>https://repository.cloudera.com/content/repositories/third-party/</url>
         to the two lines below -->
    <id>cloudera-public</id>
    <url>https://repository.cloudera.com/artifactory/public/</url>
</repository>

<pluginRepositories>
    <!-- for cup-maven-plugin -->
    <pluginRepository>
        <!-- change the original two lines:
             <id>spring-plugins</id>
             <url>https://repo.spring.io/plugins-release/</url>
             to the two lines below -->
        <id>cloudera-public</id>
        <url>https://repository.cloudera.com/artifactory/public/</url>
    </pluginRepository>
</pluginRepositories>

1.8 Direct Compilation

sh build.sh

When the compilation succeeds, the build artifacts are produced in the output directory.
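For example, assuming the source was extracted under /root/doris inside the container as in the steps above, you can list the artifacts; the output directory should contain at least the fe and be components used below:

ls /root/doris/apache-doris-0.13.0-incubating-src/output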

1.9 Deploying and Starting FE

Copy the compiled fe directory from output to /opt/doris/fe on the host.
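A minimal sketch of this copy, run on the host, assuming the container is named doris and the source was built under /root/doris as above:

mkdir -p /opt/doris
docker cp doris:/root/doris/apache-doris-0.13.0-incubating-src/output/fe /opt/doris/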

Create a doris-meta folder to store metadata

mkdir -p /opt/doris/fe/doris-meta    # the path must be created in advance

Modify the fe.conf file

vi ./conf/fe.conf

Add the following configuration

# If startup fails due to memory problems, change the initial memory size to 4g
meta_dir = /opt/doris/fe/doris-meta
priority_networks = 192.168.220.133/22    # the IP range of this machine

Start the fe

cd /opt/doris/fe/bin
sh start_fe.sh --daemon    # start in the background
# If an error occurs, check fe/log/fe.log or fe/log/fe.out for details

Disable the firewall or enable the corresponding port
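For example, on CentOS you could either stop the firewall entirely or open just the ports used in this walkthrough (8030 for the FE web UI, 9030 for the MySQL protocol port, 9050 for the BE heartbeat port); adjust to your own environment:

systemctl stop firewalld
systemctl disable firewalld
# or keep the firewall running and open the ports instead:
firewall-cmd --permanent --add-port=8030/tcp --add-port=9030/tcp --add-port=9050/tcp
firewall-cmd --reload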

Open the browser to access the visual interface

http://192.168.220.145:8030/

The effect is as shown in the figure.


1.10 Deploying and Starting the BE

Copy the compiled be directory from output to /opt/doris/be on the host.
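Similarly for BE, under the same assumptions as the FE copy above:

docker cp doris:/root/doris/apache-doris-0.13.0-incubating-src/output/be /opt/doris/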

Create a folder where be stores data

mkdir -p /opt/doris/be/storage    # the path must be created in advance

Start BE

cd /opt/doris/be/bin
sh start_be.sh --daemon

Handling BE startup failure

Increase the maximum number of file handles

echo "* soft nofile 204800" >> /etc/security/limits.conf echo "* hard nofile 204800" >> /etc/security/limits.conf echo "* soft nproc 204800" > > / etc/security/limits the conf echo "* hard nproc 204800" > > / etc/security/limits. Conf /etc/sysctl.conf Add echo fs.file-max = 6553560 >> /etc/sysctl.confCopy the code

Check whether the changes took effect

cat /etc/security/limits.conf
cat /etc/sysctl.conf

Restart the machine

reboot

Restart BE

cd /opt/doris/be/bin
sh start_be.sh --daemon

1.11 Connecting using the mysql Client


Install the mysql client

yum install -y mysql

Connect to Doris through the mysql client

mysql -h 192.168.220.145 -P 9030 -uroot    # use the FE IP address

You enter the MySQL client, as shown in the figure.

Add a BE node

ALTER SYSTEM ADD BACKEND "192.168.220.146:9050";

Disable the firewall.

systemctl disable firewalld

Check the status of the FE and BE nodes

SHOW PROC '/frontends';
SHOW PROC '/backends';

The following figure shows the FE node connection status.

BE node connection status:

Command to drop an FE

ALTER SYSTEM DROP FOLLOWER "192.168.220.145:9050";

Command to drop a BE

ALTER SYSTEM DROP BACKEND "192.168.220.146:9050";

For versions after 0.13, pre-compiled packages can be downloaded, and the compiled components in the package can be used directly.

Doris Precompiled download address

Pre-compiled version downloaded

2 Creating and Viewing Data Tables


2.1 Creating a Database

CREATE DATABASE example_db;

2.2 Run SHOW DATABASES to view database information.

The information_schema database exists for compatibility with the MySQL protocol.
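For reference, after creating example_db above:

SHOW DATABASES;
-- example_db and information_schema should both appear in the list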

2.3 Use the CREATE TABLE command to create a table.

First switch the database

USE example_db;

Doris supports both single-partition and composite-partition table creation.

In a composite partition:

The first level is called Partition. You can specify a dimension column as a partition column (currently only integer and time columns are supported) and specify a value range for each partition.

The second level is called Distribution. You can specify one or more dimensional columns and the number of buckets to HASH data.

Composite partitioning is recommended in the following scenarios

Historical data deletion: if you need to delete historical data (for example, keep only the most recent N days), composite partitioning lets you do this by dropping historical partitions. Data can also be deleted by sending a DELETE statement within a specified partition.
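For example, a hypothetical cleanup that drops one of the historical partitions created for table2 below:

ALTER TABLE table2 DROP PARTITION p201706;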


Create table for single partition

CREATE TABLE table1
(
    siteid INT DEFAULT '10',
    citycode SMALLINT,
    username VARCHAR(32) DEFAULT '',
    pv BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(siteid, citycode, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");

The schema for this table is as follows:

siteid: The type is INT (4 bytes). The default value is 10.

citycode: The type is SMALLINT (2 bytes).

username: The type is VARCHAR. The maximum length is 32. The default value is an empty string.

pv: The type is BIGINT (8 bytes). The default value is 0. This is a metric column; Doris aggregates metric columns internally, and this column is aggregated with SUM.
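To illustrate the AGGREGATE KEY model (a hypothetical example with made-up values, not part of the original steps): inserting two rows with identical key columns leaves a single row whose pv is the SUM of both.

INSERT INTO table1 VALUES (6, 4, 'Lily', 2);
INSERT INTO table1 VALUES (6, 4, 'Lily', 3);
-- A query for (6, 4, 'Lily') now returns one row with pv = 5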


Create table for composite partition

CREATE TABLE table2
(
    event_day DATE,
    siteid INT DEFAULT '10',
    citycode SMALLINT,
    username VARCHAR(32) DEFAULT '',
    pv BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(event_day, siteid, citycode, username)
PARTITION BY RANGE(event_day)
(
    PARTITION p201706 VALUES LESS THAN ('2017-07-01'),
    PARTITION p201707 VALUES LESS THAN ('2017-08-01'),
    PARTITION p201708 VALUES LESS THAN ('2017-09-01')
)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");

The schema for this table is as follows:

event_day: The type is DATE. There is no default value.

siteid: The type is INT (4 bytes). The default value is 10.

citycode: The type is SMALLINT (2 bytes).

username: The type is VARCHAR. The maximum length is 32. The default value is an empty string.

pv: The type is BIGINT (8 bytes). The default value is 0. This is a metric column; Doris aggregates metric columns internally, and this column is aggregated with SUM.

We use the event_day column as the partition column and create three partitions: p201706, p201707, and p201708.

p201706: range [MIN_VALUE, 2017-07-01)

p201707: range [2017-07-01, 2017-08-01)

p201708: range [2017-08-01, 2017-09-01)

Within each partition, data is hash-bucketed by siteid into 10 buckets, which are distributed evenly.
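To check the partitions after creating table2, the standard SHOW PARTITIONS statement can be used:

SHOW PARTITIONS FROM table2;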

2.4 Inserting and Querying Data

Insert data into table1:

INSERT INTO table1 VALUES
    (1, 1, 'Jim', 2),
    (2, 1, 'grace', 2),
    (3, 2, 'Tom', 2),
    (4, 3, 'bush', 3),
    (5, 3, 'Helen', 3);

Insert data into table2:

INSERT INTO table2 VALUES
    ('2017-07-03', 1, 1, 'Jim', 2),
    ('2017-06-05', 2, 1, 'grace', 2),
    ('2017-07-12', 3, 2, 'Tom', 2),
    ('2017-07-15', 4, 3, 'bush', 3),
    ('2017-08-12', 5, 3, 'Helen', 3);

A simple query

SELECT * FROM table1 LIMIT 3;
​
SELECT * FROM table1 ORDER BY citycode;

The result is as shown in the figure.

The Join query

SELECT SUM(table1.pv) FROM table1 JOIN table2 WHERE table1.siteid = table2.siteid;
​

The result is as shown in the figure.

The subquery

 SELECT SUM(pv) FROM table2 WHERE siteid IN (SELECT siteid FROM table1 WHERE siteid > 2);

The result is as shown in the figure.

3 Importing Data from a Data Source

Doris subscribing to Kafka

Routine Load continuously reads data from a data source and imports it into Doris. Currently, only Kafka data sources are supported, with either no authentication or SSL authentication.

Syntax:

    CREATE ROUTINE LOAD [db.]job_name ON tbl_name
    [load_properties]
    [job_properties]
    FROM data_source
    [data_source_properties]
  1. [db.]job_name


The name of the import job. Within the same database, only one job with the same name can be running.


2. tbl_name


Specifies the name of the table to import data into.


3. load_properties


Used to describe the imported data. Syntax:

[column_separator],
[columns_mapping],
[where_predicates],
[partitions]

3.1 column_separator:

Specify a column separator, such as:

COLUMNS TERMINATED BY ","

The default is: \t

3.2 columns_mapping:

Specifies the mapping of the columns in the source data, and defines how the derived columns are generated.

3.2.1 Mapping column:

Specify, in order, the columns in the source data and the corresponding columns in the destination table. For columns you want to skip, you can specify a non-existent column name.

Suppose the destination table has three columns k1, k2, and v1, and the source data has 4 columns, of which columns 1, 2, and 4 correspond to k2, k1, and v1 respectively. Write it as follows:

 COLUMNS (k2, k1, xxx, v1)

where xxx is a non-existent column, used to skip the third column in the source data.

3.2.2 Derived columns:

Columns in the form col_name = expr are called derived columns; that is, the value of the corresponding column in the destination table is calculated by expr.

Derived columns are usually placed after the mapped columns. This is not mandatory, but Doris always resolves the mapped columns first and then the derived columns.

As an example, suppose the destination table also has a fourth column, v2, which is generated by the sum of k1 and k2. It can be written as follows:

 COLUMNS (k2, k1, xxx, v1, v2 = k1 + k2);

3.3 where_predicates

Used to specify filter conditions to filter out unwanted rows. The filter columns can be mapped columns or derived columns. For example, to import only rows where k1 is greater than 100 and k2 equals 1000, write it as follows:

WHERE k1 > 100 and k2 = 1000

3.4 partitions

Specifies which partitions of the destination table to import into. If not specified, data is automatically imported into the corresponding partitions.

Example:

 PARTITION(p1, p2, p3)

4 job_properties

Common parameters used to specify routine import jobs.

Syntax:

        PROPERTIES (
            "key1" = "val1",
            "key2" = "val2"
        )

Currently we support the following parameters:

4.1 desired_concurrent_number

The expected degree of concurrency. A routine import task is broken down into subtasks. This parameter specifies the maximum number of tasks a job can execute simultaneously. It has to be greater than 0. The default value is 3.

This is not necessarily the actual concurrency; the actual concurrency also depends on the number of nodes in the cluster, the cluster load, and the data source.

Example:

                "desired_concurrent_number" = "3"

4.2 max_batch_interval/max_batch_rows/max_batch_size

The three parameters represent:

1) Maximum execution time of each subtask, in seconds. Ranges from 5 to 60. The default value is 10.

2) Maximum number of rows read by each subtask. The value must be greater than or equal to 200,000. The default value is 200000.

3) Maximum number of bytes read per subtask. The value ranges from 100MB to 1GB in bytes. The default is 100MB.

These three parameters are used to control the execution time and processing amount of a subtask. When either of them reaches the threshold, the task ends.

Example:

              "max_batch_interval" = "20",
              "max_batch_rows" = "300000",
              "max_batch_size" = "209715200"

4.3 max_error_number

Maximum number of error lines allowed in the sampling window. It has to be greater than or equal to 0. The default is 0, which means no error lines are allowed.

The sampling window is max_batch_rows * 10. That is, if the number of error rows within the sampling window is greater than max_error_number, the routine job is paused and you need to check the data quality manually. Rows filtered out by the WHERE condition are not counted as error rows.
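Example (an illustrative value, not from the original text):

"max_error_number" = "10"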

4.4 strict_mode

Whether to enable strict mode. The default value is enabled. If enabled, rows whose non-null source values become NULL after column type conversion are filtered out. Specified as: "strict_mode" = "true"


5. data_source

Type of data source. Currently supported:

 KAFKA

6. data_source_properties

Specify information related to the data source.

Syntax:

        (
            "key1" = "val1",
            "key2" = "val2"
        )
Copy the code

KAFKA data source

6.1 kafka_broker_list

Broker connection information for Kafka, in the format host:port. Multiple brokers are separated by commas.

Example:

   "kafka_broker_list" = "broker1:9092,broker2:9092"

6.2 kafka_topic

Specify the Kafka topic to subscribe to.

Example:

 "kafka_topic" = "my_topic"

6.3 kafka_partitions/kafka_offsets

Specifies the kafka partition to be subscribed to and the start offset of each partition.

The offset can be a specific value greater than or equal to 0, or one of:

1) OFFSET_BEGINNING: start the subscription from the earliest available data.

2) OFFSET_END: start the subscription from the end.

If not specified, all partitions under the topic are subscribed from OFFSET_END by default.

Example:

                    "kafka_partitions" = "0,1,2,3",
                    "kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"

Example 1:

Create a Kafka routine import task named test1 for example_tbl of example_db. All partitions are consumed by default, and subscription starts at the end (OFFSET_END).

CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
PROPERTIES
(
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false",
    "format" = "json"    -- receive data in JSON format
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092",
    "kafka_topic" = "my_topic"
);

Example 2:

Import data into the ba_gua_lu_1003_log table

CREATE ROUTINE LOAD test.job3 ON ba_gua_lu_1003_log
COLUMNS
(
    id, reserve1, reserve2, reserve3, reserve4, reserve5,
    user_id, channel, registerTime, holdCard, isForge, leftTime, create_time
)
PROPERTIES
(
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false",
    "format" = "json"
)
FROM KAFKA
(
    "kafka_broker_list" = "192.168.1.27:9092",
    "kafka_topic" = "kafka-pull",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

Viewing task Status

SHOW ROUTINE LOAD;

Personal experience with Doris


From my personal experience, Doris is better suited to real-time/offline computation over business data. Doris is compatible with the MySQL protocol, which makes it good at handling relational data from business databases. Its weakness is non-relational data, because internally Doris still works with tables, and it places particularly high demands on server performance. On data import, Doris supports five import methods, which basically cover Kafka, HDFS, and MySQL as upstream data sources; however, according to the official introduction, one drawback is that partial exceptions in the imported data may cause the whole task to fail. In addition, there is relatively little public material about Doris, which raises the cost and risk of using, learning, and operating it; when problems occur, maintenance is harder and you may need to be familiar with the source code to debug them.