Implementing Kubernetes log alarms in Serverless mode

Qingyun technology community 2021-09-15 10:01:03

Once we have collected container logs into a message server, how should we process them? Deploying a dedicated log-processing workload is costly, and it is hard to size the standby replicas of such a workload when log volume spikes or suddenly drops. This article presents a Serverless approach to log processing that reduces the cost of the pipeline while improving its elasticity.

The general design is to use a Kafka server as the log receiver, then treat logs entering Kafka as events that drive a Serverless workload to process them. The rough steps are:

  1. Build a Kafka server as the log sink of the Kubernetes cluster

  2. Deploy OpenFunction to provide Serverless capability for the log-processing workload

  3. Write a log-processing function that catches specific logs and generates alert messages

  4. Configure <b>Notification Manager[1]</b> to send alerts to Slack


In this scenario, we take advantage of the Serverless capability brought by <b>OpenFunction[2]</b>.

OpenFunction[3] is an open-source FaaS (Serverless) project from the KubeSphere community, designed to let users focus on their business logic without worrying about the underlying runtime environment and infrastructure. The project currently has the following key capabilities:

  • Build OCI images from source via Dockerfile or Cloud Native Buildpacks
  • Run functions as Serverless workloads using Knative Serving or OpenFuncAsync (KEDA + Dapr) as the runtime
  • Built-in event-driven framework

Use Kafka as the log receiver

First, enable the <b>logging</b> component on the KubeSphere platform (refer to <b>Enable pluggable components[4]</b> for more information). Then we use <b>strimzi-kafka-operator[5]</b> to build a minimal Kafka server.

  1. Install <b>strimzi-kafka-operator[6]</b> in the default namespace:

     helm repo add strimzi https://strimzi.io/charts/
     helm install kafka-operator -n default strimzi/strimzi-kafka-operator
  2. Run the following command to create a Kafka cluster and a Kafka topic in the default namespace. The Kafka and Zookeeper clusters created by this command use <b>ephemeral</b> storage, i.e. emptyDir, for demonstration purposes.

Note that we create a topic named "logs" here; it will be used later.

 cat <<EOF | kubectl apply -f -
 apiVersion: kafka.strimzi.io/v1beta2
 kind: Kafka
 metadata:
   name: kafka-logs-receiver
   namespace: default
 spec:
   kafka:
     version: 2.8.0
     replicas: 1
     listeners:
       - name: plain
         port: 9092
         type: internal
         tls: false
       - name: tls
         port: 9093
         type: internal
         tls: true
     config:
       offsets.topic.replication.factor: 1
       transaction.state.log.replication.factor: 1
       transaction.state.log.min.isr: 1
       log.message.format.version: "2.8"
     storage:
       type: ephemeral
   zookeeper:
     replicas: 1
     storage:
       type: ephemeral
   entityOperator:
     topicOperator: {}
     userOperator: {}
 ---
 apiVersion: kafka.strimzi.io/v1beta2
 kind: KafkaTopic
 metadata:
   name: logs
   namespace: default
   labels:
     strimzi.io/cluster: kafka-logs-receiver
 spec:
   partitions: 10
   replicas: 3
   config:
     retention.ms: 7200000
     segment.bytes: 1073741824
 EOF
  3. Run the following command to watch the Pod status and wait until Kafka and Zookeeper are up and running.

     $ kubectl get po
    kafka-logs-receiver-entity-operator-568957ff84-nmtlw 3/3 Running 0 8m42s
    kafka-logs-receiver-kafka-0 1/1 Running 0 9m13s
    kafka-logs-receiver-zookeeper-0 1/1 Running 0 9m46s
    strimzi-cluster-operator-687fdd6f77-cwmgm 1/1 Running 0 11m

Run the following command to view the metadata of the Kafka cluster:

 # Start a utility pod
 $ kubectl run utils --image=arunvelsriram/utils -i --tty --rm
 # View the metadata of the Kafka cluster
 $ kafkacat -L -b kafka-logs-receiver-kafka-brokers:9092

Next, we add this Kafka server as a log sink.

  1. Log in to the KubeSphere web console as <b>admin</b>. Click <b>Platform</b> in the upper-left corner, then select <b>Cluster Management</b>.

If you have enabled the <b>multi-cluster feature[7]</b>, you can select a cluster.

  2. On the <b>Cluster Management</b> page, select <b>Log Collection</b> under <b>Cluster Settings</b>.

  3. Click <b>Add Log Sink</b> and select <b>Kafka</b>. Enter the Kafka broker address and port, then click <b>OK</b> to continue.


  4. Run the following command to verify that the Kafka cluster can receive logs from Fluent Bit:

 # Start a utility pod
 $ kubectl run utils --image=arunvelsriram/utils -i --tty --rm
 # Check the logs in the "logs" topic
 $ kafkacat -C -b kafka-logs-receiver-kafka-0.kafka-logs-receiver-kafka-brokers.default.svc:9092 -t logs

Deploy OpenFunction

Following the design in the overview, we need to deploy OpenFunction. The OpenFunction project references many third-party projects, such as Knative, Tekton, Shipwright, Dapr, and KEDA, so manual installation is cumbersome. We recommend the method in the <b>Prerequisites document[8]</b>, which deploys OpenFunction's dependent components with one command.

Here, --with-shipwright deploys Shipwright as the build driver for functions, and --with-openFuncAsync deploys the OpenFuncAsync runtime as the workload driver for functions. If your network access to GitHub and Google is restricted, you can add the --poor-network parameter to download the related components.

 $ sh hack/ --with-shipwright --with-openFuncAsync --poor-network

Deploy OpenFunction:

Here we choose to install the latest stable release; you can also use the development version by referring to the <b>Install document[9]</b>:

 $ kubectl apply -f

For Shipwright to work properly, a default build strategy is provided, which you can apply with the following command:

 $ kubectl apply -f

Write the log-processing function

We take <b>Create and Deploy WordPress[10]</b> as an example and build a WordPress application as the producer of logs. The application's workload runs in the namespace "demo-project", and the Pod name is "wordpress-v1-f54f697c5-hdn2z".

When a request results in a 404, the log we receive looks like this:

 {"@timestamp":1629856477.226758,"log":"*.*.*.* - - [25/Aug/2021:01:54:36 +0000] \"GET /notfound HTTP/1.1\" 404 49923 \"-\" \"curl/7.58.0\"\n","time":"2021-08-25T01:54:37.226757612Z","kubernetes":{"pod_name":"wordpress-v1-f54f697c5-hdn2z","namespace_name":"demo-project","container_name":"container-nrdsp1","docker_id":"bb7b48e2883be0c05b22c04b1d1573729dd06223ae0b1676e33a4fac655958a5","container_image":"wordpress:4.8-apache"}}

Our requirement is: when a request results in a 404, send an alert notification to a receiver (configure a Slack alert receiver according to <b>Configure Slack Notifications[11]</b>), and record the namespace, Pod name, request path, request method, and other information. Based on this requirement, we write a simple handler function:

You can start with the <b>OpenFunction Context Spec[12]</b> to understand how openfunction-context is used; it is a library OpenFunction provides for users to write functions. You can learn about more OpenFunction function examples through <b>OpenFunction Samples[13]</b>.

package logshandler

import (
	"encoding/json"
	"fmt"
	"log"
	"regexp"
	"time"

	ofctx "github.com/OpenFunction/functions-framework-go/openfunction-context"
	alert "github.com/prometheus/alertmanager/template"
)

const (
	HTTPCodeNotFound = "404"
	Namespace        = "demo-project"
	PodName          = "wordpress-v1-[A-Za-z0-9]{9}-[A-Za-z0-9]{5}"
	AlertName        = "404 Request"
	Severity         = "warning"
)

