ELK+kafka+ fileBeat Set up a production ELFK cluster

ELK architecture

Cluster Service Version

service version
java 1.8.0 comes with _221
elasticsearch 7.10.1
filebeat 7.10.1
kibana 7.10.1
logstash 7.10.1
cerebro 0.9.2-1
kafka 2.12-2.3.0
zookeeper 3.5.6

Server Environment

The IP address The host name configuration role
10.0.11.172 elk-master 4C16G Es – master, kafka + zookeeper1
10.0.21.117 elk-node1 4C16G Es – node1, kafka + zookeeper2
10.0.11.208 elk-node2 4C16G Es – 2, kafka + zookeeper3
10.0.10.242 elk-kibana 4C16G Logstash, Kibana, Cerebro

System parameter optimization

{{< notice >}} All three nodes need to execute {{< /notice >}}

Changing the host Name

hostnamectl set-hostname elk-master
hostnamectl set-hostname elk-node1
hostnamectl set-hostname elk-node2
Copy the code

Add file descriptors

cat >>/etc/security/limits.conf<< EOF
*               soft      nofile          65536
*               hard      nofile          65536
*               soft      nproc           65536
*               hard      nproc           65536
*               hard      memlock         unlimited
*               soft      memlock         unlimited
EOF
Copy the code

Modify the default memory limit

cat >>/etc/systemd/system.conf<< EOF
DefaultLimitNOFILE=65536
DefaultLimitNPROC=32000
DefaultLimitMEMLOCK=infinity
EOF
Copy the code

Optimized kernel for ES support

Cat >>/etc/sysctl.conf<< EOF # disable swap memory vm.swappiness =0 # Max_map_count = 262144 # Optimized kernel listen connection NET.core. somaxconn=65535 # Maximum number of open file descriptors You are advised to change the value to 655360 or higher fs.file-max=655360 # Enable ipv4 forwarding net.ipv4.ip_forward= 1 EOFCopy the code

Modify the Hostname configuration file

Cat >>/etc/hosts<< EOF elk-master 10.0.11.172 elk-node1 10.0.21.117 ELk-node2 10.0.11.208 EOFCopy the code

Restart for the configuration to take effect

reboot
Copy the code

Deploy the Zookeeper

{{< notice >}} All three nodes need to execute {{< /notice >}}

Create the Zookeeper project directory

# mkdir zkdata # mkdir zklogsCopy the code

Decompress ZooKeeper

Wget tar ZXVF - http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.6/apache-zookeeper-3.5.6-bin.tar.gz Apache - they are - 3.5.6 - bin. Tar. Gz mv apache - they are - 3.5.6 - bin zookeeperCopy the code

Modifying a Configuration File

[root @ elk - master zookeeper] # cat conf/zoo. The CFG | grep -v ^ # # server or between the time interval between client and server to maintain a heartbeat # tickTime in milliseconds. TickTime =2000 # Number of initial connection heartbeats between the follower server (F) and the leader server (L) in the cluster initLimit=10 # Maximum number of heartbeats that can be tolerated between the follower server and the leader server for requests and replies in the cluster SyncLimit =5 # dataDir=.. /zkdata # dataLogDir=.. /zklogs # clientPort=2181 # Maximum number of client connections MaxClientCnxns =60; maxClientCnxns=60; Service number = Service ADDRESS, LF communication port, and election port Server. 1=10.0.11.172:2888:3888 server.2=10.0.21.117:2888:3888 server.3=10.0.11.208:2888:3888Copy the code

Write node mark

{{< notice warning “note” >}} respectively in three nodes/home/tools/zookeeper/zkdata/myid tag written node {{< / notice >}}

{{< tabs master node1 Node2 node >}} {{< TAB >}}

The operation of the master
echo "1" > /home/tools/zookeeper/zkdata/myid
Copy the code

{{< /tab >}}

{{< tab >}}

The operation of the node1
echo "2" > /home/tools/zookeeper/zkdata/myid
Copy the code

{{< /tab >}}

{{< tab >}}

