-
Notifications
You must be signed in to change notification settings - Fork 108
Add Strands plugin samples #310
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
brianstrauch
wants to merge
14
commits into
main
Choose a base branch
from
strands-plugin-samples
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 10 commits
Commits
Show all changes
14 commits
Select commit
Hold shift + click to select a range
d8565fc
Add Strands plugin samples
brianstrauch b376e8e
Convert continue_as_new chat to workflow update
brianstrauch e4b4c0c
Fix interrupt sample: make delete_thing idempotent
brianstrauch 1faf2fe
Fix ruff import order in strands mock model
brianstrauch 18f0c31
Merge remote-tracking branch 'origin/main' into strands-plugin-samples
brianstrauch a2bc2b3
Merge branch 'main' into strands-plugin-samples
brianstrauch 974bca0
Rename Strands sample extra
brianstrauch 2e27f64
Use Windows-compatible Strands tool sample
brianstrauch 555a430
Bump sdk-python pin to pick up Strands MCP client fix
brianstrauch 61124cf
Rename Strands interrupt sample to activity_interrupt
brianstrauch 29221e6
Merge branch 'main' into strands-plugin-samples
lennessyy e209c43
Fix uv sync group name in Strands READMEs
brianstrauch 71d1ae4
Test the continue-as-new path in the Strands chat sample
brianstrauch 12f9d5d
add snipsync lines
lennessyy File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,84 @@ | ||
| # Strands Agents Samples | ||
|
|
||
| These samples demonstrate the [Temporal Strands plugin](https://github.com/temporalio/sdk-python/tree/main/temporalio/contrib/strands), which runs [Strands Agents](https://strandsagents.com/) inside Temporal Workflows. Model invocations, tool calls, and MCP tool calls all execute as Temporal Activities, so you get durable execution, Temporal-managed retries, and timeouts. | ||
|
|
||
| ## Samples | ||
|
|
||
| | Sample | Description | | ||
| |--------|-------------| | ||
| | [hello_world](hello_world) | Minimal `TemporalAgent` invocation. Start here. | | ||
| | [tools](tools) | Three tool patterns side by side: in-workflow `@tool`, custom `@activity.defn` wrapped via `activity_as_tool`, and a `strands_tools` tool wrapped as a Temporal activity. | | ||
| | [human_in_the_loop](human_in_the_loop) | Pause a tool call on `BeforeToolCallEvent.interrupt()`, resume via Temporal signal. The canonical Strands HITL pattern. | | ||
| | [activity_interrupt](activity_interrupt) | Raise `InterruptException` from a Temporal activity to surface a HITL prompt across the activity boundary. Plugin-specific feature. | | ||
| | [hooks](hooks) | `HookProvider` with both an in-workflow callback and an `activity_as_hook` callback for I/O. | | ||
| | [mcp](mcp) | Connect to an MCP server (`FastMCP` echo) via `TemporalMCPClient`. | | ||
| | [structured_output](structured_output) | Pydantic-typed agent output via `structured_output_model`. | | ||
| | [streaming](streaming) | Forward model chunks to an external subscriber via `streaming_topic` + `WorkflowStream`. | | ||
| | [continue_as_new](continue_as_new) | Chat-style workflow that hands off `agent.messages` when history grows large. | | ||
|
|
||
| ## Prerequisites | ||
|
|
||
| 1. Install dependencies: | ||
|
|
||
| ```bash | ||
| uv sync --group strands | ||
| ``` | ||
|
|
||
| > The `strands` extra of `temporalio` is shipping in an upcoming release. Until then, install the SDK from the strands branch: | ||
| > | ||
| > ```bash | ||
| > uv pip install -e ../sdk-python --extra strands-agents --extra pydantic | ||
| > ``` | ||
|
|
||
| 2. Configure AWS credentials. The samples use the plugin's default `BedrockModel()`, which picks up the standard AWS SDK credential chain. Make sure the credentials grant access to a Bedrock model in your selected region (e.g., `us-west-2`). | ||
|
|
||
| ```bash | ||
| export AWS_REGION=us-west-2 | ||
| # plus AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY or an SSO profile | ||
| ``` | ||
|
|
||
| You can pick a specific model by passing it to `BedrockModel(model_id="...")` in each sample's worker. | ||
|
|
||
| 3. Start a [Temporal dev server](https://docs.temporal.io/cli#start-dev-server): | ||
|
|
||
| ```bash | ||
| temporal server start-dev | ||
| ``` | ||
|
|
||
| ## Running a Sample | ||
|
|
||
| Each sample has two scripts. Start the Worker first, then the Workflow starter in a separate terminal: | ||
|
|
||
| ```bash | ||
| # Terminal 1: start the Worker | ||
| uv run strands_plugin/<sample>/run_worker.py | ||
|
|
||
| # Terminal 2: start the Workflow | ||
| uv run strands_plugin/<sample>/run_workflow.py | ||
| ``` | ||
|
|
||
| For example, to run the tools sample: | ||
|
|
||
| ```bash | ||
| # Terminal 1 | ||
| uv run strands_plugin/tools/run_worker.py | ||
|
|
||
| # Terminal 2 | ||
| uv run strands_plugin/tools/run_workflow.py | ||
| ``` | ||
|
|
||
| ## Key Features Demonstrated | ||
|
|
||
| - **Durable model invocation** — every model call runs in an `invoke_model` activity with configurable timeouts and retries. | ||
| - **Three ways to define tools** — pure Strands `@tool`, custom Temporal activities, and ecosystem `strands_tools` wrapped as activities. | ||
| - **Human-in-the-loop** — both hook-based (`BeforeToolCallEvent.interrupt()`) and tool-body (`raise InterruptException`) styles. | ||
| - **Hook system** — deterministic in-workflow callbacks plus I/O callbacks dispatched via `activity_as_hook`. | ||
| - **MCP integration** — connect to MCP servers at worker startup; tool calls dispatched through per-server activities. | ||
| - **Structured output** — Pydantic-typed agent results via the plugin's `pydantic_data_converter`. | ||
| - **Streaming** — forward model chunks live to external subscribers. | ||
| - **Long-lived chats** — hand off `agent.messages` via `continue-as-new` to stay under Temporal's history limit. | ||
|
|
||
| ## Related | ||
|
|
||
| - [Temporal Strands plugin docs](https://github.com/temporalio/sdk-python/tree/main/temporalio/contrib/strands) | ||
| - [Strands Agents](https://strandsagents.com/) | ||
Empty file.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| # Activity Interrupt | ||
|
|
||
| A `@activity.defn`-wrapped tool raises `InterruptException(Interrupt(...))` directly. The plugin's failure converter preserves the `Interrupt` payload across the activity boundary, so the agent stops with `stop_reason == "interrupt"` just like in the hook-based [human_in_the_loop](../human_in_the_loop) sample. | ||
|
|
||
| When to reach for this style instead of a hook: | ||
|
|
||
| - The decision to pause depends on data that's only visible inside the activity (a permissions service, a row in a database, etc.). | ||
| - You don't want to load that data into workflow context just to make the call. | ||
|
|
||
| ## What This Sample Demonstrates | ||
|
|
||
| - Raising `InterruptException` from a Temporal activity tool | ||
| - The plugin's failure converter carrying `Interrupt` across the activity boundary | ||
| - Why `StrandsPlugin` must be attached to the **client** (not just the worker) | ||
|
|
||
| ## Running the Sample | ||
|
|
||
| ```bash | ||
| # Terminal 1 | ||
| uv run strands_plugin/activity_interrupt/run_worker.py | ||
|
|
||
| # Terminal 2 | ||
| uv run strands_plugin/activity_interrupt/run_workflow.py | ||
| ``` | ||
|
|
||
| The starter requests deletion of a "protected" resource. The `delete_thing` activity raises an interrupt for protected names; the starter signals `"approve"` to release it. | ||
|
|
||
| ## Files | ||
|
|
||
| | File | Description | | ||
| |------|-------------| | ||
| | `workflow.py` | `delete_thing` activity that raises `InterruptException`, plus the workflow that handles resumption | | ||
| | `run_worker.py` | `StrandsPlugin` on the client + worker, registers the activity | | ||
| | `run_workflow.py` | Starts the workflow and signals approval | |
Empty file.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,37 @@ | ||
| """Worker for the activity interrupt sample.""" | ||
|
|
||
| import asyncio | ||
| import os | ||
|
|
||
| from temporalio.client import Client | ||
| from temporalio.contrib.strands import StrandsPlugin | ||
| from temporalio.worker import Worker | ||
|
|
||
| from strands_plugin.activity_interrupt.workflow import ( | ||
| ActivityInterruptWorkflow, | ||
| delete_thing, | ||
| ) | ||
|
|
||
|
|
||
| async def main() -> None: | ||
| plugin = StrandsPlugin() | ||
| # The plugin MUST be on the client so its failure converter is installed. | ||
| # Without it, the activity's InterruptException cannot survive serialization | ||
| # across the activity boundary as an Interrupt. | ||
| client = await Client.connect( | ||
| os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"), | ||
| plugins=[plugin], | ||
| ) | ||
|
|
||
| worker = Worker( | ||
| client, | ||
| task_queue="strands-activity-interrupt", | ||
| workflows=[ActivityInterruptWorkflow], | ||
| activities=[delete_thing], | ||
| ) | ||
| print("Worker started. Ctrl+C to exit.") | ||
| await worker.run() | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| asyncio.run(main()) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,40 @@ | ||
| """Start the activity interrupt workflow.""" | ||
|
|
||
| import asyncio | ||
| import os | ||
|
|
||
| from temporalio.client import Client | ||
| from temporalio.contrib.strands import StrandsPlugin | ||
|
|
||
| from strands_plugin.activity_interrupt.workflow import ActivityInterruptWorkflow | ||
|
|
||
|
|
||
| async def main() -> None: | ||
| # The starter also goes through the plugin's failure converter so the | ||
| # Interrupt payload deserializes cleanly when the workflow result is read. | ||
| client = await Client.connect( | ||
| os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"), | ||
| plugins=[StrandsPlugin()], | ||
| ) | ||
|
|
||
| handle = await client.start_workflow( | ||
| ActivityInterruptWorkflow.run, | ||
| "Please delete the 'system' user.", | ||
| id="strands-activity-interrupt", | ||
| task_queue="strands-activity-interrupt", | ||
| ) | ||
|
|
||
| reason = None | ||
| while reason is None: | ||
| await asyncio.sleep(0.5) | ||
| reason = await handle.query(ActivityInterruptWorkflow.pending_approval) | ||
| print(f"Approval requested: {reason}") | ||
|
|
||
| await handle.signal(ActivityInterruptWorkflow.approve, "approve") | ||
|
|
||
| result = await handle.result() | ||
| print(f"Result: {result}") | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| asyncio.run(main()) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,83 @@ | ||
| """Activity interrupt: ``InterruptException`` raised from a Temporal activity. | ||
|
|
||
| The plugin's failure converter preserves the ``Interrupt`` payload across the | ||
| activity boundary, so a Temporal activity can pause the agent for human input | ||
| the same way a hook can. | ||
|
|
||
| For this to work, ``StrandsPlugin`` must be attached to the **client** (not | ||
| just the worker) so the failure converter is installed on the data converter. | ||
| The worker in this sample does exactly that. | ||
| """ | ||
|
|
||
| from datetime import timedelta | ||
| from typing import Optional | ||
|
|
||
| from strands.interrupt import Interrupt, InterruptException | ||
| from strands.types.interrupt import InterruptResponseContent | ||
| from temporalio import activity, workflow | ||
| from temporalio.contrib.strands import TemporalAgent | ||
| from temporalio.contrib.strands.workflow import activity_as_tool | ||
|
|
||
| # Tracks names that have been approved out-of-band. In a real system, this | ||
| # would be a row in a policy database; the human reviewer flips a flag during | ||
| # the pause, and the activity's next attempt reads the new value and proceeds. | ||
| _APPROVED: set[str] = set() | ||
|
|
||
|
|
||
| @activity.defn | ||
| async def delete_thing(name: str) -> str: | ||
| if name not in _APPROVED: | ||
| # First attempt: mark the name as approved on the way out (simulating | ||
| # the human flipping a flag during the interrupt pause) and stop the | ||
| # agent. In production this branch would check a real authorization | ||
| # service and only raise when the resource is protected. | ||
| _APPROVED.add(name) | ||
| raise InterruptException( | ||
| Interrupt( | ||
| id=f"delete:{name}", | ||
| name="approval", | ||
| reason=f"approve delete of protected resource '{name}'?", | ||
| ) | ||
| ) | ||
| return f"deleted {name}" | ||
|
|
||
|
|
||
| @workflow.defn | ||
| class ActivityInterruptWorkflow: | ||
| def __init__(self) -> None: | ||
| self.agent = TemporalAgent( | ||
| start_to_close_timeout=timedelta(seconds=60), | ||
| tools=[ | ||
| activity_as_tool( | ||
| delete_thing, | ||
| start_to_close_timeout=timedelta(seconds=30), | ||
| ), | ||
| ], | ||
| ) | ||
| self._approval: Optional[str] = None | ||
| self._pending_reason: Optional[str] = None | ||
|
|
||
| @workflow.signal | ||
| def approve(self, response: str) -> None: | ||
| self._approval = response | ||
|
|
||
| @workflow.query | ||
| def pending_approval(self) -> Optional[str]: | ||
| return self._pending_reason | ||
|
|
||
| @workflow.run | ||
| async def run(self, prompt: str) -> str: | ||
| result = await self.agent.invoke_async(prompt) | ||
| while result.stop_reason == "interrupt": | ||
| interrupts = list(result.interrupts or []) | ||
| self._pending_reason = interrupts[0].reason if interrupts else None | ||
| await workflow.wait_condition(lambda: self._approval is not None) | ||
| response = self._approval | ||
| self._approval = None | ||
| self._pending_reason = None | ||
| responses: list[InterruptResponseContent] = [ | ||
| {"interruptResponse": {"interruptId": i.id, "response": response}} | ||
| for i in interrupts | ||
| ] | ||
| result = await self.agent.invoke_async(responses) | ||
| return str(result) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| # Continue-as-new | ||
|
|
||
| A chat-style workflow accumulates history with every turn and will eventually hit Temporal's per-workflow history limit. `workflow.info().is_continue_as_new_suggested()` flips `True` once the server decides history has grown large enough; this sample checks it after each turn and hands off to a fresh run with `agent.messages` as input. | ||
|
|
||
| ## What This Sample Demonstrates | ||
|
|
||
| - Driving a multi-turn chat with **updates**, so each caller gets the assistant's reply back from the same call | ||
| - Seeding a new `TemporalAgent` with prior `agent.messages` | ||
| - Using `workflow.info().is_continue_as_new_suggested()` + `workflow.continue_as_new(...)` to keep the workflow alive indefinitely | ||
| - Draining in-flight update handlers with `workflow.all_handlers_finished` before continue-as-new | ||
|
|
||
| ## Running the Sample | ||
|
|
||
| ```bash | ||
| # Terminal 1 | ||
| uv run strands_plugin/continue_as_new/run_worker.py | ||
|
|
||
| # Terminal 2 | ||
| uv run strands_plugin/continue_as_new/run_workflow.py | ||
| ``` | ||
|
|
||
| The starter calls the `turn` update for each user message and prints the assistant's reply, then signals `end_chat`. In a real chatbot, a UI would drive the updates and the workflow would run indefinitely, continuing-as-new whenever history gets large. | ||
|
|
||
| ## Files | ||
|
|
||
| | File | Description | | ||
| |------|-------------| | ||
| | `workflow.py` | `ChatInput`, `ChatWorkflow` with `turn` update, `end_chat` signal, and `messages` query | | ||
| | `run_worker.py` | Registers `StrandsPlugin`, starts the worker | | ||
| | `run_workflow.py` | Starts the chat, sends a few turns, ends it | |
Empty file.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| """Worker for the chat continue-as-new sample.""" | ||
|
|
||
| import asyncio | ||
| import os | ||
|
|
||
| from temporalio.client import Client | ||
| from temporalio.contrib.strands import StrandsPlugin | ||
| from temporalio.worker import Worker | ||
|
|
||
| from strands_plugin.continue_as_new.workflow import ChatWorkflow | ||
|
|
||
|
|
||
| async def main() -> None: | ||
| plugin = StrandsPlugin() | ||
| client = await Client.connect( | ||
| os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"), | ||
| plugins=[plugin], | ||
| ) | ||
|
|
||
| worker = Worker( | ||
| client, | ||
| task_queue="strands-chat", | ||
| workflows=[ChatWorkflow], | ||
| ) | ||
| print("Worker started. Ctrl+C to exit.") | ||
| await worker.run() | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| asyncio.run(main()) |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.