Kafka monitoring implementation mechanism learning

This article outlined the Metrics of kafka module architecture: blog.csdn.net/u010952362/… It is recommended to eat after reading this article.

Monitoring Implementation

Initialize the Metrics

Step 1: Metrics are initialized for the first time in kafkaServer’s startup () method:

metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true)
Copy the code

The metirCS instance is then passed into the quotaManagers constructor.

quotaManagers = QuotaFactory.instantiate(config, metrics, time)
Copy the code

The quotaManagers are used in kafka to limit kafka. Producer’s transfer speed. For example, in the config file, producer cannot transfer faster than 5MB/s. The name restriction is implemented using quotaManager.

Metrics and Metrics

Step 2: Let’s dive into the source code for the Metrics class

public class Metrics implements Closeable { .... . private final ConcurrentMap<MetricName, KafkaMetric> metrics; private final ConcurrentMap<String, Sensor> sensors;Copy the code

Metrics and sensors concurrentMap are two important member properties of metrics. Name is KafkaMetric and Sensor.

First, KafkaMetric

KafkaMetric implements the Metric interface. Let’s look at the Metric interface first and see that its core method value() returns the value of the parameter to monitor.

public interface Metric {
​
    /**
     * A name for this metric
     */
    public MetricName metricName();
​
    /**
     * The value of the metric
     */
    public double value();
​
}
Copy the code

Then let’s look at KafkaMetric, which implements the Metric interface:

@Override
public double value() {
    synchronized (this.lock) {
        return value(time.milliseconds());
    }
}
​
double value(long timeMs) {
    return this.measurable.measure(config, timeMs);
}
Copy the code

This value () method ends up calling this.____. Measure (config, timeMs), which is accomplished by another member property in kafkaMetric.

_____interface source code

public interface Measurable {
​
    /**
     * Measure this quantity and return the result as a double
     * @param config The configuration for this metric
     * @param now The POSIX time in milliseconds the measurement is being taken
     * @return The measured value
     */
    public double measure(MetricConfig config, long now);
​
}
Copy the code

To summarize Metrics:

Metrics -> Metric -> Measurable

For example, Metrics is the dashboard of the car, Metric is the instrument of the dashboard, and Measurable is the encapsulation of what is really being measured.

Sensor

ConcurrentMap

sensors;
,>

The following is the source of the Sensor class:

/** * A sensor applies a continuous sequence of numerical values to a set of associated metrics. * For example a sensor on message size would record a sequence of message sizes using the * {@link #record(double)} api and would maintain a The sensor applies a sequential sequence of values to a set of related metrics. * For example, a message size sensor logs a series of message sizes using the Logging application interface and maintains a set of metrics about request sizes, such as average or maximum. */ Public final class Sensor {// Kafka has only one instance of Metrics. private final String name; private final Sensor[] parents; private final List<Stat> stats; private final List<KafkaMetric> metrics;Copy the code

Interpretation of Sensor: KafkaMetric only returns the value of a parameter. However, Sensor has the function of calculating the time series of a parameter, such as the average value and the minimum value. How can these statistics be realized? The answer is the List STATS member property.

Stat

Stat Stat Stat Stat Stat Stat Stat Stat

public interface Stat {
​
    /**
     * Record the given value
     * @param config The configuration to use for this metric
     * @param value The value to record
     * @param timeMs The POSIX time in milliseconds this value occurred
     */
    public void record(MetricConfig config, double value, long timeMs);
​
}
Copy the code

Stat is an interface that uses a record method to record a sample value.

public final class Max extends SampledStat { public Max() { super(Double.NEGATIVE_INFINITY); } @Override protected void update(Sample sample, MetricConfig config, double value, long now) { sample.value = Math.max(sample.value, value); } @Override public double combine(List<Sample> samples, MetricConfig config, long now) { double max = Double.NEGATIVE_INFINITY; for (int i = 0; i < samples.size(); i++) max = Math.max(max, samples.get(i).value); return max; }}Copy the code

SampledStat is an implementation class of the Stat interface, and Max inherits SampledStat. Update compares the current value to the historical maximum value, and updates the maximum value (bubble). The Combine method is equivalent to finding the maximum with a complete bubble sort.

Then we go back to the Sensor class, check its record source:

public void record(double value, long timeMs) {
    this.lastRecordTime = timeMs;
    synchronized (this) {
        // increment all the stats
        for (int i = 0; i < this.stats.size(); i++)
            this.stats.get(i).record(config, value, timeMs);
        checkQuotas(timeMs);
    }
    for (int i = 0; i < parents.length; i++)
        parents[i].record(value, timeMs);
}
Copy the code

The Record method, for each stats registered in it, submits to the parent sensor if it has one. The checkQuotas method is used by Kafka to check whether the value of each KafkaMetric registered on the sensor exceeds the quota set in the config file.

conclusion

  • The Metrics dashboard

    • Metric One of the meters in the dashboard

      • The source of Measurable instrument data — measurement
    • Sensor Provides statistics function

      • Stat Specifies the implementer of the statistics function

reference

Blog.csdn.net/qq_21519863…