The operation of the 2
echo "3" > /home/tools/zookeeper/zkdata/myid
Copy the code

{{< /tab >}}

{{< /tabs >}}

Start the ZooKeeper cluster

[root@elk-master zookeeper]# cd /home/tools/zookeeper/bin/ [root@elk-master bin]# ./zkServer.sh start ZooKeeper JMX enabled by default Using config: /home/tools/zookeeper/bin/.. /conf/zoo.cfg Starting zookeeper ... STARTEDCopy the code

Checking cluster Status

[root@elk-master bin]# sh /home/tools/zookeeper/bin/zkServer.sh status ZooKeeper JMX enabled by default Using config: /home/tools/zookeeper/bin/.. /conf/zoo.cfg Client port found: 2181. Client address: localhost. Mode: leaderCopy the code

Setting global variables

cat >>/etc/profile<< EOF export ZOOKEEPER_INSTALL=/home/tools/zookeeper/ export PATH=$PATH:$ZOOKEEPER_INSTALL/bin export  PATH EOFCopy the code
  • Source /etc/profile So that the zkserver. sh command can be used globally

    The deployment of Kafka

    {{< notice >}} All three nodes need to execute {{< /notice >}}

    Download the kafka package

    [root@elk-master tools]# mkdir kafka [root@elk-master tools]# cd kafka/ [root@elk-master kafka]# wget https://www-eu.apache.org/dist/kafka/2.3.0/kafka_2.12-2.3.0.tgz [root @ elk - master kafka] # tar xf kafka_2. 12-2.3.0. TGZ [root@elk-master kafka]# mv kafka_2.12-2.3.0 kafka [root@elk-master kafka]# CD kafka/config/Copy the code

Configuration kafka

[root@elk-master config]# cat /home/tools/kafka/kafka/config/server.properties ############################# Server Basics # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # broker id, value as an integer, and must be the only, In a cluster can't repeat. Broker id = 1 # # # # # # # # # # # # # # # # # # # # # # # # # # # # # Socket Server Se: Ttings # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # kafka listening by default port is 9092 (the default connection with host name) listeners = PLAINTEXT: / / : 9092 # processing network request the number of threads, Num.io. Threads =8 num.io. Threads =8 num.io. Default 100KB socket.sess.buffer. Bytes =102400 # Buffer size of data received by socket service, . The default 100 KB socket. The receive buffer. The bytes = 102400 # socket service can accept a request the maximum size, Default is 100 m socket request. Max. 104857600 bytes = # # # # # # # # # # # # # # # # # # # # # # # # # # # # # Log Basics # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # Kafka log. Dirs =.. Each topic/kfkdata # default partition number num. Partitions = 3 # when it starts to recover data and closing the refresh data for each data directory number of threads num. Recovery. Threads. Per. Data. The dir = 1 # # # # # # # # # # # # # # # # # # # # # # # # # # # # # Log Flush the Policy # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # news news article number threshold of flushed to disk # the flush. The interval. The messages = 10000 # message of maximum time interval flushed to disk, 1 # s log. Flush. Interval. Ms = 1000 # # # # # # # # # # # # # # # # # # # # # # # # # # # # # log Log.retention. Hours =168 # Log Retention Policy ############################# # Number of log Retention hours, which are automatically deleted when timeout expires. The default value is 7 days. The default value is 1 gb #log.retention. Bytes =1073741824 # The maximum size of a log file is 1 GB. Beyond a new log file is created after the segment. The bytes = 1073741824 # whether how long every testing data to remove conditions, the 300 s the retention. Check. Interval. Ms = 300000 # # # # # # # # # # # # # # # # # # # # # # # # # # # # # Zookeeper # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # Zookeeper connection information, if it is a Zookeeper cluster, They are to be set off by a comma. Connect = 10.0.11.172, 10.0.21.117, zookeeper 10.0.11.208 # connection timeout, 6 s zookeeper. Connection. A timeout. Ms = 6000Copy the code

Create a directory for storing data

[root@elk-master config]# mkdir .. /kfkdataCopy the code

Modify the broker. Id

