Article by Foochane

Original link: foochane.cn/article/201…

Flume log collection framework: installation and deployment, the Flume running mechanism, collecting static files to HDFS, and collecting dynamic log files to HDFS.

Flume log collection framework

In a complete offline big data processing system, besides HDFS, MapReduce, and Hive, which form the core of the analysis system, indispensable auxiliary systems are also needed for data collection, result data export, and task scheduling. The Hadoop ecosystem provides convenient open-source frameworks for these auxiliary tasks, as shown in the figure:

1 Introduction to Flume

Flume is a distributed, reliable, and highly available system for collecting, aggregating, and transferring massive amounts of log data. Flume can collect source data in many forms, such as files, socket packets, folders, and Kafka, and it can sink the collected data to many external storage systems such as HDFS, HBase, Hive, and Kafka.

General collection requirements can be realized through simple configuration of Flume.

Flume can be customized for special scenarios. Therefore, Flume can be used in most daily data collection scenarios.

2 Flume operating mechanism

The core role in the Flume distributed system is the agent. The Flume collection system is formed by connecting agents together, and each agent acts as a data relay with three internal components (a minimal configuration skeleton follows the list below):

  • Source: the collection component, used to connect to the data source and obtain data
  • Sink: the sinking component, used to pass data on to the next-level agent or to the final storage system
  • Channel: the transport channel component, used to pass data from the source to the sink
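
The three components map directly onto the keys of a Flume configuration file. The sketch below is only a minimal illustration of that structure; the netcat source and logger sink are placeholder types, not part of the collection scenarios in this article:

# Minimal agent skeleton: one source, one channel, one sink
agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1

# Each component is given a type plus type-specific properties
# (netcat and logger are placeholder types used only for illustration)
agent1.sources.source1.type = netcat
agent1.sources.source1.bind = localhost
agent1.sources.source1.port = 44444
agent1.channels.channel1.type = memory
agent1.sinks.sink1.type = logger

# Wire the components together: a source writes to its channels, a sink reads from exactly one channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1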

Data collection by a single agent:

Multiple agents connected in series:

3 Flume Installation and Deployment

1 Download the installation package apache-flume-1.9.0-bin.tar.gz and decompress it

2 Add JAVA_HOME to flume-env.sh in the conf folder

export JAVA_HOME=/usr/local/bigdata/java/jdk1.8.0_211
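
Optionally, you can confirm that Flume and the JDK are wired up correctly by printing the version from the installation directory (a quick sanity check, not a step from the original article):

$ bin/flume-ng version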

3 Add a collection configuration file based on your collection requirements. The file name can be anything

See the following example for details

4 Start Flume

In a test environment:

$ bin/flume-ng agent -c conf/ -f ./dir-hdfs.conf -n agent1 -Dflume.root.logger=INFO,console


Command description:

  • -c: specifies the directory containing the Flume configuration files
  • -f: specifies your own collection configuration file, here dir-hdfs.conf under the current folder
  • -n: specifies which agent to use; the name must match the one defined in your configuration file
  • -Dflume.root.logger=INFO,console: prints logs of type INFO to the console; this is for testing only, and later the logs will be written to a log file

Flume should be started in the background:

nohup bin/flume-ng agent -c ./conf -f ./dir-hdfs.conf -n agent1 1>/dev/null 2>&1 &


4 Collect static files and upload them to HDFS

4.1 Collection Requirements

New files are constantly generated in a specific directory on a server. Whenever a new file appears, it needs to be collected to HDFS.

4.2 Adding a Configuration File

Create the dir-hdfs.conf file in the installation directory, and then add the configuration information below.

Get an agent and name it agent1; all of the following configuration keys are prefixed with agent1. You can also change this name to something else, such as agt1.

Based on the requirements, the following three elements are defined

Data source component

Source — monitors a file directory: spooldir. The spooldir source has the following features:

  • It monitors a directory and collects the contents of any new file that appears in that directory
  • Once a file has been collected, the agent automatically renames it with a suffix: COMPLETED (modifiable)
  • Files with the same name must not appear in the monitored directory more than once
Sinking component

Sink: HDFS file system: HDFS sink

Channel component

Channel — you can use file channel or memory channel

# Define the names of the three components
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Configure the source component
agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /root/log/
agent1.sources.source1.fileSuffix = .FINISHED
# If a line exceeds this length it will be truncated, resulting in data loss
agent1.sources.source1.deserializer.maxLineLength = 5120

# Configure the sink component
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://Master:9000/access_log/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = app_log
agent1.sinks.sink1.hdfs.fileSuffix = .log
agent1.sinks.sink1.hdfs.batchSize = 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text

# roll: controls when the file being written is rolled over
## Roll by file size (bytes)
agent1.sinks.sink1.hdfs.rollSize = 512000
## Roll by number of events
agent1.sinks.sink1.hdfs.rollCount = 1000000
## Roll by time interval (seconds)
agent1.sinks.sink1.hdfs.rollInterval = 60

## Control how the target directories are generated
agent1.sinks.sink1.hdfs.round = true
agent1.sinks.sink1.hdfs.roundValue = 10
agent1.sinks.sink1.hdfs.roundUnit = minute

agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

# Configure the channel component
agent1.channels.channel1.type = memory
## Maximum number of events held in the channel
agent1.channels.channel1.capacity = 500000
## Flume transaction control requires caching up to 600 events per transaction
agent1.channels.channel1.transactionCapacity = 600

# Bind source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1


Channel parameter description:

  • capacity: the maximum number of events the channel can hold
  • transactionCapacity: the maximum number of events fetched from the source or sent to the sink in a single transaction
  • keep-alive: the time allowed for adding an event to, or removing an event from, the channel
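
The examples in this article use a memory channel. The channel block can also be swapped for a file channel when durability across agent restarts matters more than throughput; the sketch below shows the shape such a configuration might take (the directory paths are placeholders, not from the original article):

# File channel: events are persisted to disk instead of being held in memory
agent1.channels.channel1.type = file
# Directory for the channel's checkpoint
agent1.channels.channel1.checkpointDir = /root/flume/checkpoint
# One or more comma-separated directories for the event data files
agent1.channels.channel1.dataDirs = /root/flume/data
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600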

4.3 Start Flume

$ bin/flume-ng agent -c conf/ -f dir-hdfs.conf -n agent1 -Dflume.root.logger=INFO,console

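Once the agent is running, a quick end-to-end check is to drop a file into the monitored directory and then look for the result under the HDFS path configured in the sink. The file name below is just an example, and the date directory will depend on when the file is collected:

$ echo "hello flume" > /tmp/test_1.log
$ mv /tmp/test_1.log /root/log/
$ hdfs dfs -ls /access_log/

After collection, the original file in /root/log/ should have been renamed with the .FINISHED suffix configured above.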

5 Collect dynamic log files and upload them to HDFS

5.1 Collection Requirements

For example, a business system uses log4j to generate logs, and the log files keep growing; data appended to a log file needs to be collected to HDFS in real time.

5.2 Configuration File

Configuration file name: tail-hdfs.conf. Define the following three elements based on the requirements:

  • The collection source, i.e. the source — monitoring updates to a file's content: exec 'tail -F file'
  • The sinking target, i.e. the sink — the HDFS file system: HDFS sink
  • The transfer channel between source and sink, i.e. the channel — either a file channel or a memory channel can be used

Configuration file contents:


# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/app_weichat_login.log

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://Master:9000/app_weichat_login_log/%y-%m-%d/%H-%M
a1.sinks.k1.hdfs.filePrefix = weichat_log
a1.sinks.k1.hdfs.fileSuffix = .dat
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollSize = 100
a1.sinks.k1.hdfs.rollCount = 1000000
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

5.3 Start Flume

Start command:

bin/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1

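To exercise this setup without a real application, you can simulate a continuously growing log by appending lines to the tailed file in a loop (a test helper only, not part of the original article; the path matches the one used by the exec source above):

$ while true; do echo "access_$(date +%s)" >> /root/app_weichat_login.log; sleep 1; done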

6 Cascading two agents

The first agent tails a log file and sends the data through an avro sink to another node, where a second agent configured with an avro source relays the data on to external storage.

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/log/access.log


# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hdp-05
a1.sinks.k1.port = 4141
a1.sinks.k1.batch-size = 2



# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

The second agent receives data on the avro port and sinks it to HDFS.

Collection configuration file on the receiving node: avro-hdfs.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
## The avro component in the source acts as a receiving server
a1.sources.r1.type = avro
a1.sources.r1.bind = hdp-05
a1.sources.r1.port = 4141


# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/taildata/%y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = tail-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 24
a1.sinks.k1.hdfs.roundUnit = hour
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 50
a1.sinks.k1.hdfs.batchSize = 10
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# The generated file type is SequenceFile by default; DataStream can be used instead
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
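To run the cascade, start the downstream agent (the one with the avro source, configured in avro-hdfs.conf on hdp-05) first so that port 4141 is already listening, and then start the upstream agent on the node that produces the log. The original article does not name the first configuration file; tail-avro.conf below is only an assumed name:

# On hdp-05, start the receiving agent first
$ bin/flume-ng agent -c conf -f avro-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
# Then, on the log-producing node, start the sending agent
$ bin/flume-ng agent -c conf -f tail-avro.conf -n a1 -Dflume.root.logger=INFO,console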