From 2a560c187fd8ba4137d83a0644d643f8a9182374 Mon Sep 17 00:00:00 2001 From: Jonavelle Cuerdo Date: Wed, 18 Feb 2026 10:43:13 -0500 Subject: [PATCH 1/8] Refactor project folder structure for improved organization and scalability Why these changes are being introduced: * The restructuring of the app's project directory was required to better accommodate the introduction of new DSC workflows. As we've started to plan the implementation of other workflows, it's becoming clear that each workflow is different. This new structure allows us to standardize the way each workflow is organized, while also providing a space for workflow-specific assets that may be needed. How this addresses that need: * Create folders for each DSC workflow implementation * Organize workflow and transformer Python files and metadata mapping JSON files under corresponding workflow dirs * Clean up imports * Update DSC version to 1.3 Side effects of this change: * None. It's worth highlighting that this commit focuses on structural changes to the app's project directory and does not introduce any functional changes to the app. 
Relevant ticket(s): * TBD --- dsc/workflows/__init__.py | 2 +- dsc/workflows/archivesspace/__init__.py | 3 + .../metadata_mapping.json} | 0 .../workflow.py} | 4 +- dsc/workflows/base/__init__.py | 444 +----------------- dsc/workflows/base/workflow.py | 443 +++++++++++++++++ dsc/workflows/opencourseware/__init__.py | 4 + .../metadata_mapping.json} | 0 .../transformer.py} | 129 +---- dsc/workflows/opencourseware/workflow.py | 130 +++++ dsc/workflows/sccs/__init__.py | 3 + .../sccs.json => sccs/metadata_mapping.json} | 0 dsc/workflows/{sccs.py => sccs/workflow.py} | 4 +- dsc/workflows/simple_csv/__init__.py | 3 + .../simple_csv.py => simple_csv/workflow.py} | 0 pyproject.toml | 2 +- tests/conftest.py | 5 +- tests/test_workflow_opencourseware.py | 12 +- 18 files changed, 605 insertions(+), 583 deletions(-) create mode 100644 dsc/workflows/archivesspace/__init__.py rename dsc/workflows/{metadata_mapping/archivesspace.json => archivesspace/metadata_mapping.json} (100%) rename dsc/workflows/{archivesspace.py => archivesspace/workflow.py} (96%) create mode 100644 dsc/workflows/base/workflow.py create mode 100644 dsc/workflows/opencourseware/__init__.py rename dsc/workflows/{metadata_mapping/opencourseware.json => opencourseware/metadata_mapping.json} (100%) rename dsc/workflows/{opencourseware.py => opencourseware/transformer.py} (72%) create mode 100644 dsc/workflows/opencourseware/workflow.py create mode 100644 dsc/workflows/sccs/__init__.py rename dsc/workflows/{metadata_mapping/sccs.json => sccs/metadata_mapping.json} (100%) rename dsc/workflows/{sccs.py => sccs/workflow.py} (81%) create mode 100644 dsc/workflows/simple_csv/__init__.py rename dsc/workflows/{base/simple_csv.py => simple_csv/workflow.py} (100%) diff --git a/dsc/workflows/__init__.py b/dsc/workflows/__init__.py index b8749f9..da981db 100644 --- a/dsc/workflows/__init__.py +++ b/dsc/workflows/__init__.py @@ -5,8 +5,8 @@ from dsc.workflows.archivesspace import ArchivesSpace from dsc.workflows.base import 
Workflow -from dsc.workflows.base.simple_csv import SimpleCSV from dsc.workflows.opencourseware import OpenCourseWare from dsc.workflows.sccs import SCCS +from dsc.workflows.simple_csv import SimpleCSV __all__ = ["SCCS", "ArchivesSpace", "OpenCourseWare", "SimpleCSV", "Workflow"] diff --git a/dsc/workflows/archivesspace/__init__.py b/dsc/workflows/archivesspace/__init__.py new file mode 100644 index 0000000..d9e00ff --- /dev/null +++ b/dsc/workflows/archivesspace/__init__.py @@ -0,0 +1,3 @@ +from dsc.workflows.archivesspace.workflow import ArchivesSpace + +__all__ = ["ArchivesSpace"] diff --git a/dsc/workflows/metadata_mapping/archivesspace.json b/dsc/workflows/archivesspace/metadata_mapping.json similarity index 100% rename from dsc/workflows/metadata_mapping/archivesspace.json rename to dsc/workflows/archivesspace/metadata_mapping.json diff --git a/dsc/workflows/archivesspace.py b/dsc/workflows/archivesspace/workflow.py similarity index 96% rename from dsc/workflows/archivesspace.py rename to dsc/workflows/archivesspace/workflow.py index 07126a8..059857f 100644 --- a/dsc/workflows/archivesspace.py +++ b/dsc/workflows/archivesspace/workflow.py @@ -5,7 +5,7 @@ from dsc.db.models import ItemSubmissionStatus from dsc.item_submission import ItemSubmission -from dsc.workflows.base.simple_csv import SimpleCSV +from dsc.workflows.simple_csv import SimpleCSV logger = logging.getLogger(__name__) @@ -25,7 +25,7 @@ class ArchivesSpace(SimpleCSV): @property def metadata_mapping_path(self) -> str: - return "dsc/workflows/metadata_mapping/archivesspace.json" + return "dsc/workflows/archivesspace/metadata_mapping.json" @property def output_path(self) -> str: diff --git a/dsc/workflows/base/__init__.py b/dsc/workflows/base/__init__.py index 4346d24..eb51c74 100644 --- a/dsc/workflows/base/__init__.py +++ b/dsc/workflows/base/__init__.py @@ -1,443 +1,3 @@ -from __future__ import annotations +from dsc.workflows.base.workflow import Workflow -import json -import logging -from abc 
import ABC, abstractmethod -from dataclasses import dataclass -from datetime import UTC, datetime -from typing import TYPE_CHECKING, Any, final - -import jsonschema -import jsonschema.exceptions - -from dsc.config import Config -from dsc.db.models import ItemSubmissionStatus -from dsc.exceptions import ( - BatchCreationFailedError, - InvalidSQSMessageError, - InvalidWorkflowNameError, -) -from dsc.item_submission import ItemSubmission -from dsc.reports import Report -from dsc.utilities.aws import SESClient, SQSClient -from dsc.utilities.validate.schemas import RESULT_MESSAGE_ATTRIBUTES, RESULT_MESSAGE_BODY - -if TYPE_CHECKING: # pragma: no cover - from collections.abc import Iterator - - from mypy_boto3_sqs.type_defs import MessageTypeDef - - from dsc.reports import Report - -logger = logging.getLogger(__name__) -CONFIG = Config() - -ITEM_SUBMISSION_LOG_STR = ( - "with primary keys batch_id={batch_id} (hash key) and " - "item_identifier={item_identifier} (range key)" -) - - -@dataclass -class DSSResultMessage: - """Represents a parsed DSpace Submission Service result message.""" - - item_identifier: str - submission_source: str - result_type: str - dspace_handle: str | None - last_modified: str | None - error_info: str | None - error_timestamp: str | None - dspace_response: str | None - exception_traceback: list[str] | None - message_id: str - receipt_handle: str - raw_message: MessageTypeDef - - @classmethod - def from_result_message(cls, message: MessageTypeDef) -> DSSResultMessage: - """Create instance from result message. 
- - Args: - message: A result message from the DSS output queue - - Raises: - InvalidSQSMessageError: If message fails validation - """ - try: - attrs = message.get("MessageAttributes", {}) - jsonschema.validate(instance=attrs, schema=RESULT_MESSAGE_ATTRIBUTES) - - body = json.loads(message.get("Body", "{}")) - jsonschema.validate(instance=body, schema=RESULT_MESSAGE_BODY) - - return cls( - item_identifier=attrs["PackageID"]["StringValue"], - submission_source=attrs["SubmissionSource"]["StringValue"], - result_type=body["ResultType"], - dspace_handle=body.get("ItemHandle"), - last_modified=body.get("lastModified"), - error_info=body.get("ErrorInfo", "Unknown error"), - error_timestamp=body.get("ErrorTimestamp"), - dspace_response=body.get("DSpaceResponse"), - exception_traceback=body.get("ExceptionTraceback"), - message_id=message["MessageId"], - receipt_handle=message["ReceiptHandle"], - raw_message=message, - ) - - except (KeyError, json.JSONDecodeError) as exception: - raise InvalidSQSMessageError( - f"Failed to parse result message: {exception}" - ) from exception - except jsonschema.exceptions.ValidationError as exception: - raise InvalidSQSMessageError( - f"Result message failed schema validation: {exception}" - ) from exception - - -class Workflow(ABC): - """A base workflow class from which other workflow classes are derived.""" - - workflow_name: str = "base" - submission_system: str = "DSpace@MIT" - - def __init__(self, batch_id: str) -> None: - """Initialize base instance. - - Args: - batch_id: Unique identifier for a 'batch' deposit that corresponds - to the name of a subfolder in the workflow directory of the S3 bucket. - This subfolder is where the S3 client will search for bitstream - and metadata files. 
- """ - self.batch_id = batch_id - self.run_date = datetime.now(UTC) - self.exclude_prefixes: list[str] = [ - "archived/", - "dspace_metadata/", - f"{self.batch_path}metadata.csv", - ] - self.submission_summary: dict[str, int] = { - "total": 0, - "submitted": 0, - "skipped": 0, - "errors": 0, - } - - # cache list of bitstreams - self._batch_bitstream_uris: list[str] | None = None - - @property - @abstractmethod - def metadata_mapping_path(self) -> str: - """Path to the JSON metadata mapping file for the workflow.""" - - @property - def metadata_mapping(self) -> dict: - with open(self.metadata_mapping_path) as mapping_file: - return json.load(mapping_file) - - @final - @property - def s3_bucket(self) -> str: - return CONFIG.s3_bucket_submission_assets - - @property - def output_queue(self) -> str: - """The SQS output queue for the DSS result messages.""" - return f"dss-output-dsc-{CONFIG.workspace}" - - @property - def batch_path(self) -> str: - return f"{self.workflow_name}/{self.batch_id}/" - - @property - def batch_bitstream_uris(self) -> list[str]: - if not self._batch_bitstream_uris: - self._batch_bitstream_uris = self.get_batch_bitstream_uris() - return self._batch_bitstream_uris - - @property - def retry_threshold(self) -> int: - return CONFIG.retry_threshold - - @final - @classmethod - def get_workflow(cls, workflow_name: str) -> type[Workflow]: - """Return workflow class. - - Args: - workflow_name: The label of the workflow. Must match a workflow_name attribute - from Workflow subclass. 
- """ - for workflow_class in cls._get_subclasses(): - if workflow_name == workflow_class.workflow_name: - return workflow_class - raise InvalidWorkflowNameError(f"Invalid workflow name: {workflow_name} ") - - @classmethod - def _get_subclasses(cls) -> Iterator[type[Workflow]]: - for subclass in cls.__subclasses__(): - yield from subclass._get_subclasses() # noqa: SLF001 - yield subclass - - @abstractmethod - def get_batch_bitstream_uris(self) -> list[str]: - """Get list of bitstream URIs for a batch.""" - - @final - def get_item_bitstream_uris(self, item_identifier: str) -> list[str]: - """Get list of bitstreams URIs for an item.""" - return [uri for uri in self.batch_bitstream_uris if item_identifier in uri] - - @abstractmethod - def item_metadata_iter(self) -> Iterator[dict[str, Any]]: - """Iterate through batch metadata to yield item metadata. - - MUST be overridden by workflow subclasses. - """ - - @final - def create_batch(self, *, synced: bool = False) -> None: - """Create a batch of item submissions for processing. - - A "batch" refers to a collection of item submissions that are grouped together - for coordinated processing, storage, and workflow execution. Each batch - typically consists of multiple items, each with its own metadata and - associated files, organized under a unique batch identifier. - - This method prepares the necessary assets in S3 (programmatically as needed) - and records each item in the batch to DynamoDB. - """ - item_submissions, errors = self.prepare_batch(synced=synced) - if errors: - raise BatchCreationFailedError(errors) - self._create_batch_in_db(item_submissions) - - @abstractmethod - def prepare_batch(self, *, synced: bool = False) -> tuple[list, ...]: - """Prepare batch submission assets in S3. - - This method performs the required steps to prepare a batch - of item submissions in S3. 
These steps must include (at minimum) - the following checks: - - - Check if there is metadata for the item submission; - otherwise raise dsc.exceptions.ItemMetadataNotFoundError - - Check if there are any bitstreams for the item submission; - otherwise raise dsc.exceptions.ItemBitstreamsNotFoundError - - MUST be overridden by workflow subclasses. - - Returns: - A tuple of item submissions (init params) represented as a - list of dicts and errors represented as a list of tuples - containing the item identifier and the error message. - """ - pass # noqa: PIE790 - - @final - def _create_batch_in_db(self, item_submissions: list[dict]) -> None: - """Write records for a batch of item submissions to DynamoDB. - - This method loops through the item submissions (init params) - represented as a list dicts. For each item submission, the - method creates an instance of ItemSubmission and saves the - record to DynamoDB. - """ - for item_submission_init_params in item_submissions: - item_submission = ItemSubmission.create(**item_submission_init_params) - item_submission.last_run_date = self.run_date - item_submission.status = ItemSubmissionStatus.BATCH_CREATED - item_submission.status_details = None - item_submission.save() - - @final - def submit_items(self, collection_handle: str) -> list: - """Submit items to the DSpace Submission Service according to the workflow class. - - Args: - collection_handle: The handle of the DSpace collection to which the batch will - be submitted. - - - Returns a dict with the submission results organized into succeeded and failed - items. 
- """ - logger.info( - f"Submitting messages to the DSS input queue '{CONFIG.sqs_queue_dss_input}' " - f"for batch '{self.batch_id}'" - ) - - batch_metadata = { - item_metadata["item_identifier"]: item_metadata - for item_metadata in self.item_metadata_iter() - } - - items = [] - for item_submission in ItemSubmission.get_batch(self.batch_id): - - self.submission_summary["total"] += 1 - item_identifier = item_submission.item_identifier - logger.debug(f"Preparing submission for item: {item_identifier}") - item_submission.last_run_date = self.run_date - - # validate whether a message should be sent for this item submission - if not item_submission.ready_to_submit(): - self.submission_summary["skipped"] += 1 - continue - try: - # prepare submission assets - item_submission.prepare_dspace_metadata( - metadata_mapping=self.metadata_mapping, - item_metadata=batch_metadata[item_identifier], - s3_bucket=self.s3_bucket, - batch_path=self.batch_path, - ) - item_submission.bitstream_s3_uris = self.get_item_bitstream_uris( - item_identifier - ) - - # Send submission message to DSS input queue - response = item_submission.send_submission_message( - self.workflow_name, - self.output_queue, - self.submission_system, - collection_handle, - ) - - # Record details of the item submission message - item_data = { - "item_identifier": item_identifier, - "message_id": response["MessageId"], - } - items.append(item_data) - self.submission_summary["submitted"] += 1 - - logger.info(f"Sent item submission message: {item_data["message_id"]}") - - # Set status in DynamoDB - item_submission.status = ItemSubmissionStatus.SUBMIT_SUCCESS - item_submission.status_details = None - item_submission.submit_attempts += 1 - item_submission.upsert_db() - except Exception as exception: # noqa: BLE001 - self.submission_summary["errors"] += 1 - item_submission.status = ItemSubmissionStatus.SUBMIT_FAILED - item_submission.status_details = str(exception) - item_submission.submit_attempts += 1 - 
item_submission.upsert_db() - - logger.info( - f"Submitted messages to the DSS input queue '{CONFIG.sqs_queue_dss_input}' " - f"for batch '{self.batch_id}': {json.dumps(self.submission_summary)}" - ) - return items - - @final - def finalize_items(self) -> None: - """Examine results for all item submissions in the batch. - - This method involves three main steps: - - 1. Process DSS result messages from the output queue - 2. Apply workflow-specific processing - """ - logger.info( - f"Processing DSS result messages from the output queue '{self.output_queue}'" - ) - sqs_results_summary = { - "received_messages": 0, - "ingest_success": 0, - "ingest_failed": 0, - "ingest_unknown": 0, - } - - # retrieve and create map of result messages - sqs_client = SQSClient( - region=CONFIG.aws_region_name, queue_name=self.output_queue - ) - logger.info( - f"Processing DSS result messages from the output queue '{self.output_queue}'" - ) - result_message_map: dict[str, DSSResultMessage] = {} - for message in sqs_client.receive(): - try: - result_message_object = DSSResultMessage.from_result_message(message) - result_message_map[result_message_object.item_identifier] = ( - result_message_object - ) - except InvalidSQSMessageError: - logger.exception(f"Failure parsing message '{message}'") - continue - - sqs_results_summary["received_messages"] = len(result_message_map) - - # retrieve item submissions from batch - for item_submission in ItemSubmission.get_batch(self.batch_id): - log_str = ITEM_SUBMISSION_LOG_STR.format( - batch_id=self.batch_id, item_identifier=item_submission.item_identifier - ) - if item_submission.status == ItemSubmissionStatus.INGEST_SUCCESS: - logger.debug(f"Record {log_str} already ingested, skipping") - continue - - item_submission.ingest_attempts += 1 - - result_message = result_message_map.get(item_submission.item_identifier) - - # skip item submission if result message is not found - if not result_message: - continue - - # update item submission status based on 
ingest result - if result_message.result_type == "success": - item_submission.status = ItemSubmissionStatus.INGEST_SUCCESS - item_submission.status_details = None - item_submission.dspace_handle = result_message.dspace_handle - sqs_results_summary["ingest_success"] += 1 - logger.debug(f"Record {log_str} was ingested") - elif result_message.result_type == "error": - item_submission.status = ItemSubmissionStatus.INGEST_FAILED - item_submission.status_details = result_message.error_info - sqs_results_summary["ingest_failed"] += 1 - logger.debug(f"Record {log_str} failed to ingest") - else: - item_submission.status = ItemSubmissionStatus.INGEST_UNKNOWN - sqs_results_summary["ingest_unknown"] += 1 - logger.debug(f"Unable to determine ingest status for record {log_str}") - item_submission.last_result_message = str(result_message.raw_message) - item_submission.last_run_date = self.run_date - item_submission.upsert_db() - sqs_client.delete( - receipt_handle=result_message.receipt_handle, - message_id=result_message.message_id, - ) - - # optional method used for some workflows - self.workflow_specific_processing() - - logger.info( - f"Processed DSS result messages from the output queue '{self.output_queue}': " - f"{json.dumps(sqs_results_summary)}" - ) - - def workflow_specific_processing(self) -> None: - logger.info( - f"No extra processing for batch based on workflow: " - f"'{self.workflow_name}' " - ) - - def send_report(self, report: Report, email_recipients: list[str]) -> None: - """Send report as an email via SES.""" - logger.info(f"Sending report to recipients: {email_recipients}") - ses_client = SESClient(region=CONFIG.aws_region_name) - ses_client.create_and_send_email( - subject=report.subject, - source_email_address=CONFIG.source_email, - recipient_email_addresses=email_recipients, - message_body=report.generate_summary(), - attachments=report.generate_attachments(), - ) +__all__ = ["Workflow"] diff --git a/dsc/workflows/base/workflow.py 
b/dsc/workflows/base/workflow.py new file mode 100644 index 0000000..4346d24 --- /dev/null +++ b/dsc/workflows/base/workflow.py @@ -0,0 +1,443 @@ +from __future__ import annotations + +import json +import logging +from abc import ABC, abstractmethod +from dataclasses import dataclass +from datetime import UTC, datetime +from typing import TYPE_CHECKING, Any, final + +import jsonschema +import jsonschema.exceptions + +from dsc.config import Config +from dsc.db.models import ItemSubmissionStatus +from dsc.exceptions import ( + BatchCreationFailedError, + InvalidSQSMessageError, + InvalidWorkflowNameError, +) +from dsc.item_submission import ItemSubmission +from dsc.reports import Report +from dsc.utilities.aws import SESClient, SQSClient +from dsc.utilities.validate.schemas import RESULT_MESSAGE_ATTRIBUTES, RESULT_MESSAGE_BODY + +if TYPE_CHECKING: # pragma: no cover + from collections.abc import Iterator + + from mypy_boto3_sqs.type_defs import MessageTypeDef + + from dsc.reports import Report + +logger = logging.getLogger(__name__) +CONFIG = Config() + +ITEM_SUBMISSION_LOG_STR = ( + "with primary keys batch_id={batch_id} (hash key) and " + "item_identifier={item_identifier} (range key)" +) + + +@dataclass +class DSSResultMessage: + """Represents a parsed DSpace Submission Service result message.""" + + item_identifier: str + submission_source: str + result_type: str + dspace_handle: str | None + last_modified: str | None + error_info: str | None + error_timestamp: str | None + dspace_response: str | None + exception_traceback: list[str] | None + message_id: str + receipt_handle: str + raw_message: MessageTypeDef + + @classmethod + def from_result_message(cls, message: MessageTypeDef) -> DSSResultMessage: + """Create instance from result message. 
+ + Args: + message: A result message from the DSS output queue + + Raises: + InvalidSQSMessageError: If message fails validation + """ + try: + attrs = message.get("MessageAttributes", {}) + jsonschema.validate(instance=attrs, schema=RESULT_MESSAGE_ATTRIBUTES) + + body = json.loads(message.get("Body", "{}")) + jsonschema.validate(instance=body, schema=RESULT_MESSAGE_BODY) + + return cls( + item_identifier=attrs["PackageID"]["StringValue"], + submission_source=attrs["SubmissionSource"]["StringValue"], + result_type=body["ResultType"], + dspace_handle=body.get("ItemHandle"), + last_modified=body.get("lastModified"), + error_info=body.get("ErrorInfo", "Unknown error"), + error_timestamp=body.get("ErrorTimestamp"), + dspace_response=body.get("DSpaceResponse"), + exception_traceback=body.get("ExceptionTraceback"), + message_id=message["MessageId"], + receipt_handle=message["ReceiptHandle"], + raw_message=message, + ) + + except (KeyError, json.JSONDecodeError) as exception: + raise InvalidSQSMessageError( + f"Failed to parse result message: {exception}" + ) from exception + except jsonschema.exceptions.ValidationError as exception: + raise InvalidSQSMessageError( + f"Result message failed schema validation: {exception}" + ) from exception + + +class Workflow(ABC): + """A base workflow class from which other workflow classes are derived.""" + + workflow_name: str = "base" + submission_system: str = "DSpace@MIT" + + def __init__(self, batch_id: str) -> None: + """Initialize base instance. + + Args: + batch_id: Unique identifier for a 'batch' deposit that corresponds + to the name of a subfolder in the workflow directory of the S3 bucket. + This subfolder is where the S3 client will search for bitstream + and metadata files. 
+ """ + self.batch_id = batch_id + self.run_date = datetime.now(UTC) + self.exclude_prefixes: list[str] = [ + "archived/", + "dspace_metadata/", + f"{self.batch_path}metadata.csv", + ] + self.submission_summary: dict[str, int] = { + "total": 0, + "submitted": 0, + "skipped": 0, + "errors": 0, + } + + # cache list of bitstreams + self._batch_bitstream_uris: list[str] | None = None + + @property + @abstractmethod + def metadata_mapping_path(self) -> str: + """Path to the JSON metadata mapping file for the workflow.""" + + @property + def metadata_mapping(self) -> dict: + with open(self.metadata_mapping_path) as mapping_file: + return json.load(mapping_file) + + @final + @property + def s3_bucket(self) -> str: + return CONFIG.s3_bucket_submission_assets + + @property + def output_queue(self) -> str: + """The SQS output queue for the DSS result messages.""" + return f"dss-output-dsc-{CONFIG.workspace}" + + @property + def batch_path(self) -> str: + return f"{self.workflow_name}/{self.batch_id}/" + + @property + def batch_bitstream_uris(self) -> list[str]: + if not self._batch_bitstream_uris: + self._batch_bitstream_uris = self.get_batch_bitstream_uris() + return self._batch_bitstream_uris + + @property + def retry_threshold(self) -> int: + return CONFIG.retry_threshold + + @final + @classmethod + def get_workflow(cls, workflow_name: str) -> type[Workflow]: + """Return workflow class. + + Args: + workflow_name: The label of the workflow. Must match a workflow_name attribute + from Workflow subclass. 
+ """ + for workflow_class in cls._get_subclasses(): + if workflow_name == workflow_class.workflow_name: + return workflow_class + raise InvalidWorkflowNameError(f"Invalid workflow name: {workflow_name} ") + + @classmethod + def _get_subclasses(cls) -> Iterator[type[Workflow]]: + for subclass in cls.__subclasses__(): + yield from subclass._get_subclasses() # noqa: SLF001 + yield subclass + + @abstractmethod + def get_batch_bitstream_uris(self) -> list[str]: + """Get list of bitstream URIs for a batch.""" + + @final + def get_item_bitstream_uris(self, item_identifier: str) -> list[str]: + """Get list of bitstreams URIs for an item.""" + return [uri for uri in self.batch_bitstream_uris if item_identifier in uri] + + @abstractmethod + def item_metadata_iter(self) -> Iterator[dict[str, Any]]: + """Iterate through batch metadata to yield item metadata. + + MUST be overridden by workflow subclasses. + """ + + @final + def create_batch(self, *, synced: bool = False) -> None: + """Create a batch of item submissions for processing. + + A "batch" refers to a collection of item submissions that are grouped together + for coordinated processing, storage, and workflow execution. Each batch + typically consists of multiple items, each with its own metadata and + associated files, organized under a unique batch identifier. + + This method prepares the necessary assets in S3 (programmatically as needed) + and records each item in the batch to DynamoDB. + """ + item_submissions, errors = self.prepare_batch(synced=synced) + if errors: + raise BatchCreationFailedError(errors) + self._create_batch_in_db(item_submissions) + + @abstractmethod + def prepare_batch(self, *, synced: bool = False) -> tuple[list, ...]: + """Prepare batch submission assets in S3. + + This method performs the required steps to prepare a batch + of item submissions in S3. 
These steps must include (at minimum) + the following checks: + + - Check if there is metadata for the item submission; + otherwise raise dsc.exceptions.ItemMetadataNotFoundError + - Check if there are any bitstreams for the item submission; + otherwise raise dsc.exceptions.ItemBitstreamsNotFoundError + + MUST be overridden by workflow subclasses. + + Returns: + A tuple of item submissions (init params) represented as a + list of dicts and errors represented as a list of tuples + containing the item identifier and the error message. + """ + pass # noqa: PIE790 + + @final + def _create_batch_in_db(self, item_submissions: list[dict]) -> None: + """Write records for a batch of item submissions to DynamoDB. + + This method loops through the item submissions (init params) + represented as a list dicts. For each item submission, the + method creates an instance of ItemSubmission and saves the + record to DynamoDB. + """ + for item_submission_init_params in item_submissions: + item_submission = ItemSubmission.create(**item_submission_init_params) + item_submission.last_run_date = self.run_date + item_submission.status = ItemSubmissionStatus.BATCH_CREATED + item_submission.status_details = None + item_submission.save() + + @final + def submit_items(self, collection_handle: str) -> list: + """Submit items to the DSpace Submission Service according to the workflow class. + + Args: + collection_handle: The handle of the DSpace collection to which the batch will + be submitted. + + + Returns a dict with the submission results organized into succeeded and failed + items. 
+ """ + logger.info( + f"Submitting messages to the DSS input queue '{CONFIG.sqs_queue_dss_input}' " + f"for batch '{self.batch_id}'" + ) + + batch_metadata = { + item_metadata["item_identifier"]: item_metadata + for item_metadata in self.item_metadata_iter() + } + + items = [] + for item_submission in ItemSubmission.get_batch(self.batch_id): + + self.submission_summary["total"] += 1 + item_identifier = item_submission.item_identifier + logger.debug(f"Preparing submission for item: {item_identifier}") + item_submission.last_run_date = self.run_date + + # validate whether a message should be sent for this item submission + if not item_submission.ready_to_submit(): + self.submission_summary["skipped"] += 1 + continue + try: + # prepare submission assets + item_submission.prepare_dspace_metadata( + metadata_mapping=self.metadata_mapping, + item_metadata=batch_metadata[item_identifier], + s3_bucket=self.s3_bucket, + batch_path=self.batch_path, + ) + item_submission.bitstream_s3_uris = self.get_item_bitstream_uris( + item_identifier + ) + + # Send submission message to DSS input queue + response = item_submission.send_submission_message( + self.workflow_name, + self.output_queue, + self.submission_system, + collection_handle, + ) + + # Record details of the item submission message + item_data = { + "item_identifier": item_identifier, + "message_id": response["MessageId"], + } + items.append(item_data) + self.submission_summary["submitted"] += 1 + + logger.info(f"Sent item submission message: {item_data["message_id"]}") + + # Set status in DynamoDB + item_submission.status = ItemSubmissionStatus.SUBMIT_SUCCESS + item_submission.status_details = None + item_submission.submit_attempts += 1 + item_submission.upsert_db() + except Exception as exception: # noqa: BLE001 + self.submission_summary["errors"] += 1 + item_submission.status = ItemSubmissionStatus.SUBMIT_FAILED + item_submission.status_details = str(exception) + item_submission.submit_attempts += 1 + 
item_submission.upsert_db() + + logger.info( + f"Submitted messages to the DSS input queue '{CONFIG.sqs_queue_dss_input}' " + f"for batch '{self.batch_id}': {json.dumps(self.submission_summary)}" + ) + return items + + @final + def finalize_items(self) -> None: + """Examine results for all item submissions in the batch. + + This method involves three main steps: + + 1. Process DSS result messages from the output queue + 2. Apply workflow-specific processing + """ + logger.info( + f"Processing DSS result messages from the output queue '{self.output_queue}'" + ) + sqs_results_summary = { + "received_messages": 0, + "ingest_success": 0, + "ingest_failed": 0, + "ingest_unknown": 0, + } + + # retrieve and create map of result messages + sqs_client = SQSClient( + region=CONFIG.aws_region_name, queue_name=self.output_queue + ) + logger.info( + f"Processing DSS result messages from the output queue '{self.output_queue}'" + ) + result_message_map: dict[str, DSSResultMessage] = {} + for message in sqs_client.receive(): + try: + result_message_object = DSSResultMessage.from_result_message(message) + result_message_map[result_message_object.item_identifier] = ( + result_message_object + ) + except InvalidSQSMessageError: + logger.exception(f"Failure parsing message '{message}'") + continue + + sqs_results_summary["received_messages"] = len(result_message_map) + + # retrieve item submissions from batch + for item_submission in ItemSubmission.get_batch(self.batch_id): + log_str = ITEM_SUBMISSION_LOG_STR.format( + batch_id=self.batch_id, item_identifier=item_submission.item_identifier + ) + if item_submission.status == ItemSubmissionStatus.INGEST_SUCCESS: + logger.debug(f"Record {log_str} already ingested, skipping") + continue + + item_submission.ingest_attempts += 1 + + result_message = result_message_map.get(item_submission.item_identifier) + + # skip item submission if result message is not found + if not result_message: + continue + + # update item submission status based on 
ingest result + if result_message.result_type == "success": + item_submission.status = ItemSubmissionStatus.INGEST_SUCCESS + item_submission.status_details = None + item_submission.dspace_handle = result_message.dspace_handle + sqs_results_summary["ingest_success"] += 1 + logger.debug(f"Record {log_str} was ingested") + elif result_message.result_type == "error": + item_submission.status = ItemSubmissionStatus.INGEST_FAILED + item_submission.status_details = result_message.error_info + sqs_results_summary["ingest_failed"] += 1 + logger.debug(f"Record {log_str} failed to ingest") + else: + item_submission.status = ItemSubmissionStatus.INGEST_UNKNOWN + sqs_results_summary["ingest_unknown"] += 1 + logger.debug(f"Unable to determine ingest status for record {log_str}") + item_submission.last_result_message = str(result_message.raw_message) + item_submission.last_run_date = self.run_date + item_submission.upsert_db() + sqs_client.delete( + receipt_handle=result_message.receipt_handle, + message_id=result_message.message_id, + ) + + # optional method used for some workflows + self.workflow_specific_processing() + + logger.info( + f"Processed DSS result messages from the output queue '{self.output_queue}': " + f"{json.dumps(sqs_results_summary)}" + ) + + def workflow_specific_processing(self) -> None: + logger.info( + f"No extra processing for batch based on workflow: " + f"'{self.workflow_name}' " + ) + + def send_report(self, report: Report, email_recipients: list[str]) -> None: + """Send report as an email via SES.""" + logger.info(f"Sending report to recipients: {email_recipients}") + ses_client = SESClient(region=CONFIG.aws_region_name) + ses_client.create_and_send_email( + subject=report.subject, + source_email_address=CONFIG.source_email, + recipient_email_addresses=email_recipients, + message_body=report.generate_summary(), + attachments=report.generate_attachments(), + ) diff --git a/dsc/workflows/opencourseware/__init__.py 
b/dsc/workflows/opencourseware/__init__.py new file mode 100644 index 0000000..edc93f3 --- /dev/null +++ b/dsc/workflows/opencourseware/__init__.py @@ -0,0 +1,4 @@ +from dsc.workflows.opencourseware.transformer import OpenCourseWareTransformer +from dsc.workflows.opencourseware.workflow import OpenCourseWare + +__all__ = ["OpenCourseWare", "OpenCourseWareTransformer"] diff --git a/dsc/workflows/metadata_mapping/opencourseware.json b/dsc/workflows/opencourseware/metadata_mapping.json similarity index 100% rename from dsc/workflows/metadata_mapping/opencourseware.json rename to dsc/workflows/opencourseware/metadata_mapping.json diff --git a/dsc/workflows/opencourseware.py b/dsc/workflows/opencourseware/transformer.py similarity index 72% rename from dsc/workflows/opencourseware.py rename to dsc/workflows/opencourseware/transformer.py index fad76ad..7dc3e0e 100644 --- a/dsc/workflows/opencourseware.py +++ b/dsc/workflows/opencourseware/transformer.py @@ -1,18 +1,7 @@ import inspect -import json -import logging -import zipfile -from collections.abc import Iterable, Iterator +from collections.abc import Iterable from typing import Any, ClassVar -import smart_open - -from dsc.exceptions import ItemMetadataNotFoundError -from dsc.utilities.aws.s3 import S3Client -from dsc.workflows.base import Workflow - -logger = logging.getLogger(__name__) - class OpenCourseWareTransformer: """Transformer for OpenCourseWare (OCW) source metadata.""" @@ -314,119 +303,3 @@ def dc_rights_uri(cls) -> str: @classmethod def dc_language_iso(cls) -> str: return "en_US" - - -class OpenCourseWare(Workflow): - """Workflow for OpenCourseWare (OCW) deposits. - - The deposits managed by this workflow are requested by the - Scholarly Communications and Collections Strategy (SCCS) department - and were previously deposited into DSpace@MIT by Technical services staff. 
- """ - - workflow_name: str = "opencourseware" - metadata_transformer = OpenCourseWareTransformer - - @property - def metadata_mapping_path(self) -> str: - return "dsc/workflows/metadata_mapping/opencourseware.json" - - def get_batch_bitstream_uris(self) -> list[str]: - """Get list of URIs for all zipfiles within the batch folder.""" - s3_client = S3Client() - return list( - s3_client.files_iter( - bucket=self.s3_bucket, - prefix=self.batch_path, - file_type=".zip", - exclude_prefixes=self.exclude_prefixes, - ) - ) - - def item_metadata_iter(self) -> Iterator[dict[str, Any]]: - """Yield item metadata from metadata JSON file in the zip file. - - If the zip file does not include a metadata JSON file (data.json), - this method yields a dict containing only the item identifier. - Otherwise, a dict containing the item identifier and transformed metadata - is yielded. - - NOTE: Item identifiers are retrieved from the filenames of the zip - files, which follow the naming format ".zip". - """ - for file in self.batch_bitstream_uris: - try: - source_metadata = self._read_metadata_from_zip_file(file) - except FileNotFoundError: - source_metadata = {} - - transformed_metadata = self.metadata_transformer.transform(source_metadata) - - yield { - "item_identifier": self._parse_item_identifier(file), - **transformed_metadata, - } - - def _read_metadata_from_zip_file(self, file: str) -> dict[str, str]: - """Read source metadata JSON file in zip archive. - - This method expects a JSON file called "data.json" at the root - level of the the zip file. - - Args: - file: Object prefix for bitstream zip file, formatted as the - path from the S3 bucket to the file. - Given an S3 URI "s3://dsc/opencourseware/batch-00/123.zip", - then file = "opencourseware/batch-00/123.zip". 
- """ - with ( - smart_open.open(file, "rb") as file_input, - zipfile.ZipFile(file_input) as zip_file, - zip_file.open("data.json") as json_file, - ): - return json.load(json_file) - - def _parse_item_identifier(self, file: str) -> str: - """Parse item identifier from bitstream zip file.""" - return file.split("/")[-1].removesuffix(".zip") - - def prepare_batch( - self, - *, - synced: bool = False, # noqa: ARG002 - ) -> tuple[list, ...]: - """Prepare a batch of item submissions, given a batch of zip files. - - For this workflow, the expected number of item submissions is determined - by the number of zip files in the batch folder. This method will iterate - over the yielded transformed metadata, checking whether metadata is provided: - - - If only the item identifier is provided and no other metadata is available, - an error is recorded - - If metadata is present, init params are generated for the item submission - - For the OpenCourseWare workflow, the batch preparation steps are the same - for synced vs. non-synced workflows. 
- """ - item_submissions = [] - errors = [] - - for item_metadata in self.item_metadata_iter(): - # check if metadata is provided - # item identifier is always returned by iter - if len(item_metadata) == 1 and "item_identifier" in item_metadata: - errors.append( - (item_metadata["item_identifier"], str(ItemMetadataNotFoundError())) - ) - continue - - # if item submission includes metadata, save init params - item_submissions.append( - { - "batch_id": self.batch_id, - "item_identifier": item_metadata["item_identifier"], - "workflow_name": self.workflow_name, - } - ) - - return item_submissions, errors diff --git a/dsc/workflows/opencourseware/workflow.py b/dsc/workflows/opencourseware/workflow.py new file mode 100644 index 0000000..17fe413 --- /dev/null +++ b/dsc/workflows/opencourseware/workflow.py @@ -0,0 +1,130 @@ +import json +import logging +import zipfile +from collections.abc import Iterator +from typing import Any + +import smart_open + +from dsc.exceptions import ItemMetadataNotFoundError +from dsc.utilities.aws.s3 import S3Client +from dsc.workflows.base import Workflow +from dsc.workflows.opencourseware import OpenCourseWareTransformer + +logger = logging.getLogger(__name__) + + +class OpenCourseWare(Workflow): + """Workflow for OpenCourseWare (OCW) deposits. + + The deposits managed by this workflow are requested by the + Scholarly Communications and Collections Strategy (SCCS) department + and were previously deposited into DSpace@MIT by Technical services staff. 
+ """ + + workflow_name: str = "opencourseware" + metadata_transformer = OpenCourseWareTransformer + + @property + def metadata_mapping_path(self) -> str: + return "dsc/workflows/opencourseware/metadata_mapping.json" + + def get_batch_bitstream_uris(self) -> list[str]: + """Get list of URIs for all zipfiles within the batch folder.""" + s3_client = S3Client() + return list( + s3_client.files_iter( + bucket=self.s3_bucket, + prefix=self.batch_path, + file_type=".zip", + exclude_prefixes=self.exclude_prefixes, + ) + ) + + def item_metadata_iter(self) -> Iterator[dict[str, Any]]: + """Yield item metadata from metadata JSON file in the zip file. + + If the zip file does not include a metadata JSON file (data.json), + this method yields a dict containing only the item identifier. + Otherwise, a dict containing the item identifier and transformed metadata + is yielded. + + NOTE: Item identifiers are retrieved from the filenames of the zip + files, which follow the naming format ".zip". + """ + for file in self.batch_bitstream_uris: + try: + source_metadata = self._read_metadata_from_zip_file(file) + except FileNotFoundError: + source_metadata = {} + + transformed_metadata = self.metadata_transformer.transform(source_metadata) + + yield { + "item_identifier": self._parse_item_identifier(file), + **transformed_metadata, + } + + def _read_metadata_from_zip_file(self, file: str) -> dict[str, str]: + """Read source metadata JSON file in zip archive. + + This method expects a JSON file called "data.json" at the root + level of the the zip file. + + Args: + file: Object prefix for bitstream zip file, formatted as the + path from the S3 bucket to the file. + Given an S3 URI "s3://dsc/opencourseware/batch-00/123.zip", + then file = "opencourseware/batch-00/123.zip". 
+ """ + with ( + smart_open.open(file, "rb") as file_input, + zipfile.ZipFile(file_input) as zip_file, + zip_file.open("data.json") as json_file, + ): + return json.load(json_file) + + def _parse_item_identifier(self, file: str) -> str: + """Parse item identifier from bitstream zip file.""" + return file.split("/")[-1].removesuffix(".zip") + + def prepare_batch( + self, + *, + synced: bool = False, # noqa: ARG002 + ) -> tuple[list, ...]: + """Prepare a batch of item submissions, given a batch of zip files. + + For this workflow, the expected number of item submissions is determined + by the number of zip files in the batch folder. This method will iterate + over the yielded transformed metadata, checking whether metadata is provided: + + - If only the item identifier is provided and no other metadata is available, + an error is recorded + - If metadata is present, init params are generated for the item submission + + For the OpenCourseWare workflow, the batch preparation steps are the same + for synced vs. non-synced workflows. 
+ """ + item_submissions = [] + errors = [] + + for item_metadata in self.item_metadata_iter(): + # check if metadata is provided + # item identifier is always returned by iter + if len(item_metadata) == 1 and "item_identifier" in item_metadata: + errors.append( + (item_metadata["item_identifier"], str(ItemMetadataNotFoundError())) + ) + continue + + # if item submission includes metadata, save init params + item_submissions.append( + { + "batch_id": self.batch_id, + "item_identifier": item_metadata["item_identifier"], + "workflow_name": self.workflow_name, + } + ) + + return item_submissions, errors diff --git a/dsc/workflows/sccs/__init__.py b/dsc/workflows/sccs/__init__.py new file mode 100644 index 0000000..913604d --- /dev/null +++ b/dsc/workflows/sccs/__init__.py @@ -0,0 +1,3 @@ +from dsc.workflows.sccs.workflow import SCCS + +__all__ = ["SCCS"] diff --git a/dsc/workflows/metadata_mapping/sccs.json b/dsc/workflows/sccs/metadata_mapping.json similarity index 100% rename from dsc/workflows/metadata_mapping/sccs.json rename to dsc/workflows/sccs/metadata_mapping.json diff --git a/dsc/workflows/sccs.py b/dsc/workflows/sccs/workflow.py similarity index 81% rename from dsc/workflows/sccs.py rename to dsc/workflows/sccs/workflow.py index 0f02ec3..363c364 100644 --- a/dsc/workflows/sccs.py +++ b/dsc/workflows/sccs/workflow.py @@ -1,4 +1,4 @@ -from dsc.workflows import SimpleCSV +from dsc.workflows.simple_csv import SimpleCSV class SCCS(SimpleCSV): @@ -13,7 +13,7 @@ class SCCS(SimpleCSV): @property def metadata_mapping_path(self) -> str: - return "dsc/workflows/metadata_mapping/sccs.json" + return "dsc/workflows/sccs/metadata_mapping.json" @property def item_identifier_column_names(self) -> list[str]: diff --git a/dsc/workflows/simple_csv/__init__.py b/dsc/workflows/simple_csv/__init__.py new file mode 100644 index 0000000..43c9c16 --- /dev/null +++ b/dsc/workflows/simple_csv/__init__.py @@ -0,0 +1,3 @@ +from dsc.workflows.simple_csv.workflow import SimpleCSV + 
+__all__ = ["SimpleCSV"] diff --git a/dsc/workflows/base/simple_csv.py b/dsc/workflows/simple_csv/workflow.py similarity index 100% rename from dsc/workflows/base/simple_csv.py rename to dsc/workflows/simple_csv/workflow.py diff --git a/pyproject.toml b/pyproject.toml index 41bbe9f..9eabfd3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ [project] name = "dsc" -version = "1.0" # See https://packaging.python.org/en/latest/guides/writing-pyproject-toml/#version +version = "1.3" # See https://packaging.python.org/en/latest/guides/writing-pyproject-toml/#version requires-python = "~=3.12.0" dependencies = [ diff --git a/tests/conftest.py b/tests/conftest.py index c61c4c5..96228f2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -18,10 +18,7 @@ from dsc.utilities.aws.s3 import S3Client from dsc.utilities.aws.ses import SESClient from dsc.utilities.aws.sqs import SQSClient -from dsc.workflows import OpenCourseWare -from dsc.workflows.archivesspace import ArchivesSpace -from dsc.workflows.base import Workflow -from dsc.workflows.base.simple_csv import SimpleCSV +from dsc.workflows import ArchivesSpace, OpenCourseWare, SimpleCSV, Workflow # Test Workflow classes ###################### diff --git a/tests/test_workflow_opencourseware.py b/tests/test_workflow_opencourseware.py index 1b6d88a..bc043fd 100644 --- a/tests/test_workflow_opencourseware.py +++ b/tests/test_workflow_opencourseware.py @@ -6,7 +6,9 @@ from dsc.item_submission import ItemSubmission -@patch("dsc.workflows.opencourseware.OpenCourseWare._read_metadata_from_zip_file") +@patch( + "dsc.workflows.opencourseware.workflow.OpenCourseWare._read_metadata_from_zip_file" +) def test_workflow_ocw_metadata_mapping_dspace_metadata_success( mock_opencourseware_read_metadata_from_zip_file, caplog, @@ -102,7 +104,9 @@ def test_workflow_ocw_metadata_mapping_dspace_metadata_success( ] -@patch("dsc.workflows.opencourseware.OpenCourseWare._read_metadata_from_zip_file") +@patch( + 
"dsc.workflows.opencourseware.workflow.OpenCourseWare._read_metadata_from_zip_file" +) def test_workflow_ocw_prepare_batch_success( mock_opencourseware_read_metadata_from_zip_file, mocked_item_submission_db, @@ -132,7 +136,9 @@ def test_workflow_ocw_prepare_batch_success( ) -@patch("dsc.workflows.opencourseware.OpenCourseWare._read_metadata_from_zip_file") +@patch( + "dsc.workflows.opencourseware.workflow.OpenCourseWare._read_metadata_from_zip_file" +) def test_workflow_ocw_item_metadata_iter_success( mock_opencourseware_read_metadata_from_zip_file, caplog, From ddafb7fba8abbc397fa9eb9183b71ffb14e86f40 Mon Sep 17 00:00:00 2001 From: Jonavelle Cuerdo Date: Wed, 18 Feb 2026 11:47:39 -0500 Subject: [PATCH 2/8] Introduce abstract base class for transforming metadata Why these changes are being introduced: * Prior to OpenCourseWare, DSC's metadata transformation capabilities were limited to simple 1-1 mappings and splitting strings on delimiters for multi-valued fields. However, with OpenCourseWare and other workflows planned for implementation, transformations can be more complex. This extends the transformer approach originally developed for OpenCourseWare to other workflows as an option. 
How this addresses that need: * Create module for Transformer in dsc.workflows.base * Update OpenCourseWareTransformer to inherit from base Transformer Side effects of this change: * None Relevant ticket(s): * --- dsc/workflows/base/__init__.py | 3 +- dsc/workflows/base/transformer.py | 90 +++++++++++++++++++++ dsc/workflows/opencourseware/transformer.py | 64 +++++---------- dsc/workflows/opencourseware/workflow.py | 13 ++- uv.lock | 2 +- 5 files changed, 124 insertions(+), 48 deletions(-) create mode 100644 dsc/workflows/base/transformer.py diff --git a/dsc/workflows/base/__init__.py b/dsc/workflows/base/__init__.py index eb51c74..5254cbf 100644 --- a/dsc/workflows/base/__init__.py +++ b/dsc/workflows/base/__init__.py @@ -1,3 +1,4 @@ +from dsc.workflows.base.transformer import Transformer from dsc.workflows.base.workflow import Workflow -__all__ = ["Workflow"] +__all__ = ["Transformer", "Workflow"] diff --git a/dsc/workflows/base/transformer.py b/dsc/workflows/base/transformer.py new file mode 100644 index 0000000..4b5f485 --- /dev/null +++ b/dsc/workflows/base/transformer.py @@ -0,0 +1,90 @@ +import inspect +import itertools +import logging +from abc import ABC, abstractmethod +from collections.abc import Callable, Iterable +from typing import Any, final + +logger = logging.getLogger(__name__) + + +class Transformer(ABC): + """Base metadata transformer class. + + When defining a Transformer for a given workflow, you must: + + 1. Provide an implementation of Transformer.optional_fields() class method: + + Return a list of Dublin Core elements in dot (.) notation that is + expected in the transformed metadata. The provided values must + use the following format: dc.element.qualifier. + + 2. Provide class methods for metadata fields that (a) must be derived + dynamically or (b) set to a static value. + + Class methods may accept 'source_metadata' as a parameter. + + 3. 
[OPTIONAL] Provide class methods for metadata fields that are simply + mapped from an existing column in the source metadata. If provided, + the class method must accept 'source_metadata' as a parameter. + + Note: This is optional because the Transformer.transform() will + attempt to apply simple 1-1 mapping if a class method is not found + for a specified field. + """ + + # based on 'Metadata Recommendations' from DSpace docs + required_fields: Iterable[str] = ["dc.title", "dc.date.issued"] + + @classmethod + @abstractmethod + def optional_fields(cls) -> Iterable[str]: + """Dublin Core metadata fields.""" + + @final + @classmethod + def transform(cls, source_metadata: dict) -> dict | None: + """Transform source metadata. + + This method will iterate over all fields--combined from + Transformer.required_fields and Transformer.optional_fields--and either + call field methods (if provided) or apply simple 1-1 mapping + to generate transformed metadata. + + If a class method is not found for a given field and it is also + not found in 'source_metadata', the field's value is set to None + in the transformed metadata. 
+ """ + transformed_metadata: dict[str, Any] = {} + + all_fields = itertools.chain(cls.required_fields, cls.optional_fields()) + + for field in all_fields: + field_method = cls.get_field_method(field) + if field_method: + transformed_metadata[field] = cls._call_field_method( + field_method, source_metadata + ) + else: + transformed_metadata[field] = source_metadata.get(field) + + return transformed_metadata + + @classmethod + def get_field_method(cls, field: str) -> Callable | None: + field_method_name = field.replace(".", "_") + try: + return getattr(cls, field_method_name) + except AttributeError: + return None + + @classmethod + def _call_field_method( + cls, field_method: Callable, source_metadata: dict + ) -> Any: # noqa: ANN401 + """Invoke field method with source metadata (as needed).""" + # check if 'source_metadata' is in signature + signature = inspect.signature(field_method) + if "source_metadata" in signature.parameters: + return field_method(source_metadata) + return field_method() diff --git a/dsc/workflows/opencourseware/transformer.py b/dsc/workflows/opencourseware/transformer.py index 7dc3e0e..14f52b5 100644 --- a/dsc/workflows/opencourseware/transformer.py +++ b/dsc/workflows/opencourseware/transformer.py @@ -1,29 +1,30 @@ -import inspect from collections.abc import Iterable -from typing import Any, ClassVar +from typing import ClassVar +from dsc.workflows.base import Transformer -class OpenCourseWareTransformer: + +class OpenCourseWareTransformer(Transformer): """Transformer for OpenCourseWare (OCW) source metadata.""" - fields: Iterable[str] = [ - # fields with derived values - "dc_title", - "dc_date_issued", - "dc_description_abstract", - "dc_contributor_author", - "dc_contributor_department", - "creativework_learningresourcetype", - "dc_subject", - "dc_identifier_other", - "dc_coverage_temporal", - "dc_audience_educationlevel", - # fields with static values - "dc_type", - "dc_rights", - "dc_rights_uri", - "dc_language_iso", - ] + @classmethod 
+ def optional_fields(cls) -> Iterable[str]: + return [ + # fields with derived values + "dc.description.abstract", + "dc.contributor.author", + "dc.contributor.department", + "creativework.learningresourcetype", + "dc.subject", + "dc.identifier.other", + "dc.coverage.temporal", + "dc.audience.educationlevel", + # fields with static values + "dc.type", + "dc.rights", + "dc.rights.uri", + "dc.language.iso", + ] department_mappings: ClassVar = { "1": "Massachusetts Institute of Technology. Department of Civil and Environmental Engineering", # noqa: E501 @@ -65,27 +66,6 @@ class OpenCourseWareTransformer: "EC": "Edgerton Center (Massachusetts Institute of Technology)", } - @classmethod - def transform(cls, source_metadata: dict) -> dict: - """Transform source metadata.""" - transformed_metadata: dict[str, Any] = {} - - if not source_metadata: - return transformed_metadata - - for field in cls.fields: - field_method = getattr(cls, field) - formatted_field_name = field.replace("_", ".") - - # check if 'source_metadata' is in signature - signature = inspect.signature(field_method) - if "source_metadata" in signature.parameters: - transformed_metadata[formatted_field_name] = field_method(source_metadata) - else: - transformed_metadata[formatted_field_name] = field_method() - - return transformed_metadata - @classmethod def dc_title(cls, source_metadata: dict) -> str: """Build a title string from course numbers, title, and term year. 
diff --git a/dsc/workflows/opencourseware/workflow.py b/dsc/workflows/opencourseware/workflow.py index 17fe413..e14e695 100644 --- a/dsc/workflows/opencourseware/workflow.py +++ b/dsc/workflows/opencourseware/workflow.py @@ -60,10 +60,15 @@ def item_metadata_iter(self) -> Iterator[dict[str, Any]]: transformed_metadata = self.metadata_transformer.transform(source_metadata) - yield { - "item_identifier": self._parse_item_identifier(file), - **transformed_metadata, - } + if transformed_metadata: + yield { + "item_identifier": self._parse_item_identifier(file), + **transformed_metadata, + } + else: + yield { + "item_identifier": self._parse_item_identifier(file), + } def _read_metadata_from_zip_file(self, file: str) -> dict[str, str]: """Read source metadata JSON file in zip archive. diff --git a/uv.lock b/uv.lock index f22704e..5ea6a39 100644 --- a/uv.lock +++ b/uv.lock @@ -423,7 +423,7 @@ sdist = { url = "https://files.pythonhosted.org/packages/a2/55/8f8cab2afd404cf57 [[package]] name = "dsc" -version = "1.0" +version = "1.3" source = { editable = "." 
} dependencies = [ { name = "boto3" }, From 82234b34d91ac0421c521fcdd5290e12e83905d2 Mon Sep 17 00:00:00 2001 From: Jonavelle Cuerdo Date: Wed, 18 Feb 2026 12:45:27 -0500 Subject: [PATCH 3/8] Update Workflow.prepare_batch methods to return ItemSubmission's --- dsc/workflows/base/workflow.py | 14 ++++++-------- dsc/workflows/opencourseware/workflow.py | 11 ++++++----- dsc/workflows/simple_csv/workflow.py | 11 ++++++----- tests/conftest.py | 20 ++++++++++---------- tests/test_workflow_opencourseware.py | 10 +++++----- tests/test_workflow_simple_csv.py | 12 +++++++----- 6 files changed, 40 insertions(+), 38 deletions(-) diff --git a/dsc/workflows/base/workflow.py b/dsc/workflows/base/workflow.py index 4346d24..5009059 100644 --- a/dsc/workflows/base/workflow.py +++ b/dsc/workflows/base/workflow.py @@ -232,23 +232,21 @@ def prepare_batch(self, *, synced: bool = False) -> tuple[list, ...]: MUST be overridden by workflow subclasses. Returns: - A tuple of item submissions (init params) represented as a - list of dicts and errors represented as a list of tuples + A tuple containing list of ItemSubmission's and + errors represented as a list of tuples containing the item identifier and the error message. """ pass # noqa: PIE790 @final - def _create_batch_in_db(self, item_submissions: list[dict]) -> None: + def _create_batch_in_db(self, item_submissions: list[ItemSubmission]) -> None: """Write records for a batch of item submissions to DynamoDB. - This method loops through the item submissions (init params) - represented as a list dicts. For each item submission, the - method creates an instance of ItemSubmission and saves the + For each ItemSubmission, the method updates the last_run_date, + status, and status_details attributes and saves the record to DynamoDB. 
""" - for item_submission_init_params in item_submissions: - item_submission = ItemSubmission.create(**item_submission_init_params) + for item_submission in item_submissions: item_submission.last_run_date = self.run_date item_submission.status = ItemSubmissionStatus.BATCH_CREATED item_submission.status_details = None diff --git a/dsc/workflows/opencourseware/workflow.py b/dsc/workflows/opencourseware/workflow.py index e14e695..6819c03 100644 --- a/dsc/workflows/opencourseware/workflow.py +++ b/dsc/workflows/opencourseware/workflow.py @@ -7,6 +7,7 @@ import smart_open from dsc.exceptions import ItemMetadataNotFoundError +from dsc.item_submission import ItemSubmission from dsc.utilities.aws.s3 import S3Client from dsc.workflows.base import Workflow from dsc.workflows.opencourseware import OpenCourseWareTransformer @@ -125,11 +126,11 @@ def prepare_batch( # if item submission includes metadata, save init params item_submissions.append( - { - "batch_id": self.batch_id, - "item_identifier": item_metadata["item_identifier"], - "workflow_name": self.workflow_name, - } + ItemSubmission( + batch_id=self.batch_id, + item_identifier=item_metadata["item_identifier"], + workflow_name=self.workflow_name, + ) ) return item_submissions, errors diff --git a/dsc/workflows/simple_csv/workflow.py b/dsc/workflows/simple_csv/workflow.py index 1cdce97..83cfe8e 100644 --- a/dsc/workflows/simple_csv/workflow.py +++ b/dsc/workflows/simple_csv/workflow.py @@ -6,6 +6,7 @@ import smart_open from dsc.exceptions import ItemBitstreamsNotFoundError +from dsc.item_submission import ItemSubmission from dsc.utilities.aws import S3Client from dsc.workflows.base import Workflow @@ -114,11 +115,11 @@ def prepare_batch( # if item submission has associated bitstreams, save init params item_submissions.append( - { - "batch_id": self.batch_id, - "item_identifier": item_metadata["item_identifier"], - "workflow_name": self.workflow_name, - } + ItemSubmission( + batch_id=self.batch_id, + 
item_identifier=item_metadata["item_identifier"], + workflow_name=self.workflow_name, + ) ) return item_submissions, errors diff --git a/tests/conftest.py b/tests/conftest.py index 96228f2..e0ddab3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -59,16 +59,16 @@ def item_metadata_iter(self): def prepare_batch(self, *, synced: bool = False): # noqa: ARG002 return ( [ - { - "batch_id": "batch-aaa", - "item_identifier": "123", - "workflow_name": "test", - }, - { - "batch_id": "batch-aaa", - "item_identifier": "789", - "workflow_name": "test", - }, + ItemSubmission( + batch_id="batch-aaa", + item_identifier="123", + workflow_name="test", + ), + ItemSubmission( + batch_id="batch-aaa", + item_identifier="789", + workflow_name="test", + ), ], [], ) diff --git a/tests/test_workflow_opencourseware.py b/tests/test_workflow_opencourseware.py index bc043fd..244812b 100644 --- a/tests/test_workflow_opencourseware.py +++ b/tests/test_workflow_opencourseware.py @@ -126,11 +126,11 @@ def test_workflow_ocw_prepare_batch_success( assert opencourseware_workflow_instance.prepare_batch() == ( [ - { - "batch_id": "batch-aaa", - "item_identifier": "123", - "workflow_name": "opencourseware", - } + ItemSubmission( + batch_id="batch-aaa", + item_identifier="123", + workflow_name="opencourseware", + ) ], [], ) diff --git a/tests/test_workflow_simple_csv.py b/tests/test_workflow_simple_csv.py index dcf7eef..6dc9971 100644 --- a/tests/test_workflow_simple_csv.py +++ b/tests/test_workflow_simple_csv.py @@ -2,6 +2,8 @@ import pandas as pd +from dsc.item_submission import ItemSubmission + def test_workflow_simple_csv_prepare_batch_success( mocked_s3_simple_csv, @@ -20,11 +22,11 @@ def test_workflow_simple_csv_prepare_batch_success( ) assert simple_csv_workflow_instance.prepare_batch() == ( [ - { - "batch_id": "batch-aaa", - "item_identifier": "123", - "workflow_name": "simple-csv", - } + ItemSubmission( + batch_id="batch-aaa", + item_identifier="123", + workflow_name="simple-csv", + ) 
], [], ) From c1975d01f73057831308a224e5d02d4a27c1eadc Mon Sep 17 00:00:00 2001 From: Jonavelle Cuerdo Date: Wed, 18 Feb 2026 12:53:39 -0500 Subject: [PATCH 4/8] Rename abstract method for preparing batches --- dsc/workflows/base/workflow.py | 4 ++-- dsc/workflows/opencourseware/workflow.py | 2 +- dsc/workflows/simple_csv/workflow.py | 2 +- tests/conftest.py | 2 +- tests/test_workflow_base.py | 2 +- tests/test_workflow_opencourseware.py | 2 +- tests/test_workflow_simple_csv.py | 4 ++-- 7 files changed, 9 insertions(+), 9 deletions(-) diff --git a/dsc/workflows/base/workflow.py b/dsc/workflows/base/workflow.py index 5009059..3f3f69a 100644 --- a/dsc/workflows/base/workflow.py +++ b/dsc/workflows/base/workflow.py @@ -211,13 +211,13 @@ def create_batch(self, *, synced: bool = False) -> None: This method prepares the necessary assets in S3 (programmatically as needed) and records each item in the batch to DynamoDB. """ - item_submissions, errors = self.prepare_batch(synced=synced) + item_submissions, errors = self._prepare_batch(synced=synced) if errors: raise BatchCreationFailedError(errors) self._create_batch_in_db(item_submissions) @abstractmethod - def prepare_batch(self, *, synced: bool = False) -> tuple[list, ...]: + def _prepare_batch(self, *, synced: bool = False) -> tuple[list, ...]: """Prepare batch submission assets in S3. 
This method performs the required steps to prepare a batch diff --git a/dsc/workflows/opencourseware/workflow.py b/dsc/workflows/opencourseware/workflow.py index 6819c03..72b1a4c 100644 --- a/dsc/workflows/opencourseware/workflow.py +++ b/dsc/workflows/opencourseware/workflow.py @@ -94,7 +94,7 @@ def _parse_item_identifier(self, file: str) -> str: """Parse item identifier from bitstream zip file.""" return file.split("/")[-1].removesuffix(".zip") - def prepare_batch( + def _prepare_batch( self, *, synced: bool = False, # noqa: ARG002 diff --git a/dsc/workflows/simple_csv/workflow.py b/dsc/workflows/simple_csv/workflow.py index 83cfe8e..74bf706 100644 --- a/dsc/workflows/simple_csv/workflow.py +++ b/dsc/workflows/simple_csv/workflow.py @@ -79,7 +79,7 @@ def item_metadata_iter(self, metadata_file: str = "metadata.csv") -> Iterator[di for k, v in row.items() } - def prepare_batch( + def _prepare_batch( self, *, synced: bool = False, # noqa: ARG002 diff --git a/tests/conftest.py b/tests/conftest.py index e0ddab3..29e495d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -56,7 +56,7 @@ def item_metadata_iter(self): }, ] - def prepare_batch(self, *, synced: bool = False): # noqa: ARG002 + def _prepare_batch(self, *, synced: bool = False): # noqa: ARG002 return ( [ ItemSubmission( diff --git a/tests/test_workflow_base.py b/tests/test_workflow_base.py index a55d8b5..36c8fa8 100644 --- a/tests/test_workflow_base.py +++ b/tests/test_workflow_base.py @@ -44,7 +44,7 @@ def test_base_workflow_get_workflow_invalid_workflow_name_raises_error( def test_base_workflow_create_batch_in_db_success( base_workflow_instance, mocked_item_submission_db ): - item_submissions, _ = base_workflow_instance.prepare_batch() + item_submissions, _ = base_workflow_instance._prepare_batch() # noqa: SLF001 base_workflow_instance._create_batch_in_db(item_submissions) # noqa: SLF001 item_submission = ItemSubmissionDB.get(hash_key="batch-aaa", range_key="123") diff --git 
a/tests/test_workflow_opencourseware.py b/tests/test_workflow_opencourseware.py index 244812b..ce27046 100644 --- a/tests/test_workflow_opencourseware.py +++ b/tests/test_workflow_opencourseware.py @@ -124,7 +124,7 @@ def test_workflow_ocw_prepare_batch_success( opencourseware_source_metadata ) - assert opencourseware_workflow_instance.prepare_batch() == ( + assert opencourseware_workflow_instance._prepare_batch() == ( [ ItemSubmission( batch_id="batch-aaa", diff --git a/tests/test_workflow_simple_csv.py b/tests/test_workflow_simple_csv.py index 6dc9971..3b471c7 100644 --- a/tests/test_workflow_simple_csv.py +++ b/tests/test_workflow_simple_csv.py @@ -20,7 +20,7 @@ def test_workflow_simple_csv_prepare_batch_success( bucket="dsc", key="simple-csv/batch-aaa/123_002.pdf", ) - assert simple_csv_workflow_instance.prepare_batch() == ( + assert simple_csv_workflow_instance._prepare_batch() == ( # noqa: SLF001 [ ItemSubmission( batch_id="batch-aaa", @@ -36,7 +36,7 @@ def test_workflow_simple_csv_prepare_batch_track_errors( mocked_s3_simple_csv, simple_csv_workflow_instance, ): - assert simple_csv_workflow_instance.prepare_batch() == ( + assert simple_csv_workflow_instance._prepare_batch() == ( # noqa: SLF001 [], [("123", "No bitstreams found for the item submission")], ) From 915081349472ff566ad98467078e62c59358abd2 Mon Sep 17 00:00:00 2001 From: Jonavelle Cuerdo Date: Wed, 18 Feb 2026 15:27:30 -0500 Subject: [PATCH 5/8] Move creation of DSpace metadata JSON to batch creation step Why these changes are being introduced: During batch creation, we are performing checks to see if both metadata and bitstreams are provided for a given item submission. When determining if metadata is available, what we should really ask is if the metadata is sufficient and can be transformed to the format that is needed for DSpace ingests. 
How this addresses that need: * Update ItemSubmission.prepare_dspace_metadata to clarify what is being created (the 'MetadataEntry' object payload for the DSpace REST API) * Deprecate ItemSubmission.validate_dspace_metadata * Add property methods for retrieving dspace metadata S3 URIs to base Workflow * Add final method for creating DSpace metadata JSON to base workflow * Remove calls to create DSpace metadata JSON from submit command Side effects of this change: * A successful run of batch creation will include the creation of a 'dspace_metadata/' subfolder in the batch folder that contains JSON files of Dublin Core formatted metadata for each item submission. Relevant ticket(s): * TBD --- dsc/exceptions.py | 2 +- dsc/item_submission.py | 164 ++++++++--------------- dsc/workflows/base/workflow.py | 62 +++++++-- dsc/workflows/opencourseware/workflow.py | 8 +- 4 files changed, 118 insertions(+), 118 deletions(-) diff --git a/dsc/exceptions.py b/dsc/exceptions.py index 0c63a26..45528bf 100644 --- a/dsc/exceptions.py +++ b/dsc/exceptions.py @@ -14,7 +14,7 @@ class InvalidWorkflowNameError(Exception): pass -class ItemMetadatMissingRequiredFieldError(Exception): +class ItemMetadataMissingRequiredFieldError(Exception): pass diff --git a/dsc/item_submission.py b/dsc/item_submission.py index e6613f2..d9c54e0 100644 --- a/dsc/item_submission.py +++ b/dsc/item_submission.py @@ -2,6 +2,7 @@ import json import logging +from collections import defaultdict from dataclasses import dataclass, fields from typing import TYPE_CHECKING, Any @@ -12,8 +13,7 @@ from dsc.db.models import ITEM_SUBMISSION_LOG_STR, ItemSubmissionDB, ItemSubmissionStatus from dsc.exceptions import ( DSpaceMetadataUploadError, - InvalidDSpaceMetadataError, - ItemMetadatMissingRequiredFieldError, + ItemMetadataMissingRequiredFieldError, SQSMessageSendError, ) from dsc.utilities.aws.s3 import S3Client @@ -176,9 +176,9 @@ def save(self) -> None: ) item_submission_db.create() - logger.info(f"Saved record " 
f"{ITEM_SUBMISSION_LOG_STR.format( - batch_id=self.batch_id, item_identifier=self.item_identifier - )}") + logger.info(f"Saved record {ITEM_SUBMISSION_LOG_STR.format( + batch_id=self.batch_id, item_identifier=self.item_identifier + )}") def upsert_db(self) -> None: """Upsert a record in DynamoDB from ItemSubmission. @@ -195,9 +195,9 @@ def upsert_db(self) -> None: ) item_submission_db.save() - logger.info(f"Upserted record " f"{ITEM_SUBMISSION_LOG_STR.format( - batch_id=self.batch_id, item_identifier=self.item_identifier - )}") + logger.info(f"Upserted record {ITEM_SUBMISSION_LOG_STR.format( + batch_id=self.batch_id, item_identifier=self.item_identifier + )}") def ready_to_submit(self) -> bool: """Check if the item submission is ready to be submitted.""" @@ -209,32 +209,31 @@ def ready_to_submit(self) -> bool: case ItemSubmissionStatus.INGEST_SUCCESS: logger.info( f"Record {ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id, - item_identifier=self.item_identifier) - } " "already ingested, skipping submission" + item_identifier=self.item_identifier)} " + "already ingested, skipping submission" ) case ItemSubmissionStatus.SUBMIT_SUCCESS: logger.info( - f"Record " f"{ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id, - item_identifier=self.item_identifier) - } " " already submitted, skipping submission" + f"Record {ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id, + item_identifier=self.item_identifier)} " + " already submitted, skipping submission" ) case ItemSubmissionStatus.MAX_RETRIES_REACHED: logger.info( - f"Record " f"{ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id, - item_identifier=self.item_identifier) - } " "max retries reached, skipping submission" + f"Record {ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id, + item_identifier=self.item_identifier)} " + "max retries reached, skipping submission" ) case None: logger.info( - f"Record " f"{ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id, - item_identifier=self.item_identifier) - 
} " " status unknown, skipping submission" + f"Record {ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id, + item_identifier=self.item_identifier)} " + "status unknown, skipping submission" ) case _: logger.debug( - f"Record " f"{ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id, - item_identifier=self.item_identifier) - } " "allowed for submission" + f"Record {ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id, + item_identifier=self.item_identifier)} allowed for submission" ) ready_to_submit = True @@ -248,93 +247,48 @@ def _exceeded_retry_threshold(self) -> None: if self.ingest_attempts > CONFIG.retry_threshold: self.status = ItemSubmissionStatus.MAX_RETRIES_REACHED - def prepare_dspace_metadata( - self, metadata_mapping: dict, item_metadata: dict, s3_bucket: str, batch_path: str - ) -> None: + def prepare_dspace_metadata(self, s3_bucket: str, batch_path: str) -> str: """Prepare DSpace metadata for the item submission.""" - self.create_dspace_metadata( - item_metadata=item_metadata, - metadata_mapping=metadata_mapping, - ) - self.validate_dspace_metadata() - self.upload_dspace_metadata(bucket=s3_bucket, prefix=batch_path) - - def create_dspace_metadata( - self, item_metadata: dict[str, Any], metadata_mapping: dict - ) -> None: - """Create DSpace metadata from the item's source metadata. - - A metadata mapping is a dict with the format seen below: - - { - "dc.contributor": { - "source_field_name": "contributor", - "language": "", - "delimiter": "", - "required": true | false - } - } - - When setting up the metadata mapping JSON file, "language" and "delimiter" - can be omitted from the file if not applicable. Required fields ("item_identifier" - and "title") must be set as required (true); if "required" is not listed as a - a config, the field defaults as not required (false). 
+ self.dspace_metadata = self._create_dspace_metadataentry() + return self._upload_dspace_metadata(bucket=s3_bucket, prefix=batch_path) - Args: - item_metadata: Item metadata from which the DSpace metadata will be derived. - metadata_mapping: A mapping of DSpace metadata fields to source metadata - fields. - """ - metadata_entries = [] - for field_name, field_mapping in metadata_mapping.items(): - if field_name not in ["item_identifier"]: - - field_value = item_metadata.get(field_mapping["source_field_name"]) - if not field_value and field_mapping.get("required", False): - raise ItemMetadatMissingRequiredFieldError( - "Item metadata missing required field: '" - f"{field_mapping["source_field_name"]}'" - ) - - if field_value: - if isinstance(field_value, list): - field_values = field_value - elif delimiter := field_mapping.get("delimiter"): - field_values = field_value.split(delimiter) - else: - field_values = [field_value] - - metadata_entries.extend( - [ - { - "key": field_name, - "value": value, - "language": field_mapping.get("language"), - } - for value in field_values - ] - ) - self.dspace_metadata = {"metadata": metadata_entries} - - def validate_dspace_metadata(self) -> bool: - """Validate that DSpace metadata follows the expected format for DSpace 6.x. + def _create_dspace_metadataentry(self) -> None | dict: + """Format transformed metadata for DSpace REST API. - Args: - dspace_metadata: DSpace metadata to be validated. 
+ This method follows the specs for a 'MetadataEntry Object' for + the DSpace 6 REST API: + https://wiki.lyrasis.org/display/DSDOC6x/REST+API#RESTAPI-MetadataEntryO """ - valid = False - if self.dspace_metadata and self.dspace_metadata.get("metadata") is not None: - for element in self.dspace_metadata["metadata"]: - if element.get("key") is not None and element.get("value") is not None: - valid = True - logger.debug("Valid DSpace metadata created") - else: - raise InvalidDSpaceMetadataError( - f"Invalid DSpace metadata created: {self.dspace_metadata} ", - ) - return valid + if not self.dspace_metadata: + return None + + metadataentry = defaultdict(list) + + for field, value in self.dspace_metadata.items(): + if field == "item_identifier": + continue + + if not value and field in ["dc.title", "dc.date.issued"]: + raise ItemMetadataMissingRequiredFieldError( + f"Item metadata missing required field: {field}" + ) + + if value: + value_list = value if isinstance(value, list) else [value] + + metadataentry["metadata"].extend( + [ + { + "key": field, + "value": _value, + } + for _value in value_list + ] + ) + + return dict(metadataentry) - def upload_dspace_metadata(self, bucket: str, prefix: str) -> None: + def _upload_dspace_metadata(self, bucket: str, prefix: str) -> str: """Upload DSpace metadata to S3 using the specified bucket and keyname. 
Args: @@ -361,7 +315,7 @@ def upload_dspace_metadata(self, bucket: str, prefix: str) -> None: metadata_s3_uri = f"s3://{bucket}/{metadata_s3_key}" logger.info(f"Metadata uploaded to S3: {metadata_s3_uri}") - self.metadata_s3_uri = metadata_s3_uri + return metadata_s3_uri def send_submission_message( self, diff --git a/dsc/workflows/base/workflow.py b/dsc/workflows/base/workflow.py index 3f3f69a..956ff32 100644 --- a/dsc/workflows/base/workflow.py +++ b/dsc/workflows/base/workflow.py @@ -14,12 +14,14 @@ from dsc.db.models import ItemSubmissionStatus from dsc.exceptions import ( BatchCreationFailedError, + DSpaceMetadataUploadError, InvalidSQSMessageError, InvalidWorkflowNameError, + ItemMetadataMissingRequiredFieldError, ) from dsc.item_submission import ItemSubmission from dsc.reports import Report -from dsc.utilities.aws import SESClient, SQSClient +from dsc.utilities.aws import S3Client, SESClient, SQSClient from dsc.utilities.validate.schemas import RESULT_MESSAGE_ATTRIBUTES, RESULT_MESSAGE_BODY if TYPE_CHECKING: # pragma: no cover @@ -128,6 +130,7 @@ def __init__(self, batch_id: str) -> None: # cache list of bitstreams self._batch_bitstream_uris: list[str] | None = None + self._batch_dspace_metadata_json_uris: list[str] | None = None @property @abstractmethod @@ -159,6 +162,19 @@ def batch_bitstream_uris(self) -> list[str]: self._batch_bitstream_uris = self.get_batch_bitstream_uris() return self._batch_bitstream_uris + @property + def batch_dspace_metadata_json_uris(self) -> list[str]: + if not self._batch_dspace_metadata_json_uris: + s3_client = S3Client() + self._batch_dspace_metadata_json_uris = list( + s3_client.files_iter( + bucket=self.s3_bucket, + prefix=f"{self.batch_path}dspace_metadata/", + file_type=".json", + ) + ) + return self._batch_dspace_metadata_json_uris + @property def retry_threshold(self) -> int: return CONFIG.retry_threshold @@ -192,6 +208,12 @@ def get_item_bitstream_uris(self, item_identifier: str) -> list[str]: """Get list of 
bitstreams URIs for an item.""" return [uri for uri in self.batch_bitstream_uris if item_identifier in uri] + @final + def get_item_dspace_metadata_json_uri(self, item_identifier: str) -> str: + return next( + uri for uri in self.batch_dspace_metadata_json_uris if item_identifier in uri + ) + @abstractmethod def item_metadata_iter(self) -> Iterator[dict[str, Any]]: """Iterate through batch metadata to yield item metadata. @@ -214,6 +236,13 @@ def create_batch(self, *, synced: bool = False) -> None: item_submissions, errors = self._prepare_batch(synced=synced) if errors: raise BatchCreationFailedError(errors) + + _dspace_metadata_uris, errors = self._create_dspace_metadata_json( + item_submissions + ) + if errors: + raise BatchCreationFailedError(errors) + self._create_batch_in_db(item_submissions) @abstractmethod @@ -238,6 +267,25 @@ def _prepare_batch(self, *, synced: bool = False) -> tuple[list, ...]: """ pass # noqa: PIE790 + @final + def _create_dspace_metadata_json( + self, item_submissions: list[ItemSubmission] + ) -> tuple[list, ...]: + dspace_metadata_uris = [] + errors = [] + for item_submission in item_submissions: + try: + uri = item_submission.prepare_dspace_metadata( + self.s3_bucket, self.batch_path + ) + dspace_metadata_uris.append(uri) + except ( + DSpaceMetadataUploadError, + ItemMetadataMissingRequiredFieldError, + ) as exception: + errors.append((item_submission.item_identifier, str(exception))) + return dspace_metadata_uris, errors + @final def _create_batch_in_db(self, item_submissions: list[ItemSubmission]) -> None: """Write records for a batch of item submissions to DynamoDB. 
@@ -269,11 +317,6 @@ def submit_items(self, collection_handle: str) -> list: f"for batch '{self.batch_id}'" ) - batch_metadata = { - item_metadata["item_identifier"]: item_metadata - for item_metadata in self.item_metadata_iter() - } - items = [] for item_submission in ItemSubmission.get_batch(self.batch_id): @@ -288,11 +331,8 @@ def submit_items(self, collection_handle: str) -> list: continue try: # prepare submission assets - item_submission.prepare_dspace_metadata( - metadata_mapping=self.metadata_mapping, - item_metadata=batch_metadata[item_identifier], - s3_bucket=self.s3_bucket, - batch_path=self.batch_path, + item_submission.metadata_s3_uri = self.get_item_dspace_metadata_json_uri( + item_identifier ) item_submission.bitstream_s3_uris = self.get_item_bitstream_uris( item_identifier diff --git a/dsc/workflows/opencourseware/workflow.py b/dsc/workflows/opencourseware/workflow.py index 72b1a4c..36e7e62 100644 --- a/dsc/workflows/opencourseware/workflow.py +++ b/dsc/workflows/opencourseware/workflow.py @@ -124,12 +124,18 @@ def _prepare_batch( ) continue - # if item submission includes metadata, save init params + # copy transformed metadata, excluding 'item_identifier' + dspace_metadata = { + k: v for k, v in item_metadata.items() if k != "item_identifier" + } + + # create ItemSubmission item_submissions.append( ItemSubmission( batch_id=self.batch_id, item_identifier=item_metadata["item_identifier"], workflow_name=self.workflow_name, + dspace_metadata=dspace_metadata, ) ) From d23cd6e99a3a635ef88dc6f2a404fc66f416a14d Mon Sep 17 00:00:00 2001 From: Jonavelle Cuerdo Date: Thu, 19 Feb 2026 08:40:18 -0500 Subject: [PATCH 6/8] Align all workflows to use a Transformer for generating Dublin Core metadata Why these changes are being introduced: * This allows alignment of DSC workflows such that metadata transformations are handled via a shared module/abstract -- the Transformer class. 
Prior to these changes, some workflows were solely relying on the metadata mapping JSON files. The Transformer class pulls inspiration from the Transmogrifier app (https://github.com/MITLibraries/transmogrifier). This will allow us to consolidate any and all complex transformations inside the Transformer class, while still standardizing the "how" across all DSC workflows. How this addresses that need: * Define transformer modules for ArchivesSpace and SCCS * Translate mapping from metadata mapping JSON files to class methods on Transformer classes * Add 'metadata_transformer' property to base Workflow class Side effects of this change: * This allows deprecation of metadata mapping JSON files. Relevant ticket(s): * TBD --- dsc/workflows/archivesspace/__init__.py | 3 +- dsc/workflows/archivesspace/transformer.py | 20 +++++++++ dsc/workflows/archivesspace/workflow.py | 5 +++ dsc/workflows/base/workflow.py | 6 +++ dsc/workflows/opencourseware/workflow.py | 11 ++++- dsc/workflows/sccs/__init__.py | 3 +- dsc/workflows/sccs/transformer.py | 32 +++++++++++++++ dsc/workflows/sccs/workflow.py | 6 +++ dsc/workflows/simple_csv/workflow.py | 47 +++++++++++++++++++--- 9 files changed, 124 insertions(+), 9 deletions(-) create mode 100644 dsc/workflows/archivesspace/transformer.py create mode 100644 dsc/workflows/sccs/transformer.py diff --git a/dsc/workflows/archivesspace/__init__.py b/dsc/workflows/archivesspace/__init__.py index d9e00ff..16ef590 100644 --- a/dsc/workflows/archivesspace/__init__.py +++ b/dsc/workflows/archivesspace/__init__.py @@ -1,3 +1,4 @@ +from dsc.workflows.archivesspace.transformer import ArchivesSpaceTransformer from dsc.workflows.archivesspace.workflow import ArchivesSpace -__all__ = ["ArchivesSpace"] +__all__ = ["ArchivesSpace", "ArchivesSpaceTransformer"] diff --git a/dsc/workflows/archivesspace/transformer.py b/dsc/workflows/archivesspace/transformer.py new file mode 100644 index 0000000..03977e3 --- /dev/null +++ 
b/dsc/workflows/archivesspace/transformer.py @@ -0,0 +1,20 @@ +from collections.abc import Iterable + +from dsc.workflows.base import Transformer + + +class ArchivesSpaceTransformer(Transformer): + """Transformer for ArchivesSpace source metadata.""" + + @classmethod + def dc_contributor_author(cls, source_metadata: dict) -> list[str]: + return source_metadata["author"].split("|") + + @classmethod + def optional_fields(cls) -> Iterable[str]: + return [ + "dc.contributor.author", + "dc.description", + "dc.rights", + "dc.rights.uri", + ] diff --git a/dsc/workflows/archivesspace/workflow.py b/dsc/workflows/archivesspace/workflow.py index 059857f..a5fa279 100644 --- a/dsc/workflows/archivesspace/workflow.py +++ b/dsc/workflows/archivesspace/workflow.py @@ -5,6 +5,7 @@ from dsc.db.models import ItemSubmissionStatus from dsc.item_submission import ItemSubmission +from dsc.workflows.archivesspace import ArchivesSpaceTransformer from dsc.workflows.simple_csv import SimpleCSV logger = logging.getLogger(__name__) @@ -23,6 +24,10 @@ class ArchivesSpace(SimpleCSV): workflow_name: str = "archivesspace" submission_system: str = "Dome" + @property + def metadata_transformer(self) -> type[ArchivesSpaceTransformer]: + return ArchivesSpaceTransformer + @property def metadata_mapping_path(self) -> str: return "dsc/workflows/archivesspace/metadata_mapping.json" diff --git a/dsc/workflows/base/workflow.py b/dsc/workflows/base/workflow.py index 956ff32..debc821 100644 --- a/dsc/workflows/base/workflow.py +++ b/dsc/workflows/base/workflow.py @@ -30,6 +30,7 @@ from mypy_boto3_sqs.type_defs import MessageTypeDef from dsc.reports import Report + from dsc.workflows.base import Transformer logger = logging.getLogger(__name__) CONFIG = Config() @@ -132,6 +133,11 @@ def __init__(self, batch_id: str) -> None: self._batch_bitstream_uris: list[str] | None = None self._batch_dspace_metadata_json_uris: list[str] | None = None + @property + @abstractmethod + def metadata_transformer(self) ->
type[Transformer]: + """Transformer for source metadata.""" + @property @abstractmethod def metadata_mapping_path(self) -> str: diff --git a/dsc/workflows/opencourseware/workflow.py b/dsc/workflows/opencourseware/workflow.py index 36e7e62..4eda4a0 100644 --- a/dsc/workflows/opencourseware/workflow.py +++ b/dsc/workflows/opencourseware/workflow.py @@ -24,7 +24,10 @@ class OpenCourseWare(Workflow): """ workflow_name: str = "opencourseware" - metadata_transformer = OpenCourseWareTransformer + + @property + def metadata_transformer(self) -> type[OpenCourseWareTransformer]: + return OpenCourseWareTransformer @property def metadata_mapping_path(self) -> str: @@ -43,7 +46,7 @@ def get_batch_bitstream_uris(self) -> list[str]: ) def item_metadata_iter(self) -> Iterator[dict[str, Any]]: - """Yield item metadata from metadata JSON file in the zip file. + """Yield transformed metadata from metadata JSON file in the zip file. If the zip file does not include a metadata JSON file (data.json), this method yields a dict containing only the item identifier. @@ -52,6 +55,10 @@ def item_metadata_iter(self) -> Iterator[dict[str, Any]]: NOTE: Item identifiers are retrieved from the filenames of the zip files, which follow the naming format ".zip". + + Yields: + A dict containing the item identifier and Dublin Core metadata + for DSpace. 
""" for file in self.batch_bitstream_uris: try: diff --git a/dsc/workflows/sccs/__init__.py b/dsc/workflows/sccs/__init__.py index 913604d..3b5bd23 100644 --- a/dsc/workflows/sccs/__init__.py +++ b/dsc/workflows/sccs/__init__.py @@ -1,3 +1,4 @@ +from dsc.workflows.sccs.transformer import SCCSTransformer from dsc.workflows.sccs.workflow import SCCS -__all__ = ["SCCS"] +__all__ = ["SCCS", "SCCSTransformer"] diff --git a/dsc/workflows/sccs/transformer.py b/dsc/workflows/sccs/transformer.py new file mode 100644 index 0000000..eb7167b --- /dev/null +++ b/dsc/workflows/sccs/transformer.py @@ -0,0 +1,32 @@ +from collections.abc import Iterable + +from dsc.workflows.base import Transformer + + +class SCCSTransformer(Transformer): + """Transformer for SCCS source metadata.""" + + @classmethod + def optional_fields(cls) -> Iterable[str]: + return [ + "dc.publisher", + "dc.identifier.mitlicense", + "dc.eprint.version", + "dc.type", + "dc.type.uri", + "dc.source", + "dc.contributor.author", + "dc.contributor.department", + "dc.relation.isversionof", + "dc.relation.journal", + "dc.identifier.issn", + "dc.date.submitted", + "dc.rights", + "dc.rights.uri", + "dc.description", + "dc.description.sponsorship", + ] + + @classmethod + def dc_contributor_author(cls, source_metadata: dict) -> list[str]: + return source_metadata["dc.contributor.author"].split("|") diff --git a/dsc/workflows/sccs/workflow.py b/dsc/workflows/sccs/workflow.py index 363c364..2e1540b 100644 --- a/dsc/workflows/sccs/workflow.py +++ b/dsc/workflows/sccs/workflow.py @@ -1,3 +1,4 @@ +from dsc.workflows.sccs import SCCSTransformer from dsc.workflows.simple_csv import SimpleCSV @@ -11,6 +12,11 @@ class SCCS(SimpleCSV): workflow_name: str = "sccs" + @property + def metadata_transformer(self) -> type[SCCSTransformer]: + """Transformer for source metadata.""" + return SCCSTransformer + @property def metadata_mapping_path(self) -> str: return "dsc/workflows/sccs/metadata_mapping.json" diff --git 
a/dsc/workflows/simple_csv/workflow.py b/dsc/workflows/simple_csv/workflow.py index 74bf706..8d46635 100644 --- a/dsc/workflows/simple_csv/workflow.py +++ b/dsc/workflows/simple_csv/workflow.py @@ -1,4 +1,5 @@ import logging +from abc import abstractmethod from collections.abc import Iterator import numpy as np @@ -8,7 +9,7 @@ from dsc.exceptions import ItemBitstreamsNotFoundError from dsc.item_submission import ItemSubmission from dsc.utilities.aws import S3Client -from dsc.workflows.base import Workflow +from dsc.workflows.base import Transformer, Workflow logger = logging.getLogger(__name__) @@ -22,6 +23,11 @@ class SimpleCSV(Workflow): workflow_name: str = "simple-csv" + @property + @abstractmethod + def metadata_transformer(self) -> type[Transformer]: + """Transformer for source metadata.""" + @property def item_identifier_column_names(self) -> list[str]: return ["item_identifier"] @@ -36,14 +42,25 @@ def get_batch_bitstream_uris(self) -> list[str]: ) def item_metadata_iter(self, metadata_file: str = "metadata.csv") -> Iterator[dict]: - """Yield dicts of item metadata from metadata CSV file. + """Yield transformed metadata from metadata CSV file. + + This method will read rows from the metadata CSV file then call + self.metadata_transformer.transform to generate Dublin Core + metadata for DSpace. A dict where keys = dc.element.qualifier and + value = transformed/mapped value is returned. For logging purposes, + the dict includes an entry for 'item_identifier'. + + If self.metadata_transformer.transform returns None (i.e., + something went wrong with transformation), the yielded dict will + only include the 'item_identifier'. Args: metadata_file: Metadata CSV filename with the filename extension (.csv) included. Defaults to 'metadata.csv'. Yields: - Item metadata. + A dict containing the item identifier and Dublin Core metadata + for DSpace. 
""" with smart_open.open( f"s3://{self.s3_bucket}/{self.batch_path}{metadata_file}", @@ -74,11 +91,25 @@ def item_metadata_iter(self, metadata_file: str = "metadata.csv") -> Iterator[di for _, row in metadata_df.iterrows(): # replace all NaN values with None - yield { + source_metadata = { k: (None if isinstance(v, float) and np.isnan(v) else v) for k, v in row.items() } + transformed_metadata = self.metadata_transformer.transform( + source_metadata + ) + + if transformed_metadata: + yield { + "item_identifier": source_metadata["item_identifier"], + **transformed_metadata, + } + else: + yield { + "item_identifier": source_metadata["item_identifier"], + } + def _prepare_batch( self, *, @@ -113,12 +144,18 @@ def _prepare_batch( ) continue - # if item submission has associated bitstreams, save init params + # copy transformed metadata, excluding 'item_identifier' + dspace_metadata = { + k: v for k, v in item_metadata.items() if k != "item_identifier" + } + + # create ItemSubmission item_submissions.append( ItemSubmission( batch_id=self.batch_id, item_identifier=item_metadata["item_identifier"], workflow_name=self.workflow_name, + dspace_metadata=dspace_metadata, ) ) From 2aaaf2c469ab23d3cef9c72737e503db40d6a76b Mon Sep 17 00:00:00 2001 From: Jonavelle Cuerdo Date: Wed, 18 Feb 2026 15:28:37 -0500 Subject: [PATCH 7/8] Update tests --- tests/conftest.py | 173 ++++++---------- tests/fixtures/workflows/__init__.py | 7 + .../fixtures/workflows/localtest/__init__.py | 9 + .../workflows/localtest/transformer.py | 23 +++ .../fixtures/workflows/localtest/workflow.py | 92 +++++++++ tests/test_cli.py | 24 ++- tests/test_item_submission.py | 58 ++---- tests/test_transformer_opencourseware.py | 91 ++++++--- tests/test_workflow_base.py | 68 ++++-- tests/test_workflow_opencourseware.py | 193 +++++++++--------- tests/test_workflow_simple_csv.py | 33 ++- 11 files changed, 453 insertions(+), 318 deletions(-) create mode 100644 tests/fixtures/workflows/__init__.py create mode 100644 
tests/fixtures/workflows/localtest/__init__.py create mode 100644 tests/fixtures/workflows/localtest/transformer.py create mode 100644 tests/fixtures/workflows/localtest/workflow.py diff --git a/tests/conftest.py b/tests/conftest.py index 29e495d..8ffd83a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -18,91 +18,18 @@ from dsc.utilities.aws.s3 import S3Client from dsc.utilities.aws.ses import SESClient from dsc.utilities.aws.sqs import SQSClient -from dsc.workflows import ArchivesSpace, OpenCourseWare, SimpleCSV, Workflow +from dsc.workflows.archivesspace import ArchivesSpace +from dsc.workflows.opencourseware import OpenCourseWareTransformer +from tests.fixtures.workflows import TestOpenCourseWare, TestSimpleCSV, TestWorkflow +######################### +# Test Workflow classes +######################### -# Test Workflow classes ###################### -class TestWorkflow(Workflow): - workflow_name: str = "test" - submission_system: str = "Test@MIT" - - @property - def metadata_mapping_path(self) -> str: - return "tests/fixtures/test_metadata_mapping.json" - - @property - def output_queue(self) -> str: - return "mock-output-queue" - - def get_batch_bitstream_uris(self) -> list[str]: - return [ - "s3://dsc/test/batch-aaa/123_01.pdf", - "s3://dsc/test/batch-aaa/123_02.pdf", - "s3://dsc/test/batch-aaa/789_01.pdf", - ] - - def item_metadata_iter(self): - yield from [ - { - "title": "Title", - "contributor": "Author 1|Author 2", - "item_identifier": "123", - }, - { - "title": "2nd Title", - "contributor": "Author 3|Author 4", - "item_identifier": "789", - }, - ] - - def _prepare_batch(self, *, synced: bool = False): # noqa: ARG002 - return ( - [ - ItemSubmission( - batch_id="batch-aaa", - item_identifier="123", - workflow_name="test", - ), - ItemSubmission( - batch_id="batch-aaa", - item_identifier="789", - workflow_name="test", - ), - ], - [], - ) - - -class TestOpenCourseWare(OpenCourseWare): - - @property - def output_queue(self) -> str: - return 
"mock-output-queue" - - -class TestSimpleCSV(SimpleCSV): - - workflow_name = "simple-csv" - submission_system: str = "Test@MIT" - - @property - def metadata_mapping_path(self) -> str: - return "tests/fixtures/test_metadata_mapping.json" - - @property - def item_identifier_column_names(self) -> list[str]: - return ["item_identifier", "filename"] - - @property - def output_queue(self) -> str: - return "mock-output-queue" - - -# Test Workflow instances #################### @pytest.fixture @freeze_time("2025-01-01 09:00:00") -def base_workflow_instance(item_metadata, metadata_mapping, mocked_s3): +def base_workflow_instance(metadata_mapping, mocked_s3): return TestWorkflow(batch_id="batch-aaa") @@ -122,7 +49,7 @@ def opencourseware_workflow_instance(): return TestOpenCourseWare(batch_id="batch-aaa") -# Test fixtures ############################## +# Test fixtures @pytest.fixture(autouse=True) def _test_env(monkeypatch): monkeypatch.setenv("SENTRY_DSN", "None") @@ -155,44 +82,44 @@ def config_instance(): @pytest.fixture -def dspace_metadata(): +def item_submission_source_metadata(): return { - "metadata": [ - { - "key": "dc.title", - "language": "en_US", - "value": "Title", - }, - { - "key": "dc.contributor", - "language": None, - "value": "Author 1", - }, - { - "key": "dc.contributor", - "language": None, - "value": "Author 2", - }, - ] + "title": "Title", + "date": 2026, + "contributor": "Author 1|Author 2", + "item_identifier": "123", } @pytest.fixture -def item_metadata(): +def item_submission_dspace_metadata(): return { - "title": "Title", - "contributor": "Author 1|Author 2", "item_identifier": "123", + "dc.title": "Title", + "dc.date.issued": "2026", + "dc.contributor.author": ["Author 1", "Author 2"], } @pytest.fixture -def item_submission_instance(dspace_metadata): +def item_submission_dspace_metadataentry(): + return { + "metadata": [ + {"key": "dc.title", "value": "Title"}, + {"key": "dc.date.issued", "value": "2026"}, + {"key": "dc.contributor.author", 
"value": "Author 1"}, + {"key": "dc.contributor.author", "value": "Author 2"}, + ] + } + + +@pytest.fixture +def item_submission_instance(item_submission_dspace_metadata): return ItemSubmission( batch_id="batch-aaa", item_identifier="123", workflow_name="test", - dspace_metadata=dspace_metadata, + dspace_metadata=item_submission_dspace_metadata, bitstream_s3_uris=[ "s3://dsc/workflow/folder/123_01.pdf", "s3://dsc/workflow/folder/123_02.pdf", @@ -241,13 +168,13 @@ def mocked_s3(config_instance): @pytest.fixture -def mocked_s3_simple_csv(mocked_s3, item_metadata): +def mocked_s3_simple_csv(mocked_s3, item_submission_source_metadata): # write in-memory metadata CSV file csv_buffer = StringIO() - fieldnames = item_metadata.keys() + fieldnames = item_submission_source_metadata.keys() writer = csv.DictWriter(csv_buffer, fieldnames=fieldnames) writer.writeheader() - writer.writerows([item_metadata]) + writer.writerows([item_submission_source_metadata]) # seek to the beginning of the in-memory file before uploading csv_buffer.seek(0) @@ -284,15 +211,6 @@ def mocked_sqs_output(): yield sqs -@pytest.fixture -def opencourseware_source_metadata(): - with ( - zipfile.ZipFile("tests/fixtures/opencourseware/123.zip", "r") as zip_file, - zip_file.open("data.json") as file, - ): - return json.load(file) - - @pytest.fixture def result_message_attributes(): return { @@ -396,3 +314,26 @@ def submission_message_body(): ], } ) + + +######################### +# OpenCourseWare fixtures +######################### + + +@pytest.fixture +def opencourseware_itemsubmission_source_metadata(): + with ( + zipfile.ZipFile("tests/fixtures/opencourseware/123.zip", "r") as zip_file, + zip_file.open("data.json") as file, + ): + return json.load(file) + + +@pytest.fixture +def opencourseware_itemsubmission_dspace_metadata( + opencourseware_itemsubmission_source_metadata, +): + return OpenCourseWareTransformer.transform( + opencourseware_itemsubmission_source_metadata + ) diff --git 
a/tests/fixtures/workflows/__init__.py b/tests/fixtures/workflows/__init__.py new file mode 100644 index 0000000..7f4c731 --- /dev/null +++ b/tests/fixtures/workflows/__init__.py @@ -0,0 +1,7 @@ +from tests.fixtures.workflows.localtest import ( + TestOpenCourseWare, + TestSimpleCSV, + TestWorkflow, +) + +__all__ = ["TestOpenCourseWare", "TestSimpleCSV", "TestWorkflow"] diff --git a/tests/fixtures/workflows/localtest/__init__.py b/tests/fixtures/workflows/localtest/__init__.py new file mode 100644 index 0000000..e98aad8 --- /dev/null +++ b/tests/fixtures/workflows/localtest/__init__.py @@ -0,0 +1,9 @@ +"""test package.""" + +from tests.fixtures.workflows.localtest.workflow import ( + TestOpenCourseWare, + TestSimpleCSV, + TestWorkflow, +) + +__all__ = ["TestOpenCourseWare", "TestSimpleCSV", "TestWorkflow"] diff --git a/tests/fixtures/workflows/localtest/transformer.py b/tests/fixtures/workflows/localtest/transformer.py new file mode 100644 index 0000000..14c7a80 --- /dev/null +++ b/tests/fixtures/workflows/localtest/transformer.py @@ -0,0 +1,23 @@ +from collections.abc import Iterable + +from dsc.workflows.base import Transformer + + +class TestTransformer(Transformer): + + @classmethod + def dc_title(cls, source_metadata) -> str: + return source_metadata["title"] + + @classmethod + def dc_date_issued(cls, source_metadata: dict) -> str: + return source_metadata["date"] + + @classmethod + def dc_contributor_author(cls, source_metadata: dict) -> list[str]: + return source_metadata["contributor"].split("|") + + @classmethod + def optional_fields(cls) -> Iterable[str]: + """Dublin Core metadata fields.""" + return ["dc.contributor.author"] diff --git a/tests/fixtures/workflows/localtest/workflow.py b/tests/fixtures/workflows/localtest/workflow.py new file mode 100644 index 0000000..246864e --- /dev/null +++ b/tests/fixtures/workflows/localtest/workflow.py @@ -0,0 +1,92 @@ +from dsc.item_submission import ItemSubmission +from dsc.workflows.base import Workflow +from 
dsc.workflows.opencourseware import OpenCourseWare +from dsc.workflows.simple_csv import SimpleCSV +from tests.fixtures.workflows.localtest.transformer import TestTransformer + + +class TestWorkflow(Workflow): + + workflow_name: str = "test" + submission_system: str = "Test@MIT" + + @property + def metadata_transformer(self) -> type[TestTransformer]: + return TestTransformer + + @property + def metadata_mapping_path(self) -> str: + return "tests/fixtures/test_metadata_mapping.json" + + @property + def output_queue(self) -> str: + return "mock-output-queue" + + def get_batch_bitstream_uris(self) -> list[str]: + return [ + "s3://dsc/test/batch-aaa/123_01.pdf", + "s3://dsc/test/batch-aaa/123_02.pdf", + "s3://dsc/test/batch-aaa/789_01.pdf", + ] + + def item_metadata_iter(self): + yield from [ + { + "title": "Title", + "contributor": "Author 1|Author 2", + "item_identifier": "123", + }, + { + "title": "2nd Title", + "contributor": "Author 3|Author 4", + "item_identifier": "789", + }, + ] + + def _prepare_batch(self, *, synced: bool = False): # noqa: ARG002 + return ( + [ + ItemSubmission( + batch_id="batch-aaa", + item_identifier="123", + workflow_name="test", + ), + ItemSubmission( + batch_id="batch-aaa", + item_identifier="789", + workflow_name="test", + ), + ], + [], + ) + + +class TestOpenCourseWare(OpenCourseWare): + + submission_system: str = "Test@MIT" + + @property + def output_queue(self) -> str: + return "mock-output-queue" + + +class TestSimpleCSV(SimpleCSV): + + workflow_name = "simple-csv" + submission_system: str = "Test@MIT" + + @property + def metadata_transformer(self) -> type[TestTransformer]: + return TestTransformer + + @property + def metadata_mapping_path(self) -> str: + return "tests/fixtures/test_metadata_mapping.json" + + @property + def item_identifier_column_names(self) -> list[str]: + return ["item_identifier", "filename"] + + @property + def output_queue(self) -> str: + return "mock-output-queue" diff --git a/tests/test_cli.py 
b/tests/test_cli.py index 1f1aef6..bec8cd4 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -122,9 +122,25 @@ def test_submit_success( s3_client, ): caplog.set_level("DEBUG") + + # mock upload bitstreams s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_001.pdf") s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_002.jpg") s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/789_001.pdf") + + # mock upload dspace metadata json + s3_client.put_file( + file_content="", + bucket="dsc", + key="test/batch-aaa/dspace_metadata/123_metadata.json", + ) + s3_client.put_file( + file_content="", + bucket="dsc", + key="test/batch-aaa/dspace_metadata/789_metadata.json", + ) + + # mock current state of Dynamodb table ItemSubmissionDB( item_identifier="123", batch_id="batch-aaa", @@ -161,15 +177,7 @@ def test_submit_success( "for batch 'batch-aaa'" ) in caplog.text assert "Preparing submission for item: 123" in caplog.text - assert ( - "Metadata uploaded to S3: s3://dsc/test/batch-aaa/dspace_metadata/123_metadata.json" - in caplog.text - ) assert "Preparing submission for item: 789" in caplog.text - assert ( - "Metadata uploaded to S3: s3://dsc/test/batch-aaa/dspace_metadata/789_metadata.json" - in caplog.text - ) assert json.dumps(expected_submission_summary) in caplog.text assert "Application exiting" in caplog.text assert "Total time elapsed" in caplog.text diff --git a/tests/test_item_submission.py b/tests/test_item_submission.py index dae6e80..ecf7b93 100644 --- a/tests/test_item_submission.py +++ b/tests/test_item_submission.py @@ -7,15 +7,15 @@ from dsc.db.models import ItemSubmissionDB, ItemSubmissionStatus from dsc.exceptions import ( DSpaceMetadataUploadError, - InvalidDSpaceMetadataError, - ItemMetadatMissingRequiredFieldError, SQSMessageSendError, ) from dsc.item_submission import ItemSubmission -def test_itemsubmission_init_success(item_submission_instance, dspace_metadata): - assert 
item_submission_instance.dspace_metadata == dspace_metadata +def test_itemsubmission_init_success( + item_submission_instance, item_submission_dspace_metadata +): + assert item_submission_instance.dspace_metadata == item_submission_dspace_metadata assert item_submission_instance.bitstream_s3_uris == [ "s3://dsc/workflow/folder/123_01.pdf", "s3://dsc/workflow/folder/123_02.pdf", @@ -155,65 +155,41 @@ def test_itemsubmission_ready_to_submit_with_ingest_failed(item_submission_insta assert item_submission_instance.ready_to_submit() is True -def test_itemsubmission_create_dspace_metadata_success( - item_submission_instance, item_metadata, metadata_mapping +def test_itemsubmission_create_dspace_metadataentry_success( + item_submission_instance, ): - item_metadata["topics"] = [ + item_submission_instance.dspace_metadata["dc.subject"] = [ "Topic Header - Topic Subheading - Topic Name", "Topic Header 2 - Topic Subheading 2 - Topic Name 2", ] - item_submission_instance.create_dspace_metadata(item_metadata, metadata_mapping) - assert item_submission_instance.dspace_metadata == { + assert item_submission_instance._create_dspace_metadataentry() == { # noqa: SLF001 "metadata": [ - {"key": "dc.title", "language": "en_US", "value": "Title"}, - {"key": "dc.contributor", "language": None, "value": "Author 1"}, - {"key": "dc.contributor", "language": None, "value": "Author 2"}, + {"key": "dc.title", "value": "Title"}, + {"key": "dc.date.issued", "value": "2026"}, + {"key": "dc.contributor.author", "value": "Author 1"}, + {"key": "dc.contributor.author", "value": "Author 2"}, { "key": "dc.subject", - "language": None, "value": "Topic Header - Topic Subheading - Topic Name", }, { "key": "dc.subject", - "language": None, "value": "Topic Header 2 - Topic Subheading 2 - Topic Name 2", }, ] } -def test_itemsubmission_create_dspace_metadata_required_field_missing_raises_exception( - item_submission_instance, item_metadata, metadata_mapping -): - item_metadata.pop("title") - with 
pytest.raises(ItemMetadatMissingRequiredFieldError): - item_submission_instance.create_dspace_metadata(item_metadata, metadata_mapping) - - -def test_itemsubmission_validate_dspace_metadata_success( - item_submission_instance, - dspace_metadata, -): - item_submission_instance.dspace_metadata = dspace_metadata - assert item_submission_instance.validate_dspace_metadata() - - -def test_itemsubmission_validate_dspace_metadata_invalid_raises_exception( - item_submission_instance, -): - item_submission_instance.dspace_metadata = {} - with pytest.raises(InvalidDSpaceMetadataError): - item_submission_instance.validate_dspace_metadata() - - def test_itemsubmission_upload_dspace_metadata_success( mocked_s3, item_submission_instance, s3_client ): - item_submission_instance.upload_dspace_metadata("dsc", "workflow/folder/") assert ( - item_submission_instance.metadata_s3_uri + item_submission_instance._upload_dspace_metadata( # noqa: SLF001 + "dsc", "workflow/folder/" + ) == "s3://dsc/workflow/folder/dspace_metadata/123_metadata.json" ) + response = s3_client.client.get_object( Bucket="dsc", Key="workflow/folder/dspace_metadata/123_metadata.json" ) @@ -236,7 +212,7 @@ def test_itemsubmission_upload_dspace_metadata_raises_custom_exception( with pytest.raises( DSpaceMetadataUploadError, match="The specified bucket does not exist" ): - item_submission_instance.upload_dspace_metadata( + item_submission_instance._upload_dspace_metadata( # noqa: SLF001 bucket="dsc", prefix="test/batch-bbb/" ) diff --git a/tests/test_transformer_opencourseware.py b/tests/test_transformer_opencourseware.py index 8992f40..fabec28 100644 --- a/tests/test_transformer_opencourseware.py +++ b/tests/test_transformer_opencourseware.py @@ -1,8 +1,10 @@ from dsc.workflows.opencourseware import OpenCourseWareTransformer -def test_opencourseware_transform_success(opencourseware_source_metadata): - assert OpenCourseWareTransformer.transform(opencourseware_source_metadata) == { +def 
test_opencourseware_transform_success(opencourseware_itemsubmission_source_metadata): + assert OpenCourseWareTransformer.transform( + opencourseware_itemsubmission_source_metadata + ) == { "dc.title": "14.02 Principles of Macroeconomics, Fall 2004", "dc.date.issued": "2004", "dc.description.abstract": ( @@ -36,49 +38,60 @@ def test_opencourseware_transform_success(opencourseware_source_metadata): } -def test_opencourseware_dc_title_success(opencourseware_source_metadata): - assert OpenCourseWareTransformer.dc_title(opencourseware_source_metadata) == ( - "14.02 Principles of Macroeconomics, Fall 2004" - ) +def test_opencourseware_dc_title_success(opencourseware_itemsubmission_source_metadata): + assert OpenCourseWareTransformer.dc_title( + opencourseware_itemsubmission_source_metadata + ) == ("14.02 Principles of Macroeconomics, Fall 2004") def test_opencourseware_dc_title_if_multi_extra_course_numbers_success( - opencourseware_source_metadata, + opencourseware_itemsubmission_source_metadata, ): - opencourseware_source_metadata["extra_course_numbers"] = "14.027J,14.006" - - assert OpenCourseWareTransformer.dc_title(opencourseware_source_metadata) == ( - "14.02 / 14.027J / 14.006 Principles of Macroeconomics, Fall 2004" + opencourseware_itemsubmission_source_metadata["extra_course_numbers"] = ( + "14.027J,14.006" ) + assert OpenCourseWareTransformer.dc_title( + opencourseware_itemsubmission_source_metadata + ) == ("14.02 / 14.027J / 14.006 Principles of Macroeconomics, Fall 2004") + -def test_opencourseware_dc_date_issued_success(opencourseware_source_metadata): +def test_opencourseware_dc_date_issued_success( + opencourseware_itemsubmission_source_metadata, +): assert ( - OpenCourseWareTransformer.dc_date_issued(opencourseware_source_metadata) == "2004" + OpenCourseWareTransformer.dc_date_issued( + opencourseware_itemsubmission_source_metadata + ) + == "2004" ) -def test_opencourseware_dc_description_abstract(opencourseware_source_metadata): +def 
test_opencourseware_dc_description_abstract( + opencourseware_itemsubmission_source_metadata, +): assert isinstance( - OpenCourseWareTransformer.dc_description_abstract(opencourseware_source_metadata), + OpenCourseWareTransformer.dc_description_abstract( + opencourseware_itemsubmission_source_metadata + ), str, ) def test_opencourseware_dc_contributor_author_success( - opencourseware_source_metadata, + opencourseware_itemsubmission_source_metadata, ): assert OpenCourseWareTransformer.dc_contributor_author( - opencourseware_source_metadata + opencourseware_itemsubmission_source_metadata ) == ["Caballero, Ricardo"] def test_opencourseware_dc_contributor_author_if_any_names_empty_success( - opencourseware_source_metadata, + opencourseware_itemsubmission_source_metadata, ): # the first four entries in the list below result in an empty name ("") # only the last entry is included - opencourseware_source_metadata["instructors"].extend( + opencourseware_itemsubmission_source_metadata["instructors"].extend( [ {}, # all fields missing {"middle_initial": "E."}, # all required fields missing @@ -88,49 +101,61 @@ def test_opencourseware_dc_contributor_author_if_any_names_empty_success( ] ) assert OpenCourseWareTransformer.dc_contributor_author( - opencourseware_source_metadata + opencourseware_itemsubmission_source_metadata ) == [ "Caballero, Ricardo", "Burger, Cheese E.", ] -def test_opencourseware_dc_contributor_department_success(opencourseware_source_metadata): - assert opencourseware_source_metadata["department_numbers"] == ["14"] +def test_opencourseware_dc_contributor_department_success( + opencourseware_itemsubmission_source_metadata, +): + assert opencourseware_itemsubmission_source_metadata["department_numbers"] == ["14"] assert OpenCourseWareTransformer.dc_contributor_department( - opencourseware_source_metadata + opencourseware_itemsubmission_source_metadata ) == ["Massachusetts Institute of Technology. 
Department of Economics"] def test_opencourseware_creativework_learningresourcetype_success( - opencourseware_source_metadata, + opencourseware_itemsubmission_source_metadata, ): assert OpenCourseWareTransformer.creativework_learningresourcetype( - opencourseware_source_metadata + opencourseware_itemsubmission_source_metadata ) == ["Problem Sets with Solutions", "Exams with Solutions", "Lecture Notes"] -def test_opencourseware_dc_subject_success(opencourseware_source_metadata): - assert OpenCourseWareTransformer.dc_subject(opencourseware_source_metadata) == [ +def test_opencourseware_dc_subject_success(opencourseware_itemsubmission_source_metadata): + assert OpenCourseWareTransformer.dc_subject( + opencourseware_itemsubmission_source_metadata + ) == [ "Social Science - Economics - International Economics", "Social Science - Economics - Macroeconomics", ] -def test_opencourseware_dc_identifier_other_success(opencourseware_source_metadata): +def test_opencourseware_dc_identifier_other_success( + opencourseware_itemsubmission_source_metadata, +): assert OpenCourseWareTransformer.dc_identifier_other( - opencourseware_source_metadata + opencourseware_itemsubmission_source_metadata ) == ["14.02", "14.02-Fall2004"] -def test_opencourseware_dc_coverage_temporal_success(opencourseware_source_metadata): +def test_opencourseware_dc_coverage_temporal_success( + opencourseware_itemsubmission_source_metadata, +): assert ( - OpenCourseWareTransformer.dc_coverage_temporal(opencourseware_source_metadata) + OpenCourseWareTransformer.dc_coverage_temporal( + opencourseware_itemsubmission_source_metadata + ) == "Fall 2004" ) -def test_opencourseware_dc_audience_educationlevel(opencourseware_source_metadata): +def test_opencourseware_dc_audience_educationlevel( + opencourseware_itemsubmission_source_metadata, +): assert OpenCourseWareTransformer.dc_audience_educationlevel( - opencourseware_source_metadata + opencourseware_itemsubmission_source_metadata ) == ["Undergraduate"] diff --git 
a/tests/test_workflow_base.py b/tests/test_workflow_base.py index 36c8fa8..a9f7eae 100644 --- a/tests/test_workflow_base.py +++ b/tests/test_workflow_base.py @@ -3,13 +3,10 @@ from unittest.mock import patch import pytest -from botocore.exceptions import ClientError from freezegun import freeze_time from dsc.db.models import ItemSubmissionDB, ItemSubmissionStatus -from dsc.exceptions import ( - InvalidWorkflowNameError, -) +from dsc.exceptions import InvalidWorkflowNameError, SQSMessageSendError from dsc.reports import FinalizeReport from dsc.workflows.base import Workflow @@ -19,10 +16,6 @@ def test_base_workflow_init_with_defaults_success(): workflow_instance = workflow_class(batch_id="batch-aaa") assert workflow_instance.workflow_name == "test" assert workflow_instance.submission_system == "Test@MIT" - assert ( - workflow_instance.metadata_mapping_path - == "tests/fixtures/test_metadata_mapping.json" - ) assert workflow_instance.batch_id == "batch-aaa" assert workflow_instance.s3_bucket == "dsc" assert workflow_instance.output_queue == "mock-output-queue" @@ -62,9 +55,25 @@ def test_base_workflow_submit_items_success( mocked_item_submission_db, ): caplog.set_level("DEBUG") + + # mock upload bitstreams s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_01.pdf") s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_02.jpg") s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/789_01.pdf") + + # mock upload dspace metadata json + s3_client.put_file( + file_content="", + bucket="dsc", + key="test/batch-aaa/dspace_metadata/123_metadata.json", + ) + s3_client.put_file( + file_content="", + bucket="dsc", + key="test/batch-aaa/dspace_metadata/789_metadata.json", + ) + + # mock current state of Dynamodb table ItemSubmissionDB( item_identifier="123", batch_id="batch-aaa", @@ -77,6 +86,7 @@ def test_base_workflow_submit_items_success( workflow_name="test", status=ItemSubmissionStatus.BATCH_CREATED, ).create() + 
items = base_workflow_instance.submit_items(collection_handle="123.4/5678") expected_submission_summary = {"total": 2, "submitted": 2, "skipped": 0, "errors": 0} @@ -85,15 +95,30 @@ def test_base_workflow_submit_items_success( assert json.dumps(expected_submission_summary) in caplog.text -def test_base_workflow_submit_items_failed_ready_to_submit_is_skipped( +def test_base_workflow_submit_items_skips_ingested_item( caplog, base_workflow_instance, + s3_client, mocked_s3, mocked_sqs_input, mocked_sqs_output, mocked_item_submission_db, ): caplog.set_level("DEBUG") + + # mock upload dspace metadata json + s3_client.put_file( + file_content="", + bucket="dsc", + key="test/batch-aaa/dspace_metadata/123_metadata.json", + ) + s3_client.put_file( + file_content="", + bucket="dsc", + key="test/batch-aaa/dspace_metadata/789_metadata.json", + ) + + # mock current state of Dynamodb table ItemSubmissionDB( item_identifier="123", batch_id="batch-aaa", @@ -123,6 +148,7 @@ def test_base_workflow_submit_items_exceptions_handled( mocked_method, caplog, base_workflow_instance, + s3_client, mocked_s3, mocked_sqs_input, mocked_sqs_output, @@ -130,17 +156,23 @@ def test_base_workflow_submit_items_exceptions_handled( ): side_effect = [ {"MessageId": "abcd", "ResponseMetadata": {"HTTPStatusCode": 200}}, - ClientError( - { - "Error": { - "Code": "InvalidParameterValue", - "Message": "The specified S3 bucket does not exist.", - } - }, - "SendMessage", - ), + SQSMessageSendError, ] mocked_method.side_effect = side_effect + + # mock upload dspace metadata json + s3_client.put_file( + file_content="", + bucket="dsc", + key="test/batch-aaa/dspace_metadata/123_metadata.json", + ) + s3_client.put_file( + file_content="", + bucket="dsc", + key="test/batch-aaa/dspace_metadata/789_metadata.json", + ) + + # mock current state of Dynamodb table ItemSubmissionDB( item_identifier="123", batch_id="batch-aaa", diff --git a/tests/test_workflow_opencourseware.py b/tests/test_workflow_opencourseware.py index 
ce27046..de69e22 100644 --- a/tests/test_workflow_opencourseware.py +++ b/tests/test_workflow_opencourseware.py @@ -9,11 +9,12 @@ @patch( "dsc.workflows.opencourseware.workflow.OpenCourseWare._read_metadata_from_zip_file" ) -def test_workflow_ocw_metadata_mapping_dspace_metadata_success( +def test_workflow_ocw_prepare_batch_success( mock_opencourseware_read_metadata_from_zip_file, - caplog, + mocked_item_submission_db, mocked_s3, - opencourseware_source_metadata, + opencourseware_itemsubmission_source_metadata, + opencourseware_itemsubmission_dspace_metadata, opencourseware_workflow_instance, s3_client, ): @@ -23,96 +24,31 @@ def test_workflow_ocw_metadata_mapping_dspace_metadata_success( key="opencourseware/batch-aaa/123.zip", ) mock_opencourseware_read_metadata_from_zip_file.return_value = ( - opencourseware_source_metadata + opencourseware_itemsubmission_source_metadata ) - item_submission = ItemSubmission( - batch_id="aaa", item_identifier="123", workflow_name="opencourseware" - ) - item_submission.create_dspace_metadata( - item_metadata=next(opencourseware_workflow_instance.item_metadata_iter()), - metadata_mapping=opencourseware_workflow_instance.metadata_mapping, + assert opencourseware_workflow_instance._prepare_batch() == ( + [ + ItemSubmission( + batch_id="batch-aaa", + item_identifier="123", + workflow_name="opencourseware", + dspace_metadata=opencourseware_itemsubmission_dspace_metadata, + ) + ], + [], ) - assert item_submission.dspace_metadata["metadata"] == [ - { - "key": "dc.title", - "value": "14.02 Principles of Macroeconomics, Fall 2004", - "language": None, - }, - {"key": "dc.date.issued", "value": "2004", "language": None}, - { - "key": "dc.description.abstract", - "value": ( - "This course provides an overview of the following macroeconomic " - "issues: the determination of output, employment, unemployment, " - "interest rates, and inflation. Monetary and fiscal policies are " - "discussed, as are public debt and international economic issues. 
" - "This course also introduces basic models of macroeconomics and " - "illustrates principles with the experience of the United States " - "and other economies.\n" - ), - "language": None, - }, - {"key": "dc.contributor.author", "value": "Caballero, Ricardo", "language": None}, - { - "key": "dc.contributor.department", - "value": "Massachusetts Institute of Technology. Department of Economics", - "language": None, - }, - { - "key": "creativework.learningresourcetype", - "value": "Problem Sets with Solutions", - "language": None, - }, - { - "key": "creativework.learningresourcetype", - "value": "Exams with Solutions", - "language": None, - }, - { - "key": "creativework.learningresourcetype", - "value": "Lecture Notes", - "language": None, - }, - { - "key": "dc.subject", - "value": "Social Science - Economics - International Economics", - "language": None, - }, - { - "key": "dc.subject", - "value": "Social Science - Economics - Macroeconomics", - "language": None, - }, - {"key": "dc.identifier.other", "value": "14.02", "language": None}, - {"key": "dc.identifier.other", "value": "14.02-Fall2004", "language": None}, - {"key": "dc.coverage.temporal", "value": "Fall 2004", "language": None}, - {"key": "dc.audience.educationlevel", "value": "Undergraduate", "language": None}, - {"key": "dc.type", "value": "Learning Object", "language": None}, - { - "key": "dc.rights", - "value": "Attribution-NonCommercial-NoDerivs 4.0 United States", - "language": None, - }, - { - "key": "dc.rights.uri", - "value": "https://creativecommons.org/licenses/by-nc-nd/4.0/deed.en", - "language": None, - }, - {"key": "dc.language.iso", "value": "en_US", "language": None}, - ] - @patch( "dsc.workflows.opencourseware.workflow.OpenCourseWare._read_metadata_from_zip_file" ) -def test_workflow_ocw_prepare_batch_success( +def test_workflow_ocw_itemsubmission_create_dspace_metadataentry_success( mock_opencourseware_read_metadata_from_zip_file, - mocked_item_submission_db, + caplog, mocked_s3, - 
opencourseware_source_metadata, - opencourseware_workflow_instance, + opencourseware_itemsubmission_source_metadata, + opencourseware_itemsubmission_dspace_metadata, s3_client, ): s3_client.put_file( @@ -121,20 +57,83 @@ def test_workflow_ocw_prepare_batch_success( key="opencourseware/batch-aaa/123.zip", ) mock_opencourseware_read_metadata_from_zip_file.return_value = ( - opencourseware_source_metadata + opencourseware_itemsubmission_source_metadata ) - assert opencourseware_workflow_instance._prepare_batch() == ( - [ - ItemSubmission( - batch_id="batch-aaa", - item_identifier="123", - workflow_name="opencourseware", - ) - ], - [], + item_submission = ItemSubmission( + batch_id="aaa", + item_identifier="123", + workflow_name="opencourseware", + dspace_metadata=opencourseware_itemsubmission_dspace_metadata, ) + assert item_submission._create_dspace_metadataentry() == { + "metadata": [ + { + "key": "dc.title", + "value": "14.02 Principles of Macroeconomics, Fall 2004", + }, + {"key": "dc.date.issued", "value": "2004"}, + { + "key": "dc.description.abstract", + "value": ( + "This course provides an overview of the following macroeconomic " + "issues: the determination of output, employment, unemployment, " + "interest rates, and inflation. Monetary and fiscal policies are " + "discussed, as are public debt and international economic issues. " + "This course also introduces basic models of macroeconomics and " + "illustrates principles with the experience of the United States " + "and other economies.\n" + ), + }, + { + "key": "dc.contributor.author", + "value": "Caballero, Ricardo", + }, + { + "key": "dc.contributor.department", + "value": "Massachusetts Institute of Technology. 
Department of Economics", + }, + { + "key": "creativework.learningresourcetype", + "value": "Problem Sets with Solutions", + }, + { + "key": "creativework.learningresourcetype", + "value": "Exams with Solutions", + }, + { + "key": "creativework.learningresourcetype", + "value": "Lecture Notes", + }, + { + "key": "dc.subject", + "value": "Social Science - Economics - International Economics", + }, + { + "key": "dc.subject", + "value": "Social Science - Economics - Macroeconomics", + }, + {"key": "dc.identifier.other", "value": "14.02"}, + {"key": "dc.identifier.other", "value": "14.02-Fall2004"}, + {"key": "dc.coverage.temporal", "value": "Fall 2004"}, + { + "key": "dc.audience.educationlevel", + "value": "Undergraduate", + }, + {"key": "dc.type", "value": "Learning Object"}, + { + "key": "dc.rights", + "value": "Attribution-NonCommercial-NoDerivs 4.0 United States", + }, + { + "key": "dc.rights.uri", + "value": "https://creativecommons.org/licenses/by-nc-nd/4.0/deed.en", + }, + {"key": "dc.language.iso", "value": "en_US"}, + ] + } + @patch( "dsc.workflows.opencourseware.workflow.OpenCourseWare._read_metadata_from_zip_file" @@ -143,7 +142,7 @@ def test_workflow_ocw_item_metadata_iter_success( mock_opencourseware_read_metadata_from_zip_file, caplog, mocked_s3, - opencourseware_source_metadata, + opencourseware_itemsubmission_source_metadata, opencourseware_workflow_instance, s3_client, ): @@ -153,7 +152,7 @@ def test_workflow_ocw_item_metadata_iter_success( key="opencourseware/batch-aaa/123.zip", ) mock_opencourseware_read_metadata_from_zip_file.return_value = ( - opencourseware_source_metadata + opencourseware_itemsubmission_source_metadata ) assert next(opencourseware_workflow_instance.item_metadata_iter()) == { "item_identifier": "123", @@ -192,7 +191,7 @@ def test_workflow_ocw_item_metadata_iter_success( def test_workflow_ocw_read_metadata_from_zip_file_success( mocked_s3, - opencourseware_source_metadata, + opencourseware_itemsubmission_source_metadata, 
opencourseware_workflow_instance, ): """Read source metadata JSON file from test zip file. @@ -211,7 +210,7 @@ def test_workflow_ocw_read_metadata_from_zip_file_success( opencourseware_workflow_instance._read_metadata_from_zip_file( "s3://dsc/opencourseware/batch-aaa/123.zip" ) - == opencourseware_source_metadata + == opencourseware_itemsubmission_source_metadata ) diff --git a/tests/test_workflow_simple_csv.py b/tests/test_workflow_simple_csv.py index 3b471c7..291b23e 100644 --- a/tests/test_workflow_simple_csv.py +++ b/tests/test_workflow_simple_csv.py @@ -26,6 +26,14 @@ def test_workflow_simple_csv_prepare_batch_success( batch_id="batch-aaa", item_identifier="123", workflow_name="simple-csv", + dspace_metadata={ + "dc.contributor.author": [ + "Author 1", + "Author 2", + ], + "dc.date.issued": "2026", + "dc.title": "Title", + }, ) ], [], @@ -43,19 +51,24 @@ def test_workflow_simple_csv_prepare_batch_track_errors( def test_workflow_simple_csv_item_metadata_iter_success( - simple_csv_workflow_instance, mocked_s3_simple_csv, item_metadata + simple_csv_workflow_instance, mocked_s3_simple_csv, item_submission_dspace_metadata ): metadata_iter = simple_csv_workflow_instance.item_metadata_iter( metadata_file="metadata.csv" ) - assert next(metadata_iter) == item_metadata + assert next(metadata_iter) == item_submission_dspace_metadata def test_workflow_simple_csv_item_metadata_iter_processing_success( simple_csv_workflow_instance, mocked_s3 ): metadata_df = pd.DataFrame( - {"filename": ["123.pdf", "456.pdf"], "TITLE": ["Cheeses of the World", ""]} + { + "filename": ["123.pdf", "456.pdf"], + "TITLE": ["Cheeses of the World", ""], + "date": [2026, 2026], + "contributor": ["Author 1", "Author 2"], + } ) # upload to mocked S3 bucket @@ -71,8 +84,18 @@ def test_workflow_simple_csv_item_metadata_iter_processing_success( metadata_file="metadata.csv" ) assert list(metadata_iter) == [ - {"item_identifier": "123.pdf", "title": "Cheeses of the World"}, - {"item_identifier": 
"456.pdf", "title": None}, + { + "item_identifier": "123.pdf", + "dc.title": "Cheeses of the World", + "dc.date.issued": "2026", + "dc.contributor.author": ["Author 1"], + }, + { + "item_identifier": "456.pdf", + "dc.title": None, + "dc.date.issued": "2026", + "dc.contributor.author": ["Author 2"], + }, ] From 6c94f739194fc940a8ebe8db7ddc824b4a2a864c Mon Sep 17 00:00:00 2001 From: Jonavelle Cuerdo Date: Thu, 19 Feb 2026 08:47:22 -0500 Subject: [PATCH 8/8] Deprecate use of metadata mapping JSON --- .../archivesspace/metadata_mapping.json | 20 ------ dsc/workflows/archivesspace/workflow.py | 4 -- dsc/workflows/base/workflow.py | 10 --- .../opencourseware/metadata_mapping.json | 46 ------------- dsc/workflows/opencourseware/workflow.py | 4 -- dsc/workflows/sccs/metadata_mapping.json | 67 ------------------- dsc/workflows/sccs/workflow.py | 4 -- tests/conftest.py | 10 +-- tests/fixtures/test_metadata_mapping.json | 14 ---- .../fixtures/workflows/localtest/workflow.py | 8 --- 10 files changed, 2 insertions(+), 185 deletions(-) delete mode 100644 dsc/workflows/archivesspace/metadata_mapping.json delete mode 100644 dsc/workflows/opencourseware/metadata_mapping.json delete mode 100644 dsc/workflows/sccs/metadata_mapping.json delete mode 100644 tests/fixtures/test_metadata_mapping.json diff --git a/dsc/workflows/archivesspace/metadata_mapping.json b/dsc/workflows/archivesspace/metadata_mapping.json deleted file mode 100644 index 32bfa7b..0000000 --- a/dsc/workflows/archivesspace/metadata_mapping.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "dc.title": { - "source_field_name": "title", - }, - "dc.contributor.author": { - "source_field_name": "author", - "delimiter": "|" - }, - "dc.description": { - "source_field_name": "description", - "language": "en_US" - }, - "dc.rights": { - "source_field_name": "rights_statement", - "language": "en_US" - }, - "dc.rights.uri": { - "source_field_name": "rights_uri" - } -} \ No newline at end of file diff --git 
a/dsc/workflows/archivesspace/workflow.py b/dsc/workflows/archivesspace/workflow.py index a5fa279..dd0bf1d 100644 --- a/dsc/workflows/archivesspace/workflow.py +++ b/dsc/workflows/archivesspace/workflow.py @@ -28,10 +28,6 @@ class ArchivesSpace(SimpleCSV): def metadata_transformer(self) -> type[ArchivesSpaceTransformer]: return ArchivesSpaceTransformer - @property - def metadata_mapping_path(self) -> str: - return "dsc/workflows/archivesspace/metadata_mapping.json" - @property def output_path(self) -> str: return "output-bucket" diff --git a/dsc/workflows/base/workflow.py b/dsc/workflows/base/workflow.py index debc821..df89bbc 100644 --- a/dsc/workflows/base/workflow.py +++ b/dsc/workflows/base/workflow.py @@ -138,16 +138,6 @@ def __init__(self, batch_id: str) -> None: def metadata_transformer(self) -> type[Transformer]: """Transformer for source metadata.""" - @property - @abstractmethod - def metadata_mapping_path(self) -> str: - """Path to the JSON metadata mapping file for the workflow.""" - - @property - def metadata_mapping(self) -> dict: - with open(self.metadata_mapping_path) as mapping_file: - return json.load(mapping_file) - @final @property def s3_bucket(self) -> str: diff --git a/dsc/workflows/opencourseware/metadata_mapping.json b/dsc/workflows/opencourseware/metadata_mapping.json deleted file mode 100644 index 7908b8c..0000000 --- a/dsc/workflows/opencourseware/metadata_mapping.json +++ /dev/null @@ -1,46 +0,0 @@ -{ - "dc.title": { - "source_field_name": "dc.title", - "required": true - }, - "dc.date.issued": { - "source_field_name": "dc.date.issued", - "required": true - }, - "dc.description.abstract": { - "source_field_name": "dc.description.abstract" - }, - "dc.contributor.author": { - "source_field_name": "dc.contributor.author" - }, - "dc.contributor.department": { - "source_field_name": "dc.contributor.department" - }, - "creativework.learningresourcetype": { - "source_field_name": "creativework.learningresourcetype" - }, - "dc.subject": { - 
"source_field_name": "dc.subject" - }, - "dc.identifier.other": { - "source_field_name": "dc.identifier.other" - }, - "dc.coverage.temporal": { - "source_field_name": "dc.coverage.temporal" - }, - "dc.audience.educationlevel": { - "source_field_name": "dc.audience.educationlevel" - }, - "dc.type": { - "source_field_name": "dc.type" - }, - "dc.rights": { - "source_field_name": "dc.rights" - }, - "dc.rights.uri": { - "source_field_name": "dc.rights.uri" - }, - "dc.language.iso": { - "source_field_name": "dc.language.iso" - } -} \ No newline at end of file diff --git a/dsc/workflows/opencourseware/workflow.py b/dsc/workflows/opencourseware/workflow.py index 4eda4a0..5fd0d4c 100644 --- a/dsc/workflows/opencourseware/workflow.py +++ b/dsc/workflows/opencourseware/workflow.py @@ -29,10 +29,6 @@ class OpenCourseWare(Workflow): def metadata_transformer(self) -> type[OpenCourseWareTransformer]: return OpenCourseWareTransformer - @property - def metadata_mapping_path(self) -> str: - return "dsc/workflows/opencourseware/metadata_mapping.json" - def get_batch_bitstream_uris(self) -> list[str]: """Get list of URIs for all zipfiles within the batch folder.""" s3_client = S3Client() diff --git a/dsc/workflows/sccs/metadata_mapping.json b/dsc/workflows/sccs/metadata_mapping.json deleted file mode 100644 index 1a44fc2..0000000 --- a/dsc/workflows/sccs/metadata_mapping.json +++ /dev/null @@ -1,67 +0,0 @@ -{ - "dc.title": { - "source_field_name": "dc.title", - "language": "en_US", - "required": true - }, - "dc.publisher": { - "source_field_name": "dc.publisher", - "language": "en_US" - }, - "dc.identifier.mitlicense": { - "source_field_name": "dc.identifier.mitlicense", - "language": "en_US" - }, - "dc.eprint.version": { - "source_field_name": "dc.eprint.version", - "language": "en_US" - }, - "dc.type": { - "source_field_name": "dc.type", - "language": "en_US" - }, - "dc.type.uri": { - "source_field_name": "dc.type.uri" - }, - "dc.source": { - "source_field_name": "dc.source", - 
"language": "en_US" - }, - "dc.contributor.author": { - "source_field_name": "dc.contributor.author", - "language": "en_US", - "delimiter": "|" - }, - "dc.contributor.department": { - "source_field_name": "dc.contributor.department" - }, - "dc.relation.isversionof": { - "source_field_name": "dc.relation.isversionof" - }, - "dc.relation.journal": { - "source_field_name": "dc.relation.journal" - }, - "dc.identifier.issn": { - "source_field_name": "dc.identifier.issn" - }, - "dc.date.issued": { - "source_field_name": "dc.date.issued" - }, - "dc.date.submitted": { - "source_field_name": "dc.date.submitted" - }, - "dc.rights": { - "source_field_name": "dc.rights", - "language": "en_US" - }, - "dc.rights.uri": { - "source_field_name": "dc.rights.uri" - }, - "dc.description": { - "source_field_name": "dc.description" - }, - "dc.description.sponsorship": { - "source_field_name": "dc.description.sponsorship", - "language": "en_US" - } -} \ No newline at end of file diff --git a/dsc/workflows/sccs/workflow.py b/dsc/workflows/sccs/workflow.py index 2e1540b..7ab4780 100644 --- a/dsc/workflows/sccs/workflow.py +++ b/dsc/workflows/sccs/workflow.py @@ -17,10 +17,6 @@ def metadata_transformer(self) -> type[SCCSTransformer]: """Transformer for source metadata.""" return SCCSTransformer - @property - def metadata_mapping_path(self) -> str: - return "dsc/workflows/sccs/metadata_mapping.json" - @property def item_identifier_column_names(self) -> list[str]: return ["item_identifier", "filename"] diff --git a/tests/conftest.py b/tests/conftest.py index 8ffd83a..ea5f47b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -29,12 +29,12 @@ @pytest.fixture @freeze_time("2025-01-01 09:00:00") -def base_workflow_instance(metadata_mapping, mocked_s3): +def base_workflow_instance(mocked_s3): return TestWorkflow(batch_id="batch-aaa") @pytest.fixture -def simple_csv_workflow_instance(metadata_mapping): +def simple_csv_workflow_instance(): return TestSimpleCSV(batch_id="batch-aaa") @@ 
-127,12 +127,6 @@ def item_submission_instance(item_submission_dspace_metadata): ) -@pytest.fixture -def metadata_mapping(): - with open("tests/fixtures/test_metadata_mapping.json") as mapping_file: - return json.load(mapping_file) - - @pytest.fixture def mocked_item_submission_db(config_instance): with mock_aws(): diff --git a/tests/fixtures/test_metadata_mapping.json b/tests/fixtures/test_metadata_mapping.json deleted file mode 100644 index 303a38d..0000000 --- a/tests/fixtures/test_metadata_mapping.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "dc.title": { - "source_field_name": "title", - "language": "en_US", - "required": true - }, - "dc.contributor": { - "source_field_name": "contributor", - "delimiter": "|" - }, - "dc.subject": { - "source_field_name": "topics" - } -} \ No newline at end of file diff --git a/tests/fixtures/workflows/localtest/workflow.py b/tests/fixtures/workflows/localtest/workflow.py index 246864e..afc7518 100644 --- a/tests/fixtures/workflows/localtest/workflow.py +++ b/tests/fixtures/workflows/localtest/workflow.py @@ -14,10 +14,6 @@ class TestWorkflow(Workflow): def metadata_transformer(self) -> type[TestTransformer]: return TestTransformer - @property - def metadata_mapping_path(self) -> str: - return "tests/fixtures/test_metadata_mapping.json" - @property def output_queue(self) -> str: return "mock-output-queue" @@ -79,10 +75,6 @@ class TestSimpleCSV(SimpleCSV): def metadata_transformer(self) -> type[TestTransformer]: return TestTransformer - @property - def metadata_mapping_path(self) -> str: - return "tests/fixtures/test_metadata_mapping.json" - @property def item_identifier_column_names(self) -> list[str]: return ["item_identifier", "filename"]