Flow Control enables you to limit the number of calls made to your workflow by delaying the delivery.

There are two main use cases for Flow-Control in Workflow:

  1. Limiting the Workflow Environment: Controlling the execution environment.
  2. Limiting External API Calls: Preventing excessive requests to external services.

There are three parameters you can configure to achieve the desired behavior:

  • Rate and Period:

    • Rate specifies the maximum number of calls allowed within a given period (default: 1 second, configurable via the period parameter). All calls sharing the same FlowControl key count toward this limit. Instead of rejecting calls that exceed the rate, QStash automatically queues and delays them to ensure the limit is respected.
    • Period is the time window over which the rate limit is enforced. Adjust this to control how frequently calls are allowed.
  • Parallelism:
    Sets the maximum number of calls that can be executed concurrently. Unlike rate limiting, this parameter considers the duration of each call. If the parallelism limit is reached, additional calls are queued and will only start when an active call completes, ensuring that the number of simultaneous executions never exceeds the specified limit.

Using Rate and Parallelism Together: All parameters can be combined. For example, with a rate of 10 per second and parallelism of 20, if each request takes a minute to complete, QStash will trigger 10 calls in the first second and another 10 in the next. Since none of them will have finished, the system will wait until one completes before triggering another.

For the FlowControl, you need to choose a key first. This key is used to count the number of calls made to your endpoint. There are not limits to number of keys you can use.

The rate/parallelism limits are not applied per url, they are applied per Flow-Control-Key.

Keep in mind that rate/period and parallelism info are kept on each publish separately. That means if you change the rate/period or parallelism on a new publish, the old fired ones will not be affected. They will keep their flowControl config. During the period that old publishes has not delivered but there are also the publishes with the new rates, QStash will effectively allow the highest rate/period or highest parallelism. Eventually(after the old publishes are delivered), the new rate/period and parallelism will be used.

Limiting the Workflow Environment

To limit the execution environment, you need to configure both the serve and trigger methods. When configured, all the steps of the workflow will respect the limits. Due to the nature of the Workflow SDK, QStash calls the serve method multiple times. This means that to stay within the limits of the deployed environments, the given rate will be applied to all calls going from QStash servers to the serve method.

Note that if there are multiple Workflows running in the same environment, their steps can interleave, but overall rate and parallelism limits will be respected if they share the same flowControl key.

  • In the serve method:
export const { POST } = serve<string>(
  async (context) => {
    await context.run("step-1", async () => {
      return someWork();
    });
  },
  {
    flowControl: { key: "app1", parallelism: 3, rate: 10, period: "1m" }
  }
);

For more details, see the flowControl documentation under serve parameters.

  • In the trigger method:
import { Client } from "@upstash/workflow";

const client = new Client({ token: "<QStash_TOKEN>" });
const { workflowRunId } = await client.trigger({
  url: "https://workflow-endpoint.com",
  body: "hello there!",
  flowControl: { key: "app1", parallelism: 3, rate: 10 }
});

For more details on trigger, see the documentation here.

Limiting External API Calls

To limit requests to an external API, use context.call:

import { serve } from "@upstash/workflow/nextjs";

export const { POST } = serve<{ topic: string }>(async (context) => {
  const request = context.requestPayload;

  const response = await context.call(
    "generate-long-essay", 
    {
      url: "https://api.openai.com/v1/chat/completions",
      method: "POST",
      body: {/*****/},
      flowControl: { key: "opani-call", parallelism: 3, rate: 10 }
    }
  );
});

For more details, see the documentation here.

Rest API for Flow Control Information

You can also use the Rest API to get information on the flow control. See the API documentation for more details.