---
title: AI SDK
description: Use AI SDK's streamText directly inside durable workflows when you need the raw AI SDK API or a per-turn durability boundary.
type: guide
summary: Use streamText() inside a workflow when the durability boundary is an entire user turn, or when you need AI SDK APIs not exposed by DurableAgent. Individual tool calls and LLM calls inside a turn are not separately durable.
related:
  - /docs/ai
  - /docs/ai/chat-session-modeling
  - /docs/ai/defining-tools
  - /docs/ai/resumable-streams
  - /docs/api-reference/workflow-ai/durable-agent
---

# AI SDK



[AI SDK](https://ai-sdk.dev/) is Vercel's framework-agnostic TypeScript toolkit for building AI-powered apps and agents — unified provider access, streaming, tool calling, structured output, and UI hooks. Workflow SDK complements it by making the multi-turn loop durable: the conversation state, hooks, and per-turn responses survive restarts and timeouts. Note that in this pattern the durability boundary is the entire turn — individual tool calls inside a turn are **not** durable on their own (see [Pitfalls](#tools-are-not-individually-durable) below).

For the full AI SDK reference (providers, `streamText`, `generateObject`, `useChat`, tool calling, etc.) see the [AI SDK docs](https://ai-sdk.dev/docs). This page covers the Workflow-specific integration points.

<Callout type="info">
  For most agent use cases, prefer [`DurableAgent`](/cookbook/agent-patterns/durable-agent), which implements the same agent loop as [`streamText`](https://ai-sdk.dev/docs/reference/ai-sdk-core/stream-text), manages tool calling automatically, and runs tools at workflow scope — each tool can be marked `"use step"` for per-call durability and retries, or stay at workflow level to use primitives like `sleep()` and hooks. Use this page's raw `streamText()` pattern when you want the exact AI SDK API (for example `toUIMessageStream()`, `onChunk`, or `generateText`), or when the durability boundary should be an entire user turn in one step — accepting that tool calls inside that turn are not individually durable.
</Callout>

## When to use streamText directly

Use [`streamText()`](https://ai-sdk.dev/docs/reference/ai-sdk-core/stream-text) instead of `DurableAgent` when you need:

* **The raw AI SDK API** — `streamText().toUIMessageStream()`, `onChunk`, `smoothStream`, or other options that map directly to the [`streamText`](https://ai-sdk.dev/docs/reference/ai-sdk-core/stream-text) return value rather than `DurableAgent.stream()`
* **Per-turn durability** — wrap the entire agent response (model + tools) in a single `"use step"` function so one user turn is the atomic retry unit; useful when you want all tool calls inside a turn to re-execute together
* **Custom multi-turn orchestration** — manual hook loops, per-turn stream slicing (`sliceUntilFinish`), or other workflow patterns shown below that don't map cleanly to `DurableAgent`

`DurableAgent` already supports `stopWhen`, `prepareStep`, `onStepFinish`, structured output (`experimental_output`), per-step model switching, and [provider options](https://ai-sdk.dev/docs/ai-sdk-core/provider-options). See the [DurableAgent reference](/docs/api-reference/workflow-ai/durable-agent).

## Multi-turn pattern

One workflow run = one full conversation. The workflow suspends between turns on a hook and resumes when the next user message arrives. Conversation state, tool history, and intermediate computation all live inside the run.

<Callout type="info">
  Because the conversation is one workflow run, it stays on the deployment that started it. If each turn should run on the latest deployment while preserving selected state or streams, see [Versioning](/docs/foundations/versioning) for the child-run continuation pattern.
</Callout>

<Tabs items={['Workflow', 'API Route', 'Client']}>
  <Tab value="Workflow">
    ```typescript title="workflows/support.ts" lineNumbers
    import { streamText, stepCountIs } from "ai";
    import { defineHook, getWritable, getWorkflowMetadata } from "workflow";
    import type { ModelMessage, UIMessageChunk } from "ai";
    import { z } from "zod";

    const MAX_TURNS = 20;

    export const turnHook = defineHook({ // [!code highlight]
      schema: z.object({ message: z.string() }),
    });

    // `streamText` runs tool executes inside `runTurn` (a step), so tool calls
    // are not individually durable — the entire turn retries together. See
    // "Tools are not individually durable" below. Make side-effectful tools idempotent.
    async function lookupOrder({ orderId }: { orderId: string }) {
      const res = await fetch(`https://api.store.com/orders/${orderId}`);
      return res.json();
    }

    async function processRefund({ orderId, reason }: { orderId: string; reason: string }) {
      const res = await fetch("https://api.store.com/refunds", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ orderId, reason }),
      });
      return res.json();
    }

    const TOOLS = {
      lookupOrder: {
        description: "Look up an order by ID",
        inputSchema: z.object({ orderId: z.string() }),
        execute: lookupOrder,
      },
      processRefund: {
        description: "Process a refund",
        inputSchema: z.object({ orderId: z.string(), reason: z.string() }),
        execute: processRefund,
      },
    };

    // Per-turn step — streams one agent response to the durable writable // [!code highlight]
    async function runTurn(messages: ModelMessage[]) {
      "use step";

      const result = streamText({
        model: "anthropic/claude-haiku-4.5",
        system: "You are a customer support agent.",
        messages,
        tools: TOOLS,
        stopWhen: stepCountIs(8),
      });

      const writable = getWritable<UIMessageChunk>();
      // preventClose keeps the durable writable open so the next turn can
      // write to it. Each turn still emits its own start + finish chunks.
      await result.toUIMessageStream().pipeTo(writable, { preventClose: true }); // [!code highlight]

      const response = await result.response;
      return { responseMessages: response.messages };
    }

    export async function supportWorkflow(initialMessages: ModelMessage[]) {
      "use workflow";

      const { workflowRunId } = getWorkflowMetadata();
      // Create the hook once, outside the loop; creating it again with the same token throws HookConflictError // [!code highlight]
      const hook = turnHook.create({ token: workflowRunId }); // [!code highlight]
      let allMessages = initialMessages;

      let completedTurns = 0;
      for (let turn = 0; turn < MAX_TURNS; turn++) {
        const { responseMessages } = await runTurn(allMessages);
        allMessages = [...allMessages, ...responseMessages];
        completedTurns++;

        const { message } = await hook; // [!code highlight] suspend until next user message
        if (message === "/done") break;

        allMessages = [...allMessages, { role: "user", content: message }];
      }

      // Report the turns that actually ran; the loop can exit early on "/done".
      return { turns: completedTurns };
    }
    ```
  </Tab>

  <Tab value="API Route">
    One endpoint handles the first turn, follow-ups, and the `/done` exit. The client sends `runId` in the body so the handler can tell a follow-up from a first turn.

    ```typescript title="app/api/support/route.ts" lineNumbers
    import type { UIMessage, UIMessageChunk } from "ai";
    import { convertToModelMessages, createUIMessageStreamResponse } from "ai";
    import { start, getRun } from "workflow/api";
    import { supportWorkflow, turnHook } from "@/workflows/support";

    // Pump the durable stream until this turn's `finish` chunk, then close
    // the HTTP response. The source reader is released (not cancelled) so the
    // workflow's durable stream keeps flowing for the next turn.
    function sliceUntilFinish( // [!code highlight]
      source: ReadableStream<UIMessageChunk>
    ): ReadableStream<UIMessageChunk> {
      return new ReadableStream<UIMessageChunk>({
        async start(controller) {
          const reader = source.getReader();
          try {
            while (true) {
              const { done, value } = await reader.read();
              if (done) break;
              controller.enqueue(value);
              if (value.type === "finish") break; // [!code highlight]
            }
            controller.close();
          } catch (e) {
            controller.error(e);
          } finally {
            reader.releaseLock();
          }
        },
      });
    }

    // `/done` exits the workflow without emitting chunks. Return a synthetic
    // start+finish so useChat's lifecycle terminates cleanly.
    function emptyTurnStream(): ReadableStream<UIMessageChunk> {
      return new ReadableStream<UIMessageChunk>({
        start(controller) {
          controller.enqueue({ type: "start", messageId: crypto.randomUUID() });
          controller.enqueue({ type: "finish" });
          controller.close();
        },
      });
    }

    export async function POST(req: Request) {
      const { messages, runId }: { messages: UIMessage[]; runId?: string } =
        await req.json();
      const modelMessages = await convertToModelMessages(messages);

      // Follow-up turn: resume hook, return stream starting AFTER the last turn // [!code highlight]
      if (runId) {
        try {
          const run = getRun(runId);

          // Snapshot tail before resuming so our slice only contains this turn // [!code highlight]
          const probe = run.getReadable();
          const tailIndex = await probe.getTailIndex();
          await probe.cancel();

          const lastUser = modelMessages.filter((m) => m.role === "user").at(-1);
          const text =
            typeof lastUser?.content === "string"
              ? lastUser.content
              : Array.isArray(lastUser?.content)
                ? lastUser.content
                    .filter((p): p is { type: "text"; text: string } =>
                      "type" in p && p.type === "text"
                    )
                    .map((p) => p.text)
                    .join("")
                : "";

          await turnHook.resume(runId, { message: text }); // [!code highlight]

          if (text === "/done") {
            return createUIMessageStreamResponse({
              stream: emptyTurnStream(),
              headers: { "x-workflow-run-id": runId },
            });
          }

          const stream = sliceUntilFinish(
            run.getReadable({ startIndex: tailIndex + 1 }) // [!code highlight]
          );

          return createUIMessageStreamResponse({
            stream,
            headers: { "x-workflow-run-id": runId },
          });
        } catch (e: unknown) {
          const msg = e instanceof Error ? e.message.toLowerCase() : "";
          if (!msg.includes("not found") && !msg.includes("expired")) throw e;
          // Stale runId — fall through to start fresh
        }
      }

      // First turn: start a new workflow // [!code highlight]
      const run = await start(supportWorkflow, [modelMessages]);
      const stream = sliceUntilFinish(run.readable);

      return createUIMessageStreamResponse({
        stream,
        headers: { "x-workflow-run-id": run.runId },
      });
    }
    ```
  </Tab>

  <Tab value="Client">
    Store the `runId` in a ref and pass it in the body of every follow-up. `WorkflowChatTransport` forwards it for you.

    ```tsx title="components/support-chat.tsx" lineNumbers
    "use client";

    import { useChat } from "@ai-sdk/react";
    import { WorkflowChatTransport } from "@workflow/ai";
    import { useMemo, useRef, useState } from "react";

    export function SupportChat() {
      const [input, setInput] = useState("");
      const runIdRef = useRef<string | null>(null); // [!code highlight]

      const transport = useMemo(
        () =>
          new WorkflowChatTransport({
            api: "/api/support",
            prepareSendMessagesRequest: ({ messages, body }) => ({
              body: { ...body, messages, runId: runIdRef.current }, // [!code highlight]
            }),
            onChatSendMessage: (response) => {
              const id = response.headers.get("x-workflow-run-id");
              if (id) runIdRef.current = id; // [!code highlight]
            },
          }),
        []
      );

      const { messages, sendMessage, status } = useChat({ transport });
      const busy = status === "streaming" || status === "submitted";

      return (
        <form
          onSubmit={(e) => {
            e.preventDefault();
            if (busy || !input.trim()) return;
            sendMessage({ text: input });
            setInput("");
          }}
        >
          {messages.map((m) => (
            <div key={m.id}>{m.role}: {m.parts.map((p) => p.type === "text" ? p.text : "").join("")}</div>
          ))}
          <input value={input} onChange={(e) => setInput(e.target.value)} disabled={busy} />
        </form>
      );
    }
    ```
  </Tab>
</Tabs>

## How it works

1. **One workflow = one conversation.** The workflow loops on a hook, keeping `allMessages`, tool history, and state alive across turns.
2. **`runTurn` is the durability boundary.** Each turn is one step. The model request and all tool calls inside it run as plain inline functions within that step. If anything throws mid-turn, the whole `runTurn` retries — individual tool calls are not separately durable. See [Pitfalls](#tools-are-not-individually-durable).
3. **Hook is created once.** `turnHook.create({ token: workflowRunId })` runs outside the loop, because calling it twice with the same token throws `HookConflictError`.
4. **`preventClose: true`** on `pipeTo` keeps the durable writable open so the next turn can write to it.
5. **`sliceUntilFinish`** in the API reads chunks until `type === "finish"`, then closes the HTTP response. The source reader is released — not cancelled — so the workflow stream keeps flowing.
6. **`startIndex: tailIndex + 1`** gives each follow-up response only the new chunks, avoiding replay of previous turns.
7. **`/done`** resumes the hook so the workflow exits cleanly, then returns a synthetic `start` + `finish` so `useChat` transitions out of "streaming".

## Pitfalls

Non-obvious correctness details worth knowing before adapting this pattern.

### Tools are not individually durable

`streamText()` is invoked from inside `runTurn` (a `"use step"` function), and the AI SDK calls each tool by directly invoking its `execute` function in that same step. Even if a tool body has its own `"use step"` directive, that directive is a [no-op when called from another step](/docs/foundations/workflows-and-steps#step-functions) — the function just runs inline.

The consequences:

* The atomic retry unit is the entire `runTurn`, not the individual tool call.
* If `processRefund` succeeds and then the model call (or a later tool) throws, the whole turn retries, and `processRefund` will run again.
* Tool calls do not appear as separate entries in the event log or observability dashboard.

**Mitigations:**

* Make side-effectful tool implementations idempotent — dedupe server-side on a stable key (e.g. `orderId`, an `Idempotency-Key` header, etc.).
* Or use [`DurableAgent`](/docs/api-reference/workflow-ai/durable-agent), which runs tools at workflow scope — each tool can be marked `"use step"` to become its own durable, retryable step, or stay at workflow level to use primitives like `sleep()` and hooks.
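
The first mitigation can be sketched as a small simulation. This is not Workflow SDK or AI SDK code: `refundKey`, `RefundService`, and `runTurnWithRetry` are hypothetical names, and the in-memory `Map` stands in for whatever server-side store would honor an `Idempotency-Key` header. It assumes the key can be derived from the tool input alone.

```typescript
import { createHash } from "node:crypto";

type RefundInput = { orderId: string; reason: string };

// Hypothetical helper: the same tool input always yields the same key,
// so a retried turn sends the same key as the first attempt.
export function refundKey({ orderId, reason }: RefundInput): string {
  return createHash("sha256").update(`${orderId}\n${reason}`).digest("hex");
}

// In-memory stand-in for a refund API that dedupes on the key.
export class RefundService {
  refundsIssued = 0;
  private seen = new Map<string, { refundId: string }>();

  refund(input: RefundInput): { refundId: string } {
    const key = refundKey(input);
    const previous = this.seen.get(key);
    if (previous) return previous; // replay: return the first result, issue nothing new

    this.refundsIssued++;
    const result = { refundId: `refund-${this.refundsIssued}` };
    this.seen.set(key, result);
    return result;
  }
}

// Simulates a turn whose first attempt fails after the tool has already run,
// which forces the whole turn, including the tool call, to run again.
export function runTurnWithRetry(service: RefundService, input: RefundInput) {
  for (let attempt = 1; ; attempt++) {
    try {
      const result = service.refund(input);
      if (attempt === 1) throw new Error("model call failed after tool ran");
      return { attempt, result };
    } catch (error) {
      if (attempt >= 3) throw error;
    }
  }
}

const service = new RefundService();
const outcome = runTurnWithRetry(service, { orderId: "A1", reason: "damaged" });
console.log(outcome.attempt, service.refundsIssued); // 2 1
```

The turn runs twice but only one refund is issued. A key derived purely from the input also collapses two intentional refunds with identical input, so a real key would likely need a per-conversation or per-request component as well.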

### Snapshot `tailIndex` *before* resuming the hook

{/* @skip-typecheck - fragment referencing variables from the surrounding multi-turn pattern */}

```typescript
const tailIndex = await probe.getTailIndex(); // [!code highlight] FIRST
await probe.cancel();
await turnHook.resume(runId, { message: text }); // [!code highlight] THEN
const stream = run.getReadable({ startIndex: tailIndex + 1 });
```

Reversing the order races the workflow: by the time you read `tailIndex`, the next turn may already have written its `start` chunk, so `tailIndex + 1` skips past it.
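
The race can be reproduced with a toy model. The sketch below replaces the durable stream with an in-memory append-only log; `ChunkLog` and the helper functions are hypothetical, and it assumes `getTailIndex()` returns the index of the last chunk written so far, which is what `tailIndex + 1` implies.

```typescript
type Chunk = { type: "start" | "text" | "finish"; turn: number };

// In-memory stand-in for the durable stream: an append-only chunk log.
export class ChunkLog {
  private chunks: Chunk[] = [];

  append(...chunks: Chunk[]): void {
    this.chunks.push(...chunks);
  }

  // Assumed semantics: index of the last chunk written so far (-1 when empty).
  tailIndex(): number {
    return this.chunks.length - 1;
  }

  readFrom(startIndex: number): Chunk[] {
    return this.chunks.slice(startIndex);
  }
}

// A log that already holds one complete turn.
export function logWithOneTurn(): ChunkLog {
  const log = new ChunkLog();
  log.append({ type: "start", turn: 1 }, { type: "text", turn: 1 }, { type: "finish", turn: 1 });
  return log;
}

// Models the resumed workflow writing its `start` chunk right away.
function resumeHook(log: ChunkLog, turn: number): void {
  log.append({ type: "start", turn });
}

function finishTurn(log: ChunkLog, turn: number): void {
  log.append({ type: "text", turn }, { type: "finish", turn });
}

export function sliceSnapshotFirst(log: ChunkLog, turn: number): string[] {
  const tailIndex = log.tailIndex(); // FIRST: snapshot the tail
  resumeHook(log, turn); // THEN: resume
  finishTurn(log, turn);
  return log.readFrom(tailIndex + 1).map((chunk) => chunk.type);
}

export function sliceResumeFirst(log: ChunkLog, turn: number): string[] {
  resumeHook(log, turn); // resumed too early
  const tailIndex = log.tailIndex(); // already points at this turn's `start`
  finishTurn(log, turn);
  return log.readFrom(tailIndex + 1).map((chunk) => chunk.type);
}

console.log(sliceSnapshotFirst(logWithOneTurn(), 2)); // [ 'start', 'text', 'finish' ]
console.log(sliceResumeFirst(logWithOneTurn(), 2)); // [ 'text', 'finish' ]
```

In the reversed order the slice loses its `start` chunk, so a client would receive a turn that never formally begins.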

### Don't call `writable.close()` inside a workflow function

I/O operations like closing streams must happen inside a `"use step"` function. Calling `writable.close()` directly in the workflow body throws `Not supported in workflow functions`. When the workflow returns, the runtime closes the underlying writable for you.

### Don't use `TransformStream.terminate()` to slice the stream

A `TransformStream` with `controller.terminate()` on the `finish` chunk seems like the obvious fit for `sliceUntilFinish`, but throws `Invalid state: TransformStream has been terminated` when late-arriving chunks hit the transform callback. Manual pumping through a custom `ReadableStream` (as shown above) sidesteps the problem entirely.

### Release the source reader, don't cancel it

In `sliceUntilFinish`, use `reader.releaseLock()` in the `finally` block rather than `source.cancel()`. Cancelling propagates upstream and closes the durable writable, breaking the next turn. Releasing the lock just detaches our reader; the durable stream keeps flowing.
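
The difference is visible with plain web streams, independent of the Workflow SDK. In this sketch a single in-memory `ReadableStream` stands in for the durable stream and holds two turns of chunks; `sliceUntil`, `collect`, and `runDemo` are hypothetical names, and the global `ReadableStream` is assumed to be available (Node.js 18+ or a browser).

```typescript
type Chunk = { type: string };

// Same pumping approach as sliceUntilFinish, generic over the stop condition.
function sliceUntil<T>(
  source: ReadableStream<T>,
  isLast: (chunk: T) => boolean
): ReadableStream<T> {
  return new ReadableStream<T>({
    async start(controller) {
      const reader = source.getReader();
      try {
        while (true) {
          const { done, value } = await reader.read();
          if (done) break;
          controller.enqueue(value);
          if (isLast(value)) break;
        }
        controller.close();
      } catch (error) {
        controller.error(error);
      } finally {
        reader.releaseLock(); // detach only; the source stays readable
      }
    },
  });
}

// Drains a stream into an array.
async function collect<T>(stream: ReadableStream<T>): Promise<T[]> {
  const reader = stream.getReader();
  const out: T[] = [];
  while (true) {
    const { done, value } = await reader.read();
    if (done) return out;
    out.push(value);
  }
}

export async function runDemo(): Promise<{ first: string[]; second: string[] }> {
  // Two turns' worth of chunks queued in one source stream.
  const source = new ReadableStream<Chunk>({
    start(controller) {
      for (const type of ["start", "text", "finish", "start", "text", "finish"]) {
        controller.enqueue({ type });
      }
      controller.close();
    },
  });
  const isFinish = (chunk: Chunk) => chunk.type === "finish";

  const first = await collect(sliceUntil(source, isFinish));
  // The lock was released, so a second reader can pick up the next turn.
  const second = await collect(sliceUntil(source, isFinish));
  return { first: first.map((c) => c.type), second: second.map((c) => c.type) };
}

runDemo().then(({ first, second }) => console.log(first, second));
```

Both slices contain a full `start`, `text`, `finish` sequence. Cancelling the reader instead would close `source` and discard its queued chunks, so the second turn could no longer be read.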

### Handle stale `runId` gracefully

Clients can send a `runId` from a long-gone workflow (localStorage, back button, server restart). Wrap the follow-up path in a try/catch for `not found` / `expired` and fall through to the first-turn code path to start a fresh workflow.
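
The fallback can be factored into two small helpers. This is a sketch with hypothetical names (`isStaleRunError`, `withStaleRunFallback`); it mirrors the substring check used in the route handler above, and the exact error messages the runtime produces are an assumption worth verifying against your version.

```typescript
// Hypothetical helper: treats errors whose message mentions "not found" or
// "expired" as a stale run. Anything else is considered a real failure.
export function isStaleRunError(error: unknown): boolean {
  if (!(error instanceof Error)) return false;
  const message = error.message.toLowerCase();
  return message.includes("not found") || message.includes("expired");
}

// Tries the follow-up path first and starts fresh only when the run is stale.
export async function withStaleRunFallback<T>(
  followUp: () => Promise<T>,
  startFresh: () => Promise<T>
): Promise<T> {
  try {
    return await followUp();
  } catch (error) {
    if (!isStaleRunError(error)) throw error; // unrelated errors still surface
    return startFresh();
  }
}
```

Matching on message text is brittle. If the runtime exposes typed error classes or codes, checking those would be a sturdier choice.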

## streamText vs DurableAgent

|                          | `streamText()` (this pattern)                               | `DurableAgent`                                                                                              |
| ------------------------ | ----------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------- |
| **Tool loop**            | AI SDK handles via `stopWhen`                               | Handles internally (AI SDK–compatible options)                                                              |
| **LLM call durability**  | Re-executes with the parent turn                            | Each LLM call is a durable step                                                                             |
| **Tool call durability** | Not individually durable — re-executes with the parent turn | Per tool — mark `"use step"` for a durable, retryable step, or keep at workflow level for `sleep()` / hooks |
| **Stop conditions**      | `stopWhen`, `prepareStep`                                   | `stopWhen`, `prepareStep`                                                                                   |
| **Structured output**    | `Output.object()`, `Output.array()`                         | `experimental_output` (`Output.object()`, `Output.text()`)                                                  |
| **Step callbacks**       | `onStepFinish`, `onChunk`, etc.                             | `onStepFinish`, `onFinish`, `onError`, `onAbort` (`onChunk` not available)                                  |
| **Setup**                | Manual stream piping and turn slicing                       | Automatic                                                                                                   |

Use `DurableAgent` for most agent use cases. Use `streamText` when you need the raw AI SDK surface or a per-turn durability boundary.

## Key APIs

**AI SDK** ([docs](https://ai-sdk.dev/docs))

* [`streamText()`](https://ai-sdk.dev/docs/reference/ai-sdk-core/stream-text) — core streaming function; `toUIMessageStream()` pipes into the durable writable
* [`tool()` / tool calling](https://ai-sdk.dev/docs/ai-sdk-core/tools-and-tool-calling) — tools are plain async functions invoked by `streamText` inside the turn step; they are **not** individually durable in this pattern (see [Pitfalls](#tools-are-not-individually-durable))
* [`stepCountIs()` / `stopWhen`](https://ai-sdk.dev/docs/ai-sdk-core/agents#stop-conditions) — bound the agent loop inside each turn
* [`convertToModelMessages()`](https://ai-sdk.dev/docs/reference/ai-sdk-ui/convert-to-model-messages) / [`createUIMessageStreamResponse()`](https://ai-sdk.dev/docs/reference/ai-sdk-ui/create-ui-message-stream-response) — UI ↔ model message conversion at the API boundary
* [`useChat()`](https://ai-sdk.dev/docs/reference/ai-sdk-ui/use-chat) — React hook that consumes the UI message stream on the client

**Workflow SDK**

* [`"use step"`](/docs/api-reference/workflow/use-step) — applied to `runTurn` to make each turn a durable, retryable unit
* [`defineHook()`](/docs/api-reference/workflow/define-hook) — suspension point for follow-up messages
* [`getWritable()`](/docs/api-reference/workflow/get-writable) — resumable stream output
* [`getRun()`](/docs/api-reference/workflow-api/get-run) — `run.getReadable({ startIndex })` for slicing per-turn streams
* [`WorkflowChatTransport`](/docs/api-reference/workflow-ai/workflow-chat-transport) — passes `runId` between turns