{{< notice warning “note” >}} respectively in the three nodes in turn modify/home/tools/kafka kafka/config/server properties configuration file {{< / notice >}} {{< tabs Master node Node1 Node2 node >}} {{< TAB >}}

The configuration of the master
broker.id=1
Copy the code

{{< /tab >}}

{{< tab >}}

Node1 configuration
broker.id=2
Copy the code

{{< /tab >}}

{{< tab >}}

2 the configuration of the
broker.id=3
Copy the code

{{< /tab >}}

{{< /tabs >}}

Start the Kafka cluster

CD/home/tools/kafka kafka/bin / # start testing. / kafka - server - start. Sh.. / config/server. The properties # in the background. / kafka - server - start. Sh - daemon.. /config/server.propertiesCopy the code

test

{{< notice warning “warning” >}}

This command can be executed on any node

{{< /notice >}}

Publish messages subscribe to message validation results at any node in the cluster in the creation topic

{{< tabs create topic messages publish topic messages subscribe >}} {{< TAB >}}

[root @ elk - master bin] #. / kafka - switchable viewer. Sh \ - create \ - zookeeper 10.0.11.172:2181,10.0. 21.117:2181,10.0. 11.208: \ 2181 --partitions 3 \ --replication-factor 1 \ --topic logsCopy the code

{{< /tab >}}

{{< tab >}}

[root @ elk - master bin] #. / kafka - the console - producer. Sh \ - broker - list 10.0.11.172:9092,10.0. 21.117:9092,10.0. 11.208: \ 9092 --topic logsCopy the code

{{< /tab >}}

{{< tab >}}

[root@elk-master bin]# ./kafka-console-consumer.sh \ --bootstrap-server 10.0.11.172:9092,10.0. 21.117:9092,10.0. 11.208: \ 9092 - topic logs \ - from - beginningCopy the code

{{< /tab >}}

{{< /tabs >}}

Elasticsearch deployment

{{< notice >}} All three nodes need to execute {{< /notice >}}

Download and install ElasticSearch

Wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.10.1-x86_64.rpm [root @ elk - master package] # RPM - the ivh elasticsearch 7.10.1 - x86_64. RPMCopy the code

Backing up configuration Files

cd /etc/elasticsearch
cp elasticsearch.yml  elasticsearch.yml.bak
Copy the code

Modifying a Configuration File

The cat > / etc/elasticsearch/elasticsearch. Yml < < EOF # cluster. The cluster name name: elk - # cluster node, the node name. Name: Elk - 1 # data storage path path. Data: / home/elasticsearch/esdata # data snapshot path path. The repo: / home/backup/essnapshot # log store path path. The logs: / home/elasticsearch/eslogs # es binding IP address, modify according to your machine IP network. The host: HTTP 0.0.0.0 # service port. The port: Discovery. seed_hosts: [" 10.0.11.172 10.0.21.117 ", ""," 10.0.11.208 "] cluster. Initial_master_nodes: ["10.0.11.172","10.0.21.117","10.0.11.208"] # http.kors. enabled: true #* "*" # add request headers http.coron. allow-headers: Authorization, x-requested-with, content-Length, content-type # production must be true, memory lock check, the purpose is to map memory address directly, Memory_lock: true # System filter check to prevent data corruption. For cluster security, set production to false bootstrap.system_call_filter: False # xpack configuration xpack. Security. Enabled: true xpack. Security. Transport.. SSL enabled: true xpack.security.transport.ssl.verification_mode: certificate xpack.security.transport.ssl.keystore.path: /etc/elasticsearch/elastic-certificates.p12 xpack.security.transport.ssl.truststore.path: /etc/elasticsearch/elastic-certificates.p12 EOFCopy the code

