Skip to content
Draft
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dsc/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class InvalidWorkflowNameError(Exception):
pass


class ItemMetadatMissingRequiredFieldError(Exception):
class ItemMetadataMissingRequiredFieldError(Exception):
pass


Expand Down
164 changes: 59 additions & 105 deletions dsc/item_submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import json
import logging
from collections import defaultdict
from dataclasses import dataclass, fields
from typing import TYPE_CHECKING, Any

Expand All @@ -12,8 +13,7 @@
from dsc.db.models import ITEM_SUBMISSION_LOG_STR, ItemSubmissionDB, ItemSubmissionStatus
from dsc.exceptions import (
DSpaceMetadataUploadError,
InvalidDSpaceMetadataError,
ItemMetadatMissingRequiredFieldError,
ItemMetadataMissingRequiredFieldError,
SQSMessageSendError,
)
from dsc.utilities.aws.s3 import S3Client
Expand Down Expand Up @@ -176,9 +176,9 @@ def save(self) -> None:
)
item_submission_db.create()

logger.info(f"Saved record " f"{ITEM_SUBMISSION_LOG_STR.format(
batch_id=self.batch_id, item_identifier=self.item_identifier
)}")
logger.info(f"Saved record {ITEM_SUBMISSION_LOG_STR.format(
batch_id=self.batch_id, item_identifier=self.item_identifier
)}")

def upsert_db(self) -> None:
"""Upsert a record in DynamoDB from ItemSubmission.
Expand All @@ -195,9 +195,9 @@ def upsert_db(self) -> None:
)
item_submission_db.save()

logger.info(f"Upserted record " f"{ITEM_SUBMISSION_LOG_STR.format(
batch_id=self.batch_id, item_identifier=self.item_identifier
)}")
logger.info(f"Upserted record {ITEM_SUBMISSION_LOG_STR.format(
batch_id=self.batch_id, item_identifier=self.item_identifier
)}")

def ready_to_submit(self) -> bool:
"""Check if the item submission is ready to be submitted."""
Expand All @@ -209,32 +209,31 @@ def ready_to_submit(self) -> bool:
case ItemSubmissionStatus.INGEST_SUCCESS:
logger.info(
f"Record {ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id,
item_identifier=self.item_identifier)
} " "already ingested, skipping submission"
item_identifier=self.item_identifier)} "
"already ingested, skipping submission"
)
case ItemSubmissionStatus.SUBMIT_SUCCESS:
logger.info(
f"Record " f"{ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id,
item_identifier=self.item_identifier)
} " " already submitted, skipping submission"
f"Record {ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id,
item_identifier=self.item_identifier)} "
" already submitted, skipping submission"
)
case ItemSubmissionStatus.MAX_RETRIES_REACHED:
logger.info(
f"Record " f"{ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id,
item_identifier=self.item_identifier)
} " "max retries reached, skipping submission"
f"Record {ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id,
item_identifier=self.item_identifier)} "
"max retries reached, skipping submission"
)
case None:
logger.info(
f"Record " f"{ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id,
item_identifier=self.item_identifier)
} " " status unknown, skipping submission"
f"Record {ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id,
item_identifier=self.item_identifier)} "
"status unknown, skipping submission"
)
case _:
logger.debug(
f"Record " f"{ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id,
item_identifier=self.item_identifier)
} " "allowed for submission"
f"Record {ITEM_SUBMISSION_LOG_STR.format(batch_id=self.batch_id,
item_identifier=self.item_identifier)} allowed for submission"
)
ready_to_submit = True

Expand All @@ -248,93 +247,48 @@ def _exceeded_retry_threshold(self) -> None:
if self.ingest_attempts > CONFIG.retry_threshold:
self.status = ItemSubmissionStatus.MAX_RETRIES_REACHED

def prepare_dspace_metadata(
self, metadata_mapping: dict, item_metadata: dict, s3_bucket: str, batch_path: str
) -> None:
def prepare_dspace_metadata(self, s3_bucket: str, batch_path: str) -> str:
"""Prepare DSpace metadata for the item submission."""
self.create_dspace_metadata(
item_metadata=item_metadata,
metadata_mapping=metadata_mapping,
)
self.validate_dspace_metadata()
self.upload_dspace_metadata(bucket=s3_bucket, prefix=batch_path)

def create_dspace_metadata(
self, item_metadata: dict[str, Any], metadata_mapping: dict
) -> None:
"""Create DSpace metadata from the item's source metadata.

A metadata mapping is a dict with the format seen below:

{
"dc.contributor": {
"source_field_name": "contributor",
"language": "<language>",
"delimiter": "<delimiting character>",
"required": true | false
}
}

When setting up the metadata mapping JSON file, "language" and "delimiter"
can be omitted from the file if not applicable. Required fields ("item_identifier"
and "title") must be set as required (true); if "required" is not listed as a
a config, the field defaults as not required (false).
self.dspace_metadata = self._create_dspace_metadataentry()
return self._upload_dspace_metadata(bucket=s3_bucket, prefix=batch_path)

