Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ repos:
types: ["python"]
- id: pip-audit
name: pip-audit
entry: uv run pip-audit
entry: uv run pip-audit --ignore-vuln CVE-2026-3219
language: system
pass_filenames: false
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ ruff: # Run 'ruff' linter and print a preview of errors
uv run ruff check .

safety: # Check for security vulnerabilities and verify Pipfile.lock is up-to-date
uv run pip-audit
uv run pip-audit --ignore-vuln CVE-2026-3219

lint-apply: black-apply ruff-apply # Apply changes with 'black' and resolve 'fixable errors' with 'ruff'

Expand Down
48 changes: 4 additions & 44 deletions dsc/cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import datetime
import logging
import subprocess
from time import perf_counter

import click
Expand All @@ -9,6 +8,7 @@
from dsc.db.models import ItemSubmissionDB
from dsc.exceptions import BatchCreationFailedError
from dsc.reports import CreateReport, FinalizeReport, SubmitReport
from dsc.utils.aws.s3 import run_aws_cli_sync
from dsc.workflows.base import Workflow

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -212,49 +212,10 @@ def sync(
"or set the S3_BUCKET_SYNC_SOURCE environment variable"
)

logger.info(f"Syncing data from {source} to {destination}")

args = [
"aws",
"s3",
"sync",
source,
destination,
"--delete",
"--exclude",
"dspace_metadata/*",
]

# add optional args
if dry_run:
args.append("--dryrun")

