> ## Documentation Index
> Fetch the complete documentation index at: https://upstash.com/docs/llms.txt
> Use this file to discover all available pages before exploring further.

# AI Generation

## Introduction

This example demonstrates advanced AI data processing using Upstash Workflow. The following example workflow downloads a large dataset, processes it in chunks using OpenAI's GPT-4 model, aggregates the results and generates a report.

## Use Case

Our workflow will:

1. Receive a request to process a dataset
2. Download the dataset from a remote source
3. Process the data in chunks using OpenAI
4. Aggregate results
5. Generate and send a final report

## Code Example

<CodeGroup>
  ```typescript api/workflow/route.ts theme={"system"}
  import { serve } from "@upstash/workflow/nextjs"
  import {
    downloadData,
    aggregateResults,
    generateReport,
    sendReport,
    getDatasetUrl,
    splitIntoChunks,
  } from "./utils"

  /** The part of the OpenAI chat completion response body that this workflow reads. */
  type OpenAiResponse = {
    choices: Array<{
      message: { role: string; content: string }
    }>
  }

  /**
   * Workflow endpoint: downloads a dataset, analyzes it chunk by chunk with
   * OpenAI, aggregates the summaries in batches of 10 and sends a final report.
   *
   * Expects a request payload of the form `{ datasetId, userId }`.
   */
  export const { POST } = serve<{ datasetId: string; userId: string }>(
    async (context) => {
      const request = context.requestPayload

      // Step 1: Download the dataset
      const datasetUrl = await context.run("get-dataset-url", async () => {
        return await getDatasetUrl(request.datasetId)
      })

      // HTTP request with much longer timeout (2hrs)
      const { body: dataset } = await context.call("download-dataset", {
        url: datasetUrl,
        method: "GET"
      })

      // Step 2: Process data in chunks using OpenAI
      const chunkSize = 1000
      const chunks = splitIntoChunks(dataset, chunkSize)
      const processedChunks: string[] = []

      for (let i = 0; i < chunks.length; i++) {
        // Each chunk is its own step, named after the chunk index
        const { body: processedChunk } = await context.api.openai.call<OpenAiResponse>(
          `process-chunk-${i}`,
          {
            token: process.env.OPENAI_API_KEY!,
            operation: "chat.completions.create",
            body: {
              model: "gpt-4",
              messages: [
                {
                  role: "system",
                  content:
                    "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.",
                },
                {
                  role: "user",
                  content: `Analyze this data chunk: ${JSON.stringify(chunks[i])}`,
                },
              ],
              max_completion_tokens: 150,
            },
          }
        )

        processedChunks.push(processedChunk.choices[0].message.content)

        // Every 10 chunks (and after the last chunk), aggregate intermediate results
        if (i % 10 === 9 || i === chunks.length - 1) {
          await context.run(`aggregate-results${i}`, async () => {
            await aggregateResults(processedChunks)
          })

          // Reset the batch outside of the step. The endpoint is re-invoked for
          // every step and completed steps are not executed again, so a reset
          // performed inside `context.run` would be lost on later invocations
          // and already-aggregated chunks would be aggregated a second time.
          processedChunks.length = 0
        }
      }

      // Step 3: Generate and send data report
      const report = await context.run("generate-report", async () => {
        return await generateReport(request.datasetId)
      })

      await context.run("send-report", async () => {
        await sendReport(report, request.userId)
      })
    }
  )
  ```

  ```python main.py theme={"system"}
  from fastapi import FastAPI
  import json
  import os
  from typing import Dict, List, Any, TypedDict
  from upstash_workflow.fastapi import Serve
  from upstash_workflow import AsyncWorkflowContext, CallResponse
  from utils import (
      aggregate_results,
      generate_report,
      send_report,
      get_dataset_url,
      split_into_chunks,
  )


  app = FastAPI()
  serve = Serve(app)


  class RequestPayload(TypedDict):
      """Shape of the JSON payload that the workflow endpoint reads."""

      # Passed to get_dataset_url and generate_report
      dataset_id: str
      # Passed to send_report together with the report
      user_id: str


  @serve.post("/ai-generation")
  async def ai_generation(context: AsyncWorkflowContext[RequestPayload]) -> None:
      """Download a dataset, analyze it in chunks with OpenAI and send a report.

      The request payload must contain ``dataset_id`` and ``user_id``.
      Summaries are aggregated in batches of 10 chunks before the final
      report is generated and sent.
      """
      request = context.request_payload
      dataset_id = request["dataset_id"]
      user_id = request["user_id"]

      # Step 1: Download the dataset
      async def _get_dataset_url() -> str:
          return await get_dataset_url(dataset_id)

      dataset_url = await context.run("get-dataset-url", _get_dataset_url)

      # HTTP request with much longer timeout (2hrs)
      response: CallResponse[Any] = await context.call(
          "download-dataset", url=dataset_url, method="GET"
      )
      dataset = response.body

      # Step 2: Process data in chunks using OpenAI
      chunk_size = 1000
      chunks = split_into_chunks(dataset, chunk_size)
      processed_chunks: List[str] = []

      for i, chunk in enumerate(chunks):
          # Each chunk is its own step, named after the chunk index.
          # The response body is nested JSON, hence Dict[str, Any].
          openai_response: CallResponse[Dict[str, Any]] = await context.call(
              f"process-chunk-{i}",
              url="https://api.openai.com/v1/chat/completions",
              method="POST",
              headers={
                  "authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}",
              },
              body={
                  "model": "gpt-4",
                  "messages": [
                      {
                          "role": "system",
                          "content": "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.",
                      },
                      {
                          "role": "user",
                          "content": f"Analyze this data chunk: {json.dumps(chunk)}",
                      },
                  ],
                  "max_tokens": 150,
              },
          )

          processed_chunks.append(
              openai_response.body["choices"][0]["message"]["content"]
          )

          # Every 10 chunks (and after the last chunk), aggregate intermediate results
          if i % 10 == 9 or i == len(chunks) - 1:

              async def _aggregate_results() -> None:
                  await aggregate_results(processed_chunks)

              await context.run(f"aggregate-results{i}", _aggregate_results)

              # Reset the batch outside of the step. The endpoint is re-invoked
              # for every step and completed steps are not executed again, so a
              # reset performed inside the step function would be lost on later
              # invocations and already-aggregated chunks would be aggregated
              # a second time.
              processed_chunks.clear()

      # Step 3: Generate and send data report
      async def _generate_report() -> Any:
          return await generate_report(dataset_id)

      report = await context.run("generate-report", _generate_report)

      async def _send_report() -> None:
          await send_report(report, user_id)

      await context.run("send-report", _send_report)

  ```
