Upstash Kafka with Apache Pinot
This tutorial shows how to integrate Upstash Kafka with Apache Pinot.
Apache Pinot is a real-time distributed OLAP (Online Analytical Processing) datastore designed to answer OLAP queries with low latency. It can ingest data from batch sources or from streaming sources such as Upstash Kafka.
Upstash Kafka Setup
Create a Kafka cluster using Upstash Console or Upstash CLI by following Getting Started.
Create a topic by following the creating topic steps. This topic will be the source for the Apache Pinot table. Let’s name it “transcript” for this tutorial.
Apache Pinot Setup
You need a host to run Apache Pinot. For this quick setup, you can run it on your local machine.
First, install Docker. Running Apache Pinot in a Docker container is easier than installing it directly on your machine.
Once you have Docker on your machine, you can follow the steps in Getting Started to run Apache Pinot in Docker.
In short, pull the Apache Pinot image used in this tutorial by running the following command.
docker pull apachepinot/pinot:0.12.0
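You can verify that the image is now available locally by listing your Docker images:
docker images apachepinot/pinot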
Create a file named docker-compose.yml with the following content.
version: "3.7"
services:
  pinot-zookeeper:
    image: zookeeper:3.5.6
    container_name: pinot-zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  pinot-controller:
    image: apachepinot/pinot:0.12.0
    command: "StartController -zkAddress pinot-zookeeper:2181"
    container_name: pinot-controller
    restart: unless-stopped
    ports:
      - "9000:9000"
    environment:
      JAVA_OPTS: "-Dplugins.dir=/opt/pinot/plugins -Xms1G -Xmx4G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -Xloggc:gc-pinot-controller.log"
    depends_on:
      - pinot-zookeeper
  pinot-broker:
    image: apachepinot/pinot:0.12.0
    command: "StartBroker -zkAddress pinot-zookeeper:2181"
    restart: unless-stopped
    container_name: "pinot-broker"
    ports:
      - "8099:8099"
    environment:
      JAVA_OPTS: "-Dplugins.dir=/opt/pinot/plugins -Xms4G -Xmx4G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -Xloggc:gc-pinot-broker.log"
    depends_on:
      - pinot-controller
  pinot-server:
    image: apachepinot/pinot:0.12.0
    command: "StartServer -zkAddress pinot-zookeeper:2181"
    restart: unless-stopped
    container_name: "pinot-server"
    ports:
      - "8098:8098"
    environment:
      JAVA_OPTS: "-Dplugins.dir=/opt/pinot/plugins -Xms4G -Xmx16G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -Xloggc:gc-pinot-server.log"
    depends_on:
      - pinot-broker
From your terminal, go into the directory containing this file and run the following command to start Pinot.
docker-compose --project-name pinot-demo up
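If you prefer to keep the containers running in the background, docker-compose accepts the standard -d (detached) flag:
docker-compose --project-name pinot-demo up -d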
Now, Apache Pinot should be up and running. You can check it by running:
docker container ls
You should see output like this:
CONTAINER ID   IMAGE                      COMMAND                  CREATED              STATUS              PORTS                                                                        NAMES
ba5cb0868350   apachepinot/pinot:0.12.0   "./bin/pinot-admin.s…"   About a minute ago   Up About a minute   8096-8099/tcp, 9000/tcp                                                      pinot-server
698f160852f9   apachepinot/pinot:0.12.0   "./bin/pinot-admin.s…"   About a minute ago   Up About a minute   8096-8098/tcp, 9000/tcp, 0.0.0.0:8099->8099/tcp, :::8099->8099/tcp           pinot-broker
b1ba8cf60d69   apachepinot/pinot:0.12.0   "./bin/pinot-admin.s…"   About a minute ago   Up About a minute   8096-8099/tcp, 0.0.0.0:9000->9000/tcp, :::9000->9000/tcp                     pinot-controller
54e7e114cd53   zookeeper:3.5.6            "/docker-entrypoint.…"   About a minute ago   Up About a minute   2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp, 8080/tcp      pinot-zookeeper
Now, add a table to Pinot to store the data streamed from the Kafka topic.
Open http://localhost:9000/ in your browser.
Click on the “Tables” section.
First, click “Add Schema” and fill in the form until you see the following JSON as your schema config.
{
  "schemaName": "transcript",
  "dimensionFieldSpecs": [
    {
      "name": "studentID",
      "dataType": "INT"
    },
    {
      "name": "firstName",
      "dataType": "STRING"
    },
    {
      "name": "lastName",
      "dataType": "STRING"
    },
    {
      "name": "gender",
      "dataType": "STRING"
    },
    {
      "name": "subject",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "score",
      "dataType": "FLOAT"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "timestampInEpoch",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
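If you prefer scripting the setup over the UI, the Pinot controller also accepts schemas through its REST API. A minimal sketch, assuming you saved the JSON above to a file named transcript-schema.json (a file name chosen for this example):
curl -X POST -H "Content-Type: application/json" \
  -d @transcript-schema.json \
  http://localhost:9000/schemas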
Click “Save”, then click “Add Realtime Table”, since we will stream the data in real time.
On this page, the table name must match the schema name, which is “transcript” in this case.
Then scroll down and replace the “segmentsConfig” and “tableIndexConfig” sections of the table config in your browser with the following JSON. Do not forget to replace the UPSTASH-KAFKA-* placeholders with your cluster information.
{
  "segmentsConfig": {
    "timeColumnName": "timestampInEpoch",
    "timeType": "MILLISECONDS",
    "schemaName": "transcript",
    "replicasPerPartition": "1",
    "replication": "1"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.topic.name": "transcript",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.broker.list": "UPSTASH-KAFKA-ENDPOINT:9092",
      "security.protocol": "SASL_SSL",
      "sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"UPSTASH-KAFKA-USERNAME\" password=\"UPSTASH-KAFKA-PASSWORD\";",
      "sasl.mechanism": "SCRAM-SHA-256",
      "realtime.segment.flush.threshold.rows": "0",
      "realtime.segment.flush.threshold.time": "24h",
      "realtime.segment.flush.threshold.segment.size": "50M",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
    }
  }
}
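Like the schema, the table can be created through the controller’s REST API instead of the UI. A minimal sketch, assuming the complete table config (the table name, the REALTIME table type, and the sections above) is saved to a file named transcript-table.json (a hypothetical file name):
curl -X POST -H "Content-Type: application/json" \
  -d @transcript-table.json \
  http://localhost:9000/tables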
Test the Setup
Now, let’s produce some events to our Kafka topic. Go to the Upstash console, click on your cluster, then open Topics and click “transcript”. Select the Messages tab, then click “Produce a new message”. Send a message in JSON format like the one below:
{
  "studentID": 205,
  "firstName": "Natalie",
  "lastName": "Jones",
  "gender": "Female",
  "subject": "Maths",
  "score": 3.8,
  "timestampInEpoch": 1571900400000
}
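Alternatively, you can produce the same message from the command line with kcat (formerly kafkacat). A sketch, assuming kcat is installed and the UPSTASH-KAFKA-* placeholders are replaced with your cluster credentials:
echo '{"studentID": 205, "firstName": "Natalie", "lastName": "Jones", "gender": "Female", "subject": "Maths", "score": 3.8, "timestampInEpoch": 1571900400000}' | \
  kcat -b UPSTASH-KAFKA-ENDPOINT:9092 \
    -X security.protocol=SASL_SSL \
    -X sasl.mechanisms=SCRAM-SHA-256 \
    -X sasl.username=UPSTASH-KAFKA-USERNAME \
    -X sasl.password=UPSTASH-KAFKA-PASSWORD \
    -t transcript -P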
Now, go back to the Pinot console in your browser. Navigate to “Query Console” from the left sidebar. When you click on the “transcript” table, you will see the result of the following query automatically.
select * from transcript limit 10
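Since Pinot is built for OLAP workloads, you can also run aggregation queries against the streamed data, for example:
select subject, avg(score) from transcript group by subject limit 10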
The query result should include the message you just produced.