I. Foreword

Flume has long played the role of a stable and reliable "porter" of log data in big data systems. This article describes the Youzan Big Data team's application practice with Flume, interspersed with some of our understanding of how Flume works.

II. Delivery Guarantee

It is important to understand Flume's reliability guarantee for event delivery; it is often one of the deciding factors in whether we use Flume to solve a problem.

Message delivery systems generally offer one of three delivery guarantees:

  • At-least-once
  • At-most-once
  • Exactly-once

Almost all users expect a tool or framework to guarantee exactly-once, so that they never have to consider message loss or duplicate processing in their own designs. But very few tools and frameworks actually achieve it, and where they do, the cost is often high or the side effects are not worth it. If Flume insisted on exactly-once, it would inevitably sacrifice stability and throughput, so Flume chooses at-least-once.

Of course, that "at-least-once" belongs in quotes. It does not mean that any combination of Flume components produces an instance that preserves messages while running. Strictly speaking, the at-least-once principle only covers the handoff of messages between Source, Channel, and Sink. If you choose MemoryChannel, for example, and the instance crashes and is restarted, the residual data in the channel not yet consumed by the sink is lost, so at-least-once cannot be guaranteed for the link as a whole.

Flume's at-least-once guarantee is built on its Transaction mechanism. A Flume transaction has four lifecycle methods: begin, commit, rollback, and close. When a Source delivers a batch of events to a Channel, it first calls begin to open a transaction, puts the batch, and then commits; if the commit fails it rolls back, and finally it closes the transaction. Only after a successful commit does the Source acknowledge the batch to the upstream service (for example, a Kafka source commits a new offset). A Sink consumes the Channel in the same pattern, the only difference being that the Sink commits the transaction after writing to the destination. Both components follow the same rule: acknowledge upstream only once the messages have been successfully handed off downstream, which yields at-least-once delivery hop by hop.
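As a concrete sketch of this pattern, here is roughly what the transaction loop looks like in a custom Sink's process() method. The Flume types (Channel, Transaction, AbstractSink) are the real org.apache.flume API; BATCH_SIZE and writeToDestination are illustrative placeholders:

```java
import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.sink.AbstractSink;

public class SketchSink extends AbstractSink {
    private static final int BATCH_SIZE = 100;  // illustrative

    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction txn = channel.getTransaction();
        txn.begin();  // open the transaction
        try {
            for (int i = 0; i < BATCH_SIZE; i++) {
                Event event = channel.take();  // null once the channel is drained
                if (event == null) {
                    break;
                }
                writeToDestination(event);  // placeholder for the real write
            }
            txn.commit();  // the takes become permanent only on commit
            return Status.READY;
        } catch (Throwable t) {
            txn.rollback();  // taken events are returned to the channel for retry
            throw new EventDeliveryException("Batch delivery failed", t);
        } finally {
            txn.close();
        }
    }

    private void writeToDestination(Event event) {
        // placeholder: write event.getBody() to the target store (e.g. HDFS)
    }
}
```

The Source side mirrors this loop, with channel.put(event) between begin and commit instead of take.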

III. Datay Application Scenarios

Incremental data warehouse synchronization based on the MySQL binlog (the Datay service) is a classic Flume application scenario in big data. The Datay service itself is not detailed here; what matters is the requirement that Flume reliably land binlog messages from NSQ (a message queue) onto HDFS without losing a single message. Absolute at-least-once is required.

The Flume model itself transmits events on the at-least-once principle, so we have to consider whether that still holds under abnormal conditions (such as the process crashing). Obviously MemoryChannel cannot satisfy this, so we used FileChannel instead. Because the company currently uses NSQ as the binlog forwarding service, the existing KafkaSource was not applicable, and we extended an NsqSource based on the company's NSQ SDK. To guarantee at-least-once, a Source must support an ack mechanism for received messages, just as a Kafka client commits its offset only once it considers a message consumed; otherwise it has to accept duplicate messages.
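For reference, a minimal FileChannel definition looks roughly like this (agent and component names are illustrative; checkpointDir and dataDirs are the standard FileChannel properties that make buffered events durable across a restart):

```properties
agent.channels = ch1
agent.channels.ch1.type = file
# WAL-style on-disk storage: events survive a process crash and restart
agent.channels.ch1.checkpointDir = /data/flume/datay/checkpoint
agent.channels.ch1.dataDirs = /data/flume/datay/data
agent.channels.ch1.capacity = 1000000
agent.channels.ch1.transactionCapacity = 1000
```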

So the first version went live, and it looked safe enough: even if the process hung and restarted unexpectedly, we would not lose data.

A sharp reader will ask a key question: if the disk fails and the process exits unexpectedly while the FileChannel still holds unconsumed event data, isn't that data lost? Disk failure is an extremely low-probability event, but it is certainly something to consider.

An existing Flume component more reliable than FileChannel may come to mind: KafkaChannel, which keeps multiple replicas of each message and thus increases data reliability. But our second version did not choose it; instead we extended an NsqChannel directly, letting the channel itself consume from NSQ. The second version therefore needs no Source at all (a configuration sketch follows the list below):

Source: (none)    Channel: NsqChannel    Sink: HDFSEventSink

This brought two benefits:

  • The delivery of each message requires only one transaction, rather than two, for better performance.
  • The introduction of new Kafka services is avoided, reducing resource costs while keeping the architecture simpler and thus more stable.
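A sketch of the resulting agent definition: no source is configured, and the sink drains the custom channel directly. The NsqChannel class name and its topic property below are hypothetical stand-ins for our internal implementation:

```properties
agent.channels = nsqCh
agent.sinks = hdfsSnk

# Custom channel, referenced by fully qualified class name (hypothetical FQCN)
agent.channels.nsqCh.type = com.youzan.flume.channel.NsqChannel
# Hypothetical property: the NSQ topic the channel consumes
agent.channels.nsqCh.topic = binlog_topic

agent.sinks.hdfsSnk.type = hdfs
agent.sinks.hdfsSnk.channel = nsqCh
agent.sinks.hdfsSnk.hdfs.path = hdfs://namenode/data/binlog/%Y%m%d
```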

IV. Customized Extension

Flume's components are designed with extensibility very much in mind.

When existing components do not meet our custom requirements, we can pick the appropriate component type to extend. Here are some of the extensions we made.

  • NsqSource.

Customizing a Source in Flume is relatively easy: inherit an abstract class that already carries the common implementation and fill in a few lifecycle methods. Note that a Flume component's lifecycle methods may be called multiple times; for example, Flume can automatically detect changes to an instance's configuration and restart individual components, which requires releasing resources properly.
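A minimal skeleton of such a source is sketched below. The Flume types are the real org.apache.flume API; NsqConsumer and NsqMessage are hypothetical stand-ins for the company NSQ SDK:

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;

public class NsqSource extends AbstractSource implements Configurable, PollableSource {
    private NsqConsumer consumer;  // hypothetical NSQ SDK client
    private String topic;

    @Override
    public void configure(Context context) {
        // configure() can be invoked again on a config reload; re-read everything
        topic = context.getString("topic");
    }

    @Override
    public synchronized void start() {
        consumer = new NsqConsumer(topic);  // hypothetical
        super.start();
    }

    @Override
    public synchronized void stop() {
        // release resources properly: Flume may restart this component in place
        consumer.close();
        super.stop();
    }

    @Override
    public Status process() throws EventDeliveryException {
        List<NsqMessage> batch = consumer.poll();  // hypothetical
        if (batch.isEmpty()) {
            return Status.BACKOFF;
        }
        List<Event> events = new ArrayList<>(batch.size());
        for (NsqMessage msg : batch) {
            events.add(EventBuilder.withBody(msg.getBody()));
        }
        try {
            // puts the whole batch into the channel inside one transaction
            getChannelProcessor().processEventBatch(events);
        } catch (Exception e) {
            consumer.requeue(batch);  // hypothetical: redeliver later, duplicates allowed
            throw new EventDeliveryException("Channel rejected batch", e);
        }
        consumer.ack(batch);  // ack upstream only after the channel accepted the batch
        return Status.READY;
    }

    // required by PollableSource since Flume 1.7; simple backoff defaults
    @Override
    public long getBackOffSleepIncrement() { return 1000L; }

    @Override
    public long getMaxBackOffSleepInterval() { return 5000L; }
}
```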

  • HdfsEventSink extended configuration.

HdfsEventSink has a roll-file function built in. For example, when the sink generates files by the hour, the first event of the hour creates a new file, and the file is closed after a fixed roll interval (such as one hour). The problem is that for a low-traffic source, if the first event of the 8 o'clock hour arrives at 8:25, the file is not closed until 9:25. Because unclosed .tmp files are ignored by the compute engines of offline data tasks, hour-level offline tasks cannot obtain the latest hour's data in time. Our modification rolls files on the wall-clock hour rather than on the arrival time of the first event: for example, we close the previous hour's file at a fixed 05 minutes past the hour, and schedule hour-level offline tasks after 05 minutes past the hour, which solves the problem.
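As an illustration of the scheduling logic (not the actual sink patch), rolling on the wall-clock hour boils down to computing a fixed deadline past each hour boundary, independent of when the hour's first event arrived:

```java
import java.time.Duration;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;

public class RollDeadline {
    /**
     * Next time to close the previous hour's file: the current hour's
     * boundary plus a fixed grace offset (e.g. 5 minutes), regardless
     * of when the hour's first event arrived.
     */
    static ZonedDateTime nextRollTime(ZonedDateTime now, Duration grace) {
        ZonedDateTime hourStart = now.truncatedTo(ChronoUnit.HOURS);
        ZonedDateTime deadline = hourStart.plus(grace);
        // already past HH:05? then roll at the next hour's HH:05
        return now.isBefore(deadline) ? deadline : deadline.plusHours(1);
    }

    public static void main(String[] args) {
        ZonedDateTime now = ZonedDateTime.now();
        System.out.println("close previous hour's file at: "
                + nextRollTime(now, Duration.ofMinutes(5)));
    }
}
```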

  • MetricsReportServer.

Enable a MonitorService when you need to collect the counter metrics of each component of a running Flume instance. We customized one that periodically reports metrics via HTTP to a centralized web service. The native HTTPMetricsServer is also HTTP-based; the difference is that it makes Flume itself act as the HTTP server, and since we deploy many instances on one machine, port allocation becomes a headache, so we push the metrics out instead.
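A rough sketch of such a reporter, assuming a central collector URL supplied via configuration (the property names and endpoint are illustrative). JMXPollUtil.getAllMBeans() is the same helper Flume's built-in HTTPMetricsServer uses to snapshot component counters:

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flume.Context;
import org.apache.flume.instrumentation.MonitorService;
import org.apache.flume.instrumentation.util.JMXPollUtil;

public class MetricsReportServer implements MonitorService {
    private ScheduledExecutorService scheduler;
    private String collectorUrl;  // e.g. http://metrics.internal/flume (illustrative)
    private int periodSeconds;

    @Override
    public void configure(Context context) {
        collectorUrl = context.getString("collectorUrl");
        periodSeconds = context.getInteger("period", 60);
    }

    @Override
    public void start() {
        scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(
                this::reportOnce, periodSeconds, periodSeconds, TimeUnit.SECONDS);
    }

    @Override
    public void stop() {
        scheduler.shutdown();
    }

    private void reportOnce() {
        try {
            // snapshot of all component counters: {SOURCE.x: {...}, CHANNEL.y: {...}, ...}
            Map<String, Map<String, String>> metrics = JMXPollUtil.getAllMBeans();
            byte[] body = toJson(metrics).getBytes(StandardCharsets.UTF_8);
            HttpURLConnection conn =
                    (HttpURLConnection) new URL(collectorUrl).openConnection();
            conn.setRequestMethod("POST");
            conn.setRequestProperty("Content-Type", "application/json");
            conn.setDoOutput(true);
            try (OutputStream out = conn.getOutputStream()) {
                out.write(body);
            }
            conn.getResponseCode();  // drain the response
            conn.disconnect();
        } catch (Exception e) {
            // reporting must never break the data pipeline; log and move on
        }
    }

    private String toJson(Map<String, Map<String, String>> metrics) {
        // minimal hand-rolled JSON for the sketch; real code would use Gson/Jackson
        StringBuilder sb = new StringBuilder("{");
        metrics.forEach((component, counters) -> {
            sb.append('"').append(component).append("\":{");
            counters.forEach((k, v) ->
                    sb.append('"').append(k).append("\":\"").append(v).append("\","));
            if (!counters.isEmpty()) sb.setLength(sb.length() - 1);
            sb.append("},");
        });
        if (!metrics.isEmpty()) sb.setLength(sb.length() - 1);
        return sb.append('}').toString();
    }
}
```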

Once counter metrics like the following are collected, we can build monitoring and alerting on top of them.

{
    "identity": "olap_offline_daily_olap_druid_test_timezone_4@49",
    "startTime": 1544287799839,
    "reportCount": 4933,
    "metrics": {
        "SINK.olap_offline_daily_olap_druid_test_timezone_4_snk": {
            "ConnectionCreatedCount": "9",
            "ConnectionClosedCount": "8",
            "Type": "SINK",
            "BatchCompleteCount": "6335",
            "BatchEmptyCount": "2",
            "EventDrainAttemptCount": "686278",
            "StartTime": "1544287799837",
            "EventDrainSuccessCount": "686267",
            "BatchUnderflowCount": "5269",
            "StopTime": "0",
            "ConnectionFailedCount": "48460"
        },
        "SOURCE.olap_offline_daily_olap_druid_test_timezone_4_src": {
            "KafkaEventGetTimer": "26344146",
            "AppendBatchAcceptedCount": "0",
            "EventAcceptedCount": "686278",
            "AppendReceivedCount": "0",
            "StartTime": "1544287800219",
            "AppendBatchReceivedCount": "0",
            "KafkaCommitTimer": "14295",
            "EventReceivedCount": "15882278",
            "Type": "SOURCE",
            "OpenConnectionCount": "0",
            "AppendAcceptedCount": "0",
            "KafkaEmptyCount": "0",
            "StopTime": "0"
        },
        "CHANNEL.olap_offline_daily_olap_druid_test_timezone_4_cha": {
            "ChannelCapacity": "10000",
            "ChannelFillPercentage": "0.11",
            "Type": "CHANNEL",
            "ChannelSize": "11",
            "EventTakeSuccessCount": "686267",
            "StartTime": "1544287799332",
            "EventTakeAttemptCount": "715780",
            "EventPutAttemptCount": "15882278",
            "EventPutSuccessCount": "686278",
            "StopTime": "0"
        }
    }
}
  • Event timestamp interception. Some HDFS sink businesses are sensitive to the event's own time: data belonging to the same hour must land in the same hour's directory. HdfsEventSink should therefore compute the file directory from a timestamp field in the message content instead of from system time. This can be solved by extending an Interceptor. An Interceptor performs interception processing after the Source receives an event and before it is posted to the Channel, and is generally used to enrich the event's header information. We strongly recommend not handling this directly inside the Source: implementing an Interceptor keeps the logic reusable for other Sources with similar requirements.
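A sketch of such an interceptor, assuming the event body is JSON carrying a millisecond ts field (the field name and the crude parsing are illustrative). It fills in the timestamp header, which HDFSEventSink uses to resolve time escapes such as %Y%m%d in hdfs.path when hdfs.useLocalTimeStamp is not enabled:

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class BodyTimestampInterceptor implements Interceptor {

    @Override
    public void initialize() { }

    @Override
    public Event intercept(Event event) {
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        long ts = extractTs(body);  // illustrative: pull "ts" out of a JSON body
        // HDFSEventSink reads this header to resolve %Y%m%d/%H in hdfs.path
        event.getHeaders().put("timestamp", Long.toString(ts));
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event e : events) {
            intercept(e);
        }
        return events;
    }

    @Override
    public void close() { }

    private long extractTs(String body) {
        // placeholder parsing; real code would use a proper JSON parser
        int i = body.indexOf("\"ts\":");
        if (i < 0) return System.currentTimeMillis();
        int start = i + 5;
        int end = start;
        while (end < body.length() && Character.isDigit(body.charAt(end))) end++;
        return end > start ? Long.parseLong(body.substring(start, end))
                           : System.currentTimeMillis();
    }

    /** Builder required by Flume to instantiate the interceptor from config. */
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() { return new BodyTimestampInterceptor(); }

        @Override
        public void configure(Context context) { }
    }
}
```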

V. Performance Tuning

The two configurations most commonly tuned for Flume instance performance are the transaction batch size and the Channel capacity.

  • Transaction batch size. Setting a reasonable batch size can significantly improve instance throughput. As mentioned above, the Source's puts into the Channel and the Sink's takes from it all run inside transactions, so enlarging the batch handled by each transaction in both components reduces CPU consumption and network I/O waits per event.
  • Channel capacity directly affects event production and consumption at the source and sink ends. A larger capacity generally means better throughput, but other factors keep it from being set arbitrarily high. With MemoryChannel, for example, capacity translates directly into memory consumption and into the number of events that can be lost when the process exits unexpectedly. Different Channels require different considerations; in the end a trade-off is inevitable. (A configuration sketch follows this list.)
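A hedged example of these knobs for a Kafka-to-HDFS agent (component names are illustrative; batchSize, capacity, and transactionCapacity are the standard properties of these stock components):

```properties
# Source: number of messages written to the channel per transaction
agent.sources.src.batchSize = 1000

agent.channels.ch.type = memory
# Upper bound on buffered events; for MemoryChannel this is also the
# amount of data at risk if the process exits unexpectedly
agent.channels.ch.capacity = 100000
# Max events per transaction; keep >= the source/sink batch sizes
agent.channels.ch.transactionCapacity = 1000

# HDFS sink: events written to HDFS per transaction take
agent.sinks.snk.hdfs.batchSize = 1000
```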

VI. Summary and Outlook

Flume is a very stable service, and it has proven itself well in our production environment. Its model is clear and easy to understand, each component type has many off-the-shelf implementations, and extensibility was considered throughout the design, so it is easy to find or customize the data pipeline solution we need.

As the number of users grows, a unified platform is needed to manage all Flume instances centrally. It has the following advantages:

  • Lower the cost of using Flume: users can enjoy its value without knowing much about it.
  • Coordinate machine resources effectively.
  • Complete monitoring makes Flume run more stably and reliably.

Of course, we have only just taken the first step here; hopefully the platform will prove more and more valuable in the future.