Args:
item_metadata: Item metadata from which the DSpace metadata will be derived.
metadata_mapping: A mapping of DSpace metadata fields to source metadata
fields.
"""
metadata_entries = []
for field_name, field_mapping in metadata_mapping.items():
if field_name not in ["item_identifier"]:

field_value = item_metadata.get(field_mapping["source_field_name"])
if not field_value and field_mapping.get("required", False):
raise ItemMetadatMissingRequiredFieldError(
"Item metadata missing required field: '"
f"{field_mapping["source_field_name"]}'"
)

if field_value:
if isinstance(field_value, list):
field_values = field_value
elif delimiter := field_mapping.get("delimiter"):
field_values = field_value.split(delimiter)
else:
field_values = [field_value]

metadata_entries.extend(
[
{
"key": field_name,
"value": value,
"language": field_mapping.get("language"),
}
for value in field_values
]
)
self.dspace_metadata = {"metadata": metadata_entries}

def validate_dspace_metadata(self) -> bool:
"""Validate that DSpace metadata follows the expected format for DSpace 6.x.
def _create_dspace_metadataentry(self) -> None | dict:
"""Format transformed metadata for DSpace REST API.

Args:
dspace_metadata: DSpace metadata to be validated.
This method follows the specs for a 'MetadataEntry Object' for
the DSpace 6 REST API:
https://wiki.lyrasis.org/display/DSDOC6x/REST+API#RESTAPI-MetadataEntryO
Comment on lines +258 to +260
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did confirm that the MetadataEntry concept is not reflected in the DSpace 7+ REST API (REST contract) so I have reservations about a refactor incorporating DSpace 6 structures when we're so close to DSpace 8.

However, reviewing the REST contract did reveal that we can access metadata schemas which I think will be critical for meaningful metadata validation

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, see #213 (comment). 🤔

Similar to our approach for DSS, DSC will need to support DS6 and DS8 until the migration is complete. I feel confident that aligning how we transform metadata / generate metadata required for DSpace REST API across all our workflows will make it easier to solve the case of supporting multiple DSpace versions!

"""
valid = False
if self.dspace_metadata and self.dspace_metadata.get("metadata") is not None:
for element in self.dspace_metadata["metadata"]:
if element.get("key") is not None and element.get("value") is not None:
valid = True
logger.debug("Valid DSpace metadata created")
else:
raise InvalidDSpaceMetadataError(
f"Invalid DSpace metadata created: {self.dspace_metadata} ",
)
return valid
if not self.dspace_metadata:
return None

metadataentry = defaultdict(list)

for field, value in self.dspace_metadata.items():
if field == "item_identifier":
continue

if not value and field in ["dc.title", "dc.date.issued"]:
raise ItemMetadataMissingRequiredFieldError(
f"Item metadata missing required field: {field}"
)

if value:
value_list = value if isinstance(value, list) else [value]

metadataentry["metadata"].extend(
[
{
"key": field,
"value": _value,
}
for _value in value_list
]
)

return dict(metadataentry)

def upload_dspace_metadata(self, bucket: str, prefix: str) -> None:
def _upload_dspace_metadata(self, bucket: str, prefix: str) -> str:
"""Upload DSpace metadata to S3 using the specified bucket and keyname.

Args:
Expand All @@ -361,7 +315,7 @@ def upload_dspace_metadata(self, bucket: str, prefix: str) -> None:

metadata_s3_uri = f"s3://{bucket}/{metadata_s3_key}"
logger.info(f"Metadata uploaded to S3: {metadata_s3_uri}")
self.metadata_s3_uri = metadata_s3_uri
return metadata_s3_uri

def send_submission_message(
self,
Expand Down
2 changes: 1 addition & 1 deletion dsc/workflows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

from dsc.workflows.archivesspace import ArchivesSpace
from dsc.workflows.base import Workflow
from dsc.workflows.base.simple_csv import SimpleCSV
from dsc.workflows.opencourseware import OpenCourseWare
from dsc.workflows.sccs import SCCS
from dsc.workflows.simple_csv import SimpleCSV

__all__ = ["SCCS", "ArchivesSpace", "OpenCourseWare", "SimpleCSV", "Workflow"]
3 changes: 3 additions & 0 deletions dsc/workflows/archivesspace/__init__.py
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made a version of this comment that seemed to fall in a black hole so repeating: I really like this organization, the previous approach worked but I appreciate having a workflow's files collocated like this!

Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from dsc.workflows.archivesspace.workflow import ArchivesSpace

__all__ = ["ArchivesSpace"]
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from dsc.db.models import ItemSubmissionStatus
from dsc.item_submission import ItemSubmission
from dsc.workflows.base.simple_csv import SimpleCSV
from dsc.workflows.simple_csv import SimpleCSV

logger = logging.getLogger(__name__)

Expand All @@ -25,7 +25,7 @@ class ArchivesSpace(SimpleCSV):

@property
def metadata_mapping_path(self) -> str:
return "dsc/workflows/metadata_mapping/archivesspace.json"
return "dsc/workflows/archivesspace/metadata_mapping.json"

@property
def output_path(self) -> str:
Expand Down
Loading