Log processing

This demo shows how to build a log processing system based on Elasticsearch, using data synchronization tools and other open-source components.

Log demonstration website: http://es.52itstyle.com

Regional demonstration site: http://es.52itstyle.com/area/index

Of course, features will be added gradually to make this a fully rounded demo case.

23456.png

Development environment

Maven, Eclipse, Spring Boot 1.5.9, Elasticsearch 2.4.6, Dubbox 2.8.4, ZooKeeper 3.4.6, Redis, Kafka, Vue, iView

Version notes

elasticsearch-2.4.6, spring-boot-starter-parent-1.5.9.RELEASE, spring-data-elasticsearch-2.1.9.RELEASE

As of January 22, 2018, the latest Elasticsearch release is 6.1.2, but Spring Boot support does not move as fast as Elasticsearch: 2.4.6 is the newest Elasticsearch version that Spring Boot currently supports.

Reference: https://github.com/spring-projects/spring-data-elasticsearch/wiki/Spring-Data-Elasticsearch---Spring-Boot---version-matrix

ESD.png

Access

With Spring Data Elasticsearch in Spring Boot, there are two built-in client access options:

1. Node client: if local:false is set in the configuration file, the node client joins the cluster as a non-data node (node-master or node-client). In other words, it does not store any data itself, but it knows where the data lives in the cluster and can forward requests directly to the corresponding node.

2. Transport client: with local:true set in the configuration file, this lighter transport client can send requests to a remote cluster. It does not join the cluster itself; it simply forwards requests to nodes in the cluster. Both Java clients interact with the cluster through port 9300 using the Elasticsearch transport protocol. The nodes in the cluster also communicate with each other through port 9300; if this port is not open, your nodes will not form a cluster.

Service configuration

Using the local Elasticsearch service (application-dev.properties):

spring.data.elasticsearch.cluster-name=elasticsearch
# The default transport port is 9300; do not use 9200 here. You can configure multiple cluster nodes, separated by commas (,).
# Java clients interact with the cluster through port 9300.
# All other programming languages can use the RESTful API to communicate with Elasticsearch through port 9200.
#spring.data.elasticsearch.cluster-nodes=192.168.1.180:9300
Using a remote Elasticsearch service (application-dev.properties)
  • Install Elasticsearch yourself, making sure its version matches the client JAR packages.

  • Download address: https://www.elastic.co/downloads/past-releases/elasticsearch-2-4-6

  • Installation instructions: http://www.52itstyle.com/thread-20114-1-1.html

  • Starting Elasticsearch as the root user is not recommended. If you must, start it with ./elasticsearch -Des.insecure.allow.root=true -d, or add ES_JAVA_OPTS="-Des.insecure.allow.root=true" to the elasticsearch startup script.
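For reference, a hedged sketch of what the remote variant of the configuration might look like (the host address is a placeholder; match the cluster name to your own installation):

```
spring.data.elasticsearch.cluster-name=elasticsearch
spring.data.elasticsearch.cluster-nodes=192.168.1.180:9300
```

With cluster-nodes set, Spring Data Elasticsearch uses the transport client against the remote node instead of an embedded one.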

The project structure

├─src
│  ├─main
│  │  ├─java
│  │  │  └─com
│  │  │     └─itstyle
│  │  │        └─es
│  │  │           │  Application.java
│  │  │           ├─common
│  │  │           │  ├─constant
│  │  │           │  │     PageConstant.java
│  │  │           │  └─interceptor
│  │  │           │        MyAdapter.java
│  │  │           └─log
│  │  │              ├─controller
│  │  │              │     LogController.java
│  │  │              ├─entity
│  │  │              │     Pages.java
│  │  │              │     SysLogs.java
│  │  │              ├─repository
│  │  │              │     ElasticLogRepository.java
│  │  │              └─service
│  │  │                 │  LogService.java
│  │  │                 └─impl
│  │  ├─resources
│  │  │  │  application.properties
│  │  │  │  application.yml
│  │  │  │  application-dev.properties
│  │  │  │  application-prod.properties
│  │  │  ├─static
│  │  │  │  ├─iview
│  │  │  │  │  │  iview.css
│  │  │  │  │  │  iview.min.js
│  │  │  │  │  └─fonts
│  │  │  │  │        ionicons.eot
│  │  │  │  │        ionicons.svg
│  │  │  │  │        ionicons.ttf
│  │  │  │  │        ionicons.woff
│  │  │  │  ├─jquery
│  │  │  │  │     jquery-3.2.1.min.js
│  │  │  │  └─vue
│  │  │  │        vue.min.js
│  │  │  └─templates
│  │  │     └─log
│  │  │           index.html
│  │  └─webapp
│  │     │  index.jsp
│  │     └─WEB-INF
│  │           web.xml
│  └─test
│     └─java
│        └─test
│              Logs.java

Project demo

The demo website: http://es.52itstyle.com

Project screenshots

ES_index.png

Paging query

Inserting 200,000 records with ElasticsearchTemplate on a public cloud server (1 core, 1 GB) took 60+ seconds, about one minute. With the index grown this large, searching it took about 10 minutes.

When paging deep into the results, Elasticsearch rejects the query with "Result window is too large" once from + size exceeds the default limit of 10,000. To allow deeper paging, raise the limit in the index configuration:

# Define your own quantity
index.max_result_window : '10000000'

Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/index-modules.html
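The same limit can also be raised at runtime through the index settings API, rather than in the config file; a sketch, assuming this project's index name elasticsearch:

```
curl -XPUT 'localhost:9200/elasticsearch/_settings' -d '
{ "index" : { "max_result_window" : 10000000 } }'
```

Deep paging this way is expensive; the scroll API is the usual alternative for very deep result sets.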

Java API

Elasticsearch provides two built-in clients for Java users:

  • Node client:

The node client, as the name suggests, is itself part of the Elasticsearch cluster. It joins the cluster as a non-data node. In other words, it does not store any data itself, but it knows where the data is in the cluster and can forward requests directly to the corresponding node.

  • Transport Client:

This lighter transport client can send requests to remote clusters. It does not join the cluster itself and simply forwards requests to nodes in the cluster. Both Java clients interact with the cluster through port 9300 using the Elasticsearch Transport Protocol. The nodes in the cluster also communicate with each other through port 9300. If this port is not open, your nodes will not cluster.

Install elasticsearch-head

elasticsearch-head is a web front end for point-and-click administration of a cluster. You can integrate it into ES as a plugin (preferred) or install it as a standalone web app.

elasticsearch-head offers three main functions:

  • Displays the topology of the cluster and is able to perform index and node-level operations
  • The search interface enables you to query the retrieved data in the cluster in raw JSON or table format
  • Quickly access and display the status of the cluster

Plug-in installation, reference: https://github.com/mobz/elasticsearch-head

  • for Elasticsearch 5.x: site plugins are not supported. Run as a standalone server
  • for Elasticsearch 2.x: sudo elasticsearch/bin/plugin install mobz/elasticsearch-head
  • for Elasticsearch 1.x: sudo elasticsearch/bin/plugin -install mobz/elasticsearch-head/1.x
  • for Elasticsearch 0.x: sudo elasticsearch/bin/plugin -install mobz/elasticsearch-head/0.9

After installation, a head directory appears under the plugins directory, confirming that the install succeeded.

View screenshots:

ES_head.png

X-Pack monitoring

Elasticsearch and Logstash jumped from 2.4 to 5.0 so that their version numbers line up with Kibana's. The 5.x ELK stack has fairly strict version-matching requirements and no longer supports mixing components across major versions. The former Marvel, Watcher, and Alerting features were repackaged together into X-Pack.

Installation: https://www.elastic.co/guide/en/elasticsearch/reference/6.1/installing-xpack-es.html

User management

After X-Pack is installed, there is a built-in superuser named elastic with the default password changeme. It has full control over all indices and data, can be used to create and modify other users, and users and roles can also be managed through the Kibana web interface.

Change the password of an Elastic user:

curl -XPUT -u elastic 'localhost:9200/_xpack/security/user/elastic/_password' -d '{ "password" : "123456" }'
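The same security API can create additional users; a hedged sketch (the logadmin name, password, and role here are illustrative, not from the project):

```
curl -XPOST -u elastic 'localhost:9200/_xpack/security/user/logadmin' -d '
{ "password" : "123456", "roles" : [ "superuser" ] }'
```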

IK Analysis for Elasticsearch

Download and install:

  • Download the pre-built package from https://github.com/medcl/elasticsearch-analysis-ik/releases and unzip the plugin into the folder your-es-root/plugins/

  • Or use elasticsearch-plugin to install (for versions > v5.5.1): ./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v6.0.0/elasticsearch-analysis-ik-6.0.0.zip

Because this project's Elasticsearch version is 2.4.6, choose IK version 1.10.6:

wget https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v1.10.6/elasticsearch-analysis-ik-1.10.6.zip

For Elasticsearch 2.x, add the following to the elasticsearch.yml file:

index:  
      analysis:                     
        analyzer:        
          ik:  
              alias: [ik_analyzer]  
              type: org.elasticsearch.index.analysis.IkAnalyzerProvider  
          ik_max_word:  
              type: ik  
              use_smart: false  
          ik_smart:  
              type: ik  
              use_smart: true

or

index.analysis.analyzer.ik.type: "ik"

Before the installation:

http://192.168.1.180:9200/_analyze?analyzer=standard&pretty=true&text=我爱你中国

{
  "tokens" : [ {
    "token" : "我",
    "start_offset" : 0,
    "end_offset" : 1,
    "type" : "<IDEOGRAPHIC>",
    "position" : 0
  }, {
    "token" : "爱",
    "start_offset" : 1,
    "end_offset" : 2,
    "type" : "<IDEOGRAPHIC>",
    "position" : 1
  }, {
    "token" : "你",
    "start_offset" : 2,
    "end_offset" : 3,
    "type" : "<IDEOGRAPHIC>",
    "position" : 2
  }, {
    "token" : "中",
    "start_offset" : 3,
    "end_offset" : 4,
    "type" : "<IDEOGRAPHIC>",
    "position" : 3
  }, {
    "token" : "国",
    "start_offset" : 4,
    "end_offset" : 5,
    "type" : "<IDEOGRAPHIC>",
    "position" : 4
  } ]
}

After the installation:

http://121.42.155.213:9200/_analyze?analyzer=ik&pretty=true&text=我爱你中国

{
  "tokens" : [ {
    "token" : "我爱你",
    "start_offset" : 0,
    "end_offset" : 3,
    "type" : "CN_WORD",
    "position" : 0
  }, {
    "token" : "爱你",
    "start_offset" : 1,
    "end_offset" : 3,
    "type" : "CN_WORD",
    "position" : 1
  }, {
    "token" : "中国",
    "start_offset" : 3,
    "end_offset" : 5,
    "type" : "CN_WORD",
    "position" : 2
  } ]
}
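To actually use IK at index time, the analyzer must also be set in the field mapping; a sketch, assuming this project's elasticsearch index and sysLog type (the operation field comes from SysLogs; the choice of ik_max_word is an assumption):

```
curl -XPOST 'localhost:9200/elasticsearch/sysLog/_mapping' -d '
{
  "properties" : {
    "operation" : {
      "type" : "string",
      "analyzer" : "ik_max_word",
      "search_analyzer" : "ik_max_word"
    }
  }
}'
```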

Data synchronization

Data is synchronized from MySQL to Elasticsearch using elasticsearch-jdbc.

elasticsearch-jdbc.png

Operating environment:

CentOS 7.5, JDK 8, elasticsearch-jdbc-2.3.2.0

Installation steps:

  • Step 1: download (this may be very slow, please be patient): wget http://xbib.org/repository/org/xbib/elasticsearch/importer/elasticsearch-jdbc/2.3.2.0/elasticsearch-jdbc-2.3.2.0-dist.zip
  • Step 2: unzip elasticsearch-jdbc-2.3.2.0-dist.zip
  • Step 3: configure the script mysql_import_es.sh:
#!/bin/sh
# elasticsearch-jdbc installation path
bin=/home/elasticsearch-jdbc-2.3.2.0/bin
lib=/home/elasticsearch-jdbc-2.3.2.0/lib
echo '{
    "type" : "jdbc",
    "jdbc" : {
        # If the database contains JSON values, set this to false, otherwise the sync will fail
        "detect_json" : false,
        "url" : "jdbc:mysql://127.0.0.1:3306/itstyle_log?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowMultiQueries=true",
        "user" : "root",
        "password" : "root",
        # If you want the MySQL id column to become the document id, alias it to _id in the SELECT
        "sql" : "SELECT id AS _id, id, user_id AS userId, username, operation, time, method, params, ip, device_type AS deviceType, log_type AS logType, exception_detail AS exceptionDetail, gmt_create AS gmtCreate, plat_from AS platFrom FROM sys_log",
        "elasticsearch" : {
             "host" : "127.0.0.1",
             "port" : "9300"
        },
        "index" : "elasticsearch",   # index name, equivalent to a database
        "type" : "sysLog"            # type name, equivalent to a table
    }
}' | java \
       -cp "${lib}/*" \
       -Dlog4j.configurationFile=${bin}/log4j2.xml \
       org.xbib.tools.Runner \
       org.xbib.tools.JDBCImporter
  • Step 4: grant execute permission and run:
chmod +x mysql_import_es.sh
./mysql_import_es.sh

ElasticsearchRepository and ElasticsearchTemplate

Spring Data Elasticsearch is the Spring Data layer for Elasticsearch. It encapsulates a large number of basic operations, letting you manipulate Elasticsearch data with ease.

Basic use of ElasticsearchRepository

/**
 * @param <T>
 * @param <ID>
 * @author Rizwan Idrees
 * @author Mohsin Husen
 */
@NoRepositoryBean
public interface ElasticsearchRepository<T, ID extends Serializable> extends ElasticsearchCrudRepository<T, ID> {

	<S extends T> S index(S entity);

	Iterable<T> search(QueryBuilder query);

	Page<T> search(QueryBuilder query, Pageable pageable);

	Page<T> search(SearchQuery searchQuery);

	Page<T> searchSimilar(T entity, String[] fields, Pageable pageable);

	void refresh();

	Class<T> getEntityClass();
}

ElasticsearchRepository adds several search methods that are specific to ES and have no counterpart in plain JPA repositories; they build ES queries from QueryBuilder and SearchQuery parameters.

1.png

In general, instead of creating a NativeSearchQuery directly, we use NativeSearchQueryBuilder: a chain such as new NativeSearchQueryBuilder().withQuery(queryBuilder1).withFilter(queryBuilder2).withSort(sortBuilder1).withXXXX().build() completes the construction of the NativeSearchQuery.

2.png
3.png
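As a sketch of that builder chain against this project's SysLogs entity (the field names, query terms, and page size are illustrative assumptions, not taken from the source):

```
// Hypothetical paged, filtered, sorted query; assumes the usual
// org.elasticsearch.index.query and org.springframework.data.* imports
SearchQuery searchQuery = new NativeSearchQueryBuilder()
        .withQuery(QueryBuilders.matchQuery("operation", "login"))     // full-text match
        .withFilter(QueryBuilders.termQuery("logType", "1"))           // exact filter
        .withSort(SortBuilders.fieldSort("gmtCreate").order(SortOrder.DESC))
        .withPageable(new PageRequest(0, 10))                          // first page, 10 hits
        .build();
Page<SysLogs> page = elasticLogRepository.search(searchQuery);
```

The resulting Page carries both the hits and the total count, so it maps directly onto a paging UI.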

Using ElasticsearchTemplate

ElasticsearchTemplate complements ElasticsearchRepository by providing some lower-level methods.

Here we mainly implement fast bulk insertion: 200,000 records on a public cloud server (1 core, 1 GB) take 60+ seconds, about one minute. With the index grown this large, searching it took about 10 minutes.

public void bulkIndex(List<SysLogs> logList) {
	long start = System.currentTimeMillis();
    int counter = 0;
    try {
        List<IndexQuery> queries = new ArrayList<>();
        for (SysLogs log : logList) {
            IndexQuery indexQuery = new IndexQuery();
            indexQuery.setId(log.getId() + "");
            indexQuery.setObject(log);
            indexQuery.setIndexName("elasticsearch");
            indexQuery.setType("sysLog");
            // Equivalent builder style:
            // IndexQuery indexQuery = new IndexQueryBuilder().withId(log.getId() + "").withObject(log).build();
            queries.add(indexQuery);
            // Flush every 1000 documents to keep each bulk request small
            if (counter % 1000 == 0) {
            	elasticSearchTemplate.bulkIndex(queries);
                queries.clear();
                System.out.println("bulkIndex counter : " + counter);
            }
            counter++;
        }
        if (queries.size() > 0) {
        	elasticSearchTemplate.bulkIndex(queries);
        }
        long end = System.currentTimeMillis();
        System.out.println("bulkIndex completed use time:" + (end - start));
    } catch (Exception e) {
        System.out.println("IndexerService.bulkIndex e;" + e.getMessage());
        throw new RuntimeException(e);
    }
}

Redis log queue

See package: com.itstyle.es.common.redis

RedisListener:

@Component
public class RedisListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisListener.class);
	@Bean
	RedisMessageListenerContainer container(
			RedisConnectionFactory connectionFactory,
			MessageListenerAdapter listenerAdapter) {
		LOGGER.info("Start listening"); 
		RedisMessageListenerContainer container = new RedisMessageListenerContainer();
		container.setConnectionFactory(connectionFactory);
		container.addMessageListener(listenerAdapter, new PatternTopic("itstyle_log"));
		return container;
	}

	@Bean
	MessageListenerAdapter listenerAdapter(Receiver receiver) {
		return new MessageListenerAdapter(receiver, "receiveMessage");
	}

	@Bean
	Receiver receiver(CountDownLatch latch) {
		return new Receiver(latch);
	}

	@Bean
	CountDownLatch latch() {
		return new CountDownLatch(1);
	}

	@Bean
	StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
		return new StringRedisTemplate(connectionFactory);
	}
}

Log Receiver:

public class Receiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
    @Autowired
	private  ElasticLogRepository elasticLogRepository;
    private CountDownLatch latch;

    @Autowired
    public Receiver(CountDownLatch latch) {
        this.latch = latch;
    }

    public void receiveMessage(String message) {
        LOGGER.info("Receive log message <{}>",message);
        if(message == null){
            LOGGER.info("Receive log message <" + null + ">");
        }else {
        	ObjectMapper mapper = new ObjectMapper();  
			try {
				SysLogs log = mapper.readValue(message, SysLogs.class);
				elasticLogRepository.save(log);
				LOGGER.info("Receive log message content <{}>", log.getOperation());
			} catch (JsonParseException e) {
				e.printStackTrace();
			} catch (JsonMappingException e) {
				e.printStackTrace();
			} catch (IOException e) {
				e.printStackTrace();
			} catch (Exception e) {
				e.printStackTrace();
			}
        }
        latch.countDown();
    }
}

Test with LogController: http://ip:port/redisLog

Kafka log queue

See package: com.itstyle.es.common.kafka
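The Kafka consumer mirrors the Redis Receiver above; a minimal sketch with spring-kafka (the topic name itstyle_log, the class name, and the listener wiring are assumptions, not taken from the project):

```
@Component
public class KafkaLogReceiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaLogReceiver.class);

    @Autowired
    private ElasticLogRepository elasticLogRepository;

    // Deserialize the log JSON from the topic and index it into Elasticsearch
    @KafkaListener(topics = "itstyle_log")
    public void receiveMessage(String message) {
        try {
            SysLogs log = new ObjectMapper().readValue(message, SysLogs.class);
            elasticLogRepository.save(log);
            LOGGER.info("Receive log message content <{}>", log.getOperation());
        } catch (IOException e) {
            LOGGER.error("Failed to parse log message", e);
        }
    }
}
```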

Source code download (Gitee): Spring Boot development case: building a distributed log processing system