Upstash Kafka with Materialize
This tutorial shows how to integrate Upstash Kafka with Materialize.
Materialize is a PostgreSQL wire-compatible streaming database for low-latency applications.
Upstash Kafka Setup
Create a Kafka cluster using Upstash Console or Upstash CLI by following Getting Started.
Create two topics by following the creating topic steps. Name the first topic materialize_input, since we are going to stream from this topic into the Materialize database. Name the second topic materialize_output. This one is going to receive the stream from Materialize.
Materialize Setup
Materialize is wire-compatible with PostgreSQL, so it can be used with most SQL clients.
First, sign up and complete the activation of your Materialize account.
Once your activation is complete, sign in and enable the region where your Materialize database will run. Enabling the same region as your Upstash Kafka cluster can provide better performance.
Region setup takes a few minutes. During that time, create a new app password from the Connect tab of your project. This step generates a password and displays it just once, so copy it somewhere safe before it disappears.
To interact with your Materialize database, you need to download one of the PostgreSQL installers mentioned here.
After installing PostgreSQL on your machine, open a SQL shell and run the command shown on the Connect tab to connect the SQL shell to your Materialize database. You will need to enter the app password to log in.
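The command on the Connect tab will look roughly like the following sketch; the user and host shown here are placeholders, and the actual values are specific to your account:
psql "postgres://<user>@<materialize-host>:6875/materialize"
When prompted for a password, paste the app password you saved earlier.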
Now you are connected to your Materialize database!
Connect Materialize to Upstash Kafka
To connect Materialize to Upstash Kafka, you first need to save your Upstash username and password in Materialize's secret management system.
To do this, run the following commands from the psql terminal, replacing <upstash-username> and <upstash-password> with the username and password you see on your Upstash Kafka cluster:
CREATE SECRET upstash_username AS '<upstash-username>';
CREATE SECRET upstash_password AS '<upstash-password>';
The CREATE SECRET command stores a sensitive value under the name assigned to it as an identifier. Once you have defined a name and its corresponding value with this command, you can use the sensitive value by referring to its name.
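To confirm that both secrets were stored, you can list them from the same terminal. SHOW SECRETS displays only the names; the values themselves are never shown:
SHOW SECRETS;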
As the next step, we need to create a connection between Materialize and Upstash Kafka by running the following command from the psql terminal:
CREATE CONNECTION <connection-name> TO KAFKA (
BROKER '<upstash-endpoint>',
SASL MECHANISMS = 'SCRAM-SHA-256',
SASL USERNAME = SECRET upstash_username,
SASL PASSWORD = SECRET upstash_password
);
<connection-name> is going to be used as the name of the connection. You can name it as you wish.
<upstash-endpoint> is the endpoint of your Kafka cluster. You can copy it from your Upstash console.
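For example, with a hypothetical connection name of upstash_kafka (the name is your choice; keep your real endpoint in place of the placeholder), the command would look like:
-- upstash_kafka is an example name; the secrets were created above
CREATE CONNECTION upstash_kafka TO KAFKA (
    BROKER '<upstash-endpoint>',
    SASL MECHANISMS = 'SCRAM-SHA-256',
    SASL USERNAME = SECRET upstash_username,
    SASL PASSWORD = SECRET upstash_password
);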
Your connection is now established between Upstash Kafka and Materialize!
Create Source
A source streams data from an external data source or pipeline into the Materialize database. Once you create a source, the messages you add to the topic are streamed from Upstash Kafka into the Materialize source.
You can create a source from the SQL shell by running the following command:
CREATE SOURCE <source-name>
FROM KAFKA CONNECTION <connection_name> (TOPIC '<source-topic-name>')
FORMAT BYTES
WITH (SIZE = '3xsmall');
In this tutorial, we are going to use the connection we established in the previous section and "materialize_input" as the source topic.
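Filled in with those values and a source named upstash_source (to match the SHOW SOURCES output below; upstash_kafka is the hypothetical connection name from the earlier sketch), the command would look like:
-- stream the materialize_input topic into a source named upstash_source
CREATE SOURCE upstash_source
FROM KAFKA CONNECTION upstash_kafka (TOPIC 'materialize_input')
FORMAT BYTES
WITH (SIZE = '3xsmall');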
Once you have created the source, you can see it:
materialize=> SHOW SOURCES;
name | type | size
------------------------+-----------+---------
upstash_source | kafka | 3xsmall
upstash_source_progress | subsource |
(2 rows)
To test this source, go to your Upstash console and open the materialize_input topic in your Kafka cluster. Produce a message in this topic.
The message you sent to this topic should be streamed to the Materialize source.
Query the Materialize source from the SQL shell, converting the data to a readable form, since we defined the source format as BYTES when creating the source.
materialize=> SELECT convert_from(data, 'utf8') as data from upstash_source;
data
-----------------------------
"This is my test sentence."
(1 row)
Create Sink
A sink streams data from the Materialize database to external data stores or pipelines. Once you create a sink, the data you insert into a Materialize table or source is streamed to the Upstash Kafka topic.
For testing purposes, let’s create a new table. This table will be streamed to the Upstash Kafka sink topic.
materialize=> CREATE TABLE mytable (name text, age int);
CREATE TABLE
materialize=> SELECT * FROM mytable;
name | age
-----+-----
(0 rows)
Create a sink from the SQL shell by running the following command:
CREATE SINK <sink-name>
FROM <source, table or mview>
INTO KAFKA CONNECTION <connection-name> (TOPIC '<sink-topic-name>')
FORMAT JSON
ENVELOPE DEBEZIUM
WITH (SIZE = '3xsmall');
We are going to use the connection we created and "materialize_output" as the sink topic. We will stream from the table "mytable" that we just created.
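Filled in with those values and a sink named upstash_sink (to match the SHOW SINKS output below; upstash_kafka is again the hypothetical connection name), the command would look like:
-- stream changes to mytable into the materialize_output topic
CREATE SINK upstash_sink
FROM mytable
INTO KAFKA CONNECTION upstash_kafka (TOPIC 'materialize_output')
FORMAT JSON
ENVELOPE DEBEZIUM
WITH (SIZE = '3xsmall');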
Once you have created the sink, you can see it:
materialize=> SHOW SINKS;
name | type | size
-------------+-------+---------
upstash_sink | kafka | 3xsmall
(1 row)
To test this sink, go to your Upstash console and open the materialize_output topic in your Kafka cluster. Open the Messages tab to see incoming messages.
Now insert a new row into the table to be streamed:
materialize=> INSERT INTO mytable VALUES ('Noah', 1);
INSERT 0 1
materialize=> SELECT * FROM mytable;
name | age
-----+-----
Noah | 1
(1 row)
You can see this row streamed to the Upstash Kafka output topic on your Upstash console.
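Because the sink uses FORMAT JSON with ENVELOPE DEBEZIUM, the message that lands in the topic wraps the row in a change envelope. For this insert, the payload should look roughly like the following sketch:
{"before": null, "after": {"name": "Noah", "age": 1}}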