Skip to content
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
caaff3f
⚙️ FEATURE-#287: Expand Storage ABC with delete and CAS primitives
FernandoCelmer May 2, 2026
287d0a4
⚙️ FEATURE-#287: Add S3 delete and list_keys helpers
FernandoCelmer May 2, 2026
17c1fc4
⚙️ FEATURE-#287: Add GCS delete and list_keys helpers
FernandoCelmer May 2, 2026
ac5f173
⚙️ FEATURE-#287: Implement new ABC surface in StorageDefault
FernandoCelmer May 2, 2026
1fe6bb1
⚙️ FEATURE-#287: Implement new ABC surface in StorageFile
FernandoCelmer May 2, 2026
e97f31e
⚙️ FEATURE-#287: Implement new ABC surface in StorageS3
FernandoCelmer May 2, 2026
174550e
⚙️ FEATURE-#287: Implement new ABC surface in StorageGCS
FernandoCelmer May 2, 2026
dda6c51
⚙️ FEATURE-#287: Add dotflow.testing public package
FernandoCelmer May 2, 2026
ac412b5
⚙️ FEATURE-#287: Add StorageContract reusable test suite
FernandoCelmer May 2, 2026
c745840
❤️ TEST-#287: Run StorageContract against StorageDefault
FernandoCelmer May 2, 2026
4a97472
❤️ TEST-#287: Run StorageContract against StorageFile
FernandoCelmer May 2, 2026
8f4aeb9
🪲 BUG-#287: Fix S3.delete exception and add conditional helpers
FernandoCelmer May 2, 2026
d187d08
⚙️ FEATURE-#287: Add GCS conditional write helpers
FernandoCelmer May 2, 2026
566537e
⚙️ FEATURE-#287: Pass ttl and fingerprint through atomic_swap
FernandoCelmer May 2, 2026
41db3c9
🪲 BUG-#287: Clear stale TTL on StorageDefault.atomic_swap
FernandoCelmer May 2, 2026
49acb07
🪲 BUG-#287: Serialize StorageFile public methods through lock
FernandoCelmer May 2, 2026
549ef8c
⚙️ FEATURE-#287: Use S3 conditional write for atomic_swap
FernandoCelmer May 2, 2026
be2f3c6
⚙️ FEATURE-#287: Use GCS generation precondition for atomic_swap
FernandoCelmer May 2, 2026
3124bf3
❤️ TEST-#287: Cover ttl propagation through atomic_swap
FernandoCelmer May 2, 2026
3660cc9
📝 PEP8-#287: Apply ruff format to S3.write_if_match signature
FernandoCelmer May 2, 2026
181e16a
🪲 BUG-#287: Keep new ABC methods optional in 1.x for backward compat
FernandoCelmer May 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 39 additions & 11 deletions dotflow/abc/storage.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
"""Storage ABC"""

from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import Callable
from collections.abc import Callable, Iterable
from typing import Any

from dotflow.core.context import Context

Expand All @@ -13,21 +16,46 @@ def __init__(self, *args, **kwargs):
pass

@abstractmethod
def post(self, key: str, context: Context) -> None:
"""Post context somewhere"""
def post(
self,
key: str,
context: Context,
ttl: int | None = None,
fingerprint: str | None = None,
) -> None:
"""Persist context under key."""

@abstractmethod
def get(self, key: str) -> Context:
"""Get context somewhere"""
"""Return stored context or empty Context()."""

@abstractmethod
def key(self, task: Callable):
"""Function that returns a key to get and post storage"""
def delete(self, key: str) -> bool:
"""Remove key. Returns True when present."""

@abstractmethod
def clear(self, workflow_id: str) -> None:
"""Remove every persisted entry under ``workflow_id``.
def delete_prefix(self, prefix: str) -> int:
"""Remove keys starting with prefix. Returns count."""

Used by the input-fingerprint reset path when
``on_input_change='reset'``.
"""
@abstractmethod
def list_keys(self, prefix: str) -> Iterable[str]:
"""Iterate keys starting with prefix."""

