An overview of the

Once container logs have been collected into a message server, what do we do with them? Deploying a dedicated log processing workload can be costly, and it is hard to estimate how much standby capacity that workload needs when the log volume surges or plummets. This article presents a serverless approach to log processing that reduces the cost of the pipeline and improves its flexibility.

Our general design uses a Kafka server as the log sink and treats the logs arriving in Kafka as events that drive a serverless workload to process them. The general steps are as follows:

  1. Set up a Kafka server as the log receiver for the Kubernetes cluster
  2. Deploy OpenFunction to provide serverless capability for the log processing workload
  3. Write a log processing function that captures specific logs and generates alert messages
  4. Configure Notification Manager to send the alerts to Slack

In this scenario, we will take advantage of the Serverless capabilities that OpenFunction brings.

OpenFunction is an open source FaaS (serverless) project from the KubeSphere community that aims to let users focus on their business logic without worrying about the underlying runtime environment and infrastructure. The project currently has the following key capabilities:

  • Support for building OCI images through Dockerfile or BuildPacks
  • Support for running Serverless workloads using Knative Serving or OpenFunctionAsync (KEDA + Dapr) as Runtime
  • Built-in event-driven framework

Use Kafka as the log sink

First, we enable the Logging component of the KubeSphere platform (see Enabling Pluggable Components for more information). We then use strimzi-kafka-operator to build a minimal Kafka server.

  1. Install strimzi-kafka-operator in the default namespace:

    helm repo add strimzi https://strimzi.io/charts/
    helm install kafka-operator -n default strimzi/strimzi-kafka-operator
  2. Run the following command to create the Kafka cluster and the Kafka topic in the default namespace. The Kafka and ZooKeeper clusters created by this command use the ephemeral storage type, which is backed by emptyDir and intended for demonstration only.

    Note that this also creates a topic named “logs”, which we will use later.

    cat <<EOF | kubectl apply -f -
    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: kafka-logs-receiver
      namespace: default
    spec:
      kafka:
        version: 2.8.0
        replicas: 1
        listeners:
          - name: plain
            port: 9092
            type: internal
            tls: false
          - name: tls
            port: 9093
            type: internal
            tls: true
        config:
          offsets.topic.replication.factor: 1
          transaction.state.log.replication.factor: 1
          transaction.state.log.min.isr: 1
          log.message.format.version: '2.8'
          inter.broker.protocol.version: "2.8"
        storage:
          type: ephemeral
      zookeeper:
        replicas: 1
        storage:
          type: ephemeral
      entityOperator:
        topicOperator: {}
        userOperator: {}
    ---
    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaTopic
    metadata:
      name: logs
      namespace: default
      labels:
        strimzi.io/cluster: kafka-logs-receiver
    spec:
      partitions: 10
      replicas: 3
      config:
        retention.ms: 7200000
        segment.bytes: 1073741824
    EOF
  3. Run the following command to check the Pod status, and wait until Kafka and ZooKeeper are up and running.

    $ kubectl get po
    NAME                                                   READY   STATUS        RESTARTS   AGE
    kafka-logs-receiver-entity-operator-568957ff84-nmtlw   3/3     Running       0          8m42s
    kafka-logs-receiver-kafka-0                            1/1     Running       0          9m13s
    kafka-logs-receiver-zookeeper-0                        1/1     Running       0          9m46s
    strimzi-cluster-operator-687fdd6f77-cwmgm              1/1     Running       0          11m

    Run the following command to view the metadata of the Kafka cluster:

    # Start a utility Pod
    $ kubectl run utils --image=arunvelsriram/utils -i --tty --rm
    #View the metadata of the Kafka cluster
    $ kafkacat -L -b kafka-logs-receiver-kafka-brokers:9092

Next, we add this Kafka server as a log sink.

  1. Log in to the KubeSphere web console as admin. Click Platform Management in the upper left corner and select Cluster Management.

    If you have enabled multiple clusters, you can select a specific cluster.

  2. On the Cluster Management page, select Log Collection under Cluster Settings.

  3. Click Add Log Sink and select Kafka. Enter the Kafka broker address and port, then click OK to continue.

  4. Verify that the Kafka cluster receives logs from Fluent Bit by running the following commands:

    # Start a utility Pod
    $ kubectl run utils --image=arunvelsriram/utils -i --tty --rm
    # Check the log records in the logs topic
    $ kafkacat -C -b kafka-logs-receiver-kafka-0.kafka-logs-receiver-kafka-brokers.default.svc:9092 -t logs

Deploy OpenFunction

Following the design outlined above, we need to deploy OpenFunction first. OpenFunction depends on many third-party projects, such as Knative, Tekton, Shipwright, Dapr, and KEDA, which are cumbersome to install manually. We recommend the method described in the Prerequisites document, which deploys OpenFunction's dependencies with a single command.

  • --with-shipwright deploys Shipwright as the build driver for functions.
  • --with-openFuncAsync deploys the OpenFuncAsync runtime as the workload driver for functions.
  • --poor-network can be added to download the related components when your network has limited access to GitHub and Google.

sh hack/deploy.sh --with-shipwright --with-openFuncAsync --poor-network

Deploy OpenFunction:

Here we install the latest stable version. You can also use the development version; refer to the Install documentation.

kubectl apply -f https://github.com/OpenFunction/OpenFunction/releases/download/v0.3.0/bundle.yaml

For Shipwright to work, we provide a default build strategy, which can be applied with the following command:

kubectl apply -f https://raw.githubusercontent.com/OpenFunction/OpenFunction/main/config/strategy/openfunction.yaml

Write log handlers

As the log producer, we build a WordPress application, following the example of creating and deploying WordPress. The application's workload runs in the demo-project namespace, and the Pod name is wordpress-v1-f54f697c5-hdn2z.

When the request result is 404, we receive the following log:

{"@timestamp":1629856477.226758,"log":"*.*.*.* - - [25/Aug/2021:01:54:36 +0000] \"GET /notfound HTTP/1.1\" 404 49923 \"-\" \"curl/7.58.0\"\n","time":"2021-08-25T01:54:37.226757612Z","kubernetes":{"pod_name":"wordpress-v1-f54f697c5-hdn2z","namespace_name":"demo-project","container_name":"container-nrdsp1","docker_id":"bb7b48e2883be0c05b22c04b1d1573729dd06223ae0b1676e33a4fac655958a5","container_image":"wordpress:4.8-apache"}}

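Because each record arrives as a JSON document, it can also be decoded into a struct instead of being searched as raw text. The following is a minimal sketch under that assumption; LogRecord, ParseLogRecord, and sampleRecord are illustrative names that do not come from OpenFunction or Fluent Bit, and the sample is a cleaned-up, shortened copy of the record above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// LogRecord mirrors only the fields visible in the sample record.
// The type and its field names are illustrative, not part of any library.
type LogRecord struct {
	Timestamp  float64 `json:"@timestamp"`
	Log        string  `json:"log"`
	Time       string  `json:"time"`
	Kubernetes struct {
		PodName        string `json:"pod_name"`
		NamespaceName  string `json:"namespace_name"`
		ContainerName  string `json:"container_name"`
		ContainerImage string `json:"container_image"`
	} `json:"kubernetes"`
}

// ParseLogRecord decodes one JSON-encoded log record.
func ParseLogRecord(in []byte) (LogRecord, error) {
	var rec LogRecord
	err := json.Unmarshal(in, &rec)
	return rec, err
}

// sampleRecord is a shortened copy of the 404 record shown in the article.
const sampleRecord = `{"@timestamp":1629856477.226758,"log":"*.*.*.* - - [25/Aug/2021:01:54:36 +0000] \"GET /notfound HTTP/1.1\" 404 49923 \"-\" \"curl/7.58.0\"\n","kubernetes":{"pod_name":"wordpress-v1-f54f697c5-hdn2z","namespace_name":"demo-project"}}`

func main() {
	rec, err := ParseLogRecord([]byte(sampleRecord))
	if err != nil {
		panic(err)
	}
	fmt.Println(rec.Kubernetes.NamespaceName, rec.Kubernetes.PodName)
	fmt.Print(rec.Log)
}
```

Decoding first makes the namespace and Pod name available as fields, so only the access-log line in Log still needs pattern matching.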

Our requirement is this: when a request returns 404, send an alert to the receiver (you can configure a Slack receiver by following the Slack notification configuration), and record the namespace, Pod name, request path, request method, and so on. Based on this requirement, we write a simple handler function:

You can learn how to use openfunction-context from the OpenFunction Context Spec; it is the utility library that OpenFunction provides for writing functions. You can learn more about OpenFunction functions from OpenFunction Samples.

package logshandler

import (
	"encoding/json"
	"fmt"
	"log"
	"regexp"
	"time"

	ofctx "github.com/OpenFunction/functions-framework-go/openfunction-context"
	alert "github.com/prometheus/alertmanager/template"
)

const (
	HTTPCodeNotFound = "404"
	Namespace        = "demo-project"
	PodName          = "wordpress-v1-[A-Za-z0-9]{9}-[A-Za-z0-9]{5}"
	AlertName        = "404 Request"
	Severity         = "warning"
)

// LogsHandler's ctx argument provides a context handle for the user function in the cluster,
// for example ctx.SendTo, which sends data to a specified destination.
// LogsHandler's in argument passes data (if any) from the input source to the function as bytes.
func LogsHandler(ctx *ofctx.OpenFunctionContext, in []byte) int {
	content := string(in)
	// We set up three regular expressions to match the HTTP return code, the resource namespace, and the resource Pod name
	matchHTTPCode, _ := regexp.MatchString(fmt.Sprintf(" %s ", HTTPCodeNotFound), content)
	matchNamespace, _ := regexp.MatchString(fmt.Sprintf("namespace_name\":\"%s", Namespace), content)
	matchPodName := regexp.MustCompile(fmt.Sprintf(`(%s)`, PodName)).FindStringSubmatch(content)

	if matchHTTPCode && matchNamespace && matchPodName != nil {
		log.Printf("Match log - Content: %s", content)

		// If the three regular expressions match at the same time, extract some information from the log content and fill it into the alarm information
		// The information is: HTTP Method, HTTP Path, and Pod name
		match := regexp.MustCompile(`([A-Z]+) (/\S*) HTTP`).FindStringSubmatch(content)
		if match == nil {
			return 500
		}
		path := match[len(match)-1]
		method := match[len(match)-2]
		podName := matchPodName[len(matchPodName)-1]

		// After collecting the key information, use Alertmanager's Data structure to assemble the alert
		notify := &alert.Data{
			Receiver:          "notification_manager",
			Status:            "firing",
			Alerts:            alert.Alerts{},
			GroupLabels:       alert.KV{"alertname": AlertName, "namespace": Namespace},
			CommonLabels:      alert.KV{"alertname": AlertName, "namespace": Namespace, "severity": Severity},
			CommonAnnotations: alert.KV{},
			ExternalURL:       "",
		}
		alt := alert.Alert{
			Status: "firing",
			Labels: alert.KV{
				"alertname": AlertName,
				"namespace": Namespace,
				"severity":  Severity,
				"pod":       podName,
				"path":      path,
				"method":    method,
			},
			Annotations:  alert.KV{},
			StartsAt:     time.Now(),
			EndsAt:       time.Time{},
			GeneratorURL: "",
			Fingerprint:  "",
		}
		notify.Alerts = append(notify.Alerts, alt)
		notifyBytes, _ := json.Marshal(notify)

		// Use ctx.SendTo to send the content to the output named "notification-manager" (you can find its definition later in the function configuration logs-handler-function.yaml)
		if err := ctx.SendTo(notifyBytes, "notification-manager"); err != nil {
			panic(err)
		}
		log.Printf("Send log to notification manager.")
	}
	return 200
}

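The matching logic in LogsHandler can be tried without a cluster by pulling it into a pure function. The sketch below uses only the standard library and the same patterns as the handler; extract404 and sampleLine are hypothetical names introduced here for illustration, and plain substring checks stand in for the two simpler regular expressions.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

var (
	// Same patterns as in LogsHandler, compiled once at package level.
	requestRe = regexp.MustCompile(`([A-Z]+) (/\S*) HTTP`)
	podRe     = regexp.MustCompile(`wordpress-v1-[A-Za-z0-9]{9}-[A-Za-z0-9]{5}`)
)

// extract404 is a hypothetical helper. It reports ok=true only when the record
// is a 404 from the demo-project namespace and from a matching WordPress Pod,
// and returns the HTTP method, the request path, and the Pod name.
func extract404(content string) (method, path, pod string, ok bool) {
	if !strings.Contains(content, " 404 ") ||
		!strings.Contains(content, `namespace_name":"demo-project`) {
		return "", "", "", false
	}
	pod = podRe.FindString(content)
	req := requestRe.FindStringSubmatch(content)
	if pod == "" || req == nil {
		return "", "", "", false
	}
	return req[1], req[2], pod, true
}

// sampleLine is a shortened copy of the 404 record shown earlier.
const sampleLine = `{"log":"*.*.*.* - - [25/Aug/2021:01:54:36 +0000] \"GET /notfound HTTP/1.1\" 404 49923 \"-\" \"curl/7.58.0\"\n","kubernetes":{"pod_name":"wordpress-v1-f54f697c5-hdn2z","namespace_name":"demo-project"}}`

func main() {
	method, path, pod, ok := extract404(sampleLine)
	fmt.Println(ok, method, path, pod)
}
```

Keeping the extraction separate from ctx.SendTo makes it straightforward to cover with ordinary unit tests before the function is built and deployed.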

We upload this function to a code repository and note the repository address and the directory path of the code within the repository; both are needed when we create the function in the following steps.

You can find this example in OpenFunction Samples.

Create a function

Next, we use OpenFunction to build the function above. First, create a secret named push-secret for accessing the image registry (after building the OCI image from the source code, OpenFunction pushes the image to the user's image registry, from which the workload is later started):

REGISTRY_SERVER=https://index.docker.io/v1/
REGISTRY_USER=<your username>
REGISTRY_PASSWORD=<your password>
kubectl create secret docker-registry push-secret \
    --docker-server=$REGISTRY_SERVER \
    --docker-username=$REGISTRY_USER \
    --docker-password=$REGISTRY_PASSWORD
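To make clear what push-secret holds, the sketch below builds roughly the registry configuration JSON that a docker-registry secret stores under its .dockerconfigjson key: the credentials plus an auth field containing the Base64 encoding of "username:password". The helper names and the credentials alice / s3cret are made up for illustration.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// registryAuth is one entry under "auths" in the registry configuration.
type registryAuth struct {
	Username string `json:"username"`
	Password string `json:"password"`
	Auth     string `json:"auth"`
}

// authField returns the Base64 encoding of "user:password".
func authField(user, password string) string {
	return base64.StdEncoding.EncodeToString([]byte(user + ":" + password))
}

// dockerConfigJSON is a hypothetical helper that assembles the JSON document
// for a single registry server.
func dockerConfigJSON(server, user, password string) ([]byte, error) {
	cfg := map[string]map[string]registryAuth{
		"auths": {
			server: {Username: user, Password: password, Auth: authField(user, password)},
		},
	}
	return json.Marshal(cfg)
}

func main() {
	out, err := dockerConfigJSON("https://index.docker.io/v1/", "alice", "s3cret")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```

Because Base64 is an encoding rather than encryption, anyone who can read the secret can recover the password, so access to it should be restricted accordingly.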

Then apply the function definition, logs-handler-function.yaml:

The function definition contains the use of two key components:

Dapr shields the application from complex middleware, making it easy for logs-handler to handle events from Kafka.

KEDA drives the startup of the logs-handler function by monitoring event traffic in the message server, and dynamically scales the logs-handler instances based on the consumer lag of the messages in Kafka.

apiVersion: core.openfunction.io/v1alpha1
kind: Function
metadata:
  name: logs-handler
spec:
  version: "v1.0.0"
  # this defines the upload path of the built image
  image: openfunctiondev/logs-async-handler:v1
  imageCredentials:
    name: push-secret
  build:
    builder: openfunctiondev/go115-builder:v0.2.0
    env:
      FUNC_NAME: "LogsHandler"
    # The path to the source code is defined here
    # url is the code repository address mentioned above
    # sourceSubPath is the directory path of the code in the repository
    srcRepo:
      url: "https://github.com/OpenFunction/samples.git"
      sourceSubPath: "functions/OpenFuncAsync/logs-handler-function/"
  serving:
    # OpenFuncAsync is an event-driven asynchronous function runtime implemented by OpenFunction via KEDA+Dapr
    runtime: "OpenFuncAsync"
    openFuncAsync:
      # The input (kafka-receiver) and output (notification-manager) of the function are defined here, corresponding to the definitions in components below
      dapr:
        inputs:
          - name: kafka-receiver
            type: bindings
        outputs:
          - name: notification-manager
            type: bindings
            params:
              operation: "post"
              type: "bindings"
        annotations:
          dapr.io/log-level: "debug"
        # This completes the definition of the input and output above (Dapr Components)
        components:
          - name: kafka-receiver
            type: bindings.kafka
            version: v1
            metadata:
              - name: brokers
                value: "kafka-logs-receiver-kafka-brokers:9092"
              - name: authRequired
                value: "false"
              - name: publishTopic
                value: "logs"
              - name: topics
                value: "logs"
              - name: consumerGroup
                value: "logs-handler"
          # This is the address of KubeSphere's notification-manager
          - name: notification-manager
            type: bindings.http
            version: v1
            metadata:
              - name: url
                value: http://notification-manager-svc.kubesphere-monitoring-system.svc.cluster.local:19093/api/v2/alerts
      keda:
        scaledObject:
          pollingInterval: 15
          minReplicaCount: 0
          maxReplicaCount: 10
          cooldownPeriod: 30
          # The trigger is defined here; it monitors the "logs" topic on the Kafka server
          # It also defines the message lag threshold (10): when the lag exceeds 10, the number of logs-handler instances is scaled up automatically
          triggers:
            - type: kafka
              metadata:
                topic: logs
                bootstrapServers: kafka-logs-receiver-kafka-brokers.default.svc.cluster.local:9092
                consumerGroup: logs-handler
                lagThreshold: "10"
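To see what the notification-manager output binding saves the function from doing, here is a sketch of sending the alert by hand. It assumes the binding's "post" operation amounts to an HTTP POST of the payload to the configured url. The alertPayload and alert types are trimmed stand-ins for the Alertmanager structures used in LogsHandler, and a throwaway local test server stands in for Notification Manager; none of these names come from OpenFunction or Dapr.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"time"
)

// alert and alertPayload keep only a few fields of the alert structure.
type alert struct {
	Status   string            `json:"status"`
	Labels   map[string]string `json:"labels"`
	StartsAt time.Time         `json:"startsAt"`
}

type alertPayload struct {
	Receiver string  `json:"receiver"`
	Status   string  `json:"status"`
	Alerts   []alert `json:"alerts"`
}

// postAlert sends the JSON payload with a plain HTTP POST.
func postAlert(client *http.Client, url string, payload []byte) error {
	resp, err := client.Post(url, "application/json", bytes.NewReader(payload))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 300 {
		return fmt.Errorf("unexpected status: %s", resp.Status)
	}
	return nil
}

// received records what the stand-in server saw.
type received struct{ method, body string }

// demo posts one alert to a local test server and returns the request it got.
func demo() (received, error) {
	seen := make(chan received, 1)
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		b, _ := io.ReadAll(r.Body)
		seen <- received{method: r.Method, body: string(b)}
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()

	payload, err := json.Marshal(alertPayload{
		Receiver: "notification_manager",
		Status:   "firing",
		Alerts: []alert{{
			Status:   "firing",
			Labels:   map[string]string{"alertname": "404 Request", "namespace": "demo-project", "path": "/notfound", "method": "GET"},
			StartsAt: time.Now(),
		}},
	})
	if err != nil {
		return received{}, err
	}
	if err := postAlert(srv.Client(), srv.URL, payload); err != nil {
		return received{}, err
	}
	return <-seen, nil
}

func main() {
	got, err := demo()
	if err != nil {
		panic(err)
	}
	fmt.Println(got.method)
	fmt.Println(got.body)
}
```

With the binding in place, the function only calls ctx.SendTo with an output name, and the target address stays in the function definition rather than in the code.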

Demonstrating the results

Let’s start by turning off the Kafka log sink: on the Log Collection page, click the Kafka log sink to open its details page, then click More Actions and select Change Status to turn it off.

After some time, we can observe that the logs-handler function instances have scaled down to zero.

We then activate the Kafka log sink again, and logs-handler starts up.

~ # kubectl get po --watch
NAME                                                     READY   STATUS        RESTARTS   AGE
kafka-logs-receiver-entity-operator-568957ff84-tdrrx     3/3     Running       0          7m27s
kafka-logs-receiver-kafka-0                              1/1     Running       0          7m48s
kafka-logs-receiver-zookeeper-0                          1/1     Running       0          8m12s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-b9d6f   2/2     Terminating   0          34s
strimzi-cluster-operator-687fdd6f77-kc8cv                1/1     Running       0          10m
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-b9d6f   2/2     Terminating   0          36s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-b9d6f   0/2     Terminating   0          37s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-b9d6f   0/2     Terminating   0          38s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-b9d6f   0/2     Terminating   0          38s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c   0/2     Pending       0          0s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c   0/2     Pending       0          0s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c   0/2     ContainerCreating   0          0s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c   0/2     ContainerCreating   0          2s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c   1/2     Running             0          4s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c   2/2     Running             0          11s
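The scale-down to zero and the later start-up follow from the keda settings in the function definition. As a rough mental model, and assuming that KEDA's Kafka scaler targets lagThreshold messages of lag per replica and by default does not scale beyond the number of topic partitions, the replica count can be approximated as below. desiredReplicas is a hypothetical helper, not KEDA code, and it ignores the timing effects of pollingInterval and cooldownPeriod.

```go
package main

import "fmt"

// desiredReplicas approximates how many logs-handler instances to expect for a
// given total consumer lag. It is a simplified model, not KEDA's implementation.
func desiredReplicas(totalLag, lagThreshold, minReplicas, maxReplicas, partitions int) int {
	if totalLag <= 0 || lagThreshold <= 0 {
		// Nothing to consume: fall back to the minimum (0 allows scale-to-zero).
		return minReplicas
	}
	// Ceiling division: one replica per lagThreshold messages of lag.
	n := (totalLag + lagThreshold - 1) / lagThreshold
	if n > partitions {
		n = partitions // extra consumers beyond the partition count would sit idle
	}
	if n > maxReplicas {
		n = maxReplicas
	}
	if n < minReplicas {
		n = minReplicas
	}
	if n < 1 {
		n = 1 // any pending lag needs at least one instance
	}
	return n
}

func main() {
	// Values from the function definition: lagThreshold 10, minReplicaCount 0,
	// maxReplicaCount 10, and a "logs" topic with 10 partitions.
	for _, lag := range []int{0, 5, 35, 1000} {
		fmt.Printf("lag=%d replicas=%d\n", lag, desiredReplicas(lag, 10, 0, 10, 10))
	}
}
```

Under this model, disabling the log sink lets the lag drain to zero, so the function falls back to minReplicaCount, and re-enabling the sink produces lag that brings an instance back.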

We then request a non-existent path from the WordPress application:

curl http://<wp-svc-address>/notfound

You can see that Slack has received this alert (by contrast, Slack receives no alert when we visit the WordPress site normally).

Further exploration

  • Synchronous function solution

    For Knative Serving to work properly, we need to set the load balancer address of its gateway. (As a workaround, you can use the local machine's address.)

    Replace “1.2.3.4” with the address in the actual scenario.

    kubectl patch svc -n kourier-system kourier \
      -p '{"spec": {"type": "LoadBalancer", "externalIPs": ["1.2.3.4"]}}'

    kubectl patch configmap/config-domain -n knative-serving \
      --type merge --patch '{"data": {"1.2.3.4.sslip.io": ""}}'

    Besides having the Kafka server drive the function directly (asynchronous mode), OpenFunction also supports connecting to the Kafka server through its built-in event framework and then driving a Knative function in sink mode. You can refer to the example in OpenFunction Samples.

    In this solution, the synchronous function processes logs more slowly than the asynchronous function. We could also use KEDA to trigger the concurrency mechanism of Knative Serving, but overall this lacks the convenience of asynchronous functions. (We will optimize OpenFunction's event framework in later stages to address these shortcomings of synchronous functions.)

    This shows that different types of serverless functions each have task scenarios in which they excel; for example, an ordered control flow needs to be handled by synchronous functions rather than asynchronous ones.

Summary

Serverless brings the ability we have been hoping for: to quickly decompose and refactor business scenarios.

As this case shows, OpenFunction not only makes the log processing and alert notification pipeline more flexible in a serverless way, but also, through its function framework, reduces the complex configuration needed to connect to Kafka to semantically clear code logic. Meanwhile, we continue to evolve OpenFunction, and in future releases its own components will be driven by its own serverless capabilities.

This article is published by OpenWrite!