diff --git a/docs/develop/python/index.mdx b/docs/develop/python/index.mdx index 403be4645b..32811593f9 100644 --- a/docs/develop/python/index.mdx +++ b/docs/develop/python/index.mdx @@ -86,6 +86,7 @@ From there, you can dive deeper into any of the Temporal primitives to start bui - [LangSmith integration](/develop/python/integrations/langsmith) - [OpenAI Agents SDK integration](https://github.com/temporalio/sdk-python/blob/main/temporalio/contrib/openai_agents/README.md) - [Pydantic AI integration](https://ai.pydantic.dev/durable_execution/temporal/) +- [Strands Agents integration](/develop/python/integrations/strands-agents) - [Tenuo integration](https://tenuo.ai/temporal) ## Temporal Python Technical Resources diff --git a/docs/develop/python/integrations/index.mdx b/docs/develop/python/integrations/index.mdx index e0f3fb8d8a..8bdad96bd4 100644 --- a/docs/develop/python/integrations/index.mdx +++ b/docs/develop/python/integrations/index.mdx @@ -28,6 +28,7 @@ The following integrations are available between the Temporal Python SDK and thi | LangSmith | Observability | [smith.langchain.com](https://docs.smith.langchain.com/) | [Guide](./langsmith.mdx) | | OpenAI Agents SDK | Agent framework | [openai.github.io](https://openai.github.io/openai-agents-python/) | [Guide](https://github.com/temporalio/sdk-python/blob/main/temporalio/contrib/openai_agents/README.md) | | Pydantic AI | Agent framework | [ai.pydantic.dev](https://ai.pydantic.dev/) | [Guide](https://ai.pydantic.dev/durable_execution/temporal/) | +| Strands Agents | Agent framework | [strandsagents.com](https://strandsagents.com/) | [Guide](./strands-agents.mdx) | | Tenuo | Governance | [tenuo.ai](https://tenuo.ai/docs) | [Guide](https://tenuo.ai/temporal) | These integrations are built on the Temporal Python SDK's [Plugin system](/develop/plugins-guide), which you can also diff --git a/docs/develop/python/integrations/strands-agents.mdx b/docs/develop/python/integrations/strands-agents.mdx new file mode 
100644 index 0000000000..f4734daced --- /dev/null +++ b/docs/develop/python/integrations/strands-agents.mdx @@ -0,0 +1,945 @@ +--- +id: strands-agents +title: Strands Agents integration +sidebar_label: Strands Agents +toc_max_heading_level: 3 +keywords: + - ai + - agents + - strands + - strands agents + - durable execution + - ai workflows +tags: + - Strands Agents + - Python SDK + - Temporal SDKs +description: Run Strands Agents AI workflows with durable execution using the Temporal Python SDK and Strands plugin. +--- + +Temporal's integration with [Strands Agents](https://strandsagents.com/) is an [SDK Plugin](/develop/plugins-guide) that +gives your Strands agents [Durable Execution](/temporal#durable-execution) via the Temporal platform. The plugin routes +model invocations, tool calls, MCP tool calls, and hooks through Temporal Activities, so every step your agent takes is +recorded in Workflow history and can survive crashes, restarts, and infrastructure failures. + +:::info + +The Temporal Python SDK integration with Strands Agents is currently at an experimental release stage. The API may +change in future versions. + +::: + +Code snippets in this guide are taken from the +[Strands Agents plugin samples](https://github.com/temporalio/samples-python/tree/main/strands_plugin). Refer to the +samples for the complete code. + +## Get started + +Install the plugin, then run a minimal Strands agent inside a Temporal Workflow. + +### Prerequisites + +- This guide assumes you are already familiar with Strands Agents. If you are not, refer to the + [Strands Agents documentation](https://strandsagents.com/) for more details. +- If you are new to Temporal, read [Understanding Temporal](/evaluate/understanding-temporal) or take the + [Temporal 101](https://learn.temporal.io/courses/temporal_101/) course. +- Set up your local development environment by following the + [Set up your local development environment](/develop/python/set-up-your-local-python) guide. 
Leave the Temporal + development server running if you want to test your code locally. + +### Install the plugin + +Install the Temporal Python SDK with Strands Agents support (requires `temporalio` 1.28.0 or later): + +```bash +uv add "temporalio[strands-agents]" +``` + +or with pip: + +```bash +pip install "temporalio[strands-agents]" +``` + +### Run a Strands agent with Durable Execution + +The following example runs a Strands agent inside a Temporal Workflow. Model calls execute as Temporal Activities, which +means they get automatic retries, timeouts, and durable execution. If the Worker process crashes mid-conversation, +Temporal replays the Workflow and resumes from the last completed Activity. + +**1. Define the Workflow** + +Create a Workflow that holds a `TemporalAgent` and invokes it with a prompt. The `start_to_close_timeout` sets the +maximum time each model call Activity can run: + + + +[strands_plugin/hello_world/workflow.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/hello_world/workflow.py) + +```py +from datetime import timedelta + +from temporalio import workflow +from temporalio.contrib.strands import TemporalAgent + + +@workflow.defn +class HelloWorldWorkflow: + def __init__(self) -> None: + self.agent = TemporalAgent(start_to_close_timeout=timedelta(seconds=60)) + + @workflow.run + async def run(self, prompt: str) -> str: + result = await self.agent.invoke_async(prompt) + return str(result) +``` + + + +:::caution + +Inside a Workflow, always call `agent.invoke_async(message)`, not `agent(message)`. The synchronous form spawns a worker +thread, which the Workflow sandbox blocks. + +::: + +**2. Start a Worker** + +Create a Worker that registers the Workflow and the `StrandsPlugin`. 
The plugin automatically registers the Activities +that handle model calls: + + + +[strands_plugin/hello_world/run_worker.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/hello_world/run_worker.py) + +```py +import asyncio +import os + +from temporalio.client import Client +from temporalio.contrib.strands import StrandsPlugin +from temporalio.worker import Worker + +from strands_plugin.hello_world.workflow import HelloWorldWorkflow + + +async def main() -> None: + plugin = StrandsPlugin() + client = await Client.connect( + os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[plugin], + ) + + worker = Worker( + client, + task_queue="strands-hello-world", + workflows=[HelloWorldWorkflow], + ) + print("Worker started. Ctrl+C to exit.") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) +``` + + + +**3. Run the Workflow** + +Start the Workflow from a separate client script. This example sends the prompt "Write a haiku about durable execution" +and prints the agent's response: + + + +[strands_plugin/hello_world/run_workflow.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/hello_world/run_workflow.py) + +```py +import asyncio +import os + +from temporalio.client import Client + +from strands_plugin.hello_world.workflow import HelloWorldWorkflow + + +async def main() -> None: + client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233")) + + result = await client.execute_workflow( + HelloWorldWorkflow.run, + "Write a haiku about durable execution.", + id="strands-hello-world", + task_queue="strands-hello-world", + ) + + print(f"Result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) +``` + + + +## Build the agent + +Customize which model provider your agent uses, add tools that run as Activities, subscribe to lifecycle events with +hooks, and connect to MCP servers. 
+ +### Choose and configure models + +By default, `StrandsPlugin` uses Strands' own default model (`BedrockModel`). To use a different model, pass a `models` +mapping to `StrandsPlugin` on the Worker. When you provide a custom `models` mapping, each `TemporalAgent` must specify +which model to use by name. + +Each entry in the mapping pairs a name with a factory function that creates a model provider (such as `AnthropicModel` +or `BedrockModel`). The provider is created on first use and reused for the Worker's lifetime: + +```python +from datetime import timedelta + +from strands.models.anthropic import AnthropicModel +from strands.models.bedrock import BedrockModel +from temporalio import workflow +from temporalio.contrib.strands import StrandsPlugin, TemporalAgent +from temporalio.worker import Worker + +# Workflow +@workflow.defn +class MultiModelWorkflow: + def __init__(self) -> None: + self.agent_a = TemporalAgent( + model="claude", + start_to_close_timeout=timedelta(seconds=60), + ) + self.agent_b = TemporalAgent( + model="bedrock", + start_to_close_timeout=timedelta(seconds=60), + ) + +# Worker +Worker(..., plugins=[StrandsPlugin(models={ + "claude": lambda: AnthropicModel(client_args={"api_key": "..."}), + "bedrock": lambda: BedrockModel(), +})]) +``` + +Each `TemporalAgent` carries its own Activity options (timeouts, retry policy, task queue, streaming topic) and +dispatches to a shared model Activity, which resolves the model name against the registered factories at runtime. A +model name not present in the `models` mapping raises `ValueError` inside the Activity. + +### Run non-deterministic tools as Activities + +Strands tools that perform I/O, access external services, or produce non-deterministic results need to run as Temporal +Activities rather than inline in the Workflow. Wrap each tool in an `@activity.defn` function, register the Activities +on the Worker, and pass them to the agent using `activity_as_tool`. 
+ +Define an Activity for the tool: + + + +[strands_plugin/tools/workflow.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/tools/workflow.py) + +```py +@activity.defn +async def fetch_weather(city: str) -> dict: + """Stub weather lookup — replace with a real HTTP call in production.""" + return { + "city": city, + "temperature_f": 72, + "conditions": "sunny", + } +``` + + + +Pass the Activity to the agent in the Workflow using `activity_as_tool`: + + + +[strands_plugin/tools/workflow.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/tools/workflow.py) + +```py +@workflow.defn +class ToolsWorkflow: + def __init__(self) -> None: + self.agent = TemporalAgent( + start_to_close_timeout=timedelta(seconds=60), + tools=[ + letter_counter, + activity_as_tool( + fetch_weather, + start_to_close_timeout=timedelta(seconds=30), + ), + activity_as_tool( + environment_activity, + start_to_close_timeout=timedelta(seconds=30), + ), + ], + ) + + @workflow.run + async def run(self, prompt: str) -> str: + result = await self.agent.invoke_async(prompt) + return str(result) +``` + + + +Register the Activity functions on the Worker: + + + +[strands_plugin/tools/run_worker.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/tools/run_worker.py) + +```py +import asyncio +import os + +from temporalio.client import Client +from temporalio.contrib.strands import StrandsPlugin +from temporalio.worker import Worker + +from strands_plugin.tools.workflow import ( + ToolsWorkflow, + environment_activity, + fetch_weather, +) + + +async def main() -> None: + plugin = StrandsPlugin() + client = await Client.connect( + os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[plugin], + ) + + worker = Worker( + client, + task_queue="strands-tools", + workflows=[ToolsWorkflow], + activities=[fetch_weather, environment_activity], + ) + print("Worker started. 
Ctrl+C to exit.") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) +``` + + + +If you are using built-in `strands_tools`, wrap them in a thin async function decorated with `@activity.defn` so they +run as Temporal Activities. + +### React to agent lifecycle events + +Strands' [hook system](https://strandsagents.com/docs/user-guide/concepts/agents/hooks/) lets you subscribe callbacks to events in the agent lifecycle, such +as invocation start/end, model call before/after, tool call before/after, and message added. Use hooks to add logging, +metrics, or custom logic at each stage. + +Pass `hooks=[MyHookProvider()]` to `TemporalAgent`. Hook callbacks fire in Workflow context, so deterministic callbacks +work without any extra setup. + +For callbacks that need I/O (audit logging, metrics, alerting), use `activity_as_hook` to dispatch the work as a +Temporal Activity. The following example shows both patterns in one `HookProvider`. The `_record` callback runs in +Workflow context (deterministic), while `persist_tool_call` runs as an Activity (I/O-safe): + + + +[strands_plugin/hooks/workflow.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/hooks/workflow.py) + +```py +@activity.defn +async def persist_tool_call(tool_name: str) -> None: + # In production, write to a database / S3 / your audit pipeline. 
+ activity.logger.info(f"audit: tool {tool_name} completed") +``` + + + + + +[strands_plugin/hooks/workflow.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/hooks/workflow.py) + +```py +class AuditHook(HookProvider): + def __init__(self) -> None: + self.fired: list[str] = [] + + def register_hooks(self, registry: HookRegistry, **kwargs: object) -> None: + registry.add_callback(AfterToolCallEvent, self._record) + registry.add_callback( + AfterToolCallEvent, + activity_as_hook( + persist_tool_call, + activity_input=lambda event: event.tool_use["name"], + start_to_close_timeout=timedelta(seconds=15), + ), + ) + + def _record(self, event: AfterToolCallEvent) -> None: + self.fired.append(event.tool_use["name"]) +``` + + + +:::caution + +Hook callbacks run in Workflow context, so they must be +[deterministic](/develop/python/workflows/basics#workflow-logic-requirements). Do not use `time.time()`, `uuid.uuid4()`, +or I/O inside hook callbacks. Use `activity_as_hook` for anything that requires I/O. + +::: + +The `activity_input` parameter extracts serializable values from the event to pass as the Activity's input. Use a +dataclass or Pydantic model for multiple values. This is needed because hook events hold references to `Agent`, +`AgentTool` instances, and other objects that cannot cross the Activity boundary. + +### Connect to MCP servers + +If your agent needs access to tools provided by an [MCP](https://modelcontextprotocol.io/) server, configure the MCP +clients on the Worker and reference them by name in the Workflow. + +`StrandsPlugin(mcp_clients=...)` takes a mapping of `name` to `MCPClient` factory, mirroring the `models` pattern. The +plugin registers a per-server Activity and connects at Worker startup to enumerate available tools. In the Workflow, +`TemporalMCPClient(server="name")` is a handle that references the server by name and carries per-call Activity options. 
+ +Define the Workflow with a `TemporalMCPClient`: + + + +[strands_plugin/mcp/workflow.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/mcp/workflow.py) + +```py +from datetime import timedelta + +from temporalio import workflow +from temporalio.contrib.strands import TemporalAgent, TemporalMCPClient + + +@workflow.defn +class MCPWorkflow: + def __init__(self) -> None: + echo = TemporalMCPClient( + server="echo", + start_to_close_timeout=timedelta(seconds=30), + ) + self.agent = TemporalAgent( + start_to_close_timeout=timedelta(seconds=60), + tools=[echo], + ) + + @workflow.run + async def run(self, prompt: str) -> str: + result = await self.agent.invoke_async(prompt) + return str(result) +``` + + + +Register the MCP client factory on the Worker: + + + +[strands_plugin/mcp/run_worker.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/mcp/run_worker.py) + +```py +# ... +from mcp import StdioServerParameters, stdio_client +from strands.tools.mcp.mcp_client import MCPClient +from temporalio.client import Client +from temporalio.contrib.strands import StrandsPlugin +from temporalio.worker import Worker +# ... +def _make_echo_client() -> MCPClient: + return MCPClient( + lambda: stdio_client( + StdioServerParameters( + command=sys.executable, + args=[str(ECHO_SERVER)], + ) + ) + ) +# ... +async def main() -> None: + plugin = StrandsPlugin(mcp_clients={"echo": _make_echo_client}) + client = await Client.connect( + os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[plugin], + ) + + worker = Worker( + client, + task_queue="strands-mcp", + workflows=[MCPWorkflow], + ) + print("Worker started. Ctrl+C to exit.") + await worker.run() +``` + + + +Each factory returns a fully configured `MCPClient`, so you can pass options like `tool_filters`, `prefix`, +`elicitation_callback`, or `tasks_config` to it. + +:::info + +The plugin connects to each MCP server once at Worker startup to enumerate tools. 
The schema is frozen for the Worker's +lifetime. Restart Workers to pick up MCP server changes. If a server is unavailable at startup, the Worker fails to +start. + +::: + +## Interact with the agent + +Control the shape of agent responses, stream output in real time, and pause the agent for human approval. + +### Add human approval gates + +Some agent actions, such as deleting resources or sending messages, may require human approval before proceeding. +Strands offers two ways to interrupt an agent and wait for a response. Both work with the plugin. + +In each case, `agent.invoke_async()` returns `AgentResult(stop_reason="interrupt", interrupts=[...])` instead of +raising. Pair this with a Signal handler that supplies responses, then resume by calling +`agent.invoke_async(responses)`. + +#### Interrupt from a hook + +A hook on an interruptible event such as `BeforeToolCallEvent` can pause the agent by calling +`event.interrupt(name, reason=...)`. The hook runs in Workflow context, so it must be deterministic. 
+ +Define the approval hook: + + + +[strands_plugin/human_in_the_loop/workflow.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/human_in_the_loop/workflow.py) + +```py +class ApprovalHook(HookProvider): + def register_hooks(self, registry: HookRegistry, **kwargs: object) -> None: + registry.add_callback(BeforeToolCallEvent, self._gate) + + def _gate(self, event: BeforeToolCallEvent) -> None: + if event.tool_use["name"] != "delete_file": + return + approval = event.interrupt( + "approval", + reason=f"approve delete of {event.tool_use['input']['path']}?", + ) + if approval != "approve": + event.cancel_tool = "denied" +``` + + + +The Workflow waits for a Signal carrying the approval response, then resumes the agent: + + + +[strands_plugin/human_in_the_loop/workflow.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/human_in_the_loop/workflow.py) + +```py +@workflow.defn +class HumanInTheLoopWorkflow: + def __init__(self) -> None: + self.agent = TemporalAgent( + start_to_close_timeout=timedelta(seconds=60), + tools=[delete_file], + hooks=[ApprovalHook()], + ) + self._approval: Optional[str] = None + self._pending_reason: Optional[str] = None + + @workflow.signal + def approve(self, response: str) -> None: + self._approval = response + + @workflow.query + def pending_approval(self) -> Optional[str]: + return self._pending_reason + + @workflow.run + async def run(self, prompt: str) -> str: + result = await self.agent.invoke_async(prompt) + while result.stop_reason == "interrupt": + interrupts = list(result.interrupts or []) + self._pending_reason = interrupts[0].reason if interrupts else None + await workflow.wait_condition(lambda: self._approval is not None) + response = self._approval + self._approval = None + self._pending_reason = None + responses: list[InterruptResponseContent] = [ + {"interruptResponse": {"interruptId": i.id, "response": response}} + for i in interrupts + ] + result = await 
self.agent.invoke_async(responses) + return str(result) +``` + + + +#### Interrupt from a tool + +A `@strands.tool` function can raise `InterruptException(Interrupt(...))` directly. The agent stops with the interrupt, +and the Workflow handles the resume the same way as for hooks: + +```python +from strands import tool +from strands.interrupt import Interrupt, InterruptException + + +@tool +def delete_thing(name: str) -> str: + raise InterruptException( + Interrupt(id=f"delete:{name}", name="approval", reason=f"delete {name}?") + ) +``` + +The same approach works from an `activity_as_tool`-wrapped Activity. The plugin's failure converter preserves the +`Interrupt` payload across the Activity boundary, so `AgentResult.interrupts` is populated the same way. + +Define the Activity that raises the interrupt: + + + +[strands_plugin/activity_interrupt/workflow.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/activity_interrupt/workflow.py) + +```py +@activity.defn +async def delete_thing(name: str) -> str: + if name not in _APPROVED: + _APPROVED.add(name) + raise InterruptException( + Interrupt( + id=f"delete:{name}", + name="approval", + reason=f"approve delete of protected resource '{name}'?", + ) + ) + return f"deleted {name}" +``` + + + +:::caution + +Activity-tool interrupts rely on the plugin's failure converter, which is installed via the client's data converter. +Attach `StrandsPlugin` to the **client** (not just the Worker) for Activity-tool interrupts to work. 
+ +::: + +Workers built from that client pick up the plugin automatically: + + + +[strands_plugin/activity_interrupt/run_worker.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/activity_interrupt/run_worker.py) + +```py +import asyncio +import os + +from temporalio.client import Client +from temporalio.contrib.strands import StrandsPlugin +from temporalio.worker import Worker + +from strands_plugin.activity_interrupt.workflow import ( + ActivityInterruptWorkflow, + delete_thing, +) + + +async def main() -> None: + plugin = StrandsPlugin() + # The plugin MUST be on the client so its failure converter is installed. + client = await Client.connect( + os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[plugin], + ) + + worker = Worker( + client, + task_queue="strands-activity-interrupt", + workflows=[ActivityInterruptWorkflow], + activities=[delete_thing], + ) + print("Worker started. Ctrl+C to exit.") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) +``` + + + +### Return structured data from an agent + +To have the agent return a typed object instead of free-form text, pass a `structured_output_model` to `TemporalAgent`. 
+The plugin defaults to the [`pydantic_data_converter`](/develop/python/data-handling/data-conversion), so Pydantic types +serialize cleanly across the Activity and Workflow boundary: + + + +[strands_plugin/structured_output/workflow.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/structured_output/workflow.py) + +```py +from datetime import timedelta + +from pydantic import BaseModel, Field +from temporalio import workflow +from temporalio.contrib.strands import TemporalAgent + + +class PersonInfo(BaseModel): + name: str = Field(description="Name of the person") + age: int = Field(description="Age of the person") + occupation: str = Field(description="Occupation of the person") + + +@workflow.defn +class StructuredOutputWorkflow: + def __init__(self) -> None: + self.agent = TemporalAgent( + start_to_close_timeout=timedelta(seconds=60), + structured_output_model=PersonInfo, + ) + + @workflow.run + async def run(self, prompt: str) -> PersonInfo: + result = await self.agent.invoke_async(prompt) + assert isinstance(result.structured_output, PersonInfo) + return result.structured_output +``` + + + +### Stream agent output to clients + +For long-running agent calls, you may want to forward model output chunks to an external consumer as they arrive rather +than waiting for the full response. + +Pass `streaming_topic="..."` to `TemporalAgent` and host a `WorkflowStream` on the Workflow. Each `StreamEvent` is +published from inside the model Activity. Subscribers read events through `WorkflowStreamClient`. Chunks are batched on +`streaming_batch_interval` (default 100 ms). 
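To make the batching behavior concrete, the following is a minimal sketch of interval-based batching in plain Python. It is an illustration only, not the plugin's implementation: `batch_by_interval` and the `(arrival_time, chunk)` input shape are hypothetical, and only the 100 ms default comes from the description above.

```python
from datetime import timedelta
from typing import Iterable, Iterator


def batch_by_interval(
    events: Iterable[tuple[float, str]],
    interval: timedelta = timedelta(milliseconds=100),
) -> Iterator[list[str]]:
    """Group (arrival_time_seconds, chunk) pairs into batches.

    Hypothetical helper: a batch is flushed when a chunk arrives at least
    `interval` after the first chunk of the current batch.
    """
    window = interval.total_seconds()
    batch: list[str] = []
    batch_start = 0.0
    for arrived_at, chunk in events:
        # Flush the current batch once its time window has elapsed.
        if batch and arrived_at - batch_start >= window:
            yield batch
            batch = []
        # The first chunk of a new batch opens a new window.
        if not batch:
            batch_start = arrived_at
        batch.append(chunk)
    # Flush whatever is left when the input ends.
    if batch:
        yield batch


if __name__ == "__main__":
    chunks = [(0.00, "Dur"), (0.03, "able"), (0.09, " exec"), (0.12, "ution"), (0.25, "!")]
    for published in batch_by_interval(chunks):
        print(published)
```

Under this reading, a longer interval means fewer, larger published events at the cost of higher latency for subscribers, while a shorter interval does the opposite.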
+ +Define the Workflow with a `WorkflowStream` and a streaming topic: + + + +[strands_plugin/streaming/workflow.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/streaming/workflow.py) + +```py +from datetime import timedelta + +from temporalio import workflow +from temporalio.contrib.strands import TemporalAgent +from temporalio.contrib.workflow_streams import WorkflowStream + + +@workflow.defn +class StreamingWorkflow: + def __init__(self) -> None: + self.stream = WorkflowStream() + self.agent = TemporalAgent( + start_to_close_timeout=timedelta(seconds=60), + streaming_topic="events", + ) + + @workflow.run + async def run(self, prompt: str) -> str: + result = await self.agent.invoke_async(prompt) + return str(result) +``` + + + +Subscribe to the stream from a client: + + + +[strands_plugin/streaming/run_workflow.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/streaming/run_workflow.py) + +```py +import asyncio +import os +from datetime import timedelta + +from strands.types.streaming import StreamEvent +from temporalio.client import Client +from temporalio.contrib.workflow_streams import WorkflowStreamClient + +from strands_plugin.streaming.workflow import StreamingWorkflow + + +async def main() -> None: + client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233")) + workflow_id = "strands-streaming" + + handle = await client.start_workflow( + StreamingWorkflow.run, + "Count from 1 to 5, one number per sentence.", + id=workflow_id, + task_queue="strands-streaming", + ) + + async def consume() -> None: + stream = WorkflowStreamClient.create(client, workflow_id) + async for item in stream.subscribe( + ["events"], + from_offset=0, + result_type=StreamEvent, + poll_cooldown=timedelta(milliseconds=50), + ): + event: StreamEvent = item.data + if "contentBlockDelta" in event: + delta = event["contentBlockDelta"].get("delta", {}) + if "text" in delta: + print(delta["text"], end="", flush=True) + 
elif "messageStop" in event: + print() + return + + consume_task = asyncio.create_task(consume()) + result = await handle.result() + await asyncio.wait_for(consume_task, timeout=10.0) + print(f"Final result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) +``` + + + +## Run in production + +Configure retry policies, handle long-running chat sessions, and add distributed tracing. + +### Configure retries + +`TemporalAgent` disables Strands' built-in `ModelRetryStrategy` so that retries are handled exclusively by Temporal. +Configure retries with `retry_policy` on `TemporalAgent` for model calls, and on the Activity options accepted by +`activity_as_tool`, `activity_as_hook`, and `TemporalMCPClient` for their respective calls: + +```python +from temporalio.common import RetryPolicy + + +TemporalAgent( + start_to_close_timeout=timedelta(seconds=60), + retry_policy=RetryPolicy(maximum_attempts=3), +) +``` + +Passing `retry_strategy=...` to `TemporalAgent(...)` raises `ValueError`. Remove the argument (or pass +`retry_strategy=None`) and use `retry_policy` instead. + +### Handle long-running chat sessions + +A chat-style Workflow accumulates message history with every turn. Over a long session, the Workflow's event history can +grow large enough to hit Temporal's per-Workflow history limit. To avoid this, use +[Continue-as-New](/develop/python/workflows/continue-as-new) to start a fresh Workflow execution while carrying the +agent's message history forward as input. + +In this example, each user turn arrives as a Workflow [Update](/develop/python/workflows/message-passing#updates), so +the caller gets the agent's reply back from the same call. The `run` method creates the agent, then waits until either +the chat ends or Temporal suggests continue-as-new. 
In either case, the Workflow first drains any in-flight updates. If the chat has not ended, it then starts a +fresh execution with the agent's accumulated messages: + + + +[strands_plugin/continue_as_new/workflow.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/continue_as_new/workflow.py) + +```py +import asyncio +from dataclasses import dataclass, field +from datetime import timedelta + +from strands.types.content import Messages +from temporalio import workflow +from temporalio.contrib.strands import TemporalAgent + + +@dataclass +class ChatInput: + messages: Messages = field(default_factory=list) + + +@workflow.defn +class ChatWorkflow: + def __init__(self) -> None: + self._done = False + self._lock = asyncio.Lock() + self._agent: TemporalAgent | None = None + + @workflow.update + async def turn(self, prompt: str) -> str: + await workflow.wait_condition(lambda: self._agent is not None) + async with self._lock: + assert self._agent is not None + result = await self._agent.invoke_async(prompt) + return str(result).strip() + + @workflow.signal + def end_chat(self) -> None: + self._done = True + + @workflow.query + def messages(self) -> Messages: + return list(self._agent.messages) if self._agent else [] + + @workflow.run + async def run(self, input: ChatInput) -> None: + self._agent = TemporalAgent( + start_to_close_timeout=timedelta(seconds=60), + messages=list(input.messages), + ) + + await workflow.wait_condition( + lambda: self._done or workflow.info().is_continue_as_new_suggested() + ) + + await workflow.wait_condition(workflow.all_handlers_finished) + + if not self._done: + workflow.continue_as_new(ChatInput(messages=self._agent.messages)) +``` + + + +### Add tracing with OpenTelemetry + +To get distributed traces across model, tool, and MCP Activities, combine `StrandsPlugin` with the +[OpenTelemetry plugin](/develop/python/platform/observability#tracing). Register `OpenTelemetryPlugin` on the client and +`StrandsPlugin` on the Worker. 
Workers built from that client pick up the OpenTelemetry plugin automatically: + +```python +import opentelemetry.trace +from temporalio.client import Client +from temporalio.contrib.opentelemetry import OpenTelemetryPlugin, create_tracer_provider +from temporalio.contrib.strands import StrandsPlugin +from temporalio.worker import Worker + + +opentelemetry.trace.set_tracer_provider(create_tracer_provider()) + +client = await Client.connect("localhost:7233", plugins=[OpenTelemetryPlugin()]) + +Worker( + client, + task_queue="strands", + workflows=[MyWorkflow], + plugins=[StrandsPlugin()], +) +``` + +Set the tracer provider before connecting the client. + +### Snapshots are not supported + +`TemporalAgent.take_snapshot()` and `TemporalAgent.load_snapshot()` raise `NotImplementedError`. Temporal's event +history already persists Workflow state durably at a finer granularity than Strands snapshots, so snapshots are +redundant inside a Workflow. + +### Samples + +The [Strands Agents plugin samples](https://github.com/temporalio/samples-python/tree/main/strands_plugin) demonstrate +all supported patterns end-to-end. 
diff --git a/sidebars.js b/sidebars.js index 79d3fbd669..5555068d1d 100644 --- a/sidebars.js +++ b/sidebars.js @@ -621,6 +621,7 @@ module.exports = { 'develop/python/integrations/braintrust', 'develop/python/integrations/langgraph', 'develop/python/integrations/langsmith', + 'develop/python/integrations/strands-agents', ], }, ], diff --git a/src/theme/Admonition/index.js b/src/theme/Admonition/index.js index 247e58a509..8c105681c8 100644 --- a/src/theme/Admonition/index.js +++ b/src/theme/Admonition/index.js @@ -1,8 +1,8 @@ -import React from "react"; -import clsx from "clsx"; -import { ThemeClassNames } from "@docusaurus/theme-common"; -import Translate from "@docusaurus/Translate"; -import styles from "./styles.module.css"; +import React from 'react'; +import clsx from 'clsx'; +import { ThemeClassNames } from '@docusaurus/theme-common'; +import Translate from '@docusaurus/Translate'; +import styles from './styles.module.css'; function NoteIcon() { return ( @@ -108,7 +108,7 @@ function CopyCodeIcon() { // eslint-disable-next-line @typescript-eslint/consistent-indexed-object-style const AdmonitionConfigs = { note: { - infimaClassName: "secondary", + infimaClassName: 'secondary', iconComponent: NoteIcon, label: ( @@ -117,7 +117,7 @@ const AdmonitionConfigs = { ), }, tip: { - infimaClassName: "success", + infimaClassName: 'success', iconComponent: TipIcon, label: ( @@ -126,7 +126,7 @@ const AdmonitionConfigs = { ), }, danger: { - infimaClassName: "danger", + infimaClassName: 'danger', iconComponent: DangerIcon, label: ( @@ -147,7 +147,7 @@ const AdmonitionConfigs = { ), }, caution: { - infimaClassName: "warning", + infimaClassName: 'warning', iconComponent: CautionIcon, label: ( React.isValidElement(item) && item.props?.mdxType === "mdxAdmonitionTitle" + (item) => React.isValidElement(item) && item.props?.mdxType === 'mdxAdmonitionTitle' ); const rest = <>{items.filter((item) => item !== mdxAdmonitionTitle)}; return { @@ -231,7 +232,7 @@ export default function 
Admonition(props) { className={clsx( ThemeClassNames.common.admonition, ThemeClassNames.common.admonitionType(props.type), - "alert", + 'alert', `alert--${typeConfig.infimaClassName}`, styles.admonition )}