In this tutorial series, we will show how to build an end-to-end real-time analytics system. We will stream the traffic (click) events from our web application to Upstash Kafka, then analyze them in real time. We will implement one simple query with different stream processing tools:

SELECT city, count() FROM page_views WHERE event_time > now() - INTERVAL 15 MINUTE GROUP BY city

Namely, we will query the number of page views from different cities in the last 15 minutes. We keep the query and scenario intentionally simple to make the series easy to understand, but you can easily extend the model for more complex real-time analytics scenarios.
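
To make the data model concrete, here is the rough shape of a page view event as a TypeScript sketch; the field names match the ClickHouse table we will create below:

// Sketch of a single page view event streamed from the web application.
interface PageView {
  country: string;
  city: string;
  region: string;
  url: string;
  ip: string;
  // event_time is not sent by the client; the table's column default
  // (now()) stamps each row at insert time.
}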

If you have not already set up the Kafka pipeline, see the first part of the series, where we set up our pipeline including Upstash Kafka and Cloudflare Workers (or Vercel).

In this part of the series, we will showcase how to use ClickHouse to run a query on a Kafka topic.

ClickHouse Setup

You can create a managed service on ClickHouse Cloud with a 30-day free trial. Select your region and enter a name for your service. For simplicity, you can allow access to the service from anywhere. If you prefer to restrict access by IP address, here is the list of Upstash addresses that need permission:

52.48.149.7
52.213.40.91
174.129.75.41
34.195.190.47
52.58.175.235
18.158.44.120
63.34.151.162
54.247.137.96
3.78.151.126
3.124.80.204
34.236.200.33
44.195.74.73

Create a table

On the ClickHouse service screen, click Open SQL console. Click + to open a new query window and run the following query to create a table:

CREATE TABLE page_views
(
    country String,
    city String,
    region String,
    url String,
    ip String,
    event_time DateTime DEFAULT now()
)
ENGINE = MergeTree -- ClickHouse Cloud applies a default engine if this is omitted; self-managed ClickHouse requires one
ORDER BY (event_time)

Kafka Setup

We will create an Upstash Kafka cluster. Upstash offers a serverless Kafka cluster with per-message pricing. Select the same region as (or the one nearest to) your ClickHouse service for the best performance.

Also create a topic whose messages will be streamed to ClickHouse.

Connector Setup

We will create a connector on the Upstash console. Select your cluster and click the Connectors tab. Select Aiven JDBC Connector - Sink.

Click Next to skip the Config step, as we will enter the configuration manually in the third (Advanced) step.

In the third step, copy and paste the config below into the text editor:

{
  "name": "kafka-clickhouse",
  "properties": {
    "auto.create": false,
    "auto.evolve": false,
    "batch.size": 10,
    "connection.password": "KqVQvD4HWMng",
    "connection.url": "jdbc:clickhouse://a8mo654iq4e.eu-central-1.aws.clickhouse.cloud:8443/default?ssl=true",
    "connection.user": "default",
    "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
    "errors.deadletterqueue.topic.name": "dlqtopic",
    "insert.mode": "insert",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "pk.mode": "none",
    "table.name.format": "page_views",
    "topics": "mytopic",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": true
  }
}

Replace the following attributes:

  • “name”: Name your connector.
  • “connection.password”: Copy this from your ClickHouse dashboard. (Connect > View connection string)
  • “connection.url”: Copy this from your ClickHouse dashboard. (Connect > View connection string)
  • “connection.user”: Copy this from your ClickHouse dashboard. (Connect > View connection string)
  • “errors.deadletterqueue.topic.name”: Give a name for your dead letter topic. It will be created automatically.
  • “topics”: Enter the name of the topic that you created.

Note that the connection.url must include ?ssl=true as a parameter.

Click the Connect button to create the connector.

Test and Run

The JDBC sink connector expects a schema together with the message payload so that it can map the JSON fields to table columns. We need to go back to the setup step (in the first part of the series) and update the message object to include the schema, as below:

const message = {
  schema: {
    type: "struct",
    optional: false,
    version: 1,
    fields: [
      {
        field: "country",
        type: "string",
        optional: false,
      },
      {
        field: "city",
        type: "string",
        optional: false,
      },
      {
        field: "region",
        type: "string",
        optional: false,
      },
      {
        field: "url",
        type: "string",
        optional: false,
      },
      {
        field: "ip",
        type: "string",
        optional: false,
      },
    ],
  },
  payload: {
    country: req.geo?.country,
    city: req.geo?.city,
    region: req.geo?.region,
    url: req.url,
    ip: req.headers.get("x-real-ip"),
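    // Note: the fields below (mobile, platform, useragent) are not declared
    // in the schema above, so the JSON converter drops them before they reach
    // ClickHouse. Add matching schema fields and table columns to keep them.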
    mobile: req.headers.get("sec-ch-ua-mobile"),
    platform: req.headers.get("sec-ch-ua-platform"),
    useragent: req.headers.get("user-agent"),
  },
};
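
For reference, here is a minimal sketch of producing this message with the @upstash/kafka client; the REST URL and environment variable names are placeholders, so substitute the values from your cluster's details page:

import { Kafka } from "@upstash/kafka";

// Placeholders: copy the REST URL, username, and password from the
// cluster details page in the Upstash console.
const kafka = new Kafka({
  url: "https://<your-cluster>-rest-kafka.upstash.io",
  username: process.env.UPSTASH_KAFKA_REST_USERNAME,
  password: process.env.UPSTASH_KAFKA_REST_PASSWORD,
});

const producer = kafka.producer();

// "mytopic" is the topic configured in the connector above. The client
// JSON-serializes the message object, producing the schema/payload
// envelope the connector expects.
await producer.produce("mytopic", message);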

It is not ideal to send the schema together with the payload, as it inflates every message. A schema registry is a solution to this; Upstash will launch a managed schema registry service soon.

After deploying the changes (Cloudflare Workers or Vercel function), visit your web app to generate traffic to Kafka.

Now, go to the ClickHouse console (Connect > Open SQL console). Click page_views (your table’s name) in the left menu. You should see that the table has been populated, like below:

Also run the following query to get the most popular cities in the last 15 minutes:

SELECT city, count() FROM page_views WHERE event_time > now() - INTERVAL 15 MINUTE GROUP BY city

It should return something like below:
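
If you would rather query these results from your application than from the console, here is a minimal sketch using ClickHouse's HTTP interface (the hostname and password are placeholders; 8443 is the same HTTPS port used in the connection.url above):

const query = `
  SELECT city, count() AS views
  FROM page_views
  WHERE event_time > now() - INTERVAL 15 MINUTE
  GROUP BY city
  FORMAT JSONEachRow
`;

const response = await fetch("https://<your-service>.clickhouse.cloud:8443/", {
  method: "POST",
  headers: {
    "X-ClickHouse-User": "default",
    "X-ClickHouse-Key": "<your-password>", // placeholder
  },
  body: query,
});

// One JSON object per row, e.g. {"city":"...","views":...}
console.log(await response.text());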