# Serverless Event Sourcing and CQRS with Next.js and Upstash

> **Source:** https://upstash.com/blog/nextjs-kafka-upstash-cqrs
> **Date:** 2023-03-21
> **Author(s):** Kay Plößer
> **Reading time:** 14 min read
> **Tags:** nextjs, kafka, vercel, cqrs

---

Microservices are a widespread software architecture, and with serverless technologies that allow for granular deployments, they have become even more critical. Event sourcing and command query responsibility segregation (CQRS) are architectural patterns that help you to get the most out of your microservices.

## What is Event Sourcing?

The primary idea of event sourcing is to think of all operations in your system as events. You then save the events that modify your system state in an event store instead of applying their changes immediately. Other services will then load the events from the store and apply the changes asynchronously.

The event store is a ledger that keeps track of everything that happened to your system. You don’t delete anything; you save a delete event. This way, services you add to the system can recreate their internal state by reading the events. If someone accidentally deleted or overrode data, it’s still in the event store. You have exact timestamps for every interaction with your system.
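To make the ledger idea concrete, here is a minimal in-memory sketch. The `LedgerEvent` type and the `replay` function are made up for illustration and are not part of the example repository. The sketch only shows how state can be rebuilt from an append-only list of events, and how an earlier state stays recoverable.

```typescript
// Hypothetical event shape, for illustration only.
type LedgerEvent =
  | { type: "created"; id: string; text: string; at: number }
  | { type: "deleted"; id: string; at: number }

// Rebuild the current state by replaying every event in order.
function replay(events: LedgerEvent[]): Record<string, string> {
  const state: Record<string, string> = {}
  for (const entry of events) {
    if (entry.type === "created") state[entry.id] = entry.text
    else delete state[entry.id]
  }
  return state
}

// The ledger is append-only: a deletion is just another event.
const ledger: LedgerEvent[] = [
  { type: "created", id: "a", text: "Do the dishes!", at: 1 },
  { type: "created", id: "b", text: "Buy milk", at: 2 },
  { type: "deleted", id: "a", at: 3 },
]

const current = replay(ledger)
// Replaying only the first two events recovers the state before the deletion.
const beforeDelete = replay(ledger.slice(0, 2))
```

A new service could call something like `replay` on the full ledger to build its own state from scratch.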

## What is CQRS?

CQRS is an alternative to the create, read, update, and delete (CRUD) model. Instead of having one data model for all your operations, you create models for your commands (the writing operations) and queries (the reading operations). This gives you more flexibility in modeling your data and also simplifies the use of event sourcing. Every command becomes an event that goes to your event store. Every query reads from a database with the latest state created by applying events. Often these databases are called projections.
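As a rough sketch of this split, the following in-memory example keeps a write model (commands appended to a log) apart from a read model (a projection). All names here (`Command`, `TaskView`, `send`, `project`, `listTasks`) are hypothetical and only illustrate the idea; they are not taken from the example repository.

```typescript
// Hypothetical types; the write model and the read model are deliberately different.
type Command =
  | { type: "create-task"; id: string; text: string }
  | { type: "complete-task"; id: string }
type TaskView = { id: string; text: string; done: boolean }

const eventLog: Command[] = [] // write side: append-only
const projection: Record<string, TaskView> = {} // read side: latest state

// Command side: only records what should happen.
function send(command: Command): void {
  eventLog.push(command)
}

// Projection: applies the recorded events to the read model, possibly much later.
function project(): void {
  for (const entry of eventLog) {
    if (entry.type === "create-task") {
      projection[entry.id] = { id: entry.id, text: entry.text, done: false }
    } else {
      const view = projection[entry.id]
      if (view) view.done = true
    }
  }
}

// Query side: reads only from the projection.
function listTasks(): TaskView[] {
  return Object.keys(projection).map((id) => projection[id])
}

send({ type: "create-task", id: "1", text: "Do the dishes!" })
send({ type: "complete-task", id: "1" })
const beforeProjection = listTasks().length // the read model lags behind the log
project()
const afterProjection = listTasks()
```

Note that the read model is empty until `project` runs, which mirrors the asynchronous update of a projection in a real system.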

We will build a simple to-do list to learn how you can apply these patterns in a serverless way.

## Features

The to-do list will have an API that allows users to create, read, update, and delete tasks.

- The writing operations land in an event store.
- The event store will notify other services about new events so that they can update their state.
- The reading operations hit the projection store.
- The projection store listens to event notifications and calculates its state from the events.

## Technology

The bread and butter of this system are Upstash services. This way, everything is serverless, and the system scales automatically.

- Upstash Kafka as an event store
- Upstash Redis as a projection store
- QStash as a notification service

Additionally, we will use Next.js with Apollo GraphQL to build the gateway to our serverless system.

> Note: Upstash Kafka’s free tier only persists events for 7 days; in a production environment, you can use the unlimited persistence of a paid account to keep events from being deleted.

## Prerequisites

You need [an Upstash account](https://console.upstash.com/auth/sign-in) to create the required databases, but the free tier is enough for this tutorial. You also need [an up-to-date installation of Node.js](https://nodejs.org/en). You can install everything else via NPM.

I also recommend using [GitHub Codespaces](https://github.com/features/codespaces) to run the example because you need a public hostname for QStash.

## Getting the Full Code

This article will only explain the crucial parts of the system. To run the example, [clone the repository from GitHub](https://github.com/kay-is/upstash-kafka-cqrs) before you move to the **Functional Steps** section.

## Implementation

Let’s look at the architecture of the whole system first. Figure 1 illustrates how the Upstash services and our serverless functions work together.

![Figure 1: Serverless CQRS and event store architecture](https://dev-to-uploads.s3.amazonaws.com/uploads/articles/ehfkz9zqb6o4l98q5fy4.png)
_Figure 1: Serverless CQRS and event store architecture_

The first function handles client requests with a GraphQL API. GraphQL queries handle the reading queries of the CQRS pattern, and GraphQL mutations handle the writing commands.

Let’s check out the GraphQL schema:

```graphql title="schema/typedefs.ts"
...
type Task {
  id: String!
  text: String!
  done: Boolean!
}
type Query {
  list: [Task]
}
type Mutation {
  createTask(text: String!): Task
  updateTask(id: String!, text: String!, done: Boolean!): Task
  deleteTask(id: String!): String
}
...
```

Because GraphQL schemas require separate types for queries and mutations, we get one model for read access (`Query`) and one for write access (`Mutation`) out of the box.

The query resolvers use `ProjectionStore`, a TypeScript class that works as a client to Upstash Redis.

The mutation resolvers use `EventStore`, a TypeScript class that works as a client to Upstash Kafka.

The `EventStore` will call a `Notifier` when it writes a new event to Upstash Kafka. This `Notifier` is a TypeScript class that acts as a client to QStash.

Our stores are just abstractions over Upstash services.

The second function gets notified by QStash when new events are stored in Upstash Kafka. It will read the new events via `EventStore` and process and write them via `ProjectionStore`. The system's clients never use this function; it just runs in the background.

### Implementing the Commands

We will start by implementing the commands of CQRS with GraphQL mutation resolvers. Let’s look at the following code:

```ts title="schema/resolvers.ts"
...
export const resolvers = {
  Query: {...},
  Mutation: {
    async createTask(_: unknown, input: CreateTaskInput) {
      const newTask = {
        ...input,
        id: String(Date.now() + Math.random()),
        done: false,
      }
      await eventStore.saveEvent({
        type: "create-task",
        data: newTask
      })
      return newTask
    },
    async updateTask(_: unknown, input: UpdateTaskInput) {
      await eventStore.saveEvent({
        type: "update-task",
        data: input
      })
      return { ...input }
    },
    async deleteTask(_: unknown, input: DeleteTaskInput) {
      await eventStore.saveEvent({
        type: "delete-task",
        data: { id: input.id, text: "", done: false },
      })
      return input.id
    },
  },
}
...
```

As we saw in the schema before, we need three resolvers:

1. `createTask` constructs a new task object, puts it into an event object, and sends it to the `EventStore`.
2. `updateTask` puts the resolver's input into a new event object and sends it to the `EventStore`.
3. `deleteTask` puts the `input.id` into a new event object and sends it to the `EventStore`.
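The exact `TaskEvent` type lives in the repository and is not shown in this article. A plausible shape, inferred from the resolvers above, might look like the following sketch. Treat every name except the three event type strings as an assumption; `NewTaskEvent` and `deleteEvent` are hypothetical helpers.

```typescript
// Assumed shapes, inferred from the resolvers; the real definitions may differ.
type Task = { id: string; text: string; done: boolean }
type TaskEventType = "create-task" | "update-task" | "delete-task"
type TaskEvent = {
  type: TaskEventType
  data: Task
  offset: number // position in the log, assigned by the store
}

// What a resolver can build before the event has been stored.
type NewTaskEvent = Omit<TaskEvent, "offset">

// Hypothetical helper that mirrors what deleteTask sends to the event store.
function deleteEvent(id: string): NewTaskEvent {
  return { type: "delete-task", data: { id, text: "", done: false } }
}

const sample = deleteEvent("42")
```

Keeping `offset` out of the input type makes it clear that the store, not the resolver, decides where an event lands in the log.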

Besides the task construction in `createTask`, there is not much logic in these resolvers. Everything is handed off to the `EventStore`. So, let’s look at the `saveEvent` method of our `EventStore`.

```ts title="utils/eventstore.ts"
...
async saveEvent(event: Omit<TaskEvent, "offset">) {
  await fetch(
    this.producerUrl + JSON.stringify(event),
    this.fetchConfig
  )
  await notify()
}
...
```

Nothing fancy here either; just sending a `GET` request with our new event to Upstash Kafka and calling `notify`. For completeness, let’s also look at the implementation of the `notify` function.

```ts title="utils/notificator.ts"
...
export const notify = async () => {
  const qstashClient = new Client({
    token: process.env.QSTASH_TOKEN!
  })
  await qstashClient.publish({
    topic: process.env.QSTASH_TOPIC!,
    body: String(Math.random()),
    headers: {
      content: "application/json",
    },
  })
}
...
```

Since QStash handles all the work, there is not much to see here either. While that might not be exciting, it shows how little code we must write to set up the whole thing.
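One detail of the `GET` request in `saveEvent` is worth a second look: the serialized event becomes part of the URL path. If a task text contains characters such as `?` or `/`, the URL could be misread. A defensive variant might percent-encode the message first. The sketch below assumes the producer URL has the form `<base>/produce/<topic>/`, which this article does not show; `buildProduceUrl` is a hypothetical helper.

```typescript
// Assumption: the producer URL has the form "<base>/produce/<topic>/<message>".
function buildProduceUrl(baseUrl: string, topic: string, payload: unknown): string {
  // Percent-encode the JSON so characters like "?" or "/" stay inside the path segment.
  const message = encodeURIComponent(JSON.stringify(payload))
  return baseUrl + "/produce/" + encodeURIComponent(topic) + "/" + message
}

const produceUrl = buildProduceUrl("https://example.invalid", "events", {
  type: "create-task",
  data: { id: "1", text: "Do the dishes?", done: false },
})
```

Whether the encoding is strictly required depends on how the REST endpoint parses the path, so consider this a precaution rather than a documented requirement.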

### Implementing the Query

There is just one query resolver, and it lists all the tasks.

```ts title="schema/resolvers.ts"
...
export const resolvers = {
  Query: {
    async list() {
      return await projectionStore.loadTasks()
    },
  },
  Mutation: {...},
}
...
```

The resolver calls `loadTasks` on the `projectionStore`, which handles the work. The `projectionStore` uses Upstash Redis to store the state calculated from all the events we put in Upstash Kafka. Let’s look at the `loadTasks` method.

```ts title="utils/projectionstore.ts"
...
async loadTasks(): Promise<Task[]> {
  const taskIds = await this.redisClient.smembers("tasks")
  const pipeline = this.redisClient.pipeline()
  for (let taskId of taskIds) pipeline.get(taskId)
  return await pipeline.exec<Task[]>()
}
...
```

First, we load the `taskIds`, which are stored in [a Redis Set](https://redis.io/docs/data-types/sets/). Then we use each ID to fetch the corresponding task. [A pipeline](/docs/redis/sdks/javascriptsdk/pipeline) ensures we send all get commands in one request to Upstash.
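A Redis `GET` on a missing key returns an empty result, so if the set and the task keys ever get out of sync, the list could contain empty entries. The following in-memory sketch shows the same two-step lookup with such entries filtered out. The stand-in data structures and the `loadTasksSketch` name are assumptions for illustration, not the repository's code.

```typescript
type Task = { id: string; text: string; done: boolean }

// In-memory stand-ins for the Redis Set "tasks" and the task keys (assumptions).
const taskIdSet: string[] = ["1", "2"]
const taskKeys: Record<string, Task | undefined> = {
  "1": { id: "1", text: "Do the dishes!", done: false },
  // "2" is listed in the set, but its key is missing.
}

// Step 1: read all IDs. Step 2: look up each ID in one batch.
function loadTasksSketch(): Task[] {
  const results = taskIdSet.map((id) => taskKeys[id] ?? null)
  // Drop lookups that came back empty instead of returning null entries.
  return results.filter((task): task is Task => task !== null)
}

const loaded = loadTasksSketch()
```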

### Implementing the Projection

Now that the GraphQL resolvers are ready, we must calculate our read state. We need to read all events from our `EventStore` and execute them on our `ProjectionStore`. This is done in a new serverless function that our `notify` function notifies via QStash.

The handler of our serverless function is quite simple:

```ts title="app/projection/route.ts"
...
export const POST = validate(async (request: Request) => {
  const offset = await projectionStore.loadOffset()
  const events = await eventStore.loadEvents(offset)
  await projectionStore.processEvents(events)
  return new Response()
})
...
```

If the request came from QStash, we load the last `offset` stored in the `projectionStore` to know which events aren’t processed yet. Then we load the latest events from the `eventStore` and feed them to the `projectionStore` so it can calculate the new state.

The function handler is wrapped in a `validate` function from our `notificator`, which ensures only QStash can call this endpoint.

```ts title="utils/notificator.ts"
...
export const validate = (handler: RequestHandler) => {
  const qstashReceiver = new Receiver({
    currentSigningKey: process.env.QSTASH_CURRENT_SIGNING_KEY!,
    nextSigningKey: process.env.QSTASH_NEXT_SIGNING_KEY!,
  })

  return async (request: Request) => {
    const isValid = await qstashReceiver.verify({
      signature: request.headers.get("upstash-signature")!,
      body: await request.text(),
    })

    if (!isValid) {
      return new Response("Unauthorized", {
        status: 401,
        statusText: "Unauthorized",
      })
    }

    return await handler(request)
  }
}
...
```

The `validate` function uses the QStash signing keys to check every request before calling the actual handler of an endpoint.
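There are two signing keys so that keys can be rotated without rejecting valid requests. Conceptually, a check can try the current key first and fall back to the next key. The sketch below illustrates only that fallback logic with a toy comparison in place of real signature verification; `SignatureCheck`, `verifyWithRotation`, and `toyCheck` are hypothetical names and do not describe how the QStash `Receiver` is implemented.

```typescript
// Hypothetical checker type: returns true if `signature` matches `body` under `key`.
type SignatureCheck = (signature: string, body: string, key: string) => boolean

// Try the current key first, then the next key, so requests keep working while keys rotate.
function verifyWithRotation(
  check: SignatureCheck,
  signature: string,
  body: string,
  currentKey: string,
  nextKey: string
): boolean {
  return check(signature, body, currentKey) || check(signature, body, nextKey)
}

// Toy check for demonstration only; real verification validates a signed token.
const toyCheck: SignatureCheck = (signature, body, key) => signature === key + ":" + body

const acceptsCurrent = verifyWithRotation(toyCheck, "k1:hello", "hello", "k1", "k2")
const acceptsNext = verifyWithRotation(toyCheck, "k2:hello", "hello", "k1", "k2")
const rejectsUnknown = verifyWithRotation(toyCheck, "k3:hello", "hello", "k1", "k2")
```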

The `loadEvents` method of our `EventStore` looks like this:

```ts title="utils/eventstore.ts"
...
async loadEvents(offset: number): Promise<TaskEvent[]> {
  const response = await fetch(this.fetchUrl, {
    ...this.fetchConfig,
    method: "POST",
    body: JSON.stringify({
      topic: process.env.UPSTASH_KAFKA_TOPIC,
      partition: 0,
      offset,
    }),
  })
  const messages: {value: string; offset: number }[] =
    await response.json()
  return messages.map(({ value, offset }) => {
    const event = JSON.parse(value)
    return { ...event, offset }
  })
}
...
```

The method expects an `offset` to ensure that only new events are loaded.

> Background: Every message in Kafka gets an offset number. This number is incremented with each new message. Consumers use these offsets to avoid processing a message twice, which is exactly what we want here.

We `fetch` all `messages` since the `offset` and extract the events from them.
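The offset logic can be tried out without a Kafka cluster. The following sketch uses an in-memory array as a stand-in for one topic partition; `partition` and `loadEventsFrom` are assumed names, and the filter only imitates the behavior of fetching from a given offset.

```typescript
// Assumed message shape, matching what loadEvents reads from the response.
type KafkaMessage = { value: string; offset: number }

// In-memory stand-in for one topic partition (assumption, not the Upstash API).
const partition: KafkaMessage[] = [
  { offset: 0, value: JSON.stringify({ type: "create-task", data: { id: "1" } }) },
  { offset: 1, value: JSON.stringify({ type: "update-task", data: { id: "1" } }) },
  { offset: 2, value: JSON.stringify({ type: "delete-task", data: { id: "1" } }) },
]

// Return every event at or after `fromOffset`, with its offset attached.
function loadEventsFrom(fromOffset: number): Array<{ type: string; offset: number }> {
  return partition
    .filter((message) => message.offset >= fromOffset)
    .map((message) => ({ ...JSON.parse(message.value), offset: message.offset }))
}

const unprocessed = loadEventsFrom(1)
```

Attaching the offset to each parsed event lets the projection remember how far it has read.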

Finally, the `processEvents` method of the `projectionStore` will calculate our state for the query resolver:

```ts title="utils/projectionstore.ts"
...
async processEvents(events: TaskEvent[]) {
  if (events.length === 0) return
  const pipeline = this.redisClient.pipeline()
  for (let event of events) {
    switch (event.type) {
      case "create-task":
        pipeline.set(event.data.id, event.data)
        pipeline.sadd("tasks", event.data.id)
        break
      case "update-task":
        pipeline.set(event.data.id, event.data)
        break
      case "delete-task":
        pipeline.srem("tasks", event.data.id)
        pipeline.del(event.data.id)
        break
    }
  }
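  // Remember the offset of the last event in this batch.
  const lastOffset = events[events.length - 1].offset
  pipeline.set("lastOffset", lastOffset)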
  await pipeline.exec()
}
...
```

Again, we use the pipeline feature of Upstash Redis to ensure we only send one request with all Redis commands.

Every event adds at least one command to the pipeline. The `create-task` and `delete-task` events also require an update to the Redis Set that stores our IDs, so each of them uses two commands. Finally, we add the last event's `offset` so our `projectionStore` remembers where it left off when the next notification arrives.

The pipeline ensures we only send one request to Upstash. Note that a plain `pipeline()` does not by itself guarantee atomic execution; to run the commands as a single isolated operation, the SDK can execute them [in a transaction](https://github.com/upstash/upstash-redis/blob/351d1078d535b0f2b45b03fa70bdcad832e9b0b0/pkg/redis.ts#L358) created with `multi()` instead. With a transaction, we can be sure the `lastOffset` only gets updated together with the state.
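Since a notification might arrive more than once, it helps if processing the same events twice does no harm. The sketch below shows one way to get there in memory: it tracks the next offset to process and skips anything older. The `Projection` type, the `applyEvents` function, and the choice to store the next offset rather than the last one are assumptions for illustration, not the repository's implementation.

```typescript
type Task = { id: string; text: string; done: boolean }
type TaskEvent = {
  type: "create-task" | "update-task" | "delete-task"
  data: Task
  offset: number
}
// In-memory stand-in for the projection store (assumption).
type Projection = { tasks: Record<string, Task>; nextOffset: number }

// Apply a batch of events and return a new projection without mutating the old one.
function applyEvents(projection: Projection, events: TaskEvent[]): Projection {
  const tasks = { ...projection.tasks }
  let nextOffset = projection.nextOffset
  for (const entry of events) {
    if (entry.offset < nextOffset) continue // already processed, skip
    if (entry.type === "delete-task") delete tasks[entry.data.id]
    else tasks[entry.data.id] = entry.data
    nextOffset = entry.offset + 1
  }
  return { tasks, nextOffset }
}

const batch: TaskEvent[] = [
  { type: "create-task", data: { id: "1", text: "Do the dishes!", done: false }, offset: 0 },
  { type: "update-task", data: { id: "1", text: "Do the dishes!", done: true }, offset: 1 },
]
const once = applyEvents({ tasks: {}, nextOffset: 0 }, batch)
// Delivering the same batch again changes nothing because of the offset check.
const twice = applyEvents(once, batch)
```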

## Functional Steps

[Make sure you cloned the whole example from GitHub](https://github.com/kay-is/upstash-kafka-cqrs) before moving on.

In the next steps, we will set up the infrastructure for our system. We will use Upstash services, so you need an Upstash account for this, but we only use the free tier here.

We will write all Upstash credentials into a `.env.local` file, so they are available in the environment variables of our development machine. Here is a template for this file:

```shell title=".env.local"
UPSTASH_REDIS_REST_URL=...
UPSTASH_REDIS_REST_TOKEN=...

UPSTASH_KAFKA_URL=...
UPSTASH_KAFKA_TOPIC=...
UPSTASH_KAFKA_USERNAME=...
UPSTASH_KAFKA_PASSWORD=...

QSTASH_URL=...
QSTASH_TOPIC=...
QSTASH_TOKEN=...
QSTASH_CURRENT_SIGNING_KEY=...
QSTASH_NEXT_SIGNING_KEY=...
```
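A missing or placeholder value in this file tends to surface later as a confusing runtime error. A small check at startup can catch that early. The sketch below is a hypothetical helper, not part of the repository; it takes the environment as a plain object so it can be tried without a real `.env.local` file.

```typescript
// Names taken from the .env.local template above.
const requiredVars: string[] = [
  "UPSTASH_REDIS_REST_URL",
  "UPSTASH_REDIS_REST_TOKEN",
  "UPSTASH_KAFKA_URL",
  "UPSTASH_KAFKA_TOPIC",
  "UPSTASH_KAFKA_USERNAME",
  "UPSTASH_KAFKA_PASSWORD",
  "QSTASH_URL",
  "QSTASH_TOPIC",
  "QSTASH_TOKEN",
  "QSTASH_CURRENT_SIGNING_KEY",
  "QSTASH_NEXT_SIGNING_KEY",
]

// Return the names that are missing or still set to the "..." placeholder.
function missingVars(env: Record<string, string | undefined>): string[] {
  return requiredVars.filter((key) => !env[key] || env[key] === "...")
}

// In a Node.js process you would pass `process.env` instead of this sample object.
const sampleEnv: Record<string, string | undefined> = {
  UPSTASH_KAFKA_TOPIC: "events",
  QSTASH_TOPIC: "events",
  QSTASH_URL: "...",
}
const stillMissing = missingVars(sampleEnv)
```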

### Creating an Upstash Redis Database

1. Go to [the Upstash console](https://console.upstash.com/)
2. Click the “Create Database” button
3. Fill out the form shown in figure 2
4. Click “Create”

![Figure 2: Create Database](https://dev-to-uploads.s3.amazonaws.com/uploads/articles/257znciwmil5ir2xts7i.png)
_Figure 2: Create Database_

After creating the database, you can scroll down to the “REST API” section and copy the `UPSTASH_REDIS_REST_URL` and `UPSTASH_REDIS_REST_TOKEN` to the `.env.local` file.

### Creating an Upstash Kafka Database

1. Open [the Upstash Kafka Console](https://console.upstash.com/kafka)
2. Click the “Create Cluster” button
3. Fill out the form shown in figure 3
4. Click “Create Cluster”
5. Fill out the form shown in figure 4
6. Click “Create Topic”

![Figure 3: Create cluster](https://dev-to-uploads.s3.amazonaws.com/uploads/articles/hvfdadrgrogy5c0h22ln.png)
_Figure 3: Create cluster_

![Figure 4: Create topic](https://dev-to-uploads.s3.amazonaws.com/uploads/articles/i7rqtsrk0bdf6aoz2sf4.png)
_Figure 4: Create topic_

After you have created the cluster and the topic, scroll to the “REST API” section and copy the `UPSTASH_KAFKA_URL`, `UPSTASH_KAFKA_USERNAME`, and `UPSTASH_KAFKA_PASSWORD` to your `.env.local` file. Also, add the topic name “events” as `UPSTASH_KAFKA_TOPIC` to this file.

### Creating an Upstash QStash Queue

1. Open [the QStash Console](https://console.upstash.com/qstash)
2. Navigate to the “Topics” tab
3. Click the “Create Topic” button
4. Enter “events” as the new topic's name
5. Click “Create”
6. Select the “Add new URL” input
7. Enter your projection URL as shown in figure 5
8. Click “Add”

![Figure 5: Add new topic URL](https://dev-to-uploads.s3.amazonaws.com/uploads/articles/5gfy8ghh1wag2ohj43dc.png)
_Figure 5: Add new topic URL_

Copy the `QSTASH_URL`, `QSTASH_TOKEN`, `QSTASH_CURRENT_SIGNING_KEY`, and `QSTASH_NEXT_SIGNING_KEY` from the “Details” tab (seen in figure 6) to your `.env.local` file. Also, add `QSTASH_TOPIC` with “events” to the file.

![Figure 6: QStash credentials](https://dev-to-uploads.s3.amazonaws.com/uploads/articles/2q9ogxyp17hzbqk7q3l8.png)
_Figure 6: QStash credentials_

## Testing the System

Now that we have implemented and configured everything, we can test the system with Apollo’s GraphQL UI, which Next.js automatically starts on the `/graphql` endpoint.

Start the server with the following commands:

    $ npm i
    $ npm run dev

If you’re running on GitHub Codespaces, you must make the server's port public, as seen in figure 7, so the Apollo GraphQL UI can read the schema from the server.

![Figure 7: Port configuration](https://dev-to-uploads.s3.amazonaws.com/uploads/articles/ubmscbjo15y3yw824rp5.png)
_Figure 7: Port configuration_

### Creating a New Task

Command:

```graphql
mutation {
  createTask(text: "Do the dishes!") {
    id
    text
    done
  }
}
```

Console Output:

```shell
[EventStore] Saving Event...
[EventStore] Event saved!
[EventStore] Notifying other services...
[Notifier] Sending notification...
[Notifier] Notification sent!
[EventStore] Services notified!
[Notifier] Validating request...
[Notifier] Request valid!
[ProjectionHandler] Received notification!
[ProjectionStore] Loading offset...
[ProjectionStore] No offset found! Starting from 1.
[EventStore] Loading events at offset 1
[EventStore] 1 events loaded!
[ProjectionStore] Processing events...
[ProjectionStore] Creating new task with ID: 1679311303901.0488
[ProjectionStore] Upading offset...
[ProjectionStore] Events processed!
[ProjectionHandler] Done!
```


### Listing all Tasks

Query:

```graphql
query {
  list {
    id
    text
    done
  }
}
```

Console Output:

```shell
[ProjectionStore] Loading tasks...
[ProjectionStore] 1 tasks loaded!
```


### Updating an Existing Task

Command:

```graphql
mutation {
  updateTask(id: "...", text: "Do the dishes!", done: true) {
    id
    text
    done
  }
}
```

Console Output:

```shell
[EventStore] Saving Event...
[EventStore] Event saved!
[EventStore] Notifying other services...
[Notifier] Sending notification...
[Notifier] Notification sent!
[EventStore] Services notified!
[Notifier] Validating request...
[Notifier] Request valid!
[ProjectionHandler] Received notification!
[ProjectionStore] Loading offset...
[ProjectionStore] Offset at 2
[EventStore] Loading events at offset 2
[EventStore] 1 events loaded!
[ProjectionStore] Processing events...
[ProjectionStore] Updating task with ID: 1679311303901.0488
[ProjectionStore] Upading offset...
[ProjectionStore] Events processed!
[ProjectionHandler] Done!
```

### Deleting a Task

Command:

```graphql
mutation {
  deleteTask(id: "...")
}
```

Console Output:

```shell
[EventStore] Saving Event...
[EventStore] Event saved!
[EventStore] Notifying other services...
[Notifier] Sending notification...
[Notifier] Notification sent!
[EventStore] Services notified!
[Notifier] Validating request...
[Notifier] Request valid!
[ProjectionHandler] Received notification!
[ProjectionStore] Loading offset...
[ProjectionStore] Offset at 3
[EventStore] Loading events at offset 3
[EventStore] 1 events loaded!
[ProjectionStore] Processing events...
[ProjectionStore] Deleting task with ID: 1679311303901.0488
[ProjectionStore] Upading offset...
[ProjectionStore] Events processed!
[ProjectionHandler] Done!
```

## Summary

Applying the CQRS and event sourcing patterns to serverless computing can help you structure your backends. It makes them more flexible in terms of data modeling and scalability.

Upstash services are ideally suited for this task; they bring all you need. You can store your events in Upstash Kafka, your projected state in Upstash Redis, and let QStash ensure all your services know when updates happen.

## Next Steps

The resolvers all run in one function, so you can't scale them independently. But since resolvers are asynchronous by nature, you could move the actual work into separate API endpoints, so the resolvers just have to redirect. This way, you can scale each resolver as required.

## Additional Resources

[You can find the full code repository on GitHub.](https://github.com/kay-is/upstash-kafka-cqrs)