1. Flume Introduction

Apache Flume is a distributed, highly available data collection system. It collects data from different data sources and, after aggregation, sends it to a storage system; it is most commonly used to collect log data. Flume comes in two major versions: OG (prior to 1.0) and NG. NG was completely refactored from OG and is currently the most widely used version, and the following introduction is based on NG.

2. Flume architecture and basic concepts

The following figure shows the basic architecture of Flume:

2.1 Basic Architecture

An external data source sends events to Flume in a specific format. When a Source receives an event, it stores it in one or more Channels. The Channel holds events until they are consumed by a Sink. The Sink's main job is to read events from the Channel, store them in an external storage system or forward them to the Source of the next Agent, and remove them from the Channel once this succeeds.

2.2 Basic Concepts

1. Event

An Event is the basic unit of data transfer in Flume NG, similar to a message in JMS and other messaging systems. An Event consists of a header, which is a key/value map, and a body, which is an arbitrary byte array.
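Header values are ordinary key/value pairs, so they can be set by the component that creates the event or by an interceptor along the way. As a minimal, hedged sketch (the agent name a1, source name s1, and the datacenter header are illustrative assumptions), Flume's built-in static interceptor attaches a fixed header to every event passing through a Source:

# attach a static interceptor to source s1 (hypothetical names)
a1.sources.s1.interceptors = i1
a1.sources.s1.interceptors.i1.type = static
# every event then carries the header datacenter=NEW_YORK; the body is left untouched
a1.sources.s1.interceptors.i1.key = datacenter
a1.sources.s1.interceptors.i1.value = NEW_YORK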

2. Source

A data collection component that collects data from external data sources and stores it in a Channel.

3. Channel

A Channel is a buffer between a Source and a Sink, used to temporarily store data. It can be backed by memory or by a persistent file system (a small configuration sketch follows the list):

  • Memory Channel: uses memory; it is fast, but data may be lost (for example, if the machine suddenly goes down);
  • File Channel: uses a persistent file system; data is not lost, but it is slower.
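A minimal configuration sketch of the two options (the agent and channel names, directories, and capacities below are illustrative assumptions):

# Memory Channel: fast but volatile
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# File Channel: durable but slower; needs writable checkpoint and data directories
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /tmp/flume/checkpoint
a1.channels.c2.dataDirs = /tmp/flume/data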

4. Sink

The main function of a Sink is to read Events from the Channel, store them in an external storage system or forward them to the Source of the next Agent, and remove the Events from the Channel once this succeeds.

5. Agent

An Agent is an independent JVM process that contains components such as Sources, Channels, and Sinks.

2.3 Component Types

Each Flume component comes in a rich set of built-in types suited to different scenarios (a short configuration sketch follows the list):

  • Source types: dozens of built-in types, such as Avro Source, Thrift Source, Kafka Source, and JMS Source;

  • Sink types: HDFS Sink, Hive Sink, HBase Sinks, Avro Sink, etc.;

  • Channel types: Memory Channel, JDBC Channel, Kafka Channel, File Channel, etc.
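As a brief sketch, it is the type property that selects which built-in implementation a component uses; each type then has its own required properties documented in the User Guide (the names below are illustrative assumptions):

a1.sources.s1.type = avro      # use an Avro Source (also needs bind and port)
a1.channels.c1.type = file     # use a File Channel
a1.sinks.k1.type = hdfs        # use an HDFS Sink (also needs hdfs.path)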

Unless you have very specific requirements, most needs can be met by combining the various built-in Source, Sink, and Channel types. The Flume website documents the configuration parameters of every component type in table form, with sample configurations attached. Parameters may differ slightly between versions, so it is recommended to use the User Guide for your version on the official website as the primary reference.

3. Flume architecture modes

Flume supports multiple architecture modes, which are described below.

3.1 Multi-agent flow


Flume supports transferring data across multiple Agents. This requires that the Sink of the upstream Agent and the Source of the downstream Agent both be Avro, with the Avro Sink pointing to the hostname (or IP address) and port of the Avro Source (see Case 3 below for details).
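A minimal sketch of this wiring (the agent names, host, and port are illustrative assumptions; Case 3 below gives a full working example):

# upstream agent a1: Avro Sink pointing at the next hop
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop001
a1.sinks.k1.port = 8888

# downstream agent a2: Avro Source listening on that host and port
a2.sources.s2.type = avro
a2.sources.s2.bind = hadoop001
a2.sources.s2.port = 8888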

3.2 Consolidation


Log collection often involves a large number of clients (for example, a distributed web service). Flume supports having many Agents collect the logs, having one or more Agents aggregate them, and then storing the aggregated logs in a file system.

3.3 Multiplexing the flow

Flume supports delivering events from one Source to multiple Channels, that is, to multiple Sinks; this is called fan out. By default, fan out replicates events to all Channels, meaning every Channel receives the same data. Flume also supports configuring a multiplexing channel selector on the Source to implement custom routing rules.
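A minimal fan-out sketch (the agent, source, and channel names and the state header are illustrative assumptions):

# one source feeding two channels
a1.sources = s1
a1.channels = c1 c2
a1.sources.s1.channels = c1 c2

# replicating is the default: every event is copied to both channels
a1.sources.s1.selector.type = replicating

# a multiplexing selector would instead route by a header value, for example:
# a1.sources.s1.selector.type = multiplexing
# a1.sources.s1.selector.header = state
# a1.sources.s1.selector.mapping.CZ = c1
# a1.sources.s1.selector.mapping.US = c2
# a1.sources.s1.selector.default = c1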

4. Flume configuration format

Flume configuration usually requires the following two steps:

  1. Define the Agent's Sources, Sinks, and Channels, then bind the Sources and Sinks to Channels. Note that a Source can be configured with multiple Channels, but a Sink can be bound to only one Channel. The basic format is as follows:
<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2>

# set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...

# set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>
  2. Define the specific properties of each Source, Sink, and Channel. The basic format is as follows:

# properties for sources
<Agent>.sources.<Source>.<someProperty> = <someValue>

# properties for channels
<Agent>.channels.<Channel>.<someProperty> = <someValue>

# properties for sinks
<Agent>.sinks.<Sink>.<someProperty> = <someValue>
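Putting the two steps together, a minimal sketch of a complete configuration might look as follows (the Netcat Source, port, and component names are illustrative assumptions, not one of the cases below):

a1.sources = s1
a1.sinks = k1
a1.channels = c1

# a Netcat Source listening on a local port
a1.sources.s1.type = netcat
a1.sources.s1.bind = localhost
a1.sources.s1.port = 44444
a1.sources.s1.channels = c1

# a Logger Sink that prints events to the log
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

# an in-memory Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000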

5. Flume installation and deployment

For ease of later reference, the installation of all software used in this repository is documented separately. See:

Flume installation and deployment in Linux

6. Flume use cases

Flume usage examples:

  • Case 1: use Flume to monitor a file for new content and output the newly added content to the console.
  • Case 2: use Flume to monitor a specified directory and store files newly added to that directory in HDFS.
  • Case 3: use Avro to send log data collected on one server to another server.

6.1 Case 1

Requirement: Listen for changes in file contents and output new additions to the console.

Implementation: mainly uses an Exec Source together with the tail command.

1. Configuration

Create an exec-memory-logger.properties configuration file with the following contents:

# Specify the agent's sources, sinks, channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1

# configure source
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/log.txt
a1.sources.s1.shell = /bin/bash -c

# bind source with channel
a1.sources.s1.channels = c1

# configure sink
a1.sinks.k1.type = logger

# bind sink with channel
a1.sinks.k1.channel = c1

# configure channel
a1.channels.c1.type = memory

2. Start

flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/exec-memory-logger.properties \
--name a1 \
-Dflume.root.logger=INFO,console

3. Test

Append data to a file:

Console display:

6.2 Case 2

Requirement: monitor a specified directory and store files newly added to that directory in HDFS.

Implementation: uses a Spooling Directory Source and an HDFS Sink.

1. Configuration

# Specify the agent's sources, sinks, channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1

# configure source
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /tmp/logs
a1.sources.s1.basenameHeader = true
a1.sources.s1.basenameHeaderKey = fileName
# bind source with channel
a1.sources.s1.channels = c1

# configure sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H/
a1.sinks.k1.hdfs.filePrefix = %{fileName}
# generated file type; the default is SequenceFile, DataStream writes plain text
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# bind sink with channel
a1.sinks.k1.channel = c1

# configure channel
a1.channels.c1.type = memory

2. Start

flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/spooling-memory-hdfs.properties \
--name a1 \
-Dflume.root.logger=INFO,console

3. Test

Copy any file into the monitored directory; the HDFS path of the uploaded file can be seen in the log:

# cp log.txt logs/

Check whether the file content uploaded to HDFS matches the local file:

# hdfs dfs -cat /flume/events/19-04-09/13/log.txt.1554788567801

6.3 Case 3

Requirement: Send data collected by this server to another server.

Implementation: uses an Avro Source and an Avro Sink.

1. Configure Flume for collecting logs

Create a netcat-memory-avro.properties configuration that listens for changes in file contents and sends the new content to port 8888 of hadoop001 via an Avro Sink:

# Specify the agent's sources, sinks, channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1

# configure source
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/log.txt
a1.sources.s1.shell = /bin/bash -c
a1.sources.s1.channels = c1

# configure sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop001
a1.sinks.k1.port = 8888
a1.sinks.k1.batch-size = 1
a1.sinks.k1.channel = c1

# configure channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

2. Configure Flume for log aggregation

Create an avro-memory-logger.properties configuration that uses an Avro Source to listen on port 8888 of hadoop001 and outputs the received content to the console:

# Specify the agent's sources, sinks, channels
a2.sources = s2
a2.sinks = k2
a2.channels = c2

# configure source
a2.sources.s2.type = avro
a2.sources.s2.bind = hadoop001
a2.sources.s2.port = 8888
# bind source with channel
a2.sources.s2.channels = c2

# configure sink
a2.sinks.k2.type = logger
a2.sinks.k2.channel = c2

# configure channel
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

3. Start

Start the Flume Agent that aggregates the logs first:

flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/avro-memory-logger.properties \
--name a2 \
-Dflume.root.logger=INFO,console

Then start the Flume Agent that collects the logs:

flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/netcat-memory-avro.properties \
--name a1 \
-Dflume.root.logger=INFO,console

Starting in this order ensures that the Avro Source has already bound to its port, so the Avro Sink does not report a connection failure when it tries to connect. Starting out of order is not fatal, though: the Sink keeps retrying until the connection is established.

4. Test

Append content to /tmp/log.txt:

You can see that the content has been received on port 8888 and printed to the console:

For more articles in the big data series, see the GitHub open source project: Getting Started with Big Data.