AI Generation
Introduction
This example demonstrates an advanced AI data-processing pipeline built with Upstash Workflow. The workflow downloads a large dataset, processes it in chunks using OpenAI's GPT-4 model, aggregates the results, and generates a report.
Use Case
Our workflow will:
- Receive a request to process a dataset
- Download the dataset from a remote source
- Process the data in chunks using OpenAI
- Aggregate results
- Generate and send a final report
Code Example
import { serve } from "@upstash/qstash/nextjs"
import {
  downloadData,
  aggregateResults,
  generateReport,
  sendReport,
  getDatasetUrl,
  splitIntoChunks,
} from "./utils"

type OpenAiResponse = {
  choices: {
    message: {
      role: string,
      content: string
    }
  }[]
}

export const POST = serve<{ datasetId: string; userId: string }>(
  async (context) => {
    const request = context.requestPayload

    // Step 1: Download the dataset
    const datasetUrl = await context.run("get-dataset-url", async () => {
      return await getDatasetUrl(request.datasetId)
    })

    // HTTP request with much longer timeout (2hrs)
    const dataset = await context.call("download-dataset", datasetUrl, "GET")

    // Step 2: Process data in chunks using OpenAI
    const chunkSize = 1000
    const chunks = splitIntoChunks(dataset, chunkSize)
    const processedChunks: string[] = []

    for (let i = 0; i < chunks.length; i++) {
      const processedChunk = await context.call<OpenAiResponse>(
        `process-chunk-${i}`,
        "https://api.openai.com/v1/chat/completions",
        "POST",
        {
          model: "gpt-4",
          messages: [
            {
              role: "system",
              content:
                "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.",
            },
            {
              role: "user",
              content: `Analyze this data chunk: ${JSON.stringify(chunks[i])}`,
            },
          ],
          max_tokens: 150,
        },
        { authorization: `Bearer ${process.env.OPENAI_API_KEY}` }
      )

      processedChunks.push(processedChunk.choices[0].message.content)

      // Every 10 chunks, we'll aggregate intermediate results
      if (i % 10 === 9 || i === chunks.length - 1) {
        await context.run(`aggregate-results${i}`, async () => {
          await aggregateResults(processedChunks)
          processedChunks.length = 0
        })
      }
    }

    // Step 3: Generate and send data report
    const report = await context.run("generate-report", async () => {
      return await generateReport(request.datasetId)
    })

    await context.run("send-report", async () => {
      await sendReport(report, request.userId)
    })
  }
)
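The workflow imports several helpers from a local ./utils module that the example does not show. The sketch below is a hypothetical implementation of that module: the function names come from the import statement above, but every signature and body here is an assumption (only splitIntoChunks contains real logic; the rest are illustrative stubs).

```typescript
// Hypothetical sketch of "./utils" — all implementations are assumptions.

export type DataRecord = Record<string, unknown>

// Resolve a dataset ID to a download URL (stub; the real lookup is app-specific).
export async function getDatasetUrl(datasetId: string): Promise<string> {
  return `https://example.com/datasets/${datasetId}.json`
}

// Fetch a dataset directly (stub; the workflow uses context.call instead).
export async function downloadData(url: string): Promise<DataRecord[]> {
  return []
}

// Split an array into fixed-size chunks so each chunk fits in one LLM prompt.
export function splitIntoChunks<T>(data: T[], chunkSize: number): T[][] {
  const chunks: T[][] = []
  for (let i = 0; i < data.length; i += chunkSize) {
    chunks.push(data.slice(i, i + chunkSize))
  }
  return chunks
}

// Persist a batch of intermediate summaries (stub).
export async function aggregateResults(summaries: string[]): Promise<void> {}

// Build the final report from previously aggregated results (stub).
export async function generateReport(datasetId: string): Promise<string> {
  return `Report for dataset ${datasetId}`
}

// Deliver the report to the user (stub).
export async function sendReport(report: string, userId: string): Promise<void> {}
```

The only behavior the workflow actually depends on is that splitIntoChunks partitions the data evenly, with a shorter final chunk when the length is not a multiple of chunkSize.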
Code Breakdown
1. Preparing our data
We start by retrieving the dataset URL and then downloading the dataset:
const datasetUrl = await context.run("get-dataset-url", async () => {
  return await getDatasetUrl(request.datasetId)
})

const dataset = await context.call("download-dataset", datasetUrl, "GET")
Note that we use context.call for the download: a way to make HTTP requests that can run for much longer than your serverless execution limit would normally allow.
2. Processing our data
We split the dataset into chunks and process each one using OpenAI’s GPT-4 model:
for (let i = 0; i < chunks.length; i++) {
  const processedChunk = await context.call(
    `process-chunk-${i}`,
    "https://api.openai.com/v1/chat/completions",
    "POST",
    {
      model: "gpt-4",
      messages: [
        {
          role: "system",
          content:
            "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.",
        },
        {
          role: "user",
          content: `Analyze this data chunk: ${JSON.stringify(chunks[i])}`,
        },
      ],
      max_tokens: 150,
    },
    { authorization: `Bearer ${process.env.OPENAI_API_KEY}` }
  )

  processedChunks.push(processedChunk.choices[0].message.content)
}
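The OpenAiResponse type at the top of the example mirrors the shape of OpenAI's chat completion response, where the generated text lives at choices[0].message.content. The standalone sketch below shows that extraction in isolation; the sample payload is fabricated for illustration.

```typescript
// Minimal shape of an OpenAI chat completion response (only the fields we read).
type OpenAiResponse = {
  choices: {
    message: {
      role: string
      content: string
    }
  }[]
}

// A fabricated response payload in the documented shape.
const sample: OpenAiResponse = {
  choices: [
    { message: { role: "assistant", content: "Chunk summary: values trend upward." } },
  ],
}

// The workflow keeps only the assistant's text for later aggregation.
export function extractSummary(response: OpenAiResponse): string {
  return response.choices[0].message.content
}
```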
3. Aggregating our data
After processing our data in smaller chunks to avoid any function timeouts, we aggregate results every 10 chunks:
if (i % 10 === 9 || i === chunks.length - 1) {
  await context.run(`aggregate-results${i}`, async () => {
    await aggregateResults(processedChunks)
    processedChunks.length = 0
  })
}
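The cadence of that condition is easier to see in isolation. This plain-TypeScript sketch (no workflow context; the function name is ours) computes which loop indices trigger an aggregation: every tenth chunk, plus the final chunk if it closes a partial batch.

```typescript
// Return the chunk indices at which aggregation fires, matching the
// workflow's condition: i % 10 === 9 (end of a batch of ten) or the last chunk.
export function aggregationPoints(totalChunks: number): number[] {
  const points: number[] = []
  for (let i = 0; i < totalChunks; i++) {
    if (i % 10 === 9 || i === totalChunks - 1) {
      points.push(i)
    }
  }
  return points
}
```

For 25 chunks this yields indices 9, 19, and 24: two full batches of ten, then a final partial batch of five. Because processedChunks is cleared after each aggregation, each batch contains only the summaries produced since the previous one.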
4. Sending a report
Finally, we generate a report based on the aggregated results and send it to the user:
const report = await context.run("generate-report", async () => {
  return await generateReport(request.datasetId)
})

await context.run("send-report", async () => {
  await sendReport(report, request.userId)
})
Key Features
- Non-blocking HTTP Calls: We use context.call for API requests so they don't consume the endpoint's execution time (great for optimizing serverless cost).
- Long-running tasks: The dataset download can take up to 2 hours, though it is realistically limited by function memory.