Searching and Aggregating Time Series Data in Elasticsearch, by Sonja Krause-Harder

Originally, Elasticsearch was a search engine that kept its search index in Lucene. Elasticsearch has evolved considerably since then, however, and is now a high-performance, clustered, scalable data store. Although its origins are still visible in its index format, Elasticsearch is now used by a wide variety of users for a wide variety of purposes.

One of those uses is to store, process, and retrieve time series data. The defining characteristic of time series data is that every data point has an exact timestamp associated with it. Most often, a data point represents some kind of measurement taken at a specific point in time, be it a stock price, a scientific observation, or the load on a server.

Although there are several database implementations dedicated to handling time series data, there is still no common format for storing and querying them, and in theory every database engine can be used to work with time series data.

In the abstract, a time series entry consists of the following:

  • The name of the series
  • A timestamp
  • A value
  • A set of key-value pairs (also known as tags) with more information about the series

In a server monitoring use case there will always be a key-value pair that specifies which host a time series belongs to, but any additional information can be added and then used to request metrics for only a specific set of hosts, for example hosts running a particular service, hosts that belong to the production environment, or instances running on a specific cloud provider's platform.
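As a purely illustrative sketch (the field names and values here are hypothetical and do not correspond to any particular storage format), a single data point with such tags could look like this:

{
  "series": "system.cpu.user.pct",       # hypothetical illustration only
  "@timestamp": "2019-08-12T12:06:34.572Z",
  "value": 0.7672,
  "tags": {
    "host.name": "noether",
    "service.name": "nginx",
    "environment": "production",
    "cloud.provider": "aws"
  }
}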

To make this more concrete, let's use Metricbeat data as an example and see how Elasticsearch queries can be used to pull specific time series information out of the data.

Each Metricbeat document contains the following information:

  • The timestamp
  • The actual time series data

    • For Metricbeat, the format of this part of the document is described in the Metricbeat documentation. To understand the cpu metricset of the system module in detail, see the metricset documentation.
  • Metadata about the metrics contained in the document. Metricbeat uses the ECS fields event.module and event.dataset to specify which Metricbeat module created the document and which metricset it contains.

    • This information can help you figure out which metrics a document contains before you try to retrieve the time series data.
  • Metadata about the instance, whether it is a physical host, a virtual machine, or a smaller entity such as a Kubernetes pod or a Docker container

    • This metadata follows the Elastic Common Schema (ECS), so it can be correlated with data from other sources that also use ECS.

For example, a Metricbeat document for the system.cpu metricset looks like the following. The inline comments on the _source object indicate where you can find more information about each field:

  • ECS documentation

    • [ECS field reference]
  • Metricbeat documentation

    • [Metricbeat common fields]
  • system.cpu metricset documentation

    • [System fields]
    • [System cpu metricset]

Note: For readability, # comments have been added to the JSON document; they are not valid JSON.

{" _index ":" metricbeat - 8.0.0-2019.08.12-000001 ", "_type" : "_doc", "_id" : "VWm5hWwB6vlM0zxdF3Q5 ", "_source" : {"@timestamp" :" 2019-08-12T12:06:34.572z ", "ecs" : {# ECS metadata "version" :"1.0.1"}, "host" : {# ECS metadata "name" :" noether", "hostname" :" noether"; "Noether ", "architecture" :" x86_64"," OS ": {"kernel" :"4.15.0-55-generic", "codename" :" bionic", "platform" : "Ubuntu ", "version" :" 18.04.3LTS (Bionic Beaver)", "family" : "debian", "name" :"Ubuntu" }, "id" :"4e3eb308e7f24789b4ee0b6b873e5414", "containerized" : false }, "agent" : { # ECS metadata "ephemeral_id" :"7c725f8a-ac03-4f2d-a40c-3695a3591699", "hostname" : "noether", "id" : "E8839ACC-7F5E-40BE-A3AB-1CC891BCB3CE ", "version" :"8.0.0", "type" :" metricbeat"}, "event" : { # ECS metadata "dataset" : "system.cpu", "module" : "system", "duration" :725494 }, "metricset" : { # metricbeat metadata "name" : "cpu" }, "service" : { # metricbeat metadata "type" : "system" }, "system" : {# MetricBeat Time Series Data "CPU" : {"softirq" : {" PCT ":0.0112}, "steal" : {" PCT" :0}, "cores" :8, "irq" : {" PCT ": 0}," idle ": {" PCT", 6.9141}, "nice" : {" PCT ": 0}," user ": {" PCT", 0.7672}, "system" : {" PCT ", 0.3024}, "iowait" : {" PCT ", 0.0051}, "total" : {" PCT ": 1.0808}}}}}

To summarize: within a Metricbeat document, time series data and metadata are mixed together, so you need specific knowledge of the document format to retrieve exactly what you want.

However, if you want to process, analyze, or visualize time series data, it should usually be in a table-like form such as the following:

<series name>          <timestamp>     <value>   <key-value pairs>
system.cpu.user.pct    1565610800000   0.951     host.name="hilbert"
system.cpu.user.pct    1565610810000   0.793     host.name="hilbert"
system.cpu.user.pct    1565610820000   0.802     host.name="noether"
system.cpu.user.pct    1565610820000   0.679     host.name="hilbert"

The following examples show how Elasticsearch queries can help you retrieve time series data programmatically in a form that closely resembles such a table. To try the queries yourself, you will need an Elasticsearch instance and a running Metricbeat installation shipping data for the system.cpu and system.network metricsets. For a short introduction to Metricbeat, see the getting started documentation on the Elastic website.

You can run all of the queries from the Dev Tools console in Kibana. If you have not used it before, the Kibana Console documentation on the Elastic website gives a quick overview. Note that you will need to change the host name in the example queries.
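Before running the queries, you may want to confirm that Metricbeat indices exist on your cluster. One optional way to do that (a quick sanity check, not part of the walkthrough itself) is the cat indices API:

GET _cat/indices/metricbeat-*?v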

We assume that Metricbeat is set up with the default configuration. That means it creates one index per day, and these indices are named "metricbeat-<version>-<date>-<counter>", for example metricbeat-7.3.0-2019.08.06-000009. To query all of these indices at once, we use a wildcard:

Example queries:

GET metricbeat-*/_search

The sample response is as follows:

{ "took" :2, "timed_out" : false, "_shards" : { "total" :1, "successful" :1, "skipped" :0, "failed" :0 }, "hits" : {" total ": {" value" : 10000, the "base" : "gte"}, "max_score" : 1.0, "hits" : [...]. }}

This query obviously exceeds the limit on how many documents Elasticsearch will return in a single request. The actual hits are omitted here, but you may want to scroll through the results and compare them with the annotated document above.

Depending on the size of the monitored infrastructure there can be a huge number of Metricbeat documents, but you rarely need a time series going back to the beginning of (recorded) time, so we start with a date range, in this case the last five minutes:

Example queries:

GET metricbeat-*/_search 
{ 
  "query": { 
    "range": { 
      "@timestamp": { 
        "gte": "now-5m" 
      } 
    } 
  } 
}

The sample response is as follows:

{ "took" :4, "timed_out" : false, "_shards" : { "total" :1, "successful" :1, "skipped" :0, "failed" :0 }, "hits" : {" total ": {" value" : 30, "base" : "eq"}, "max_score" : 0.0, "hits" : [...]. }}

That is a much more manageable number. However, the system this query ran against has only one host reporting to it; in a production environment the hit count would still be much higher.

If you want to retrieve all CPU data for a specific host, a first naive attempt at an Elasticsearch query might be to add filters on host.name and the metricset system.cpu:

Example queries:

GET metricbeat-*/_search 
{ 
  "query": { 
    "bool": { 
      "filter": [ 
        { 
          "range": { 
            "@timestamp": { 
              "gte": "now-5m" 
            } 
          } 
        },
        { 
          "bool": { 
            "should": [ 
              { 
                "match_phrase": { 
                  "host.name": "noether" 
                } 
              },
              { 
                "match_phrase": { 
                  "event.dataset": "system.cpu" 
                } 
              } 
            ] 
          } 
        } 
      ] 
    } 
  } 
}

The sample response is as follows:

{ "took" :8, "timed_out" : false, "_shards" : { "total" :1, "successful" :1, "skipped" :0, "failed" :0 }, "hits" : {" total ": {" value" : 30, "base" : "eq"}, "max_score" : 0.0, "hits" : [...]. }}

This query still returns a large number of documents, each containing the full data sent by Metricbeat for the system.cpu metricset. The result is not very useful, for the following reasons.

First, we would need to retrieve all documents across the entire time range. As soon as we hit the configured limit, Elasticsearch will not return them all at once. It also tries to score and rank the documents, which does not apply to our query at all, and it does not return the results sorted by timestamp.

Second, we are only interested in a small part of each document: the timestamp, a few metric values, and possibly a few additional metadata fields. Returning the full _source from Elasticsearch and then picking the data out of the query results on the client side is inefficient.
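For completeness, you could work around part of this by sorting on the timestamp and asking Elasticsearch to return only the fields you need. The following is a rough sketch of that idea, not the approach this article takes:

GET metricbeat-*/_search
{
  "query": {...},  # same as above
  "size": 100,
  "sort": [ { "@timestamp": "asc" } ],       # return hits in time order
  "_source": [ "@timestamp", "host.name", "system.cpu.user.pct" ]   # only the fields we care about
}

This still ships one document per data point, though, which does not scale well for large time ranges.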

One way to solve this problem is to use Elasticsearch aggregations.

Example 1: CPU percentage, downsampled


Let's first look at the date histogram aggregation. A date histogram returns one value (bucket) per time interval. The buckets that are returned are already ordered by time, and the interval (also called the bucket size) can be specified to match the data. In this example we choose an interval of 10 seconds, because Metricbeat sends data from the system module every 10 seconds by default. The top-level size: 0 parameter specifies that we are no longer interested in the actual hits, only in the aggregation, so no documents will be returned.

Example queries:

GET metricbeat-*/_search
{
  "query": {...},  # same as above
  "size": 0,
  "aggregations": {
    "myDateHistogram": {
      "date_histogram": {
        "field": "@timestamp",
        "fixed_interval": "10s"
      }
    }
  }
}

The sample response is as follows:

{... , "hits" : { "total" : { "value" :30, "relation" : "eq" }, "max_score" : null, "hits" : [ ] }, "aggregations" : { "myDateHistogram" : { "buckets" : [{"key_as_string" :" 2019-08-12T13:03:20.000z ", "key" :1565615000000, "doc_count" :1}, {"key_as_string" :" 2019-08-12T13:03:30.000z ", "key" :1565615010000, "doc_count" :1}, {" key_as_string ":" the 2019-08-12 T13:03:40. 000 z ", "key" : 1565615020000, "doc_count" : 1},... }}}

For each bucket, this returns the timestamp in key (as epoch milliseconds), the helpful key_as_string with a human-readable date-time string, and the number of documents that fell into the bucket.

The doc_count is 1 here because the bucket size matches Metricbeat's reporting period. A bucket containing only a doc_count is not very useful on its own, so to see actual metric values we need to add another, nested aggregation. At this point we have to decide which type of aggregation to use; avg, min, and max are all good choices for numeric values, but since we only have one document per bucket it does not matter which one we pick. The following example demonstrates this nicely by returning the avg, min, and max aggregations over the values of the metric system.cpu.user.pct within each bucket's 10 seconds:

Example queries:

GET metricbeat-*/_search
{
  "query": {...},  # same as above
  "size": 0,
  "aggregations": {
    "myDateHistogram": {
      "date_histogram": {
        "field": "@timestamp",
        "fixed_interval": "10s"
      },
      "aggregations": {
        "myActualCpuUserMax": {
          "max": {
            "field": "system.cpu.user.pct"
          }
        },
        "myActualCpuUserAvg": {
          "avg": {
            "field": "system.cpu.user.pct"
          }
        },
        "myActualCpuUserMin": {
          "min": {
            "field": "system.cpu.user.pct"
          }
        }
      }
    }
  }
}

The sample response is as follows:

{... , "hits" : {... }, "aggregations" : { "myDateHistogram" : { "buckets" : [{"key_as_string" :" 2019-08-12T13:12:400.000z ", "key" :1565615560000, "doc_count" :1, "myActualCpuUserMin" : {"value" :1.002}, "myactualCPUUserAVG" : {"value" :1.002}, "myactualCPUUserMax" : {"value" :1.002}}, {"key_as_string" :" 2019-08-12T13:12:50.000z ", "doc_count" :1, "doc_count" :1, "MyActualCPuUsermin" : {"value" :0.866}, "myActualCPuUserAVG" : {"value" :0.866}, "myActualCPuUserMax" : {"value" :0.866}},...] }}}

As you can see, myActualCpuUserMin, myActualCpuUserAvg, and myActualCpuUserMax are identical within each bucket, so if you need the raw values of time series data that is reported at fixed intervals, you can get them with a date histogram.

Most of the time, however, you are not interested in every single data point, especially when measurements are taken every few seconds. For many purposes coarser-grained data is actually better: for example, a visualization has only a limited number of pixels available to show the variation in a time series, so data at finer granularity is thrown away at rendering time anyway.

Time series data is commonly downsampled until its granularity matches the requirements of whatever processing step comes next. When downsampling, multiple data points within a given time period are reduced to a single one. In our server monitoring example the data is measured every 10 seconds, but for most purposes the average of all values within one minute will do just fine. Incidentally, downsampling is exactly what a date histogram aggregation does, once it finds more than one document per bucket and the right nested aggregation is applied.
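To put numbers on it: with a 10-second reporting period and one-minute buckets, each bucket covers up to six raw data points, and the nested aggregation reduces those to a single value per bucket. This matches the doc_count values of 4 to 6 that show up in the responses below.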

The following example shows a date histogram with nested avg, min, and max aggregations over full one-minute buckets, giving a first example of downsampling. Using calendar_interval instead of fixed_interval aligns the bucket boundaries to full minutes.

Example queries:

GET metricbeat-*/_search
{
  "query": {...},  # same as above
  "size": 0,
  "aggregations": {
    "myDateHistogram": {
      "date_histogram": {
        "field": "@timestamp",
        "calendar_interval": "1m"
      },
      "aggregations": {
        "myDownsampledCpuUserMax": {
          "max": {
            "field": "system.cpu.user.pct"
          }
        },
        "myDownsampledCpuUserAvg": {
          "avg": {
            "field": "system.cpu.user.pct"
          }
        },
        "myDownsampledCpuUserMin": {
          "min": {
            "field": "system.cpu.user.pct"
          }
        }
      }
    }
  }
}

The sample response is as follows:

{... , "hits" : {... }, "aggregations" : { "myDateHistogram" : { "buckets" : [{"key_as_string" :" 2019-08-12T13:27:00.000z ", "key" :1565616420000, "doc_count" :4, "myDownSampledCPUUserMax" : {"value" :0.927}, "myDownSamPledCPUUserMin" : {"value" :0.6980000000000001}, "myDownSamPledCPUUserAVG" : {"value" :0.8512500000000001}}, {"key_as_string" :" 2019-08-12T13:28:00.000z ", "doc_count" :6, "MyDownSamPledCPUUserMax" : {"value" :0.838}, "myDownSamPledCPUUserMin" : {"value" :0.5670000000000001}, "myDownSamPledCPUUserAVG" : {"value" :0.7040000000000001},...] }}}

As you can see, myDownsampledCpuUserMin, myDownsampledCpuUserAvg, and myDownsampledCpuUserMax now differ within a bucket, depending on the aggregation used.

Which aggregation to use when downsampling depends on the metric. For a CPU percentage, an avg aggregation over one minute is fine, while for metrics such as queue lengths or system load a max aggregation may be more appropriate.
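As a hedged sketch of the latter case (this assumes the system module's load metricset is enabled, which exposes a system.load.1 field for the one-minute load average), a max downsample could look like this:

GET metricbeat-*/_search
{
  "query": {...},  # same as above
  "size": 0,
  "aggregations": {
    "myDateHistogram": {
      "date_histogram": {
        "field": "@timestamp",
        "calendar_interval": "1m"
      },
      "aggregations": {
        "myDownsampledLoadMax": {
          "max": {
            "field": "system.load.1"   # assumes the load metricset is enabled
          }
        }
      }
    }
  }
}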

Now, Elasticsearch can also do some simple arithmetic for you and calculate time series that are not in the original data. Assuming we use avg aggregations for the CPU metrics, our example can be improved to return the user CPU, the system CPU, and the sum of the two divided by the number of CPU cores, with a query like this:

Example queries:

GET metricbeat-*/_search
{
  "query": {...},  # same as above
  "size": 0,
  "aggregations": {
    "myDateHistogram": {
      "date_histogram": {
        "field": "@timestamp",
        "calendar_interval": "1m"
      },
      "aggregations": {
        "myDownsampledCpuUserAvg": {
          "avg": {
            "field": "system.cpu.user.pct"
          }
        },
        "myDownsampledCpuSystemAvg": {
          "avg": {
            "field": "system.cpu.system.pct"
          }
        },
        "myCpuCoresMax": {
          "max": {
            "field": "system.cpu.cores"
          }
        },
        "myCalculatedCpu": {
          "bucket_script": {
            "buckets_path": {
              "user": "myDownsampledCpuUserAvg",
              "system": "myDownsampledCpuSystemAvg",
              "cores": "myCpuCoresMax"
            },
            "script": {
              "source": "(params.user + params.system) / params.cores",
              "lang": "painless"
            }
          }
        }
      }
    }
  }
}

The sample response is as follows:

{... , "hits" : {... }, "aggregations" : { "myDateHistogram" : { "buckets" : [{" key_as_string ":" the 2019-08-12 T13:32:00. 000 z ", "key" : 1565616720000, "doc_count" : 2, "myDownsampledCpuSystemAvg" : {"value" :0.344}, "myCPUCOresmax" : {"value" :8.0}, "myDownSamPledCPuUserAVG" : {"value" :0.8860000000000001}, "mycalculatedCPU" : {"value" :0.15375}}, {"key_as_string" :" 2019-08-12T13:33:00.000z ", "doc_count" :6, "MyDownsampledCpuSystemAvg" : {" value ", 0.33416666666666667}, "myCpuCoresMax" : {"value" :8.0}, "myDownSamPledCPUUserAVG" : {"value" :0.8895}, "myCalculatedCPU" : {"value" :0.15295833333333334}},...] }}}


Example 2: Network traffic – terms aggregation and derivative aggregation


A slightly more elaborate example using the system.network metricset shows how powerful Elasticsearch aggregations can be when dealing with time series data. The relevant part of a system.network metricset document looks like this:

{... "system": { "network": { "in": { "bytes":37904869172, "dropped":32, "errors":0, "packets":32143403 }, "name": "wlp4s0", "out": { "bytes":6299331926, "dropped":0, "errors":0, "packets":13362703 } } } ... }

Metricbeat sends one document per network interface present on the system. These documents share the same timestamp but have different values for the system.network.name field, one per network interface.

Any further aggregation needs to happen per interface, so we change the top-level date histogram aggregation from the previous examples into a terms aggregation on the system.network.name field.

Note that for this to work, the field being aggregated on needs to be mapped as a keyword field. If you use the default index template shipped with Metricbeat, this mapping is most likely already set up for you. If not, the Metricbeat template documentation page gives a short description of what to do.
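For reference, a minimal sketch of the relevant mapping entry follows. This is an illustration only; the index name is hypothetical and the actual Metricbeat template defines many more fields:

PUT metricbeat-custom-example   # hypothetical index name, for illustration only
{
  "mappings": {
    "properties": {
      "system.network.name": {
        "type": "keyword"
      }
    }
  }
}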

Example queries:

GET metricbeat-*/_search
{
  "query": {...},  # same as above
  "size": 0,
  "aggregations": {
    "myNetworkInterfaces": {
      "terms": {
        "field": "system.network.name",
        "size": 50
      },
      "aggs": {
        "myDateHistogram": {
          "date_histogram": {
            "field": "@timestamp",
            "calendar_interval": "1m"
          }
        }
      }
    }
  }
}

The sample response is as follows:

{... , "hits" : {... }, "aggregations" : { "myNetworkInterfaces" : { "doc_count_error_upper_bound" :0, "sum_other_doc_count" :0, "buckets" : [ { "key" : "docker0", "doc_count" :29, "myDateHistogram" : { "buckets" : [...] } }, { "key" : "enp0s31f6", "doc_count" :29, "myDateHistogram" : { "buckets" : [...] } }, { "key" : "lo", "doc_count" :29, "myDateHistogram" : { "buckets" : [...] } }, { "key" : "wlp61s0", "doc_count" :29, "myDateHistogram" : { "buckets" : [{"key_as_string" :" 2019-08-12T13:39:000z ", "key" :1565617140000, "doc_count" :1}, {"key_as_string" :" 2019-08-12T13:40:00.000z ", "key" :1565617200000, "doc_count" :6}, {"key_as_string" :" 2019-08-12T13:41:00.000z ", "key" :1565617260000, "doc_count" :6}, {"key_as_string" :" 2019-08-12T13:42:00.000z ", "key" :1565617320000, "doc_count" :6}, {"key_as_string" :" 2019-08-12T13:43:00.000z ", "key" :1565617380000, "doc_count" :6}, {" key_as_string ":" the 2019-08-12 T13:44:00. 000 z ", "key" : 1565617440000, "doc_count" : 4}]}},... }}}

As in the CPU example, a date histogram aggregation without a nested aggregation only returns doc_count values, which are not very useful.

The bytes fields contain monotonically increasing values: the number of bytes sent or received since the machine last started, so the value grows with every measurement. In this case the correct nested aggregation is max, so the downsampled value will be the highest and therefore most recent measurement taken during the bucket interval.

Example queries:

GET metricbeat-*/_search
{
  "query": {...},  # same as above
  "size": 0,
  "aggregations": {
    "myNetworkInterfaces": {
      "terms": {
        "field": "system.network.name",
        "size": 50
      },
      "aggs": {
        "myDateHistogram": {
          "date_histogram": {
            "field": "@timestamp",
            "calendar_interval": "1m"
          },
          "aggregations": {
            "myNetworkInBytesMax": {
              "max": {
                "field": "system.network.in.bytes"
              }
            },
            "myNetworkOutBytesMax": {
              "max": {
                "field": "system.network.out.bytes"
              }
            }
          }
        }
      }
    }
  }
}

The sample response is as follows:

{... , "hits" : {... }, "aggregations" : { "myNetworkInterfaces" : { "doc_count_error_upper_bound" :0, "sum_other_doc_count" :0, "buckets" : [ { "key" : "docker0", ... }, { "key" : "enp0s31f6", ... }, { "key" : "lo", ... }, { "key" : "wlp61s0", "doc_count" :30, "myDateHistogram" : { "buckets" : [{"key_as_string" :" 2019-08-12T13:50:00.000z ", "key" :1565617800000, "doc_count" :2, "myNetworkinBytesMax" : {"value" :2.991659837E9}, "myNetWorkoutBytesMax" : {"value" :5.46578365E8}}, {"key_as_string" :" 2019-08-12T13:51:00.000z ", "doc_count" :6, "MyNetWorkInBytesMax" : {"value" :2.992027006E9}, "myNetWorkOutBytesMax" : {" value ": 5.46791988 e8}," myNetworkInBytesPerSecond ": {" value ": 367169.0," normalized_value ": 6119.483333333334}," myNetworkoutBytesPerSecond ": {" value ": 213623.0," normalized_value ": 3560.383333333333}},...]}},... }}}

To get the per-second byte rate out of these monotonically increasing counters, we use a derivative aggregation. When this aggregation is given the optional unit parameter, it returns the desired value per unit in the normalized_value field, in addition to the plain difference between buckets:

Example queries:

GET metricbeat-*/_search
{
  "query": {...},  # same as above
  "size": 0,
  "aggregations": {
    "myNetworkInterfaces": {
      "terms": {
        "field": "system.network.name",
        "size": 50
      },
      "aggs": {
        "myDateHistogram": {
          "date_histogram": {
            "field": "@timestamp",
            "calendar_interval": "1m"
          },
          "aggregations": {
            "myNetworkInBytesMax": {
              "max": {
                "field": "system.network.in.bytes"
              }
            },
            "myNetworkInBytesPerSecond": {
              "derivative": {
                "buckets_path": "myNetworkInBytesMax",
                "unit": "1s"
              }
            },
            "myNetworkOutBytesMax": {
              "max": {
                "field": "system.network.out.bytes"
              }
            },
            "myNetworkOutBytesPerSecond": {
              "derivative": {
                "buckets_path": "myNetworkOutBytesMax",
                "unit": "1s"
              }
            }
          }
        }
      }
    }
  }
}

The sample response is as follows:

{... , "hits" : {... }, "aggregations" : { "myNetworkInterfaces" : { "doc_count_error_upper_bound" :0, "sum_other_doc_count" :0, "buckets" : [ { "key" : "docker0", ... }, { "key" : "enp0s31f6", ... }, { "key" : "lo", ... }, { "key" : "wlp61s0", "doc_count" :30, "myDateHistogram" : { "buckets" : [{"key_as_string" :" 2019-08-12T14:07:00.000z ", "key" :1565618820000, "doc_count" :4, "myNetWorkinBytesMax" : {"value" :3.030494669E9}, "myNetWorkoutBytesMax" : {"value" :5.56084749E8}}, {"key_as_string" :" cial08-12t14:08:00.000z ", "doc_count" :6, "cial08-12t14:08:00.000z ", "MyNetWorkInBytesMax" : {"value" :3.033793744E9}, "myNetWorkOutBytesMax" : {" value ": 5.56323416 e8}," myNetworkInBytesPerSecond ": {" value ": 3299075.0," normalized_value ": 54984.583333333336}," myNetworkoutBytesPerSecond ": {"value" :238667.0, "normalized_value" :3977.7833333333333}}, {"key_as_string" :" 2019-08-12T14:09:00.000z ", "Key" :1565618940000, "doc_count" :6, "myNetworkInBytesMax" : {"value" :3.037045046E9}, "myNetworkOutBytesMax" : {" value ": 5.56566282 e8}," myNetworkInBytesPerSecond ": {" value ": 3251302.0," normalized_value ": 54188.36666666667}," myNetworkoutBytesPerSecond ": {" value ": 242866.0," normalized_value ": 4047.766666666667}},...]}},... }}}

You can try all of these queries on your own cluster. If you don't have one yet, you can try the Elasticsearch Service on Elastic Cloud for free and quickly deploy a cluster, or download the default distribution of the Elastic Stack. Start sending data from your systems with Metricbeat, and enjoy your queries!