process = subprocess.Popen( # noqa: S603
args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
return_code = run_aws_cli_sync(
Comment thread
jonavellecuerdo marked this conversation as resolved.
source, destination, exclude_patterns=["dspace_metadata/*"], dry_run=dry_run
)

# log process output (stdout and stderr) in real-time
if process.stdout:
for line in process.stdout:
if line:
logger.info(line)
else:
logger.info("No changes detected in source, no sync required")

if process.stderr:
for line in process.stderr:
if line:
logger.error(line)

# wait for the process to complete
process.wait()
return_code = process.returncode

if return_code != 0:
logger.error(f"Failed to sync (exit code: {return_code})")
ctx.exit(return_code) # exit with the same code as subprocess
else:
logger.info("Sync completed successfully")
ctx.exit(return_code)


@main.command()
Expand All @@ -263,7 +224,6 @@ def sync(
"-c",
"--collection-handle",
help="The handle of the DSpace collection to which the batch will be submitted",
required=True,
)
@click.option(
"-e",
Expand Down
90 changes: 79 additions & 11 deletions dsc/utils/aws/s3.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from __future__ import annotations

import logging
import subprocess
from typing import TYPE_CHECKING
from urllib.parse import urlparse

import boto3

Expand All @@ -17,25 +19,42 @@ class S3Client:
def __init__(self) -> None:
self.client = boto3.client("s3")

def move_file(
    self,
    source_file: str,
    destination_file: str,
) -> None:
    """Move an S3 object to another location.

    Like the AWS CLI 'mv' command, this method copies the source object
    to the specified destination and then deletes the source object.

    NOTE: copy + delete is not atomic. A failure between the two calls
    can leave the object present in both locations; callers that require
    exactly-once placement must handle that case.

    Args:
        source_file: S3 URI to source object to copy (e.g. "s3://bucket/key").
        destination_file: S3 URI to destination object.
    """
    # allow_fragments=False so '#' in a key is kept as part of the path
    parsed_source_uri = urlparse(source_file, allow_fragments=False)
    parsed_destination_uri = urlparse(destination_file, allow_fragments=False)

    # for an s3:// URI, netloc is the bucket and path (minus leading '/') is the key
    source_bucket, source_key = (
        parsed_source_uri.netloc,
        parsed_source_uri.path.lstrip("/"),
    )
    destination_bucket, destination_key = (
        parsed_destination_uri.netloc,
        parsed_destination_uri.path.lstrip("/"),
    )

    self.client.copy_object(
        Bucket=destination_bucket,
        CopySource=f"{source_bucket}/{source_key}",
        Key=destination_key,
    )
    self.client.delete_object(
        Bucket=source_bucket,
        Key=source_key,
    )
    logger.debug(f"Moved file from {source_file} to {destination_file}")

def put_file(
self,
Expand Down Expand Up @@ -107,3 +126,52 @@ def files_iter(
continue

yield f"s3://{bucket}/{content["Key"]}"


def run_aws_cli_sync(
    source: str,
    destination: str,
    *,
    exclude_patterns: list[str] | None = None,
    dry_run: bool = False,
) -> int:
    """Run 'aws s3 sync' from source to destination, streaming output to the log.

    Invokes the AWS CLI as a subprocess with '--delete' (objects removed from
    the source are removed from the destination), logging stdout/stderr lines
    as they are produced.

    Args:
        source: Source location (local path or S3 URI).
        destination: Destination location (local path or S3 URI).
        exclude_patterns: Patterns of files/objects to exclude from the sync.
        dry_run: If True, only display the operations that would be performed.

    Returns:
        The AWS CLI process exit code (0 on success).
    """
    logger.info(f"Syncing data from {source} to {destination}")

    args = ["aws", "s3", "sync", source, destination, "--delete"]

    # add optional args
    # exclude all files or objects from the command that match the specified patterns
    if exclude_patterns:
        for pattern in exclude_patterns:
            args.extend(["--exclude", pattern])
    # only display operations that would be performed without execution
    if dry_run:
        args.append("--dryrun")

    process = subprocess.Popen(  # noqa: S603
        args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
    )

    # log stdout in real-time; count lines so we can detect a no-op sync.
    # NOTE: with stdout=PIPE, process.stdout is always a (truthy) file object,
    # so "no changes" must be detected by the absence of output lines, not by
    # testing process.stdout itself.
    output_lines = 0
    if process.stdout:
        for line in process.stdout:
            if line:
                logger.info(line)
                output_lines += 1
    if output_lines == 0:
        logger.info("No changes detected in source, no sync required")

    if process.stderr:
        for line in process.stderr:
            if line:
                logger.error(line)

    # wait for the process to complete; wait() returns the exit code
    return_code = process.wait()

    if return_code != 0:
        logger.error(f"Failed to sync (exit code: {return_code})")
    else:
        logger.info("Sync completed successfully")

    return return_code
31 changes: 24 additions & 7 deletions dsc/workflows/base/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class Workflow(ABC):
"""A base workflow class from which other workflow classes are derived."""

workflow_name: str = "base"
submission_system: str = "DSpace@MIT"
submission_system: str = "IR-8"
Comment thread
jonavellecuerdo marked this conversation as resolved.

def __init__(self, batch_id: str) -> None:
"""Initialize base instance.
Expand Down Expand Up @@ -239,23 +239,21 @@ def prepare_batch(self, *, synced: bool = False) -> tuple[list, ...]:
pass # noqa: PIE790

@final
def _create_batch_in_db(self, item_submissions: list[dict]) -> None:
def _create_batch_in_db(self, item_submissions: list[ItemSubmission]) -> None:
"""Write records for a batch of item submissions to DynamoDB.

This method loops through the item submissions (init params)
represented as a list dicts. For each item submission, the
method creates an instance of ItemSubmission and saves the
record to DynamoDB.
"""
for item_submission_init_params in item_submissions:
item_submission = ItemSubmission.create(**item_submission_init_params)
for item_submission in item_submissions:
item_submission.last_run_date = self.run_date
item_submission.status = ItemSubmissionStatus.BATCH_CREATED
item_submission.status_details = None
item_submission.save()

@final
def submit_items(self, collection_handle: str) -> list:
def submit_items(self, collection_handle: str | None = None) -> list:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commenting here, but not a formal request for this PR.

I think this method might benefit from leaning on another like _submit_item(...). Then, this could be simplified as a for loop that orchestrates, but the private _submit_item(...) method has most of the business logic.

You'd end up with something more like this, with the try only attempting one statement:

for item_submission in ItemSubmission.get_batch(self.batch_id):
    self.submission_summary["total"] += 1

    if not item_submission.ready_to_submit():
        self.submission_summary["skipped"] += 1
        continue

    try:
        items.append(
            self._submit_item(item_submission, collection_handle, batch_metadata)
        )

    except NotImplementedError:
        raise

    except Exception as exception:  # noqa: BLE001
        self.submission_summary["errors"] += 1
        item_submission.status = ItemSubmissionStatus.SUBMIT_FAILED
        item_submission.status_details = str(exception)
        item_submission.submit_attempts += 1
        item_submission.upsert_db()

What I feel like that refactor work does is expose how collection_handle and batch_metadata continue to feel kind of awkward.

It would appear that for any given invocation of submit_items() they are shared across all items, so why couldn't they be saved to the workflow instance? or, attached to the ItemSubmission instances that are getting passed around?

Feel free to ignore this for now, but I may reference it for the collection handle resolution if not provided in another comment.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm circling back to this now given my continued reading and comment down below.

I get how this works, but am finding there is some cognitive dissonance here.

Is it correct that for most workflows, collection_handle is static / fixed / same for all items, and that's how it gets passed to submit_items()? But for some -- e.g. digi theses -- it is not, and can theoretically vary per item?

If so, I think I'd double down and always use a dedicated method for getting the collection handle. And, make all workflows implement it. Maybe 90% of them just return a single, hardcoded, static string, but then there is parity on how a single item gets its collection handle: it calls get_collection_handle(<ItemSubmission>) or something.

This still leaves the awkward batch_metadata in my comment above about refactoring to a private _submit_item(), but it knocks out the collection_handle passing weirdness!

Copy link
Copy Markdown
Contributor Author

@jonavellecuerdo jonavellecuerdo Apr 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To recap, the collection_handle is passed into DSC as follows:

  • Users can provide a collection_handle in the input payload for submit command when invoking the Step Function.
  • The collection_handle is set for the DSC submit CLI command.
  • The collection_handle is passed to Workflow.submit_items() call.
  • The collection_handle is set on the ItemSubmission object.

In my initial passes at defining this method, I was planning for the following signature: _get_item_collection_handle(item_metadata: dict), rather than accepting an ItemSubmission instance (for context, item metadata is not saved to the ItemSubmission object and is thus not present when the object is loaded from DynamoDB). Thus, item metadata has to come from somewhere, and currently it is from this line (which is less than ideal).

Continuing the assumption that item metadata is not saved to the ItemSubmission object, if we were to define the method for all workflows, a workflow like opencourseware or sccs would look like this:

class Workflow:
   def submit_items(self, collection_handle: str):
      get_item_collection_handle(collection_handle)

   # where
   def get_item_collection_handle(collection_handle: str): 
      return collection_handle

The sample code above feels redundant/somewhat of an anti-pattern. 🤔 For this reason, I proposed:

class Workflow:
   def submit_items(self, collection_handle: str | None = None): 
      item_submission.collection_handle = (
                    collection_handle or self._get_item_collection_handle(<item metadata accessed from somewhere>)
                )

I hope this provides some context into the updates proposed here! Lastly, I selected the name _get_item_collection_handle in hopes it would imply how, if the method is called, this collection handle is at the item-level and not intended to be used across all the item submissions for a given batch.

Let me know what you think about keeping the derivation as is. Fully acknowledging that we still should tackle how we access item metadata in a follow-up PR!

Tagging @ehanson8 to request additional feedback and thoughts, too.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jonavellecuerdo - you're definitely much closer to the codebase, so I say go for it if it's feeling right still. I can always dig back in post PR and, if something still feels off, propose a small new PR at that time. Thanks for hearing me out!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will merge as-is for now but continue to ponder in next PR!

"""Submit items to the DSpace Submission Service according to the workflow class.

Args:
Expand Down Expand Up @@ -300,12 +298,16 @@ def submit_items(self, collection_handle: str) -> list:
item_identifier
)

item_submission.collection_handle = (
collection_handle or self._get_item_collection_handle()
)

# Send submission message to DSS input queue
response = item_submission.send_submission_message(
self.workflow_name,
self.output_queue,
self.submission_system,
collection_handle,
item_submission.collection_handle,
)

# Record details of the item submission message
Expand All @@ -323,6 +325,8 @@ def submit_items(self, collection_handle: str) -> list:
item_submission.status_details = None
item_submission.submit_attempts += 1
item_submission.upsert_db()
except NotImplementedError:
raise
except Exception as exception: # noqa: BLE001
self.submission_summary["errors"] += 1
item_submission.status = ItemSubmissionStatus.SUBMIT_FAILED
Expand All @@ -336,6 +340,19 @@ def submit_items(self, collection_handle: str) -> list:
)
return items

def _get_item_collection_handle(self) -> str:
"""Get collection handle for an item submission.

This method is required for workflows where the collection handle for an item
must be derived dynamically based on the provided item metadata.

OPTIONAL override by workflow subclasses.
"""
raise NotImplementedError(
f"The '{self.workflow_name}' workflow expects collection_handle"
"when calling submit_items()"
Comment thread
jonavellecuerdo marked this conversation as resolved.
)

@final
def finalize_items(self) -> None:
"""Examine results for all item submissions in the batch.
Expand Down
11 changes: 6 additions & 5 deletions dsc/workflows/opencourseware/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import smart_open

from dsc.exceptions import ItemMetadataNotFoundError
from dsc.item_submission import ItemSubmission
from dsc.utils.aws.s3 import S3Client
from dsc.workflows.base import Workflow
from dsc.workflows.opencourseware import OpenCourseWareTransformer
Expand Down Expand Up @@ -120,11 +121,11 @@ def prepare_batch(

# if item submission includes metadata, save init params
item_submissions.append(
{
"batch_id": self.batch_id,
"item_identifier": item_metadata["item_identifier"],
"workflow_name": self.workflow_name,
}
ItemSubmission(
batch_id=self.batch_id,
item_identifier=item_metadata["item_identifier"],
workflow_name=self.workflow_name,
)
Comment thread
jonavellecuerdo marked this conversation as resolved.
)

return item_submissions, errors
11 changes: 6 additions & 5 deletions dsc/workflows/simple_csv/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import smart_open

from dsc.exceptions import ItemBitstreamsNotFoundError
from dsc.item_submission import ItemSubmission
from dsc.utils.aws import S3Client
from dsc.workflows.base import Workflow

Expand Down Expand Up @@ -114,11 +115,11 @@ def prepare_batch(

# if item submission has associated bitstreams, save init params
item_submissions.append(
{
"batch_id": self.batch_id,
"item_identifier": item_metadata["item_identifier"],
"workflow_name": self.workflow_name,
}
ItemSubmission(
batch_id=self.batch_id,
item_identifier=item_metadata["item_identifier"],
workflow_name=self.workflow_name,
)
)

return item_submissions, errors
12 changes: 8 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,28 @@ dependencies = [
"click>=8.3.1",
"jinja2>=3.1.6",
"jsonschema>=4.26.0",
"lxml>=6.0.2",
"pandas>=2.3.3",
"pynamodb>=6.1.0",
"sentry-sdk>=2.50.0",
"smart_open[s3]>=7.5.0"
"smart_open",
]

[dependency-groups]
dev = [
"black>=26.1.0",
"coveralls>=4.0.2",
"freezegun>=1.5.5",
"ipython>=9.12.0",
"moto[server]>=5.1.20",
"mypy>=1.19.1",
"pandas-stubs>=2.3.3.260113",
"pip-audit>=2.10.0",
"pre-commit>=4.5.1",
"pytest>=9.0.2",
"ruff>=0.14.13",
"types-jsonschema>=4.26.0.20260109"
"types-jsonschema>=4.26.0.20260109",
"types-requests>=2.33.0.20260327",
]

[tool.black]
Expand All @@ -43,9 +46,10 @@ disallow_untyped_defs = true
exclude = ["tests/"]

[[tool.mypy.overrides]]
module = ["smart_open"]
module = ["dspace_rest_client.*", "smart_open"]
ignore_missing_imports = true


[tool.pytest.ini_options]
log_level = "INFO"

Expand Down Expand Up @@ -113,4 +117,4 @@ requires = ["setuptools>=61"]
build-backend = "setuptools.build_meta"

[tool.setuptools]
py-modules = ["dsc"]
py-modules = ["dsc"]
Loading
Loading