
Durable RAG Document Ingestion with Upstash Workflow and Pinecone

Mehmet Tokgöz, Software Engineer @Upstash
https://upstash.com/blog/rag-document-ingestion-workflow-pinecone

In this blog post, I will guide you through building a durable document ingestion pipeline for a Retrieval-Augmented Generation (RAG) application, using Upstash Workflow, Pinecone, and OpenAI embeddings on Next.js.

When a user uploads a document into a RAG system, it has to be turned into searchable vectors before it can be queried. The ingestion pipeline downloads the source, splits it into chunks, generates an embedding for each chunk, and upserts the resulting vectors into a vector database. This sounds simple, but it is one of the most failure-prone parts of a RAG application. Embedding APIs are slow and rate-limited, and a 200-page document can easily produce hundreds of chunks.

If you implement all of this in a single HTTP request, you will hit serverless timeouts, and any failure forces you to start over and pay for all the embeddings again. By the end of this post, we will have a pipeline that survives crashes and rate limits, and never repeats work that already succeeded.

Project Description

We will build the pipeline out of two workflows:

  • An ingestion workflow that orchestrates the run: it downloads the source, extracts and chunks the text, and fans out the embedding work.
  • An embed-and-upsert workflow that handles a single chunk: it embeds the chunk and writes the resulting vector to Pinecone. The ingestion workflow invokes one of these per chunk, in parallel, using context.invoke.

Splitting the work this way keeps each workflow small, and gives every chunk its own run in the Upstash Console. When chunk 181 of 300 fails, you can open that run and see what happened, and the other 299 are unaffected.

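Here is the whole flow at a glance: the trigger endpoint starts the ingestion workflow, which downloads the source, extracts and chunks the text, and invokes one embed-and-upsert workflow per chunk. Each of those embeds its chunk and upserts the resulting vector into Pinecone.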

Why use a workflow engine here? Because the pipeline maps directly onto what durable execution gives us:

  • Every step result is persisted. If the process dies after embedding 180 of 300 chunks, the run resumes from chunk 181 instead of starting over.
  • context.call executes HTTP requests from QStash on your behalf, so a slow download or embedding call doesn't consume your serverless function's execution time.
  • Retries with exponential backoff absorb transient 429s and 5xxs from the embedding API.
  • Flow control caps how fast and how concurrently the fan-out hits the embedding provider.
  • A deterministic workflow run ID deduplicates concurrent re-uploads of the same document.
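To make the first point concrete, here is a toy, in-memory sketch of step memoization. It is not how Upstash Workflow is implemented; StepStore, runStep, and attempt are hypothetical names used only for illustration, and the "embedding" is simulated by taking the length of a string. The idea it shows is that a retry replays finished steps from the store and only executes the ones that never completed.

```typescript
// Toy illustration of step memoization; NOT the Upstash Workflow implementation.
// StepStore, runStep and attempt are hypothetical names for this sketch.
type StepStore = Map<string, unknown>;

function runStep<T>(store: StepStore, stepName: string, fn: () => T): T {
  if (store.has(stepName)) {
    // The step finished in an earlier attempt: return the stored result.
    return store.get(stepName) as T;
  }
  const result = fn();
  store.set(stepName, result); // persist before moving on to the next step
  return result;
}

const store: StepStore = new Map();
let embedCalls = 0; // counts simulated (paid) embedding calls

// Runs the three "embed" steps; crashAfter simulates the process dying.
function attempt(crashAfter: number | null): number[] {
  const lengths: number[] = [];
  ["a", "bb", "ccc"].forEach((chunk, i) => {
    if (crashAfter !== null && i === crashAfter) {
      throw new Error("process died");
    }
    lengths.push(
      runStep(store, `embed-${i}`, () => {
        embedCalls += 1;
        return chunk.length;
      })
    );
  });
  return lengths;
}

try {
  attempt(2); // first attempt dies before the third chunk
} catch {
  // a real engine would schedule a retry here
}

// The retry replays steps 0 and 1 from the store and only runs step 2.
const finalLengths = attempt(null);
```

After both attempts, the simulated embedding function has run three times in total, once per chunk, even though the first two chunks were visited twice.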

Setting up Pinecone

First, create a Pinecone account and an API key in the Pinecone console. Then create a serverless index whose dimension matches the embedding model. text-embedding-3-small produces 1536-dimensional vectors:

// scripts/create-index.ts
import { Pinecone } from "@pinecone-database/pinecone";
 
const pc = new Pinecone(); // reads PINECONE_API_KEY
 
const indexModel = await pc.createIndex({
  name: "rag-documents",
  dimension: 1536,
  metric: "cosine",
  spec: { serverless: { cloud: "aws", region: "us-east-1" } },
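});
 
// Print the host to use as PINECONE_INDEX_HOST
const { host } = await pc.describeIndex("rag-documents");
console.log(host);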

One Pinecone feature we will lean on is namespaces. Each tenant's vectors live in their own namespace, so queries at chat time only search that tenant's data, and deleting a tenant is a single deleteAll() call.

Setting up the Project

Create a Next.js application and install the dependencies:

npx create-next-app@latest rag-ingestion
cd rag-ingestion
npm install @upstash/workflow @pinecone-database/pinecone

Then grab your QStash token from the Upstash Console and configure the environment:

# .env.local
QSTASH_TOKEN=...
PINECONE_API_KEY=...
PINECONE_INDEX_HOST=...   # printed by the create-index script
OPENAI_API_KEY=...
APP_URL=https://your-app.vercel.app
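Since a missing variable would otherwise only surface in the middle of a run, it can help to check the configuration once at startup. The following is a minimal sketch; missingEnvVars and assertEnv are hypothetical helpers, not part of any SDK, and the environment is passed in as a plain object so the functions stay easy to test.

```typescript
// Hypothetical helpers: fail fast when a required variable is missing or blank.
type Env = Record<string, string | undefined>;

const REQUIRED_VARS: string[] = [
  "QSTASH_TOKEN",
  "PINECONE_API_KEY",
  "PINECONE_INDEX_HOST",
  "OPENAI_API_KEY",
  "APP_URL",
];

// Returns the names of required variables that are unset or whitespace-only.
function missingEnvVars(env: Env, required: string[] = REQUIRED_VARS): string[] {
  return required.filter((key) => (env[key] ?? "").trim() === "");
}

// Throws a single error that lists every missing variable.
function assertEnv(env: Env): void {
  const missing = missingEnvVars(env);
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(", ")}`);
  }
}
```

In the app, you would call assertEnv(process.env) once, for example at the top of the route file.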

Writing the Workflow Endpoint

Let's start with the workflow itself. An Upstash workflow is just an HTTP endpoint in your app. You describe the pipeline as a series of steps on a context object, and the SDK turns it into a durable, resumable execution. Three context methods matter for us:

  • context.run(name, fn) executes a piece of your own code as a durable step. The result is persisted, so when a workflow is retried, completed steps return their stored result instead of running again.
  • context.call(name, options) performs an HTTP request from QStash rather than from your function. Your function isn't even running while the request is in flight, so a slow call costs you no execution time, and responses can take up to 12 hours.
  • context.invoke(name, options) starts another workflow and waits for its result. We will use it to fan out the embedding work.

Because the ingestion workflow invokes the embed-and-upsert workflow, both have to live under the same route, exposed with serveMany on a catch-all path. Each workflow is defined with createWorkflow(), which returns a workflow object that can be passed to context.invoke. The file looks like this:

// app/api/workflow/[[...route]]/route.ts
import { createWorkflow, serveMany } from "@upstash/workflow/nextjs";
import type { WorkflowContext } from "@upstash/workflow";
import { Pinecone } from "@pinecone-database/pinecone";
import { extractText, chunkText } from "@/lib/text";
 
const pinecone = new Pinecone({ apiKey: process.env.PINECONE_API_KEY! });
const index = pinecone.index({ host: process.env.PINECONE_INDEX_HOST! });
 
const ingest = createWorkflow(/* ... the orchestrator, below ... */);
const embedAndUpsert = createWorkflow(/* ... the worker, below ... */);
 
export const { POST } = serveMany({
  "ingest": ingest,
  "embed-and-upsert": embedAndUpsert,
});

With this in place, the ingestion workflow is reachable at /api/workflow/ingest. That is the URL the trigger endpoint will target at the end of the post. Let's fill in the two workflows.

The Ingestion Workflow

The orchestrator receives its input, { documentId, sourceUrl, namespace }, as context.requestPayload and walks through the pipeline in four steps. Here it is in full:

type IngestPayload = {
  documentId: string;
  sourceUrl: string;
  namespace: string;
};
 
const ingest = createWorkflow(
  async (context: WorkflowContext<IngestPayload>) => {
    const { documentId, sourceUrl, namespace } = context.requestPayload;
 
    // 👇 Step 1: download the source file
    const { status: downloadStatus, body: rawSource } = await context.call(
      "download-source",
      { url: sourceUrl, method: "GET", retries: 3 }
    );
 
    if (downloadStatus < 200 || downloadStatus >= 300) {
      throw new Error(`Download failed with status ${downloadStatus}`);
    }
 
    // 👇 Step 2: extract the text and split it into chunks
    const chunks = await context.run("extract-and-chunk", async () => {
      const text = await extractText(rawSource, sourceUrl);
      // Fail before chunking if there is nothing to ingest
      if (!text.trim()) throw new Error("Document is empty or unparseable");
      return chunkText(text, { tokens: 800, overlap: 100 });
    });
 
    // 👇 Step 3: fan out one embed-and-upsert run per chunk
    const results = await Promise.all(
      chunks.map((chunk, chunkIndex) =>
        context.invoke(`embed-chunk-${chunkIndex}`, {
          workflow: embedAndUpsert,
          body: { documentId, namespace, sourceUrl, chunk, chunkIndex },
        })
      )
    );
 
    // 👇 Step 4: fan in and check the results
    let vectorsUpserted = 0;
    results.forEach((result, chunkIndex) => {
      if (result.isFailed) {
        throw new Error(`Chunk ${chunkIndex} failed to embed`);
      }
      vectorsUpserted += result.body.upsertedCount;
    });
 
    return { chunks: chunks.length, vectorsUpserted };
  },
  {
    retries: 3,
    failureFunction: async ({ context, failStatus, failResponse }) => {
      const { documentId } = context.requestPayload;
      console.error(
        `Ingestion of ${documentId} failed: ${failStatus} ${failResponse}`
      );
    },
  }
);

Step by step:

  1. The download goes through context.call because pulling a large PDF inside a serverless function is a timeout waiting to happen. QStash performs the request on our behalf, however long it takes. Note that context.call doesn't throw on a non-2xx response; it returns it and lets you decide what is fatal. A 404 that survived three retries means the source is gone, so we throw.
  2. Parsing and chunking is plain application code, so it goes into a context.run step. We split into chunks of roughly 800 tokens with 100 tokens of overlap, a common starting point for RAG. The result is memoized, which means the document is parsed once no matter how many times later steps retry.
  3. For each chunk we invoke the embed-and-upsert workflow with context.invoke. Awaiting all the invocations with Promise.all runs them as parallel child workflows, and the parent suspends until every child has finished. There is no job queue and no polling loop, and if the parent run is retried, completed invocations return their memoized results instantly, so nothing is embedded twice.
  4. context.invoke returns the child's response along with isFailed and isCanceled flags, so the fan-in is just summing the counts and failing the run if any chunk could not be embedded.
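The chunkText helper imported from @/lib/text is not shown in this post. As a rough sketch of what step 2 relies on, here is one way it could look with the same call shape. It makes a loud simplifying assumption: one whitespace-separated word stands in for one token, whereas a real implementation would count tokens with a tokenizer.

```typescript
// Sketch of a chunker with the same call shape as chunkText in the post.
// Assumption: one whitespace-separated word stands in for one token.
type ChunkOptions = { tokens: number; overlap: number };

function chunkText(text: string, { tokens, overlap }: ChunkOptions): string[] {
  if (tokens <= 0 || overlap < 0 || overlap >= tokens) {
    throw new Error("Expected tokens > 0 and 0 <= overlap < tokens");
  }
  const words = text.split(/\s+/).filter((w) => w.length > 0);
  const chunks: string[] = [];
  const stride = tokens - overlap; // how far the window advances each time
  for (let start = 0; start < words.length; start += stride) {
    chunks.push(words.slice(start, start + tokens).join(" "));
    if (start + tokens >= words.length) break; // this window reached the end
  }
  return chunks;
}
```

The function returns a plain array of strings, so the result of the context.run step is JSON-serializable and can be persisted and passed to the child workflows as is.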

The failureFunction at the bottom handles documents that can never be ingested, like an empty PDF or a source URL that no longer exists. It runs once the workflow exhausts its retries, and it is the place to record the failure or notify the user that their document couldn't be processed.

The end of the handler is also where you would extend the pipeline for your own application. A context.run step that marks the document as ready in your database, or one that sends a realtime notification so the UI can switch from a spinner to "ready to chat", is just another durable step.

The Embed-and-Upsert Workflow

The worker has one job: embed a single chunk and write the vector to Pinecone. It receives the payload passed by the parent's context.invoke the same way every workflow receives its input, via context.requestPayload.

type EmbedChunkPayload = {
  documentId: string;
  namespace: string;
  sourceUrl: string;
  chunk: string;
  chunkIndex: number;
};
 
type OpenAIEmbeddingResponse = {
  data: { embedding: number[] }[];
};
 
const embedAndUpsert = createWorkflow(
  async (context: WorkflowContext<EmbedChunkPayload>) => {
    const { documentId, namespace, sourceUrl, chunk, chunkIndex } =
      context.requestPayload;
 
    // 👇 Step 1: embed the chunk
    const response = await context.call<OpenAIEmbeddingResponse>("embed", {
      url: "https://api.openai.com/v1/embeddings",
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        Authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
      },
      body: {
        model: "text-embedding-3-small",
        input: chunk,
      },
      retries: 5,
      timeout: 120,
      flowControl: { key: "openai-embeddings", rate: 10, parallelism: 5 },
    });
 
    if (response.status < 200 || response.status >= 300) {
      throw new Error(`Embedding call failed: ${response.status}`);
    }
 
    // 👇 Step 2: upsert the vector into the tenant's namespace
    const upsertedCount = await context.run("upsert-vector", async () => {
      const result = await index.upsert({
        records: [
          {
            id: `${documentId}#${chunkIndex}`,
            values: response.body.data[0].embedding,
            metadata: { documentId, chunkIndex, text: chunk, source: sourceUrl },
          },
        ],
        namespace,
      });
      return result.upsertedCount;
    });
 
    return { upsertedCount };
  }
);

Step by step:

  1. The embedding is a context.call to the OpenAI embeddings API, so our function isn't billed while OpenAI is working. retries: 5 gives the call its own retry budget with exponential backoff, which absorbs transient 429s and 5xxs.
Note

flowControl caps requests at 10 per second with at most 5 in flight, and it is shared by key rather than by run: all chunks of all documents being ingested at the same time queue behind the same openai-embeddings limit. This is what lets us fan out hundreds of children without exceeding the provider's rate limit.

  2. The upsert writes the vector into the tenant's namespace from a context.run step. The vector ID is deterministic (documentId#chunkIndex) and Pinecone upserts are idempotent on ID, so a retried step overwrites the same record instead of creating a duplicate. We store the chunk text in metadata so the chat side of the application can show retrieved passages without a second lookup; Pinecone allows up to 40 KB of metadata per vector, and chunks of ~800 tokens fit comfortably.
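The two properties the upsert step leans on, a deterministic ID and metadata that stays under the limit, can be illustrated without touching Pinecone. In this sketch, vectorId and metadataBytes are hypothetical helpers, the Map is only a stand-in for an index keyed by ID, and measuring the UTF-8 size of the JSON encoding is an approximation of how the metadata limit is counted.

```typescript
// Hypothetical helpers; the names are not part of the Pinecone SDK.
const METADATA_LIMIT_BYTES = 40 * 1024; // the 40 KB per-vector limit cited above

function vectorId(documentId: string, chunkIndex: number): string {
  return `${documentId}#${chunkIndex}`; // same inputs always give the same ID
}

function metadataBytes(metadata: Record<string, string | number>): number {
  // Approximation: measure the UTF-8 size of the JSON encoding.
  return new TextEncoder().encode(JSON.stringify(metadata)).length;
}

// A Map keyed by ID behaves like an idempotent upsert: a retry overwrites.
const fakeIndex = new Map<string, { text: string }>();
for (let retry = 0; retry < 3; retry++) {
  fakeIndex.set(vectorId("doc_123", 7), { text: "same chunk" });
}
```

After three simulated retries, fakeIndex still holds a single record under doc_123#7. A check such as metadataBytes(metadata) < METADATA_LIMIT_BYTES before the upsert would catch an unusually large chunk early.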

The return value at the end is what the parent receives as result.body from context.invoke, fully typed: createWorkflow carries both the payload and response types through to the invoke call.
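As a rough sanity check on the flowControl settings used in the embed step, you can estimate a lower bound for how long a fan-out takes. The formula below is a back-of-envelope assumption, not something Upstash guarantees: it ignores retries and scheduling overhead, minFanOutSeconds is a hypothetical helper, and the average call durations (0.2 and 2 seconds) are made-up inputs.

```typescript
// Back-of-envelope estimate; the formula is an assumption, not an Upstash guarantee.
type FlowControl = { rate: number; parallelism: number };

function minFanOutSeconds(
  chunks: number,
  { rate, parallelism }: FlowControl,
  avgCallSeconds: number
): number {
  // At most `rate` calls can start per second.
  const rateBound = chunks / rate;
  // At most `parallelism` calls can be in flight at once.
  const parallelismBound = (chunks * avgCallSeconds) / parallelism;
  return Math.max(rateBound, parallelismBound);
}

// 300 chunks with the settings from the worker above.
const fast = minFanOutSeconds(300, { rate: 10, parallelism: 5 }, 0.2);
const slow = minFanOutSeconds(300, { rate: 10, parallelism: 5 }, 2);
```

With quick calls the rate is the limiting factor (fast is 30 seconds); with slow calls the parallelism cap dominates (slow is 120 seconds). That gives a feel for which knob to raise first if ingestion is too slow and the provider's limits allow it.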

The Trigger Endpoint

Both workflows are written and exposed. The last piece is starting a run. A workflow is started by sending an HTTP request to its URL, and the cleanest way to do that from your own backend is the Workflow client. You call client.trigger() with the workflow's URL and a payload, QStash durably enqueues the run, and your endpoint returns immediately.

So the trigger endpoint is a regular Next.js API route, the same route your frontend already calls when a user uploads a document:

// app/api/documents/ingest/route.ts
import { Client } from "@upstash/workflow";
import { NextResponse } from "next/server";
 
const client = new Client({ token: process.env.QSTASH_TOKEN! });
 
export async function POST(request: Request) {
  const { documentId, sourceUrl, namespace } = await request.json();
 
  const { workflowRunId } = await client.trigger({
    url: `${process.env.APP_URL}/api/workflow/ingest`,
    body: { documentId, sourceUrl, namespace },
    workflowRunId: `ingest-${documentId}`,
    retries: 3,
  });
 
  return NextResponse.json({ workflowRunId, status: "processing" });
}
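The route above passes whatever the client sends straight to the workflow. Since a malformed payload would only fail later, inside the run, you may want to validate it before calling client.trigger(). Here is a minimal sketch; parseIngestPayload and requireString are hypothetical helpers, and restricting sourceUrl to http(s) is an assumption about what your application accepts.

```typescript
// Hypothetical validator for the trigger endpoint's request body.
type IngestPayload = { documentId: string; sourceUrl: string; namespace: string };

function requireString(obj: Record<string, unknown>, key: string): string {
  const value = obj[key];
  if (typeof value !== "string" || value.trim() === "") {
    throw new Error(`"${key}" must be a non-empty string`);
  }
  return value;
}

function parseIngestPayload(body: unknown): IngestPayload {
  if (typeof body !== "object" || body === null) {
    throw new Error("Body must be a JSON object");
  }
  const obj = body as Record<string, unknown>;
  const documentId = requireString(obj, "documentId");
  const sourceUrl = requireString(obj, "sourceUrl");
  const namespace = requireString(obj, "namespace");

  let parsed: URL;
  try {
    parsed = new URL(sourceUrl);
  } catch {
    throw new Error('"sourceUrl" must be a valid URL');
  }
  // Assumption: only http(s) sources are allowed.
  if (parsed.protocol !== "https:" && parsed.protocol !== "http:") {
    throw new Error('"sourceUrl" must use http or https');
  }
  return { documentId, sourceUrl, namespace };
}
```

In the route handler, you would call parseIngestPayload(await request.json()) inside a try/catch and return a 400 response when it throws.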

Testing It Locally

You don't need to deploy to try this out. The Upstash Workflow SDK comes with a built-in local development server. To enable it, set the following environment variable:

QSTASH_DEV=true

Run npm run dev, and POST a document:

curl -X POST http://localhost:3000/api/documents/ingest \
  -H "Content-Type: application/json" \
  -d '{
    "documentId": "doc_123",
    "sourceUrl": "https://example.com/whitepaper.pdf",
    "namespace": "tenant_42"
  }'

You can watch runs in the Workflow tab of the Upstash Console: the ingestion run with its steps, plus one child run per chunk, each with its own retries and memoized results.

Final Words and Project Improvements

We built a document ingestion pipeline that keeps working through timeouts, rate limits, crashes, and re-uploads, and never pays for the same embedding twice. Splitting it into an orchestrator and a per-chunk worker with context.invoke kept both workflows short and made every chunk independently observable. Here are some ideas to take the project further:

  • Show real progress in the UI by having the embed-and-upsert workflow bump a counter in your database before returning.
  • Batch chunks for very large documents. One child per chunk maximizes isolation, but a 5,000-chunk document means 5,000 runs. The OpenAI API accepts an array of inputs, so the same worker can take a batch of chunks per invocation and upsert them in one Pinecone request.
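The batching idea can be sketched as a small change to the fan-out: group the chunks first, then invoke one child per group. In the sketch below, toBatches and embeddingRequestBody are hypothetical helpers, and the batch size is an assumption you would tune against the provider's request limits. Each chunk keeps its original index so the documentId#chunkIndex vector IDs stay deterministic.

```typescript
// Hypothetical helpers for the batching idea; batch size is an assumption to tune.
type IndexedChunk = { chunkIndex: number; chunk: string };

function toBatches(chunks: string[], batchSize: number): IndexedChunk[][] {
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    throw new Error("batchSize must be a positive integer");
  }
  const batches: IndexedChunk[][] = [];
  for (let start = 0; start < chunks.length; start += batchSize) {
    batches.push(
      chunks
        .slice(start, start + batchSize)
        // Keep the original position so vector IDs stay deterministic.
        .map((chunk, offset) => ({ chunkIndex: start + offset, chunk }))
    );
  }
  return batches;
}

// Request body for one batch: the embeddings endpoint accepts an array as `input`.
function embeddingRequestBody(batch: IndexedChunk[]) {
  return { model: "text-embedding-3-small", input: batch.map((b) => b.chunk) };
}
```

The worker would then receive a batch instead of a single chunk, send embeddingRequestBody(batch) in its context.call, and upsert one record per returned embedding in a single Pinecone request.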

Thanks for reading. If you have any questions or feedback, you can reach us on Discord or Twitter.