ksqlDB is a SQL interface for performing stream processing on Apache Kafka.

Upstash Kafka Setup

Create a Kafka cluster using Upstash Console or Upstash CLI by following Getting Started.

ksqlDB Setup

Upstash does not offer managed ksqlDB. Therefore, set up ksqlDB in a Docker container and replace the UPSTASH_KAFKA_* placeholders with your cluster information.

First, download and install Docker.

Create a docker-compose.yml file as below:

version: "2"

services:
  ksqldb-server:
    image: confluentinc/ksqldb-server:0.28.2
    hostname: ksqldb-server
    container_name: ksqldb-server
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_BOOTSTRAP_SERVERS: "UPSTASH_KAFKA_ENDPOINT"
      KSQL_SASL_MECHANISM: "SCRAM-SHA-256"
      KSQL_SECURITY_PROTOCOL: "SASL_SSL"
      KSQL_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="UPSTASH_KAFKA_USERNAME" password="UPSTASH_KAFKA_PASSWORD";'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.28.2
    container_name: ksqldb-cli
    depends_on:
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

Open a terminal, navigate to the folder that contains the docker-compose.yml file you created, and start ksqlDB by running docker-compose up.

When you check your Kafka cluster in the Upstash Console, you will see new topics created automatically after ksqlDB starts.

Streaming From One Topic to Another Topic

This word count example can be implemented with either the ksqlDB CLI or the ksqlDB Java client. In both cases, the pipeline is built from consecutive streams: receive input > split into an array > convert the array to rows > count occurrences.

Using ksqlDB CLI

Start the ksqlDB CLI by running the following command:

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

Create the first stream, which reads from the “input” topic:

ksql> CREATE STREAM source_stream (sentence VARCHAR) WITH (kafka_topic='input', value_format='json', partitions=1);

Create the second stream, which reads from source_stream, splits each sentence into an array of words, and writes to the split_stream topic.

ksql> CREATE STREAM split_stream AS SELECT regexp_split_to_array(sentence, ' ') as word_array FROM source_stream EMIT CHANGES;

Next, create the third stream, which reads from split_stream created above, converts word_array to rows, and writes to explode_stream.

ksql> CREATE STREAM explode_stream AS SELECT explode(word_array) as words FROM split_stream EMIT CHANGES;

Lastly, create a table, which counts each word's occurrences and writes the results to the “OUTPUT” topic.

ksql> CREATE TABLE output AS SELECT words as word, count(words) as occurrence FROM explode_stream GROUP BY words EMIT CHANGES;
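To watch the counts update live as new sentences arrive, you can run a push query against the table from the ksqlDB CLI (the query keeps streaming results until you cancel it with Ctrl+C):

```sql
-- Push query: streams every change to the OUTPUT table until cancelled
SELECT word, occurrence FROM output EMIT CHANGES;
```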

You can check what you have created so far by running the following commands in the ksqlDB CLI.

ksql> show tables;

Table Name | Kafka Topic | Key Format | Value Format | Windowed
---------------------------------------------------------------
OUTPUT     | OUTPUT      | KAFKA      | JSON         | false
---------------------------------------------------------------

ksql> show streams;

Stream Name         | Kafka Topic                  | Key Format | Value Format | Windowed
------------------------------------------------------------------------------------------
EXPLODE_STREAM      | EXPLODE_STREAM               | KAFKA      | JSON         | false
KSQL_PROCESSING_LOG | default_ksql_processing_log  | KAFKA      | JSON         | false
SOURCE_STREAM       | input                        | KAFKA      | JSON         | false
SPLIT_STREAM        | SPLIT_STREAM                 | KAFKA      | JSON         | false
------------------------------------------------------------------------------------------
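Each CREATE ... AS statement above also starts a persistent query on the ksqlDB server. You can list these queries (and find their IDs, for example to TERMINATE one later) with:

```sql
-- Lists the persistent queries created by the CREATE ... AS statements above
show queries;
```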

Using Java Client

Project Setup

📌 Note: If you already have a project and want to integrate Upstash Kafka and ksqlDB into it, you can skip this section and continue with Add ksqlDB and Kafka into the Project.

Install Maven on your machine by following the Maven Installation Guide.

Run mvn --version in a terminal or command prompt to make sure Maven is installed.

It should print the Maven version you have:

Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: D:\apache-maven-3.6.3\apache-maven\bin\..
Java version: 1.8.0_232, vendor: AdoptOpenJDK, runtime: C:\Program Files\AdoptOpenJDK\jdk-8.0.232.09-hotspot\jre
Default locale: en_US, platform encoding: Cp1250
OS name: "windows 10", version: "10.0", arch: "amd64", family: "windows"

To create the Maven project:

In a terminal or command prompt, go into the folder where you want to create the project by running cd <folder path>

Run the following command:

mvn archetype:generate -DgroupId=com.kafkaksqldbinteg.app -DartifactId=kafkaksqldbinteg-app -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.4 -DinteractiveMode=false

Add ksqlDB and Kafka into the Project

Open the project folder in an IDE with Maven support, such as IntelliJ IDEA, Visual Studio Code, or Eclipse. Add the Confluent repository and the ksqlDB client dependency to the pom.xml file:

<repositories>
  <repository>
    <id>confluent</id>
    <name>confluent-repo</name>
    <url>https://packages.confluent.io/maven/</url>
  </repository>
</repositories>
<dependencies>
  <dependency>
    <groupId>io.confluent.ksql</groupId>
    <artifactId>ksqldb-api-client</artifactId>
    <version>7.3.0</version>
  </dependency>
</dependencies>

Streaming

Import the following packages.

import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.ExecuteStatementResult;

import java.util.concurrent.CompletableFuture;

Create a ksqlDB client first.

String KSQLDB_SERVER_HOST = "localhost";
int KSQLDB_SERVER_HOST_PORT = 8088;
ClientOptions options = ClientOptions.create()
  .setHost(KSQLDB_SERVER_HOST)
  .setPort(KSQLDB_SERVER_HOST_PORT);
Client client = Client.create(options);

Create the first stream, which reads from the “input” topic:

String SOURCE_STREAM = "CREATE STREAM IF NOT EXISTS source_stream (sentence VARCHAR)" +
        " WITH (kafka_topic='input', value_format='json', partitions=1);";
CompletableFuture<ExecuteStatementResult> result =
        client.executeStatement(SOURCE_STREAM);
System.out.println(result);

Create the second stream, which reads from source_stream, splits each sentence into an array of words, and writes to the split_stream topic.

String SPLIT_STREAM = "CREATE STREAM IF NOT EXISTS split_stream " +
  "AS SELECT regexp_split_to_array(sentence, ' ') " +
  "as word_array FROM source_stream EMIT CHANGES;";
CompletableFuture<ExecuteStatementResult> result1 =
  client.executeStatement(SPLIT_STREAM);
System.out.println(result1);

Next, create the third stream, which reads from split_stream created above, converts word_array to rows, and writes to explode_stream.

String EXPLODE_STREAM = "CREATE STREAM IF NOT EXISTS explode_stream " +
  "AS SELECT explode(word_array) " +
  "as words FROM split_stream EMIT CHANGES;";
CompletableFuture<ExecuteStatementResult> result2 =
  client.executeStatement(EXPLODE_STREAM);
System.out.println(result2);

Lastly, create a table, which counts each word's occurrences and writes the results to the “OUTPUT” topic.

String OUTPUT_TABLE = "CREATE TABLE output " +
  "AS SELECT words as word, count(words) " +
  "as occurrence FROM explode_stream GROUP BY words EMIT CHANGES;";
CompletableFuture<ExecuteStatementResult> result3 =
  client.executeStatement(OUTPUT_TABLE);
System.out.println(result3);

Results

The word count pipeline created above takes input sentences in JSON format from the “input” topic and sends the word count results to the “OUTPUT” topic.

You can both send input and observe the output in the Upstash Console.

Send an input sentence to the “input” topic. The key can be any string, but since we defined “sentence” as a field when creating source_stream, the value must be a JSON object that includes “sentence” as a key:

{
  "sentence": "This is an example sentence"
}
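Alternatively, the same input can be produced from the ksqlDB CLI by inserting a row directly into source_stream; ksqlDB then writes it as a JSON message to the underlying “input” topic:

```sql
-- Writes one row to source_stream, i.e. one JSON message to the "input" topic
INSERT INTO source_stream (sentence) VALUES ('This is an example sentence');
```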

Once you send this message to the “input” topic, you can observe the results in the “OUTPUT” topic as follows:

Timestamp               Key         Value
2022-12-06 23:39:56     This        {"OCCURRENCE":1}
2022-12-06 23:39:56     is          {"OCCURRENCE":1}
2022-12-06 23:39:56     an          {"OCCURRENCE":1}
2022-12-06 23:39:56     example     {"OCCURRENCE":1}
2022-12-06 23:39:56     sentence    {"OCCURRENCE":1}
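The same records can also be inspected from the ksqlDB CLI by printing the underlying topic:

```sql
-- Prints the raw records of the OUTPUT topic, starting from the earliest offset
PRINT 'OUTPUT' FROM BEGINNING;
```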
