# Export Kubernetes Pod Logs to Upstash Kafka via FluentBit

> **Source:** https://upstash.com/blog/kafka-fluentbit
> **Date:** 2022-12-14
> **Author(s):** Enes Ozcan
> **Reading time:** 7 min read
> **Tags:** kafka, fluentbit, kubernetes

---

FluentBit is a lightweight log processor and exporter. With dozens of input and output connectors, FluentBit
is one of the most popular options when developers need to collect service logs, process them, and export them to a
backend or a persistent store.

The Kafka output plugin ships with FluentBit's built-in connectors, so FluentBit can read logs from an arbitrary
source and export them to a Kafka topic. Upstash, on the other hand, provides a serverless, fully managed Kafka cluster with
a pay-per-message model. That frees developers from the burden of managing, scaling, and maintaining clusters. Besides,
its price [scales to zero](/docs/kafka/pricing) when there are no messages, as a true serverless offering should!
Last but not least, the first 10,000 messages per day are free of charge.

In this article, we will export the logs of particular Kubernetes pods to [Upstash Kafka](/docs/kafka)
via FluentBit, and then consume, filter and stream these logs to clients through a Go HTTP server.

Before starting, make sure you have an Upstash account ([sign up free](https://console.upstash.com/auth/sign-in) if you
haven’t already), access to a Kubernetes cluster (minikube or docker-desktop is fine), and `helm` installed.

## Setup

### Create Upstash Kafka cluster and new topic

Log in to the Upstash console, navigate to the Kafka tab, and create a new Kafka cluster and a topic called `logs`. It only takes seconds!

![create-cluster](https://user-images.githubusercontent.com/44522401/206859047-edb7ec4d-e973-4779-8d1d-ce81dc8587e9.gif)

### Deploy FluentBit to Kubernetes

Let’s use the official Helm chart to deploy FluentBit. First, add the chart repository:

```shell
$ helm repo add fluent https://fluent.github.io/helm-charts
```

Before starting the installation, we need to change a few [default values](https://github.com/fluent/helm-charts/blob/main/charts/fluent-bit/values.yaml)
of the chart, namely:

- `config.inputs`: configuration for log inputs, that is, which logs we want to export.
- `config.filters`: configuration for log processing before exporting them to output plugins.
- `config.outputs`: configuration for log sinks.

Create a file for each of them and fill it with the content below:

`input.conf`:

```
[INPUT]
    Name tail
    Path /var/log/containers/*_upstashed_*.log
    multiline.parser docker, cri
    Tag kube.*
    Mem_Buf_Limit 5MB
    Skip_Long_Lines On
```

Here we tell FluentBit to use the `tail` plugin, which watches for new lines appended to the files matched by `Path`.
Notice the `_upstashed_` value in the `Path` key. Log files are named `<pod-name>_<namespace>_<container-name>-<container-id>.log`,
and in this blog post we want FluentBit to export logs only from the pods in the `upstashed` namespace.
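To see why this glob selects only the `upstashed` namespace, the sketch below applies the same pattern with Go's `path/filepath.Match`. It assumes FluentBit's glob matching behaves like Go's for this simple pattern; the helper `fromUpstashed` and the sample file names are hypothetical. Because Kubernetes pod and container names cannot contain underscores, `_upstashed_` can only match the namespace segment, and the surrounding underscores keep a namespace such as `upstashed-dev` from matching.

```go
package main

import (
    "fmt"
    "path/filepath"
)

// pattern is the file-name part of the `Path` key in input.conf.
const pattern = "*_upstashed_*.log"

// fromUpstashed reports whether a file in /var/log/containers would be
// picked up by the tail input (hypothetical helper for illustration).
func fromUpstashed(name string) bool {
    matched, err := filepath.Match(pattern, name)
    return err == nil && matched
}

func main() {
    for _, name := range []string{
        "random-logger-7d9c_upstashed_random-logger-0a1b2c.log", // exported
        "coredns-565d_kube-system_coredns-9f8e7d.log",           // skipped
        "web-1_upstashed-dev_web-3c4d5e.log",                    // skipped
    } {
        fmt.Printf("%s exported=%v\n", name, fromUpstashed(name))
    }
}
```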

`filter.conf`:

```
[FILTER]
    Name kubernetes
    Match kube.*
    Merge_Log On
    Keep_Log Off
    K8S-Logging.Parser On
    K8S-Logging.Exclude On

[FILTER]
    Name nest
    Match *
    Operation lift
    Nested_under kubernetes
    Add_prefix k8s_
```

Here we use FluentBit's built-in `kubernetes` filter, which creates a structured output containing the log line,
timestamp, pod info, container info, etc. This is an example output generated by the `kubernetes` filter:

```json
{
  "@timestamp": 1670672614.142579,
  "log": "<log-line>",
  "stream": "stdout",
  "time": "2022-12-10T11:43:34.1425787Z",
  "kubernetes": {
    "pod_name": "<pod-name>",
    "namespace_name": "<namespace>",
    "pod_id": "<id>",
    "labels": {
      "app": "<foo>",
      "pod-template-hash": "<bar>"
    },
    "host": "docker-desktop",
    "container_name": "<baz>",
    "docker_id": "<some-id>",
    "container_hash": "<some-hash>",
    "container_image": "<some-image>"
  }
}
```

That’s cool. But we want to move the fields under the `"kubernetes"` key to the top level, because we want to refer
to one of them, `pod_name`, when sending logs to Kafka. Since FluentBit’s Kafka output does not support the
[record accessor](https://docs.fluentbit.io/manual/administration/configuring-fluent-bit/classic-mode/record-accessor)
feature (which allows accessing nested values of JSON records), we use a workaround: the second filter, `nest`,
lifts the nested fields to the top level. It also adds the `k8s_` prefix to every field lifted from under the `"kubernetes"` key.
So the message becomes:

```json
{
  "@timestamp": 1670672943.712912,
  "log": "<log-line>",
  "stream": "stdout",
  "time": "2022-12-10T11:49:03.7129124Z",
  "k8s_pod_name": "<pod-name>",
  "k8s_namespace_name": "<namespace>",
  "k8s_pod_id": "<id>",
  "k8s_labels": {
    "app": "<foo>",
    "pod-template-hash": "<bar>"
  },
  "k8s_host": "docker-desktop",
  "k8s_container_name": "<baz>",
  "k8s_docker_id": "<some-id>",
  "k8s_container_hash": "<some-hash>",
  "k8s_container_image": "<some-image>"
}
```
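To make the `lift` operation concrete, here is a minimal Go sketch that performs the same transformation on a decoded record. It only illustrates the effect shown above and is not FluentBit's implementation; the function name `liftWithPrefix` is hypothetical.

```go
package main

import (
    "encoding/json"
    "fmt"
)

// liftWithPrefix copies record, moves every key found under
// record[nestedUnder] to the top level with prefix prepended, and drops
// the nested key. Only one level is lifted, so a nested map such as
// `labels` stays intact as `k8s_labels`.
func liftWithPrefix(record map[string]any, nestedUnder, prefix string) map[string]any {
    out := make(map[string]any, len(record))
    for k, v := range record {
        out[k] = v
    }
    nested, ok := out[nestedUnder].(map[string]any)
    if !ok {
        return out // nothing to lift
    }
    delete(out, nestedUnder)
    for k, v := range nested {
        out[prefix+k] = v
    }
    return out
}

func main() {
    raw := []byte(`{"log":"hello","stream":"stdout","kubernetes":{"pod_name":"random-logger-7d9c","namespace_name":"upstashed"}}`)
    var record map[string]any
    if err := json.Unmarshal(raw, &record); err != nil {
        panic(err)
    }
    lifted, _ := json.Marshal(liftWithPrefix(record, "kubernetes", "k8s_"))
    // encoding/json sorts map keys when marshaling.
    fmt.Println(string(lifted))
    // {"k8s_namespace_name":"upstashed","k8s_pod_name":"random-logger-7d9c","log":"hello","stream":"stdout"}
}
```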

Alright! Now we want these logs to be exported to Upstash Kafka. Let’s configure the FluentBit Kafka output plugin as the last step:

`output.conf`:

```
[OUTPUT]
    Name kafka
    Match kube.*
    Brokers <broker-provided-by-upstash>
    Topics logs
    Message_Key_Field k8s_pod_name
    rdkafka.security.protocol sasl_ssl
    rdkafka.sasl.mechanism SCRAM-SHA-256
    rdkafka.sasl.username <username-provided-by-upstash>
    rdkafka.sasl.password <password-provided-by-upstash>
```

Remember that we created the `logs` topic along with the Upstash Kafka cluster, so FluentBit exports the pod logs to this topic.
The `Message_Key_Field` setting makes FluentBit use the `k8s_pod_name` value as the message key.
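Keying messages by pod name means that all log lines from one pod land in the same partition, so they stay in order relative to each other. The sketch below illustrates the idea with a CRC32-modulo scheme, which is roughly how librdkafka's default partitioner treats non-empty keys; treat the exact partition numbers, the helper `partitionFor`, and the partition count of 3 as assumptions rather than what Upstash will compute for your topic.

```go
package main

import (
    "fmt"
    "hash/crc32"
)

// partitionFor maps a message key to a partition using CRC32 modulo the
// partition count (a simplified, assumed model of key-based partitioning).
func partitionFor(key string, partitions int) int {
    return int(crc32.ChecksumIEEE([]byte(key)) % uint32(partitions))
}

func main() {
    const partitions = 3 // hypothetical partition count for the `logs` topic
    for _, pod := range []string{"random-logger-7d9c", "random-logger-7d9c", "other-pod-1a2b"} {
        // The same pod name always yields the same partition.
        fmt.Printf("%s -> partition %d\n", pod, partitionFor(pod, partitions))
    }
}
```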

Now we are ready to deploy FluentBit:

```shell
$ helm install fluent-bit -n fluent-bit --create-namespace fluent/fluent-bit \
  --set-file config.inputs=input.conf \
  --set-file config.filters=filter.conf \
  --set-file config.outputs=output.conf
```

This deploys FluentBit as a DaemonSet, so one FluentBit pod runs on each node:

```shell
$ kubectl get ds -n fluent-bit

NAME         DESIRED   CURRENT   READY   UP-TO-DATE   AVAILABLE   NODE SELECTOR   AGE
fluent-bit   1         1         1       1            1           <none>          19h
```

Check that the Kafka output plugin has started with your broker and topic:

```shell
$ kubectl logs -f ds/fluent-bit -n fluent-bit

…
…
[2022/12/10 11:49:01] [ info] [output:kafka:kafka.0] brokers='<broker-url>' topics='logs'
…
…
```

### Create a pod whose logs will be exported

Remember that we configured the FluentBit input to observe logs from the `upstashed` namespace only. Let’s create a pod in this
namespace that produces random logs:

```shell
$ kubectl create ns upstashed
$ kubectl create deployment random-logger -n upstashed --image=chentex/random-logger:v1.0.1 -- /entrypoint.sh 7500 7500 1000
```

This deployment produces random logs and prints them to stdout. The arguments are `minLogInterval`, `maxLogInterval` (both in
milliseconds) and `numberOfLogs`, respectively. So the above deployment will produce a log line every 7.5 seconds, 1000 lines in total:

```shell
$ kubectl logs -f deployment/random-logger -n upstashed

2022-12-10T15:09:30+0000 ERROR An error is usually an exception that has been caught and not handled.
2022-12-10T15:09:37+0000 INFO This is less important than debug log and is often used to provide context in the current task.
2022-12-10T15:09:45+0000 DEBUG This is a debug log that shows a log that can be ignored.
2022-12-10T15:09:52+0000 WARN A warning that should be ignored is usually at this level and should be actionable.
```

Now navigate back to the Upstash console to check that the pod logs arrive:

![logs-gif](https://user-images.githubusercontent.com/44522401/206859094-2428d35d-4506-4118-824b-bce8fbfe1156.gif)

It’s that easy!

## Consume Logs from Upstash Kafka

In the above setup, Upstash Kafka acts as a buffer for the collected logs. We can then configure a sink that processes or
persists these messages. Meanwhile, suppose we also want to give some clients access to the real-time logs, but not to
the entire message. For instance, we do not want clients to see the `k8s_container_image` field
of the Kafka messages.

### Create HTTP Server

After adding the Go `Consumer` snippet provided by the Upstash console under the `Details` tab, start an HTTP server that handles
requests at the `/logs` endpoint and streams messages to clients:

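```go
package main

import (
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "strings"
)

// getKafkaDialer and getKafkaReader are assumed to come from the Go
// `Consumer` snippet in the Upstash console; they are not repeated here.

func main() {
    dialer := getKafkaDialer()
    http.HandleFunc("/logs", func(w http.ResponseWriter, r *http.Request) {
        // Streaming requires a ResponseWriter that can be flushed.
        flusher, ok := w.(http.Flusher)
        if !ok {
            http.Error(w, "streaming unsupported", http.StatusInternalServerError)
            return
        }

        reader := getKafkaReader(dialer)
        defer reader.Close()

        // Some errors are ignored for brevity. Note that this is allowed only
        // in blogpost code snippets! Never ignore errors in Go!

        // net/http switches to chunked transfer encoding automatically
        // when no Content-Length is set and the response is flushed.
        w.Header().Set("Content-Type", "text/plain; charset=utf-8")
        w.Write([]byte("------------ Streaming Logs ------------\n"))
        flusher.Flush()

        for {
            // Blocks until a message arrives; returns an error once the
            // client disconnects and r.Context() is canceled.
            message, err := reader.ReadMessage(r.Context())
            if err != nil {
                // should have been handled and responded properly
                // for context.Canceled and other errors.
                return
            }
            // include only the `log` field from the message value
            var resp struct {
                Log string `json:"log"`
            }
            if err := json.Unmarshal(message.Value, &resp); err != nil {
                continue // skip values that are not valid JSON
            }
            // "<pod-name>:\t<log-line>\n", one line per message
            fmt.Fprintf(w, "%s:\t%s\n", message.Key, strings.TrimRight(resp.Log, "\n"))
            flusher.Flush()
        }
    })
    log.Fatal(http.ListenAndServe(":8080", nil))
}
```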

This is a minimal HTTP server with a single endpoint, `/logs`. The handler reads from the `logs` topic on Upstash Kafka and
sends each log line to the client, prefixed with the pod name received as the message key.

Let's compare the output of `kubectl logs` with the logs streamed from Upstash!

![final-gif](https://user-images.githubusercontent.com/44522401/206859124-3a598538-f588-4a2a-9581-aa2007dcebff.gif)

The command running in the upper terminal tab is

```shell
kubectl logs -f deployment/random-logger -n upstashed
```

while in the lower tab it is

```shell
curl localhost:8080/logs
```

The upper tab retrieves logs directly from my local Kubernetes cluster, while the lower tab receives them from Upstash Kafka running
in `eu-west`. The two streams are almost in sync. Upstash rocks, doesn't it?

## Conclusion

Logs usually hold the first clues when a deployment hits an error or behaves unexpectedly, which makes them the first
place developers look during diagnosis. So rather than being deleted and forgotten, logs should be retained and consumed
properly.

In this article, we exported the logs of particular Kubernetes pods to an Upstash Kafka topic, from which other consumers can read,
process, and act on them. Moreover, after applying a simple filter, we exposed these real-time logs to HTTP clients, which need
access to neither the Kubernetes cluster nor the Kafka topics.

Now that Kubernetes pod logs are buffered in Upstash Kafka, they can also be consumed by other
backends, storage drivers, dashboards, observability tools, and the like.