2 changes: 1 addition & 1 deletion dsc/exceptions.py
@@ -14,7 +14,7 @@ class InvalidWorkflowNameError(Exception):
pass


-class ItemMetadatMissingRequiredFieldError(Exception):
+class ItemMetadataMissingRequiredFieldError(Exception):
pass


164 changes: 59 additions & 105 deletions dsc/item_submission.py
@@ -2,6 +2,7 @@

import json
import logging
+from collections import defaultdict
from dataclasses import dataclass, fields
from typing import TYPE_CHECKING, Any

@@ -12,8 +12,7 @@
from dsc.db.models import ITEM_SUBMISSION_LOG_STR, ItemSubmissionDB, ItemSubmissionStatus
from dsc.exceptions import (
DSpaceMetadataUploadError,
-    InvalidDSpaceMetadataError,
-    ItemMetadatMissingRequiredFieldError,
+    ItemMetadataMissingRequiredFieldError,
SQSMessageSendError,
)
from dsc.utilities.aws.s3 import S3Client
@@ -176,9 +176,9 @@ def save(self) -> None:
)
item_submission_db.create()

logger.info(f"Saved record " f"{ITEM_SUBMISSION_LOG_STR.format(
batch_id=self.batch_id, item_identifier=self.item_identifier
)}")
logger.info(f"Saved record {ITEM_SUBMISSION_LOG_STR.format(
batch_id=self.batch_id, item_identifier=self.item_identifier
)}")

def upsert_db(self) -> None:
"""Upsert a record in DynamoDB from ItemSubmission.
@@ -195,9 +195,9 @@ def upsert_db(self) -> None:
)
item_submission_db.save()

logger.info(f"Upserted record " f"{ITEM_SUBMISSION_LOG_STR.format(
batch_id=self.batch_id, item_identifier=self.item_identifier
)}")
logger.info(f"Upserted record {ITEM_SUBMISSION_LOG_STR.format(
batch_id=self.batch_id, item_identifier=self.item_identifier
)}")

def ready_to_submit(self) -> bool:
"""Check if the item submission is ready to be submitted."""
@@ -209,32 +209,31 @@ def ready_to_submit(self) -> bool:
            case ItemSubmissionStatus.INGEST_SUCCESS:
                logger.info(
                    f"Record {ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id,
-                    item_identifier=self.item_identifier)
-                    } " "already ingested, skipping submission"
+                    item_identifier=self.item_identifier)} "
+                    "already ingested, skipping submission"
                )
            case ItemSubmissionStatus.SUBMIT_SUCCESS:
                logger.info(
-                    f"Record " f"{ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id,
-                    item_identifier=self.item_identifier)
-                    } " " already submitted, skipping submission"
+                    f"Record {ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id,
+                    item_identifier=self.item_identifier)} "
+                    " already submitted, skipping submission"
                )
            case ItemSubmissionStatus.MAX_RETRIES_REACHED:
                logger.info(
-                    f"Record " f"{ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id,
-                    item_identifier=self.item_identifier)
-                    } " "max retries reached, skipping submission"
+                    f"Record {ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id,
+                    item_identifier=self.item_identifier)} "
+                    "max retries reached, skipping submission"
                )
            case None:
                logger.info(
-                    f"Record " f"{ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id,
-                    item_identifier=self.item_identifier)
-                    } " " status unknown, skipping submission"
+                    f"Record {ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id,
+                    item_identifier=self.item_identifier)} "
+                    "status unknown, skipping submission"
                )
            case _:
                logger.debug(
-                    f"Record " f"{ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id,
-                    item_identifier=self.item_identifier)
-                    } " "allowed for submission"
+                    f"Record {ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id,
+                    item_identifier=self.item_identifier)} allowed for submission"
                )
                ready_to_submit = True

@@ -248,93 +247,48 @@ def _exceeded_retry_threshold(self) -> None:
if self.ingest_attempts > CONFIG.retry_threshold:
self.status = ItemSubmissionStatus.MAX_RETRIES_REACHED

-    def prepare_dspace_metadata(
-        self, metadata_mapping: dict, item_metadata: dict, s3_bucket: str, batch_path: str
-    ) -> None:
+    def prepare_dspace_metadata(self, s3_bucket: str, batch_path: str) -> str:
         """Prepare DSpace metadata for the item submission."""
-        self.create_dspace_metadata(
-            item_metadata=item_metadata,
-            metadata_mapping=metadata_mapping,
-        )
-        self.validate_dspace_metadata()
-        self.upload_dspace_metadata(bucket=s3_bucket, prefix=batch_path)
-
-    def create_dspace_metadata(
-        self, item_metadata: dict[str, Any], metadata_mapping: dict
-    ) -> None:
-        """Create DSpace metadata from the item's source metadata.
-
-        A metadata mapping is a dict with the format seen below:
-
-        {
-            "dc.contributor": {
-                "source_field_name": "contributor",
-                "language": "<language>",
-                "delimiter": "<delimiting character>",
-                "required": true | false
-            }
-        }
-
-        When setting up the metadata mapping JSON file, "language" and "delimiter"
-        can be omitted from the file if not applicable. Required fields
-        ("item_identifier" and "title") must be set as required (true); if "required"
-        is not listed as a config, the field defaults to not required (false).
+        self.dspace_metadata = self._create_dspace_metadataentry()
+        return self._upload_dspace_metadata(bucket=s3_bucket, prefix=batch_path)

-        Args:
-            item_metadata: Item metadata from which the DSpace metadata will be derived.
-            metadata_mapping: A mapping of DSpace metadata fields to source metadata
-                fields.
-        """
-        metadata_entries = []
-        for field_name, field_mapping in metadata_mapping.items():
-            if field_name not in ["item_identifier"]:
-
-                field_value = item_metadata.get(field_mapping["source_field_name"])
-                if not field_value and field_mapping.get("required", False):
-                    raise ItemMetadatMissingRequiredFieldError(
-                        "Item metadata missing required field: '"
-                        f"{field_mapping["source_field_name"]}'"
-                    )
-
-                if field_value:
-                    if isinstance(field_value, list):
-                        field_values = field_value
-                    elif delimiter := field_mapping.get("delimiter"):
-                        field_values = field_value.split(delimiter)
-                    else:
-                        field_values = [field_value]
-
-                    metadata_entries.extend(
-                        [
-                            {
-                                "key": field_name,
-                                "value": value,
-                                "language": field_mapping.get("language"),
-                            }
-                            for value in field_values
-                        ]
-                    )
-        self.dspace_metadata = {"metadata": metadata_entries}

-    def validate_dspace_metadata(self) -> bool:
-        """Validate that DSpace metadata follows the expected format for DSpace 6.x.
+    def _create_dspace_metadataentry(self) -> None | dict:
+        """Format transformed metadata for DSpace REST API.

-        Args:
-            dspace_metadata: DSpace metadata to be validated.
+        This method follows the specs for a 'MetadataEntry Object' for
+        the DSpace 6 REST API:
+        https://wiki.lyrasis.org/display/DSDOC6x/REST+API#RESTAPI-MetadataEntryO
Comment on lines +258 to +260

Contributor:
I did confirm that the MetadataEntry concept is not reflected in the DSpace 7+ REST API (REST contract), so I have reservations about a refactor incorporating DSpace 6 structures when we're so close to DSpace 8.

However, reviewing the REST contract did reveal that we can access metadata schemas, which I think will be critical for meaningful metadata validation.

Contributor Author:
Hmm, see #213 (comment). 🤔

Similar to our approach for DSS, DSC will need to support DS6 and DS8 until the migration is complete. I feel confident that aligning how we transform metadata / generate the metadata required for the DSpace REST API across all our workflows will make it easier to solve the case of supporting multiple DSpace versions!
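For reference, the 'MetadataEntry Object' under discussion is a flat list of key/value objects. A minimal sketch of the payload shape this PR targets, with made-up field values:

```python
# Illustrative only: the DSpace 6 REST MetadataEntry list shape that
# _create_dspace_metadataentry returns (values here are hypothetical).
dspace_metadata_entry = {
    "metadata": [
        {"key": "dc.title", "value": "Oral History Interview, 1999"},
        {"key": "dc.contributor.author", "value": "Smith, Jane"},
    ]
}
```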

"""
-        valid = False
-        if self.dspace_metadata and self.dspace_metadata.get("metadata") is not None:
-            for element in self.dspace_metadata["metadata"]:
-                if element.get("key") is not None and element.get("value") is not None:
-                    valid = True
-                    logger.debug("Valid DSpace metadata created")
-                else:
-                    raise InvalidDSpaceMetadataError(
-                        f"Invalid DSpace metadata created: {self.dspace_metadata} ",
-                    )
-        return valid
+        if not self.dspace_metadata:
+            return None
+
+        metadataentry = defaultdict(list)
+
+        for field, value in self.dspace_metadata.items():
+            if field == "item_identifier":
+                continue
+
+            if not value and field in ["dc.title", "dc.date.issued"]:
+                raise ItemMetadataMissingRequiredFieldError(
+                    f"Item metadata missing required field: {field}"
+                )
+
+            if value:
+                value_list = value if isinstance(value, list) else [value]
+
+                metadataentry["metadata"].extend(
+                    [
+                        {
+                            "key": field,
+                            "value": _value,
+                        }
+                        for _value in value_list
+                    ]
+                )
+
+        return dict(metadataentry)

-    def upload_dspace_metadata(self, bucket: str, prefix: str) -> None:
+    def _upload_dspace_metadata(self, bucket: str, prefix: str) -> str:
"""Upload DSpace metadata to S3 using the specified bucket and keyname.

Args:
@@ -361,7 +315,7 @@ def upload_dspace_metadata(self, bucket: str, prefix: str) -> None:

metadata_s3_uri = f"s3://{bucket}/{metadata_s3_key}"
logger.info(f"Metadata uploaded to S3: {metadata_s3_uri}")
-        self.metadata_s3_uri = metadata_s3_uri
+        return metadata_s3_uri

def send_submission_message(
self,
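To make the new flow concrete, here is a minimal sketch of what `_create_dspace_metadataentry` produces, based only on the diff above; `item` and the field values are hypothetical:

```python
# Hypothetical ItemSubmission with transformed source metadata already set.
item.dspace_metadata = {
    "item_identifier": "abc123",  # skipped: never sent to DSpace
    "dc.title": "A Sample Title",  # required: an empty value raises
    "dc.contributor.author": ["Smith, Jane", "Doe, Alex"],  # lists fan out
}

# item._create_dspace_metadataentry() would return:
# {
#     "metadata": [
#         {"key": "dc.title", "value": "A Sample Title"},
#         {"key": "dc.contributor.author", "value": "Smith, Jane"},
#         {"key": "dc.contributor.author", "value": "Doe, Alex"},
#     ]
# }
```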
2 changes: 1 addition & 1 deletion dsc/workflows/__init__.py
@@ -5,8 +5,8 @@

from dsc.workflows.archivesspace import ArchivesSpace
from dsc.workflows.base import Workflow
-from dsc.workflows.base.simple_csv import SimpleCSV
from dsc.workflows.opencourseware import OpenCourseWare
from dsc.workflows.sccs import SCCS
+from dsc.workflows.simple_csv import SimpleCSV

__all__ = ["SCCS", "ArchivesSpace", "OpenCourseWare", "SimpleCSV", "Workflow"]
4 changes: 4 additions & 0 deletions dsc/workflows/archivesspace/__init__.py
Contributor:
I made a version of this comment that seemed to fall into a black hole, so repeating: I really like this organization; the previous approach worked, but I appreciate having a workflow's files collocated like this!

@@ -0,0 +1,4 @@
+from dsc.workflows.archivesspace.transformer import ArchivesSpaceTransformer
+from dsc.workflows.archivesspace.workflow import ArchivesSpace
+
+__all__ = ["ArchivesSpace", "ArchivesSpaceTransformer"]
20 changes: 20 additions & 0 deletions dsc/workflows/archivesspace/transformer.py
@@ -0,0 +1,20 @@
+from collections.abc import Iterable
+
+from dsc.workflows.base import Transformer
+
+
+class ArchivesSpaceTransformer(Transformer):
+    """Transformer for ArchivesSpace source metadata."""
+
+    @classmethod
+    def dc_contributor_author(cls, source_metadata: dict) -> list[str]:
+        return source_metadata["author"].split("|")
+
+    @classmethod
+    def optional_fields(cls) -> Iterable[str]:
+        return [
+            "dc.contributor.author",
+            "dc.description",
+            "dc.rights",
+            "dc.rights.uri",
+        ]
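A quick usage sketch of the transformer's pipe-delimited author handling (the source dict below is a made-up example of source metadata):

```python
source = {"author": "Smith, Jane|Doe, Alex"}

# Splits the pipe-delimited author string into a list of names.
ArchivesSpaceTransformer.dc_contributor_author(source)
# -> ["Smith, Jane", "Doe, Alex"]
```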
dsc/workflows/archivesspace/workflow.py
@@ -5,7 +5,8 @@

from dsc.db.models import ItemSubmissionStatus
from dsc.item_submission import ItemSubmission
-from dsc.workflows.base.simple_csv import SimpleCSV
+from dsc.workflows.archivesspace import ArchivesSpaceTransformer
+from dsc.workflows.simple_csv import SimpleCSV

logger = logging.getLogger(__name__)

@@ -24,8 +25,8 @@ class ArchivesSpace(SimpleCSV):
submission_system: str = "Dome"

@property
-    def metadata_mapping_path(self) -> str:
-        return "dsc/workflows/metadata_mapping/archivesspace.json"
+    def metadata_transformer(self) -> type[ArchivesSpaceTransformer]:
+        return ArchivesSpaceTransformer

@property
def output_path(self) -> str: