Using Server-Sent Events (SSE) to stream LLM responses in Next.js

Rishi Raj Jain, Founder of LaunchFa.st (Guest Author)

AI-powered applications that respond quickly make users feel heard right away. With streaming, you can build chatbots that start answering queries or delivering personalized recommendations in real time. It's not just about speed: responding promptly also raises the perceived value of your application.

Streaming involves sending data in small, continuous chunks rather than in one large block. In the context of AI applications, like chatbots or recommendation systems, streaming means sending partial responses to users as soon as they're available, instead of waiting to compile and deliver the entire response at once. This approach ensures that users receive immediate feedback or information, creating a more dynamic and responsive user experience.

In this guide, you will learn how to leverage Server-Sent Events (SSE) to implement streaming in Next.js endpoints with LangChain and OpenAI's language models. Further, you will learn how to cache the streaming responses with Upstash.

Prerequisites

You will need the following:

Tech Stack

Technology | Description
Upstash | Serverless database platform. You are going to use Upstash Redis to cache OpenAI API responses.
OpenAI | An artificial intelligence research lab focused on developing advanced AI technologies.
LangChain | Framework for developing applications powered by language models.
Next.js | The React Framework for the Web.
Vercel | A cloud platform for deploying and scaling web applications.

Generate an OpenAI Token

The OpenAI API lets you generate chatbot responses with AI. Every request to the OpenAI API requires an authorization token. To obtain one, navigate to API Keys in your OpenAI account and click the Create new secret key button. Copy the token and store it securely; you will use it later as the OPENAI_API_KEY environment variable.

Setting up Upstash Redis

Once you have created an Upstash account and logged in, go to the Redis tab and create a database.

[Image: Upstash Redis tab]

[Image: Create an Upstash Redis instance]

Scroll down until you find the REST API section and select the .env button. Copy the content and save it somewhere safe.

[Image: Upstash Redis REST API section]

Create a new Next.js application

Let’s get started by creating a new Next.js project. Open your terminal and run the following command:

npx create-next-app@latest my-app

When prompted, choose:

  • Yes when prompted to use TypeScript.
  • No when prompted to use ESLint.
  • Yes when prompted to use Tailwind CSS.
  • No when prompted to use src/ directory.
  • Yes when prompted to use App Router.
  • No when prompted to customize the default import alias (@/*).

Once that is done, move into the project directory and start the app in development mode by executing the following command:

cd my-app
npm run dev

The app should be running on localhost:3000. Stop the development server to install LangChain dependencies in the application by executing the following command:

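npm install @langchain/openai @langchain/community @upstash/redis

The command installs the following libraries:

  • @langchain/openai: A LangChain package to interface with the OpenAI series of models.
  • @langchain/community: A collection of third-party integrations that plug into LangChain core.
  • @upstash/redis: The Upstash Redis REST client, which the Upstash cache integration in @langchain/community uses to talk to your database.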

Now, create a .env file at the root of your project. Add the OPENAI_API_KEY and the Upstash Redis REST credentials you obtained earlier.

It should look something like this:

# .env
 
# OpenAI Environment Variable
OPENAI_API_KEY="..."
 
# Upstash Redis Environment Variables
UPSTASH_REDIS_REST_URL="..."
UPSTASH_REDIS_REST_TOKEN="..."
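A missing or empty variable usually only surfaces later as a confusing runtime error, so it can help to check the configuration up front. The following is a minimal sketch, not part of the original setup; requireEnv and missingEnvVars are hypothetical helper names, and the environment is passed in as a plain object so the functions stay easy to test.

```typescript
// Hypothetical helpers (not part of the guide's code): fail fast when a
// required environment variable is missing or empty.
type Env = Record<string, string | undefined>;

// The three variables defined in this guide's .env file.
const REQUIRED_ENV_VARS = [
  "OPENAI_API_KEY",
  "UPSTASH_REDIS_REST_URL",
  "UPSTASH_REDIS_REST_TOKEN",
] as const;

// True when the variable is absent or contains only whitespace.
function isBlank(value: string | undefined): boolean {
  return value === undefined || value.trim() === "";
}

// Return the value of one variable, or throw a descriptive error.
function requireEnv(name: string, env: Env): string {
  const value = env[name];
  if (value === undefined || value.trim() === "") {
    throw new Error(`Missing required environment variable: ${name}`);
  }
  return value;
}

// List every required variable that is not set, for a single error report.
function missingEnvVars(env: Env): string[] {
  return REQUIRED_ENV_VARS.filter((name) => isBlank(env[name]));
}
```

In a server-side file, this could be called as requireEnv("OPENAI_API_KEY", process.env) before any client is constructed.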

To create API endpoints in Next.js, you will use Next.js Route Handlers which allow you to serve responses over Web Request and Response APIs. To start creating an API route in Next.js that streams responses to the user, execute the following command:

mkdir -p app/api/stream/completion/chat
mkdir app/lib

The -p flag creates parent directories for the chat directory if they're missing.

This sets up our Next.js project. Now, let's move on to creating a cache handler with Upstash.

Instantiate an Upstash Redis Cache Client

With the UpstashRedisCache class, you can initialize a Redis cache client for caching OpenAI API responses in a few lines of code. Create a file named upstashCache.tsx inside the app/lib directory:

// File: app/lib/upstashCache.tsx
 
import { UpstashRedisCache } from "@langchain/community/caches/upstash_redis";
 
// See https://docs.upstash.com/redis/howto/connectwithupstashredis#quick-start for connection options
export default new UpstashRedisCache({
  config: {
    url: process.env.UPSTASH_REDIS_REST_URL,
    token: process.env.UPSTASH_REDIS_REST_TOKEN,
  },
});

With this cache handler, it becomes easy to cache the conversations or completion responses generated by the OpenAI API. Next, let's look at how to use Server-Sent Events in Next.js.

Creating a Server-Sent Events API in Next.js App Router

Server-Sent Events (SSE) allow you to deliver real-time data updates from the server to the client without the need for continuous polling. SSE enables a unidirectional flow of data over a single, long-lived HTTP connection.
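On the wire, an SSE stream is plain text: each event is a group of field lines such as data: and ends with a blank line. The sketch below shows one way to serialize an event; formatSseEvent is a hypothetical helper name, not something provided by Next.js or LangChain. It also splits a payload that contains newlines into several data: lines, because a raw newline inside a single data: line would end the field early.

```typescript
// Fields of one Server-Sent Event. Only `data` is required.
type SseEvent = { data: string; event?: string; id?: string };

// Serialize one event into the text/event-stream wire format.
function formatSseEvent({ data, event, id }: SseEvent): string {
  const lines: string[] = [];
  if (event !== undefined) lines.push(`event: ${event}`);
  if (id !== undefined) lines.push(`id: ${id}`);
  // A payload containing newlines is sent as one `data:` line per line of
  // text; the receiver joins those lines back together with "\n".
  for (const line of data.split(/\r\n|\r|\n/)) {
    lines.push(`data: ${line}`);
  }
  // A blank line terminates the event.
  return lines.join("\n") + "\n\n";
}
```

For example, formatSseEvent({ data: "A sample message." }) yields the same frame that the route handler in this section builds by hand.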

Create a file named route.ts with the following code in the app/api/stream directory to understand the minimal setup required to stream responses using Server-Sent Events:

// File: app/api/stream/route.ts
 
// Prevents this route's response from being cached on Vercel
export const dynamic = "force-dynamic";
 
export async function POST(request: Request) {
  const encoder = new TextEncoder()
  // Create a streaming response
  const customReadable = new ReadableStream({
    start(controller) {
      const message = "A sample message."
      controller.enqueue(encoder.encode(`data: ${message}\n\n`))
    },
  })
  // Return the stream response and keep the connection alive
  return new Response(customReadable, {
    // Set the headers for Server-Sent Events (SSE)
    headers: {
      Connection: "keep-alive",
      "Content-Encoding": "none",
      "Cache-Control": "no-cache, no-transform",
      "Content-Type": "text/event-stream; charset=utf-8",
    },
  })
}

The code above does the following:

  • The dynamic constant is set to force-dynamic, which prevents the route's response from being cached on Vercel, so each request for the SSE stream gets fresh data.
  • A ReadableStream is created to generate the data sent to the client. In its start method, a message is encoded and enqueued into the stream's controller, which sends it to the client as part of the SSE stream.
  • The Response object is created with headers specific to Server-Sent Events:
    • Connection: keep-alive to keep the connection open for streaming.
    • Content-Encoding: none, set here with the aim of keeping the stream from being compressed (and therefore buffered) on its way to the client.
    • Cache-Control: no-cache, no-transform to prevent caches from storing the response and intermediaries from modifying it.
    • Content-Type: text/event-stream; charset=utf-8 to specify the content type as Server-Sent Events.
  • The Response object wrapping the custom ReadableStream is returned from the endpoint and sent to clients requesting the SSE stream.
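The route above enqueues a single message and never closes the stream. As a small sketch of the same idea with an explicit end, the code below emits several messages and then closes the controller, and includes a helper that drains the stream so the output can be inspected. createMessageStream and readAll are hypothetical names, and the sketch assumes a runtime with the Web Streams API available globally (as in recent Node.js versions and browsers).

```typescript
// Build a stream that emits each message as one SSE frame, then closes.
function createMessageStream(messages: string[]): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  return new ReadableStream<Uint8Array>({
    start(controller) {
      for (const message of messages) {
        controller.enqueue(encoder.encode(`data: ${message}\n\n`));
      }
      // Closing the controller ends the response body, so a reader's
      // read() call eventually resolves with done: true.
      controller.close();
    },
  });
}

// Drain a byte stream into a single string (handy for tests).
async function readAll(stream: ReadableStream<Uint8Array>): Promise<string> {
  const reader = stream.getReader();
  const decoder = new TextDecoder();
  let text = "";
  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    // stream: true keeps multi-byte characters intact across chunks.
    text += decoder.decode(value, { stream: true });
  }
  return text + decoder.decode();
}
```

Passing createMessageStream(["one", "two"]) to new Response(...) with the SSE headers shown earlier would send two events and then end the response.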

With this, you have created an endpoint that can stream responses. In the next sections, you will learn how to stream Completion and Chat Completion API responses from OpenAI with LangChain callbacks.

Streaming OpenAI Completion API responses with LangChain Callbacks

The OpenAI Completion API provides a powerful tool for generating text based on given prompt(s), useful in various real-life scenarios such as content generation and language translation. Integrating LangChain callbacks enables real-time streaming of responses, increasing perceived responsiveness in applications.

Create a file named completionModel.tsx in app/lib directory, defining a function which initializes an OpenAI instance with streaming enabled and LangChain callbacks, generating text in real-time.

// File: app/lib/completionModel.tsx
 
import { OpenAI } from "@langchain/openai";
 
export const completionModel = (
  controller: ReadableStreamDefaultController,
  encoder: TextEncoder
) =>
  new OpenAI({
    streaming: true,
    callbacks: [
      {
        handleLLMNewToken(token) {
          controller.enqueue(encoder.encode(`data: ${token}\n\n`));
        },
        async handleLLMEnd(output) {
          controller.close();
        },
      },
    ],
  });

The code above does the following:

  • Imports the OpenAI class from the @langchain/openai package.
  • Exports a function named completionModel which takes two parameters:
    • controller: A ReadableStreamDefaultController object which allows you to enqueue messages into the stream.
    • encoder: A TextEncoder object that encodes the messages into a Uint8Array.
  • Creates a new instance of the OpenAI class with the following configuration options:
    • streaming: Set to true, indicating that the API should return streaming responses.
    • callbacks: An array containing a single object with two callback functions, used to intercept the response at each stage of its creation by the LLM:
      • handleLLMNewToken: A callback invoked when a new token is generated by the language model. It enqueues the encoded token to the controller.
      • handleLLMEnd: An asynchronous callback invoked when the language model generation is complete. It closes the controller.
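The callback object described above can also be built on its own, which makes it testable without calling OpenAI. The following is a sketch under stated assumptions: createStreamingCallbacks and StreamSink are hypothetical names, and the extra handleLLMError callback (a LangChain callback invoked when generation fails) is an addition that the listing above does not include. The closed flag guards against enqueueing into, or closing, a stream that has already ended.

```typescript
// Minimal structural type covering the two controller methods used here;
// a ReadableStreamDefaultController can be passed wherever it is expected.
type StreamSink = {
  enqueue(chunk: Uint8Array): void;
  close(): void;
};

// Hypothetical factory: builds the callback object separately from the
// model so that it can be unit-tested with a fake sink.
function createStreamingCallbacks(sink: StreamSink, encoder: TextEncoder) {
  let closed = false;
  const closeOnce = () => {
    if (closed) return;
    closed = true;
    sink.close();
  };
  return {
    // Forward each generated token to the client as one SSE frame.
    handleLLMNewToken(token: string) {
      if (!closed) sink.enqueue(encoder.encode(`data: ${token}\n\n`));
    },
    // Generation finished: end the stream.
    handleLLMEnd() {
      closeOnce();
    },
    // Assumption: also end the stream when generation fails.
    handleLLMError(_error: unknown) {
      closeOnce();
    },
  };
}
```

The returned object could be placed in the callbacks array in place of the inline object, with the controller passed as the sink.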

Next, create a file named route.ts with the following code in the app/api/stream/completion directory. The changes from the initial streaming route handler are marked with a + below:

// File: app/api/stream/completion/route.ts
 
// Prevents this route's response from being cached on Vercel
export const dynamic = "force-dynamic";
 
+ import { completionModel } from "@/app/lib/completionModel";
 
export async function POST(request: Request) {
+  // Obtain the user message from request's body
+  const { message } = await request.json();
  const encoder = new TextEncoder();
  // Create a streaming response
  const customReadable = new ReadableStream({
    async start(controller) {
+      // Generate a streaming response from OpenAI with LangChain
+      await completionModel(controller, encoder).invoke(message);
    },
  });
  // Return the stream response and keep the connection alive
  return new Response(customReadable, {
    // Set the headers for Server-Sent Events (SSE)
    headers: {
      Connection: "keep-alive",
      "Content-Encoding": "none",
      "Cache-Control": "no-cache, no-transform",
      "Content-Type": "text/event-stream; charset=utf-8",
    },
  });
}

Apart from what you did earlier to create a streaming endpoint in Next.js, the code additions above do the following:

  • Imports the completionModel function.
  • Extracts user message from the request body using request.json().
  • Inside the start method of the ReadableStream, the completionModel function is invoked asynchronously with the message obtained from the request. This function generates a streaming response from OpenAI with LangChain.
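The route above passes whatever request.json() returns straight to the model. A small validation step, sketched below, could reject malformed bodies before any stream is created. parseCompletionRequest and its result type are hypothetical names for illustration only.

```typescript
type CompletionRequest = { message: string };

type ParseResult =
  | { ok: true; value: CompletionRequest }
  | { ok: false; error: string };

// Check that a parsed JSON body has the shape the completion route expects.
function parseCompletionRequest(body: unknown): ParseResult {
  if (typeof body !== "object" || body === null) {
    return { ok: false, error: "Request body must be a JSON object." };
  }
  const message = (body as Record<string, unknown>).message;
  if (typeof message !== "string" || message.trim() === "") {
    return { ok: false, error: "`message` must be a non-empty string." };
  }
  return { ok: true, value: { message } };
}
```

In the route handler, a failed result could be turned into new Response(result.error, { status: 400 }) before the ReadableStream is constructed.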

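To cache user queries and their responses with Upstash, import the Upstash Redis cache client into app/lib/completionModel.tsx and pass it to the OpenAI constructor as follows. This version of the file also sets temperature to 0.9, logs the completed text in handleLLMEnd, and enqueues each token as raw text rather than as a data: frame, which is the form the client code later in this guide appends directly to the message: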

import { OpenAI } from "@langchain/openai";
+ import cache from '@/app/lib/upstashCache';
 
export const completionModel = (
  controller: ReadableStreamDefaultController,
  encoder: TextEncoder
) =>
  new OpenAI({
+   cache,
    streaming: true,
    temperature: 0.9,
    callbacks: [
      {
        handleLLMNewToken(token) {
          controller.enqueue(encoder.encode(token));
        },
        async handleLLMEnd(output) {
          console.log(output.generations[0][0].text);
          controller.close();
        },
      },
    ],
  });

With minimal changes, you have enabled caching of the OpenAI Completion API responses with Upstash.
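Conceptually, a response cache is a lookup before the model call and a store after it. The sketch below illustrates that flow with an in-memory Map; it is not LangChain's or Upstash's implementation, and all names in it are hypothetical. It assumes entries are keyed on the prompt together with a string describing the model configuration, so changing a setting such as temperature results in a cache miss.

```typescript
// Conceptual stand-in for a response cache, kept in memory.
type ResponseCache = Map<string, string>;

// Combine the prompt and the model settings into one key so that the same
// prompt under different settings does not collide.
function cacheKey(prompt: string, modelKey: string): string {
  return JSON.stringify([prompt, modelKey]);
}

// Return the cached response when present; otherwise generate and store it.
function completeWithCache(
  cache: ResponseCache,
  prompt: string,
  modelKey: string,
  generate: (prompt: string) => string
): { text: string; cached: boolean } {
  const key = cacheKey(prompt, modelKey);
  const hit = cache.get(key);
  if (hit !== undefined) {
    return { text: hit, cached: true };
  }
  const text = generate(prompt);
  cache.set(key, text);
  return { text, cached: false };
}
```

A practical consequence is that a repeated prompt returns the stored response instead of a fresh generation, even when the temperature setting would otherwise produce varied output.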

Now, let's move on to learning how OpenAI Chat Completion API responses can be streamed in Next.js endpoints.

Streaming OpenAI Chat Completion API responses with LangChain Callbacks

The OpenAI Chat Completion API provides a powerful tool for generating real-time conversational responses based on provided messages. The applications span from building chatbots capable of engaging in natural dialogues to enhancing customer support systems with AI-driven interaction capabilities. It enables developers to create dynamic and responsive conversational interfaces.

Create a file named chatCompletionModel.tsx in app/lib directory, defining a chatCompletionModel function which creates a streaming interface for OpenAI's Chat Completion API with LangChain callbacks, facilitating real-time conversation generation based on provided messages.

// File: app/lib/chatCompletionModel.tsx
 
import { ChatOpenAI } from "@langchain/openai";
 
export type ConversationMessage = {
  role: string;
  content: string;
};
 
export type ConversationMessages = ConversationMessage[];
 
export const chatCompletionModel = (
  controller: ReadableStreamDefaultController,
  encoder: TextEncoder
) =>
  new ChatOpenAI({
    streaming: true,
    callbacks: [
      {
        handleLLMNewToken(token) {
          controller.enqueue(encoder.encode(`data: ${token}\n\n`));
        },
        async handleLLMEnd(output) {
          controller.close();
        },
      },
    ],
  });

The code above does the following:

  • Imports the ChatOpenAI class from the @langchain/openai package.
  • Creates the following type definitions:
    • ConversationMessage: Defines a type for a single message in a conversation. It contains two properties: role (indicating the role of the sender) and content (the message content).
    • ConversationMessages: Defines a type for an array of conversation messages.
  • Exports a function named chatCompletionModel which takes two parameters:
    • controller: A ReadableStreamDefaultController object which allows you to enqueue messages into the stream.
    • encoder: A TextEncoder object that encodes the messages into a Uint8Array.
  • Creates a new instance of the ChatOpenAI class with the following configuration options:
    • streaming: Set to true, indicating that the API should return streaming responses.
    • callbacks: An array containing a single object with two callback functions, used to intercept the response at each stage of its creation by the LLM:
      • handleLLMNewToken: A callback invoked when a new token is generated by the language model. It enqueues the encoded token to the controller.
      • handleLLMEnd: An asynchronous callback invoked when the language model generation is complete. It closes the controller.

Next, create a file named route.ts with the following code in the app/api/stream/completion/chat directory. The changes from the initial streaming route handler are marked with a + below:

// File: app/api/stream/completion/chat/route.ts
 
// Prevents this route's response from being cached on Vercel
export const dynamic = "force-dynamic";
 
+ import {
+   type ConversationMessage,
+   chatCompletionModel,
+ } from "@/app/lib/chatCompletionModel";
 
export async function POST(request: Request) {
+   // Obtain the conversation messages from request's body
+   const { messages = [] } = await request.json();
  const encoder = new TextEncoder();
  // Create a streaming response
  const customReadable = new ReadableStream({
    async start(controller) {
+       // Generate a streaming response from OpenAI with LangChain
+       await chatCompletionModel(controller, encoder).invoke(
+         messages.map((i: ConversationMessage) => [i["role"], i["content"]])
+       );
    },
  });
  // Return the stream response and keep the connection alive
  return new Response(customReadable, {
    // Set the headers for Server-Sent Events (SSE)
    headers: {
      Connection: "keep-alive",
      "Content-Encoding": "none",
      "Cache-Control": "no-cache, no-transform",
      "Content-Type": "text/event-stream; charset=utf-8",
    },
  });
}

Apart from what you did earlier to create a streaming endpoint in Next.js, the code additions above do the following:

  • Imports the chatCompletionModel function and ConversationMessage type definition.
  • Extracts the messages in conversation from the request body using request.json().
  • Inside the start method of the ReadableStream, the model returned by chatCompletionModel is invoked with the transformed messages array, in which each message has been converted into a two-element array containing its role and content. This call generates a streaming response from OpenAI with LangChain.
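The role-and-content transformation described above can be pulled into a small function that also checks the roles it receives. This is a sketch only: toRoleContentTuples is a hypothetical name, and the allow-list of roles is an assumption made for illustration rather than a list taken from LangChain's documentation.

```typescript
type ConversationMessage = { role: string; content: string };
type RoleContentTuple = [string, string];

// Assumption: the roles this sketch is willing to forward to the model.
const ALLOWED_ROLES = new Set(["system", "user", "assistant"]);

// Convert { role, content } objects into [role, content] pairs, rejecting
// any message whose role is not on the allow-list.
function toRoleContentTuples(
  messages: ConversationMessage[]
): RoleContentTuple[] {
  return messages.map(({ role, content }, index): RoleContentTuple => {
    if (!ALLOWED_ROLES.has(role)) {
      throw new Error(`Unsupported role "${role}" at index ${index}`);
    }
    return [role, content];
  });
}
```

Validating roles on the server keeps a client from injecting arbitrary role names into the conversation that is sent to the model.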

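To cache the chat history with Upstash, import the Upstash Redis cache client into app/lib/chatCompletionModel.tsx and pass it to the ChatOpenAI constructor as follows. As with the completion model, this version also sets temperature to 0.9, logs the completed text in handleLLMEnd, and enqueues each token as raw text rather than as a data: frame: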

+ import cache from '@/app/lib/upstashCache';
import { ChatOpenAI } from "@langchain/openai";
 
export type ConversationMessage = {
  role: string;
  content: string;
};
 
export type ConversationMessages = ConversationMessage[];
 
export const chatCompletionModel = (
  controller: ReadableStreamDefaultController,
  encoder: TextEncoder
) =>
  new ChatOpenAI({
+   cache,
    streaming: true,
    temperature: 0.9,
    callbacks: [
      {
        handleLLMNewToken(token) {
          controller.enqueue(encoder.encode(token));
        },
        async handleLLMEnd(output) {
          console.log(output.generations[0][0].text);
          controller.close();
        },
      },
    ],
  });

With minimal changes, you have enabled caching of the entire chat history with Upstash.

Now, let's move on to creating the client side React components to consume the streaming endpoints.

Set up the Next.js frontend to listen to Server-Sent Events

In this section, you will learn how to set up a minimal listener for Server-Sent Events API messages, and an approach to managing the state of the messages exchanged between the user and the AI.

Listening to Server-Sent Events API in React

To listen to the SSE API from a client-side React component, initiate a POST request to the API route. When the response arrives, decode the incoming bytes into strings by piping the body through a TextDecoderStream, and keep reading from the stream until it is done.

"use client";
 
export default function () {
 
  const obtainAPIResponse = async (apiRoute: string, apiData: any) => {
    // Initiate the first call to connect to SSE API
    const apiResponse = await fetch(apiRoute, {
      method: "POST",
      headers: {
        // The request body is JSON; the response is the event stream
        "Content-Type": "application/json",
        Accept: "text/event-stream",
      },
      body: JSON.stringify(apiData),
    });
 
    if (!apiResponse.body) return;
 
    // To decode incoming data as a string
    const reader = apiResponse.body
      .pipeThrough(new TextDecoderStream())
      .getReader();
 
    while (true) {
      const { value, done } = await reader.read();
      if (done) {
        break;
      }
      if (value) {
        // Do something
      }
    }
  };
 
  return <></>
}
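When the server sends tokens wrapped in data: frames, the strings read in the loop above still contain that framing, and a single frame may be split across two reads. The sketch below shows one way to recover the payloads; createSseParser is a hypothetical helper, and it assumes frames are separated by "\n\n" and carry only data: fields.

```typescript
// Incremental parser for `data:` frames. Network chunks can split a frame
// anywhere, so unfinished text is buffered until its blank line arrives.
function createSseParser() {
  let buffer = "";
  return {
    // Feed one decoded chunk; returns the payloads of all completed events.
    push(chunk: string): string[] {
      buffer += chunk;
      const payloads: string[] = [];
      let boundary = buffer.indexOf("\n\n");
      while (boundary !== -1) {
        const frame = buffer.slice(0, boundary);
        buffer = buffer.slice(boundary + 2);
        const dataLines = frame
          .split("\n")
          .filter((line) => line.startsWith("data:"))
          // Drop the field name and one optional space after the colon.
          .map((line) => line.slice(5).replace(/^ /, ""));
        if (dataLines.length > 0) payloads.push(dataLines.join("\n"));
        boundary = buffer.indexOf("\n\n");
      }
      return payloads;
    },
  };
}
```

Inside the read loop, each value would be passed to parser.push(value), and the returned payloads handled in place of the raw chunk.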

Managing state of the conversation in React

To manage the state of the conversation between the user and chatbot, you can use state variables. You would update the state variables messages and latestMessage to store the conversation history and the latest message, respectively. By updating these state variables within the loop that processes incoming data from the SSE API, you can ensure real-time updates of the conversation display. The code additions to achieve the same are as follows:

"use client";
 
+ import { useState } from "react";
+ import type { ConversationMessages } from "@/app/lib/chatCompletionModel";
 
export default function () {
+  const [latestMessage, setLatestMessage] = useState<string>("");
+  const [messages, setMessages] = useState<ConversationMessages>([]);
 
  const obtainAPIResponse = async (apiRoute: string, apiData: any) => {
    // Initiate the first call to connect to SSE API
    const apiResponse = await fetch(apiRoute, {
      method: "POST",
      headers: {
        // The request body is JSON; the response is the event stream
        "Content-Type": "application/json",
        Accept: "text/event-stream",
      },
      body: JSON.stringify(apiData),
    });
 
    if (!apiResponse.body) return;
 
    // To decode incoming data as a string
    const reader = apiResponse.body
      .pipeThrough(new TextDecoderStream())
      .getReader();
 
+    let incomingMessage = "";
 
    while (true) {
      const { value, done } = await reader.read();
      if (done) {
+        // Insert the response received into the messages state
+        setMessages((messages) => [
+          ...messages,
+          { role: "assistant", content: incomingMessage },
+        ]);
+        // Reset the latest message's state received 
+        setLatestMessage("");
        break;
      }
      if (value) {
+        // Append the incoming data to latest message's value
+        incomingMessage += value;
+        setLatestMessage(incomingMessage);
      }
    }
  };
 
  return <></>
}

The code additions above do the following:

  • Two state variables are declared: messages stores the conversation history, and latestMessage stores the response that is currently being streamed.
  • A local variable, incomingMessage, accumulates the data received from the SSE API.
  • As data arrives incrementally, it is appended to incomingMessage, and latestMessage is updated with the concatenated text so that the display updates in real time.
  • When the stream ends (indicated by done being true), the complete response is added to the messages state array with the role assistant to distinguish it from user messages, and latestMessage is reset to an empty string to prepare for the next response.
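The state transitions listed above can also be written as a pure reducer, which keeps them testable outside of a component. This is an alternative sketch, not the approach used in the listing; conversationReducer and its action names are hypothetical, and the user_message action is an addition for recording what the user sent.

```typescript
type ConversationMessage = { role: string; content: string };

type ConversationState = {
  messages: ConversationMessage[];
  latestMessage: string;
};

type ConversationAction =
  | { type: "user_message"; content: string }
  | { type: "token"; value: string }
  | { type: "done" };

const initialConversationState: ConversationState = {
  messages: [],
  latestMessage: "",
};

function conversationReducer(
  state: ConversationState,
  action: ConversationAction
): ConversationState {
  switch (action.type) {
    case "user_message":
      // Record what the user sent.
      return {
        ...state,
        messages: [...state.messages, { role: "user", content: action.content }],
      };
    case "token":
      // Append streamed text to the response in progress.
      return { ...state, latestMessage: state.latestMessage + action.value };
    case "done":
      // Skip empty responses so the history gets no blank entries.
      if (state.latestMessage === "") return state;
      // Move the finished response into the history and reset the buffer.
      return {
        messages: [
          ...state.messages,
          { role: "assistant", content: state.latestMessage },
        ],
        latestMessage: "",
      };
  }
}
```

In a component, this reducer could be passed to React's useReducer, with the read loop dispatching a token action per chunk and a done action when the stream ends.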

Great! With the reactive latestMessage and messages variables, you can now create dynamic user interfaces representing the messages exchanged in the conversation and the latest AI generated response.

That was a lot of learning! You’re all done now ✨

Deploy to Vercel

The repository is now ready to deploy to Vercel. Use the following steps to deploy:

  • Start by creating a GitHub repository containing your app's code.
  • Then, navigate to the Vercel Dashboard and create a New Project.
  • Link the new project to the GitHub repository you have just created.
  • In Settings, update the Environment Variables to match those in your local .env file.
  • Deploy! 🚀

More Information

For more detailed insights, explore the references cited in this post.