---
title: Distributed Abort Controller
description: A distributed AbortController that uses durable workflows for cross-process cancellation signaling.
type: guide
summary: Build a distributed abort controller that uses workflow streams and hooks to propagate cancellation signals across process boundaries.
---

# Distributed Abort Controller

Use this pattern when you need an `AbortController`-like interface that works across distributed systems. The controller uses a durable workflow to coordinate cancellation: calling `.abort()` on one machine fires the `.signal` of every controller created with the same ID, on any machine.

## When to use this

* **Cross-process cancellation** — Cancel a long-running operation from a different server, worker, or edge function
* **Durable cancellation** — The abort signal persists even if the process that created it crashes
* **UI stop buttons** — Let users cancel operations running on the server from the browser
* **Timeout coordination** — The built-in TTL auto-expires stale controllers

## Pattern

The `DistributedAbortController` class encapsulates a workflow that:

1. Accepts a user-provided unique ID (like a chat ID or task ID)
2. Creates or reconnects to an existing workflow using that ID
3. Waits for a hook signal OR TTL expiration
4. Writes a cancellation message to the run's stream when triggered

### Core Implementation

```typescript lineNumbers
import { defineHook, getWritable, sleep } from "workflow";
import { start, getRun, getHookByToken } from "workflow/api";

// Default TTL: 24 hours
const DEFAULT_TTL_MS = 24 * 60 * 60 * 1000;
// Default grace period: 1 hour (keeps hook alive after TTL expiration for late subscribers)
const DEFAULT_GRACE_MS = 60 * 60 * 1000;

// Hook to trigger the abort signal
export const abortHook = defineHook<{ reason?: string }>();

// The abort message written to the stream
export type AbortMessage = {
  type: "abort";
  reason?: string;
  expired?: boolean;
};

// Helper to create a consistent hook token from the user ID
function getAbortToken(id: string): string {
  return `abort:${id}`;
}

// Step function that writes the abort message to the stream
async function writeAbortSignal(reason?: string, expired?: boolean) {
  "use step";

  const writable = getWritable<AbortMessage>();
  const writer = writable.getWriter();
  try {
    await writer.write({ type: "abort", reason, expired });
  } finally {
    writer.releaseLock();
  }
  await writable.close();
}

// Workflow that waits for abort or TTL expiration
export async function abortControllerWorkflow(
  id: string,
  ttlMs: number,
  graceMs: number
) {
  "use workflow";

  const startTime = Date.now();
  const hook = abortHook.create({ token: getAbortToken(id) });

  // Race: manual abort OR TTL expiration // [!code highlight]
  const result = await Promise.race([
    hook.then((payload) => ({
      reason: payload.reason,
      expired: false,
    })),
    sleep(`${ttlMs}ms`).then(() => ({
      reason: "Controller expired",
      expired: true,
    })),
  ]);

  await writeAbortSignal(result.reason, result.expired);

  // Only sleep through grace period on TTL expiration (keeps hook alive for late subscribers). // [!code highlight]
  // Manual aborts complete immediately.
  if (result.expired) {
    const elapsed = Date.now() - startTime;
    const remainingTime = graceMs - (elapsed - ttlMs);
    if (remainingTime > 0) {
      await sleep(`${remainingTime}ms`); // [!code highlight]
    }
  }

  return { aborted: true, reason: result.reason, expired: result.expired };
}

/**
 * A distributed abort controller that works across process boundaries.
 * Uses a semantically meaningful ID (like a chat ID or task ID) to coordinate.
 */
export class DistributedAbortController {
  private id: string;
  readonly runId: string;

  private constructor(id: string, runId: string) {
    this.id = id;
    this.runId = runId;
  }

  /**
   * Creates or reconnects to a distributed abort controller.
   * If a controller with this ID already exists, reconnects to it.
   * Otherwise, starts a new workflow.
   *
   * @param id - A unique, semantically meaningful ID (e.g., "chat:123")
   * @param options.ttlMs - Time-to-live in ms (default: 24 hours)
   * @param options.graceMs - Grace period after TTL expiration (default: 1 hour)
   */
  static async create( // [!code highlight]
    id: string,
    options: { ttlMs?: number; graceMs?: number } = {}
  ): Promise<DistributedAbortController> {
    const { ttlMs = DEFAULT_TTL_MS, graceMs = DEFAULT_GRACE_MS } = options;
    const token = getAbortToken(id);

    // Try to find an existing run with this hook token
    const existingHook = await getHookByToken(token).catch(() => null); // [!code highlight]

    if (existingHook) {
      // Reconnect to existing controller
      return new DistributedAbortController(id, existingHook.runId);
    }

    // Create a new workflow
    const run = await start(abortControllerWorkflow, [id, ttlMs, graceMs]); // [!code highlight]
    return new DistributedAbortController(id, run.runId);
  }

  /**
   * Triggers the abort signal.
   * Idempotent: safe to call multiple times or after the workflow has completed.
   */
  async abort(reason?: string): Promise<void> { // [!code highlight]
    try {
      await abortHook.resume(getAbortToken(this.id), { reason });
    } catch (error) {
      const msg = error instanceof Error ? error.message.toLowerCase() : "";
      if (msg.includes("not found") || msg.includes("expired")) {
        return;
      }
      throw error;
    }
  }

  /**
   * Returns an AbortSignal that fires when abort() is called or TTL expires.
   * The signal fires with a reason indicating what triggered it.
   */
  get signal(): AbortSignal { // [!code highlight]
    const run = getRun<{ aborted: boolean; reason?: string; expired?: boolean }>(this.runId);
    const controller = new AbortController();
    const readable = run.getReadable<AbortMessage>();

    (async () => {
      const reader = readable.getReader();
      try {
        while (true) {
          const { done, value } = await reader.read();
          if (done) break;
          if (value.type === "abort") {
            const reason = value.expired
              ? `${value.reason} (expired)`
              : value.reason;
            controller.abort(reason);
            break;
          }
        }
      } catch (error) {
        if (!controller.signal.aborted) {
          controller.abort(
            error instanceof Error ? error.message : "Stream read failed"
          );
        }
      } finally {
        reader.releaseLock();
      }
    })();

    return controller.signal;
  }
}
```
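The grace-period arithmetic in `abortControllerWorkflow` can be checked in isolation. The following is a minimal sketch, not part of the implementation above: `remainingGraceMs` is a hypothetical helper name, and the timestamps are made-up values chosen for illustration. It computes how long the run would still sleep after a TTL expiration, counting the grace period from the moment the TTL fired rather than from the moment the abort message was written.

```typescript
// Hypothetical pure helper mirroring the workflow's grace-period arithmetic.
function remainingGraceMs(
  startTime: number,
  now: number,
  ttlMs: number,
  graceMs: number
): number {
  const elapsed = now - startTime;
  // Time already spent past the TTL (for example, writing the abort message).
  const overrun = elapsed - ttlMs;
  // Never return a negative duration; zero means "do not sleep".
  return Math.max(0, graceMs - overrun);
}

// Illustrative values: 5-minute TTL, 1-minute grace period,
// and the workflow resumes 2 seconds after the TTL fired.
const exampleTtlMs = 5 * 60 * 1000;
const exampleGraceMs = 60 * 1000;
const remaining = remainingGraceMs(
  0,
  exampleTtlMs + 2_000,
  exampleTtlMs,
  exampleGraceMs
);
console.log(remaining); // 58000
```

With these numbers the run sleeps for 58 more seconds, so the hook stays reachable until roughly `startTime + ttlMs + graceMs`.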

### Usage: Single Process

```typescript lineNumbers
import { DistributedAbortController } from "./distributed-abort-controller";

// Create a controller with a meaningful ID
const controller = await DistributedAbortController.create("chat:user-123");

// Get the signal and pass it to fetch.
// The request is not awaited here, so abort() can run while it is in flight.
const signal = controller.signal;
const responsePromise = fetch("https://api.example.com/long-operation", {
  signal,
});

// Later: abort the operation (the pending fetch then rejects)
await controller.abort("User cancelled");
```

### Usage: Cross-Process Coordination

```typescript lineNumbers
import { DistributedAbortController } from "./distributed-abort-controller";

// Process A: Create the controller
const controller = await DistributedAbortController.create("task:build-123");
// start long operation using controller.signal...

// Process B: Reconnect and abort (no run ID sharing needed!)
const sameController = await DistributedAbortController.create("task:build-123"); // [!code highlight]
await sameController.abort("Cancelled by admin");

// Process C: Reconnect and listen
const anotherRef = await DistributedAbortController.create("task:build-123");
anotherRef.signal.addEventListener("abort", (e) => {
  console.log("Task was cancelled:", (e.target as AbortSignal).reason);
});
```
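How Process A consumes `controller.signal` is left open above. One possible shape, sketched under assumptions, is a worker loop that checks the signal between units of work. `processUntilAborted` and `handleItem` are hypothetical names, and any `AbortSignal` works here, including the one returned by `controller.signal`.

```typescript
// Hypothetical worker loop: processes items one at a time and stops
// starting new work as soon as the signal has fired.
async function processUntilAborted<T>(
  items: T[],
  signal: AbortSignal,
  handleItem: (item: T) => Promise<void>
): Promise<number> {
  let processed = 0;
  for (const item of items) {
    // Checked between items; in-flight work is not interrupted here.
    if (signal.aborted) break;
    await handleItem(item);
    processed++;
  }
  // Number of items that were fully handled before the abort was seen.
  return processed;
}
```

Because the distributed signal fires asynchronously, once the abort message arrives on the stream, the loop only observes it between awaited calls. Work already running inside `handleItem` needs the signal passed to it (for example as the `signal` option of `fetch`) to stop sooner.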

### Custom TTL

```typescript lineNumbers
import { DistributedAbortController } from "./distributed-abort-controller";

// Short-lived controller for a quick operation (5 minutes)
const shortLived = await DistributedAbortController.create("quick-task", {
  ttlMs: 5 * 60 * 1000,
});

// Long-lived controller for batch jobs (7 days)
const longLived = await DistributedAbortController.create("batch-job", {
  ttlMs: 7 * 24 * 60 * 60 * 1000,
});

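// When the TTL expires, the signal fires with a reason ending in "(expired)"
shortLived.signal.addEventListener("abort", (e) => {
  const reason = (e.target as AbortSignal).reason;
  // reason is a DOMException, not a string, when abort() was called without a reason
  if (typeof reason === "string" && reason.includes("expired")) {
    console.log("Controller expired, cleaning up...");
  }
});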
```
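The `signal` getter in the core implementation appends ` (expired)` to the reason on TTL expiration, so matching that suffix is stricter than a substring check: a manual reason such as "Session expired by admin" would otherwise look like an expiry. A minimal sketch, with `AbortKind` and `classifyAbortReason` as hypothetical names:

```typescript
type AbortKind = "expired" | "cancelled" | "unknown";

// Hypothetical helper: classifies an AbortSignal reason produced by the
// signal getter shown earlier.
function classifyAbortReason(reason: unknown): AbortKind {
  // Non-string reasons include the default DOMException that AbortController
  // uses when abort() is called without a reason.
  if (typeof reason !== "string") return "unknown";
  // Stream read errors also surface as plain strings and land in "cancelled".
  return reason.endsWith("(expired)") ? "expired" : "cancelled";
}

console.log(classifyAbortReason("Controller expired (expired)")); // "expired"
console.log(classifyAbortReason("Session expired by admin")); // "cancelled"
console.log(classifyAbortReason(undefined)); // "unknown"
```

If the distinction matters to callers, a dedicated marker in the reason string is easier to match reliably than free-form text.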

### API Route for Remote Abort

```typescript lineNumbers
import { DistributedAbortController } from "@/lib/distributed-abort-controller";

export async function POST(
  request: Request,
  { params }: { params: Promise<{ id: string }> }
) {
  const { id } = await params;
  // Tolerate an empty or non-JSON body by falling back to the default reason
  const { reason } = await request.json().catch(() => ({}));

  const controller = await DistributedAbortController.create(id);
  await controller.abort(reason || "Cancelled via API");

  return Response.json({ success: true });
}
```

### Client Cancel Button

```tsx lineNumbers
"use client";

export function CancelButton({ taskId }: { taskId: string }) {
  const handleCancel = async () => {
    await fetch(`/api/abort/${taskId}`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ reason: "User clicked cancel" }),
    });
  };

  return (
    <button type="button" onClick={handleCancel}>
      Cancel Operation
    </button>
  );
}
```

## Tips

* **Use semantic IDs** — Use meaningful IDs like `chat:123` or `task:abc` instead of random UUIDs
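* **Create is idempotent** — Calling `create()` with the same ID reconnects to the existing controller while its hook is active; the lookup and the start are separate calls, so two simultaneous first calls can race
* **TTL auto-cleanup** — Workflows self-terminate once the TTL and the grace period have elapsed; no manual cleanup needed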
* **Signal is a getter** — Each access to `.signal` creates a new listener; cache it if needed
* **One-shot** — Once aborted or expired, the workflow completes; create a new controller for new operations
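
Since each read of `.signal` starts a new stream reader, caching can be done with a small helper. This is a sketch under assumptions: `HasSignal`, `signalCache`, and `cachedSignal` are hypothetical names, and the stand-in object below only mimics a getter that creates a fresh signal on every access.

```typescript
// Minimal shape needed by the helper: anything exposing a `signal` property.
interface HasSignal {
  readonly signal: AbortSignal;
}

// Remembers the first signal read from each controller instance.
const signalCache = new WeakMap<HasSignal, AbortSignal>();

function cachedSignal(controller: HasSignal): AbortSignal {
  let signal = signalCache.get(controller);
  if (!signal) {
    // Only this first access triggers the getter.
    signal = controller.signal;
    signalCache.set(controller, signal);
  }
  return signal;
}

// Stand-in whose getter creates a fresh signal on every access.
let reads = 0;
const standIn: HasSignal = {
  get signal() {
    reads++;
    return new AbortController().signal;
  },
};

const first = cachedSignal(standIn);
const second = cachedSignal(standIn);
console.log(first === second, reads); // true 1
```

The `WeakMap` keeps the cache from holding controller instances alive after the rest of the program has dropped them.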

## Key APIs

* [`defineHook()`](/docs/api-reference/workflow/define-hook) — type-safe hook for the abort trigger
* [`getWritable()`](/docs/api-reference/workflow/get-writable) — write abort messages to the stream
* [`sleep()`](/docs/api-reference/workflow/sleep) — TTL timer for auto-expiration
* [`start()`](/docs/api-reference/workflow-api/start) — start the abort controller workflow
* [`getHookByToken()`](/docs/api-reference/workflow-api/get-hook-by-token) — find existing run by hook token
* [`getRun()`](/docs/api-reference/workflow-api/get-run) — reconnect to the workflow's readable stream


