This article looks at how Logstash converts, parses, and extracts data, along with its core operations. It assumes you have already set up Logstash as described in my previous article, “How to Install a Logstash in an Elastic stack.”

 

 

Logstash data source

 

Logstash is used in many kinds of applications and can ingest data from a wide variety of sources.

This data comes in many different shapes. For it to end up in Elasticsearch in a form that can be analyzed, we have to do a lot of processing on the source data to turn it into useful information.

 

Logstash plugins  

When you run a Logstash instance, in addition to starting the configured pipelines, it also starts the Logstash monitoring API endpoint on port 9600. Note that the monitoring API is only available in Logstash 5.0 and later. We can view all installed plugins in a browser at the following address:

http://localhost:9600/_node/plugins?pretty

 

Logstash is a very easy framework to extend. It can parse and process all kinds of data, thanks to the more than 200 plugins currently available. First, let’s take a look at which plugins are installed:

Input plugins:

We first go to the bin subdirectory under the Logstash installation directory and type the following command on the command line:

$ ./logstash-plugin list --group input

Display:

logstash-input-azure_event_hubs
logstash-input-beats
logstash-input-couchdb_changes
logstash-input-elasticsearch
logstash-input-exec
logstash-input-file
logstash-input-ganglia
logstash-input-gelf
logstash-input-generator
logstash-input-graphite
logstash-input-heartbeat
logstash-input-http
logstash-input-http_poller
logstash-input-imap
logstash-input-jdbc
logstash-input-jms
logstash-input-kafka
logstash-input-pipe
logstash-input-rabbitmq
logstash-input-redis
logstash-input-s3
logstash-input-snmp
logstash-input-snmptrap
logstash-input-sqs
logstash-input-stdin
logstash-input-syslog
logstash-input-tcp
logstash-input-twitter
logstash-input-udp
logstash-input-unix

Filter plugins:

Type the following command on the command line:

$ ./logstash-plugin list --group filter
logstash-filter-aggregate
logstash-filter-anonymize
logstash-filter-cidr
logstash-filter-clone
logstash-filter-csv
logstash-filter-date
logstash-filter-de_dot
logstash-filter-dissect
logstash-filter-dns
logstash-filter-drop
logstash-filter-elasticsearch
logstash-filter-fingerprint
logstash-filter-geoip
logstash-filter-grok
logstash-filter-http
logstash-filter-jdbc_static
logstash-filter-jdbc_streaming
logstash-filter-json
logstash-filter-kv
logstash-filter-memcached
logstash-filter-metrics
logstash-filter-mutate
logstash-filter-prune
logstash-filter-ruby
logstash-filter-sleep
logstash-filter-split
logstash-filter-syslog_pri
logstash-filter-throttle
logstash-filter-translate
logstash-filter-truncate
logstash-filter-urldecode
logstash-filter-useragent
logstash-filter-uuid
logstash-filter-xml

Output plugins:

Type the following command on the command line:

$ ./logstash-plugin list --group output
logstash-output-cloudwatch
logstash-output-csv
logstash-output-elastic_app_search
logstash-output-elasticsearch
logstash-output-email
logstash-output-file
logstash-output-graphite
logstash-output-http
logstash-output-lumberjack
logstash-output-nagios
logstash-output-null
logstash-output-pipe
logstash-output-rabbitmq
logstash-output-redis
logstash-output-s3
logstash-output-sns
logstash-output-sqs
logstash-output-stdout
logstash-output-tcp
logstash-output-udp
logstash-output-webhdfs

Codec plugins:

Type the following command on the command line:

$ ./logstash-plugin list --group codec
logstash-codec-avro
logstash-codec-cef
logstash-codec-collectd
logstash-codec-dots
logstash-codec-edn
logstash-codec-edn_lines
logstash-codec-es_bulk
logstash-codec-fluent
logstash-codec-graphite
logstash-codec-json
logstash-codec-json_lines
logstash-codec-line
logstash-codec-msgpack
logstash-codec-multiline
logstash-codec-netflow
logstash-codec-plain
logstash-codec-rubydebug

These lists show the plugins that come bundled with a standard Logstash installation. We can also develop and install our own plugins, or install plugins that someone else has developed.

Because file appears in both the input and the output list, we can even write a configuration like this:

input {
  file {
    path => "C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/*access*"
    type => "apache"
  }
}

output {
  file {
    path => "C:/tpwork/logstash/bin/log/output.log"
  }
}

With this configuration, Logstash reads each line from the matching input files and writes the resulting event to the output file. For example, given this line in the access log:

0:0:0:0:0:0:0:1 - - [25/Dec/2016:18:37:00 +0800] "GET / HTTP/1.1" 200 11418

the output file contains an event like this:

{"path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/localhost_access_log.2016-12-25.txt","@timestamp":"2016-12-25T10:37:00.363Z","@version":"1","host":"Dell-PC","message":"0:0:0:0:0:0:0:1 - - [25/Dec/2016:18:37:00 +0800] \"GET / HTTP/1.1\" 200 11418\r","type":"apache","tags":[]}

Install the plugins

A standard Logstash installation already includes many plugins, but in some cases we need to install a plugin manually, such as the exec output plugin. We can do so with the following command, run from the Logstash installation directory:

./bin/logstash-plugin install logstash-output-exec

To check if the plugin has been successfully installed, use the following command:

$ ./bin/logstash-plugin list --group output | grep exec
Java HotSpot(TM) 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.bouncycastle.jcajce.provider.drbg.DRBG (file:/Users/liuxg/elastic/logstash-7.4.0/vendor/app/lib/ruby/stdlib/org/bouncycastle/bcprov-jdk15on/1.61/bcprov-jdk15on-1.61.jar) to constructor sun.security.provider.Sun()
WARNING: Please consider reporting this to the maintainers of org.bouncycastle.jcajce.provider.drbg.DRBG
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
logstash-output-exec

Reading log files

 

Logstash is easy to set up to read a log file. For example, we can read an Apache log file as follows:

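input {
  file {
    type => "apache"
    path => "/Users/liuxg/data/apache_logs"
    # Read the file from the start instead of only tailing new lines
    start_position => "beginning"
    # Do not persist the read position, so the file is re-read on every run
    sincedb_path => "/dev/null"
  }
}

output {
  stdout {
    codec => rubydebug
  }
}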

We can even read multiple files:

# Pull in application-log data. They emit data in JSON form.
input {
  file {
    path => [
      "/var/log/app/worker_info.log",
      "/var/log/app/broker_info.log",
      "/var/log/app/supervisor.log"
    ]
    exclude => "*.gz"
    type    => "applog"
    codec   => "json"
  }
}

 

Serialization of data

 

We can serialize and deserialize our data using the provided codecs, for example:

input {
  # Deserialize newline-separated JSON
  file {
    path => "/some/sample.log"
    codec => json
  }
}

output {
  # Serialize to the msgpack format
  redis {
    codec => msgpack
  }
  stdout {
    codec => rubydebug
  }
}

With Logstash up and running, we can append content to the sample.log file from a terminal with the following command:

$ echo '{"name2", "liuxg2"}' >> ./sample.log

We can see the following output. Because {"name2", "liuxg2"} is not valid JSON (a key must be followed by a colon and a value), the json codec cannot parse the line: the raw text is kept in message and the event is tagged _jsonparsefailure:

{
    "@version" => "1",
    "message" => "{\"name2\", \"liuxg2\"}",
    "@timestamp" => 2019-09-12T07:37:56.639Z,
    "host" => "localhost",
    "tags" => [
        [0] "_jsonparsefailure"
    ],
    "path" => "/Users/liuxg/data/sample.log"
}

The most commonly used codecs

1) line: turns each line of input into a Logstash event, with the text stored in the “message” field. It can also format output as a custom line.

2) multiline: lets you define arbitrary boundaries for “message”, so that several lines become one event. It is often used for stack traces and similar output. This can also be done in Filebeat.

3) json_lines: parses JSON data separated by newlines.

4) json: parses the whole payload as JSON. It is only suitable for message-oriented inputs/outputs such as Redis, Kafka, or HTTP.

There are many other codecs.
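
To make the list above more concrete, here is a minimal sketch of how these codecs might be attached to inputs and outputs. The file path and TCP port are hypothetical, and the multiline pattern assumes that continuation lines (such as stack-trace frames) start with whitespace:

```
input {
  file {
    path => "/var/log/app/app.log"   # hypothetical path
    # Lines that start with whitespace are appended to the previous line,
    # so a stack trace becomes a single event
    codec => multiline {
      pattern => "^\s"
      what => "previous"
    }
  }
  tcp {
    port => 5000                     # hypothetical port
    # One JSON document per line on the stream
    codec => json_lines
  }
}

output {
  stdout {
    # Print each event as a custom-formatted line
    codec => line { format => "event: %{message}" }
  }
}
```

The json codec is left out here on purpose: on a stream-oriented input such as tcp, json_lines is usually the safer choice, because it relies on newlines to find document boundaries.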

 

Analysis and extraction

 

Grok Filter

filter {
	grok {
		match => [
			"message", "%{TIMESTAMP_ISO8601:timestamp_string}%{SPACE}%{GREEDYDATA:line}"
		]
	}
}

The example above makes it easy to turn the following log line into structured data:


2019-09-09T13:00:00Z Whose woods these are I think I know.

More grok patterns can be found on the grok patterns page.

 

Date filter

 

filter {
  date {
    match => ["timestamp_string", "ISO8601"]
  }
}

The date filter parses a date string in the given format and, by default, stores the result in the @timestamp field.
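
As a rough end-to-end sketch, the grok and date filters above can be combined in one pipeline. The generator input is only an assumption for testing; it emits the sample log line once:

```
input {
  generator {
    message => "2019-09-09T13:00:00Z Whose woods these are I think I know."
    count => 1
  }
}

filter {
  # Split the line into a timestamp string and the remaining text
  grok {
    match => {
      "message" => "%{TIMESTAMP_ISO8601:timestamp_string}%{SPACE}%{GREEDYDATA:line}"
    }
  }
  # Parse the extracted string and store it in @timestamp
  date {
    match => ["timestamp_string", "ISO8601"]
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
```

If parsing succeeds, @timestamp should reflect the time in the log line rather than the time the event was read. If the string cannot be parsed, the date filter tags the event with _dateparsefailure by default.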

Dissect filter

 

Dissect is a faster, lighter, and smaller alternative to grok:

filter {
  dissect {
    mapping => {
      "message" => "%{id} %{function->} %{server}"
    }
  }
}

The format of the field and delimiter patterns is similar to grok’s.
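
One way to try a dissect mapping such as "%{id} %{function->} %{server}" is with a generator input. This is a sketch only: the test message below is illustrative, with extra spaces after the second column to show what the -> suffix is for:

```
input {
  generator {
    # Illustrative message with padded columns
    message => "00000043 ViewReceive     machine-321"
    count => 1
  }
}

filter {
  dissect {
    mapping => {
      # "->" tells dissect to skip the padding that follows the field
      "message" => "%{id} %{function->} %{server}"
    }
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
```

Unlike grok, dissect does not use regular expressions; it only splits on the literal delimiters between the %{...} fields, which is where its speed comes from.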

Example:

input {
  generator {
    message => "<1>Oct 16 20:21:22 www1 1,2016/10/16 20:21:20,3,THREAT,SCAN,6,2016/10/16 20:21:20,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54"
    count => 1
  }
}

filter {
  if [message] =~ "THREAT," {
    dissect {
      mapping => {
        message => "<%{priority}>%{syslog_timestamp} %{+syslog_timestamp} %{+syslog_timestamp} %{logsource} %{pan_fut_use_01},%{pan_rec_time},%{pan_serial_number},%{pan_type},%{pan_subtype},%{pan_fut_use_02},%{pan_gen_time},%{pan_src_ip},%{pan_dst_ip},%{pan_nat_src_ip},%{pan_nat_dst_ip},%{pan_rule_name},%{pan_src_user},%{pan_dst_user},%{pan_app},%{pan_vsys},%{pan_src_zone},%{pan_dst_zone},%{pan_ingress_intf},%{pan_egress_intf},%{pan_log_fwd_profile},%{pan_fut_use_03},%{pan_session_id},%{pan_repeat_cnt},%{pan_src_port},%{pan_dst_port},%{pan_nat_src_port},%{pan_nat_dst_port},%{pan_flags},%{pan_prot},%{pan_action},%{pan_misc},%{pan_threat_id},%{pan_cat},%{pan_severity},%{pan_direction},%{pan_seq_number},%{pan_action_flags},%{pan_src_location},%{pan_dst_location},%{pan_content_type},%{pan_pcap_id},%{pan_filedigest},%{pan_cloud},%{pan_user_agent},%{pan_file_type},%{pan_xff},%{pan_referer},%{pan_sender},%{pan_subject},%{pan_recipient},%{pan_report_id},%{pan_anymore}"
      }
    }
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

Running it produces:

{
    "@timestamp" => 2019-09-12T09:….514Z,
    "pan_dst_ip" => "9",
    "pan_nat_src_ip" => "10",
    "sequence" => 0,
    "logsource" => "www1",
    "pan_session_id" => "23",
    "pan_vsys" => "16",
    "pan_cat" => "34",
    "pan_rule_name" => "12",
    "pan_gen_time" => "2016/10/16 20:21:20",
    "pan_seq_number" => "37",
    "pan_subject" => "50",
    ....
    "message" => "<1>Oct 16 20:21:22 www1 1,2016/10/16 20:21:20,3,THREAT,SCAN,6,2016/10/16 20:21:20,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54",
    "pan_fut_use_02" => "6",
    "pan_flags" => "29",
    "syslog_timestamp" => "Oct 16 20:21:22",
    "pan_anymore" => "53,54"
}

For more information, see the dissect filter documentation.

 

KV filter

The kv filter is an easy way to parse data in key/value pairs:

filter {
  kv {
    source => "message"
    target => "parsed"
    value_split => ":"
  }
}

Let’s run a configuration file like this:

input {
  generator {
    message => "pin=12345~0&d=123&[email protected]&oq=bobo&ss=12345"
    count => 1
  }
}

filter {
  kv {
    source => "message"
    target => "parsed"
    field_split => "&?"
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

The results are as follows:

{
    "@timestamp" => 2019-09-12T09:46:04.944Z,
    "host" => "localhost",
    "parsed" => {
        "ss" => "12345",
        "e" => "[email protected]",
        "pin" => "12345~0",
        "oq" => "bobo",
        "d" => "123"
    },
    "message" => "pin=12345~0&d=123&[email protected]&oq=bobo&ss=12345",
    "sequence" => 0,
    "@version" => "1"
}

With the kv filter, we can also use target to group the parsed fields under a single object, for example:

filter {
  kv {
    source => "message"
    target => "parsed"
    value_split => ":"
  }
}
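
To see what value_split => ":" does, here is a minimal sketch with a made-up message. The field names user, level, and status are hypothetical, and the pairs are separated by spaces, which is the kv filter's default field separator:

```
input {
  generator {
    # Hypothetical message: key:value pairs separated by spaces
    message => "user:alice level:info status:200"
    count => 1
  }
}

filter {
  kv {
    source => "message"
    # Put the parsed pairs under [parsed] instead of the event root
    target => "parsed"
    # Keys and values are separated by ":" instead of the default "="
    value_split => ":"
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
```

Using target keeps the parsed keys from colliding with existing top-level fields such as host or message.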

 

Core operations

 

Mutate filter

The mutate filter provides a number of features:

  • Converting field types (from string to integer, etc.)
  • Add/rename/replace/copy fields
  • Uppercase/lowercase conversion
  • Concatenate arrays together (useful for Array => String operations)
  • Merge hash
  • Split fields into arrays
  • Strip whitespace

input {
  generator {
    message => "pin=12345~0&d=123&[email protected]&oq=bobo&ss=12345"
    count => 1
  }
}

filter {
  kv {
    source => "message"
    field_split => "&?"
  }

  if [pin] == "12345~0" {
    mutate {
      add_tag => [ 'metrics' ]
    }
    mutate {
      split => ["message", "&"]
      add_field => {"foo" => "bar-%{pin}"}
    }
  }
}

output {
  stdout {
    codec => rubydebug
  }

  if "metrics" in [tags] {
    stdout {
      codec => line { format => "custom format: %{message}" }
    }
  }
}

The results are as follows:

{
    "foo" => "bar-12345~0",
    "e" => "[email protected]",
    "sequence" => 0,
    "message" => [
        [0] "pin=12345~0",
        [1] "d=123",
        [2] "[email protected]",
        [3] "oq=bobo",
        [4] "ss=12345"
    ],
    "pin" => "12345~0",
    "d" => "123",
    "host" => "localhost",
    "ss" => "12345",
    "@timestamp" => 2019-09-14T15:03:15.141Z,
    "oq" => "bobo",
    "@version" => "1",
    "tags" => [
        [0] "metrics"
    ]
}
custom format: pin=12345~0,d=123,[email protected],oq=bobo,ss=12345
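
The other mutate features listed above follow the same pattern. The sketch below shows a few of them; all field names (bytes, clientip, verb, line, recipients) are hypothetical and stand for fields that earlier filters would have produced:

```
filter {
  mutate {
    # Convert a string field to an integer
    convert => { "bytes" => "integer" }
    # Rename a field
    rename => { "clientip" => "client_ip" }
    # Lowercase a string field
    lowercase => ["verb"]
    # Strip leading and trailing whitespace
    strip => ["line"]
  }
  mutate {
    # Join an array field into a single comma-separated string
    join => { "recipients" => "," }
  }
}
```

Within a single mutate block, the operations run in a fixed order defined by the plugin, not in the order they are written. Putting operations in separate mutate blocks, as above, is a simple way to control the sequence when it matters.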

Core transformation filters

  • Mutate – modify or add fields on each event
  • Split – convert one event into multiple events
  • Drop – drop an event

Conditional logic

  • if/else
  • You can use regexps with =~
  • You can check whether a value is a member of an array

filter {
  mutate {
    lowercase => "account"
  }

  if [type] == "batch" {
    split {
      field => "actions"
      target => "action"
    }
  }

  if [action] =~ /special/ {
    drop {}
  }
}
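
The three kinds of condition listed above can be sketched together as follows. The type value and the tag names are hypothetical and only serve to show the syntax:

```
filter {
  if [type] == "apache" {
    # Plain equality test
    mutate { add_tag => ["web"] }
  } else if [message] =~ /error/ {
    # Regular expression match with =~
    mutate { add_tag => ["error"] }
  } else {
    mutate { add_tag => ["other"] }
  }
}

output {
  # Array membership test with "in"
  if "error" in [tags] {
    stdout {
      codec => rubydebug
    }
  }
}
```

Conditionals can wrap filters and outputs alike, so the same tags can drive both how an event is processed and where it is sent.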

GeoIP

The geoip filter enriches events with information about an IP address:

filter {
  geoip {
    source => "my_geoip_field"
  }
}

Run the following configuration:

input {
  generator {
    message => "83.149.9.216"
    count => 1
  }
}

filter {
  grok {
    match => {
      "message" => '%{IPORHOST:clientip}'
    }
  }

  geoip {
    source => "clientip"
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

The following information is displayed:

{
    "host" => "localhost",
    "@version" => "1",
    "clientip" => "83.149.9.216",
    "message" => "83.149.9.216",
    "@timestamp" => 2019-09-15T06:54:46.695Z,
    "sequence" => 0,
    "geoip" => {
        "timezone" => "Europe/Moscow",
        "region_code" => "MOW",
        "latitude" => 55.7527,
        "country_code3" => "RU",
        "continent_code" => "EU",
        "longitude" => 37.6172,
        "country_name" => "Russia",
        "location" => {
            "lat" => 55.7527,
            "lon" => 37.6172
        },
        "ip" => "83.149.9.216",
        "postal_code" => "102325",
        "country_code2" => "RU",
        "region_name" => "Moscow",
        "city_name" => "Moscow"
    }
}

We can see that the geoip field now holds a lot of detailed location information.

DNS filter

The dns filter enriches events with DNS information, either resolving host names to addresses or reverse-resolving addresses to host names:

filter {
  dns {
    resolve => ["my_dns_field"]
  }
}

We define a Logstash configuration file as follows:

input {
  generator {
    message => "www.google.com"
    count => 1
  }
}

filter {
  mutate {
    add_field => { "hostname" => "172.217.160.110" }
  }

  dns {
    reverse => ["hostname"]
    action => "replace"
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

The hostname field is set to one of Google’s IP addresses, so the reverse lookup produces this output:

{
    "host" => "localhost",
    "sequence" => 0,
    "message" => "www.google.com",
    "@timestamp" => 2019-09-15T11:35:43.791Z,
    "hostname" => "tsa03s06-in-f14.1e100.net",
    "@version" => "1"
}

Here we can see that the hostname field now holds the resolved host name instead of the IP address.

Useragent filter

The useragent filter enriches events with information parsed from the browser user agent string. We use the following Logstash configuration:

input {
  generator {
    message => '83.149.9.216 - - [17/May/2015:10:05:50 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-dashboard.png HTTP/1.1" 200 321631 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"'
    count => 1
  }
}

filter {
  grok {
    match => {
      "message" => '%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}'
    }
  }

  useragent {
    source => "agent"
    target => "useragent"
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

The result is:

{
    "request" => "/presentations/logstash-monitorama-2013/images/kibana-dashboard.png",
    "useragent" => {
        "name" => "Chrome",
        "build" => "",
        "device" => "Other",
        "os_major" => "10",
        "os" => "Mac OS X",
        "minor" => "0",
        "major" => "32",
        "os_name" => "Mac OS X",
        "patch" => "1700",
        "os_minor" => "9"
    },
    "sequence" => 0,
    "message" => "83.149.9.216 - - [17/May/2015:10:05:50 +0000] \"GET /presentations/logstash-monitorama-2013/images/kibana-dashboard.png HTTP/1.1\" 200 321631 \"http://semicomplete.com/presentations/logstash-monitorama-2013/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"",
    "timestamp" => "17/May/2015:10:05:50 +0000",
    "referrer" => "\"http://semicomplete.com/presentations/logstash-monitorama-2013/\"",
    "clientip" => "83.149.9.216",
    "ident" => "-",
    "auth" => "-",
    "response" => 200,
    "@version" => "1",
    "verb" => "GET",
    "host" => "localhost",
    "@timestamp" => 2019-09-15T12:03:34.650Z,
    "httpversion" => "1.1",
    "bytes" => 321631,
    "agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\""
}

We can see that the useragent field contains detailed browser and operating system information.

Translate Filter

The translate filter enriches events using a local lookup dictionary. We use the following Logstash configuration file:

input {
  generator {
    message => '83.149.9.216 - - [17/May/2015:10:05:50 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-dashboard.png HTTP/1.1" 200 321631 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"'
    count => 1
  }
}

filter {
  grok {
    match => {
      "message" => '%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}'
    }
  }

  translate {
    field => "[response]"
    destination => "[http_status_description]"
    dictionary => {
      "100" => "Continue"
      "101" => "Switching Protocols"
      "200" => "OK"
      "500" => "Server Error"
    }
    fallback => "I'm a teapot"
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

The result of the run is:

{
    "auth" => "-",
    "host" => "localhost",
    "timestamp" => "17/May/2015:10:05:50 +0000",
    "message" => "83.149.9.216 - - [17/May/2015:10:05:50 +0000] \"GET /presentations/logstash-monitorama-2013/images/kibana-dashboard.png HTTP/1.1\" 200 321631 \"http://semicomplete.com/presentations/logstash-monitorama-2013/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"",
    "httpversion" => "1.1",
    "@version" => "1",
    "response" => 200,
    "clientip" => "83.149.9.216",
    "verb" => "GET",
    "sequence" => 0,
    "referrer" => "\"http://semicomplete.com/presentations/logstash-monitorama-2013/\"",
    "agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"",
    "http_status_description" => "OK",
    "ident" => "-",
    "@timestamp" => 2019-09-15T12:30:09.575Z,
    "bytes" => 321631,
    "request" => "/presentations/logstash-monitorama-2013/images/kibana-dashboard.png"
}

We can see that the event now has an http_status_description field with the value “OK”.
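
For larger lookup tables, the dictionary can be kept in a separate file instead of inline. The following is a sketch only: the file path is hypothetical, and it assumes a YAML file containing entries such as "200": OK, one per line:

```
filter {
  translate {
    field => "[response]"
    destination => "[http_status_description]"
    # Hypothetical external dictionary file (YAML, JSON, or CSV)
    dictionary_path => "/etc/logstash/http_status.yml"
    # Re-read the file every 300 seconds so edits are picked up
    refresh_interval => 300
    fallback => "Unknown"
  }
}
```

The field and destination option names match the plugin version used in this article. Newer releases of the translate plugin appear to prefer source and target for the same settings, so check the documentation for the version you have installed.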

Elasticsearch Filter

The elasticsearch filter fetches data from an index in Elasticsearch and uses it to enrich events. To test this, we’ll create an index called elasticsearch_filter:

PUT elasticsearch_filter/_doc/1
{
  "name": "liuxg",
  "age": 20,
  "@timestamp": "2019-09-15"
}

Note that the document must contain a @timestamp field; otherwise we will get an error, because by default the filter sorts the search results on @timestamp.

We use the following Logstash configuration:

input {
  generator {
    message => "liuxg"
    count => 1
  }
}

filter {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "elasticsearch_filter"
    query => "name.keyword:%{[message]}"
    result_size => 1
    fields => {"age" => "user_age"}
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

Running the above example shows the result:

{
    "user_age" => 20,
    "host" => "localhost",
    "message" => "liuxg",
    "@version" => "1",
    "@timestamp" => 2019-09-15T13:21:29.742Z,
    "sequence" => 0
}

We can see that user_age is 20. This is obtained by searching for name:liuxg.

Reference:

【1】Getting started with Logstash