Apache Druid cluster design and workflow

This article describes the basic cluster architecture of Apache Druid, explains the role each process plays in that architecture, and then walks through the workflow from two perspectives: writing data and querying data.


Druid has a multi-process architecture, and each process type can be configured and scaled independently. This gives the cluster maximum flexibility. The design also provides strong fault tolerance: the failure of one component does not immediately affect the others.

Let’s take a closer look at Druid’s process types and the role each plays in the cluster.

Processes and Servers

Druid has several process types, including the following:

  • The Coordinator process manages data availability in the cluster.
  • The Overlord process controls the assignment of data-ingestion workloads.
  • The Broker process handles queries from external clients.
  • The Router process is optional; it can route requests to Brokers, Coordinators, and Overlords.
  • The Historical process stores queryable data.
  • The MiddleManager process is responsible for ingesting data.

You can deploy these processes in any combination you like. However, for ease of operations, it is recommended to organize them into three server types: Master, Query, and Data.

  • **Master:** Runs the Coordinator and Overlord processes, which manage data availability and data writes.
  • **Query:** Runs the Broker and optional Router processes, which handle queries from external clients.
  • **Data:** Runs the Historical and MiddleManager processes, which execute ingestion tasks and store queryable data.

External Dependencies

In addition to the built-in process types, Druid has three external dependencies.

Deep storage

Shared file storage that every Druid server can access. In a clustered deployment this is typically distributed storage such as S3 or HDFS, or a network-mounted filesystem; in a single-node deployment it is typically local disk. Druid uses Deep Storage to store any data that has been written to the cluster.

Druid uses Deep Storage only as a backup of the data and as a way to transfer data between Druid processes in the background. To answer queries, Historical processes do not read from Deep Storage; instead, they pre-fetch segments to local disk before serving any queries. This means Druid never needs to access Deep Storage during a query, which helps it achieve the lowest possible query latency. It also means you must have enough disk space both in Deep Storage and across the Historical processes for the data you plan to load.

Deep Storage is an important part of Druid's elastic, fault-tolerant design. If a single Druid data server loses its local segment data, that data can be recovered from Deep Storage.
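
As a concrete illustration, here is a minimal sketch of peeking at what Druid keeps in Deep Storage, assuming S3 is used; the bucket name and prefix are hypothetical, and in a real cluster they come from the druid.storage.* settings:

```python
# A peek at what Druid keeps in Deep Storage, assuming S3.
# Bucket name and prefix below are hypothetical; in a real cluster they come
# from the druid.storage.* settings in common.runtime.properties.
import boto3

s3 = boto3.client("s3")

response = s3.list_objects_v2(
    Bucket="my-druid-deep-storage",  # hypothetical bucket
    Prefix="druid/segments/",        # hypothetical base key for segment files
)

for obj in response.get("Contents", []):
    # Each published segment lives under a path that encodes its datasource,
    # time interval, version, and partition number.
    print(obj["Key"], obj["Size"])
```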

Metadata storage

Metadata storage holds various shared system metadata, such as segment availability information and task information. In a clustered deployment this is typically a traditional RDBMS such as PostgreSQL or MySQL. In a single-node deployment it is typically locally stored Apache Derby.

Zookeeper

Used for internal service discovery, coordination, and leader election.

Architecture Diagram

The following figure shows how queries and writes flow through a deployment that uses the officially recommended Master/Query/Data server organization:

(Figure: Druid architecture)

Storage Design

Datasources and segments

Druid data is stored in “datasources”, which are similar to tables in an RDBMS. Each datasource is partitioned by time and, optionally, further partitioned by other attributes. Each time range is called a “chunk” (for example, one per day if the datasource is partitioned by day). Within a chunk, data is further partitioned into one or more “segments”. Each segment is a single file, typically containing up to a few million rows of data. Segments are organized into time chunks as shown below:

(Figure: Druid segment timeline)

A datasource may have just one segment, or it may have hundreds of thousands or even millions of segments. Each segment's life cycle begins when it is created on a MiddleManager; at that point it is mutable and uncommitted. The segment build process includes the following steps, designed to produce a data file that is compact and supports fast queries (a toy illustration of the dictionary and bitmap ideas follows the list):

  • Conversion to columnar format
  • Indexing with bitmaps
  • Compression using various algorithms
    • Dictionary encoding of string columns, storing compact integer IDs in place of the raw values
    • Bitmap compression of the bitmap indexes
    • Type-aware compression of all columns
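
To make the dictionary-encoding and bitmap ideas concrete, here is a toy Python illustration of the general technique. It is not Druid's actual implementation, and real segments additionally compress these bitmaps (for example with Roaring):

```python
# Toy illustration (not Druid's actual code) of dictionary encoding plus
# per-value bitmap indexes for a string column.
values = ["nginx", "apache", "nginx", "nginx", "iis", "apache"]

# Dictionary encoding: each distinct string is stored once; rows hold small integer IDs.
dictionary = {v: i for i, v in enumerate(sorted(set(values)))}
encoded = [dictionary[v] for v in values]  # [2, 0, 2, 2, 1, 0]

# Bitmap indexes: for every dictionary entry, a bitmap marking the rows that contain it.
bitmaps = {
    value: [1 if cell == dict_id else 0 for cell in encoded]
    for value, dict_id in dictionary.items()
}

print(encoded)            # the column as stored: integer IDs instead of strings
print(bitmaps["nginx"])   # [1, 0, 1, 1, 0, 1] -> rows matching "server = 'nginx'" without scanning strings
```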

Segments are committed and published periodically. At that point the data is written to Deep Storage, becomes immutable, and moves from the MiddleManager processes to the Historical processes. An entry about the segment is also written to the metadata store. This entry is self-describing metadata about the segment, including its schema, its size, and its location in Deep Storage. These entries are what the Coordinator uses to know which data should be available in the cluster.

Indexing and Handoff

Indexing is the mechanism by which each segment is created. Handoff is the mechanism by which a segment is published and begins to be served by a Historical process. On the indexing side, the mechanism works in the following order (a sketch of submitting a task and polling its status follows the list):

  1. An indexing task launches and builds a new segment. The segment's identifier must be determined before it is built. For a task that appends data (such as a Kafka task, or an index task in append mode), this is done by calling the Overlord's “allocate” API to potentially add a new partition to an existing set of segments. For a task that overwrites data (such as a Hadoop task, or an index task not in append mode), a new version number and a new set of segments are created for the interval.
  2. If the indexing task is a real-time task (such as a Kafka task), the segment can be queried immediately. The data is available, but still unpublished.
  3. When the indexing task finishes reading data for the segment, it pushes the segment to Deep Storage and publishes it by writing a record to the metadata store.
  4. If the indexing task is a real-time task, it waits for a Historical process to load the segment; if it is not a real-time task, it exits immediately.
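
As a sketch of the task life cycle, the following submits a native batch (index_parallel) task to the Overlord's task API and polls its status. The Overlord host, the datasource name, and the inline row are hypothetical; the endpoints are the Overlord task APIs:

```python
# Sketch: submit a native batch ("index_parallel") task to the Overlord and
# poll its status. The Overlord host, datasource name, and inline row are
# hypothetical; /druid/indexer/v1/task is the Overlord's task API.
import requests

OVERLORD = "http://overlord.example.com:8090"  # hypothetical address

task_spec = {
    "type": "index_parallel",
    "spec": {
        "dataSchema": {
            "dataSource": "wikipedia_demo",  # hypothetical datasource
            "timestampSpec": {"column": "time", "format": "iso"},
            "dimensionsSpec": {"dimensions": ["page", "user"]},
            "granularitySpec": {
                "segmentGranularity": "hour",  # one time chunk per hour
                "queryGranularity": "none",
            },
        },
        "ioConfig": {
            "type": "index_parallel",
            "inputSource": {
                "type": "inline",
                "data": '{"time": "2018-05-21T16:01:00Z", "page": "Druid", "user": "alice"}',
            },
            "inputFormat": {"type": "json"},
        },
    },
}

task_id = requests.post(f"{OVERLORD}/druid/indexer/v1/task", json=task_spec).json()["task"]

status = requests.get(f"{OVERLORD}/druid/indexer/v1/task/{task_id}/status").json()
print(status["status"]["statusCode"])  # RUNNING, then SUCCESS once the segment is published
```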

On the Coordinator/Historical side, the mechanism works as follows (a sketch of checking load progress via a Coordinator API follows the list):

  1. Coordinator periodically pulls published segments from the metadata storage (executed every minute, by default).
  2. When the Coordinator finds a published segment that is not yet available, it selects a Historical process to load that segment and instructs it to do so.
  3. Historical loads the segment and serves it.
  4. At this point, if the indexing task was waiting for handoff, it can exit.
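
To watch handoff progress from the outside, one option is the Coordinator's load-status endpoint, sketched below with a hypothetical host name:

```python
# Sketch: ask the Coordinator how much of each datasource's published data is
# currently being served by Historicals. Host is hypothetical;
# /druid/coordinator/v1/loadstatus is a Coordinator API.
import requests

COORDINATOR = "http://coordinator.example.com:8081"  # hypothetical address

load_status = requests.get(f"{COORDINATOR}/druid/coordinator/v1/loadstatus").json()

for datasource, percent_loaded in load_status.items():
    # 100.0 means every published, used segment has been loaded by a Historical.
    print(f"{datasource}: {percent_loaded}% loaded")
```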

(Figure: indexing and handoff)


Segment Identifiers

The Segment identifier consists of the following four parts:

  • The name of the Datasource.
  • Time interval (the time interval covered by the segment, corresponding to the segmentGranularity specified at ingestion time).
  • Version number (usually ISO8601 timestamp, corresponding to the time when the segment was first generated).
  • Partition number (integer, unique in datasource+interval+version, not necessarily consecutive).

For example, here is the identifier for datasource clarity-cloud0, time chunk 2018-05-21T16:00:00.000Z/2018-05-21T17:00:00.000Z, version 2018-05-21T15:56:09.909Z, and partition number 1:

clarity-cloud0_2018-05-21T16:00:00.000Z_2018-05-21T17:00:00.000Z_2018-05-21T15:56:09.909Z_1


A segment with partition number 0 (the first partition in the chunk) omits the partition number, as in the following example, which is in the same time chunk as the previous segment but has partition number 0 instead of 1:

clarity-cloud0_2018-05-21T16:00:00.000Z_2018-05-21T17:00:00.000Z_2018-05-21T15:56:09.909Z

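The naming convention is easy to reproduce. The toy helper below (not code from Druid itself) assembles an identifier from the four parts and drops the partition number when it is 0:

```python
# Toy helper (not code from Druid) that assembles a segment identifier from
# its four parts, omitting the partition number when it is 0.
def segment_id(datasource, interval_start, interval_end, version, partition_num):
    parts = [datasource, interval_start, interval_end, version]
    if partition_num != 0:
        parts.append(str(partition_num))
    return "_".join(parts)

print(segment_id("clarity-cloud0",
                 "2018-05-21T16:00:00.000Z",
                 "2018-05-21T17:00:00.000Z",
                 "2018-05-21T15:56:09.909Z",
                 1))
# clarity-cloud0_2018-05-21T16:00:00.000Z_2018-05-21T17:00:00.000Z_2018-05-21T15:56:09.909Z_1
```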

Segment versioning

You may be wondering what the “version number” described in the previous section is.

Druid supports batch-mode overwriting. In Druid, if all you ever do is append data, there is only one version per time chunk. When you overwrite data, what happens behind the scenes is that a new set of segments is created for the same datasource and the same time interval, but with a higher version number. This signals to the rest of the Druid system that the older version should be removed from the cluster and replaced with the new one.

To users the switch appears to happen instantaneously, because Druid first loads the new data (without allowing it to be queried) and then, as soon as all of the new data is loaded, switches new queries over to the new segments. It drops the old segments a few minutes later.
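
Because version numbers are ISO 8601 timestamps, “newer” can be decided with a plain string comparison; the following toy snippet illustrates the idea with two hypothetical versions:

```python
# Toy illustration with two hypothetical versions: since versions are ISO 8601
# timestamps in a fixed format, lexicographic order matches chronological order.
original_version  = "2018-05-21T15:56:09.909Z"  # version from the initial load
overwrite_version = "2018-06-01T08:00:00.000Z"  # higher version from a later overwrite

winner = max(original_version, overwrite_version)
print(winner)  # 2018-06-01T08:00:00.000Z -> its segments overshadow the older set
```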

Segment life cycle

The life cycle of each segment involves the following three main areas:

  1. **Metadata store:** Once a segment is built, its metadata (a small JSON payload, usually no more than a few kilobytes) is stored in the metadata store. The act of inserting a segment's record into the metadata store is called publishing, and the record carries a boolean used flag that controls whether the segment is intended to be queryable. Segments created by real-time tasks are available before they are published, because they are only published once the segment is complete and will not accept any additional data.
  2. **Deep storage:** Segment data files are pushed to Deep Storage as soon as the segment is built, before its metadata is published to the metadata store.
  3. **Availability for querying:** The segment is available for querying on some Druid data server, either a real-time task or a Historical process.

You can check the state of segments using the Druid SQL sys.segments table, which includes the following flags (a sketch of querying it follows the list):

  • is_published: True if the segment metadata has been published to the metadata store and used is true.
  • is_available: True if the segment is currently available for querying, on either a real-time task or a Historical process.
  • is_realtime: True if the segment is only available on real-time tasks. For datasources that use real-time ingestion this generally starts out true and becomes false once the segment is published and handed off.
  • is_overshadowed: True if the segment has been published (with used set to true) but is completely overshadowed by other published segments. This is generally a transitional state; a segment in this state will soon have its used flag automatically set to false.
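
Here is a minimal sketch of inspecting these flags through Druid SQL, assuming a hypothetical Broker address and datasource name:

```python
# Sketch: inspect segment flags through Druid SQL's sys.segments table via the
# Broker's SQL endpoint. The Broker address and datasource are hypothetical.
import requests

BROKER = "http://broker.example.com:8082"  # hypothetical address

sql = """
SELECT segment_id, is_published, is_available, is_realtime, is_overshadowed
FROM sys.segments
WHERE datasource = 'wikipedia_demo'
LIMIT 10
"""

rows = requests.post(f"{BROKER}/druid/v2/sql/", json={"query": sql}).json()
for row in rows:
    print(row)
```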

Query processing

A query first enters the Broker process, which identifies the segments that hold data relevant to the query (the list of segments is always pruned by time, and may also be pruned by other attributes, depending on how the datasource is partitioned). The Broker then determines which Historical processes and MiddleManagers are serving those segments and sends a rewritten subquery to each of them. The Historical/MiddleManager processes accept the subqueries, process them, and return results. The Broker merges the results into the final answer and returns it to the client.

The Broker analyzes each request and prunes segments to minimize the amount of data that must be scanned per query. In addition to the pruning the Broker does, the index structures inside each segment allow Druid to figure out which rows (if any) match the filter set before looking at any row of data. Once Druid knows which rows match a particular query, it accesses only the specific columns needed for that query, and within those columns it can skip from row to row, avoiding data that does not match the query filter.

Druid therefore uses three different techniques to optimize query performance (a sketch of a filtered query against the Broker follows the list):

  1. Pruning the set of segments accessed for each query.

  2. Within each segment, using indexes to identify which rows must be accessed.

  3. Within each segment, reading only the rows and columns relevant to a particular query.
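
The following sketch sends a native timeseries query to the Broker: the one-hour interval lets the Broker prune segments by time, and the selector filter lets each data server use its bitmap indexes to skip non-matching rows. The Broker address and datasource are hypothetical:

```python
# Sketch: a native timeseries query sent to the Broker. The interval prunes
# segments by time; the selector filter is resolved against bitmap indexes on
# the data servers. The Broker address and datasource are hypothetical.
import requests

BROKER = "http://broker.example.com:8082"  # hypothetical address

query = {
    "queryType": "timeseries",
    "dataSource": "wikipedia_demo",
    "granularity": "hour",
    "intervals": ["2018-05-21T16:00:00Z/2018-05-21T17:00:00Z"],  # prunes to one time chunk
    "filter": {"type": "selector", "dimension": "page", "value": "Druid"},
    "aggregations": [{"type": "count", "name": "edits"}],
}

result = requests.post(f"{BROKER}/druid/v2/", json=query).json()
print(result)  # one result row per hour bucket, merged by the Broker
```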

Links to other articles in the series:

Understanding and selection of time Series database (TSDB)

Learn about Apache Druid in 10 minutes
