Apache Pinot is a real-time distributed OLAP (Online Analytical Processing) data store. It enables users to execute OLAP queries with low latency. It can ingest data from batch data sources or from streaming sources such as Upstash Kafka.

Upstash Kafka Setup

Create a Kafka cluster using the Upstash Console or the Upstash CLI by following Getting Started.

Create a topic by following the topic creation steps. This topic will be the source for the Apache Pinot table. Let’s name it “transcript” for this tutorial.
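
If you have kcat (formerly kafkacat) installed, you can optionally verify the topic from your terminal. This is a minimal sketch; replace the UPSTASH-KAFKA-* placeholders with the endpoint, username, and password shown on your cluster’s details page:

# list metadata for the "transcript" topic over SASL_SSL
kcat -b UPSTASH-KAFKA-ENDPOINT:9092 \
  -X security.protocol=SASL_SSL \
  -X sasl.mechanisms=SCRAM-SHA-256 \
  -X sasl.username="UPSTASH-KAFKA-USERNAME" \
  -X sasl.password="UPSTASH-KAFKA-PASSWORD" \
  -L -t transcript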

Apache Pinot Setup

You need a host to run Apache Pinot. For this quick setup, you can run it on your local machine.

First, install Docker. Running Apache Pinot in a Docker container is a much better option than running it directly on your machine.

Once you have Docker on your machine, you can follow the steps in Getting Started to run Apache Pinot in Docker.

In short, you will need to pull the Apache Pinot image by running the following command.

docker pull apachepinot/pinot:0.12.0

Create a file named docker-compose.yml with the following content.

version: "3.7"
services:
  pinot-zookeeper:
    image: zookeeper:3.5.6
    container_name: pinot-zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  pinot-controller:
    image: apachepinot/pinot:0.12.0
    command: "StartController -zkAddress pinot-zookeeper:2181"
    container_name: pinot-controller
    restart: unless-stopped
    ports:
      - "9000:9000"
    environment:
      JAVA_OPTS: "-Dplugins.dir=/opt/pinot/plugins -Xms1G -Xmx4G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -Xloggc:gc-pinot-controller.log"
    depends_on:
      - pinot-zookeeper
  pinot-broker:
    image: apachepinot/pinot:0.12.0
    command: "StartBroker -zkAddress pinot-zookeeper:2181"
    restart: unless-stopped
    container_name: "pinot-broker"
    ports:
      - "8099:8099"
    environment:
      JAVA_OPTS: "-Dplugins.dir=/opt/pinot/plugins -Xms4G -Xmx4G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -Xloggc:gc-pinot-broker.log"
    depends_on:
      - pinot-controller
  pinot-server:
    image: apachepinot/pinot:0.12.0
    command: "StartServer -zkAddress pinot-zookeeper:2181"
    restart: unless-stopped
    container_name: "pinot-server"
    ports:
      - "8098:8098"
    environment:
      JAVA_OPTS: "-Dplugins.dir=/opt/pinot/plugins -Xms4G -Xmx16G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -Xloggc:gc-pinot-server.log"
    depends_on:
      - pinot-broker

From your terminal, go into the directory containing docker-compose.yml and run the following command to start Pinot.

docker-compose --project-name pinot-demo up

Now, Apache Pinot should be up and running. You can check it by running:

docker container ls

You should see output like this:

CONTAINER ID   IMAGE                     COMMAND                  CREATED              STATUS              PORTS                                                                     NAMES
ba5cb0868350   apachepinot/pinot:0.12.0  "./bin/pinot-admin.s…"   About a minute ago   Up About a minute   8096-8099/tcp, 9000/tcp                                                   pinot-server
698f160852f9   apachepinot/pinot:0.12.0  "./bin/pinot-admin.s…"   About a minute ago   Up About a minute   8096-8098/tcp, 9000/tcp, 0.0.0.0:8099->8099/tcp, :::8099->8099/tcp        pinot-broker
b1ba8cf60d69   apachepinot/pinot:0.12.0  "./bin/pinot-admin.s…"   About a minute ago   Up About a minute   8096-8099/tcp, 0.0.0.0:9000->9000/tcp, :::9000->9000/tcp                  pinot-controller
54e7e114cd53   zookeeper:3.5.6           "/docker-entrypoint.…"   About a minute ago   Up About a minute   2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp, 8080/tcp   pinot-zookeeper
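
You can also verify that the controller itself is healthy. It exposes a REST API on port 9000, and its /health endpoint should return OK on a healthy controller:

# quick liveness check against the Pinot controller
curl http://localhost:9000/health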

Now, you should add a table to Pinot to store the data streamed from the Kafka topic.

Open http://localhost:9000/ in your browser.

Click on the “Tables” section.

First, click on “Add Schema” and fill in the form until you see the following JSON as your schema config.

{
  "schemaName": "transcript",
  "dimensionFieldSpecs": [
    {
      "name": "studentID",
      "dataType": "INT"
    },
    {
      "name": "firstName",
      "dataType": "STRING"
    },
    {
      "name": "lastName",
      "dataType": "STRING"
    },
    {
      "name": "gender",
      "dataType": "STRING"
    },
    {
      "name": "subject",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "score",
      "dataType": "FLOAT"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "timestamp",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
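
If you prefer the terminal over the UI, the controller’s REST API can create the schema as well. As a sketch, assuming you saved the JSON above to a file named transcript-schema.json:

# upload the schema to the Pinot controller
curl -X POST -H "Content-Type: application/json" \
  -d @transcript-schema.json \
  http://localhost:9000/schemas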

Click “Save”, then click “Add Realtime Table”, since we will stream the data in real time.

On this page, the table name must be the same as the schema name, which is “transcript” in this case.

Then scroll down on this page and replace the “segmentsConfig” and “tableIndexConfig” sections of the table config in your browser with the following JSON. Do not forget to replace the UPSTASH-KAFKA-* placeholders with your cluster information.

{
  "segmentsConfig": {
    "timeColumnName": "timestampInEpoch",
    "timeType": "MILLISECONDS",
    "schemaName": "transcript",
    "replicasPerPartition": "1",
    "replication": "1"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.topic.name": "transcript",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.broker.list": "UPSTASH-KAFKA-ENDPOINT:9092",
      "security.protocol": "SASL_SSL",
      "sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"UPSTASH-KAFKA-USERNAME\" password=\"UPSTASH-KAFKA-PASSWORD\";",
      "sasl.mechanism": "SCRAM-SHA-256",
      "realtime.segment.flush.threshold.rows": "0",
      "realtime.segment.flush.threshold.time": "24h",
      "realtime.segment.flush.threshold.segment.size": "50M",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
    }
  }
}
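
The table can also be created through the controller’s REST API instead of the UI. This is a sketch that assumes you have a complete table config (including “tableName” and “tableType”: “REALTIME” in addition to the sections above) saved as transcript-table.json:

# create the realtime table from a full table config file
curl -X POST -H "Content-Type: application/json" \
  -d @transcript-table.json \
  http://localhost:9000/tables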

Test the Setup

Now, let’s produce some events to our Kafka topic. Go to the Upstash Console, click on your cluster, then “Topics”, and click “transcript”. Select the “Messages” tab, then click “Produce a new message”. Send a message in JSON format like the one below:

{
  "studentID": 205,
  "firstName": "Natalie",
  "lastName": "Jones",
  "gender": "Female",
  "subject": "Maths",
  "score": 3.8,
  "timestampInEpoch": 1571900400000
}
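
Alternatively, you can produce the same message from your terminal with kcat; again, replace the UPSTASH-KAFKA-* placeholders with your cluster credentials:

# pipe one JSON message into the "transcript" topic
echo '{"studentID": 205, "firstName": "Natalie", "lastName": "Jones", "gender": "Female", "subject": "Maths", "score": 3.8, "timestampInEpoch": 1571900400000}' \
  | kcat -b UPSTASH-KAFKA-ENDPOINT:9092 \
    -X security.protocol=SASL_SSL \
    -X sasl.mechanisms=SCRAM-SHA-256 \
    -X sasl.username="UPSTASH-KAFKA-USERNAME" \
    -X sasl.password="UPSTASH-KAFKA-PASSWORD" \
    -t transcript -P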

Now, go back to the Pinot console in your browser. Navigate to “Query Console” from the left sidebar. When you click on the “transcript” table, the result of the following query is displayed automatically.

select * from transcript limit 10
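
You can also run the same query from your terminal by posting it to the broker’s SQL endpoint on port 8099:

# query the table through the Pinot broker REST API
curl -X POST -H "Content-Type: application/json" \
  -d '{"sql": "select * from transcript limit 10"}' \
  http://localhost:8099/query/sql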

The query result should include the message you produced above.

Links

Running Pinot in Docker

Apache Pinot Stream Ingestion