#### QStash Price Decrease (Sep 15, 2022)
The price is \$1 per 100K requests.
#### [Pulumi Provider is available](https://upstash.com/blog/upstash-pulumi-provider) (August 4, 2022)
#### [QStash is released and announced](https://upstash.com/blog/qstash-announcement) (July 18, 2022)
#### [Announcing Upstash CLI](https://upstash.com/blog/upstash-cli) (May 16, 2022)
#### [Introducing Redis 6 Compatibility](https://upstash.com/blog/redis-6) (April 10, 2022)
#### Strong Consistency Deprecated (March 29, 2022)
We have deprecated Strong Consistency mode for Redis databases due to its
performance impact. This will not be available for new databases. We are
planning to disable it on existing databases before the end of 2023. The
database owners will be notified via email.
#### [Announcing Upstash Redis SDK v1.0.0](https://upstash.com/blog/upstash-redis-sdk-v1) (March 14, 2022)
#### Support for Kafka (Nov 29, 2021)
Kafka support has been released. Check out
[the blog post](https://blog.upstash.com/serverless-kafka-launch).
#### Support for Google Cloud (June 8, 2021)
Google Cloud is available for Upstash Redis databases. We initially support the
US-Central-1 (Iowa) region. Check out the
[getting started guide](https://docs.upstash.com/redis/howto/getstartedgooglecloudfunctions).
#### Support for AWS Japan (March 1, 2021)
Hello, Japan!
Support for the AWS Tokyo region was the most requested feature by our users. Now
our users can create their databases in the AWS Asia Pacific (Tokyo) region
(ap-northeast-1). In addition to Japan, Upstash is available in the
us-west-1, us-east-1, and eu-west-1 regions.
Click [here](https://console.upstash.com) to start your database for free.
Click [here](https://roadmap.upstash.com) to request new regions to be
supported.
#### Vercel Integration (February 22, 2021)
The Upstash & Vercel integration has been released. Now you can easily integrate
Upstash into your project. We believe Upstash is the perfect database for
your applications thanks to its:
* Low latency data
* Per request pricing
* Durable storage
* Ease of use
Below are the resources about the integration:
* [How-to guide](https://docs.upstash.com/redis/howto/vercelintegration)
* [Integration page](https://vercel.com/integrations/upstash)
* [Roadmap Voting app](https://github.com/upstash/roadmap), a showcase for the integration
# Compliance
## Upstash Legal & Security Documents
* [Upstash Terms of Service](https://upstash.com/static/trust/terms.pdf)
* [Upstash Privacy Policy](https://upstash.com/static/trust/privacy.pdf)
* [Upstash Data Processing Agreement](https://upstash.com/static/trust/dpa.pdf)
* [Upstash Technical and Organizational Security Measures](https://upstash.com/static/trust/security-measures.pdf)
* [Upstash Subcontractors](https://upstash.com/static/trust/subprocessors.pdf)
## Is Upstash SOC2 Compliant?
Upstash Redis databases under Pro and Enterprise support plans are SOC2 compliant. Check our [trust page](https://trust.upstash.com/) for details.
## Is Upstash ISO-27001 Compliant?
We are in the process of obtaining this certification. Contact us
([support@upstash.com](mailto:support@upstash.com)) to learn about the expected
date.
## Is Upstash GDPR Compliant?
Yes. For more information, see our
[Privacy Policy](https://upstash.com/static/trust/privacy.pdf). We acquire DPAs
from each [subcontractor](https://upstash.com/static/trust/subprocessors.pdf)
that we work with.
## Is Upstash HIPAA Compliant?
We are in the process of obtaining this certification. Contact us
([support@upstash.com](mailto:support@upstash.com)) to learn about the expected
date.
## Is Upstash PCI Compliant?
Upstash does not store personal credit card information. We use Stripe for
payment processing. Stripe is a certified PCI Service Provider Level 1, which is
the highest level of certification in the payments industry.
## Does Upstash conduct vulnerability scanning and penetration tests?
Yes, we use third party tools and work with pen testers. We share the results
with Enterprise customers. Contact us
([support@upstash.com](mailto:support@upstash.com)) for more information.
## Does Upstash take backups?
Yes, we take regular snapshots of the data cluster and store them in AWS S3.
## Does Upstash encrypt data?
Customers can enable TLS when creating a database or cluster, and we recommend this for production environments. Additionally, we encrypt data at rest upon customer request.
# Integration with Third Parties & Partnerships
## Introduction
In this guide, we outline the steps to integrate Upstash into your platform (GUI or Web App) and allow your users to create and manage Upstash databases without leaving your interface. We explain how to use OAuth 2.0 as the underlying foundation to enable this access seamlessly.
If your product or service offering utilizes Redis, Kafka, or QStash, or if there is a common use case that your end users enable by leveraging these database resources, we invite you to partner with us. By integrating Upstash into your platform, you can offer a more complete package to your customers and become a one-stop shop. This will also position you at the forefront of innovative cloud computing trends such as serverless and expand your customer base.
This is the most commonly used partnership integration model and can be easily implemented by following this guide. Recently, the [Cloudflare Workers integration](https://blog.cloudflare.com/cloudflare-workers-database-integration-with-upstash/) was implemented through this methodology. For any further questions or partnership discussions, please send us an email at [partnerships@upstash.com](mailto:partnerships@upstash.com).
Before starting development to integrate Upstash into your product, please
send an email to [partnerships@upstash.com](mailto:partnerships@upstash.com) for further assistance and guidance.
**General Flow (High level user flow)**
1. The user clicks the **`Connect Upstash`** button on your platform's surface (GUI, Web App).
2. This initiates the OAuth 2.0 flow, which opens a new browser page displaying the **`Upstash Login Page`**.
3. An existing user logs in with their Upstash credentials; otherwise, they can sign up for a new Upstash account directly.
4. The browser window redirects to the **`Your account has been connected`** page, and the authentication window closes automatically.
5. After the user returns to your interface, they see that their Upstash account is now connected.
## Technical Design (SPA - Regular Web Application)
1. The user clicks the `Connect Upstash` button in your Web App.
2. The Web App initiates the Upstash OAuth 2.0 flow. The Web App can use the
[Auth0 native libraries](https://auth0.com/docs/libraries).
Please reach out to [partnerships@upstash.com](mailto:partnerships@upstash.com) to receive a client id and callback url.
3. After the user returns from the OAuth 2.0 flow, the Web App will have a JWT token. The Web App can then generate a Developer API key:
```bash
curl -XPOST https://api.upstash.com/apikey \
-H "Authorization: Bearer JWT_KEY" \
-H "Content-Type: application/json" \
-d '{ "name": "APPNAME_API_KEY_TIMESTAMP" }'
```
4. The Web App needs to save the Developer API key to its backend.
## Technical Design (GUI Apps)
1. The user clicks the **`Connect Upstash`** button in the app.
2. The app initiates the Upstash OAuth 2.0 flow and can use the **[Auth0 native libraries](https://auth0.com/docs/libraries)**.
3. The app opens a new browser window:
```
https://auth.upstash.com/authorize?response_type=code&audience=upstash-api&scope=offline_access&client_id=XXXXXXXXXX&redirect_uri=http%3A%2F%2Flocalhost:3000
```
Please reach out to [partnerships@upstash.com](mailto:partnerships@upstash.com) to receive a client id.
4. After the user authenticates, Auth0 will redirect the user to
`localhost:3000/?code=XXXXXX`
5. The app can return a simple HTML response when Auth0 redirects back to `localhost:3000`, as sketched below.
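As an illustration of steps 4 and 5, here is a minimal, hypothetical sketch (shown in Python; a real GUI app would use its own framework's HTTP facilities) that listens on `localhost:3000`, captures the `code` query parameter, and returns a simple HTML confirmation:
```python
# Hypothetical sketch: handle a single redirect from Auth0 on localhost:3000,
# capture the ?code=... parameter, and show a confirmation page.
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import urlparse, parse_qs

class CallbackHandler(BaseHTTPRequestHandler):
    code = None

    def do_GET(self):
        params = parse_qs(urlparse(self.path).query)
        CallbackHandler.code = params.get("code", [None])[0]
        self.send_response(200)
        self.send_header("Content-Type", "text/html")
        self.end_headers()
        self.wfile.write(b"<html><body>Your account has been connected. You can close this window.</body></html>")

server = HTTPServer(("localhost", 3000), CallbackHandler)
server.handle_request()  # handle the single redirect request, then continue
print("authorization code:", CallbackHandler.code)
```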
6. After reading the `code` parameter from the URL query, the GUI app makes an HTTP
call to the Auth0 code exchange API. Example cURL request:
```bash
curl -XPOST 'https://auth.upstash.com/oauth/token' \
--header 'content-type: application/x-www-form-urlencoded' \
--data 'grant_type=authorization_code' \
--data 'audience=upstash-api' \
--data 'client_id=XXXXXXXXXXX' \
--data 'code=XXXXXXXXXXXX' \
--data 'redirect_uri=localhost:3000'
```
Response:
```json
{
"access_token": "XXXXXXXXXX",
"refresh_token": "XXXXXXXXXXX",
"scope": "offline_access",
"expires_in": 172800,
"token_type": "Bearer"
}
```
7. The response in step 6 includes an `access_token` with a 3-day TTL. The
GUI App then calls the Upstash API to get a developer API key:
```bash
curl https://api.upstash.com/apikey -H "Authorization: Bearer JWT_KEY" -d '{ "name" : "APPNAME_API_KEY_TIMESTAMP" }'
```
8. The GUI App saves the Developer API key locally. Then the GUI App can call any
Upstash Developer API: [developer.upstash.com](https://developer.upstash.com/)
## Managing Resources
After obtaining an Upstash Developer API key, your platform surface (web or GUI) can call the Upstash API, for example **[Create Database](https://developer.upstash.com/#create-database-global)** or **[List Databases](https://developer.upstash.com/#list-databases)**.
In this flow, you can ask users for the region and the name of the database, then call the Create Database API to complete the task.
Example CURL request:
```bash
curl -X POST \
https://api.upstash.com/v2/redis/database \
-u 'EMAIL:API_KEY' \
-d '{"name":"myredis", "region":"global", "primary_region":"us-east-1", "read_regions":["us-west-1","us-west-2"], "tls": true}'
```
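For reference, the same request in Python, mirroring the cURL example above (`EMAIL` and `API_KEY` are placeholders for the account email and the Developer API key obtained earlier):
```python
import requests

# Create a global Redis database; the payload mirrors the cURL example above.
data = '{"name":"myredis", "region":"global", "primary_region":"us-east-1", "read_regions":["us-west-1","us-west-2"], "tls": true}'
response = requests.post('https://api.upstash.com/v2/redis/database', data=data, auth=('EMAIL', 'API_KEY'))
print(response.json())
```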
# Legal
## Upstash Legal Documents
* [Upstash Terms of Service](https://upstash.com/trust/terms.pdf)
* [Upstash Privacy Policy](https://upstash.com/trust/privacy.pdf)
* [Upstash Subcontractors](https://upstash.com/trust/subprocessors.pdf)
# Professional Support
For all Upstash products, we manage everything for you and let you focus on more important things. If you ever need further help, our dedicated Professional Support team is here to ensure you get the most out of our platform, whether you're just starting out or scaling to new heights.
Professional Support is strongly recommended especially for customers who use Upstash as part of their production systems.
# Expert Guidance
Get direct access to our team of specialists who can provide insights, troubleshooting, and best practices tailored to your unique use case. In any urgent incident you might have, our Support team will be standing by and ready to join you for troubleshooting.
Professional Support package includes:
* **Guaranteed Response Time:** Rapid Response Time SLA to urgent support requests, ensuring your concerns are addressed promptly with a **24/7 coverage**.
* **Customer Onboarding:** A personalized session to guide you through utilizing our support services and reviewing your specific use case for a seamless start.
* **Quarterly Use Case Review & Health Check:** On-request sessions every quarter to review your use case and ensure optimal performance.
* **Dedicated Slack Channel:** Direct access to our team via a private Slack channel, so you can reach out whenever you need assistance.
* **Incident Support:** Video call support during critical incidents to provide immediate help and resolution.
* **Root Cause Analysis:** Comprehensive investigation and post-mortem analysis of critical incidents to identify and address the root cause.
# Response Time SLA
We understand that timely assistance is critical for production workloads, so your access to our Support team comes with 24/7 coverage and the SLA below:
| Severity | Response Time |
| ------------------------------- | ------------- |
| P1 - Production system down | 30 minutes |
| P2 - Production system impaired | 2 hours |
| P3 - Minor issue | 12 hours |
| P4 - General guidance | 24 hours |
## How to Reach Out?
As a Professional Support customer, there are **two methods** to reach the Upstash Support team when you need to utilize our services:
#### Starting a Chat
You will see a chatbox at the bottom right when viewing the Upstash console, docs, and website. Once you initiate a chat, Professional Support customers are prompted to select a severity level:
To see these options in the chat, remember to sign in to your Upstash account first.
If you select the "P1 - Production down, no workaround" or "P2 - Production impaired with workaround" option, you will trigger an alert for our team to step in urgently.
#### Sending an Email
Sending an email with details to [support@upstash.com](mailto:support@upstash.com) is another way to submit a support request. In case of urgency, include the keyword "urgent" in the email subject to alert our team about a possible incident.
# Pricing
For pricing and further details about Professional Support, please contact us at [support@upstash.com](mailto:support@upstash.com)
# Uptime SLA
This Service Level Agreement ("SLA") applies to the use of the Upstash services,
offered under the terms of our Terms of Service or other agreement with us
governing your use of Upstash. This SLA does not apply to Upstash services in
the Upstash Free and Pay-as-you-go Tier. It is clarified that this SLA is subject to the terms of
the Agreement, and does not derogate therefrom (capitalized terms, unless
otherwise indicated herein, have the meaning specified in the Agreement).
Upstash reserves the right to change the terms of this SLA by publishing updated
terms on its website, such change to be effective as of the date of publication.
### Upstash Database SLA
Upstash will use commercially reasonable efforts to make
databases available with a Monthly Uptime Percentage of at least 99.99%.
In the event any of the services do not meet the SLA, you will be eligible to
receive a Service Credit as described below.
| Monthly Uptime Percentage | Service Credit Percentage |
| --------------------------------------------------- | ------------------------- |
| Less than 99.99% but equal to or greater than 99.0% | 10% |
| Less than 99.0% but equal to or greater than 95.0% | 30% |
| Less than 95.0% | 60% |
### SLA Credits
Service Credits are calculated as a percentage of the monthly bill (excluding
one-time payments such as upfront payments) for the service in the affected
region that did not meet the SLA.
Uptime percentages are recorded and published in the
[Upstash Status Page](https://status.upstash.com).
To receive a Service Credit, you should submit a claim by sending an email to
[support@upstash.com](mailto:support@upstash.com). Your credit request should be
received by us before the end of the second billing cycle after the incident
occurred.
We will apply any service credits against future payments for the applicable
services. At our discretion, we may issue the Service Credit to the credit card
you used. Service Credits will not entitle you to any refund or other payment. A
Service Credit will be applicable and issued only if the credit amount for the
applicable monthly billing cycle is greater than one dollar (\$1 USD). Service
Credits may not be transferred or applied to any other account.
# Support & Contact Us
## Community
[Upstash Discord Channel](https://upstash.com/discord) is the best way to
interact with the community.
## Team
Regardless of your subscription plan, you can contact the team
via [support@upstash.com](mailto:support@upstash.com) for technical support as
well as questions and feedback.
## Follow Us
Follow us on [X](https://x.com/upstash).
## Bugs & Issues
You can help us improve Upstash by reporting issues, suggesting new features and
giving general feedback in
our [Community Github Repo](https://github.com/upstash/issues/issues/new).
## Enterprise Support
Get [Enterprise Support](/common/help/prosupport) for your organization from the Upstash team.
# Uptime Monitor
## Status Page
You can track the uptime status of Upstash databases on the
[Upstash Status Page](https://status.upstash.com).
## Latency Monitor
You can see the average latencies for different regions on the
[Upstash Latency Monitoring](https://latency.upstash.com) page.
# Trials
If you want to try Upstash paid and pro plans, we can offer **Free
Trials**. Email us at [support@upstash.com](mailto:support@upstash.com)
# Overview
Manage Upstash resources in your terminal or CI.
You can find the Github Repository [here](https://github.com/upstash/cli).
# Installation
## npm
You can install the Upstash CLI directly from npm:
```bash
npm i -g @upstash/cli
```
It will be added as `upstash` to your system's path.
## Compiled binaries
`upstash` is also available from the
[releases page](https://github.com/upstash/cli/releases/latest) compiled
for Windows, Linux, and macOS (both Intel and M1).
# Usage
```bash
> upstash
Usage: upstash
Version: development
Description:
Official cli for Upstash products
Options:
-h, --help - Show this help.
-V, --version - Show the version number for this program.
-c, --config - Path to .upstash.json file
Commands:
auth - Login and logout
redis - Manage redis database instances
kafka - Manage kafka clusters and topics
team - Manage your teams and their members
Environment variables:
UPSTASH_EMAIL - The email you use on upstash
UPSTASH_API_KEY - The api key from upstash
```
## Authentication
When running `upstash` for the first time, you should log in using
`upstash auth login`. Provide your email and an API key.
[See here for how to get a key.](https://docs.upstash.com/redis/howto/developerapi#api-development)
As an alternative to logging in, you can provide `UPSTASH_EMAIL` and
`UPSTASH_API_KEY` as environment variables.
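For example, a CI job can export these variables and drive the CLI non-interactively. Below is a small, hypothetical sketch in Python (it assumes the `upstash` binary is on your `PATH` and reuses the `redis create` flags shown in the usage examples below):
```python
import os
import subprocess

# Authenticate via environment variables instead of `upstash auth login`.
env = {**os.environ, "UPSTASH_EMAIL": "you@example.com", "UPSTASH_API_KEY": "your-api-key"}

# --json returns the raw API response, which is easier to parse in scripts.
result = subprocess.run(
    ["upstash", "redis", "create", "--name=my-db", "--region=eu-west-1", "--json"],
    env=env, capture_output=True, text=True, check=True,
)
print(result.stdout)
```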
## Usage
Let's create a new redis database:
```
> upstash redis create --name=my-db --region=eu-west-1
Database has been created
database_id a3e25299-132a-45b9-b026-c73f5a807859
database_name my-db
database_type Pay as You Go
region eu-west-1
type paid
port 37090
creation_time 1652687630
state active
password 88ae6392a1084d1186a3da37fb5f5a30
user_email andreas@upstash.com
endpoint eu1-magnetic-lacewing-37090.upstash.io
edge false
multizone false
rest_token AZDiASQgYTNlMjUyOTktMTMyYS00NWI5LWIwMjYtYzczZjVhODA3ODU5ODhhZTYzOTJhMTA4NGQxMTg2YTNkYTM3ZmI1ZjVhMzA=
read_only_rest_token ApDiASQgYTNlMjUyOTktMTMyYS00NWI5LWIwMjYtYzczZjVhODA3ODU5O_InFjRVX1XHsaSjq1wSerFCugZ8t8O1aTfbF6Jhq1I=
You can visit your database details page: https://console.upstash.com/redis/a3e25299-132a-45b9-b026-c73f5a807859
Connect to your database with redis-cli: redis-cli -u redis://88ae6392a1084d1186a3da37fb5f5a30@eu1-magnetic-lacewing-37090.upstash.io:37090
```
## Output
Most commands support the `--json` flag to return the raw API response as JSON,
which you can parse to automate your workflows.
```bash
> upstash redis create --name=test2113 --region=us-central1 --json | jq '.endpoint'
"gusc1-clean-gelding-30208.upstash.io"
```
# Authentication
Authentication for the Upstash Developer API
The Upstash API requires API keys to authenticate requests. You can view and
manage API keys at the Upstash Console.
Upstash API uses HTTP Basic authentication. You should pass `EMAIL` and
`API_KEY` as basic authentication username and password respectively.
With a client such as `curl`, you can pass your credentials with the `-u`
option, as the following example shows:
```curl
curl https://api.upstash.com/v2/redis/database -u EMAIL:API_KEY
```
Replace `EMAIL` and `API_KEY` with your email and API key.
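The same request can be made from Python with the `requests` library, which handles basic authentication via the `auth` parameter (again, `EMAIL` and `API_KEY` are placeholders for your credentials):
```python
import requests

# Make an authenticated request to the Upstash API using HTTP Basic authentication.
response = requests.get('https://api.upstash.com/v2/redis/database', auth=('EMAIL', 'API_KEY'))
print(response.status_code)
print(response.json())
```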
# HTTP Status Codes
The Upstash API uses the following HTTP Status codes:
| Code | Status                    | Description                                                                      |
| ---- | ------------------------- | -------------------------------------------------------------------------------- |
| 200  | **OK**                    | Indicates that a request completed successfully and the response contains data. |
| 400  | **Bad Request**           | Your request is invalid.                                                         |
| 401  | **Unauthorized**          | Your API key is wrong.                                                           |
| 403  | **Forbidden**             | You do not have permission to access the requested resource.                    |
| 404  | **Not Found**             | The specified resource could not be found.                                       |
| 405  | **Method Not Allowed**    | You tried to access a resource with an invalid HTTP method.                      |
| 406  | **Not Acceptable**        | You requested a format that isn't JSON.                                          |
| 429  | **Too Many Requests**     | You're sending too many requests. Slow down!                                     |
| 500  | **Internal Server Error** | We had a problem with our server. Try again later.                               |
| 503  | **Service Unavailable**   | We're temporarily offline for maintenance. Please try again later.               |
# Getting Started
Using the Upstash API, you can develop applications that create and manage
Upstash databases and Kafka clusters. You can automate everything that
you can do in the console. To use the developer API, you need to create an API key
in the console.
### Create an API key
1. Log in to the console then in the left menu click the
`Account > Management API` link.
2. Click the `Create API Key` button.
3. Enter a name for your key. You cannot use the same name for multiple keys.
You need to download or copy/save your API key. For security reasons, Upstash does
not store your API key, so if you lose it, you will need to create a new one.
You can create multiple keys. It is recommended to use different keys in
different applications. By default, one user can create up to 37 API keys. If you
need more than that, please send us an email at
[support@upstash.com](mailto:support@upstash.com).
### Deleting an API key
When an API key is exposed (e.g. accidentally shared in a public repository) or
is no longer being used, you should delete it. You can delete API keys on the
`Account > API Keys` screen.
### Roadmap
**Role based access:** You will be able to create API keys with specific
privileges. For example you will be able to create a key with read-only access.
**Stats:** We will provide reports based on usage of your API keys.
# Create Kafka Cluster
POST https://api.upstash.com/v2/kafka/cluster
This endpoint creates a new kafka cluster.
## Request Parameters
Name of the new Kafka cluster
The region the cluster will be deployed in
**Options:** `eu-west-1` or `us-east-1`
Set true to enable multi-zone replication
## Response Parameters
ID of the created kafka cluster
Name of the kafka cluster
The region the kafka cluster is deployed in
Shows whether the cluster is free or paid
Whether the multizone replication is enabled for the cluster or not
TCP endpoint to connect to the kafka cluster
REST endpoint to connect to the kafka cluster
Current state of the cluster (`active` or `deleted`)
Username to be used in authenticating to the cluster
Password to be used in authenticating to the cluster
Max retention size allowed for topics in the cluster
Max retention time allowed for topics in the cluster
Max messages allowed to be produced per second
Cluster creation timestamp
Max message size allowed in topics in the cluster
Max total number of partitions allowed in the cluster
```shell curl
curl -X POST \
https://api.upstash.com/v2/kafka/cluster \
-u 'EMAIL:API_KEY' \
-d '{"name":"mykafkacluster","region":"eu-west-1","multizone":true}'
```
```python Python
import requests
data = '{"name":"mykafkacluster","region":"eu-west-1","multizone":true}'
response = requests.post('https://api.upstash.com/v2/kafka/cluster', data=data, auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
var data = strings.NewReader(`{
"name": "test_kafka_cluster_4",
"region": "eu-west-1",
"multizone": true
}`)
req, err := http.NewRequest("POST", "https://api.upstash.com/v2/kafka/cluster", data)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
{
"cluster_id": "9bc0e897-cbd3-4997-895a-fd77ad00aec9",
"name": "mykafkacluster",
"region": "eu-west-1",
"type": "paid",
"multizone": true,
"tcp_endpoint": "sharing-mastodon-12819-eu1-kafka.upstashdev.com",
"rest_endpoint": "sharing-mastodon-12819-eu1-rest-kafka.upstashdev.com",
"state": "active",
"username": "c2hhcmluZy1tYXN0b2Rvbi0xMjgxOSRV1ipriSBOwd0PHzw2KAs_cDrTXzvUKIs",
"password": "zlQgc0nbgcqF6MxOqnh7tKjJsGnSgLFS89uS-FXzMVqhL2dgFbmHwB-IXAAsOYXzUYj40g==",
"max_retention_size": 1073741824000,
"max_retention_time": 2592000000,
"max_messages_per_second": 1000,
"creation_time": 1643978975,
"max_message_size": 1048576,
"max_partitions": 100
}
```
# Delete Kafka Cluster
DELETE https://api.upstash.com/v2/kafka/cluster/{id}
This endpoint deletes a kafka cluster.
## URL Parameters
The ID of the Kafka cluster to be deleted
```shell curl
curl -X DELETE \
https://api.upstash.com/v2/kafka/cluster/:id \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.delete('https://api.upstash.com/v2/kafka/cluster/:id', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("DELETE", "https://api.upstash.com/v2/kafka/cluster/:id", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
"OK"
```
# Get Kafka Cluster
GET https://api.upstash.com/v2/kafka/cluster/{id}
This endpoint gets details of a Kafka cluster.
## URL Parameters
The ID of the Kafka cluster
## Response Parameters
ID of the created Kafka cluster
Name of the Kafka cluster
The region the Kafka cluster is deployed in
Shows whether the cluster is free or paid
Whether the multizone replication is enabled for the cluster or not
TCP endpoint to connect to the Kafka cluster
REST endpoint to connect to the Kafka cluster
Current state of the cluster (`active` or `deleted`)
Username to be used in authenticating to the cluster
Password to be used in authenticating to the cluster
Max retention size allowed for topics in the cluster
Max retention time allowed for topics in the cluster
Max messages allowed to be produced per second
Cluster creation timestamp
Max message size allowed in topics in the cluster
Max total number of partitions allowed in the cluster
```shell curl
curl -X GET \
https://api.upstash.com/v2/kafka/cluster/:id \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.get('https://api.upstash.com/v2/kafka/cluster/:id', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("GET", "https://api.upstash.com/v2/kafka/cluster/:id", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
{
"cluster_id": "9bc0e897-cbd3-4997-895a-fd77ad00aec9",
"name": "test_kafka_cluster",
"region": "eu-west-1",
"type": "paid",
"multizone": true,
"tcp_endpoint": "sharing-mastodon-12819-eu1-kafka.upstashdev.com",
"rest_endpoint": "sharing-mastodon-12819-eu1-rest-kafka.upstashdev.com",
"state": "active",
"username": "c2hhcmluZy1tYXN0b2Rvbi0xMjgxOSRV1ipriSBOwd0PHzw2KAs_cDrTXzvUKIs",
"password": "zlQgc0nbgcqF6MxOqnh7tKjJsGnSgLFS89uS-FXzMVqhL2dgFbmHwB-IXAAsOYXzUYj40g==",
"max_retention_size": 1073741824000,
"max_retention_time": 2592000000,
"max_messages_per_second": 1000,
"creation_time": 1643978975,
"max_message_size": 1048576,
"max_partitions": 100
}
```
# List Kafka Clusters
GET https://api.upstash.com/v2/kafka/clusters
This endpoint lists all kafka clusters of the user.
## Response Parameters
ID of the created kafka cluster
Name of the kafka cluster
The region the kafka cluster is deployed in
Shows whether the cluster is free or paid
Whether the multizone replication is enabled for the cluster or not
TCP endpoint to connect to the kafka cluster
REST endpoint to connect to the kafka cluster
Current state of the cluster (`active` or `deleted`)
Username to be used in authenticating to the cluster
Password to be used in authenticating to the cluster
Max retention size allowed for topics in the cluster
Max retention time allowed for topics in the cluster
Max messages allowed to be produced per second
Cluster creation timestamp
Max message size allowed in topics in the cluster
Max total number of partitions allowed in the cluster
```shell curl
curl -X GET \
https://api.upstash.com/v2/kafka/clusters \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.get('https://api.upstash.com/v2/kafka/clusters', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("GET", "https://api.upstash.com/v2/kafka/clusters", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
[
{
"cluster_id": "9bc0e897-cbd3-4997-895a-fd77ad00aec9",
"name": "test_kafka_cluster",
"region": "eu-west-1",
"type": "paid",
"multizone": true,
"tcp_endpoint": "sharing-mastodon-12819-eu1-kafka.upstashdev.com",
"rest_endpoint": "sharing-mastodon-12819-eu1-rest-kafka.upstashdev.com",
"state": "active",
"username": "c2hhcmluZy1tYXN0b2Rvbi0xMjgxOSRV1ipriSBOwd0PHzw2KAs_cDrTXzvUKIs",
"password": "zlQgc0nbgcqF6MxOqnh7tKjJsGnSgLFS89uS-FXzMVqhL2dgFbmHwB-IXAAsOYXzUYj40g==",
"max_retention_size": 1073741824000,
"max_retention_time": 2592000000,
"max_messages_per_second": 1000,
"creation_time": 1643978975,
"max_message_size": 1048576,
"max_partitions": 100
}
]
```
# Rename Kafka Cluster
POST https://api.upstash.com/v2/kafka/rename-cluster/{id}
This endpoint renames a Kafka cluster.
## URL Parameters
The ID of the Kafka cluster
## Request Parameters
The new name of the kafka cluster
## Response Parameters
ID of the created Kafka cluster
Name of the Kafka cluster
The region the Kafka cluster is deployed in
Shows whether the cluster is free or paid
Whether the multizone replication is enabled for the cluster or not
TCP endpoint to connect to the Kafka cluster
REST endpoint to connect to the Kafka cluster
Current state of the cluster (`active` or `deleted`)
Username to be used in authenticating to the cluster
Password to be used in authenticating to the cluster
Max retention size allowed for topics in the cluster
Max retention time allowed for topics in the cluster
Max messages allowed to be produced per second
Cluster creation timestamp
Max message size allowed in topics in the cluster
Max total number of partitions allowed in the cluster
```shell curl
curl -X POST \
https://api.upstash.com/v2/kafka/rename-cluster/:id \
-u 'EMAIL:API_KEY' \
-d '{"name":"mykafkacluster-2"}'
```
```python Python
import requests
data = '{"name":"mykafkacluster-2"}'
response = requests.post('https://api.upstash.com/v2/kafka/rename-cluster/:id', data=data, auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
var data = strings.NewReader(`{
"name":"mykafkacluster-2"
}`)
req, err := http.NewRequest("POST", "https://api.upstash.com/v2/kafka/rename-cluster/:id", data)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
{
"cluster_id": "9bc0e897-cbd3-4997-895a-fd77ad00aec9",
"name": "mykafkacluster-2",
"region": "eu-west-1",
"type": "paid",
"multizone": true,
"tcp_endpoint": "sharing-mastodon-12819-eu1-kafka.upstashdev.com",
"rest_endpoint": "sharing-mastodon-12819-eu1-rest-kafka.upstashdev.com",
"state": "active",
"username": "c2hhcmluZy1tYXN0b2Rvbi0xMjgxOSRV1ipriSBOwd0PHzw2KAs_cDrTXzvUKIs",
"password": "zlQgc0nbgcqF6MxOqnh7tKjJsGnSgLFS89uS-FXzMVqhL2dgFbmHwB-IXAAsOYXzUYj40g==",
"max_retention_size": 1073741824000,
"max_retention_time": 2592000000,
"max_messages_per_second": 1000,
"creation_time": 1643978975,
"max_message_size": 1048576,
"max_partitions": 100
}
```
# Reset Kafka Cluster Password
POST https://api.upstash.com/v2/kafka/reset-password/{id}
This endpoint updates the password of a kafka cluster.
## URL Parameters
The ID of the Kafka cluster whose password will be reset
## Response Parameters
ID of the created Kafka cluster
Name of the Kafka cluster
The region the Kafka cluster is deployed in
Shows whether the cluster is free or paid
Whether the multizone replication is enabled for the cluster or not
TCP endpoint to connect to the Kafka cluster
REST endpoint to connect to the Kafka cluster
Current state of the cluster (`active` or `deleted`)
Username to be used in authenticating to the cluster
Password to be used in authenticating to the cluster
Max retention size allowed for topics in the cluster
Max retention time allowed for topics in the cluster
Max messages allowed to be produced per second
Cluster creation timestamp
Max message size allowed in topics in the cluster
Max total number of partitions allowed in the cluster
```shell curl
curl -X POST \
https://api.upstash.com/v2/kafka/reset-password/:id \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.post('https://api.upstash.com/v2/kafka/reset-password/:id', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("POST", "https://api.upstash.com/v2/kafka/reset-password/:id", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
{
"cluster_id": "9bc0e897-cbd3-4997-895a-fd77ad00aec9",
"name": "mykafkacluster-2",
"region": "eu-west-1",
"type": "paid",
"multizone": true,
"tcp_endpoint": "sharing-mastodon-12819-eu1-kafka.upstashdev.com",
"rest_endpoint": "sharing-mastodon-12819-eu1-rest-kafka.upstashdev.com",
"state": "active",
"username": "c2hhcmluZy1tYXN0b2Rvbi0xMjgxOSRV1ipriSBOwd0PHzw2KAs_cDrTXzvUKIs",
"password": "7ea02715ceeb4fd3ba1542a5f3bf758e",
"max_retention_size": 1073741824000,
"max_retention_time": 2592000000,
"max_messages_per_second": 1000,
"creation_time": 1643978975,
"max_message_size": 1048576,
"max_partitions": 100
}
```
# Get Kafka Cluster Stats
GET https://api.upstash.com/v2/kafka/stats/cluster/{id}
This endpoint gets detailed stats of a kafka cluster.
## URL Parameters
The ID of the Kafka cluster
## Response Parameters
Timestamp indicating when the measurement was taken.
Number of monthly messages in kafka cluster
Timestamp indicating when the measurement was taken.
Number of monthly messages produced in kafka cluster
Timestamp indicating when the measurement was taken.
Number of monthly messages consumed in kafka cluster
Timestamp indicating when the measurement was taken.
Total disk usage of the kafka cluster
String representation of last 5 days of the week starting from the current day
Last 5 days daily produced message count in kafka cluster
Last 5 days daily consumed message count in kafka cluster
Average storage size of the kafka cluster in the current month
Total cost of the kafka cluster in the current month
Total number of produced messages in the current month
Total number of consumed messages in the current month
```shell curl
curl -X GET \
https://api.upstash.com/v2/kafka/stats/cluster/:id \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.get('https://api.upstash.com/v2/kafka/stats/cluster/:id', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("GET", "https://api.upstash.com/v2/kafka/stats/cluster/:id", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
{
"throughput": [
{
"x": "2022-02-07 11:30:28",
"y": 0
}
...
],
"produce_throughput": [
{
"x": "2022-02-07 11:30:28",
"y": 0
}
...
],
"consume_throughput": [
{
"x": "2022-02-07 11:30:28",
"y": 0
}
...
],
"diskusage": [
{
"x": "2022-02-07 11:45:28",
"y": 0
}
...
],
"days": [
"Thursday",
"Friday",
"Saturday",
"Sunday",
"Monday"
],
"dailyproduce": [
{
"x": "2022-02-07 11:30:28.937259962 +0000 UTC",
"y": 0
}
...
],
"dailyconsume": [
{
"x": "2022-02-07 11:30:28.937256776 +0000 UTC",
"y": 0
}
...
],
"total_monthly_storage": 0,
"total_monthly_billing": 0,
"total_monthly_produce": 0,
"total_monthly_consume": 0
}
```
# Create Kafka Connector
POST https://api.upstash.com/v2/kafka/connector
This endpoint creates a new kafka connector in a cluster.
## Request Parameters
Name of the new kafka connector
ID of the cluster the connector will be deployed in
Properties of the connector. Custom config for different types of connectors.
## Response Parameters
ID of the new kafka connector
Name of the new kafka connector
Owner of the connector
ID of the kafka cluster of the connector
Creation time of the connector
Deletion time of the connector
State of the connector
Error message, if the connector failed
State of the connector
Tasks for the connector
Topics that are given with properties config
Class of the created connector
Encoded username for the connector
Time to live for connector
```shell curl
curl -X POST \
https://api.upstash.com/v2/kafka/connector \
-u 'EMAIL:API_KEY' \
-d '{"name":"connectorName","cluster_id":"7568431c-88d5-4409-a808-2167f22a7133", "properties":{"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector","connection.uri": "connection-uri"}}'
```
```python Python
import requests
data = '{"name":"connectorName","cluster_id":"7568431c-88d5-4409-a808-2167f22a7133", "properties":{"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector","connection.uri": "connection-uri"}}'
response = requests.post('https://api.upstash.com/v2/kafka/connector', data=data, auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
var data = strings.NewReader(`{
"name": "connectorName",
"cluster_id": "7568431c-88d5-4409-a808-2167f22a7133",
"properties":{"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector","connection.uri": "connection-uri"}
}`)
req, err := http.NewRequest("POST", "https://api.upstash.com/v2/kafka/connector", data)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
{
"connector_id":"431ec970-b59d-4b00-95fe-5f3abcc52c2f",
"name":"connectorName",
"customer_id":"EMAIL",
"cluster_id":"7568431c-88d5-4409-a808-2167f22a7133",
"creation_time":1684369147,
"deletion_time":0,
"state":"pending",
"state_error_message":"",
"connector_state":"",
"tasks":[],
"topics":[],
"connector_class":"com.mongodb.kafka.connect.MongoSourceConnector",
"encoded_username":"YXBwYXJlbnQta2l0ZS0xMTMwMiTIqFhTItzgDdE56au6LgnnbtlN7ITzh4QATDw",
"TTL":1684370947
}
```
# Delete Kafka Connector
DELETE https://api.upstash.com/v2/kafka/connector/{id}
This endpoint deletes a Kafka Connector.
## URL Parameters
The ID of the Kafka Connector to be deleted
```shell curl
curl -X DELETE \
https://api.upstash.com/v2/kafka/connector/:id \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.delete('https://api.upstash.com/v2/kafka/connector/:id', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("DELETE", "https://api.upstash.com/v2/kafka/connector/:id", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
"OK"
```
# Get Kafka Connector
GET https://api.upstash.com/v2/kafka/connector/{id}
This endpoint gets details of a kafka connector.
## URL Parameters
The ID of the Kafka Connector
## Response Parameters
ID of the Kafka connector
Name of the Kafka connector
ID of the kafka cluster of the connector
Creation time of the connector
Owner of the connector
State of the connector
Error message, if the connector failed
State of the connector
Tasks for the connector
Topics that are given with properties config
Class of the created connector
Properties that the connector was configured with
Encoded username for the connector
```shell curl
curl -X GET \
https://api.upstash.com/v2/kafka/connector/:id \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.get('https://api.upstash.com/v2/kafka/connector/:id', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("GET", "https://api.upstash.com/v2/kafka/connector/:id", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
{
"connector_id": "431ec970-b59d-4b00-95fe-5f3abcc52c2f",
"name": "connectorName",
"customer_id": "EMAIL",
"cluster_id": "7568431c-88d5-4409-a808-2167f22a7133",
"creation_time": 1684369147,
"deletion_time": 0,
"state": "failed",
"state_error_message": "Connector configuration is invalid and contains the following 1 error(s):\nInvalid value connection-uri-update for configuration connection.uri: The connection string is invalid. Connection strings must start with either 'mongodb://' or 'mongodb+srv://\n",
"connector_state": "",
"tasks": [],
"topics": [],
"connector_class": "com.mongodb.kafka.connect.MongoSourceConnector",
"properties": {
"connection.uri": "connection-uri-update",
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector"
},
"encoded_username": "YXBwYXJlbnQta2l0ZS0xMTMwMiTIqFhTItzgDdE56au6LgnnbtlN7ITzh4QATDw"
}
```
# List Kafka Connectors in Cluster
GET https://api.upstash.com/v2/kafka/connectors/{id}
This endpoint lists kafka connectors in a cluster.
## URL Parameters
The ID of the Kafka Cluster
## Response Parameters
ID of the Kafka connector
Name of the Kafka connector
ID of the kafka cluster of the connector
Creation time of the connector
Owner of the connector
State of the connector
Error message, if the connector failed
State of the connector
Tasks for the connector
Topics that are given with properties config
Class of the created connector
Properties that the connector was configured with
Encoded username for the connector
Time to live for connector
```shell curl
curl -X GET \
https://api.upstash.com/v2/kafka/connectors/:id \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.get('https://api.upstash.com/v2/kafka/connectors/:id', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("GET", "https://api.upstash.com/v2/kafka/connectors/:id", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
[
{
"connector_id": "431ec970-b59d-4b00-95fe-5f3abcc52c2f",
"name": "connectorName",
"customer_id": "EMAIL",
"cluster_id": "7568431c-88d5-4409-a808-2167f22a7133",
"creation_time": 1684369147,
"deletion_time": 0,
"state": "failed",
"state_error_message": "Connector configuration is invalid and contains the following 1 error(s):\nInvalid value connection-uri-update for configuration connection.uri: The connection string is invalid. Connection strings must start with either 'mongodb://' or 'mongodb+srv://\n",
"connector_state": "",
"tasks": [],
"topics": [],
"connector_class": "com.mongodb.kafka.connect.MongoSourceConnector",
"properties": {
"connection.uri": "connection-uri-update",
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector"
},
"encoded_username": "YXBwYXJlbnQta2l0ZS0xMTMwMiTIqFhTItzgDdE56au6LgnnbtlN7ITzh4QATDw",
"TTL": 1684370947
}
]
```
# Pause Kafka Connector
POST https://api.upstash.com/v2/kafka/connector/{id}/pause
This endpoint pauses an existing connector.
## URL Parameters
The ID of the Kafka Connector to be paused
```shell curl
curl -X POST \
https://api.upstash.com/v2/kafka/connector/:id/pause \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.post('https://api.upstash.com/v2/kafka/connector/:id/pause', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("POST", "https://api.upstash.com/v2/kafka/connector/:id/pause", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
"OK"
```
# Reconfigure Kafka Connector
POST https://api.upstash.com/v2/kafka/update-connector/{id}
This endpoint reconfigures an existing kafka connector.
## URL Parameters
The ID of the Kafka Connector
## Request Parameters
Custom property values, depending on the connector type. The given values will be
changed on the connector. Check the documentation of the related connector for
available properties.
## Response Parameters
ID of the Kafka connector
Name of the Kafka connector
ID of the kafka cluster of the connector
Creation time of the connector
Owner of the connector
State of the connector
Error message, if the connector failed
State of the connector
Tasks for the connector
Topics that are given with properties config
Class of the created connector
Encoded username for the connector
Time to live for connector
```shell curl
curl -X POST \
https://api.upstash.com/v2/kafka/update-connector/:id \
-u 'EMAIL:API_KEY' \
-d '{"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector","connection.uri": "connection-uri-update"}'
```
```python Python
import requests
data = '{"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector","connection.uri": "connection-uri-update"}'
response = requests.post('https://api.upstash.com/v2/kafka/update-connector/:id', data=data, auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
var data = strings.NewReader(`{
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "connection-uri-update"
}`)
req, err := http.NewRequest("POST", "https://api.upstash.com/v2/kafka/update-connector/:id", data)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
{
"connector_id": "431ec970-b59d-4b00-95fe-5f3abcc52c2f",
"name": "connectorName",
"customer_id": "EMAIL",
"cluster_id": "7568431c-88d5-4409-a808-2167f22a7133",
"creation_time": 1684369147,
"deletion_time": 0,
"state": "failed",
"state_error_message": "Connector configuration is invalid and contains the following 1 error(s):\nInvalid value connection-uri-update for configuration connection.uri: The connection string is invalid. Connection strings must start with either 'mongodb://' or 'mongodb+srv://\n",
"connector_state": "",
"tasks": [],
"topics": [],
"connector_class": "com.mongodb.kafka.connect.MongoSourceConnector",
"encoded_username": "YXBwYXJlbnQta2l0ZS0xMTMwMiTIqFhTItzgDdE56au6LgnnbtlN7ITzh4QATDw",
"TTL": 1684370947
}
```
# Restart Kafka Connector
POST https://api.upstash.com/v2/kafka/connector/{id}/restart
This endpoint restarts an existing connector.
## URL Parameters
The ID of the Kafka Connector to be restarted
```shell curl
curl -X POST \
https://api.upstash.com/v2/kafka/connector/:id/restart \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.post('https://api.upstash.com/v2/kafka/connector/:id/restart', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("POST", "https://api.upstash.com/v2/kafka/connector/:id/restart", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
"OK"
```
# Start Kafka Connector
POST https://api.upstash.com/v2/kafka/connector/{id}/start
This endpoint starts an existing connector.
## URL Parameters
The ID of the Kafka Connector to be started
```shell curl
curl -X POST \
https://api.upstash.com/v2/kafka/connector/:id/start \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.post('https://api.upstash.com/v2/kafka/connector/:id/start', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("POST", "https://api.upstash.com/v2/kafka/connector/:id/start", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
"OK"
```
# Create Kafka Credential
POST https://api.upstash.com/v2/kafka/credential
This endpoint creates a kafka credential.
## Request Parameters
Name of the new kafka credential
ID of the kafka cluster
Name of the kafka topic the credential will be used for
Permission scope of the credential
**Options:** `ALL`, `PRODUCE` or `CONSUME`
## Response Parameters
ID of the created Kafka credential
Name of the created Kafka credential
Name of the topic of the created Kafka credential
Permission scope given to the kafka credential
ID of the kafka cluster
Username to be used for the kafka credential
Creation time of the credential
Password to be used in authenticating to the cluster
State of the credential: `active` or `deleted`
```shell curl
curl -X POST \
https://api.upstash.com/v2/kafka/credential \
-u 'EMAIL:API_KEY' \
-d '{"credential_name": "mycreds", "cluster_id":"1793bfa1-d96e-46de-99ed-8f91f083209d", "topic": "testtopic", "permissions": "ALL"}'
```
```python Python
import requests
data = '{"credential_name": "mycreds", "cluster_id":"1793bfa1-d96e-46de-99ed-8f91f083209d", "topic": "testtopic", "permissions": "ALL"}'
response = requests.post('https://api.upstash.com/v2/kafka/credential', data=data, auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
var data = strings.NewReader(`{
"credential_name": "mycreds",
"cluster_id":"1793bfa1-d96e-46de-99ed-8f91f083209d",
"topic": "testopic",
"permissions": "ALL"
}`)
req, err := http.NewRequest("POST", "https://api.upstash.com/v2/kafka/credential", data)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
{
"credential_id": "27172269-da05-471b-9e8e-8fe4195871bc",
"credential_name": "mycreds",
"topic": "testtopic",
"permissions": "ALL",
"cluster_id": "1793bfa1-d96e-46de-99ed-8f91f083209d",
"cluster_slug":"easy-haddock-7753",
"username":"ZWFzeS1oYWRkb2NrLTc3NTMkeeOs0FG4DZ3GxK99cArT0slAC37KLJgbe0fs7dA",
"creation_time": 1655886853,
"password": "xE1ypRHMq50jAhpbzu8qBb8jHNAxzezn6bkuRUvc2RZr7X1sznbhampm9p-feT61jnz6ewHJjUd5N6cQHhs84zCjQiP5somCY17FTQ7t6n0uPhWeyf-Fcw==",
"state": "active"
}
```
# Delete Kafka Credential
DELETE https://api.upstash.com/v2/kafka/credential/{id}
This endpoint deletes a kafka credential.
## URL Parameters
The ID of the kafka credential to delete
```shell curl
curl -X DELETE \
https://api.upstash.com/v2/kafka/credential/:id \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.delete('https://api.upstash.com/v2/kafka/credential/:id', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("DELETE", "https://api.upstash.com/v2/kafka/credential/:id", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
"OK"
```
# List Kafka Credentials
GET https://api.upstash.com/v2/kafka/credentials
This endpoint lists created kafka credentials other than the default one.
## Response Parameters
ID of the created Kafka credential
Name of the created Kafka credential
Name of the topic of the created Kafka credential
Permission scope given to the kafka credential
ID of the kafka cluster
Slug of the kafka cluster
Username to be used for the kafka credential
Creation time of the credential
Password to be used in authenticating to the cluster
State of the credential: `active` or `deleted`
```shell curl
curl -X GET \
https://api.upstash.com/v2/kafka/credentials \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.get('https://api.upstash.com/v2/kafka/credentials', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("GET", "https://api.upstash.com/v2/kafka/credentials", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
[
{
"credential_id": "27172269-da05-471b-9e8e-8fe4195871bc",
"credential_name": "mycreds",
"topic": "testopic",
"permissions": "ALL",
"cluster_id": "1793bfa1-d96e-46de-99ed-8f91f083209d",
"cluster_slug":"noted-hamster-9151",
"username":"bm90ZWQtaGFtc3Rlci05MTUxJPGKdKDkmwdObf8yMzmJ8jUqhmN1UQ7VmDe1xkk",
"creation_time": 1655886853,
"password": "xE1ypRHMq50jAhpbzu8qBb8jHNAxzezn6bkuRUvc2RZr7X1sznbhampm9p-feT61jnz6ewHJjUd5N6cQHhs84zCjQiP5somCY17FTQ7t6n0uPhWeyf-Fcw==",
"state": "active"
}
]
```
# Create Kafka Topic
POST https://api.upstash.com/v2/kafka/topic
This endpoint creates a new kafka topic in a cluster.
## Request Parameters
Name of the new kafka topic
The number of partitions the topic will have
Retention time of messages in the topic. `-1` for the highest possible value
Retention size of the messages in the topic. `-1` for the highest possible value
Max message size in the topic. `-1` for the highest possible value
Cleanup policy to be used in the topic: `compact` or `delete`
ID of the cluster the topic will be deployed in
## Response Parameters
ID of the new kafka topic
Name of the new kafka topic
ID of the created Kafka cluster
Name of the Kafka cluster
The region the Kafka cluster is deployed in
Shows whether the cluster is free or paid
Whether the multizone replication is enabled for the cluster or not
TCP endpoint to connect to the Kafka cluster
REST endpoint to connect to the Kafka cluster
Current state of the cluster (`active` or `deleted`)
Username to be used in authenticating to the cluster
Password to be used in authenticating to the cluster
Max retention size allowed for topics in the cluster
Max retention time allowed for topics in the cluster
Max messages allowed to be produced per second
Cluster creation timestamp
Max message size allowed in topics in the cluster
Max total number of partitions allowed in the cluster
```shell curl
curl -X POST \
https://api.upstash.com/v2/kafka/topic \
-u 'EMAIL:API_KEY' \
-d '{"name":"test-kafka-topic","partitions":1,"retention_time":1234,"retention_size":4567,"max_message_size":8912,"cleanup_policy":"delete","cluster_id":"9bc0e897-cbd3-4997-895a-fd77ad00aec9"}'
```
```python Python
import requests
data = '{"name":"test-kafka-topic","partitions":1,"retention_time":1234,"retention_size":4567,"max_message_size":8912,"cleanup_policy":"delete","cluster_id":"9bc0e897-cbd3-4997-895a-fd77ad00aec9"}'
response = requests.post('https://api.upstash.com/v2/kafka/topic', data=data, auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
var data = strings.NewReader(`{
"name": "test-kafka-topic",
"partitions": 1,
"retention_time": 1234,
"retention_size": 4567,
"max_message_size": 8912,
"cleanup_policy": "delete",
"cluster_id": "9bc0e897-cbd3-4997-895a-fd77ad00aec9"
}`)
req, err := http.NewRequest("POST", "https://api.upstash.com/v2/kafka/topic", data)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
{
"topic_id": "0f458c88-2dc6-4f69-97bb-05060e0be934",
"topic_name": "test-kafka-topic",
"cluster_id": "9bc0e897-cbd3-4997-895a-fd77ad00aec9",
"region": "eu-west-1",
"creation_time": 1643981720,
"state": "active",
"partitions": 1,
"multizone": true,
"tcp_endpoint": "sharing-mastodon-12819-eu1-kafka.upstashdev.com",
"rest_endpoint": "sharing-mastodon-12819-eu1-rest-kafka.upstashdev.com",
"username": "c2hhcmluZy1tYXN0b2Rvbi0xMjgxOSRV1ipriSBOwd0PHzw2KAs_cDrTXzvUKIs",
"password": "eu8K3rYRS-ma0AsINDo7MMemmHjjRSldHJcG3c1LUMZkFfdSf9u_Kd4xCWO9_oQc",
"cleanup_policy": "delete",
"retention_size": 4567,
"retention_time": 1234,
"max_message_size": 8912
}
```
# Delete Kafka Topic
DELETE https://api.upstash.com/v2/kafka/topic/{id}
This endpoint deletes a kafka topic in a cluster.
## URL Parameters
The ID of the Kafka Topic to be deleted
```shell curl
curl -X DELETE \
https://api.upstash.com/v2/kafka/topic/:id \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.delete('https://api.upstash.com/v2/kafka/topic/:id', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("DELETE", "https://api.upstash.com/v2/kafka/topic/:id", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
"OK"
```
# Get Kafka Topic
GET https://api.upstash.com/v2/kafka/topic/{id}
This endpoint gets details of a kafka topic.
## URL Parameters
The ID of the kafka topic
## Response Parameters
ID of the new kafka topic
Name of the new kafka topic
ID of the created Kafka cluster
The region the Kafka cluster is deployed in
Cluster creation timestamp
State of the topic\ `active` or `deleted`
Number of partitions the topic has
Whether the multizone replication is enabled for the cluster or not
TCP endpoint to connect to the Kafka cluster
REST endpoint to connect to the Kafka cluster
Cleanup policy to be used in the topic\ `compact` or `delete`
Password to be used in authenticating to the cluster
Max total number of partitions allowed in the cluster
Max retention size allowed for topics in the cluster
Max retention time allowed for topics in the cluster
Max message size allowed in topics in the cluster
```shell curl
curl -X GET \
https://api.upstash.com/v2/kafka/topic/:id \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
data = '{"name":"test-kafka-topic","partitions":1,"retention_time":1234,"retention_size":4567,"max_message_size":8912,"cleanup_policy":"delete","cluster_id":"9bc0e897-cbd3-4997-895a-fd77ad00aec9"}'
response = requests.post('https://api.upstash.com/v2/kafka/topic', data=data, auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("GET", "https://api.upstash.com/v2/kafka/topic/:id", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
{
"topic_id": "0f458c88-2dc6-4f69-97bb-05060e0be934",
"topic_name": "test-kafka-topic",
"cluster_id": "9bc0e897-cbd3-4997-895a-fd77ad00aec9",
"region": "eu-west-1",
"creation_time": 1643981720,
"state": "active",
"partitions": 1,
"multizone": true,
"tcp_endpoint": "sharing-mastodon-12819-eu1-kafka.upstashdev.com",
"rest_endpoint": "sharing-mastodon-12819-eu1-rest-kafka.upstashdev.com",
"username": "c2hhcmluZy1tYXN0b2Rvbi0xMjgxOSRV1ipriSBOwd0PHzw2KAs_cDrTXzvUKIs",
"password": "eu8K3rYRS-ma0AsINDo7MMemmHjjRSldHJcG3c1LUMZkFfdSf9u_Kd4xCWO9_oQc",
"cleanup_policy": "delete",
"retention_size": 4567,
"retention_time": 1234,
"max_message_size": 8912
}
```
# List Kafka Topics in Cluster
GET https://api.upstash.com/v2/kafka/topics/{id}
This endpoint lists kafka topics in a cluster.
## URL Parameters
The ID of the Kafka cluster
## Response Parameters
ID of the new kafka topic
Name of the new kafka topic
ID of the created Kafka cluster
The region the Kafka cluster is deployed in
Cluster creation timestamp
State of the topic\ **Options**: `active` or `deleted`
Number of partitions the topic has
Whether the multizone replication is enabled for the cluster or not
TCP endpoint to connect to the Kafka cluster
REST endpoint to connect to the Kafka cluster
Cleanup policy to be used in the topic\ **Options**: `compact` or `delete`
Password to be used in authenticating to the cluster
Max total number of partitions allowed in the cluster
Max retention size allowed for topics in the cluster
Max retention time allowed for topics in the cluster
Max message size allowed in topics in the cluster
```shell curl
curl -X GET \
https://api.upstash.com/v2/kafka/topics/:id \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.get('https://api.upstash.com/v2/kafka/topics/:id', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("GET", "https://api.upstash.com/v2/kafka/topics/:id", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
[
{
"topic_id": "0f458c88-2dc6-4f69-97bb-05060e0be934",
"topic_name": "test-kafka-topic",
"cluster_id": "9bc0e897-cbd3-4997-895a-fd77ad00aec9",
"region": "eu-west-1",
"creation_time": 1643981720,
"state": "active",
"partitions": 1,
"multizone": true,
"tcp_endpoint": "sharing-mastodon-12819-eu1-kafka.upstashdev.com",
"rest_endpoint": "sharing-mastodon-12819-eu1-rest-kafka.upstashdev.com",
"username": "c2hhcmluZy1tYXN0b2Rvbi0xMjgxOSRV1ipriSBOwd0PHzw2KAs_cDrTXzvUKIs",
"password": "eu8K3rYRS-ma0AsINDo7MMemmHjjRSldHJcG3c1LUMZkFfdSf9u_Kd4xCWO9_oQc",
"cleanup_policy": "delete",
"retention_size": 4568,
"retention_time": 1235,
"max_message_size": 8913
}
]
```
# Reconfigure Kafka Topic
POST https://api.upstash.com/v2/kafka/update-topic/{id}
This endpoint reconfigures an existing kafka topic.
## URL Parameters
The unique ID of the topic
## Request Parameters
Retention time of messages in the topic\ `-1` for highest possible value
Retention size of the messages in the topic\ `-1` for highest possible value
Max message size in the topic\ `-1` for highest possible value
## Response Parameters
ID of the new kafka topic
Name of the new kafka topic
ID of the created Kafka cluster
The region the Kafka cluster is deployed in
Cluster creation timestamp
State of the topic\ `active` or `deleted`
Number of partitions the topic has
Whether the multizone replication is enabled for the cluster or not
TCP endpoint to connect to the Kafka cluster
REST endpoint to connect to the Kafka cluster
Cleanup policy to be used in the topic\ `compact` or `delete`
Password to be used in authenticating to the cluster
Max total number of partitions allowed in the cluster
Max retention size allowed for topics in the cluster
Max retention time allowed for topics in the cluster
Max message size allowed in topics in the cluster
```shell curl
curl -X POST \
https://api.upstash.com/v2/kafka/update-topic/:id \
-u 'EMAIL:API_KEY' \
-d '{"retention_time":1235,"retention_size":4568,"max_message_size":8913}'
```
```python Python
import requests
data = '{"retention_time":1235,"retention_size":4568,"max_message_size":8913}'
response = requests.post('https://api.upstash.com/v2/kafka/update-topic/:id', data=data, auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
var data = strings.NewReader(`{
"retention_time": 1235,
"retention_size": 4568,
"max_message_size": 8913
}`)
req, err := http.NewRequest("POST", "https://api.upstash.com/v2/kafka/update-topic/:id", data)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
{
"topic_id": "0f458c88-2dc6-4f69-97bb-05060e0be934",
"topic_name": "test-kafka-topic",
"cluster_id": "9bc0e897-cbd3-4997-895a-fd77ad00aec9",
"region": "eu-west-1",
"creation_time": 1643981720,
"state": "active",
"partitions": 1,
"multizone": true,
"tcp_endpoint": "sharing-mastodon-12819-eu1-kafka.upstashdev.com",
"rest_endpoint": "sharing-mastodon-12819-eu1-rest-kafka.upstashdev.com",
"username": "c2hhcmluZy1tYXN0b2Rvbi0xMjgxOSRV1ipriSBOwd0PHzw2KAs_cDrTXzvUKIs",
"password": "eu8K3rYRS-ma0AsINDo7MMemmHjjRSldHJcG3c1LUMZkFfdSf9u_Kd4xCWO9_oQc",
"cleanup_policy": "delete",
"retention_size": 4568,
"retention_time": 1235,
"max_message_size": 8913
}
```
# Get Kafka Topic Stats
GET https://api.upstash.com/v2/kafka/stats/topic/{id}
This endpoint gets detailed stats of a Kafka topic.
## URL Parameters
The ID of the Kafka topic
## Response Parameters
Timestamp indicating when the measurement was taken.
Number of monthly messages in Kafka topic
Timestamp indicating when the measurement was taken.
Number of monthly messages produced in Kafka topic
Timestamp indicating when the measurement was taken.
Number of monthly messages consumed in Kafka topic
Timestamp indicating when the measurement was taken.
Total disk usage of the Kafka topic
Average storage size of the Kafka topic in the current month
Total number of monthly produced messages to the Kafka topic
Total number of monthly consumed messages from the Kafka topic
```shell curl
curl -X GET \
https://api.upstash.com/v2/kafka/stats/topic/:id \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.get('https://api.upstash.com/v2/kafka/stats/topic/:id', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("GET", "https://api.upstash.com/v2/kafka/stats/topic/:id", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
{
"throughput": [
{
"x": "2022-02-07 12:05:11",
"y": 0
}
...
],
"produce_throughput": [
{
"x": "2022-02-07 12:05:11",
"y": 0
}
...
],
"consume_throughput": [
{
"x": "2022-02-07 12:05:11",
"y": 0
}
...
],
"diskusage": [
{
"x": "2022-02-07 12:20:11",
"y": 0
}
...
],
"total_monthly_storage": 0,
"total_monthly_produce": 0,
"total_monthly_consume": 0
}
```
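Each series in the stats response is a list of `{"x": timestamp, "y": value}` points, so a few lines of client-side code are enough to pull out the latest sample and the monthly totals. Below is a minimal sketch, assuming the topic ID and the `EMAIL`/`API_KEY` credentials are placeholders you replace with your own values:
```python Python
import requests

TOPIC_ID = "0f458c88-2dc6-4f69-97bb-05060e0be934"  # placeholder topic ID

stats = requests.get(
    f"https://api.upstash.com/v2/kafka/stats/topic/{TOPIC_ID}",
    auth=("EMAIL", "API_KEY"),
).json()

# Each series is a list of {"x": timestamp, "y": value} points.
throughput = stats.get("throughput") or []
latest = throughput[-1]["y"] if throughput else 0

print("latest throughput:", latest)
print("monthly produced:", stats["total_monthly_produce"])
print("monthly consumed:", stats["total_monthly_consume"])
```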
# Create Backup
POST https://api.upstash.com/v2/redis/create-backup/{id}
This endpoint creates a backup for a Redis database.
## URL Parameters
The ID of the Redis database
## Request Parameters
Name of the backup
```shell curl
curl -X POST \
https://api.upstash.com/v2/redis/create-backup/{id} \
-u 'EMAIL:API_KEY' \
-d '{"name" : "backup_name"}'
```
```python Python
import requests
data = '{"name" : "backup_name"}'
response = requests.post('https://api.upstash.com/v2/redis/create-backup/{id}', data=data, auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
var data = strings.NewReader(`{
"name":"backup_name"
}`)
req, err := http.NewRequest("POST", "https://api.upstash.com/v2/redis/create-backup/{id}", data)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s
", bodyText);
```
```json 200 OK
"OK"
```
# Delete Backup
DELETE https://api.upstash.com/v2/redis/delete-backup/{id}/{backup_id}
This endpoint deletes a backup of a Redis database.
## URL Parameters
The ID of the Redis database
The ID of the backup to delete
```shell curl
curl -X DELETE \
https://api.upstash.com/v2/redis/delete-backup/:id/:backup_id \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.delete('https://api.upstash.com/v2/redis/delete-backup/:id/:backup_id', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("DELETE", "https://api.upstash.com/v2/redis/delete-backup/:id/:backup_id", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s
", bodyText);
```
```json 200 OK
"OK"
```
# Disable Daily Backup
PATCH https://api.upstash.com/v2/redis/disable-dailybackup/{id}
This endpoint disables daily backup for a Redis database.
## URL Parameters
The ID of the Redis database
```shell curl
curl -X PATCH \
https://api.upstash.com/v2/redis/disable-dailybackup/{id} \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.patch('https://api.upstash.com/v2/redis/disable-dailybackup/{id}', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("PATCH", "https://api.upstash.com/v2/redis/disable-dailybackup/{id}", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s
", bodyText);
```
```json 200 OK
"OK"
```
# Enable Daily Backup
PATCH https://api.upstash.com/v2/redis/enable-dailybackup/{id}
This endpoint enables daily backup for a Redis database.
## URL Parameters
The ID of the Redis database
```shell curl
curl -X PATCH \
https://api.upstash.com/v2/redis/enable-dailybackup/{id} \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.patch('https://api.upstash.com/v2/redis/enable-dailybackup/{id}', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("PATCH", "https://api.upstash.com/v2/redis/enable-dailybackup/{id}", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s
", bodyText);
```
```json 200 OK
"OK"
```
# List Backup
GET https://api.upstash.com/v2/redis/list-backup/{id}
This endpoint lists all backups for a Redis database.
## URL Parameters
The ID of the Redis database
## Response Parameters
ID of the database
Customer ID
Name of the backup
ID of the backup
Creation time of the backup as Unix time
State of the backup (e.g., completed)
Size of the backup
Daily backup status
Hourly backup status
```shell curl
curl -X GET \
https://api.upstash.com/v2/redis/list-backup/{id} \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.get('https://api.upstash.com/v2/redis/list-backup/{id}', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("GET", "https://api.upstash.com/v2/redis/list-backup/{id}", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s
", bodyText);
```
```json 200 OK
[
{
"database_id":"6gceaafd-9627-4fa5-8g71-b3359g19a5g4",
"customer_id":"customer_id",
"name":"test2",
"backup_id":"1768e55b-c137-4339-b46e-449dcd33a62e",
"creation_time":1720186545,
"state":"completed",
"backup_size":0,
"daily_backup":"false",
"hourly_backup":"false"
},
{
"database_id":"6gceaafd-9627-4fa5-8g71-b3359g19a5g4",
"customer_id":"customer_id",
"name":"test1",
"backup_id":"39310b84-21b3-45c3-5318-403553a2466d",
"creation_time":1720096600,
"state":"completed",
"backup_size":0,
"daily_backup":"false",
"hourly_backup":"false"
}
]
```
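Because each backup carries a `creation_time` Unix timestamp, the listing can be combined with the Restore Backup endpoint below to roll a database back to its most recent backup. A minimal sketch, assuming the database ID shown is a placeholder:
```python Python
import requests

DATABASE_ID = "6gceaafd-9627-4fa5-8g71-b3359g19a5g4"  # placeholder database ID
auth = ("EMAIL", "API_KEY")

backups = requests.get(
    f"https://api.upstash.com/v2/redis/list-backup/{DATABASE_ID}", auth=auth
).json()

if backups:
    # Pick the most recent backup by its creation_time (Unix seconds).
    latest = max(backups, key=lambda b: b["creation_time"])
    restore = requests.post(
        f"https://api.upstash.com/v2/redis/restore-backup/{DATABASE_ID}",
        json={"backup_id": latest["backup_id"]},
        auth=auth,
    )
    print(restore.text)  # "OK" on success
```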
# Restore Backup
POST https://api.upstash.com/v2/redis/restore-backup/{id}
This endpoint restores data from an existing backup.
## URL Parameters
The ID of the Redis database
## Request Parameters
ID of the backup to restore
```shell curl
curl -X POST \
https://api.upstash.com/v2/redis/restore-backup/{id} \
-u 'EMAIL:API_KEY' \
-d '{"backup_id" : "backup_id"}'
```
```python Python
import requests
data = '{"backup_id" : "backup_id"}'
response = requests.post('https://api.upstash.com/v2/redis/restore-backup/{id}', data=data, auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
var data = strings.NewReader(`{
"backup_id":"backup_id"
}`)
req, err := http.NewRequest("POST", "https://api.upstash.com/v2/redis/restore-backup/{id}", data)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s
", bodyText);
```
```json 200 OK
"OK"
```
# Create a Redis Database (Regional - DEPRECATED)
POST https://api.upstash.com/v2/redis/database
This endpoint creates a new regional Redis database. This behavior is deprecated in favor of Global databases, and support for it will be removed in upcoming releases.
## Request Parameters
Name of the database
Region of the database.\ **Options:** `eu-west-1`, `us-east-1`, `us-west-1`,
`ap-northeast-1` or `us-central1`
Set to `true` to enable TLS.
## Response Parameters
ID of the created database
Name of the database
Type of the database in terms of pricing model (Free, Pay as You Go or
Enterprise)
The region where database is hosted
Database port for clients to connect
Creation time of the database as Unix time
State of database (active or deleted)
Password of the database
Email or team id of the owner of the database
Endpoint URL of the database
TLS/SSL is enabled or not
```shell curl
curl -X POST \
https://api.upstash.com/v2/redis/database \
-u 'EMAIL:API_KEY' \
-d '{"name":"myredis","region":"eu-west-1","tls": true}'
```
```python Python
import requests
data = '{"name":"myredis","region":"eu-west-1","tls":true}'
response = requests.post('https://api.upstash.com/v2/redis/database', data=data, auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
var data = strings.NewReader(`{
"name":"myredis",
"region":"eu-west-1",
"tls": true
}`)
req, err := http.NewRequest("POST", "https://api.upstash.com/v2/redis/database", data)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
{
"database_id": "96ad0856-03b1-4ee7-9666-e81abd0349e1",
"database_name": "MyRedis",
"database_type": "Pay as You Go",
"region": "eu-central-1",
"port": 30143,
"creation_time": 1658909671,
"state": "active",
"password": "038a8e27c45e43068d5f186085399884",
"user_email": "example@upstash.com",
"endpoint": "eu2-sought-mollusk-30143.upstash.io",
"tls": true,
"rest_token": "AXW_ASQgOTZhZDA4NTYtMDNiMS00ZWU3LTk2NjYtZTgxYWJkMDM0OWUxMDM4YThlMjdjNDVlNDMwNjhkNWYxODYwODUzOTk4ODQ=",
"read_only_rest_token": "AnW_ASQgOTZhZDA4NTYtMDNiMS00ZWU3LTk2NjYtZTgxYWJkMDM0OWUx8sbmiEcMm9u7Ks5Qx-kHNiWr_f-iUXSIH8MlziKMnpY="
}
```
# Create a Redis Database (Global)
POST https://api.upstash.com/v2/redis/database
This endpoint creates a new Redis database.
## Request Parameters
Name of the database
Region of the database. Only valid option is `global`.
Primary Region of the Global Database.
Available regions: `us-east-1`, `us-west-1`, `us-west-2`, `eu-west-1`,
`eu-central-1`, `ap-southeast-1`, `ap-southeast-2`, `sa-east-1`
Array of Read Regions of
the Database.
Available regions: `us-east-1`, `us-west-1`, `us-west-2`, `eu-west-1`,
`eu-central-1`, `ap-southeast-1`, `ap-southeast-2`, `ap-northeast-1`, `sa-east-1`
## Response Parameters
ID of the created database
Name of the database
Type of the database in terms of pricing model (Free, Pay as You Go or
Enterprise)
The region where database is hosted
Database port for clients to connect
Creation time of the database as Unix time
State of database (active or deleted)
Password of the database
Email or team id of the owner of the database
Endpoint URL of the database
TLS is always enabled for new databases
```shell curl
curl -X POST \
https://api.upstash.com/v2/redis/database \
-u 'EMAIL:API_KEY' \
-d '{"name":"myredis", "region":"global", "primary_region":"us-east-1", "read_regions":["us-west-1","us-west-2"], "tls": true}'
```
```python Python
import requests
data = '{"name":"myredis", "region":"global", "primary_region":"us-east-1", "read_regions":["us-west-1","us-west-2"], "tls":true}'
response = requests.post('https://api.upstash.com/v2/redis/database', data=data, auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
var data = strings.NewReader(`{
"name":"myredis",
"region":"global",
"primary_region"":"us-east-1",
"read_regions":["us-west-1","us-west-2"],
"tls": true
}`)
req, err := http.NewRequest("POST", "https://api.upstash.com/v2/redis/database", data)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
{
"database_id": "93e3a3e-342c-4683-ba75-344c08ae143b",
"database_name": "global-test",
"database_type": "Pay as You Go",
"region": "global",
"type": "paid",
"port": 32559,
"creation_time": 1674596896,
"state": "active",
"password": "dd1803832a2746309e118373549e574d",
"user_email": "support@upstash.com",
"endpoint": "steady-stud-32559.upstash.io",
"tls": false,
"rest_token": "AX8vACQgOTMyY2UyYy00NjgzLWJhNzUtMzQ0YzA4YWUxNDNiZMyYTI3NDYzMDllMTE4MzczNTQ5ZTU3NGQ=",
"read_only_rest_token": "An8vACQg2UtMzQyYy00NjgzLWJhNzUtMzQ0YzA4YBVsUsyn19xDnTAvjbsiq79GRDrURNLzIYIOk="
}
```
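The `database_id` in the response is what the other database endpoints key on, so a typical flow is to create the database and then feed the ID straight into follow-up calls such as Update Regions (documented later in this reference). A minimal sketch, using the same placeholder credentials and region choices as the request above:
```python Python
import requests

auth = ("EMAIL", "API_KEY")

db = requests.post(
    "https://api.upstash.com/v2/redis/database",
    json={
        "name": "myredis",
        "region": "global",
        "primary_region": "us-east-1",
        "read_regions": ["us-west-1"],
        "tls": True,
    },
    auth=auth,
).json()

print(db["database_id"], db["endpoint"], db["port"])

# The returned database_id feeds straight into other endpoints,
# e.g. updating the read regions later (see Update Regions below).
requests.post(
    f"https://api.upstash.com/v2/redis/update-regions/{db['database_id']}",
    json={"read_regions": ["us-west-1", "us-west-2"]},
    auth=auth,
)
```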
# Delete Database
DELETE https://api.upstash.com/v2/redis/database/{id}
This endpoint deletes a database.
## URL Parameters
The ID of the database to be deleted
```shell curl
curl -X DELETE \
https://api.upstash.com/v2/redis/database/:id \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.delete('https://api.upstash.com/v2/redis/database/:id', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("DELETE", "https://api.upstash.com/v2/redis/database/:id", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
"OK"
```
# Disable Auto Upgrade
POST https://api.upstash.com/v2/redis/disable-autoupgrade/{id}
This endpoint disables Auto Upgrade for given database.
## URL Parameters
The ID of the database to disable auto upgrade
```shell curl
curl -X POST \
https://api.upstash.com/v2/redis/disable-autoupgrade/:id \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.post('https://api.upstash.com/v2/redis/disable-autoupgrade/:id', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("POST", "https://api.upstash.com/v2/redis/disable-autoupgrade/:id", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
"OK"
```
# Disable Eviction
POST https://api.upstash.com/v2/redis/disable-eviction/{id}
This endpoint disables eviction for given database.
## URL Parameters
The ID of the database to disable eviction
```shell curl
curl -X POST \
https://api.upstash.com/v2/redis/disable-eviction/:id \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.post('https://api.upstash.com/v2/redis/disable-eviction/:id', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("POST", "https://api.upstash.com/v2/redis/disable-eviction/:id", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
"OK"
```
# Enable Auto Upgrade
POST https://api.upstash.com/v2/redis/enable-autoupgrade/{id}
This endpoint enables Auto Upgrade for given database.
## URL Parameters
The ID of the database to enable auto upgrade
```shell curl
curl -X POST \
https://api.upstash.com/v2/redis/enable-autoupgrade/:id \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.post('https://api.upstash.com/v2/redis/enable-autoupgrade/:id', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("POST", "https://api.upstash.com/v2/redis/enable-autoupgrade/:id", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
"OK"
```
# Enable Eviction
POST https://api.upstash.com/v2/redis/enable-eviction/{id}
This endpoint enables eviction for given database.
## URL Parameters
The ID of the database to enable eviction
```shell curl
curl -X POST \
https://api.upstash.com/v2/redis/enable-eviction/:id \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.post('https://api.upstash.com/v2/redis/enable-eviction/:id', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("POST", "https://api.upstash.com/v2/redis/enable-eviction/:id", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
"OK"
```
# Enable TLS
POST https://api.upstash.com/v2/redis/enable-tls/{id}
This endpoint enables TLS for a database.
## URL Parameters
The ID of the database to enable TLS for
## Response Parameters
ID of the created database
Name of the database
Type of the database in terms of pricing model\ `Free`, `Pay as You Go` or
`Enterprise`
The region where database is hosted
Database port for clients to connect
Creation time of the database as Unix time
State of database\ `active` or `deleted`
Password of the database
Email or team id of the owner of the database
Endpoint URL of the database
TLS/SSL is enabled or not
```shell curl
curl -X POST \
https://api.upstash.com/v2/redis/enable-tls/:id \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.post('https://api.upstash.com/v2/redis/enable-tls/:id', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("POST", "https://api.upstash.com/v2/redis/enable-tls/:id", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
{
"database_id": "96ad0856-03b1-4ee7-9666-e81abd0349e1",
"cluster_id": "dea1f974",
"database_name": "MyRedis",
"database_type": "Pay as You Go",
"region": "eu-central-1",
"port": 30143,
"creation_time": 1658909671,
"state": "active",
"password": "49665a1710f3434d8be008aab50f38d2",
"user_email": "example@upstash.com",
"endpoint": "eu2-sought-mollusk-30143.upstash.io",
"tls": true,
}
```
# Get Database
GET https://api.upstash.com/v2/redis/database/{id}
This endpoint gets details of a database.
## Request
The ID of the database
Set to `hide` to remove credentials from the response.
## Response
ID of the created database
Name of the database
Type of the database in terms of pricing model (Free, Pay as You Go or
Enterprise)
The region where database is hosted
Database port for clients to connect
Creation time of the database as Unix time
State of database (active or deleted)
Password of the database
Email or team id of the owner of the database
Endpoint URL of the database
TLS/SSL is enabled or not
Token for rest based communication with the database
Read only token for rest based communication with the database
Max number of concurrent clients that can be opened on this database currently
Max size of a request that will be accepted by the database currently (in
bytes)
Total disk size limit that can be used for the database currently (in bytes)
Max size of an entry that will be accepted by the database currently (in bytes)
Max size of memory the database can use (in bytes)
Max daily bandwidth that can be used by the database (in bytes)
Max number of commands that can be sent to the database per second
Total number of commands that can be sent to the database
```shell curl
curl -X GET \
https://api.upstash.com/v2/redis/database/:id \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.get('https://api.upstash.com/v2/redis/database/:id', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("GET", "https://api.upstash.com/v2/redis/database/:id", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
{
"database_id": "96ad0856-03b1-4ee7-9666-e81abd0349e1",
"database_name": "MyRedis",
"database_type": "Pay as You Go",
"region": "eu-central-1",
"port": 30143,
"creation_time": 1658909671,
"state": "active",
"password": "038a8e27c45e43068d5f186085399884",
"user_email": "example@upstash.com",
"endpoint": "eu2-sought-mollusk-30143.upstash.io",
"tls": true,
"rest_token": "AXW_ASQgOTZhZDA4NTYtMDNiMS00ZWU3LTk2NjYtZTgxYWJkMDM0OWUxMDM4YThlMjdjNDVlNDMwNjhkNWYxODYwODUzOTk4ODQ=",
"read_only_rest_token": "AnW_ASQgOTZhZDA4NTYtMDNiMS00ZWU3LTk2NjYtZTgxYWJkMDM0OWUx8sbmiEcMm9u7Ks5Qx-kHNiWr_f-iUXSIH8MlziKMnpY=",
"db_max_clients": 1000,
"db_max_request_size": 1048576,
"db_disk_threshold": 107374182400,
"db_max_entry_size": 104857600,
"db_memory_threshold": 1073741824,
"db_daily_bandwidth_limit": 53687091200,
"db_max_commands_per_second": 1000,
"db_request_limit": 9223372036854775808
}
```
# Get Database Stats
GET https://api.upstash.com/v2/redis/stats/{id}
This endpoint gets detailed stats of a database.
## URL Parameters
The ID of the database
## Response Parameters
Timestamp indicating when the measurement was taken.
Total number of open connections at that moment
Timestamp indicating when the measurement was taken.
Total number of keys in the database
Timestamp indicating when the measurement was taken.
Throughput seen on the database connections
Timestamp indicating when the measurement was taken.
Throughput seen on the database connections for write requests
Timestamp indicating when the measurement was taken.
Throughput seen on the database connections for read requests
Timestamp indicating when the measurement was taken.
Total disk usage of the database
Timestamp indicating when the measurement was taken.
Maximum server latency observed in the last hour
Timestamp indicating when the measurement was taken.
Minimum server latency observed in the last hour
Timestamp indicating when the measurement was taken.
The average read latency value measured in the last hour
Timestamp indicating when the measurement was taken.
The 99th percentile server read latency observed in the last hour
Timestamp indicating when the measurement was taken.
The average write latency value measured in the last hour
Timestamp indicating when the measurement was taken.
The 99th percentile server write latency observed in the last hour
Timestamp indicating when the measurement was taken.
Total number of requests made to the database that were hits
Timestamp indicating when the measurement was taken.
Total number of requests made to the database that were misses
Timestamp indicating when the measurement was taken.
Total number of read requests made to the database
Timestamp indicating when the measurement was taken.
Total number of write requests made to the database
Timestamp indicating when the measurement was taken.
Total number of requests made to the database on the corresponding day
The total daily bandwidth usage (in bytes).
Timestamp indicating when the measurement was taken.
The total bandwidth size for that specific timestamp
A list of the days of the week for the measurement
Timestamp indicating when the measurement was taken.
The billing amount for that specific date.
Total number of daily produced commands
Total number of daily consumed commands
The total number of requests made in the current month.
The total number of read requests made in the current month.
The total number of write requests made in the current month.
The total amount of storage used (in bytes) in the current month.
Total cost of the database in the current month
Total number of produce commands in the current month
Total number of consume commands in the current month
```shell curl
curl -X GET \
https://api.upstash.com/v2/redis/stats/:id \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.get('https://api.upstash.com/v2/redis/stats/:id', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("GET", "https://api.upstash.com/v2/redis/stats/:id", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
{
"connection_count": [
{
"x": "2023-05-22 10:59:23.426 +0000 UTC",
"y": 320
},
...
],
"keyspace": [
{
"x": "2023-05-22 10:59:23.426 +0000 UTC",
"y": 344725564
},
...
],
"throughput": [
{
"x": "2023-05-22 11:00:23.426 +0000 UTC",
"y": 181.88333333333333
},
...
],
"produce_throughput": null,
"consume_throughput": null,
"diskusage": [
{
"x": "2023-05-22 10:59:23.426 +0000 UTC",
"y": 532362818323
},
...
],
"latencymean": [
{
"x": "2023-05-22 10:59:23.426 +0000 UTC",
"y": 0.176289
},
...
],
"read_latency_mean": [
{
"x": "2023-05-22 11:00:23.426 +0000 UTC",
"y": 0
},
...
],
"read_latency_99": [
{
"x": "2023-05-22 11:00:23.426 +0000 UTC",
"y": 0
},
...
],
"write_latency_mean": [
{
"x": "2023-05-22 11:00:23.426 +0000 UTC",
"y": 0
},
...
],
"write_latency_99": [
{
"x": "2023-05-22 11:00:23.426 +0000 UTC",
"y": 0
},
...
],
"hits": [
{
"x": "2023-05-22 11:00:23.426 +0000 UTC",
"y": 0
},
...
],
"misses": [
{
"x": "2023-05-22 11:00:23.426 +0000 UTC",
"y": 0
},
...
],
"read": [
{
"x": "2023-05-22 11:00:23.426 +0000 UTC",
"y": 82.53333333333333
},
...
],
"write": [
{
"x": "2023-05-22 11:00:23.426 +0000 UTC",
"y": 99.35
},
...
],
"dailyrequests": [
{
"x": "2023-05-18 11:58:23.534505371 +0000 UTC",
"y": 68844080
},
...
],
"days": [
"Thursday",
"Friday",
"Saturday",
"Sunday",
"Monday"
],
"dailybilling": [
{
"x": "2023-05-18 11:58:23.534505371 +0000 UTC",
"y": 145.72694911244588
},
...
],
"dailybandwidth": 50444740913,
"bandwidths": [
{
"x": "2023-05-18 11:58:23.534505371 +0000 UTC",
"y": 125391861729
},
...
],
"dailyproduce": null,
"dailyconsume": null,
"total_monthly_requests": 1283856937,
"total_monthly_read_requests": 1034567002,
"total_monthly_write_requests": 249289935,
"total_monthly_storage": 445942383672,
"total_monthly_billing": 222.33902763855485,
"total_monthly_produce": 0,
"total_monthly_consume": 0
}
```
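The daily series make it easy to estimate spend: each `dailybilling` point is the cost for one day, and `total_monthly_billing` is the month-to-date total. A small sketch that prints both, assuming the database ID is a placeholder:
```python Python
import requests

DATABASE_ID = "96ad0856-03b1-4ee7-9666-e81abd0349e1"  # placeholder database ID

stats = requests.get(
    f"https://api.upstash.com/v2/redis/stats/{DATABASE_ID}",
    auth=("EMAIL", "API_KEY"),
).json()

# Each dailybilling point is {"x": timestamp, "y": cost for that day}.
for point in stats.get("dailybilling") or []:
    print(point["x"], round(point["y"], 2))

print("month to date:", round(stats["total_monthly_billing"], 2))
print("monthly requests:", stats["total_monthly_requests"])
```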
# List Databases
GET https://api.upstash.com/v2/redis/databases
This endpoint lists all databases of the user.
## Response Parameters
ID of the database
Name of the database
Type of the database in terms of pricing model\ `Free`, `Pay as You Go` or
`Enterprise`
The region where database is hosted
Database port for clients to connect
Creation time of the database as Unix time
State of database\ `active` or `deleted`
Password of the database
Email or team id of the owner of the database
Endpoint URL of the database
TLS/SSL is enabled or not
Token for rest based communication with the database
Read only token for rest based communication with the database
```shell curl
curl -X GET \
https://api.upstash.com/v2/redis/databases \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.get('https://api.upstash.com/v2/redis/databases', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("GET", "https://api.upstash.com/v2/redis/databases", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
[
{
"database_id": "96ad0856-03b1-4ee7-9666-e81abd0349e1",
"database_name": "MyRedis",
"database_type": "Pay as You Go",
"region": "eu-central-1",
"port": 30143,
"creation_time": 1658909671,
"state": "active",
"password": "038a8e27c45e43068d5f186085399884",
"user_email": "example@upstash.com",
"endpoint": "eu2-sought-mollusk-30143.upstash.io",
"tls": true,
"rest_token": "AXW_ASQgOTZhZDA4NTYtMDNiMS00ZWU3LTk2NjYtZTgxYWJkMDM0OWUxMDM4YThlMjdjNDVlNDMwNjhkNWYxODYwODUzOTk4ODQ=",
"read_only_rest_token": "AnW_ASQgOTZhZDA4NTYtMDNiMS00ZWU3LTk2NjYtZTgxYWJkMDM0OWUx8sbmiEcMm9u7Ks5Qx-kHNiWr_f-iUXSIH8MlziKMnpY="
}
]
```
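Because the listing returns every database with its `state` and `endpoint`, it is a convenient starting point for scripts that act only on active databases. A minimal sketch:
```python Python
import requests

databases = requests.get(
    "https://api.upstash.com/v2/redis/databases",
    auth=("EMAIL", "API_KEY"),
).json()

# Print the connection details of every active database.
for db in databases:
    if db["state"] == "active":
        print(db["database_name"], db["endpoint"], db["port"])
```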
# Move To Team
POST https://api.upstash.com/v2/redis/move-to-team
This endpoint moves a database under a target team.
## Request Parameters
The ID of the target team
The ID of the database to be moved
```shell curl
curl -X POST \
https://api.upstash.com/v2/redis/move-to-team \
-u 'EMAIL:API_KEY' \
-d '{"team_id": "6cc32556-0718-4de5-b69c-b927693f9282","database_id": "67b6af16-acb2-4f00-9e38-f6cb9bee800d"}'
```
```python Python
import requests
data = '{"team_id": "6cc32556-0718-4de5-b69c-b927693f9282","database_id": "67b6af16-acb2-4f00-9e38-f6cb9bee800d"}'
response = requests.post('https://api.upstash.com/v2/redis/move-to-team', data=data, auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
var data = strings.NewReader(`{
"team_id": "6cc32556-0718-4de5-b69c-b927693f9282",
"database_id": "67b6af16-acb2-4f00-9e38-f6cb9bee800d"
}`)
req, err := http.NewRequest("POST", "https://api.upstash.com/v2/redis/move-to-team", data)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
"OK"
```
# Rename Database
POST https://api.upstash.com/v2/redis/rename/{id}
This endpoint renames a database.
## URL Parameters
The ID of the database to be renamed
## Request Parameters
The new name of the database
## Response Parameters
ID of the created database
New name of the database
Type of the database in terms of pricing model\ `Free`, `Pay as You Go` or
`Enterprise`
The region where database is hosted
Database port for clients to connect
Creation time of the database as Unix time
State of database\ `active` or `deleted`
Password of the database
Email or team id of the owner of the database
Endpoint URL of the database
TLS/SSL is enabled or not
```shell curl
curl -X POST \
https://api.upstash.com/v2/redis/rename/:id \
-u 'EMAIL:API_KEY' \
-d '{"name":"new_database_name"}'
```
```python Python
import requests
data = '{"name":"new_database_name"}'
response = requests.post('https://api.upstash.com/v2/redis/rename/:id', data=data, auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
var data = strings.NewReader(`{
"name":"new_database_name"
}`)
req, err := http.NewRequest("POST", "https://api.upstash.com/v2/redis/rename/:id", data)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
{
"database_id": "96ad0856-03b1-4ee7-9666-e81abd0349e1",
"cluster_id": "dea1f974",
"database_name": "MyRedis",
"database_type": "Pay as You Go",
"region": "eu-central-1",
"port": 30143,
"creation_time": 1658909671,
"state": "active",
"password": "49665a1710f3434d8be008aab50f38d2",
"user_email": "example@upstash.com",
"endpoint": "eu2-sought-mollusk-30143.upstash.io",
"tls": true,
}
```
# Reset Password
POST https://api.upstash.com/v2/redis/reset-password/{id}
This endpoint updates the password of a database.
## Request
The ID of the database to reset password
## Response
ID of the created database
Name of the database
Type of the database in terms of pricing model\ `Free`, `Pay as You Go` or
`Enterprise`
The region where database is hosted
Database port for clients to connect
Creation time of the database as Unix time
State of database\ `active` or `deleted`
Password of the database
Email or team id of the owner of the database
Endpoint URL of the database
TLS/SSL is enabled or not
```shell curl
curl -X POST \
https://api.upstash.com/v2/redis/reset-password/:id \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.post('https://api.upstash.com/v2/redis/reset-password/:id', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("POST", "https://api.upstash.com/v2/redis/reset-password/:id", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
{
"database_id": "96ad0856-03b1-4ee7-9666-e81abd0349e1",
"cluster_id": "dea1f974",
"database_name": "MyRedis",
"database_type": "Pay as You Go",
"region": "eu-central-1",
"port": 30143,
"creation_time": 1658909671,
"state": "active",
"password": "49665a1710f3434d8be008aab50f38d2",
"user_email": "example@upstash.com",
"endpoint": "eu2-sought-mollusk-30143.upstash.io",
"tls": true,
"consistent": false,
"pool_id": "f886c7f3",
"rest_token": "AXW_ASQgOTZhZDA4NTYtMDNiMS00ZWU3LTk2NjYtZTgxYWJkMDM0OWUxNDk2NjVhMTcxMGYzNDM0ZDhiZTAwOGFhYjUwZjM4ZDI=",
"read_only_rest_token": "AnW_ASQgOTZhZDA4NTYtMDNiMS00ZWU3LTk2NjYtZTgxYWJkMDM0OWUxB5sRhCROkPsxozFcDzDgVGRAxUI7UUr0Y6uFB7jMIOI="
}
```
# Update Regions (Global)
POST https://api.upstash.com/v2/redis/update-regions/{id}
Update the regions of global database
## Request
The ID of your database
Array of read regions of the database
**Options:** `us-east-1`, `us-west-1`, `us-west-2`, `eu-west-1`, `eu-central-1`,
`ap-southeast-1`, `ap-southeast-2`, `sa-east-1`
```shell curl
curl -X POST \
https://api.upstash.com/v2/redis/update-regions/:id \
-u 'EMAIL:API_KEY' \
-d '{ "read_regions":["us-west-1"] }'
```
```python Python
import requests
data = '{"read_regions":["eu-west-1"]}'
response = requests.post('https://api.upstash.com/v2/redis/database', data=data, auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
var data = strings.NewReader(`{
"read_regions":["us-west-1"]
}`)
req, err := http.NewRequest("POST", "https://api.upstash.com/v2/redis/update-regions/:id", data)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
"OK"
```
# Add Team Member
POST https://api.upstash.com/v2/teams/member
This endpoint adds a new team member to the specified team.
## Request Parameters
Id of the team to add the member to
Email of the new team member
Role of the new team member
**Options:** `admin`, `dev` or `finance`
## Response Parameters
ID of the created team
Name of the created team
Email of the new team member
Role of the new team member
```shell curl
curl -X POST \
https://api.upstash.com/v2/teams/member \
-u 'EMAIL:API_KEY' \
-d '{"team_id":"95849b27-40d0-4532-8695-d2028847f823","member_email":"example@upstash.com","member_role":"dev"}'
```
```python Python
import requests
data = '{"team_id":"95849b27-40d0-4532-8695-d2028847f823","member_email":"example@upstash.com","member_role":"dev"}'
response = requests.post('https://api.upstash.com/v2/teams/member', data=data, auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
var data = strings.NewReader(`{
"team_id":"95849b27-40d0-4532-8695-d2028847f823",
"member_email":"example@upstash.com",
"member_role":"dev"
}`)
req, err := http.NewRequest("POST", "https://api.upstash.com/v2/teams/member", data)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
{
"team_id": "95849b27-40d0-4532-8695-d2028847f823",
"team_name": "test_team_name",
"member_email": "example@upstash.com",
"member_role": "dev"
}
```
# Create Team
POST https://api.upstash.com/v2/team
This endpoint creates a new team.
## Request Parameters
Name of the new team
Whether to copy existing credit card information to team or not\ Options:
`true` or `false`
## Response Parameters
ID of the created team
Name of the created team
Whether credit card information was added to the team during creation or not
```shell curl
curl -X POST \
https://api.upstash.com/v2/team \
-u 'EMAIL:API_KEY' \
-d '{"team_name":"myteam","copy_cc":true}'
```
```python Python
import requests
data = '{"team_name":"myteam","copy_cc":true}'
response = requests.post('https://api.upstash.com/v2/team', data=data, auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
var data = strings.NewReader(`{
"team_name":"myteam",
"copy_cc":true
}`)
req, err := http.NewRequest("POST", "https://api.upstash.com/v2/team", data)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
{
"team_id": "75b471f2-15a1-47b0-8ce5-12a57682bfc9",
"team_name": "test_team_name_2",
"copy_cc": true
}
```
# Delete Team
DELETE https://api.upstash.com/v2/team/{id}
This endpoint deletes a team.
## URL Parameters
The ID of the team to delete
## Response Parameters
"OK"
```shell curl
curl -X DELETE \
https://api.upstash.com/v2/team/:id \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.delete('https://api.upstash.com/v2/team/:id', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("DELETE", "https://api.upstash.com/v2/team/:id", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
"OK"
```
# Delete Team Member
DELETE https://api.upstash.com/v2/teams/member
This endpoint deletes a team member from the specified team.
## Request Parameters
Id of the team to remove the member from
Email of the team member to be removed
## Response Parameters
"OK"
```shell curl
curl -X DELETE \
https://api.upstash.com/v2/teams/member \
-u 'EMAIL:API_KEY' \
-d '{"team_id":"95849b27-40d0-4532-8695-d2028847f823","member_email":"example@upstash.com"}'
```
```python Python
import requests
data = '{"team_id":"95849b27-40d0-4532-8695-d2028847f823","member_email":"example@upstash.com"}'
response = requests.delete('https://api.upstash.com/v2/teams/member', data=data, auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
var data = strings.NewReader(`{
"team_id":"95849b27-40d0-4532-8695-d2028847f823",
"member_email":"example@upstash.com"
}`)
req, err := http.NewRequest("DELETE", "https://api.upstash.com/v2/teams/member", data)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
"OK"
```
# Get Team Members
GET https://api.upstash.com/v2/teams/{team_id}
This endpoint lists all members of a team.
## Request Parameters
ID of the team
## Response Parameters
ID of the team
Name of the team
Email of the team member
Role of the team member
```shell curl
curl -X GET \
https://api.upstash.com/v2/teams/:id \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.get('https://api.upstash.com/v2/teams/:id', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("GET", "https://api.upstash.com/v2/teams/:id", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
[
{
"team_id": "3423cb72-e50d-43ec-a9c0-f0f359941223",
"team_name": "test_team_name_2",
"member_email": "example@upstash.com",
"member_role": "dev"
},
{
"team_id": "3423cb72-e50d-43ec-a9c0-f0f359941223",
"team_name": "test_team_name_2",
"member_email": "example_2@upstash.com",
"member_role": "owner"
}
]
```
# List Teams
GET https://api.upstash.com/v2/teams
This endpoint lists all teams of user.
## Response Parameters
ID of the created team
Role of the user in this team
Name of the created team
Whether credit card information was added to the team during creation or not
```shell curl
curl -X GET \
https://api.upstash.com/v2/teams \
-u 'EMAIL:API_KEY'
```
```python Python
import requests
response = requests.get('https://api.upstash.com/v2/teams', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("GET", "https://api.upstash.com/v2/teams", nil)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
[
{
"team_id": "95849b27-40d0-4532-8695-d2028847f823",
"team_name": "test_team_name",
"member_role": "owner",
"copy_cc": true
}
]
```
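Combining List Teams with the Get Team Members endpoint above gives a quick roster of every member across your teams. A minimal sketch using only the two documented endpoints:
```python Python
import requests

auth = ("EMAIL", "API_KEY")
teams = requests.get("https://api.upstash.com/v2/teams", auth=auth).json()

# For each team, fetch its members and print a flat roster.
for team in teams:
    members = requests.get(
        f"https://api.upstash.com/v2/teams/{team['team_id']}", auth=auth
    ).json()
    for member in members:
        print(team["team_name"], member["member_email"], member["member_role"])
```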
# Create Index
POST https://api.upstash.com/v2/vector/index
This endpoint creates an index.
## Request Parameters
Name of the index.
Region of the index.\
**Options:** `eu-west-1`, `us-east-1`,
Similarity function that's used to calculate the distance between two
vectors.\
**Options:** `COSINE`, `EUCLIDIAN`, `DOT_PRODUCT`
The amount of values in a single vector.
The payment plan of your index.\
**Options:** `payg`, `fixed`
The predefined embedding model to vectorize your plain text.\
**Options:** `BGE_SMALL_EN_V1_5`, `BGE_BASE_EN_V1_5`, `BGE_LARGE_EN_V1_5`, `BGE_M3`, `BERT_BASE_UNCASED`, `UAE_Large_V1`, `ALL_MINILM_L6_V2`, `MXBAI_EMBED_LARGE_V1`
## Response Parameters
The associated ID of the owner of the index
Unique ID of the index
The name of the index.
Similarity function that's used to calculate the distance between two
vectors
The amount of values in a single vector
The REST endpoint of the index
The REST authentication token for the index
The REST authentication read only token for the index
The payment plan of the index
The region where the index is currently deployed.
The maximum number of vectors that your index can contain.
The maximum number of update operations you can perform in a day. Only upsert operations are included in the update count.
The maximum number of query operations you can perform in a day. Only query operations are included in the query count.
The maximum amount of monthly bandwidth for the index. Unit is bytes. `-1` if the limit is unlimited.
The maximum number of write operations you can perform per second. Only upsert operations are included in the write count.
The maximum number of query operations you can perform per second. Only query operations are included in the query count.
The maximum number of vectors in a read operation. Query and fetch operations are included in read operations.
The maximum number of vectors in a write operation. Only upsert operations are included in write operations.
The maximum total metadata size in your index.
Monthly pricing of your index. Only available for fixed and pro plans.
The creation time of the vector index in UTC as unix timestamp.
The predefined embedding model to vectorize your plain text.
```shell curl
curl -X POST https://api.upstash.com/v2/vector/index \
-u 'EMAIL:API_KEY' \
-d '{
"name": "myindex",
"region": "eu-west-1",
"similarity_function": "COSINE",
"dimension_count": 1536
}'
```
```javascript JavaScript
const axios = require('axios');
const postData = {
name: "myindex",
region: "eu-west-1",
similarity_function: "COSINE",
dimension_count: 1536,
};
const config = {
auth: {
username: 'EMAIL',
password: 'API_KEY',
},
headers: {
'Content-Type': 'application/json',
},
};
axios.post('https://api.upstash.com/v2/vector/index', postData, config)
.then((response) => {
console.log('Response:', response.data);
})
.catch((error) => {
console.error('Error:', error);
});
```
```python Python
import requests
data = '{"name":"myindex","region":"eu-west-1","similarity_function":"COSINE","dimension_count":1536}'
response = requests.post('https://api.upstash.com/v2/vector/index', data=data, auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
var data = strings.NewReader(`{
"name":"myindex",
"region":"eu-west-1",
"similarity_function":"COSINE",
"dimension_count":1536
}`)
req, err := http.NewRequest("POST", "https://api.upstash.com/v2/vector/index", data)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
{
"customer_id": "test@upstash.com",
"id": "0639864f-ece6-429c-8118-86a287b0e808",
"name": "myindex",
"similarity_function": "COSINE",
"dimension_count": 5,
"embedding_model": "BGE_SMALL_EN_V1_5"
"endpoint": "test-index-3814-eu1-vector.upstash.io",
"token": "QkZGMk5heGltdW0tdXBkYXRlZC0zNzM1LWV1MkFkbWlOeGZGZ1J5Wm1GdE5tTXhNQzB1TmpsbExUb3hOekF0TVRJbFpqMTJORFUxTm1GZw==",
"read_only_token": "QkZGRk1heGltdW0tdXBkYXRlZC0zNzM1LWV1MnJlYWRvbmx5TmtaZ05qS3JNWVV0Wm1aZ01pMDBOV1poTHRob05qY3RNR0U0TkRjejNqazJU"
"type": "paid",
"region": "eu-west-1",
"max_vector_count": 400000000,
"max_daily_updates": -1,
"max_daily_queries": -1,
"max_monthly_bandwidth": -1,
"max_writes_per_second": 1000,
"max_query_per_second": 1000,
"max_reads_per_request": 1000,
"max_writes_per_request": 1000,
"max_total_metadata_size": 53687091200,
"creation_time": 1707313165
}
```
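The `id` returned here is what the Get Index and Delete Index endpoints expect in their URL, so a create-then-clean-up flow looks like the sketch below (the index settings mirror the request above; credentials are placeholders):
```python Python
import requests

auth = ("EMAIL", "API_KEY")

index = requests.post(
    "https://api.upstash.com/v2/vector/index",
    json={
        "name": "myindex",
        "region": "eu-west-1",
        "similarity_function": "COSINE",
        "dimension_count": 1536,
    },
    auth=auth,
).json()

print(index["id"], index["endpoint"])

# Clean up again by deleting the index with its returned ID.
requests.delete(f"https://api.upstash.com/v2/vector/index/{index['id']}", auth=auth)
```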
# Delete Index
DELETE https://api.upstash.com/v2/vector/index/{id}
This endpoint deletes an index.
## Request Parameters
The unique ID of the index to be deleted.
## Response Parameters
`"OK"` on successfull deletion operation.
```shell curl
curl -X DELETE https://api.upstash.com/v2/vector/index/0639864f-ece6-429c-8118-86a287b0e808 \
-u 'EMAIL:API_KEY'
```
```javascript JavaScript
const axios = require('axios');
const config = {
auth: {
username: 'EMAIL',
password: 'API_KEY',
},
};
const url = 'https://api.upstash.com/v2/vector/index/0639864f-ece6-429c-8118-86a287b0e808';
axios.delete(url, config)
.then((response) => {
console.log('Deleted successfully', response.data);
})
.catch((error) => {
console.error('Error:', error);
});
```
```python Python
import requests
id="0639864f-ece6-429c-8118-86a287b0e808"
response = requests.delete(f"https://api.upstash.com/v2/vector/index/{id}", auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("DELETE", "https://api.upstash.com/v2/vector/index/0639864f-ece6-429c-8118-86a287b0e808", data)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
"OK"
```
# Get Index
GET https://api.upstash.com/v2/vector/index/{id}
This endpoint returns the data associated with an index.
## Request Parameters
The unique ID of the index to fetch.
## Response Parameters
The associated ID of the owner of the index
Unique ID of the index
The name of the index.
Similarity function that's used to calculate the distance between two
vectors
The amount of values in a single vector
The REST endpoint of the index
The REST authentication token for the index
The REST authentication read only token for the index
The payment plan of the index
The region where the index is currently deployed.
The maximum number of vectors that your index can contain.
The number of maximum update operations you can perform in a day. Only upsert operations are included in update count.
The number of maximum query operations you can perform in a day. Only query operations are included in query count.
The maximum amount of monthly bandwidth for the index. Unit is bytes. `-1` if the limit is unlimited.
The number of maximum write operations you can perform per second. Only upsert operations are included in write count.
The number of maximum query operations you can perform per second. Only query operations are included in query count.
The number of maximum vectors in a read operation. Query and fetch operations are included in read operations.
The number of maximum vectors in a write operation. Only upsert operations are included in write operations.
The maximum total metadata size for your index.
Monthly pricing of your index. Only available for fixed and pro plans.
The creation time of the vector index in UTC as unix timestamp.
```shell curl
curl -X GET https://api.upstash.com/v2/vector/index/0639864f-ece6-429c-8118-86a287b0e808 \
-u 'EMAIL:API_KEY'
```
```javascript JavaScript
const axios = require('axios');
const config = {
auth: {
username: 'EMAIL',
password: 'API_KEY',
},
};
const url = 'https://api.upstash.com/v2/vector/index/0639864f-ece6-429c-8118-86a287b0e808';
axios.get(url, config)
.then((response) => {
console.log(response.data);
})
.catch((error) => {
console.error('Error:', error);
});
```
```python Python
import requests
id = "0639864f-ece6-429c-8118-86a287b0e808"
response = requests.post(f"https://api.upstash.com/v2/vector/index/{id}", auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("POST", "https://api.upstash.com/v2/vector/index/0639864f-ece6-429c-8118-86a287b0e808", data)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
{
"customer_id": "test@upstash.com",
"id": "0639864f-ece6-429c-8118-86a287b0e808",
"name": "myindex",
"similarity_function": "COSINE",
"dimension_count": 5,
"endpoint": "test-index-3814-eu1-vector.upstash.io",
"token": "QkZGMk5heGltdW0tdXBkYXRlZC0zNzM1LWV1MkFkbWlOeGZGZ1J5Wm1GdE5tTXhNQzB1TmpsbExUb3hOekF0TVRJbFpqMTJORFUxTm1GZw==",
"read_only_token": "QkZGRk1heGltdW0tdXBkYXRlZC0zNzM1LWV1MnJlYWRvbmx5TmtaZ05qS3JNWVV0Wm1aZ01pMDBOV1poTHRob05qY3RNR0U0TkRjejNqazJU"
"type": "paid",
"region": "eu-west-1",
"max_vector_count": 400000000,
"max_daily_updates": -1,
"max_daily_queries": -1,
"max_monthly_bandwidth": -1,
"max_writes_per_second": 1000,
"max_query_per_second": 1000,
"max_reads_per_request": 1000,
"max_writes_per_request": 1000,
"max_total_metadata_size": 53687091200,
"creation_time": 1707313165
}
```
# List Indices
GET https://api.upstash.com/v2/vector/index/
This endpoint returns the data related to all indices of an account as a list.
## Request Parameters
This endpoint doesn't require any additional data.
## Response Parameters
The associated ID of the owner of the index
Unique ID of the index
The name of the index.
Similarity function that's used to calculate the distance between two
vectors
The amount of values in a single vector
The REST endpoint of the index
The payment plan of the index
The region where the index is currently deployed.
The maximum number of vectors that your index can contain.
The number of maximum update operations you can perform in a day. Only upsert operations are included in update count.
The number of maximum query operations you can perform in a day. Only query operations are included in query count.
The maximum amount of monthly bandwidth for the index. Unit is bytes. `-1` if the limit is unlimited.
The number of maximum write operations you can perform per second. Only upsert operations are included in write count.
The number of maximum query operations you can perform per second. Only query operations are included in query count.
The number of maximum vectors in a read operation. Query and fetch operations are included in read operations.
The number of maximum vectors in a write operation. Only upsert operations are included in write operations.
The maximum total metadata size for your index.
Monthly pricing of your index. Only available for fixed and pro plans.
The creation time of the vector index in UTC as unix timestamp.
```shell curl
curl -X GET \
https://api.upstash.com/v2/vector/index \
-u 'EMAIL:API_KEY'
```
```javascript JavaScript
const axios = require('axios');
const config = {
auth: {
username: 'EMAIL',
password: 'API_KEY',
},
};
const url = 'https://api.upstash.com/v2/vector/index';
axios.get(url, config)
.then((response) => {
console.log(response.data);
})
.catch((error) => {
console.error('Error:', error);
});
```
```python Python
import requests
response = requests.get('https://api.upstash.com/v2/vector/index', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("GET", "https://api.upstash.com/v2/vector/index")
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
[
{
"customer_id": "test@upstash.com",
"id": "0639864f-ece6-429c-8118-86a287b0e808",
"name": "myindex",
"similarity_function": "COSINE",
"dimension_count": 5,
"endpoint": "test-index-3814-eu1-vector.upstash.io",
"token": "QkZGMk5heGltdW0tdXBkYXRlZC0zNzM1LWV1MkFkbWlOeGZGZ1J5Wm1GdE5tTXhNQzB1TmpsbExUb3hOekF0TVRJbFpqMTJORFUxTm1GZw==",
"read_only_token": "QkZGRk1heGltdW0tdXBkYXRlZC0zNzM1LWV1MnJlYWRvbmx5TmtaZ05qS3JNWVV0Wm1aZ01pMDBOV1poTHRob05qY3RNR0U0TkRjejNqazJU"
"type": "paid",
"region": "eu-west-1",
"max_vector_count": 400000000,
"max_daily_updates": -1,
"max_daily_queries": -1,
"max_monthly_bandwidth": -1,
"max_writes_per_second": 1000,
"max_query_per_second": 1000,
"max_reads_per_request": 1000,
"max_writes_per_request": 1000,
"max_total_metadata_size": 53687091200,
"creation_time": 1707313165
}
]
```
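Since the response is a plain JSON array, it is easy to post-process. A minimal sketch that prints one summary line per index using the documented response fields:
```python
import requests

resp = requests.get("https://api.upstash.com/v2/vector/index",
                    auth=("EMAIL", "API_KEY"))
resp.raise_for_status()

for index in resp.json():
    # "name", "id", "region" and "type" are fields of the documented response.
    print(f"{index['name']} ({index['id']}) - {index['region']} - {index['type']}")
```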
# Rename Index
POST https://api.upstash.com/v2/vector/index/{id}/rename
This endpoint is used to change the name of an index.
## Request Parameters
The unique ID of the index to be renamed.
The new name of the index.
## Response Parameters
`"OK"` on successfull deletion operation.
```shell curl
curl -X POST \
https://api.upstash.com/v2/vector/index/14841111-b834-4788-925c-04ab156d1123/rename \
-u 'EMAIL:API_KEY' \
-d '{"name":"myindex"}'
```
```javascript JavaScript
const axios = require('axios');
const postData = {
name: "myindex",
};
const config = {
auth: {
username: 'EMAIL',
password: 'API_KEY',
},
headers: {
'Content-Type': 'application/json',
},
};
const url = 'https://api.upstash.com/v2/vector/index/14841111-b834-4788-925c-04ab156d1123/rename';
axios.post(url, postData, config)
.then((response) => {
console.log('Rename successful:', response.data);
})
.catch((error) => {
console.error('Error:', error);
});
```
```python Python
import requests
data = '{"name":"myindex"}'
response = requests.post('https://api.upstash.com/v2/vector/index/14841111-b834-4788-925c-04ab156d1123/rename', data=data, auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
var data = strings.NewReader(`{
"name":"myindex"
}`)
req, err := http.NewRequest("POST", "https://api.upstash.com/v2/vector/index/14841111-b834-4788-925c-04ab156d1123/rename", data)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
"OK"
```
# Reset Index Passwords
POST https://api.upstash.com/v2/vector/index/{id}/reset-password
This endpoint is used to reset regular and readonly tokens of an index.
## Request Parameters
The unique ID of the index to reset the passwords for.
## Response Parameters
`"OK"` on successfull deletion operation.
```shell curl
curl -X POST \
https://api.upstash.com/v2/vector/index/14841111-b834-4788-925c-04ab156d1123/reset-password \
-u 'EMAIL:API_KEY'
```
```javascript JavaScript
const axios = require('axios');
const config = {
auth: {
username: 'EMAIL',
password: 'API_KEY',
},
headers: {
'Content-Type': 'application/json',
},
};
const url = 'https://api.upstash.com/v2/vector/index/14841111-b834-4788-925c-04ab156d1123/reset-password';
axios.post(url, {}, config) // Sending an empty object as data since no payload is required.
.then((response) => {
console.log('Operation successful:', response.data);
})
.catch((error) => {
console.error('Error:', error);
});
```
```python Python
import requests
response = requests.post('https://api.upstash.com/v2/vector/index/14841111-b834-4788-925c-04ab156d1123/reset-password', auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
req, err := http.NewRequest("POST", "https://api.upstash.com/v2/vector/index/14841111-b834-4788-925c-04ab156d1123/reset-password")
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
"OK"
```
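The response only contains `"OK"`, so the newly generated tokens are not returned by this endpoint. A minimal sketch that rotates the tokens and then reads the new values back with the Get Index endpoint documented above:
```python
import requests

AUTH = ("EMAIL", "API_KEY")
INDEX_ID = "14841111-b834-4788-925c-04ab156d1123"

# Rotate both the regular and the read-only token of the index.
reset = requests.post(
    f"https://api.upstash.com/v2/vector/index/{INDEX_ID}/reset-password", auth=AUTH)
reset.raise_for_status()

# Fetch the index again to read the newly generated tokens.
index = requests.get(
    f"https://api.upstash.com/v2/vector/index/{INDEX_ID}", auth=AUTH).json()
print(index["token"], index["read_only_token"])
```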
# Set Index Plan
POST https://api.upstash.com/v2/vector/index/{id}/setplan
This endpoint is used to change the plan of an index.
## Request Parameters
The unique ID of the index whose plan will be changed.
The new plan for the index.
## Response Parameters
`"OK"` on successfull deletion operation.
```shell curl
curl -X POST \
https://api.upstash.com/v2/vector/index/14841111-b834-4788-925c-04ab156d1123/setplan \
-u 'EMAIL:API_KEY' \
-d '{"target_plan":"fixed"}'
```
```javascript JavaScript
const axios = require('axios');
const postData = {
target_plan: "fixed",
};
const config = {
auth: {
username: 'EMAIL',
password: 'API_KEY',
},
headers: {
'Content-Type': 'application/json',
},
};
const url = 'https://api.upstash.com/v2/vector/index/14841111-b834-4788-925c-04ab156d1123/setplan';
axios.post(url, postData, config)
.then((response) => {
console.log('Plan set successfully:', response.data);
})
.catch((error) => {
console.error('Error:', error);
});
```
```python Python
import requests
data = '{"target_plan":"fixed"}'
response = requests.post('https://api.upstash.com/v2/vector/index/14841111-b834-4788-925c-04ab156d1123/setplan', data=data, auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
var data = strings.NewReader(`{
"target_plan":"fixed"
}`)
req, err := http.NewRequest("POST", "https://api.upstash.com/v2/vector/index/14841111-b834-4788-925c-04ab156d1123/setplan", data)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
"OK"
```
# Transfer Index
POST https://api.upstash.com/v2/vector/index/{id}/transfer
This endpoint is used to transfer an index to another team.
## Request Parameters
The unique ID of the index to be transferred.
The ID of the target account. If the target is a team, use the format `team:`; if the target is your personal account, use the format ``.
## Response Parameters
`"OK"` on successfull deletion operation.
```shell curl
curl -X POST \
https://api.upstash.com/v2/vector/index/14841111-b834-4788-925c-04ab156d1123/transfer \
-u 'EMAIL:API_KEY' \
-d '{"target_account":"team:team-id-1"}'
```
```javascript JavaScript
const axios = require('axios');
const postData = {
target_account: "team:team-id-1",
};
const config = {
auth: {
username: 'EMAIL',
password: 'API_KEY',
},
headers: {
'Content-Type': 'application/json',
},
};
const url = 'https://api.upstash.com/v2/vector/index/14841111-b834-4788-925c-04ab156d1123/transfer';
axios.post(url, postData, config)
.then((response) => {
console.log('Transfer successful:', response.data);
})
.catch((error) => {
console.error('Error:', error);
});
```
```python Python
import requests
data = '{"target_account":"team:team-id-1"}'
response = requests.post('https://api.upstash.com/v2/vector/index/14841111-b834-4788-925c-04ab156d1123/transfer', data=data, auth=('EMAIL', 'API_KEY'))
response.content
```
```go Go
client := &http.Client{}
var data = strings.NewReader(`{
"target_account":"team:team-id-1"
}`)
req, err := http.NewRequest("POST", "https://api.upstash.com/v2/vector/index/14841111-b834-4788-925c-04ab156d1123/transfer", data)
if err != nil {
log.Fatal(err)
}
req.SetBasicAuth("email", "api_key")
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
bodyText, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", bodyText);
```
```json 200 OK
"OK"
```
# Overview
The Upstash Pulumi Provider lets you manage [Upstash](https://upstash.com) Redis and Kafka resources programmatically.
You can find the Github Repository [here](https://github.com/upstash/pulumi-upstash).
## Installing
This package is available for several languages/platforms:
### Node.js (JavaScript/TypeScript)
To use from JavaScript or TypeScript in Node.js, install using either `npm`:
```bash
npm install @upstash/pulumi
```
or `yarn`:
```bash
yarn add @upstash/pulumi
```
### Python
To use from Python, install using `pip`:
```bash
pip install upstash_pulumi
```
### Go
To use from Go, use `go get` to grab the latest version of the library:
```bash
go get github.com/upstash/pulumi-upstash/sdk/go/...
```
## Configuration
The following configuration points are available for the `upstash` provider:
* `upstash:apiKey` (environment: `UPSTASH_API_KEY`) - the API key for `upstash`. Can be obtained from the [console](https://console.upstash.com).
* `upstash:email` (environment: `UPSTASH_EMAIL`) - owner email of the resources
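Besides stack configuration and the environment variables above, you can also pass the credentials to an explicit provider instance. The sketch below is for the Python SDK; the `Provider` resource and its `email`/`api_key` argument names are assumptions based on how Pulumi provider SDKs are usually generated, so verify them against the installed package.
```python
import pulumi
import upstash_pulumi as upstash

# Assumption: the generated SDK exposes a Provider resource with email/api_key arguments.
provider = upstash.Provider("upstash-provider",
                            email="user@example.com",
                            api_key="YOUR_UPSTASH_API_KEY")

# Resources can then be bound to this provider explicitly.
db = upstash.RedisDatabase("mydb",
                           database_name="pulumi-py-db",
                           region="eu-west-1",
                           tls=True,
                           opts=pulumi.ResourceOptions(provider=provider))
```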
## Some Examples
### TypeScript:
```typescript
import * as pulumi from "@pulumi/pulumi";
import * as upstash from "@upstash/pulumi";
// multiple redis databases in a single for loop
for (let i = 0; i < 5; i++) {
new upstash.RedisDatabase("mydb" + i, {
databaseName: "pulumi-ts-db" + i,
region: "eu-west-1",
tls: true,
});
}
```
### Go:
```go
package main
import (
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
"github.com/upstash/pulumi-upstash/sdk/go/upstash"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
createdTeam, err := upstash.NewTeam(ctx, "exampleTeam", &upstash.TeamArgs{
TeamName: pulumi.String("pulumi go team"),
CopyCc: pulumi.Bool(false),
TeamMembers: pulumi.StringMap{
"": pulumi.String("owner"),
"": pulumi.String("dev"),
},
})
if err != nil {
return err
}
return nil
})
}
```
### Python:
```python
import pulumi
import upstash_pulumi as upstash
created_cluster = upstash.KafkaCluster(
resource_name="myCluster",
cluster_name="pulumi-python-cluster",
multizone=False,
region="eu-west-1"
)
```
# upstash_kafka_cluster_data
```hcl example.tf
data "upstash_kafka_cluster_data" "clusterData" {
cluster_id = resource.upstash_kafka_cluster.exampleCluster.cluster_id
}
```
## Schema
### Required
Unique Cluster ID for requested cluster
### Read-Only
Name of the cluster
Creation time of the cluster
The ID of this resource.
Max Message Size for the cluster
Max Messages Per Second for the cluster
Max Partitions for the cluster
Max Retention Size of the cluster
Max Retention Time of the cluster
Whether multizone replication is enabled
Password for the cluster
Region of the cluster. Possible values (may change) are: `eu-west-1`,
`us-east-1`
Name of the cluster
Current state of the cluster
Possible values: `active` or `deleted`
TCP Endpoint of the cluster
Type of the cluster
Base64 encoded username for the cluster
# upstash_kafka_connector_data
```hcl example.tf
data "upstash_kafka_connector_data" "kafkaConnectorData" {
topic_id = resource.upstash_kafka_connector.exampleKafkaConnector.connector_id
}
```
## Schema
### Required
Unique Connector ID for created connector
### Read-Only
Unique Cluster ID for cluster that the connector is tied to
Connector class of the connector
State error message of the connector
Creation time of the connector
Encoded username for the connector
The ID of this resource.
Name of the connector
Properties of the connector. Custom config for different types of connectors.
Encrypted properties for the connector
State of the connector
State error message of the connector
Tasks of the connector
Topics for the connector
TTL for the connector
User password for the connector
# upstash_kafka_credential_data
```hcl example.tf
data "upstash_kafka_credential_data" "kafkaCredentialData" {
credential_id = upstash_kafka_credential.exampleKafkaCredential.credential_id
}
```
## Schema
### Required
Unique ID of the kafka credential
### Read-Only
ID of the kafka cluster
Creation time of the credential
Name of the kafka credential
The ID of this resource.
Password to be used in authenticating to the cluster
Permission scope given to the kafka credential
State of the credential. `active` or `deleted`
Name of the kafka topic
Username to be used for the kafka credential
# upstash_kafka_topic_data
```hcl example.tf
data "upstash_kafka_topic_data" "kafkaTopicData" {
topic_id = resource.upstash_kafka_topic.exampleKafkaTopic.topic_id
}
```
## Schema
### Required
Unique Topic ID for requested kafka topic
### Read-Only
Cleanup policy will be used in the topic (`compact` or `delete`)
ID of the cluster the topic will be deployed in
Creation time of the topic
The ID of this resource.
Max message size in the topic
Whether multizone replication is enabled
The number of partitions the topic will have
Password to be used in authenticating to the cluster
Region of the kafka topic. Possible values (may change) are: `eu-west-1`,
`us-east-1`
REST Endpoint of the kafka topic
The number of partitions the topic will have
Retention time of messages in the topic
State of the credential. `active` or `deleted`
TCP Endpoint of the kafka topic
Unique Cluster ID for created topic
Base64 encoded username to be used in authenticating to the cluster
# upstash_qstash_endpoint_data
```hcl example.tf
data "upstash_qstash_endpoint_data" "exampleQStashEndpointData" {
endpoint_id = resource.upstash_qstash_endpoint.exampleQStashEndpoint.endpoint_id
}
```
## Schema
### Required
Topic Id that the endpoint is added to
### Read-Only
Unique QStash Endpoint ID
The ID of this resource.
Unique QStash Topic Name for Endpoint
# upstash_qstash_schedule_data
```hcl example.tf
data "upstash_qstash_schedule_data" "exampleQStashScheduleData" {
schedule_id = resource.upstash_qstash_schedule.exampleQStashSchedule.schedule_id
}
```
## Schema
### Required
Unique QStash Schedule ID for requested schedule
### Read-Only
Body to send for the POST request in string format. Double quotes need to be escaped.
Creation time for QStash Schedule
Cron string for QStash Schedule
Destination for QStash Schedule. Either Topic ID or valid URL
Forward headers to your API
The ID of this resource.
Start time for QStash Scheduling.
Retries for QStash Schedule requests.
# upstash_qstash_topic_data
```hcl example.tf
data "upstash_qstash_topic_data" "exampleQstashTopicData" {
topic_id = resource.upstash_qstash_topic.exampleQstashTopic.topic_id
}
```
## Schema
### Required
Unique QStash Topic ID for requested topic
### Read-Only
Endpoints for the QStash Topic
The ID of this resource.
Name of the QStash Topic
# upstash_redis_database_data
```hcl example.tf
data "upstash_redis_database_data" "exampleDBData" {
database_id = resource.upstash_redis_database.exampleDB.database_id
}
```
## Schema
### Required
Unique Database ID for created database
### Read-Only
Upgrade to higher plans automatically when it hits quotas
Creation time of the database
Name of the database
Type of the database
Daily bandwidth limit for the database
Disk threshold for the database
Max clients for the database
Max commands per second for the database
Max entry size for the database
Max request size for the database
Memory threshold for the database
Database URL for connection
The ID of this resource.
Password of the database
Port of the endpoint
Primary region for the database (Only works if region='global'. Can be one of
\[us-east-1, us-west-1, us-west-2, eu-central-1, eu-west-1, sa-east-1,
ap-southeast-1, ap-southeast-2])
Rest Token for the database.
Read regions for the database (Only works if region='global' and
primary\_region is set. Can be any combination of \[us-east-1, us-west-1,
us-west-2, eu-central-1, eu-west-1, sa-east-1, ap-southeast-1,
ap-southeast-2], excluding the one given as primary.)
Region of the database. Possible values are: `global`, `eu-west-1`,
`us-east-1`, `us-west-1`, `ap-northeast-1` , `eu-central1`
Rest Token for the database.
State of the database
When enabled, data is encrypted in transit. (If changed to false from true,
results in deletion and recreation of the resource)
User email for the database
# upstash_team_data
```hcl example.tf
data "upstash_team_data" "teamData" {
team_id = resource.upstash_team.exampleTeam.team_id
}
```
## Schema
### Required
Unique Team ID for the requested team
### Read-Only
Whether Credit Card is copied
The ID of this resource.
Members of the team. (Owner must be specified, which is the owner of the api
key.)
Name of the team
# Overview
The Upstash Terraform Provider lets you manage Upstash Redis and Kafka resources programmatically.
You can find the Github Repository for the Terraform Provider [here](https://github.com/upstash/terraform-provider-upstash).
## Installation
```hcl
terraform {
required_providers {
upstash = {
source = "upstash/upstash"
version = "x.x.x"
}
}
}
provider "upstash" {
email = var.email
api_key = var.api_key
}
```
`email` is your registered email in Upstash.
`api_key` can be generated from Upstash Console. For more information please check our [docs](https://docs.upstash.com/howto/developerapi).
## Create Database Using Terraform
Here is an example code snippet that creates a database:
```hcl
resource "upstash_redis_database" "redis" {
database_name = "db-name"
region = "eu-west-1"
tls = "true"
multi_zone = "false"
}
```
## Import Resources From Outside of Terraform
To import resources created outside of the Terraform provider, simply create the resource in a .tf file as follows:
```hcl
resource "upstash_redis_database" "redis" {}
```
After this, you can run the command:
```
terraform import upstash_redis_database.redis
```
The above example is for an Upstash Redis database. You can import any of the resources by changing the resource type and providing the resource ID.
You can check full spec and [doc from here](https://registry.terraform.io/providers/upstash/upstash/latest/docs).
## Support, Bugs Reports, Feature Requests
If you need support, you can ask your questions to the Upstash Team via the chat widget at [upstash.com](https://upstash.com).
There is also a Discord channel available for the community. [Please check here](https://docs.upstash.com/help/support) for more information.
# upstash_kafka_cluster
Create and manage Kafka clusters on Upstash.
```hcl example.tf
resource "upstash_kafka_cluster" "exampleCluster" {
cluster_name = "TerraformCluster"
region = "eu-west-1"
multizone = false
}
```
## Schema
### Required
Name of the cluster
Region of the cluster. Possible values (may change) are: `eu-west-1`,
`us-east-1`
### Optional
Whether cluster has multizone attribute
### Read-Only
Unique cluster ID for created cluster
Creation time of the cluster
The ID of this resource.
Max message size for the cluster
Max messages per second for the cluster
Max partitions for the cluster
Max retention size of the cluster
Max retention time of the cluster
Password for the cluster
REST endpoint of the cluster
State, where the cluster is originated
TCP endpoint of the cluster
Type of the cluster
Base64 encoded username for the cluster
# upstash_kafka_connector
Create and manage Kafka Connectors.
```hcl example.tf
# Not necessary if the topic belongs to an already created cluster.
resource "upstash_kafka_cluster" "exampleKafkaCluster" {
cluster_name = "Terraform_Upstash_Cluster"
region = "eu-west-1"
multizone = false
}
resource "upstash_kafka_topic" "exampleKafkaTopic" {
topic_name = "TerraformTopic"
partitions = 1
retention_time = 625135
retention_size = 725124
max_message_size = 829213
cleanup_policy = "delete"
# Here, you can use the newly created kafka_cluster resource (above) named exampleKafkaCluster.
# And use its ID so that the topic binds to it.
# Alternatively, provide the ID of an already created cluster.
cluster_id = resource.upstash_kafka_cluster.exampleKafkaCluster.cluster_id
}
resource "upstash_kafka_connector" "exampleKafkaConnector" {
name = var.connector_name
cluster_id = upstash_kafka_cluster.exampleKafkaCluster.cluster_id
properties = {
"collection": "user123",
"connection.uri": "mongodb+srv://test:test@cluster0.fohyg7p.mongodb.net/?retryWrites=true&w=majority",
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"database": "myshinynewdb2",
"topics": "${upstash_kafka_topic.exampleKafkaTopic.topic_name}"
}
# OPTIONAL: change between restart-running-paused
# running_state = "running"
}
```
## Schema
### Required
Unique cluster ID related to the connector
Name of the connector
Properties that the connector will have. Please check the documentation of the
related connector.
### Optional
Running state of the connector
### Read-Only
Unique connector ID for created connector
Creation of the connector
The ID of this resource.
# upstash_kafka_credential
Create and manage credentials for a kafka cluster.
```hcl example.tf
resource "upstash_kafka_cluster" "exampleKafkaCluster" {
cluster_name = var.cluster_name
region = var.region
multizone = var.multizone
}
resource "upstash_kafka_topic" "exampleKafkaTopic" {
topic_name = var.topic_name
partitions = var.partitions
retention_time = var.retention_time
retention_size = var.retention_size
max_message_size = var.max_message_size
cleanup_policy = var.cleanup_policy
cluster_id = resource.upstash_kafka_cluster.exampleKafkaCluster.cluster_id
}
resource "upstash_kafka_credential" "exampleKafkaCredential" {
cluster_id = upstash_kafka_cluster.exampleKafkaCluster.cluster_id
credential_name = "credentialFromTerraform"
topic = upstash_kafka_topic.exampleKafkaTopic.topic_name
permissions = "ALL"
}
resource "upstash_kafka_credential" "exampleKafkaCredentialAllTopics" {
cluster_id = upstash_kafka_cluster.exampleKafkaCluster.cluster_id
credential_name = "credentialFromTerraform"
topic = "*"
permissions = "ALL"
}
```
## Schema
### Required
Name of the cluster
Name of the kafka credential
Permission scope given to the kafka credential
Name of the kafka topic
### Read-Only
Creation time of the credential
Unique ID of the kafka credential
The ID of this resource.
Password to be used in authenticating to the cluster
State of the credential. `active` or `deleted`
Username to be used for the kafka credential
# upstash_kafka_topic
Create and manage Kafka topics in Upstash.
```hcl example.tf
# Not necessary if the topic belongs to an already created cluster.
resource "upstash_kafka_cluster" "exampleKafkaCluster" {
cluster_name = "Terraform_Upstash_Cluster"
region = "eu-west-1"
multizone = false
}
resource "upstash_kafka_topic" "exampleKafkaTopic" {
topic_name = "TerraformTopic"
partitions = 1
retention_time = 625135
retention_size = 725124
max_message_size = 829213
cleanup_policy = "delete"
# Here, you can use the newly created kafka_cluster resource (above) named exampleKafkaCluster.
# And use its ID so that the topic binds to it.
# Alternatively, provide the ID of an already created cluster.
cluster_id = resource.upstash_kafka_cluster.exampleKafkaCluster.cluster_id
}
```
## Schema
### Required
Cleanup policy will be used in the topic. `compact` or `delete`
ID of the cluster the topic will be deployed in
Max message size in the topic
The number of partitions the topic will have
Retention size of the messages in the topic
Retention time of messages in the topic
Name of the topic
### Read-Only
Creation time of the topic
The ID of this resource.
Whether multizone replication is enabled
Password to be used in authenticating to the cluster
Region of the kafka topic
REST endpoint of the kafka topic
State of the credential. `active` or `deleted`
TCP endpoint of the kafka topic
Unique cluster ID for created topic
Base64 encoded username to be used in authenticating to the cluster
# upstash_qstash_endpoint
Create and manage QStash endpoints.
```hcl example.tf
resource "upstash_qstash_endpoint" "exampleQStashEndpoint" {
url = "https://***.***"
topic_id = resource.upstash_qstash_topic.exampleQstashTopic.topic_id
}
```
## Schema
### Required
Topic ID that the endpoint is added to
URL of the endpoint
### Read-Only
Unique QStash endpoint ID
The ID of this resource.
Unique QStash topic name for endpoint
# upstash_qstash_schedule
Create and manage QStash schedules.
```hcl example.tf
resource "upstash_qstash_schedule" "exampleQStashSchedule" {
destination = resource.upstash_qstash_topic.exampleQstashTopic.topic_id
cron = "* * * * */2"
# or simply provide a link
# destination = "https://***.***"
}
```
## Schema
### Required
Cron string for QStash Schedule
Destination for QStash Schedule. Either Topic ID or valid URL
### Optional
Body to send for the POST request in string format. Double quotes need to be escaped.
Callback URL for QStash Schedule.
Content based deduplication for QStash Scheduling.
Content type for QStash Scheduling.
Deduplication ID for QStash Scheduling.
Delay for QStash Schedule.
Forward headers to your API
Start time for QStash Scheduling.
Retries for QStash Schedule requests.
### Read-Only
Creation time for QStash Schedule.
The ID of this resource.
Unique QStash Schedule ID for requested schedule
# upstash_qstash_topic
Create and manage QStash topics
```hcl example.tf
resource "upstash_qstash_topic" "exampleQStashTopic" {
name = "exampleQStashTopicName"
}
```
## Schema
### Required
Name of the QStash topic
### Read-Only
Endpoints for the QStash topic
The ID of this resource.
Unique QStash topic ID for requested topic
# upstash_redis_database
Create and manage Upstash Redis databases.
```hcl example.tf
resource "upstash_redis_database" "exampleDB" {
database_name = "Terraform DB6"
region = "eu-west-1"
tls = "true"
multizone = "true"
}
```
## Schema
### Required
Name of the database
Region of the database. Possible values are: `global`, `eu-west-1`,
`us-east-1`, `us-west-1`, `ap-northeast-1` , `eu-central1`
### Optional
Upgrade to higher plans automatically when it hits quotas
Enable eviction, to evict keys when your database reaches the max size
Primary region for the database (Only works if region='global'. Can be one of
\[us-east-1, us-west-1, us-west-2, eu-central-1, eu-west-1, sa-east-1,
ap-southeast-1, ap-southeast-2])
Read regions for the database (Only works if region='global' and
primary\_region is set. Can be any combination of \[us-east-1, us-west-1,
us-west-2, eu-central-1, eu-west-1, sa-east-1, ap-southeast-1,
ap-southeast-2], excluding the one given as primary.)
When enabled, data is encrypted in transit. (If changed to false from true,
results in deletion and recreation of the resource)
### Read-Only
Creation time of the database
Unique Database ID for created database
Type of the database
Daily bandwidth limit for the database
Disk threshold for the database
Max clients for the database
Max commands per second for the database
Max entry size for the database
Max request size for the database
Memory threshold for the database
Database URL for connection
The ID of this resource.
Password of the database
Port of the endpoint
Rest Token for the database.
Rest Token for the database.
State of the database
User email for the database
# upstash_team
Create and manage teams on Upstash.
```hcl example.tf
resource "upstash_team" "exampleTeam" {
team_name = "TerraformTeam"
copy_cc = false
team_members = {
# Owner is the owner of the api_key.
"X@Y.Z": "owner",
"A@B.C": "dev",
"E@E.F": "finance",
}
}
```
## Schema
### Required
Whether Credit Card is copied
Members of the team. (Owner must be specified, which is the owner of the api
key.)
Name of the team
### Read-Only
The ID of this resource.
Unique Team ID for created team
# Get Started
Create a Redis Database within seconds
Create a Vector Database for AI & LLMs
Publish your first message
Write durable serverless functions
## Concepts
Upstash is serverless. You don't need to provision any infrastructure. Just
create a database and start using it.
Price scales to zero. You don't pay for idle or unused resources. You pay
only for what you use.
Upstash Redis replicates your data for the best latency all over the world.
Upstash REST APIs enable access from all types of runtimes.
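As a small illustration of REST access, the sketch below sets and reads a Redis key over plain HTTPS with Python's `requests`; the URL and token are placeholders that you would copy from your database page in the console.
```python
import requests

# Placeholders: copy the REST URL and token from your database page in the console.
UPSTASH_REDIS_REST_URL = "https://YOUR-DATABASE.upstash.io"
UPSTASH_REDIS_REST_TOKEN = "YOUR_REST_TOKEN"

headers = {"Authorization": f"Bearer {UPSTASH_REDIS_REST_TOKEN}"}

# The REST API maps Redis commands to URL paths, e.g. /set/<key>/<value> and /get/<key>.
requests.get(f"{UPSTASH_REDIS_REST_URL}/set/foo/bar", headers=headers)
resp = requests.get(f"{UPSTASH_REDIS_REST_URL}/get/foo", headers=headers)
print(resp.json())  # e.g. {"result": "bar"}
```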
## Get In Touch
Follow us on X for the latest news and updates.
Join our Discord Community and ask your questions to the team and other
developers.
Raise an issue on GitHub.
# Aiven Http Sink Connector
Aiven Http Sink Connector calls a given HTTP endpoint for each item published to
your Kafka Topics.
In this guide, we will walk you through creating an Aiven Http Sink Connector.
## Get Started
### Create a Kafka Cluster
If you do not have a Kafka cluster and/or topic already, follow [these
steps](../overall/getstarted) to create one.
### Prepare the Test Environment
If you already have an HTTP endpoint that you will call, you can skip this step and continue from the [Create The Connector](#create-the-connector) section.
We will use [webhook.site](https://webhook.site/) to verify if the connector is
working. Go to [webhook.site](https://webhook.site/) and copy the unique url to
pass it in the connector config later.
### Create the Connector
Go to the Connectors tab, and create your first connector by clicking the **New
Connector** button.
Choose your connector as **Aiven Http Sink Connector**
Enter the required properties.
The advanced screen is for any other configuration that the selected connector
supports. At the top of this screen, you can find a link to related
documentation. We can proceed with what we have and click the **Connect** button
directly.
Congratulations! You have created an Aiven Http Sink Connector.
As you put data into your selected topics, the requests should be visible in
[webhook.site](https://webhook.site/)
# Supported Drivers
These are the currently supported databases and JDBC drivers:
| Database | JDBC Driver |
| ------------- | ----------------------------- |
| PostgreSQL | postgresql-42.3.3 |
| MySQL | mysql-connector-java-8.0.28 |
| MS SQL Server | mssql-jdbc-10.2.0 |
| Snowflake | snowflake-jdbc-3.13.16 |
| ClickHouse | clickhouse-jdbc-0.3.2-patch11 |
| SQLite | sqlite-jdbc-3.36.0.3 |
# Aiven JDBC Sink Connector
Aiven JDBC Sink Connector allows you to continuously store the data from your
Kafka Topics in any SQL-dialect relational database like MySQL, PostgreSQL, etc.
In this guide, we will walk you through creating an Aiven JDBC Sink Connector.
## Get Started
### Create a Kafka Cluster
If you do not have a Kafka cluster and/or topic already, follow [these
steps](../overall/getstarted) to create one.
### Create the Connector
Go to the Connectors tab, and create your first connector by clicking the **New
Connector** button.
Choose your connector as **Aiven JDBC Connector Sink**
Enter the required properties.
The advanced screen is for any other configuration that the selected connector
supports. At the top of this screen, you can find a link to related
documentation. We can proceed with what we have and click the **Connect** button
directly.
Congratulations! You have created an Aiven JDBC Sink Connector.
As you put data into your selected topics, the data will be written into your
relational database.
## Supported Databases
Refer to the Supported Drivers table for the list of supported databases and JDBC drivers.
# Aiven JDBC Source Connector
Aiven JDBC Source Connector allows you to capture any changes in your SQL
dialect relational databases and store them as messages on your Kafka topics. In
this guide, we will walk you through creating the Aiven JDBC Source Connector.
## Get Started
### Create a Kafka Cluster
If you do not have a Kafka cluster and/or topic already, follow [these
steps](../overall/getstarted) to create one.
### Create the Connector
Go to the Connectors tab, and create your first connector by clicking the **New
Connector** button.
Choose your connector as **Aiven JDBC Connector Source**
Enter the required properties.
The advanced screen is for any other configuration that the selected connector
supports. At the top of this screen, you can find a link to related
documentation. We can proceed with what we have and click the **Connect** button
directly.
Congratulations! You have created an Aiven JDBC Source Connector. As you put
data into your relational database, your topics will be created and populated
with new data.
You can go to the **Messages** section to see latest events as they are coming
from Kafka.
## Supported Databases
Refer to the Supported Drivers table for the list of supported databases and JDBC drivers.
# Aiven OpenSearch Sink Connector
Aiven OpenSearch Sink Connector allows you to continuously store the data from
your Kafka Topics in any OpenSearch compatible product like Amazon OpenSearch,
Elasticsearch, etc.
In this guide, we will walk you through creating an Aiven OpenSearch Sink
Connector with Elasticsearch.
## Get Started
### Create a Kafka Cluster
If you do not have a Kafka cluster and/or topic already, follow [these
steps](../overall/getstarted) to create one.
### Prepare the Elasticsearch Environment
If you already have an Elasticsearch environment with the following information,
skip this step and continue from the
[Create The Connector](#create-the-connector) section.
* `connection.url`
* `connection.username`
* `connection.password`
Go to [Elastic Cloud](https://cloud.elastic.co/deployments) and create or select a
deployment. Aside from the name, the default configurations should be fine for this
guide.
Don't forget to save the deployment credentials. We need them to create the
connector later.
Lastly, we need the connection endpoint. Click on your deployment to see the
details and click to the "Copy Endpoint" of Elasticsearch in Applications
section.
These three (username, password, and endpoint) should be enough to create the
connector.
### Create the Connector
Go to the Connectors tab, and create your first connector by clicking the **New
Connector** button.
Choose your connector as **Aiven OpenSearch Sink Connector**
Enter the required properties.
The advanced screen is for any other configuration that the selected connector
supports. At the top of this screen, you can find a link to related
documentation. We can proceed with what we have and click the **Connect** button
directly.
Congratulations! You have created an OpenSearch Sink Connector.
As you put data into your selected topics, the data will be written into
Elasticsearch.
# Aiven Amazon S3 Sink Connector
Aiven Amazon S3 Sink Connector allows you to continuously store the data from
your Kafka Topics in Amazon S3. In this guide, we will walk you through creating
an Amazon S3 Sink Connector.
## Get Started
### Create a Kafka Cluster
If you do not have a Kafka cluster and/or topic already, follow [these
steps](../overall/getstarted) to create one.
### Prepare the Amazon S3 Environment
If you already have an Amazon S3 environment with the following information, skip
this step and continue from the [Create The Connector](#create-the-connector)
section. Note that the user with the given access keys should have permission
to modify the given bucket.
* `aws.access.key.id`
* `aws.secret.access.key`
* `aws.s3.bucket.name`
* `aws.s3.region`
Go to the [AWS S3 Console](https://s3.console.aws.amazon.com/s3/) and create or select a
bucket. Note that this bucket name will be used later to configure the
connector.
To keep this guide simple, we will allow public access to this bucket (not
recommended in production).
You can disable public access and allow only the following IPs coming from Upstash:
```
52.48.149.7
52.213.40.91
174.129.75.41
34.195.190.47
52.58.175.235
18.158.44.120
63.34.151.162
54.247.137.96
3.78.151.126
3.124.80.204
34.236.200.33
44.195.74.73
```
Aside from bucket name and public access changes, default configurations should
be fine for this guide.
Next, we will create a user account with permissions to modify S3 buckets. Go to
[AWS IAM](https://console.aws.amazon.com/iam), then "Access Management" and
"Users". Click on "Add Users".
Give a name to the user and continue with the next screen.
On the "Set Permissions" screen, we will give the "AmazonFullS3Access" to this
user.
This grants more permissions than needed. You can create a custom policy with the
following JSON for a more restrictive policy.
```
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject",
"s3:ListBucketMultipartUploads",
"s3:AbortMultipartUpload",
"s3:ListMultipartUploadParts"
],
"Resource": "*"
}
]
}
```
After creating the user, we will go into the details of that user to create a
key. Click on the user, then go to "Security Credentials". In the "Access
Keys" section, click on the "Create access key" button.
We will choose "Application running outside AWS" and create the access key.
Don't forget to store the access key ID and secret key. We will use these two when
creating the connector.
### Create the Connector
Go to the Connectors tab, and create your first connector by clicking the **New
Connector** button.
Choose your connector as **Aiven Amazon S3 Connector**
Enter the required properties.
The advanced screen is for any other configuration that the selected connector
supports. At the top of this screen, you can find a link to related
documentation. We can proceed with what we have and click the **Connect** button
directly.
Congratulations! You have created an Aiven Amazon S3 Sink Connector.
As you put data into your selected topics, the data will be written into Amazon
S3. You can see the data coming from your related bucket in the Amazon Console.
# Google BigQuery Sink Connector
Google BigQuery Sink Connector allows you to continuously store the data from
your Kafka Topics in Google BigQuery. In this guide, we will walk you through
creating a Google BigQuery Sink Connector.
## Get Started
### Create a Kafka Cluster
If you do not have a Kafka cluster and/or topic already, follow [these
steps](../overall/getstarted) to create one.
### Prepare the Google BigQuery Environment
If you already have a Google BigQuery environment with the following
information, skip this step and continue from the
[Create The Connector](#create-the-connector) section.
* project name
* a data set
* an associated Google service account with permission to modify the Google BigQuery
  dataset.
Go to [Google Cloud BigQuery](https://console.cloud.google.com/bigquery). Create
or select a project. Note that this project name will be used later to configure
the connector.
Create a dataset for the project. Note that this dataset name will be used later
to configure the connector.
Default configurations should be fine for this guide.
Next, we will create a service account that we will later connect to this
project. Go to the [Google Cloud Console](https://console.cloud.google.com/), then
"IAM & admin" and "Service accounts".
Click on "Create Service Account".
Give a name to your service account.
Configure permissions for the service account. To keep it simple, we will make
this service account "Owner" to allow everything. You may want to be more
specific.
The rest of the config can be left empty. After creating the service account, we
will go to its settings to attach a key to it. Go to the "Actions" tab, and
select "Manage keys".
Then create a new key, if you don't have one already. We will select the "JSON"
key type as recommended.
We will use the content of this JSON file when creating the connector. For
reference it should look something like this:
```json
{
"type": "service_account",
"project_id": "bigquerysinkproject",
"private_key_id": "b5e8b29ed62171aaaa2b5045f04826635bcf78c4",
"private_key": "-----BEGIN PRIVATE A_LONG_PRIVATE_KEY_WILL_BE_HERE PRIVATE KEY-----\n",
"client_email": "serviceforbigquerysink@bigquerysinkproject.iam.gserviceaccount.com",
"client_id": "109444138898162952667",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/serviceforbigquerysink%40bigquerysinkproject.iam.gserviceaccount.com"
}
```
Then we need to give permission to this service account from the dataset that we
created. From [BigQuery Console](https://console.cloud.google.com/bigquery) go
to your dataset settings and click share.
The "Dataset Permissions" view will open. Click to "Add Principal" We will add
the service account we have created as a principal here. And we will assign the
"Owner" role to it to make this example simple. You may want to be more specific
here.
With this step, the BigQuery dataset should be ready to use with the connector.
### Create the Connector
Go to the Connectors tab, and create your first connector by clicking the **New
Connector** button.
Choose your connector as **Google BigQuery Sink Connector**
Enter the required properties.
Note that the Google BigQuery Connector expects the data to have a schema. That
is why we choose JsonConverter with schema included. Alternatively, AvroConverter
with Schema Registry can be used as well.
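As a reference for what "JSON with schema included" looks like on the wire, the sketch below produces one such record to a topic with the `kafka-python` client. The `schema` + `payload` envelope is the standard Kafka Connect JsonConverter format; the bootstrap server, credentials, topic name, and SASL mechanism are placeholders you should replace with the connection details shown for your cluster in the console.
```python
import json
from kafka import KafkaProducer

# Placeholders: copy these from your Upstash Kafka cluster's connection details.
producer = KafkaProducer(
    bootstrap_servers=["YOUR-CLUSTER.upstash.io:9092"],
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-256",
    sasl_plain_username="YOUR_KAFKA_USERNAME",
    sasl_plain_password="YOUR_KAFKA_PASSWORD",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# JsonConverter with schemas enabled expects a "schema" + "payload" envelope.
record = {
    "schema": {
        "type": "struct",
        "optional": False,
        "fields": [
            {"field": "name", "type": "string", "optional": False},
            {"field": "age", "type": "int64", "optional": True},
        ],
    },
    "payload": {"name": "alice", "age": 30},
}

producer.send("your-topic", value=record)
producer.flush()
```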
The advanced screen is for any other configuration that the selected connector
supports. At the top of this screen, you can find a link to related
documentation. We can proceed with what we have and click the **Connect** button
directly.
Congratulations! You have created a Google BigQuery Sink Connector.
As you put data into your selected topics, the data will be written into Google
BigQuery. You can view it from the Google BigQuery Console.
# Supported Connect Plugins
You can use several types of plugins together with your connectors.
Here are all the supported plugins.
# Common Plugins supported by all connectors
## Transforms
Related documentation: [https://kafka.apache.org/documentation/#connect\_transforms](https://kafka.apache.org/documentation/#connect_transforms)
org.apache.kafka.connect.transforms.Cast\$Key
org.apache.kafka.connect.transforms.Cast\$Value
org.apache.kafka.connect.transforms.DropHeaders
org.apache.kafka.connect.transforms.ExtractField\$Key
org.apache.kafka.connect.transforms.ExtractField\$Value
org.apache.kafka.connect.transforms.Filter
org.apache.kafka.connect.transforms.Flatten\$Key
org.apache.kafka.connect.transforms.Flatten\$Value
org.apache.kafka.connect.transforms.HeaderFrom\$Key
org.apache.kafka.connect.transforms.HeaderFrom\$Value
org.apache.kafka.connect.transforms.HoistField\$Key
org.apache.kafka.connect.transforms.HoistField\$Value
org.apache.kafka.connect.transforms.InsertField\$Key
org.apache.kafka.connect.transforms.InsertField\$Value
org.apache.kafka.connect.transforms.InsertHeader
org.apache.kafka.connect.transforms.MaskField\$Key
org.apache.kafka.connect.transforms.MaskField\$Value
org.apache.kafka.connect.transforms.RegexRouter
org.apache.kafka.connect.transforms.ReplaceField\$Key
org.apache.kafka.connect.transforms.ReplaceField\$Value
org.apache.kafka.connect.transforms.SetSchemaMetadata\$Key
org.apache.kafka.connect.transforms.SetSchemaMetadata\$Value
org.apache.kafka.connect.transforms.TimestampConverter\$Key
org.apache.kafka.connect.transforms.TimestampConverter\$Value
org.apache.kafka.connect.transforms.TimestampRouter
org.apache.kafka.connect.transforms.ValueToKey
## Predicates
Related documentation: [https://kafka.apache.org/documentation/#connect\_predicates](https://kafka.apache.org/documentation/#connect_predicates)
org.apache.kafka.connect.transforms.predicates.HasHeaderKey
org.apache.kafka.connect.transforms.predicates.RecordIsTombstone
org.apache.kafka.connect.transforms.predicates.TopicNameMatches
## Converters
org.apache.kafka.connect.converters.ByteArrayConverter
org.apache.kafka.connect.converters.DoubleConverter
org.apache.kafka.connect.converters.FloatConverter
org.apache.kafka.connect.converters.IntegerConverter
org.apache.kafka.connect.converters.LongConverter
org.apache.kafka.connect.converters.ShortConverter
org.apache.kafka.connect.json.JsonConverter
org.apache.kafka.connect.storage.StringConverter
org.apache.kafka.connect.storage.SimpleHeaderConverter
io.confluent.connect.avro.AvroConverter
# Plugins Supported By Only Debezium Connectors
## Transforms
Related documentation: [https://debezium.io/documentation/reference/stable/transformations/index.html](https://debezium.io/documentation/reference/stable/transformations/index.html)
io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
io.debezium.connector.mysql.transforms.ReadToInsertEvent
io.debezium.transforms.ByLogicalTableRouter
io.debezium.transforms.ExtractChangedRecordState
io.debezium.transforms.ExtractNewRecordState
io.debezium.transforms.HeaderToValue
io.debezium.transforms.UnwrapFromEnvelope
io.debezium.transforms.outbox.EventRouter
io.debezium.transforms.partitions.ComputePartition
io.debezium.transforms.partitions.PartitionRouting
## Converters
io.debezium.converters.BinaryDataConverter
io.debezium.converters.ByteArrayConverter
io.debezium.converters.ByteBufferConverter
io.debezium.converters.CloudEventsConverter
# Plugins Supported By Only Debezium Mongo Connector
## Transforms
Related documentation: [https://debezium.io/documentation/reference/stable/transformations/index.html](https://debezium.io/documentation/reference/stable/transformations/index.html)
io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
# Plugins Supported By Only Snowflake Sink Connector
## Converters
com.snowflake.kafka.connector.records.SnowflakeAvroConverter
com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry
com.snowflake.kafka.connector.records.SnowflakeJsonConverter
# Troubleshooting
# Allowlist (whitelist) Upstash IP addresses
For security purposes, some external services may require the Upstash IP addresses to be added to an allowlist in their systems.
Here is the complete IP list that Upstash will send traffic from:
```
52.48.149.7
52.213.40.91
174.129.75.41
34.195.190.47
52.58.175.235
18.158.44.120
63.34.151.162
54.247.137.96
3.78.151.126
3.124.80.204
34.236.200.33
44.195.74.73
```
# Debezium MongoDB Source Connector
Debezium MongoDB Source Connector allows you to capture any changes in your
MongoDB database and store them as messages in your Kafka topics. In this guide,
we will walk you through creating a Debezium MongoDB Source Connector with
MongoDB database to Upstash Kafka.
## Get Started
### Create a Kafka Cluster
If you do not have a Kafka cluster and/or topic already, follow [these
steps](../overall/getstarted) to create one.
### Create the Connector
Go to the Connectors tab, and create your first connector by clicking the **New
Connector** button.
Choose **Debezium MongoDB Connector**
Enter a connector name and the MongoDB URI (connection string). Other configurations
are optional. We will skip them for now.
The advanced screen is for any other configuration that the selected connector
supports. At the top of this screen, you can find a link to related
documentation.
If your MongoDB database is SSL enabled, don't forget to add
`"mongodb.ssl.enabled": true` at this step. For example, MongoDB Atlas is
always SSL enabled.
After that we can continue by clicking **Connect**.
Congratulations! You have created a Debezium MongoDB Source Connector to Kafka.
Note that no topics will be created until some data is available on the MongoDB
database.
You can go to **Messages** section of the related topic to see latest events as
they are coming from Kafka.
# Debezium Mysql Source Connector
Debezium Mysql Source Connector allows you to capture any changes on your Mysql
DB and store them as messages on your Kafka topics. In this guide, we will walk
you through creating Debezium Mysql Source Connector.
## Get Started
### Create a Kafka Cluster
If you do not have a Kafka cluster and/or topic already, follow [these
steps](../overall/getstarted) to create one.
### Create the Connector
Go to the Connectors tab, and create your first connector by clicking the **New
Connector** button.
Choose your connector as **Debezium Mysql Connector** for this example
Enter the required properties.
The advanced screen is for any other configuration that the selected connector
supports. At the top of this screen, you can find a link to related
documentation. We can proceed with what we have and click the **Connect** button
directly.
Congratulations! You have created a Debezium Mysql Source Connector. As you put
data into your Mysql DB, you will see that topics prefixed with given **Server
Name** will be created and populated with new data.
You can go to **Messages** section to see latest events as they are coming from
Kafka.
# Debezium PostgreSQL Source Connector
Debezium PostgreSQL Source Connector allows you to capture any changes on your
PostgreSQL DB and store them as messages on your Kafka topics. In this guide, we
will walk you through creating Debezium PostgreSQL Source Connector.
## Get Started
### Create a Kafka Cluster
If you do not have a Kafka cluster and/or topic already, follow [these
steps](../overall/getstarted) to create one.
### Create the Connector
Go to the Connectors tab, and create your first connector by clicking the **New
Connector** button.
Choose your connector as **Debezium PostgreSQL Connector** for this example
Enter the required properties.
The advanced screen is for any other configuration that the selected connector
supports. At the top of this screen, you can find a link to related
documentation. We can proceed with what we have and click the **Connect** button
directly.
Congratulations! You have created a Debezium PostgreSQL Source Connector. As you
put data into your PostgreSQL DB, you will see that related topics will be
created and populated with new data.
You can go to **Messages** section to see latest events as they are coming from
Kafka.
# Deprecation Notice
As of April 2024, Kafka Connectors are deprecated and will be removed on October 1st, 2024. Please check our [blog post](https://upstash.com/blog/kafka-connectors-deprecation) for more information.
If you were previously using Kafka Connect provided by Upstash, please follow [this guide](https://github.com/upstash/kafka-connectors?tab=readme-ov-file#migration-guide-from-upstash-kafka-connect) to migrate to your own self-hosted Kafka Connect.
If you have any questions or need further assistance, reach out to us at [support@upstash.com](mailto:support@upstash.com) or join our community on [Discord](https://upstash.com/discord).
# Introduction
Kafka Connect is a tool for streaming data between Apache Kafka and other
systems without writing a single line of code. Via Kafka Sink Connectors, you
can export your data into any other storage. Via Kafka Source Connectors, you
can pull data to your Kafka topics from other systems.
Kafka Connectors can be self-hosted, but that requires you to set up and maintain
extra processes/machines. Upstash provides hosted versions of connectors for
your Kafka cluster. This removes the burden of maintaining an extra system and also improves performance, since the connectors run closer to your cluster.
## Pricing
Connectors are **free** to use. We don't charge anything extra for connectors
other than per message pricing of Kafka topics. Check out
[Pricing](https://upstash.com/pricing/kafka) for details on our per message pricing.
## Get Started
We will create a MongoDB source connector as an example. You can find examples
for all supported connectors in the left sidebar under the `Connectors` section.
### Create a Kafka Cluster
If you do not have a Kafka cluster and/or topic already, follow [these
steps](../overall/getstarted) to create one.
### Create a MongoDB Database
Let's prepare our MongoDB Atlas Database. Go to
[MongoDB Atlas](https://www.mongodb.com/cloud/atlas/register) and register.
Select `Build Database` and choose the `Free Shared` option for this example.
Proceed with `Create Cluster`, as the defaults should be fine. If this is
your first time, you will see the `Security Quickstart` screen.
Choose a username and password. You will need these later for the MongoDB
connection string.
In the next screen, you will allow Upstash to connect to your MongoDB database,
so be careful in this step.
Select Cloud Environment and then IP Access List. Enter the following static Upstash
IP addresses into the IP Access List.
```
52.48.149.7
52.213.40.91
174.129.75.41
34.195.190.47
52.58.175.235
18.158.44.120
63.34.151.162
54.247.137.96
3.78.151.126
3.124.80.204
34.236.200.33
44.195.74.73
```
From here, you will be redirected to the Database Deployments screen. Go to
`Connect` and select `Connect your application` to find the MongoDB
URI (connection string). Copy this string to use later when creating our Kafka
connector. Don't forget to replace the password placeholder with the password you
selected earlier for your MongoDB user.
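The copied connection string will look roughly like the following (an illustrative, made-up example; the real host comes from your own Atlas cluster and `<password>` must be replaced with your MongoDB user's password):
```typescript
// Illustrative only: the host and query options come from your own Atlas cluster.
const mongoUri =
  "mongodb+srv://myuser:<password>@cluster0.ab1cd.mongodb.net/?retryWrites=true&w=majority";
```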
### Create the Connector
Head over to [console.upstash.com](https://console.upstash.com) and select
your Kafka cluster. Go to the Connectors tab, and create your first connector with
the `New Connector` button.
Then choose `MongoDB Connector Source` for this example.
Choose a connector name and enter the MongoDB URI (connection string) that we
prepared earlier in the Config screen. Other configurations are optional. We will
skip them for now.
The Advanced screen is for any other configuration that the selected connector supports.
At the top of this screen, you can find a link to the related documentation. For
this example, we can proceed with what we have and click the `Connect` button
directly.
Congratulations! You have created your first source connector to Kafka. Note that
no topics will be created until some data is available on the MongoDB database.
### See It In Action
With this setup, anything that you have introduced in your MongoDB will be
available on your Kafka topic immediately.
Let's go to MongoDB and populate it with some data.
From the main `Database` screen, choose `Browse Collections`, and then click
`Add My Own Data`. Create your database in the next screen.
Select `Insert Document` on the right.
And let's put some data here.
Shortly, we should see a topic named `DATABASE_NAME.COLLECTION_NAME` (matching the
MongoDB database and collection) created in the Upstash Kafka console.
After selecting the topic, you can go to the `Messages` section to see the latest
events as they arrive from Kafka.
## Next
Check our list of available connectors and how to use them via the following links:
* [MongoDB Source Connector](./mongosource)
* [MongoDB Sink Connector](./mongosink)
* [Debezium MongoDB Source Connector](./debeziummongo)
* [Debezium MysqlDB Source Connector](./debeziummysql)
* [Debezium PostgreSql Source Connector](./debeziumpsql)
* [Aiven JDBC Source Connector](./aivenjdbcsource)
* [Aiven JDBC Sink Connector](./aivenjdbcsink)
* [Google BigQuery Sink Connector](./bigquerysink)
* [Aiven Amazon S3 Sink Connector](./aivens3sink)
* [Aiven OpenSearch(Elasticsearch) Sink Connector](./aivenopensearchsink)
* [Aiven Http Sink Connector](./aivenhttpsink)
* [Snowflake Sink Connector](./snowflakesink)
If the connector that you need is not in this list, please add a request to our
[Roadmap](https://roadmap.upstash.com/).
# MongoDB Sink Connector
MongoDB Sink Connector allows you to continuously store the data that appears in your
Kafka topics in a MongoDB database. In this guide, we will walk you through
creating a MongoDB Sink Connector for your Upstash Kafka cluster.
## Get Started
### Create a Kafka Cluster
If you do not have a Kafka cluster and/or topic already, follow [these
steps](../overall/getstarted) to create one.
### Create the Connector
Go to the Connectors tab, and create your first connector by clicking the **New
Connector** button.
Choose **MongoDB Connector Sink**.
Enter a connector name and the MongoDB URI (connection string). Select one or
more of your existing topics to read data from.
Enter the Database and Collection that the selected topics will be written into. We
entered "new" as the Database and "test" as the Collection. The database and
collection do not need to exist in MongoDB beforehand; they will be created
automatically.
The advanced screen is for any other configuration that the selected connector
supports. At the top of this screen, you can find a link to related
documentation. We can proceed with what we have and click the **Connect** button
directly.
Congratulations! You have created your MongoDB Sink Connector. As you put data
into your selected topics, the data will be written into your MongoDB database.
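To verify the sink end to end, you can produce a JSON message to one of the selected topics and then look it up in MongoDB. Below is a minimal sketch using the [upstash-kafka](https://github.com/upstash/upstash-kafka) client described later in these docs; the credential placeholders and the topic name `mytopic` are assumptions you should replace with your own values.
```typescript
import { Kafka } from "@upstash/kafka";

// The REST URL, username and password come from the REST API section of the Upstash Console.
const kafka = new Kafka({
  url: "<UPSTASH_KAFKA_REST_URL>",
  username: "<UPSTASH_KAFKA_REST_USERNAME>",
  password: "<UPSTASH_KAFKA_REST_PASSWORD>",
});

// Objects are serialized with JSON.stringify, so this document should appear
// in the "new" database / "test" collection configured above.
const producer = kafka.producer();
await producer.produce("mytopic", { user: "alice", action: "signup" });
```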
# MongoDB Source Connector
MongoDB Source Connector allows you to capture any changes in your MongoDB database and
store them as messages on your Kafka topics. In this guide, we will walk you
through creating a MongoDB Source Connector for your Upstash Kafka
cluster.
## Get Started
### Create a Kafka Cluster
If you do not have a Kafka cluster and/or topic already, follow [these
steps](../overall/getstarted) to create one.
### Create the Connector
Go to the Connectors tab, and create your first connector by clicking the **New
Connector** button.
Choose **MongoDB Connector Source**.
Enter a connector name and the MongoDB URI (connection string). Other configurations
are optional. We will skip them for now.
The advanced screen is for any other configuration that the selected connector
supports. At the top of this screen, you can find a link to related
documentation. We can proceed with what we have and click the **Connect** button
directly.
Congratulations! You have created your MongoDB Source Connector to Kafka. Note
that no topics will be created until some data is available on the MongoDB
database.
You can go to the **Messages** section of your topic to see the latest events as they
arrive from Kafka.
# Snowflake Sink Connector
The Snowflake Sink Connector allows you to continuously store the data from your
Kafka topics in Snowflake.
In this guide, we will walk you through creating a Snowflake Sink Connector.
## Get Started
### Create a Kafka Cluster
If you do not have a Kafka cluster and/or topic already, follow [these
steps](../overall/getstarted) to create one.
### Prepare the Snowflake Environment
From the Snowflake console, the following configurations need to be obtained:
1. `snowflake.url.name`
2. `snowflake.user.name`
3. `snowflake.private.key`
4. `snowflake.database.name`
5. `snowflake.schema.name`
If you already have these and have configured the required roles and keys for the
database and the user, you can skip to the
[Create The Connector](#create-the-connector) section.
For more detailed configuration options, see
[the Snowflake connector documentation](https://docs.snowflake.com/en/user-guide/kafka-connector-install#kafka-configuration-properties).
#### snowflake.url.name
`snowflake.url.name` can be found on the home page of
[the Snowflake app](https://app.snowflake.com). Click on the account identifier
and navigate to `copy account URL` as shown below.
A URL similar to [https://mn93536.eu-central-1.snowflakecomputing.com](https://mn93536.eu-central-1.snowflakecomputing.com) will be
copied. We need to append port 443 when passing it to the connector. In the end,
`snowflake.url.name` will look like the following:
```
https://mn93536.eu-central-1.snowflakecomputing.com:443
```
#### snowflake.user.name
`snowflake.user.name` can be seen on the profile view. To open the profile view,
go to the top left and click on the profile as shown below.
#### snowflake.private.key
You will generate `snowflake.private.key` locally: a pair of private and
public keys needs to be generated. The public key will be assigned to the user on
Snowflake, and the private key will be given to the connector as
`snowflake.private.key`.
See
[the following document](https://docs.snowflake.com/en/user-guide/kafka-connector-install#using-key-pair-authentication-key-rotation)
to learn how to generate the keys and assign the public key to the Snowflake user. A programmatic alternative is sketched below.
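If you prefer generating the key pair programmatically instead of with `openssl`, the following Node.js/TypeScript sketch shows one way to do it. This is only a sketch: Snowflake expects the key contents without the PEM header/footer lines, and the authoritative steps for registering the public key on the user are in the Snowflake document linked above.
```typescript
import { generateKeyPairSync } from "crypto";

// Generate an unencrypted 2048-bit RSA key pair (PKCS#8 private key, SPKI public key).
const { publicKey, privateKey } = generateKeyPairSync("rsa", {
  modulusLength: 2048,
  publicKeyEncoding: { type: "spki", format: "pem" },
  privateKeyEncoding: { type: "pkcs8", format: "pem" },
});

// Snowflake expects the raw base64 content without PEM headers or line breaks.
const strip = (pem: string) => pem.replace(/-----[^-]+-----/g, "").replace(/\s+/g, "");

console.log("snowflake.private.key:", strip(privateKey));
// Assign the public key to the Snowflake user, for example with:
// ALTER USER <user> SET RSA_PUBLIC_KEY='<public key printed below>';
console.log("public key:", strip(publicKey));
```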
#### snowflake.database.name & snowflake.schema.name
From [the Snowflake app](https://app.snowflake.com), create a database and a
schema. To be able to use this schema with the connector, we need to create a custom
role and assign it to the database and the schema. You can follow
[this document](https://docs.snowflake.com/en/user-guide/kafka-connector-install#creating-a-role-to-use-the-kafka-connector)
to see how to do it.
Make sure that the script described in the document above is run on the
desired database and schema by selecting them at the top of the script as
follows:
Now everything should be ready on the Snowflake side. We can move on to
creating the connector.
### Create the Connector
Go to the Connectors tab, and create your first connector by clicking the **New
Connector** button.
Choose your connector as **Snowflake Connector**.
Enter the required properties.
The advanced screen is for any other configuration that the selected connector
supports. At the top of this screen, you can find a link to related
documentation. We can proceed with what we have and click the **Connect** button
directly.
Congratulations! You have created a Snowflake Sink Connector.
As you put data into the selected topics, the data will be written into
Snowflake. You should see the data in
[the Snowflake app](https://app.snowflake.com) as follows:
# Compliance
## Upstash Legal & Security Documents
* [Upstash Terms of Service](https://upstash.com/static/trust/terms.pdf)
* [Upstash Privacy Policy](https://upstash.com/static/trust/privacy.pdf)
* [Upstash Data Processing Agreement](https://upstash.com/static/trust/dpa.pdf)
* [Upstash Technical and Organizational Security Measures](https://upstash.com/static/trust/security-measures.pdf)
* [Upstash Subcontractors](https://upstash.com/static/trust/subprocessors.pdf)
## Is Upstash SOC2 Compliant?
As of July 2023, Upstash Redis and Kafka are SOC2 compliant. Check our [trust page](https://trust.upstash.com/) for details.
## Is Upstash ISO-27001 Compliant?
We are in the process of getting this certification. Contact us
([support@upstash.com](mailto:support@upstash.com)) to learn about the expected
date.
## Is Upstash GDPR Compliant?
Yes. For more information, see our
[Privacy Policy](https://upstash.com/static/trust/privacy.pdf). We acquire DPAs
from each [subcontractor](https://upstash.com/static/trust/subprocessors.pdf)
that we work with.
## Is Upstash HIPAA Compliant?
Upstash is currently not HIPAA compliant. Contact us
([support@upstash.com](mailto:support@upstash.com)) if HIPAA is important for
you and we can share more details.
## Is Upstash PCI Compliant?
Upstash does not store personal credit card information. We use Stripe for
payment processing. Stripe is a certified PCI Service Provider Level 1, which is
the highest level of certification in the payments industry.
## Does Upstash conduct vulnerability scanning and penetration tests?
Yes, we use third party tools and work with pen testers. We share the results
with Enterprise customers. Contact us
([support@upstash.com](mailto:support@upstash.com)) for more information.
## Does Upstash take backups?
Yes, we take regular snapshots of the data cluster to the AWS S3 platform.
## Does Upstash encrypt data?
Customers can enable TLS while creating a database/cluster, and we recommend it
for production databases/clusters. We also encrypt data at rest upon customer
request.
# Integration with Third Parties & Partnerships
## Introduction
In this guide, we outline the steps to integrate Upstash into your platform (GUI or Web App) and allow your users to create and manage Upstash databases without leaving your interface. We will explain how to use OAuth 2.0 as the underlying foundation to enable this access seamlessly.
If your product or service offering utilizes Redis, Kafka or QStash, or if there is a common use case that your end users enable by leveraging these database resources, we invite you to partner with us. By integrating Upstash into your platform, you can offer a more complete package to your customers and become a one-stop shop. This will also position you at the forefront of innovative cloud computing trends such as serverless and expand your customer base.
This is the most commonly used partnership integration model and can be easily implemented by following this guideline. Recently, the [Cloudflare Workers integration](https://blog.cloudflare.com/cloudflare-workers-database-integration-with-upstash/) was implemented through this methodology. For any further questions or partnership discussions, please send us an email at [partnerships@upstash.com](mailto:partnerships@upstash.com).
Before starting development to integrate Upstash into your product, please
send an email to [partnerships@upstash.com](mailto:partnerships@upstash.com) for further assistance and guidance.
**General Flow (High level user flow)**
1. The user clicks the **`Connect Upstash`** button on your platform's surface (GUI, Web App).
2. This initiates the OAuth 2.0 flow, which opens a new browser page displaying the **`Upstash Login Page`**.
3. If this is an existing user, the user logs in with their Upstash credentials; otherwise they can directly sign up for a new Upstash account.
4. The browser window redirects to the **`Your account has been connected`** page and the authentication window closes automatically.
5. After the user returns to your interface, they see that their Upstash account is now connected.
## Technical Design (SPA - Regular Web Application)
1. The user clicks the `Connect Upstash` button in the Web App.
2. The Web App initiates the Upstash OAuth 2.0 flow. The Web App can use
[Auth0 native libraries](https://auth0.com/docs/libraries).
Please reach out to [partnerships@upstash.com](mailto:partnerships@upstash.com) to receive the client id and callback URL.
3. After the user returns from the OAuth 2.0 flow, the Web App will have a JWT token. The Web
App can then generate a Developer API key:
```bash
curl -XPOST https://api.upstash.com/apikey \
-H "Authorization: Bearer JWT_KEY" \
-H "Content-Type: application/json" \
-d '{ "name": "APPNAME_API_KEY_TIMESTAMP" }'
```
4. The Web App needs to save the Developer API key to its backend, as shown in the sketch below.
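A minimal TypeScript sketch of steps 3 and 4, mirroring the cURL call above (the shape of the JSON response and how you persist the key in your backend are up to you):
```typescript
// Exchange the user's JWT for a Developer API key (step 3) and persist it (step 4).
async function createDeveloperApiKey(jwt: string) {
  const response = await fetch("https://api.upstash.com/apikey", {
    method: "POST",
    headers: {
      Authorization: `Bearer ${jwt}`,
      "Content-Type": "application/json",
    },
    body: JSON.stringify({ name: `APPNAME_API_KEY_${Date.now()}` }),
  });
  if (!response.ok) {
    throw new Error(`Failed to create API key: ${response.status}`);
  }
  // Save the returned Developer API key in your own backend.
  return response.json();
}
```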
## Technical Design ( GUI Apps )
1. The user clicks the **`Connect Upstash`** button in the app.
2. The app initiates the Upstash OAuth 2.0 flow and can use **[Auth0 native libraries](https://auth0.com/docs/libraries)**.
3. The app will open a new browser window:
```
https://auth.upstash.com/authorize?response_type=code&audience=upstash-api&scope=offline_access&client_id=XXXXXXXXXX&redirect_uri=http%3A%2F%2Flocalhost:3000
```
Please reach out to [partnerships@upstash.com](mailto:partnerships@upstash.com) to receive the client id.
4. After the user is authenticated, Auth0 will redirect the user to
`localhost:3000/?code=XXXXXX`.
5. The app can return a friendly HTML response when Auth0 redirects back to `localhost:3000`.
6. After getting the `code` parameter from the URL query, the GUI app will make an HTTP
call to the Auth0 code exchange API (a TypeScript sketch follows this list). Example cURL request:
```bash
curl -XPOST 'https://auth.upstash.com/oauth/token' \
--header 'content-type: application/x-www-form-urlencoded' \
--data 'grant_type=authorization_code' \
--data 'audience=upstash-api' \
--data 'client_id=XXXXXXXXXXX' \
--data 'code=XXXXXXXXXXXX' \
--data 'redirect_uri=localhost:3000'
```
Response:
```json
{
"access_token": "XXXXXXXXXX",
"refresh_token": "XXXXXXXXXXX",
"scope": "offline_access",
"expires_in": 172800,
"token_type": "Bearer"
}
```
7. After the 6th step, the response will include an `access_token` with a TTL of 3 days.
The GUI app will call the Upstash API to get a Developer API key:
```bash
curl https://api.upstash.com/apikey -H "Authorization: Bearer JWT_KEY" -d '{ "name" : "APPNAME_API_KEY_TIMESTAMP" }'
```
8. The GUI app will save the Developer API key locally. Then the GUI app can call any
Upstash Developer API: [developer.upstash.com](https://developer.upstash.com/)
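For reference, here is a minimal TypeScript sketch of the code exchange in step 6, mirroring the cURL example above (the client id and redirect URI are placeholders):
```typescript
// Exchange the `code` query parameter for tokens (step 6).
async function exchangeCode(code: string, clientId: string) {
  const response = await fetch("https://auth.upstash.com/oauth/token", {
    method: "POST",
    headers: { "content-type": "application/x-www-form-urlencoded" },
    body: new URLSearchParams({
      grant_type: "authorization_code",
      audience: "upstash-api",
      client_id: clientId,
      code,
      redirect_uri: "http://localhost:3000",
    }),
  });
  // The response contains access_token (3-day TTL) and refresh_token, as shown above.
  return response.json();
}
```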
## Managing Resources
After obtaining the Upstash Developer API key, your platform surface (web or GUI) can call the Upstash API, for example **[Create Database](https://developer.upstash.com/#create-database-global)** or **[List Databases](https://developer.upstash.com/#list-databases)**.
In this flow, you can ask users for the region and the name of the database, then call the Create Database API to complete the task.
Example cURL request:
```bash
curl -X POST \
https://api.upstash.com/v2/redis/database \
-u 'EMAIL:API_KEY' \
-d '{"name":"myredis", "region":"global", "primary_region":"us-east-1", "read_regions":["us-west-1","us-west-2"], "tls": true}'
```
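The same request can be made from code. Below is a rough TypeScript sketch using `fetch`, where `email` and `apiKey` are the account email and the Developer API key obtained earlier; the payload mirrors the cURL example above.
```typescript
// Create a global Redis database via the Upstash Developer API.
async function createRedisDatabase(email: string, apiKey: string) {
  const auth = Buffer.from(`${email}:${apiKey}`).toString("base64");
  const response = await fetch("https://api.upstash.com/v2/redis/database", {
    method: "POST",
    headers: {
      Authorization: `Basic ${auth}`,
      "Content-Type": "application/json",
    },
    body: JSON.stringify({
      name: "myredis",
      region: "global",
      primary_region: "us-east-1",
      read_regions: ["us-west-1", "us-west-2"],
      tls: true,
    }),
  });
  return response.json();
}
```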
# Legal
## Upstash Legal Documents
* [Upstash Terms of Service](https://upstash.com/trust/terms.pdf)
* [Upstash Privacy Policy](https://upstash.com/trust/privacy.pdf)
* [Upstash Subcontractors](https://upstash.com/trust/subprocessors.pdf)
# Enterprise Support
Enterprise Support is recommended for customers who use Upstash as part of
their production systems.
Enterprise Support includes the following services:
* Response time SLA
* Dedicated Slack/Discord Channels
* Dedicated real-time support: We reserve our engineers to help you with
cases like architecture review, product launch or data migration. Max 10 hours
per month.
### Response Time SLA
* General guidance: 24 hours
* System impaired: \< 12 hours
* Production system impaired: \< 4 hours
* Production system down: \< 1 hour
### Pricing
To purchase or learn more about Enterprise Support, please contact us at [support@upstash.com](mailto:support@upstash.com)
# Uptime SLA
This Service Level Agreement ("SLA") applies to the use of the Upstash services,
offered under the terms of our Terms of Service or other agreement with us
governing your use of Upstash. This SLA does not apply to Upstash services in
the Upstash Free Tier. It is clarified that this SLA is subject to the terms of
the Agreement, and does not derogate therefrom (capitalized terms, unless
otherwise indicated herein, have the meaning specified in the Agreement).
Upstash reserves the right to change the terms of this SLA by publishing updated
terms on its website, such change to be effective as of the date of publication.
### Regional and Global Database SLA
Upstash will use commercially reasonable efforts to make regional and global
databases available with a Monthly Uptime Percentage of at least 99.99%.
In the event any of the services do not meet the SLA, you will be eligible to
receive a Service Credit as described below.
| Monthly Uptime Percentage | Service Credit Percentage |
| --------------------------------------------------- | ------------------------- |
| Less than 99.99% but equal to or greater than 99.0% | 10% |
| Less than 99.0% but equal to or greater than 95.0% | 30% |
| Less than 95.0% | 60% |
### SLA Credits
Service Credits are calculated as a percentage of the monthly bill (excluding
one-time payments such as upfront payments) for the service in the affected
region that did not meet the SLA.
Uptime percentages are recorded and published in the
[Upstash Status Page](https://status.upstash.com).
To receive a Service Credit, you should submit a claim by sending an email to
[support@upstash.com](mailto:support@upstash.com). Your credit request should be
received by us before the end of the second billing cycle after the incident
occurred.
We will apply any service credits against future payments for the applicable
services. At our discretion, we may issue the Service Credit to the credit card
you used. Service Credits will not entitle you to any refund or other payment. A
Service Credit will be applicable and issued only if the credit amount for the
applicable monthly billing cycle is greater than one dollar (\$1 USD). Service
Credits may not be transferred or applied to any other account.
# Support & Contact Us
## Community
The [Upstash Discord Channel](https://upstash.com/discord) is the best way to
interact with the community.
## Team
You can contact the team
via [support@upstash.com](mailto:support@upstash.com) for technical support as
well as for questions and feedback.
## Follow Us
Follow us on [X](https://x.com/upstash).
## Bugs & Issues
You can help us improve Upstash by reporting issues, suggesting new features, and
giving general feedback in
our [Community GitHub Repo](https://github.com/upstash/issues/issues/new).
## Enterprise Support
Get [Enterprise Support](/common/help/prosupport) from the Upstash team.
# Uptime Monitor
## Status Page
You can track the uptime status of Upstash databases in
[Upstash Status Page](https://status.upstash.com)
## Latency Monitor
You can see the average latencies for different regions in
[Upstash Latency Monitoring](https://latency.upstash.com) page
# Connect Using Kafka Clients
Connecting to Upstash Kafka using any Kafka client is very straightforward. If
you do not have a Kafka cluster and/or topic already, follow
[these steps](../overall/getstarted) to create one.
After creating a cluster and a topic, just go to the cluster details page on the
[Upstash Console](https://console.upstash.com) and copy the bootstrap endpoint,
username and password.
Then replace the following parameters in the code snippets of your favourite Kafka
client or language below.
* `{{ BOOTSTRAP_ENDPOINT }}`
* `{{ UPSTASH_KAFKA_USERNAME }}`
* `{{ UPSTASH_KAFKA_PASSWORD }}`
* `{{ TOPIC_NAME }}`
## Create a Topic
```typescript TypeScript
const { Kafka } = require("kafkajs");

const kafka = new Kafka({
  brokers: ["{{ BOOTSTRAP_ENDPOINT }}"],
  sasl: {
    mechanism: "scram-sha-512",
    username: "{{ UPSTASH_KAFKA_USERNAME }}",
    password: "{{ UPSTASH_KAFKA_PASSWORD }}",
  },
  ssl: true,
});

const admin = kafka.admin();

const createTopic = async () => {
  await admin.connect();
  await admin.createTopics({
    validateOnly: false,
    waitForLeaders: true,
    topics: [{ topic: "{{ TOPIC_NAME }}", numPartitions: partitions, replicationFactor: replicationFactor }],
  });
  await admin.disconnect();
};
createTopic();
```
```py Python
from kafka import KafkaAdminClient
from kafka.admin import NewTopic
admin = KafkaAdminClient(
bootstrap_servers=['{{ BOOTSTRAP_ENDPOINT }}'],
sasl_mechanism='SCRAM-SHA-512',
security_protocol='SASL_SSL',
sasl_plain_username='{{ UPSTASH_KAFKA_USERNAME }}',
sasl_plain_password='{{ UPSTASH_KAFKA_PASSWORD }}',
)
admin.create_topics([NewTopic(name='{{ TOPIC_NAME }}', num_partitions=partitions, replication_factor=replicationFactor)])
admin.close()
```
```java Java
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;

class CreateTopic {
public static void main(String[] args) throws Exception {
var props = new Properties();
props.put("bootstrap.servers", "{{ BOOTSTRAP_ENDPOINT }}");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"{{ UPSTASH_KAFKA_USERNAME }}\" " +
"password=\"{{ UPSTASH_KAFKA_PASSWORD }}\";");
try (var admin = Admin.create(props)) {
admin.createTopics(
Set.of(new NewTopic("{{ TOPIC_NAME }}", partitions, replicationFactor))
).all().get();
}
}
}
```
```go Go
import (
	"crypto/tls"
	"log"
	"net"
	"strconv"

	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/scram"
)
func main() {
mechanism, err := scram.Mechanism(scram.SHA512,
"{{ UPSTASH_KAFKA_USERNAME }}", "{{ UPSTASH_KAFKA_PASSWORD }}")
if err != nil {
log.Fatalln(err)
}
dialer := &kafka.Dialer{
SASLMechanism: mechanism,
TLS: &tls.Config{},
}
conn, err := dialer.Dial("tcp", "{{ BOOTSTRAP_ENDPOINT }}")
if err != nil {
log.Fatalln(err)
}
defer conn.Close()
controller, err := conn.Controller()
if err != nil {
log.Fatalln(err)
}
controllerConn, err := dialer.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
log.Fatalln(err)
}
defer controllerConn.Close()
err = controllerConn.CreateTopics(kafka.TopicConfig{
Topic: "{{ TOPIC_NAME }}",
NumPartitions: partitions,
ReplicationFactor: replicationFactor,
})
if err != nil {
log.Fatalln(err)
}
}
```
## Produce a Message
```typescript TypeScript
const { Kafka } = require("kafkajs");
const kafka = new Kafka({
brokers: ["{{ BOOTSTRAP_ENDPOINT }}"],
sasl: {
mechanism: "scram-sha-512",
username: "{{ UPSTASH_KAFKA_USERNAME }}",
password: "{{ UPSTASH_KAFKA_PASSWORD }}",
},
ssl: true,
});
const producer = kafka.producer();
const produce = async () => {
await producer.connect();
await producer.send({
topic: "{{ TOPIC_NAME }}",
messages: [{ value: "Hello Upstash!" }],
});
await producer.disconnect();
};
produce();
```
```py Python
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=['{{ BOOTSTRAP_ENDPOINT }}'],
sasl_mechanism='SCRAM-SHA-512',
security_protocol='SASL_SSL',
sasl_plain_username='{{ UPSTASH_KAFKA_USERNAME }}',
sasl_plain_password='{{ UPSTASH_KAFKA_PASSWORD }}',
)
future = producer.send('{{ TOPIC_NAME }}', b'Hello Upstash!')
record_metadata = future.get(timeout=10)
print (record_metadata)
producer.close()
```
```java Java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

class Produce {
public static void main(String[] args) throws Exception {
var props = new Properties();
props.put("bootstrap.servers", "{{ BOOTSTRAP_ENDPOINT }}");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"{{ UPSTASH_KAFKA_USERNAME }}\" " +
"password=\"{{ UPSTASH_KAFKA_PASSWORD }}\";");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
try (var producer = new KafkaProducer(props)) {
producer.send(new ProducerRecord("{{ TOPIC_NAME }}", "Hello Upstash!"));
producer.flush();
}
}
}
```
```go Go
import (
"context"
"crypto/tls"
"log"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/scram"
)
func main() {
mechanism, err := scram.Mechanism(scram.SHA512,
"{{ UPSTASH_KAFKA_USERNAME }}", "{{ UPSTASH_KAFKA_PASSWORD }}")
if err != nil {
log.Fatalln(err)
}
dialer := &kafka.Dialer{
SASLMechanism: mechanism,
TLS: &tls.Config{},
}
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"{{ BOOTSTRAP_ENDPOINT }}"},
Topic: "{{ TOPIC_NAME }}",
Dialer: dialer,
})
defer w.Close()
err = w.WriteMessages(context.Background(),
kafka.Message{
Value: []byte("Hello Upstash!"),
},
)
if err != nil {
log.Fatalln("failed to write messages:", err)
}
}
```
## Consume Messages
```typescript TypeScript
const { Kafka } = require("kafkajs");
const kafka = new Kafka({
brokers: ["{{ BOOTSTRAP_ENDPOINT }}"],
sasl: {
mechanism: "scram-sha-512",
username: "{{ UPSTASH_KAFKA_USERNAME }}",
password: "{{ UPSTASH_KAFKA_PASSWORD }}",
},
ssl: true,
});
const consumer = kafka.consumer({ groupId: "{{ GROUP_NAME }}" });
const consume = async () => {
await consumer.connect();
await consumer.subscribe({
topic: "{{ TOPIC_NAME }}",
fromBeginning: true,
});
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
topic: topic,
partition: partition,
message: JSON.stringify(message),
});
},
});
};
consume();
```
```py Python
from kafka import KafkaConsumer
consumer = KafkaConsumer(
bootstrap_servers=['{{ BOOTSTRAP_ENDPOINT }}'],
sasl_mechanism='SCRAM-SHA-512',
security_protocol='SASL_SSL',
sasl_plain_username='{{ UPSTASH_KAFKA_USERNAME }}',
sasl_plain_password='{{ UPSTASH_KAFKA_PASSWORD }}',
group_id='{{ GROUP_NAME }}',
auto_offset_reset='earliest',
)
consumer.subscribe(['{{ TOPIC_NAME }}'])
records = consumer.poll(timeout_ms=10000)
print(records)
consumer.close()
```
```java Java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;

class Consume {
public static void main(String[] args) throws Exception {
var props = new Properties();
props.put("bootstrap.servers", "{{ BOOTSTRAP_ENDPOINT }}");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"{{ UPSTASH_KAFKA_USERNAME }}\" " +
"password=\"{{ UPSTASH_KAFKA_PASSWORD }}\";");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
props.put("group.id", "{{ GROUP_NAME }}");
try(var consumer = new KafkaConsumer(props)) {
consumer.subscribe(Collections.singleton("{{ TOPIC_NAME }}"));
var records = consumer.poll(Duration.ofSeconds(10));
for (var record : records) {
System.out.println(record);
}
}
}
}
```
```go Go
import (
"context"
"crypto/tls"
"log"
"time"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/scram"
)
func main() {
mechanism, err := scram.Mechanism(scram.SHA512,
"{{ UPSTASH_KAFKA_USERNAME }}", "{{ UPSTASH_KAFKA_PASSWORD }}")
if err != nil {
log.Fatalln(err)
}
dialer := &kafka.Dialer{
SASLMechanism: mechanism,
TLS: &tls.Config{},
}
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"{{ BOOTSTRAP_ENDPOINT }}"},
GroupID: "{{ GROUP_NAME }}",
Topic: "{{ TOPIC_NAME }}",
Dialer: dialer,
})
defer r.Close()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
m, err := r.ReadMessage(ctx)
if err != nil {
log.Fatalln(err)
}
log.Printf("%+v\n", m)
}
```
# Connect Using kaf CLI
[kaf](https://github.com/birdayz/kaf) is a modern CLI for Apache Kafka. You can
connect to your Upstash Kafka cluster using `kaf`.
If you do not have a Kafka cluster and/or topic already, follow [these
steps](../overall/getstarted) to create one.
In the cluster details section of the
[Upstash Console](https://console.upstash.com), copy the bootstrap endpoint, username
and password. Then replace the following parameters in the code snippets below with
the actual values you copied.
* `$BOOTSTRAP_ENDPOINT`
* `$UPSTASH_KAFKA_USERNAME`
* `$UPSTASH_KAFKA_PASSWORD`
* `$GROUP_ID`
* `$TOPIC_NAME`
First, we should add the cluster configuration to `kaf`'s config file, which
is located at `~/.kaf/config`. Open the config file if it exists or create an
empty one, and insert the following config:
```yaml
clusters:
- name: $CLUSTER_NAME
brokers:
- $BOOTSTRAP_ENDPOINT
SASL:
mechanism: SCRAM-SHA-512
username: $UPSTASH_KAFKA_USERNAME
password: $UPSTASH_KAFKA_PASSWORD
security-protocol: SASL_SSL
```
`$CLUSTER_NAME` is a logical name used to identify different Kafka
clusters. You can use your Upstash cluster name.
To select the cluster configuration to use, run:
```shell
kaf config use-cluster $CLUSTER_NAME
```
At this point you should be able to connect to your Kafka cluster using `kaf`.
**List Brokers and Topics:**
```shell
kaf nodes
```
```shell
kaf topics
```
**Produce a message:**
```shell
echo "Hello Upstash!" | kaf produce $TOPIC_NAME
```
**Fetch messages:**
```shell
kaf consume $TOPIC_NAME
```
**Consume messages using consumer groups:**
```shell
kaf consume $TOPIC_NAME -g $GROUP_ID --offset oldest
```
For more information see [kaf](https://github.com/birdayz/kaf) repository.
# Connect Using kcat CLI
[kcat](https://github.com/edenhill/kcat) is a generic command line non-JVM
producer and consumer for Apache Kafka. You can connect to your Upstash Kafka
cluster using `kcat`.
If you do not have a Kafka cluster and/or topic already, follow [these
steps](../overall/getstarted) to create one.
In the cluster details section of the
[Upstash Console](https://console.upstash.com), copy the bootstrap endpoint, username
and password. Then replace the following parameters in the code snippets below with
the actual values you copied.
* `$BOOTSTRAP_ENDPOINT`
* `$UPSTASH_KAFKA_USERNAME`
* `$UPSTASH_KAFKA_PASSWORD`
* `$GROUP_ID`
* `$TOPIC_NAME`
**Query cluster metadata:**
```shell
kcat -b $BOOTSTRAP_ENDPOINT -X security.protocol=SASL_SSL \
-X sasl.mechanisms=SCRAM-SHA-512 \
-X sasl.username=$UPSTASH_KAFKA_USERNAME \
-X sasl.password=$UPSTASH_KAFKA_PASSWORD \
-L
```
**Produce a message:**
```shell
echo "Hello Upstash!" | kcat -b $BOOTSTRAP_ENDPOINT
-X security.protocol=SASL_SSL \
-X sasl.mechanisms=SCRAM-SHA-512 \
-X sasl.username=$UPSTASH_KAFKA_USERNAME \
-X sasl.password=$UPSTASH_KAFKA_PASSWORD \
-P -t $TOPIC_NAME
```
**Fetch messages:**
```shell
kcat -b $BOOTSTRAP_ENDPOINT -X security.protocol=SASL_SSL \
-X sasl.mechanisms=SCRAM-SHA-512 \
-X sasl.username=$UPSTASH_KAFKA_USERNAME \
-X sasl.password=$UPSTASH_KAFKA_PASSWORD \
-C -t $TOPIC_NAME
```
**Consume messages using consumer groups:**
```shell
kcat -b $BOOTSTRAP_ENDPOINT -X security.protocol=SASL_SSL \
-X sasl.mechanisms=SCRAM-SHA-512 \
-X sasl.username=$UPSTASH_KAFKA_USERNAME \
-X sasl.password=$UPSTASH_KAFKA_PASSWORD \
-o beginning -G $GROUP_ID $TOPIC_NAME
```
For more information see [kcat](https://github.com/edenhill/kcat) repository.
# Connect with upstash-kafka
[upstash-kafka](https://github.com/upstash/upstash-kafka/blob/master/README.md)
is an HTTP/REST based Kafka client built on top of
[Upstash Kafka REST API](https://docs.upstash.com/kafka/rest).
It is the only connectionless (HTTP-based) Kafka client and is designed to work
with:
* Serverless functions (AWS Lambda ...)
* Cloudflare Workers (see
[the example](https://github.com/upstash/upstash-kafka/tree/main/examples/cloudflare-workers))
* Fastly Compute\@Edge
* Next.js Edge, Remix, Nuxt ...
* Client side web/mobile applications
* WebAssembly
* and other environments where HTTP is preferred over TCP.
## Quick Start
### Install
```bash
npm install @upstash/kafka
```
### Authenticate
Copy URL, username and password from
[Upstash Console](https://console.upstash.com)
```typescript
import { Kafka } from "@upstash/kafka";
const kafka = new Kafka({
url: "",
username: "",
password: "",
});
```
### Produce
```typescript
const p = kafka.producer();
const message = { hello: "world" }; // Objects will get serialized using `JSON.stringify`
const response = await p.produce("TOPIC", message);
const response2 = await p.produce("TOPIC", message, {
partition: 1,
timestamp: 4567,
key: "KEY",
headers: [{ key: "TRACE-ID", value: "32h67jk" }],
});
```
### Produce Many
```javascript
const p = kafka.producer();
const res = await p.produceMany([
{
topic: "TOPIC",
value: "MESSAGE",
// ...options
},
{
topic: "TOPIC-2",
value: "MESSAGE-2",
// ...options
},
]);
```
### Consume
When a new consumer instance is created, it may return empty messages until
consumer group coordination is completed. A simple polling workaround is sketched after the example below.
```javascript
const c = kafka.consumer();
const messages = await c.consume({
consumerGroupId: "group_1",
instanceId: "instance_1",
topics: ["test.topic"],
autoOffsetReset: "earliest",
});
```
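Since the first `consume` call can return an empty array while the consumer group is still being coordinated, a simple polling loop can help. A sketch, assuming the `kafka` instance from the Authenticate step above:
```typescript
// Poll until the consumer group returns messages, or give up after a few attempts.
async function consumeWithRetry(maxAttempts = 5) {
  const c = kafka.consumer();
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const messages = await c.consume({
      consumerGroupId: "group_1",
      instanceId: "instance_1",
      topics: ["test.topic"],
      autoOffsetReset: "earliest",
    });
    if (messages.length > 0) return messages;
    await new Promise((resolve) => setTimeout(resolve, 1000)); // wait before retrying
  }
  return [];
}
```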
## Commit
While `consume` commits automatically, you can commit manually as below:
```typescript
const consumerGroupId = "mygroup";
const instanceId = "myinstance";
const topic = "my.topic";
const c = kafka.consumer();
const messages = await c.consume({
consumerGroupId,
instanceId,
topics: [topic],
autoCommit: false,
});
for (const message of messages) {
// message handling logic
await c.commit({
consumerGroupId,
instanceId,
offset: {
topic: message.topic,
partition: message.partition,
offset: message.offset,
},
});
}
```
## Fetch
```typescript
const c = kafka.consumer();
const messages = await c.fetch({
topic: "greeting",
partition: 3,
offset: 42,
timeout: 1000,
});
```
## Examples
See [here](https://github.com/upstash/upstash-kafka/tree/main/examples) for more
examples.
# Consume Messages Using REST API
If you do not have a Kafka cluster and/or topic already, follow [these
steps](../overall/getstarted) to create one.
In the cluster details section of the
[Upstash Console](https://console.upstash.com), scroll down to the **REST API**
section and copy `UPSTASH_KAFKA_REST_URL`, `UPSTASH_KAFKA_REST_USERNAME` and
`UPSTASH_KAFKA_REST_PASSWORD` using the copy icons next to them.
We will use a `Node.js` sample code to show how to consume messages using the
REST API. Our sample will use a topic named `cities` and consume previously
produced city names from this topic using Kafka consumer groups and automatic
offset committing.
Replace the following parameters in the code snippets below with your actual values.
```js
const address = "https://tops-stingray-7863-eu1-rest-kafka.upstash.io";
const user = "G9wcy1zdGluZ3JheS03ODYzJMUX";
const pass = "eUmYCkAlxEhihIc7Hooi2IA2pz2fw==";
const auth = Buffer.from(`${user}:${pass}`).toString("base64");
const topic = "cities";
```
The following code will consume city names from the topic using the `mygroup` consumer
group id and the `myconsumer` consumer id, starting from the latest offset, and
print the consumed messages and their offsets to the console:
```js
async function consumeTopic(groupId, consumerId, topic) {
const response = await fetch(
`${address}/consume/${groupId}/${consumerId}/${topic}`,
{
headers: { Authorization: `Basic ${auth}` },
}
);
const messages = await response.json();
messages.forEach((m) => {
console.log(`Message: ${m.value}, Offset: ${m.offset}`);
});
}
consumeTopic("mygroup", "myconsumer", topic);
```
By default, the consume API starts consuming from the latest offset. It's also
possible to start from the earliest offset by passing the
`Kafka-Auto-Offset-Reset: earliest` request header:
```js
async function consumeTopic(groupId, consumerId, topic, offsetReset) {
const response = await fetch(
`${address}/consume/${groupId}/${consumerId}/${topic}`,
{
headers: {
Authorization: `Basic ${auth}`,
"Kafka-Auto-Offset-Reset": offsetReset,
},
}
);
const messages = await response.json();
messages.forEach((m) => {
console.log(`Message: ${m.value}, Offset: ${m.offset}`);
});
}
consumeTopic("mygroup", "myconsumer", topic, "earliest");
```
We can also go deeper and turn off the auto-commit behaviour of the consumer to
commit the offsets manually later. To turn off auto-commit, we should send the
`Kafka-Enable-Auto-Commit: false` header. This allows us to commit the offsets
only when all messages are processed successfully.
```js
async function consumeTopicWithoutCommit(
groupId,
consumerId,
topic,
offsetReset
) {
const response = await fetch(
`${address}/consume/${groupId}/${consumerId}/${topic}`,
{
headers: {
Authorization: `Basic ${auth}`,
"Kafka-Auto-Offset-Reset": offsetReset,
"Kafka-Enable-Auto-Commit": "false",
},
}
);
const messages = await response.json();
messages.forEach((m) => {
console.log(`Message: ${m.value}, Offset: ${m.offset}`);
});
}
async function commitOffsetsFor(groupId, consumerId) {
const response = await fetch(`${address}/commit/${groupId}/${consumerId}`, {
headers: { Authorization: `Basic ${auth}` },
});
const resp = await response.json();
console.log(
`Result: ${resp.result}, Error: ${resp.error}, Status: ${resp.status}`
);
}
consumeTopicWithoutCommit("mygroup", "myconsumer", topic, "earliest");
commitOffsetsFor("mygroup", "myconsumer");
```
For more info about using the REST API see
[Kafka REST Consume API](../rest/restconsumer#consume-api) section.
# Use Serverless Kafka as an Event Source For AWS Lambda
In this tutorial we will implement a serverless message processing pipeline
using Upstash Kafka and AWS Lambda. We will use Upstash Kafka as a source for an
AWS Lambda function. The produced messages will trigger AWS Lambda, so your
Lambda function will process the messages.
Because Upstash Kafka is a true serverless product, the whole pipeline will be
serverless. You pay only when your pipeline is actively processing messages.
### Create Upstash Kafka
First, create an Upstash Kafka cluster and topic following
[those steps.](../overall/getstarted) You will need the endpoint, username and
password in the following steps.
### Create AWS Lambda Function
Now let's create an AWS Lambda function. For the best performance, select the
same region as your Upstash Kafka cluster. We will use the Node.js runtime.
You can use the Serverless Framework or AWS SAM for this step.
### Lambda Function Code
Update your function's code as below:
```javascript
exports.handler = async (event) => {
if (!event.records) {
return { response: "no kafka event" };
}
for (let messages of Object.values(event.records)) {
for (let msg of messages) {
let buff = Buffer.from(msg.value, "base64");
let text = buff.toString("ascii");
// process the message
console.log(text);
}
}
return { response: "success" };
};
```
The above code parses the Kafka messages from the event parameter. AWS encodes
each message using `base64`, so we decode it and log it to the console.
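For reference, the `event` delivered by the Kafka trigger roughly has the following shape (a trimmed, illustrative sketch; the values are made up):
```typescript
// Illustrative shape of a Kafka trigger event; records are keyed by "topic-partition".
const exampleEvent = {
  eventSource: "SelfManagedKafka",
  records: {
    "mytopic-0": [
      {
        topic: "mytopic",
        partition: 0,
        offset: 48,
        timestamp: 1639522675505,
        timestampType: "CREATE_TIME",
        value: "SGVsbG8gVXBzdGFzaCE=", // base64 for "Hello Upstash!"
      },
    ],
  },
};
```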
### Create AWS Secret
AWS Lambda trigger needs the Kafka credentials to be bundled as a secret. So we
will create a secret in
[AWS Secrets Manager](https://console.aws.amazon.com/secretsmanager/home?region=us-east-1#!/newSecret?step=selectSecret).
Select `Other type of secret`. Enter your Kafka cluster's username and password
as key/value pairs as below:
In the next screen give a name to your secret.
### Edit AWS Lambda Role
Now we need to configure the Lambda function's role to access the secrets.
On the AWS Lambda function's page, click on the `Configuration` tab and then
`Permissions`. Click on the link just below the `Role name` label.
The IAM management console will be opened in a new tab. On the `Permissions` tab
click on the link which starts with `AWSLambdaBasicExecutionRole-....`
Click on the `Edit Policy` button and add this configuration in the JSON tab:
```json
{
"Effect": "Allow",
"Action": ["secretsmanager:GetSecretValue"],
"Resource": ["REPLACE_THE_ARN_OF_THE_SECRET"]
}
```
You need to replace the ARN of the secret that you created in the previous step.
### Create the Trigger
Go back to your Lambda functions page and click the `Add trigger` button. Select
`Apache Kafka` from the menu and fill in the inputs.
Bootstrap servers: copy/paste the endpoint from the Upstash console.
Topic name: enter your topic's name.
Click on the `Add` button under Authentication. Select `SASL_SCRAM_256_AUTH` and
select the secret that you created in the previous step.
Check the `Enable trigger` checkbox; you can leave the remaining inputs as
they are.
### Testing
Now let's produce messages and see if AWS Lambda is processing them.
Copy the curl command to produce a message from the
[Upstash Console](https://console.upstash.com).
```shell
➜ curl https://full-mantis-14289-us1-rest-kafka.upstash.io/produce/newtopic/newmessage -u ZnVsbC1tYW50aXMtMTQyODkkimaEsuUsiT9TGk3OFdjveYHBV9Jjzow03SnUtRQ:4-R-fmtoalXnoeu9TjQBOOL4njfSKwEsE10YvHMiW63hFljqUrrq5_yAq4TPGd9c6JbqfQ==
{
"topic" : "newtopic",
"partition" : 0,
"offset" : 48,
"timestamp" : 1639522675505
}
```
Check CloudWatch **(Lambda > Monitor > View logs in CloudWatch)**. You
should see that the messages you produced are logged by the Lambda function.
# Fetch Messages Using REST API
If you do not have a Kafka cluster and/or topic already, follow [these
steps](../overall/getstarted) to create one.
In the cluster details section of the
[Upstash Console](https://console.upstash.com), scroll down to the **REST API**
section and copy `UPSTASH_KAFKA_REST_URL`, `UPSTASH_KAFKA_REST_USERNAME` and
`UPSTASH_KAFKA_REST_PASSWORD` using the copy icons next to them.
We will use a `Node.js` sample code to show how to fetch messages using the REST
API. Our sample will use a topic named `cities` and fetch previously produced
city names from this topic without using Kafka consumer groups.
Replace the following parameters in the code snippets below with your actual values.
```js
const address = "https://tops-stingray-7863-eu1-rest-kafka.upstash.io";
const user = "G9wcy1zdGluZ3JheS03ODYzJMUX";
const pass = "eUmYCkAlxEhihIc7Hooi2IA2pz2fw==";
const auth = Buffer.from(`${user}:${pass}`).toString("base64");
const topic = "cities";
```
The following code will fetch city names from the `0th` partition of the topic, starting
from the `1st` offset, and print the fetched messages and their offsets to the
console:
```js
async function fetchTopic(topic, partition, offset) {
const request = {
topic: topic,
partition: partition,
offset: offset,
};
const response = await fetch(`${address}/fetch/`, {
headers: { Authorization: `Basic ${auth}` },
method: "POST",
body: JSON.stringify(request),
});
const messages = await response.json();
messages.forEach((m) => {
console.log(`Message: ${m.value}, Offset: ${m.offset}`);
});
}
fetchTopic(topic, 0, 1);
```
For more info about using the REST API see
[Kafka REST Fetch API](../rest/restconsumer#consume-api) section.
# Use Serverless Kafka to Produce Events in Cloudflare Workers
In this tutorial, we will produce events to Upstash Kafka from a Cloudflare
Workers function.
### Create Kafka
First, create an Upstash Kafka cluster and topic following
[those steps.](https://docs.upstash.com/kafka) You will need the endpoint,
username and password in the following steps.
### Create Project
We will use
[Wrangler](https://developers.cloudflare.com/workers/get-started/guide) to
create the application. After installing and configuring wrangler, create a
folder for your project and, inside that folder, run `wrangler init`.
This will create `wrangler.toml`. Paste your account id, which is logged by
wrangler, into the toml.
Copy and paste the Upstash Kafka URL, topic name, username and password into the
toml.
```toml
name = "produce-in-cloudflare-workers"
type = 'webpack'
account_id = 'REPLACE_HERE'
route = ''
zone_id = ''
usage_model = ''
workers_dev = true
target_type = "webpack"
[vars]
UPSTASH_KAFKA_REST_URL = "REPLACE_HERE"
UPSTASH_KAFKA_REST_USERNAME = "REPLACE_HERE"
UPSTASH_KAFKA_REST_PASSWORD = "REPLACE_HERE"
```
### Implement the Function
Init a node project and install @upstash/kafka:
```
npm init
npm install @upstash/kafka
```
Add the index.js as below:
```javascript
import { Kafka } from "@upstash/kafka";
addEventListener("fetch", (event) => {
event.respondWith(handleRequest(event.request));
});
async function handleRequest(request) {
console.log("START", request);
const kafka = new Kafka({
url: UPSTASH_KAFKA_REST_URL,
username: UPSTASH_KAFKA_REST_USERNAME,
password: UPSTASH_KAFKA_REST_PASSWORD,
});
const { pathname } = new URL(request.url);
if (pathname.startsWith("/favicon")) {
return fetch(request);
}
const p = kafka.producer();
const message = { hello: "world" }; // Objects will get serialized using `JSON.stringify`
const response = await p.produce("mytopic", message);
return new Response(JSON.stringify(response));
}
```
The above code simply sends the message to Kafka. If your message is more
complicated, you can send it in the request body as explained
[here](./producewithrest).
### Run and Deploy the Function
Run the function locally: `wrangler dev`
Deploy your function to Cloudflare by running:
```
wrangler publish
```
This command will output your URL. Visiting the URL should return a response
like this:
```json
{
"topic": "newtopic",
"partition": 0,
"offset": 278,
"timestamp": 1640728294879
}
```
### Test the Function
Now let's validate that the messages are pushed to Kafka. We can consume the
Kafka topic using the REST API. You can copy the curl code to consume from the
Upstash Console.
```
produce-in-lambda git:(master) ➜ curl https://full-mantis-14289-us1-rest-kafka.upstash.io/consume/GROUP_NAME/GROUP_INSTANCE_NAME/mytopic -u REPLACE_USER_NAME:REPLACE_PASSWORD
[ {
"topic" : "newtopic",
"partition" : 0,
"offset" : 282,
"timestamp" : 1639610767445,
"key" : "",
"value" : "hello",
"headers" : [ ]
} ]%
```
### upstash-kafka vs other Kafka Clients
Upstash also supports native Kafka clients (e.g. KafkaJS). But Cloudflare
Workers runtime does not allow TCP connections.
[upstash-kafka](https://github.com/upstash/upstash-kafka) is HTTP based. That's
why we use [upstash-kafka](https://github.com/upstash/upstash-kafka) in our
Cloudflare examples.
# Use Serverless Kafka to Produce Events in AWS Lambda
In this tutorial, we will produce events to Upstash Kafka from an AWS Lambda
function.
### Create Kafka
First, create an Upstash Kafka cluster and topic following
[those steps.](../overall/getstarted) You will need the endpoint, username and
password in the following steps.
### Create Project
We will use Serverless Framework to create the application.
```shell
kafka-examples git:(master) serverless
What do you want to make? AWS - Node.js - HTTP API
What do you want to call this project? produce-in-lambda
Downloading "aws-node-http-api" template...
Project successfully created in produce-in-lambda folder
```
Then we will initialize a node project and install axios dependency.
```shell
npm init
npm install axios
```
### Implement the Lambda Function
Open the handler.js and update as below:
```javascript
const fetch = require("axios").default;
module.exports.hello = async (event) => {
const msg = "Hello";
const address = "https://REPLACE_YOUR_ENDPOINT";
const user = "REPLACE YOUR USERNAME";
const pass = "REPLACE YOUR PASSWORD";
const auth = Buffer.from(`${user}:${pass}`).toString("base64");
  const response = await fetch(`${address}/produce/newtopic/${msg}`, {
headers: {
Authorization: `Basic ${auth}`,
},
});
const res = response.data;
return {
statusCode: 200,
body: JSON.stringify(
{
header: "Pushed this message to Upstash Kafka with REST API!",
message: msg,
response: res,
},
null,
2
),
};
};
```
You need to replace the endpoint, username and password above with the values
that you copy from the [Upstash Console](https://console.upstash.com).
The above code simply sends the message to the Kafka topic using the REST API.
### Deploy the Lambda Function
You can deploy your function to AWS by running:
```
serverless deploy
```
This command will output your endpoint URL. Calling the endpoint should return something like this:
```json
{
"header": "Pushed this message to Upstash Kafka!",
"message": {
"value": "Hello message"
}
}
```
### Test the Function
Now let's validate that the messages are pushed to Kafka. We can consume the
Kafka topic using the REST API. You can copy the curl code to consume from the
Upstash Console.
```
produce-in-lambda git:(master) ➜ curl https://full-mantis-14289-us1-rest-kafka.upstash.io/consume/GROUP_NAME/GROUP_INSTANCE_NAME/newtopic -u REPLACE_USER_NAME:REPLACE_PASSWORD
[ {
"topic" : "newtopic",
"partition" : 0,
"offset" : 98,
"timestamp" : 1639610767445,
"key" : "",
"value" : "Hello message",
"headers" : [ ]
} ]%
```
### REST vs Kafka Client
We can also use a native Kafka client (e.g. KafkaJS) to access our Kafka
cluster. See
[the repo](https://github.com/upstash/kafka-examples/tree/master/produce-in-lambda)
for both examples. But there is a latency overhead when connecting (and
disconnecting) to Kafka with each function invocation. In our tests, the
latency of the function with REST is about 10ms, whereas it goes up to 50ms when
KafkaJS is used. The Kafka client's performance could be improved by caching the
client outside the function, but that can cause other problems as explained
[here](https://blog.upstash.com/serverless-database-connections).
**Troubleshooting:** If the Lambda function outputs `internal error`, check the
CloudWatch logs **(Lambda > Monitor > View logs in CloudWatch)**.
# Monitoring Upstash Kafka Cluster with AKHQ
[AKHQ](https://akhq.io) is a GUI for monitoring & managing Apache Kafka topics,
topic data, consumer groups, etc. You can connect to and monitor your Upstash Kafka
cluster using [AKHQ](https://akhq.io).
To be able to use [AKHQ](https://akhq.io), first you should create a yaml
configuration file:
```yaml
akhq:
connections:
my-cluster:
properties:
bootstrap.servers: "tops-stingray-7863-eu1-rest-kafka.upstash.io:9092"
sasl.mechanism: SCRAM-SHA-512
security.protocol: SASL_SSL
sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="ZmlycG9iZXJtYW4ZHtSXVwmyJQ" password="J6ocnQfe25vUsI8AX-XxA==";
```
You should replace `bootstrap.servers` and `sasl.jaas.config` attributes with
your cluster endpoint and credentials.
You can start the [AKHQ](https://akhq.io) application directly using the `jar` file.
First, download the latest release from the
[releases page](https://github.com/tchiotludo/akhq/releases). Then launch the
application using the following command:
```shell
java -Dmicronaut.config.files=application.yml -jar akhq.jar
```
Alternatively, you can start it using Docker:
```shell
docker run -p 8080:8080 -v ~/akhq/application.yml:/app/application.yml tchiotludo/akhq
```
After launching the [AKHQ](https://akhq.io) app, just go to
[http://localhost:8080](http://localhost:8080) to access the UI.
For more information see
[AKHQ documentation](https://akhq.io/docs/#installation).
# Monitoring Upstash Kafka Cluster with Conduktor
[Conduktor](https://www.conduktor.io/) is quite a powerful application for
monitoring and managing Apache Kafka clusters. You can connect to and monitor your
Upstash Kafka cluster using [Conduktor](https://www.conduktor.io/). Conduktor
has a free plan for development and testing.
### Install Conduktor
Conduktor is a desktop application, so you need to
[download](https://www.conduktor.io/download/) it first. If you are using a Mac,
you can also install it using `brew`:
```shell
brew tap conduktor/brew
brew install conduktor
```
### Connect Your Cluster
Once you install Conduktor and
[create an Upstash Kafka cluster and topic](../overall/getstarted), you can
connect your cluster to Conduktor. Open Conduktor and click on the
`New Kafka Cluster` button.
* You can set any name as `Cluster Name`.
* Copy the Kafka endpoint from the [Upstash console](https://console.upstash.com) and
paste it into the `Bootstrap Servers` field.
* In the Upstash console, copy the properties from the `Properties` tab. Paste them into
the `Additional Properties` field in Conduktor.
Once you are connected to the cluster, you can produce to and consume from your
topics using Conduktor.
# Monitoring Upstash Kafka Cluster with kafka-ui
[kafka-ui](https://github.com/provectus/kafka-ui) is a GUI for monitoring Apache
Kafka. From their description:
> Kafka UI for Apache Kafka is a simple tool that makes your data flows
> observable, helps find and troubleshoot issues faster and deliver optimal
> performance. Its lightweight dashboard makes it easy to track key metrics of
> your Kafka clusters - Brokers, Topics, Partitions, Production, and
> Consumption.
You can connect and monitor your Upstash Kafka cluster using
[kafka-ui](https://github.com/provectus/kafka-ui).
To be able to use [kafka-ui](https://github.com/provectus/kafka-ui), first you
should create a yaml configuration file:
```yaml
kafka:
clusters:
- name: my-cluster
bootstrapServers: "tops-stingray-7863-eu1-rest-kafka.upstash.io:9092"
properties:
sasl.mechanism: SCRAM-SHA-512
security.protocol: SASL_SSL
sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="ZmlycG9iZXJtYW4ZHtSXVwmyJQ" password="J6ocnQfe25vUsI8AX-XxA==";
```
You should replace the `bootstrapServers` and `sasl.jaas.config` attributes with
your cluster endpoint and credentials.
You can start the [kafka-ui](https://github.com/provectus/kafka-ui) application
directly using the `jar` file. First, download the latest release from the
[releases page](https://github.com/provectus/kafka-ui/releases). Then launch the
application using the following command in the same directory as the `application.yml`
file:
```shell
java -jar kafka-ui-api-X.Y.Z.jar
```
Alternatively, you can start it using Docker:
```shell
docker run -p 8080:8080 -v ~/kafka-ui/application.yml:/application.yml provectuslabs/kafka-ui:latest
```
After launching the [kafka-ui](https://github.com/provectus/kafka-ui) app, just
go to [http://localhost:8080](http://localhost:8080) to access the UI.
For more information see
[kafka-ui documentation](https://github.com/provectus/kafka-ui/blob/master/README.md).
# Produce Messages Using REST API
If you do not have a Kafka cluster and/or topic already, follow [these
steps](../overall/getstarted) to create one.
In the cluster details section of the
[Upstash Console](https://console.upstash.com), scroll down to the **REST API**
section and copy `UPSTASH_KAFKA_REST_URL`, `UPSTASH_KAFKA_REST_USERNAME` and
`UPSTASH_KAFKA_REST_PASSWORD` using the copy icons next to them.
We will use a `Node.js` sample code to show how to produce message(s) using the
REST API. Our sample will use a topic named `cities` and send a few city names
to this topic.
Replace the following parameters in the code snippets below with your actual values.
```js
const address = "https://tops-stingray-7863-eu1-rest-kafka.upstash.io";
const user = "G9wcy1zdGluZ3JheS03ODYzJMUX";
const pass = "eUmYCkAlxEhihIc7Hooi2IA2pz2fw==";
const auth = Buffer.from(`${user}:${pass}`).toString("base64");
const topic = "cities";
```
The following code will produce three city names to the topic:
```js
async function produce(topic, msg) {
const response = await fetch(`${address}/produce/${topic}/${msg}`, {
headers: { Authorization: `Basic ${auth}` },
});
const metadata = await response.json();
console.log(
`Topic: ${metadata.topic}, Partition: ${metadata.partition}, Offset: ${metadata.offset}`
);
}
produce(topic, "Tokyo");
produce(topic, "Istanbul");
produce(topic, "London");
```
Alternatively, we can post all cities using a single request instead of
producing them one by one. Note that in this case the URL does not have the message
argument; instead, all messages are posted in the request body.
```js
async function produceMulti(topic, ...messages) {
let data = messages.map((msg) => {
return { value: msg };
});
const response = await fetch(`${address}/produce/${topic}`, {
headers: { Authorization: `Basic ${auth}` },
method: "POST",
body: JSON.stringify(data),
});
const metadata = await response.json();
metadata.forEach((m) => {
console.log(
`Topic: ${m.topic}, Partition: ${m.partition}, Offset: ${m.offset}`
);
});
}
produceMulti(topic, "Tokyo", "Istanbul", "London");
```
For more info about using the REST API see
[Kafka REST Produce API](../rest/restproducer) section.
# Clickhouse
This tutorial shows how to set up a pipeline to stream traffic events to Upstash Kafka and analyze them with Clickhouse.
In this tutorial series, we will show how to build an end-to-end real-time
analytics system. We will stream the traffic (click) events from our web
application to Upstash Kafka and then analyze them in real time. We will
implement one simple query with different stream processing tools:
```sql
SELECT city, count() FROM page_views where event_time > now() - INTERVAL 15 MINUTE group by city
```
Namely, we will query the number of page views from different cities in the last 15
minutes. We keep the query and scenario intentionally simple to make the series
easy to understand, but you can easily extend the model for more complex
real-time analytics scenarios.
If you have not already set up the Kafka pipeline, see
[the first part of the series](./cloudflare_workers), where we
set up our pipeline with Upstash Kafka and Cloudflare Workers (or
Vercel).
In this part of the series, we will showcase how to use ClickHouse to run a
query on a Kafka topic.
## Clickhouse Setup
You can create a managed service from
[Clickhouse Cloud](https://clickhouse.cloud/) with a 30-day free trial.
Select your region and enter a name for your service. For simplicity, you can
allow access to the service from anywhere. If you want to restrict access by IP
address, here is the list of Upstash addresses that need permission:
```text
52.48.149.7
52.213.40.91
174.129.75.41
34.195.190.47
52.58.175.235
18.158.44.120
63.34.151.162
54.247.137.96
3.78.151.126
3.124.80.204
34.236.200.33
44.195.74.73
```
### Create a table
On Clickhouse service screen click on `Open SQL console`. Click on `+` to open a
new query window and run the following query to create a table:
```sql
CREATE TABLE page_views
(
country String,
city String,
region String,
url String,
ip String,
event_time DateTime DEFAULT now()
)
ORDER BY (event_time)
```
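The statement above relies on ClickHouse Cloud selecting a default table engine.
If you are running a self-managed ClickHouse instead, you will likely need to
specify the engine explicitly; a minimal variant, assuming `MergeTree`:
```sql
CREATE TABLE page_views
(
    country String,
    city String,
    region String,
    url String,
    ip String,
    event_time DateTime DEFAULT now()
)
ENGINE = MergeTree()
ORDER BY (event_time)
```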
## Kafka Setup
We will create an [Upstash Kafka cluster](https://console.upstash.com/kafka).
Upstash offers a serverless Kafka cluster with per-message pricing. Select the
same (or the nearest) region as your Clickhouse region for the best performance.
Also create a topic whose messages will be streamed to Clickhouse.
## Connector Setup
We will create a connector on the
[Upstash console](https://console.upstash.com/kafka). Select your cluster and
click on the `Connectors` tab, then select `Aiven JDBC Connector - Sink`.
Click Next to skip the Config step, as we will enter the configuration manually
in the third (Advanced) step.
In the third step, copy and paste the config below into the text editor:
```json
{
"name": "kafka-clickhouse",
"properties": {
"auto.create": false,
"auto.evolve": false,
"batch.size": 10,
"connection.password": "KqVQvD4HWMng",
"connection.url": "jdbc:clickhouse://a8mo654iq4e.eu-central-1.aws.clickhouse.cloud:8443/default?ssl=true",
"connection.user": "default",
"connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
"errors.deadletterqueue.topic.name": "dlqtopic",
"insert.mode": "insert",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": false,
"pk.mode": "none",
"table.name.format": "page_views",
"topics": "mytopic",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": true
}
}
```
Replace the following attributes:
* "name" : Name your connector.
* "connection.password": Copy this from your Clickhouse dashboard. (`Connect` >
`View connection string`)
* "connection.url": Copy this from your Clickhouse dashboard. (`Connect` >
`View connection string`)
* "connection.user": Copy this from your Clickhouse dashboard. (`Connect` >
`View connection string`)
* "errors.deadletterqueue.topic.name": Give a name for your dead letter topic.
It will be auto created.
* "topics": Enter the name of the topic that you have created.
Note that `?ssl=true` must be present as a parameter in the `connection.url`.
Click the `Connect` button to create the connector.
## Test and Run
Clickhouse expects a schema together with the message payload. We need to go
back to [the set up step](./cloudflare_workers) and update
the message object to include schema as below:
```js
const message = {
schema: {
type: "struct",
optional: false,
version: 1,
fields: [
{
field: "country",
type: "string",
optional: false,
},
{
field: "city",
type: "string",
optional: false,
},
{
field: "region",
type: "string",
optional: false,
},
{
field: "url",
type: "string",
optional: false,
},
{
field: "ip",
type: "string",
optional: false,
},
],
},
payload: {
country: req.geo?.country,
city: req.geo?.city,
region: req.geo?.region,
url: req.url,
ip: req.headers.get("x-real-ip"),
mobile: req.headers.get("sec-ch-ua-mobile"),
platform: req.headers.get("sec-ch-ua-platform"),
useragent: req.headers.get("user-agent"),
},
};
```
It is not ideal to send the schema together with the payload. A schema registry is a
solution; Upstash plans to launch a managed schema registry service soon.
After deploying the changes (Cloudflare Workers or Vercel function), visit your
web app to generate traffic to Kafka.
Now, go to the Clickhouse console (`Connect` > `Open SQL console`). Click on
`page_views` (your table's name) in the left menu. You should see that the table
is populated as below:
Also run the following query to get the most popular cities in the last 15 minutes:
```sql
SELECT city, count() FROM page_views where event_time > now() - INTERVAL 15 MINUTE group by city
```
It should return something like below:
# Cloudflare Workers
As a tutorial for this integration, we'll implement a real-time analytics system. We'll stream the traffic (click) events from our web application to Upstash Kafka. The system will answer this simple query:
```sql
SELECT city, count() FROM kafka_topic_page_views where timestamp > now() - INTERVAL 15 MINUTE group by city
```
Namely, we will query the number of page views from different cities in the last 15 minutes. We keep the query and scenario intentionally simple to make the series easy to understand, but you can easily extend the model for more complex real-time analytics scenarios.
We'll use Cloudflare Workers to intercept incoming requests to the website and run a serverless function.
### Kafka Setup
Create an Upstash Kafka cluster and a topic as explained
[here](https://docs.upstash.com/kafka).
### Project Setup
We will use **C3 (create-cloudflare-cli)** command-line tool to create our application. You can open a new terminal window and run C3 using the prompt below.
```shell npm
npm create cloudflare@latest
```
```shell yarn
yarn create cloudflare@latest
```
This will install the `create-cloudflare` package and lead you through the setup. C3 will also install Wrangler in projects by default, which helps us test and deploy the application.
```text
➜ npm create cloudflare@latest
Need to install the following packages:
create-cloudflare@2.1.0
Ok to proceed? (y) y
using create-cloudflare version 2.1.0
╭ Create an application with Cloudflare Step 1 of 3
│
├ In which directory do you want to create your application?
│ dir ./cloudflare_starter
│
├ What type of application do you want to create?
│ type "Hello World" Worker
│
├ Do you want to use TypeScript?
│ yes typescript
│
├ Copying files from "hello-world" template
│
├ Do you want to use TypeScript?
│ yes typescript
│
├ Retrieving current workerd compatibility date
│ compatibility date 2023-08-07
│
├ Do you want to use git for version control?
│ yes git
│
╰ Application created
```
We will also install the **Upstash Kafka SDK** to connect to Kafka.
```bash
npm install @upstash/kafka
```
### The Code
You can update the `src/index.ts` file with the code below:
```ts src/index.ts
import { Kafka } from "@upstash/kafka";
export interface Env {
UPSTASH_KAFKA_REST_URL: string;
UPSTASH_KAFKA_REST_USERNAME: string;
UPSTASH_KAFKA_REST_PASSWORD: string;
}
export default {
async fetch(
request: Request,
env: Env,
ctx: ExecutionContext
): Promise<Response> {
if (new URL(request.url).pathname == "/favicon.ico") {
return new Response(null, { status: 200 });
}
let message = {
country: request.cf?.country,
city: request.cf?.city,
region: request.cf?.region,
url: request.url,
ip: request.headers.get("x-real-ip"),
mobile: request.headers.get("sec-ch-ua-mobile"),
platform: request.headers.get("sec-ch-ua-platform"),
useragent: request.headers.get("user-agent"),
};
const kafka = new Kafka({
url: env.UPSTASH_KAFKA_REST_URL,
username: env.UPSTASH_KAFKA_REST_USERNAME,
password: env.UPSTASH_KAFKA_REST_PASSWORD,
});
const p = kafka.producer();
// Please update the topic according to your configuration
const topic = "mytopic";
ctx.waitUntil(p.produce(topic, JSON.stringify(message)));
// if you use CF Workers to intercept your existing site, uncomment below
// return await fetch(request);
return new Response("My website");
},
};
```
```js src/index.js
import { Kafka } from "@upstash/kafka";
export default {
async fetch(request, env, ctx) {
if (new URL(request.url).pathname == "/favicon.ico") {
return new Response(null, { status: 200 });
}
let message = {
country: request.cf?.country,
city: request.cf?.city,
region: request.cf?.region,
url: request.url,
ip: request.headers.get("x-real-ip"),
mobile: request.headers.get("sec-ch-ua-mobile"),
platform: request.headers.get("sec-ch-ua-platform"),
useragent: request.headers.get("user-agent"),
};
const kafka = new Kafka({
url: env.UPSTASH_KAFKA_REST_URL,
username: env.UPSTASH_KAFKA_REST_USERNAME,
password: env.UPSTASH_KAFKA_REST_PASSWORD,
});
const p = kafka.producer();
// Please update the topic according to your configuration
const topic = "mytopic";
ctx.waitUntil(p.produce(topic, JSON.stringify(message)));
// if you use CF Workers to intercept your existing site, uncomment below
// return await fetch(request);
return new Response("My website");
},
};
```
Above, we simply parse the request object and send the useful information to Upstash Kafka. You may add/remove information depending on your own requirements.
### Configure Credentials
There are two methods for setting up the credentials for Upstash Kafka client. The recommended way is to use Cloudflare Upstash Integration. Alternatively, you can add the credentials manually.
#### Using the Cloudflare Integration
Access to the [Cloudflare Dashboard](https://dash.cloudflare.com) and login with the same account that you've used while setting up the Worker application. Then, navigate to **Workers & Pages > Overview** section on the sidebar. Here, you'll find your application listed.
Clicking on the application will direct you to the application details page, where you can perform the integration process. Switch to the **Settings** tab in the application details, and proceed to **Integrations** section. You will see various Worker integrations listed. To proceed, click the **Add Integration** button associated with the Upstash Kafka.
On the Integration page, connect to your Upstash account. Then, select the related cluster from the dropdown menu. Finalize the process by pressing **Add Integration** button.
#### Setting up Manually
Navigate to [Upstash Console](https://console.upstash.com) and copy/paste your `UPSTASH_KAFKA_REST_URL`, `UPSTASH_KAFKA_REST_USERNAME` and `UPSTASH_KAFKA_REST_PASSWORD` credentials to your `wrangler.toml` as below.
```toml
[vars]
UPSTASH_KAFKA_REST_URL="REPLACE_HERE"
UPSTASH_KAFKA_REST_USERNAME="REPLACE_HERE"
UPSTASH_KAFKA_REST_PASSWORD="REPLACE_HERE"
```
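If you prefer not to keep the password in plain text in `wrangler.toml`, you can store it as an encrypted Wrangler secret instead; a minimal sketch, using the same binding name referenced in the code above:
```shell
npx wrangler secret put UPSTASH_KAFKA_REST_PASSWORD
```
Wrangler will prompt for the value and make it available to the Worker at runtime under the same name.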
### Test and Deploy
You can test the function locally with `npx wrangler dev`.
Deploy your function to Cloudflare with `npx wrangler deploy`.
Once the deployment is done, the endpoint of the function will be provided to you.
You can check whether logs are collected in Kafka by copying the `curl` expression from the console:
```shell
curl https://UPSTASH_KAFKA_REST_URL/consume/GROUP_NAME/GROUP_INSTANCE_NAME/TOPIC \
  -H "Kafka-Auto-Offset-Reset: earliest" \
  -u REPLACE_HERE
```
# Decodable
This tutorial shows how to integrate Upstash Kafka with Decodable
[Decodable](https://www.decodable.co/product?utm_source=upstash) is a platform
which enables developers to build data pipelines using SQL. It is built on
Apache Flink under the hood to provide a seamless experience, while abstracting
away the underlying complexity. In this post, we will show how to connect an
Upstash Kafka topic to Decodable to stream messages from Kafka to Decodable.
## Upstash Kafka Setup
Create a Kafka cluster using
[Upstash Console](https://console.upstash.com/kafka) or
[Upstash CLI](https://github.com/upstash/cli) by following
[Getting Started](https://docs.upstash.com/kafka).
## Decodable Setup
Just like Upstash, Decodable is a managed service, which means you do not need to
host or provision anything. You can register for free and start using it right away.
After creating your account, click on `Connections` and `New Connection`. Select
`Apache Kafka`. Then:
* Select Source as connection type.
* Select **SASL\_SSL** as security protocol and **SCRAM-SHA-256** as SASL
mechanism.
* Enter your topic, SASL username, and SASL password. You can find all of these in
the Upstash console.
* Value format should be JSON.
In the next step, click on `New Stream` and give a name to it.
In the schema screen, add `country`, `city`, `region` and `url` with `string`
type.
Give a name to your connection and click `Create Connection`. In the next screen
click on Start.
## Test the Setup
Now, let's send some events to our Kafka topic. Go to the Upstash console, click on your
cluster, then `Topics`, and click `mytopic`. Select the `Messages` tab, then click
`Produce a new message`. Send a message in JSON format like the one below:
```json
{
"country": "US",
"city": "San Jose",
"region": "CA",
"url": "https://upstash.com"
}
```
Now, go back to Decodable console, click Streams and select the one you have
created. Then click `Run Preview`. You should see something like:
## Links
[Decodable documentation](https://docs.decodable.co/docs)
[Decodable console](https://app.decodable.co/)
[Upstash console](https://console.upstash.com/kafka)
# EMQX Cloud
This tutorial shows how to integrate Upstash Kafka with EMQX Cloud
EMQX, a robust open-source MQTT message broker, is engineered for scalable, distributed environments, prioritizing high availability, throughput, and minimal latency. As a preferred protocol in the IoT landscape, MQTT (Message Queuing Telemetry Transport) excels in enabling devices to effectively publish and subscribe to messages.
Offered by EMQ, EMQX Cloud is a comprehensively managed MQTT service in the cloud, inherently scalable and secure. Its design is particularly advantageous for IoT applications, providing dependable MQTT messaging services.
This guide elaborates on streaming MQTT data to Upstash by establishing data integration. This process allows clients to route temperature and humidity metrics to EMQX Cloud using MQTT protocol, and subsequently channel these data streams into a Kafka topic within Upstash.
## Initiating Kafka Clusters on Upstash
Begin your journey with Upstash by visiting [Upstash](https://upstash.com/) and registering for an account.
### Kafka Cluster Creation
1. After logging in, initiate the creation of a Kafka cluster by selecting the **Create Cluster** button.
2. Input an appropriate name and select your desired deployment region, ideally close to your EMQX Cloud deployment for optimized performance.
3. Choose your cluster type: opt for a single replica for development/testing or a multi-replica setup for production scenarios.
4. Click **Create Cluster** to establish your serverless Kafka cluster.
![UPSTASH](https://raw.githubusercontent.com/emqx/cloud-docs/master/en_US/rule_engine/_assets/upstash_kafka_01.png)
### Topic Configuration
1. Inside the Cluster console, navigate to **Topics** and proceed with **Create Topic**.
2. Enter `emqx` in the **Topic name** field, maintaining default settings, then finalize with **Create**.
![UPSTASH](https://raw.githubusercontent.com/emqx/cloud-docs/master/en_US/rule_engine/_assets/upstash_kafka_02.png)
### Setting Up Credentials
1. Go to **Credentials** in the navigation menu and choose **New Credentials**.
2. Here, you can customize the topic and permissions for the credential. Default settings will be used in this tutorial.
![UPSTASH](https://raw.githubusercontent.com/emqx/cloud-docs/master/en_US/rule_engine/_assets/upstash_kafka_03.png)
With these steps, we have laid the groundwork for Upstash.
## Establishing Data Integration with Upstash
### Enabling EMQX Cloud's NAT Gateway
1. Sign in to the EMQX Cloud console and visit the deployment overview page.
2. Click on the **NAT Gateway** section at the bottom of the page and opt for **Subscribe Now**.
![NAT](https://raw.githubusercontent.com/emqx/cloud-docs/master/en_US/rule_engine/_assets/public_nat.png)
### Data Integration Setup
1. In the EMQX Cloud console, under your deployment, go to **Data Integrations** and select **Upstash for Kafka**.
![create resource](https://raw.githubusercontent.com/emqx/cloud-docs/master/en_US/rule_engine/_assets/upstash_kafka_04.png)
2. Fill in the **Endpoints** details from the Upstash Cluster details into the **Kafka Server** fields. Insert the username and password created in Create Credentials into the respective fields and click **Test** to confirm the connection.
![create resource](https://raw.githubusercontent.com/emqx/cloud-docs/master/en_US/rule_engine/_assets/upstash_kafka_05.png)
3. Opt for **New** to add a Kafka resource. You'll see your newly created Upstash for Kafka listed under **Configured Resources**.
4. Formulate a new SQL rule. Input the following SQL command in the **SQL** field. This rule will process messages from the `temp_hum/emqx` topic and append details like client\_id, topic, and timestamp.
```sql
SELECT
timestamp as up_timestamp,
clientid as client_id,
payload.temp as temp,
payload.hum as hum
FROM
"temp_hum/emqx"
```
![rule sql](https://raw.githubusercontent.com/emqx/cloud-docs/master/en_US/rule_engine/_assets/kafka_create_sql.png)
5. Conduct an SQL test by inputting the test payload, topic, and client data. Success is indicated by results similar to the example below.
![rule sql](https://raw.githubusercontent.com/emqx/cloud-docs/master/en_US/rule_engine/_assets/kafka_create_sql_test.png)
6. Advance to **Next** to append an action to the rule. Specify the Kafka topic and message format, then confirm.
```bash
# kafka topic
emqx
# kafka message template
{"up_timestamp": ${up_timestamp}, "client_id": ${client_id}, "temp": ${temp}, "hum": ${hum}}
```
![rule sql](https://raw.githubusercontent.com/emqx/cloud-docs/master/en_US/rule_engine/_assets/kafka_action.png)
7. View the rule SQL statement and bound actions by clicking **View Details** after successfully adding the action.
8. To review created rules, click **View Created Rules** on the Data Integrations page.
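To verify the end-to-end flow, you can publish a test MQTT message that matches the rule's `temp_hum/emqx` topic and payload fields. Below is a minimal sketch using the `mosquitto_pub` CLI; the deployment address, port, and MQTT credentials are placeholders for your own values:
```bash
mosquitto_pub -h YOUR-EMQX-DEPLOYMENT-ADDRESS -p 1883 \
  -u YOUR-MQTT-USERNAME -P YOUR-MQTT-PASSWORD \
  -t "temp_hum/emqx" \
  -m '{"temp": 25.6, "hum": 60.2}'
```
If the rule fires, a record following the message template above should appear in the `emqx` topic of your Upstash cluster.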
# Apache Flink
This tutorial shows how to integrate Upstash Kafka with Apache Flink
[Apache Flink](https://flink.apache.org/) is a distributed processing engine
which can process streaming data.
### Upstash Kafka Setup
Create a Kafka cluster using [Upstash Console](https://console.upstash.com) or
[Upstash CLI](https://github.com/upstash/cli) by following
[Getting Started](https://docs.upstash.com/kafka).
Create two topics by following the topic creation
[steps](https://docs.upstash.com/kafka#create-a-topic). Let's name the first topic
"input", since we are going to stream from this topic to the other one, which we
can name "output".
### Project Setup
If you already have a project and want to implement Upstash Kafka and Apache
Flink integration into it, you can skip this section and continue with [Add
Apache Flink and Kafka into the
Project](#add-apache-flink-and-kafka-into-the-project).
Install Maven on your machine by following the [Maven Installation Guide](https://maven.apache.org/guides/getting-started/maven-in-five-minutes.html).
Run `mvn --version` in a terminal or command prompt to make sure Maven is
installed.
It should print out the version of the Maven you have:
```
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: D:\apache-maven-3.6.3\apache-maven\bin\..
Java version: 1.8.0_232, vendor: AdoptOpenJDK, runtime: C:\Program Files\AdoptOpenJDK\jdk-8.0.232.09-hotspot\jre
Default locale: en_US, platform encoding: Cp1250
OS name: "windows 10", version: "10.0", arch: "amd64", family: "windows"
```
To create the Maven project, go into the folder where you want to create the
project in your terminal or command prompt by running `cd <directory>`, then
run the following command:
```
mvn archetype:generate -DgroupId=com.kafkaflinkinteg.app -DartifactId=kafkaflinkinteg-app -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.4 -DinteractiveMode=false
```
### Add Apache Flink and Kafka into the Project
Open the project folder using an IDE that has a Maven plugin, such as IntelliJ,
Visual Studio, Eclipse, etc. Add the following Apache Flink dependencies into the
dependencies tag of the `pom.xml` file.
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.16.0</version>
</dependency>
```
### Streaming From One Topic to Another Topic
You need to create two more classes (`LineSplitter`, `CustomSerializationSchema`)
for the word count example.
#### LineSplitter
This class is a custom implementation of `FlatMapFunction` from the Apache Flink
client library. It takes a sentence, splits it into words, and returns a
two-element Tuple for each word in the format `(<word>, 1)`.
Create the `LineSplitter` class as follows:
```java
package com.kafkaflinkinteg.app;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
 * Implements the string tokenizer that splits sentences into words as a
 * user-defined FlatMapFunction. The function takes a line (String) and splits
 * it into multiple pairs in the form of "(word, 1)" (Tuple2<String, Integer>).
 */
public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize and split the line
        String[] tokens = value.toLowerCase().split("\\W+");
        // emit the pairs
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<>(token, 1));
            }
        }
    }
}
```
#### CustomSerializationSchema
This class is a custom implementation of `KafkaRecordSerializationSchema` from the
Apache Flink Kafka connector library. It provides a schema for serializing the
two-element Tuple output of the word counting process into the Kafka record format.
Create the `CustomSerializationSchema` class as follows:
```java
package com.kafkaflinkinteg.app;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.ProducerRecord;
public class CustomSerializationSchema implements KafkaRecordSerializationSchema<Tuple2<String, Integer>> {
    private String topic;
    private ObjectMapper mapper;

    public CustomSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public void open(SerializationSchema.InitializationContext context, KafkaSinkContext sinkContext) throws Exception {
        KafkaRecordSerializationSchema.super.open(context, sinkContext);
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, Integer> stringIntegerTuple2, KafkaSinkContext kafkaSinkContext, Long aLong) {
        byte[] k = null;
        byte[] v = null;
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        try {
            k = mapper.writeValueAsBytes(stringIntegerTuple2.f0);
            v = mapper.writeValueAsBytes(stringIntegerTuple2.f1);
        } catch (JsonProcessingException e) {
            // serialization error; key/value are left as null
        }
        return new ProducerRecord<>(topic, k, v);
    }
}
```
#### Integration
Import the following packages first:
```java
package com.kafkaflinkinteg.app;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.Properties;
```
Define the names of the topics you are going to work on:
```java
String inputTopic = "input";
String outputTopic = "output";
```
Create the following properties for Apache Flink Kafka connector and replace
`UPSTASH-KAFKA-*` placeholders with your cluster information.
```java
Properties props = new Properties();
props.put("transaction.timeout.ms", "90000"); // e.g., 2 hours
props.put("bootstrap.servers", "UPSTASH-KAFKA-ENDPOINT:9092");
props.put("sasl.mechanism", "SCRAM-SHA-256");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"UPSTASH-KAFKA-USERNAME\" password=\"UPSTASH-KAFKA-PASSWORD\";");
```
Get the stream execution environment to create and execute the pipeline in it.
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
```
Create the Kafka consumer.
```java
KafkaSource<String> source = KafkaSource.<String>builder()
.setStartingOffsets(OffsetsInitializer.earliest())
.setProperties(props)
.setTopics(inputTopic)
.setGroupId("my-group")
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
```
Implement the stream processing part, which will take the input sentence from
source and count words.
```java
DataStream<Tuple2<String, Integer>> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
.flatMap(new LineSplitter())
.keyBy(value -> value.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1);
```
You can see the output by printing the data stream.
```java
stream.print();
```
If you produce a message to the input topic from your
[console](https://console.upstash.com), you will see output like this:
```
2> (This,1)
1> (an,1)
3> (is,1)
2> (sentence,1)
4> (example,1)
```
Next, create a Kafka producer to sink the data stream to the output Kafka topic.
```java
KafkaSink<Tuple2<String, Integer>> sink = KafkaSink.<Tuple2<String, Integer>>builder()
.setKafkaProducerConfig(props)
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("integ")
.setRecordSerializer(new CustomSerializationSchema(outputTopic))
.build();
stream.sinkTo(sink);
```
Finally, execute the stream execution environment to run the pipeline.
```java
env.execute();
```
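To try the pipeline locally, you can run the class containing this code with Maven; a minimal sketch, assuming the pipeline lives in the `App` main class generated by the quickstart archetype (a hypothetical entry point; adjust `exec.mainClass` to your actual class):
```
mvn compile exec:java -Dexec.mainClass="com.kafkaflinkinteg.app.App"
```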
# Upstash Kafka with ksqlDB
This tutorial shows how to integrate Upstash Kafka with ksqlDB
[ksqlDB](https://www.confluent.io/product/ksqldb) is a SQL interface for
performing stream processing over the Kafka environment.
## Upstash Kafka Setup
Create a Kafka cluster using [Upstash Console](https://console.upstash.com) or
[Upstash CLI](https://github.com/upstash/cli) by following
[Getting Started](https://docs.upstash.com/kafka).
## ksqlDB Setup
Upstash does not offer a managed ksqlDB. Therefore, set up ksqlDB in a Docker
container and replace the `UPSTASH_KAFKA_*` placeholders with your cluster
information.
First, download and install [Docker](https://www.docker.com/).
Create a `docker-compose.yml` file as below:
```yml
version: "2"

services:
  ksqldb-server:
    image: confluentinc/ksqldb-server:0.28.2
    hostname: ksqldb-server
    container_name: ksqldb-server
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_BOOTSTRAP_SERVERS: "UPSTASH_KAFKA_ENDPOINT"
      KSQL_SASL_MECHANISM: "SCRAM-SHA-256"
      KSQL_SECURITY_PROTOCOL: "SASL_SSL"
      KSQL_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="UPSTASH_KAFKA_USERNAME" password="UPSTASH_KAFKA_PASSWORD";'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"

  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.28.2
    container_name: ksqldb-cli
    depends_on:
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
```
Open your CLI, navigate to the folder that contains the `docker-compose.yml` file
you created, and start ksqlDB by running `docker-compose up`.
When you check your Kafka cluster from
[console](https://console.upstash.com/kafka), you will see new topics created
after you start ksqlDB.
## Streaming From One Topic to Another Topic
A word count example project can be implemented with both the ksqlDB CLI and the
Java client. In both cases, it is built from consecutive streams. The processing
pipeline is:
`receive input > split into array > convert to rows > count occurrences`.
### Using ksqlDB CLI
Start the ksqlDB CLI by running the following command:
```
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
```
Create the first stream, which reads from the "input" topic:
```
ksql> CREATE STREAM source_stream (sentence VARCHAR) WITH (kafka_topic='input', value_format='json', partitions=1);
```
Create the second stream, which reads from source\_stream, splits the string into
an array, and writes to the split\_stream topic.
```
ksql> CREATE STREAM split_stream AS SELECT regexp_split_to_array(sentence, ' ') as word_array FROM source_stream EMIT CHANGES;
```
Next, create the third stream, which reads from split\_stream created above,
converts word\_array to rows, and writes to explode\_stream.
```
ksql> CREATE STREAM explode_stream AS SELECT explode(word_array) as words FROM split_stream EMIT CHANGES;
```
Lastly, create a table, which will count the words' occurrences and write them to
the "OUTPUT" topic.
```
ksql> CREATE TABLE output AS SELECT words as word, count(words) as occurrence FROM explode_stream GROUP BY words EMIT CHANGES;
```
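To watch the counts update as new sentences arrive, you can also run a push query against the table from the ksqlDB CLI; a minimal sketch:
```
ksql> SELECT * FROM output EMIT CHANGES;
```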
You can check what you have created so far by running the following commands on
ksqlDB CLI.
```
ksql> show tables;
Table Name | Kafka Topic | Key Format | Value Format | Windowed
--------------------------------------------------------------------
OUTPUT | OUTPUT | KAFKA | JSON | false
--------------------------------------------------------------------
ksql> show streams;
Stream Name | Kafka Topic | Key Format | Value Format | Windowed
------------------------------------------------------------------------------------------
EXPLODE_STREAM | EXPLODE_STREAM | KAFKA | JSON | false
KSQL_PROCESSING_LOG | default_ksql_processing_log | KAFKA | JSON | false
SOURCE_STREAM | input | KAFKA | JSON | false
SPLIT_STREAM | SPLIT_STREAM | KAFKA | JSON | false
------------------------------------------------------------------------------------------
```
### Using Java Client
#### Project Setup
> :pushpin: **Note** If you already have a project and want to implement Upstash
> Kafka and ksqlDB integration into it, you can skip this section and continue
> with
> [Add Ksqldb and Kafka into the Project](#add-ksqldb-and-kafka-into-the-project).
Install Maven on your machine by following the
[Maven Installation Guide](https://maven.apache.org/guides/getting-started/maven-in-five-minutes.html).
Run `mvn --version` in a terminal or command prompt to make sure Maven is
installed.
It should print out the version of the Maven you have:
```
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: D:\apache-maven-3.6.3\apache-maven\bin\..
Java version: 1.8.0_232, vendor: AdoptOpenJDK, runtime: C:\Program Files\AdoptOpenJDK\jdk-8.0.232.09-hotspot\jre
Default locale: en_US, platform encoding: Cp1250
OS name: "windows 10", version: "10.0", arch: "amd64", family: "windows"
```
To create the Maven project, go into the folder where you want to create the
project in your terminal or command prompt by running `cd <directory>`, then
run the following command:
```
mvn archetype:generate -DgroupId=com.kafkaksqldbinteg.app -DartifactId=kafkaksqldbinteg-app -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.4 -DinteractiveMode=false
```
#### Add ksqlDB and Kafka into the Project
Open the project folder using an IDE with maven plugins such as Intellij, Visual
Studio, Eclipse, etc. Add ksqlDB into the `pom.xml` file.
```xml
<repositories>
    <repository>
        <id>confluent</id>
        <name>confluent-repo</name>
        <url>http://packages.confluent.io/maven/</url>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>io.confluent.ksql</groupId>
        <artifactId>ksqldb-api-client</artifactId>
        <version>7.3.0</version>
    </dependency>
</dependencies>
```
#### Streaming
Import the following packages.
```java
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.ExecuteStatementResult;
import java.util.concurrent.CompletableFuture;
```
Create a ksqlDB client first.
```java
String KSQLDB_SERVER_HOST = "localhost";
int KSQLDB_SERVER_HOST_PORT = 8088;
ClientOptions options = ClientOptions.create()
.setHost(KSQLDB_SERVER_HOST)
.setPort(KSQLDB_SERVER_HOST_PORT);
Client client = Client.create(options);
```
Create the first stream, which reads from "input" topic:
```java
String SOURCE_STREAM = "CREATE STREAM IF NOT EXISTS source_stream (sentence VARCHAR)" +
        " WITH (kafka_topic='input', value_format='json', partitions=1);";

CompletableFuture<ExecuteStatementResult> result =
        client.executeStatement(SOURCE_STREAM);
System.out.println(result);
```
Create the second stream, which reads from source\_stream, splits the string into an
array, and writes to the split\_stream topic.
```java
String SPLIT_STREAM = "CREATE STREAM IF NOT EXISTS split_stream " +
        "AS SELECT regexp_split_to_array(sentence, ' ') " +
        "as word_array FROM source_stream EMIT CHANGES;";

CompletableFuture<ExecuteStatementResult> result1 =
        client.executeStatement(SPLIT_STREAM);
System.out.println(result1);
```
Next, create the third stream, which reads from split\_stream created above,
converts word\_array to rows, and writes to explode\_stream.
```java
String EXPLODE_STREAM = "CREATE STREAM IF NOT EXISTS explode_stream " +
        "AS SELECT explode(word_array) " +
        "as words FROM split_stream EMIT CHANGES;";

CompletableFuture<ExecuteStatementResult> result2 =
        client.executeStatement(EXPLODE_STREAM);
System.out.println(result2);
```
Lastly, create a table, which will count the words' occurrences and write them to
the "OUTPUT" topic.
```java
String OUTPUT_TABLE = "CREATE TABLE output " +
        "AS SELECT words as word, count(words) " +
        "as occurrence FROM explode_stream GROUP BY words EMIT CHANGES;";

CompletableFuture<ExecuteStatementResult> result3 =
        client.executeStatement(OUTPUT_TABLE);
System.out.println(result3);
```
## Results
The word count stream we created above takes input sentences in JSON format
from the "input" topic and sends word count results to the "OUTPUT" topic.
You can both send input and observe the output on the
[console](https://console.upstash.com/kafka).
Send an input sentence to the "input" topic. The key can be a random string,
but since we defined "sentence" as a field while creating the `source_stream`,
the value must be a JSON object that includes "sentence" as a key:
```json
{
  "sentence": "This is an example sentence"
}
```
Once you send this message to the "input" topic, you can observe the result in the
"OUTPUT" topic as follows:
```
Timestamp Key Value
2022-12-06 23:39:56 This {"OCCURRENCE":1}
2022-12-06 23:39:56 is {"OCCURRENCE":1}
2022-12-06 23:39:56 an {"OCCURRENCE":1}
2022-12-06 23:39:56 example {"OCCURRENCE":1}
2022-12-06 23:39:56 sentence {"OCCURRENCE":1}
```
# Upstash Kafka with Materialize
This tutorial shows how to integrate Upstash Kafka with Materialize
[Materialize](https://materialize.com/docs/get-started/) is a PostgreSQL
wire-compatible stream database for low latency applications.
## Upstash Kafka Setup
Create a Kafka cluster using [Upstash Console](https://console.upstash.com) or
[Upstash CLI](https://github.com/upstash/cli) by following
[Getting Started](https://docs.upstash.com/kafka).
Create two topics by following the topic creation
[steps](https://docs.upstash.com/kafka#create-a-topic). Let's name the first topic
`materialize_input`, since we are going to stream from this topic to the Materialize
database. The second topic can be named `materialize_output`. This one is
going to receive the stream from Materialize.
## Materialize Setup
Materialize is wire-compatible with PostgreSQL, which is why it can be used with
most SQL clients.
[Sign up](https://materialize.com/register) and complete activation of your
Materialize account first.
Once you complete your activation, you can sign in and enable the region where
your Materialize database will run. Enabling the same region as your Upstash
Kafka cluster can provide better performance.
Region setup takes a few minutes. During that time, create a new app password
from the `Connect` tab for your project. This step will generate a password and
display it just once, so copy it somewhere safe before it disappears.
To interact with your Materialize database, you need to download one of the
PostgreSQL installers mentioned
[here](https://materialize.com/docs/get-started/quickstart/#before-you-begin).
After installing PostgreSQL on your machine, open a SQL shell and run the command
shown on the Connect tab to connect the SQL shell to your Materialize database.
You will need to enter the app password to log in.
Now you are connected to your Materialize!
## Connect Materialize to Upstash Kafka
You first need to save your Upstash username and password to Materialize's secret
management system to be able to connect Materialize to Upstash Kafka.
To do this, run the following commands from the psql terminal, replacing
`<upstash-username>` and `<upstash-password>` with the username and password you
see on your Upstash Kafka cluster:
```sql
CREATE SECRET upstash_username AS '<upstash-username>';
CREATE SECRET upstash_password AS '<upstash-password>';
```
The `CREATE SECRET` command stores a sensitive value under the name assigned to it
as an identifier. Once you define a name and its corresponding value with this
command, you can refer to the sensitive value by its name.
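If you want to confirm that the secrets were stored, you can list them from the same psql session; a minimal check, assuming a recent Materialize version:
```sql
SHOW SECRETS;
```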
As the next step, we need to create a connection between Materialize and Upstash
Kafka by running the following command from the psql terminal:
```sql
CREATE CONNECTION <connection-name> TO KAFKA (
    BROKER '<upstash-kafka-endpoint>',
    SASL MECHANISMS = 'SCRAM-SHA-256',
    SASL USERNAME = SECRET upstash_username,
    SASL PASSWORD = SECRET upstash_password
);
```
`<connection-name>` is going to be used as the name of the connection. You
can name it as you wish.
`<upstash-kafka-endpoint>` is the endpoint of your Kafka cluster. You can copy it
from your Upstash console.
Your connection is now established between Upstash Kafka and Materialize!
## Create Source
A source streams data from an external data source or pipeline into the Materialize
database. By creating a source, the messages you add to the topic are streamed
from Upstash Kafka to the Materialize source.
You can create a source from the SQL shell by running the following command:
```sql
CREATE SOURCE <source-name>
FROM KAFKA CONNECTION <connection-name> (TOPIC '<topic-name>')
FORMAT BYTES
WITH (SIZE = '3xsmall');
```
In this tutorial, we are going to use the connection we established in the previous
section and use `materialize_input` as the source topic.
Once you have created the source, you can see it:
```sql
materialize=> SHOW SOURCES;
name | type | size
------------------------+-----------+---------
upstash_source | kafka | 3xsmall
upstash_source_progress | subsource |
(2 rows)
```
To test this source, go to your
[Upstash console](https://console.upstash.com/kafka) and open the `materialize_input`
topic in your Kafka cluster.
Produce a message in this topic.
The message you sent to this topic should be streamed to the Materialize source.
Query the Materialize source from the SQL shell, converting the data to a readable
form since we defined the source format as `BYTES` while creating the source:
```sql
materialize=> SELECT convert_from(data, 'utf8') as data from upstash_source;
data
-----------------------------
"This is my test sentence."
(1 row)
```
## Create Sink
A sink streams data from the Materialize database to external data stores or
pipelines. By creating a sink, the data you insert into a Materialize table or
source will be streamed to the Upstash Kafka topic.
For testing purposes, let's create a new table. This table will be streamed to
the Upstash Kafka sink topic.
```sql
materialize=> CREATE TABLE mytable (name text, age int);
CREATE TABLE
materialize=> SELECT * FROM mytable;
name | age
-----+-----
(0 rows)
```
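Once the sink is created in the next step, any rows you add to this table will be streamed to the sink topic. A minimal sketch for inserting a test row (the values are hypothetical):
```sql
INSERT INTO mytable VALUES ('Alice', 30);
```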
Create a sink from the SQL shell by running the following command:
```sql
CREATE SINK
FROM