---
title: AI SDK
description: Use AI SDK's streamText directly inside durable workflows for lower-level control over model calls and tool execution.
type: guide
summary: Use streamText() inside a workflow for full control over model options, stop conditions, and output schemas — while tools remain durable steps.
related:
  - /docs/ai
  - /docs/ai/chat-session-modeling
  - /docs/ai/defining-tools
  - /docs/ai/resumable-streams
  - /docs/api-reference/workflow-ai/durable-agent
---

# AI SDK



[AI SDK](https://ai-sdk.dev/) is Vercel's framework-agnostic TypeScript toolkit for building AI-powered apps and agents — unified provider access, streaming, tool calling, structured output, and UI hooks. Workflow SDK complements it by making those calls durable: the model request, the tool loop, and the multi-turn conversation all survive restarts and timeouts.

For the full AI SDK reference (providers, `streamText`, `generateObject`, `useChat`, tool calling, etc.) see the [AI SDK docs](https://ai-sdk.dev/docs). This page covers the Workflow-specific integration points.

<Callout type="info">
  For most agent use cases, prefer [`DurableAgent`](/cookbook/agent-patterns/durable-agent), which wraps [`streamText`](https://ai-sdk.dev/docs/reference/ai-sdk-core/stream-text) and manages the tool loop automatically. This page covers using `streamText()` directly when you need lower-level control.
</Callout>

## When to use streamText directly

Use [`streamText()`](https://ai-sdk.dev/docs/reference/ai-sdk-core/stream-text) instead of `DurableAgent` when you need:

* **Custom stop conditions** — [`stopWhen`](https://ai-sdk.dev/docs/ai-sdk-core/agents#stop-conditions), [`prepareStep`](https://ai-sdk.dev/docs/ai-sdk-core/agents#prepare-step), or [`onStepFinish`](https://ai-sdk.dev/docs/reference/ai-sdk-core/stream-text#on-step-finish) callbacks
* **Structured output** — [`Output.object()`](https://ai-sdk.dev/docs/ai-sdk-core/generating-structured-data) or `Output.array()` alongside tool calling
* **Step-level callbacks** — `onStepFinish` for logging, metrics, or branching logic
* **Provider options** — per-step model switching, reasoning budgets, or custom [provider options](https://ai-sdk.dev/docs/ai-sdk-core/provider-options)

## Multi-turn pattern

One workflow run = one full conversation. The workflow suspends between turns on a hook and resumes when the next user message arrives. Conversation state, tool history, and intermediate computation all live inside the run.
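
To make the suspend-and-resume shape concrete before the full example, here is a minimal in-memory sketch that runs without the Workflow runtime. `TurnQueue` and `conversation` are hypothetical stand-ins for the hook and the workflow function, and the model call is replaced by a canned reply; none of this is Workflow SDK API.

```typescript
type Message = { role: "user" | "assistant"; content: string };

// Hypothetical stand-in for a hook: resume() delivers a payload,
// next() waits until one is available.
class TurnQueue<T> {
  private buffered: T[] = [];
  private waiters: Array<(value: T) => void> = [];

  resume(payload: T): void {
    const waiter = this.waiters.shift();
    if (waiter) waiter(payload);
    else this.buffered.push(payload);
  }

  next(): Promise<T> {
    if (this.buffered.length > 0) {
      return Promise.resolve(this.buffered.shift() as T);
    }
    return new Promise<T>((resolve) => this.waiters.push(resolve));
  }
}

// One call = one conversation. State lives in local variables across turns.
async function conversation(
  initial: Message[],
  queue: TurnQueue<string>,
  maxTurns: number
): Promise<{ turns: number; messages: Message[] }> {
  let messages = initial;
  let turns = 0;

  for (let turn = 0; turn < maxTurns; turn++) {
    // Stand-in for the model call made in each turn.
    messages = [...messages, { role: "assistant", content: `reply ${turn + 1}` }];
    turns = turn + 1;

    const message = await queue.next(); // "suspends" until the next user message
    if (message === "/done") break;

    messages = [...messages, { role: "user", content: message }];
  }

  return { turns, messages };
}
```

The real workflow below has the same loop shape, with the difference that the runtime persists the suspended state instead of holding it in memory.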

<Tabs items={['Workflow', 'API Route', 'Client']}>
  <Tab value="Workflow">
    ```typescript title="workflows/support.ts" lineNumbers
    import { streamText, stepCountIs } from "ai";
    import { defineHook, getWritable, getWorkflowMetadata } from "workflow";
    import type { ModelMessage, UIMessageChunk } from "ai";
    import { z } from "zod";

    const MAX_TURNS = 20;

    export const turnHook = defineHook({ // [!code highlight]
      schema: z.object({ message: z.string() }),
    });

    async function lookupOrder({ orderId }: { orderId: string }) {
      "use step";
      const res = await fetch(`https://api.store.com/orders/${orderId}`);
      return res.json();
    }

    async function processRefund({ orderId, reason }: { orderId: string; reason: string }) {
      "use step";
      const res = await fetch("https://api.store.com/refunds", {
        method: "POST",
        body: JSON.stringify({ orderId, reason }),
      });
      return res.json();
    }

    const TOOLS = {
      lookupOrder: {
        description: "Look up an order by ID",
        inputSchema: z.object({ orderId: z.string() }),
        execute: lookupOrder,
      },
      processRefund: {
        description: "Process a refund",
        inputSchema: z.object({ orderId: z.string(), reason: z.string() }),
        execute: processRefund,
      },
    };

    // Per-turn step — streams one agent response to the durable writable // [!code highlight]
    async function runTurn(messages: ModelMessage[]) {
      "use step";

      const result = streamText({
        model: "anthropic/claude-haiku-4.5",
        system: "You are a customer support agent.",
        messages,
        tools: TOOLS,
        stopWhen: stepCountIs(8),
      });

      const writable = getWritable<UIMessageChunk>();
      // preventClose keeps the durable writable open so the next turn can
      // write to it. Each turn still emits its own start + finish chunks.
      await result.toUIMessageStream().pipeTo(writable, { preventClose: true }); // [!code highlight]

      const response = await result.response;
      return { responseMessages: response.messages };
    }

    export async function supportWorkflow(initialMessages: ModelMessage[]) {
      "use workflow";

      const { workflowRunId } = getWorkflowMetadata();
      // Create the hook once, outside the loop: a second create() with the same token throws HookConflictError // [!code highlight]
      const hook = turnHook.create({ token: workflowRunId }); // [!code highlight]
      let allMessages = initialMessages;

      let completedTurns = 0;
      for (let turn = 0; turn < MAX_TURNS; turn++) {
        const { responseMessages } = await runTurn(allMessages);
        allMessages = [...allMessages, ...responseMessages];
        completedTurns = turn + 1;

        const { message } = await hook; // [!code highlight] suspend until next user message
        if (message === "/done") break;

        allMessages = [...allMessages, { role: "user", content: message }];
      }

      // Report the turns actually run; an early `/done` ends before MAX_TURNS
      return { turns: completedTurns };
    }
    ```
  </Tab>

  <Tab value="API Route">
    One endpoint handles the first turn, follow-ups, and the `/done` exit. The client sends `runId` in the request body on follow-ups, so its presence distinguishes a follow-up from a first turn.

    ```typescript title="app/api/support/route.ts" lineNumbers
    import type { UIMessage, UIMessageChunk } from "ai";
    import { convertToModelMessages, createUIMessageStreamResponse } from "ai";
    import { start, getRun } from "workflow/api";
    import { supportWorkflow, turnHook } from "@/workflows/support";

    // Pump the durable stream until this turn's `finish` chunk, then close
    // the HTTP response. The source reader is released (not cancelled) so the
    // workflow's durable stream keeps flowing for the next turn.
    function sliceUntilFinish( // [!code highlight]
      source: ReadableStream<UIMessageChunk>
    ): ReadableStream<UIMessageChunk> {
      return new ReadableStream<UIMessageChunk>({
        async start(controller) {
          const reader = source.getReader();
          try {
            while (true) {
              const { done, value } = await reader.read();
              if (done) break;
              controller.enqueue(value);
              if (value.type === "finish") break; // [!code highlight]
            }
            controller.close();
          } catch (e) {
            controller.error(e);
          } finally {
            reader.releaseLock();
          }
        },
      });
    }

    // `/done` exits the workflow without emitting chunks. Return a synthetic
    // start+finish so useChat's lifecycle terminates cleanly.
    function emptyTurnStream(): ReadableStream<UIMessageChunk> {
      return new ReadableStream<UIMessageChunk>({
        start(controller) {
          controller.enqueue({ type: "start", messageId: crypto.randomUUID() });
          controller.enqueue({ type: "finish" });
          controller.close();
        },
      });
    }

    export async function POST(req: Request) {
      const { messages, runId }: { messages: UIMessage[]; runId?: string } =
        await req.json();
      const modelMessages = await convertToModelMessages(messages);

      // Follow-up turn: resume hook, return stream starting AFTER the last turn // [!code highlight]
      if (runId) {
        try {
          const run = getRun(runId);

          // Snapshot tail before resuming so our slice only contains this turn // [!code highlight]
          const probe = run.getReadable();
          const tailIndex = await probe.getTailIndex();
          await probe.cancel();

          const lastUser = modelMessages.filter((m) => m.role === "user").at(-1);
          const text =
            typeof lastUser?.content === "string"
              ? lastUser.content
              : Array.isArray(lastUser?.content)
                ? lastUser.content
                    .filter((p): p is { type: "text"; text: string } =>
                      "type" in p && p.type === "text"
                    )
                    .map((p) => p.text)
                    .join("")
                : "";

          await turnHook.resume(runId, { message: text }); // [!code highlight]

          if (text === "/done") {
            return createUIMessageStreamResponse({
              stream: emptyTurnStream(),
              headers: { "x-workflow-run-id": runId },
            });
          }

          const stream = sliceUntilFinish(
            run.getReadable({ startIndex: tailIndex + 1 }) // [!code highlight]
          );

          return createUIMessageStreamResponse({
            stream,
            headers: { "x-workflow-run-id": runId },
          });
        } catch (e: unknown) {
          const msg = e instanceof Error ? e.message.toLowerCase() : "";
          if (!msg.includes("not found") && !msg.includes("expired")) throw e;
          // Stale runId — fall through to start fresh
        }
      }

      // First turn: start a new workflow // [!code highlight]
      const run = await start(supportWorkflow, [modelMessages]);
      const stream = sliceUntilFinish(run.readable);

      return createUIMessageStreamResponse({
        stream,
        headers: { "x-workflow-run-id": run.runId },
      });
    }
    ```
  </Tab>

  <Tab value="Client">
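    Store the `runId` in a ref and pass it in the body of every follow-up. `WorkflowChatTransport` forwards it through `prepareSendMessagesRequest` and picks up the new value from the `x-workflow-run-id` response header in `onChatSendMessage`.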

    ```tsx title="components/support-chat.tsx" lineNumbers
    "use client";

    import { useChat } from "@ai-sdk/react";
    import { WorkflowChatTransport } from "@workflow/ai";
    import { useMemo, useRef, useState } from "react";

    export function SupportChat() {
      const [input, setInput] = useState("");
      const runIdRef = useRef<string | null>(null); // [!code highlight]

      const transport = useMemo(
        () =>
          new WorkflowChatTransport({
            api: "/api/support",
            prepareSendMessagesRequest: ({ messages, body }) => ({
              body: { ...body, messages, runId: runIdRef.current }, // [!code highlight]
            }),
            onChatSendMessage: (response) => {
              const id = response.headers.get("x-workflow-run-id");
              if (id) runIdRef.current = id; // [!code highlight]
            },
          }),
        []
      );

      const { messages, sendMessage, status } = useChat({ transport });
      const busy = status === "streaming" || status === "submitted";

      return (
        <form
          onSubmit={(e) => {
            e.preventDefault();
            if (busy || !input.trim()) return;
            sendMessage({ text: input });
            setInput("");
          }}
        >
          {messages.map((m) => (
            <div key={m.id}>{m.role}: {m.parts.map((p) => p.type === "text" ? p.text : "").join("")}</div>
          ))}
          <input value={input} onChange={(e) => setInput(e.target.value)} disabled={busy} />
        </form>
      );
    }
    ```
  </Tab>
</Tabs>

## How it works

1. **One workflow = one conversation.** The workflow loops on a hook, keeping `allMessages`, tool history, and state alive across turns.
2. **The hook is created once.** `turnHook.create({ token: workflowRunId })` runs outside the loop, because calling it twice with the same token throws `HookConflictError`.
3. **`preventClose: true`** on `pipeTo` keeps the durable writable open so the next turn can write to it.
4. **`sliceUntilFinish`** in the API reads chunks until `type === "finish"`, then closes the HTTP response. The source reader is released — not cancelled — so the workflow stream keeps flowing.
5. **`startIndex: tailIndex + 1`** gives each follow-up response only the new chunks, avoiding replay of previous turns.
6. **`/done`** resumes the hook so the workflow exits cleanly, then returns a synthetic `start` + `finish` so `useChat` transitions out of "streaming".
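
The effect of `preventClose: true` from step 3 can be reproduced with plain web streams. The following is a minimal sketch, assuming an in-memory `WritableStream` in place of the durable writable; `collectingWritable`, `turnStream`, and `writeTwoTurns` are hypothetical names used only for illustration.

```typescript
// Stand-in for the durable writable: records every chunk it receives.
function collectingWritable<T>(sink: T[]): WritableStream<T> {
  return new WritableStream<T>({
    write(chunk) {
      sink.push(chunk);
    },
  });
}

// Stand-in for one turn's UI message stream: emits its chunks, then closes.
function turnStream(chunks: string[]): ReadableStream<string> {
  return new ReadableStream<string>({
    start(controller) {
      for (const chunk of chunks) controller.enqueue(chunk);
      controller.close();
    },
  });
}

// Pipes two consecutive turns into the same writable and reports the outcome.
async function writeTwoTurns(
  preventClose: boolean
): Promise<{ written: string[]; secondTurnFailed: boolean }> {
  const written: string[] = [];
  const writable = collectingWritable(written);

  await turnStream(["start", "hello", "finish"]).pipeTo(writable, { preventClose });

  let secondTurnFailed = false;
  try {
    await turnStream(["start", "again", "finish"]).pipeTo(writable, { preventClose });
  } catch {
    // Without preventClose, the first pipeTo closed the writable,
    // so the second pipeTo rejects.
    secondTurnFailed = true;
  }

  return { written, secondTurnFailed };
}
```

In this sketch, `writeTwoTurns(true)` lets both turns reach the writable, while `writeTwoTurns(false)` loses the second turn because the first `pipeTo` closed the destination when its source ended.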

## Pitfalls

Non-obvious correctness details worth knowing before adapting this pattern.

### Snapshot `tailIndex` *before* resuming the hook

{/* @skip-typecheck - fragment referencing variables from the surrounding multi-turn pattern */}

```typescript
const tailIndex = await probe.getTailIndex(); // [!code highlight] FIRST
await probe.cancel();
await turnHook.resume(runId, { message: text }); // [!code highlight] THEN
const stream = run.getReadable({ startIndex: tailIndex + 1 });
```

Reversing the order races the workflow: by the time you read `tailIndex`, the next turn may already have written its `start` chunk, so `startIndex: tailIndex + 1` skips past it and the response begins mid-turn.
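
The off-by-one is easier to see against a plain array. This is a minimal sketch, assuming the durable stream behaves like an append-only log in which `tailIndex` is the index of the last written chunk; `ChunkLog` and `runFollowUp` are hypothetical and model the worst case where the workflow writes `start` immediately on resume.

```typescript
type Chunk = { type: "start" | "text" | "finish"; text?: string };

// Hypothetical in-memory stand-in for the durable stream's chunk log.
class ChunkLog {
  private chunks: Chunk[] = [];

  append(chunk: Chunk): void {
    this.chunks.push(chunk);
  }

  // Index of the last chunk written so far (-1 when empty).
  tailIndex(): number {
    return this.chunks.length - 1;
  }

  readFrom(startIndex: number): Chunk[] {
    return this.chunks.slice(startIndex);
  }
}

function runFollowUp(order: "snapshot-first" | "resume-first"): Chunk[] {
  const log = new ChunkLog();

  // Turn 1 is already in the log at indices 0..2.
  log.append({ type: "start" });
  log.append({ type: "text", text: "turn 1" });
  log.append({ type: "finish" });

  // Worst case for the race: resuming makes the workflow write `start` right away.
  const resume = () => log.append({ type: "start" });

  let tail: number;
  if (order === "snapshot-first") {
    tail = log.tailIndex(); // 2: points at turn 1's finish
    resume();
  } else {
    resume();
    tail = log.tailIndex(); // 3: already points at turn 2's start
  }

  // The rest of turn 2 arrives later.
  log.append({ type: "text", text: "turn 2" });
  log.append({ type: "finish" });

  return log.readFrom(tail + 1);
}
```

With `"snapshot-first"` the slice begins at turn 2's `start` chunk; with `"resume-first"` it begins at the `text` chunk and the `start` chunk is lost.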

### Don't call `writable.close()` inside a workflow function

I/O operations like closing streams must happen inside a `"use step"` function. Calling `writable.close()` directly in the workflow body throws `Not supported in workflow functions`. When the workflow returns, the runtime closes the underlying writable for you.

### Don't use `TransformStream.terminate()` to slice the stream

A `TransformStream` that calls `controller.terminate()` on the `finish` chunk seems like the obvious fit for `sliceUntilFinish`, but it throws `Invalid state: TransformStream has been terminated` when late-arriving chunks hit the transform callback. Manual pumping through a custom `ReadableStream` (as shown above) sidesteps the problem entirely.

### Release the source reader, don't cancel it

In `sliceUntilFinish`, call `reader.releaseLock()` in the `finally` block rather than cancelling the reader or the source stream. Cancelling propagates upstream and closes the durable writable, breaking the next turn. Releasing the lock just detaches our reader; the durable stream keeps flowing.
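
A quick way to check this behavior is to slice the same source twice. The sketch below restates `sliceUntilFinish` generically and runs it against an in-memory stream that holds two turns and is never closed; `twoTurnSource` and `readAll` are hypothetical helpers, and the in-memory stream is only an approximation of the durable readable.

```typescript
type Chunk = { type: string };

// Same pumping logic as the route's helper, generic over the chunk type.
function sliceUntilFinish<T extends { type: string }>(
  source: ReadableStream<T>
): ReadableStream<T> {
  return new ReadableStream<T>({
    async start(controller) {
      const reader = source.getReader();
      try {
        while (true) {
          const result = await reader.read();
          if (result.done) break;
          controller.enqueue(result.value);
          if (result.value.type === "finish") break;
        }
        controller.close();
      } catch (e) {
        controller.error(e);
      } finally {
        reader.releaseLock(); // detach only; the source stays open and readable
      }
    },
  });
}

// Stand-in for the durable stream: two turns queued, stream left open.
function twoTurnSource(): ReadableStream<Chunk> {
  return new ReadableStream<Chunk>({
    start(controller) {
      for (const type of ["start", "text-delta", "finish", "start", "text-delta", "finish"]) {
        controller.enqueue({ type });
      }
      // Intentionally not closed: later turns could still write.
    },
  });
}

// Drains a stream into an array.
async function readAll<T>(stream: ReadableStream<T>): Promise<T[]> {
  const reader = stream.getReader();
  const out: T[] = [];
  while (true) {
    const result = await reader.read();
    if (result.done) break;
    out.push(result.value);
  }
  reader.releaseLock();
  return out;
}
```

Calling `readAll(sliceUntilFinish(source))` twice on the same `source` yields one turn per call, which only works because the first slice released its lock instead of cancelling.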

### Handle stale `runId` gracefully

Clients can send a `runId` from a long-gone workflow (localStorage, back button, server restart). Wrap the follow-up path in a try/catch for `not found` / `expired` and fall through to the first-turn code path to start a fresh workflow.
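
The fallback can be factored into a small helper. This is a sketch under the same assumption the route makes, namely that a stale run surfaces as an `Error` whose message contains `not found` or `expired`; `isStaleRunError` and `resumeOrStartFresh` are hypothetical names, not Workflow SDK exports.

```typescript
// Hypothetical helper: matches the same message substrings the route checks.
function isStaleRunError(e: unknown): boolean {
  const msg = e instanceof Error ? e.message.toLowerCase() : "";
  return msg.includes("not found") || msg.includes("expired");
}

// Tries the follow-up path first and falls back to a fresh run only when
// the failure looks like a stale runId. Any other error is rethrown.
async function resumeOrStartFresh<T>(
  resume: () => Promise<T>,
  startFresh: () => Promise<T>
): Promise<T> {
  try {
    return await resume();
  } catch (e) {
    if (!isStaleRunError(e)) throw e;
    return startFresh();
  }
}
```

Matching on message text is brittle, so if the runtime exposes a typed error or error code for missing runs, checking that instead would be the safer choice.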

## streamText vs DurableAgent

|                         | `streamText()`                      | `DurableAgent`                  |
| ----------------------- | ----------------------------------- | ------------------------------- |
| **Tool loop**           | AI SDK handles via `stopWhen`       | DurableAgent handles internally |
| **LLM call durability** | Re-executes on replay               | Each LLM call is a durable step |
| **Stop conditions**     | `stopWhen`, `prepareStep`           | `prepareStep` only              |
| **Structured output**   | `Output.object()`, `Output.array()` | Not available                   |
| **Step callbacks**      | `onStepFinish`, `onChunk`           | Not available                   |
| **Setup**               | Manual stream piping                | Automatic                       |

Use `DurableAgent` for most agent use cases. Use `streamText` when you need the additional control.

## Key APIs

**AI SDK** ([docs](https://ai-sdk.dev/docs))

* [`streamText()`](https://ai-sdk.dev/docs/reference/ai-sdk-core/stream-text) — core streaming function; `toUIMessageStream()` pipes into the durable writable
* [`tool()` / tool calling](https://ai-sdk.dev/docs/ai-sdk-core/tools-and-tool-calling) — tools wrap `"use step"` functions so each tool call is replayed from the log, not re-executed
* [`stepCountIs()` / `stopWhen`](https://ai-sdk.dev/docs/ai-sdk-core/agents#stop-conditions) — bound the agent loop inside each turn
* [`convertToModelMessages()`](https://ai-sdk.dev/docs/reference/ai-sdk-ui/convert-to-model-messages) / [`createUIMessageStreamResponse()`](https://ai-sdk.dev/docs/reference/ai-sdk-ui/create-ui-message-stream-response) — UI ↔ model message conversion at the API boundary
* [`useChat()`](https://ai-sdk.dev/docs/reference/ai-sdk-ui/use-chat) — React hook that consumes the UI message stream on the client

**Workflow SDK**

* [`"use step"`](/docs/api-reference/workflow/use-step) — makes tool executions durable
* [`defineHook()`](/docs/api-reference/workflow/define-hook) — suspension point for follow-up messages
* [`getWritable()`](/docs/api-reference/workflow/get-writable) — resumable stream output
* [`getRun()`](/docs/api-reference/workflow-api/get-run) — `run.getReadable({ startIndex })` for slicing per-turn streams
* [`WorkflowChatTransport`](/docs/api-reference/workflow-ai/workflow-chat-transport) — passes `runId` between turns


