From bef4b501c6d9d1780c457c98ae6dceccc63df42d Mon Sep 17 00:00:00 2001 From: Brian O'Kelley Date: Fri, 3 Jul 2026 11:17:02 +0100 Subject: [PATCH] fix(security): harden SDK auth transports --- src/adcp/__init__.py | 2 + src/adcp/adagents.py | 211 +++++++++--------- src/adcp/client.py | 10 + src/adcp/config.py | 21 +- src/adcp/decisioning/upstream.py | 1 + src/adcp/exceptions.py | 21 ++ src/adcp/protocols/a2a.py | 1 + src/adcp/protocols/mcp.py | 76 +++++-- src/adcp/registry.py | 1 + src/adcp/server/idempotency/store.py | 25 ++- src/adcp/types/core.py | 31 ++- .../conformance/signing/test_autosign_e2e.py | 8 +- .../conformance/signing/test_autosign_mcp.py | 24 +- tests/test_adagents.py | 51 +++++ tests/test_cli.py | 19 ++ tests/test_security_hardening.py | 176 +++++++++++++++ tests/test_server_caller_identity.py | 17 +- tests/test_server_idempotency.py | 43 ++-- 18 files changed, 553 insertions(+), 185 deletions(-) create mode 100644 tests/test_security_hardening.py diff --git a/src/adcp/__init__.py b/src/adcp/__init__.py index 826ecc5e3..2cd0c7125 100644 --- a/src/adcp/__init__.py +++ b/src/adcp/__init__.py @@ -159,6 +159,7 @@ def _resolve_version() -> str: "ConfigurationError", "IdempotencyConflictError", "IdempotencyExpiredError", + "IdempotencyScopeError", "IdempotencyUnsupportedError", "RegistryError", ), @@ -1344,6 +1345,7 @@ def get_adcp_version() -> str: ConfigurationError, IdempotencyConflictError, IdempotencyExpiredError, + IdempotencyScopeError, IdempotencyUnsupportedError, RegistryError, ) diff --git a/src/adcp/adagents.py b/src/adcp/adagents.py index 723e0b23b..7831aadc3 100644 --- a/src/adcp/adagents.py +++ b/src/adcp/adagents.py @@ -2514,131 +2514,124 @@ async def detect_publisher_properties_divergence( an empty list means counts agree but set-equality is not guaranteed. """ - own_client = client is None - http = client or httpx.AsyncClient() - try: - collected: list[DirectoryPublisherEntry] = [] - cursor: str | None = None - seen_cursors: set[str] = set() - page_count = 0 - while True: - page = await fetch_agent_authorizations_from_directory( - agent_url, - directory_url=directory_url, - cursor=cursor, - include=["properties"], - timeout=timeout, - client=http, + http = client + collected: list[DirectoryPublisherEntry] = [] + cursor: str | None = None + seen_cursors: set[str] = set() + page_count = 0 + while True: + page = await fetch_agent_authorizations_from_directory( + agent_url, + directory_url=directory_url, + cursor=cursor, + include=["properties"], + timeout=timeout, + client=http, + ) + page_count += 1 + collected.extend(page.publishers) + if sample_size is not None and len(collected) >= sample_size: + collected = collected[:sample_size] + break + cursor = page.next_cursor + if not cursor: + break + if cursor in seen_cursors: + raise AdagentsValidationError( + f"Directory page cursor {cursor!r} repeated — refusing to loop forever." ) - page_count += 1 - collected.extend(page.publishers) - if sample_size is not None and len(collected) >= sample_size: - collected = collected[:sample_size] - break - cursor = page.next_cursor - if not cursor: - break - if cursor in seen_cursors: - raise AdagentsValidationError( - f"Directory page cursor {cursor!r} repeated — refusing to loop forever." - ) - seen_cursors.add(cursor) - if page_count >= MAX_DIRECTORY_PAGES: - raise AdagentsValidationError( - f"Directory pagination exceeded {MAX_DIRECTORY_PAGES} pages — aborting sweep." - ) - - # Dedupe by publisher_domain before fan-out: a hostile directory - # returning N rows for the same publisher would otherwise amplify - # into N concurrent fetches against a single victim host. First - # occurrence wins (deterministic) — conflicting property_ids / - # properties_authorized across duplicates are dropped here; the - # directory's behavior is itself a divergence signal for ops. - seen_domains: set[str] = set() - deduped: list[DirectoryPublisherEntry] = [] - for entry in collected: - if entry.publisher_domain in seen_domains: - continue - seen_domains.add(entry.publisher_domain) - deduped.append(entry) - collected = deduped - - # Emit a one-shot warning when the entire sample comes back without - # property_ids[]. In count-only mode, same-count substitutions are - # undetectable — adopters should pin include=["properties"] support - # on directories that offer it. - if collected and all(e.property_ids is None for e in collected): - logger.warning( - "AAO directory %s did not return property_ids[] on any publisher " - "entry — falling back to count-only divergence detection. Same-count " - "substitutions are undetectable in this mode. Upgrade the directory " - "or pin include=['properties'] support.", - directory_url, + seen_cursors.add(cursor) + if page_count >= MAX_DIRECTORY_PAGES: + raise AdagentsValidationError( + f"Directory pagination exceeded {MAX_DIRECTORY_PAGES} pages — aborting sweep." ) - sem = asyncio.Semaphore(max_concurrency) + # Dedupe by publisher_domain before fan-out: a hostile directory + # returning N rows for the same publisher would otherwise amplify + # into N concurrent fetches against a single victim host. First + # occurrence wins (deterministic) — conflicting property_ids / + # properties_authorized across duplicates are dropped here; the + # directory's behavior is itself a divergence signal for ops. + seen_domains: set[str] = set() + deduped: list[DirectoryPublisherEntry] = [] + for entry in collected: + if entry.publisher_domain in seen_domains: + continue + seen_domains.add(entry.publisher_domain) + deduped.append(entry) + collected = deduped + + # Emit a one-shot warning when the entire sample comes back without + # property_ids[]. In count-only mode, same-count substitutions are + # undetectable — adopters should pin include=["properties"] support + # on directories that offer it. + if collected and all(e.property_ids is None for e in collected): + logger.warning( + "AAO directory %s did not return property_ids[] on any publisher " + "entry — falling back to count-only divergence detection. Same-count " + "substitutions are undetectable in this mode. Upgrade the directory " + "or pin include=['properties'] support.", + directory_url, + ) - async def _probe(entry: DirectoryPublisherEntry) -> PublisherDivergence | None: - async with sem: - try: - data = await fetch_adagents( - entry.publisher_domain, timeout=timeout, client=http - ) - federated_props = get_properties_by_agent(data, agent_url) - # Falsy/empty property_id is silently dropped: upstream - # schema requires a non-empty string, so an empty value - # is a structural violation that belongs in - # validate_adagents, not a divergence signal. Federated - # properties with valid IDs only. - federated_ids = { - str(p.get("property_id")) for p in federated_props if p.get("property_id") - } - except ( - AdagentsNotFoundError, - AdagentsValidationError, - AdagentsTimeoutError, - httpx.HTTPError, - OSError, - ValueError, - ) as exc: - return PublisherDivergence( - publisher_domain=entry.publisher_domain, - directory_properties_authorized=entry.properties_authorized, - federated_properties_found=0, - missing_in_inline=None, - missing_in_federated=None, - child_fetch_error=str(exc), - ) + sem = asyncio.Semaphore(max_concurrency) - if entry.property_ids is not None: - # Full set-diff path (adcp#4894). - dir_ids = set(entry.property_ids) - missing_in_inline = sorted(federated_ids - dir_ids) - missing_in_federated = sorted(dir_ids - federated_ids) - if not missing_in_inline and not missing_in_federated: - return None + async def _probe(entry: DirectoryPublisherEntry) -> PublisherDivergence | None: + async with sem: + try: + data = await fetch_adagents(entry.publisher_domain, timeout=timeout, client=http) + federated_props = get_properties_by_agent(data, agent_url) + # Falsy/empty property_id is silently dropped: upstream + # schema requires a non-empty string, so an empty value + # is a structural violation that belongs in + # validate_adagents, not a divergence signal. Federated + # properties with valid IDs only. + federated_ids = { + str(p.get("property_id")) for p in federated_props if p.get("property_id") + } + except ( + AdagentsNotFoundError, + AdagentsValidationError, + AdagentsTimeoutError, + httpx.HTTPError, + OSError, + ValueError, + ) as exc: return PublisherDivergence( publisher_domain=entry.publisher_domain, directory_properties_authorized=entry.properties_authorized, - federated_properties_found=len(federated_ids), - missing_in_inline=missing_in_inline, - missing_in_federated=missing_in_federated, + federated_properties_found=0, + missing_in_inline=None, + missing_in_federated=None, + child_fetch_error=str(exc), ) - # Count-only fallback (older directories). - if len(federated_ids) == entry.properties_authorized: + if entry.property_ids is not None: + # Full set-diff path (adcp#4894). + dir_ids = set(entry.property_ids) + missing_in_inline = sorted(federated_ids - dir_ids) + missing_in_federated = sorted(dir_ids - federated_ids) + if not missing_in_inline and not missing_in_federated: return None return PublisherDivergence( publisher_domain=entry.publisher_domain, directory_properties_authorized=entry.properties_authorized, federated_properties_found=len(federated_ids), - missing_in_inline=None, - missing_in_federated=None, + missing_in_inline=missing_in_inline, + missing_in_federated=missing_in_federated, ) - probes = await asyncio.gather(*[_probe(e) for e in collected]) - finally: - if own_client: - await http.aclose() + # Count-only fallback (older directories). + if len(federated_ids) == entry.properties_authorized: + return None + return PublisherDivergence( + publisher_domain=entry.publisher_domain, + directory_properties_authorized=entry.properties_authorized, + federated_properties_found=len(federated_ids), + missing_in_inline=None, + missing_in_federated=None, + ) + + probes = await asyncio.gather(*[_probe(e) for e in collected]) return [p for p in probes if p is not None] diff --git a/src/adcp/client.py b/src/adcp/client.py index 47b6cf75a..f7c03cb18 100644 --- a/src/adcp/client.py +++ b/src/adcp/client.py @@ -90,6 +90,7 @@ Protocol, TaskResult, TaskStatus, + is_loopback_http_uri, ) # V3 Governance (Sync Governance) types @@ -537,6 +538,15 @@ def __init__( self.validate_features = validate_features self.strict_idempotency = strict_idempotency self.signing = signing + if ( + signing is not None + and agent_config.agent_uri.startswith("http://") + and not is_loopback_http_uri(agent_config.agent_uri) + ): + raise ValueError( + "request signing requires an https:// agent_uri for non-loopback hosts; " + "plain HTTP is only allowed for localhost/loopback development" + ) # Capabilities cache self._capabilities: GetAdcpCapabilitiesResponse | None = None diff --git a/src/adcp/config.py b/src/adcp/config.py index ac25e3a2e..d95bba1c0 100644 --- a/src/adcp/config.py +++ b/src/adcp/config.py @@ -3,6 +3,7 @@ """Configuration management for AdCP CLI.""" import json +import os from pathlib import Path from typing import Any, cast @@ -12,7 +13,11 @@ def ensure_config_dir() -> None: """Ensure config directory exists.""" - CONFIG_DIR.mkdir(parents=True, exist_ok=True) + CONFIG_DIR.mkdir(parents=True, exist_ok=True, mode=0o700) + try: + CONFIG_DIR.chmod(0o700) + except OSError: + pass def load_config() -> dict[str, Any]: @@ -28,13 +33,23 @@ def save_config(config: dict[str, Any]) -> None: """Save configuration file with atomic write.""" ensure_config_dir() - # Write to temporary file first + # Write to temporary file first, with restrictive permissions before + # credentials hit disk. temp_file = CONFIG_FILE.with_suffix(".tmp") - with open(temp_file, "w") as f: + try: + temp_file.unlink() + except FileNotFoundError: + pass + fd = os.open(temp_file, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600) + with os.fdopen(fd, "w") as f: json.dump(config, f, indent=2) # Atomic rename temp_file.replace(CONFIG_FILE) + try: + CONFIG_FILE.chmod(0o600) + except OSError: + pass def save_agent( diff --git a/src/adcp/decisioning/upstream.py b/src/adcp/decisioning/upstream.py index 9e44a82fa..4c184d50e 100644 --- a/src/adcp/decisioning/upstream.py +++ b/src/adcp/decisioning/upstream.py @@ -212,6 +212,7 @@ async def _get_client(self) -> httpx.AsyncClient: self._client = httpx.AsyncClient( base_url=self._base_url, timeout=self._timeout, + trust_env=False, limits=httpx.Limits( max_keepalive_connections=10, max_connections=20, diff --git a/src/adcp/exceptions.py b/src/adcp/exceptions.py index f2af977c2..ddb3a177e 100644 --- a/src/adcp/exceptions.py +++ b/src/adcp/exceptions.py @@ -425,6 +425,27 @@ def __init__( ADCPError.__init__(self, message, agent_id=agent_id, suggestion=suggestion) +class IdempotencyScopeError(ADCPTaskError): + """Server cannot safely scope an idempotency_key to an authenticated caller.""" + + def __init__(self, operation: str, agent_id: str | None = None): + self.operation = operation + self.errors = [ + { + "code": "INVALID_REQUEST", + "message": "idempotency_key requires authenticated caller_identity", + } + ] + self.error_codes = ["INVALID_REQUEST"] + message = f"{operation}: idempotency_key requires authenticated caller_identity" + suggestion = ( + "Populate ToolContext.caller_identity from the authenticated principal before " + "using idempotency replay protection. Rejecting the request avoids a shared " + "cross-principal idempotency namespace." + ) + ADCPError.__init__(self, message, agent_id=agent_id, suggestion=suggestion) + + class IdempotencyUnsupportedError(ADCPError): """Seller does not support idempotency replay protection on mutating requests. diff --git a/src/adcp/protocols/a2a.py b/src/adcp/protocols/a2a.py index 446302497..39d2f9caf 100644 --- a/src/adcp/protocols/a2a.py +++ b/src/adcp/protocols/a2a.py @@ -257,6 +257,7 @@ async def _get_httpx_client(self) -> httpx.AsyncClient: "limits": limits, "headers": headers, "timeout": self.agent_config.timeout, + "trust_env": False, } if self.signing_request_hook is not None: event_hooks["request"] = [self.signing_request_hook] diff --git a/src/adcp/protocols/mcp.py b/src/adcp/protocols/mcp.py index ee895e4a1..6ea740aba 100644 --- a/src/adcp/protocols/mcp.py +++ b/src/adcp/protocols/mcp.py @@ -30,7 +30,7 @@ from mcp import ClientSession as _ClientSession from mcp.client.sse import sse_client from mcp.client.streamable_http import MCP_SESSION_ID, streamablehttp_client - from mcp.shared._httpx_utils import create_mcp_http_client + from mcp.shared._httpx_utils import MCP_DEFAULT_SSE_READ_TIMEOUT, MCP_DEFAULT_TIMEOUT MCP_AVAILABLE = True except ImportError: @@ -71,6 +71,35 @@ _MAX_TEXT_SIZE_BYTES = 1_048_576 # 1MB cap on text items before JSON.parse +def _make_hardened_mcp_http_factory() -> Callable[..., httpx.AsyncClient]: + """Build an MCP HTTP client factory that ignores proxy environment variables.""" + + def factory( + headers: dict[str, str] | None = None, + timeout: httpx.Timeout | None = None, + auth: httpx.Auth | None = None, + **extra: Any, + ) -> httpx.AsyncClient: + kwargs: dict[str, Any] = { + "follow_redirects": True, + **extra, + "trust_env": False, + } + if timeout is None: + kwargs["timeout"] = _httpx.Timeout( + MCP_DEFAULT_TIMEOUT, read=MCP_DEFAULT_SSE_READ_TIMEOUT + ) + else: + kwargs["timeout"] = timeout + if headers is not None: + kwargs["headers"] = headers + if auth is not None: + kwargs["auth"] = auth + return _httpx.AsyncClient(**kwargs) + + return factory + + def _make_signing_http_factory( hook: Callable[[httpx.Request], Awaitable[None]], ) -> Callable[..., httpx.AsyncClient]: @@ -93,9 +122,10 @@ def factory( # Forward any future MCP-SDK kwargs (e.g. verify=, cert=) verbatim # so adding a new factory parameter upstream doesn't break signing. kwargs: dict[str, Any] = { + **extra, "follow_redirects": False, "event_hooks": {"request": [hook]}, - **extra, + "trust_env": False, } if timeout is not None: kwargs["timeout"] = timeout @@ -244,10 +274,10 @@ def _urls_to_try(self) -> list[str]: return urls_to_try def _streamable_http_client_factory(self) -> Callable[..., httpx.AsyncClient]: - """Return the HTTP client factory used for streamable-http requests.""" + """Return the HTTP client factory used for MCP HTTP requests.""" if self.signing_request_hook is not None: return _make_signing_http_factory(self.signing_request_hook) - return create_mcp_http_client + return _make_hardened_mcp_http_factory() def current_mcp_session_id(self) -> str | None: """Return the current SDK-managed MCP Streamable HTTP session id.""" @@ -371,21 +401,23 @@ async def _get_session(self) -> ClientSession: # RFC 9421 auto-signing: if ADCPClient installed a signing request # hook, wire it into streamable_http via a custom httpx client - # factory. SSE transport has no equivalent knob — warn the user - # and fall through to unsigned SSE. - streamable_http_extra: dict[str, Any] = {} - if self.signing_request_hook is not None: - if self.agent_config.mcp_transport == "streamable_http": - streamable_http_extra["httpx_client_factory"] = ( - self._streamable_http_client_factory() - ) - else: - logger.warning( - "RFC 9421 auto-signing is not supported on MCP SSE " - "transport for agent %s; use mcp_transport='streamable_http' " - "to sign outgoing requests.", - self.agent_config.id, - ) + # factory. SSE transport has no equivalent signing knob — warn the + # user and fall through to unsigned SSE. Both HTTP transports use + # SDK-owned factories with trust_env=False so auth headers are not + # sent through ambient proxy settings. + streamable_http_extra: dict[str, Any] = { + "httpx_client_factory": self._streamable_http_client_factory() + } + if ( + self.signing_request_hook is not None + and self.agent_config.mcp_transport != "streamable_http" + ): + logger.warning( + "RFC 9421 auto-signing is not supported on MCP SSE " + "transport for agent %s; use mcp_transport='streamable_http' " + "to sign outgoing requests.", + self.agent_config.id, + ) last_error = None for url in urls_to_try: @@ -405,7 +437,11 @@ async def _get_session(self) -> ClientSession: else: # Use SSE transport (legacy, but widely supported) read, write = await self._exit_stack.enter_async_context( - sse_client(url, headers=headers) + sse_client( + url, + headers=headers, + httpx_client_factory=_make_hardened_mcp_http_factory(), + ) ) self._session = await self._exit_stack.enter_async_context( diff --git a/src/adcp/registry.py b/src/adcp/registry.py index 7a3cfc03e..d50aa35c9 100644 --- a/src/adcp/registry.py +++ b/src/adcp/registry.py @@ -130,6 +130,7 @@ async def _get_client(self) -> httpx.AsyncClient: max_keepalive_connections=10, max_connections=20, ), + trust_env=False, ) return self._owned_client diff --git a/src/adcp/server/idempotency/store.py b/src/adcp/server/idempotency/store.py index 1ca8a352e..fbd558835 100644 --- a/src/adcp/server/idempotency/store.py +++ b/src/adcp/server/idempotency/store.py @@ -40,7 +40,7 @@ from pydantic import BaseModel -from adcp.exceptions import IdempotencyConflictError +from adcp.exceptions import IdempotencyConflictError, IdempotencyScopeError from adcp.server.idempotency.backends import CachedResponse, IdempotencyBackend from adcp.server.idempotency.canonicalize import canonical_json_sha256 @@ -162,7 +162,8 @@ def wrap(self, handler: HandlerFn) -> HandlerFn: async def _wrapped(*args: Any, **kwargs: Any) -> Any: handler_self, hash_source, context = _resolve_call_args(args, kwargs) - scope_key, idempotency_key, params_dict = self._prepare(hash_source, context) + operation = getattr(handler, "__name__", "handler") + scope_key, idempotency_key, params_dict = self._prepare(hash_source, context, operation) if scope_key is None or idempotency_key is None: # No key → spec says the server MUST reject with INVALID_REQUEST. # We let the handler run so validation layers above us (Pydantic, @@ -200,7 +201,7 @@ async def _wrapped(*args: Any, **kwargs: Any) -> Any: return replay # Same key, different payload — spec-defined conflict. raise IdempotencyConflictError( - operation=getattr(handler, "__name__", "handler"), + operation=operation, errors=[ { "code": "IDEMPOTENCY_CONFLICT", @@ -264,7 +265,9 @@ async def _wrapped(*args: Any, **kwargs: Any) -> Any: _WRAPPED_FUNCTIONS.add(_wrapped) return _wrapped - def _prepare(self, params: Any, context: Any) -> tuple[str | None, str | None, dict[str, Any]]: + def _prepare( + self, params: Any, context: Any, operation: str + ) -> tuple[str | None, str | None, dict[str, Any]]: """Normalize inputs and extract the (scope_key, key, params_dict) tuple. ``scope_key`` composes ``tenant_id`` (when present) with @@ -272,9 +275,9 @@ def _prepare(self, params: Any, context: Any) -> tuple[str | None, str | None, d if the seller's principal IDs are only unique within each tenant. Returns ``(None, None, params_dict)`` when idempotency doesn't apply - (no caller identity or no key supplied). The caller falls through to - the plain handler in that case — validation of missing-key lives in - the request schema, not here. + because no key was supplied. If a key is supplied without caller + identity, raises :class:`IdempotencyScopeError` so the request fails + closed before side effects execute. """ params_dict = _to_dict(params) idempotency_key = params_dict.get("idempotency_key") @@ -284,10 +287,10 @@ def _prepare(self, params: Any, context: Any) -> tuple[str | None, str | None, d if scope_key is None: # No caller identity: we can't safely scope the key. Spec requires # per-principal scope; anything else is a cross-principal replay - # attack surface. Fall through to the handler (which will process - # the request normally — no dedup, but no security regression). + # attack surface. Fail closed instead of executing an unscoped + # idempotent operation. self._warn_missing_principal_once() - return None, None, params_dict + raise IdempotencyScopeError(operation=operation) return scope_key, idempotency_key, params_dict _missing_principal_warned: bool = False @@ -305,7 +308,7 @@ def _warn_missing_principal_once(self) -> None: self._missing_principal_warned = True warnings.warn( "IdempotencyStore received a request with idempotency_key but no " - "caller_identity on ToolContext — dedup is SKIPPED. This usually " + "caller_identity on ToolContext — request is rejected. This usually " "means your transport isn't populating the authenticated principal. " "A2A: wire an a2a-sdk auth middleware that sets ServerCallContext.user; " "MCP: populate ToolContext.caller_identity from your FastMCP auth " diff --git a/src/adcp/types/core.py b/src/adcp/types/core.py index 4ce0b9fa9..92aa3822a 100644 --- a/src/adcp/types/core.py +++ b/src/adcp/types/core.py @@ -2,12 +2,31 @@ from __future__ import annotations +import ipaddress from enum import Enum from typing import Any, Generic, Literal, TypeVar +from urllib.parse import urlparse from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator +def is_loopback_http_uri(uri: str) -> bool: + """Return True for HTTP URIs that target the local machine.""" + parsed = urlparse(uri) + if parsed.scheme != "http": + return False + host = parsed.hostname + if host is None: + return False + normalized = host.lower().rstrip(".") + if normalized == "localhost" or normalized.endswith(".localhost"): + return True + try: + return ipaddress.ip_address(normalized).is_loopback + except ValueError: + return False + + class Protocol(str, Enum): """Supported protocols.""" @@ -103,7 +122,17 @@ def validate_auth_type(cls, v: str) -> str: return v @model_validator(mode="after") - def _validate_extra_headers(self) -> AgentConfig: + def _validate_security_constraints(self) -> AgentConfig: + if ( + self.auth_token + and self.agent_uri.startswith("http://") + and not is_loopback_http_uri(self.agent_uri) + ): + raise ValueError( + "auth_token requires an https:// agent_uri for non-loopback hosts; " + "plain HTTP is only allowed for localhost/loopback development" + ) + if not self.extra_headers: return self reserved = {self.auth_header.lower(), "authorization"} diff --git a/tests/conformance/signing/test_autosign_e2e.py b/tests/conformance/signing/test_autosign_e2e.py index c48efd788..ddd6658d3 100644 --- a/tests/conformance/signing/test_autosign_e2e.py +++ b/tests/conformance/signing/test_autosign_e2e.py @@ -119,7 +119,7 @@ def signing_config() -> SigningConfig: def signing_client(signing_config: SigningConfig) -> ADCPClient: agent = AgentConfig( id="smoke-seller", - agent_uri="http://verifier.test", + agent_uri="https://verifier.test", protocol=Protocol.A2A, ) return ADCPClient(agent, signing=signing_config) @@ -157,7 +157,7 @@ async def test_hook_on_real_httpx_round_trip_accepted_by_verifier( async with httpx.AsyncClient( transport=transport, - base_url="http://verifier.test", + base_url="https://verifier.test", event_hooks={"request": [signing_client._sign_outgoing_request]}, ) as client: token = current_operation.set(operation) @@ -188,7 +188,7 @@ async def test_hook_skips_when_context_var_unset_server_rejects( async with httpx.AsyncClient( transport=transport, - base_url="http://verifier.test", + base_url="https://verifier.test", event_hooks={"request": [signing_client._sign_outgoing_request]}, ) as client: # No current_operation.set — the hook will see None and skip. @@ -221,7 +221,7 @@ async def test_hook_skips_get_adcp_capabilities_server_rejects_if_listed( async with httpx.AsyncClient( transport=transport, - base_url="http://verifier.test", + base_url="https://verifier.test", event_hooks={"request": [signing_client._sign_outgoing_request]}, ) as client: token = current_operation.set("get_adcp_capabilities") diff --git a/tests/conformance/signing/test_autosign_mcp.py b/tests/conformance/signing/test_autosign_mcp.py index 5397046f8..f3519c230 100644 --- a/tests/conformance/signing/test_autosign_mcp.py +++ b/tests/conformance/signing/test_autosign_mcp.py @@ -108,8 +108,7 @@ async def test_sse_transport_with_signing_logs_warning_and_skips( warnings = [r for r in caplog.records if r.levelno == logging.WARNING] assert any( - "RFC 9421 auto-signing is not supported on MCP SSE" in r.getMessage() - for r in warnings + "RFC 9421 auto-signing is not supported on MCP SSE" in r.getMessage() for r in warnings ), [r.getMessage() for r in warnings] @@ -135,13 +134,17 @@ def _fake_streamable_http(*args: Any, **kwargs: Any) -> Any: factory = captured_kwargs["httpx_client_factory"] # Sanity check: the factory produces an AsyncClient with the signing hook. client = factory() - assert client.follow_redirects is False - assert client.event_hooks["request"] == [adapter.signing_request_hook] + try: + assert client.follow_redirects is False + assert client.event_hooks["request"] == [adapter.signing_request_hook] + finally: + await client.aclose() -async def test_streamable_http_without_signing_no_factory_kwarg() -> None: +async def test_streamable_http_without_signing_wires_hardened_factory() -> None: adapter = _make_mcp_adapter("streamable_http") - # No signing_request_hook installed → factory kwarg not added. + # No signing_request_hook installed → hardened unsigned factory still + # prevents auth headers from following ambient proxy environment settings. captured_kwargs: dict[str, Any] = {} @@ -153,7 +156,14 @@ def _fake_streamable_http(*args: Any, **kwargs: Any) -> Any: with pytest.raises(Exception): await adapter._get_session() - assert "httpx_client_factory" not in captured_kwargs + assert "httpx_client_factory" in captured_kwargs + factory = captured_kwargs["httpx_client_factory"] + client = factory(trust_env=True) + try: + assert client.trust_env is False + assert client.follow_redirects is True + finally: + await client.aclose() # -- ContextVar scope around call_tool ---------------------------------- diff --git a/tests/test_adagents.py b/tests/test_adagents.py index ec8461008..c7ef44048 100644 --- a/tests/test_adagents.py +++ b/tests/test_adagents.py @@ -5099,6 +5099,57 @@ def _entry( entry["property_ids"] = property_ids return entry + async def test_default_path_leaves_client_creation_to_hardened_fetchers(self, monkeypatch): + """No shared client is created, preserving per-target SSRF/IP pinning.""" + from adcp import adagents as adagents_mod + from adcp.adagents import ( + AgentAuthorizationsDirectoryResult, + DirectoryPublisherEntry, + detect_publisher_properties_divergence, + ) + + client_args = [] + + async def fake_fetch_directory(*args, client=None, **kwargs): + client_args.append(("directory", client)) + return AgentAuthorizationsDirectoryResult( + agent_url="https://agent.example.com/", + directory_indexed_at="2026-05-20T12:00:00Z", + publishers=[ + DirectoryPublisherEntry( + publisher_domain="nytimes.example", + discovery_method="direct", + properties_authorized=1, + properties_total=1, + status="authorized", + last_verified_at="2026-05-20T11:50:00Z", + property_ids=["p1"], + ) + ], + ) + + async def fake_fetch_adagents(domain, timeout=10.0, client=None): + client_args.append(("federated", client)) + return {} + + monkeypatch.setattr( + adagents_mod, "fetch_agent_authorizations_from_directory", fake_fetch_directory + ) + monkeypatch.setattr(adagents_mod, "fetch_adagents", fake_fetch_adagents) + monkeypatch.setattr( + adagents_mod, + "get_properties_by_agent", + lambda data, agent_url: [{"property_id": "p1"}], + ) + + report = await detect_publisher_properties_divergence( + "https://agent.example.com/", + directory_url="https://aao.example.com", + ) + + assert report == [] + assert client_args == [("directory", None), ("federated", None)] + async def test_full_set_diff_when_property_ids_present(self, monkeypatch): """Directory says {p1,p2}; federated returns {p2,p3} → set-diff reported.""" from adcp import adagents as adagents_mod diff --git a/tests/test_cli.py b/tests/test_cli.py index eaef9e812..518a31b74 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -5,6 +5,7 @@ """ import json +import stat import subprocess import sys import tempfile @@ -163,6 +164,7 @@ def test_resolve_saved_alias(self, tmp_path, monkeypatch): # Monkey-patch CONFIG_FILE import adcp.config + monkeypatch.setattr(adcp.config, "CONFIG_DIR", tmp_path) monkeypatch.setattr(adcp.config, "CONFIG_FILE", config_file) config = resolve_agent_config("myagent") @@ -184,6 +186,7 @@ def test_save_agent_command(self, tmp_path, monkeypatch): import adcp.config + monkeypatch.setattr(adcp.config, "CONFIG_DIR", tmp_path) monkeypatch.setattr(adcp.config, "CONFIG_FILE", config_file) # Save agent @@ -195,6 +198,20 @@ def test_save_agent_command(self, tmp_path, monkeypatch): assert config["agents"]["test_agent"]["agent_uri"] == "https://test.com" assert config["agents"]["test_agent"]["auth_token"] == "secret_token" + def test_save_agent_restricts_config_permissions(self, tmp_path, monkeypatch): + """Token-bearing CLI config should be owner-readable only.""" + config_file = tmp_path / "config.json" + + import adcp.config + + monkeypatch.setattr(adcp.config, "CONFIG_DIR", tmp_path) + monkeypatch.setattr(adcp.config, "CONFIG_FILE", config_file) + + save_agent("test_agent", "https://test.com", "mcp", "secret_token") + + assert stat.S_IMODE(tmp_path.stat().st_mode) == 0o700 + assert stat.S_IMODE(config_file.stat().st_mode) == 0o600 + def test_save_agent_persists_extra_headers(self, tmp_path, monkeypatch): """save_agent writes extra_headers into the saved config.""" config_file = tmp_path / "config.json" @@ -202,6 +219,7 @@ def test_save_agent_persists_extra_headers(self, tmp_path, monkeypatch): import adcp.config + monkeypatch.setattr(adcp.config, "CONFIG_DIR", tmp_path) monkeypatch.setattr(adcp.config, "CONFIG_FILE", config_file) save_agent( @@ -237,6 +255,7 @@ def test_list_agents_command(self, tmp_path, monkeypatch): import adcp.config + monkeypatch.setattr(adcp.config, "CONFIG_DIR", tmp_path) monkeypatch.setattr(adcp.config, "CONFIG_FILE", config_file) # Set environment variable to override config file location for subprocess diff --git a/tests/test_security_hardening.py b/tests/test_security_hardening.py new file mode 100644 index 000000000..d827bb87c --- /dev/null +++ b/tests/test_security_hardening.py @@ -0,0 +1,176 @@ +from __future__ import annotations + +import pytest +from cryptography.hazmat.primitives.asymmetric import ed25519 + +from adcp import ADCPClient +from adcp.decisioning.upstream import NoAuth, UpstreamHttpClient +from adcp.protocols import mcp as mcp_mod +from adcp.protocols.a2a import A2AAdapter +from adcp.protocols.mcp import ( + MCPAdapter, + _make_hardened_mcp_http_factory, + _make_signing_http_factory, +) +from adcp.registry import RegistryClient +from adcp.signing.autosign import SigningConfig +from adcp.types.core import AgentConfig, Protocol + + +def test_auth_token_rejects_non_loopback_plaintext_http() -> None: + with pytest.raises(ValueError, match="auth_token requires an https://"): + AgentConfig( + id="remote", + agent_uri="http://seller.example.com/mcp", + protocol=Protocol.MCP, + auth_token="secret", + ) + + +def test_auth_token_allows_loopback_plaintext_http() -> None: + cfg = AgentConfig( + id="local", + agent_uri="http://127.0.0.1:8000/mcp", + protocol=Protocol.MCP, + auth_token="secret", + ) + assert cfg.agent_uri.startswith("http://127.0.0.1") + + +def test_request_signing_rejects_non_loopback_plaintext_http() -> None: + signing = SigningConfig( + private_key=ed25519.Ed25519PrivateKey.generate(), + key_id="buyer-test-key", + ) + cfg = AgentConfig( + id="remote", + agent_uri="http://seller.example.com/mcp", + protocol=Protocol.MCP, + ) + + with pytest.raises(ValueError, match="request signing requires an https://"): + ADCPClient(cfg, signing=signing) + + +@pytest.mark.asyncio +async def test_a2a_owned_client_ignores_proxy_environment() -> None: + adapter = A2AAdapter( + AgentConfig(id="a2a", agent_uri="https://seller.example.com", protocol=Protocol.A2A) + ) + try: + client = await adapter._get_httpx_client() + assert client.trust_env is False + finally: + await adapter.close() + + +@pytest.mark.asyncio +async def test_registry_owned_client_ignores_proxy_environment() -> None: + registry = RegistryClient("https://registry.example.com") + try: + client = await registry._get_client() + assert client.trust_env is False + finally: + await registry.close() + + +@pytest.mark.asyncio +async def test_upstream_owned_client_ignores_proxy_environment() -> None: + upstream = UpstreamHttpClient(base_url="https://upstream.example.com", auth=NoAuth()) + try: + client = await upstream._get_client() + assert client.trust_env is False + finally: + await upstream.aclose() + + +@pytest.mark.asyncio +async def test_mcp_unsigned_client_factory_ignores_proxy_environment() -> None: + factory = _make_hardened_mcp_http_factory() + client = factory(trust_env=True, follow_redirects=False) + try: + assert client.trust_env is False + assert client.follow_redirects is False + finally: + await client.aclose() + + +@pytest.mark.asyncio +async def test_mcp_unsigned_streamable_http_adapter_uses_hardened_factory() -> None: + adapter = MCPAdapter( + AgentConfig(id="mcp", agent_uri="https://seller.example.com/mcp", protocol=Protocol.MCP) + ) + factory = adapter._streamable_http_client_factory() + client = factory(trust_env=True) + try: + assert client.trust_env is False + finally: + await client.aclose() + + +@pytest.mark.asyncio +async def test_mcp_sse_adapter_passes_hardened_factory(monkeypatch) -> None: + captured: dict[str, object] = {} + + class FakeSseContext: + async def __aenter__(self): + return object(), object() + + async def __aexit__(self, *args): + return None + + class FakeSession: + def __init__(self, read, write): + pass + + async def __aenter__(self): + return self + + async def __aexit__(self, *args): + return None + + async def initialize(self): + return None + + def fake_sse_client(url, **kwargs): + captured.update(kwargs) + return FakeSseContext() + + monkeypatch.setattr(mcp_mod, "sse_client", fake_sse_client) + monkeypatch.setattr(mcp_mod, "_ClientSession", FakeSession) + + adapter = MCPAdapter( + AgentConfig( + id="mcp-sse", + agent_uri="https://seller.example.com/sse", + protocol=Protocol.MCP, + mcp_transport="sse", + auth_token="secret", + ) + ) + try: + await adapter._get_session() + factory = captured["httpx_client_factory"] + assert callable(factory) + client = factory(trust_env=True) + try: + assert client.trust_env is False + finally: + await client.aclose() + finally: + await adapter.close() + + +@pytest.mark.asyncio +async def test_mcp_signing_client_factory_ignores_proxy_environment() -> None: + async def hook(request): + return None + + factory = _make_signing_http_factory(hook) + client = factory(trust_env=True, follow_redirects=True, event_hooks={}) + try: + assert client.trust_env is False + assert client.follow_redirects is False + assert client.event_hooks["request"] == [hook] + finally: + await client.aclose() diff --git a/tests/test_server_caller_identity.py b/tests/test_server_caller_identity.py index 268771fd2..2d87b60af 100644 --- a/tests/test_server_caller_identity.py +++ b/tests/test_server_caller_identity.py @@ -4,8 +4,8 @@ A2A ``ServerCallContext.user`` / sellers' FastMCP auth middleware) and the server-side middleware layer (idempotency per-principal scoping, future audit logging). Without this wiring, ``ToolContext.caller_identity`` is -always ``None`` and the idempotency middleware's fail-closed path skips -dedup entirely — effectively inert in production. +always ``None`` and the idempotency middleware rejects keyed requests +before side effects execute. """ from __future__ import annotations @@ -15,6 +15,7 @@ import pytest +from adcp.exceptions import IdempotencyScopeError from adcp.server.a2a_server import ADCPAgentExecutor, _tool_context_from_request from adcp.server.base import ADCPHandler, ToolContext from adcp.server.idempotency import IdempotencyStore, MemoryBackend @@ -177,8 +178,8 @@ async def create_media_buy( assert r_a["media_buy_id"] != r_b["media_buy_id"] @pytest.mark.asyncio - async def test_unauthenticated_falls_through_no_dedup(self) -> None: - """Without a principal, the middleware's fail-closed path skips dedup.""" + async def test_unauthenticated_rejects_idempotency_key(self) -> None: + """Without a principal, the middleware fails closed before side effects.""" store = IdempotencyStore(backend=MemoryBackend(), ttl_seconds=86400) class Seller(ADCPHandler): @@ -200,10 +201,10 @@ async def create_media_buy( anon_ctx = _tool_context_from_request( _FakeRequestContext(user=_FakeUser("", authenticated=False)) ) - await executor._tool_callers["create_media_buy"](params, anon_ctx) - await executor._tool_callers["create_media_buy"](params, anon_ctx) - # Both executed — no principal to scope by, middleware skipped dedup. - assert seller.calls == 2 + with pytest.warns(UserWarning, match="request is rejected"): + with pytest.raises(IdempotencyScopeError): + await executor._tool_callers["create_media_buy"](params, anon_ctx) + assert seller.calls == 0 class TestA2AExecutorUsesRealContext: diff --git a/tests/test_server_idempotency.py b/tests/test_server_idempotency.py index 3a7250f37..1268ea237 100644 --- a/tests/test_server_idempotency.py +++ b/tests/test_server_idempotency.py @@ -10,7 +10,7 @@ import pytest from pydantic import BaseModel -from adcp.exceptions import IdempotencyConflictError +from adcp.exceptions import IdempotencyConflictError, IdempotencyScopeError from adcp.server.base import ToolContext from adcp.server.idempotency import ( EXCLUDED_FIELDS, @@ -577,19 +577,6 @@ async def test_replay_flag_does_not_poison_cached_entry(self) -> None: assert r3.get("replayed") is True assert "extra" not in r3 - -def _extract_first_entry(store: IdempotencyStore) -> tuple[str, str, CachedResponse]: - """Helper to read out the single entry from a MemoryBackend used - in tests. Returns ``(scope_key, idempotency_key, entry)``. Only - valid for tests that have stored exactly one entry.""" - backend = store.backend - # MemoryBackend stores entries in ``backend._store`` as - # ``{(scope_key, idempotency_key): CachedResponse}``. - entries = list(backend._store.items()) - assert len(entries) == 1, f"expected one entry, found {len(entries)}" - (scope_key, idempotency_key), entry = entries[0] - return scope_key, idempotency_key, entry - @pytest.mark.asyncio async def test_cache_hit_different_payload_raises_conflict(self) -> None: store = self._make_store() @@ -703,24 +690,25 @@ async def test_no_idempotency_key_falls_through(self) -> None: assert r1 != r2 @pytest.mark.asyncio - async def test_no_caller_identity_falls_through(self) -> None: - # Fail-closed: without a principal we can't safely scope the key, - # so skip dedup rather than collapse every buyer into one namespace. + async def test_no_caller_identity_rejected(self) -> None: + # Fail-closed: without a principal we can't safely scope the key. # Also fires a one-time UserWarning so operators notice. store = self._make_store() handler = _FakeHandler() wrapped = store.wrap(_FakeHandler.create_media_buy) params = {"idempotency_key": str(uuid.uuid4()), "brand": "A"} - with pytest.warns(UserWarning, match="dedup is SKIPPED"): - r1 = await wrapped(handler, params, None) + with pytest.warns(UserWarning, match="request is rejected"): + with pytest.raises(IdempotencyScopeError) as exc_info: + await wrapped(handler, params, None) + assert exc_info.value.error_codes == ["INVALID_REQUEST"] # Second call in the same store: warning must NOT fire again. import warnings as _warnings with _warnings.catch_warnings(): _warnings.simplefilter("error") - r2 = await wrapped(handler, params, None) - assert handler.call_count == 2 - assert r1 != r2 + with pytest.raises(IdempotencyScopeError): + await wrapped(handler, params, None) + assert handler.call_count == 0 @pytest.mark.asyncio async def test_context_as_dict(self) -> None: @@ -753,6 +741,17 @@ class Req(BaseModel): assert handler.call_count == 1 +def _extract_first_entry(store: IdempotencyStore) -> tuple[str, str, CachedResponse]: + """Read the single MemoryBackend entry used by these tests.""" + backend = store.backend + # MemoryBackend stores entries in ``backend._store`` as + # ``{(scope_key, idempotency_key): CachedResponse}``. + entries = list(backend._store.items()) + assert len(entries) == 1, f"expected one entry, found {len(entries)}" + (scope_key, idempotency_key), entry = entries[0] + return scope_key, idempotency_key, entry + + class TestInstanceMethodDecorator: """Exercise the canonical `@idempotency.wrap` shape on an instance method."""