// LogsHandler's ctx parameter provides the function's context handle in the cluster;
// for example, ctx.SendTo sends data to a specified destination.
// The in parameter passes input data from the source (if any) to the function as bytes.
func LogsHandler(ctx *ofctx.OpenFunctionContext, in []byte) int {
	content := string(in)
	// Three regular expressions match the HTTP return code, the resource
	// namespace, and the resource Pod name respectively.
	matchHTTPCode, _ := regexp.MatchString(fmt.Sprintf(" %s ", HTTPCodeNotFound), content)
	matchNamespace, _ := regexp.MatchString(fmt.Sprintf("namespace_name\":\"%s", Namespace), content)
	matchPodName := regexp.MustCompile(fmt.Sprintf(`(%s)`, PodName)).FindStringSubmatch(content)

	if matchHTTPCode && matchNamespace && matchPodName != nil {
		log.Printf("Match log - Content: %s", content)
		// When all three expressions hit, extract the information needed to fill
		// in the alert: the request method (HTTP method), the request path
		// (HTTP path), and the Pod name.
		match := regexp.MustCompile(`([A-Z]+) (/\S*) HTTP`).FindStringSubmatch(content)
		if match == nil {
			return 500
		}
		path := match[len(match)-1]
		method := match[len(match)-2]
		podName := matchPodName[len(matchPodName)-1]
		// With the key information collected, assemble the alert using
		// Alertmanager's Data structure.
		notify := &alert.Data{
			Receiver:          "notification_manager",
			Status:            "firing",
			Alerts:            alert.Alerts{},
			GroupLabels:       alert.KV{"alertname": AlertName, "namespace": Namespace},
			CommonLabels:      alert.KV{"alertname": AlertName, "namespace": Namespace, "severity": Severity},
			CommonAnnotations: alert.KV{},
			ExternalURL:       "",
		}
		alt := alert.Alert{
			Status: "firing",
			Labels: alert.KV{
				"alertname": AlertName,
				"namespace": Namespace,
				"severity":  Severity,
				"pod":       podName,
				"path":      path,
				"method":    method,
			},
			Annotations:  alert.KV{},
			StartsAt:     time.Now(),
			EndsAt:       time.Time{},
			GeneratorURL: "",
			Fingerprint:  "",
		}
		notify.Alerts = append(notify.Alerts, alt)
		notifyBytes, _ := json.Marshal(notify)
		// Use ctx.SendTo to send the content to the output named
		// "notification-manager" (its definition can be found in the
		// logs-handler-function.yaml below).
		if err := ctx.SendTo(notifyBytes, "notification-manager"); err != nil {
			log.Printf("Error sending log to notification manager: %v", err)
			return 500
		}
		log.Printf("Send log to notification manager.")
	}
	return 200
}

We upload this function to a code repository and note down the repository address and the directory path of the code within the repository; we will use these two values when creating the function in the following steps.

You can find this example in <b>OpenFunction Samples[14]</b>.

Create a function

