Connecting to Upstash Kafka using any Kafka client is very straightforward. If you do not have a Kafka cluster and/or topic already, follow these steps to create one.

After creating a cluster and a topic, just go to cluster details page on the Upstash Console and copy bootstrap endpoint, username and password.

Then replace following parameters in the code snippets of your favourite Kafka client or language below.

  • {{ BOOTSTRAP_ENDPOINT }}
  • {{ UPSTASH_KAFKA_USERNAME }}
  • {{ UPSTASH_KAFKA_PASSWORD }}
  • {{ TOPIC_NAME }}

Create a Topic

const { Kafka } = require("kafkajs");

const kafka = new Kafka({ brokers: ["{{ BOOTSTRAP_ENDPOINT }}"], sasl: {
mechanism: "scram-sha-512", username: "{{ UPSTASH_KAFKA_USERNAME }}", password:
"{{ UPSTASH_KAFKA_PASSWORD }}", }, ssl: true, });

const admin = kafka.admin();

const createTopic = async () => { await admin.connect(); await
admin.createTopics({ validateOnly: false, waitForLeaders: true, topics: [ {
topic: "{{ TOPIC_NAME }}", numPartitions: partitions, replicationFactor:
replicationFactor, }, ], }); await admin.disconnect(); }; createTopic();

Produce a Message

const { Kafka } = require("kafkajs");

const kafka = new Kafka({
  brokers: ["{{ BOOTSTRAP_ENDPOINT }}"],
  sasl: {
    mechanism: "scram-sha-512",
    username: "{{ UPSTASH_KAFKA_USERNAME }}",
    password: "{{ UPSTASH_KAFKA_PASSWORD }}",
  },
  ssl: true,
});

const producer = kafka.producer();

const produce = async () => {
  await producer.connect();
  await producer.send({
    topic: "{{ TOPIC_NAME }}",
    messages: [{ value: "Hello Upstash!" }],
  });
  await producer.disconnect();
};
produce();

Consume Messages

const { Kafka } = require("kafkajs");

const kafka = new Kafka({
  brokers: ["{{ BOOTSTRAP_ENDPOINT }}"],
  sasl: {
    mechanism: "scram-sha-512",
    username: "{{ UPSTASH_KAFKA_USERNAME }}",
    password: "{{ UPSTASH_KAFKA_PASSWORD }}",
  },
  ssl: true,
});

const consumer = kafka.consumer({ groupId: "{{ GROUP_NAME }}" });
const consume = async () => {
  await consumer.connect();
  await consumer.subscribe({
    topic: "{{ TOPIC_NAME }}",
    fromBeginning: true,
  });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        topic: topic,
        partition: partition,
        message: JSON.stringify(message),
      });
    },
  });
};
consume();