1. Background

Recently my company started using FileBeat, so I took the time to learn it. FileBeat is a lightweight log collector written in Go that can forward logs to Elasticsearch, Kafka, and other outputs, and it is the Beat with the most comprehensive official support. It performs well and is easy to deploy, which makes it an ideal tool for collecting file-based logs. FileBeat was created as a lighter-weight replacement for Logstash and uses far less memory. It has drawbacks too: the functionality it provides out of the box is fairly simple and often cannot meet our needs, so we frequently ship logs to Kafka and continue processing them with tools such as Flink.

2. How FileBeat works

Here is how FileBeat works. When a service writes to a log file, FileBeat's harvester module reads one or more lines from it. The harvester aggregates these lines and sends them to an output such as Elasticsearch or Kafka. To preserve file state, the harvester records the offset of each log file in a registry file; each input corresponds to a registry file, and if FileBeat restarts it recovers its position from the registry. How does FileBeat guarantee that logs are delivered at least once? Again through the registry file: if a send is not acknowledged as successful, FileBeat keeps resending until it succeeds.
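
To make the registry behaviour concrete, here is a minimal filebeat.yml sketch (the input path and the console output are assumptions for illustration, not part of the deployment below). FileBeat persists the read offset of every harvested file under the registry path, which is how it resumes and retries after a restart:

filebeat.registry.path: /usr/share/filebeat/data/registry   # offsets are persisted here
filebeat.inputs:
  - type: log
    paths:
      - /var/log/myapp/*.log        # hypothetical path, for illustration only
output.console:                     # print events to stdout just to observe the flow
  pretty: true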

3. Docker deployment

Given Docker's advantages, such as a consistent runtime environment and easy migration, we deploy FileBeat with Docker. Below is an example docker-compose deployment.

3.1. Configure the docker-compose file
version: "3"
services:
    elasticsearch:
      image: elasticsearch:7.11.1
      container_name: elasticsearch
      hostname: elasticsearch1
      environment:
        - discovery.type=single-node
        - bootstrap.memory_lock=true
        - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      ports:
        - "9200:9200"
        - "9300:9300"
      networks:
        - "elk-net"

    kibana:
      image: docker.elastic.co/kibana/kibana:7.1.1
      environment:
        - SERVER_NAME=kibana
        - ELASTICSEARCH_URL=http://elasticsearch1:9200
        - XPACK_MONITORING_ENABLED=true
      ports:
        - "5601:5601"
      networks:
        - "elk-net"
      depends_on:
        - "elasticsearch"

    filebeat:
      image: docker.elastic.co/beats/filebeat:7.1.1
      volumes:
        - ./filebeat/logs/nginx:/var/log/nginx/
        - ./filebeat/logs/biz:/var/log/biz/
        - ./filebeat/logs/log4j:/var/log/log4j/

        - ./filebeat/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml
      networks:
        - "elk-net"
      depends_on:
        - "elasticsearch"
        - "kibana"
networks:
      elk-net:

Nginx, Log4j, and business logs are collected into ES and then displayed in Kibana.
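
To bring the stack up and check that it works, something like the following can be used (a sketch assuming the compose file above with default ports; the pb-* indices only appear after FileBeat has shipped some logs):

docker-compose up -d
# confirm Elasticsearch is reachable and list the indices that have been created
curl "http://localhost:9200/_cat/indices?v"
# Kibana is then available at http://localhost:5601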

3.2. Configure FileBeat
# Details are as follows:
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/*.log
  scan_frequency: 10s
  tail_files: true
  fields:
    index_name: "nginx_log"
- type: log
  enabled: true
  # Stack-trace lines (starting with whitespace + "at"/"..." or with "Caused by:") are appended to the previous line
  multiline.type: pattern
  multiline.pattern: '^[[:space:]]+(at|\.{3})[[:space:]]+\b|^Caused by:'
  multiline.negate: false
  multiline.match: after
  paths:
    - /var/log/log4j/*.log
  fields:
    index_name: "log4j_log"
- type: log
  enabled: true
  multiline.type: pattern
  multiline.pattern: '^[[:space:]]+(at|\.{3})[[:space:]]+\b|^Caused by:'
  multiline.negate: false
  multiline.match: after
  fields:
    index_name: "biz_log"
  scan_frequency: 10s
  pipeline: "extract-traceid-pipeline"
  paths:
    - /var/log/biz/*.log



# In 7.x, index lifecycle management (ILM) must be disabled; otherwise specifying a custom ES index name will not take effect
setup.ilm.enabled: false
setup.template.name: "my-log"
setup.template.pattern: "my-*"
setup.template.enabled: true
setup.template.overwrite: false

# output to es
output.elasticsearch:
  #worker: 1
  #bulk_max_size: 1500
  hosts: ["elasticsearch1:9200"]
  index: "pb-%{[fields.index_name]}- *"
  indices:
    - index: "pb-nginx-%{+yyyy.MM.dd}"
      when.equals:
        fields.index_name: "nginx_log"
    - index: "pb-log4j-%{+yyyy.MM.dd}"
      when.equals:
        fields.index_name: "log4j_log"
    - index: "pb-biz-%{+yyyy.MM.dd}"
      when.equals:
        fields.index_name: "biz_log"


Upload the pipeline to ES (the pipeline JSON is assumed to be saved as extract-traceid-pipeline.json):

curl -H "Content-Type: Application/json "- XPUT 'http://127.0.0.1:9200/_ingest/pipeline/extract-traceid-pipeline' [email protected]Copy the code

If the request succeeds, ES acknowledges the pipeline creation.

4. FileBeat key parameters

4.1. How to distinguish different logs
  • fields: add extra fields to each event; the values can be scalars, arrays, dictionaries, or any nested data. Combined with conditional statements in the output (when.equals), events can then be routed to different indices.
  • tags: add a tags field to the input, which can be used to group and filter events (see the sketch below).
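
As a concrete illustration, here is a minimal sketch combining the two (the tag values are made up for this example): a custom field routed with when.equals in the output, and tags that can be used for grouping and filtering in Kibana:

filebeat.inputs:
  - type: log
    paths:
      - /var/log/nginx/*.log
    fields:
      index_name: "nginx_log"      # custom field, matched by when.equals below
    tags: ["nginx", "frontend"]    # free-form tags for grouping/filtering

output.elasticsearch:
  hosts: ["elasticsearch1:9200"]
  indices:
    - index: "pb-nginx-%{+yyyy.MM.dd}"
      when.equals:
        fields.index_name: "nginx_log"
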
4.2. How to extract fields, taking the trace ID as an example

The trace ID can be extracted with grok syntax. The steps are as follows:

  • Write the pipeline file
  "description" : "extract-traceid-pipeline",
  "processors" : [
    {
      "grok" :{            
        "field" : "message",
        "patterns" : ["ERROR\\|%{DATA:trace_id}\\|"]  
      }
    }
  ]
}
  • Test the results, for example with an online grok debugger:

www.5axxw.com/tools/v2/gr…

Upload the pipeline to ES and reference it in the FileBeat configuration (pipeline: "extract-traceid-pipeline" in the input):

curl -H "Content-Type: Application/json "- XPUT 'http://127.0.0.1:9200/_ingest/pipeline/extract-traceid-pipeline' [email protected]Copy the code

View the write result and the final effect: the trace_id field is extracted into the indexed documents.

4.3. How to collect Java exception stacks
  multiline.type: pattern
  multiline.pattern: '^[[:space:]]+(at|\.{3})[[:space:]]+\b|^Caused by:'
  multiline.negate: false
  multiline.match: after
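
For reference, with this setting a (made-up) Java stack trace like the one below is merged into a single event that begins with the "Exception in thread ..." line, because the indented "at ..." lines and the "Caused by:" line match the pattern and are appended to the previous line:

Exception in thread "main" java.lang.IllegalStateException: failed to handle request
    at com.example.demo.OrderService.process(OrderService.java:42)
    at com.example.demo.OrderController.submit(OrderController.java:18)
Caused by: java.lang.NullPointerException
    at com.example.demo.OrderRepository.find(OrderRepository.java:27)
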
4.4. Why doesn't the pipeline take effect?

The official documentation puts the pipeline setting under the output, but in this version it did not take effect when configured there; it has to be added to the input instead:

- type: log
  enabled: true
  multiline.type: pattern
  multiline.pattern: '^[[:space:]]+(at|\.{3})[[:space:]]+\b|^Caused by:'
  multiline.negate: false
  multiline.match: after
  fields:
    index_name: "biz_log"
  scan_frequency: 10s
  pipeline: "extract-traceid-pipeline"
  paths:
    - /var/log/biz/*.log
4.5. What if grok cannot handle complex business logs?

In that case, logs are typically shipped to Kafka and then processed by other middleware, such as Flink.

output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["kafka1:9092"."kafka2:9092"."kafka3:9092"]
  # message topic selection + partitioning
  topic: '%{[fields.log_topic]}'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
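
Note that the topic above is resolved from %{[fields.log_topic]}, so each input has to define that field; a minimal sketch (the topic name is made up):

filebeat.inputs:
  - type: log
    paths:
      - /var/log/biz/*.log
    fields:
      log_topic: "biz-log"    # becomes the Kafka topic via %{[fields.log_topic]}
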
4.6. How to improve collection efficiency?
  • bulk_max_size: an output.elasticsearch option that controls how many events are sent to the Elasticsearch bulk API in a single request. Each log line is one document (event), and by default FileBeat sends only 50 events per request, so for a file with hundreds of thousands of lines it is easy to work out how many bulk requests are needed to ship it; raising this value reduces the number of round trips.

  • worker: also an output.elasticsearch option; it sets how many workers FileBeat uses per configured host to send data to Elasticsearch, and it can be increased appropriately. For example, with hosts: ["10.0.0.7:9200","10.0.0.8:9200","10.0.0.9:9200"] the worker count could be set to 3.

  • harvester_buffer_size: the buffer size each harvester uses when reading a file. To increase the read throughput of particular files, this value can be raised; by defining multiple inputs, each with its own harvester_buffer_size, throughput can be tuned per group of files (see the sketch after this list).
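
A combined tuning sketch (the numbers are only illustrative, not recommendations):

filebeat.inputs:
  - type: log
    paths:
      - /var/log/biz/*.log
    harvester_buffer_size: 65536   # larger read buffer for high-volume files (default is 16384)

output.elasticsearch:
  hosts: ["elasticsearch1:9200"]
  worker: 2                        # senders per configured host
  bulk_max_size: 1500              # events per bulk request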
