As a tutorial for this integration, we’ll implement a real-time analytics system. We’ll stream traffic (click) events from our web application to Upstash Kafka. The goal is to be able to answer a simple query like this one:

SELECT city, count() FROM kafka_topic_page_views WHERE timestamp > now() - INTERVAL 15 MINUTE GROUP BY city

Namely, we will query the number of page views from different cities in the last 15 minutes. We keep the query and scenario intentionally simple to make the series easy to understand, but you can extend the model to cover more complex real-time analytics scenarios.
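To make the query's semantics concrete, here is a minimal in-memory sketch of the same filter-and-group logic in TypeScript. It is only an illustration, not part of the pipeline: the PageView shape and the countViewsByCity name are hypothetical, and we assume each event carries a timestamp in milliseconds (for example, the Kafka record timestamp).

```typescript
// Hypothetical event shape; only `city` and `timestamp` matter for this query.
interface PageView {
  city: string;
  timestamp: number; // milliseconds since the Unix epoch (assumed)
}

const WINDOW_MS = 15 * 60 * 1000; // INTERVAL 15 MINUTE

// Counts page views per city for events newer than (now - 15 minutes),
// mirroring the WHERE and GROUP BY clauses of the SQL query.
function countViewsByCity(events: PageView[], now: number): Map<string, number> {
  const counts = new Map<string, number>();
  for (const event of events) {
    if (event.timestamp > now - WINDOW_MS) {
      counts.set(event.city, (counts.get(event.city) ?? 0) + 1);
    }
  }
  return counts;
}
```

In a real deployment this aggregation would be done by the analytics engine reading from the Kafka topic, not by application code.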

We’ll use Cloudflare Workers to intercept incoming requests to the website and run a serverless function.

Kafka Setup

Create an Upstash Kafka cluster and a topic as explained here.

Project Setup

We will use the C3 (create-cloudflare-cli) command-line tool to create our application. Open a new terminal window and run C3 using the command below.

npm create cloudflare@latest

This will install the create-cloudflare package and lead you through the setup. C3 also installs Wrangler in projects by default, which helps us test and deploy the application.

➜  npm create cloudflare@latest
Need to install the following packages:
  create-cloudflare@2.1.0
Ok to proceed? (y) y

using create-cloudflare version 2.1.0

╭ Create an application with Cloudflare Step 1 of 3
│
├ In which directory do you want to create your application?
│ dir ./cloudflare_starter
│
├ What type of application do you want to create?
│ type "Hello World" Worker
│
├ Do you want to use TypeScript?
│ yes typescript
│
├ Copying files from "hello-world" template
│
├ Retrieving current workerd compatibility date
│ compatibility date 2023-08-07
│
├ Do you want to use git for version control?
│ yes git
│
╰ Application created

We will also install the Upstash Kafka SDK to connect to Kafka.

npm install @upstash/kafka

The Code

You can update the src/index.ts file with the code below:

import { Kafka } from "@upstash/kafka";

export interface Env {
  UPSTASH_KAFKA_REST_URL: string;
  UPSTASH_KAFKA_REST_USERNAME: string;
  UPSTASH_KAFKA_REST_PASSWORD: string;
}

export default {
  async fetch(
    request: Request,
    env: Env,
    ctx: ExecutionContext
  ): Promise<Response> {
    if (new URL(request.url).pathname === "/favicon.ico") {
      return new Response(null, { status: 200 });
    }

    const message = {
      country: request.cf?.country,
      city: request.cf?.city,
      region: request.cf?.region,
      url: request.url,
      ip: request.headers.get("x-real-ip"),
      mobile: request.headers.get("sec-ch-ua-mobile"),
      platform: request.headers.get("sec-ch-ua-platform"),
      useragent: request.headers.get("user-agent"),
    };

    const kafka = new Kafka({
      url: env.UPSTASH_KAFKA_REST_URL,
      username: env.UPSTASH_KAFKA_REST_USERNAME,
      password: env.UPSTASH_KAFKA_REST_PASSWORD,
    });

    const p = kafka.producer();
    // Please update the topic according to your configuration
    const topic = "mytopic";

    ctx.waitUntil(p.produce(topic, JSON.stringify(message)));
    // if you use CF Workers to intercept your existing site, uncomment below
    // return await fetch(request);
    return new Response("My website");
  },
};

Above, we simply parse the request object and send the useful information to Upstash Kafka. You may add/remove information depending on your own requirements.
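As one way to adjust the payload, the sketch below moves the header parsing into a small helper and adds two fields that are not in the code above: referer and timestamp. The names buildPageViewEvent, PageViewEvent and HeaderReader are hypothetical and used only for illustration. The helper accepts anything with a get method, so the Worker's request.headers should fit.

```typescript
// Minimal structural type: satisfied by the Workers `Headers` object
// or by any test double exposing `get`.
interface HeaderReader {
  get(name: string): string | null;
}

// Hypothetical event shape: the tutorial's header fields plus two additions.
interface PageViewEvent {
  url: string;
  useragent: string | null;
  mobile: string | null;
  platform: string | null;
  referer: string | null; // added: where the visitor came from
  timestamp: number; // added: event time in ms since the Unix epoch
}

function buildPageViewEvent(
  url: string,
  headers: HeaderReader,
  now: number = Date.now()
): PageViewEvent {
  return {
    url,
    useragent: headers.get("user-agent"),
    mobile: headers.get("sec-ch-ua-mobile"),
    platform: headers.get("sec-ch-ua-platform"),
    referer: headers.get("referer"),
    timestamp: now,
  };
}
```

Inside the fetch handler, the result of buildPageViewEvent(request.url, request.headers) could be merged with the request.cf fields before producing to Kafka.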

Configure Credentials

There are two ways to set up the credentials for the Upstash Kafka client. The recommended way is to use the Cloudflare Upstash Integration. Alternatively, you can add the credentials manually.

Using the Cloudflare Integration

Open the Cloudflare Dashboard and log in with the same account that you used to set up the Worker application. Then navigate to the Workers & Pages > Overview section in the sidebar. Here, you’ll find your application listed.

Clicking on the application takes you to the application details page, where you can set up the integration. Switch to the Settings tab and go to the Integrations section, where the available Worker integrations are listed. Click the Add Integration button next to Upstash Kafka.

On the Integration page, connect to your Upstash account. Then select the related cluster from the dropdown menu. Finish by pressing the Add Integration button.

Setting up Manually

Navigate to the Upstash Console and copy your UPSTASH_KAFKA_REST_URL, UPSTASH_KAFKA_REST_USERNAME and UPSTASH_KAFKA_REST_PASSWORD credentials into your wrangler.toml as shown below.

[vars]
UPSTASH_KAFKA_REST_URL="REPLACE_HERE"
UPSTASH_KAFKA_REST_USERNAME="REPLACE_HERE"
UPSTASH_KAFKA_REST_PASSWORD="REPLACE_HERE"
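A forgotten REPLACE_HERE placeholder is easy to miss until the first produce call fails. The sketch below is one possible guard, assuming you want to check the three variables before creating the Kafka client. The missingKafkaCredentials helper and the KafkaEnv type are hypothetical and not part of the Upstash SDK.

```typescript
// Same variable names as the Env interface, but optional so that
// missing values can be detected at runtime.
interface KafkaEnv {
  UPSTASH_KAFKA_REST_URL?: string;
  UPSTASH_KAFKA_REST_USERNAME?: string;
  UPSTASH_KAFKA_REST_PASSWORD?: string;
}

const REQUIRED_KEYS = [
  "UPSTASH_KAFKA_REST_URL",
  "UPSTASH_KAFKA_REST_USERNAME",
  "UPSTASH_KAFKA_REST_PASSWORD",
] as const;

// Returns the names of variables that are unset, empty,
// or still hold the "REPLACE_HERE" placeholder.
function missingKafkaCredentials(env: KafkaEnv): string[] {
  return REQUIRED_KEYS.filter((key) => {
    const value = env[key];
    return !value || value === "REPLACE_HERE";
  });
}
```

In the fetch handler, a non-empty result could be turned into a 500 response that names the missing variables, without exposing any credential values.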

Test and Deploy

You can test the function locally with npx wrangler dev.

Deploy your function to Cloudflare with npx wrangler deploy.

Once the deployment is done, the endpoint of the function will be provided to you.

You can check whether the events are arriving in Kafka by copying the curl command from the Upstash Console and running it:

curl https://<UPSTASH_KAFKA_REST_URL>/consume/GROUP_NAME/GROUP_INSTANCE_NAME/TOPIC \
  -H "Kafka-Auto-Offset-Reset: earliest" -u \
  REPLACE_HERE
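If you would rather run this check from TypeScript, the sketch below builds the same request as the curl command: the /consume/GROUP_NAME/GROUP_INSTANCE_NAME/TOPIC path, the Kafka-Auto-Offset-Reset header, and HTTP Basic authentication. The buildConsumeRequest helper is hypothetical. We assume restUrl already includes the https:// scheme and that the credentials contain only ASCII characters.

```typescript
interface ConsumeRequest {
  url: string;
  headers: Record<string, string>;
}

// Builds the URL and headers for the REST consume call shown in the curl command.
function buildConsumeRequest(
  restUrl: string, // assumed to include the scheme, e.g. "https://..."
  username: string,
  password: string,
  group: string,
  instance: string,
  topic: string
): ConsumeRequest {
  const base = restUrl.replace(/\/+$/, ""); // drop trailing slashes
  const path = [group, instance, topic]
    .map((part) => encodeURIComponent(part))
    .join("/");
  return {
    url: `${base}/consume/${path}`,
    headers: {
      // Equivalent of curl's `-u username:password`
      Authorization: `Basic ${btoa(`${username}:${password}`)}`,
      "Kafka-Auto-Offset-Reset": "earliest",
    },
  };
}
```

The result can be passed straight to fetch, for example fetch(req.url, { headers: req.headers }), and the response body read with res.json().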