Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/adcp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ def _resolve_version() -> str:
"ConfigurationError",
"IdempotencyConflictError",
"IdempotencyExpiredError",
"IdempotencyScopeError",
"IdempotencyUnsupportedError",
"RegistryError",
),
Expand Down Expand Up @@ -1344,6 +1345,7 @@ def get_adcp_version() -> str:
ConfigurationError,
IdempotencyConflictError,
IdempotencyExpiredError,
IdempotencyScopeError,
IdempotencyUnsupportedError,
RegistryError,
)
Expand Down
211 changes: 102 additions & 109 deletions src/adcp/adagents.py
Original file line number Diff line number Diff line change
Expand Up @@ -2514,131 +2514,124 @@ async def detect_publisher_properties_divergence(
an empty list means counts agree but set-equality is not
guaranteed.
"""
own_client = client is None
http = client or httpx.AsyncClient()
try:
collected: list[DirectoryPublisherEntry] = []
cursor: str | None = None
seen_cursors: set[str] = set()
page_count = 0
while True:
page = await fetch_agent_authorizations_from_directory(
agent_url,
directory_url=directory_url,
cursor=cursor,
include=["properties"],
timeout=timeout,
client=http,
http = client
collected: list[DirectoryPublisherEntry] = []
cursor: str | None = None
seen_cursors: set[str] = set()
page_count = 0
while True:
page = await fetch_agent_authorizations_from_directory(
agent_url,
directory_url=directory_url,
cursor=cursor,
include=["properties"],
timeout=timeout,
client=http,
)
page_count += 1
collected.extend(page.publishers)
if sample_size is not None and len(collected) >= sample_size:
collected = collected[:sample_size]
break
cursor = page.next_cursor
if not cursor:
break
if cursor in seen_cursors:
raise AdagentsValidationError(
f"Directory page cursor {cursor!r} repeated — refusing to loop forever."
)
page_count += 1
collected.extend(page.publishers)
if sample_size is not None and len(collected) >= sample_size:
collected = collected[:sample_size]
break
cursor = page.next_cursor
if not cursor:
break
if cursor in seen_cursors:
raise AdagentsValidationError(
f"Directory page cursor {cursor!r} repeated — refusing to loop forever."
)
seen_cursors.add(cursor)
if page_count >= MAX_DIRECTORY_PAGES:
raise AdagentsValidationError(
f"Directory pagination exceeded {MAX_DIRECTORY_PAGES} pages — aborting sweep."
)

# Dedupe by publisher_domain before fan-out: a hostile directory
# returning N rows for the same publisher would otherwise amplify
# into N concurrent fetches against a single victim host. First
# occurrence wins (deterministic) — conflicting property_ids /
# properties_authorized across duplicates are dropped here; the
# directory's behavior is itself a divergence signal for ops.
seen_domains: set[str] = set()
deduped: list[DirectoryPublisherEntry] = []
for entry in collected:
if entry.publisher_domain in seen_domains:
continue
seen_domains.add(entry.publisher_domain)
deduped.append(entry)
collected = deduped

# Emit a one-shot warning when the entire sample comes back without
# property_ids[]. In count-only mode, same-count substitutions are
# undetectable — adopters should pin include=["properties"] support
# on directories that offer it.
if collected and all(e.property_ids is None for e in collected):
logger.warning(
"AAO directory %s did not return property_ids[] on any publisher "
"entry — falling back to count-only divergence detection. Same-count "
"substitutions are undetectable in this mode. Upgrade the directory "
"or pin include=['properties'] support.",
directory_url,
seen_cursors.add(cursor)
if page_count >= MAX_DIRECTORY_PAGES:
raise AdagentsValidationError(
f"Directory pagination exceeded {MAX_DIRECTORY_PAGES} pages — aborting sweep."
)

sem = asyncio.Semaphore(max_concurrency)
# Dedupe by publisher_domain before fan-out: a hostile directory
# returning N rows for the same publisher would otherwise amplify
# into N concurrent fetches against a single victim host. First
# occurrence wins (deterministic) — conflicting property_ids /
# properties_authorized across duplicates are dropped here; the
# directory's behavior is itself a divergence signal for ops.
seen_domains: set[str] = set()
deduped: list[DirectoryPublisherEntry] = []
for entry in collected:
if entry.publisher_domain in seen_domains:
continue
seen_domains.add(entry.publisher_domain)
deduped.append(entry)
collected = deduped

# Emit a one-shot warning when the entire sample comes back without
# property_ids[]. In count-only mode, same-count substitutions are
# undetectable — adopters should pin include=["properties"] support
# on directories that offer it.
if collected and all(e.property_ids is None for e in collected):
logger.warning(
"AAO directory %s did not return property_ids[] on any publisher "
"entry — falling back to count-only divergence detection. Same-count "
"substitutions are undetectable in this mode. Upgrade the directory "
"or pin include=['properties'] support.",
directory_url,
)

async def _probe(entry: DirectoryPublisherEntry) -> PublisherDivergence | None:
async with sem:
try:
data = await fetch_adagents(
entry.publisher_domain, timeout=timeout, client=http
)
federated_props = get_properties_by_agent(data, agent_url)
# Falsy/empty property_id is silently dropped: upstream
# schema requires a non-empty string, so an empty value
# is a structural violation that belongs in
# validate_adagents, not a divergence signal. Federated
# properties with valid IDs only.
federated_ids = {
str(p.get("property_id")) for p in federated_props if p.get("property_id")
}
except (
AdagentsNotFoundError,
AdagentsValidationError,
AdagentsTimeoutError,
httpx.HTTPError,
OSError,
ValueError,
) as exc:
return PublisherDivergence(
publisher_domain=entry.publisher_domain,
directory_properties_authorized=entry.properties_authorized,
federated_properties_found=0,
missing_in_inline=None,
missing_in_federated=None,
child_fetch_error=str(exc),
)
sem = asyncio.Semaphore(max_concurrency)

if entry.property_ids is not None:
# Full set-diff path (adcp#4894).
dir_ids = set(entry.property_ids)
missing_in_inline = sorted(federated_ids - dir_ids)
missing_in_federated = sorted(dir_ids - federated_ids)
if not missing_in_inline and not missing_in_federated:
return None
async def _probe(entry: DirectoryPublisherEntry) -> PublisherDivergence | None:
async with sem:
try:
data = await fetch_adagents(entry.publisher_domain, timeout=timeout, client=http)
federated_props = get_properties_by_agent(data, agent_url)
# Falsy/empty property_id is silently dropped: upstream
# schema requires a non-empty string, so an empty value
# is a structural violation that belongs in
# validate_adagents, not a divergence signal. Federated
# properties with valid IDs only.
federated_ids = {
str(p.get("property_id")) for p in federated_props if p.get("property_id")
}
except (
AdagentsNotFoundError,
AdagentsValidationError,
AdagentsTimeoutError,
httpx.HTTPError,
OSError,
ValueError,
) as exc:
return PublisherDivergence(
publisher_domain=entry.publisher_domain,
directory_properties_authorized=entry.properties_authorized,
federated_properties_found=len(federated_ids),
missing_in_inline=missing_in_inline,
missing_in_federated=missing_in_federated,
federated_properties_found=0,
missing_in_inline=None,
missing_in_federated=None,
child_fetch_error=str(exc),
)

# Count-only fallback (older directories).
if len(federated_ids) == entry.properties_authorized:
if entry.property_ids is not None:
# Full set-diff path (adcp#4894).
dir_ids = set(entry.property_ids)
missing_in_inline = sorted(federated_ids - dir_ids)
missing_in_federated = sorted(dir_ids - federated_ids)
if not missing_in_inline and not missing_in_federated:
return None
return PublisherDivergence(
publisher_domain=entry.publisher_domain,
directory_properties_authorized=entry.properties_authorized,
federated_properties_found=len(federated_ids),
missing_in_inline=None,
missing_in_federated=None,
missing_in_inline=missing_in_inline,
missing_in_federated=missing_in_federated,
)

probes = await asyncio.gather(*[_probe(e) for e in collected])
finally:
if own_client:
await http.aclose()
# Count-only fallback (older directories).
if len(federated_ids) == entry.properties_authorized:
return None
return PublisherDivergence(
publisher_domain=entry.publisher_domain,
directory_properties_authorized=entry.properties_authorized,
federated_properties_found=len(federated_ids),
missing_in_inline=None,
missing_in_federated=None,
)

probes = await asyncio.gather(*[_probe(e) for e in collected])

return [p for p in probes if p is not None]
10 changes: 10 additions & 0 deletions src/adcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
Protocol,
TaskResult,
TaskStatus,
is_loopback_http_uri,
)

# V3 Governance (Sync Governance) types
Expand Down Expand Up @@ -537,6 +538,15 @@ def __init__(
self.validate_features = validate_features
self.strict_idempotency = strict_idempotency
self.signing = signing
if (
signing is not None
and agent_config.agent_uri.startswith("http://")
and not is_loopback_http_uri(agent_config.agent_uri)
):
raise ValueError(
"request signing requires an https:// agent_uri for non-loopback hosts; "
"plain HTTP is only allowed for localhost/loopback development"
)

# Capabilities cache
self._capabilities: GetAdcpCapabilitiesResponse | None = None
Expand Down
21 changes: 18 additions & 3 deletions src/adcp/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""Configuration management for AdCP CLI."""

import json
import os
from pathlib import Path
from typing import Any, cast

Expand All @@ -12,7 +13,11 @@

def ensure_config_dir() -> None:
"""Ensure config directory exists."""
CONFIG_DIR.mkdir(parents=True, exist_ok=True)
CONFIG_DIR.mkdir(parents=True, exist_ok=True, mode=0o700)
try:
CONFIG_DIR.chmod(0o700)
except OSError:
pass


def load_config() -> dict[str, Any]:
Expand All @@ -28,13 +33,23 @@ def save_config(config: dict[str, Any]) -> None:
"""Save configuration file with atomic write."""
ensure_config_dir()

# Write to temporary file first
# Write to temporary file first, with restrictive permissions before
# credentials hit disk.
temp_file = CONFIG_FILE.with_suffix(".tmp")
with open(temp_file, "w") as f:
try:
temp_file.unlink()
except FileNotFoundError:
pass
fd = os.open(temp_file, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
with os.fdopen(fd, "w") as f:
json.dump(config, f, indent=2)

# Atomic rename
temp_file.replace(CONFIG_FILE)
try:
CONFIG_FILE.chmod(0o600)
except OSError:
pass


def save_agent(
Expand Down
1 change: 1 addition & 0 deletions src/adcp/decisioning/upstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ async def _get_client(self) -> httpx.AsyncClient:
self._client = httpx.AsyncClient(
base_url=self._base_url,
timeout=self._timeout,
trust_env=False,
limits=httpx.Limits(
max_keepalive_connections=10,
max_connections=20,
Expand Down
21 changes: 21 additions & 0 deletions src/adcp/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,27 @@ def __init__(
ADCPError.__init__(self, message, agent_id=agent_id, suggestion=suggestion)


class IdempotencyScopeError(ADCPTaskError):
"""Server cannot safely scope an idempotency_key to an authenticated caller."""

def __init__(self, operation: str, agent_id: str | None = None):
self.operation = operation
self.errors = [
{
"code": "INVALID_REQUEST",
"message": "idempotency_key requires authenticated caller_identity",
}
]
self.error_codes = ["INVALID_REQUEST"]
message = f"{operation}: idempotency_key requires authenticated caller_identity"
suggestion = (
"Populate ToolContext.caller_identity from the authenticated principal before "
"using idempotency replay protection. Rejecting the request avoids a shared "
"cross-principal idempotency namespace."
)
ADCPError.__init__(self, message, agent_id=agent_id, suggestion=suggestion)


class IdempotencyUnsupportedError(ADCPError):
"""Seller does not support idempotency replay protection on mutating requests.

Expand Down
1 change: 1 addition & 0 deletions src/adcp/protocols/a2a.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ async def _get_httpx_client(self) -> httpx.AsyncClient:
"limits": limits,
"headers": headers,
"timeout": self.agent_config.timeout,
"trust_env": False,
}
if self.signing_request_hook is not None:
event_hooks["request"] = [self.signing_request_hook]
Expand Down
Loading
Loading