You can find the GitHub repository here.


npm install @upstash/kafka


  1. Go to Upstash and select your cluster.
  2. Copy the REST API secrets at the bottom of the page.
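import { Kafka } from "@upstash/kafka";

// Replace the placeholders with the REST API secrets copied from your cluster page
const kafka = new Kafka({
  url: "<UPSTASH_KAFKA_REST_URL>",
  username: "<UPSTASH_KAFKA_REST_USERNAME>",
  password: "<UPSTASH_KAFKA_REST_PASSWORD>",
});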
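If you would rather not hard-code the copied secrets, one option is to read them from environment variables and fail early when one is missing. The following is a minimal sketch, not part of the library: the helper `readKafkaConfig`, the environment variable names, and the `url` / `username` / `password` field names are assumptions made for illustration.

```typescript
// Assumed shape of the options object passed to `new Kafka(...)`.
interface KafkaRestConfig {
  url: string;
  username: string;
  password: string;
}

// Hypothetical variable names; use whatever names your deployment defines.
const REQUIRED_VARS = [
  "UPSTASH_KAFKA_REST_URL",
  "UPSTASH_KAFKA_REST_USERNAME",
  "UPSTASH_KAFKA_REST_PASSWORD",
];

// Build the config from an environment-like object and report every
// missing variable in a single error instead of failing on the first one.
function readKafkaConfig(
  env: Record<string, string | undefined>,
): KafkaRestConfig {
  const missing = REQUIRED_VARS.filter((name) => !env[name]);
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(", ")}`);
  }
  return {
    url: env["UPSTASH_KAFKA_REST_URL"] as string,
    username: env["UPSTASH_KAFKA_REST_USERNAME"] as string,
    password: env["UPSTASH_KAFKA_REST_PASSWORD"] as string,
  };
}
```

In a Node.js project this could be used as `new Kafka(readKafkaConfig(process.env))`.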

Produce a single message

const p = kafka.producer();
const message = { hello: "world" }; // Objects will get serialized using `JSON.stringify`
const res = await p.produce("<my.topic>", message);

// The same call with optional per-message settings
const resWithOptions = await p.produce("<my.topic>", message, {
  partition: 1,
  timestamp: 12345,
  key: "<custom key>",
  headers: [{ key: "traceId", value: "85a9f12" }],
});

Produce multiple messages.

The same options from the example above can be set for every message.

const p = kafka.producer();
const res = await p.produceMany([
  {
    topic: "my.topic",
    value: { hello: "world" },
    // ...options
  },
  {
    topic: "another.topic",
    value: "another message",
    // ...options
  },
]);


The first time a consumer is created, it has to find the group coordinator by asking the Kafka brokers and then join the consumer group. This takes some time to complete, so a newly created consumer instance may return empty messages until consumer group coordination is complete.

const c = kafka.consumer();
const messages = await c.consume({
  consumerGroupId: "group_1",
  instanceId: "instance_1",
  topics: ["test.topic"],
  autoOffsetReset: "earliest",
});

More examples can be found in the docstring.
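Because a newly created consumer may return empty results while group coordination completes, it can help to retry the first call a few times. The sketch below is one way to do that; `consumeWithWarmup`, its parameters, and the default values are hypothetical and not part of the library. It takes any function that performs one consume call, so it makes no assumptions about the client itself.

```typescript
// Calls `consumeOnce` until it returns at least one message or the
// attempt limit is reached, waiting `delayMs` between attempts.
async function consumeWithWarmup<T>(
  consumeOnce: () => Promise<T[]>,
  maxAttempts: number = 5,
  delayMs: number = 1000,
): Promise<T[]> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const messages = await consumeOnce();
    if (messages.length > 0) {
      return messages;
    }
    // Do not sleep after the final attempt.
    if (attempt < maxAttempts) {
      await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
    }
  }
  // Still empty: either coordination is not finished or the topic has no data.
  return [];
}
```

With the consumer from the example above, a call could look like `await consumeWithWarmup(() => c.consume({ ... }))`. The attempt limit matters because an empty result is also what you get from a topic that simply has no new messages.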

Commit manually

While consume can handle committing automatically, you can also use Consumer.commit to manually commit.

const consumerGroupId = "mygroup";
const instanceId = "myinstance";
const topic = "my.topic";

const c = kafka.consumer();
const messages = await c.consume({
  consumerGroupId,
  instanceId,
  topics: [topic],
  autoCommit: false,
});

for (const message of messages) {
  // message handling logic

  await c.commit({
    consumerGroupId,
    instanceId,
    offset: {
      topic: message.topic,
      partition: message.partition,
      offset: message.offset,
    },
  });
}

You can also manage offsets manually by using Consumer.fetch.

const c = kafka.consumer();
const messages = await c.fetch({
  topic: "greeting",
  partition: 3,
  offset: 42,
  timeout: 1000,
});
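Managing offsets manually means your code has to remember where to resume. A minimal sketch of that step follows, assuming each fetched message carries a numeric `offset` field as in the commit example; `nextFetchOffset` is a hypothetical helper, not a library function.

```typescript
// Only the field this sketch needs; real messages carry more data.
interface FetchedMessage {
  offset: number;
}

// Given the offset that was requested and the messages that came back,
// return the offset to request next. An empty result leaves it unchanged.
function nextFetchOffset(
  requested: number,
  messages: FetchedMessage[],
): number {
  if (messages.length === 0) {
    return requested;
  }
  const highest = messages.reduce(
    (max, m) => (m.offset > max ? m.offset : max),
    messages[0].offset,
  );
  return highest + 1;
}
```

You would store the returned value somewhere durable and pass it as `offset` in the next `fetch` call for the same topic and partition.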


See the examples, as well as the examples in the docstring of each method.