Modify the JVM

  • willjvm.optionsLines 22-23 in the file8gSet to half of your services memory ` ` ` [root @ elk rac-node1 elasticsearch] # cat – n JVM. The options | grep 22-23 – Xmx8g Xms8g 8 g
    # # # modify other node configuration {{< notice warning "note" >}} respectively in three nodes modified ` / etc/elasticsearch/elasticsearch yml ` configuration file {{< / notice >}}Copy the code

{{< tabs master node1 Node2 node >}} {{< TAB >}}

The configuration of the master
node.name: "es-master"
Copy the code

{{< /tab >}}

{{< tab >}}

Node1 configuration
node.name: "es-node1"
Copy the code

{{< /tab >}}

{{< tab >}}

2 the configuration of the
node.name: "es-node2"
Copy the code

{{< /tab >}}

{{< /tabs >}}

Final presentation

Assign permissions

Because you customize the data and log storage directory, you need to assign permissions to the directory

mkdir  -p /home/elasticsearch/{esdata,eslogs}
chown  elasticsearch:elasticsearch  /home/elasticsearch/*
mkdir  -p /home/backup/essnapshot
chown elasticsearch:elasticsearch /home/backup/essnapshot
Copy the code

Start the service

Start all three nodes and join the startup

systemctl start elasticsearch
systemctl enable  elasticsearch
Copy the code

Use Xpack for security authentication

Xpack’s security features

  • The TLS feature. The communication can be encrypted
  • Files and native Realm. Can be used to create and manage users
  • Role-based access control. Can be used to control user access to cluster apis and indexes
  • Multi-tenancy is also allowed in Kibana through security features for Kibana Spaces. During the configuration process, I found that the cluster authentication needs to be configured first. Otherwise, an error will be reported when creating the secret key for the built-in user. {{< notice Warning “error” >}} Cause: Cluster state has not been recovered yet, cannot write to the [null] index {{< /notice >}} “` shell Unexpected response code [503] from calling PUT http://10.0.11.172:9200/_security/user/apm_system/_password? pretty Cause: Cluster state has not been recovered yet, cannot write to the [null] index

Possible next steps:

  • Try running this tool again.
  • Try running with the — verbose parameter for additional messages.
  • Check the elasticsearch logs for additional error details.
  • Use the change password API manually.

ERROR: Failed to set password for user [apm_system].

#### Apply for certificate {{< notice warning "notice" >}} In one of the nodes can be {{< / notice >}} ` ` ` shell/usr/share/elasticsearch/bin/elasticsearch - certutil ca /usr/share/elasticsearch/bin/elasticsearch-certutil cert --ca elastic-stack-ca.p12Copy the code

After the certificate is created, it is stored in the es data directory by default. Copy the certificate to etc and grant permissions.

[root@elk-master ~]# ls /usr/share/elasticsearch/elastic-*
/usr/share/elasticsearch/elastic-certificates.p12
/usr/share/elasticsearch/elastic-stack-ca.p12
cp /usr/share/elasticsearch/elastic-* /etc/elasticsearch/
chown elasticsearch.elasticsearch /etc/elasticsearch/elastic*
Copy the code

When you’re done, copy the certificate to the other node

Add a password for the built-in account

ES has several built-in accounts for managing other integration components: APM_SYSTEM, Beats_system, Elastic, Kibana, LogSTash_System, and Remote_Monitoring_user.

/usr/share/elasticsearch/bin/elasticsearch-setup-passwords interactive Initiating the setup of passwords for reserved users elastic,apm_system,kibana,logstash_system,beats_system,remote_monitoring_user. You will be prompted to enter passwords as the process progresses. Please confirm that you would like to continue [y/N]y Enter password for [elastic]:  Reenter password for [elastic]: Enter password for [apm_system]: Reenter password for [apm_system]: Enter password for [kibana]: Reenter password for [kibana]: Enter password for [logstash_system]: Reenter password for [logstash_system]: Enter password for [beats_system]: Reenter password for [beats_system]: Enter password for [remote_monitoring_user]: Reenter password for [remote_monitoring_user]: Changed password for user [apm_system] Changed password for user [kibana] Changed password for user [logstash_system] Changed password for user [beats_system] Changed password for user [remote_monitoring_user] Changed password for user [elastic]Copy the code

The deployment of Curator

Download and install