Next, we use OpenFunction to build the function above. First, create a secret <b>push-secret</b> for accessing the image registry (after building an OCI image from the code, OpenFunction uploads the image to the user's registry for subsequent workload startup):

$ kubectl create secret docker-registry push-secret \
    --docker-server=$REGISTRY_SERVER \
    --docker-username=$REGISTRY_USER \
    --docker-password=$REGISTRY_PASSWORD

Apply the function <b>logs-handler-function.yaml</b>:

The function definition involves two key components: <b>Dapr[15]</b> shields the application from complex middleware, so that logs-handler can handle events in Kafka very easily; <b>KEDA[16]</b> starts the logs-handler function by monitoring event traffic in the message server and dynamically scales logs-handler instances according to the message consumption lag in Kafka.

apiVersion: core.openfunction.io/v1alpha1
kind: Function
metadata:
  name: logs-handler
spec:
  version: "v1.0.0"
  # The upload path of the built image is defined here
  image: openfunctiondev/logs-async-handler:v1
  imageCredentials:
    name: push-secret
  build:
    builder: openfunctiondev/go115-builder:v0.2.0
    env:
      FUNC_NAME: "LogsHandler"
    # The path of the source code is defined here
    # url is the code repository address mentioned above
    # sourceSubPath is the directory path of the code within the repository
    srcRepo:
      url: ""
      sourceSubPath: "functions/OpenFuncAsync/logs-handler-function/"
  serving:
    # OpenFuncAsync is an event-driven asynchronous function runtime
    # that OpenFunction implements through KEDA + Dapr
    runtime: "OpenFuncAsync"
    openFuncAsync:
      # The input (kafka-receiver) and output (notification-manager) of the
      # function are defined here; they correspond to the definitions in the
      # components section below
      dapr:
        inputs:
          - name: kafka-receiver
            type: bindings
        outputs:
          - name: notification-manager
            type: bindings
            params:
              operation: "post"
              type: "bindings"
        annotations:
          dapr.io/log-level: "debug"
        # The concrete definitions of the input and output above
        # (i.e. the Dapr components) are completed here
        components:
          - name: kafka-receiver
            type: bindings.kafka
            version: v1
            metadata:
              - name: brokers
                value: "kafka-logs-receiver-kafka-brokers:9092"
              - name: authRequired
                value: "false"
              - name: publishTopic
                value: "logs"
              - name: topics
                value: "logs"
              - name: consumerGroup
                value: "logs-handler"
          # Here is the address of KubeSphere's notification-manager
          - name: notification-manager
            type: bindings.http
            version: v1
            metadata:
              - name: url
                value: http://notification-manager-svc.kubesphere-monitoring-system.svc.cluster.local:19093/api/v2/alerts
      keda:
        scaledObject:
          pollingInterval: 15
          minReplicaCount: 0
          maxReplicaCount: 10
          cooldownPeriod: 30
          # The trigger of the function is defined here, i.e. the "logs" topic
          # of the Kafka server, along with the message lag threshold (10):
          # when the backlog exceeds 10 messages, the number of logs-handler
          # instances automatically scales up
          triggers:
            - type: kafka
              metadata:
                topic: logs
                bootstrapServers: kafka-logs-receiver-kafka-brokers.default.svc.cluster.local:9092
                consumerGroup: logs-handler
                lagThreshold: "10"

Result demonstration

First, disable the Kafka log sink: on the <b>Log Collection</b> page, click the Kafka log receiver to enter its detail page, then click <b>More</b>, select <b>Change Status</b>, and set it to <b>Off</b>.

Some time after deactivation, we can observe that the number of logs-handler function instances has shrunk to zero.

Then set the Kafka log sink to <b>On</b> again, and logs-handler starts up:

 $ kubectl get po --watch
kafka-logs-receiver-entity-operator-568957ff84-tdrrx 3/3 Running 0 7m27s
kafka-logs-receiver-kafka-0 1/1 Running 0 7m48s
kafka-logs-receiver-zookeeper-0 1/1 Running 0 8m12s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-b9d6f 2/2 Terminating 0 34s
strimzi-cluster-operator-687fdd6f77-kc8cv 1/1 Running 0 10m
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-b9d6f 2/2 Terminating 0 36s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-b9d6f 0/2 Terminating 0 37s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-b9d6f 0/2 Terminating 0 38s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-b9d6f 0/2 Terminating 0 38s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c 0/2 Pending 0 0s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c 0/2 Pending 0 0s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c 0/2 ContainerCreating 0 0s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c 0/2 ContainerCreating 0 2s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c 1/2 Running 0 4s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c 2/2 Running 0 11s

Then we make a request to a nonexistent path of the WordPress application:

$ curl http://<wp-svc-address>/notfound

You can see that Slack has received this alert message (by contrast, when we access the WordPress site normally, Slack receives no alert):


Further exploration

  • A solution for synchronous functions

For Knative Serving to work properly, we need to set the load balancer address of its gateway (you can use a local address as a workaround). Replace the address in the command below with the one in your actual environment.

Besides functions driven directly by the Kafka server (the asynchronous way), OpenFunction also supports connecting its own event framework to the Kafka server and then driving a Knative function workload in Sink mode. You can refer to the example in <b>OpenFunction Samples[17]</b>.

In this scheme, synchronous functions process logs more slowly than asynchronous ones. We could also use KEDA to trigger Knative Serving's concurrency mechanism, but on the whole this lacks the convenience of asynchronous functions. (In later stages we will optimize the OpenFunction event framework to address these shortcomings of synchronous functions.)

As you can see, different types of Serverless functions suit different task scenarios; for example, an ordered control-flow function is better handled by synchronous functions than by asynchronous ones.


Serverless delivers what we expect from it: the ability to quickly disassemble and reconstruct business scenarios.

As this case shows, OpenFunction not only improves the flexibility of the log-processing and alert-notification pipeline in a Serverless way, but also uses its function framework to simplify the normally complex Kafka configuration steps into semantically clear code logic. Meanwhile, OpenFunction itself keeps evolving: a later version will implement its Serverless capabilities with its own components.

Reference links:

[1] Notification Manager
[2] OpenFunction
[3] OpenFunction
[4] Enable pluggable components
[5] strimzi-kafka-operator
[6] strimzi-kafka-operator
[7] Multi-cluster feature
[8] Prerequisites document
[9] Install document
[10] Create and deploy WordPress
[11] Configure Slack notifications
[12] OpenFunction Context Spec
[13] OpenFunction Samples
[14] OpenFunction Samples
[15] Dapr
[16] KEDA
[17] OpenFunction Samples


Author: Fangtian, Maintainer of the OpenFunction open source community

This article is published via OpenWrite, a one-post-to-many-platforms blog publishing tool. Please include a link to the original when reprinting.