diff --git a/docs/develop/typescript/workflows/index.mdx b/docs/develop/typescript/workflows/index.mdx index 7173b90b2c..9b79035270 100644 --- a/docs/develop/typescript/workflows/index.mdx +++ b/docs/develop/typescript/workflows/index.mdx @@ -28,3 +28,4 @@ import * as Components from '@site/src/components'; - [Schedules](/develop/typescript/workflows/schedules) - [Timers](/develop/typescript/workflows/timers) - [Versioning](/develop/typescript/workflows/versioning) +- [Workflow Streams](/develop/typescript/workflows/workflow-streams) diff --git a/docs/develop/typescript/workflows/workflow-streams.mdx b/docs/develop/typescript/workflows/workflow-streams.mdx new file mode 100644 index 0000000000..59099e7b94 --- /dev/null +++ b/docs/develop/typescript/workflows/workflow-streams.mdx @@ -0,0 +1,574 @@ +--- +id: workflow-streams +title: Workflow Streams - TypeScript SDK +sidebar_label: Workflow Streams +toc_max_heading_level: 4 +keywords: + - workflow streams + - streaming + - signals + - updates + - queries + - typescript sdk +tags: + - Workflows + - Messages + - Streaming + - TypeScript SDK + - Temporal SDKs +description: Stream events from a Workflow to subscribers using the Temporal TypeScript SDK Workflow Streams contrib module. +--- + +**Workflow Streams** is a Temporal TypeScript SDK contrib library that gives a Workflow a durable, offset-addressed event channel built on Temporal's basic message primitives: Signals, Updates, and Queries. +It batch-publishes events to amortize per-Signal cost, deduplicates batches for exactly-once delivery to the log, supports topic filtering, and carries state across Continue-As-New for long-running streams. + +Use Workflow Streams when you want outside observers to follow the progress of a Workflow and its Activities: updating a UI as an AI agent works, surfacing status from a payment or order pipeline, or reporting intermediate results from a data job. It is not suited to ultra-low-latency cases like real-time voice, and it targets modest fan-out: tens of publishers and subscribers per Workflow, not thousands. + +The Workflow hosts the event log. Publishers append events — the Workflow itself, Activities, or external processes via `WorkflowStreamClient`. Subscribers attach to the Workflow ID, optionally filter by **topic** (a string label set when publishing; topics are implicit and created on first publish), and consume events by long-polling from an offset they store. + +The package has no root entrypoint; import from one of two subpaths: + +- `@temporalio/workflow-streams/workflow` — the workflow-safe surface (`WorkflowStream`, `WorkflowStreamState`, ...). Bundles cleanly into Workflow code. +- `@temporalio/workflow-streams/client` — the client surface (`WorkflowStreamClient`, ...). Pulls in `crypto`, `@temporalio/activity`, and `@temporalio/client`, none of which resolve inside the Workflow sandbox; do not import from a Workflow file. + +:::tip SUPPORT, STABILITY, and DEPENDENCY INFO + +The `@temporalio/workflow-streams` module is currently in +[Public Preview](/evaluate/development-production-features/release-stages#public-preview). Refer to the +[Temporal product release stages guide](/evaluate/development-production-features/release-stages) for more information. + +Cross-language client support is on the roadmap. Only the TypeScript and Python clients are available today. + +The API may change before general availability. + +::: + +**Looking for...** + +- Runnable end-to-end samples (basic publish/subscribe, reconnecting subscriber, external publisher, bounded log, LLM streaming): [Workflow Streams samples](https://github.com/temporalio/samples-typescript/tree/main/workflow-streams). +- A complete LLM streaming example on this page (Activity publishes deltas, terminal consumer that resets on retry): [Stream LLM output](#stream-llm-output). +- Delivery guarantees, ordering, and retry semantics: [Delivery semantics](#delivery-semantics). +- History-size cost and tuning: [Architecture](#architecture). +- Long-running streams that need Continue-As-New: [Continue-As-New](#continue-as-new). + +## Choose where to host the stream + +A `WorkflowStream` is hosted inside a Workflow, so the first design choice is whether one Workflow handles both the work and the stream, or whether a separate Workflow exists only to host the stream. The choice is mostly about lifecycle. + +**Host the stream on the Workflow that does the work** when the events come from what that Workflow is already orchestrating: an agent run, an order pipeline, a chat session. The stream's lifecycle aligns with the run, starting when the run starts and ending when it returns. The Workflow ID you use to start the work is the same one subscribers attach to. This is the common shape for AI agents and most progress-streaming cases, where streaming is just one more thing the Workflow does as part of its job. + +**Use a dedicated Workflow for the stream alone** when the stream should outlive any single producer, accept fan-in from multiple unrelated sources, or be subscribable before any work has started. Producers publish from outside the stream Workflow (Activities of other Workflows, or external `WorkflowStreamClient` instances). The trade-off is explicit lifecycle management: a dedicated stream Workflow does not terminate on its own, so you need a signal-driven shutdown or a Continue-As-New strategy. + +Whichever shape you pick, the Workflow ID is the address subscribers use to attach. Multiple subscribers can attach to the same ID concurrently, which is the normal case for a UI with multiple browser tabs. Use distinct Workflow IDs for unrelated streams rather than packing them into one Workflow. + +## Enable streaming on a Workflow + +The library ships as `@temporalio/workflow-streams`; import the Workflow-side surface from `@temporalio/workflow-streams/workflow`. Enable streaming by constructing a `WorkflowStream` at the very top of your Workflow function, before any `await`. Construction must happen there because the stream's handlers have to be registered before the first publish Signal arrives; doing it after an `await` would miss any publishes that arrived before the run body resumed. + +```typescript +import { WorkflowStream } from '@temporalio/workflow-streams/workflow'; + +export interface OrderInput { + orderId: string; +} + +export async function orderWorkflow(input: OrderInput): Promise { + const stream = new WorkflowStream(); + // ... rest of the workflow +} +``` + +Constructing `WorkflowStream` creates the in-memory event log and registers the publish Signal, subscribe Update, and offset Query handlers on the current Workflow. Constructing more than one `WorkflowStream` on the same Workflow silently replaces the handlers — the TypeScript Workflow runtime does not expose an inspection API for existing handlers, so the library cannot raise on a duplicate the way the Python SDK does. Construct exactly one `WorkflowStream` at the top of the Workflow function. + +If your Workflow uses Continue-As-New, see [Continue-As-New](#continue-as-new) below for how to carry stream state across runs so subscribers see no gap. + +## Publish from a Workflow + +Bind a topic name to its event type once via `stream.topic(name)`, then call `publish()` on the returned handle to append events. The handle carries the topic name and the type `T` so call sites don't have to repeat them on every publish, and so subscribers reading the same handle decode to the matching type. Repeated calls with the same name return the same handle instance. + +```typescript +import { WorkflowStream } from '@temporalio/workflow-streams/workflow'; + +export interface StatusEvent { + state: string; + progress?: number; + detail?: string; +} + +export interface OrderInput { + orderId: string; +} + +export async function orderWorkflow(input: OrderInput): Promise { + const stream = new WorkflowStream(); + const status = stream.topic('status'); + + status.publish({ state: 'validating', detail: 'checking inventory' }); + await validateOrder(input.orderId); + + status.publish({ state: 'charging', progress: 33, detail: 'authorizing payment' }); + await chargePayment(input.orderId); + + status.publish({ state: 'shipping', progress: 66, detail: 'dispatching to warehouse' }); + await dispatchOrder(input.orderId); + + status.publish({ state: 'completed', progress: 100 }); +} +``` + +`publish()` runs the default payload converter to encode each value. The codec chain (encryption, compression, and so on) runs once on the Signal or Update envelope that carries the batch, never per item, so encryption and compression are applied exactly once each direction. + +Unlike the Python SDK, `T` here is a compile-time annotation only: TypeScript has no runtime type representation, so the library cannot enforce per-topic type uniformity at the publish site. If two publishers bind the same topic name to different types, the mismatch is not caught at publish; the subscriber gets a decode error when it processes events from the mismatched publisher. A pre-built `Payload` may be passed to `publish()` regardless of the handle's `T`, taking the zero-copy fast path. + +## Publish from a client + +Any process that has a Temporal `Client` and the target Workflow ID can publish to that Workflow's stream by constructing a `WorkflowStreamClient`. This is the general pattern and covers HTTP backends, starters, one-off scripts, other Workflows' Activities, and standalone Activities. Construct one with `WorkflowStreamClient.create(client, workflowId)`, then use it the same way you would the Workflow-side handle: bind a topic, publish through it, and let `await using` flush on scope exit. + +When events originate in an Activity, publish from the Activity directly rather than returning them for the Workflow to forward. The Workflow hosts the stream but does not read its own stream; it processes the Activity's return value and emits its own lifecycle events. Keeping Workflow state independent of streamed output is what lets retried Activity attempts surface to subscribers without polluting the Workflow's durable state — see [Delivery semantics](#delivery-semantics). + +```typescript +import { Client } from '@temporalio/client'; +import { WorkflowStreamClient } from '@temporalio/workflow-streams/client'; + +export async function publishStatus(workflowId: string): Promise { + const temporalClient = new Client(); + await using streamClient = WorkflowStreamClient.create(temporalClient, workflowId, { + batchInterval: '200 milliseconds', + }); + + const status = streamClient.topic('status'); + status.publish({ state: 'started' }); + // ... + // Buffer is flushed automatically on `await using` scope exit. +} +``` + +The `await using` declaration relies on TypeScript 5.2+ and Node 20.11+; on older runtimes, call `await streamClient[Symbol.asyncDispose]()` explicitly at the end of the publishing scope. + +Inside an Activity scheduled by a Workflow, `WorkflowStreamClient.fromWithinActivity()` is a convenience that infers the Temporal `Client` and the parent Workflow ID from the Activity context, so you don't have to thread them through the Activity's input: + +```typescript +import { Context } from '@temporalio/activity'; +import { WorkflowStreamClient } from '@temporalio/workflow-streams/client'; + +export interface Delta { + text: string; +} + +export async function streamDeltas(orderId: string): Promise { + await using client = WorkflowStreamClient.fromWithinActivity(); + const deltas = client.topic('delta'); + + for await (const delta of generateDeltas(orderId)) { + deltas.publish(delta); + Context.current().heartbeat(); + } + // Buffer is flushed automatically on scope exit. +} +``` + +For a standalone Activity (one started directly via `Client.workflow.start` rather than from a Workflow), there is no parent Workflow context to infer, so `fromWithinActivity()` throws. Fall back to the general pattern with `Context.current().client` and the target Workflow ID threaded through the Activity's input. + +Two operations give the application explicit control over when batches ship: `forceFlush: true` on a publish for latency, and `await client.flush()` for confirmation that prior publications have landed. + +Pass `{ forceFlush: true }` on a publish to wake the background flusher so the current buffer ships without waiting for the next interval. The flusher only runs while the client is alive (between first `publish()` and `Symbol.asyncDispose`); outside that, `forceFlush: true` queues the wake event but nothing ships until a flush or dispose occurs. The call returns immediately after appending to the buffer and signaling the flusher; it does not wait for delivery to the Workflow or to subscribers: + +```typescript +deltas.publish(delta, { forceFlush: true }); +``` + +Use it for latency-sensitive events: the first delta of a response so the user sees something fast, or punctuated events like `RETRY` and `STATUS_CHANGE`. See [Tuning](#tuning) for the trade-off against history pressure. + +Use `await client.flush()` when you need a mid-stream barrier. Successful completion of the flush is proof that the Temporal server has received all prior publications, so subsequent work that depends on those events being durable can proceed. The client stays open for further publishing afterward. Exiting `await using` already flushes on its way out, so the explicit call is only for barriers in the middle: + +```typescript +await using client = WorkflowStreamClient.fromWithinActivity(); +const deltas = client.topic('delta'); + +for (const delta of firstPhase()) { + deltas.publish(delta); +} + +await client.flush(); +const checkpointId = await recordPhaseOneComplete(); // only safe once phase-one events are durable + +for (const delta of secondPhase(checkpointId)) { + deltas.publish(delta); +} +``` + +`publish()` is non-blocking and applies no backpressure. From an Activity or other client, it appends to the client's in-memory buffer and returns; from inside a Workflow, it appends synchronously to the in-memory log (no buffer, nothing to flush). Subscribers pull from the Workflow's log on their own schedule, so a slow subscriber does not slow down publishers. If a publisher emits faster than batches can ship to the server, the buffer grows: the process uses more memory, the stream falls further behind real time, and at the limit Signals cannot keep up at all. + +If your application needs to bound this (to cap memory, to keep the stream close to real time, or to apply a policy when the publisher overruns the network), apply that policy upstream of `publish()`. The choice (block, drop, error, sample) is application-specific, and Workflow Streams does not pick one for you. + +## Subscribe + +Subscribing uses the same client construction as publishing: `WorkflowStreamClient.create(client, workflowId)` from any process that has a Temporal `Client`, or `fromWithinActivity()` inside an Activity. Subscribing from an Activity is less common in practice, so the general client case is the primary example below. + +Subscribing from inside the host Workflow is intentionally unsupported. The Workflow only sees the successful return value of each Activity; the stream may carry partial output from attempts that failed and were retried. Letting the Workflow read its own stream would mix those two views and break the conduit role the Workflow is meant to play. + +The Workflow is the single source of truth for stream state, so any process bridging events to the outside world (an SSE proxy serving a browser, a forwarding Activity) can stay stateless — store the last delivered `item.offset`, and reconnects resume from that offset without coordinating with anyone but the Workflow. + +Once you have a client, iterate a topic handle's `subscribe()`, the counterpart to `publish()`. The handle's bound type drives decoding, so each `item.data` arrives as `T` via the client's payload converter. The codec chain is applied once at the Update envelope, not per item. + +```typescript +import { Client } from '@temporalio/client'; +import { WorkflowStreamClient } from '@temporalio/workflow-streams/client'; + +export async function watchOrder(orderId: string): Promise { + const temporalClient = new Client(); + const stream = WorkflowStreamClient.create(temporalClient, orderId); + + const status = stream.topic('status'); + for await (const item of status.subscribe()) { + const evt = item.data; + console.log(`[${(evt.progress ?? 0).toString().padStart(3)}%] ${evt.state}: ${evt.detail ?? ''}`); + if (evt.state === 'completed') break; + } +} +``` + +The iterator handles re-polling, pagination when a poll response hits the ~1 MB cap, and Workflow-side log truncation transparently. Callers don't need to wrap the iterator for the common cases. Two edge cases are worth knowing: an RPC timeout where Continue-As-New cannot be followed ends the iterator silently, and a validator rejection during a CAN handoff can surface as a `WorkflowUpdateFailedError`. A subscriber that does not need flushing can skip `await using` — the background flusher only runs for publishers. + +### Heterogeneous topics + +A topic handle binds one name to one type, so it only fits a single-type subscription. To consume multiple topics whose payload types differ, call `client.subscribe()` directly with a list of names (or `subscribe()` with no arguments for every topic). The default overload yields `WorkflowStreamItem`, so each item arrives as the raw `Payload` carrying encoding metadata. Dispatch on `item.topic` and decode the payload with `defaultPayloadConverter.fromPayload(item.data)`: + +```typescript +import { defaultPayloadConverter } from '@temporalio/common'; + +for await (const item of stream.subscribe(['status', 'progress'])) { + if (item.topic === 'status') { + const evt = defaultPayloadConverter.fromPayload(item.data); + console.log(`[status] ${evt.state}: ${evt.detail ?? ''}`); + } else if (item.topic === 'progress') { + const evt = defaultPayloadConverter.fromPayload(item.data); + console.log(`[progress] ${evt.message}`); + } +} +``` + +A single iterator over multiple topics also avoids the cancellation race that two concurrent subscribers would create. Holding the raw `Payload` is also the right shape when you want to forward the bytes through to another system without decoding them. + +### Closing the stream {#closing-the-stream} + +A subscriber's `for await` does not know when the publisher is done. End-of-stream is an application-level concern; Workflow Streams does not impose a marker. Without coordination, a subscriber will keep polling until the Workflow reaches a terminal state, and a Workflow that returns immediately after its last publish can lose that publish's poll round-trip in the gap. + +How you close depends on what the application needs. As one example, a common pattern combines two pieces: + +1. **An in-band terminator.** The Workflow (or its Activity) publishes a sentinel event the subscriber recognizes and breaks on. In the `watchOrder` example above, `{ state: 'completed' }` is the minimal form, and the consumer's `if (evt.state === 'completed') break` is the matching half. Each subscription decides what its own end-of-stream marker is. +2. **A brief overlap before the Workflow returns.** A poll Update that is still in flight when the Workflow returns is surfaced to the iterator and consumed silently (the iterator either follows Continue-As-New or exits cleanly), and no new polls can complete after that. If the Workflow returns immediately after publishing the terminator, subscribers may miss it. + +There are two ways to provide that overlap. + +**Fixed sleep (simplest).** Sleep between the terminator and the return so any in-flight poll has time to fetch the terminator before the Workflow exits: + +```typescript +import { sleep } from '@temporalio/workflow'; + +// at the end of the workflow function +status.publish({ state: 'completed', progress: 100 }); +await sleep('30 seconds'); +return result; +``` + +The sleep needs to be long enough to cover the time between when the terminator becomes visible and when the subscriber's next poll reaches the server, including any client-side cooldown and network round-trips. A few hundred milliseconds is tight under realistic conditions; thirty seconds is a generous default. The cost is small: the Workflow Run stays open for that duration but does no other work. + +**Acknowledgment handshake.** The subscriber sends a Signal once it has the terminator; the Workflow waits up to a timeout, returning as soon as the ack arrives: + +```typescript +import { condition, defineSignal, setHandler } from '@temporalio/workflow'; +import { WorkflowStream } from '@temporalio/workflow-streams/workflow'; + +export const subscriberAcknowledgedTerminator = defineSignal('subscriberAcknowledgedTerminator'); + +export async function chatWorkflow(input: ChatInput): Promise { + const stream = new WorkflowStream(); + let subscriberDone = false; + setHandler(subscriberAcknowledgedTerminator, () => { + subscriberDone = true; + }); + + // ... do work and publish events ... + + await condition(() => subscriberDone, '30 seconds'); + // Returns true if the ack arrived, false on timeout — either way, fall through. + return result; +} +``` + +The timeout is still required because the subscriber may not be attached, or may have gone away. With the ack on top, the typical case (subscriber online) exits as soon as the subscriber confirms receipt, regardless of how long the fallback timeout is. The full pattern is wired into the [Stream LLM output](#stream-llm-output) example below. + +**Inspecting terminal status.** `subscribe()` exits cleanly when the Workflow reaches `COMPLETED`, `FAILED`, `CANCELLED`, `TERMINATED`, or `TIMED_OUT`, but does not distinguish among them. If your application needs to know which (to display success or failure to the user, log the outcome, or decide whether to retry), call `await temporalClient.workflow.getHandle(workflowId).describe()` after the loop returns to inspect the Workflow's status. + +## Continue-As-New {#continue-as-new} + +If your Workflow runs for minutes and finishes (a single chat completion, an order pipeline, a one-shot agent), you can skip this section. Continue-As-New becomes relevant for streams that run for hours or accumulate thousands of events, where you need to roll the run over to keep history bounded. + +Subscribers automatically follow Continue-As-New chains, so a long-running Workflow can roll over without disrupting active consumers. Workflow IDs are stable across Continue-As-New, so the iterator simply fetches a fresh handle for the same Workflow ID and continues polling from the carried offset. CAN-following requires the client retained from `WorkflowStreamClient.create()` or `fromWithinActivity()`; clients constructed directly from a single `WorkflowHandle` cannot re-target the new run. + +To roll a long-running streaming Workflow over without subscribers seeing a gap, carry both your application state and the stream state across the boundary. Add an optional `streamState?: WorkflowStreamState` field to your Workflow input, pass it to the constructor, and call `stream.continueAsNew(buildArgs)` to invoke the rollover. The helper drains waiting subscribers, waits for in-flight handlers to finish, then calls `continueAsNew` with the args produced by `buildArgs(postDrainState)`: + +```typescript +import { workflowInfo } from '@temporalio/workflow'; +import { WorkflowStream, type WorkflowStreamState } from '@temporalio/workflow-streams/workflow'; + +export interface WorkflowInput { + itemsProcessed: number; + streamState?: WorkflowStreamState; +} + +export async function longRunningWorkflow(input: WorkflowInput): Promise { + const stream = new WorkflowStream(input.streamState); + let itemsProcessed = input.itemsProcessed; + + while (true) { + await doOneIteration(stream); + itemsProcessed++; + + if (workflowInfo().continueAsNewSuggested) { + await stream.continueAsNew((state) => [ + { + itemsProcessed, + streamState: state, + }, + ]); + } + } +} +``` + +The optional `streamState?` on the input field is required: `priorState` is `undefined` on a fresh start and a `WorkflowStreamState` after a rollover. The `buildArgs` lambda receives the post-detach `WorkflowStreamState` as its only argument so the snapshot is guaranteed to happen *after* pollers detach. + +To pass other Continue-As-New parameters such as `taskQueue`, `searchAttributes`, or `workflowRunTimeout`, use the explicit recipe with `makeContinueAsNewFunc` instead: + +```typescript +import { allHandlersFinished, condition, makeContinueAsNewFunc } from '@temporalio/workflow'; + +stream.detachPollers(); +await condition(allHandlersFinished); +const continueWithOptions = makeContinueAsNewFunc({ + taskQueue: 'other-tq', +}); +await continueWithOptions({ + itemsProcessed, + streamState: stream.getState(), +}); +``` + +The carried `WorkflowStreamState` includes the entire in-memory log of the previous run, so streams that carry large items can hit Temporal's per-payload size limit at the rollover. (Individual publish Signals and subscribe Update responses can also exceed the limit, but the carried state is the most acute case because it accumulates the full log window.) Offload the bytes via [External Storage](/external-storage) so each item is a small reference rather than the full payload, and combine that with `truncate()` to keep the carried log itself small. + +## Tuning + +The most important question when tuning is: how often do you want to update your UI? That answer drives the trade-off between user-perceived latency and the number of history events your Workflow accumulates. The library defaults assume a slow-moving UI; LLM token streaming and other interactive cases need lower latency, which means tuning. + +The trade-off is direct. Each batched publish is one Signal, and each subscriber poll is one Update. Each Signal and each Update accumulates against the Workflow's history. A more responsive UI means more messages and more history per second; messages drive workload (and on metered deployments, billing), while history accumulates against Temporal's per-run limits. For long-running streams, plan a [Continue-As-New](#continue-as-new) policy from the start. + +### Settings that matter most + +- **`batchInterval`** (default 2 seconds). Maximum time between automatic flushes from the client. Lower it to make the stream feel live; raise it to amortize Signal cost. For an LLM token stream feeding a chat UI, `'200 milliseconds'` is a good starting point: the user perceives it as live, and a 30-second response generates roughly 150 publish Signals rather than several hundred. Below 100 ms the per-Signal RPC overhead starts to dominate. + +For per-publish overrides where one specific event needs lower latency than the batch interval (for example, the first delta of a response so the user sees something fast, or punctuated events like `RETRY` and `STATUS_CHANGE`), pass `{ forceFlush: true }` on that publish. Don't make this the default mode: per-token `forceFlush: true` on a 500-token completion produces 500 publish Signals, which is meaningful but tractable; per-character `forceFlush: true` is not. + +### Other settings + +You usually do not need to touch these, but they are available when the basic settings are not enough: + +- **`maxBatchSize`** (default unbounded). Caps the number of items per batch. With the default, only `batchInterval` bounds batch size, so a hot publisher can accumulate enough items between intervals that the resulting Signal exceeds Temporal's per-message gRPC payload limit. Set `maxBatchSize` to bound by item count, or call `{ forceFlush: true }` after each logical chunk to bound by application boundaries (for example, publish per generated sentence in a TTS Activity so each Signal carries one audio chunk). For large items, offload via [External Storage](/external-storage) so each item is a small reference. +- **`pollCooldown`** (subscriber-side, default 100 ms). The subscriber sleeps for this interval between polls. The cooldown is skipped only when a poll response was capped at the ~1 MB gRPC limit and more items remain (a `more_ready` flag in the response), so the next poll can drain the rest immediately. That path is an optimization for bursty producers; in the steady state, every poll waits the cooldown before the next. Hold a single iterator and consume from it rather than opening and closing subscriptions in a loop. +- **`maxRetryDuration`** (default 10 minutes). How long the client retries a failed publish batch before giving up and raising `FlushTimeoutError`. Tune higher if your application can tolerate longer outages while a publisher retries through transient failures; lower if you want failures to surface quickly. +- **`publisherTtl`** (default 15 minutes). How long the Workflow retains per-publisher deduplicate state. At each Continue-As-New, entries older than this are dropped. Tune higher if your publishers can be silent for extended windows. + +The last two settings are related. Keep `maxRetryDuration < publisherTtl` so a long-running retry cannot outlast its dedup record and produce a duplicate when it finally succeeds. If you tune one, tune the other. See [Delivery semantics](#delivery-semantics) for the full failure model. + +## Delivery semantics + +**Exactly-once at the execution layer.** Each `(publisherId, sequence)` batch lands in the Workflow's event log at most once, even if the publisher's underlying Signal is retried by the SDK or the network. Once an event is in the log, every subscriber that polls past its offset will see it, and deduplicate state is carried across Continue-As-New so a retried publish that arrives after a rollover still lands at most once. + +**Ordering.** The log imposes a single total order on all events, fixed once written: an event at offset N stays at offset N on every read. Within one publisher (one `WorkflowStreamClient` instance, or the Workflow itself), events appear in publish order. Across concurrent publishers, the interleaving is whatever the Workflow saw when serializing inbound Signals; the order is stable once recorded but not under application control. If event A must precede event B, publish them from the same publisher. + +**Activity retries surface to subscribers.** When an Activity that publishes events fails partway through and Temporal retries it, *both* attempts' events appear in the stream. Concretely: an Activity that publishes three `TEXT_DELTA` events and then errors, then retries and publishes its full output, will deliver three partial events followed by the complete sequence. The Workflow itself sees only the successful attempt's return value (that's what durable execution hides), but a UI subscribed to the stream will see the partial output unless it dedupes. Consumers must reset or annotate on retry events; the library does not do this automatically. + +The conventional pattern is for an Activity that detects it's on a retry attempt to publish a `RETRY` event with `{ forceFlush: true }`, and for the consumer to clear or annotate prior-attempt output when it sees one. Treat the stream as an append-only log of attempts and let an idempotent consumer reducer reconcile them: overwrite on terminal events like `STATUS_CHANGE` or `TEXT_COMPLETE`, or reset an accumulator on a sentinel like `AGENT_START` before deltas resume. Because the Workflow processes only Activity return values rather than reading the stream itself, its own state stays independent of these retried events. + +This is the price of streaming events as they happen rather than waiting for the Workflow's durable view to settle. If the library waited for a successful Activity return before surfacing anything, there would be nothing to stream. + +**Other failure modes.** Events still in a publisher's in-memory client buffer are lost if the process crashes before they ship. Subscribers that handle an item and crash before persisting their next offset will reprocess that item on resume. Build consumer state with both in mind. + +Two limits on the deduplication window are worth understanding: + +- **`publisherTtl`** (default 15 minutes). Retention for the per-publisher deduplicate state. At each Continue-As-New, deduplicate entries whose `lastSeen` is older than this are dropped. `lastSeen` is updated on each *successful* publish (not on each retry attempt), so a publisher that retries through a long partition without success can still age out. A publisher that returns after a longer pause may produce a duplicate. Tune upward via `stream.continueAsNew(buildArgs, { publisherTtl: '...' })` if your publishers can be silent for extended windows. +- **`maxRetryDuration`** (default 10 minutes). A `WorkflowStreamClient` retries a failed batch for up to this long. If the duration elapses with the batch still pending (for example, during a sustained network partition), the client gives up, the pending batch is dropped, and a `FlushTimeoutError` is raised. + + On timeout, the dropped batch is at-most-once: it may or may not have reached the Workflow. Subsequent publishes resume cleanly with the next sequence. One operational caveat: the `FlushTimeoutError` is raised from inside the background flusher task and terminates it. Until you call `await client.flush()` or exit the `await using` scope, subsequent publishes accumulate in the buffer with no flusher to ship them. + +**The two limits must satisfy `maxRetryDuration < publisherTtl`.** If a publisher's retry window exceeds the dedup retention, the dedup state for that publisher can age out (at the next Continue-As-New) before the retry lands. A retry that arrives after its dedup record has been pruned is treated as a fresh publish, and if the original delivery had also succeeded, the same logical batch lands twice. The defaults (10 minutes < 15 minutes) satisfy this; if you tune one, tune the other to preserve the relationship. + +## Architecture + +The user-facing API hides three pieces of machinery worth understanding when you tune throughput, debug delivery, or reason about history size. + +**Append-only log inside the Workflow.** `WorkflowStream` keeps an in-memory list of `(topic, data)` entries inside the Workflow's state, each with a monotonically increasing global offset. Subscribers maintain their own cursor and on each long-poll receive the next range past it. Because the log lives in Workflow state, it is replay-safe and is carried across Continue-As-New via `WorkflowStreamState`. + +Two mechanisms bound log growth, and they do different jobs: + +- **`stream.truncate(upToOffset)`** drops entries from the in-memory log (and therefore from the carried Continue-As-New payload). It does not remove publish Signals already recorded in history. Calling with an offset past the end raises an `ApplicationFailure` with `type: 'TruncateOutOfRange'`. +- **Continue-As-New** starts a fresh history. This is the only way to shrink history; `truncate` alone cannot. + +A subscriber whose offset falls below the new base after a `truncate()` is silently advanced. Internally, the poll raises `ApplicationFailure` with `type: 'TruncatedOffset'`; the TypeScript client catches it and resets to offset 0, which the Workflow reads as "from the current base." The iterator does not raise, but the subscriber may re-receive items already in the log past the new base. Applications that depend on seeing every event exactly once must keep subscribers ahead of truncation or implement their own gap and re-delivery handling using `item.offset`. + +**Wire-level handlers.** The three handlers registered when you construct a `WorkflowStream` are `__temporal_workflow_stream_publish` (the Signal that receives batched publishes), `__temporal_workflow_stream_poll` (the long-poll Update that subscribers use), and `__temporal_workflow_stream_offset` (the Query that reports the current head offset). Poll responses are capped at roughly 1 MB by accumulating items until the next would exceed the budget, so high-throughput producers see a steady stream of small batches. A single item that exceeds 1 MB on its own is admitted unconditionally; offload large items via [External Storage](/external-storage) so each item is a small reference. Each item's wire `data` is a base64-encoded `temporal.api.common.v1.Payload` protobuf, preserving payload metadata for typed decode and cross-language interop. + +**Batching and deduplicating.** Every batch carries a unique identifier (the client's id paired with a monotonic batch sequence number), so a Signal retried by the SDK or the network deduplicates to a single landing in the Workflow's event log. Deduplicate state is part of the Workflow's carried state, so the guarantee survives Continue-As-New (subject to `publisherTtl`). + +This dedup applies at the Signal layer, not the Activity layer. An *Activity retry* is a different concept from a *publish retry*: when Temporal retries the Activity, the retried execution constructs a new `WorkflowStreamClient` with its own client id, so from the stream's perspective every attempt is a fresh publisher whose batches will not deduplicate against the prior attempt's. That is why retried-attempt events appear in the stream alongside the successful attempt's output. + +### Gotchas + +A few details worth knowing about, mostly relevant if you're writing custom message handlers or pushing the library to its limits. + +- **`WorkflowStreamClient` is single-event-loop.** The client buffer is mutated on the publish path and read from the background flusher inside one Node event loop. Don't call `publish()` from a Worker thread; route events back to the loop that owns the client. +- **Constructing two `WorkflowStream`s silently replaces handlers.** The TypeScript Workflow runtime does not expose an inspection API for existing handlers, so the library cannot raise on a duplicate the way the Python SDK does. Construct exactly one `WorkflowStream` per Workflow at the top of the function. +- **Type bindings aren't shared across publishers.** Each `WorkflowStream` and each `WorkflowStreamClient` records topic types only for its own instance, and the type parameter `T` is erased at compile time, so no runtime check enforces uniformity. If two publishers bind the same topic name to different types, the mismatch is not caught at publish, and the subscriber gets a decode error when it processes events from the mismatched publisher. +- **Custom payload converters.** A `WorkflowStreamClient` created via `WorkflowStreamClient.create(client, ...)` picks up the client's configured payload converter; subscribers decode through the same converter. The Workflow side always uses `defaultPayloadConverter`. If you ship a custom converter, make sure both sides agree, or stick to types the default converter handles. +- **Cross-realm `Uint8Array` for binary publishes.** Hand-publishing a `Uint8Array` from Workflow code uses a dedicated code path that constructs a `binary/plain` Payload directly, because the sandbox's `TextEncoder` returns a host-realm `Uint8Array` that fails `instanceof` checks against the sandbox's own globals. You generally don't need to think about this, but if you bypass the workflow-side handle and construct payloads manually, prefer the workflow-side `WorkflowStream` API rather than building payloads by hand. + +## Application: Stream LLM output {#stream-llm-output} + +The headline use case fits the publish/subscribe shapes documented above. An Activity calls the model and publishes deltas as they arrive; the Workflow kicks off the Activity and waits for the consumer to acknowledge end-of-stream; the consumer subscribes, accumulates the deltas, and clears its accumulated state on `RETRY` before continuing. The shape works for a terminal client, a desktop UI, or a Server-Sent Events (SSE) endpoint forwarding to a browser; whatever holds the displayed state calls `render()` to display it. + +If your Activity can retry, the consumer side has to account for it: a retried attempt is a fresh publisher, so its output appears in the stream alongside the previous attempt's. In the LLM streaming pattern below, that means the failed attempt's partial deltas and the retried attempt's full output both reach a subscribed UI unless the UI resets on a `RETRY` event. The example wires up that pattern; see [Delivery semantics](#delivery-semantics) for the precise guarantees. + +```typescript +// activities.ts +import { Context } from '@temporalio/activity'; +import { WorkflowStreamClient } from '@temporalio/workflow-streams/client'; +import OpenAI from 'openai'; + +export interface TextDelta { + text: string; +} + +export interface RetryEvent { + attempt: number; +} + +export async function streamCompletion(prompt: string): Promise { + const attempt = Context.current().info.attempt; + await using streamClient = WorkflowStreamClient.fromWithinActivity({ + batchInterval: '200 milliseconds', + }); + // Disable provider-side retries; let Temporal own retry policy at the Activity layer. + const openai = new OpenAI({ maxRetries: 0 }); + + const deltas = streamClient.topic('delta'); + const retry = streamClient.topic('retry'); + const close = streamClient.topic>('close'); + + // Tell consumers an earlier attempt's deltas are stale. + if (attempt > 1) { + retry.publish({ attempt }, { forceFlush: true }); + } + + const oaiStream = await openai.chat.completions.create({ + model: 'gpt-4o-mini', + messages: [{ role: 'user', content: prompt }], + stream: true, + }); + + const full: string[] = []; + let first = true; + for await (const chunk of oaiStream) { + const text = chunk.choices[0]?.delta?.content; + if (!text) continue; + // forceFlush only on the first delta so the user sees something + // immediately; subsequent deltas batch at the 200 ms interval. + deltas.publish({ text }, first ? { forceFlush: true } : undefined); + first = false; + full.push(text); + } + close.publish({}); + return full.join(''); +} +``` + +```typescript +// workflows.ts +import { condition, defineSignal, executeActivity, setHandler } from '@temporalio/workflow'; +import { WorkflowStream } from '@temporalio/workflow-streams/workflow'; +import type * as activities from './activities'; + +export const subscriberAcknowledgedTerminator = defineSignal('subscriberAcknowledgedTerminator'); + +export interface ChatInput { + prompt: string; +} + +export async function chatWorkflow(input: ChatInput): Promise { + const stream = new WorkflowStream(); + let subscriberDone = false; + setHandler(subscriberAcknowledgedTerminator, () => { + subscriberDone = true; + }); + + const result = await executeActivity('streamCompletion', input.prompt, { + startToCloseTimeout: '5 minutes', + }); + + // Wait for the subscriber to ack the terminal `close` event. The timeout + // is a fallback for when no subscriber is attached; with the ack, the + // typical case exits as soon as the subscriber confirms. + await condition(() => subscriberDone, '30 seconds'); + return result; +} +``` + +```typescript +// consumer.ts: accumulates the model's output and resets on retry +import { Client } from '@temporalio/client'; +import { defaultPayloadConverter } from '@temporalio/common'; +import { WorkflowStreamClient } from '@temporalio/workflow-streams/client'; +import { subscriberAcknowledgedTerminator } from './workflows'; + +export async function streamChat(chatId: string): Promise { + const temporalClient = new Client(); + // Subscribe-only; no `await using` needed because the flusher only runs for publishers. + const stream = WorkflowStreamClient.create(temporalClient, chatId); + const output: string[] = []; + + function render(): void { + // ... display the accumulated output (terminal redraw, UI update, etc.) + } + + for await (const item of stream.subscribe(['delta', 'retry', 'close'])) { + if (item.topic === 'retry') { + // Earlier attempt's deltas are stale; drop what we've shown. + output.length = 0; + render(); + } else if (item.topic === 'delta') { + const delta = defaultPayloadConverter.fromPayload(item.data); + output.push(delta.text); + render(); + } else if (item.topic === 'close') { + // Acknowledge so the Workflow can return without waiting on the fallback timeout. + await temporalClient.workflow.getHandle(chatId).signal(subscriberAcknowledgedTerminator); + break; + } + } + + return output.join(''); +} +``` + +A few choices in this shape are deliberate: + +- The Activity is the publisher because it owns the non-deterministic LLM call. The Workflow processes only the Activity's return value, never reading its own stream — see [Publish from a client](#publish-from-a-client) for why. +- The Activity publishes a `RETRY` event when `Context.current().info.attempt > 1`. This lets the UI respond appropriately to the failure, typically by clearing accumulated deltas before the next attempt's deltas arrive (see [Delivery semantics](#delivery-semantics)). +- Termination uses an *ack handshake*: the consumer signals the Workflow once it has received the `close` event, so the Workflow can return as soon as the subscriber confirms. The `condition` timeout is the fallback when no subscriber is attached (see [Closing the stream](#closing-the-stream) for the simpler fixed-sleep alternative). +- `{ forceFlush: true }` is used only on the first delta and on the `RETRY` sentinel, where latency matters. Subsequent deltas batch at the 200 ms `batchInterval`; per-delta `forceFlush: true` would generate one Signal per token (see [Tuning](#tuning) for the trade-off). + +## See also + +- [Workflow Streams samples (samples-typescript)](https://github.com/temporalio/samples-typescript/tree/main/workflow-streams): runnable scenarios covering basic publish/subscribe, reconnecting subscribers, external publishers, bounded logs, and LLM streaming. +- [`@temporalio/workflow-streams` API reference](https://typedoc.io/packages/@temporalio/workflow-streams). +- [Workflow message passing](/develop/typescript/workflows/message-passing): Signals, Updates, and Queries that Workflow Streams is built on. diff --git a/docs/encyclopedia/workflow-message-passing/workflow-streams.mdx b/docs/encyclopedia/workflow-message-passing/workflow-streams.mdx new file mode 100644 index 0000000000..f925629992 --- /dev/null +++ b/docs/encyclopedia/workflow-message-passing/workflow-streams.mdx @@ -0,0 +1,350 @@ +--- +id: workflow-streams +title: Workflow Streams +sidebar_label: Workflow Streams +description: + Learn how Workflow Streams provides a durable, offset-addressed event channel for streaming events from a Workflow to + external subscribers. +slug: /workflow-streams +toc_max_heading_level: 4 +keywords: + - workflow streams + - streaming + - workflow events + - publish subscribe + - topics + - signals + - updates + - queries + - batching + - deduplication + - continue as new + - llm streaming + - ai agent streaming +tags: + - Concepts + - Workflows + - Messages + - Streaming +--- + +import { RelatedReadContainer, RelatedReadItem } from '@site/src/components'; + +:::tip SUPPORT, STABILITY, and DEPENDENCY INFO + +Workflow Streams is currently in +[Public Preview](/evaluate/development-production-features/release-stages#public-preview). The API may change before +general availability. + +Cross-language client support is on the roadmap. The TypeScript and Python clients are available today. + +::: + +This page covers the following: + +- [What is Workflow Streams?](#what-is-workflow-streams) +- [How Workflow Streams works](#how-it-works) +- [Choose where to host the stream](#choose-where-to-host) +- [Closing the stream](#closing-the-stream) +- [Delivery semantics](#delivery-semantics) +- [Tuning](#tuning) +- [Continue-As-New](#continue-as-new) + +## What is Workflow Streams? {#what-is-workflow-streams} + +A Workflow Stream is a durable event channel hosted inside a Workflow. Publishers append events to topics on the stream. +Subscribers attach to the Workflow by its Workflow ID, optionally filter by topic, and consume events by long-polling. +Subscribers can disconnect and resume from where they left off. + +Use Workflow Streams when outside observers need to follow the progress of a Workflow and its Activities as work +happens: updating a UI as an AI agent works, surfacing status from a payment or order pipeline, or reporting +intermediate results from a data job. + +Workflow Streams is not suited to ultra-low-latency cases like real-time voice. It targets modest fan-out: tens of +publishers and subscribers per Workflow, not thousands. + + + + + + +## How Workflow Streams works {#how-it-works} + +The stream lives inside a Workflow, not on a separate broker. +A Workflow creates the stream by constructing a `WorkflowStream` object, which sets up an in-memory, append-only event +log and registers [Signal](/sending-messages#sending-signals), [Update](/sending-messages#sending-updates), and +[Query](/sending-messages#sending-queries) handlers that publishers and subscribers interact with. +A Workflow can have at most one stream. +The Workflow ID is the address that publishers and subscribers use to connect. + +### Publishing + +A **publisher** appends events to the stream. +The Workflow itself, its Activities, and external processes can all publish to the same stream: + +- **The Workflow itself** appends events synchronously to the in-memory log. + Events are immediately available to subscribers on the next poll. +- **Activities** scheduled by the Workflow use a client that infers the Temporal Client and parent Workflow ID from + the Activity context. +- **External processes** (HTTP backends, starters, scripts, Activities of other Workflows) use a client constructed with + an explicit Temporal Client and Workflow ID. + +Activities and external processes publish through a client-side buffer. The client accumulates events in memory and +flushes the buffer as a single Signal on a configurable batch interval (default: 2 seconds). A single flush batches all +events across all topics that have accumulated since the last flush. This amortizes the cost of Signals: instead of one +Signal per event, one Signal carries an entire batch. + +To send a specific event without waiting for the next interval, mark the publish with a force-flush flag. The +force-flush flag wakes the background flusher immediately. The call returns after appending to the buffer and signaling +the flusher; it does not wait for delivery. + +Each client has a unique publisher ID and a monotonic sequence number. Every batch is tagged with this pair so that a +Signal retried by the SDK or the network deduplicates to a single landing in the log. See +[Delivery semantics](#delivery-semantics) for the full guarantee. + + + + + + + + +### Topics + +A **topic** is a string label attached to each event when published. Topics are implicit: they are created on first +publish, not declared ahead of time. + +A topic handle binds a name to a type so that publish and subscribe call sites carry the type with them. Subscribers can +filter by one or more topic names, or subscribe to all topics on the stream. + +Multiple publishers can publish to the same topic. Ordering is guaranteed within a single publisher. Across publishers, +events interleave in whatever order the Workflow receives the Signals. + +### Subscribing + +A **subscriber** is any process with a Temporal Client that long-polls the Workflow for new events. +Each poll is an Update that blocks until events are available past the subscriber's current offset, then returns a batch. + +The subscriber maintains its own offset. +On reconnect, the subscriber resumes from its last offset without coordinating with anyone but the Workflow. +Multiple subscribers can attach to the same Workflow concurrently. + +Poll responses are capped at roughly 1 MB. +When a response hits the cap, the subscriber polls again immediately to drain the rest before applying its cooldown. + +Subscribing from inside the Workflow that hosts the stream is not supported. +The Workflow processes only the successful return value of each Activity, while the stream may carry partial output from +attempts that failed and were retried. +Letting the Workflow read its own stream would mix those two views. + + + + + + +### Log management + +Two mechanisms bound the growth of the in-memory log: + +- **Truncation** drops entries below a given offset from the in-memory log (and from the Continue-As-New payload). It + does not remove publish Signals already recorded in Workflow history. A subscriber whose offset falls below the new + base after truncation is silently advanced to the current base. +- **Continue-As-New** starts a fresh Workflow history. This is the only way to shrink history. See + [Continue-As-New](#continue-as-new). + +## Choose where to host the stream {#choose-where-to-host} + +The first design choice is whether the Workflow that does the work also hosts the stream, or whether a separate Workflow +exists only to host the stream. + +**Host the stream on the Workflow that does the work** when the events come from what that Workflow is already +orchestrating: an agent run, an order pipeline, a chat session. The stream's lifecycle aligns with the work. The +Workflow ID that starts the work is the same one subscribers attach to. + +**Use a dedicated Workflow for the stream** when the stream should outlive any single producer, accept fan-in from +multiple unrelated sources, or be subscribable before any work has started. Producers publish from outside the stream +Workflow (Activities of other Workflows, or external clients). The trade-off is explicit lifecycle management: a +dedicated stream Workflow does not terminate on its own, so you need a Signal-driven shutdown or a Continue-As-New +strategy. + +Use distinct Workflow IDs for unrelated streams rather than packing them into one Workflow. + +## Closing the stream {#closing-the-stream} + +A subscriber's iterator does not know when the publisher is done. End-of-stream is an application-level concern. Without +coordination, a subscriber keeps polling until the Workflow reaches a terminal state. + +A Workflow that returns immediately after its last publish can return before the subscriber's next poll fetches that +event. Two patterns handle this. + +### Fixed sleep + +Sleep between the last publish and the Workflow return so any in-flight poll has time to fetch the final event: + +1. The Workflow (or its Activity) publishes a sentinel event the subscriber recognizes (for example, + `{ state: "completed" }`). +2. The Workflow sleeps for a duration long enough to cover the subscriber's poll round-trip (30 seconds is a generous + default). +3. The Workflow returns. + +The cost is small: the Workflow stays open for the sleep duration but does no other work. + +### Acknowledgment handshake + +The subscriber sends a Signal to the Workflow once it has received the sentinel event. The Workflow waits for that +Signal up to a timeout, returning as soon as the ack arrives: + +1. The Workflow (or its Activity) publishes a sentinel event. +2. The subscriber receives the sentinel and signals the Workflow. +3. The Workflow's wait condition resolves and the Workflow returns. + +The timeout is still required because the subscriber may not be attached. With the ack, the typical case (subscriber +online) exits as soon as the subscriber confirms receipt. + +### Inspecting terminal status + +The subscribe iterator exits cleanly when the Workflow reaches `COMPLETED`, `FAILED`, `CANCELLED`, `TERMINATED`, or +`TIMED_OUT`, but does not distinguish among them. If your application needs to know which, describe the Workflow handle +after the loop returns to inspect the status. + +## Delivery semantics {#delivery-semantics} + +### Exactly-once publishing + +Each `(publisher_id, sequence)` batch lands in the log at most once, even if the publisher's underlying Signal is +retried by the SDK or the network. Once an event is in the log, every subscriber that polls past its offset sees it. +Deduplicate state is carried across Continue-As-New, so a retried publish that arrives after a rollover still lands at +most once. + +### Ordering + +The log imposes a single total order on all events, fixed once written: an event at offset N stays at offset N on every +read. Within one publisher (one client instance or the Workflow itself), events appear in publish order. Across +concurrent publishers, the interleaving is whatever the Workflow saw when serializing inbound Signals. The order is +stable once recorded but not under application control. If event A must precede event B, publish them from the same +publisher. + +### Activity retries surface to subscribers + +When an Activity that publishes events fails partway through and Temporal retries it, both attempts' events appear in +the stream. An Activity that publishes three events and then errors, then retries and publishes its full output, +delivers three partial events followed by the complete sequence. The Workflow itself sees only the successful attempt's +return value, but a subscriber sees all attempts' output. + +Consumers must handle this. The conventional pattern is for an Activity that detects it is on a retry attempt to publish +a retry-sentinel event with force-flush. The consumer clears or annotates prior-attempt output when it sees the +sentinel. + +This is the price of streaming events as they happen rather than waiting for the Workflow's durable view to settle. If +the library waited for a successful Activity return before surfacing anything, there would be nothing to stream. + +### Other failure modes + +- Events still in a publisher's in-memory buffer are lost if the process crashes before they ship. +- Subscribers that handle an item and crash before persisting their next offset reprocess that item on resume. + +### Deduplication window + +Two settings control the deduplication window: + +| Setting | Default | Description | +| ---------------------- | ---------- | -------------------------------------------------------------------------------------------------------------------------- | +| **publisher TTL** | 15 minutes | How long the Workflow retains per-publisher deduplicate state. Entries older than this are pruned at each Continue-As-New. | +| **max retry duration** | 10 minutes | How long a client retries a failed publish batch before dropping it and raising an error. | + +These two settings must satisfy **max retry duration < publisher TTL**. If a publisher's retry window exceeds the dedup +retention, the dedup state can age out before the retry lands. A retry that arrives after its dedup record has been +pruned is treated as a fresh publish, producing a duplicate. If you tune one, tune the other. + +## Tuning {#tuning} + +The most important question when tuning is: how often do you want to update your UI? That answer drives the trade-off +between user-perceived latency and the number of history events your Workflow accumulates. + +Each batched publish is one Signal, and each subscriber poll is one Update. Each Signal and each Update counts against +the Workflow's history. A more responsive UI means more messages and more history per second. For long-running streams, +plan a [Continue-As-New](#continue-as-new) policy from the start. + +### Batch interval + +The batch interval (default: 2 seconds) is the maximum time between automatic flushes from the client. Lower it to make +the stream feel live. Raise it to amortize Signal cost. For an LLM token stream feeding a chat UI, 200 milliseconds is a +good starting point: the user perceives it as live, and a 30-second response generates roughly 150 publish Signals. +Below 100 milliseconds, the per-Signal RPC overhead starts to dominate. + +For per-publish overrides where one event needs lower latency than the batch interval (the first delta of a response, or +punctuated events like retry sentinels), use force-flush on that publish. Per-token force-flush on a 500-token +completion produces 500 publish Signals, which is meaningful but tractable. Per-character force-flush is not. + +### Other settings + +| Setting | Default | Description | +| ------------------ | --------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| **max batch size** | unbounded | Caps the number of items per batch. Without this, only batch interval bounds batch size. A hot publisher can accumulate enough items that the resulting Signal exceeds Temporal's per-message gRPC payload limit. | +| **poll cooldown** | 100 ms | Minimum interval between subscriber polls. Skipped only when a poll response was capped at the 1 MB limit and more items remain. | + +## Continue-As-New {#continue-as-new} + +If your Workflow runs for minutes and finishes (a single chat completion, an order pipeline, a one-shot agent), you can +skip this section. Continue-As-New becomes relevant for streams that run for hours or accumulate thousands of events, +where you need to roll the Workflow over to keep history bounded. + +Subscribers automatically follow Continue-As-New chains. Workflow IDs are stable across Continue-As-New, so the +subscriber fetches a fresh handle for the same Workflow ID and continues polling from its carried offset. + +To roll a long-running streaming Workflow over without subscribers seeing a gap: + +1. Add an optional stream-state field to your Workflow input. Pass it to the `WorkflowStream` constructor. On a fresh + start, this field is empty. After a rollover, it carries the accumulated state. +2. When the Workflow decides to roll over (for example, when `continueAsNewSuggested` is true), call the stream's + continue-as-new helper. The helper drains waiting subscribers, waits for in-flight handlers to finish, then calls + `continueAsNew` with the snapshot. + +The carried state includes the entire in-memory log of the previous run. Streams that carry large items can hit +Temporal's per-payload size limit at the rollover. To keep the carried state small, offload large payloads via external +storage so each item is a small reference, and use truncation to drop entries that subscribers have already consumed. + + + + + diff --git a/sidebars.js b/sidebars.js index 79d3fbd669..f4d3c4c97e 100644 --- a/sidebars.js +++ b/sidebars.js @@ -41,9 +41,7 @@ module.exports = { type: 'category', label: 'Serverless Workers', link: { type: 'doc', id: 'evaluate/development-production-features/serverless-workers/index' }, - items: [ - 'evaluate/development-production-features/serverless-workers/demo', - ], + items: ['evaluate/development-production-features/serverless-workers/demo'], }, { type: 'category', @@ -165,9 +163,7 @@ module.exports = { type: 'doc', id: 'develop/go/workers/serverless-workers/index', }, - items: [ - 'develop/go/workers/serverless-workers/aws-lambda', - ], + items: ['develop/go/workers/serverless-workers/aws-lambda'], }, ], }, @@ -179,10 +175,7 @@ module.exports = { type: 'doc', id: 'develop/go/client/index', }, - items: [ - 'develop/go/client/temporal-client', - 'develop/go/client/namespaces' - ], + items: ['develop/go/client/temporal-client', 'develop/go/client/namespaces'], }, { type: 'category', @@ -192,10 +185,7 @@ module.exports = { type: 'doc', id: 'develop/go/nexus/index', }, - items: [ - 'develop/go/nexus/quickstart', - 'develop/go/nexus/feature-guide', - ], + items: ['develop/go/nexus/quickstart', 'develop/go/nexus/feature-guide'], }, { type: 'category', @@ -205,10 +195,7 @@ module.exports = { type: 'doc', id: 'develop/go/platform/index', }, - items: [ - 'develop/go/platform/observability', - 'develop/go/platform/enriching-ui', - ], + items: ['develop/go/platform/observability', 'develop/go/platform/enriching-ui'], }, { type: 'category', @@ -298,9 +285,7 @@ module.exports = { type: 'doc', id: 'develop/java/workers/index', }, - items: [ - 'develop/java/workers/run-worker-process', - ], + items: ['develop/java/workers/run-worker-process'], }, { type: 'category', @@ -310,10 +295,7 @@ module.exports = { type: 'doc', id: 'develop/java/client/index', }, - items: [ - 'develop/java/client/temporal-client', - 'develop/java/client/namespaces', - ], + items: ['develop/java/client/temporal-client', 'develop/java/client/namespaces'], }, { type: 'category', @@ -323,10 +305,7 @@ module.exports = { type: 'doc', id: 'develop/java/nexus/index', }, - items: [ - 'develop/java/nexus/quickstart', - 'develop/java/nexus/feature-guide', - ], + items: ['develop/java/nexus/quickstart', 'develop/java/nexus/feature-guide'], }, { type: 'category', @@ -336,10 +315,7 @@ module.exports = { type: 'doc', id: 'develop/java/platform/index', }, - items: [ - 'develop/java/platform/observability', - 'develop/java/platform/enriching-ui', - ], + items: ['develop/java/platform/observability', 'develop/java/platform/enriching-ui'], }, { type: 'category', @@ -363,10 +339,7 @@ module.exports = { type: 'doc', id: 'develop/java/integrations/index', }, - items: [ - 'develop/java/integrations/spring-boot', - 'develop/java/integrations/spring-ai', - ], + items: ['develop/java/integrations/spring-boot', 'develop/java/integrations/spring-ai'], }, ], }, @@ -399,7 +372,7 @@ module.exports = { 'develop/php/workflows/timers', 'develop/php/workflows/side-effects', 'develop/php/workflows/versioning', - ] + ], }, { type: 'category', @@ -414,7 +387,7 @@ module.exports = { 'develop/php/activities/execution', 'develop/php/activities/timeouts', 'develop/php/activities/asynchronous-activity', - ] + ], }, { type: 'category', @@ -424,9 +397,7 @@ module.exports = { type: 'doc', id: 'develop/php/workers/index', }, - items: [ - 'develop/php/workers/run-worker-process', - ] + items: ['develop/php/workers/run-worker-process'], }, { type: 'category', @@ -436,9 +407,7 @@ module.exports = { type: 'doc', id: 'develop/php/client/index', }, - items: [ - 'develop/php/client/temporal-client', - ], + items: ['develop/php/client/temporal-client'], }, { type: 'category', @@ -448,10 +417,7 @@ module.exports = { type: 'doc', id: 'develop/php/platform/index', }, - items: [ - 'develop/php/platform/observability', - 'develop/php/platform/enriching-ui', - ], + items: ['develop/php/platform/observability', 'develop/php/platform/enriching-ui'], }, { type: 'category', @@ -461,10 +427,7 @@ module.exports = { type: 'doc', id: 'develop/php/best-practices/index', }, - items: [ - 'develop/php/best-practices/testing-suite', - 'develop/php/best-practices/debugging' - ] + items: ['develop/php/best-practices/testing-suite', 'develop/php/best-practices/debugging'], }, ], }, @@ -497,7 +460,7 @@ module.exports = { 'develop/python/workflows/timers', 'develop/python/workflows/versioning', 'develop/python/workflows/workflow-streams', - ] + ], }, { type: 'category', @@ -535,9 +498,7 @@ module.exports = { type: 'doc', id: 'develop/python/workers/serverless-workers/index', }, - items: [ - 'develop/python/workers/serverless-workers/aws-lambda', - ], + items: ['develop/python/workers/serverless-workers/aws-lambda'], }, ], }, @@ -549,9 +510,7 @@ module.exports = { type: 'doc', id: 'develop/python/client/index', }, - items: [ - 'develop/python/client/temporal-client', - ], + items: ['develop/python/client/temporal-client'], }, { type: 'category', @@ -561,10 +520,7 @@ module.exports = { type: 'doc', id: 'develop/python/nexus/index', }, - items: [ - 'develop/python/nexus/quickstart', - 'develop/python/nexus/feature-guide', - ], + items: ['develop/python/nexus/quickstart', 'develop/python/nexus/feature-guide'], }, { type: 'category', @@ -574,10 +530,7 @@ module.exports = { type: 'doc', id: 'develop/python/platform/index', }, - items: [ - 'develop/python/platform/observability', - 'develop/python/platform/enriching-ui', - ], + items: ['develop/python/platform/observability', 'develop/python/platform/enriching-ui'], }, { type: 'category', @@ -654,6 +607,7 @@ module.exports = { 'develop/typescript/workflows/schedules', 'develop/typescript/workflows/timers', 'develop/typescript/workflows/versioning', + 'develop/typescript/workflows/workflow-streams', ], }, { @@ -692,9 +646,7 @@ module.exports = { type: 'doc', id: 'develop/typescript/workers/serverless-workers/index', }, - items: [ - 'develop/typescript/workers/serverless-workers/aws-lambda', - ], + items: ['develop/typescript/workers/serverless-workers/aws-lambda'], }, ], }, @@ -706,10 +658,7 @@ module.exports = { type: 'doc', id: 'develop/typescript/client/index', }, - items: [ - 'develop/typescript/client/temporal-client', - 'develop/typescript/client/namespaces' - ], + items: ['develop/typescript/client/temporal-client', 'develop/typescript/client/namespaces'], }, { type: 'category', @@ -719,10 +668,7 @@ module.exports = { type: 'doc', id: 'develop/typescript/nexus/index', }, - items: [ - 'develop/typescript/nexus/quickstart', - 'develop/typescript/nexus/feature-guide' - ], + items: ['develop/typescript/nexus/quickstart', 'develop/typescript/nexus/feature-guide'], }, { type: 'category', @@ -732,10 +678,7 @@ module.exports = { type: 'doc', id: 'develop/typescript/platform/index', }, - items: [ - 'develop/typescript/platform/observability', - 'develop/typescript/platform/enriching-ui', - ], + items: ['develop/typescript/platform/observability', 'develop/typescript/platform/enriching-ui'], }, { type: 'category', @@ -793,7 +736,7 @@ module.exports = { 'develop/dotnet/workflows/timers', 'develop/dotnet/workflows/dynamic-workflow', 'develop/dotnet/workflows/versioning', - ] + ], }, { type: 'category', @@ -811,7 +754,7 @@ module.exports = { 'develop/dotnet/activities/dynamic-activity', 'develop/dotnet/activities/benign-exceptions', 'develop/dotnet/activities/standalone-activities', - ] + ], }, { type: 'category', @@ -834,9 +777,7 @@ module.exports = { type: 'doc', id: 'develop/dotnet/client/index', }, - items: [ - 'develop/dotnet/client/temporal-client', - ], + items: ['develop/dotnet/client/temporal-client'], }, { type: 'category', @@ -846,10 +787,7 @@ module.exports = { type: 'doc', id: 'develop/dotnet/nexus/index', }, - items: [ - 'develop/dotnet/nexus/quickstart', - 'develop/dotnet/nexus/feature-guide', - ], + items: ['develop/dotnet/nexus/quickstart', 'develop/dotnet/nexus/feature-guide'], }, { type: 'category', @@ -859,10 +797,7 @@ module.exports = { type: 'doc', id: 'develop/dotnet/platform/index', }, - items: [ - 'develop/dotnet/platform/observability', - 'develop/dotnet/platform/enriching-ui', - ], + items: ['develop/dotnet/platform/observability', 'develop/dotnet/platform/enriching-ui'], }, { type: 'category', @@ -877,7 +812,7 @@ module.exports = { 'develop/dotnet/best-practices/testing-suite', 'develop/dotnet/best-practices/debugging', 'develop/dotnet/best-practices/converters-and-encryption', - ] + ], }, ], }, @@ -911,7 +846,7 @@ module.exports = { 'develop/ruby/workflows/futures', 'develop/ruby/workflows/dynamic-workflow', 'develop/ruby/workflows/versioning', - ] + ], }, { type: 'category', @@ -928,7 +863,7 @@ module.exports = { 'develop/ruby/activities/asynchronous-activity', 'develop/ruby/activities/dynamic-activity', 'develop/ruby/activities/benign-exceptions', - ] + ], }, { type: 'category', @@ -938,9 +873,7 @@ module.exports = { type: 'doc', id: 'develop/ruby/workers/index', }, - items: [ - 'develop/ruby/workers/run-worker-process', - ] + items: ['develop/ruby/workers/run-worker-process'], }, { type: 'category', @@ -950,9 +883,7 @@ module.exports = { type: 'doc', id: 'develop/ruby/client/index', }, - items: [ - 'develop/ruby/client/temporal-client', - ], + items: ['develop/ruby/client/temporal-client'], }, { type: 'category', @@ -962,10 +893,7 @@ module.exports = { type: 'doc', id: 'develop/ruby/platform/index', }, - items: [ - 'develop/ruby/platform/observability', - 'develop/ruby/platform/enriching-ui', - ], + items: ['develop/ruby/platform/observability', 'develop/ruby/platform/enriching-ui'], }, { type: 'category', @@ -975,9 +903,7 @@ module.exports = { type: 'doc', id: 'develop/ruby/integrations/index', }, - items: [ - 'develop/ruby/integrations/rails-integration', - ], + items: ['develop/ruby/integrations/rails-integration'], }, { type: 'category', @@ -992,7 +918,7 @@ module.exports = { 'develop/ruby/best-practices/testing-suite', 'develop/ruby/best-practices/debugging', 'develop/ruby/best-practices/converters-and-encryption', - ] + ], }, ], }, @@ -1046,9 +972,7 @@ module.exports = { type: 'doc', id: 'develop/rust/workers/index', }, - items: [ - 'develop/rust/workers/worker-process', - ], + items: ['develop/rust/workers/worker-process'], }, { type: 'category', @@ -1058,9 +982,7 @@ module.exports = { type: 'doc', id: 'develop/rust/client/index', }, - items: [ - 'develop/rust/client/temporal-client', - ], + items: ['develop/rust/client/temporal-client'], }, { type: 'category', @@ -1101,11 +1023,7 @@ module.exports = { type: 'doc', id: 'cloud/get-started/index', }, - items: [ - 'cloud/get-started/namespaces', - 'cloud/get-started/api-keys', - 'cloud/get-started/certificates', - ], + items: ['cloud/get-started/namespaces', 'cloud/get-started/api-keys', 'cloud/get-started/certificates'], }, { type: 'category', @@ -1564,6 +1482,7 @@ module.exports = { items: [ 'encyclopedia/workflow-message-passing/sending-messages', 'encyclopedia/workflow-message-passing/handling-messages', + 'encyclopedia/workflow-message-passing/workflow-streams', ], }, {