ksqlDB
This tutorial shows how to integrate Upstash Kafka with ksqlDB.
ksqlDB is a SQL interface for performing stream processing on top of Kafka.
Upstash Kafka Setup
Create a Kafka cluster using the Upstash Console or the Upstash CLI by following the Getting Started guide.
ksqlDB Setup
Upstash does not offer a managed ksqlDB, so you will run ksqlDB in a Docker container. Replace the UPSTASH_KAFKA_* placeholders below with your cluster's information.
First, download and install Docker.
Create a docker-compose.yml file as below:
version: "2"
services:
  ksqldb-server:
    image: confluentinc/ksqldb-server:0.28.2
    hostname: ksqldb-server
    container_name: ksqldb-server
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_BOOTSTRAP_SERVERS: "UPSTASH_KAFKA_ENDPOINT"
      KSQL_SASL_MECHANISM: "SCRAM-SHA-256"
      KSQL_SECURITY_PROTOCOL: "SASL_SSL"
      KSQL_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="UPSTASH_KAFKA_USERNAME" password="UPSTASH_KAFKA_PASSWORD";'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.28.2
    container_name: ksqldb-cli
    depends_on:
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
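The KSQL_SASL_JAAS_CONFIG entry needs the most care: the username and password must be double-quoted inside the single-quoted YAML value, and the entry must end with a semicolon. If you generate this file from code, the following is a minimal Java sketch of assembling that value from your credentials. The class and method names are hypothetical, and the backslash escaping assumes Kafka's JAAS parser accepts escaped quotes inside quoted values; typical Upstash credentials should not need it.

```java
// Hypothetical helper: builds the value for KSQL_SASL_JAAS_CONFIG.
// The credentials passed in main are placeholders, not real values.
class JaasConfigSketch {

    // Wraps a value in double quotes, escaping backslashes and quotes
    // (assumes the JAAS parser understands these escapes).
    static String quote(String value) {
        return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    static String scramJaasConfig(String username, String password) {
        // The trailing semicolon is part of the JAAS configuration syntax.
        return "org.apache.kafka.common.security.scram.ScramLoginModule required"
                + " username=" + quote(username)
                + " password=" + quote(password) + ";";
    }

    public static void main(String[] args) {
        System.out.println(scramJaasConfig("UPSTASH_KAFKA_USERNAME", "UPSTASH_KAFKA_PASSWORD"));
    }
}
```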
Open a terminal, navigate to the folder that contains the docker-compose.yml file you created, and start ksqlDB by running docker-compose up.
After ksqlDB starts, you will see the new topics it created when you check your Kafka cluster in the Upstash Console.
Streaming From One Topic to Another Topic
You can implement the word count example with either the ksqlDB CLI or the Java client. Both approaches chain consecutive streams together, and the process consists of these operations:
receive input > split into array > convert to rows > count occurrences
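To make the data flow concrete, here is a minimal in-memory sketch of the same four operations in plain Java, with no Kafka or ksqlDB involved. The class and method names are hypothetical. split(" ", -1) only approximates regexp_split_to_array(sentence, ' '), and the two may differ for leading, trailing, or repeated spaces.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory illustration of the word count pipeline; not ksqlDB code.
class WordCountPipelineSketch {

    // "split into array": stands in for regexp_split_to_array(sentence, ' ').
    static String[] splitToArray(String sentence) {
        // A limit of -1 keeps empty strings produced by repeated or trailing spaces.
        return sentence.split(" ", -1);
    }

    // "convert to rows": stands in for explode(word_array), one row per element.
    static List<String> explode(List<String[]> arrays) {
        List<String> rows = new ArrayList<>();
        for (String[] array : arrays) {
            rows.addAll(Arrays.asList(array));
        }
        return rows;
    }

    // "count occurrences": stands in for count(words) ... GROUP BY words.
    static Map<String, Long> count(List<String> words) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String word : words) {
            counts.merge(word, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // "receive input": in ksqlDB these sentences arrive on the "input" topic.
        List<String> sentences = Arrays.asList("This is an example sentence", "This is another one");
        List<String[]> arrays = new ArrayList<>();
        for (String sentence : sentences) {
            arrays.add(splitToArray(sentence));
        }
        System.out.println(count(explode(arrays)));
    }
}
```

Each method corresponds to one stream or table in the ksqlDB version, which is why the ksqlDB pipeline is built as a chain: every stage only needs the output of the stage before it.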
Using ksqlDB CLI
Start the ksqlDB CLI by running the following command:
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
Create the first stream, which reads from the “input” topic:
ksql> CREATE STREAM source_stream (sentence VARCHAR) WITH (kafka_topic='input', value_format='json', partitions=1);
Create the second stream, which reads from source_stream, splits the string into an array, and writes to the SPLIT_STREAM topic.
ksql> CREATE STREAM split_stream AS SELECT regexp_split_to_array(sentence, ' ') as word_array FROM source_stream EMIT CHANGES;
Next, create the third stream, which reads from split_stream created above, converts word_array to rows, and writes to explode_stream.
ksql> CREATE STREAM explode_stream AS SELECT explode(word_array) as words FROM split_stream EMIT CHANGES;
Lastly, create a table that counts the occurrences of each word and writes them to the “OUTPUT” topic.
ksql> CREATE TABLE output AS SELECT words as word, count(words) as occurrence FROM explode_stream GROUP BY words EMIT CHANGES;
You can check what you have created so far by running the following commands in the ksqlDB CLI.
ksql> show tables;
Table Name | Kafka Topic | Key Format | Value Format | Windowed
--------------------------------------------------------------------
OUTPUT | OUTPUT | KAFKA | JSON | false
--------------------------------------------------------------------
ksql> show streams;
Stream Name | Kafka Topic | Key Format | Value Format | Windowed
------------------------------------------------------------------------------------------
EXPLODE_STREAM | EXPLODE_STREAM | KAFKA | JSON | false
KSQL_PROCESSING_LOG | default_ksql_processing_log | KAFKA | JSON | false
SOURCE_STREAM | input | KAFKA | JSON | false
SPLIT_STREAM | SPLIT_STREAM | KAFKA | JSON | false
------------------------------------------------------------------------------------------
Using Java Client
Project Setup
Note: If you already have a project and want to add the Upstash Kafka and ksqlDB integration to it, you can skip this section and continue with Add ksqlDB and Kafka into the Project.
Install Maven on your machine by following the Maven Installation Guide.
Run mvn --version in a terminal or a command prompt to make sure Maven is installed. It should print the version of Maven you have, for example:
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: D:\apache-maven-3.6.3\apache-maven\bin\..
Java version: 1.8.0_232, vendor: AdoptOpenJDK, runtime: C:\Program Files\AdoptOpenJDK\jdk-8.0.232.09-hotspot\jre
Default locale: en_US, platform encoding: Cp1250
OS name: "windows 10", version: "10.0", arch: "amd64", family: "windows"
To create the Maven project:
In your terminal or command prompt, go to the folder where you want to create the project by running cd <folder path>.
Run the following command:
mvn archetype:generate -DgroupId=com.kafkaksqldbinteg.app -DartifactId=kafkaksqldbinteg-app -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.4 -DinteractiveMode=false
Add ksqlDB and Kafka into the Project
Open the project folder in an IDE with Maven support, such as IntelliJ, Visual Studio Code, or Eclipse. Add ksqlDB to the pom.xml file.
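<!-- Place these inside the <project> element. The quickstart archetype already
     has a <dependencies> section: add the dependency there instead of creating a second one. -->
<repositories>
    <repository>
        <id>confluent</id>
        <name>confluent-repo</name>
        <!-- HTTPS is required: recent Maven versions block plain-HTTP repositories. -->
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>
<dependencies>
    <dependency>
        <groupId>io.confluent.ksql</groupId>
        <artifactId>ksqldb-api-client</artifactId>
        <version>7.3.0</version>
    </dependency>
</dependencies>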
Streaming
Import the following classes.
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.ExecuteStatementResult;
import java.util.concurrent.CompletableFuture;
Create a ksqlDB client first.
String KSQLDB_SERVER_HOST = "localhost";
int KSQLDB_SERVER_HOST_PORT = 8088;
ClientOptions options = ClientOptions.create()
.setHost(KSQLDB_SERVER_HOST)
.setPort(KSQLDB_SERVER_HOST_PORT);
Client client = Client.create(options);
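Connecting to localhost:8088 works here because docker-compose.yml publishes port 8088 on the host. If the server runs somewhere else, one option is to read the address from the environment. The following is a minimal JDK-only sketch; the class and the variable names KSQLDB_HOST and KSQLDB_PORT are hypothetical, and ksqlDB itself does not read them. The resolved values are what you would pass to ClientOptions.setHost and setPort.

```java
import java.util.Map;

// Hypothetical helper: resolves the ksqlDB server address, falling back
// to the values used in this tutorial.
class KsqlDbConnectionSettings {
    final String host;
    final int port;

    KsqlDbConnectionSettings(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // KSQLDB_HOST and KSQLDB_PORT are assumed names chosen for this sketch.
    static KsqlDbConnectionSettings from(Map<String, String> env) {
        String host = env.getOrDefault("KSQLDB_HOST", "localhost");
        int port = Integer.parseInt(env.getOrDefault("KSQLDB_PORT", "8088"));
        return new KsqlDbConnectionSettings(host, port);
    }

    public static void main(String[] args) {
        KsqlDbConnectionSettings settings = from(System.getenv());
        // These values would be passed to ClientOptions.setHost(...) and setPort(...).
        System.out.println(settings.host + ":" + settings.port);
    }
}
```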
Create the first stream, which reads from the “input” topic:
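String SOURCE_STREAM = "CREATE STREAM IF NOT EXISTS source_stream (sentence VARCHAR)" +
        " WITH (kafka_topic='input', value_format='json', partitions=1);";
CompletableFuture<ExecuteStatementResult> result =
        client.executeStatement(SOURCE_STREAM);
// executeStatement is asynchronous: get() waits for the server's response
// and throws InterruptedException or ExecutionException on failure.
System.out.println(result.get().queryId());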
Create the second stream, which reads from source_stream, splits the string into an array, and writes to the SPLIT_STREAM topic.
String SPLIT_STREAM = "CREATE STREAM IF NOT EXISTS split_stream " +
        "AS SELECT regexp_split_to_array(sentence, ' ') " +
        "as word_array FROM source_stream EMIT CHANGES;";
CompletableFuture<ExecuteStatementResult> result1 =
        client.executeStatement(SPLIT_STREAM);
// Wait for completion before creating streams that depend on this one.
System.out.println(result1.get().queryId());
Next, create the third stream, which reads from split_stream created above, converts word_array to rows, and writes to explode_stream.
String EXPLODE_STREAM = "CREATE STREAM IF NOT EXISTS explode_stream " +
        "AS SELECT explode(word_array) " +
        "as words FROM split_stream EMIT CHANGES;";
CompletableFuture<ExecuteStatementResult> result2 =
        client.executeStatement(EXPLODE_STREAM);
System.out.println(result2.get().queryId());
Lastly, create a table that counts the occurrences of each word and writes them to the “OUTPUT” topic.
String OUTPUT_TABLE = "CREATE TABLE IF NOT EXISTS output " +
        "AS SELECT words as word, count(words) " +
        "as occurrence FROM explode_stream GROUP BY words EMIT CHANGES;";
CompletableFuture<ExecuteStatementResult> result3 =
        client.executeStatement(OUTPUT_TABLE);
System.out.println(result3.get().queryId());
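Because executeStatement returns a CompletableFuture and each stream depends on the one created before it, the statements must run one after another. One non-blocking way to guarantee that order is to chain the futures with thenCompose. The following is a minimal sketch; the StatementRunner interface is hypothetical and stands in for client.executeStatement so the example runs without a ksqlDB server.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class SequentialStatementsSketch {

    // Hypothetical stand-in for client.executeStatement(sql).
    interface StatementRunner {
        CompletableFuture<String> execute(String sql);
    }

    // Starts each statement only after the previous one has completed successfully.
    static CompletableFuture<Void> runInOrder(StatementRunner runner, List<String> statements) {
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (String sql : statements) {
            chain = chain.thenCompose(ignored -> runner.execute(sql)).thenAccept(result -> { });
        }
        return chain;
    }

    public static void main(String[] args) {
        List<String> executed = new ArrayList<>();
        // Fake runner that completes asynchronously and records the execution order.
        StatementRunner fake = sql -> CompletableFuture.supplyAsync(() -> {
            synchronized (executed) {
                executed.add(sql);
            }
            return sql;
        });
        List<String> statements = Arrays.asList("SOURCE_STREAM", "SPLIT_STREAM", "EXPLODE_STREAM", "OUTPUT_TABLE");
        runInOrder(fake, statements).join();
        System.out.println(executed);
    }
}
```

If any statement fails, the returned future completes exceptionally and the remaining statements are skipped. That is usually the desired behavior when later streams depend on earlier ones.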
Results
The word count pipeline created above takes input sentences in JSON format from the “input” topic and sends the word counts to the “OUTPUT” topic.
You can both send the input and observe the output in the Upstash Console.
Send an input sentence to the “input” topic. The key can be any random string, but because we defined “sentence” as a field when creating source_stream, the value must be a JSON object that includes “sentence” as a key:
{
  "sentence": "This is an example sentence"
}
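The value has to be valid JSON, so characters such as double quotes or backslashes inside a sentence must be escaped. If you produce these messages from code, the following is a minimal sketch of building the value by hand. The helper names are hypothetical, and in a real project a JSON library such as Jackson would normally handle the escaping.

```java
// Hypothetical helper: builds the JSON value that source_stream expects.
class InputMessageSketch {

    // Escapes the characters that JSON does not allow raw inside a string.
    static String escapeJson(String text) {
        StringBuilder out = new StringBuilder();
        for (char c : text.toCharArray()) {
            switch (c) {
                case '"': out.append("\\\""); break;
                case '\\': out.append("\\\\"); break;
                case '\n': out.append("\\n"); break;
                case '\r': out.append("\\r"); break;
                case '\t': out.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters get a six-character hex escape.
                        out.append(String.format("\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
        return out.toString();
    }

    static String sentenceMessage(String sentence) {
        return "{\"sentence\": \"" + escapeJson(sentence) + "\"}";
    }

    public static void main(String[] args) {
        System.out.println(sentenceMessage("This is an example sentence"));
    }
}
```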
Once you send this message to the “input” topic, you can observe the result in the “OUTPUT” topic as follows:
Timestamp Key Value
2022-12-06 23:39:56 This {"OCCURRENCE":1}
2022-12-06 23:39:56 is {"OCCURRENCE":1}
2022-12-06 23:39:56 an {"OCCURRENCE":1}
2022-12-06 23:39:56 example {"OCCURRENCE":1}
2022-12-06 23:39:56 sentence {"OCCURRENCE":1}
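Because OUTPUT is a table aggregated over all messages (no window), sending another sentence updates the existing keys instead of starting over. For example, a second message containing “This” should produce an OCCURRENCE of 2 for that key. The following is a minimal in-memory model of that running count, assuming each message is split on single spaces. The class is hypothetical, and ksqlDB's record caching may batch intermediate updates, so the exact sequence of records on OUTPUT can differ.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory model of the OUTPUT table: a running count per word.
class RunningWordCountSketch {
    private final Map<String, Long> table = new HashMap<>();

    // Processes one input sentence and returns the updated rows,
    // formatted like the console output: word {"OCCURRENCE":n}
    List<String> accept(String sentence) {
        List<String> updates = new ArrayList<>();
        for (String word : sentence.split(" ", -1)) {
            long occurrence = table.merge(word, 1L, Long::sum);
            updates.add(word + " {\"OCCURRENCE\":" + occurrence + "}");
        }
        return updates;
    }

    long occurrence(String word) {
        return table.getOrDefault(word, 0L);
    }

    public static void main(String[] args) {
        RunningWordCountSketch output = new RunningWordCountSketch();
        output.accept("This is an example sentence").forEach(System.out::println);
        output.accept("This is another sentence").forEach(System.out::println);
    }
}
```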