"""AWS CloudWatch metrics client for workflow submission tracking."""

import logging
from dataclasses import dataclass

logger = logging.getLogger(__name__)


# Application-level metric settings. In the change set these two constants are
# added to ``dsc/config.py`` and passed to ``MetricsClient`` by the workflow.
METRICS_NAMESPACE = "dso"

ALLOWED_METRICS = {
    "item_submitted",  # item submitted to DSS
    "submission_error",  # error during submission to DSS
    "ingested_item",  # item ingested successfully into DSpace
    "ingest_error",  # error during attempted item ingest into DSpace
}


# Unit names accepted for a metric; anything else is rejected before the
# CloudWatch call is made.
UNIT_VALUES = frozenset(
    [
        "Seconds",
        "Microseconds",
        "Milliseconds",
        "Bytes",
        "Kilobytes",
        "Megabytes",
        "Gigabytes",
        "Terabytes",
        "Bits",
        "Kilobits",
        "Megabits",
        "Gigabits",
        "Terabits",
        "Percent",
        "Count",
        "Bytes/Second",
        "Kilobytes/Second",
        "Megabytes/Second",
        "Gigabytes/Second",
        "Terabytes/Second",
        "Bits/Second",
        "Kilobits/Second",
        "Megabits/Second",
        "Gigabits/Second",
        "Terabits/Second",
        "Count/Second",
    ]
)


@dataclass
class Metric:
    """A class representing a single metric to be published to CloudWatch."""

    name: str
    # Integers remain valid; the annotation is widened because metric values
    # are not restricted to whole numbers.
    value: float
    unit: str
    dimensions: dict[str, str] | None = None
    # Overrides the client's default namespace for this metric when set.
    namespace: str | None = None


