Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions dsc/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,15 @@

import sentry_sdk

# CloudWatch namespace under which all DSC metrics are published.
# NOTE(review): "dso" differs from the application name "dsc" used in the
# metric dimensions — confirm this is intentional and not a typo.
METRICS_NAMESPACE = "dso"

# Allowlist of metric names the MetricsClient may publish; any other name
# fails validation at publish time.
ALLOWED_METRICS = {
    "item_submitted",  # item submitted to DSS
    "submission_error",  # error during submission to DSS
    "ingested_item",  # item ingested successfully into DSpace
    "ingest_error",  # error during attempted item ingest into DSpace
}


class Config:
REQUIRED_ENV_VARS: Iterable[str] = [
Expand Down
3 changes: 2 additions & 1 deletion dsc/utils/aws/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from dsc.utils.aws.metrics import Metric, MetricsClient
from dsc.utils.aws.s3 import S3Client
from dsc.utils.aws.ses import SESClient
from dsc.utils.aws.sqs import SQSClient

__all__ = ["S3Client", "SESClient", "SQSClient"]
__all__ = ["Metric", "MetricsClient", "S3Client", "SESClient", "SQSClient"]
197 changes: 197 additions & 0 deletions dsc/utils/aws/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
"""AWS CloudWatch metrics client for workflow submission tracking."""

from __future__ import annotations

import logging
from dataclasses import dataclass

import boto3

logger = logging.getLogger(__name__)


# Base units that CloudWatch also accepts as "<unit>/Second" rates.
_RATE_CAPABLE_UNITS = (
    "Bytes",
    "Kilobytes",
    "Megabytes",
    "Gigabytes",
    "Terabytes",
    "Bits",
    "Kilobits",
    "Megabits",
    "Gigabits",
    "Terabits",
    "Count",
)

# Unit strings accepted by the CloudWatch PutMetricData API: time units,
# Percent, the rate-capable base units, and their per-second variants.
UNIT_VALUES = frozenset(
    (
        "Seconds",
        "Microseconds",
        "Milliseconds",
        "Percent",
        *_RATE_CAPABLE_UNITS,
        *(f"{unit}/Second" for unit in _RATE_CAPABLE_UNITS),
    )
)


@dataclass
class Metric:
    """A single metric datum to be published to CloudWatch.

    Maps onto one entry of the PutMetricData `MetricData` payload built by
    `MetricsClient._publish_metrics`.
    """

    name: str  # MetricName; checked against the client's allowlist when one is set
    value: int  # Value recorded for the metric (e.g. 1 for a Count increment)
    unit: str  # Unit string; validated against UNIT_VALUES before publishing
    dimensions: dict[str, str] | None = None  # optional Name/Value dimension pairs
    namespace: str | None = None  # per-metric override of the client's default namespace


class MetricsClient:
    """A simple client to record metrics to AWS CloudWatch.

    Metrics may be published immediately via `publish_metric`, or queued with
    `add_metrics_to_batch` and flushed in groups via `publish_metrics_batch`.
    """

    def __init__(
        self, namespace: str | None = None, allowed_metrics: set[str] | None = None
    ) -> None:
        """Initialize the MetricsClient.

        Args:
            namespace: Default CloudWatch namespace for metrics that do not
                set their own.
            allowed_metrics: Optional allowlist of metric names; when provided,
                any other metric name fails validation.
        """
        self.namespace = namespace
        self.allowed_metrics: set[str] | None = allowed_metrics
        self._cloudwatch = boto3.client("cloudwatch")
        self.batch_metrics: list[Metric] = []

    def publish_metric(
        self,
        metric: Metric,
    ) -> None:
        """Validate and publish a single metric to CloudWatch.

        Args:
            metric: The Metric instance to publish.

        Raises:
            ValueError: If the metric fails validation.
        """
        self._validate_metric(metric)
        self._publish_metrics([metric])

    def _validate_metric(
        self,
        metric: Metric,
    ) -> bool:
        """Validate that a metric has required fields, allowed name, and unit.

        Args:
            metric: The Metric instance to validate.

        Raises:
            ValueError: If the metric is missing a required attribute, has a
                name outside the allowlist, or has a unit CloudWatch rejects.
        """
        if not all(hasattr(metric, attr) for attr in ["name", "value", "unit"]):
            raise ValueError(
                f"Metric must have 'name', 'value', and 'unit' attributes. Invalid "
                f"metric: {metric}"
            )
        self._allowed_metric(metric.name)
        self._validate_metric_unit(metric.unit)
        return True

    def _allowed_metric(self, metric_name: str) -> bool:
        """Check if a metric name is in the allowed list of metrics for the application.

        A falsy allowlist (None or empty) permits all metric names.

        Args:
            metric_name: The name of the metric to check.

        Raises:
            ValueError: If an allowlist is set and the name is not in it.
        """
        if self.allowed_metrics and metric_name not in self.allowed_metrics:
            raise ValueError(
                f"Metric name '{metric_name}' is not in the allowed list of metrics: "
                f"{', '.join(self.allowed_metrics)}"
            )
        return True

    def _validate_metric_unit(self, unit: str) -> bool:
        """Validate that metric unit is allowed by AWS CloudWatch.

        Args:
            unit: The unit to validate.

        Raises:
            ValueError: If unit is not allowed by AWS CloudWatch.
        """
        if unit not in UNIT_VALUES:
            raise ValueError(
                f"Invalid unit '{unit}'. Must be one of: {', '.join(UNIT_VALUES)}"
            )
        return True

    def _publish_metrics(self, metrics: list[Metric]) -> None:
        """Push metrics to CloudWatch, grouped by effective namespace.

        Each metric is published to its own namespace when set, falling back
        to the client default; one `put_metric_data` call is made per distinct
        namespace. (Previously the whole list was published under the
        namespace of the *last* metric, misrouting mixed-namespace batches.)

        Args:
            metrics: List of metric instances to push.

        Raises:
            ValueError: If a metric has no namespace and no default is set.
        """
        if not metrics:
            logger.info("No metrics to publish.")
            return
        try:
            # Group payloads by the namespace each metric should publish to.
            metrics_by_namespace: dict[str, list[dict]] = {}
            for metric in metrics:
                self._validate_namespace(metric)
                metric_dict: dict = {
                    "MetricName": metric.name,
                    "Value": metric.value,
                    "Unit": metric.unit,
                }
                if metric.dimensions:
                    metric_dict["Dimensions"] = [
                        {"Name": key, "Value": value}
                        for key, value in metric.dimensions.items()
                    ]
                metrics_by_namespace.setdefault(
                    metric.namespace or self.namespace, []
                ).append(metric_dict)

            for namespace, metric_data in metrics_by_namespace.items():
                self._cloudwatch.put_metric_data(
                    Namespace=namespace,
                    MetricData=metric_data,
                )
            logger.info(
                f"Published {len(metrics)} metric(s) to CloudWatch namespace(s) "
                f"'{', '.join(metrics_by_namespace)}'."
            )
        except Exception:
            logger.exception(
                f"Failed to publish {len(metrics)} metric(s) to CloudWatch."
            )
            raise

    def _validate_namespace(self, metric: Metric) -> bool:
        """Validate metric has a namespace or the client has a default namespace.

        Raises:
            ValueError: If neither the metric nor the client has a namespace.
        """
        if not metric.namespace and not self.namespace:
            raise ValueError(
                f"Metric '{metric.name}' must have a namespace if no default "
                f"namespace is set for the MetricsClient."
            )
        return True

    def add_metrics_to_batch(self, metrics: list[Metric]) -> None:
        """Validate metrics and add them to the batch queue.

        All metrics are validated before any are queued, so a validation
        failure leaves the batch queue unchanged.

        Args:
            metrics: The metrics to add to the batch.

        Raises:
            ValueError: If any metric fails validation.
        """
        for metric in metrics:
            self._validate_metric(metric)
        self.batch_metrics.extend(metrics)

    def publish_metrics_batch(self, batch_size: int = 20) -> None:
        """Publish all queued metrics to CloudWatch in batches.

        Clears the batch queue after publishing, even when publishing fails.

        Args:
            batch_size: Number of metrics to publish in each batch.
        """
        if not self.batch_metrics:
            logger.info("No metrics to publish.")
            return

        try:
            for start in range(0, len(self.batch_metrics), batch_size):
                self._publish_metrics(self.batch_metrics[start : start + batch_size])
        finally:
            self.batch_metrics.clear()
48 changes: 46 additions & 2 deletions dsc/workflows/base/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import jsonschema
import jsonschema.exceptions

from dsc.config import Config
from dsc.config import ALLOWED_METRICS, METRICS_NAMESPACE, Config
from dsc.db.models import ItemSubmissionStatus
from dsc.exceptions import (
BatchCreationFailedError,
Expand All @@ -19,7 +19,7 @@
)
from dsc.item_submission import ItemSubmission
from dsc.reports import Report
from dsc.utils.aws import SESClient, SQSClient
from dsc.utils.aws import Metric, MetricsClient, SESClient, SQSClient
from dsc.utils.validate.schemas import RESULT_MESSAGE_ATTRIBUTES, RESULT_MESSAGE_BODY

if TYPE_CHECKING: # pragma: no cover
Expand Down Expand Up @@ -125,6 +125,13 @@ def __init__(self, batch_id: str) -> None:
"skipped": 0,
"errors": 0,
}
self.metrics_client = MetricsClient(
namespace=METRICS_NAMESPACE, allowed_metrics=ALLOWED_METRICS
)
self.metrics_dimensions = {
"application": "dsc",
"workflow_name": self.workflow_name,
}

