1. Druid introduction

Druid is a data storage and analysis system, originally developed at MetaMarkets, designed for OnLine Analytical Processing (OLAP) with high performance on massive data sets. Druid is now developed under the Apache Software Foundation. Druid's key features:

  • Interactive queries: Druid's low-latency ingestion architecture allows events to be queried within milliseconds of their creation. Query latency stays low because Druid is a columnar store that reads and scans only the columns a query actually needs, so responses are typically sub-second.
  • High availability: Druid uses HDFS/S3 as deep storage, and each Segment is loaded onto two Historical nodes. Data can also be ingested in multiple replicas to ensure availability and fault tolerance.
  • Horizontal scalability: a Druid deployment can be scaled out by adding servers, speeding up data ingestion and sustaining sub-second query service.
  • Parallel processing: Druid processes a query in parallel across the cluster.
  • Rich queries: Druid supports Scan, TopN, GroupBy, and approximate queries, and provides two query interfaces: a native JSON API and SQL (a minimal native TopN query is sketched below).
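
For illustration, here is a minimal native TopN query, as POSTed to the Broker's /druid/v2 endpoint; the datasource, dimension, and metric names are made up:

{
  "queryType" : "topN",
  "dataSource" : "example_datasource",
  "dimension" : "page",
  "metric" : "views",
  "threshold" : 10,
  "granularity" : "all",
  "intervals" : ["2019-01-01/2019-01-02"],
  "aggregations" : [
    { "type" : "longSum", "name" : "views", "fieldName" : "views" }
  ]
}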

Druid is widely used in the following areas:

  • Page clickstream analysis
  • Network traffic analysis
  • Monitoring systems and APM
  • Data operations and marketing
  • BI analysis / OLAP

2. Why do we need Druid

As a SaaS company, Youzan has many business scenarios and large volumes of both real-time and offline data. Before adopting Druid, some OLAP analysis scenarios were implemented with SparkStreaming or Storm. Besides developing the real-time tasks themselves, such solutions also require carefully designed storage for the query side. The problems were: long development cycles; an initial storage design that struggled to keep up with iterating requirements; and poor extensibility. With Druid, developers only need to fill out a data ingestion configuration, specifying dimensions and metrics, to complete data ingestion. As the features above show, Druid supports SQL, so applications can query data just as they would over ordinary JDBC. With the OLAP platform developed at Youzan, ingestion and configuration become even easier and more convenient: creating a real-time task takes only about 10 minutes, greatly improving development efficiency. A minimal JDBC example follows.
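
A minimal Java client querying Druid SQL over the Avatica JDBC driver; the broker host and the datasource/column names are assumptions for illustration:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class DruidSqlExample {
  public static void main(String[] args) throws Exception {
    // Broker's Avatica endpoint; host and port are placeholders for a real deployment
    String url = "jdbc:avatica:remote:url=http://broker-host:8082/druid/v2/sql/avatica/";
    try (Connection conn = DriverManager.getConnection(url);
         Statement stmt = conn.createStatement();
         // "wikipedia" and its columns are illustrative names
         ResultSet rs = stmt.executeQuery(
             "SELECT page, COUNT(*) AS cnt FROM wikipedia "
                 + "WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '1' DAY "
                 + "GROUP BY page ORDER BY cnt DESC LIMIT 10")) {
      while (rs.next()) {
        System.out.println(rs.getString("page") + ": " + rs.getLong("cnt"));
      }
    }
  }
}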

2.1 Druid usage scenarios at Youzan

  • System monitoring and APM: Youzan's monitoring system (Skynet) and many APM systems use Druid for data analysis
  • Data products and BI analysis: Youzan's SaaS service provides merchants with many data products, such as merchant marketing tools and various BI reports
  • Real-time OLAP services: Druid provides real-time OLAP services for C-side services such as risk control and data products

3. Druid's architecture

Druid follows a Lambda architecture, divided into a real-time layer (Overlord, MiddleManager) and a batch layer (Broker and Historical). The main node types are (note: all Druid components ship in the same package and are started with different commands):

  • Coordinator node: manages and publishes the cluster's segments and balances segment load across the Historical cluster
  • Overlord node: accepts tasks, coordinates task allocation, creates task locks, and collects and returns task status to clients. The asOverlord function can be enabled on a Coordinator node, removing one component to deploy and operate (see the configuration sketch after this list)
  • MiddleManager node: receives the indexing tasks assigned by the Overlord and creates new Peon instances to execute them. One MiddleManager node can run multiple Peon instances
  • Broker node: receives query requests from clients and forwards them to the Historical and MiddleManager nodes. Broker nodes need to be aware of how segments are distributed across the cluster
  • Historical node: loads segments that fall outside the real-time window, according to the load rules
  • Router node: an optional API gateway in front of the Broker cluster. With Router nodes, the Broker is no longer a single point of service, improving concurrent query capability
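
For reference, enabling the combined Coordinator/Overlord mode takes two Coordinator runtime properties; a minimal sketch, following the keys in Druid's documentation:

druid.coordinator.asOverlord.enabled=true
druid.coordinator.asOverlord.overlordService=druid/overlord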

4. Architecture and function analysis of OLAP platform

4.1 Main objectives of OLAP platform

  • Minimize the cost of real-time task development: instead of developing a real-time task and designing its storage, a developer just fills in a configuration to create the task
  • Provide a data compensation service to guarantee data safety: solve the loss of late-arriving data caused by the real-time window closing
  • Provide stable and reliable monitoring: the OLAP platform offers full monitoring for every DataSource, from data ingestion and Segment persistence through to data query

4.2 Youzan OLAP platform architecture

The OLAP platform is a management system for Druid and its surrounding components, and covers:

  • DataSource management
  • Druid configuration and instance management: the OLAP platform manages the Druid instances on each machine through configuration, and can scale the instances up or down
  • Data compensation management: to solve data-delay problems, the OLAP platform can trigger compensation tasks manually or automatically
  • Druid SQL queries: the OLAP platform integrates SQL query functionality to help developers debug SQL
  • Monitoring and alerting

4.2.1 Druid instance management

The OLAP platform uses Tranquility as its ingestion tool, assigning each DataSource a number of Tranquility instances according to its traffic volume. The DataSource configuration is pushed to Agent-Master, which collects the resource usage of every server and starts instances on machines with spare capacity; for now, only server memory is considered. The OLAP platform also supports starting, stopping, scaling up, and scaling down Tranquility instances. A placement sketch follows.
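
A hypothetical reduction of the Agent-Master placement decision (this is not the actual Youzan implementation; all names are illustrative): pick the server with the most free memory that can still fit the instance.

import java.util.Comparator;
import java.util.List;
import java.util.Optional;

class ServerStats {
  final String host;
  final long freeMemoryMb;
  ServerStats(String host, long freeMemoryMb) {
    this.host = host;
    this.freeMemoryMb = freeMemoryMb;
  }
}

class AgentMaster {
  // Choose the host with enough headroom and the most free memory
  Optional<ServerStats> pickServer(List<ServerStats> servers, long requiredMb) {
    return servers.stream()
        .filter(s -> s.freeMemoryMb >= requiredMb)
        .max(Comparator.comparingLong(s -> s.freeMemoryMb));
  }
}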

4.3 Handling data delay: the offline data compensation function

Streaming data processing frameworks have time windows, and data arriving later than the window is discarded. How can we make sure late data is still built into a Segment, without keeping the real-time task window open for a long time? We developed data compensation for Druid: the OLAP platform configures a streaming ETL that stores the raw data in HDFS, and the Flume-based streaming ETL guarantees that data belonging to the same hour lands in the same file path, according to event time. The OLAP platform then triggers a Hadoop-batch task, manually or automatically, to build segments offline.

The Flume-based ETL uses an HDFS Sink to write the data and implements a timestamp Interceptor that creates files according to the Event's timestamp field (one folder per hour), so delayed data is correctly archived into the file for its proper hour. A configuration sketch follows.
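
A sketch of the relevant Flume agent settings, assuming Flume's documented HDFS Sink properties; the interceptor class name is illustrative. The interceptor sets the "timestamp" header from the Event's own timestamp field, so the %Y%m%d/%H escape sequences resolve to event time rather than arrival time:

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.example.flume.EventTimestampInterceptor$Builder

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://nameservice/olap/raw/%Y%m%d/%H
a1.sinks.k1.hdfs.useLocalTimeStamp = false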

4.4 Separation of hot and cold data

As more services are connected and accumulate history, the data volume keeps growing. Historical nodes load a large number of segments, yet we observe that most queries concentrate on the last few days; in other words, the recent, hot data is what gets queried. Separating hot and cold data is therefore very important for query efficiency. Druid provides the Historical Tier grouping mechanism and the data-loading Rule mechanism, which together can be configured to separate hot and cold data. First, group the Historicals: keep a small number of Historical nodes in the default "_default_tier" group on SATA disks, and place a larger number of Historical nodes in a "hot" group on SSDs. Then configure the load Rules for each DataSource:

  • Rule 1: load one replica of the segments from the last 30 days into the "hot" group;
  • Rule 2: load two replicas of the segments from the last 6 months into the "_default_tier" group;
  • Rule 3: drop all earlier segments (note: the segments remain backed up in HDFS)
{"type":"loadByPeriod"."tieredReplicants": {"hot": 1}."period":"P30D"} 
{"type":"loadByPeriod"."tieredReplicants": {"_default_tier": 2}."period":"P6M"} 
{"type":"dropForever"}

Finally, increase the druid.server.priority value of the "hot" group (the default is 0), so that queries for hot data fall on the "hot" group. A sketch of the node configuration follows.
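
A minimal sketch of the runtime properties for a Historical node in the "hot" group, using Druid's documented keys; the priority value is just an example:

druid.server.tier=hot
druid.server.priority=10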

4.5 Monitoring and alarm

Every component of the Druid architecture is fault-tolerant, so the cluster can keep serving through a single point of failure: the Coordinator and Overlord have HA guarantees; segments are stored in multiple copies on HDFS/S3; and both the segments loaded by Historicals and the real-time data on Peon nodes can be replicated. We also want alerts to fire as early as possible when a node or the cluster enters a bad state or approaches a capacity limit. As with other big data frameworks, we built detailed monitoring and alerting for Druid at two levels:

  • Basic monitoring: service monitoring of each component, cluster water-level and status monitoring, and machine-level monitoring
  • Business monitoring: key indicators such as real-time task creation, ingestion TPS, consumption lag, persistence status, and query RT/QPS, with both a per-DataSource view and a global view. Each of these items carries an alert that triggers when its threshold is exceeded. The business metrics are mostly collected through the Metrics and Alerts emitted by the Druid framework itself, fed into components such as Kafka/OpenTSDB, and then turned into the desired indicators by streaming analysis (an emitter configuration sketch follows)
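
As a sketch of the metric pipeline's first hop, assuming Druid's contrib kafka-emitter extension and its documented keys; the broker address and topic names are illustrative:

druid.extensions.loadList=["kafka-emitter"]
druid.emitter=kafka
druid.emitter.kafka.bootstrap.servers=kafka-host:9092
druid.emitter.kafka.metric.topic=druid-metrics
druid.emitter.kafka.alert.topic=druid-alerts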

4.6 Deployment Architecture

The Historical cluster deployment corresponds to the hot/cold separation described in Section 4.4: an SSD cluster stores the hot data of the last N days (the load period is adjustable), and relatively cheap SATA machines store the longer-term cold data, with the I/O capacity of the SATA disks fully utilized by loading segments onto different disks. Youzan has many paid services, so we isolate them at the hardware level to guarantee those businesses enough query-side resources. At the access layer, Routers handle routing, avoiding a Broker single point and greatly raising the cluster's query throughput. In the MiddleManager cluster, besides the index tasks (memory-intensive), we also co-locate some high-traffic Tranquility tasks (CPU-intensive) to improve the resource utilization of the MiddleManager cluster.

4.7 Contributing to the Open Source Community

At Youzan, queries generally go through SQL on Broker/Router. We found that once a few slow queries appeared, clients stopped getting query responses and connections became harder and harder to obtain. Logging into the Broker server showed the number of available connections dropping sharply to exhaustion, along with a large number of TCP CLOSE_WAIT connections. Using the jstack tool we found a deadlock; for the full stack, see Issue-6867.

Reading the source code, DruidConnection registers a callback for each Statement. In the normal case, when a Statement ends, the callback removes its state from the statements map in the DruidConnection. But if there is a slow query (one exceeding the maximum connection time, or killed from the client), the connection is forcibly closed, which in turn closes all of its statements. The two threads (the one closing the connection and the one exiting the statement) each hold one lock while waiting for the other to release the second, so a deadlock occurs and connections are quickly exhausted.
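
The essence of the bug reduces to a classic lock-ordering problem. A minimal, self-contained sketch (not Druid's actual code; all names are illustrative):

public class DeadlockSketch {
  public static void main(String[] args) {
    final Object connectionLock = new Object(); // guards connection state
    final Object statementsLock = new Object(); // guards the statements map

    // Thread A: closes the connection, then tries to close its statements
    Thread closeConnection = new Thread(() -> {
      synchronized (connectionLock) {
        synchronized (statementsLock) {
          System.out.println("connection closed, statements cleaned up");
        }
      }
    });

    // Thread B: a statement's onClose callback, then updates connection state
    Thread closeStatement = new Thread(() -> {
      synchronized (statementsLock) {
        synchronized (connectionLock) {
          System.out.println("statement removed from connection");
        }
      }
    });

    closeConnection.start();
    closeStatement.start(); // with unlucky timing, A and B deadlock here
  }
}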

The callback registered for each Statement, executed when the statement exits:
final DruidStatement statement = new DruidStatement(
    connectionId,
    statementId,
    ImmutableSortedMap.copyOf(sanitizedContext),
    () -> {
      // onClose function for the statement
      synchronized (statements) {
        log.debug("Connection[%s] closed statement[%s].", connectionId, statementId);
        statements.remove(statementId);
      }
    }
);
// Automatic kill when the maximum connection time is exceeded
return connection.sync(
    exec.schedule(
        () -> {
          log.debug("Connection[%s] timed out.", connectionId);
          closeConnection(new ConnectionHandle(connectionId));
        },
        new Interval(DateTimes.nowUtc(), config.getConnectionIdleTimeout()).toDurationMillis(),
        TimeUnit.MILLISECONDS
    )
);

After identifying the problem, we submitted PR-6868 to the community. It has been merged into the master branch and will be released in 0.14.0. Readers who run into this problem can simply cherry-pick the PR into their own branch to fix it.

5. Challenges and future plans

5.1 Data ingestion system

Currently, the most common ingestion solutions are the Kafka Indexing Service and Tranquility, and we use Tranquility. Tranquility currently supports ingestion from Kafka and over HTTP, which is not a rich set of options. Tranquility is also a MetaMarkets open-source project: it is updated slowly and lacks many features, the most important being monitoring, so we cannot observe an instance's running status, ingestion rate, backlog, or data loss. Similar to how Druid's MiddleManager manages its Peon nodes, running the ingestion tool (Tranquility or a self-developed one) as YARN or Docker applications would hand resource scheduling and instance management over to a more reliable scheduler.

5.2 Dimension-table JOIN queries in Druid

Druid does not currently support JOIN queries; all aggregation queries are limited to a single DataSource. In real scenarios, however, we often need to JOIN several DataSources to get the desired results. This is a challenge for us, and for the Druid development team as well.

5.3 Query RT spikes on the hour

For C-side OLAP query scenarios, the RT requirements are strict. Druid creates the Index task for the current hour exactly on the hour, and if a query lands on a freshly created Index task, it suffers from large latency spikes, as shown in the figure below:

We have already made some optimizations and adjustments. First, we tuned the warmingPeriod parameter so that Druid's Index tasks are started shortly before the hour (see the configuration fragment below). For DataSources with low TPS but high QPS, we increased the SegmentGranularity; since most queries cover the latest 24 hours, this keeps the queried data in memory, reduces the number of Index tasks being created, and noticeably reduces the query spikes. There is still a gap to our goal, and next we will optimize the source code.
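
A fragment of a per-DataSource Tranquility configuration, assuming Tranquility's documented task.warmingPeriod property; the values are examples. The warming period tells Tranquility to create the next period's Index task that far in advance:

"properties" : {
  "task.warmingPeriod" : "PT5M",
  "task.partitions" : "1",
  "task.replicants" : "2"
}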

5.4 Automatic roll-up of historical data

Currently, most DataSources use hourly Segment granularity, and the data stored on HDFS is one Segment per hour. When a query spans a long time range, it becomes slow, occupies a large amount of Historical resources, and can even cause a Broker OOM. Creating a Hadoop-batch task that rolls up data older than, say, one week and rebuilds the index should compress storage and improve query performance considerably. The roll-up is already at the practice stage and will be covered in a follow-up blog post (a granularitySpec sketch follows).
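
A granularitySpec fragment for such a re-indexing Hadoop-batch task, using standard Druid batch ingestion fields; the interval and granularities are examples. Hourly data is rolled up so that one daily Segment replaces 24 hourly ones:

"granularitySpec" : {
  "type" : "uniform",
  "segmentGranularity" : "DAY",
  "queryGranularity" : "HOUR",
  "rollup" : true,
  "intervals" : ["2019-01-01/2019-01-08"]
}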

The infrastructure team is mainly responsible for Youzan's data platform (DP), real-time computing (Storm, Spark Streaming, Flink), offline computing (HDFS, YARN, Hive, Spark SQL), online storage (HBase), real-time OLAP (Druid), and other technical products. If you are interested, please contact [email protected]