Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
caaff3f
⚙️ FEATURE-#287: Expand Storage ABC with delete and CAS primitives
FernandoCelmer May 2, 2026
287d0a4
⚙️ FEATURE-#287: Add S3 delete and list_keys helpers
FernandoCelmer May 2, 2026
17c1fc4
⚙️ FEATURE-#287: Add GCS delete and list_keys helpers
FernandoCelmer May 2, 2026
ac5f173
⚙️ FEATURE-#287: Implement new ABC surface in StorageDefault
FernandoCelmer May 2, 2026
1fe6bb1
⚙️ FEATURE-#287: Implement new ABC surface in StorageFile
FernandoCelmer May 2, 2026
e97f31e
⚙️ FEATURE-#287: Implement new ABC surface in StorageS3
FernandoCelmer May 2, 2026
174550e
⚙️ FEATURE-#287: Implement new ABC surface in StorageGCS
FernandoCelmer May 2, 2026
dda6c51
⚙️ FEATURE-#287: Add dotflow.testing public package
FernandoCelmer May 2, 2026
ac412b5
⚙️ FEATURE-#287: Add StorageContract reusable test suite
FernandoCelmer May 2, 2026
c745840
❤️ TEST-#287: Run StorageContract against StorageDefault
FernandoCelmer May 2, 2026
4a97472
❤️ TEST-#287: Run StorageContract against StorageFile
FernandoCelmer May 2, 2026
8f4aeb9
🪲 BUG-#287: Fix S3.delete exception and add conditional helpers
FernandoCelmer May 2, 2026
d187d08
⚙️ FEATURE-#287: Add GCS conditional write helpers
FernandoCelmer May 2, 2026
566537e
⚙️ FEATURE-#287: Pass ttl and fingerprint through atomic_swap
FernandoCelmer May 2, 2026
41db3c9
🪲 BUG-#287: Clear stale TTL on StorageDefault.atomic_swap
FernandoCelmer May 2, 2026
49acb07
🪲 BUG-#287: Serialize StorageFile public methods through lock
FernandoCelmer May 2, 2026
549ef8c
⚙️ FEATURE-#287: Use S3 conditional write for atomic_swap
FernandoCelmer May 2, 2026
be2f3c6
⚙️ FEATURE-#287: Use GCS generation precondition for atomic_swap
FernandoCelmer May 2, 2026
3124bf3
❤️ TEST-#287: Cover ttl propagation through atomic_swap
FernandoCelmer May 2, 2026
3660cc9
📝 PEP8-#287: Apply ruff format to S3.write_if_match signature
FernandoCelmer May 2, 2026
181e16a
🪲 BUG-#287: Keep new ABC methods optional in 1.x for backward compat
FernandoCelmer May 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 32 additions & 11 deletions dotflow/abc/storage.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
"""Storage ABC"""

from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import Callable
from collections.abc import Callable, Iterable
from typing import Any

from dotflow.core.context import Context

Expand All @@ -13,21 +16,39 @@ def __init__(self, *args, **kwargs):
pass

@abstractmethod
def post(self, key: str, context: Context) -> None:
"""Post context somewhere"""
def post(
self,
key: str,
context: Context,
ttl: int | None = None,
fingerprint: str | None = None,
) -> None:
"""Persist context under key."""

@abstractmethod
def get(self, key: str) -> Context:
"""Get context somewhere"""
"""Return stored context or empty Context()."""

@abstractmethod
def key(self, task: Callable):
"""Function that returns a key to get and post storage"""
def delete(self, key: str) -> bool:
"""Remove key. Returns True when present."""

@abstractmethod
def clear(self, workflow_id: str) -> None:
"""Remove every persisted entry under ``workflow_id``.
def delete_prefix(self, prefix: str) -> int:
"""Remove keys starting with prefix. Returns count."""

Used by the input-fingerprint reset path when
``on_input_change='reset'``.
"""
@abstractmethod
def list_keys(self, prefix: str) -> Iterable[str]:
"""Iterate keys starting with prefix."""

@abstractmethod
def atomic_swap(self, key: str, expected: Any, new: Any) -> bool:
"""Replace value when current equals expected."""

@abstractmethod
def key(self, task: Callable) -> str:
"""Storage key for task."""

def clear(self, workflow_id: str) -> None:
"""Remove every entry under workflow_id."""
self.delete_prefix(f"{workflow_id}-")
26 changes: 26 additions & 0 deletions dotflow/cloud/aws/services/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,32 @@ def write(self, key: str, data: list) -> None:
ContentType="application/json",
)

