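Kafka Streams is a client library for building applications that read records from Kafka topics, process them, and write the results back to Kafka, for example by streaming data from one topic to another.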

Upstash Kafka Setup

Create a Kafka cluster using Upstash Console or Upstash CLI by following Getting Started.

Create two topics by following the creating topic steps. Name the first topic “input”, since its records will be streamed to the second topic, which we name “output”.

Project Setup

If you already have a project and want to use Kafka Streams with Upstash Kafka in it, you can skip this section and continue with Add Kafka Streams into the Project.

Install Maven on your machine by following the Maven Installation Guide.

Run mvn --version in a terminal or command prompt to make sure Maven is installed.

It should print the installed Maven version:

Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: D:\apache-maven-3.6.3\apache-maven\bin\..
Java version: 1.8.0_232, vendor: AdoptOpenJDK, runtime: C:\Program Files\AdoptOpenJDK\jdk-8.0.232.09-hotspot\jre
Default locale: en_US, platform encoding: Cp1250
OS name: "windows 10", version: "10.0", arch: "amd64", family: "windows"

To create the Maven project:

In your terminal or command prompt, go to the folder where you want to create the project by running cd <folder path>

Run the following command:

mvn archetype:generate -DgroupId=com.kafkastreamsinteg.app -DartifactId=kafkastreamsinteg-app -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.4 -DinteractiveMode=false

Add Kafka Streams into the Project

Open the project folder in an IDE with Maven support, such as IntelliJ, Visual Studio, or Eclipse. Add the following dependencies inside the dependencies tag of the pom.xml file.

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
  <version>3.3.1</version>
</dependency>

<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-reload4j</artifactId>
  <version>2.0.3</version>
</dependency>

Streaming From One Topic to Another Topic

Import the following classes first:

import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.regex.Pattern;

Define the names of the topics you are going to work on:

String inputTopic = "input";
String outputTopic = "output";

Create the following properties for Kafka Streams and replace UPSTASH-KAFKA-* placeholders with your cluster information.

final Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "UPSTASH-KAFKA-ENDPOINT:9092");
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
props.put(StreamsConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"myLastNewProject");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"UPSTASH-KAFKA-USERNAME\" password=\"UPSTASH-KAFKA-PASSWORD\";"); // a JAAS entry needs a control flag ("required") after the login module name
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.topicPrefix(TopicConfig.RETENTION_MS_CONFIG), 604800000); // 7 days for internal repartition topic retention period
props.put(StreamsConfig.topicPrefix(TopicConfig.CLEANUP_POLICY_CONFIG), TopicConfig.CLEANUP_POLICY_DELETE); // delete cleanup policy for internal repartition topic
props.put(StreamsConfig.topicPrefix(TopicConfig.RETENTION_BYTES_CONFIG), 268435456); // 256 MB for internal repartition topic retention size
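
The literal values in the properties above can be derived rather than typed by hand. The following is a minimal sketch, not part of the Upstash or Kafka API: the class name StreamsConfigValues, the helper names, and the environment variable names UPSTASH_KAFKA_USERNAME and UPSTASH_KAFKA_PASSWORD are all hypothetical. It builds the SCRAM JAAS entry (including the required control flag) and computes the 7-day and 256 MB retention values.

```java
import java.time.Duration;

// Hypothetical helper class; none of these names come from Kafka or Upstash.
class StreamsConfigValues {
    // 7 days in milliseconds (604800000), matching the retention.ms value above.
    static final long RETENTION_MS = Duration.ofDays(7).toMillis();

    // 256 MB in bytes (268435456), matching the retention.bytes value above.
    static final long RETENTION_BYTES = 256L * 1024 * 1024;

    // Builds a JAAS entry for SCRAM: login module, control flag, then options.
    // Assumes the credentials contain no double quotes or backslashes.
    static String scramJaasConfig(String username, String password) {
        return "org.apache.kafka.common.security.scram.ScramLoginModule required"
                + " username=\"" + username + "\""
                + " password=\"" + password + "\";";
    }

    // Reads an environment variable, falling back to a placeholder if unset.
    static String envOrDefault(String name, String fallback) {
        String value = System.getenv(name);
        return value != null ? value : fallback;
    }

    public static void main(String[] args) {
        // Assumed variable names; use whatever your deployment provides.
        String username = envOrDefault("UPSTASH_KAFKA_USERNAME", "UPSTASH-KAFKA-USERNAME");
        String password = envOrDefault("UPSTASH_KAFKA_PASSWORD", "UPSTASH-KAFKA-PASSWORD");
        String jaas = scramJaasConfig(username, password);

        // Avoid printing the credentials themselves.
        System.out.println("JAAS entry built, length " + jaas.length());
        System.out.println("retention.ms = " + RETENTION_MS);
        System.out.println("retention.bytes = " + RETENTION_BYTES);
    }
}
```

The returned string can be passed as the value of SaslConfigs.SASL_JAAS_CONFIG, which keeps the credentials out of the source file.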

Start the builder for streaming and assign input topic as the source:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream(inputTopic);

Apply the following steps to count the words in each sentence sent to the input topic and stream the running counts to the output topic:

Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
Materialized<String, Long, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("countMapping");
materialized.withLoggingDisabled();
source.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
        .groupBy((key, word) -> word, Grouped.as("groupMapping"))
        .count(materialized).toStream().mapValues(Object::toString)
        .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
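
To see what the pipeline computes without a running cluster, the same split, lowercase, and count logic can be sketched in plain Java. This is an illustration only: the class WordCountSketch and its countWords method are hypothetical names, and an in-memory map stands in for the countMapping state store. Unlike the topology above, the sketch skips empty tokens, which the regular expression can produce when a sentence starts with a non-word character.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical stand-in for the stream topology; no Kafka classes involved.
class WordCountSketch {
    // Same pattern as in the topology: split on runs of non-word characters.
    private static final Pattern NON_WORD =
            Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

    // Counts words across all sentences, like flatMapValues + groupBy + count.
    static Map<String, Long> countWords(String... sentences) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String sentence : sentences) {
            for (String word : NON_WORD.split(sentence.toLowerCase())) {
                if (word.isEmpty()) {
                    continue; // choice of this sketch; the topology keeps empty tokens
                }
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Two records sent to the input topic, in order.
        System.out.println(countWords("Hello Kafka", "hello streams"));
        // prints {hello=2, kafka=1, streams=1}
    }
}
```

The streaming version differs in one visible way: it emits an updated count to the output topic as records arrive, rather than a single final map.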

Because the groupBy function triggers a repartition and creates a new internal topic to store the intermediate groups, make sure your Upstash Kafka cluster has enough partition capacity. For detailed information about the maximum partition capacity of a Kafka cluster, check plans.

To verify, check the topics section of the console after you run your application and send data to the input topic: the internal repartition topic should have been created. For reference, internal repartition topics follow this naming convention:

<application id>-<group name>-repartition
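
As a small illustration of this convention, the expected topic name can be assembled from the application id and the group name, joined with hyphens and followed by the repartition suffix. The class and method names below are hypothetical; the values come from the configuration and topology in this guide.

```java
// Hypothetical helper; Kafka Streams derives this name internally.
class RepartitionTopicName {
    // Joins application id, group name, and the fixed "repartition" suffix.
    static String of(String applicationId, String groupName) {
        return applicationId + "-" + groupName + "-repartition";
    }

    public static void main(String[] args) {
        // Application id from APPLICATION_ID_CONFIG, group name from Grouped.as(...).
        System.out.println(of("myLastNewProject", "groupMapping"));
        // prints myLastNewProject-groupMapping-repartition
    }
}
```

Note that the printed topology description shows the name without the application id prefix (groupMapping-repartition); the prefix is added when the topic is created on the cluster.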

Next, build the topology from the streams builder. Printing its description shows the processing steps.

final Topology topology = builder.build();
System.out.println(topology.describe());

Here is the example topology in this scenario:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input])
      --> KSTREAM-FLATMAPVALUES-0000000001
    Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: [])
      --> groupMapping
      <-- KSTREAM-SOURCE-0000000000
    Processor: groupMapping (stores: [])
      --> groupMapping-repartition-filter
      <-- KSTREAM-FLATMAPVALUES-0000000001
    Processor: groupMapping-repartition-filter (stores: [])
      --> groupMapping-repartition-sink
      <-- groupMapping
    Sink: groupMapping-repartition-sink (topic: groupMapping-repartition)
      <-- groupMapping-repartition-filter

   Sub-topology: 1
    Source: groupMapping-repartition-source (topics: [groupMapping-repartition])
      --> KSTREAM-AGGREGATE-0000000003
    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [countMapping])
      --> KTABLE-TOSTREAM-0000000007
      <-- groupMapping-repartition-source
    Processor: KTABLE-TOSTREAM-0000000007 (stores: [])
      --> KSTREAM-MAPVALUES-0000000008
      <-- KSTREAM-AGGREGATE-0000000003
    Processor: KSTREAM-MAPVALUES-0000000008 (stores: [])
      --> KSTREAM-SINK-0000000009
      <-- KTABLE-TOSTREAM-0000000007
    Sink: KSTREAM-SINK-0000000009 (topic: output)
      <-- KSTREAM-MAPVALUES-0000000008

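Finally, create the KafkaStreams instance from the topology and start it. Register the shutdown hook before blocking on the latch; otherwise the hook is never registered, because latch.await() does not return until the hook runs.

final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);

// Close the streams and release the latch when the JVM shuts down (e.g. Ctrl+C).
Runtime.getRuntime().addShutdownHook(new Thread("streams-word-count") {
    @Override
    public void run() {
        streams.close();
        latch.countDown();
    }
});

try {
    streams.start();
    System.out.println("streams started");
    latch.await(); // block the main thread until the shutdown hook runs
} catch (final Throwable e) {
    e.printStackTrace(); // report the failure instead of exiting silently
    System.exit(1);
}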