From 06ac7632154c920f32df7c711578ba8fc546c8b3 Mon Sep 17 00:00:00 2001 From: Eric Hanson Date: Fri, 3 Apr 2026 15:33:23 -0400 Subject: [PATCH 1/4] Add metrics client Why these changes are being introduced: * A metrics client class is needed to implement AWS Cloudwatch metrics How this addresses that need: * Add MetricsClient class with methods for publishing individual and batch metrics * Add METRICS_NAMESPACE and METRICS constants to config.py * Add metrics attributes to Workflow class * Add publish_metric method calls to submit_items and finalize_items methods * Update dependencies Side effects of this change: * None Relevant ticket(s): * NA --- dsc/config.py | 9 ++ dsc/utils/aws/__init__.py | 3 +- dsc/utils/aws/metrics.py | 198 +++++++++++++++++++++++++++++++++ dsc/workflows/base/workflow.py | 35 +++++- 4 files changed, 243 insertions(+), 2 deletions(-) create mode 100644 dsc/utils/aws/metrics.py diff --git a/dsc/config.py b/dsc/config.py index b937e1b..ac96885 100644 --- a/dsc/config.py +++ b/dsc/config.py @@ -4,6 +4,15 @@ import sentry_sdk +METRICS_NAMESPACE = "dso" + +METRICS = [ + "item_submitted", # item submitted to DSS + "submission_error", # error during submission to DSS + "ingested_item", # item ingested successfully into DSpace + "ingest_error", # error during attempted item ingest into DSpace +] + class Config: REQUIRED_ENV_VARS: Iterable[str] = [ diff --git a/dsc/utils/aws/__init__.py b/dsc/utils/aws/__init__.py index 700b14b..212d680 100644 --- a/dsc/utils/aws/__init__.py +++ b/dsc/utils/aws/__init__.py @@ -1,5 +1,6 @@ +from dsc.utils.aws.metrics import MetricsClient from dsc.utils.aws.s3 import S3Client from dsc.utils.aws.ses import SESClient from dsc.utils.aws.sqs import SQSClient -__all__ = ["S3Client", "SESClient", "SQSClient"] +__all__ = ["MetricsClient", "S3Client", "SESClient", "SQSClient"] diff --git a/dsc/utils/aws/metrics.py b/dsc/utils/aws/metrics.py new file mode 100644 index 0000000..0ebdb82 --- /dev/null +++ b/dsc/utils/aws/metrics.py @@ -0,0 +1,198 @@ +"""AWS CloudWatch metrics client for workflow submission tracking.""" + +from __future__ import annotations + +import logging + +import boto3 + +from dsc.config import METRICS, METRICS_NAMESPACE + +logger = logging.getLogger(__name__) + + +UNIT_VALUES = frozenset( + [ + "Seconds", + "Microseconds", + "Milliseconds", + "Bytes", + "Kilobytes", + "Megabytes", + "Gigabytes", + "Terabytes", + "Bits", + "Kilobits", + "Megabits", + "Gigabits", + "Terabits", + "Percent", + "Count", + "Bytes/Second", + "Kilobytes/Second", + "Megabytes/Second", + "Gigabytes/Second", + "Terabytes/Second", + "Bits/Second", + "Kilobits/Second", + "Megabits/Second", + "Gigabits/Second", + "Terabits/Second", + "Count/Second", + ] +) + + +class MetricsClient: + """A simple client to record metrics to AWS CloudWatch.""" + + def __init__(self) -> None: + """Initialize the MetricsClient.""" + self.cloudwatch = boto3.client("cloudwatch") + self.batch_metrics: list[dict] = [] + + def publish_single_metric( + self, + metric_name: str, + value: int, + unit: str, + metric_dimensions: dict[str, str] | None = None, + ) -> None: + """Publish a single metric to CloudWatch. + + Args: + metric_name: The name of the metric to publish. + value: The value of the metric. + unit: The unit of the metric. + metric_dimensions: Optional dictionary of dimension names and values. + + Raises: + ValueError: If unit is invalid. + """ + metric_data = self._validate_and_build_metric_data( + metric_name, value, unit, metric_dimensions + ) + self._push_metric_data([metric_data]) + + def _validate_and_build_metric_data( + self, + metric_name: str, + value: int, + unit: str, + metric_dimensions: dict[str, str] | None = None, + ) -> dict: + """Validate and build a metric data dictionary for CloudWatch. + + Args: + metric_name: The name of the metric. + value: The value of the metric. + unit: The unit of the metric. + metric_dimensions: Optional dictionary of dimension names and values. + + Returns: + A metric data dictionary formatted for CloudWatch. + """ + self._approved_metric(metric_name) + self._validate_unit(unit) + dimensions = [ + {"Name": name, "Value": dim_value} + for name, dim_value in ( + metric_dimensions.items() if metric_dimensions else [] + ) + ] + return { + "MetricName": metric_name, + "Value": value, + "Unit": unit, + "Dimensions": dimensions, + } + + def _approved_metric(self, metric_name: str) -> bool: + """Check if a metric name is in the approved list of metrics for the application. + + Args: + metric_name: The name of the metric to check. + """ + if metric_name not in METRICS: + raise ValueError( + f"Metric name '{metric_name}' is not in the approved list of metrics: " + f"{', '.join(METRICS)}" + ) + return True + + def _validate_unit(self, unit: str) -> None: + """Validate that metric unit is allowed by AWS CloudWatch. + + Args: + unit: The unit to validate. + + Raises: + ValueError: If unit is not allowed by AWS CloudWatch. + """ + if unit not in UNIT_VALUES: + raise ValueError( + f"Invalid unit '{unit}'. Must be one of: {', '.join(UNIT_VALUES)}" + ) + + def _push_metric_data(self, metric_data: list[dict]) -> None: + """Push metric data to CloudWatch. + + Args: + metric_data: List of metric dictionaries to push. + """ + try: + self.cloudwatch.put_metric_data( + Namespace=METRICS_NAMESPACE, MetricData=metric_data + ) + logger.info(f"Published metric with {metric_data} to CloudWatch.") + except Exception: + logger.exception( + f"Failed to publish metric with {metric_data} to CloudWatch." + ) + + def add_metric_to_batch( + self, + metric_name: str, + value: int, + unit: str, + metric_dimensions: dict[str, str] | None = None, + ) -> None: + """Add a metric to the batch for later publishing. + + Args: + metric_name: The name of the metric. + value: The value of the metric. + unit: The unit of the metric. + metric_dimensions: Optional dictionary of dimension names and values. + + Raises: + ValueError: If unit is invalid. + """ + metric_data = self._validate_and_build_metric_data( + metric_name, value, unit, metric_dimensions + ) + self.batch_metrics.append(metric_data) + + def publish_batch_metrics(self, batch_size: int = 20) -> None: + """Publish all accumulated batch metrics to CloudWatch. + + Raises: + ValueError: If any metric has an invalid unit or missing required fields. + """ + if not self.batch_metrics: + logger.info("No metrics to publish.") + return + + # Validate all metrics before publishing + for metric in self.batch_metrics: + if not all(key in metric for key in ["MetricName", "Value", "Unit"]): + raise ValueError( + f"Each metric must contain 'MetricName', 'Value', and 'Unit'. " + f"Invalid metric: {metric}" + ) + self._approved_metric(metric["MetricName"]) + self._validate_unit(metric["Unit"]) + + for x in range(0, len(self.batch_metrics), batch_size): + self._push_metric_data(self.batch_metrics[x : x + batch_size]) + self.batch_metrics.clear() diff --git a/dsc/workflows/base/workflow.py b/dsc/workflows/base/workflow.py index a35038f..f31ec8e 100644 --- a/dsc/workflows/base/workflow.py +++ b/dsc/workflows/base/workflow.py @@ -19,7 +19,7 @@ ) from dsc.item_submission import ItemSubmission from dsc.reports import Report -from dsc.utils.aws import SESClient, SQSClient +from dsc.utils.aws import MetricsClient, SESClient, SQSClient from dsc.utils.validate.schemas import RESULT_MESSAGE_ATTRIBUTES, RESULT_MESSAGE_BODY if TYPE_CHECKING: # pragma: no cover @@ -125,6 +125,11 @@ def __init__(self, batch_id: str) -> None: "skipped": 0, "errors": 0, } + self.metrics_client = MetricsClient() + self.metrics_dimensions = { + "application": "dsc", + "workflow_name": self.workflow_name, + } # cache list of bitstreams self._batch_bitstream_uris: list[str] | None = None @@ -325,6 +330,12 @@ def submit_items(self, collection_handle: str | None = None) -> list: item_submission.status_details = None item_submission.submit_attempts += 1 item_submission.upsert_db() + self.metrics_client.publish_single_metric( + metric_name="item_submitted", + value=1, + unit="Count", + metric_dimensions=self.metrics_dimensions, + ) except NotImplementedError: raise except Exception as exception: # noqa: BLE001 @@ -334,6 +345,13 @@ def submit_items(self, collection_handle: str | None = None) -> list: item_submission.submit_attempts += 1 item_submission.upsert_db() + self.metrics_client.publish_single_metric( + metric_name="submission_error", + value=1, + unit="Count", + metric_dimensions=self.metrics_dimensions, + ) + logger.info( f"Submitted messages to the DSS input queue '{CONFIG.sqs_queue_dss_input}' " f"for batch '{self.batch_id}': {json.dumps(self.submission_summary)}" @@ -416,11 +434,26 @@ def finalize_items(self) -> None: item_submission.dspace_handle = result_message.dspace_handle sqs_results_summary["ingest_success"] += 1 logger.debug(f"Record {log_str} was ingested") + + self.metrics_client.publish_single_metric( + metric_name="ingested_item", + value=1, + unit="Count", + metric_dimensions=self.metrics_dimensions, + ) elif result_message.result_type == "error": item_submission.status = ItemSubmissionStatus.INGEST_FAILED item_submission.status_details = result_message.error_info sqs_results_summary["ingest_failed"] += 1 logger.debug(f"Record {log_str} failed to ingest") + + self.metrics_client.publish_single_metric( + metric_name="ingest_error", + value=1, + unit="Count", + metric_dimensions=self.metrics_dimensions, + ) + else: item_submission.status = ItemSubmissionStatus.INGEST_UNKNOWN sqs_results_summary["ingest_unknown"] += 1 From a27d72cc16f2e3e8b78bdd692ce7af24aa93d681 Mon Sep 17 00:00:00 2001 From: Eric Hanson Date: Fri, 10 Apr 2026 11:29:00 -0400 Subject: [PATCH 2/4] Updates based on discussion in PR #226 * Rename Config.METRICS > ALLOWED_METRICS and change from list to set * Add Metric dataclass and update MetricsClient class to use it * Update calls in Workflow class to use Metric dataclass * Update dependencies --- dsc/config.py | 4 +- dsc/utils/aws/__init__.py | 4 +- dsc/utils/aws/metrics.py | 188 ++++++++++++++++----------------- dsc/workflows/base/workflow.py | 46 ++++---- 4 files changed, 121 insertions(+), 121 deletions(-) diff --git a/dsc/config.py b/dsc/config.py index ac96885..0ad8e4b 100644 --- a/dsc/config.py +++ b/dsc/config.py @@ -6,12 +6,12 @@ METRICS_NAMESPACE = "dso" -METRICS = [ +ALLOWED_METRICS = { "item_submitted", # item submitted to DSS "submission_error", # error during submission to DSS "ingested_item", # item ingested successfully into DSpace "ingest_error", # error during attempted item ingest into DSpace -] +} class Config: diff --git a/dsc/utils/aws/__init__.py b/dsc/utils/aws/__init__.py index 212d680..5088c4c 100644 --- a/dsc/utils/aws/__init__.py +++ b/dsc/utils/aws/__init__.py @@ -1,6 +1,6 @@ -from dsc.utils.aws.metrics import MetricsClient +from dsc.utils.aws.metrics import Metric, MetricsClient from dsc.utils.aws.s3 import S3Client from dsc.utils.aws.ses import SESClient from dsc.utils.aws.sqs import SQSClient -__all__ = ["MetricsClient", "S3Client", "SESClient", "SQSClient"] +__all__ = ["Metric", "MetricsClient", "S3Client", "SESClient", "SQSClient"] diff --git a/dsc/utils/aws/metrics.py b/dsc/utils/aws/metrics.py index 0ebdb82..75e4796 100644 --- a/dsc/utils/aws/metrics.py +++ b/dsc/utils/aws/metrics.py @@ -3,10 +3,11 @@ from __future__ import annotations import logging +from dataclasses import dataclass import boto3 -from dsc.config import METRICS, METRICS_NAMESPACE +from dsc.config import METRICS_NAMESPACE logger = logging.getLogger(__name__) @@ -43,84 +44,66 @@ ) +@dataclass +class Metric: + """A class representing a single metric to be published to CloudWatch.""" + + name: str + value: int + unit: str + dimensions: dict[str, str] | None = None + + class MetricsClient: """A simple client to record metrics to AWS CloudWatch.""" - def __init__(self) -> None: + def __init__(self, allowed_metrics: set[str] | None = None) -> None: """Initialize the MetricsClient.""" - self.cloudwatch = boto3.client("cloudwatch") - self.batch_metrics: list[dict] = [] + self.namespace = METRICS_NAMESPACE + self.allowed_metrics: set[str] | None = allowed_metrics + self._cloudwatch = boto3.client("cloudwatch") + self.batch_metrics: list[Metric] = [] def publish_single_metric( self, - metric_name: str, - value: int, - unit: str, - metric_dimensions: dict[str, str] | None = None, + metric: Metric, ) -> None: - """Publish a single metric to CloudWatch. - - Args: - metric_name: The name of the metric to publish. - value: The value of the metric. - unit: The unit of the metric. - metric_dimensions: Optional dictionary of dimension names and values. - - Raises: - ValueError: If unit is invalid. - """ - metric_data = self._validate_and_build_metric_data( - metric_name, value, unit, metric_dimensions - ) - self._push_metric_data([metric_data]) + """Publish a single metric to CloudWatch.""" + self._validate_metric(metric) + self._push_metric_data([metric]) - def _validate_and_build_metric_data( + def _validate_metric( self, - metric_name: str, - value: int, - unit: str, - metric_dimensions: dict[str, str] | None = None, - ) -> dict: - """Validate and build a metric data dictionary for CloudWatch. + metric: Metric, + ) -> bool: + """Validate that a metric has required fields and allowed unit. Args: - metric_name: The name of the metric. - value: The value of the metric. - unit: The unit of the metric. - metric_dimensions: Optional dictionary of dimension names and values. - - Returns: - A metric data dictionary formatted for CloudWatch. + metric: The Metric instance to validate. """ - self._approved_metric(metric_name) - self._validate_unit(unit) - dimensions = [ - {"Name": name, "Value": dim_value} - for name, dim_value in ( - metric_dimensions.items() if metric_dimensions else [] + if not all(hasattr(metric, attr) for attr in ["name", "value", "unit"]): + raise ValueError( + f"Metric must have 'name', 'value', and 'unit' attributes. Invalid " + f"metric: {metric}" ) - ] - return { - "MetricName": metric_name, - "Value": value, - "Unit": unit, - "Dimensions": dimensions, - } + self._allowed_metric(metric.name) + self._validate_metric_unit(metric.unit) + return True - def _approved_metric(self, metric_name: str) -> bool: - """Check if a metric name is in the approved list of metrics for the application. + def _allowed_metric(self, metric_name: str) -> bool: + """Check if a metric name is in the allowed list of metrics for the application. Args: metric_name: The name of the metric to check. """ - if metric_name not in METRICS: + if self.allowed_metrics and metric_name not in self.allowed_metrics: raise ValueError( - f"Metric name '{metric_name}' is not in the approved list of metrics: " - f"{', '.join(METRICS)}" + f"Metric name '{metric_name}' is not in the allowed list of metrics: " + f"{', '.join(self.allowed_metrics)}" ) return True - def _validate_unit(self, unit: str) -> None: + def _validate_metric_unit(self, unit: str) -> bool: """Validate that metric unit is allowed by AWS CloudWatch. Args: @@ -133,66 +116,75 @@ def _validate_unit(self, unit: str) -> None: raise ValueError( f"Invalid unit '{unit}'. Must be one of: {', '.join(UNIT_VALUES)}" ) + return True - def _push_metric_data(self, metric_data: list[dict]) -> None: - """Push metric data to CloudWatch. + def _push_metric_data(self, metrics: list[Metric]) -> None: + """Push metrics to CloudWatch. Args: - metric_data: List of metric dictionaries to push. + metrics: List of metric instances to push. """ + if not metrics: + logger.info("No metrics to publish.") + return + try: - self.cloudwatch.put_metric_data( - Namespace=METRICS_NAMESPACE, MetricData=metric_data + metric_data = [] + for metric in metrics: + metric_dict = { + "MetricName": metric.name, + "Value": metric.value, + "Unit": metric.unit, + } + if metric.dimensions: + metric_dict["Dimensions"] = [ + {"Name": key, "Value": value} + for key, value in metric.dimensions.items() + ] + metric_data.append(metric_dict) + + self._cloudwatch.put_metric_data( + Namespace=self.namespace, + MetricData=metric_data, + ) + logger.info( + f"Published {len(metrics)} metric(s) to CloudWatch namespace " + f"'{self.namespace}'." ) - logger.info(f"Published metric with {metric_data} to CloudWatch.") except Exception: logger.exception( - f"Failed to publish metric with {metric_data} to CloudWatch." + f"Failed to publish {len(metrics)} metric(s) to CloudWatch: " ) + raise - def add_metric_to_batch( - self, - metric_name: str, - value: int, - unit: str, - metric_dimensions: dict[str, str] | None = None, - ) -> None: - """Add a metric to the batch for later publishing. + def add_metric_to_batch(self, metric: Metric) -> None: + """Add a metric to the batch queue. Args: - metric_name: The name of the metric. - value: The value of the metric. - unit: The unit of the metric. - metric_dimensions: Optional dictionary of dimension names and values. - - Raises: - ValueError: If unit is invalid. + metric: The metric to add to the batch. """ - metric_data = self._validate_and_build_metric_data( - metric_name, value, unit, metric_dimensions - ) - self.batch_metrics.append(metric_data) + self._validate_metric(metric) + self.batch_metrics.append(metric) def publish_batch_metrics(self, batch_size: int = 20) -> None: - """Publish all accumulated batch metrics to CloudWatch. + """Publish a batch of metrics to CloudWatch. - Raises: - ValueError: If any metric has an invalid unit or missing required fields. + Clears the batch queue after publishing. + + Args: + batch_size: Number of metrics to publish in each batch. """ if not self.batch_metrics: logger.info("No metrics to publish.") return - # Validate all metrics before publishing - for metric in self.batch_metrics: - if not all(key in metric for key in ["MetricName", "Value", "Unit"]): - raise ValueError( - f"Each metric must contain 'MetricName', 'Value', and 'Unit'. " - f"Invalid metric: {metric}" - ) - self._approved_metric(metric["MetricName"]) - self._validate_unit(metric["Unit"]) - - for x in range(0, len(self.batch_metrics), batch_size): - self._push_metric_data(self.batch_metrics[x : x + batch_size]) - self.batch_metrics.clear() + try: + # Re-validate all metrics before publishing to CloudWatch + for metric in self.batch_metrics: + self._validate_metric(metric) + + for x in range(0, len(self.batch_metrics), batch_size): + batch = self.batch_metrics[x : x + batch_size] + self._push_metric_data(batch) + finally: + self.batch_metrics.clear() diff --git a/dsc/workflows/base/workflow.py b/dsc/workflows/base/workflow.py index f31ec8e..af8da2f 100644 --- a/dsc/workflows/base/workflow.py +++ b/dsc/workflows/base/workflow.py @@ -10,7 +10,7 @@ import jsonschema import jsonschema.exceptions -from dsc.config import Config +from dsc.config import ALLOWED_METRICS, Config from dsc.db.models import ItemSubmissionStatus from dsc.exceptions import ( BatchCreationFailedError, @@ -19,7 +19,7 @@ ) from dsc.item_submission import ItemSubmission from dsc.reports import Report -from dsc.utils.aws import MetricsClient, SESClient, SQSClient +from dsc.utils.aws import Metric, MetricsClient, SESClient, SQSClient from dsc.utils.validate.schemas import RESULT_MESSAGE_ATTRIBUTES, RESULT_MESSAGE_BODY if TYPE_CHECKING: # pragma: no cover @@ -125,7 +125,7 @@ def __init__(self, batch_id: str) -> None: "skipped": 0, "errors": 0, } - self.metrics_client = MetricsClient() + self.metrics_client = MetricsClient(allowed_metrics=ALLOWED_METRICS) self.metrics_dimensions = { "application": "dsc", "workflow_name": self.workflow_name, @@ -331,10 +331,12 @@ def submit_items(self, collection_handle: str | None = None) -> list: item_submission.submit_attempts += 1 item_submission.upsert_db() self.metrics_client.publish_single_metric( - metric_name="item_submitted", - value=1, - unit="Count", - metric_dimensions=self.metrics_dimensions, + Metric( + name="item_submitted", + value=1, + unit="Count", + dimensions=self.metrics_dimensions, + ) ) except NotImplementedError: raise @@ -346,10 +348,12 @@ def submit_items(self, collection_handle: str | None = None) -> list: item_submission.upsert_db() self.metrics_client.publish_single_metric( - metric_name="submission_error", - value=1, - unit="Count", - metric_dimensions=self.metrics_dimensions, + Metric( + name="submission_error", + value=1, + unit="Count", + dimensions=self.metrics_dimensions, + ) ) logger.info( @@ -436,10 +440,12 @@ def finalize_items(self) -> None: logger.debug(f"Record {log_str} was ingested") self.metrics_client.publish_single_metric( - metric_name="ingested_item", - value=1, - unit="Count", - metric_dimensions=self.metrics_dimensions, + Metric( + name="ingested_item", + value=1, + unit="Count", + dimensions=self.metrics_dimensions, + ) ) elif result_message.result_type == "error": item_submission.status = ItemSubmissionStatus.INGEST_FAILED @@ -448,10 +454,12 @@ def finalize_items(self) -> None: logger.debug(f"Record {log_str} failed to ingest") self.metrics_client.publish_single_metric( - metric_name="ingest_error", - value=1, - unit="Count", - metric_dimensions=self.metrics_dimensions, + Metric( + name="ingest_error", + value=1, + unit="Count", + dimensions=self.metrics_dimensions, + ) ) else: From d9d47d1f2e4cc3e042886a49580c115b52fddcc4 Mon Sep 17 00:00:00 2001 From: Eric Hanson Date: Wed, 15 Apr 2026 10:56:29 -0400 Subject: [PATCH 3/4] More updates based on discussion in PR #226 * Add namespace param to Metric class and MetricsClient.__init__ * Rename publish_single_metric > publish_metric, _push_metric_data > _publish_metrics, add_metric_to_batch > add_metrics_to_batch, and publish_batch_metrics > publish_metrics_batch * Add logic to use Metric.namespace over MetricsClient.namespace * Remove validation from publish_metrics_batch * Update calls in Workflow class --- dsc/utils/aws/metrics.py | 45 +++++++++++++++++++--------------- dsc/workflows/base/workflow.py | 15 +++++++----- 2 files changed, 34 insertions(+), 26 deletions(-) diff --git a/dsc/utils/aws/metrics.py b/dsc/utils/aws/metrics.py index 75e4796..621d599 100644 --- a/dsc/utils/aws/metrics.py +++ b/dsc/utils/aws/metrics.py @@ -7,8 +7,6 @@ import boto3 -from dsc.config import METRICS_NAMESPACE - logger = logging.getLogger(__name__) @@ -52,25 +50,26 @@ class Metric: value: int unit: str dimensions: dict[str, str] | None = None + namespace: str | None = None class MetricsClient: """A simple client to record metrics to AWS CloudWatch.""" - def __init__(self, allowed_metrics: set[str] | None = None) -> None: + def __init__(self, namespace: str, allowed_metrics: set[str] | None = None) -> None: """Initialize the MetricsClient.""" - self.namespace = METRICS_NAMESPACE + self.namespace = namespace self.allowed_metrics: set[str] | None = allowed_metrics self._cloudwatch = boto3.client("cloudwatch") self.batch_metrics: list[Metric] = [] - def publish_single_metric( + def publish_metric( self, metric: Metric, ) -> None: """Publish a single metric to CloudWatch.""" self._validate_metric(metric) - self._push_metric_data([metric]) + self._publish_metrics([metric]) def _validate_metric( self, @@ -118,7 +117,7 @@ def _validate_metric_unit(self, unit: str) -> bool: ) return True - def _push_metric_data(self, metrics: list[Metric]) -> None: + def _publish_metrics(self, metrics: list[Metric]) -> None: """Push metrics to CloudWatch. Args: @@ -127,10 +126,10 @@ def _push_metric_data(self, metrics: list[Metric]) -> None: if not metrics: logger.info("No metrics to publish.") return - try: metric_data = [] for metric in metrics: + self._validate_namespace(metric) metric_dict = { "MetricName": metric.name, "Value": metric.value, @@ -144,7 +143,7 @@ def _push_metric_data(self, metrics: list[Metric]) -> None: metric_data.append(metric_dict) self._cloudwatch.put_metric_data( - Namespace=self.namespace, + Namespace=metric.namespace or self.namespace, MetricData=metric_data, ) logger.info( @@ -157,16 +156,26 @@ def _push_metric_data(self, metrics: list[Metric]) -> None: ) raise - def add_metric_to_batch(self, metric: Metric) -> None: - """Add a metric to the batch queue. + def _validate_namespace(self, metric: Metric) -> bool: + """Validate metric has a namespace or the client has a default namespace.""" + if not metric.namespace and not self.namespace: + raise ValueError( + f"Metric '{metric.name}' must have a namespace if no default " + f"namespace is set for the MetricsClient." + ) + return True + + def add_metrics_to_batch(self, metrics: list[Metric]) -> None: + """Add metrics to the batch queue. Args: - metric: The metric to add to the batch. + metrics: The metrics to add to the batch. """ - self._validate_metric(metric) - self.batch_metrics.append(metric) + for metric in metrics: + self._validate_metric(metric) + self.batch_metrics.append(metric) - def publish_batch_metrics(self, batch_size: int = 20) -> None: + def publish_metrics_batch(self, batch_size: int = 20) -> None: """Publish a batch of metrics to CloudWatch. Clears the batch queue after publishing. @@ -179,12 +188,8 @@ def publish_batch_metrics(self, batch_size: int = 20) -> None: return try: - # Re-validate all metrics before publishing to CloudWatch - for metric in self.batch_metrics: - self._validate_metric(metric) - for x in range(0, len(self.batch_metrics), batch_size): batch = self.batch_metrics[x : x + batch_size] - self._push_metric_data(batch) + self._publish_metrics(batch) finally: self.batch_metrics.clear() diff --git a/dsc/workflows/base/workflow.py b/dsc/workflows/base/workflow.py index af8da2f..6a93114 100644 --- a/dsc/workflows/base/workflow.py +++ b/dsc/workflows/base/workflow.py @@ -10,7 +10,7 @@ import jsonschema import jsonschema.exceptions -from dsc.config import ALLOWED_METRICS, Config +from dsc.config import ALLOWED_METRICS, METRICS_NAMESPACE, Config from dsc.db.models import ItemSubmissionStatus from dsc.exceptions import ( BatchCreationFailedError, @@ -125,7 +125,9 @@ def __init__(self, batch_id: str) -> None: "skipped": 0, "errors": 0, } - self.metrics_client = MetricsClient(allowed_metrics=ALLOWED_METRICS) + self.metrics_client = MetricsClient( + namespace=METRICS_NAMESPACE, allowed_metrics=ALLOWED_METRICS + ) self.metrics_dimensions = { "application": "dsc", "workflow_name": self.workflow_name, @@ -330,7 +332,8 @@ def submit_items(self, collection_handle: str | None = None) -> list: item_submission.status_details = None item_submission.submit_attempts += 1 item_submission.upsert_db() - self.metrics_client.publish_single_metric( + + self.metrics_client.publish_metric( Metric( name="item_submitted", value=1, @@ -347,7 +350,7 @@ def submit_items(self, collection_handle: str | None = None) -> list: item_submission.submit_attempts += 1 item_submission.upsert_db() - self.metrics_client.publish_single_metric( + self.metrics_client.publish_metric( Metric( name="submission_error", value=1, @@ -439,7 +442,7 @@ def finalize_items(self) -> None: sqs_results_summary["ingest_success"] += 1 logger.debug(f"Record {log_str} was ingested") - self.metrics_client.publish_single_metric( + self.metrics_client.publish_metric( Metric( name="ingested_item", value=1, @@ -453,7 +456,7 @@ def finalize_items(self) -> None: sqs_results_summary["ingest_failed"] += 1 logger.debug(f"Record {log_str} failed to ingest") - self.metrics_client.publish_single_metric( + self.metrics_client.publish_metric( Metric( name="ingest_error", value=1, From 0c0452e4a09d8b46d43fca354a5ddb646fef0c2b Mon Sep 17 00:00:00 2001 From: Eric Hanson Date: Wed, 15 Apr 2026 13:14:19 -0400 Subject: [PATCH 4/4] Update MetricsClient's namespace to keyword arg --- dsc/utils/aws/metrics.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dsc/utils/aws/metrics.py b/dsc/utils/aws/metrics.py index 621d599..fb92f84 100644 --- a/dsc/utils/aws/metrics.py +++ b/dsc/utils/aws/metrics.py @@ -56,7 +56,9 @@ class Metric: class MetricsClient: """A simple client to record metrics to AWS CloudWatch.""" - def __init__(self, namespace: str, allowed_metrics: set[str] | None = None) -> None: + def __init__( + self, namespace: str | None = None, allowed_metrics: set[str] | None = None + ) -> None: """Initialize the MetricsClient.""" self.namespace = namespace self.allowed_metrics: set[str] | None = allowed_metrics