Welcome toTencent Cloud + community, get more Tencent mass technology practice dry goods oh ~

This post was posted on cloud + Community by Michelmu

Elasticsearch is a mainstream full-text search engine. In addition to its powerful full-text search capability and high scalability, its compatibility with multiple data sources is another key to its success. Elasticsearch’s powerful compatibility with data sources comes from one of its core components, Logstash, which provides input and output to various data sources through plug-ins. Kafka is a high-throughput distributed publish-subscribe messaging system. It is a common data source and one of many input and output sources supported by Logstash. The Logstash Kafka Input plugin is used to import Kafka data into Elasticsearch.

1 Introduction to the Logstash Kafka Input plug-in

The Logstash Kafka Input plugin uses the Kafka API to read data from Kafka topics. When using this plugin, pay attention to whether the version of Kafka is the same as that of the plugin. This plugin supports connecting Kafka over SSL and Kerveros SASL. In addition, the plugin provides group management and uses the default offset management strategy to operate Kafka Topics.

By default, a single group is used to subscribe to The Logstash Kafka messages, and each Logstash Kafka Consumer uses multiple threads to increase throughput. Of course, multiple Logstash instances can use the same group_ID to balance the load. It is also recommended to set the number of consumers to the size of the Kafka partition to provide better performance.

2 Preparing the test environment

2.1 Creating an Elasticsearch Cluster

In order to simplify the construction process, Tencent Cloud Elasticsearch Service is used in this paper. The Tencent Cloud Elasticsearch Service not only allows you to quickly build Elasticsearch clusters, but also provides built-in Features such as Kibana, cluster monitoring, dedicated master node, and Ik word segmentation plug-in, which greatly simplifies the creation and management of Elasticsearch clusters.

2.2 Creating the Kafka Service

The establishment of Kafka service adopts Tencent cloud CKafka to complete. Kafka is 100% compatible with the Open source Kafka API(version 0.9).

2.3 server

In addition to preparing Elasticsearch and Kafka, you also need to prepare a server that runs Logstash to connect Elasticsearch and Kafka. Tencent Cloud CVM server is used in this paper

2.4 Precautions

  1. Elasticsearch, Kafka, and the server must be created on the same network. This document uses Tencent cloud-related technical services, so you only need to create Elasticsearch Service, CKafka, and CVM in the same private network (VPC).

  2. You have obtained the Intranet addresses and ports of Elasticsearch Serivce, CKafka, and CVM for use by subsequent services

In this test:

service ip port
Elasticsearch service 192.168.0.8 9200
Ckafka 192.168.13.10 9092
CVM 192.168.0.13

3 Connect Elasticsearch and Kafka using Logstash

3.1 Kafka preparation

You can refer to [CKafka introduction]

Follow the tutorial above

  1. Create a topic named kafka_ES_test

  2. Install the JDK

  3. Install the Kafka toolkit

  4. Create producer and Consumer validation kafka

3.2 installation Logstash

The installation and use of Logstash can be found in the article “How to get started with Logstash”.

3.3 Configuring the Logstash Kafka Input plug-in

Create the kafka_test_pipeline.conf file as follows:

input{
        kafka{
                bootstrap_servers=>"192.168.13.10:9092"
                topics=>["kafka_es_test"]
                group_id=>"logstash_kafka_test"
        }
}
output{
        elasticsearch{
                hosts=>["192.168.0.8:9200"]}}Copy the code

A Kafka input and an ElasticSearch output are defined

These three parameters are mandatory for Kafka Input. There are also some parameters that can be used to adjust the behavior of Kafka input.

Auto_commit_interval_ms Sets the interval at which the Consumer submits offset to Kafka

Consumer_threads sets the number of Consumer threads. The default value is 1, which is the same as the number of Kafka Topic partitions

Fetch_max_wait_ms specifies the maximum amount of time a Consumer should wait for a fetch request to reach fetch_min_bytes

Fetch_min_bytes Specifies the minimum amount of data that a Consumer FETCH request should return

Topics_pattern is used to subscribe to a set of topics that conform to a rule through regular subscriptions

[Kafka Input Configuration Options]

3.4 start the Logstash

The following is done in the Logstash root directory

  1. Verify the configuration
./bin/logstash -f kafka_test_pipeline.conf --config.test_and_exit
Copy the code

If any error occurs, modify the configuration file as prompted. If the configuration is correct, the following result is displayed

Sending Logstash's logs to /root/logstash-5.6.13/logs which is now configured via log4j2.properties [the 2018-11-11 T15:24:01, 598] [INFO] [logstash. Modules. Scaffold] the Initializing module {: module_name = > "netflow." : directory = > "/ root/logstash - 5.6.13 / modules/netflow/configuration"} [the 2018-11-11 T15:24:01, 603] [INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", : directory = > "/ root/logstash - 5.6.13 / modules/fb_apache/configuration"} configuration OK [the 2018-11-11 T15:24:01, 746] [INFO ][logstash.runner ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting LogstashCopy the code
  1. Start the Logstash
./bin/logstash -f kafka_test_pipeline.conf --config.reload.automatic
Copy the code

Check whether any error occurs in logs and rectify the fault in a timely manner

3.4 Starting Kafka Producer

The following operations are performed in the root directory of the Kafka toolkit

./bin/kafka-console-producer.sh --broker-list 192.16813.10.:9092 --topic kafka_es_test
Copy the code

Write test data

This is a message
Copy the code

3.5 Kibana verification results

Log in to Kibana for Elasticsearch and perform the following operations on Dev Tools

  1. View index
GET _cat/indices
Copy the code

You can see that an index named logstash-xxx.xx.xx is successfully created

green open .kibana             QUw45tN0SHqeHbF9-QVU6A 1 1 1 0 5.5kb 2.7kb
green open logstash2018.1111. DejRdNJVQ1e1MwbyJjJjLw 5 1 1 0 8.7kb 4.3kb
Copy the code
  1. View the written data
GET logstash2018.1111./_search
Copy the code

You can see that the data has been written successfully

{
  "took": 0."timed_out": false."_shards": {
    "total": 5."successful": 5."skipped": 0."failed": 0
  },
  "hits": {
    "total": 1."max_score": 1."hits": [{"_index": "Logstash - 2018.11.11"."_type": "logs"."_id": "AWcBsEegMu-Dkjm1ap3H"."_score": 1."_source": {
          "message": "This is a message"."@version": "1"."@timestamp": "The 2018-11-11 T07:33:09. 079 z"}}]}}Copy the code

4 summarizes

As the core component of data collection and processing in Elastic Stack, Logstash provides powerful data source compatibility for Elasticsearch. As can be seen from the test process, using Logstash to implement the connection process between Kafka and Elaticsearch is quite simple and convenient. In addition, the Logstash data processing function also makes the system using this architecture have natural advantages in data mapping and processing.

Elasticsearch is not the only solution to Kafka and Elasticsearch using Logstash. Another common solution is to use Kafka Connect, see “When Elasticsearch meets Kafka–Kafka Connect”

Machine learning in action! Quick introduction to online advertising business and CTR knowledge

This article has been authorized by the author to Tencent Cloud + community, more original text pleaseClick on the

Search concern public number “cloud plus community”, the first time to obtain technical dry goods, after concern reply 1024 send you a technical course gift package!

Massive technical practice experience, all in the cloud plus community!