# Using Server-Sent Events (SSE) to stream LLM responses in Next.js

> **Source:** https://upstash.com/blog/sse-streaming-llm-responses
> **Date:** 2024-05-06
> **Author(s):** Rishi Raj Jain
> **Reading time:** 16 min read
> **Tags:** nextjs, redis, vercel, streams, openai
> **Format:** text/markdown — machine-readable content for agents and LLMs

---

AI-powered applications that respond quickly enhance the user experience by making them feel heard and seen, instantly. With streaming, you can create such chatbots that instantly address queries or deliver personalized recommendations in real-time. It's not just about speed—it's about meeting user needs promptly, enhancing its perceived worth.

Streaming involves sending data in small, continuous chunks rather than in one large block. In the context of AI applications, like chatbots or recommendation systems, streaming means sending partial responses to users as soon as they're available, instead of waiting to compile and deliver the entire response at once. This approach ensures that users receive immediate feedback or information, creating a more dynamic and responsive user experience.

In this guide, you will learn how to leverage Server-Sent Events (SSE) to implement streaming in Next.js endpoints with LangChain and OpenAI's language models. Further, you will learn how to cache the streaming responses with Upstash.

## Prerequisites

You will need the following:

- [Node.js 18](https://nodejs.org/en/blog/announcements/v18-release-announce/) or later
- An [OpenAI](https://platform.openai.com/api-keys) account
- An [Upstash](https://upstash.com/) account
- A [Vercel](https://vercel.com/) account

## Tech Stack

| Technology | Description |
| --- | --- |
| [Upstash](https://upstash.com) | Serverless database platform. You are going to use Upstash Redis to cache OpenAI API responses. |
| [OpenAI](https://platform.openai.com) | An artificial intelligence research lab focused on developing advanced AI technologies. |
| [LangChain](https://js.langchain.com) | Framework for developing applications powered by language models. |
| [Next.js](https://nextjs.org) | The React Framework for the Web. |
| [Vercel](https://vercel.com) | A cloud platform for deploying and scaling web applications. |

## Generate an OpenAI Token

Using OpenAI API, you are able to create chatbot responses using AI. Any request to OpenAI API requires an authorization token. To obtain the token, navigate to the [API Keys](https://platform.openai.com/api-keys) in your OpenAI account, and click the **Create new secret key** button. Copy and securely store this token for later use as **OPENAI_API_KEY** environment variable.

## Setting up Upstash Redis

Once you have created an [Upstash account](https://console.upstash.com) and are logged in you are going to go to the Redis tab and create a database.

![Upstash Redis Tab](/blog/rag-chatbot-upstash/redis-tab.png)

![Create An Upstash Redis Instance](/blog/rag-chatbot-upstash/redis-create.png)

Scroll down until you find the REST API section and select the `.env` button. Copy the content and save it somewhere safe.

![Upstash Redis Rest API](/blog/rag-chatbot-upstash/redis-rest.png)

## Create a new Next.js application

Let’s get started by creating a new Next.js project. Open your terminal and run the following command:

```bash
npx create-next-app@latest my-app
```

When prompted, choose:

- `Yes` when prompted to use TypeScript.
- `No` when prompted to use ESLint.
- `Yes` when prompted to use Tailwind CSS.
- `No` when prompted to use `src/` directory.
- `Yes` when prompted to use App Router.
- `No` when prompted to customize the default import alias (`@/*`).

Once that is done, move into the project directory and start the app in development mode by executing the following command:

```bash
cd my-app
npm run dev
```

The app should be running on [localhost:3000](http://localhost:3000). Stop the development server to install LangChain dependencies in the application by executing the following command:

```bash
npm install @langchain/openai @langchain/community
```

The command installed the following library:

- `@langchain/openai`: A LangChain package to interface with the OpenAI series of models.
- `@langchain/community`: A collection of third party integrations for plug-n-play with LangChain core.

Now, create a `.env` file at the root of your project. You are going to add the `OPENAI_API_KEY` you obtained earlier.

It should look something like this:

```bash
# .env

# OpenAI Environment Variable
OPENAI_API_KEY="..."

# Upstash Redis Environment Variables
UPSTASH_REDIS_REST_URL="..."
UPSTASH_REDIS_REST_TOKEN="..."
```

To create API endpoints in Next.js, you will use Next.js [Route Handlers](https://nextjs.org/docs/app/building-your-application/routing/route-handlers) which allow you to serve responses over Web [Request](https://developer.mozilla.org/docs/Web/API/Request) and [Response](https://developer.mozilla.org/en-US/docs/Web/API/Response) APIs. To start creating an API route in Next.js that streams responses to the user, execute the following command:

```bash
mkdir -p app/api/stream/completion/chat
mkdir app/lib
```

  The `-p` flag creates parent directories for the `chat` directory if they're missing.

This sets up our Next.js project. Now, let's move on to creating a cache handler with Upstash.

## Instantiate an Upstash Redis Cache Client

With `UpstashRedisCache` class, you can initialize a Redis cache client for caching OpenAI API responses within a few lines of code. Create a file `upstashCache.tsx` inside the `app/lib` directory:

```tsx
// File: app/lib/upstashCache.tsx

import { UpstashRedisCache } from "@langchain/community/caches/upstash_redis";

// See https://docs.upstash.com/redis/howto/connectwithupstashredis#quick-start for connection options
export default new UpstashRedisCache({
  config: {
    url: process.env.UPSTASH_REDIS_REST_URL,
    token: process.env.UPSTASH_REDIS_REST_TOKEN,
  },
});
```

Using the cache handler, it becomes super easy to cache the conversations or completion responses generated by OpenAI API. Let's move on to understand how you can start using Server-Sent Events in Next.js.

## Creating Server-Sent Events API in Next.js App Router

[Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) (SSE) allow you to deliver real-time data updates from the server to the client without the need for continuous polling. SSE enables a unidirectional flow of data over a single, long-lived HTTP connection.

Create a file named `route.ts` with the following code in the `app/api/stream` directory to understand the minimal setup required to stream responses using Server-Sent Events:

```jsx
// File: app/api/stream/route.ts

// Prevents this route's response from being cached on Vercel
export const dynamic = "force-dynamic";

export async function POST(request: Request) {
  const encoder = new TextEncoder()
  // Create a streaming response
  const customReadable = new ReadableStream({
    start(controller) {
      const message = "A sample message."
      controller.enqueue(encoder.encode(`data: ${message}\n\n`))
    },
  })
  // Return the stream response and keep the connection alive
  return new Response(customReadable, {
    // Set the headers for Server-Sent Events (SSE)
    headers: {
      Connection: "keep-alive",
      "Content-Encoding": "none",
      "Cache-Control": "no-cache, no-transform",
      "Content-Type": "text/event-stream; charset=utf-8",
    },
  })
}
```

The code above does the following:

- The dynamic constant is set to `force-dynamic`, preventing caching of responses on the Vercel platform. This ensures that each request for the SSE stream gets fresh data and is not served from cache.
- A `ReadableStream` is created to generate a stream of data to be sent to the client. In the start method of the stream, a message is encoded and enqueued into the stream's controller. This message will be sent to the client as part of the SSE stream.
- The `Response` object is created with headers specific to Server-Sent Events. These headers include the following:
  - `Connection: keep-alive` to maintain the connection open for streaming.
  - `Cache-Control: no-cache, no-transform` to prevent caching in browser.
  - `Content-Type: text/event-stream; charset=utf-8` to specify the content type as Server-Sent Events.
- The Response object containing the custom ReadableStream is returned from the endpoint. This response will be sent to clients requesting the SSE stream.

With this, you have created an endpoint that is able to stream responses. In the next sections, you will understand how you can stream [Completion](https://platform.openai.com/docs/guides/text-generation/completions-api) and [Chat Completion](https://platform.openai.com/docs/guides/text-generation/chat-completions-api) API responses from OpenAI with LangChain callbacks.

## Streaming OpenAI Completion API responses with LangChain Callbacks

The OpenAI [Completion API](https://platform.openai.com/docs/guides/text-generation/completions-api) provides a powerful tool for generating text based on given prompt(s), useful in various real-life scenarios such as content generation and language translation. Integrating LangChain callbacks enables real-time streaming of responses, increasing perceived responsiveness in applications.

Create a file named `completionModel.tsx` in `app/lib` directory, defining a function which initializes an OpenAI instance with streaming enabled and LangChain callbacks, generating text in real-time.

```tsx
// File: app/lib/completionModel.tsx

import { OpenAI } from "@langchain/openai";

export const completionModel = (
  controller: ReadableStreamDefaultController,
  encoder: TextEncoder
) =>
  new OpenAI({
    streaming: true,
    callbacks: [
      {
        handleLLMNewToken(token) {
          controller.enqueue(encoder.encode(`data: ${token}\n\n`));
        },
        async handleLLMEnd(output) {
          controller.close();
        },
      },
    ],
  });
```

The code above does the following:

- Imports the `OpenAI` class from the `@langchain/openai` package.
- Exports a function named `completionModel` which takes two parameters:
  - controller: A `ReadableStreamDefaultController` object which allows you to enqueue messages into the stream.
  - encoder: A `TextEncoder` object that encodes the messages into `Uint8Array` object.
- Creates a new instance of the `OpenAI` class with the following configuration options:
  - `streaming`: Set to true, indicating that the API should return streaming responses.
  - `callbacks`: An array containing a single object with two callback functions, used to intercept response creation by LLM at each stage.
- `handleLLMNewToken`: A callback invoked when a new token is generated by the language model. It enqueues the encoded token to the controller.
- `handleLLMEnd`: An asynchronous callback invoked when the language model generation is complete. It closes the controller.

Further, create a file named `route.ts` with the following code in the `app/api/stream/completion` directory. The changes from the initial streaming route handler that you created have been higlighted below:

```diff
// File: app/api/stream/completion/route.ts

// Prevents this route's response from being cached on Vercel
export const dynamic = "force-dynamic";

+ import { completionModel } from "@/app/lib/completionModel";

export async function POST(request: Request) {
+  // Obtain the user message from request's body
+  const { message } = await request.json();
  const encoder = new TextEncoder();
  // Create a streaming response
  const customReadable = new ReadableStream({
    async start(controller) {
+      // Generate a streaming response from OpenAI with LangChain
+      await completionModel(controller, encoder).invoke(message);
    },
  });
  // Return the stream response and keep the connection alive
  return new Response(customReadable, {
    // Set the headers for Server-Sent Events (SSE)
    headers: {
      Connection: "keep-alive",
      "Content-Encoding": "none",
      "Cache-Control": "no-cache, no-transform",
      "Content-Type": "text/event-stream; charset=utf-8",
    },
  });
}
```

Apart from what you did earlier to create a streaming endpoint in Next.js, the code additions above do the following:

- Imports the `completionModel` function.
- Extracts user message from the request body using `request.json()`.
- Inside the `start` method of the ReadableStream, the `completionModel` function is invoked asynchronously with the message obtained from the request. This function generates a streaming response from OpenAI with LangChain.

To cache user queries and their responses with Upstash, import the Upstash Redis cache client into the `completionModel` function as follows:

```diff
import { OpenAI } from "@langchain/openai";
+ import cache from '@/app/lib/upstashCache';

export const completionModel = (
  controller: ReadableStreamDefaultController,
  encoder: TextEncoder
) =>
  new OpenAI({
+   cache,
    streaming: true,
    temperature: 0.9,
    callbacks: [
      {
        handleLLMNewToken(token) {
          controller.enqueue(encoder.encode(token));
        },
        async handleLLMEnd(output) {
          console.log(output.generations[0][0].text);
          controller.close();
        },
      },
    ],
  });
```

With minimal changes, you have enabled caching of the OpenAI Completion API responses with Upstash.

Now, let's move on to learning how OpenAI Chat Completion API responses can be streamed in Next.js endpoints.

## Streaming OpenAI Chat Completion API responses with LangChain Callbacks

The OpenAI [Chat Completion API](https://platform.openai.com/docs/guides/text-generation/chat-completions-api) provides a powerful tool for generating real-time conversational responses based on provided messages. The applications span from building chatbots capable of engaging in natural dialogues to enhancing customer support systems with AI-driven interaction capabilities. It enables developers to create dynamic and responsive conversational interfaces.

Create a file named `chatCompletionModel.tsx` in `app/lib` directory, defining a `chatCompletionModel` function which creates a streaming interface for OpenAI's Chat Completion API with LangChain callbacks, facilitating real-time conversation generation based on provided messages.

```tsx
// File: app/lib/chatCompletionModel.tsx

import { ChatOpenAI } from "@langchain/openai";

export type ConversationMessage = {
  role: string;
  content: string;
};

export type ConversationMessages = ConversationMessage[];

export const chatCompletionModel = (
  controller: ReadableStreamDefaultController,
  encoder: TextEncoder
) =>
  new ChatOpenAI({
    streaming: true,
    callbacks: [
      {
        handleLLMNewToken(token) {
          controller.enqueue(encoder.encode(`data: ${token}\n\n`));
        },
        async handleLLMEnd(output) {
          controller.close();
        },
      },
    ],
  });
```

The code above does the following:

- Imports the `ChatOpenAI` class from the `@langchain/openai` package.
- Creates the following type definitions:
  - `ConversationMessage`: Defines a type for a single message in a conversation. It contains two properties: **role** (indicating the role of the sender) and **content** (the message content).
  - `ConversationMessages`: Defines a type for an array of conversation messages.
- Exports a function named `chatCompletionModel` which takes two parameters:
  - controller: A `ReadableStreamDefaultController` object which allows you to enqueue messages into the stream.
  - encoder: A `TextEncoder` object that encodes the messages into `Uint8Array` object.
- Creates a new instance of the `ChatOpenAI` class with the following configuration options:
  - `streaming`: Set to true, indicating that the API should return streaming responses.
  - `callbacks`: An array containing a single object with two callback functions, used to intercept response creation by LLM at each stage.
- `handleLLMNewToken`: A callback invoked when a new token is generated by the language model. It enqueues the encoded token to the controller.
- `handleLLMEnd`: An asynchronous callback invoked when the language model generation is complete. It closes the controller.

Further, create a file named `route.ts` with the following code in the `app/api/stream/completion/chat` directory. The changes from the initial streaming route handler that you created have been higlighted below:

```diff
// File: app/api/stream/completion/chat/route.ts

// Prevents this route's response from being cached on Vercel
export const dynamic = "force-dynamic";

+ import {
+   type ConversationMessage,
+   chatCompletionModel,
+ } from "@/app/lib/chatCompletionModel";

export async function POST(request: Request) {
+   // Obtain the conversation messages from request's body
+   const { messages = [] } = await request.json();
  const encoder = new TextEncoder();
  // Create a streaming response
  const customReadable = new ReadableStream({
    async start(controller) {
+       // Generate a streaming response from OpenAI with LangChain
+       await chatCompletionModel(controller, encoder).invoke(
+         messages.map((i: ConversationMessage) => [i["role"], i["content"]])
+       );
    },
  });
  // Return the stream response and keep the connection alive
  return new Response(customReadable, {
    // Set the headers for Server-Sent Events (SSE)
    headers: {
      Connection: "keep-alive",
      "Content-Encoding": "none",
      "Cache-Control": "no-cache, no-transform",
      "Content-Type": "text/event-stream; charset=utf-8",
    },
  });
}
```

Apart from what you did earlier to create a streaming endpoint in Next.js, the code additions above do the following:

- Imports the `chatCompletionModel` function and `ConversationMessage` type definition.
- Extracts the messages in conversation from the request body using `request.json()`.
- Inside the `start` method of the ReadableStream, the `chatCompletionModel` function is invoked asynchronously after transformed messages array. Each message is transformed into an array containing the role and content of the message. This function generates a streaming response from OpenAI with LangChain.

To cache the chat history with Upstash, import the Upstash Redis cache client into the `chatCompletionModel` function as follows:

```diff
+ import cache from '@/app/lib/upstashCache';
import { ChatOpenAI } from "@langchain/openai";

export type ConversationMessage = {
  role: string;
  content: string;
};

export type ConversationMessages = ConversationMessage[];

export const chatCompletionModel = (
  controller: ReadableStreamDefaultController,
  encoder: TextEncoder
) =>
  new ChatOpenAI({
+   cache,
    streaming: true,
    temperature: 0.9,
    callbacks: [
      {
        handleLLMNewToken(token) {
          controller.enqueue(encoder.encode(token));
        },
        async handleLLMEnd(output) {
          console.log(output.generations[0][0].text);
          controller.close();
        },
      },
    ],
  });
```

With minimal changes, you have enabled caching of the entire chat history with Upstash.

Now, let's move on to creating the client side React components to consume the streaming endpoints.

## Setup Next.js frontend to listen to Server-Sent Events

In this section, you will learn how to setup a minimal listener to Server-Sent Events API messages, and an approach to manage the state of messages exchanged between user and AI.

### Listening to Server-Sent Events API in React

To listen to SSE API in our client-side component in React, you are going to initiate a POST request to the specified API route. Upon receiving a response, you would decode the incoming data as a string using `TextDecoderStream` method, and continuously read the data from stream.

```tsx
"use client";

export default function () {

  const obtainAPIResponse = async (apiRoute: string, apiData: any) => {
    // Initiate the first call to connect to SSE API
    const apiResponse = await fetch(apiRoute, {
      method: "POST",
      headers: {
        "Content-Type": "text/event-stream",
      },
      body: JSON.stringify(apiData),
    });

    if (!apiResponse.body) return;

    // To decode incoming data as a string
    const reader = apiResponse.body
      .pipeThrough(new TextDecoderStream())
      .getReader();

    while (true) {
      const { value, done } = await reader.read();
      if (done) {
        break;
      }
      if (value) {
        // Do something
      }
    }
  };

  return <></>
}
```

### Managing state of the conversation in React

To manage the state of the conversation between the user and chatbot, you can use state variables. You would update the state variables `messages` and `latestMessage` to store the conversation history and the latest message, respectively. By updating these state variables within the loop that processes incoming data from the SSE API, you can ensure real-time updates of the conversation display. The code additions to achieve the same are as follows:

```diff
"use client";

+ import { useEffect, useState } from "react";
+ import { ConversationMessages } from "@/app/lib/chatCompletionModel";

export default function () {
+  const [latestMessage, setLatestMessage] = useState<string>("");
+  const [messages, setMessages] = useState<ConversationMessages>([]);

  const obtainAPIResponse = async (apiRoute: string, apiData: any) => {
    // Initiate the first call to connect to SSE API
    const apiResponse = await fetch(apiRoute, {
      method: "POST",
      headers: {
        "Content-Type": "text/event-stream",
      },
      body: JSON.stringify(apiData),
    });

    if (!apiResponse.body) return;

    // To decode incoming data as a string
    const reader = apiResponse.body
      .pipeThrough(new TextDecoderStream())
      .getReader();

+    let incomingMessage = "";

    while (true) {
      const { value, done } = await reader.read();
      if (done) {
+        // Insert the response received into the messages state
+        setMessages((messages) => [
+          ...messages,
+          { role: "assistant", content: incomingMessage },
+        ]);
+        // Reset the latest message's state received 
+        setLatestMessage("");
        break;
      }
      if (value) {
+        // Append the incoming data to latest message's value
+        incomingMessage += value;
+        setLatestMessage(incomingMessage);
      }
    }
  };

  return <></>
}
```

The code additions above do the following:

- A state variable is declared to store the incoming data stream received from the SSE API.
- Upon receiving a complete message (indicated by `done` being true), the latest received message is added to the `messages` state array. This message is formatted with a role of **assistant** to distinguish it from user messages. Also, the latest message state is reset to an empty string to prepare for the next incoming message.
- As data is received incrementally (not yet forming a complete message), it is appended to the `incomingMessage` variable.
- The latest message state is updated with the concatenated incoming data, ensuring real-time updates as new data arrives.

Great! With the reactive `latestMessage` and `messages` variables, you can now create dynamic user interfaces representing the messages exchanged in the conversation and the latest AI generated response.

> That was a lot of learning! You’re all done now ✨

## Deploy to Vercel

The repository is now ready to deploy to Vercel. Use the following steps to deploy:

- Start by creating a GitHub repository containing your app's code.
- Then, navigate to the Vercel Dashboard and create a **New Project**.
- Link the new project to the GitHub repository you have just created.
- In **Settings**, update the **Environment Variables** to match those in your local `.env` file.
- Deploy! 🚀

## More Information

For more detailed insights, explore the references cited in this post.

- [GitHub Repository](https://github.com/rishi-raj-jain/sse-streaming-llm-response)
- [Server Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events)
- [Next.js Streaming](https://vercel.com/docs/functions/streaming/quickstart)
- [Caching layer for LLMs](https://js.langchain.com/docs/modules/model_io/llms/llm_caching#caching-with-upstash-redis)