In this tutorial series, we will show how to build an end-to-end real-time analytics system. We will stream the traffic (click) events from our web application to Upstash Kafka and then analyze them in real time. We will implement one simple query with different stream processing tools:

SELECT city, COUNT(*) FROM kafka_topic_page_views WHERE timestamp > now() - INTERVAL 15 MINUTE GROUP BY city

Namely, we will query the number of page views from different cities in the last 15 minutes. We keep the query and scenario intentionally simple to make the series easy to understand, but you can easily extend the model for more complex real-time analytics scenarios.
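
To make the data model concrete, a click event produced by the web application looks roughly like this (the field values below are illustrative; the exact schema comes from the pipeline built in the first part of the series):

{
  "country": "TR",
  "city": "Istanbul",
  "region": "34",
  "url": "https://example.com/",
  "ip": "127.0.0.1",
  "mobile": "false",
  "platform": "desktop",
  "useragent": "Mozilla/5.0"
}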

If you have not already set up the Kafka pipeline, see the first part of the series, where we set up the pipeline including Upstash Kafka and Cloudflare Workers (or Vercel).

In this part of the series, we will showcase how to use ksqlDB to run a query on a Kafka topic.

ksqlDB Setup

Upstash does not offer a managed ksqlDB, so in this article we will set up ksqlDB ourselves using Docker Compose.

Create a docker-compose.yml file as below:

version: "2"

services:
  ksqldb-server:
    image: confluentinc/ksqldb-server:0.28.2
    hostname: ksqldb-server
    container_name: ksqldb-server
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_BOOTSTRAP_SERVERS: "REPLACE_YOUR_ENDPOINT"
      KSQL_SASL_MECHANISM: "SCRAM-SHA-256"
      KSQL_SECURITY_PROTOCOL: "SASL_SSL"
      KSQL_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="REPLACE_YOUR_USERNAME" password="REPLACE_YOUR_PASSWORD";'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.28.2
    container_name: ksqldb-cli
    depends_on:
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

Don’t forget to replace the endpoint, username, and password placeholders above with the values from your Upstash Kafka cluster. Now start ksqlDB by running:

docker-compose up
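
Once the containers are up, you can verify that the server is reachable through its REST API (assuming the default port mapping above):

curl http://localhost:8088/info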

Check your Kafka cluster in the Upstash console; you will see new topics automatically created by ksqlDB.

Start the ksqlDB CLI

We will use the ksqlDB CLI to create streams and run queries. Start the CLI by running:

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
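
Once you see the ksql> prompt, you can confirm the connection to your Upstash cluster by listing its topics:

SHOW TOPICS;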

Create a stream

You can think of a stream as a schema on top of a Kafka topic. When you query a stream, the query does not return until you terminate it; new updates are continuously pushed to the client. This type of query is called a push query.

Let’s create a stream:

CREATE STREAM pageViews (country VARCHAR, city VARCHAR, region VARCHAR, url VARCHAR, ip VARCHAR, mobile VARCHAR, platform VARCHAR, useragent VARCHAR )
WITH (kafka_topic='mytopic', value_format='json', partitions=1);

You need to set kafka_topic to the same topic which you created in the first part of the series.
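
If there is no live traffic flowing yet, you can produce a test event directly from the CLI with an INSERT statement (the values below are made up for illustration):

INSERT INTO pageViews (country, city, region, url, ip, mobile, platform, useragent)
  VALUES ('TR', 'Istanbul', '34', 'https://example.com/', '127.0.0.1', 'false', 'desktop', 'Mozilla/5.0');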

Query the stream (push query)

You can query the stream to see new records as they arrive in your Kafka topic:

SELECT * FROM pageViews EMIT CHANGES;

The query will continue to display updates until you terminate it.
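
Push queries also support the usual SQL filters and projections; for example, to watch page views from a single country only (the value 'TR' is just an example):

SELECT city, url FROM pageViews WHERE country = 'TR' EMIT CHANGES;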

Create a table (materialized view)

Now let’s create a table to query the cities with the highest number of page views in the last 10 minutes.

CREATE TABLE topCities AS
  SELECT city, COUNT(*) AS views FROM pageViews
    WINDOW TUMBLING (SIZE 10 MINUTE)
    GROUP BY city
    EMIT CHANGES;

We have used a tumbling window to count the views. Check here to learn about the other windowing options.
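
For instance, a hopping window of the same size that advances every minute would emit overlapping, more frequently updated counts. Here is a minimal sketch (the table name topCitiesHopping is hypothetical):

CREATE TABLE topCitiesHopping AS
  SELECT city, COUNT(*) AS views FROM pageViews
    WINDOW HOPPING (SIZE 10 MINUTES, ADVANCE BY 1 MINUTE)
    GROUP BY city
    EMIT CHANGES;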

Query the table (pull query)

We can simply query the table. This is a pull query: it returns the current result and terminates.

SELECT * FROM topCities;

![ksqldb1.png](/img/ksqldb/ksqldb1.png)

You will see results for the same city but from different window intervals. If you just need the latest interval (the last 10 minutes), then run a query like this:

SELECT * FROM topCities WHERE WINDOWSTART > (UNIX_TIMESTAMP() - (10*60*1000+1));

In this query, we only get the rows whose window started within the last 10 minutes.
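
Since city is the grouping key of the table, you can also look up the count for a single city (the city value here is only an example):

SELECT * FROM topCities WHERE city = 'Istanbul' AND WINDOWSTART > (UNIX_TIMESTAMP() - (10*60*1000+1));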

Resources

Upstash Kafka setup

ksqlDB setup

ksqlDB concepts

Conclusion

We have built a simple data pipeline that collects data from the edge into Kafka and then creates real-time reports using SQL. You can easily extend and adapt this example for much more complex architectures and queries.
