Skip to main content
When you trigger a workflow, your workflow starts executing as a background job. Traditionally, the only way to check workflow status is by repeatedly calling client.logs to fetch the workflow state. However, this approach is slow and expensive. A better solution is to use Upstash Realtime, which enables you to emit events from your workflow and subscribe to them in real-time on your frontend.

How It Works

Upstash Realtime is powered by Upstash Redis and provides a simple API for publishing and subscribing to events:
  • When you emit an event, it’s instantly delivered to live subscribers and stored for later retrieval
  • Your frontend can subscribe to these events in real-time
  • You can also fetch events emitted in the past
This guide shows you how to integrate Upstash Workflow with Upstash Realtime to display real-time progress updates in your frontend.

Prerequisites

  • An Upstash account with:
    • A QStash project for workflows
    • A Redis database for Realtime
  • Next.js application set up

Setup

1. Install Dependencies

npm install @upstash/workflow @upstash/realtime @upstash/redis zod

2. Configure Upstash Realtime

Create a Realtime instance in lib/realtime.ts:
import { InferRealtimeEvents, Realtime } from "@upstash/realtime";
import { Redis } from "@upstash/redis";
import z from "zod/v4";

const redis = Redis.fromEnv();

const events = z.discriminatedUnion("type", [
  z.object({ type: z.literal("runStart"), workflowRunId: z.string(), timestamp: z.number() }),
  z.object({ type: z.literal("runFinish"), workflowRunId: z.string(), timestamp: z.number(), status: z.union([z.literal("success"), z.literal("failed")]), error: z.string().optional() }),
  z.object({ type: z.literal("stepStart"), workflowRunId: z.string(), timestamp: z.number(), stepName: z.string() }),
  z.object({ type: z.literal("stepFinish"), workflowRunId: z.string(), timestamp: z.number(), stepName: z.string(), result: z.unknown().optional() }),
  z.object({ type: z.literal("stepFail"), workflowRunId: z.string(), timestamp: z.number(), stepName: z.string(), error: z.string() }),
])

const schema = {
  workflow: {
    update: events
  }
}

export const realtime = new Realtime({ schema, redis })
export type RealtimeEvents = InferRealtimeEvents<typeof realtime>

3. Create a Realtime Endpoint

Create an API route at app/api/realtime/route.ts to handle Realtime connections:
import { handle } from "@upstash/realtime";
import { realtime } from "@/lib/realtime";

export const maxDuration = 300;

export const GET = handle({ realtime });
This endpoint enables Server-Sent Events (SSE) connections for real-time updates.

Building the Workflow

1. Create the Workflow Endpoint

Create your workflow at app/api/workflow/basic/route.ts:
import { serve } from "@upstash/workflow/nextjs";
import { realtime } from "@/lib/realtime";
import { WorkflowAbort } from "@upstash/workflow";

type WorkflowPayload = {
  userId: string;
  action: string;
};

export const { POST } = serve<WorkflowPayload>(
  async (context) => {
    const { userId, action } = context.requestPayload;
    const workflowRunId = context.workflowRunId;

    // Create a channel based on the workflow run ID
    const channel = realtime.channel(workflowRunId);

    // Emit run start event
    await context.run("start-workflow", () =>
      channel.emit("workflow.update", {
        type: "runStart",
        workflowRunId,
        timestamp: Date.now(),
      })
    );

    // Step 1: Data Validation
    try {
      await context.run("validate-data", async () => {
        // Emit step start
        await channel.emit("workflow.update", {
          type: "stepStart",
          workflowRunId,
          stepName: "validate-data",
          timestamp: Date.now(),
        });

        // Your validation logic
        if (!userId || !action) {
          throw new Error("Missing required fields");
        }

        const result = { valid: true, userId, action };

        // sleep 500 ms
        await new Promise((resolve) => setTimeout(resolve, 500));

        // Emit step completion
        await channel.emit("workflow.update", {
          type: "stepFinish",
          workflowRunId,
          stepName: "validate-data",
          timestamp: Date.now(),
          result,
        });

        return result;
      });
    } catch (error) {
      if (error instanceof WorkflowAbort) {
        throw error;
      }
      // Emit failure event
      await channel.emit("workflow.update", {
        type: "stepFail",
        workflowRunId,
        stepName: "validate-data",
        timestamp: Date.now(),
        error: error instanceof Error ? error.message : "Unknown error",
      });
      throw error;
    }

    // Additional steps follow the same pattern...

    // Emit run completion
    await channel.emit("workflow.update", {
      type: "runFinish",
      workflowRunId,
      timestamp: Date.now(),
      status: "success",
    });

    return { success: true, workflowRunId };
  },
  {
    // Handle workflow failures
    failureFunction: async ({ context }) => {
      const workflowRunId = context.workflowRunId;
      const channel = realtime.channel(workflowRunId);

      await channel.emit("workflow.update", {
        type: "runFinish",
        workflowRunId,
        timestamp: Date.now(),
        status: "failed",
        error: "Workflow execution failed",
      });
    },
  }
);
Key points:
  • Use realtime.channel(workflowRunId) to create a unique channel per workflow run
  • Emit events at each significant step (start, step start/finish/fail, completion)
  • Wrap each step in try-catch to handle and emit failures
  • Use failureFunction to emit failure events if the entire workflow fails

2. Create a Trigger Endpoint

