> ## Documentation Index
> Fetch the complete documentation index at: https://upstash.com/docs/llms.txt
> Use this file to discover all available pages before exploring further.

# Custom Retry Logic

## Key Features

This example demonstrates how to implement custom retry logic when using third-party services in your Upstash Workflow.

We'll use OpenAI as an example for such a third-party service. **Our retry logic uses response status codes and headers to control when to retry, sleep, or store the third-party API response**.

## Code Example

The following code:

1. Attempts to make an API call up to 10 times.
2. Dynamically adjusts request delays based on response headers or status.
3. Stores successful responses asynchronously.

<CodeGroup>
  ```typescript api/workflow/route.ts theme={"system"}
  import { serve } from "@upstash/workflow/nextjs"
  import { storeResponse } from "@/lib/utils"

  const BASE_DELAY = 10;

  const createSystemMessage = () => ({
    role: "system",
    content: "You are an AI assistant providing a brief summary and key insights for any given data.",
  })

  const createUserMessage = (data: string) => ({
    role: "user",
    content: `Analyze this data chunk: ${data}`,
  })

  export const { POST } = serve<{ userData: string }>(async (context) => {
    // 👇 initial data sent along when triggering the workflow
    const { userData } = context.requestPayload

    for (let attempt = 0; attempt < 10; attempt++) {
      const response = await context.api.openai.call(`call-openai`, {
        token: process.env.OPENAI_API_KEY!,
        operation: "chat.completions.create",
        body: {
          model: "gpt-3.5-turbo",
          messages: [createSystemMessage(), createUserMessage(userData)],
          max_completion_tokens: 150,
        },
      })

      // Success case
      if (response.status < 300) {
        await context.run("store-response-in-db", () => storeResponse(response.body))
        return
      }

      // Rate limit case - wait and retry
      if (response.status === 429) {
        const resetTime =
          response.header["x-ratelimit-reset-tokens"]?.[0] ||
          response.header["x-ratelimit-reset-requests"]?.[0] ||
          BASE_DELAY

        // assuming `resetTime` is in seconds
        await context.sleep("sleep-until-retry", Number(resetTime))

        continue
      }

      // Any other scenario - pause for 5 seconds to avoid overloading OpenAI API
      await context.sleep("pause-to-avoid-spam", 5)
    }
  })
  ```

  ```python main.py theme={"system"}
  from fastapi import FastAPI
  from typing import Dict, Any, TypedDict
  import os
  from upstash_workflow.fastapi import Serve
  from upstash_workflow import AsyncWorkflowContext, CallResponse
  from utils import store_response

  app = FastAPI()
  serve = Serve(app)


  class InitialData(TypedDict):
      user_data: str


  def create_system_message() -> Dict[str, str]:
      return {
          "role": "system",
          "content": "You are an AI assistant providing a brief summary and key insights for any given data.",
      }


  def create_user_message(data: str) -> Dict[str, str]:
      return {"role": "user", "content": f"Analyze this data chunk: {data}"}


  @serve.post("/custom-retry-logic")
  async def custom_retry_logic(context: AsyncWorkflowContext[InitialData]) -> None:
      # 👇 initial data sent along when triggering the workflow
      user_data = context.request_payload["user_data"]

      for attempt in range(10):
          response: CallResponse[Dict[str, Any]] = await context.call(
              "call-openai",
              url="https://api.openai.com/v1/chat/completions",
              method="POST",
              headers={
                  "authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}",
              },
              body={
                  "model": "gpt-4",
                  "messages": [create_system_message(), create_user_message(user_data)],
                  "max_tokens": 150,
              },
          )

          # Success case
          if response.status_code < 300:

              async def _store_response_in_db() -> None:
                  await store_response(response.body)

              await context.run("store-response-in-db", _store_response_in_db)
              return

          # Rate limit case - wait and retry
          if response.status_code == 429:
              ratelimit_tokens_header = response.header.get("x-ratelimit-reset-tokens")
              ratelimit_requests_header = response.header.get(
                  "x-ratelimit-reset-requests"
              )
              reset_time = (
                  (ratelimit_tokens_header[0] if ratelimit_tokens_header else None)
                  or (ratelimit_requests_header[0] if ratelimit_requests_header else None)
                  or 10
              )

              # assuming `reset_time` is in seconds
              await context.sleep("sleep-until-retry", float(reset_time))
              continue

          # Any other scenario - pause for 5 seconds to avoid overloading OpenAI API
          await context.sleep("pause-to-avoid-spam", 5)

  ```