</CodeGroup>

## Code Breakdown

### 1. Preparing our data

We start by retrieving the dataset URL and then downloading the dataset:

<CodeGroup>
  ```typescript api/workflow/route.ts theme={"system"}
  // Look up the dataset URL in its own workflow step
  const datasetUrl = await context.run("get-dataset-url", async () => {
    return await getDatasetUrl(request.datasetId)
  })

  // Download the dataset via context.call (HTTP request with a much longer timeout)
  const { body: dataset } = await context.call("download-dataset", {
    url: datasetUrl,
    method: "GET"
  })
  ```

  ```python main.py theme={"system"}
  # Look up the dataset URL in its own workflow step
  async def _get_dataset_url() -> str:
      return await get_dataset_url(dataset_id)

  dataset_url = await context.run("get-dataset-url", _get_dataset_url)

  # Download the dataset via context.call (HTTP request with a much longer timeout)
  response: CallResponse[Any] = await context.call(
      "download-dataset", url=dataset_url, method="GET"
  )
  dataset = response.body

  ```
</CodeGroup>

Note that we use `context.call` for the download, a way to make HTTP requests that run for much longer than your serverless execution limit would normally allow.

### 2. Processing our data

We split the dataset into chunks and process each one using OpenAI's GPT-4 model:

<CodeGroup>
  ```typescript api/workflow/route.ts theme={"system"}
  // Send every chunk to OpenAI as a separate workflow step
  for (let i = 0; i < chunks.length; i++) {
    const { body: processedChunk } = await context.api.openai.call<OpenAiResponse>(
      // Step name includes the chunk index
      `process-chunk-${i}`,
      {
        token: process.env.OPENAI_API_KEY!,
        operation: "chat.completions.create",
        body: {
          model: "gpt-4",
          messages: [
            {
              role: "system",
              content:
                "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.",
            },
            {
              role: "user",
              content: `Analyze this data chunk: ${JSON.stringify(chunks[i])}`,
            },
          ],
          max_completion_tokens: 150,
        },
      }
    )
  }
  ```

  ```python main.py theme={"system"}
  for i, chunk in enumerate(chunks):
      openai_response: CallResponse[Dict[str, str]] = await context.call(
          f"process-chunk-{i}",
          url="https://api.openai.com/v1/chat/completions",
          method="POST",
          headers={
              "authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}",
          },
          body={
              "model": "gpt-4",
              "messages": [
              {
                  "role": "system",
                  "content":
                  "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.",
              },
              {
                  "role": "user",
                  "content": f"Analyze this data chunk: {json.dumps(chunk)}",
              },
              ],
              "max_tokens": 150,
          },
      )

  ```
</CodeGroup>

### 3. Aggregating our data

After processing our data in smaller chunks to avoid any function timeouts, we aggregate results every 10 chunks:

<CodeGroup>
  ```typescript api/workflow/route.ts theme={"system"}
  if (i % 10 === 9 || i === chunks.length - 1) {
    await context.run(`aggregate-results${i}`, async () => {
      await aggregateResults(processedChunks)
    })

    // Reset the batch outside of the step: completed steps are not executed
    // again when the workflow endpoint is re-invoked, so a reset performed
    // inside `context.run` would be lost on later invocations.
    processedChunks.length = 0
  }

  ```

  ```python main.py theme={"system"}
  if i % 10 == 9 or i == len(chunks) - 1:

      async def _aggregate_results() -> None:
          await aggregate_results(processed_chunks)
          processed_chunks.clear()

      await context.run(f"aggregate-results{i}", _aggregate_results)

  ```
</CodeGroup>

### 4. Sending a report

Finally, we generate a report based on the aggregated results and send it to the user:

<CodeGroup>
  ```typescript api/workflow/route.ts theme={"system"}
  // Generate the report for the requested dataset in its own step
  const report = await context.run("generate-report", async () => {
    return await generateReport(request.datasetId)
  })

  // Send the report, passing along the `userId` from the request payload
  await context.run("send-report", async () => {
    await sendReport(report, request.userId)
  })

  ```

  ```python main.py theme={"system"}
  # Generate the report for the requested dataset in its own step
  async def _generate_report() -> Any:
      return await generate_report(dataset_id)

  report = await context.run("generate-report", _generate_report)

  # Send the report, passing along the user_id from the request payload
  async def _send_report() -> None:
      await send_report(report, user_id)

  await context.run("send-report", _send_report)

  ```
</CodeGroup>

## Key Features

1. **Non-blocking HTTP Calls**: We use `context.call` for API requests so they don't consume the endpoint's execution time (great for optimizing serverless cost).

2. **Long-running tasks**: The dataset download can take up to 2 hours, though in practice it is limited by the memory available to your function.