class MetricsClient:
    """A simple client to record metrics to AWS CloudWatch."""

    def __init__(
        self, namespace: str | None = None, allowed_metrics: set[str] | None = None
    ) -> None:
        """Initialize the MetricsClient.

        Args:
            namespace: Default CloudWatch namespace used for metrics that do not
                carry their own ``namespace``.
            allowed_metrics: Optional allow-list of metric names. When empty or
                None, every metric name is accepted.
        """
        # Imported here (not at module level) so the validation and batching
        # logic can be imported without the AWS SDK being installed.
        import boto3  # noqa: PLC0415

        self.namespace = namespace
        self.allowed_metrics: set[str] | None = allowed_metrics
        self._cloudwatch = boto3.client("cloudwatch")
        self.batch_metrics: list[Metric] = []

    def publish_metric(
        self,
        metric: Metric,
    ) -> None:
        """Validate and immediately publish a single metric to CloudWatch.

        Args:
            metric: The Metric instance to publish.

        Raises:
            ValueError: If the metric fails validation or has no namespace.
        """
        self._validate_metric(metric)
        self._publish_metrics([metric])

    def _validate_metric(
        self,
        metric: Metric,
    ) -> bool:
        """Validate that a metric has required fields and allowed name and unit.

        Args:
            metric: The Metric instance to validate.

        Returns:
            True when the metric is valid.

        Raises:
            ValueError: If a required attribute is missing, the name is not in
                the allow-list, or the unit is not in UNIT_VALUES.
        """
        if not all(hasattr(metric, attr) for attr in ("name", "value", "unit")):
            raise ValueError(
                f"Metric must have 'name', 'value', and 'unit' attributes. Invalid "
                f"metric: {metric}"
            )
        self._allowed_metric(metric.name)
        self._validate_metric_unit(metric.unit)
        return True

    def _allowed_metric(self, metric_name: str) -> bool:
        """Check if a metric name is in the allowed list of metrics for the application.

        Args:
            metric_name: The name of the metric to check.

        Returns:
            True when the name is allowed (or no allow-list is configured).

        Raises:
            ValueError: If an allow-list is configured and the name is not in it.
        """
        if self.allowed_metrics and metric_name not in self.allowed_metrics:
            # Sorted so the message is deterministic regardless of set ordering.
            raise ValueError(
                f"Metric name '{metric_name}' is not in the allowed list of metrics: "
                f"{', '.join(sorted(self.allowed_metrics))}"
            )
        return True

    def _validate_metric_unit(self, unit: str) -> bool:
        """Validate that metric unit is allowed by AWS CloudWatch.

        Args:
            unit: The unit to validate.

        Returns:
            True when the unit is valid.

        Raises:
            ValueError: If unit is not allowed by AWS CloudWatch.
        """
        if unit not in UNIT_VALUES:
            raise ValueError(
                f"Invalid unit '{unit}'. Must be one of: "
                f"{', '.join(sorted(UNIT_VALUES))}"
            )
        return True

    @staticmethod
    def _to_metric_datum(metric: Metric) -> dict[str, object]:
        """Convert a Metric into one entry of the ``MetricData`` request list."""
        datum: dict[str, object] = {
            "MetricName": metric.name,
            "Value": metric.value,
            "Unit": metric.unit,
        }
        if metric.dimensions:
            datum["Dimensions"] = [
                {"Name": key, "Value": value}
                for key, value in metric.dimensions.items()
            ]
        return datum

    def _publish_metrics(self, metrics: list[Metric]) -> None:
        """Push metrics to CloudWatch.

        Metrics are grouped by their effective namespace (the metric's own
        namespace, falling back to the client's default) and one
        ``put_metric_data`` request is sent per namespace, so a metric is never
        published under another metric's namespace.

        Args:
            metrics: List of metric instances to push.

        Raises:
            ValueError: If a metric has no namespace and the client has no default.
            Exception: Any error raised by the CloudWatch client is logged and
                re-raised unchanged.
        """
        if not metrics:
            logger.info("No metrics to publish.")
            return
        try:
            # Validate and group everything first so that a namespace error
            # prevents a partial publish of this batch.
            metrics_by_namespace: dict[str, list[dict[str, object]]] = {}
            for metric in metrics:
                self._validate_namespace(metric)
                namespace = metric.namespace or self.namespace
                metrics_by_namespace.setdefault(namespace, []).append(
                    self._to_metric_datum(metric)
                )

            for namespace, metric_data in metrics_by_namespace.items():
                self._cloudwatch.put_metric_data(
                    Namespace=namespace,
                    MetricData=metric_data,
                )
                logger.info(
                    "Published %d metric(s) to CloudWatch namespace '%s'.",
                    len(metric_data),
                    namespace,
                )
        except Exception:
            logger.exception(
                "Failed to publish %d metric(s) to CloudWatch: ", len(metrics)
            )
            raise

    def _validate_namespace(self, metric: Metric) -> bool:
        """Validate metric has a namespace or the client has a default namespace.

        Args:
            metric: The Metric instance to check.

        Returns:
            True when a namespace can be resolved for the metric.

        Raises:
            ValueError: If neither the metric nor the client defines a namespace.
        """
        if not metric.namespace and not self.namespace:
            raise ValueError(
                f"Metric '{metric.name}' must have a namespace if no default "
                f"namespace is set for the MetricsClient."
            )
        return True

    def add_metrics_to_batch(self, metrics: list[Metric]) -> None:
        """Add metrics to the batch queue.

        All metrics are validated before any is queued, so an invalid metric
        leaves the queue untouched instead of partially extended.

        Args:
            metrics: The metrics to add to the batch.

        Raises:
            ValueError: If any metric fails validation.
        """
        for metric in metrics:
            self._validate_metric(metric)
        self.batch_metrics.extend(metrics)

    def publish_metrics_batch(self, batch_size: int = 20) -> None:
        """Publish a batch of metrics to CloudWatch.

        Clears the batch queue after publishing, including when publishing
        fails part-way (the error is still raised to the caller).

        Args:
            batch_size: Number of metrics to publish in each batch. Must be at
                least 1. NOTE(review): CloudWatch also caps the number of metrics
                per PutMetricData request -- confirm the current limit in the API
                reference before raising this value.

        Raises:
            ValueError: If batch_size is less than 1; the queue is left intact.
        """
        if not self.batch_metrics:
            logger.info("No metrics to publish.")
            return

        # Reject bad sizes before entering the try/finally below; otherwise the
        # queue would be cleared without anything having been published.
        if batch_size < 1:
            raise ValueError(f"batch_size must be at least 1, got {batch_size}")

        try:
            for start in range(0, len(self.batch_metrics), batch_size):
                self._publish_metrics(self.batch_metrics[start : start + batch_size])
        finally:
            self.batch_metrics.clear()
item_submission.upsert_db() + self.metrics_client.publish_metric( + Metric( + name="submission_error", + value=1, + unit="Count", + dimensions=self.metrics_dimensions, + ) + ) + logger.info( f"Submitted messages to the DSS input queue '{CONFIG.sqs_queue_dss_input}' " f"for batch '{self.batch_id}': {json.dumps(self.submission_summary)}" @@ -416,11 +441,30 @@ def finalize_items(self) -> None: item_submission.dspace_handle = result_message.dspace_handle sqs_results_summary["ingest_success"] += 1 logger.debug(f"Record {log_str} was ingested") + + self.metrics_client.publish_metric( + Metric( + name="ingested_item", + value=1, + unit="Count", + dimensions=self.metrics_dimensions, + ) + ) elif result_message.result_type == "error": item_submission.status = ItemSubmissionStatus.INGEST_FAILED item_submission.status_details = result_message.error_info sqs_results_summary["ingest_failed"] += 1 logger.debug(f"Record {log_str} failed to ingest") + + self.metrics_client.publish_metric( + Metric( + name="ingest_error", + value=1, + unit="Count", + dimensions=self.metrics_dimensions, + ) + ) + else: item_submission.status = ItemSubmissionStatus.INGEST_UNKNOWN sqs_results_summary["ingest_unknown"] += 1