</CodeGroup>

## Code Breakdown

### 1. Setting up our Workflow

This POST endpoint serves our workflow. We create a loop to attempt the API call (we're about to write) up to 10 times.

<CodeGroup>
  ```typescript TypeScript theme={"system"}
  export const { POST } = serve<{ userData: string }>(async (context) => {
    for (let attempt = 0; attempt < 10; attempt++) {
      // TODO: call API in here
    }
  })
  ```

  ```python Python theme={"system"}
  @serve.post("/custom-retry-logic")
  async def custom_retry_logic(context: AsyncWorkflowContext[InitialData]) -> None:
      for attempt in range(10):
          # TODO: call API in here

  ```
</CodeGroup>

### 2. Making a Third-Party API Call

We use `context.api.openai.call` to send a request to OpenAI.

<Note>
  `context.api.openai.call` uses `context.call` in the background and
  using `context.call` to request data from an API is one of the most powerful Upstash Workflow
  features. Your request can take much longer than any function timeout would normally allow,
  completely bypassing any platform-specific timeout limits.
</Note>

Our request to OpenAI includes an auth header, model parameters, and the data to be processed by the AI. The response from this function call (`response`) is used to determine our retry logic based on its status code and headers.

<CodeGroup>
  ```typescript TypeScript theme={"system"}
  const response = await context.api.openai.call(`call-openai`, {
    token: process.env.OPENAI_API_KEY,
    operation: "chat.completions.create",
    body: {
      model: "gpt-3.5-turbo",
      messages: [createSystemMessage(), createUserMessage(userData)],
      max_completion_tokens: 150,
    },
  })
  ```

  ```python Python theme={"system"}
  response: CallResponse[Dict[str, Any]] = await context.call(
      "call-openai",
      url="https://api.openai.com/v1/chat/completions",
      method="POST",
      headers={
          "authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}",
      },
      body={
          "model": "gpt-4",
          "messages": [create_system_message(), create_user_message(user_data)],
          "max_tokens": 150,
      },
  )

  ```
</CodeGroup>

### 3. Processing a Successful Response (Status Code \< 300)

If the OpenAI response is successful (status code under 300), we store the response in our database. We create a new workflow task (`workflow.run`) to do this for maximum reliability.

<CodeGroup>
  ```typescript TypeScript theme={"system"}
  if (response.status < 300) {
    await context.run("store-response-in-db", () => storeResponse(response.body))
    return
  }
  ```

  ```python Python theme={"system"}
  if response.status_code < 300:

      async def _store_response_in_db() -> None:
          await store_response(response.body)

      await context.run("store-response-in-db", _store_response_in_db)
      return
      
  ```
</CodeGroup>

### 4. Handling Rate Limits (Status Code 429)

If the API response indicates a rate limit error (status code 429), we retrieve our rate limit reset values from the response headers. We calculate the time until the rate limit resets and then pause execution (`workflow.sleep`) for this duration.

<CodeGroup>
  ```typescript TypeScript theme={"system"}
  if (response.status === 429) {
    const resetTime =
      response.header["x-ratelimit-reset-tokens"]?.[0] ||
      response.header["x-ratelimit-reset-requests"]?.[0] ||
      BASE_DELAY

    // assuming `resetTime` is in seconds
    await context.sleep("sleep-until-retry", Number(resetTime))

    continue
  }
  ```

  ```python Python theme={"system"}
  if response.status_code == 429:
      ratelimit_tokens_header = response.header.get("x-ratelimit-reset-tokens")
      ratelimit_requests_header = response.header.get(
          "x-ratelimit-reset-requests"
      )
      reset_time = (
          (ratelimit_tokens_header[0] if ratelimit_tokens_header else None)
          or (ratelimit_requests_header[0] if ratelimit_requests_header else None)
          or 10
      )

      # assuming `reset_time` is in seconds
      await context.sleep("sleep-until-retry", float(reset_time))
      continue

  ```
</CodeGroup>

### 5. Waiting Before the Next Retry Attempt

To avoid making too many requests in a short period and possibly overloading the OpenAI API, we pause our workflow before the next retry attempt (i.e., 5 seconds), regardless of rate limits.

<CodeGroup>
  ```typescript TypeScript theme={"system"}
  await context.sleep("pause-to-avoid-spam", 5)
  ```

  ```python Python theme={"system"}
  await context.sleep("pause-to-avoid-spam", 5)

  ```
</CodeGroup>
