diff --git a/dsc/exceptions.py b/dsc/exceptions.py index 0c63a26..45528bf 100644 --- a/dsc/exceptions.py +++ b/dsc/exceptions.py @@ -14,7 +14,7 @@ class InvalidWorkflowNameError(Exception): pass -class ItemMetadatMissingRequiredFieldError(Exception): +class ItemMetadataMissingRequiredFieldError(Exception): pass diff --git a/dsc/item_submission.py b/dsc/item_submission.py index e6613f2..d9c54e0 100644 --- a/dsc/item_submission.py +++ b/dsc/item_submission.py @@ -2,6 +2,7 @@ import json import logging +from collections import defaultdict from dataclasses import dataclass, fields from typing import TYPE_CHECKING, Any @@ -12,8 +13,7 @@ from dsc.db.models import ITEM_SUBMISSION_LOG_STR, ItemSubmissionDB, ItemSubmissionStatus from dsc.exceptions import ( DSpaceMetadataUploadError, - InvalidDSpaceMetadataError, - ItemMetadatMissingRequiredFieldError, + ItemMetadataMissingRequiredFieldError, SQSMessageSendError, ) from dsc.utilities.aws.s3 import S3Client @@ -176,9 +176,9 @@ def save(self) -> None: ) item_submission_db.create() - logger.info(f"Saved record " f"{ITEM_SUBMISSION_LOG_STR.format( - batch_id=self.batch_id, item_identifier=self.item_identifier - )}") + logger.info(f"Saved record {ITEM_SUBMISSION_LOG_STR.format( + batch_id=self.batch_id, item_identifier=self.item_identifier + )}") def upsert_db(self) -> None: """Upsert a record in DynamoDB from ItemSubmission. @@ -195,9 +195,9 @@ def upsert_db(self) -> None: ) item_submission_db.save() - logger.info(f"Upserted record " f"{ITEM_SUBMISSION_LOG_STR.format( - batch_id=self.batch_id, item_identifier=self.item_identifier - )}") + logger.info(f"Upserted record {ITEM_SUBMISSION_LOG_STR.format( + batch_id=self.batch_id, item_identifier=self.item_identifier + )}") def ready_to_submit(self) -> bool: """Check if the item submission is ready to be submitted.""" @@ -209,32 +209,31 @@ def ready_to_submit(self) -> bool: case ItemSubmissionStatus.INGEST_SUCCESS: logger.info( f"Record {ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id, - item_identifier=self.item_identifier) - } " "already ingested, skipping submission" + item_identifier=self.item_identifier)} " + "already ingested, skipping submission" ) case ItemSubmissionStatus.SUBMIT_SUCCESS: logger.info( - f"Record " f"{ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id, - item_identifier=self.item_identifier) - } " " already submitted, skipping submission" + f"Record {ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id, + item_identifier=self.item_identifier)} " + "already submitted, skipping submission" ) case ItemSubmissionStatus.MAX_RETRIES_REACHED: logger.info( - f"Record " f"{ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id, - item_identifier=self.item_identifier) - } " "max retries reached, skipping submission" + f"Record {ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id, + item_identifier=self.item_identifier)} " + "max retries reached, skipping submission" ) case None: logger.info( - f"Record " f"{ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id, - item_identifier=self.item_identifier) - } " " status unknown, skipping submission" + f"Record {ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id, + item_identifier=self.item_identifier)} " + "status unknown, skipping submission" ) case _: logger.debug( - f"Record " f"{ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id, - item_identifier=self.item_identifier) - } " "allowed for submission" + f"Record {ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id, + item_identifier=self.item_identifier)} allowed for
submission" ) ready_to_submit = True @@ -248,93 +247,48 @@ def _exceeded_retry_threshold(self) -> None: if self.ingest_attempts > CONFIG.retry_threshold: self.status = ItemSubmissionStatus.MAX_RETRIES_REACHED - def prepare_dspace_metadata( - self, metadata_mapping: dict, item_metadata: dict, s3_bucket: str, batch_path: str - ) -> None: + def prepare_dspace_metadata(self, s3_bucket: str, batch_path: str) -> str: """Prepare DSpace metadata for the item submission.""" - self.create_dspace_metadata( - item_metadata=item_metadata, - metadata_mapping=metadata_mapping, - ) - self.validate_dspace_metadata() - self.upload_dspace_metadata(bucket=s3_bucket, prefix=batch_path) - - def create_dspace_metadata( - self, item_metadata: dict[str, Any], metadata_mapping: dict - ) -> None: - """Create DSpace metadata from the item's source metadata. - - A metadata mapping is a dict with the format seen below: - - { - "dc.contributor": { - "source_field_name": "contributor", - "language": "", - "delimiter": "", - "required": true | false - } - } - - When setting up the metadata mapping JSON file, "language" and "delimiter" - can be omitted from the file if not applicable. Required fields ("item_identifier" - and "title") must be set as required (true); if "required" is not listed as a - a config, the field defaults as not required (false). + self.dspace_metadata = self._create_dspace_metadataentry() + return self._upload_dspace_metadata(bucket=s3_bucket, prefix=batch_path) - Args: - item_metadata: Item metadata from which the DSpace metadata will be derived. - metadata_mapping: A mapping of DSpace metadata fields to source metadata - fields. - """ - metadata_entries = [] - for field_name, field_mapping in metadata_mapping.items(): - if field_name not in ["item_identifier"]: - - field_value = item_metadata.get(field_mapping["source_field_name"]) - if not field_value and field_mapping.get("required", False): - raise ItemMetadatMissingRequiredFieldError( - "Item metadata missing required field: '" - f"{field_mapping["source_field_name"]}'" - ) - - if field_value: - if isinstance(field_value, list): - field_values = field_value - elif delimiter := field_mapping.get("delimiter"): - field_values = field_value.split(delimiter) - else: - field_values = [field_value] - - metadata_entries.extend( - [ - { - "key": field_name, - "value": value, - "language": field_mapping.get("language"), - } - for value in field_values - ] - ) - self.dspace_metadata = {"metadata": metadata_entries} - - def validate_dspace_metadata(self) -> bool: - """Validate that DSpace metadata follows the expected format for DSpace 6.x. + def _create_dspace_metadataentry(self) -> None | dict: + """Format transformed metadata for DSpace REST API. - Args: - dspace_metadata: DSpace metadata to be validated. 
+ This method follows the specs for a 'MetadataEntry Object' for + the DSpace 6 REST API: + https://wiki.lyrasis.org/display/DSDOC6x/REST+API#RESTAPI-MetadataEntryO """ - valid = False - if self.dspace_metadata and self.dspace_metadata.get("metadata") is not None: - for element in self.dspace_metadata["metadata"]: - if element.get("key") is not None and element.get("value") is not None: - valid = True - logger.debug("Valid DSpace metadata created") - else: - raise InvalidDSpaceMetadataError( - f"Invalid DSpace metadata created: {self.dspace_metadata} ", - ) - return valid + if not self.dspace_metadata: + return None + + metadata_entry = defaultdict(list) + + for field, value in self.dspace_metadata.items(): + if field == "item_identifier": + continue + + if not value and field in ["dc.title", "dc.date.issued"]: + raise ItemMetadataMissingRequiredFieldError( + f"Item metadata missing required field: {field}" + ) + + if value: + value_list = value if isinstance(value, list) else [value] + + metadata_entry["metadata"].extend( + [ + { + "key": field, + "value": _value, + } + for _value in value_list + ] + ) + + return dict(metadata_entry) - def upload_dspace_metadata(self, bucket: str, prefix: str) -> None: + def _upload_dspace_metadata(self, bucket: str, prefix: str) -> str: """Upload DSpace metadata to S3 using the specified bucket and keyname. Args: @@ -361,7 +315,7 @@ def upload_dspace_metadata(self, bucket: str, prefix: str) -> None: metadata_s3_uri = f"s3://{bucket}/{metadata_s3_key}" logger.info(f"Metadata uploaded to S3: {metadata_s3_uri}") - self.metadata_s3_uri = metadata_s3_uri + return metadata_s3_uri def send_submission_message( self, diff --git a/dsc/workflows/__init__.py b/dsc/workflows/__init__.py index b8749f9..da981db 100644 --- a/dsc/workflows/__init__.py +++ b/dsc/workflows/__init__.py @@ -5,8 +5,8 @@ from dsc.workflows.archivesspace import ArchivesSpace from dsc.workflows.base import Workflow -from dsc.workflows.base.simple_csv import SimpleCSV from dsc.workflows.opencourseware import OpenCourseWare from dsc.workflows.sccs import SCCS +from dsc.workflows.simple_csv import SimpleCSV __all__ = ["SCCS", "ArchivesSpace", "OpenCourseWare", "SimpleCSV", "Workflow"] diff --git a/dsc/workflows/archivesspace/__init__.py b/dsc/workflows/archivesspace/__init__.py new file mode 100644 index 0000000..16ef590 --- /dev/null +++ b/dsc/workflows/archivesspace/__init__.py @@ -0,0 +1,4 @@ +from dsc.workflows.archivesspace.transformer import ArchivesSpaceTransformer +from dsc.workflows.archivesspace.workflow import ArchivesSpace + +__all__ = ["ArchivesSpace", "ArchivesSpaceTransformer"] diff --git a/dsc/workflows/archivesspace/transformer.py b/dsc/workflows/archivesspace/transformer.py new file mode 100644 index 0000000..03977e3 --- /dev/null +++ b/dsc/workflows/archivesspace/transformer.py @@ -0,0 +1,20 @@ +from collections.abc import Iterable + +from dsc.workflows.base import Transformer + + +class ArchivesSpaceTransformer(Transformer): + """Transformer for ArchivesSpace source metadata.""" + + @classmethod + def dc_contributor_author(cls, source_metadata: dict) -> list[str]: + return source_metadata["author"].split("|") + + @classmethod + def optional_fields(cls) -> Iterable[str]: + return [ + "dc.contributor.author", + "dc.description", + "dc.rights", + "dc.rights.uri", + ] diff --git a/dsc/workflows/archivesspace.py b/dsc/workflows/archivesspace/workflow.py similarity index 92% rename from dsc/workflows/archivesspace.py rename to dsc/workflows/archivesspace/workflow.py index
07126a8..dd0bf1d 100644 --- a/dsc/workflows/archivesspace.py +++ b/dsc/workflows/archivesspace/workflow.py @@ -5,7 +5,8 @@ from dsc.db.models import ItemSubmissionStatus from dsc.item_submission import ItemSubmission -from dsc.workflows.base.simple_csv import SimpleCSV +from dsc.workflows.archivesspace import ArchivesSpaceTransformer +from dsc.workflows.simple_csv import SimpleCSV logger = logging.getLogger(__name__) @@ -24,8 +25,8 @@ class ArchivesSpace(SimpleCSV): submission_system: str = "Dome" @property - def metadata_mapping_path(self) -> str: - return "dsc/workflows/metadata_mapping/archivesspace.json" + def metadata_transformer(self) -> type[ArchivesSpaceTransformer]: + return ArchivesSpaceTransformer @property def output_path(self) -> str: diff --git a/dsc/workflows/base/__init__.py b/dsc/workflows/base/__init__.py index 4346d24..5254cbf 100644 --- a/dsc/workflows/base/__init__.py +++ b/dsc/workflows/base/__init__.py @@ -1,443 +1,4 @@ -from __future__ import annotations +from dsc.workflows.base.transformer import Transformer +from dsc.workflows.base.workflow import Workflow -import json -import logging -from abc import ABC, abstractmethod -from dataclasses import dataclass -from datetime import UTC, datetime -from typing import TYPE_CHECKING, Any, final - -import jsonschema -import jsonschema.exceptions - -from dsc.config import Config -from dsc.db.models import ItemSubmissionStatus -from dsc.exceptions import ( - BatchCreationFailedError, - InvalidSQSMessageError, - InvalidWorkflowNameError, -) -from dsc.item_submission import ItemSubmission -from dsc.reports import Report -from dsc.utilities.aws import SESClient, SQSClient -from dsc.utilities.validate.schemas import RESULT_MESSAGE_ATTRIBUTES, RESULT_MESSAGE_BODY - -if TYPE_CHECKING: # pragma: no cover - from collections.abc import Iterator - - from mypy_boto3_sqs.type_defs import MessageTypeDef - - from dsc.reports import Report - -logger = logging.getLogger(__name__) -CONFIG = Config() - -ITEM_SUBMISSION_LOG_STR = ( - "with primary keys batch_id={batch_id} (hash key) and " - "item_identifier={item_identifier} (range key)" -) - - -@dataclass -class DSSResultMessage: - """Represents a parsed DSpace Submission Service result message.""" - - item_identifier: str - submission_source: str - result_type: str - dspace_handle: str | None - last_modified: str | None - error_info: str | None - error_timestamp: str | None - dspace_response: str | None - exception_traceback: list[str] | None - message_id: str - receipt_handle: str - raw_message: MessageTypeDef - - @classmethod - def from_result_message(cls, message: MessageTypeDef) -> DSSResultMessage: - """Create instance from result message. 
- - Args: - message: A result message from the DSS output queue - - Raises: - InvalidSQSMessageError: If message fails validation - """ - try: - attrs = message.get("MessageAttributes", {}) - jsonschema.validate(instance=attrs, schema=RESULT_MESSAGE_ATTRIBUTES) - - body = json.loads(message.get("Body", "{}")) - jsonschema.validate(instance=body, schema=RESULT_MESSAGE_BODY) - - return cls( - item_identifier=attrs["PackageID"]["StringValue"], - submission_source=attrs["SubmissionSource"]["StringValue"], - result_type=body["ResultType"], - dspace_handle=body.get("ItemHandle"), - last_modified=body.get("lastModified"), - error_info=body.get("ErrorInfo", "Unknown error"), - error_timestamp=body.get("ErrorTimestamp"), - dspace_response=body.get("DSpaceResponse"), - exception_traceback=body.get("ExceptionTraceback"), - message_id=message["MessageId"], - receipt_handle=message["ReceiptHandle"], - raw_message=message, - ) - - except (KeyError, json.JSONDecodeError) as exception: - raise InvalidSQSMessageError( - f"Failed to parse result message: {exception}" - ) from exception - except jsonschema.exceptions.ValidationError as exception: - raise InvalidSQSMessageError( - f"Result message failed schema validation: {exception}" - ) from exception - - -class Workflow(ABC): - """A base workflow class from which other workflow classes are derived.""" - - workflow_name: str = "base" - submission_system: str = "DSpace@MIT" - - def __init__(self, batch_id: str) -> None: - """Initialize base instance. - - Args: - batch_id: Unique identifier for a 'batch' deposit that corresponds - to the name of a subfolder in the workflow directory of the S3 bucket. - This subfolder is where the S3 client will search for bitstream - and metadata files. - """ - self.batch_id = batch_id - self.run_date = datetime.now(UTC) - self.exclude_prefixes: list[str] = [ - "archived/", - "dspace_metadata/", - f"{self.batch_path}metadata.csv", - ] - self.submission_summary: dict[str, int] = { - "total": 0, - "submitted": 0, - "skipped": 0, - "errors": 0, - } - - # cache list of bitstreams - self._batch_bitstream_uris: list[str] | None = None - - @property - @abstractmethod - def metadata_mapping_path(self) -> str: - """Path to the JSON metadata mapping file for the workflow.""" - - @property - def metadata_mapping(self) -> dict: - with open(self.metadata_mapping_path) as mapping_file: - return json.load(mapping_file) - - @final - @property - def s3_bucket(self) -> str: - return CONFIG.s3_bucket_submission_assets - - @property - def output_queue(self) -> str: - """The SQS output queue for the DSS result messages.""" - return f"dss-output-dsc-{CONFIG.workspace}" - - @property - def batch_path(self) -> str: - return f"{self.workflow_name}/{self.batch_id}/" - - @property - def batch_bitstream_uris(self) -> list[str]: - if not self._batch_bitstream_uris: - self._batch_bitstream_uris = self.get_batch_bitstream_uris() - return self._batch_bitstream_uris - - @property - def retry_threshold(self) -> int: - return CONFIG.retry_threshold - - @final - @classmethod - def get_workflow(cls, workflow_name: str) -> type[Workflow]: - """Return workflow class. - - Args: - workflow_name: The label of the workflow. Must match a workflow_name attribute - from Workflow subclass. 
- """ - for workflow_class in cls._get_subclasses(): - if workflow_name == workflow_class.workflow_name: - return workflow_class - raise InvalidWorkflowNameError(f"Invalid workflow name: {workflow_name} ") - - @classmethod - def _get_subclasses(cls) -> Iterator[type[Workflow]]: - for subclass in cls.__subclasses__(): - yield from subclass._get_subclasses() # noqa: SLF001 - yield subclass - - @abstractmethod - def get_batch_bitstream_uris(self) -> list[str]: - """Get list of bitstream URIs for a batch.""" - - @final - def get_item_bitstream_uris(self, item_identifier: str) -> list[str]: - """Get list of bitstreams URIs for an item.""" - return [uri for uri in self.batch_bitstream_uris if item_identifier in uri] - - @abstractmethod - def item_metadata_iter(self) -> Iterator[dict[str, Any]]: - """Iterate through batch metadata to yield item metadata. - - MUST be overridden by workflow subclasses. - """ - - @final - def create_batch(self, *, synced: bool = False) -> None: - """Create a batch of item submissions for processing. - - A "batch" refers to a collection of item submissions that are grouped together - for coordinated processing, storage, and workflow execution. Each batch - typically consists of multiple items, each with its own metadata and - associated files, organized under a unique batch identifier. - - This method prepares the necessary assets in S3 (programmatically as needed) - and records each item in the batch to DynamoDB. - """ - item_submissions, errors = self.prepare_batch(synced=synced) - if errors: - raise BatchCreationFailedError(errors) - self._create_batch_in_db(item_submissions) - - @abstractmethod - def prepare_batch(self, *, synced: bool = False) -> tuple[list, ...]: - """Prepare batch submission assets in S3. - - This method performs the required steps to prepare a batch - of item submissions in S3. These steps must include (at minimum) - the following checks: - - - Check if there is metadata for the item submission; - otherwise raise dsc.exceptions.ItemMetadataNotFoundError - - Check if there are any bitstreams for the item submission; - otherwise raise dsc.exceptions.ItemBitstreamsNotFoundError - - MUST be overridden by workflow subclasses. - - Returns: - A tuple of item submissions (init params) represented as a - list of dicts and errors represented as a list of tuples - containing the item identifier and the error message. - """ - pass # noqa: PIE790 - - @final - def _create_batch_in_db(self, item_submissions: list[dict]) -> None: - """Write records for a batch of item submissions to DynamoDB. - - This method loops through the item submissions (init params) - represented as a list dicts. For each item submission, the - method creates an instance of ItemSubmission and saves the - record to DynamoDB. - """ - for item_submission_init_params in item_submissions: - item_submission = ItemSubmission.create(**item_submission_init_params) - item_submission.last_run_date = self.run_date - item_submission.status = ItemSubmissionStatus.BATCH_CREATED - item_submission.status_details = None - item_submission.save() - - @final - def submit_items(self, collection_handle: str) -> list: - """Submit items to the DSpace Submission Service according to the workflow class. - - Args: - collection_handle: The handle of the DSpace collection to which the batch will - be submitted. - - - Returns a dict with the submission results organized into succeeded and failed - items. 
- """ - logger.info( - f"Submitting messages to the DSS input queue '{CONFIG.sqs_queue_dss_input}' " - f"for batch '{self.batch_id}'" - ) - - batch_metadata = { - item_metadata["item_identifier"]: item_metadata - for item_metadata in self.item_metadata_iter() - } - - items = [] - for item_submission in ItemSubmission.get_batch(self.batch_id): - - self.submission_summary["total"] += 1 - item_identifier = item_submission.item_identifier - logger.debug(f"Preparing submission for item: {item_identifier}") - item_submission.last_run_date = self.run_date - - # validate whether a message should be sent for this item submission - if not item_submission.ready_to_submit(): - self.submission_summary["skipped"] += 1 - continue - try: - # prepare submission assets - item_submission.prepare_dspace_metadata( - metadata_mapping=self.metadata_mapping, - item_metadata=batch_metadata[item_identifier], - s3_bucket=self.s3_bucket, - batch_path=self.batch_path, - ) - item_submission.bitstream_s3_uris = self.get_item_bitstream_uris( - item_identifier - ) - - # Send submission message to DSS input queue - response = item_submission.send_submission_message( - self.workflow_name, - self.output_queue, - self.submission_system, - collection_handle, - ) - - # Record details of the item submission message - item_data = { - "item_identifier": item_identifier, - "message_id": response["MessageId"], - } - items.append(item_data) - self.submission_summary["submitted"] += 1 - - logger.info(f"Sent item submission message: {item_data["message_id"]}") - - # Set status in DynamoDB - item_submission.status = ItemSubmissionStatus.SUBMIT_SUCCESS - item_submission.status_details = None - item_submission.submit_attempts += 1 - item_submission.upsert_db() - except Exception as exception: # noqa: BLE001 - self.submission_summary["errors"] += 1 - item_submission.status = ItemSubmissionStatus.SUBMIT_FAILED - item_submission.status_details = str(exception) - item_submission.submit_attempts += 1 - item_submission.upsert_db() - - logger.info( - f"Submitted messages to the DSS input queue '{CONFIG.sqs_queue_dss_input}' " - f"for batch '{self.batch_id}': {json.dumps(self.submission_summary)}" - ) - return items - - @final - def finalize_items(self) -> None: - """Examine results for all item submissions in the batch. - - This method involves three main steps: - - 1. Process DSS result messages from the output queue - 2. 
Apply workflow-specific processing - """ - logger.info( - f"Processing DSS result messages from the output queue '{self.output_queue}'" - ) - sqs_results_summary = { - "received_messages": 0, - "ingest_success": 0, - "ingest_failed": 0, - "ingest_unknown": 0, - } - - # retrieve and create map of result messages - sqs_client = SQSClient( - region=CONFIG.aws_region_name, queue_name=self.output_queue - ) - logger.info( - f"Processing DSS result messages from the output queue '{self.output_queue}'" - ) - result_message_map: dict[str, DSSResultMessage] = {} - for message in sqs_client.receive(): - try: - result_message_object = DSSResultMessage.from_result_message(message) - result_message_map[result_message_object.item_identifier] = ( - result_message_object - ) - except InvalidSQSMessageError: - logger.exception(f"Failure parsing message '{message}'") - continue - - sqs_results_summary["received_messages"] = len(result_message_map) - - # retrieve item submissions from batch - for item_submission in ItemSubmission.get_batch(self.batch_id): - log_str = ITEM_SUBMISSION_LOG_STR.format( - batch_id=self.batch_id, item_identifier=item_submission.item_identifier - ) - if item_submission.status == ItemSubmissionStatus.INGEST_SUCCESS: - logger.debug(f"Record {log_str} already ingested, skipping") - continue - - item_submission.ingest_attempts += 1 - - result_message = result_message_map.get(item_submission.item_identifier) - - # skip item submission if result message is not found - if not result_message: - continue - - # update item submission status based on ingest result - if result_message.result_type == "success": - item_submission.status = ItemSubmissionStatus.INGEST_SUCCESS - item_submission.status_details = None - item_submission.dspace_handle = result_message.dspace_handle - sqs_results_summary["ingest_success"] += 1 - logger.debug(f"Record {log_str} was ingested") - elif result_message.result_type == "error": - item_submission.status = ItemSubmissionStatus.INGEST_FAILED - item_submission.status_details = result_message.error_info - sqs_results_summary["ingest_failed"] += 1 - logger.debug(f"Record {log_str} failed to ingest") - else: - item_submission.status = ItemSubmissionStatus.INGEST_UNKNOWN - sqs_results_summary["ingest_unknown"] += 1 - logger.debug(f"Unable to determine ingest status for record {log_str}") - item_submission.last_result_message = str(result_message.raw_message) - item_submission.last_run_date = self.run_date - item_submission.upsert_db() - sqs_client.delete( - receipt_handle=result_message.receipt_handle, - message_id=result_message.message_id, - ) - - # optional method used for some workflows - self.workflow_specific_processing() - - logger.info( - f"Processed DSS result messages from the output queue '{self.output_queue}': " - f"{json.dumps(sqs_results_summary)}" - ) - - def workflow_specific_processing(self) -> None: - logger.info( - f"No extra processing for batch based on workflow: " - f"'{self.workflow_name}' " - ) - - def send_report(self, report: Report, email_recipients: list[str]) -> None: - """Send report as an email via SES.""" - logger.info(f"Sending report to recipients: {email_recipients}") - ses_client = SESClient(region=CONFIG.aws_region_name) - ses_client.create_and_send_email( - subject=report.subject, - source_email_address=CONFIG.source_email, - recipient_email_addresses=email_recipients, - message_body=report.generate_summary(), - attachments=report.generate_attachments(), - ) +__all__ = ["Transformer", "Workflow"] diff --git 
a/dsc/workflows/base/transformer.py b/dsc/workflows/base/transformer.py new file mode 100644 index 0000000..4b5f485 --- /dev/null +++ b/dsc/workflows/base/transformer.py @@ -0,0 +1,90 @@ +import inspect +import itertools +import logging +from abc import ABC, abstractmethod +from collections.abc import Callable, Iterable +from typing import Any, final + +logger = logging.getLogger(__name__) + + +class Transformer(ABC): + """Base metadata transformer class. + + When defining a Transformer for a given workflow, you must: + + 1. Provide an implementation of the Transformer.optional_fields() class method: + + Return a list of Dublin Core elements in dot (.) notation that are + expected in the transformed metadata. The provided values must + use the following format: dc.element.qualifier. + + 2. Provide class methods for metadata fields that (a) must be derived + dynamically or (b) must be set to a static value. + + Class methods may accept 'source_metadata' as a parameter. + + 3. [OPTIONAL] Provide class methods for metadata fields that are simply + mapped from an existing column in the source metadata. If provided, + the class method must accept 'source_metadata' as a parameter. + + Note: This is optional because Transformer.transform() will + attempt to apply a simple 1-1 mapping if a class method is not found + for a specified field. + """ + + # based on 'Metadata Recommendations' from DSpace docs + required_fields: Iterable[str] = ["dc.title", "dc.date.issued"] + + @classmethod + @abstractmethod + def optional_fields(cls) -> Iterable[str]: + """Optional Dublin Core metadata fields for the workflow.""" + + @final + @classmethod + def transform(cls, source_metadata: dict) -> dict | None: + """Transform source metadata. + + This method iterates over all fields--combined from + Transformer.required_fields and Transformer.optional_fields()--and + either calls a field method (if provided) or applies a simple 1-1 + mapping to generate transformed metadata. + + If a class method is not found for a given field, the field's value + is looked up directly in 'source_metadata'. If 'source_metadata' is + empty, this method returns None, indicating that something went wrong + (e.g., no source metadata was found for the item).
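+ + Example (an illustrative sketch; 'ThesisTransformer' and its + 'author' source field are hypothetical): + + class ThesisTransformer(Transformer): + @classmethod + def optional_fields(cls) -> Iterable[str]: + return ["dc.contributor.author"] + + @classmethod + def dc_contributor_author(cls, source_metadata: dict) -> list[str]: + return source_metadata["author"].split("|") + + ThesisTransformer.transform( + {"dc.title": "A Title", "dc.date.issued": "2024", "author": "A|B"} + ) + # -> {"dc.title": "A Title", "dc.date.issued": "2024", + # "dc.contributor.author": ["A", "B"]}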
+ """ + transformed_metadata: dict[str, Any] = {} + + all_fields = itertools.chain(cls.required_fields, cls.optional_fields()) + + for field in all_fields: + field_method = cls.get_field_method(field) + if field_method: + transformed_metadata[field] = cls._call_field_method( + field_method, source_metadata + ) + else: + transformed_metadata[field] = source_metadata.get(field) + + return transformed_metadata + + @classmethod + def get_field_method(cls, field: str) -> Callable | None: + field_method_name = field.replace(".", "_") + try: + return getattr(cls, field_method_name) + except AttributeError: + return None + + @classmethod + def _call_field_method( + cls, field_method: Callable, source_metadata: dict + ) -> Any: # noqa: ANN401 + """Invoke field method with source metadata (as needed).""" + # check if 'source_metadata' is in signature + signature = inspect.signature(field_method) + if "source_metadata" in signature.parameters: + return field_method(source_metadata) + return field_method() diff --git a/dsc/workflows/base/workflow.py b/dsc/workflows/base/workflow.py new file mode 100644 index 0000000..df89bbc --- /dev/null +++ b/dsc/workflows/base/workflow.py @@ -0,0 +1,477 @@ +from __future__ import annotations + +import json +import logging +from abc import ABC, abstractmethod +from dataclasses import dataclass +from datetime import UTC, datetime +from typing import TYPE_CHECKING, Any, final + +import jsonschema +import jsonschema.exceptions + +from dsc.config import Config +from dsc.db.models import ItemSubmissionStatus +from dsc.exceptions import ( + BatchCreationFailedError, + DSpaceMetadataUploadError, + InvalidSQSMessageError, + InvalidWorkflowNameError, + ItemMetadataMissingRequiredFieldError, +) +from dsc.item_submission import ItemSubmission +from dsc.reports import Report +from dsc.utilities.aws import S3Client, SESClient, SQSClient +from dsc.utilities.validate.schemas import RESULT_MESSAGE_ATTRIBUTES, RESULT_MESSAGE_BODY + +if TYPE_CHECKING: # pragma: no cover + from collections.abc import Iterator + + from mypy_boto3_sqs.type_defs import MessageTypeDef + + from dsc.reports import Report + from dsc.workflows.base import Transformer + +logger = logging.getLogger(__name__) +CONFIG = Config() + +ITEM_SUBMISSION_LOG_STR = ( + "with primary keys batch_id={batch_id} (hash key) and " + "item_identifier={item_identifier} (range key)" +) + + +@dataclass +class DSSResultMessage: + """Represents a parsed DSpace Submission Service result message.""" + + item_identifier: str + submission_source: str + result_type: str + dspace_handle: str | None + last_modified: str | None + error_info: str | None + error_timestamp: str | None + dspace_response: str | None + exception_traceback: list[str] | None + message_id: str + receipt_handle: str + raw_message: MessageTypeDef + + @classmethod + def from_result_message(cls, message: MessageTypeDef) -> DSSResultMessage: + """Create instance from result message. 
+ + Args: + message: A result message from the DSS output queue + + Raises: + InvalidSQSMessageError: If message fails validation + """ + try: + attrs = message.get("MessageAttributes", {}) + jsonschema.validate(instance=attrs, schema=RESULT_MESSAGE_ATTRIBUTES) + + body = json.loads(message.get("Body", "{}")) + jsonschema.validate(instance=body, schema=RESULT_MESSAGE_BODY) + + return cls( + item_identifier=attrs["PackageID"]["StringValue"], + submission_source=attrs["SubmissionSource"]["StringValue"], + result_type=body["ResultType"], + dspace_handle=body.get("ItemHandle"), + last_modified=body.get("lastModified"), + error_info=body.get("ErrorInfo", "Unknown error"), + error_timestamp=body.get("ErrorTimestamp"), + dspace_response=body.get("DSpaceResponse"), + exception_traceback=body.get("ExceptionTraceback"), + message_id=message["MessageId"], + receipt_handle=message["ReceiptHandle"], + raw_message=message, + ) + + except (KeyError, json.JSONDecodeError) as exception: + raise InvalidSQSMessageError( + f"Failed to parse result message: {exception}" + ) from exception + except jsonschema.exceptions.ValidationError as exception: + raise InvalidSQSMessageError( + f"Result message failed schema validation: {exception}" + ) from exception + + +class Workflow(ABC): + """A base workflow class from which other workflow classes are derived.""" + + workflow_name: str = "base" + submission_system: str = "DSpace@MIT" + + def __init__(self, batch_id: str) -> None: + """Initialize base instance. + + Args: + batch_id: Unique identifier for a 'batch' deposit that corresponds + to the name of a subfolder in the workflow directory of the S3 bucket. + This subfolder is where the S3 client will search for bitstream + and metadata files. + """ + self.batch_id = batch_id + self.run_date = datetime.now(UTC) + self.exclude_prefixes: list[str] = [ + "archived/", + "dspace_metadata/", + f"{self.batch_path}metadata.csv", + ] + self.submission_summary: dict[str, int] = { + "total": 0, + "submitted": 0, + "skipped": 0, + "errors": 0, + } + + # cache list of bitstreams + self._batch_bitstream_uris: list[str] | None = None + self._batch_dspace_metadata_json_uris: list[str] | None = None + + @property + @abstractmethod + def metadata_transformer(self) -> type[Transformer]: + """Transformer for source metadata.""" + + @final + @property + def s3_bucket(self) -> str: + return CONFIG.s3_bucket_submission_assets + + @property + def output_queue(self) -> str: + """The SQS output queue for the DSS result messages.""" + return f"dss-output-dsc-{CONFIG.workspace}" + + @property + def batch_path(self) -> str: + return f"{self.workflow_name}/{self.batch_id}/" + + @property + def batch_bitstream_uris(self) -> list[str]: + if not self._batch_bitstream_uris: + self._batch_bitstream_uris = self.get_batch_bitstream_uris() + return self._batch_bitstream_uris + + @property + def batch_dspace_metadata_json_uris(self) -> list[str]: + if not self._batch_dspace_metadata_json_uris: + s3_client = S3Client() + self._batch_dspace_metadata_json_uris = list( + s3_client.files_iter( + bucket=self.s3_bucket, + prefix=f"{self.batch_path}dspace_metadata/", + file_type=".json", + ) + ) + return self._batch_dspace_metadata_json_uris + + @property + def retry_threshold(self) -> int: + return CONFIG.retry_threshold + + @final + @classmethod + def get_workflow(cls, workflow_name: str) -> type[Workflow]: + """Return workflow class. + + Args: + workflow_name: The label of the workflow. Must match a workflow_name attribute + from Workflow subclass. 
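+ + Example (illustrative): + + workflow_class = Workflow.get_workflow("sccs") + workflow = workflow_class(batch_id="batch-00")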
+ """ + for workflow_class in cls._get_subclasses(): + if workflow_name == workflow_class.workflow_name: + return workflow_class + raise InvalidWorkflowNameError(f"Invalid workflow name: {workflow_name} ") + + @classmethod + def _get_subclasses(cls) -> Iterator[type[Workflow]]: + for subclass in cls.__subclasses__(): + yield from subclass._get_subclasses() # noqa: SLF001 + yield subclass + + @abstractmethod + def get_batch_bitstream_uris(self) -> list[str]: + """Get list of bitstream URIs for a batch.""" + + @final + def get_item_bitstream_uris(self, item_identifier: str) -> list[str]: + """Get list of bitstreams URIs for an item.""" + return [uri for uri in self.batch_bitstream_uris if item_identifier in uri] + + @final + def get_item_dspace_metadata_json_uri(self, item_identifier: str) -> str: + return next( + uri for uri in self.batch_dspace_metadata_json_uris if item_identifier in uri + ) + + @abstractmethod + def item_metadata_iter(self) -> Iterator[dict[str, Any]]: + """Iterate through batch metadata to yield item metadata. + + MUST be overridden by workflow subclasses. + """ + + @final + def create_batch(self, *, synced: bool = False) -> None: + """Create a batch of item submissions for processing. + + A "batch" refers to a collection of item submissions that are grouped together + for coordinated processing, storage, and workflow execution. Each batch + typically consists of multiple items, each with its own metadata and + associated files, organized under a unique batch identifier. + + This method prepares the necessary assets in S3 (programmatically as needed) + and records each item in the batch to DynamoDB. + """ + item_submissions, errors = self._prepare_batch(synced=synced) + if errors: + raise BatchCreationFailedError(errors) + + _dspace_metadata_uris, errors = self._create_dspace_metadata_json( + item_submissions + ) + if errors: + raise BatchCreationFailedError(errors) + + self._create_batch_in_db(item_submissions) + + @abstractmethod + def _prepare_batch(self, *, synced: bool = False) -> tuple[list, ...]: + """Prepare batch submission assets in S3. + + This method performs the required steps to prepare a batch + of item submissions in S3. These steps must include (at minimum) + the following checks: + + - Check if there is metadata for the item submission; + otherwise raise dsc.exceptions.ItemMetadataNotFoundError + - Check if there are any bitstreams for the item submission; + otherwise raise dsc.exceptions.ItemBitstreamsNotFoundError + + MUST be overridden by workflow subclasses. + + Returns: + A tuple containing list of ItemSubmission's and + errors represented as a list of tuples + containing the item identifier and the error message. + """ + pass # noqa: PIE790 + + @final + def _create_dspace_metadata_json( + self, item_submissions: list[ItemSubmission] + ) -> tuple[list, ...]: + dspace_metadata_uris = [] + errors = [] + for item_submission in item_submissions: + try: + uri = item_submission.prepare_dspace_metadata( + self.s3_bucket, self.batch_path + ) + dspace_metadata_uris.append(uri) + except ( + DSpaceMetadataUploadError, + ItemMetadataMissingRequiredFieldError, + ) as exception: + errors.append((item_submission.item_identifier, str(exception))) + return dspace_metadata_uris, errors + + @final + def _create_batch_in_db(self, item_submissions: list[ItemSubmission]) -> None: + """Write records for a batch of item submissions to DynamoDB. 
+ + For each ItemSubmission, the method updates the last_run_date, + status, and status_details attributes and saves the + record to DynamoDB. + """ + for item_submission in item_submissions: + item_submission.last_run_date = self.run_date + item_submission.status = ItemSubmissionStatus.BATCH_CREATED + item_submission.status_details = None + item_submission.save() + + @final + def submit_items(self, collection_handle: str) -> list: + """Submit items to the DSpace Submission Service according to the workflow class. + + Args: + collection_handle: The handle of the DSpace collection to which the batch will + be submitted. + + Returns a list of dicts, one per submitted item, each containing + the item identifier and the ID of the sent SQS message. + """ + logger.info( + f"Submitting messages to the DSS input queue '{CONFIG.sqs_queue_dss_input}' " + f"for batch '{self.batch_id}'" + ) + + items = [] + for item_submission in ItemSubmission.get_batch(self.batch_id): + + self.submission_summary["total"] += 1 + item_identifier = item_submission.item_identifier + logger.debug(f"Preparing submission for item: {item_identifier}") + item_submission.last_run_date = self.run_date + + # validate whether a message should be sent for this item submission + if not item_submission.ready_to_submit(): + self.submission_summary["skipped"] += 1 + continue + try: + # prepare submission assets + item_submission.metadata_s3_uri = self.get_item_dspace_metadata_json_uri( + item_identifier + ) + item_submission.bitstream_s3_uris = self.get_item_bitstream_uris( + item_identifier + ) + + # Send submission message to DSS input queue + response = item_submission.send_submission_message( + self.workflow_name, + self.output_queue, + self.submission_system, + collection_handle, + ) + + # Record details of the item submission message + item_data = { + "item_identifier": item_identifier, + "message_id": response["MessageId"], + } + items.append(item_data) + self.submission_summary["submitted"] += 1 + + logger.info(f"Sent item submission message: {item_data["message_id"]}") + + # Set status in DynamoDB + item_submission.status = ItemSubmissionStatus.SUBMIT_SUCCESS + item_submission.status_details = None + item_submission.submit_attempts += 1 + item_submission.upsert_db() + except Exception as exception: # noqa: BLE001 + self.submission_summary["errors"] += 1 + item_submission.status = ItemSubmissionStatus.SUBMIT_FAILED + item_submission.status_details = str(exception) + item_submission.submit_attempts += 1 + item_submission.upsert_db() + + logger.info( + f"Submitted messages to the DSS input queue '{CONFIG.sqs_queue_dss_input}' " + f"for batch '{self.batch_id}': {json.dumps(self.submission_summary)}" + ) + return items + + @final + def finalize_items(self) -> None: + """Examine results for all item submissions in the batch. + + This method involves three main steps: + + 1. Process DSS result messages from the output queue + 2. Update item submission records in DynamoDB based on the results + 3.
Apply workflow-specific processing + """ + logger.info( + f"Processing DSS result messages from the output queue '{self.output_queue}'" + ) + sqs_results_summary = { + "received_messages": 0, + "ingest_success": 0, + "ingest_failed": 0, + "ingest_unknown": 0, + } + + # retrieve and create map of result messages + sqs_client = SQSClient( + region=CONFIG.aws_region_name, queue_name=self.output_queue + ) + result_message_map: dict[str, DSSResultMessage] = {} + for message in sqs_client.receive(): + try: + result_message_object = DSSResultMessage.from_result_message(message) + result_message_map[result_message_object.item_identifier] = ( + result_message_object + ) + except InvalidSQSMessageError: + logger.exception(f"Failure parsing message '{message}'") + continue + + sqs_results_summary["received_messages"] = len(result_message_map) + + # retrieve item submissions from batch + for item_submission in ItemSubmission.get_batch(self.batch_id): + log_str = ITEM_SUBMISSION_LOG_STR.format( + batch_id=self.batch_id, item_identifier=item_submission.item_identifier + ) + if item_submission.status == ItemSubmissionStatus.INGEST_SUCCESS: + logger.debug(f"Record {log_str} already ingested, skipping") + continue + + item_submission.ingest_attempts += 1 + + result_message = result_message_map.get(item_submission.item_identifier) + + # skip item submission if result message is not found + if not result_message: + continue + + # update item submission status based on ingest result + if result_message.result_type == "success": + item_submission.status = ItemSubmissionStatus.INGEST_SUCCESS + item_submission.status_details = None + item_submission.dspace_handle = result_message.dspace_handle + sqs_results_summary["ingest_success"] += 1 + logger.debug(f"Record {log_str} was ingested") + elif result_message.result_type == "error": + item_submission.status = ItemSubmissionStatus.INGEST_FAILED + item_submission.status_details = result_message.error_info + sqs_results_summary["ingest_failed"] += 1 + logger.debug(f"Record {log_str} failed to ingest") + else: + item_submission.status = ItemSubmissionStatus.INGEST_UNKNOWN + sqs_results_summary["ingest_unknown"] += 1 + logger.debug(f"Unable to determine ingest status for record {log_str}") + item_submission.last_result_message = str(result_message.raw_message) + item_submission.last_run_date = self.run_date + item_submission.upsert_db() + sqs_client.delete( + receipt_handle=result_message.receipt_handle, + message_id=result_message.message_id, + ) + + # optional method used for some workflows + self.workflow_specific_processing() + + logger.info( + f"Processed DSS result messages from the output queue '{self.output_queue}': " + f"{json.dumps(sqs_results_summary)}" + ) + + def workflow_specific_processing(self) -> None: + logger.info( + f"No extra processing for batch based on workflow: " + f"'{self.workflow_name}'" + ) + + def send_report(self, report: Report, email_recipients: list[str]) -> None: + """Send report as an email via SES.""" + logger.info(f"Sending report to recipients: {email_recipients}") + ses_client = SESClient(region=CONFIG.aws_region_name) + ses_client.create_and_send_email( + subject=report.subject, + source_email_address=CONFIG.source_email, + recipient_email_addresses=email_recipients, + message_body=report.generate_summary(), + attachments=report.generate_attachments(), + ) diff --git a/dsc/workflows/metadata_mapping/archivesspace.json
b/dsc/workflows/metadata_mapping/archivesspace.json deleted file mode 100644 index 32bfa7b..0000000 --- a/dsc/workflows/metadata_mapping/archivesspace.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "dc.title": { - "source_field_name": "title", - }, - "dc.contributor.author": { - "source_field_name": "author", - "delimiter": "|" - }, - "dc.description": { - "source_field_name": "description", - "language": "en_US" - }, - "dc.rights": { - "source_field_name": "rights_statement", - "language": "en_US" - }, - "dc.rights.uri": { - "source_field_name": "rights_uri" - } -} \ No newline at end of file diff --git a/dsc/workflows/metadata_mapping/opencourseware.json b/dsc/workflows/metadata_mapping/opencourseware.json deleted file mode 100644 index 7908b8c..0000000 --- a/dsc/workflows/metadata_mapping/opencourseware.json +++ /dev/null @@ -1,46 +0,0 @@ -{ - "dc.title": { - "source_field_name": "dc.title", - "required": true - }, - "dc.date.issued": { - "source_field_name": "dc.date.issued", - "required": true - }, - "dc.description.abstract": { - "source_field_name": "dc.description.abstract" - }, - "dc.contributor.author": { - "source_field_name": "dc.contributor.author" - }, - "dc.contributor.department": { - "source_field_name": "dc.contributor.department" - }, - "creativework.learningresourcetype": { - "source_field_name": "creativework.learningresourcetype" - }, - "dc.subject": { - "source_field_name": "dc.subject" - }, - "dc.identifier.other": { - "source_field_name": "dc.identifier.other" - }, - "dc.coverage.temporal": { - "source_field_name": "dc.coverage.temporal" - }, - "dc.audience.educationlevel": { - "source_field_name": "dc.audience.educationlevel" - }, - "dc.type": { - "source_field_name": "dc.type" - }, - "dc.rights": { - "source_field_name": "dc.rights" - }, - "dc.rights.uri": { - "source_field_name": "dc.rights.uri" - }, - "dc.language.iso": { - "source_field_name": "dc.language.iso" - } -} \ No newline at end of file diff --git a/dsc/workflows/metadata_mapping/sccs.json b/dsc/workflows/metadata_mapping/sccs.json deleted file mode 100644 index 1a44fc2..0000000 --- a/dsc/workflows/metadata_mapping/sccs.json +++ /dev/null @@ -1,67 +0,0 @@ -{ - "dc.title": { - "source_field_name": "dc.title", - "language": "en_US", - "required": true - }, - "dc.publisher": { - "source_field_name": "dc.publisher", - "language": "en_US" - }, - "dc.identifier.mitlicense": { - "source_field_name": "dc.identifier.mitlicense", - "language": "en_US" - }, - "dc.eprint.version": { - "source_field_name": "dc.eprint.version", - "language": "en_US" - }, - "dc.type": { - "source_field_name": "dc.type", - "language": "en_US" - }, - "dc.type.uri": { - "source_field_name": "dc.type.uri" - }, - "dc.source": { - "source_field_name": "dc.source", - "language": "en_US" - }, - "dc.contributor.author": { - "source_field_name": "dc.contributor.author", - "language": "en_US", - "delimiter": "|" - }, - "dc.contributor.department": { - "source_field_name": "dc.contributor.department" - }, - "dc.relation.isversionof": { - "source_field_name": "dc.relation.isversionof" - }, - "dc.relation.journal": { - "source_field_name": "dc.relation.journal" - }, - "dc.identifier.issn": { - "source_field_name": "dc.identifier.issn" - }, - "dc.date.issued": { - "source_field_name": "dc.date.issued" - }, - "dc.date.submitted": { - "source_field_name": "dc.date.submitted" - }, - "dc.rights": { - "source_field_name": "dc.rights", - "language": "en_US" - }, - "dc.rights.uri": { - "source_field_name": "dc.rights.uri" - }, - "dc.description": { - 
"source_field_name": "dc.description" - }, - "dc.description.sponsorship": { - "source_field_name": "dc.description.sponsorship", - "language": "en_US" - } -} \ No newline at end of file diff --git a/dsc/workflows/opencourseware/__init__.py b/dsc/workflows/opencourseware/__init__.py new file mode 100644 index 0000000..edc93f3 --- /dev/null +++ b/dsc/workflows/opencourseware/__init__.py @@ -0,0 +1,4 @@ +from dsc.workflows.opencourseware.transformer import OpenCourseWareTransformer +from dsc.workflows.opencourseware.workflow import OpenCourseWare + +__all__ = ["OpenCourseWare", "OpenCourseWareTransformer"] diff --git a/dsc/workflows/opencourseware.py b/dsc/workflows/opencourseware/transformer.py similarity index 65% rename from dsc/workflows/opencourseware.py rename to dsc/workflows/opencourseware/transformer.py index fad76ad..14f52b5 100644 --- a/dsc/workflows/opencourseware.py +++ b/dsc/workflows/opencourseware/transformer.py @@ -1,40 +1,30 @@ -import inspect -import json -import logging -import zipfile -from collections.abc import Iterable, Iterator -from typing import Any, ClassVar +from collections.abc import Iterable +from typing import ClassVar -import smart_open +from dsc.workflows.base import Transformer -from dsc.exceptions import ItemMetadataNotFoundError -from dsc.utilities.aws.s3 import S3Client -from dsc.workflows.base import Workflow -logger = logging.getLogger(__name__) - - -class OpenCourseWareTransformer: +class OpenCourseWareTransformer(Transformer): """Transformer for OpenCourseWare (OCW) source metadata.""" - fields: Iterable[str] = [ - # fields with derived values - "dc_title", - "dc_date_issued", - "dc_description_abstract", - "dc_contributor_author", - "dc_contributor_department", - "creativework_learningresourcetype", - "dc_subject", - "dc_identifier_other", - "dc_coverage_temporal", - "dc_audience_educationlevel", - # fields with static values - "dc_type", - "dc_rights", - "dc_rights_uri", - "dc_language_iso", - ] + @classmethod + def optional_fields(cls) -> Iterable[str]: + return [ + # fields with derived values + "dc.description.abstract", + "dc.contributor.author", + "dc.contributor.department", + "creativework.learningresourcetype", + "dc.subject", + "dc.identifier.other", + "dc.coverage.temporal", + "dc.audience.educationlevel", + # fields with static values + "dc.type", + "dc.rights", + "dc.rights.uri", + "dc.language.iso", + ] department_mappings: ClassVar = { "1": "Massachusetts Institute of Technology. Department of Civil and Environmental Engineering", # noqa: E501 @@ -76,27 +66,6 @@ class OpenCourseWareTransformer: "EC": "Edgerton Center (Massachusetts Institute of Technology)", } - @classmethod - def transform(cls, source_metadata: dict) -> dict: - """Transform source metadata.""" - transformed_metadata: dict[str, Any] = {} - - if not source_metadata: - return transformed_metadata - - for field in cls.fields: - field_method = getattr(cls, field) - formatted_field_name = field.replace("_", ".") - - # check if 'source_metadata' is in signature - signature = inspect.signature(field_method) - if "source_metadata" in signature.parameters: - transformed_metadata[formatted_field_name] = field_method(source_metadata) - else: - transformed_metadata[formatted_field_name] = field_method() - - return transformed_metadata - @classmethod def dc_title(cls, source_metadata: dict) -> str: """Build a title string from course numbers, title, and term year. 
@@ -314,119 +283,3 @@ def dc_rights_uri(cls) -> str: @classmethod def dc_language_iso(cls) -> str: return "en_US" - - -class OpenCourseWare(Workflow): - """Workflow for OpenCourseWare (OCW) deposits. - - The deposits managed by this workflow are requested by the - Scholarly Communications and Collections Strategy (SCCS) department - and were previously deposited into DSpace@MIT by Technical services staff. - """ - - workflow_name: str = "opencourseware" - metadata_transformer = OpenCourseWareTransformer - - @property - def metadata_mapping_path(self) -> str: - return "dsc/workflows/metadata_mapping/opencourseware.json" - - def get_batch_bitstream_uris(self) -> list[str]: - """Get list of URIs for all zipfiles within the batch folder.""" - s3_client = S3Client() - return list( - s3_client.files_iter( - bucket=self.s3_bucket, - prefix=self.batch_path, - file_type=".zip", - exclude_prefixes=self.exclude_prefixes, - ) - ) - - def item_metadata_iter(self) -> Iterator[dict[str, Any]]: - """Yield item metadata from metadata JSON file in the zip file. - - If the zip file does not include a metadata JSON file (data.json), - this method yields a dict containing only the item identifier. - Otherwise, a dict containing the item identifier and transformed metadata - is yielded. - - NOTE: Item identifiers are retrieved from the filenames of the zip - files, which follow the naming format "<item_identifier>.zip". - """ - for file in self.batch_bitstream_uris: - try: - source_metadata = self._read_metadata_from_zip_file(file) - except FileNotFoundError: - source_metadata = {} - - transformed_metadata = self.metadata_transformer.transform(source_metadata) - - yield { - "item_identifier": self._parse_item_identifier(file), - **transformed_metadata, - } - - def _read_metadata_from_zip_file(self, file: str) -> dict[str, str]: - """Read source metadata JSON file in zip archive. - - This method expects a JSON file called "data.json" at the root - level of the the zip file. - - Args: - file: Object prefix for bitstream zip file, formatted as the - path from the S3 bucket to the file. - Given an S3 URI "s3://dsc/opencourseware/batch-00/123.zip", - then file = "opencourseware/batch-00/123.zip". - """ - with ( - smart_open.open(file, "rb") as file_input, - zipfile.ZipFile(file_input) as zip_file, - zip_file.open("data.json") as json_file, - ): - return json.load(json_file) - - def _parse_item_identifier(self, file: str) -> str: - """Parse item identifier from bitstream zip file.""" - return file.split("/")[-1].removesuffix(".zip") - - def prepare_batch( - self, - *, - synced: bool = False, # noqa: ARG002 - ) -> tuple[list, ...]: - """Prepare a batch of item submissions, given a batch of zip files. - - For this workflow, the expected number of item submissions is determined - by the number of zip files in the batch folder. This method will iterate - over the yielded transformed metadata, checking whether metadata is provided: - - - If only the item identifier is provided and no other metadata is available, - an error is recorded - - If metadata is present, init params are generated for the item submission - - For the OpenCourseWare workflow, the batch preparation steps are the same - for synced vs. non-synced workflows.
- """ - item_submissions = [] - errors = [] - - for item_metadata in self.item_metadata_iter(): - # check if metadata is provided - # item identifier is always returned by iter - if len(item_metadata) == 1 and "item_identifier" in item_metadata: - errors.append( - (item_metadata["item_identifier"], str(ItemMetadataNotFoundError())) - ) - continue - - # if item submission includes metadata, save init params - item_submissions.append( - { - "batch_id": self.batch_id, - "item_identifier": item_metadata["item_identifier"], - "workflow_name": self.workflow_name, - } - ) - - return item_submissions, errors diff --git a/dsc/workflows/opencourseware/workflow.py b/dsc/workflows/opencourseware/workflow.py new file mode 100644 index 0000000..5fd0d4c --- /dev/null +++ b/dsc/workflows/opencourseware/workflow.py @@ -0,0 +1,145 @@ +import json +import logging +import zipfile +from collections.abc import Iterator +from typing import Any + +import smart_open + +from dsc.exceptions import ItemMetadataNotFoundError +from dsc.item_submission import ItemSubmission +from dsc.utilities.aws.s3 import S3Client +from dsc.workflows.base import Workflow +from dsc.workflows.opencourseware import OpenCourseWareTransformer + +logger = logging.getLogger(__name__) + + +class OpenCourseWare(Workflow): + """Workflow for OpenCourseWare (OCW) deposits. + + The deposits managed by this workflow are requested by the + Scholarly Communications and Collections Strategy (SCCS) department + and were previously deposited into DSpace@MIT by Technical services staff. + """ + + workflow_name: str = "opencourseware" + + @property + def metadata_transformer(self) -> type[OpenCourseWareTransformer]: + return OpenCourseWareTransformer + + def get_batch_bitstream_uris(self) -> list[str]: + """Get list of URIs for all zipfiles within the batch folder.""" + s3_client = S3Client() + return list( + s3_client.files_iter( + bucket=self.s3_bucket, + prefix=self.batch_path, + file_type=".zip", + exclude_prefixes=self.exclude_prefixes, + ) + ) + + def item_metadata_iter(self) -> Iterator[dict[str, Any]]: + """Yield transformed metadata from metadata JSON file in the zip file. + + If the zip file does not include a metadata JSON file (data.json), + this method yields a dict containing only the item identifier. + Otherwise, a dict containing the item identifier and transformed metadata + is yielded. + + NOTE: Item identifiers are retrieved from the filenames of the zip + files, which follow the naming format ".zip". + + Yields: + A dict containing the item identifier and Dublin Core metadata + for DSpace. + """ + for file in self.batch_bitstream_uris: + try: + source_metadata = self._read_metadata_from_zip_file(file) + except FileNotFoundError: + source_metadata = {} + + transformed_metadata = self.metadata_transformer.transform(source_metadata) + + if transformed_metadata: + yield { + "item_identifier": self._parse_item_identifier(file), + **transformed_metadata, + } + else: + yield { + "item_identifier": self._parse_item_identifier(file), + } + + def _read_metadata_from_zip_file(self, file: str) -> dict[str, str]: + """Read source metadata JSON file in zip archive. + + This method expects a JSON file called "data.json" at the root + level of the the zip file. + + Args: + file: Object prefix for bitstream zip file, formatted as the + path from the S3 bucket to the file. + Given an S3 URI "s3://dsc/opencourseware/batch-00/123.zip", + then file = "opencourseware/batch-00/123.zip". 
+ """ + with ( + smart_open.open(file, "rb") as file_input, + zipfile.ZipFile(file_input) as zip_file, + zip_file.open("data.json") as json_file, + ): + return json.load(json_file) + + def _parse_item_identifier(self, file: str) -> str: + """Parse item identifier from bitstream zip file.""" + return file.split("/")[-1].removesuffix(".zip") + + def _prepare_batch( + self, + *, + synced: bool = False, # noqa: ARG002 + ) -> tuple[list, ...]: + """Prepare a batch of item submissions, given a batch of zip files. + + For this workflow, the expected number of item submissions is determined + by the number of zip files in the batch folder. This method will iterate + over the yielded transformed metadata, checking whether metadata is provided: + + - If only the item identifier is provided and no other metadata is available, + an error is recorded + - If metadata is present, init params are generated for the item submission + + For the OpenCourseWare workflow, the batch preparation steps are the same + for synced vs. non-synced workflows. + """ + item_submissions = [] + errors = [] + + for item_metadata in self.item_metadata_iter(): + # check if metadata is provided + # item identifier is always returned by iter + if len(item_metadata) == 1 and "item_identifier" in item_metadata: + errors.append( + (item_metadata["item_identifier"], str(ItemMetadataNotFoundError())) + ) + continue + + # copy transformed metadata, excluding 'item_identifier' + dspace_metadata = { + k: v for k, v in item_metadata.items() if k != "item_identifier" + } + + # create ItemSubmission + item_submissions.append( + ItemSubmission( + batch_id=self.batch_id, + item_identifier=item_metadata["item_identifier"], + workflow_name=self.workflow_name, + dspace_metadata=dspace_metadata, + ) + ) + + return item_submissions, errors diff --git a/dsc/workflows/sccs/__init__.py b/dsc/workflows/sccs/__init__.py new file mode 100644 index 0000000..3b5bd23 --- /dev/null +++ b/dsc/workflows/sccs/__init__.py @@ -0,0 +1,4 @@ +from dsc.workflows.sccs.transformer import SCCSTransformer +from dsc.workflows.sccs.workflow import SCCS + +__all__ = ["SCCS", "SCCSTransformer"] diff --git a/dsc/workflows/sccs/transformer.py b/dsc/workflows/sccs/transformer.py new file mode 100644 index 0000000..eb7167b --- /dev/null +++ b/dsc/workflows/sccs/transformer.py @@ -0,0 +1,32 @@ +from collections.abc import Iterable + +from dsc.workflows.base import Transformer + + +class SCCSTransformer(Transformer): + """Transformer for SCCS source metadata.""" + + @classmethod + def optional_fields(cls) -> Iterable[str]: + return [ + "dc.publisher", + "dc.identifier.mitlicense", + "dc.eprint.version", + "dc.type", + "dc.type.uri", + "dc.source", + "dc.contributor.author", + "dc.contributor.department", + "dc.relation.isversionof", + "dc.relation.journal", + "dc.identifier.issn", + "dc.date.submitted", + "dc.rights", + "dc.rights.uri", + "dc.description", + "dc.description.sponsorship", + ] + + @classmethod + def dc_contributor_author(cls, source_metadata: dict) -> list[str]: + return source_metadata["dc.contributor.author"].split("|") diff --git a/dsc/workflows/sccs.py b/dsc/workflows/sccs/workflow.py similarity index 64% rename from dsc/workflows/sccs.py rename to dsc/workflows/sccs/workflow.py index 0f02ec3..7ab4780 100644 --- a/dsc/workflows/sccs.py +++ b/dsc/workflows/sccs/workflow.py @@ -1,4 +1,5 @@ -from dsc.workflows import SimpleCSV +from dsc.workflows.sccs import SCCSTransformer +from dsc.workflows.simple_csv import SimpleCSV class SCCS(SimpleCSV): @@ -12,8 +13,9 @@ 
diff --git a/dsc/workflows/sccs/__init__.py b/dsc/workflows/sccs/__init__.py
new file mode 100644
index 0000000..3b5bd23
--- /dev/null
+++ b/dsc/workflows/sccs/__init__.py
@@ -0,0 +1,4 @@
+from dsc.workflows.sccs.transformer import SCCSTransformer
+from dsc.workflows.sccs.workflow import SCCS
+
+__all__ = ["SCCS", "SCCSTransformer"]
diff --git a/dsc/workflows/sccs/transformer.py b/dsc/workflows/sccs/transformer.py
new file mode 100644
index 0000000..eb7167b
--- /dev/null
+++ b/dsc/workflows/sccs/transformer.py
@@ -0,0 +1,32 @@
+from collections.abc import Iterable
+
+from dsc.workflows.base import Transformer
+
+
+class SCCSTransformer(Transformer):
+    """Transformer for SCCS source metadata."""
+
+    @classmethod
+    def optional_fields(cls) -> Iterable[str]:
+        return [
+            "dc.publisher",
+            "dc.identifier.mitlicense",
+            "dc.eprint.version",
+            "dc.type",
+            "dc.type.uri",
+            "dc.source",
+            "dc.contributor.author",
+            "dc.contributor.department",
+            "dc.relation.isversionof",
+            "dc.relation.journal",
+            "dc.identifier.issn",
+            "dc.date.submitted",
+            "dc.rights",
+            "dc.rights.uri",
+            "dc.description",
+            "dc.description.sponsorship",
+        ]
+
+    @classmethod
+    def dc_contributor_author(cls, source_metadata: dict) -> list[str]:
+        return source_metadata["dc.contributor.author"].split("|")
diff --git a/dsc/workflows/sccs.py b/dsc/workflows/sccs/workflow.py
similarity index 64%
rename from dsc/workflows/sccs.py
rename to dsc/workflows/sccs/workflow.py
index 0f02ec3..7ab4780 100644
--- a/dsc/workflows/sccs.py
+++ b/dsc/workflows/sccs/workflow.py
@@ -1,4 +1,5 @@
-from dsc.workflows import SimpleCSV
+from dsc.workflows.sccs import SCCSTransformer
+from dsc.workflows.simple_csv import SimpleCSV
 
 
 class SCCS(SimpleCSV):
@@ -12,8 +13,9 @@ class SCCS(SimpleCSV):
     workflow_name: str = "sccs"
 
     @property
-    def metadata_mapping_path(self) -> str:
-        return "dsc/workflows/metadata_mapping/sccs.json"
+    def metadata_transformer(self) -> type[SCCSTransformer]:
+        """Transformer for source metadata."""
+        return SCCSTransformer
 
     @property
     def item_identifier_column_names(self) -> list[str]:
diff --git a/dsc/workflows/simple_csv/__init__.py b/dsc/workflows/simple_csv/__init__.py
new file mode 100644
index 0000000..43c9c16
--- /dev/null
+++ b/dsc/workflows/simple_csv/__init__.py
@@ -0,0 +1,3 @@
+from dsc.workflows.simple_csv.workflow import SimpleCSV
+
+__all__ = ["SimpleCSV"]
diff --git a/dsc/workflows/base/simple_csv.py b/dsc/workflows/simple_csv/workflow.py
similarity index 65%
rename from dsc/workflows/base/simple_csv.py
rename to dsc/workflows/simple_csv/workflow.py
index 1cdce97..8d46635 100644
--- a/dsc/workflows/base/simple_csv.py
+++ b/dsc/workflows/simple_csv/workflow.py
@@ -1,4 +1,5 @@
 import logging
+from abc import abstractmethod
 from collections.abc import Iterator
 
 import numpy as np
@@ -6,8 +7,9 @@
 import smart_open
 
 from dsc.exceptions import ItemBitstreamsNotFoundError
+from dsc.item_submission import ItemSubmission
 from dsc.utilities.aws import S3Client
-from dsc.workflows.base import Workflow
+from dsc.workflows.base import Transformer, Workflow
 
 logger = logging.getLogger(__name__)
 
@@ -21,6 +23,11 @@ class SimpleCSV(Workflow):
 
     workflow_name: str = "simple-csv"
 
+    @property
+    @abstractmethod
+    def metadata_transformer(self) -> type[Transformer]:
+        """Transformer for source metadata."""
+
     @property
     def item_identifier_column_names(self) -> list[str]:
         return ["item_identifier"]
@@ -35,14 +42,25 @@ def get_batch_bitstream_uris(self) -> list[str]:
         )
 
     def item_metadata_iter(self, metadata_file: str = "metadata.csv") -> Iterator[dict]:
-        """Yield dicts of item metadata from metadata CSV file.
+        """Yield transformed metadata from metadata CSV file.
+
+        This method reads rows from the metadata CSV file and calls
+        self.metadata_transformer.transform to generate Dublin Core
+        metadata for DSpace. Each yielded dict maps DSpace field names
+        (dc.element.qualifier) to transformed values. For logging purposes,
+        the dict also includes an entry for 'item_identifier'.
+
+        If self.metadata_transformer.transform returns None (i.e.,
+        something went wrong with transformation), the yielded dict will
+        only include the 'item_identifier'.
 
         Args:
             metadata_file: Metadata CSV filename with the filename extension
                 (.csv) included. Defaults to 'metadata.csv'.
 
         Yields:
-            Item metadata.
+            A dict containing the item identifier and Dublin Core metadata
+            for DSpace.
         """
         with smart_open.open(
             f"s3://{self.s3_bucket}/{self.batch_path}{metadata_file}",
@@ -73,12 +91,26 @@ def item_metadata_iter(self, metadata_file: str = "metadata.csv") -> Iterator[di
 
         for _, row in metadata_df.iterrows():
             # replace all NaN values with None
-            yield {
+            source_metadata = {
                 k: (None if isinstance(v, float) and np.isnan(v) else v)
                 for k, v in row.items()
             }
 
-    def prepare_batch(
+            transformed_metadata = self.metadata_transformer.transform(
+                source_metadata
+            )
+
+            if transformed_metadata:
+                yield {
+                    "item_identifier": source_metadata["item_identifier"],
+                    **transformed_metadata,
+                }
+            else:
+                yield {
+                    "item_identifier": source_metadata["item_identifier"],
+                }
+
+    def _prepare_batch(
         self,
         *,
         synced: bool = False,  # noqa: ARG002
@@ -112,13 +144,19 @@ def prepare_batch(
             )
             continue
 
-        # if item submission has associated bitstreams, save init params
+            # copy transformed metadata, excluding 'item_identifier'
+            dspace_metadata = {
+                k: v for k, v in item_metadata.items() if k != "item_identifier"
+            }
+
+            # create ItemSubmission
             item_submissions.append(
-                {
-                    "batch_id": self.batch_id,
-                    "item_identifier": item_metadata["item_identifier"],
-                    "workflow_name": self.workflow_name,
-                }
+                ItemSubmission(
+                    batch_id=self.batch_id,
+                    item_identifier=item_metadata["item_identifier"],
+                    workflow_name=self.workflow_name,
+                    dspace_metadata=dspace_metadata,
+                )
             )
 
         return item_submissions, errors
diff --git a/pyproject.toml b/pyproject.toml
index 41bbe9f..9eabfd3 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@
 
 [project]
 name = "dsc"
-version = "1.0"  # See https://packaging.python.org/en/latest/guides/writing-pyproject-toml/#version
+version = "1.3"  # See https://packaging.python.org/en/latest/guides/writing-pyproject-toml/#version
 requires-python = "~=3.12.0"
 
 dependencies = [
diff --git a/tests/conftest.py b/tests/conftest.py
index c61c4c5..ea5f47b 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -18,99 +18,23 @@
 from dsc.utilities.aws.s3 import S3Client
 from dsc.utilities.aws.ses import SESClient
 from dsc.utilities.aws.sqs import SQSClient
-from dsc.workflows import OpenCourseWare
 from dsc.workflows.archivesspace import ArchivesSpace
-from dsc.workflows.base import Workflow
-from dsc.workflows.base.simple_csv import SimpleCSV
+from dsc.workflows.opencourseware import OpenCourseWareTransformer
+from tests.fixtures.workflows import TestOpenCourseWare, TestSimpleCSV, TestWorkflow
 
+#########################
+# Test Workflow classes
+#########################
 
-# Test Workflow classes ######################
-class TestWorkflow(Workflow):
-    workflow_name: str = "test"
-    submission_system: str = "Test@MIT"
-
-    @property
-    def metadata_mapping_path(self) -> str:
-        return "tests/fixtures/test_metadata_mapping.json"
-
-    @property
-    def output_queue(self) -> str:
-        return "mock-output-queue"
-
-    def get_batch_bitstream_uris(self) -> list[str]:
-        return [
-            "s3://dsc/test/batch-aaa/123_01.pdf",
-            "s3://dsc/test/batch-aaa/123_02.pdf",
-            "s3://dsc/test/batch-aaa/789_01.pdf",
-        ]
-
-    def item_metadata_iter(self):
-        yield from [
-            {
-                "title": "Title",
-                "contributor": "Author 1|Author 2",
-                "item_identifier": "123",
-            },
-            {
-                "title": "2nd Title",
-                "contributor": "Author 3|Author 4",
-                "item_identifier": "789",
-            },
-        ]
-
-    def prepare_batch(self, *, synced: bool = False):  # noqa: ARG002
-        return (
-            [
-                {
-                    "batch_id": "batch-aaa",
-                    "item_identifier": "123",
-                    "workflow_name": "test",
-                },
-                {
-                    "batch_id": "batch-aaa",
-                    "item_identifier": "789",
-                    "workflow_name": "test",
-                },
-            ],
-            [],
-        )
-
-
-class TestOpenCourseWare(OpenCourseWare):
-
-    @property
-    def output_queue(self) -> str:
-        return "mock-output-queue"
-
-
-class TestSimpleCSV(SimpleCSV):
-
-    workflow_name = "simple-csv"
-    submission_system: str = "Test@MIT"
-
-    @property
-    def metadata_mapping_path(self) -> str:
-        return "tests/fixtures/test_metadata_mapping.json"
-
-    @property
-    def item_identifier_column_names(self) -> list[str]:
-        return ["item_identifier", "filename"]
-
-    @property
-    def output_queue(self) -> str:
-        return "mock-output-queue"
-
-
-# Test Workflow instances ####################
 
 @pytest.fixture
 @freeze_time("2025-01-01 09:00:00")
-def base_workflow_instance(item_metadata, metadata_mapping, mocked_s3):
+def base_workflow_instance(mocked_s3):
     return TestWorkflow(batch_id="batch-aaa")
 
 
 @pytest.fixture
-def simple_csv_workflow_instance(metadata_mapping):
+def simple_csv_workflow_instance():
     return TestSimpleCSV(batch_id="batch-aaa")
 
 
@@ -125,7 +49,7 @@ def opencourseware_workflow_instance():
     return TestOpenCourseWare(batch_id="batch-aaa")
 
 
-# Test fixtures ##############################
+# Test fixtures
 @pytest.fixture(autouse=True)
 def _test_env(monkeypatch):
     monkeypatch.setenv("SENTRY_DSN", "None")
@@ -158,44 +82,44 @@ def config_instance():
 
 
 @pytest.fixture
-def dspace_metadata():
+def item_submission_source_metadata():
     return {
-        "metadata": [
-            {
-                "key": "dc.title",
-                "language": "en_US",
-                "value": "Title",
-            },
-            {
-                "key": "dc.contributor",
-                "language": None,
-                "value": "Author 1",
-            },
-            {
-                "key": "dc.contributor",
-                "language": None,
-                "value": "Author 2",
-            },
-        ]
+        "title": "Title",
+        "date": 2026,
+        "contributor": "Author 1|Author 2",
+        "item_identifier": "123",
     }
 
 
 @pytest.fixture
-def item_metadata():
+def item_submission_dspace_metadata():
     return {
-        "title": "Title",
-        "contributor": "Author 1|Author 2",
         "item_identifier": "123",
+        "dc.title": "Title",
+        "dc.date.issued": "2026",
+        "dc.contributor.author": ["Author 1", "Author 2"],
+    }
+
+
+@pytest.fixture
+def item_submission_dspace_metadataentry():
+    return {
+        "metadata": [
+            {"key": "dc.title", "value": "Title"},
+            {"key": "dc.date.issued", "value": "2026"},
+            {"key": "dc.contributor.author", "value": "Author 1"},
+            {"key": "dc.contributor.author", "value": "Author 2"},
+        ]
     }
 
 
 @pytest.fixture
-def item_submission_instance(dspace_metadata):
+def item_submission_instance(item_submission_dspace_metadata):
     return ItemSubmission(
         batch_id="batch-aaa",
         item_identifier="123",
         workflow_name="test",
-        dspace_metadata=dspace_metadata,
+        dspace_metadata=item_submission_dspace_metadata,
         bitstream_s3_uris=[
             "s3://dsc/workflow/folder/123_01.pdf",
             "s3://dsc/workflow/folder/123_02.pdf",
@@ -203,12 +127,6 @@ def item_submission_instance(item_submission_dspace_metadata):
     )
 
 
-@pytest.fixture
-def metadata_mapping():
-    with open("tests/fixtures/test_metadata_mapping.json") as mapping_file:
-        return json.load(mapping_file)
-
-
 @pytest.fixture
 def mocked_item_submission_db(config_instance):
     with mock_aws():
@@ -244,13 +162,13 @@ def mocked_s3(config_instance):
 
 
 @pytest.fixture
-def mocked_s3_simple_csv(mocked_s3, item_metadata):
+def mocked_s3_simple_csv(mocked_s3, item_submission_source_metadata):
     # write in-memory metadata CSV file
     csv_buffer = StringIO()
-    fieldnames = item_metadata.keys()
+    fieldnames = item_submission_source_metadata.keys()
     writer = csv.DictWriter(csv_buffer, fieldnames=fieldnames)
     writer.writeheader()
-    writer.writerows([item_metadata])
+    writer.writerows([item_submission_source_metadata])
 
     # seek to the beginning of the in-memory file before uploading
     csv_buffer.seek(0)
@@ -287,15 +205,6 @@ def mocked_sqs_output():
     yield sqs
 
 
-@pytest.fixture
-def opencourseware_source_metadata():
-    with (
-        zipfile.ZipFile("tests/fixtures/opencourseware/123.zip", "r") as zip_file,
-        zip_file.open("data.json") as file,
-    ):
-        return json.load(file)
-
-
 @pytest.fixture
 def result_message_attributes():
     return {
@@ -399,3 +308,26 @@ def submission_message_body():
         ],
     }
     )
+
+
+#########################
+# OpenCourseWare fixtures
+#########################
+
+
+@pytest.fixture
+def opencourseware_itemsubmission_source_metadata():
+    with (
+        zipfile.ZipFile("tests/fixtures/opencourseware/123.zip", "r") as zip_file,
+        zip_file.open("data.json") as file,
+    ):
+        return json.load(file)
+
+
+@pytest.fixture
+def opencourseware_itemsubmission_dspace_metadata(
+    opencourseware_itemsubmission_source_metadata,
+):
+    return OpenCourseWareTransformer.transform(
+        opencourseware_itemsubmission_source_metadata
+    )
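
The in-memory CSV technique in the mocked_s3_simple_csv fixture above stands on its own: build the file with csv.DictWriter over a StringIO, rewind with seek(0), then hand the buffer's contents to the mocked S3 client. A minimal self-contained version, with the S3 upload omitted:

import csv
from io import StringIO

row = {"title": "Title", "contributor": "Author 1|Author 2", "item_identifier": "123"}

csv_buffer = StringIO()
writer = csv.DictWriter(csv_buffer, fieldnames=row.keys())
writer.writeheader()
writer.writerows([row])
csv_buffer.seek(0)  # rewind before reading the buffer back out

print(csv_buffer.getvalue())
# title,contributor,item_identifier
# Title,Author 1|Author 2,123
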
diff --git a/tests/fixtures/test_metadata_mapping.json b/tests/fixtures/test_metadata_mapping.json
deleted file mode 100644
index 303a38d..0000000
--- a/tests/fixtures/test_metadata_mapping.json
+++ /dev/null
@@ -1,14 +0,0 @@
-{
-    "dc.title": {
-        "source_field_name": "title",
-        "language": "en_US",
-        "required": true
-    },
-    "dc.contributor": {
-        "source_field_name": "contributor",
-        "delimiter": "|"
-    },
-    "dc.subject": {
-        "source_field_name": "topics"
-    }
-}
\ No newline at end of file
diff --git a/tests/fixtures/workflows/__init__.py b/tests/fixtures/workflows/__init__.py
new file mode 100644
index 0000000..7f4c731
--- /dev/null
+++ b/tests/fixtures/workflows/__init__.py
@@ -0,0 +1,7 @@
+from tests.fixtures.workflows.localtest import (
+    TestOpenCourseWare,
+    TestSimpleCSV,
+    TestWorkflow,
+)
+
+__all__ = ["TestOpenCourseWare", "TestSimpleCSV", "TestWorkflow"]
diff --git a/tests/fixtures/workflows/localtest/__init__.py b/tests/fixtures/workflows/localtest/__init__.py
new file mode 100644
index 0000000..e98aad8
--- /dev/null
+++ b/tests/fixtures/workflows/localtest/__init__.py
@@ -0,0 +1,9 @@
+"""Test package."""
+
+from tests.fixtures.workflows.localtest.workflow import (
+    TestOpenCourseWare,
+    TestSimpleCSV,
+    TestWorkflow,
+)
+
+__all__ = ["TestOpenCourseWare", "TestSimpleCSV", "TestWorkflow"]
diff --git a/tests/fixtures/workflows/localtest/transformer.py b/tests/fixtures/workflows/localtest/transformer.py
new file mode 100644
index 0000000..14c7a80
--- /dev/null
+++ b/tests/fixtures/workflows/localtest/transformer.py
@@ -0,0 +1,23 @@
+from collections.abc import Iterable
+
+from dsc.workflows.base import Transformer
+
+
+class TestTransformer(Transformer):
+
+    @classmethod
+    def dc_title(cls, source_metadata: dict) -> str:
+        return source_metadata["title"]
+
+    @classmethod
+    def dc_date_issued(cls, source_metadata: dict) -> str:
+        return str(source_metadata["date"])
+
+    @classmethod
+    def dc_contributor_author(cls, source_metadata: dict) -> list[str]:
+        return source_metadata["contributor"].split("|")
+
+    @classmethod
+    def optional_fields(cls) -> Iterable[str]:
+        """Dublin Core metadata fields."""
+        return ["dc.contributor.author"]
diff --git a/tests/fixtures/workflows/localtest/workflow.py b/tests/fixtures/workflows/localtest/workflow.py
new file mode 100644
index 0000000..afc7518
--- /dev/null
+++ b/tests/fixtures/workflows/localtest/workflow.py
@@ -0,0 +1,84 @@
+from dsc.item_submission import ItemSubmission
+from dsc.workflows.base import Workflow
+from dsc.workflows.opencourseware import OpenCourseWare
+from dsc.workflows.simple_csv import SimpleCSV
+from tests.fixtures.workflows.localtest.transformer import TestTransformer
+
+
+class TestWorkflow(Workflow):
+
+    workflow_name: str = "test"
+    submission_system: str = "Test@MIT"
+
+    @property
+    def metadata_transformer(self) -> type[TestTransformer]:
+        return TestTransformer
+
+    @property
+    def output_queue(self) -> str:
+        return "mock-output-queue"
+
+    def get_batch_bitstream_uris(self) -> list[str]:
+        return [
+            "s3://dsc/test/batch-aaa/123_01.pdf",
+            "s3://dsc/test/batch-aaa/123_02.pdf",
+            "s3://dsc/test/batch-aaa/789_01.pdf",
+        ]
+
+    def item_metadata_iter(self):
+        yield from [
+            {
+                "title": "Title",
+                "contributor": "Author 1|Author 2",
+                "item_identifier": "123",
+            },
+            {
+                "title": "2nd Title",
+                "contributor": "Author 3|Author 4",
+                "item_identifier": "789",
+            },
+        ]
+
+    def _prepare_batch(self, *, synced: bool = False):  # noqa: ARG002
+        return (
+            [
+                ItemSubmission(
+                    batch_id="batch-aaa",
+                    item_identifier="123",
+                    workflow_name="test",
+                ),
+                ItemSubmission(
+                    batch_id="batch-aaa",
+                    item_identifier="789",
+                    workflow_name="test",
+                ),
+            ],
+            [],
+        )
+
+
+class TestOpenCourseWare(OpenCourseWare):
+
+    submission_system: str = "Test@MIT"
+
+    @property
+    def output_queue(self) -> str:
+        return "mock-output-queue"
+
+
+class TestSimpleCSV(SimpleCSV):
+
+    workflow_name = "simple-csv"
+    submission_system: str = "Test@MIT"
+
+    @property
+    def metadata_transformer(self) -> type[TestTransformer]:
+        return TestTransformer
+
+    @property
+    def item_identifier_column_names(self) -> list[str]:
+        return ["item_identifier", "filename"]
+
+    @property
+    def output_queue(self) -> str:
+        return "mock-output-queue"
diff --git a/tests/test_cli.py b/tests/test_cli.py
index 1f1aef6..bec8cd4 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -122,9 +122,25 @@ def test_submit_success(
     s3_client,
 ):
     caplog.set_level("DEBUG")
+
+    # mock upload bitstreams
     s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_001.pdf")
     s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_002.jpg")
     s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/789_001.pdf")
+
+    # mock upload dspace metadata json
+    s3_client.put_file(
+        file_content="",
+        bucket="dsc",
+        key="test/batch-aaa/dspace_metadata/123_metadata.json",
+    )
+    s3_client.put_file(
+        file_content="",
+        bucket="dsc",
+        key="test/batch-aaa/dspace_metadata/789_metadata.json",
+    )
+
+    # mock current state of DynamoDB table
     ItemSubmissionDB(
         item_identifier="123",
         batch_id="batch-aaa",
@@ -161,15 +177,7 @@ def test_submit_success(
         "for batch 'batch-aaa'"
     ) in caplog.text
     assert "Preparing submission for item: 123" in caplog.text
-    assert (
-        "Metadata uploaded to S3: s3://dsc/test/batch-aaa/dspace_metadata/123_metadata.json"
-        in caplog.text
-    )
     assert "Preparing submission for item: 789" in caplog.text
-    assert (
-        "Metadata uploaded to S3: s3://dsc/test/batch-aaa/dspace_metadata/789_metadata.json"
-        in caplog.text
-    )
     assert json.dumps(expected_submission_summary) in caplog.text
     assert "Application exiting" in caplog.text
     assert "Total time elapsed" in caplog.text
diff --git a/tests/test_item_submission.py b/tests/test_item_submission.py
index dae6e80..ecf7b93 100644
--- a/tests/test_item_submission.py
+++ b/tests/test_item_submission.py
@@ -7,15 +7,15 @@
 from dsc.db.models import ItemSubmissionDB, ItemSubmissionStatus
 from dsc.exceptions import (
     DSpaceMetadataUploadError,
-    InvalidDSpaceMetadataError,
-    ItemMetadatMissingRequiredFieldError,
     SQSMessageSendError,
 )
 from dsc.item_submission import ItemSubmission
 
 
-def test_itemsubmission_init_success(item_submission_instance, dspace_metadata):
-    assert item_submission_instance.dspace_metadata == dspace_metadata
+def test_itemsubmission_init_success(
+    item_submission_instance, item_submission_dspace_metadata
+):
+    assert item_submission_instance.dspace_metadata == item_submission_dspace_metadata
     assert item_submission_instance.bitstream_s3_uris == [
         "s3://dsc/workflow/folder/123_01.pdf",
         "s3://dsc/workflow/folder/123_02.pdf",
@@ -155,65 +155,41 @@ def test_itemsubmission_ready_to_submit_with_ingest_failed(item_submission_insta
     assert item_submission_instance.ready_to_submit() is True
 
 
-def test_itemsubmission_create_dspace_metadata_success(
-    item_submission_instance, item_metadata, metadata_mapping
+def test_itemsubmission_create_dspace_metadataentry_success(
+    item_submission_instance,
 ):
-    item_metadata["topics"] = [
+    item_submission_instance.dspace_metadata["dc.subject"] = [
         "Topic Header - Topic Subheading - Topic Name",
         "Topic Header 2 - Topic Subheading 2 - Topic Name 2",
     ]
-    item_submission_instance.create_dspace_metadata(item_metadata, metadata_mapping)
-    assert item_submission_instance.dspace_metadata == {
+    assert item_submission_instance._create_dspace_metadataentry() == {  # noqa: SLF001
         "metadata": [
-            {"key": "dc.title", "language": "en_US", "value": "Title"},
-            {"key": "dc.contributor", "language": None, "value": "Author 1"},
-            {"key": "dc.contributor", "language": None, "value": "Author 2"},
+            {"key": "dc.title", "value": "Title"},
+            {"key": "dc.date.issued", "value": "2026"},
+            {"key": "dc.contributor.author", "value": "Author 1"},
+            {"key": "dc.contributor.author", "value": "Author 2"},
             {
                 "key": "dc.subject",
-                "language": None,
                 "value": "Topic Header - Topic Subheading - Topic Name",
             },
             {
                 "key": "dc.subject",
-                "language": None,
                 "value": "Topic Header 2 - Topic Subheading 2 - Topic Name 2",
             },
         ]
    }
 
 
-def test_itemsubmission_create_dspace_metadata_required_field_missing_raises_exception(
-    item_submission_instance, item_metadata, metadata_mapping
-):
-    item_metadata.pop("title")
-    with pytest.raises(ItemMetadatMissingRequiredFieldError):
-        item_submission_instance.create_dspace_metadata(item_metadata, metadata_mapping)
-
-
-def test_itemsubmission_validate_dspace_metadata_success(
-    item_submission_instance,
-    dspace_metadata,
-):
-    item_submission_instance.dspace_metadata = dspace_metadata
-    assert item_submission_instance.validate_dspace_metadata()
-
-
-def test_itemsubmission_validate_dspace_metadata_invalid_raises_exception(
-    item_submission_instance,
-):
-    item_submission_instance.dspace_metadata = {}
-    with pytest.raises(InvalidDSpaceMetadataError):
-        item_submission_instance.validate_dspace_metadata()
-
-
 def test_itemsubmission_upload_dspace_metadata_success(
     mocked_s3, item_submission_instance, s3_client
 ):
-    item_submission_instance.upload_dspace_metadata("dsc", "workflow/folder/")
     assert (
-        item_submission_instance.metadata_s3_uri
+        item_submission_instance._upload_dspace_metadata(  # noqa: SLF001
+            "dsc", "workflow/folder/"
+        )
         == "s3://dsc/workflow/folder/dspace_metadata/123_metadata.json"
     )
+
     response = s3_client.client.get_object(
         Bucket="dsc", Key="workflow/folder/dspace_metadata/123_metadata.json"
     )
@@ -236,7 +212,7 @@ def test_itemsubmission_upload_dspace_metadata_raises_custom_exception(
     with pytest.raises(
         DSpaceMetadataUploadError, match="The specified bucket does not exist"
     ):
-        item_submission_instance.upload_dspace_metadata(
+        item_submission_instance._upload_dspace_metadata(  # noqa: SLF001
            bucket="dsc", prefix="test/batch-bbb/"
        )
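
The shape asserted above for _create_dspace_metadataentry — scalar values becoming single {"key", "value"} entries and list values fanning out into one entry per element — can be sketched independently. The sketch below mirrors only the expected output documented in these tests; the actual implementation in dsc/item_submission.py may differ.

def to_metadata_entries(dspace_metadata: dict) -> dict:
    """Flatten a dc.* dict into the DSpace REST metadata-entry list."""
    entries = []
    for key, value in dspace_metadata.items():
        if key == "item_identifier":  # identifier is tracked separately
            continue
        values = value if isinstance(value, list) else [value]
        entries.extend({"key": key, "value": v} for v in values)
    return {"metadata": entries}


print(to_metadata_entries({"dc.title": "Title", "dc.contributor.author": ["A", "B"]}))
# {'metadata': [{'key': 'dc.title', 'value': 'Title'},
#               {'key': 'dc.contributor.author', 'value': 'A'},
#               {'key': 'dc.contributor.author', 'value': 'B'}]}
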
{"middle_initial": "E."}, # all required fields missing @@ -88,49 +101,61 @@ def test_opencourseware_dc_contributor_author_if_any_names_empty_success( ] ) assert OpenCourseWareTransformer.dc_contributor_author( - opencourseware_source_metadata + opencourseware_itemsubmission_source_metadata ) == [ "Caballero, Ricardo", "Burger, Cheese E.", ] -def test_opencourseware_dc_contributor_department_success(opencourseware_source_metadata): - assert opencourseware_source_metadata["department_numbers"] == ["14"] +def test_opencourseware_dc_contributor_department_success( + opencourseware_itemsubmission_source_metadata, +): + assert opencourseware_itemsubmission_source_metadata["department_numbers"] == ["14"] assert OpenCourseWareTransformer.dc_contributor_department( - opencourseware_source_metadata + opencourseware_itemsubmission_source_metadata ) == ["Massachusetts Institute of Technology. Department of Economics"] def test_opencourseware_creativework_learningresourcetype_success( - opencourseware_source_metadata, + opencourseware_itemsubmission_source_metadata, ): assert OpenCourseWareTransformer.creativework_learningresourcetype( - opencourseware_source_metadata + opencourseware_itemsubmission_source_metadata ) == ["Problem Sets with Solutions", "Exams with Solutions", "Lecture Notes"] -def test_opencourseware_dc_subject_success(opencourseware_source_metadata): - assert OpenCourseWareTransformer.dc_subject(opencourseware_source_metadata) == [ +def test_opencourseware_dc_subject_success(opencourseware_itemsubmission_source_metadata): + assert OpenCourseWareTransformer.dc_subject( + opencourseware_itemsubmission_source_metadata + ) == [ "Social Science - Economics - International Economics", "Social Science - Economics - Macroeconomics", ] -def test_opencourseware_dc_identifier_other_success(opencourseware_source_metadata): +def test_opencourseware_dc_identifier_other_success( + opencourseware_itemsubmission_source_metadata, +): assert OpenCourseWareTransformer.dc_identifier_other( - opencourseware_source_metadata + opencourseware_itemsubmission_source_metadata ) == ["14.02", "14.02-Fall2004"] -def test_opencourseware_dc_coverage_temporal_success(opencourseware_source_metadata): +def test_opencourseware_dc_coverage_temporal_success( + opencourseware_itemsubmission_source_metadata, +): assert ( - OpenCourseWareTransformer.dc_coverage_temporal(opencourseware_source_metadata) + OpenCourseWareTransformer.dc_coverage_temporal( + opencourseware_itemsubmission_source_metadata + ) == "Fall 2004" ) -def test_opencourseware_dc_audience_educationlevel(opencourseware_source_metadata): +def test_opencourseware_dc_audience_educationlevel( + opencourseware_itemsubmission_source_metadata, +): assert OpenCourseWareTransformer.dc_audience_educationlevel( - opencourseware_source_metadata + opencourseware_itemsubmission_source_metadata ) == ["Undergraduate"] diff --git a/tests/test_workflow_base.py b/tests/test_workflow_base.py index a55d8b5..a9f7eae 100644 --- a/tests/test_workflow_base.py +++ b/tests/test_workflow_base.py @@ -3,13 +3,10 @@ from unittest.mock import patch import pytest -from botocore.exceptions import ClientError from freezegun import freeze_time from dsc.db.models import ItemSubmissionDB, ItemSubmissionStatus -from dsc.exceptions import ( - InvalidWorkflowNameError, -) +from dsc.exceptions import InvalidWorkflowNameError, SQSMessageSendError from dsc.reports import FinalizeReport from dsc.workflows.base import Workflow @@ -19,10 +16,6 @@ def test_base_workflow_init_with_defaults_success(): 
workflow_instance = workflow_class(batch_id="batch-aaa") assert workflow_instance.workflow_name == "test" assert workflow_instance.submission_system == "Test@MIT" - assert ( - workflow_instance.metadata_mapping_path - == "tests/fixtures/test_metadata_mapping.json" - ) assert workflow_instance.batch_id == "batch-aaa" assert workflow_instance.s3_bucket == "dsc" assert workflow_instance.output_queue == "mock-output-queue" @@ -44,7 +37,7 @@ def test_base_workflow_get_workflow_invalid_workflow_name_raises_error( def test_base_workflow_create_batch_in_db_success( base_workflow_instance, mocked_item_submission_db ): - item_submissions, _ = base_workflow_instance.prepare_batch() + item_submissions, _ = base_workflow_instance._prepare_batch() # noqa: SLF001 base_workflow_instance._create_batch_in_db(item_submissions) # noqa: SLF001 item_submission = ItemSubmissionDB.get(hash_key="batch-aaa", range_key="123") @@ -62,9 +55,25 @@ def test_base_workflow_submit_items_success( mocked_item_submission_db, ): caplog.set_level("DEBUG") + + # mock upload bitstreams s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_01.pdf") s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_02.jpg") s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/789_01.pdf") + + # mock upload dspace metadata json + s3_client.put_file( + file_content="", + bucket="dsc", + key="test/batch-aaa/dspace_metadata/123_metadata.json", + ) + s3_client.put_file( + file_content="", + bucket="dsc", + key="test/batch-aaa/dspace_metadata/789_metadata.json", + ) + + # mock current state of Dynamodb table ItemSubmissionDB( item_identifier="123", batch_id="batch-aaa", @@ -77,6 +86,7 @@ def test_base_workflow_submit_items_success( workflow_name="test", status=ItemSubmissionStatus.BATCH_CREATED, ).create() + items = base_workflow_instance.submit_items(collection_handle="123.4/5678") expected_submission_summary = {"total": 2, "submitted": 2, "skipped": 0, "errors": 0} @@ -85,15 +95,30 @@ def test_base_workflow_submit_items_success( assert json.dumps(expected_submission_summary) in caplog.text -def test_base_workflow_submit_items_failed_ready_to_submit_is_skipped( +def test_base_workflow_submit_items_skips_ingested_item( caplog, base_workflow_instance, + s3_client, mocked_s3, mocked_sqs_input, mocked_sqs_output, mocked_item_submission_db, ): caplog.set_level("DEBUG") + + # mock upload dspace metadata json + s3_client.put_file( + file_content="", + bucket="dsc", + key="test/batch-aaa/dspace_metadata/123_metadata.json", + ) + s3_client.put_file( + file_content="", + bucket="dsc", + key="test/batch-aaa/dspace_metadata/789_metadata.json", + ) + + # mock current state of Dynamodb table ItemSubmissionDB( item_identifier="123", batch_id="batch-aaa", @@ -123,6 +148,7 @@ def test_base_workflow_submit_items_exceptions_handled( mocked_method, caplog, base_workflow_instance, + s3_client, mocked_s3, mocked_sqs_input, mocked_sqs_output, @@ -130,17 +156,23 @@ def test_base_workflow_submit_items_exceptions_handled( ): side_effect = [ {"MessageId": "abcd", "ResponseMetadata": {"HTTPStatusCode": 200}}, - ClientError( - { - "Error": { - "Code": "InvalidParameterValue", - "Message": "The specified S3 bucket does not exist.", - } - }, - "SendMessage", - ), + SQSMessageSendError, ] mocked_method.side_effect = side_effect + + # mock upload dspace metadata json + s3_client.put_file( + file_content="", + bucket="dsc", + key="test/batch-aaa/dspace_metadata/123_metadata.json", + ) + s3_client.put_file( + file_content="", + 
bucket="dsc", + key="test/batch-aaa/dspace_metadata/789_metadata.json", + ) + + # mock current state of Dynamodb table ItemSubmissionDB( item_identifier="123", batch_id="batch-aaa", diff --git a/tests/test_workflow_opencourseware.py b/tests/test_workflow_opencourseware.py index 1b6d88a..de69e22 100644 --- a/tests/test_workflow_opencourseware.py +++ b/tests/test_workflow_opencourseware.py @@ -6,12 +6,15 @@ from dsc.item_submission import ItemSubmission -@patch("dsc.workflows.opencourseware.OpenCourseWare._read_metadata_from_zip_file") -def test_workflow_ocw_metadata_mapping_dspace_metadata_success( +@patch( + "dsc.workflows.opencourseware.workflow.OpenCourseWare._read_metadata_from_zip_file" +) +def test_workflow_ocw_prepare_batch_success( mock_opencourseware_read_metadata_from_zip_file, - caplog, + mocked_item_submission_db, mocked_s3, - opencourseware_source_metadata, + opencourseware_itemsubmission_source_metadata, + opencourseware_itemsubmission_dspace_metadata, opencourseware_workflow_instance, s3_client, ): @@ -21,94 +24,31 @@ def test_workflow_ocw_metadata_mapping_dspace_metadata_success( key="opencourseware/batch-aaa/123.zip", ) mock_opencourseware_read_metadata_from_zip_file.return_value = ( - opencourseware_source_metadata + opencourseware_itemsubmission_source_metadata ) - item_submission = ItemSubmission( - batch_id="aaa", item_identifier="123", workflow_name="opencourseware" - ) - item_submission.create_dspace_metadata( - item_metadata=next(opencourseware_workflow_instance.item_metadata_iter()), - metadata_mapping=opencourseware_workflow_instance.metadata_mapping, + assert opencourseware_workflow_instance._prepare_batch() == ( + [ + ItemSubmission( + batch_id="batch-aaa", + item_identifier="123", + workflow_name="opencourseware", + dspace_metadata=opencourseware_itemsubmission_dspace_metadata, + ) + ], + [], ) - assert item_submission.dspace_metadata["metadata"] == [ - { - "key": "dc.title", - "value": "14.02 Principles of Macroeconomics, Fall 2004", - "language": None, - }, - {"key": "dc.date.issued", "value": "2004", "language": None}, - { - "key": "dc.description.abstract", - "value": ( - "This course provides an overview of the following macroeconomic " - "issues: the determination of output, employment, unemployment, " - "interest rates, and inflation. Monetary and fiscal policies are " - "discussed, as are public debt and international economic issues. " - "This course also introduces basic models of macroeconomics and " - "illustrates principles with the experience of the United States " - "and other economies.\n" - ), - "language": None, - }, - {"key": "dc.contributor.author", "value": "Caballero, Ricardo", "language": None}, - { - "key": "dc.contributor.department", - "value": "Massachusetts Institute of Technology. 
diff --git a/tests/test_workflow_opencourseware.py b/tests/test_workflow_opencourseware.py
index 1b6d88a..de69e22 100644
--- a/tests/test_workflow_opencourseware.py
+++ b/tests/test_workflow_opencourseware.py
@@ -6,12 +6,15 @@
 from dsc.item_submission import ItemSubmission
 
 
-@patch("dsc.workflows.opencourseware.OpenCourseWare._read_metadata_from_zip_file")
-def test_workflow_ocw_metadata_mapping_dspace_metadata_success(
+@patch(
+    "dsc.workflows.opencourseware.workflow.OpenCourseWare._read_metadata_from_zip_file"
+)
+def test_workflow_ocw_prepare_batch_success(
     mock_opencourseware_read_metadata_from_zip_file,
-    caplog,
+    mocked_item_submission_db,
     mocked_s3,
-    opencourseware_source_metadata,
+    opencourseware_itemsubmission_source_metadata,
+    opencourseware_itemsubmission_dspace_metadata,
     opencourseware_workflow_instance,
     s3_client,
 ):
@@ -21,94 +24,31 @@
         key="opencourseware/batch-aaa/123.zip",
     )
     mock_opencourseware_read_metadata_from_zip_file.return_value = (
-        opencourseware_source_metadata
+        opencourseware_itemsubmission_source_metadata
     )
-    item_submission = ItemSubmission(
-        batch_id="aaa", item_identifier="123", workflow_name="opencourseware"
-    )
-    item_submission.create_dspace_metadata(
-        item_metadata=next(opencourseware_workflow_instance.item_metadata_iter()),
-        metadata_mapping=opencourseware_workflow_instance.metadata_mapping,
+    assert opencourseware_workflow_instance._prepare_batch() == (
+        [
+            ItemSubmission(
+                batch_id="batch-aaa",
+                item_identifier="123",
+                workflow_name="opencourseware",
+                dspace_metadata=opencourseware_itemsubmission_dspace_metadata,
+            )
+        ],
+        [],
     )
-    assert item_submission.dspace_metadata["metadata"] == [
-        {
-            "key": "dc.title",
-            "value": "14.02 Principles of Macroeconomics, Fall 2004",
-            "language": None,
-        },
-        {"key": "dc.date.issued", "value": "2004", "language": None},
-        {
-            "key": "dc.description.abstract",
-            "value": (
-                "This course provides an overview of the following macroeconomic "
-                "issues: the determination of output, employment, unemployment, "
-                "interest rates, and inflation. Monetary and fiscal policies are "
-                "discussed, as are public debt and international economic issues. "
-                "This course also introduces basic models of macroeconomics and "
-                "illustrates principles with the experience of the United States "
-                "and other economies.\n"
-            ),
-            "language": None,
-        },
-        {"key": "dc.contributor.author", "value": "Caballero, Ricardo", "language": None},
-        {
-            "key": "dc.contributor.department",
-            "value": "Massachusetts Institute of Technology. Department of Economics",
-            "language": None,
-        },
-        {
-            "key": "creativework.learningresourcetype",
-            "value": "Problem Sets with Solutions",
-            "language": None,
-        },
-        {
-            "key": "creativework.learningresourcetype",
-            "value": "Exams with Solutions",
-            "language": None,
-        },
-        {
-            "key": "creativework.learningresourcetype",
-            "value": "Lecture Notes",
-            "language": None,
-        },
-        {
-            "key": "dc.subject",
-            "value": "Social Science - Economics - International Economics",
-            "language": None,
-        },
-        {
-            "key": "dc.subject",
-            "value": "Social Science - Economics - Macroeconomics",
-            "language": None,
-        },
-        {"key": "dc.identifier.other", "value": "14.02", "language": None},
-        {"key": "dc.identifier.other", "value": "14.02-Fall2004", "language": None},
-        {"key": "dc.coverage.temporal", "value": "Fall 2004", "language": None},
-        {"key": "dc.audience.educationlevel", "value": "Undergraduate", "language": None},
-        {"key": "dc.type", "value": "Learning Object", "language": None},
-        {
-            "key": "dc.rights",
-            "value": "Attribution-NonCommercial-NoDerivs 4.0 United States",
-            "language": None,
-        },
-        {
-            "key": "dc.rights.uri",
-            "value": "https://creativecommons.org/licenses/by-nc-nd/4.0/deed.en",
-            "language": None,
-        },
-        {"key": "dc.language.iso", "value": "en_US", "language": None},
-    ]
-
-
-@patch("dsc.workflows.opencourseware.OpenCourseWare._read_metadata_from_zip_file")
-def test_workflow_ocw_prepare_batch_success(
+
+@patch(
+    "dsc.workflows.opencourseware.workflow.OpenCourseWare._read_metadata_from_zip_file"
+)
+def test_workflow_ocw_itemsubmission_create_dspace_metadataentry_success(
     mock_opencourseware_read_metadata_from_zip_file,
-    mocked_item_submission_db,
+    caplog,
     mocked_s3,
-    opencourseware_source_metadata,
-    opencourseware_workflow_instance,
+    opencourseware_itemsubmission_source_metadata,
+    opencourseware_itemsubmission_dspace_metadata,
     s3_client,
 ):
     s3_client.put_file(
@@ -117,27 +57,92 @@
         key="opencourseware/batch-aaa/123.zip",
     )
     mock_opencourseware_read_metadata_from_zip_file.return_value = (
-        opencourseware_source_metadata
+        opencourseware_itemsubmission_source_metadata
     )
-    assert opencourseware_workflow_instance.prepare_batch() == (
-        [
-            {
-                "batch_id": "batch-aaa",
-                "item_identifier": "123",
-                "workflow_name": "opencourseware",
-            }
-        ],
-        [],
+    item_submission = ItemSubmission(
+        batch_id="aaa",
+        item_identifier="123",
+        workflow_name="opencourseware",
+        dspace_metadata=opencourseware_itemsubmission_dspace_metadata,
     )
+    assert item_submission._create_dspace_metadataentry() == {
+        "metadata": [
+            {
+                "key": "dc.title",
+                "value": "14.02 Principles of Macroeconomics, Fall 2004",
+            },
+            {"key": "dc.date.issued", "value": "2004"},
+            {
+                "key": "dc.description.abstract",
+                "value": (
+                    "This course provides an overview of the following macroeconomic "
+                    "issues: the determination of output, employment, unemployment, "
+                    "interest rates, and inflation. Monetary and fiscal policies are "
+                    "discussed, as are public debt and international economic issues. "
+                    "This course also introduces basic models of macroeconomics and "
+                    "illustrates principles with the experience of the United States "
+                    "and other economies.\n"
+                ),
+            },
+            {
+                "key": "dc.contributor.author",
+                "value": "Caballero, Ricardo",
+            },
+            {
+                "key": "dc.contributor.department",
+                "value": "Massachusetts Institute of Technology. Department of Economics",
+            },
+            {
+                "key": "creativework.learningresourcetype",
+                "value": "Problem Sets with Solutions",
+            },
+            {
+                "key": "creativework.learningresourcetype",
+                "value": "Exams with Solutions",
+            },
+            {
+                "key": "creativework.learningresourcetype",
+                "value": "Lecture Notes",
+            },
+            {
+                "key": "dc.subject",
+                "value": "Social Science - Economics - International Economics",
+            },
+            {
+                "key": "dc.subject",
+                "value": "Social Science - Economics - Macroeconomics",
+            },
+            {"key": "dc.identifier.other", "value": "14.02"},
+            {"key": "dc.identifier.other", "value": "14.02-Fall2004"},
+            {"key": "dc.coverage.temporal", "value": "Fall 2004"},
+            {
+                "key": "dc.audience.educationlevel",
+                "value": "Undergraduate",
+            },
+            {"key": "dc.type", "value": "Learning Object"},
+            {
+                "key": "dc.rights",
+                "value": "Attribution-NonCommercial-NoDerivs 4.0 United States",
+            },
+            {
+                "key": "dc.rights.uri",
+                "value": "https://creativecommons.org/licenses/by-nc-nd/4.0/deed.en",
+            },
+            {"key": "dc.language.iso", "value": "en_US"},
+        ]
+    }
+
 
-@patch("dsc.workflows.opencourseware.OpenCourseWare._read_metadata_from_zip_file")
+@patch(
+    "dsc.workflows.opencourseware.workflow.OpenCourseWare._read_metadata_from_zip_file"
+)
 def test_workflow_ocw_item_metadata_iter_success(
     mock_opencourseware_read_metadata_from_zip_file,
     caplog,
     mocked_s3,
-    opencourseware_source_metadata,
+    opencourseware_itemsubmission_source_metadata,
     opencourseware_workflow_instance,
     s3_client,
 ):
@@ -147,7 +152,7 @@
         key="opencourseware/batch-aaa/123.zip",
     )
     mock_opencourseware_read_metadata_from_zip_file.return_value = (
-        opencourseware_source_metadata
+        opencourseware_itemsubmission_source_metadata
     )
     assert next(opencourseware_workflow_instance.item_metadata_iter()) == {
         "item_identifier": "123",
@@ -186,7 +191,7 @@
 def test_workflow_ocw_read_metadata_from_zip_file_success(
     mocked_s3,
-    opencourseware_source_metadata,
+    opencourseware_itemsubmission_source_metadata,
     opencourseware_workflow_instance,
 ):
     """Read source metadata JSON file from test zip file.
@@ -205,7 +210,7 @@ def test_workflow_ocw_read_metadata_from_zip_file_success(
         opencourseware_workflow_instance._read_metadata_from_zip_file(
             "s3://dsc/opencourseware/batch-aaa/123.zip"
         )
-        == opencourseware_source_metadata
+        == opencourseware_itemsubmission_source_metadata
     )
diff --git a/tests/test_workflow_simple_csv.py b/tests/test_workflow_simple_csv.py
index dcf7eef..291b23e 100644
--- a/tests/test_workflow_simple_csv.py
+++ b/tests/test_workflow_simple_csv.py
@@ -2,6 +2,8 @@
 
 import pandas as pd
 
+from dsc.item_submission import ItemSubmission
+
 
 def test_workflow_simple_csv_prepare_batch_success(
     mocked_s3_simple_csv,
@@ -18,13 +20,21 @@
         bucket="dsc",
         key="simple-csv/batch-aaa/123_002.pdf",
     )
-    assert simple_csv_workflow_instance.prepare_batch() == (
+    assert simple_csv_workflow_instance._prepare_batch() == (  # noqa: SLF001
         [
-            {
-                "batch_id": "batch-aaa",
-                "item_identifier": "123",
-                "workflow_name": "simple-csv",
-            }
+            ItemSubmission(
+                batch_id="batch-aaa",
+                item_identifier="123",
+                workflow_name="simple-csv",
+                dspace_metadata={
+                    "dc.contributor.author": [
+                        "Author 1",
+                        "Author 2",
+                    ],
+                    "dc.date.issued": "2026",
+                    "dc.title": "Title",
+                },
+            )
         ],
         [],
     )
@@ -34,26 +44,31 @@ def test_workflow_simple_csv_prepare_batch_track_errors(
     mocked_s3_simple_csv,
     simple_csv_workflow_instance,
 ):
-    assert simple_csv_workflow_instance.prepare_batch() == (
+    assert simple_csv_workflow_instance._prepare_batch() == (  # noqa: SLF001
         [],
         [("123", "No bitstreams found for the item submission")],
     )
 
 
 def test_workflow_simple_csv_item_metadata_iter_success(
-    simple_csv_workflow_instance, mocked_s3_simple_csv, item_metadata
+    simple_csv_workflow_instance, mocked_s3_simple_csv, item_submission_dspace_metadata
 ):
     metadata_iter = simple_csv_workflow_instance.item_metadata_iter(
         metadata_file="metadata.csv"
     )
-    assert next(metadata_iter) == item_metadata
+    assert next(metadata_iter) == item_submission_dspace_metadata
 
 
 def test_workflow_simple_csv_item_metadata_iter_processing_success(
     simple_csv_workflow_instance, mocked_s3
 ):
     metadata_df = pd.DataFrame(
-        {"filename": ["123.pdf", "456.pdf"], "TITLE": ["Cheeses of the World", ""]}
+        {
+            "filename": ["123.pdf", "456.pdf"],
+            "TITLE": ["Cheeses of the World", ""],
+            "date": [2026, 2026],
+            "contributor": ["Author 1", "Author 2"],
+        }
     )
 
     # upload to mocked S3 bucket
@@ -69,8 +84,18 @@ def test_workflow_simple_csv_item_metadata_iter_processing_success(
         metadata_file="metadata.csv"
     )
     assert list(metadata_iter) == [
-        {"item_identifier": "123.pdf", "title": "Cheeses of the World"},
-        {"item_identifier": "456.pdf", "title": None},
+        {
+            "item_identifier": "123.pdf",
+            "dc.title": "Cheeses of the World",
+            "dc.date.issued": "2026",
+            "dc.contributor.author": ["Author 1"],
+        },
+        {
+            "item_identifier": "456.pdf",
+            "dc.title": None,
+            "dc.date.issued": "2026",
+            "dc.contributor.author": ["Author 2"],
+        },
     ]
diff --git a/uv.lock b/uv.lock
index f22704e..5ea6a39 100644
--- a/uv.lock
+++ b/uv.lock
@@ -423,7 +423,7 @@
 sdist = { url = "https://files.pythonhosted.org/packages/a2/55/8f8cab2afd404cf57
 
 [[package]]
 name = "dsc"
-version = "1.0"
+version = "1.3"
 source = { editable = "." }
 dependencies = [
     { name = "boto3" },
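
One note on the test assertions above: comparing _prepare_batch() output directly against ([ItemSubmission(...)], []) works because ItemSubmission is declared with @dataclass (see dsc/item_submission.py earlier in this diff), and dataclasses compare field by field. A toy illustration with a stand-in class, not the real ItemSubmission:

from dataclasses import dataclass, field


@dataclass
class ToySubmission:
    batch_id: str
    item_identifier: str
    dspace_metadata: dict = field(default_factory=dict)


a = ToySubmission("batch-aaa", "123", {"dc.title": "Title"})
b = ToySubmission("batch-aaa", "123", {"dc.title": "Title"})
print(a == b)  # True: dataclasses compare field-by-field
print(([a], []) == ([b], []))  # True: tuples/lists compare element-wise
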