@abstractmethod
def atomic_swap(
self,
key: str,
expected: Any,
new: Any,
ttl: int | None = None,
fingerprint: str | None = None,
) -> bool:
"""Replace value when current equals expected."""

@abstractmethod
def key(self, task: Callable) -> str:
"""Storage key for task."""

def clear(self, workflow_id: str) -> None:
"""Remove every entry under workflow_id."""
self.delete_prefix(f"{workflow_id}-")
78 changes: 78 additions & 0 deletions dotflow/cloud/aws/services/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
from dotflow.cloud.core import ObjectStorage
from dotflow.core.exception import ModuleNotFound

_PRECONDITION_CODES = {
"PreconditionFailed",
"ConditionalRequestConflict",
}


class S3(ObjectStorage):
"""Amazon S3 object storage."""
Expand Down Expand Up @@ -46,6 +51,79 @@ def write(self, key: str, data: list) -> None:
ContentType="application/json",
)

def delete(self, key: str) -> bool:
"""Delete a single object."""
from botocore.exceptions import ClientError

full_key = f"{self.prefix}{key}"

try:
self._s3.head_object(Bucket=self.bucket, Key=full_key)
except ClientError as error:
code = error.response.get("Error", {}).get("Code")

if code in ("404", "NoSuchKey", "NotFound"):
return False

raise

self._s3.delete_object(Bucket=self.bucket, Key=full_key)

return True

def read_with_etag(self, key: str) -> tuple[list, str | None]:
"""Return (data, etag). Etag is None when key is missing."""
try:
response = self._s3.get_object(
Bucket=self.bucket,
Key=f"{self.prefix}{key}",
)
data = response["Body"].read().decode("utf-8")

return loads(data), response.get("ETag")
except self._s3.exceptions.NoSuchKey:
return [], None

def write_if_match(self, key: str, data: list, etag: str | None) -> bool:
"""Conditional PutObject. Returns False on precondition failure."""
from botocore.exceptions import ClientError

kwargs = {
"Bucket": self.bucket,
"Key": f"{self.prefix}{key}",
"Body": dumps(data),
"ContentType": "application/json",
}

if etag is None:
kwargs["IfNoneMatch"] = "*"
else:
kwargs["IfMatch"] = etag

try:
self._s3.put_object(**kwargs)
return True
except ClientError as error:
code = error.response.get("Error", {}).get("Code")

if code in _PRECONDITION_CODES:
return False

raise

def list_keys(self, sub_prefix: str) -> list[str]:
"""Return keys starting with sub_prefix."""
full_prefix = f"{self.prefix}{sub_prefix}"
paginator = self._s3.get_paginator("list_objects_v2")
names = []
offset = len(self.prefix)

for page in paginator.paginate(Bucket=self.bucket, Prefix=full_prefix):
for item in page.get("Contents", []):
names.append(item["Key"][offset:])

return names

def delete_prefix(self, sub_prefix: str) -> None:
"""Delete every object whose key starts with prefix + sub_prefix.

Expand Down
51 changes: 51 additions & 0 deletions dotflow/cloud/gcp/services/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,57 @@ def write(self, key: str, data: list) -> None:
content_type="application/json",
)

def delete(self, key: str) -> bool:
"""Delete a single blob."""
blob = self._bucket.blob(f"{self.prefix}{key}")

try:
blob.delete()
return True
except self._not_found:
return False

def read_with_generation(self, key: str) -> tuple[list, int | None]:
"""Return (data, generation). Generation is None when missing."""
blob = self._bucket.blob(f"{self.prefix}{key}")

try:
data = blob.download_as_text()
return loads(data), blob.generation
except self._not_found:
return [], None

def write_if_generation_match(
self, key: str, data: list, generation: int | None
) -> bool:
"""Conditional upload. Returns False on precondition failure."""
from google.api_core.exceptions import PreconditionFailed

blob = self._bucket.blob(f"{self.prefix}{key}")
precondition = generation if generation is not None else 0

try:
blob.upload_from_string(
dumps(data),
content_type="application/json",
if_generation_match=precondition,
)
return True
except PreconditionFailed:
return False

def list_keys(self, sub_prefix: str) -> list[str]:
"""Return blob names starting with sub_prefix."""
full_prefix = f"{self.prefix}{sub_prefix}"
offset = len(self.prefix)

return [
blob.name[offset:]
for blob in self._client.list_blobs(
self._bucket, prefix=full_prefix
)
]

def delete_prefix(self, sub_prefix: str) -> None:
"""Delete every blob whose name starts with prefix + sub_prefix.