Create an endpoint to trigger workflows at app/api/trigger/route.ts:
import { NextRequest, NextResponse } from "next/server";
import { Client } from "@upstash/workflow";

export const workflowClient = new Client({
  token: process.env.QSTASH_TOKEN,
  baseUrl: process.env.QSTASH_URL,
});

export async function POST(request: NextRequest) {
  try {
    const body = await request.json() as { workflowType: string };
    const workflowUrl = `${request.nextUrl.origin}/api/workflow/${body.workflowType}`;

    const { workflowRunId } = await workflowClient.trigger({
      url: workflowUrl,
      body: {
        userId: "user-123",
        action: "process-data",
      },
    });

    return NextResponse.json({ workflowRunId });
  } catch (error) {
    console.error("Error triggering workflow:", error);
    return NextResponse.json(
      { error: "Failed to trigger workflow" },
      { status: 500 }
    );
  }
}

Building the Frontend

1. Create a Custom Hook

Create a React hook to manage the Realtime subscription at hooks/useWorkflowWithRealtime.ts:
"use client";

import { useRealtime } from "@upstash/realtime/client";
import { useState, useCallback } from "react";
import type { RealtimeEvents } from "@/lib/realtime";

interface WorkflowStep {
  stepName: string;
  status: "running" | "completed" | "failed";
  timestamp: number;
  error?: string;
  result?: unknown;
}

export function useWorkflowWithRealtime() {
  const [workflowRunId, setWorkflowRunId] = useState<string | null>(null);
  const [steps, setSteps] = useState<WorkflowStep[]>([]);
  const [isTriggering, setIsTriggering] = useState(false);
  const [runStatus, setRunStatus] = useState<{
    status: "running" | "success" | "failed";
    error?: string;
  } | null>(null);

  // Subscribe to workflow updates
  useRealtime<RealtimeEvents>({
    enabled: !!workflowRunId,
    channels: workflowRunId ? [workflowRunId] : [],
    event: "workflow.update",
    history: true, // Fetch past events on connection
    onData(data) {
      if (data.type === "runStart") {
        setRunStatus({ status: "running" });
      } else if (data.type === "runFinish") {
        setRunStatus({
          status: data.status,
          error: data.error,
        });
      } else if (data.type === "stepStart") {
        setSteps((prev) => [
          ...prev,
          {
            stepName: data.stepName,
            status: "running",
            timestamp: data.timestamp,
          },
        ]);
      } else if (data.type === "stepFinish") {
        setSteps((prev) =>
          prev.map((step) =>
            step.stepName === data.stepName
              ? { ...step, status: "completed", result: data.result }
              : step
          )
        );
      } else if (data.type === "stepFail") {
        setSteps((prev) =>
          prev.map((step) =>
            step.stepName === data.stepName
              ? { ...step, status: "failed", error: data.error }
              : step
          )
        );
      }
    },
  });

  const trigger = useCallback(async () => {
    setIsTriggering(true);
    setSteps([]);
    setRunStatus(null);

    try {
      const response = await fetch("/api/trigger", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ workflowType: "basic" }),
      });

      const data = await response.json();
      setWorkflowRunId(data.workflowRunId);
    } catch (error) {
      console.error("Error triggering workflow:", error);
    } finally {
      setIsTriggering(false);
    }
  }, []);

  return {
    trigger,
    isTriggering,
    workflowRunId,
    steps,
    runStatus,
  };
}
Key features:
  • history: true fetches past events when connecting, so users can see what happened before they joined
  • The hook manages both triggering the workflow and subscribing to updates
  • Type-safe event handling with TypeScript

2. Use the Hook in Your Component

"use client";

import { useWorkflowWithRealtime } from "@/hooks/useWorkflowWithRealtime";

export default function WorkflowPage() {
  const { trigger, isTriggering, steps, runStatus } = useWorkflowWithRealtime();

  return (
    <div style={{ maxWidth: "600px", margin: "40px auto", fontFamily: "Arial, sans-serif" }}>
      <button onClick={trigger} disabled={isTriggering}>
        {isTriggering ? "Starting..." : "Click to Trigger Workflow"}
      </button>

      <h3 style={{ marginTop: "20px" }}>Run Status:</h3>

      {runStatus && (
        <div>{runStatus.status}</div>
      )}

      <h3 style={{ marginTop: "20px" }}>Workflow Steps:</h3>

      <div>
        {steps.map((step, index) => (
          <div key={index}>
            <strong>{step.stepName}</strong>: {step.status}
            {step.error && <span> - {step.error}</span>}
          </div>
        ))}
      </div>
    </div>
  );
}

How It All Works Together

  1. User triggers workflow: The frontend calls /api/trigger, which returns a workflowRunId
  2. Workflow executes: The workflow runs as a background job, emitting events at each step
  3. Frontend subscribes: Using the workflowRunId, the frontend subscribes to the Realtime channel
  4. Real-time updates: As the workflow emits events, they’re instantly delivered to the frontend via Server-Sent Events
  5. History on connect: When connecting, history: true fetches past events from Redis, so users see the full progress even if they join late

Benefits Over Polling

Polling (client.logs)Realtime
Slow (requires HTTP requests)Instant (Server-Sent Events)
Expensive (repeated API calls)Efficient (single connection)
High latency (poll interval)Low latency (real-time)

Full Example

For a complete working example with all steps, error handling, and UI components, check out the Upstash Realtime example on GitHub.

Next Steps