# cache list of bitstreams
self._batch_bitstream_uris: list[str] | None = None
Expand Down Expand Up @@ -325,6 +332,15 @@ def submit_items(self, collection_handle: str | None = None) -> list:
item_submission.status_details = None
item_submission.submit_attempts += 1
item_submission.upsert_db()

self.metrics_client.publish_metric(
Metric(
name="item_submitted",
value=1,
unit="Count",
dimensions=self.metrics_dimensions,
)
)
except NotImplementedError:
raise
except Exception as exception: # noqa: BLE001
Expand All @@ -334,6 +350,15 @@ def submit_items(self, collection_handle: str | None = None) -> list:
item_submission.submit_attempts += 1
item_submission.upsert_db()

self.metrics_client.publish_metric(
Metric(
name="submission_error",
value=1,
unit="Count",
dimensions=self.metrics_dimensions,
)
)

logger.info(
f"Submitted messages to the DSS input queue '{CONFIG.sqs_queue_dss_input}' "
f"for batch '{self.batch_id}': {json.dumps(self.submission_summary)}"
Expand Down Expand Up @@ -416,11 +441,30 @@ def finalize_items(self) -> None:
item_submission.dspace_handle = result_message.dspace_handle
sqs_results_summary["ingest_success"] += 1
logger.debug(f"Record {log_str} was ingested")

self.metrics_client.publish_metric(
Metric(
name="ingested_item",
value=1,
unit="Count",
dimensions=self.metrics_dimensions,
)
)
elif result_message.result_type == "error":
item_submission.status = ItemSubmissionStatus.INGEST_FAILED
item_submission.status_details = result_message.error_info
sqs_results_summary["ingest_failed"] += 1
logger.debug(f"Record {log_str} failed to ingest")

self.metrics_client.publish_metric(
Metric(
name="ingest_error",
value=1,
unit="Count",
dimensions=self.metrics_dimensions,
)
)

else:
item_submission.status = ItemSubmissionStatus.INGEST_UNKNOWN
sqs_results_summary["ingest_unknown"] += 1
Expand Down
Loading