Background

In day-to-day visual monitoring of a system, when an abnormal metric is detected we often need to locate the root cause of the problem. Monitoring data, however, is predefined and highly condensed, so the amount of information it exposes is very limited. It has to be used together with a log system that can carry rich information.

When the monitoring system raises an alarm, we usually narrow down the problem on a dashboard by the cluster, host, instance, application, and time the abnormal metric belongs to, and then switch to the log system for a more detailed search to gather richer information and determine the root cause.

In this workflow, the monitoring system and the log system are often independent and used in very different ways. A popular combination, for example, is Prometheus for monitoring and ES+Kibana for logging. The two have completely different concepts, query syntaxes, and interfaces, which not only raises the learning cost for users but also forces frequent context switching between the two systems, delaying problem diagnosis.

In addition, most log systems rely on full-text indexing to serve searches: they build inverted indexes over the raw log text, which can double the stored data compared with the original content and drives storage cost up significantly. Moreover, whether or not the data will ever be searched, indexing consumes a large amount of compute at write time, an undeniable waste for logging, which is a write-heavy, read-light workload.

Loki was designed to solve these problems. Its goal is an extremely low-cost log system that integrates deeply with monitoring.

The Loki log solution

1. Low cost

Data model

Loki borrows its data model from Prometheus. Data consists of labels, a timestamp, and content, and data with the same labels belongs to the same log stream, with the following structure:

{" stream ": {" label1" : "value1", "label1" : "value2"}, # label "values" : [["<timestamp nanoseconds>","log content"]]}Copy the code

  • Labels: describe metadata such as the cluster, service, host, application, and type of the log; used for later queries;
  • Timestamp: the time the log was generated;
  • Content: the raw content of the log.

Loki also supports multi-tenancy; within one tenant, a set of logs whose labels are exactly identical is called a log stream.

Sharing the same labels with the monitoring system lays the data foundation for the fast context switching between monitoring and logs in the UI, described later.

LogQL

Loki's query language, LogQL, is similar to Prometheus' PromQL: its syntax is simple and already familiar to the community, which lowers the cost of learning and using it. An example:

{file="debug.log""} |= "err"
Copy the code

  • Stream selector: {label1="value1", label2="value2"}, which selects log streams by label and supports equality, inequality, regex match, and regex non-match;
  • Filter expression: |= "err", which filters log content and supports contains, does-not-contain, regex match, and regex non-match.

This works much like find + grep: find locates the file, then grep matches it line by line:

find . -name "debug.log" | xargs grep err

Besides querying log content, LogQL supports aggregations over log counts and rates, for example count_over_time({file="debug.log"}[5m]).

Grafana

Grafana supports Loki natively as a data source and integrates monitoring and log queries: monitoring data and logs can be drilled into and explored side by side in the same UI, which is more intuitive and convenient than bouncing back and forth between separate systems.

Monitoring and log queries can also be configured together in dashboards, so you can watch metric trends and the corresponding log content at the same time, a more intuitive way to catch potential problems.

Low storage cost

Loki indexes only the metadata labels of a log; the log content itself is compressed and written to object storage without any indexing. Compared with full-text indexing systems such as ES, the stored data can shrink by an order of magnitude, and with low-cost object storage the final storage cost can drop by an order of magnitude or more. Loki also does not build a complex storage system of its own; it directly uses existing mature distributed stores such as S3, GCS, Cassandra, and BigTable.

2. Architecture

Overall, Loki separates the read path from the write path and is composed of multiple modules. Its main components:

  • Promtail, Fluent Bit, Fluentd, Rsyslog, and other open-source clients: collect and ship logs;
  • Distributor: the log write entry point; forwards data to Ingester;
  • Ingester: the log write service; caches log content and indexes and writes them to the underlying storage;
  • Querier: the log read service; executes search requests;
  • QueryFrontend: the log read entry point; distributes read requests to Queriers and returns the results;
  • Cassandra/BigTable/DynamoDB/S3/GCS: underlying storage for indexes and log content;
  • Cache: caching layer; supports Redis, Memcached, and an in-process local cache.

Distributor

As the entry service for log writes, Distributor parses, validates, and forwards reported data. After parsing, it validates the size, entry count, frequency, labels, tenant, and other parameters of the received data, then forwards the legitimate data to the Ingester service. Before forwarding, its most important task is to guarantee that data of the same log stream is always forwarded to the same Ingester, so that data order is preserved.

Hash ring

Distributor uses consistent hashing together with a replication factor to determine which Ingesters to forward data to.

When an Ingester starts, it generates a series of random 32-bit numbers as its tokens and registers itself in the hash ring together with this token set. When choosing a forwarding destination, Distributor computes a hash from the log's labels and tenant ID and then looks in the hash ring for the first token greater than this hash; the Ingester owning that token is the destination the log is forwarded to. If a replication factor is configured, the distinct Ingesters owning the subsequent tokens are selected in order as replica destinations.
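A minimal Go sketch of this token lookup, assuming a token list kept sorted (names are illustrative; Loki's real ring lives in its ring package):

package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

type ringEntry struct {
	token    uint32 // random 32-bit token registered by an Ingester
	ingester string
}

type hashRing []ringEntry // kept sorted by token

// lookup hashes tenant ID + labels, finds the first token greater than the
// hash, and walks clockwise to pick replicationFactor distinct Ingesters.
func (r hashRing) lookup(tenantID, labels string, replicationFactor int) []string {
	h := fnv.New32a()
	h.Write([]byte(tenantID + labels))
	key := h.Sum32()

	start := sort.Search(len(r), func(i int) bool { return r[i].token > key })

	var dests []string
	seen := map[string]bool{}
	for n := 0; n < len(r) && len(dests) < replicationFactor; n++ {
		e := r[(start+n)%len(r)] // wrap around the ring
		if !seen[e.ingester] {
			seen[e.ingester] = true
			dests = append(dests, e.ingester)
		}
	}
	return dests
}

func main() {
	ring := hashRing{{1000, "ingester-1"}, {52000, "ingester-2"}, {3500000, "ingester-3"}}
	fmt.Println(ring.lookup("tenant-a", `{app="nginx"}`, 2))
}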

The hash ring can be stored in Etcd or Consul. Loki also implements an in-cluster KV store with Memberlist; you can use that option if you prefer not to depend on Etcd or Consul.

Distributor mainly receives logs reported in bulk over HTTP. The payload supports JSON and protobuf, packed in the following structure:

[
  {
    "stream": {
      "label1": "value1",
      "label2": "value2"
    },
    "values": [
      ["<timestamp nanoseconds>", "log content"],
      ["<timestamp nanoseconds>", "log content"]
    ]
  },
  ...
]

Distributor forwards data to Ingester over gRPC, packed in the following structure:

{
  "streams": [
    {
      "labels": "{label1=value1, label2=value2}",
      "entries": [
        {"ts": <unix epoch in nanoseconds>, "line": "<log line>"},
        {"ts": <unix epoch in nanoseconds>, "line": "<log line>"}
      ]
    },
    ...
  ]
}

Ingester

As Loki's write module, Ingester's main job is to cache data and write it to the underlying storage. Following the life cycle of written data through the module, Ingester can be roughly divided into three layers: validation, caching, and storage adaptation.

Validation

An important design decision in Loki is that it does not reorder out-of-order data: it requires that logs within the same stream be written with monotonically non-decreasing timestamps. So besides checking data length and frequency, validating log order is crucial. Ingester compares the timestamp and content of each incoming log with the previous log of the same stream, as sketched after the list below:

  • If this log's timestamp is newer than the previous log's, it is accepted;
  • If the timestamp is the same but the content differs, it is accepted;
  • If both the timestamp and the content are the same, it is ignored;
  • If this log's timestamp is older than the previous log's, an out-of-order error is returned.
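A Go sketch of this decision logic (type and field names are assumptions for illustration, not Loki's actual code):

package main

import "errors"

var errOutOfOrder = errors.New("entry out of order")

// entry is a single log line (illustrative).
type entry struct {
	ts   int64  // unix nanoseconds
	line string // log content
}

// validateOrder decides whether an incoming entry is accepted, ignored, or
// rejected, relative to the last accepted entry of the same stream.
func validateOrder(last, in entry) (accept bool, err error) {
	switch {
	case in.ts > last.ts: // newer timestamp: accept
		return true, nil
	case in.ts == last.ts && in.line != last.line: // same timestamp, new content: accept
		return true, nil
	case in.ts == last.ts && in.line == last.line: // exact duplicate: ignore
		return false, nil
	default: // older timestamp: reject as out of order
		return false, errOutOfOrder
	}
}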

Cache

The in-memory log cache uses a multi-level tree structure that isolates tenants and log streams from one another; within the same log stream, logs are appended sequentially into blocks. The overall structure is as follows (a Go sketch follows the list):

  • Instances: a map keyed by the tenant's userID, with Instance as the value;
  • Instance: the container for all log streams of one tenant;
  • Streams: a map keyed by the log stream's fingerprint (streamFP), with Stream as the value;
  • Stream: the container for all Chunks of one log stream;
  • Chunks: a list of Chunks;
  • Chunk: the in-memory structure of the smallest read/write unit of persistent storage;
  • Block: compressed, archived chunk data;
  • HeadBlock: the block still open for writes;
  • Entry: a single log line, holding the timestamp and the log content.
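A rough Go sketch of the hierarchy (type and field names are illustrative, not Loki's exact definitions):

package main

// Entry is a single log line.
type Entry struct {
	Ts   int64  // unix nanoseconds
	Line string // raw log content
}

// HeadBlock is the block still open for writes.
type HeadBlock struct {
	entries []Entry
}

// Block holds compressed, archived entries (gzip / snappy / lz4).
type Block struct {
	compressed []byte
}

// Chunk is the in-memory form of the smallest persisted read/write unit.
type Chunk struct {
	blocks []Block
	head   HeadBlock
}

// Stream holds all chunks of one log stream, in ascending time order;
// only the last chunk accepts writes.
type Stream struct {
	chunks []Chunk
}

// Instance holds all log streams of one tenant.
type Instance struct {
	streams map[uint64]*Stream // key: stream fingerprint (streamFP)
}

// Instances is the top level of the tree, keyed by tenant userID.
type Instances map[string]*Instance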

Chunks

Before writing data to memory, Ingester first locates the log stream and its Chunks using the tenant ID (userID) and the fingerprint computed from the labels (streamFP).

Chunks is a list of chunks arranged in ascending time order. Only the last chunk accepts new writes; the rest are written to the underlying storage. When the last chunk's lifetime or data size exceeds the configured threshold, a new chunk is appended to the end of the list.

Chunk

Chunk is the in-memory form of the smallest unit that Loki reads from and writes to the underlying storage. It consists of blocks: the headBlock is the block currently open for writing, while the other blocks hold archived, compressed data.

Block

A Block is the unit of data compression. Its purpose is to avoid wasting compute by decompressing an entire Chunk on every read: in many cases only part of a chunk's data is needed, so a read can decompress just the relevant blocks and return the required amount of data.

A block stores compressed log data, in time order, as pairs of timestamp and raw content. The compression codec can be gzip, snappy, or lz4.

HeadBlock

The one special block still open for writing. After it reaches a certain size it is compressed and archived as a regular block, and a new headBlock is created.

Storage adapter

Because the underlying storage can be S3, Cassandra, BigTable, DynamoDB, and other systems, the adaptation layer abstracts the read and write operations of these systems behind a unified interface and is responsible for exchanging data with them.

Output

Chunk

Loki reads and writes data to the storage system in units of chunks. A chunk in its persisted state has the following structure (sketched in Go after the list):

  • meta: metadata such as the fingerprint of the stream the chunk belongs to, the tenant ID, and the start and end time;
  • data: the log content, with several important fields:
  • encode: the compression codec of the data;
  • #block-N bytes: the log data of the Nth block;
  • #blocks section byte offset: the byte offset of the #blocks section;
  • #blocks: records how many blocks there are in total;
  • #block entries: correspond one-to-one with #block-N bytes, recording metadata such as the number of log lines in each block, its start and end time, and the offset and length of its #block-N bytes.
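Pieced together from the fields above, the persisted layout can be sketched as a Go structure (approximate; Loki's chunkenc package defines the authoritative format):

package main

// persistedChunk sketches the on-storage chunk layout described above.
type persistedChunk struct {
	meta chunkMeta
	data chunkData
}

type chunkMeta struct {
	streamFP   uint64 // fingerprint of the owning log stream
	userID     string // tenant ID
	from, thru int64  // start and end time of the chunk
}

type chunkData struct {
	encode       byte         // compression codec: gzip / snappy / lz4
	blocks       [][]byte     // #block-N bytes: compressed log data per block
	blockEntries []blockEntry // #block entries: one per block
	sectionOff   uint64       // #blocks section byte offset, stored at the end
	numBlocks    int          // #blocks: total number of blocks
}

type blockEntry struct {
	numLines   int   // log lines in the block
	mint, maxt int64 // first and last timestamp in the block
	offset     int64 // byte offset of the block's data
	length     int64 // byte length of the block's data
}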

The sequence for parsing chunk data:

  1. Read the #blocks section byte offset at the end of the chunk to locate the #blocks section;
  2. Read #blocks to learn how many blocks the chunk contains;
  3. From the #block entries, read each block's metadata: mint, maxt, offset, len, and so on;
  4. Parse the data of each block on demand, according to its metadata.

Index

Loki indexes only the label data, implementing the mapping label → log stream → Chunk; the index is stored in the storage layer as separate tables.

1. Table structure

CREATE TABLE IF NOT EXISTS Table_N (
    hash text,
    range blob,
    value blob,
    PRIMARY KEY (hash, range)
)

  • Table_N: tables are split by time period;
  • hash: the lookup key, which differs by query type;
  • range: the range-query field;
  • value: the label value of the log.

2. Data types

Loki stores several types of index data to support different mapping scenarios; each type composes the hash/range/value fields differently.

Here seriesID is the log stream ID, shard is the shard ID, userID is the tenant ID, labelName is the label name, labelValueHash is the hash of the label value, chunkID is the chunk ID, and chunkThrough is the timestamp of the last entry in the chunk. The role these elements play in the mapping process is described in detail in the Querier query flow.

The three index data types are:

  • Data type 1: used to query the IDs of all log streams of a tenant by tenant ID;
  • Data type 2: used to query log stream IDs by tenant ID and label;
  • Data type 3: used to query the IDs of the underlying storage chunks by log stream ID.

Loki additionally buckets and shards the index to speed up index queries.

  • Bucketing

By day:

bucketID = timestamp / secondsInDay

By hour:

bucketID = timestamp / secondsInHour

  • Sharding

shard = seriesID % number of shards

Chunk states

Chunk is the central data unit in Ingester; its in-memory life cycle passes through the following four states:

  • Writing: new data is being written;
  • Waiting flush: no longer accepting new data; waiting to be written to storage;
  • Retain: already written to storage; waiting to be destroyed;
  • Destroy: destroyed.

The states always transition in the order writing -> waiting flush -> retain -> destroy.
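As a compact summary, the life cycle can be modeled as a tiny state machine; a Go sketch with illustrative names:

package main

type chunkState int

const (
	writing      chunkState = iota // accepting new entries
	waitingFlush                   // closed to writes; queued for flush
	retain                         // flushed; kept in memory for recent queries
	destroy                        // reclaimed; awaiting GC
)

// next returns the only legal successor state.
func (s chunkState) next() chunkState {
	if s == destroy {
		return destroy
	}
	return s + 1
}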

1. When transitions are triggered

  • Cooperative trigger: a new data write request arrives;
  • Timed trigger: the flush cycle triggers writing chunks to storage, and the recycle cycle triggers chunk destruction.

2. Writing to waiting flush

A chunk's initial state is writing, meaning data is being written. It enters the waiting flush state when any of the following conditions is met:

  • the chunk is full (cooperative trigger);
  • the chunk's lifetime (the time span between its first and last entry) exceeds the threshold (timed trigger);
  • the chunk's idle time (how long no data has been written to it) exceeds the threshold (timed trigger).

3. Waiting flush to retain

Ingester periodically writes chunks in the waiting flush state to the underlying storage, after which those chunks enter the retain state. This is because Ingester also serves searches over the most recent data and needs to keep it in memory for a while; the retain state decouples the flush time from the in-memory retention time, making it easier to tune memory usage to requirements.

4. Retain to destroy: retained chunks are eventually reclaimed and wait for GC to destroy them.

In general, for the logging scenario Loki appends data sequentially and indexes only metadata, which greatly simplifies its data structures and processing logic and underpins Ingester's ability to handle high-speed writes.

Querier

Querier is the execution component of the query service: it pulls data from the underlying storage and filters it according to the conditions expressed in LogQL. It can serve queries directly through its API, or be combined with queryFrontend for distributed, concurrent queries.

Types of queries

  • Range log query
  • Single log query
  • Statistics query
  • Metadata query

Among these, range log queries are the most widely used, so only range log queries are described in detail below.

Concurrent query

A single query request could be served by calling the Querier API directly, but large queries would then easily cause OOM. To deal with this, Querier is combined with queryFrontend to split queries and execute them concurrently across multiple Queriers.

Each Querier opens a gRPC bidirectional stream to every queryFrontend, pulls split subqueries from them in real time, and sends results back to the queryFrontend after execution. How queries are split and how subqueries are scheduled among Queriers is explained in the queryFrontend section.

The query process

1. Parse the LogQL expression

2. Query the list of log stream IDs

Loki uses different index-query logic depending on the label selector syntax. There are two broad cases, each illustrated below; the set arithmetic they share is sketched in Go after the second example.

  • For equality (=) and multi-value regex match (=~), the flow is:
  1. For each label key-value pair in the selector, query the corresponding set of log stream IDs (seriesIDs), with semantics similar to the following SQL:

SELECT * FROM Table_N WHERE hash=? AND range>=? AND value=labelValue

◆ hash is computed from the combination of tenant ID (userID), bucketID, and label name (labelName); range is computed from the label value (labelValue).

  2. Take the union or intersection of the seriesID sets queried for the individual key-value pairs to obtain the final set.

For example, the selector {file="app.log", level=~"debug|error"} is processed as follows:

  1. Query the seriesID sets S1, S2, S3 for the pairs file="app.log", level="debug", level="error";
  2. Compute the final set S = S1 ∩ (S2 ∪ S3).
  • For inequality (!=) and regex non-match (!~), the flow is:
  1. For each label name in the selector, query the full seriesID set of that label, with semantics similar to the following SQL:

SELECT * FROM Table_N WHERE hash = ?

◆ hash is computed from the combination of tenant ID (userID), bucketID, and label name (labelName).

  2. Filter each seriesID set according to the label selection syntax;
  3. Combine the filtered sets by union, intersection, and so on to obtain the final set.

For example, {file=~"mysql*", level!="error"} is processed as follows:

  1. Query the seriesID sets S1 and S2 for the labels file and level;
  2. Filter them: SS1 is the subset of S1 whose file value matches "mysql*", and SS2 is the subset of S2 whose level value is not "error";
  3. Compute the final set S = SS1 ∩ SS2.
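The set arithmetic shared by both examples is plain union/intersection over seriesID sets; a minimal Go sketch:

package main

type seriesSet map[uint64]struct{} // a set of seriesIDs

func union(a, b seriesSet) seriesSet {
	out := make(seriesSet, len(a)+len(b))
	for id := range a {
		out[id] = struct{}{}
	}
	for id := range b {
		out[id] = struct{}{}
	}
	return out
}

func intersect(a, b seriesSet) seriesSet {
	out := make(seriesSet)
	for id := range a {
		if _, ok := b[id]; ok {
			out[id] = struct{}{}
		}
	}
	return out
}

// For {file="app.log", level=~"debug|error"}:
//   s := intersect(s1, union(s2, s3))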

3. Query the chunk IDs contained in all the log streams, with semantics similar to the following SQL statement

SELECT * FROM Table_N WHERE hash = ?

  • hash is computed from bucketID and seriesID.

4. Build an iterator over the chunkID list to read log lines in order

As the data-reading component, the iterator's main job is to pull chunks from the storage system and read log lines out of them. It uses a multi-layer tree structure that pops data recursively from top to bottom (an interface sketch follows the list):

  • batch iterator: downloads raw chunk data from storage in batches and builds the iterator tree;
  • streams iterator: iterates over multiple streams, using heap sort to keep data ordered across streams;
  • chunks iterator: iterates over the chunks of one stream, also using heap sort to keep order across chunks and to deduplicate among replicas;
  • blocks iterator: iterates over the blocks of one chunk;
  • block bytes iterator: iterates over the log lines inside one block.
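All of these layers can be seen as implementations of a single iterator interface; a simplified sketch (the real shape lives in Loki's iter package; details here are reduced for illustration):

package main

// Entry is a timestamp plus log line, as in the cache sketch above.
type Entry struct {
	Ts   int64
	Line string
}

// EntryIterator is the shape shared by every layer of the tree: parent
// iterators merge their children with a min-heap keyed by the next entry's
// timestamp, so popping always yields time-ordered lines.
type EntryIterator interface {
	Next() bool     // advance; false when exhausted
	Entry() Entry   // current log line
	Labels() string // labels of the stream the line belongs to
	Error() error
	Close() error
}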

5. Query Ingester for in-memory data that has not yet been written to storage

Because Ingester writes cached data to storage periodically, when a query touches a recent time range Querier also queries the in-memory data of every Ingester over gRPC. The time range that must be queried from Ingesters is configurable and depends on how long Ingester caches data.

The above is the main flow of a log content query. Index queries are almost identical, with an extra iterator layer that computes index data from the queried logs. The other two query types are simpler and are not covered in detail here.

QueryFrontend

Loki computes at query time, which is like running grep over a large amount of raw data, so queries inevitably consume a lot of compute and memory. Executing a query request on a single node easily runs into OOM, slowness, and other bottlenecks on large queries. To solve this, Loki splits a single query across multiple Queriers for concurrent execution; the splitting and scheduling of query requests is done by queryFrontend.

QueryFrontend sits in front of Querier in Loki's overall architecture and serves as the entry service for read operations. Its main components and workflow:

  1. Split Request: splits a single query into a list of subqueries (subReq);
  2. Feeder: feeds the subqueries, in order, into the Buf Queue cache;
  3. Runner: multiple concurrent Runners move queries from the Buf Queue into the subquery queue and wait for the results to come back;
  4. Querier pops subqueries from the subquery queue in real time over gRPC and, after executing them, returns the results to the corresponding Runner;
  5. Once all Runners have finished, the sub-results are merged and returned in the API response.

Query splitting

QueryFrontend splits a query request into subqueries of a fixed time span. For example, if a query covers a 6-hour range and the split span is 15 minutes, the query is divided into 6*60/15 = 24 subqueries.
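A Go sketch of this fixed-interval splitting (simplified; names are illustrative):

package main

import "time"

type timeRange struct {
	from, through time.Time
}

// splitByInterval cuts one query range into subqueries of at most `interval`.
// A 6h range with a 15m interval yields 6*60/15 = 24 subqueries.
func splitByInterval(q timeRange, interval time.Duration) []timeRange {
	var subs []timeRange
	for start := q.from; start.Before(q.through); start = start.Add(interval) {
		end := start.Add(interval)
		if end.After(q.through) {
			end = q.through
		}
		subs = append(subs, timeRange{from: start, through: end})
	}
	return subs
}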

Query scheduling

Feeder

Feeder writes the split subqueries one by one into the Buf Queue cache, and together with the downstream Runners forms a producer/consumer pattern that keeps subquery concurrency under control.

Runner

Runners competitively read subqueries from the Buf Queue, write them into the downstream request queue, and process the results returned by Queriers. The number of concurrent Runners is controlled by global configuration, to avoid decomposing too many subqueries at once and flooding Queriers with traffic, which would hurt their stability.
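A producer/consumer sketch of Feeder and Runners using Go channels (structure and names are illustrative; dispatchToQuerier stands in for enqueueing a subquery downstream and waiting for its result):

package main

import "sync"

// subQuery and result are placeholders for a split piece of the original
// query and its partial result.
type subQuery struct{}
type result struct{}

// dispatchToQuerier stands in for pushing a subquery into the downstream
// request queue and blocking until a Querier returns its result.
func dispatchToQuerier(s subQuery) result { return result{} }

// runSubqueries wires a Feeder (producer) to a bounded pool of Runners
// (consumers), mirroring the workflow described above.
func runSubqueries(subs []subQuery, concurrency int) []result {
	buf := make(chan subQuery) // the Buf Queue
	results := make(chan result, len(subs))

	// Feeder: inject subqueries into the Buf Queue one by one.
	go func() {
		for _, s := range subs {
			buf <- s
		}
		close(buf)
	}()

	// Runners: bounded concurrency protects Queriers from request floods.
	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for s := range buf {
				results <- dispatchToQuerier(s)
			}
		}()
	}
	wg.Wait()
	close(results)

	out := make([]result, 0, len(subs))
	for r := range results {
		out = append(out, r)
	}
	return out
}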

Subquery queue

The queue is a two-dimensional structure: the first dimension holds the queues of different tenants, and the second holds the subquery list of a single tenant. Both admit and release elements in FIFO order.

Request allocation

QueryFrontend allocates query requests passively. The back-end Queriers and queryFrontend watch the subquery queue in real time over gRPC; when a new request is needed, the next one is popped from the queue in the following order (sketched in Go after the list):

  1. Round-robin over the tenant list in the queue to find the next tenant queue that has data;
  2. Pop the oldest request from that tenant's queue.
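A Go sketch of this tenant-fair, two-dimensional pop (illustrative; subQuery as in the Runner sketch above, redeclared so the fragment stands alone):

package main

type subQuery struct{}

// tenantQueues is the two-dimensional queue: FIFO per tenant in the second
// dimension, round-robin across tenants in the first.
type tenantQueues struct {
	tenants []string              // round-robin order of tenants
	queues  map[string][]subQuery // per-tenant FIFO subquery lists
	next    int                   // index of the tenant to serve next
}

// pop returns the oldest request of the next tenant that has queued work.
func (q *tenantQueues) pop() (subQuery, bool) {
	for i := 0; i < len(q.tenants); i++ {
		t := q.tenants[(q.next+i)%len(q.tenants)]
		if list := q.queues[t]; len(list) > 0 {
			q.queues[t] = list[1:]
			q.next = (q.next + i + 1) % len(q.tenants)
			return list[0], true
		}
	}
	return subQuery{}, false
}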

Summary

Loki is developing rapidly; its latest version has reached 2.0, which adds new capabilities such as log parsing, the Ruler, and boltdb-shipper compared with 1.6, while the basic modules, architecture, data model, and working principles have stabilized. I hope this analysis is of some help; if anything in the article is mistaken, criticism and corrections are welcome.
