ksqlDB
This tutorial shows how to integrate Upstash Kafka with ksqlDB.
ksqlDB is a SQL interface for performing stream processing on data stored in Kafka.
Upstash Kafka Setup
Create a Kafka cluster using the Upstash Console or the Upstash CLI by following the Getting Started guide.
ksqlDB Setup
Upstash does not offer a managed ksqlDB, so you need to run ksqlDB yourself. In this tutorial it runs in a Docker container. In the docker-compose.yml file, replace the UPSTASH-KAFKA-* placeholders with your cluster information.
First, download and install Docker.
Create a docker-compose.yml file that runs ksqlDB and connects it to your Upstash Kafka cluster.
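When filling in the placeholders, it helps to know that the ksqlDB server Docker image reads server settings from environment variables named after the property: uppercase, dots replaced by underscores, and a KSQL_ prefix (bootstrap.servers becomes KSQL_BOOTSTRAP_SERVERS). The Java sketch below prints, in that form, the settings a SASL/SCRAM connection to Upstash typically needs. The SASL_SSL protocol, the SCRAM-SHA-256 mechanism, and the UpstashKsqlEnv class and its methods are assumptions for illustration; confirm the mechanism and credentials in the Upstash Console.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

public class UpstashKsqlEnv {

    // Converts a ksqlDB server property name into the environment variable name
    // read by the ksqlDB server Docker image: KSQL_ prefix, uppercase, dots -> underscores.
    static String toEnvVar(String property) {
        return "KSQL_" + property.toUpperCase(Locale.ROOT).replace('.', '_');
    }

    // Collects the connection properties for an Upstash Kafka cluster.
    // Assumption: the cluster accepts SASL_SSL with the SCRAM-SHA-256 mechanism.
    static Map<String, String> upstashProperties(String endpoint, String username, String password) {
        Map<String, String> props = new LinkedHashMap<>(); // keeps insertion order for printing
        props.put("bootstrap.servers", endpoint);
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"" + username + "\" password=\"" + password + "\";");
        return props;
    }

    public static void main(String[] args) {
        // Placeholders: replace with the endpoint (host:port), username, and password from the Upstash Console.
        Map<String, String> props = upstashProperties(
                "UPSTASH-KAFKA-ENDPOINT", "UPSTASH-KAFKA-USERNAME", "UPSTASH-KAFKA-PASSWORD");
        // Prints lines in the "NAME: 'value'" form used under environment: in docker-compose.yml.
        for (Map.Entry<String, String> entry : props.entrySet()) {
            System.out.println(toEnvVar(entry.getKey()) + ": '" + entry.getValue() + "'");
        }
    }
}
```

Single quotes are used around the values because the JAAS line itself contains double quotes.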
Open a terminal, navigate to the folder that contains the docker-compose.yml file you created, and start ksqlDB by running docker-compose up.
When you check your Kafka cluster in the Upstash Console after starting ksqlDB, you will see the new topics that ksqlDB created.
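To confirm from code that the server is up before looking for the new topics, you can call the ksqlDB server's REST endpoint GET /info, which returns basic server information. The sketch below uses only the JDK's java.net.http client (Java 11+). The localhost:8088 address is an assumption based on ksqlDB's default listener port and must match the port your docker-compose.yml publishes; the KsqlHealthCheck class is hypothetical.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

public class KsqlHealthCheck {

    // Returns true if a ksqlDB server answers GET <baseUrl>/info with HTTP 200.
    static boolean isServerUp(String baseUrl) {
        HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(5))
                .build();
        HttpRequest request = HttpRequest.newBuilder(URI.create(baseUrl + "/info"))
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();
        try {
            HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
            return response.statusCode() == 200;
        } catch (IOException e) {
            // Connection refused, timeout, and similar errors mean "not up (yet)".
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: docker-compose.yml publishes the ksqlDB server on localhost:8088.
        System.out.println("ksqlDB up: " + isServerUp("http://localhost:8088"));
    }
}
```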
Streaming From One Topic to Another Topic
You can implement a word count example with either the ksqlDB CLI or the ksqlDB Java client. In both cases, the work is done by a chain of consecutive streams that ends in a table holding the counts. The steps of the process are: receive input > split into array > convert to rows > count occurrences.
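As a mental model, the same four steps can be applied to an in-memory list with Java streams. This is not how ksqlDB runs the query (ksqlDB processes records continuously as they arrive and keeps the counts in a table); it only shows what each step does to the data. Splitting on a single space is an assumption about how sentences are split, and the WordCountPipeline class is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

public class WordCountPipeline {

    // Applies the four steps to a batch of sentences.
    static Map<String, Long> countWords(List<String> sentences) {
        return sentences.stream()                                   // receive input
                .map(sentence -> sentence.split(" "))               // split into array
                .flatMap(Arrays::stream)                            // convert to rows (one per word)
                .collect(Collectors.groupingBy(                     // count occurrences
                        Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // TreeMap keeps the words sorted, so this prints {hello=2, ksqldb=1, world=1}
        System.out.println(countWords(List.of("hello world", "hello ksqldb")));
    }
}
```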
Using ksqlDB CLI
Start the ksqlDB CLI by running the following command:
Create the first stream, which reads from the “input” topic:
Create the second stream, which reads from source_stream, splits the string into an array, and writes to the split_stream topic.
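If this step splits on a single space, two consecutive spaces produce an empty element, which the later steps would then count as a word. The Java sketch below contrasts splitting on one space with splitting on runs of whitespace. Whether the ksqlDB statement splits on ' ' is an assumption, ksqlDB's own split functions may treat edge cases slightly differently from Java's String.split, and the SplitSketch class is hypothetical.

```java
import java.util.Arrays;

public class SplitSketch {

    // Splits on single spaces and keeps empty elements
    // (limit -1 also keeps trailing empty strings).
    static String[] splitOnSpace(String sentence) {
        return sentence.split(" ", -1);
    }

    // Splits on runs of whitespace after trimming both ends,
    // so repeated or trailing spaces never yield empty "words".
    static String[] splitOnWhitespace(String sentence) {
        String trimmed = sentence.trim();
        return trimmed.isEmpty() ? new String[0] : trimmed.split("\\s+");
    }

    public static void main(String[] args) {
        String sentence = "hello  world"; // note the two spaces
        System.out.println(Arrays.toString(splitOnSpace(sentence)));      // [hello, , world]
        System.out.println(Arrays.toString(splitOnWhitespace(sentence))); // [hello, world]
    }
}
```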
Next, create the third stream, which reads from the split_stream created above, converts word_array into rows (one row per word), and writes to explode_stream.
Lastly, create a table that counts the occurrences of each word and writes the counts to the “OUTPUT” topic.
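A table does not write one final summary. Each incoming word updates that word's count, and the updated count is emitted for that word, so the “OUTPUT” topic holds a changelog in which later records for a word supersede earlier ones. Depending on ksqlDB's cache settings, several updates for the same word may be merged before they reach the topic. The plain-Java sketch below only imitates this behavior; the WordCountTable class and the "word -> count" record format are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WordCountTable {

    // Current count per word, comparable to the state the ksqlDB table keeps.
    private final Map<String, Long> counts = new HashMap<>();

    // Every update emitted so far, comparable to the records written to the output topic.
    private final List<String> changelog = new ArrayList<>();

    // Processes one row (one word) and emits the new count for that word.
    void onWord(String word) {
        long updated = counts.merge(word, 1L, Long::sum);
        changelog.add(word + " -> " + updated);
    }

    List<String> changelog() {
        return changelog;
    }

    public static void main(String[] args) {
        WordCountTable table = new WordCountTable();
        for (String word : "to be or not to be".split(" ")) {
            table.onWord(word);
        }
        // One update per word: the later "to -> 2" and "be -> 2" supersede the earlier counts of 1.
        table.changelog().forEach(System.out::println);
    }
}
```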
You can check what you have created so far by running the following commands in the ksqlDB CLI.
Using Java Client
Project Setup
Note: If you already have a project and want to add the Upstash Kafka and ksqlDB integration to it, you can skip this section and continue with Add ksqlDB and Kafka into the Project.
Install Maven on your machine by following the Maven Installation Guide.
Run mvn --version in a terminal or command prompt to make sure Maven is installed.
It should print the version of Maven you have:
To create the Maven project, go to the folder where you want to create it by running cd <folder path> in your terminal or command prompt.
Then run the following command:
Add ksqlDB and Kafka into the Project
Open the project folder in an IDE with Maven support, such as IntelliJ IDEA, Visual Studio Code, or Eclipse. Then add ksqlDB to the pom.xml file.
Streaming
Import the following packages.
Create a ksqlDB client first.
Create the first stream, which reads from the “input” topic:
Create the second stream, which reads from source_stream, splits the string into an array, and writes to the split_stream topic.
Next, create the third stream, which reads from the split_stream created above, converts word_array into rows (one row per word), and writes to explode_stream.
Lastly, create a table that counts the occurrences of each word and writes the counts to the “OUTPUT” topic.
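The four steps correspond to four ksqlDB statements that must run in order, because each one reads from the stream created before it. The sketch below collects one possible set of statements as Java constants. The statement texts (including the regexp_split_to_array and explode functions, partitions=1, and the word and occurrence column names) are assumptions that match the steps described, not necessarily the tutorial's exact statements, and the WordCountStatements class is hypothetical. With the ksqlDB Java client, each string would be passed to the client's executeStatement method.

```java
import java.util.List;

public class WordCountStatements {

    // Step 1: stream over the "input" topic with a single "sentence" field in JSON.
    // Assumption: partitions=1 lets ksqlDB create the topic if it does not exist yet.
    static final String SOURCE_STREAM =
            "CREATE STREAM source_stream (sentence VARCHAR) "
                    + "WITH (kafka_topic='input', value_format='json', partitions=1);";

    // Step 2: split each sentence into an array of words.
    static final String SPLIT_STREAM =
            "CREATE STREAM split_stream AS "
                    + "SELECT regexp_split_to_array(sentence, ' ') AS word_array "
                    + "FROM source_stream EMIT CHANGES;";

    // Step 3: turn each array element into its own row.
    static final String EXPLODE_STREAM =
            "CREATE STREAM explode_stream AS "
                    + "SELECT explode(word_array) AS words "
                    + "FROM split_stream EMIT CHANGES;";

    // Step 4: count occurrences per word; the table "output" is backed by the OUTPUT topic.
    static final String OUTPUT_TABLE =
            "CREATE TABLE output AS "
                    + "SELECT words AS word, COUNT(words) AS occurrence "
                    + "FROM explode_stream GROUP BY words EMIT CHANGES;";

    // Each statement depends on the one before it, so this order matters.
    static List<String> inOrder() {
        return List.of(SOURCE_STREAM, SPLIT_STREAM, EXPLODE_STREAM, OUTPUT_TABLE);
    }

    public static void main(String[] args) {
        // With the ksqlDB Java client, each statement would be submitted in order, e.g.
        //   client.executeStatement(sql).get();
        // optionally with a properties map such as auto.offset.reset=earliest.
        // Here they are only printed, so they can also be pasted into the ksqlDB CLI.
        inOrder().forEach(System.out::println);
    }
}
```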
Results
The word count pipeline created above takes input sentences in JSON format from the “input” topic and sends the word count results to the “OUTPUT” topic.
You can both send the input and observe the output in the Upstash Console.
Send an input sentence to the “input” topic. The key can be any string, but because “sentence” was defined as a field when creating source_stream, the value must be a JSON object that includes “sentence” as a key, such as {"sentence": "<your sentence>"}.
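If you produce the input from code instead of the console, the value has to be valid JSON, so quotes and control characters in the sentence must be escaped. The helper below builds such a value with minimal escaping; the InputValue class and its methods are hypothetical, and a real producer would more likely use a JSON library.

```java
public class InputValue {

    // Escapes the characters that JSON does not allow unescaped inside a string.
    static String escapeJson(String s) {
        StringBuilder sb = new StringBuilder(s.length() + 8);
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Other control characters are written as a Unicode escape.
                        sb.append("\\").append("u").append(String.format("%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    // Builds the message value expected by source_stream.
    static String toInputValue(String sentence) {
        return "{\"sentence\": \"" + escapeJson(sentence) + "\"}";
    }

    public static void main(String[] args) {
        // prints {"sentence": "This is an example sentence"}
        System.out.println(toInputValue("This is an example sentence"));
    }
}
```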
Once you send this message to the “input” topic, you can observe the result in the “OUTPUT” topic as follows: