The Logstash prune filter plug-in uses whitelists to ensure that only specific required fields are output from Logstash and that all other fields are removed. In this blog post, we demonstrate how to use Logstash to whitelist required fields and required sub-documents before indexing into Elasticsearch.
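As a quick illustration of the idea before the full example, here is a minimal sketch of a prune filter with a whitelist. The field names below are placeholders, not part of the pipeline built later in this post:

filter {
  prune {
    # Keep only "field_a" and "field_b"; every other field is dropped.
    whitelist_names => [ "field_a", "field_b" ]
  }
}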

 

Example input document

As input to Logstash, we use a CSV file containing stock market transactions. Here are some example lines of CSV stock market data:

1483230600,1628.75,1678.1,1772.8,2443.6
1483232400,1613.63,1688.5,1750.5,2460.2
1483234200,1606.51,1678.6,1718.2,2448.2
1483236000,1621.04,1684.1,1708.1,2470.4

The comma-separated values stand for "time", "DAX", "SMI", "CAC", and "FTSE". You may want to copy and paste the above lines into a CSV file called stocks.csv in order to execute the sample command line given later in this blog post.
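For example, on a Unix-like system you could create the file from the shell as shown below (a convenience sketch; any text editor works just as well):

cat > stocks.csv << 'EOF'
1483230600,1628.75,1678.1,1772.8,2443.6
1483232400,1613.63,1688.5,1750.5,2460.2
1483234200,1606.51,1678.6,1718.2,2448.2
1483236000,1621.04,1684.1,1708.1,2470.4
EOF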

 

Sample Logstash pipeline

Here is a Logstash pipeline that you can store in a file called stocks.conf. It does the following:

  • Read stock market transactions from standard input in CSV format.
  • Map each line of the CSV input to a JSON document, where each CSV column maps to a JSON field.
  • Parse the time field as a Unix timestamp.
  • Move the DAX and CAC fields into a nested structure called "my_nest".
  • Whitelist the "my_nest" field (which contains a sub-document) and the "SMI" field, so that all other (non-whitelisted) fields are removed.
  • Write the resulting documents to the Elasticsearch index named "stocks_whitelist_test".

stocks.conf

# For this simple example, pipe in data from stdin.
input {
  stdin {}
}

filter {
  csv {
    columns => ["time","DAX","SMI","CAC","FTSE"]
    separator => ","
    convert => {
      'DAX'  => 'float'
      'SMI'  => 'float'
      'CAC'  => 'float'
      'FTSE' => 'float'
    }
  }
  date {
    match => ['time', 'UNIX']
  }
  mutate {
    # Move DAX and CAC into a sub-document
    # called 'my_nest'
    rename => {
      "DAX" => "[my_nest][DAX]"
      "CAC" => "[my_nest][CAC]"
    }
  }
  # Remove everything except "SMI" and the
  # "my_nest" sub-document
  prune {
    whitelist_names => [ "SMI", "my_nest" ]
  }
}

output {
  stdout { codec => dots }
  elasticsearch {
    index => "stocks_whitelist_test"
  }
}

Test the Logstash pipeline

To test the pipeline using the sample CSV data, run a command similar to the following, adjusting the paths to match your system:

cat ./stocks.csv | ./bin/logstash -f ./stocks.conf
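Note that the pipeline's stdout output uses the dots codec, which prints a single dot per processed event. If you would rather see the full pruned events in your terminal while testing, one option (a small variation on the pipeline above, not part of the original configuration) is to switch the codec to rubydebug:

output {
  # Print each processed event in full, which is handy for debugging
  stdout { codec => rubydebug }
  elasticsearch { index => "stocks_whitelist_test" }
}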

You can check the data stored in Elasticsearch by executing the following command from the Kibana developer console:

GET /stocks_whitelist_test/_search

Documents with the following structure should be displayed:

"hits" : [ { "_index" : "stocks_whitelist_test", "_type" : "_doc", "_id" : "Pk4R0nMBSNqQApgz9SS0", "_score" : 1.0, the "_source" : {" SMI ": 1688.5," my_nest ": {" DAX" : 1613.63, "CAC" : 1750.5}}}, {" _index ": "Stocks_whitelist_test _type", "" :" _doc ", "_id" : "PE4R0nMBSNqQApgz9CTu", "_score" : 1.0, "_source" : {" SMI ": 1684.1, "my_nest" : {" DAX ": 1621.04," CAC ": 1708.1}}},...Copy the code

Note that only "my_nest" and "SMI" have been indexed, as shown in the contents of each document's "_source". Also notice that the "FTSE" and "time" fields have been removed because they are not in the prune filter's whitelist.
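As an additional, optional check beyond the search above, you can also inspect the index mapping from the Kibana developer console; only fields that survived the prune filter should appear in it:

GET /stocks_whitelist_test/_mapping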

 

Conclusion

In this blog post, we demonstrated how the Logstash prune filter plug-in leverages whitelists to ensure that Logstash only outputs specific required fields.