def delete(self, key: str) -> bool:
"""Delete a single object."""
full_key = f"{self.prefix}{key}"

try:
self._s3.head_object(Bucket=self.bucket, Key=full_key)
except self._s3.exceptions.ClientError:
return False
Comment thread
FernandoCelmer marked this conversation as resolved.
Outdated

self._s3.delete_object(Bucket=self.bucket, Key=full_key)

return True

def list_keys(self, sub_prefix: str) -> list[str]:
"""Return keys starting with sub_prefix."""
full_prefix = f"{self.prefix}{sub_prefix}"
paginator = self._s3.get_paginator("list_objects_v2")
names = []
offset = len(self.prefix)

for page in paginator.paginate(Bucket=self.bucket, Prefix=full_prefix):
for item in page.get("Contents", []):
names.append(item["Key"][offset:])

return names

def delete_prefix(self, sub_prefix: str) -> None:
"""Delete every object whose key starts with prefix + sub_prefix.

Expand Down
22 changes: 22 additions & 0 deletions dotflow/cloud/gcp/services/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,28 @@ def write(self, key: str, data: list) -> None:
content_type="application/json",
)

def delete(self, key: str) -> bool:
"""Delete a single blob."""
blob = self._bucket.blob(f"{self.prefix}{key}")

try:
blob.delete()
return True
except self._not_found:
return False

def list_keys(self, sub_prefix: str) -> list[str]:
"""Return blob names starting with sub_prefix."""
full_prefix = f"{self.prefix}{sub_prefix}"
offset = len(self.prefix)

return [
blob.name[offset:]
for blob in self._client.list_blobs(
self._bucket, prefix=full_prefix
)
]

def delete_prefix(self, sub_prefix: str) -> None:
"""Delete every blob whose name starts with prefix + sub_prefix.

Expand Down
94 changes: 84 additions & 10 deletions dotflow/providers/storage_default.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,103 @@
"""Storage Default"""

from collections.abc import Callable
from __future__ import annotations

import threading
import time
from collections.abc import Callable, Iterable
from typing import Any

from dotflow.abc.storage import Storage
from dotflow.core.context import Context


class StorageDefault(Storage):
"""In-memory storage using a dictionary."""
"""In-memory storage."""

def __init__(self):
self._store: dict[str, Context] = {}
self._fingerprints: dict[str, str] = {}
self._expirations: dict[str, float] = {}
self._lock = threading.RLock()

def post(
self,
key: str,
context: Context,
ttl: int | None = None,
fingerprint: str | None = None,
) -> None:
with self._lock:
self._store[key] = context

if fingerprint is not None:
self._fingerprints[key] = fingerprint

def post(self, key: str, context: Context) -> None:
self._store[key] = context
if ttl is not None:
self._expirations[key] = time.monotonic() + ttl
else:
self._expirations.pop(key, None)

def get(self, key: str) -> Context:
return self._store.get(key, Context())
with self._lock:
self._evict_if_expired(key)

return self._store.get(key, Context())

def delete(self, key: str) -> bool:
with self._lock:
existed = key in self._store
self._store.pop(key, None)
self._fingerprints.pop(key, None)
self._expirations.pop(key, None)

return existed

def delete_prefix(self, prefix: str) -> int:
with self._lock:
stale = [k for k in self._store if k.startswith(prefix)]

for key in stale:
self._store.pop(key, None)
self._fingerprints.pop(key, None)
self._expirations.pop(key, None)

return len(stale)

def list_keys(self, prefix: str) -> Iterable[str]:
with self._lock:
for key in list(self._store):
self._evict_if_expired(key)

return [k for k in self._store if k.startswith(prefix)]

def atomic_swap(self, key: str, expected: Any, new: Any) -> bool:
with self._lock:
current = self._store.get(key)
current_value = (
current.storage if isinstance(current, Context) else current
)

if current_value != expected:
return False

payload = new if isinstance(new, Context) else Context(storage=new)
self._store[key] = payload

Comment thread
FernandoCelmer marked this conversation as resolved.
return True

def key(self, task: Callable) -> str:
return f"{task.workflow_id}-{task.task_id}"

def clear(self, workflow_id: str) -> None:
prefix = f"{workflow_id}-"
stale = [k for k in self._store if k.startswith(prefix)]
def _evict_if_expired(self, key: str) -> None:
expiry = self._expirations.get(key)

if expiry is None:
return

if time.monotonic() < expiry:
return

for key in stale:
del self._store[key]
self._store.pop(key, None)
self._fingerprints.pop(key, None)
self._expirations.pop(key, None)
Loading
Loading