Loki ships with many of these limit policies in limits_config. Some are already exposed in the configuration file, while others are implemented in code but not yet surfaced. For the most part the defaults are good enough, but sometimes we need to fine-tune them.

1. Rate limiter

Some time ago Xiao Bai carelessly restarted and upgraded the Loki service. Because the process lasted a while, once the service was restored the clients kept receiving the following error when pushing logs:

429 Too Many Requests Ingestion rate limit exceeded (limit 10485760 bytes/sec)

As a result, the log collectors' buffers on some servers stayed congested, delaying log collection.

Two parameters in limits_config control the rate at which logs are received by the Loki Distributor:

limits_config:
  # token bucket fill rate
  [ingestion_rate_mb: <float> | default = 4]
  # token bucket capacity
  [ingestion_burst_size_mb: <int> | default = 6]

Rate limiting in the Loki distributor is implemented with Golang's time/rate limiter. The general logic is as follows.

First, look at the protobuf types the distributor handles when processing log push requests: a push request may contain multiple log streams, and each stream carries label information plus a set of Entry records holding a timestamp and a log line.


type PushRequest struct {
	Streams []Stream `protobuf:"bytes,1,rep,name=streams,proto3,customtype=Stream" json:"streams"`
}

type Stream struct {
	Labels  string  `protobuf:"bytes,1,opt,name=labels,proto3" json:"labels"`
	Entries []Entry `protobuf:"bytes,2,rep,name=entries,proto3,customtype=EntryAdapter" json:"entries"`
}

type Entry struct {
	Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"ts"`
	Line      string    `protobuf:"bytes,2,opt,name=line,proto3" json:"line"`
}
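To make the shape concrete, a push request with a single stream and one entry would look roughly like this (a sketch built from the types above; the label set and log line are made up):

req := &logproto.PushRequest{
	Streams: []logproto.Stream{{
		Labels: `{app="nginx", env="prod"}`,
		Entries: []logproto.Entry{{
			Timestamp: time.Now(),
			Line:      `GET /healthz 200`,
		}},
	}},
}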

The distributor then walks each stream's entries and sums their line lengths into validatedSamplesSize, which is later handed to the rate limiter:

func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {

	for _, stream := range req.Streams {
		if err := d.validator.ValidateLabels(userID, stream); err != nil {
			validationErr = err
			continue
		}

		entries := make([]logproto.Entry, 0, len(stream.Entries))
		for _, entry := range stream.Entries {
			if err := d.validator.ValidateEntry(userID, stream.Labels, entry); err != nil {
				validationErr = err
				continue
			}
			entries = append(entries, entry)
			validatedSamplesSize += len(entry.Line)
			validatedSamplesCount++
		}

Next it gets the current time and passes the tenant ID and validatedSamplesSize to ingestionRateLimiter.AllowN. If the rate limit is hit, the push is rejected and the error from the screenshot above is returned:

now := time.Now()
if !d.ingestionRateLimiter.AllowN(now, userID, validatedSamplesSize) {
	validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamplesCount))
	validation.DiscardedBytes.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamplesSize))
	// Throw 429 Too Many Requests
	return nil, httpgrpc.Errorf(http.StatusTooManyRequests,
		validation.RateLimitedErrorMsg(int(d.ingestionRateLimiter.Limit(now, userID)), validatedSamplesCount, validatedSamplesSize))
}

Next comes the processing inside the limiter itself. The distributor acquires tokens through RateLimiter.AllowN: given the current time, it checks whether the bucket holds at least N tokens, N being the byte size of the log entries. If so, it consumes N tokens from the bucket and returns true; otherwise it returns false. This part can be found here:

https://github.com/cortexproject/cortex/blob/master/pkg/util/limiter/rate_limiter.go
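Under the hood this is the familiar token bucket. Here is a minimal standalone sketch of the same AllowN pattern using golang.org/x/time/rate (the numbers mirror the 4 MB/s fill / 6 MB burst defaults for illustration; this is not Loki's actual wrapper):

package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// Bucket fills at 4 MB/s and holds at most 6 MB of tokens,
	// mirroring the ingestion_rate_mb / ingestion_burst_size_mb defaults.
	limiter := rate.NewLimiter(rate.Limit(4<<20), 6<<20)

	payload := 2 << 20 // a hypothetical 2 MB push request
	for i := 0; i < 5; i++ {
		// AllowN consumes payload tokens if the bucket has them,
		// otherwise it returns false (which maps to a 429 in the distributor).
		fmt.Printf("push %d allowed: %v\n", i, limiter.AllowN(time.Now(), payload))
	}
}

Running this, the first three 2 MB pushes drain the 6 MB bucket and succeed; the rest are rejected until the bucket refills.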

At this point, all we need to change is the rate and capacity of the token bucket, i.e. the two limits_config parameters from the beginning. Increasing them appropriately gives immediate results:

limits_config:
  ingestion_rate_mb: 32
  ingestion_burst_size_mb: 64

You can see that after adjusting the rate and burst, the client’s log buffer in the green box is quickly consumed.

Extension

In addition, Loki's rate limiter has a global dynamic configuration:

limits_config:
  [ingestion_rate_strategy: <string> | default = "local"]

The default is local: if your Loki runs as a distributed system, each distributor independently applies the token bucket described above, which greatly increases the throughput of Loki log collection. If set to global, each distributor derives the tenant's rate as ingestion_rate_mb / ring.HealthyInstancesCount(). This part is implemented in ingestion_rate_strategy.go:

type globalStrategy struct {
	limits *validation.Overrides
	ring   ReadLifecycler
}

func newGlobalIngestionRateStrategy(limits *validation.Overrides, ring ReadLifecycler) limiter.RateLimiterStrategy {
	return &globalStrategy{
		limits: limits,
		ring:   ring,
	}
}

func (s *globalStrategy) Limit(userID string) float64 {
	numDistributors := s.ring.HealthyInstancesCount()

	if numDistributors == 0 {
		return s.limits.IngestionRateBytes(userID)
	}

	return s.limits.IngestionRateBytes(userID) / float64(numDistributors)
}
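For example, under the global strategy, with ingestion_rate_mb: 32 and 4 healthy distributors in the ring (numbers made up for illustration), each distributor would enforce 32 / 4 = 8 MB/s for that tenant; when a distributor joins or leaves the ring, the per-distributor share adjusts automatically.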

Enabling a global per-tenant rate-limiting strategy should rarely be needed in practice.

Conclusion:

A rate limiter is a very important component in a backend service: it protects the service from overload by capping the number of requests or the volume of traffic. The token bucket is a common implementation, and Loki clearly uses token buckets to cap log stream throughput.

For more information about the use of Golang's time/rate package, check out godoc.org/golang.org/…

2. Limit the label length

Loki's limits_config restricts the length of label key-value pairs with the following parameters:

limits_config:
  # maximum length of a label key
  [max_label_name_length: <int> | default = 1024]
  # maximum length of a label value
  [max_label_value_length: <int> | default = 2048]
  # maximum number of labels per stream
  [max_label_names_per_series: <int> | default = 30]

This label validation code lives in the distributor's validator.go. For example, it counts the number of key-value pairs in the labels, discards streams exceeding the configured maximum, and returns BadRequest:

ls, err := util.ToClientLabels(stream.Labels)
numLabelNames := len(ls)
if numLabelNames > v.MaxLabelNamesPerSeries(userID) {
	validation.DiscardedSamples.WithLabelValues(validation.MaxLabelNamesPerSeries, userID).Inc()
	bytes := 0
	for _, e := range stream.Entries {
		bytes += len(e.Line)
	}
	validation.DiscardedBytes.WithLabelValues(validation.MaxLabelNamesPerSeries, userID).Add(float64(bytes))
	return httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg(cortex_client.FromLabelAdaptersToMetric(ls).String(), numLabelNames, v.MaxLabelNamesPerSeries(userID)))
}

Next, the distributor compares the length of each label name and value against the configured maximums, and also compares adjacent label names to reject duplicates:

maxLabelNameLength := v.MaxLabelNameLength(userID)
maxLabelValueLength := v.MaxLabelValueLength(userID)
lastLabelName := ""
for _, l := range ls {
	if len(l.Name) > maxLabelNameLength {
		updateMetrics(validation.LabelNameTooLong, userID, stream)
		return httpgrpc.Errorf(http.StatusBadRequest, validation.LabelNameTooLongErrorMsg(stream.Labels, l.Name))
	} else if len(l.Value) > maxLabelValueLength {
		updateMetrics(validation.LabelValueTooLong, userID, stream)
		return httpgrpc.Errorf(http.StatusBadRequest, validation.LabelValueTooLongErrorMsg(stream.Labels, l.Value))
	} else if cmp := strings.Compare(lastLabelName, l.Name); cmp == 0 {
		updateMetrics(validation.DuplicateLabelNames, userID, stream)
		return httpgrpc.Errorf(http.StatusBadRequest, validation.DuplicateLabelNamesErrorMsg(stream.Labels, l.Name))
	}
	lastLabelName = l.Name
}
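Note that the adjacent comparison only catches duplicates because label sets are kept sorted by name. A minimal standalone sketch of the same idea (the types, limits and labels here are made up, not Loki's actual ones):

package main

import (
	"fmt"
	"sort"
	"strings"
)

// label mirrors the name/value pairs the distributor validates.
type label struct{ Name, Value string }

func validateLabels(ls []label, maxNameLen, maxValueLen int) error {
	// Keep labels sorted by name so duplicates are always adjacent.
	sort.Slice(ls, func(i, j int) bool { return ls[i].Name < ls[j].Name })

	lastName := ""
	for _, l := range ls {
		switch {
		case len(l.Name) > maxNameLen:
			return fmt.Errorf("label name too long: %s", l.Name)
		case len(l.Value) > maxValueLen:
			return fmt.Errorf("label value too long: %s", l.Value)
		case strings.Compare(lastName, l.Name) == 0:
			return fmt.Errorf("duplicate label name: %s", l.Name)
		}
		lastName = l.Name
	}
	return nil
}

func main() {
	ls := []label{{"app", "loki"}, {"app", "dup"}}
	fmt.Println(validateLabels(ls, 1024, 2048)) // duplicate label name: app
}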

Conclusion:

Loki does not place particular emphasis on label length limits, but it is unwise to set max_label_names_per_series too large, since unbounded labels can drag down Loki's performance.

3. Tenant restrictions

By default we deploy Loki with only one tenant, which uses the tenant ID fake. Loki's restriction policy has the following tenant-related parameters:

limits_config:
  # maximum number of log streams per tenant, per ingester
  [max_streams_per_user: <int> | default = 10000]
  # global maximum number of log streams per tenant, default 0 (disabled).
  # Once configured, each tenant's stream limit is computed dynamically from the
  # number of HEALTHY ingesters registered in the hash ring, so any change in the
  # number of ingesters takes effect on this value dynamically.
  [max_global_streams_per_user: <int> | default = 0]

Here is how the global limit is converted into the local limit each ingester enforces:

(max_global_streams_per_user / healthy_ingester_count) * replication_factor

The replication factor comes from the ingester configuration's replication_factor.
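For example, with max_global_streams_per_user = 9000, 6 healthy ingesters and replication_factor = 3 (numbers made up for illustration), each ingester enforces (9000 / 6) * 3 = 4500 streams for the tenant. If instead the replication factor exceeded the healthy ingester count, say 3 replicas but only 2 ingesters, the per-ingester limit would become (9000 / 2) * 3 = 13500, larger than the configured global value; this is exactly the caveat in the conclusion below.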

Here is the code implementation for the global maximum stream limit in limiter.go:

func (l *Limiter) convertGlobalToLocalLimit(globalLimit int) int {
	if globalLimit == 0 {
		return 0
	}
	// Get the healthy ingester count from the ring
	numIngesters := l.ring.HealthyInstancesCount()
	if numIngesters > 0 {
		// Divide the global maximum by the number of healthy ingesters,
		// then multiply by the replication factor
		return int((float64(globalLimit) / float64(numIngesters)) * float64(l.replicationFactor))
	}
	return 0
}

AssertMaxStreamsPerUser then combines the local and global limits. If the tenant's stream count is below the calculated limit it returns nil; otherwise it returns an error saying the tenant's maximum has been exceeded:

func (l *Limiter) AssertMaxStreamsPerUser(userID string, streams int) error {
	localLimit := l.limits.MaxLocalStreamsPerUser(userID)

	globalLimit := l.limits.MaxGlobalStreamsPerUser(userID)
	// Convert the global limit to a local one using the algorithm above
	adjustedGlobalLimit := l.convertGlobalToLocalLimit(globalLimit)

	// calculatedLimit is the smaller non-zero value of localLimit and
	// adjustedGlobalLimit, i.e. the dynamically derived limit
	calculatedLimit := l.minNonZero(localLimit, adjustedGlobalLimit)

	if calculatedLimit == 0 {
		calculatedLimit = math.MaxInt32
	}

	// nil is returned if the stream count is below calculatedLimit, otherwise
	// an error saying the tenant's stream limit is exceeded
	if streams < calculatedLimit {
		return nil
	}
	return fmt.Errorf(errMaxStreamsPerUserLimitExceeded, userID, streams, calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit)
}

func (l *Limiter) minNonZero(first, second int) int {
	if first == 0 || (second != 0 && first > second) {
		return second
	}
	return first
}

In the ingester's instance.go, Loki calls the AssertMaxStreamsPerUser method above to check the tenant's stream limit. If it returns a non-nil error, the stream is discarded and a TooManyRequests error is returned:

err = i.limiter.AssertMaxStreamsPerUser(i.instanceID, len(i.streams))
if err != nil {
	validation.DiscardedSamples.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(len(pushReqStream.Entries)))
	bytes := 0
	for _, e := range pushReqStream.Entries {
		bytes += len(e.Line)
	}
	validation.DiscardedBytes.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(bytes))
	return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.StreamLimitErrorMsg())
}

Conclusion:

If max_global_streams_per_user is enabled, configure the replication factor sensibly: when the replication factor exceeds the number of healthy ingesters, the computed tenant limit ends up much larger than what the configuration file defines.

4. Limit the log entry size

In Loki, an entry size limit can be imposed on each log stream the client pushes to the distributor. It is unset by default, meaning each log line can be of unlimited size 😄. In most cases we leave it alone, but in some scenarios you may want to enable a per-line size limit.

limits_config:
  # log line size limit, no limit by default
  [max_line_size: <string> | default = none]

In the distributor's validator.go, the ValidateEntry function checks the length of each entry in the log stream. If MaxLineSize is not 0 and an entry's line is longer than MaxLineSize, processing is aborted and a LineTooLong error is returned:

func (v Validator) ValidateEntry(userID string, labels string, entry logproto.Entry) error {
	if maxSize := v.MaxLineSize(userID); maxSize != 0 && len(entry.Line) > maxSize {
		validation.DiscardedSamples.WithLabelValues(validation.LineTooLong, userID).Inc()
		validation.DiscardedBytes.WithLabelValues(validation.LineTooLong, userID).Add(float64(len(entry.Line)))
		return httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg(maxSize, len(entry.Line), labels))
	}
	return nil
}
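If you do want to cap line sizes, an override like the following would reject lines above 256 KB (the exact value syntax, raw bytes or a human-readable size, depends on your Loki version):

limits_config:
  max_line_size: 256kb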

5. Query restrictions

There is a limit on the number of lines a Loki query returns. It directly controls how many lines you can get from Loki in Grafana or any other platform. 5000 lines suffice in most cases, but if that is not enough you can increase it, or set it to 0 for no limit:

limits_config:
  # limit on the number of entries returned by a query
  [max_entries_limit_per_query: <int> | default = 5000]
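For example, a sample override raising the cap for heavier troubleshooting queries (larger values mean bigger query responses, so increase with care):

limits_config:
  max_entries_limit_per_query: 10000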

In addition, single queries are subject to chunk, stream and series limits, but for most scenarios we never adjust them, so we will not analyze them in detail here.

limits_config:
  # maximum number of chunks a single query can match
  [max_chunks_per_query: <int> | default = 2000000]
  # maximum time range of a single query, 0 for no limit
  [max_query_length: <duration> | default = 0]
  # maximum number of stream matchers in a single query
  [max_streams_matchers_per_query: <int> | default = 1000]
  # maximum number of series a metric query can return
  [max_query_series: <int> | default = 500]
  # query parallelism
  [max_query_parallelism: <int> | default = 14]
  # most recent window of results tenants are allowed to cache
  [max_cache_freshness_per_query: <duration> | default = 1m]

Conclusion

In fact, Loki's limits section needs no extra adjustment unless we have a clear picture of the scenario our Loki log store serves; only then is there a basis for tuning these parameters.

For example, if your Loki stores internal logs and does not involve multiple tenants, your limit policy can usually be set high. If your Loki is multi-tenant and each tenant has its own specific logs, you may want to focus on limiting each tenant's ingestion and queries so that a single tenant's behavior cannot affect the cluster. Loki does not yet seem to shard service by tenant identity, but there may be better ways to handle multi-tenant scenarios in the future.


Follow the public account "Cloud Native Xiaobai" and reply [join group] to enter the Loki study group.