upstash-kafka is an HTTP/REST-based Kafka client built on top of the Upstash Kafka REST API.

It is the only connectionless (HTTP-based) Kafka client and is designed to work with:

  • Serverless functions (AWS Lambda …)
  • Cloudflare Workers (see the example)
  • Fastly Compute@Edge
  • Next.js Edge, Remix, Nuxt …
  • Client side web/mobile applications
  • WebAssembly
  • and other environments where HTTP is preferred over TCP.

Quick Start

Install

npm install @upstash/kafka

Authenticate

Copy the URL, username, and password from the Upstash Console:

import { Kafka } from "@upstash/kafka";

const kafka = new Kafka({
  url: "<UPSTASH_KAFKA_REST_URL>",
  username: "<UPSTASH_KAFKA_REST_USERNAME>",
  password: "<UPSTASH_KAFKA_REST_PASSWORD>",
});
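In practice you will probably not want to hard-code credentials. The sketch below shows one way to read them from environment variables and fail early when one is missing. `readKafkaCredentials` is a hypothetical helper, not part of `@upstash/kafka`, and the environment variable names are an assumption borrowed from the placeholders above.

```typescript
// Hypothetical helper (not part of @upstash/kafka): collects the three
// credentials from an environment-like object and throws if any is missing.
type KafkaCredentials = { url: string; username: string; password: string };

function readKafkaCredentials(
  env: Record<string, string | undefined>,
): KafkaCredentials {
  // Assumed variable names, mirroring the placeholders in the snippet above.
  const names = {
    url: "UPSTASH_KAFKA_REST_URL",
    username: "UPSTASH_KAFKA_REST_USERNAME",
    password: "UPSTASH_KAFKA_REST_PASSWORD",
  };

  const missing = [names.url, names.username, names.password].filter(
    (name) => !env[name],
  );
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(", ")}`);
  }

  return {
    url: env[names.url] as string,
    username: env[names.username] as string,
    password: env[names.password] as string,
  };
}

// Usage, assuming a Node.js-style `process.env`:
// const kafka = new Kafka(readKafkaCredentials(process.env));
```

Passing the environment in as an argument keeps the helper usable in runtimes that expose configuration differently from Node.js.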

Produce

const p = kafka.producer();
const message = { hello: "world" }; // Objects will get serialized using `JSON.stringify`
const response = await p.produce("TOPIC", message);
const response2 = await p.produce("TOPIC", message, {
  partition: 1,
  timestamp: 4567,
  key: "KEY",
  headers: [{ key: "TRACE-ID", value: "32h67jk" }],
});
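Because objects are serialized with `JSON.stringify`, the reading side has to parse them back. The sketch below illustrates the round trip without touching the network. `decodeValue` is a hypothetical helper, and it assumes that a consumed message value arrives as a string.

```typescript
// Hypothetical helper (not part of @upstash/kafka).
// Assumption: the value of a consumed message is a string.
function decodeValue(raw: string): unknown {
  try {
    return JSON.parse(raw); // objects produced via JSON.stringify round-trip here
  } catch {
    return raw; // not valid JSON, e.g. a plain string such as "MESSAGE"
  }
}

const sample = { hello: "world" };
const wire = JSON.stringify(sample); // the serialized form of an object payload
const decoded = decodeValue(wire); // parsed back into an object
const plain = decodeValue("MESSAGE"); // falls back to the raw string
```

Note that a plain string which happens to be valid JSON (for example `"123"`) would be parsed as well, so this fallback approach only suits topics where that ambiguity is acceptable.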

Produce Many

const p = kafka.producer();
const res = await p.produceMany([
  {
    topic: "TOPIC",
    value: "MESSAGE",
    // ...options
  },
  {
    topic: "TOPIC-2",
    value: "MESSAGE-2",
    // ...options
  },
]);

Consume

When a new consumer instance is created, it may return no messages until consumer group coordination has completed.

const c = kafka.consumer();
const messages = await c.consume({
  consumerGroupId: "group_1",
  instanceId: "instance_1",
  topics: ["test.topic"],
  autoOffsetReset: "earliest",
});
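Given the note above about new consumer instances, a caller may want to retry a few times before concluding that a topic is empty. A minimal sketch, assuming an empty array is what comes back during coordination; `consumeUntilNonEmpty`, its attempt count, and its delay are hypothetical and not part of `@upstash/kafka`.

```typescript
// Hypothetical helper (not part of @upstash/kafka): calls `poll` until it
// yields at least one item or `maxAttempts` calls have been made.
async function consumeUntilNonEmpty<T>(
  poll: () => Promise<T[]>,
  maxAttempts = 5,
  delayMs = 500,
): Promise<T[]> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const batch = await poll();
    if (batch.length > 0) return batch;
    if (attempt < maxAttempts) {
      // Wait before the next attempt; skipped after the final one.
      await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
    }
  }
  return []; // still nothing after all attempts
}

// Usage with the consumer above:
// const messages = await consumeUntilNonEmpty(() =>
//   c.consume({
//     consumerGroupId: "group_1",
//     instanceId: "instance_1",
//     topics: ["test.topic"],
//     autoOffsetReset: "earliest",
//   }),
// );
```

In a serverless function the delay counts against execution time, so small values for both parameters are probably the safer choice.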

Commit

By default, consume commits offsets automatically. To commit manually, set autoCommit to false and call commit yourself:

const consumerGroupId = "mygroup";
const instanceId = "myinstance";
const topic = "my.topic";

const c = kafka.consumer();
const messages = await c.consume({
  consumerGroupId,
  instanceId,
  topics: [topic],
  autoCommit: false,
});

for (const message of messages) {
  // message handling logic

  await c.commit({
    consumerGroupId,
    instanceId,
    offset: {
      topic: message.topic,
      partition: message.partition,
      offset: message.offset,
    },
  });
}
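Committing after every message costs one HTTP request per message. One alternative is to handle the whole batch first and then commit only the highest offset seen for each topic and partition. The sketch below computes those offsets. `latestOffsets` is a hypothetical helper, and the approach assumes that committing an offset supersedes earlier commits for the same partition, as in standard Kafka. If handling fails midway, more messages may be redelivered than with per-message commits.

```typescript
// Hypothetical helper (not part of @upstash/kafka).
type OffsetRef = { topic: string; partition: number; offset: number };

// Returns one entry per topic/partition pair, carrying the highest offset seen.
function latestOffsets(messages: OffsetRef[]): OffsetRef[] {
  const latest = new Map<string, OffsetRef>();
  for (const { topic, partition, offset } of messages) {
    const key = `${topic}:${partition}`;
    const seen = latest.get(key);
    if (seen === undefined || offset > seen.offset) {
      latest.set(key, { topic, partition, offset });
    }
  }
  return Array.from(latest.values());
}

// Usage, after all messages in the batch have been handled:
// for (const offset of latestOffsets(messages)) {
//   await c.commit({ consumerGroupId, instanceId, offset });
// }
```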

Fetch

const c = kafka.consumer();
const messages = await c.fetch({
  topic: "greeting",
  partition: 3,
  offset: 42,
  timeout: 1000,
});

Examples

See here for more examples.