Expand Down
109 changes: 99 additions & 10 deletions dotflow/providers/storage_default.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,118 @@
"""Storage Default"""

from collections.abc import Callable
from __future__ import annotations

import threading
import time
from collections.abc import Callable, Iterable
from typing import Any

from dotflow.abc.storage import Storage
from dotflow.core.context import Context


class StorageDefault(Storage):
"""In-memory storage using a dictionary."""
"""In-memory storage."""

def __init__(self):
self._store: dict[str, Context] = {}
self._fingerprints: dict[str, str] = {}
self._expirations: dict[str, float] = {}
self._lock = threading.RLock()

def post(
self,
key: str,
context: Context,
ttl: int | None = None,
fingerprint: str | None = None,
) -> None:
with self._lock:
self._store[key] = context

if fingerprint is not None:
self._fingerprints[key] = fingerprint

def post(self, key: str, context: Context) -> None:
self._store[key] = context
if ttl is not None:
self._expirations[key] = time.monotonic() + ttl
else:
self._expirations.pop(key, None)

def get(self, key: str) -> Context:
return self._store.get(key, Context())
with self._lock:
self._evict_if_expired(key)

return self._store.get(key, Context())

def delete(self, key: str) -> bool:
with self._lock:
existed = key in self._store
self._store.pop(key, None)
self._fingerprints.pop(key, None)
self._expirations.pop(key, None)

return existed

def delete_prefix(self, prefix: str) -> int:
with self._lock:
stale = [k for k in self._store if k.startswith(prefix)]

for key in stale:
self._store.pop(key, None)
self._fingerprints.pop(key, None)
self._expirations.pop(key, None)

return len(stale)

def list_keys(self, prefix: str) -> Iterable[str]:
with self._lock:
for key in list(self._store):
self._evict_if_expired(key)

return [k for k in self._store if k.startswith(prefix)]

def atomic_swap(
self,
key: str,
expected: Any,
new: Any,
ttl: int | None = None,
fingerprint: str | None = None,
) -> bool:
with self._lock:
current = self._store.get(key)
current_value = (
current.storage if isinstance(current, Context) else current
)

if current_value != expected:
return False

payload = new if isinstance(new, Context) else Context(storage=new)
self._store[key] = payload
self._fingerprints.pop(key, None)
self._expirations.pop(key, None)

if fingerprint is not None:
self._fingerprints[key] = fingerprint

if ttl is not None:
self._expirations[key] = time.monotonic() + ttl

Comment thread
FernandoCelmer marked this conversation as resolved.
return True

def key(self, task: Callable) -> str:
return f"{task.workflow_id}-{task.task_id}"

def clear(self, workflow_id: str) -> None:
prefix = f"{workflow_id}-"
stale = [k for k in self._store if k.startswith(prefix)]
def _evict_if_expired(self, key: str) -> None:
expiry = self._expirations.get(key)

if expiry is None:
return

if time.monotonic() < expiry:
return

for key in stale:
del self._store[key]
self._store.pop(key, None)
self._fingerprints.pop(key, None)
self._expirations.pop(key, None)
Loading
Loading