Wget https://github.com/lmenezes/cerebro/releases/download/v0.9.2/cerebro-0.9.2-1.noarch.rpm RPM - the ivh Cerebro - 0.9.2-1. Noarch. RPMCopy the code

Modifying a Configuration File

Modify/etc/cerebro/application. The conf configuration file Find the corresponding configuration changes for the following {{< codes to modify a modified content 2 >}} {{}}

data.path: "/var/lib/cerebro/cerebro.db"
#data.path = "./cerebro.db"
Copy the code

{{}}

{{}}

hosts = [ #{ # host = "http://localhost:9200" # name = "Localhost cluster" # headers-whitelist = [ "x-proxy-user", "x-proxy-roles", [X] [X] [X] [X] [X] [X] [X] [X] [X] [X] [X] auth = { username = "elastic" password = "123" } } ]Copy the code

{{}}

{{}}

An error

{{< notice Warning “error” >}} Cerebro [8073]: * No Java Installation is interrupted. {{< /notice >}} An error occurs after the service is started, but my environment has Java. Also made global variables feel strange…

The solution

{{< /notice >}} Add JAVA_HOME to startup service file {{< /notice >}}

  • Type: log # Input type enable: true # Enable this type configure paths:

    • / home/homeconnect/logs/AspectLog/aspect, log # monitoring tomcat business journal json. Keys_under_root: Overwrite_keys: true # Override the default key and use a custom JSON-formatted key max_bytes: 20480 # Limit on the size of a single log, recommended limit (default: 10M,queue.mem. Events * max_bytes will occupy memory) fields: # Additional fields source: Test-prod -tomcat-aspect-a # custom source field for es index
  • Type: log # Input type enable: true # Enable this type configure paths:

    • / home/tools/apache tomcat – 8.5.23 / logs/localhost_access_log..log # Monitor tomcat Access logs

      Json. keys_under_root: true # Flase by default will also store jSON-parsed logs in the messages field

      Json. overwrite_keys: true # Override the default key and use a custom JSON-formatted key

      Max_bytes: 20480 # Limit on the size of a single log. Recommended limit (default: 10M,queue.mem.events
      Max_bytes will be part of the occupied memory.

      Fields: # Additional fields

      Source: test-prod-tomcat-access-a

Indexes for custom es require ilM to be set to false

setup.ilm.enabled: false

=============================== output ===============================

Output. Kafka: # output to kafka Enabled: true [” 10.0.11.172:9092 “, “10.0.21.117:9092”, “10.0.11.208:9092”] # kafka node list topic: ‘logstash-%{[field.source]}’ # kafka creates this topic, and then logstash(which can be filtered) is passed to es as the index name partition. Hash: reachable_only: Compression: gzip # compression max_message_bytes: 1000000 # Event Specifies the maximum number of bytes. The default of 1000000. Required_acks: 1 # Kafka ACK level worker: 1 # Maximum number of concurrent kafka outputs bulk_max_size: To_files: true # Output all logs to file. The default is true. When the log file size limit is reached, the log file will be replaced automatically

=============================== other ===============================

Close_older: 30m # If the file has not been updated within a certain period of time, close the monitor file handle. Default 1h force_CLOSE_files: false # This option closes a file when the file name changes. The recommended value is true only at Windows

How long does it take to close file handles after no new logs are collected? The default value is 5 minutes and the value is set to 1 minute to speed up file handle closing

close_inactive: 1m

This configuration item is the key point to solve the problems in the above cases

close_timeout: 3h

This configuration item should also be configured. The default value is 0 to indicate no cleanup, which means that the collected file description is never cleaned in The Registry file. After running for a period of time, Registry becomes larger and may cause problems

clean_inactive: 72h

Once clean_INACTIVE is set, ignore_OLDER needs to be set and ensure that ignore_OLDER < clean_inactive

ignore_older: 70h

' 'shell systemctl start filebeat.service systemctl enable filebeat.service systemctl status filebeat.serviceCopy the code

The deployment of logstash

Download and install

Wget https://artifacts.elastic.co/downloads/logstash/logstash-7.10.1-x86_64.rpm RPM - the ivh logstash - 7.10.1 - x86_64. RPM mv logstash.yml logstash.yml.bakCopy the code

Modifying a Configuration File

Modify the logstash. Yml

Vim logstash. Yml http.host: "0.0.0.0" # Specifies the size of the batch request sent to Elasticsearch. 3000 # refers to adjusting the delay of the Logstash pipeline, after which the Logstash pipeline starts to execute the filter and output pipeline.batch.delay: 200Copy the code

Modify the configuration file to get logs from Kafka

[root@elk-kibana conf.d]# cat /etc/logstash/conf.d/get-kafka-logs.conf input {# input kafka {# consume data from kafka "10.0.11.172 bootstrap_servers = > [: 9092,10.0. 21.117:9092,10.0. 11.208:9092"] codec = > "json" # # switchable viewer data format = > [" 3IN1-topi "] # use topicS_pattern => "logstash-.*" # use regular matching topic consumer_threads => 3 # Consume thread number Decorate_events => true # Add Kafka metadata to events, such as topic, message size options, This adds a field named kafka to the logstash event auto_offset_reset => "latest" # automatically resets the offset to the latest offset #group_id => "logstash-node" # consumption group ID, Multiple Logstash instances with the same group_ID are one consumer group #client_id => "logSTash1" #client ID fetch_MAX_wait_ms => "1000" # The maximum amount of time the server will block before answering a FETCH request when there is not enough data to immediately satisfy the FETch_MIN_bytes}} filter{# #if ([message] =~ "traceId=null") { # drop {} #} mutate {convert => ["Request time", "float"]} if [IP]! = "-" { geoip { source => "ip" target => "geoip" # database => "/usr/share/GeoIP/GeoIPCity.dat" add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ] add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ] } mutate { convert => [ "[geoip][coordinates]", "Float "]}}} output {# elasticSearch {# Logstash => [10.0.11.172: "9200", "10.0.21.117:9200", "10.0.11.208:9200"] index = > "logstash - % {[fields] [source]} - % {+ - dd YYYY - MM}" # #index => "%{[@metadata][topic]}-%{+ YYYY-MM-DD}" user => "elastic" password => "123"} #stdout {# codec => rubydebug #} }Copy the code

Test receive Log

Tests whether data can be received

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/get-kafka-logs.conf
Copy the code

Below the logstash set to use systemd start modify/etc/systemd/system/logstash. Service file

[Unit]
Description=root

[Service]
Type=simple
User=root
Group=root
# Load env vars from /etc/default/ and /etc/sysconfig/ if they exist.
# Prefixing the path with '-' makes it try to load, but if the file doesn't
# exist, it continues onward.
EnvironmentFile=-/etc/default/logstash
EnvironmentFile=-/etc/sysconfig/logstash
ExecStart=/usr/share/logstash/bin/logstash "--path.settings" "/etc/logstash"
Restart=alway
WorkingDirectort=/
Nice=19
LimitNOFILE=16384

[Install]
WantedBy=multi-user.target
Copy the code

In the launcher/usr/share/logstash/bin/logstash lib. Add JAVA_HOME sh in file about 86 lines of the if [-z “$JAVACMD”]; JAVACMD=”/home/tools/jdk1.8.0_221/bin/ Java”

[root@elk-kibana ~]# cat -n /usr/share/logstash/bin/logstash.lib.sh |grep JAVACMD 85 # set the path to java into JAVACMD 86 JAVACMD="/home/tools/jdk1.8.0_221/bin/ Java "87 if [-z "$JAVACMD" ]; thenCopy the code

Start the service

systemctl reload logstash.service
systemctl restart  logstash.service
systemctl enable  logstash.service
Copy the code

Final inspection

Log in to Kibana to create the index

choosemanagementThe index modelCreate index schema





The inputThe index nameThe next step



choose[@timestamp](https://github.com/timestamp "@timestamp")Create index schema



Then you can see the log

The original article cnsre.cn/posts/21032…

This article uses the article synchronization assistant to synchronize