Some workflows require human approval or input before proceeding. Combined with Upstash Realtime, Upstash Workflow lets you build interactive workflows that pause for user input and stream real-time progress to your frontend for the whole run. This guide shows how to implement this human-in-the-loop pattern with real-time updates.

How It Works

In a human-in-the-loop workflow:
  1. The workflow executes initial steps and emits progress events
  2. The workflow pauses at a specific point using context.waitForEvent()
  3. A “waiting for input” event is emitted to notify the frontend
  4. The user makes a decision in the frontend (approve/reject)
  5. The frontend calls an API to notify the workflow using client.notify()
  6. The workflow resumes with the user’s decision
  7. An “input resolved” event is emitted so the frontend can update its UI
  8. The workflow continues and completes based on the decision
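
The pause-and-resume handshake in the steps above can be modeled in memory. This is only a sketch of the idea: `WaitRegistry` and the run ID are hypothetical names invented for illustration, not part of the Upstash Workflow API, which persists the waiting state durably rather than holding callbacks in a map.

```typescript
// Illustrative in-memory model of the pause/resume handshake.
// WaitRegistry is hypothetical; it is NOT the Upstash Workflow API.
type Waiter<T> = (eventData: T) => void;

class WaitRegistry<T> {
  private waiters = new Map<string, Waiter<T>[]>();

  // Step 2: the workflow parks a continuation under an event ID.
  waitForEvent(eventId: string, onEvent: Waiter<T>): void {
    const list = this.waiters.get(eventId) ?? [];
    list.push(onEvent);
    this.waiters.set(eventId, list);
  }

  // Step 5: the API route delivers the user's decision.
  // Returns how many parked continuations were resumed.
  notify(eventId: string, eventData: T): number {
    const list = this.waiters.get(eventId) ?? [];
    this.waiters.delete(eventId);
    for (const resume of list) resume(eventData);
    return list.length;
  }
}

const log: string[] = [];
const registry = new WaitRegistry<{ approved: boolean }>();
const eventId = "approval-run-123"; // hypothetical run ID

log.push("stepFinish:initial-processing"); // step 1
log.push("waitingForInput"); // step 3
registry.waitForEvent(eventId, ({ approved }) => {
  log.push("inputResolved"); // step 7
  log.push(approved ? "process-approved" : "process-rejected"); // step 8
});

// Steps 4-6: the user approves and the parked continuation resumes.
const resumed = registry.notify(eventId, { approved: true });
```

A second `notify` call with the same ID resumes nothing in this model, because the waiter is removed once it has been delivered.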

Prerequisites

  • An Upstash account with:
    • A QStash project for workflows
    • A Redis database for Realtime
  • Next.js application set up
  • Completed the basic real-time workflow setup

Event Types

For human-in-the-loop workflows, extend your schema in lib/realtime.ts with these additional event types:
const schema = {
  workflow: {
    runFinish: z.object({}),
    stepFinish: z.object({
      stepName: z.string(),
      result: z.unknown().optional(),
    }),
    waitingForInput: z.object({
      eventId: z.string(),
      message: z.string(),
    }),
    inputResolved: z.object({
      eventId: z.string(),
    }),
  },
};
The new event types are:
  • waitingForInput: Emitted when the workflow pauses and needs user input
  • inputResolved: Emitted when the user provides input, so the frontend knows to clear the waiting state

Create the Realtime Middleware

Create a custom middleware at lib/middleware.ts that emits events to Realtime:
import { WorkflowMiddleware } from "@upstash/workflow";
import { realtime } from "./realtime";

export const realtimeMiddleware = new WorkflowMiddleware({
  name: "realtime-events",
  callbacks: {
    beforeExecution: async ({ context, stepName }) => {
      const channel = realtime.channel(context.workflowRunId);

      // Detect wait-for-event steps and emit waitingForInput
      if (stepName === "wait-for-approval") {
        await channel.emit("workflow.waitingForInput", {
          eventId: `approval-${context.workflowRunId}`,
          message: `Waiting for approval`,
        });
      }
    },
    afterExecution: async ({ context, stepName, result }) => {
      const channel = realtime.channel(context.workflowRunId);

      // Emit inputResolved after wait-for-event steps complete
      if (stepName === "wait-for-approval") {
        await channel.emit("workflow.inputResolved", {
          eventId: `approval-${context.workflowRunId}`,
        });
      }

      // Emit stepFinish for all steps
      await channel.emit("workflow.stepFinish", {
        stepName,
        result,
      });
    },
    runCompleted: async ({ context }) => {
      const channel = realtime.channel(context.workflowRunId);
      await channel.emit("workflow.runFinish", {});
    },
  },
});
Key points:
  • beforeExecution: Detects wait-for-event steps by checking the stepName and emits workflow.waitingForInput
  • afterExecution: Emits workflow.inputResolved for wait steps, then workflow.stepFinish for every step (wait steps included)
  • runCompleted: Emits workflow.runFinish when the workflow finishes
  • All emission logic is centralized in the middleware, keeping workflow code clean
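
One way to keep this logic testable without a Realtime connection is to separate deciding which events to emit from emitting them. The sketch below is an assumption about how you might structure that, not code from the Upstash libraries: `planEvents`, `PlannedEvent`, and `WAIT_STEPS` are hypothetical names, and the event names and payloads mirror the middleware above.

```typescript
// Hypothetical helper: decides which events a middleware callback should emit.
// It performs no I/O, so it can be unit-tested without Realtime.
type Phase = "beforeExecution" | "afterExecution";

interface PlannedEvent {
  event:
    | "workflow.waitingForInput"
    | "workflow.inputResolved"
    | "workflow.stepFinish";
  payload: Record<string, unknown>;
}

// Step names that correspond to waitForEvent calls (assumed naming convention).
const WAIT_STEPS = new Set<string>(["wait-for-approval"]);

function planEvents(
  phase: Phase,
  stepName: string,
  workflowRunId: string,
  result?: unknown
): PlannedEvent[] {
  const eventId = `approval-${workflowRunId}`;
  const isWaitStep = WAIT_STEPS.has(stepName);

  if (phase === "beforeExecution") {
    // Only wait steps announce themselves before they run.
    return isWaitStep
      ? [
          {
            event: "workflow.waitingForInput",
            payload: { eventId, message: "Waiting for approval" },
          },
        ]
      : [];
  }

  // afterExecution: wait steps resolve first, then every step reports stepFinish.
  const events: PlannedEvent[] = [];
  if (isWaitStep) {
    events.push({ event: "workflow.inputResolved", payload: { eventId } });
  }
  events.push({ event: "workflow.stepFinish", payload: { stepName, result } });
  return events;
}
```

Each callback could then loop over the returned list and call `channel.emit(event, payload)`. Adding a second approval step would only require adding its name to `WAIT_STEPS`, although the single `approval-` event ID prefix would then need to vary per step.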

Building the Workflow

1. Create the Workflow Endpoint

Create your workflow at app/api/workflow/human-in-loop/route.ts:
import { serve } from "@upstash/workflow/nextjs";
import { realtimeMiddleware } from "@/lib/middleware";

type WorkflowPayload = {
  userId: string;
  action: string;
};

export const { POST } = serve<WorkflowPayload>(
  async (context) => {
    const { userId, action } = context.requestPayload;

    // Step 1: Initial Processing
    await context.run("initial-processing", async () => {
      // Your processing logic
      return {
        preprocessed: true,
        userId,
        action,
        requiresApproval: true,
      };
    });

    // Step 2: Wait for Human Approval
    const eventId = `approval-${context.workflowRunId}`;

    const { eventData, timeout } = await context.waitForEvent<{
      approved: boolean;
    }>("wait-for-approval", eventId, { timeout: "5m" });

    // Handle timeout
    if (timeout) {
      return { success: false, reason: "timeout" };
    }

    const status = eventData.approved ? "approved" : "rejected";

    // Step 3: Process based on approval
    await context.run(`process-${status}`, async () => {
      return {
        status,
        processedAt: Date.now(),
        action,
        userId,
      };
    });

    // Step 4: Finalize (only if approved)
    if (eventData.approved) {
      // Additional steps...
    }

    return {
      success: true,
      approved: eventData.approved,
      workflowRunId: context.workflowRunId,
    };
  },
  {
    middlewares: [realtimeMiddleware],
  }
);
Key patterns:
  1. Middleware for all events: The workflow code never calls Realtime directly; realtimeMiddleware emits every event
  2. Step name detection: The step name passed to context.waitForEvent() ("wait-for-approval") must match the name the middleware checks, otherwise the waitingForInput and inputResolved events are never emitted
  3. Unique event IDs: Derive the eventId from the run ID (like approval-${workflowRunId}) so a notification resumes only this run; the middleware must build the same ID so the frontend notifies the right event
  4. Timeout handling: Always handle the timeout case when waiting for events
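
The event ID and timeout patterns can be captured in two small helpers. This is a sketch under assumptions: `approvalEventId`, `toDecision`, and the `WaitResult` shape are hypothetical names modeled on the `{ eventData, timeout }` destructuring in the workflow above, not types exported by `@upstash/workflow`.

```typescript
// Shape assumed from the destructuring used in the workflow: { eventData, timeout }.
interface WaitResult<T> {
  eventData?: T;
  timeout: boolean;
}

type Decision = "approved" | "rejected" | "timeout";

// A single source of truth for the event ID, so the workflow and the
// middleware cannot drift apart.
const approvalEventId = (workflowRunId: string): string =>
  `approval-${workflowRunId}`;

// Collapses the wait result into one value that is easy to branch on.
function toDecision(result: WaitResult<{ approved: boolean }>): Decision {
  if (result.timeout) return "timeout";
  // Fail closed: missing or malformed event data counts as a rejection.
  return result.eventData?.approved === true ? "approved" : "rejected";
}
```

Treating missing event data as a rejection is a design choice in this sketch. It avoids a runtime error on `eventData.approved` if a notification arrives without a payload.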

2. Create the Notify Endpoint

Create an endpoint at app/api/notify/route.ts to handle user input:
import { Client } from "@upstash/workflow";
import { NextRequest, NextResponse } from "next/server";

const workflowClient = new Client({
  baseUrl: process.env.QSTASH_URL!,
  token: process.env.QSTASH_TOKEN!,
});

export async function POST(request: NextRequest) {
  const body = await request.json();
  const { eventId, eventData } = body;

  if (!eventId) {
    return NextResponse.json(
      { success: false, error: "eventId is required" },
      { status: 400 }
    );
  }

  // Notify the workflow
  await workflowClient.notify({
    eventId,
    eventData,
  });

  return NextResponse.json({ success: true });
}
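
The endpoint above only checks that eventId is present and forwards eventData as-is. If you want to reject malformed decisions before they reach the workflow, a validation step along these lines could run before the notify call. `parseNotifyBody` and its types are hypothetical names for this sketch, and the `{ approved: boolean }` shape is taken from the workflow's waitForEvent type parameter.

```typescript
interface NotifyBody {
  eventId: string;
  eventData: { approved: boolean };
}

type ParseResult =
  | { ok: true; value: NotifyBody }
  | { ok: false; error: string };

// Hypothetical validator for the /api/notify request body.
function parseNotifyBody(body: unknown): ParseResult {
  if (typeof body !== "object" || body === null) {
    return { ok: false, error: "body must be a JSON object" };
  }
  const { eventId, eventData } = body as Record<string, unknown>;

  if (typeof eventId !== "string" || eventId.length === 0) {
    return { ok: false, error: "eventId is required" };
  }
  if (typeof eventData !== "object" || eventData === null) {
    return { ok: false, error: "eventData is required" };
  }

  const approved = (eventData as Record<string, unknown>).approved;
  if (typeof approved !== "boolean") {
    return { ok: false, error: "eventData.approved must be a boolean" };
  }

  // Rebuild the object so unexpected extra fields are not forwarded.
  return { ok: true, value: { eventId, eventData: { approved } } };
}
```

In the route handler, a failed parse would map to the same 400 response the endpoint already returns for a missing eventId, with `error` as the message.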

Building the Frontend

1. Extend the Custom Hook

Extend your hook from the basic example to handle waiting states:
"use client";

import { useRealtime } from "@/lib/realtime-client";
import { useState, useCallback } from "react";

interface WorkflowStep {
  stepName: string;
  result?: unknown;
}

interface WaitingState {
  eventId: string;
  message: string;
}

export function useWorkflowWithRealtime() {
  const [workflowRunId, setWorkflowRunId] = useState<string | null>(null);
  const [steps, setSteps] = useState<WorkflowStep[]>([]);
  const [waitingState, setWaitingState] = useState<WaitingState | null>(null);
  const [isTriggering, setIsTriggering] = useState(false);
  const [isRunFinished, setIsRunFinished] = useState(false);

  useRealtime({
    enabled: !!workflowRunId,
    channels: workflowRunId ? [workflowRunId] : [],
    events: [
      "workflow.stepFinish",
      "workflow.runFinish",
      "workflow.waitingForInput",
      "workflow.inputResolved",
    ],
    onData({ event, data }) {
      if (event === "workflow.stepFinish") {
        setSteps((prev) => [
          ...prev,
          {
            stepName: data.stepName,
            result: data.result,
          },
        ]);
      } else if (event === "workflow.runFinish") {
        setIsRunFinished(true);
      } else if (event === "workflow.inputResolved") {
        // Clear waiting state if it matches
        setWaitingState((prev) =>
          prev?.eventId === data.eventId ? null : prev
        );
      } else if (event === "workflow.waitingForInput") {
        setWaitingState({
          eventId: data.eventId,
          message: data.message,
        });
      }
    },
  });

  const trigger = useCallback(async () => {
    setIsTriggering(true);
    setSteps([]);
    setWaitingState(null);
    setIsRunFinished(false);

    try {
      const response = await fetch("/api/trigger", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ workflowType: "human-in-loop" }),
      });

      if (!response.ok) {
        throw new Error("Failed to trigger workflow");
      }

      const data = await response.json();
      setWorkflowRunId(data.workflowRunId);
    } finally {
      // Reset the flag even if the request fails, so the button is not stuck disabled
      setIsTriggering(false);
    }
  }, []);

  const continueWorkflow = useCallback(
    async (data: { approved: boolean }) => {
      if (!waitingState) {
        throw new Error("No workflow waiting for input");
      }

      const response = await fetch("/api/notify", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({
          eventId: waitingState.eventId,
          eventData: data,
        }),
      });

      if (!response.ok) {
        throw new Error("Failed to notify workflow");
      }

      // The waiting state will be cleared when we receive inputResolved event
    },
    [waitingState]
  );

  return {
    trigger,
    continueWorkflow,
    isTriggering,
    workflowRunId,
    steps,
    waitingState,
    isRunFinished,
  };
}
Key additions:
  • waitingState: Tracks when the workflow is waiting for input
  • continueWorkflow: Function to submit user decisions back to the workflow
  • Multiple events subscription: Uses events array to subscribe to multiple event types
  • Input resolved handling: Clears the waiting state when the workflow receives the user’s input

2. Use the Hook with Approval UI

"use client";

import { useWorkflowWithRealtime } from "@/hooks/useWorkflowWithRealtime";

export default function WorkflowPage() {
  const {
    trigger,
    isTriggering,
    steps,
    isRunFinished,
    waitingState,
    continueWorkflow,
  } = useWorkflowWithRealtime();

  return (
    <div
      style={{
        maxWidth: "600px",
        margin: "40px auto",
        fontFamily: "Arial, sans-serif",
      }}
    >
      <button onClick={trigger} disabled={isTriggering}>
        {isTriggering ? "Starting..." : "Click to Trigger Workflow"}
      </button>

      {isRunFinished && (
        <h3 style={{ marginTop: "20px" }}>Workflow Finished!</h3>
      )}

      {/* Show workflow steps */}
      <h3 style={{ marginTop: "20px" }}>Workflow Steps:</h3>
      <div>
        {steps.map((step, index) => (
          <div key={index}>
            <strong>{step.stepName}</strong>
            {Boolean(step.result) && (
              <span>: {JSON.stringify(step.result)}</span>
            )}
          </div>
        ))}
      </div>

      {/* Show approval UI when waiting for input */}
      {waitingState && (
        <div style={{ marginTop: "20px" }} className="approval-prompt">
          <p>{waitingState.message}</p>
          <p>
            <button onClick={() => continueWorkflow({ approved: true })}>
              Click to Approve
            </button>
          </p>
          <p>
            <button onClick={() => continueWorkflow({ approved: false })}>
              Click to Reject
            </button>
          </p>
        </div>
      )}
    </div>
  );
}

How the Pattern Works

Timeline of Events

  1. Initial processing: stepFinish event → Frontend shows completed step
  2. Waiting for approval: waitingForInput event → Frontend shows approval buttons
  3. User clicks approve/reject: Frontend calls /api/notify
  4. Workflow resumes: inputResolved event → Frontend hides approval buttons
  5. Processing continues: More stepFinish events as workflow continues
  6. Workflow completes: runFinish event → Frontend shows “Workflow Finished!”
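
The timeline can be replayed against a pure reducer that mirrors the hook's onData handler. This is a sketch for reasoning about UI state, with hypothetical names (`RealtimeEvent`, `UiState`, `reduce`, `timeline`) and a made-up run ID. Note that, with the middleware shown earlier, the wait step also produces its own stepFinish event right after inputResolved.

```typescript
type RealtimeEvent =
  | { event: "workflow.stepFinish"; data: { stepName: string; result?: unknown } }
  | { event: "workflow.waitingForInput"; data: { eventId: string; message: string } }
  | { event: "workflow.inputResolved"; data: { eventId: string } }
  | { event: "workflow.runFinish"; data: Record<string, never> };

interface UiState {
  steps: string[];
  waiting: { eventId: string; message: string } | null;
  finished: boolean;
}

const initialState: UiState = { steps: [], waiting: null, finished: false };

// Pure equivalent of the hook's onData handler.
function reduce(state: UiState, e: RealtimeEvent): UiState {
  switch (e.event) {
    case "workflow.stepFinish":
      return { ...state, steps: [...state.steps, e.data.stepName] };
    case "workflow.waitingForInput":
      return { ...state, waiting: e.data };
    case "workflow.inputResolved":
      // Only clear the prompt if it belongs to the resolved event.
      return state.waiting?.eventId === e.data.eventId
        ? { ...state, waiting: null }
        : state;
    case "workflow.runFinish":
      return { ...state, finished: true };
    default:
      return state;
  }
}

const id = "approval-run-123"; // hypothetical run ID
const timeline: RealtimeEvent[] = [
  { event: "workflow.stepFinish", data: { stepName: "initial-processing" } },
  { event: "workflow.waitingForInput", data: { eventId: id, message: "Waiting for approval" } },
  // The user clicks approve here; /api/notify resumes the workflow.
  { event: "workflow.inputResolved", data: { eventId: id } },
  { event: "workflow.stepFinish", data: { stepName: "wait-for-approval" } },
  { event: "workflow.stepFinish", data: { stepName: "process-approved" } },
  { event: "workflow.runFinish", data: {} },
];

const whilePaused = timeline.slice(0, 2).reduce(reduce, initialState);
const finalState = timeline.reduce(reduce, initialState);
```

`whilePaused` holds the state in which the approval buttons are visible, and `finalState` is what the page renders once the run has finished.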

Benefits

  • Real-time feedback: Users see exactly when their approval is needed
  • No polling: Instant updates via Server-Sent Events
  • Timeout handling: Workflows don’t hang indefinitely waiting for input

Full Example

For a complete working example with all steps, error handling, and full UI components, check out the Upstash Realtime example on GitHub.

Next Steps