Kafka API
Upstash runs Apache Kafka for its deployments and provides serverless access to a Kafka cluster through both native Kafka clients (over TCP) and a REST API (over HTTP). As a consequence of this flexible model, some restrictions apply when using the Kafka protocol, mainly for the administrative Kafka APIs.
Upstash currently supports the following Kafka protocol APIs:
NAME | KEY | NAME | KEY | NAME | KEY |
---|---|---|---|---|---|
Produce | 0 | DescribeGroups | 15 | EndTxn | 26 |
Fetch | 1 | ListGroups | 16 | TxnOffsetCommit | 28 |
ListOffsets | 2 | SaslHandshake | 17 | DescribeConfigs | 32 |
Metadata | 3 | ApiVersions | 18 | AlterConfigs | 33 |
OffsetCommit | 8 | CreateTopics | 19 | DescribeLogDirs | 35 |
OffsetFetch | 9 | DeleteTopics | 20 | SaslAuthenticate | 36 |
FindCoordinator | 10 | DeleteRecords | 21 | CreatePartitions | 37 |
JoinGroup | 11 | InitProducerId | 22 | DeleteGroups | 42 |
Heartbeat | 12 | OffsetForLeaderEpoch | 23 | IncrementalAlterConfigs | 44 |
LeaveGroup | 13 | AddPartitionsToTxn | 24 | OffsetDelete | 47 |
SyncGroup | 14 | AddOffsetsToTxn | 25 | DescribeCluster | 60 |
Some of the unsupported Kafka APIs are on our roadmap. If you need an API that we do not support at the moment, please drop a note to support@upstash.com so that we can inform you when we plan to support it.
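If a tool needs to check ahead of time whether a request type is available, the table above can be encoded as a simple lookup. The following is a minimal sketch: the class name `SupportedKafkaApis` and its methods are hypothetical helpers written for illustration, not part of any Upstash or Kafka library, and the entries only mirror the table as listed here.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper: mirrors the supported-API table above as a key -> name map.
final class SupportedKafkaApis {
    private static final Map<Integer, String> SUPPORTED = Map.ofEntries(
        Map.entry(0, "Produce"), Map.entry(1, "Fetch"), Map.entry(2, "ListOffsets"),
        Map.entry(3, "Metadata"), Map.entry(8, "OffsetCommit"), Map.entry(9, "OffsetFetch"),
        Map.entry(10, "FindCoordinator"), Map.entry(11, "JoinGroup"), Map.entry(12, "Heartbeat"),
        Map.entry(13, "LeaveGroup"), Map.entry(14, "SyncGroup"), Map.entry(15, "DescribeGroups"),
        Map.entry(16, "ListGroups"), Map.entry(17, "SaslHandshake"), Map.entry(18, "ApiVersions"),
        Map.entry(19, "CreateTopics"), Map.entry(20, "DeleteTopics"), Map.entry(21, "DeleteRecords"),
        Map.entry(22, "InitProducerId"), Map.entry(23, "OffsetForLeaderEpoch"),
        Map.entry(24, "AddPartitionsToTxn"), Map.entry(25, "AddOffsetsToTxn"),
        Map.entry(26, "EndTxn"), Map.entry(28, "TxnOffsetCommit"), Map.entry(32, "DescribeConfigs"),
        Map.entry(33, "AlterConfigs"), Map.entry(35, "DescribeLogDirs"),
        Map.entry(36, "SaslAuthenticate"), Map.entry(37, "CreatePartitions"),
        Map.entry(42, "DeleteGroups"), Map.entry(44, "IncrementalAlterConfigs"),
        Map.entry(47, "OffsetDelete"), Map.entry(60, "DescribeCluster")
    );

    // True if the given protocol API key appears in the table above.
    static boolean isSupported(int apiKey) {
        return SUPPORTED.containsKey(apiKey);
    }

    // Name of the API for a listed key, or empty if the key is not in the table.
    static Optional<String> nameOf(int apiKey) {
        return Optional.ofNullable(SUPPORTED.get(apiKey));
    }

    public static void main(String[] args) {
        System.out.println(isSupported(19)); // true: CreateTopics is listed
        System.out.println(isSupported(29)); // false: key 29 is not in the table
    }
}
```

A hard-coded table like this goes stale as support changes, so treat it as a convenience check rather than a source of truth.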
Connect Using Kafka Clients
Connecting to Upstash Kafka with any Kafka client is straightforward. If you do not already have a Kafka cluster and a topic, follow these steps to create one.
After creating a cluster and a topic, go to the cluster details page on the Upstash Console and copy the bootstrap endpoint, username, and password.
![](https://mintlify.s3-us-west-1.amazonaws.com/upstash/img/kafka/getting_started/cluster-detail.png)
Then replace the following parameters in the code snippets below for your Kafka client or language.
{{ BOOTSTRAP_ENDPOINT }}
{{ UPSTASH_KAFKA_USERNAME }}
{{ UPSTASH_KAFKA_PASSWORD }}
{{ TOPIC_NAME }}
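Each snippet below repeats the same four connection settings. As a rough sketch, they could be collected in one place as shown here. The class `UpstashKafkaConfig` and its method names are hypothetical and introduced only for illustration; the property keys and values are the ones used in the snippets on this page.

```java
import java.util.Properties;

// Hypothetical helper: builds the connection settings shared by the snippets on this page.
final class UpstashKafkaConfig {
    // Returns client properties for a SASL_SSL connection using SCRAM-SHA-512.
    static Properties connectionProps(String bootstrapEndpoint, String username, String password) {
        var props = new Properties();
        props.put("bootstrap.servers", bootstrapEndpoint);
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.jaas.config", jaasConfig(username, password));
        return props;
    }

    // Formats the JAAS configuration line expected by the SCRAM login module.
    static String jaasConfig(String username, String password) {
        return "org.apache.kafka.common.security.scram.ScramLoginModule required "
            + "username=\"" + username + "\" "
            + "password=\"" + password + "\";";
    }
}
```

The returned `Properties` object can then be extended with serializer, deserializer, or consumer group settings before it is passed to a producer, consumer, or admin client.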
Create a Topic
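import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;

class CreateTopic {
    public static void main(String[] args) throws Exception {
        var props = new Properties();
        props.put("bootstrap.servers", "{{ BOOTSTRAP_ENDPOINT }}");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " +
            "username=\"{{ UPSTASH_KAFKA_USERNAME }}\" " +
            "password=\"{{ UPSTASH_KAFKA_PASSWORD }}\";");
        // Example values only (an assumption); use the partition count and
        // replication factor that your cluster allows.
        int partitions = 1;
        short replicationFactor = 1;
        try (var admin = Admin.create(props)) {
            // Block until the topic creation request completes.
            admin.createTopics(
                Set.of(new NewTopic("{{ TOPIC_NAME }}", partitions, replicationFactor))
            ).all().get();
        }
    }
}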
Produce a Message
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

class Produce {
    public static void main(String[] args) throws Exception {
        var props = new Properties();
        props.put("bootstrap.servers", "{{ BOOTSTRAP_ENDPOINT }}");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " +
            "username=\"{{ UPSTASH_KAFKA_USERNAME }}\" " +
            "password=\"{{ UPSTASH_KAFKA_PASSWORD }}\";");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (var producer = new KafkaProducer<String, String>(props)) {
            // Send a record with no key; flush() waits until buffered records are sent.
            producer.send(new ProducerRecord<>("{{ TOPIC_NAME }}", "Hello Upstash!"));
            producer.flush();
        }
    }
}
Consume Messages
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

class Consume {
    public static void main(String[] args) throws Exception {
        var props = new Properties();
        props.put("bootstrap.servers", "{{ BOOTSTRAP_ENDPOINT }}");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " +
            "username=\"{{ UPSTASH_KAFKA_USERNAME }}\" " +
            "password=\"{{ UPSTASH_KAFKA_PASSWORD }}\";");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Start from the earliest offset when the group has no committed offset yet.
        props.put("auto.offset.reset", "earliest");
        // Replace {{ GROUP_NAME }} with a consumer group name of your choice.
        props.put("group.id", "{{ GROUP_NAME }}");
        try (var consumer = new KafkaConsumer<String, String>(props)) {
            consumer.subscribe(Collections.singleton("{{ TOPIC_NAME }}"));
            // A single poll that waits up to 10 seconds for records.
            var records = consumer.poll(Duration.ofSeconds(10));
            for (var record : records) {
                System.out.println(record);
            }
        }
    }
}