From 5b6bc1f2898abbec0567d69593dea9e0e5a2273f Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Tue, 28 Apr 2026 19:01:10 +0200 Subject: [PATCH 01/32] [gcp] Add package deployment type to distinguish between gen1 and gen2 --- configs/cpp.json | 1 + configs/example.json | 1 + configs/java.json | 1 + configs/nodejs.json | 1 + configs/python.json | 3 ++- sebs/gcp/config.py | 14 ++++++++++++++ 6 files changed, 20 insertions(+), 1 deletion(-) diff --git a/configs/cpp.json b/configs/cpp.json index e4715b6f..33dcdc5d 100644 --- a/configs/cpp.json +++ b/configs/cpp.json @@ -68,6 +68,7 @@ "project_name": "", "credentials": "", "configuration": { + "package-deployment-type": "function-gen1", "function-gen1": { "min-instances": 0, "max-instances": 20 diff --git a/configs/example.json b/configs/example.json index efcafb71..a2461cea 100644 --- a/configs/example.json +++ b/configs/example.json @@ -60,6 +60,7 @@ "project_name": "", "credentials": "", "configuration": { + "package-deployment-type": "function-gen1", "function-gen1": { "min-instances": 0, "max-instances": 20 diff --git a/configs/java.json b/configs/java.json index 5727d06a..f8c3445f 100644 --- a/configs/java.json +++ b/configs/java.json @@ -60,6 +60,7 @@ "project_name": "", "credentials": "", "configuration": { + "package-deployment-type": "function-gen1", "function-gen1": { "min-instances": 0, "max-instances": 20 diff --git a/configs/nodejs.json b/configs/nodejs.json index 8bdbefe1..01300022 100644 --- a/configs/nodejs.json +++ b/configs/nodejs.json @@ -68,6 +68,7 @@ "project_name": "", "credentials": "", "configuration": { + "package-deployment-type": "function-gen1", "function-gen1": { "min-instances": 0, "max-instances": 20 diff --git a/configs/python.json b/configs/python.json index f184d235..d619dce9 100644 --- a/configs/python.json +++ b/configs/python.json @@ -5,7 +5,7 @@ "update_storage": false, "download_results": false, "architecture": "x64", - "container_deployment": true, + "container_deployment": 
false, "runtime": { "language": "python", "version": "3.11" @@ -68,6 +68,7 @@ "project_name": "", "credentials": "", "configuration": { + "package-deployment-type": "function-gen2", "function-gen1": { "min-instances": 0, "max-instances": 20 diff --git a/sebs/gcp/config.py b/sebs/gcp/config.py index 5143431d..d4a4bb8d 100644 --- a/sebs/gcp/config.py +++ b/sebs/gcp/config.py @@ -437,6 +437,7 @@ def __init__(self) -> None: self._function_gen1_config: GCPFunctionGen1Config self._function_gen2_config: GCPFunctionGen2Config self._container_config: GCPContainerConfig + self._package_deployment_type: str = "function-gen1" @staticmethod def initialize(config: GCPConfiguration, dct: Dict) -> GCPConfiguration: @@ -453,6 +454,13 @@ def initialize(config: GCPConfiguration, dct: Dict) -> GCPConfiguration: config._function_gen1_config = GCPFunctionGen1Config.deserialize(dct["function-gen1"]) config._function_gen2_config = GCPFunctionGen2Config.deserialize(dct["function-gen2"]) config._container_config = GCPContainerConfig.deserialize(dct["container"]) + config._package_deployment_type = dct.get("package-deployment-type", "function-gen1") + + if config._package_deployment_type not in ("function-gen1", "function-gen2"): + raise ValueError( + "Invalid GCP package deployment type " + f"{config._package_deployment_type}. Expected function-gen1 or function-gen2." 
+ ) return config @@ -466,6 +474,7 @@ def serialize(self) -> Dict: out["function-gen1"] = self._function_gen1_config.serialize() out["function-gen2"] = self._function_gen2_config.serialize() out["container"] = self._container_config.serialize() + out["package-deployment-type"] = self._package_deployment_type return out @property @@ -495,6 +504,11 @@ def container_config(self) -> GCPContainerConfig: """ return self._container_config + @property + def package_deployment_type(self) -> str: + """Get the package deployment selector used when container mode is disabled.""" + return self._package_deployment_type + class GCPResources(Resources): """Resource manager for serverless resources on Google Cloud Platform. From f61eb44312f8afa90569b00d3fe17c0de832fb73 Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Tue, 28 Apr 2026 19:02:39 +0200 Subject: [PATCH 02/32] [gcp] Expose system variant for gen2 in the CLI --- sebs/cli.py | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/sebs/cli.py b/sebs/cli.py index ea4d026c..5f73a161 100755 --- a/sebs/cli.py +++ b/sebs/cli.py @@ -125,8 +125,14 @@ def common_params(func): ) @click.option( "--container-deployment/--no-container-deployment", - default=False, - help="Deploy functions as container images (AWS only).", + default=None, + help="Override whether functions should be deployed as container images.", + ) + @click.option( + "--system-variant", + default=None, + type=str, + help="Optional system-specific deployment variant interpreted by the selected platform.", ) @click.option( "--resource-prefix", @@ -158,6 +164,7 @@ def parse_common_params( language_variant, architecture, container_deployment, + system_variant, resource_prefix: Optional[str] = None, initialize_deployment: bool = True, ignore_cache: bool = False, @@ -188,6 +195,16 @@ def parse_common_params( update_nested_dict(config_obj, ["experiments", "architecture"], architecture) update_nested_dict(config_obj, ["experiments", 
"container_deployment"], container_deployment) + selected_deployment = config_obj.get("deployment", {}).get("name") + if selected_deployment == "gcp": + update_nested_dict( + config_obj, + ["deployment", "gcp", "configuration", "package-deployment-type"], + system_variant, + ) + else: + raise RuntimeError(f"Unsupported deployment {selected_deployment} for system variant configuration.") + # set the path the configuration was loaded from update_nested_dict(config_obj, ["deployment", "local", "path"], config) From 75c99365f648c59b459d1bcde952beec8e895763 Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Tue, 28 Apr 2026 20:07:25 +0200 Subject: [PATCH 03/32] [system] Add new functionalities to support multiple system variants --- sebs/faas/system.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/sebs/faas/system.py b/sebs/faas/system.py index d76d3e48..7fd7e233 100644 --- a/sebs/faas/system.py +++ b/sebs/faas/system.py @@ -154,6 +154,10 @@ def container_client(self) -> DockerContainer | None: """ return None + def system_variant_suffix(self, container_deployment: bool) -> Optional[str]: + """Return an optional provider-local system variant suffix.""" + return None + @property def system_resources(self) -> SystemResources: """ @@ -489,6 +493,14 @@ def get_function(self, code_package: Benchmark, func_name: Optional[str] = None) ) self.logging.error(e) is_function_cached = False + else: + err_msg = self.can_reuse_cached_function(function, code_package) + if err_msg is not None: + self.logging.info( + f"Cached function {func_name} is not compatible with the current " + f"deployment configuration and will be replaced. 
Reason: {err_msg}" + ) + is_function_cached = False # Create new function if not cached or deserialize failed if not is_function_cached: @@ -614,6 +626,20 @@ def is_configuration_changed(self, cached_function: Function, benchmark: Benchma return changed + def can_reuse_cached_function( + self, cached_function: Function, benchmark: Benchmark + ) -> Optional[str]: + """Check whether a cached function can be reused as-is. + + Args: + cached_function: Cached function selected from SeBS cache. + benchmark: Benchmark requesting the function. + + Returns: + string explaining why the function cannot be reused, or None if it can be reused. + """ + return None + @abstractmethod def default_function_name( self, code_package: Benchmark, resources: Optional[Resources] = None From e29485d7767b3b4793b99fe6202938186f6b6d53 Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Tue, 28 Apr 2026 20:10:21 +0200 Subject: [PATCH 04/32] [system] Support system variant suffixes in function names --- sebs/benchmark.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/sebs/benchmark.py b/sebs/benchmark.py index e0173e79..57b6d46f 100644 --- a/sebs/benchmark.py +++ b/sebs/benchmark.py @@ -477,9 +477,12 @@ def cache_language_key(self) -> str: Add language variant to the cache key so that different variants of the same language don't conflict in cache. 
""" - if self._language_variant == "default": - return self._language.value - return f"{self._language.value}_{self._language_variant}" + base_key = self._language.value + if self._language_variant != "default": + base_key = f"{base_key}_{self._language_variant}" + if self._system_variant_suffix: + return f"{base_key}_{self._system_variant_suffix}" + return base_key @property def language_version(self) -> str: @@ -579,6 +582,7 @@ def __init__( output_dir: str, cache_client: Cache, docker_client: docker.client.DockerClient, + system_variant_suffix: Optional[str] = None, verbose: bool = False, ): """ @@ -596,6 +600,7 @@ def __init__( output_dir: Directory for output files cache_client: Cache client for caching code packages docker_client: Docker client for building dependencies + system_variant_suffix: Optional provider-local system variant suffix verbose: Print verbose build logs. Raises: @@ -611,6 +616,7 @@ def __init__( self._language_variant = config.runtime.variant.value self._architecture = self._experiment_config.architecture self._container_deployment = config.container_deployment + self._system_variant_suffix = system_variant_suffix self._verbose = verbose benchmark_path = find_benchmark(self.benchmark, "benchmarks") @@ -639,7 +645,15 @@ def __init__( self._language_variant, self._language_version, self._architecture, - "container" if self._container_deployment else "package", + ( + "container" + if self._container_deployment + else ( + f"package_{self._system_variant_suffix}" + if self._system_variant_suffix + else "package" + ) + ), ) self._container_uri: Optional[str] = None From cb3a95028384a0703f637db2e40e2b8a44bc8675 Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Tue, 28 Apr 2026 20:10:26 +0200 Subject: [PATCH 05/32] [dev] Linting --- sebs/cli.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sebs/cli.py b/sebs/cli.py index 5f73a161..1911b704 100755 --- a/sebs/cli.py +++ b/sebs/cli.py @@ -203,7 +203,9 @@ def 
parse_common_params( system_variant, ) else: - raise RuntimeError(f"Unsupported deployment {selected_deployment} for system variant configuration.") + raise RuntimeError( + f"Unsupported deployment {selected_deployment} for system variant configuration." + ) # set the path the configuration was loaded from update_nested_dict(config_obj, ["deployment", "local", "path"], config) From 4abda816ae95164566814fbff1ec0f94409283ae Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Tue, 28 Apr 2026 20:10:42 +0200 Subject: [PATCH 06/32] [system] Forgotten function --- sebs/gcp/function.py | 9 +++++++++ sebs/sebs.py | 1 + 2 files changed, 10 insertions(+) diff --git a/sebs/gcp/function.py b/sebs/gcp/function.py index 5a71135c..51188426 100644 --- a/sebs/gcp/function.py +++ b/sebs/gcp/function.py @@ -47,6 +47,15 @@ def is_container(self) -> bool: """ return self == FunctionDeploymentType.CONTAINER + @staticmethod + def resolve( + container_deployment: bool, package_deployment_type: str + ) -> "FunctionDeploymentType": + """Resolve the effective GCP deployment type from experiment and GCP-local config.""" + if container_deployment: + return FunctionDeploymentType.CONTAINER + return FunctionDeploymentType.deserialize(package_deployment_type) + @staticmethod def deserialize(val: str) -> FunctionDeploymentType: """Deserialize a string value to a FunctionDeploymentEngine enum. 
diff --git a/sebs/sebs.py b/sebs/sebs.py index 5d40b103..25cc2596 100644 --- a/sebs/sebs.py +++ b/sebs/sebs.py @@ -368,6 +368,7 @@ def get_benchmark( self._output_dir, self.cache_client, self.docker_client, + deployment.system_variant_suffix(config.container_deployment), self.verbose, ) From abd97ec9d062eb3ede4aac5aad00fdecc10a3c82 Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Tue, 28 Apr 2026 23:06:44 +0200 Subject: [PATCH 07/32] [gcp] Implement basic functionalities of gen2 functions --- sebs/gcp/gcp.py | 953 ++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 762 insertions(+), 191 deletions(-) diff --git a/sebs/gcp/gcp.py b/sebs/gcp/gcp.py index ff3dcc05..4ba78bff 100644 --- a/sebs/gcp/gcp.py +++ b/sebs/gcp/gcp.py @@ -33,6 +33,7 @@ import shutil import time import math +import urllib.request import zipfile from datetime import datetime, timezone from typing import Any, cast, Dict, Optional, Tuple, List, Type, Protocol, Union @@ -334,6 +335,185 @@ def download_metrics( ... 
+class CloudRunMetricsHelper: + """Helpers shared by Cloud Run services and Cloud Functions gen2.""" + + @staticmethod + def download_execution_metrics( + logging: ColoredWrapper, + project_name: str, + service_name: str, + start_time: int, + end_time: int, + requests: Dict, + label: str, + ) -> None: + """Download execution times from Cloud Run request logs.""" + import google.cloud.logging as gcp_logging + + timestamps = [] + for timestamp in [start_time, end_time + 1]: + utc_date = datetime.fromtimestamp(timestamp, tz=timezone.utc) + timestamps.append(utc_date.strftime("%Y-%m-%dT%H:%M:%SZ")) + + logging_client = gcp_logging.Client() + entries = logging_client.list_entries( + filter_=( + 'resource.type = "cloud_run_revision" ' + 'logName = "projects/' + f'{project_name}/logs/run.googleapis.com%2Frequests" ' + f'resource.labels.service_name = "{service_name}" ' + f'timestamp >= "{timestamps[0]}" ' + f'timestamp <= "{timestamps[1]}"' + ), + page_size=1000, + ) + + found_metrics = 0 + total_entries = 0 + for entry in entries: + total_entries += 1 + trace_id = CloudRunMetricsHelper.extract_trace_id(entry) + if trace_id is None or trace_id not in requests: + continue + + execution_time_us = CloudRunMetricsHelper.extract_latency_us(entry) + if execution_time_us is None: + continue + + requests[trace_id].provider_times.execution = execution_time_us + found_metrics += 1 + + logging.info( + f"{label}: Received {total_entries} log entries, found time metrics for " + f"{found_metrics} out of {len(requests.keys())} invocations." 
+ ) + + @staticmethod + def download_metrics( + project_name: str, + service_name: str, + start_time: int, + end_time: int, + metrics: Dict, + ) -> None: + """Download Cloud Run monitoring metrics.""" + available_metrics = [ + ("container/billable_instance_time", "delta", "double"), + ("container/instance_count", "gauge", "int64"), + ("container/max_request_concurrencies", "delta", "distribution"), + ("container/memory/utilizations", "delta", "distribution"), + ("container/cpu/utilizations", "delta", "distribution"), + ("container/cpu/allocation_time", "delta", "double"), + ("container/memory/allocation_time", "delta", "double"), + ("container/network/sent_bytes_count", "delta", "int64"), + ("container/network/received_bytes_count", "delta", "int64"), + ("container/startup_latencies", "delta", "distribution"), + ("request_count", "delta", "int64"), + ("request_latencies", "distribution", "distribution"), + ("request_latency/e2e_latencies", "delta", "distribution"), + ("request_latency/ingress_to_region", "delta", "distribution"), + ("request_latency/pending", "delta", "distribution"), + ("request_latency/response_egress", "delta", "distribution"), + ("request_latency/routing", "delta", "distribution"), + ("request_latency/user_execution", "delta", "distribution"), + ] + + client = monitoring_v3.MetricServiceClient() + project_path = client.common_project_path(project_name) + + _, end_time_seconds = math.modf(end_time) + _, start_time_seconds = math.modf(start_time) + interval = monitoring_v3.TimeInterval( + { + "end_time": {"seconds": int(end_time_seconds) + 300}, + "start_time": {"seconds": int(start_time_seconds)}, + } + ) + + for metric, kind, value_type in available_metrics: + metrics[metric] = [] + flt = ( + f'metric.type = "run.googleapis.com/{metric}" ' + f'AND resource.type = "cloud_run_revision" ' + f'AND resource.labels.service_name = "{service_name}"' + ) + list_request = monitoring_v3.ListTimeSeriesRequest( + name=project_path, + filter=flt, + 
interval=interval, + view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL, + ) + for result in client.list_time_series(list_request): + revision = result.resource.labels.get("revision_name") + for point in result.points: + if value_type == "distribution": + sq_dev = point.value.distribution_value.sum_of_squared_deviation + metrics[metric].append( + { + "kind": kind, + "revision": revision, + "mean_value": point.value.distribution_value.mean, + "squared_deviations": sq_dev, + "count": point.value.distribution_value.count, + "ts": point.interval.end_time.timestamp(), + } + ) + else: + value: int | float + value = ( + point.value.int64_value + if value_type == "int64" + else point.value.double_value + ) + metrics[metric].append( + { + "revision": revision, + "value": value, + "kind": kind, + "ts": point.interval.end_time.timestamp(), + } + ) + + @staticmethod + def extract_trace_id(entry) -> Optional[str]: + """Extract the trace ID from a Cloud Run log entry. + + Args: + entry: Log entry to inspect. + + Returns: + Trace ID if present, otherwise ``None``. + """ + trace = getattr(entry, "trace", None) + if not isinstance(trace, str) or "/traces/" not in trace: + return None + return trace.rsplit("/traces/", 1)[1] + + @staticmethod + def extract_latency_us(entry) -> Optional[int]: + """Extract request latency from a Cloud Run log entry. + + Args: + entry: Log entry to inspect. + + Returns: + Request latency in microseconds, or ``None`` if unavailable. 
+ """ + http_request = getattr(entry, "http_request", None) + if http_request is None: + return None + + latency = http_request.get("latency") + if not isinstance(latency, str): + return None + + try: + return int(float(latency[:-1]) * 1_000_000) + except (ValueError, TypeError): + return None + + class CloudFunctionGen1Strategy(DeploymentStrategy): """Deployment strategy for Google Cloud Functions Gen1.""" @@ -1429,174 +1609,517 @@ def download_execution_metrics( requests: Dict, ) -> None: """Download execution times for Cloud Run from request logs.""" - import google.cloud.logging as gcp_logging - service_name = function_name.replace("_", "-").lower() - - timestamps = [] - for timestamp in [start_time, end_time + 1]: - utc_date = datetime.fromtimestamp(timestamp, tz=timezone.utc) - timestamps.append(utc_date.strftime("%Y-%m-%dT%H:%M:%SZ")) - - logging_client = gcp_logging.Client() - entries = logging_client.list_entries( - filter_=( - 'resource.type = "cloud_run_revision" ' - 'logName = "projects/' - f'{self.config.project_name}/logs/run.googleapis.com%2Frequests" ' - f'resource.labels.service_name = "{service_name}" ' - f'timestamp >= "{timestamps[0]}" ' - f'timestamp <= "{timestamps[1]}"' - ), - page_size=1000, - ) - - found_metrics = 0 - total_entries = 0 - for entry in entries: - total_entries += 1 - trace_id = self._extract_trace_id(entry) - if trace_id is None or trace_id not in requests: - continue - - execution_time_us = self._extract_latency_us(entry) - if execution_time_us is None: - continue - - requests[trace_id].provider_times.execution = execution_time_us - found_metrics += 1 - - self.logging.info( - f"GCP Cloud Run: Received {total_entries} log entries, found time metrics for " - f"{found_metrics} out of {len(requests.keys())} invocations." 
+ CloudRunMetricsHelper.download_execution_metrics( + self.logging, + self.config.project_name, + service_name, + start_time, + end_time, + requests, + "GCP Cloud Run", ) def download_metrics( self, function_name: str, start_time: int, end_time: int, metrics: Dict ) -> None: - """ - Download monitoring metrics for Cloud Functions Gen1. - Use metrics to find estimated values for maximum memory used, active instances - and network traffic. - https://cloud.google.com/monitoring/api/metrics_gcp#gcp-cloudfunctions - """ - # (metric_path, kind) — kind is "distribution" or "int64" - available_metrics = [ - ("container/billable_instance_time", "delta", "double"), # seconds - ("container/instance_count", "gauge", "int64"), - ("container/max_request_concurrencies", "delta", "distribution"), - ("container/memory/utilizations", "delta", "distribution"), # fraction - ("container/cpu/utilizations", "delta", "distribution"), # fraction - ("container/cpu/allocation_time", "delta", "double"), # seconds - ("container/memory/allocation_time", "delta", "double"), # gigabyte-seconds - ("container/network/sent_bytes_count", "delta", "int64"), # bytes (delta) - ("container/network/received_bytes_count", "delta", "int64"), # bytes (delta) - ("container/startup_latencies", "delta", "distribution"), # ms, cold start - ("request_count", "delta", "int64"), - ("request_latencies", "distribution", "distribution"), # ms - ("request_latency/e2e_latencies", "delta", "distribution"), # ms - ("request_latency/ingress_to_region", "delta", "distribution"), # ms - ("request_latency/pending", "delta", "distribution"), # ms - ("request_latency/response_egress", "delta", "distribution"), # ms - ("request_latency/routing", "delta", "distribution"), # ms - ("request_latency/user_execution", "delta", "distribution"), # ms - ] - - client = monitoring_v3.MetricServiceClient() - project_name = client.common_project_path(self.config.project_name) + """Download monitoring metrics for a Cloud Run service. 
- _, end_time_seconds = math.modf(end_time) - _, start_time_seconds = math.modf(start_time) - interval = monitoring_v3.TimeInterval( - { - # some metrics are reported with a delay - "end_time": {"seconds": int(end_time_seconds) + 300}, - "start_time": {"seconds": int(start_time_seconds)}, - } + Args: + function_name: Name of the deployed service. + start_time: Start timestamp for metric collection. + end_time: End timestamp for metric collection. + metrics: Dictionary to populate with monitoring samples. + """ + service_name = function_name.replace("_", "-").lower() + CloudRunMetricsHelper.download_metrics( + self.config.project_name, service_name, start_time, end_time, metrics ) - for metric, kind, value_type in available_metrics: - metrics[metric] = [] - # Filter on resource.type AND service_name server-side — much faster than - # pulling every revision in the project and filtering client-side. - flt = ( - f'metric.type = "run.googleapis.com/{metric}" ' - f'AND resource.type = "cloud_run_revision" ' - f'AND resource.labels.service_name = "{function_name}"' - ) - list_request = monitoring_v3.ListTimeSeriesRequest( - name=project_name, - filter=flt, - interval=interval, - view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL, - ) - for result in client.list_time_series(list_request): - revision = result.resource.labels.get("revision_name") - for point in result.points: - if value_type == "distribution": - sq_dev = point.value.distribution_value.sum_of_squared_deviation - metrics[metric].append( - { - "kind": kind, - "revision": revision, - "mean_value": point.value.distribution_value.mean, - "squared_deviations": sq_dev, - "count": point.value.distribution_value.count, - "ts": point.interval.end_time.timestamp(), - } - ) - else: - value: int | float - if value_type == "int64": - value = point.value.int64_value - else: - value = point.value.double_value - metrics[metric].append( - { - "revision": revision, - "value": value, - "kind": kind, - "ts": 
point.interval.end_time.timestamp(), - } - ) - @staticmethod - def _extract_trace_id(entry) -> Optional[str]: - """Extract the trace ID from a Cloud Run log entry. +class CloudFunctionGen2Strategy(DeploymentStrategy): + """Deployment strategy for Google Cloud Functions Gen2 package deployments.""" - Args: - entry: Log entry to inspect. + def __init__(self, config: GCPConfig, logging_handlers: ColoredWrapper): + """Initialize the Gen2 deployment strategy. - Returns: - Trace ID if present, otherwise ``None``. + Args: + config: GCP deployment configuration. + logging_handlers: Logging wrapper used for status reporting. """ - trace = getattr(entry, "trace", None) - if not isinstance(trace, str) or "/traces/" not in trace: - return None - return trace.rsplit("/traces/", 1)[1] + self.config = config + self.logging = logging_handlers + self.function_client = build("cloudfunctions", "v2", cache_discovery=False) + self.run_client = build("run", "v2", cache_discovery=False) @staticmethod - def _extract_latency_us(entry) -> Optional[int]: - """Extract request latency from a Cloud Run log entry in microseconds. + def get_full_function_name(project_name: str, location: str, func_name: str) -> str: + """Build the fully qualified Cloud Functions Gen2 resource name. Args: - entry: Log entry to inspect. + project_name: GCP project ID. + location: GCP region/location. + func_name: Function name. Returns: - Request latency in microseconds, or ``None`` if unavailable. + Fully qualified Cloud Functions Gen2 resource name. """ - http_request = getattr(entry, "http_request", None) - if http_request is None: - return None + return f"projects/{project_name}/locations/{location}/functions/{func_name}" - latency = http_request.get("latency") - if not isinstance(latency, str): - return None + def function_exists(self, project_name: str, location: str, func_name: str) -> Any: + """Check whether the Cloud Functions Gen2 resource exists. + + Args: + project_name: GCP project ID. 
+ location: GCP region/location. + func_name: Function name. + Returns: + True if the function exists, otherwise False. + """ + full_resource_name = self.get_full_function_name(project_name, location, func_name) + get_req = ( + self.function_client.projects().locations().functions().get(name=full_resource_name) + ) try: - return int(float(latency[:-1]) * 1_000_000) - except (ValueError, TypeError): - return None + self._execute_with_retry(self.logging, get_req) + return True + except HttpError as e: + if e.resp.status == 404: + return False + raise RuntimeError(f"Error checking function existence: {e}") from None + + def _entry_point(self, code_package: Benchmark) -> str: + """Resolve the runtime entry point for a benchmark package. + + Args: + code_package: Benchmark package being deployed. + + Returns: + Entry point name expected by the deployed runtime. + """ + return ( + "org.serverlessbench.Handler" if code_package.language == Language.JAVA else "handler" + ) + + def _runtime(self, code_package: Benchmark) -> str: + """Resolve the Cloud Functions Gen2 runtime identifier. + + Args: + code_package: Benchmark package being deployed. + + Returns: + Runtime identifier string for the GCP API. + """ + return code_package.language_name + code_package.language_version.replace(".", "") + + def _service_config( + self, benchmark_config: BenchmarkConfig | FunctionConfig, envs: Dict + ) -> Dict: + """Build the Gen2 service configuration payload. + + Args: + benchmark_config: Benchmark or function configuration with memory and timeout. + envs: Environment variables to configure on the service. + + Returns: + Service configuration payload for Cloud Functions Gen2. 
+ """ + dep_config = self.config.deployment_config.function_gen2_config + return { + "availableMemory": f"{benchmark_config.memory}Mi", + "timeoutSeconds": benchmark_config.timeout, + "environmentVariables": envs, + "minInstanceCount": dep_config.min_instances, + "maxInstanceCount": dep_config.max_instances, + "availableCpu": str(dep_config.vcpus), + "maxInstanceRequestConcurrency": dep_config.gcp_concurrency, + "ingressSettings": "ALLOW_ALL", + "allTrafficOnLatestRevision": True, + } + + def _build_body( + self, + func_name: str, + code_package: Benchmark, + envs: Dict, + storage_source: Dict, + ) -> Dict: + """Build the full Cloud Functions Gen2 create or patch payload. + + Args: + func_name: Function name. + code_package: Benchmark package being deployed. + envs: Environment variables for the service. + storage_source: Uploaded source archive descriptor. + + Returns: + Full function resource payload. + """ + return { + "name": self.get_full_function_name( + self.config.project_name, self.config.region, func_name + ), + "buildConfig": { + "runtime": self._runtime(code_package), + "entryPoint": self._entry_point(code_package), + "source": {"storageSource": storage_source}, + }, + "serviceConfig": self._service_config(code_package.benchmark_config, envs), + } + + def _generate_upload_url(self) -> Dict: + """Request a signed upload URL for a Gen2 source archive. + + Returns: + Upload metadata returned by the Cloud Functions Gen2 API. + """ + parent = f"projects/{self.config.project_name}/locations/{self.config.region}" + req = ( + self.function_client.projects() + .locations() + .functions() + .generateUploadUrl(parent=parent, body={"environment": "GEN_2"}) + ) + return self._execute_with_retry(self.logging, req) + + def _upload_zip_archive(self, package_path: str) -> Dict: + """Upload a ZIP archive to the signed Gen2 source upload URL. + + Args: + package_path: Path to the ZIP archive to upload. 
+ + Returns: + Storage source descriptor referencing the uploaded archive. + """ + upload_info = self._generate_upload_url() + with open(package_path, "rb") as package_fp: + request = urllib.request.Request( + upload_info["uploadUrl"], + data=package_fp.read(), + method="PUT", + headers={"Content-Type": "application/zip"}, + ) + with urllib.request.urlopen(request) as response: + if response.status not in (200, 201): + raise RuntimeError( + f"Upload of package archive failed with HTTP {response.status}" + ) + return cast(Dict, upload_info["storageSource"]) + + def create( + self, + func_name: str, + code_package: Benchmark, + function_cfg: FunctionConfig, + envs: Dict, + container_uri: str | None, + ) -> None: + """Create a new Cloud Functions Gen2 deployment. + + Args: + func_name: Function name to create. + code_package: Benchmark package with code to deploy. + function_cfg: Function configuration. + envs: Environment variables for the function service. + container_uri: Unused for package deployments. + """ + if code_package.code_location is None: + raise RuntimeError("Code location is not set for GCP deployment") + + parent = f"projects/{self.config.project_name}/locations/{self.config.region}" + storage_source = self._upload_zip_archive(code_package.code_location) + function_body = self._build_body(func_name, code_package, envs, storage_source) + create_req = ( + self.function_client.projects() + .locations() + .functions() + .create(parent=parent, functionId=func_name, body=function_body) + ) + self._operation_response = self._execute_with_retry(self.logging, create_req) + self.logging.info(f"Function {func_name} is creating through Cloud Functions Gen2") + + def update_code( + self, + function: GCPFunction, + code_package: Benchmark, + envs: Dict, + container_uri: str | None, + ) -> None: + """Update the code of an existing Cloud Functions Gen2 deployment. + + Args: + function: Existing deployed function. + code_package: New benchmark package to upload. 
+ envs: Environment variables for the updated service. + container_uri: Unused for package deployments. + """ + if code_package.code_location is None: + raise RuntimeError("Code location is not set for GCP deployment") + + full_func_name = self.get_full_function_name( + self.config.project_name, self.config.region, function.name + ) + storage_source = self._upload_zip_archive(code_package.code_location) + function_body = self._build_body(function.name, code_package, envs, storage_source) + req = ( + self.function_client.projects() + .locations() + .functions() + .patch( + name=full_func_name, + body=function_body, + updateMask="buildConfig.runtime,buildConfig.entryPoint," + "buildConfig.source.storageSource,serviceConfig", + ) + ) + self._operation_response = self._execute_with_retry(self.logging, req) + self.logging.info(f"Function {function.name} code update initiated for Gen2") + + def update_config(self, function: GCPFunction, envs: Dict) -> int: + """Update configuration of an existing Cloud Functions Gen2 deployment. + + Args: + function: Deployed function to update. + envs: Full environment variable map to apply. + + Returns: + Placeholder version value for interface compatibility. + """ + full_func_name = self.get_full_function_name( + self.config.project_name, self.config.region, function.name + ) + body = {"serviceConfig": self._service_config(function.config, envs)} + req = ( + self.function_client.projects() + .locations() + .functions() + .patch( + name=full_func_name, + body=body, + updateMask="serviceConfig", + ) + ) + self._operation_response = self._execute_with_retry(self.logging, req) + self.wait_for_deployment(function.name) + return 0 + + def wait_for_deployment(self, func_name: str) -> None: + """Wait for the active create or patch operation to complete. + + Args: + func_name: Function name being deployed. 
+ """ + if not hasattr(self, "_operation_response"): + raise RuntimeError("No operation to wait for - create/update not called") + + op_name = self._operation_response["name"] + begin = time.time() + while True: + op_req = self.function_client.projects().locations().operations().get(name=op_name) + op_res = self._execute_with_retry(self.logging, op_req) + if op_res.get("done"): + if "error" in op_res: + raise RuntimeError(f"Cloud Functions Gen2 deployment failed: {op_res['error']}") + break + if time.time() - begin > 600: + raise RuntimeError(f"Timeout waiting for Cloud Functions Gen2 operation {op_name}") + time.sleep(3) + + self._wait_for_active_status(func_name) + delattr(self, "_operation_response") + + def _wait_for_active_status(self, func_name: str, timeout: int = 300) -> None: + """Poll the Gen2 function until it reaches the ACTIVE state. + + Args: + func_name: Function name to monitor. + timeout: Maximum wait time in seconds. + """ + full_func_name = self.get_full_function_name( + self.config.project_name, self.config.region, func_name + ) + begin = time.time() + last_state: Optional[str] = None + while True: + req = self.function_client.projects().locations().functions().get(name=full_func_name) + func_details = self._execute_with_retry(self.logging, req) + state = func_details.get("state") + if state != last_state: + self.logging.info(f"Function {func_name} state: {state}") + last_state = cast(Optional[str], state) + + if state == "ACTIVE" and func_details.get("serviceConfig", {}).get("uri"): + return + if state in ("FAILED", "UNKNOWN"): + raise RuntimeError( + f"Function {func_name} deployment failed with state {state}: " + f"{func_details.get('stateMessages', [])}" + ) + if time.time() - begin > timeout: + raise RuntimeError( + "Timeout waiting for function " + f"{func_name} to become ACTIVE. 
Last state: {state}" + ) + time.sleep(3) + + def allow_public_access(self, project_name: str, location: str, func_name: str) -> None: + """Grant public invocation access to the underlying Cloud Run service. + + Args: + project_name: GCP project ID. + location: GCP region/location. + func_name: Function name whose backing service should be public. + """ + service_name = func_name.replace("_", "-").lower() + full_service_name = f"projects/{project_name}/locations/{location}/services/{service_name}" + req = ( + self.run_client.projects() + .locations() + .services() + .setIamPolicy( + resource=full_service_name, + body={ + "policy": { + "bindings": [ + { + "role": "roles/run.invoker", + "members": ["allUsers"], + } + ] + } + }, + ) + ) + self._execute_with_retry(self.logging, req) + + def create_trigger(self, func_name: str) -> str: + """Return the HTTPS trigger URL for a Gen2 function. + + Args: + func_name: Function name. + + Returns: + Public invoke URL for the function. + """ + full_func_name = self.get_full_function_name( + self.config.project_name, self.config.region, func_name + ) + req = self.function_client.projects().locations().functions().get(name=full_func_name) + func_details = self._execute_with_retry(self.logging, req) + invoke_url = func_details["serviceConfig"]["uri"] + self.logging.info(f"Function {func_name} - HTTP trigger URL: {invoke_url}") + return invoke_url + + def update_envs(self, full_function_name: str, envs: Dict) -> Dict: + """Merge new environment variables with existing Gen2 service variables. + + Args: + full_function_name: Fully qualified function name. + envs: New environment variables to add or override. + + Returns: + Merged environment variables dictionary. 
+ """ + req = self.function_client.projects().locations().functions().get(name=full_function_name) + response = self._execute_with_retry(self.logging, req) + existing_envs = response.get("serviceConfig", {}).get("environmentVariables", {}) + return {**existing_envs, **envs} + + def generate_runtime_envs(self) -> Dict: + """Return runtime environment overrides for Gen2 package deployments. + + Returns: + Environment variables controlling Gunicorn worker settings. + """ + dep_config = self.config.deployment_config.function_gen2_config + return { + "GUNICORN_WORKERS": str(dep_config.worker_concurrency), + "GUNICORN_THREADS": str(dep_config.worker_threads), + } + + def is_deployed(self, func_name: str, versionId: int = -1) -> Tuple[bool, int]: + """Check whether a Gen2 function is deployed and ready. + + Args: + func_name: Function name to inspect. + versionId: Unused for Gen2 deployments. + + Returns: + Tuple of readiness flag and placeholder version value. + """ + full_func_name = self.get_full_function_name( + self.config.project_name, self.config.region, func_name + ) + try: + req = self.function_client.projects().locations().functions().get(name=full_func_name) + func_details = self._execute_with_retry(self.logging, req) + is_ready = func_details.get("state") == "ACTIVE" and "uri" in func_details.get( + "serviceConfig", {} + ) + return (is_ready, 0) + except HttpError: + return (False, -1) + + def delete_function(self, func_name: str) -> None: + """Delete a Cloud Functions Gen2 deployment. + + Args: + func_name: Function name to delete. 
+ """ + full_func_name = self.get_full_function_name( + self.config.project_name, self.config.region, func_name + ) + try: + req = ( + self.function_client.projects().locations().functions().delete(name=full_func_name) + ) + self._execute_with_retry(self.logging, req) + except HttpError as e: + if e.resp.status != 404: + raise + + def download_execution_metrics( + self, + function_name: str, + start_time: int, + end_time: int, + requests: Dict, + ) -> None: + """Download execution timings for a Gen2 function from request logs. + + Args: + function_name: Function name whose backing service is queried. + start_time: Start timestamp for log collection. + end_time: End timestamp for log collection. + requests: Invocation results keyed by request ID. + """ + service_name = function_name.replace("_", "-").lower() + CloudRunMetricsHelper.download_execution_metrics( + self.logging, + self.config.project_name, + service_name, + start_time, + end_time, + requests, + "GCP Cloud Functions Gen2", + ) + + def download_metrics( + self, function_name: str, start_time: int, end_time: int, metrics: Dict + ) -> None: + """Download monitoring metrics for a Gen2 function. + + Args: + function_name: Function name whose backing service is queried. + start_time: Start timestamp for metric collection. + end_time: End timestamp for metric collection. + metrics: Dictionary to populate with monitoring samples. 
+ """ + service_name = function_name.replace("_", "-").lower() + CloudRunMetricsHelper.download_metrics( + self.config.project_name, service_name, start_time, end_time, metrics + ) class GCP(System): @@ -1703,6 +2226,7 @@ def initialize( self.cloud_function_gen1_strategy = CloudFunctionGen1Strategy( storage, self._config, self.logging ) + self.cloud_function_gen2_strategy = CloudFunctionGen2Strategy(self._config, self.logging) self.run_container_strategy = RunContainerStrategy(self._config, self.logging) self.gcr_client = GCRContainer(self.system_config, self.config, self.docker_client) @@ -1732,6 +2256,50 @@ def get_run_client(self): """ return self.run_container_strategy.run_client + def _resolve_deployment_type(self, container_deployment: bool) -> FunctionDeploymentType: + """Resolve the effective GCP deployment type for a benchmark. + + Args: + container_deployment: Whether the experiment selected container mode. + + Returns: + Effective deployment type after applying GCP-local package settings. + """ + return FunctionDeploymentType.resolve( + container_deployment, self.config.deployment_config.package_deployment_type + ) + + def system_variant_suffix(self, container_deployment: bool) -> Optional[str]: + """Return a provider-local system variant suffix for GCP package variants. + + Args: + container_deployment: Whether the benchmark uses container deployment. + + Returns: + Short suffix for GCP package variants, otherwise ``None``. + """ + deployment_type = self._resolve_deployment_type(container_deployment) + if deployment_type == FunctionDeploymentType.FUNCTION_GEN1: + return "gen1" + if deployment_type == FunctionDeploymentType.FUNCTION_GEN2: + return "gen2" + return None + + def _strategy_for_deployment_type(self, deployment_type: FunctionDeploymentType): + """Return the deployment strategy for a resolved GCP deployment type. + + Args: + deployment_type: Effective deployment type to dispatch. 
+ + Returns: + Deployment strategy object handling the requested mode. + """ + if deployment_type == FunctionDeploymentType.CONTAINER: + return self.run_container_strategy + if deployment_type == FunctionDeploymentType.FUNCTION_GEN2: + return self.cloud_function_gen2_strategy + return self.cloud_function_gen1_strategy + def _get_deployment_config( self, deployment_type: FunctionDeploymentType ) -> Union[GCPFunctionGen1Config, GCPFunctionGen2Config, GCPContainerConfig]: @@ -1769,6 +2337,15 @@ def is_configuration_changed(self, cached_function: Function, benchmark: Benchma # Check if deployment config has changed cached_function = cast(GCPFunction, cached_function) + expected_deployment_type = self._resolve_deployment_type(benchmark.container_deployment) + if cached_function.deployment_type != expected_deployment_type: + self.logging.info( + f"Deployment type has changed for {cached_function.name}: " + f"cached function uses {cached_function.deployment_type.value}, " + f"requested deployment is {expected_deployment_type.value}." + ) + changed = True + current_dep_config = self._get_deployment_config(cached_function.deployment_type) if cached_function.deployment_config != current_dep_config: @@ -1780,6 +2357,29 @@ def is_configuration_changed(self, cached_function: Function, benchmark: Benchma return changed + def can_reuse_cached_function( + self, cached_function: Function, benchmark: Benchmark + ) -> Optional[str]: + """Check whether a cached GCP function matches the requested deployment mode. + + Args: + cached_function: Cached function selected from SeBS cache. + benchmark: Benchmark requesting the function. + + Returns: + str: if the cached function does not fit the requested deployment type. 
+ """ + gcp_function = cast(GCPFunction, cached_function) + expected_deployment_type = self._resolve_deployment_type(benchmark.container_deployment) + + if gcp_function.deployment_type != expected_deployment_type: + return ( + f"cached deployment type {gcp_function.deployment_type.value} " + f"does not match requested deployment type {expected_deployment_type.value}" + ) + + return None + def default_function_name( self, code_package: Benchmark, resources: Optional[Resources] = None ) -> str: @@ -1807,11 +2407,16 @@ def default_function_name( code_package.language_version, code_package.architecture, ) - if code_package.container_deployment: + deployment_type = self._resolve_deployment_type(code_package.container_deployment) + if deployment_type == FunctionDeploymentType.CONTAINER: func_name = f"{func_name}-docker" + elif deployment_type == FunctionDeploymentType.FUNCTION_GEN1: + func_name = f"{func_name}-gen1" + elif deployment_type == FunctionDeploymentType.FUNCTION_GEN2: + func_name = f"{func_name}-gen2" return ( GCP.format_function_name(func_name) - if not code_package.container_deployment + if deployment_type != FunctionDeploymentType.CONTAINER else func_name.replace(".", "-") ) @@ -1968,22 +2573,12 @@ def create_function( if architecture == "arm64": raise RuntimeError("GCP does not support arm64 deployments") - # Select deployment strategy - strategy = ( - self.run_container_strategy - if container_deployment - else self.cloud_function_gen1_strategy - ) + deployment_type = self._resolve_deployment_type(container_deployment) + strategy = self._strategy_for_deployment_type(deployment_type) # Check if function/service already exists function_exists = strategy.function_exists(project_name, location, func_name) - deployment_type = ( - FunctionDeploymentType.CONTAINER - if container_deployment - else FunctionDeploymentType.FUNCTION_GEN1 - ) - dep_config = self._get_deployment_config(deployment_type) if not function_exists: # Create new function/service @@ -1998,7 
+2593,7 @@ def create_function( strategy.wait_for_deployment(func_name) strategy.allow_public_access(project_name, location, func_name) - if not container_deployment: + if not deployment_type.is_container: storage_client = self._system_resources.get_storage() code_bucket = storage_client.get_bucket(Resources.StorageBucketType.DEPLOYMENT) function = GCPFunction( @@ -2042,7 +2637,7 @@ def create_function( # Add LibraryTrigger to a new function # Not supported on containers - if not container_deployment: + if deployment_type == FunctionDeploymentType.FUNCTION_GEN1: from sebs.gcp.triggers import LibraryTrigger trigger = LibraryTrigger(func_name, self, function.deployment_type) @@ -2077,11 +2672,7 @@ def create_trigger(self, function: Function, trigger_type: Trigger.TriggerType) self.logging.info(f"Function {function.name} - waiting for deployment...") # Select deployment strategy - strategy = ( - self.run_container_strategy - if gcp_function.deployment_type.is_container - else self.cloud_function_gen1_strategy - ) + strategy = self._strategy_for_deployment_type(gcp_function.deployment_type) # Get trigger URL from strategy invoke_url = strategy.create_trigger(function.name) @@ -2143,11 +2734,7 @@ def update_function( function = cast(GCPFunction, function) # Select deployment strategy - strategy = ( - self.run_container_strategy - if container_deployment - else self.cloud_function_gen1_strategy - ) + strategy = self._strategy_for_deployment_type(function.deployment_type) # Generate environment variables envs = { @@ -2212,11 +2799,7 @@ def update_function_configuration( function = cast(GCPFunction, function) # Select deployment strategy - strategy = ( - self.run_container_strategy - if code_package.container_deployment - else self.cloud_function_gen1_strategy - ) + strategy = self._strategy_for_deployment_type(function.deployment_type) # Get full resource name for env merging full_func_name = strategy.get_full_function_name( @@ -2252,11 +2835,7 @@ def 
delete_function(self, func_name: str, function: Dict) -> None: # Select deployment strategy based on function name # v1 functions don't allow hyphens, new functions don't allow underscores gcp_function = GCPFunction.deserialize(function) - strategy = ( - self.run_container_strategy - if gcp_function.deployment_type.is_container - else self.cloud_function_gen1_strategy - ) + strategy = self._strategy_for_deployment_type(gcp_function.deployment_type) strategy.delete_function(func_name) @@ -2296,11 +2875,7 @@ def download_metrics( function = GCPFunction.deserialize(functions[function_name]) - strategy = ( - self.run_container_strategy - if function.deployment_type.is_container - else self.cloud_function_gen1_strategy - ) + strategy = self._strategy_for_deployment_type(function.deployment_type) strategy.download_execution_metrics(function_name, start_time, end_time, requests) strategy.download_metrics(function_name, start_time, end_time, metrics) @@ -2414,11 +2989,7 @@ def is_deployed(self, function: Function, versionId: int = -1) -> Tuple[bool, in # Select deployment strategy based on function name # v1 functions don't allow hyphens, new functions don't allow underscores gcp_function = cast(GCPFunction, function) - strategy = ( - self.run_container_strategy - if gcp_function.deployment_type.is_container - else self.cloud_function_gen1_strategy - ) + strategy = self._strategy_for_deployment_type(gcp_function.deployment_type) return strategy.is_deployed(function.name, versionId) From e1da8e76a9a0ef5932d6dbea1980c6f943f71c74 Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Tue, 28 Apr 2026 23:12:24 +0200 Subject: [PATCH 08/32] [system] Strengthen cache with per-instance locks and atomic replacement File replacement avoids the problem of writing half of the JSON file and then crashing --- sebs/cache.py | 158 +++++++++++++++++++++++++++++++------------------- 1 file changed, 99 insertions(+), 59 deletions(-) diff --git a/sebs/cache.py b/sebs/cache.py index 
78f4a8f4..8114e30c 100644 --- a/sebs/cache.py +++ b/sebs/cache.py @@ -29,6 +29,7 @@ import os import shutil import threading +import tempfile from typing import Any, Callable, Dict, List, Mapping, Optional, TYPE_CHECKING # noqa from sebs.utils import LoggingBase, serialize @@ -113,6 +114,9 @@ class Cache(LoggingBase): docker_client (docker.DockerClient): Docker client for container operations. """ + _lock_registry_guard = threading.Lock() + _lock_registry: Dict[str, threading.RLock] = {} + def __init__(self, cache_dir: str, docker_client: docker.DockerClient) -> None: """Initialize the Cache with directory and Docker client. @@ -131,7 +135,7 @@ def __init__(self, cache_dir: str, docker_client: docker.DockerClient) -> None: self.cache_dir = os.path.abspath(cache_dir) self.ignore_functions: bool = False self.ignore_storage: bool = False - self._lock = threading.RLock() + self._lock = self._cache_dir_lock(self.cache_dir) if not os.path.exists(self.cache_dir): os.makedirs(self.cache_dir, exist_ok=True) else: @@ -146,6 +150,48 @@ def typename() -> str: """ return "Cache" + @classmethod + def _cache_dir_lock(cls, cache_dir: str) -> threading.RLock: + """Return a shared lock for all Cache instances pointing at one cache dir.""" + with cls._lock_registry_guard: + if cache_dir not in cls._lock_registry: + cls._lock_registry[cache_dir] = threading.RLock() + return cls._lock_registry[cache_dir] + + @staticmethod + def _write_json_atomic(path: str, data: Any) -> None: + """Atomically replace a JSON file after fully writing it to a temp file.""" + directory = os.path.dirname(path) + os.makedirs(directory, exist_ok=True) + fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".tmp-", suffix=".json") + try: + with os.fdopen(fd, "w") as fp: + json.dump(data, fp, indent=2) + fp.flush() + os.fsync(fp.fileno()) + os.replace(tmp_path, path) + except Exception: + if os.path.exists(tmp_path): + os.unlink(tmp_path) + raise + + @staticmethod + def _write_serialized_atomic(path: str, 
data: Dict[str, Any]) -> None: + """Atomically replace a JSON file using the SeBS serializer.""" + directory = os.path.dirname(path) + os.makedirs(directory, exist_ok=True) + fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".tmp-", suffix=".json") + try: + with os.fdopen(fd, "w") as fp: + fp.write(serialize(data)) + fp.flush() + os.fsync(fp.fileno()) + os.replace(tmp_path, path) + except Exception: + if os.path.exists(tmp_path): + os.unlink(tmp_path) + raise + def load_config(self) -> None: """Load cached cloud configurations from disk. @@ -200,12 +246,12 @@ def shutdown(self) -> None: JSON files in the cache directory. """ if self.config_updated: - for cloud in ["azure", "aws", "gcp", "openwhisk", "local"]: - if cloud in self.cached_config: - cloud_config_file = os.path.join(self.cache_dir, "{}.json".format(cloud)) - self.logging.info("Update cached config {}".format(cloud_config_file)) - with open(cloud_config_file, "w") as out: - json.dump(self.cached_config[cloud], out, indent=2) + with self._lock: + for cloud in ["azure", "aws", "gcp", "openwhisk", "local"]: + if cloud in self.cached_config: + cloud_config_file = os.path.join(self.cache_dir, "{}.json".format(cloud)) + self.logging.info("Update cached config {}".format(cloud_config_file)) + self._write_json_atomic(cloud_config_file, self.cached_config[cloud]) def get_benchmark_config(self, deployment: str, benchmark: str) -> Optional[Dict[str, Any]]: """Access cached configuration of a benchmark. @@ -217,13 +263,14 @@ def get_benchmark_config(self, deployment: str, benchmark: str) -> Optional[Dict Returns: Optional[Dict[str, Any]]: Benchmark configuration or None if not found. 
""" - benchmark_dir = os.path.join(self.cache_dir, benchmark) - if os.path.exists(benchmark_dir): - config_file = os.path.join(benchmark_dir, "config.json") - if os.path.exists(config_file): - with open(config_file, "r") as fp: - cfg = json.load(fp) - return cfg[deployment] if deployment in cfg else None + with self._lock: + benchmark_dir = os.path.join(self.cache_dir, benchmark) + if os.path.exists(benchmark_dir): + config_file = os.path.join(benchmark_dir, "config.json") + if os.path.exists(config_file): + with open(config_file, "r") as fp: + cfg = json.load(fp) + return cfg[deployment] if deployment in cfg else None return None def get_code_package( @@ -322,8 +369,7 @@ def invalidate_all_container_uris(self, deployment: str) -> None: modified = True if modified: - with open(config_path, "w") as fp: - json.dump(config, fp, indent=2) + self._write_json_atomic(config_path, config) def update_container_uri( self, @@ -358,8 +404,7 @@ def update_container_uri( key = f"{language_version}-{architecture}" config[deployment][language]["containers"][key]["image-uri"] = uri - with open(config_path, "w") as fp: - json.dump(config, fp, indent=2) + self._write_json_atomic(config_path, config) def get_functions( self, deployment: str, benchmark: str, language: str @@ -398,25 +443,26 @@ def get_all_functions(self, deployment: str) -> Dict[str, Any]: if not os.path.exists(self.cache_dir) or self.ignore_functions: return result - for entry in os.listdir(self.cache_dir): - config_path = os.path.join(self.cache_dir, entry, "config.json") - if not os.path.exists(config_path): - continue - - with open(config_path, "r") as fp: - config = json.load(fp) + with self._lock: + for entry in os.listdir(self.cache_dir): + config_path = os.path.join(self.cache_dir, entry, "config.json") + if not os.path.exists(config_path): + continue - dep_cfg = config.get(deployment) - if dep_cfg is None: - continue + with open(config_path, "r") as fp: + config = json.load(fp) - for lang_cfg in 
dep_cfg.values(): - if lang_cfg is None: + dep_cfg = config.get(deployment) + if dep_cfg is None: continue - functions = lang_cfg.get("functions") - if functions is not None: - result.update(functions) + for lang_cfg in dep_cfg.values(): + if lang_cfg is None: + continue + + functions = lang_cfg.get("functions") + if functions is not None: + result.update(functions) return result @@ -461,21 +507,22 @@ def get_nosql_configs(self, deployment: str) -> Dict[str, Any]: if not os.path.exists(self.cache_dir): return result - for entry in os.listdir(self.cache_dir): - config_path = os.path.join(self.cache_dir, entry, "config.json") - if not os.path.exists(config_path): - continue + with self._lock: + for entry in os.listdir(self.cache_dir): + config_path = os.path.join(self.cache_dir, entry, "config.json") + if not os.path.exists(config_path): + continue - with open(config_path, "r") as fp: - config = json.load(fp) + with open(config_path, "r") as fp: + config = json.load(fp) - dep_cfg = config.get(deployment) - if dep_cfg is None: - continue + dep_cfg = config.get(deployment) + if dep_cfg is None: + continue - nosql = dep_cfg.get("nosql") - if nosql is not None: - result.update(nosql) + nosql = dep_cfg.get("nosql") + if nosql is not None: + result.update(nosql) return result @@ -549,8 +596,7 @@ def remove_function(self, deployment: str, benchmark: str, language: str, functi self.logging.info(f"Deleting function {function_name} from cache") del lang_cfg["functions"][function_name] - with open(config_path, "w") as fp: - json.dump(config, fp, indent=2) + self._write_json_atomic(config_path, config) def remove_storage(self, deployment: str): """Remove storage config entries across all benchmarks for a deployment. 
@@ -589,8 +635,7 @@ def _remove_resource_config(self, deployment: str, resource: str): if deployment in config and resource in config[deployment]: del config[deployment][resource] - with open(config_path, "w") as fp: - json.dump(config, fp, indent=2) + self._write_json_atomic(config_path, config) def get_config_key(self, keys: List[str]) -> Optional[Any]: """Return the value at a nested key path in the cached configuration. @@ -666,8 +711,7 @@ def _update_resources( else: cached_config[deployment] = {resource: config} - with open(config_file, "w") as fp: - json.dump(cached_config, fp, indent=2) + self._write_json_atomic(config_file, cached_config) def add_code_package( self, @@ -791,8 +835,7 @@ def add_code_package( # language unknown, platform unknown - add new dictionary cached_config[deployment_name] = config[deployment_name] config = cached_config - with open(os.path.join(benchmark_dir, "config.json"), "w") as fp: - json.dump(config, fp, indent=2) + self._write_json_atomic(os.path.join(benchmark_dir, "config.json"), config) else: # TODO: update @@ -904,8 +947,7 @@ def update_code_package( "image-uri" ] = code_package.container_uri - with open(os.path.join(benchmark_dir, "config.json"), "w") as fp: - json.dump(config, fp, indent=2) + self._write_json_atomic(os.path.join(benchmark_dir, "config.json"), config) else: self.add_code_package(deployment_name, code_package) @@ -949,8 +991,7 @@ def add_function( functions_config ) config = cached_config - with open(cache_config, "w") as fp: - fp.write(serialize(config)) + self._write_serialized_atomic(cache_config, config) else: raise RuntimeError( "Can't cache function {} for a non-existing code package!".format(function.name) @@ -986,8 +1027,7 @@ def update_function(self, function: "Function") -> None: cached_config[deployment][language]["functions"][ name ] = function.serialize() - with open(cache_config, "w") as fp: - fp.write(serialize(cached_config)) + self._write_serialized_atomic(cache_config, cached_config) 
else: raise RuntimeError( "Can't cache function {} for a non-existing code package!".format(function.name) From 1ff2b79c62f6c0fb98ee4d1b1244f2c89d7b0657 Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Tue, 28 Apr 2026 23:29:17 +0200 Subject: [PATCH 09/32] [dev] Linting --- sebs/gcp/config.py | 4 ++-- sebs/gcp/gcp.py | 8 +++++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/sebs/gcp/config.py b/sebs/gcp/config.py index d4a4bb8d..05da2bf8 100644 --- a/sebs/gcp/config.py +++ b/sebs/gcp/config.py @@ -22,7 +22,7 @@ import json import os -from typing import cast, Dict, List, Optional, Tuple +from typing import cast, Any, Dict, List, Optional, Tuple import time from googleapiclient.errors import HttpError @@ -470,7 +470,7 @@ def serialize(self) -> Dict: Returns: Dictionary representation of resources for cache storage """ - out = {} + out: Dict[str, Any] = {} out["function-gen1"] = self._function_gen1_config.serialize() out["function-gen2"] = self._function_gen2_config.serialize() out["container"] = self._container_config.serialize() diff --git a/sebs/gcp/gcp.py b/sebs/gcp/gcp.py index 4ba78bff..edbc9146 100644 --- a/sebs/gcp/gcp.py +++ b/sebs/gcp/gcp.py @@ -1834,7 +1834,9 @@ def create( self.function_client.projects() .locations() .functions() - .create(parent=parent, functionId=func_name, body=function_body) + .create( + parent=parent, functionId=func_name, body=function_body # type: ignore[arg-type] + ) ) self._operation_response = self._execute_with_retry(self.logging, create_req) self.logging.info(f"Function {func_name} is creating through Cloud Functions Gen2") @@ -1868,7 +1870,7 @@ def update_code( .functions() .patch( name=full_func_name, - body=function_body, + body=function_body, # type: ignore[arg-type] updateMask="buildConfig.runtime,buildConfig.entryPoint," "buildConfig.source.storageSource,serviceConfig", ) @@ -1896,7 +1898,7 @@ def update_config(self, function: GCPFunction, envs: Dict) -> int: .functions() .patch( name=full_func_name, - 
body=body,
+                body=body,  # type: ignore[arg-type]
                 updateMask="serviceConfig",
             )
         )

From 7db15cd21fca513e6f738779c8c8811f34768771 Mon Sep 17 00:00:00 2001
From: Marcin Copik
Date: Tue, 28 Apr 2026 23:53:29 +0200
Subject: [PATCH 10/32] [system] First step towards introducing system variant

---
 sebs/config.py             | 24 ++------------
 sebs/experiments/config.py | 65 +++++++++++++++++++++++++++++++-------
 sebs/sebs.py               | 16 +++-------
 3 files changed, 61 insertions(+), 44 deletions(-)

diff --git a/sebs/config.py b/sebs/config.py
index 6f66e888..6ffc9ff5 100644
--- a/sebs/config.py
+++ b/sebs/config.py
@@ -166,27 +166,9 @@ def supported_variants(self, deployment_name: str, language_name: str) -> List[s
         languages = self._system_config.get(deployment_name, {}).get("languages", {})
         return languages.get(language_name, {}).get("supported_variants", ["default"])
 
-    def supported_package_deployment(self, deployment_name: str) -> bool:
-        """Check if package-based deployment is supported for a platform.
-
-        Args:
-            deployment_name (str): Name of the deployment platform (e.g., 'aws', 'azure').
-
-        Returns:
-            bool: True if package deployment is supported, False otherwise.
-        """
-        return "package" in self._system_config[deployment_name]["deployments"]
-
-    def supported_container_deployment(self, deployment_name: str) -> bool:
-        """Check if container-based deployment is supported for a platform.
-
-        Args:
-            deployment_name (str): Name of the deployment platform (e.g., 'aws', 'azure').
-
-        Returns:
-            bool: True if container deployment is supported, False otherwise.
- """ - return "container" in self._system_config[deployment_name]["deployments"] + def supported_system_variants(self, deployment_name: str) -> List[str]: + """Return the supported deployment variants for a platform.""" + return self._system_config[deployment_name]["deployments"] def benchmark_base_images( self, deployment_name: str, language_name: str, architecture: str diff --git a/sebs/experiments/config.py b/sebs/experiments/config.py index edde88de..a8e919e4 100644 --- a/sebs/experiments/config.py +++ b/sebs/experiments/config.py @@ -18,6 +18,49 @@ from sebs.faas.function import Runtime +class SystemVariant: + """Deployment variant selected for an experiment. + + The variant is provider-specific, but exposes a shared ``is_container`` + property for logic that only cares about the package vs container split. + """ + + def __init__(self, value: str): + """Initialize the system variant. + + Args: + value: Provider-specific deployment variant name. + """ + self._value = value + + @property + def value(self) -> str: + """Get the provider-specific deployment variant name.""" + return self._value + + @property + def is_container(self) -> bool: + """Return whether this deployment variant uses containers.""" + return self._value == "container" + + def serialize(self) -> str: + """Serialize the deployment variant to a string.""" + return self._value + + @staticmethod + def deserialize(value: str | "SystemVariant") -> "SystemVariant": + """Deserialize a deployment variant from a string or passthrough object.""" + if isinstance(value, SystemVariant): + return value + return SystemVariant(value) + + def __eq__(self, other: object) -> bool: + """Compare two system variants.""" + if not isinstance(other, SystemVariant): + return False + return self.value == other.value + + class Config: """Configuration class for benchmark experiments. 
@@ -28,7 +71,7 @@ class Config: Attributes: _update_code: Whether to update function code _update_storage: Whether to update storage resources - _container_deployment: Whether to use container-based deployment + _system_variant: Deployment variant selected for the target provider _download_results: Whether to download experiment results _architecture: CPU architecture (e.g., "x64", "arm64") _flags: Dictionary of boolean flags for custom settings @@ -40,7 +83,7 @@ def __init__(self): """Initialize a new experiment configuration with default values.""" self._update_code: bool = False self._update_storage: bool = False - self._container_deployment: bool = False + self._system_variant = SystemVariant("package") self._download_results: bool = False self._architecture: str = "x64" self._flags: Dict[str, bool] = {} @@ -107,13 +150,9 @@ def architecture(self) -> str: return self._architecture @property - def container_deployment(self) -> bool: - """Get whether to use container-based deployment. - - Returns: - True if container-based deployment should be used, False otherwise - """ - return self._container_deployment + def system_variant(self) -> SystemVariant: + """Get the selected deployment variant.""" + return self._system_variant def experiment_settings(self, name: str) -> dict: """Get settings for a specific experiment. 
@@ -146,7 +185,7 @@ def serialize(self) -> dict: "flags": self._flags, "experiments": self._experiment_configs, "architecture": self._architecture, - "container_deployment": self._container_deployment, + "system_variant": self._system_variant.serialize(), } return out @@ -173,7 +212,11 @@ def deserialize(config: dict) -> "Config": cfg._update_code = config["update_code"] cfg._update_storage = config["update_storage"] cfg._download_results = config["download_results"] - cfg._container_deployment = config["container_deployment"] + if "system_variant" in config: + cfg._system_variant = SystemVariant.deserialize(config["system_variant"]) + else: + legacy_is_container = config.get("container_deployment", False) + cfg._system_variant = SystemVariant("container" if legacy_is_container else "package") cfg._runtime = Runtime.deserialize(config["runtime"]) cfg._flags = config["flags"] if "flags" in config else {} cfg._architecture = config["architecture"] diff --git a/sebs/sebs.py b/sebs/sebs.py index 25cc2596..366445dc 100644 --- a/sebs/sebs.py +++ b/sebs/sebs.py @@ -227,17 +227,9 @@ def get_deployment( ) ) - # Validate deployment type - container - if config["experiments"][ - "container_deployment" - ] and not self._config.supported_container_deployment(name): - raise RuntimeError(f"Container deployment is not supported in {name}.") - - # Validate deployment type - package - if not config["experiments"][ - "container_deployment" - ] and not self._config.supported_package_deployment(name): - raise RuntimeError(f"Code package deployment is not supported in {name}.") + selected_variant = config["experiments"]["system_variant"] + if selected_variant not in self._config.supported_system_variants(name): + raise RuntimeError(f"System variant {selected_variant} is not supported in {name}.") # Set up logging and create deployment configuration handlers = self.generate_logging_handlers(logging_filename) @@ -368,7 +360,7 @@ def get_benchmark( self._output_dir, self.cache_client, 
self.docker_client, - deployment.system_variant_suffix(config.container_deployment), + deployment.system_variant_suffix(config.system_variant), self.verbose, ) From bea57da731453f3f34580cdfdc0a6e3c9afd60f4 Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Tue, 28 Apr 2026 23:56:45 +0200 Subject: [PATCH 11/32] [system] Introduce system variant everywhere --- sebs/aws/aws.py | 21 ++++++++++--------- sebs/azure/azure.py | 17 +++++++-------- sebs/benchmark.py | 41 ++++++++++++++++--------------------- sebs/cache.py | 12 +++++------ sebs/faas/system.py | 29 +++++++++++++------------- sebs/local/local.py | 13 ++++++------ sebs/openwhisk/openwhisk.py | 15 +++++++------- 7 files changed, 73 insertions(+), 75 deletions(-) diff --git a/sebs/aws/aws.py b/sebs/aws/aws.py index c001e651..b4bae0f4 100644 --- a/sebs/aws/aws.py +++ b/sebs/aws/aws.py @@ -29,6 +29,7 @@ from sebs.benchmark import Benchmark from sebs.cache import Cache from sebs.config import SeBSConfig +from sebs.experiments.config import SystemVariant from sebs.utils import LoggingHandlers from sebs.faas.function import Function, ExecutionResult, Trigger, FunctionConfig from sebs.faas.system import System @@ -321,7 +322,7 @@ def create_function( self, code_package: Benchmark, func_name: str, - container_deployment: bool, + system_variant: SystemVariant, container_uri: str | None, ) -> "LambdaFunction": """ @@ -333,8 +334,8 @@ def create_function( Args: code_package: Benchmark code package func_name: Name of the function - container_deployment: Whether to use container deployment - container_uri: URI of the container image (if container_deployment=True) + system_variant: Selected deployment variant + container_uri: URI of the container image (if container deployment is selected) Returns: LambdaFunction: The created or updated Lambda function @@ -366,7 +367,7 @@ def create_function( self.config.resources.lambda_role(self.session), function_cfg, ) - self.update_function(lambda_function, code_package, 
container_deployment, container_uri) + self.update_function(lambda_function, code_package, system_variant, container_uri) lambda_function.updated_code = True # TODO: get configuration of REST API except self.client.exceptions.ResourceNotFoundException: @@ -379,7 +380,7 @@ def create_function( "Code": {}, } - if container_deployment: + if system_variant.is_container: create_function_params["PackageType"] = "Image" create_function_params["Code"] = {"ImageUri": container_uri} self.logging.info( @@ -467,7 +468,7 @@ def update_function( self, function: Function, code_package: Benchmark, - container_deployment: bool, + system_variant: SystemVariant, container_uri: str | None, ): """ @@ -480,13 +481,13 @@ def update_function( Args: function: The function to update code_package: Benchmark code package - container_deployment: Whether to use container deployment - container_uri: URI of the container image (if container_deployment=True) + system_variant: Selected deployment variant + container_uri: URI of the container image (if container deployment is selected) """ name = function.name function = cast(LambdaFunction, function) - if container_deployment: + if system_variant.is_container: self.client.update_function_code(FunctionName=name, ImageUri=container_uri) else: code_size = code_package.code_size @@ -612,7 +613,7 @@ def default_function_name( code_package.language_version, code_package.architecture, ) - if code_package.container_deployment: + if code_package.system_variant.is_container: func_name = f"{func_name}-docker" return AWS.format_function_name(func_name) diff --git a/sebs/azure/azure.py b/sebs/azure/azure.py index 9a55085e..762b372e 100644 --- a/sebs/azure/azure.py +++ b/sebs/azure/azure.py @@ -54,6 +54,7 @@ from sebs.benchmark import Benchmark from sebs.cache import Cache from sebs.config import SeBSConfig +from sebs.experiments.config import SystemVariant from sebs.utils import LoggingHandlers, execute from sebs.faas.function import Function, FunctionConfig, 
ExecutionResult from sebs.faas.system import System @@ -237,7 +238,7 @@ def package_code( architecture: Target architecture (currently unused) benchmark: Name of the benchmark is_cached: Whether the package is from cache - container_deployment: Whether to use container deployment + system_variant: Selected deployment variant Returns: Tuple of (directory_path, code_size_bytes, container_uri) @@ -504,7 +505,7 @@ def update_function( self, function: Function, code_package: Benchmark, - container_deployment: bool, + system_variant: SystemVariant, container_uri: str | None, ) -> None: """Update existing Azure Function with new code. @@ -517,14 +518,14 @@ def update_function( Args: function: Function instance to update code_package: New benchmark code package - container_deployment: Whether using container deployment + system_variant: Selected deployment variant container_uri: Container URI (unused for Azure) Raises: NotImplementedError: If container deployment is requested. """ - if container_deployment: + if system_variant.is_container: raise NotImplementedError("Container deployment is not supported in Azure") assert code_package.has_input_processed @@ -752,7 +753,7 @@ def create_function( self, code_package: Benchmark, func_name: str, - container_deployment: bool, + system_variant: SystemVariant, container_uri: str | None, ) -> AzureFunction: """Create new Azure Function. @@ -764,7 +765,7 @@ def create_function( Args: code_package: Benchmark code package to deploy func_name: Name for the Azure Function App - container_deployment: Whether to use container deployment + system_variant: Selected deployment variant container_uri: Container URI (unused for Azure) Returns: @@ -775,7 +776,7 @@ def create_function( RuntimeError: If function creation fails. 
""" - if container_deployment: + if system_variant.is_container: raise NotImplementedError("Container deployment is not supported in Azure") language = code_package.language_name @@ -852,7 +853,7 @@ def create_function( ) # update existing function app - self.update_function(function, code_package, container_deployment, container_uri) + self.update_function(function, code_package, system_variant, container_uri) self.cache_client.add_function( deployment_name=self.name(), diff --git a/sebs/benchmark.py b/sebs/benchmark.py index 57b6d46f..d584f479 100644 --- a/sebs/benchmark.py +++ b/sebs/benchmark.py @@ -33,7 +33,7 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: - from sebs.experiments.config import Config as ExperimentConfig + from sebs.experiments.config import Config as ExperimentConfig, SystemVariant class LanguageSpec: @@ -286,7 +286,7 @@ class Benchmark(LoggingBase): uses_storage: Whether the benchmark uses cloud storage uses_nosql: Whether the benchmark uses NoSQL databases architecture: CPU architecture of the deployment target - container_deployment: Whether using container deployment + system_variant: Deployment variant selected for the target provider """ @@ -535,14 +535,9 @@ def architecture(self) -> str: return self._architecture @property - def container_deployment(self) -> bool: - """ - Check if using container deployment. 
- - Returns: - bool: True if using container deployment, False otherwise - """ - return self._container_deployment + def system_variant(self) -> SystemVariant: + """Return the selected deployment variant for this benchmark.""" + return self._system_variant @property # noqa: A003 def hash(self) -> str: @@ -615,7 +610,7 @@ def __init__( assert config.runtime.variant is not None self._language_variant = config.runtime.variant.value self._architecture = self._experiment_config.architecture - self._container_deployment = config.container_deployment + self._system_variant = config.system_variant self._system_variant_suffix = system_variant_suffix self._verbose = verbose @@ -647,7 +642,7 @@ def __init__( self._architecture, ( "container" - if self._container_deployment + if self._system_variant.is_container else ( f"package_{self._system_variant_suffix}" if self._system_variant_suffix @@ -762,7 +757,7 @@ def query_cache(self) -> None: and deployment combination. Updates the cache status fields based on whether the cache exists and if it's still valid (hash matches). """ - if self.container_deployment: + if self.system_variant.is_container: self._code_package = self._cache_client.get_container( deployment=self._deployment_name, benchmark=self._benchmark, @@ -1379,7 +1374,7 @@ def build( container_client: DockerContainer | None, container_build_step: Callable[[str, Language, str, str, str, bool], Tuple[str, int]] | None, - ) -> Tuple[bool, str | None, bool, str | None]: + ) -> Tuple[bool, str | None, SystemVariant, str | None]: """Build the complete benchmark deployment package. 
Orchestrates the entire build process for a benchmark, including: @@ -1399,12 +1394,12 @@ def build( Tuple containing: - bool: Whether a new build was performed (False if cached) - str: Path to the built code package - - bool: Whether this is a container deployment + - SystemVariant: Selected deployment variant - str: Container URI (empty string if not container deployment) """ # Skip build if files are up to date and user didn't enforce rebuild if self.is_cached and self.is_cached_valid: - if self.container_deployment: + if self.system_variant.is_container: if self._container_uri is None: assert container_client is not None self._container_uri = container_client.push_to_registry( @@ -1426,12 +1421,12 @@ def build( self.benchmark, self.container_uri ) ) - return False, None, self.container_deployment, self.container_uri + return False, None, self.system_variant, self.container_uri else: self.logging.info( "Using cached benchmark {} at {}".format(self.benchmark, self.code_location) ) - return False, self.code_location, self.container_deployment, None + return False, self.code_location, self.system_variant, None msg = ( "no cached code package/container." 
@@ -1461,7 +1456,7 @@ def build( """ self._container_uri = None - if self.container_deployment: + if self.system_variant.is_container: assert container_client is not None repo_name = self._system_config.docker_repository() @@ -1608,7 +1603,7 @@ def build( return ( True, self._code_location, - self._container_deployment, + self._system_variant, self._container_uri, ) @@ -1746,7 +1741,7 @@ def code_package_modify(self, filename: str, data: bytes) -> None: Raises: NotImplementedError: If the code package is not a ZIP archive """ - if not self.container_deployment and self.code_package_is_archive(): + if not self.system_variant.is_container and self.code_package_is_archive(): assert self.code_location is not None self._update_zip(self.code_location, filename, data) new_size = self.code_package_recompute_size() / 1024.0 / 1024.0 @@ -1764,7 +1759,7 @@ def code_package_is_archive(self) -> bool: bool: True if package is a ZIP archive, False if it's a directory """ - if self.container_deployment: + if self.system_variant.is_container: return False code_location = self.code_location @@ -1783,7 +1778,7 @@ def code_package_recompute_size(self) -> float: Returns: float: Updated package size in bytes """ - if self.container_deployment: + if self.system_variant.is_container: raise NotImplementedError() if self.code_location is None: diff --git a/sebs/cache.py b/sebs/cache.py index 8114e30c..7e769782 100644 --- a/sebs/cache.py +++ b/sebs/cache.py @@ -740,7 +740,7 @@ def add_code_package( benchmark_dir = os.path.join(self.cache_dir, code_package.benchmark) os.makedirs(benchmark_dir, exist_ok=True) - package_type = "docker" if code_package.container_deployment else "package" + package_type = "docker" if code_package.system_variant.is_container else "package" # Check if cache directory for this deployment exist cached_dir = os.path.join( benchmark_dir, @@ -785,7 +785,7 @@ def add_code_package( } key = f"{language_version}-{architecture}" - if code_package.container_deployment: + if 
code_package.system_variant.is_container: image = self.docker_client.images.get(code_package.container_uri) language_config["image-uri"] = code_package.container_uri language_config["image-id"] = image.id @@ -817,7 +817,7 @@ def add_code_package( if deployment_name in cached_config: # language known, platform known, extend dictionary if language in cached_config[deployment_name]: - if code_package.container_deployment: + if code_package.system_variant.is_container: cached_config[deployment_name][language]["containers"][ key ] = language_config @@ -866,7 +866,7 @@ def update_code_package( architecture = code_package.architecture benchmark_dir = os.path.join(self.cache_dir, code_package.benchmark) - package_type = "docker" if code_package.container_deployment else "package" + package_type = "docker" if code_package.system_variant.is_container else "package" # Check if cache directory for this deployment exist cached_dir = os.path.join( benchmark_dir, @@ -893,7 +893,7 @@ def update_code_package( created a code package earlier (which creates a directory), but not a container. 
""" key = f"{language_version}-{architecture}" - if code_package.container_deployment: + if code_package.system_variant.is_container: main_key = "containers" else: main_key = "code_package" @@ -940,7 +940,7 @@ def update_code_package( config[deployment_name][language][main_key][key]["hash"] = code_package.hash config[deployment_name][language][main_key][key]["size"] = code_package.code_size - if code_package.container_deployment: + if code_package.system_variant.is_container: image = self.docker_client.images.get(code_package.container_uri) config[deployment_name][language][main_key][key]["image-id"] = image.id config[deployment_name][language][main_key][key][ diff --git a/sebs/faas/system.py b/sebs/faas/system.py index 7fd7e233..3b8d1e0c 100644 --- a/sebs/faas/system.py +++ b/sebs/faas/system.py @@ -20,6 +20,7 @@ from sebs.benchmark import Benchmark from sebs.cache import Cache from sebs.config import SeBSConfig +from sebs.experiments.config import SystemVariant from sebs.faas.container import DockerContainer from sebs.faas.resources import SystemResources from sebs.faas.config import Resources @@ -154,7 +155,7 @@ def container_client(self) -> DockerContainer | None: """ return None - def system_variant_suffix(self, container_deployment: bool) -> Optional[str]: + def system_variant_suffix(self, system_variant: SystemVariant) -> Optional[str]: """Return an optional provider-local system variant suffix.""" return None @@ -334,7 +335,7 @@ def create_function( self, code_package: Benchmark, func_name: str, - container_deployment: bool, + system_variant: SystemVariant, container_uri: str | None, ) -> Function: """ @@ -345,14 +346,14 @@ def create_function( Args: code_package: Benchmark containing the function code func_name: Name of the function - container_deployment: Whether to deploy as a container + system_variant: Selected deployment variant container_uri: URI of the container image Returns: Function: Created function instance Raises: - NotImplementedError: If 
container deployment is requested but not supported + NotImplementedError: If the deployment variant is not supported """ pass @@ -376,7 +377,7 @@ def update_function( self, function: Function, code_package: Benchmark, - container_deployment: bool, + system_variant: SystemVariant, container_uri: str | None, ): """ @@ -385,11 +386,11 @@ def update_function( Args: function: Existing function instance to update code_package: New benchmark containing the function code - container_deployment: Whether to deploy as a container + system_variant: Selected deployment variant container_uri: URI of the container image Raises: - NotImplementedError: If container deployment is requested but not supported + NotImplementedError: If the deployment variant is not supported """ pass @@ -417,14 +418,14 @@ def build_function(self, code_package: Benchmark, func_name: Optional[str] = Non if not func_name: func_name = self.default_function_name(code_package) - _, code_package_loc, container_deployment, container_uri = code_package.build( + _, code_package_loc, system_variant, container_uri = code_package.build( self.package_code, self.container_client, self.finalize_container_build() ) if code_package_loc is not None: self.logging.info( f"Created code package for function {func_name} at {code_package_loc}" ) - if container_deployment: + if system_variant.is_container: self.logging.info( f"Created container deployment for function {func_name}: {container_uri}" ) @@ -472,7 +473,7 @@ def get_function(self, code_package: Benchmark, func_name: Optional[str] = None) func_name = self.default_function_name(code_package) # Build the code package - rebuilt, _, container_deployment, container_uri = code_package.build( + rebuilt, _, system_variant, container_uri = code_package.build( self.package_code, self.container_client, self.finalize_container_build() ) @@ -510,9 +511,7 @@ def get_function(self, code_package: Benchmark, func_name: Optional[str] = None) else "function {} not found in 
cache.".format(func_name) ) self.logging.info("Creating new function! Reason: " + msg) - function = self.create_function( - code_package, func_name, container_deployment, container_uri - ) + function = self.create_function(code_package, func_name, system_variant, container_uri) self.cache_client.add_function( deployment_name=self.name(), language_name=code_package.language_name, @@ -524,7 +523,7 @@ def get_function(self, code_package: Benchmark, func_name: Optional[str] = None) else: assert function is not None self.cached_function(function) - if code_package.container_deployment: + if code_package.system_variant.is_container: self.logging.info( f"Using cached function {func_name} container {code_package.container_uri}" ) @@ -553,7 +552,7 @@ def get_function(self, code_package: Benchmark, func_name: Optional[str] = None) ) # Update function code - self.update_function(function, code_package, container_deployment, container_uri) + self.update_function(function, code_package, system_variant, container_uri) function.code_package_hash = code_package.hash function.updated_code = True self.cache_client.add_function( diff --git a/sebs/local/local.py b/sebs/local/local.py index 302cc95c..31fa13c1 100644 --- a/sebs/local/local.py +++ b/sebs/local/local.py @@ -29,6 +29,7 @@ from sebs.cache import Cache from sebs.config import SeBSConfig +from sebs.experiments.config import SystemVariant from sebs.storage.resources import SelfHostedSystemResources from sebs.utils import LoggingHandlers, is_linux from sebs.local.config import LocalConfig @@ -431,7 +432,7 @@ def create_function( self, code_package: Benchmark, func_name: str, - container_deployment: bool, + system_variant: SystemVariant, container_uri: str | None, ) -> "LocalFunction": """Create a new function deployment. In practice, it starts a new Docker container. 
@@ -439,7 +440,7 @@ def create_function( Args: code_package: Benchmark code package to deploy func_name: Name for the function - container_deployment: Whether to use container deployment (unsupported) + system_variant: Selected deployment variant container_uri: Container URI (unused for local) Returns: @@ -448,7 +449,7 @@ def create_function( Raises: NotImplementedError: If container deployment is requested """ - if container_deployment: + if system_variant.is_container: raise NotImplementedError("Container deployment is not supported in Local") return self._start_container(code_package, func_name, None) @@ -456,7 +457,7 @@ def update_function( self, function: Function, code_package: Benchmark, - container_deployment: bool, + system_variant: SystemVariant, container_uri: str | None, ) -> None: """Update an existing function with new code. @@ -466,10 +467,10 @@ def update_function( Args: function: Existing function to update code_package: New benchmark code package - container_deployment: Whether to use container deployment (unused) + system_variant: Selected deployment variant container_uri: Container URI (unused) """ - if container_deployment: + if system_variant.is_container: raise NotImplementedError("Container deployment is not supported in Local") func = cast(LocalFunction, function) diff --git a/sebs/openwhisk/openwhisk.py b/sebs/openwhisk/openwhisk.py index 320510d7..4141ef61 100644 --- a/sebs/openwhisk/openwhisk.py +++ b/sebs/openwhisk/openwhisk.py @@ -16,6 +16,7 @@ from sebs.benchmark import Benchmark from sebs.cache import Cache +from sebs.experiments.config import SystemVariant from sebs.faas import System from sebs.faas.function import Function, ExecutionResult, Trigger from sebs.openwhisk.container import OpenWhiskContainer @@ -368,7 +369,7 @@ def create_function( self, code_package: Benchmark, func_name: str, - container_deployment: bool, + system_variant: SystemVariant, container_uri: str | None, ) -> "OpenWhiskFunction": """ @@ -381,7 +382,7 @@ def 
create_function( Args: code_package: Benchmark configuration and code package func_name: Name for the OpenWhisk action - container_deployment: Whether to use container-based deployment + system_variant: Selected deployment variant container_uri: URI of the Docker image for the function Returns: @@ -391,7 +392,7 @@ def create_function( RuntimeError: If WSK CLI is not accessible or function creation fails """ - if not container_deployment: + if not system_variant.is_container: raise RuntimeError("Non-container deployment is not supported in OpenWhisk!") self.logging.info("Creating function as an action in OpenWhisk.") @@ -425,7 +426,7 @@ def create_function( ) # Update function - we don't know what version is stored self.logging.info(f"Retrieved existing OpenWhisk action {func_name}.") - self.update_function(res, code_package, container_deployment, container_uri) + self.update_function(res, code_package, system_variant, container_uri) else: try: self.logging.info(f"Creating new OpenWhisk action {func_name}") @@ -502,7 +503,7 @@ def update_function( self, function: Function, code_package: Benchmark, - container_deployment: bool, + system_variant: SystemVariant, container_uri: str | None, ) -> None: """ @@ -511,13 +512,13 @@ def update_function( Args: function: Existing function to update code_package: New benchmark configuration and code package - container_deployment: Whether to use container-based deployment + system_variant: Selected deployment variant container_uri: URI of the new Docker image Raises: RuntimeError: If WSK CLI is not accessible or update fails """ - if not container_deployment: + if not system_variant.is_container: raise RuntimeError( "Code location must be set for OpenWhisk action! " "OpenWhisk requires container deployment with a code package." 
From 8469a53d98046ee7c881e3338a13d2abd15a77c5 Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Tue, 28 Apr 2026 23:58:51 +0200 Subject: [PATCH 12/32] [system] Remove container flags from the CLI directly --- sebs/cli.py | 22 ++-------------------- 1 file changed, 2 insertions(+), 20 deletions(-) diff --git a/sebs/cli.py b/sebs/cli.py index 1911b704..6fb0f679 100755 --- a/sebs/cli.py +++ b/sebs/cli.py @@ -123,11 +123,6 @@ def common_params(func): type=click.Choice(["x64", "arm64"]), help="Target architecture", ) - @click.option( - "--container-deployment/--no-container-deployment", - default=None, - help="Override whether functions should be deployed as container images.", - ) @click.option( "--system-variant", default=None, @@ -163,7 +158,6 @@ def parse_common_params( language_version, language_variant, architecture, - container_deployment, system_variant, resource_prefix: Optional[str] = None, initialize_deployment: bool = True, @@ -193,19 +187,7 @@ def parse_common_params( update_nested_dict(config_obj, ["experiments", "update_code"], update_code) update_nested_dict(config_obj, ["experiments", "update_storage"], update_storage) update_nested_dict(config_obj, ["experiments", "architecture"], architecture) - update_nested_dict(config_obj, ["experiments", "container_deployment"], container_deployment) - - selected_deployment = config_obj.get("deployment", {}).get("name") - if selected_deployment == "gcp": - update_nested_dict( - config_obj, - ["deployment", "gcp", "configuration", "package-deployment-type"], - system_variant, - ) - else: - raise RuntimeError( - f"Unsupported deployment {selected_deployment} for system variant configuration." 
- ) + update_nested_dict(config_obj, ["experiments", "system_variant"], system_variant) # set the path the configuration was loaded from update_nested_dict(config_obj, ["deployment", "local", "path"], config) @@ -714,7 +696,7 @@ def start( update_storage=False, deployment="local", storage_configuration=storage_configuration, - container_deployment=False, + system_variant="package", architecture=architecture, **kwargs, ) From a5e8985ecf24405e9430a78e8cbe2b063bb8ace9 Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Wed, 29 Apr 2026 00:00:30 +0200 Subject: [PATCH 13/32] [system] Update configs with the new defaults --- configs/cpp.json | 3 +-- configs/example.json | 5 ++--- configs/java.json | 7 +++---- configs/nodejs.json | 3 +-- configs/python.json | 3 +-- configs/systems.json | 3 ++- 6 files changed, 10 insertions(+), 14 deletions(-) diff --git a/configs/cpp.json b/configs/cpp.json index 33dcdc5d..6fe14263 100644 --- a/configs/cpp.json +++ b/configs/cpp.json @@ -5,7 +5,7 @@ "update_storage": false, "download_results": false, "architecture": "x64", - "container_deployment": true, + "system_variant": "package", "runtime": { "language": "cpp", "version": "all" @@ -68,7 +68,6 @@ "project_name": "", "credentials": "", "configuration": { - "package-deployment-type": "function-gen1", "function-gen1": { "min-instances": 0, "max-instances": 20 diff --git a/configs/example.json b/configs/example.json index a2461cea..a3f3f9b2 100644 --- a/configs/example.json +++ b/configs/example.json @@ -4,8 +4,8 @@ "update_code": false, "update_storage": false, "download_results": false, - "architecture": "x64", - "container_deployment": true, + "architecture": "x64", + "system_variant": "package", "runtime": { "language": "python", "version": "3.11" @@ -60,7 +60,6 @@ "project_name": "", "credentials": "", "configuration": { - "package-deployment-type": "function-gen1", "function-gen1": { "min-instances": 0, "max-instances": 20 diff --git a/configs/java.json b/configs/java.json index 
f8c3445f..f1b32df2 100644 --- a/configs/java.json +++ b/configs/java.json @@ -5,10 +5,10 @@ "update_storage": false, "download_results": false, "architecture": "x64", - "container_deployment": false, + "system_variant": "package", "runtime": { - "language": "java", - "version": "11" + "language": "java", + "version": "11" }, "type": "invocation-overhead", "perf-cost": { @@ -60,7 +60,6 @@ "project_name": "", "credentials": "", "configuration": { - "package-deployment-type": "function-gen1", "function-gen1": { "min-instances": 0, "max-instances": 20 diff --git a/configs/nodejs.json b/configs/nodejs.json index 01300022..2560a5b8 100644 --- a/configs/nodejs.json +++ b/configs/nodejs.json @@ -5,7 +5,7 @@ "update_storage": false, "download_results": false, "architecture": "x64", - "container_deployment": true, + "system_variant": "package", "runtime": { "language": "nodejs", "version": "20" @@ -68,7 +68,6 @@ "project_name": "", "credentials": "", "configuration": { - "package-deployment-type": "function-gen1", "function-gen1": { "min-instances": 0, "max-instances": 20 diff --git a/configs/python.json b/configs/python.json index d619dce9..951cc760 100644 --- a/configs/python.json +++ b/configs/python.json @@ -5,7 +5,7 @@ "update_storage": false, "download_results": false, "architecture": "x64", - "container_deployment": false, + "system_variant": "package", "runtime": { "language": "python", "version": "3.11" @@ -68,7 +68,6 @@ "project_name": "", "credentials": "", "configuration": { - "package-deployment-type": "function-gen2", "function-gen1": { "min-instances": 0, "max-instances": 20 diff --git a/configs/systems.json b/configs/systems.json index 019b2ad8..7b0beb0c 100644 --- a/configs/systems.json +++ b/configs/systems.json @@ -185,7 +185,8 @@ "arm64" ], "deployments": [ - "package", + "function-gen1", + "function-gen2", "container" ] }, From 2bb0dd88c00cf0cfa7ce8462720c76ee30063441 Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Wed, 29 Apr 2026 00:12:48 +0200 
Subject: [PATCH 14/32] [experiment] Adapt to current interface --- sebs/experiments/invocation_overhead.py | 5 ++++- sebs/experiments/perf_cost.py | 6 ++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/sebs/experiments/invocation_overhead.py b/sebs/experiments/invocation_overhead.py index 150a4790..b1df8aae 100644 --- a/sebs/experiments/invocation_overhead.py +++ b/sebs/experiments/invocation_overhead.py @@ -92,7 +92,10 @@ def before_sample(self, size: int, input_benchmark: dict) -> None: arr = bytearray((random.getrandbits(8) for i in range(size))) self._benchmark.code_package_modify("randomdata.bin", bytes(arr)) function = self._deployment_client.get_function(self._benchmark) - self._deployment_client.update_function(function, self._benchmark, False, "") + # FIXME: we might want a change in the future - support containers + self._deployment_client.update_function( + function, self._benchmark, self._benchmark.system_variant, "" + ) class PayloadSize: diff --git a/sebs/experiments/perf_cost.py b/sebs/experiments/perf_cost.py index e8b2489a..e4421559 100644 --- a/sebs/experiments/perf_cost.py +++ b/sebs/experiments/perf_cost.py @@ -176,8 +176,10 @@ def run(self) -> None: self._deployment_client.update_function( self._function, self._benchmark, - self._benchmark.container_deployment, - self._benchmark.container_uri if self._benchmark.container_deployment else "", + self._benchmark.system_variant, + self._benchmark.container_uri + if self._benchmark.system_variant.is_container + else "", ) self._sebs_client.cache_client.update_function(self._function) # Run experiment with this memory configuration From 534b145c325985b4262dc93744806d0378440675 Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Wed, 29 Apr 2026 00:13:11 +0200 Subject: [PATCH 15/32] [system] Add default system variant --- sebs/cli.py | 6 ++++++ sebs/config.py | 16 ++++++++++++++++ sebs/sebs.py | 3 +++ 3 files changed, 25 insertions(+) diff --git a/sebs/cli.py b/sebs/cli.py index 
6fb0f679..ec7bab57 100755 --- a/sebs/cli.py +++ b/sebs/cli.py @@ -189,6 +189,12 @@ def parse_common_params( update_nested_dict(config_obj, ["experiments", "architecture"], architecture) update_nested_dict(config_obj, ["experiments", "system_variant"], system_variant) + selected_deployment = config_obj.get("deployment", {}).get("name") + if selected_deployment and "system_variant" not in config_obj.get("experiments", {}): + config_obj["experiments"]["system_variant"] = sebs_client.config.default_system_variant( + selected_deployment + ) + # set the path the configuration was loaded from update_nested_dict(config_obj, ["deployment", "local", "path"], config) diff --git a/sebs/config.py b/sebs/config.py index 6ffc9ff5..e23d460f 100644 --- a/sebs/config.py +++ b/sebs/config.py @@ -170,6 +170,22 @@ def supported_system_variants(self, deployment_name: str) -> List[str]: """Return the supported deployment variants for a platform.""" return self._system_config[deployment_name]["deployments"] + def default_system_variant(self, deployment_name: str) -> str: + """Return the default deployment variant for a platform. + + The default is the first declared variant in ``systems.json``. + + Args: + deployment_name: Name of the deployment platform. + + Returns: + Default deployment variant for the platform. 
+ """ + variants = self.supported_system_variants(deployment_name) + if not variants: + raise RuntimeError(f"Deployment {deployment_name} has no configured system variants.") + return variants[0] + def benchmark_base_images( self, deployment_name: str, language_name: str, architecture: str ) -> Dict[str, str]: diff --git a/sebs/sebs.py b/sebs/sebs.py index 366445dc..d99aac4c 100644 --- a/sebs/sebs.py +++ b/sebs/sebs.py @@ -227,6 +227,9 @@ def get_deployment( ) ) + if "system_variant" not in config["experiments"]: + config["experiments"]["system_variant"] = self._config.default_system_variant(name) + selected_variant = config["experiments"]["system_variant"] if selected_variant not in self._config.supported_system_variants(name): raise RuntimeError(f"System variant {selected_variant} is not supported in {name}.") From bb0c30617bcd5cb2e8dae83223dba5e67318b5d3 Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Wed, 29 Apr 2026 00:16:17 +0200 Subject: [PATCH 16/32] [system] Simplifications and corrections --- configs/systems.json | 6 +++--- sebs/experiments/config.py | 7 +++---- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/configs/systems.json b/configs/systems.json index 7b0beb0c..bf765298 100644 --- a/configs/systems.json +++ b/configs/systems.json @@ -185,8 +185,7 @@ "arm64" ], "deployments": [ - "function-gen1", - "function-gen2", + "package", "container" ] }, @@ -370,7 +369,8 @@ "arm64" ], "deployments": [ - "package", + "function-gen1", + "function-gen2", "container" ] }, diff --git a/sebs/experiments/config.py b/sebs/experiments/config.py index a8e919e4..20bbd2b6 100644 --- a/sebs/experiments/config.py +++ b/sebs/experiments/config.py @@ -12,6 +12,7 @@ The Config class handles serialization and deserialization of experiment configurations, allowing them to be loaded from and saved to configuration files. 
""" +from __future__ import annotations from typing import Dict @@ -48,10 +49,8 @@ def serialize(self) -> str: return self._value @staticmethod - def deserialize(value: str | "SystemVariant") -> "SystemVariant": - """Deserialize a deployment variant from a string or passthrough object.""" - if isinstance(value, SystemVariant): - return value + def deserialize(value: str) -> SystemVariant: + """Deserialize a deployment variant from a string.""" return SystemVariant(value) def __eq__(self, other: object) -> bool: From b4f07896648f9da35357c853e20b1fe4fe9306e9 Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Wed, 29 Apr 2026 00:18:46 +0200 Subject: [PATCH 17/32] [gcp] Adapt to the new interface --- sebs/gcp/config.py | 14 -------------- sebs/gcp/function.py | 9 --------- sebs/gcp/gcp.py | 41 ++++++++++++++++++++--------------------- 3 files changed, 20 insertions(+), 44 deletions(-) diff --git a/sebs/gcp/config.py b/sebs/gcp/config.py index 05da2bf8..3d05c664 100644 --- a/sebs/gcp/config.py +++ b/sebs/gcp/config.py @@ -437,7 +437,6 @@ def __init__(self) -> None: self._function_gen1_config: GCPFunctionGen1Config self._function_gen2_config: GCPFunctionGen2Config self._container_config: GCPContainerConfig - self._package_deployment_type: str = "function-gen1" @staticmethod def initialize(config: GCPConfiguration, dct: Dict) -> GCPConfiguration: @@ -454,13 +453,6 @@ def initialize(config: GCPConfiguration, dct: Dict) -> GCPConfiguration: config._function_gen1_config = GCPFunctionGen1Config.deserialize(dct["function-gen1"]) config._function_gen2_config = GCPFunctionGen2Config.deserialize(dct["function-gen2"]) config._container_config = GCPContainerConfig.deserialize(dct["container"]) - config._package_deployment_type = dct.get("package-deployment-type", "function-gen1") - - if config._package_deployment_type not in ("function-gen1", "function-gen2"): - raise ValueError( - "Invalid GCP package deployment type " - f"{config._package_deployment_type}. 
Expected function-gen1 or function-gen2." - ) return config @@ -474,7 +466,6 @@ def serialize(self) -> Dict: out["function-gen1"] = self._function_gen1_config.serialize() out["function-gen2"] = self._function_gen2_config.serialize() out["container"] = self._container_config.serialize() - out["package-deployment-type"] = self._package_deployment_type return out @property @@ -504,11 +495,6 @@ def container_config(self) -> GCPContainerConfig: """ return self._container_config - @property - def package_deployment_type(self) -> str: - """Get the package deployment selector used when container mode is disabled.""" - return self._package_deployment_type - class GCPResources(Resources): """Resource manager for serverless resources on Google Cloud Platform. diff --git a/sebs/gcp/function.py b/sebs/gcp/function.py index 51188426..5a71135c 100644 --- a/sebs/gcp/function.py +++ b/sebs/gcp/function.py @@ -47,15 +47,6 @@ def is_container(self) -> bool: """ return self == FunctionDeploymentType.CONTAINER - @staticmethod - def resolve( - container_deployment: bool, package_deployment_type: str - ) -> "FunctionDeploymentType": - """Resolve the effective GCP deployment type from experiment and GCP-local config.""" - if container_deployment: - return FunctionDeploymentType.CONTAINER - return FunctionDeploymentType.deserialize(package_deployment_type) - @staticmethod def deserialize(val: str) -> FunctionDeploymentType: """Deserialize a string value to a FunctionDeploymentEngine enum. 
diff --git a/sebs/gcp/gcp.py b/sebs/gcp/gcp.py index edbc9146..02c4f817 100644 --- a/sebs/gcp/gcp.py +++ b/sebs/gcp/gcp.py @@ -46,6 +46,7 @@ from sebs.cache import Cache from sebs.config import SeBSConfig from sebs.benchmark import Benchmark, BenchmarkConfig +from sebs.experiments.config import SystemVariant from sebs.faas.function import Function, FunctionConfig, Trigger from sebs.faas.config import Resources from sebs.faas.system import System @@ -2258,29 +2259,27 @@ def get_run_client(self): """ return self.run_container_strategy.run_client - def _resolve_deployment_type(self, container_deployment: bool) -> FunctionDeploymentType: + def _resolve_deployment_type(self, system_variant: SystemVariant) -> FunctionDeploymentType: """Resolve the effective GCP deployment type for a benchmark. Args: - container_deployment: Whether the experiment selected container mode. + system_variant: Experiment deployment variant. Returns: - Effective deployment type after applying GCP-local package settings. + Effective deployment type for GCP. """ - return FunctionDeploymentType.resolve( - container_deployment, self.config.deployment_config.package_deployment_type - ) + return FunctionDeploymentType.deserialize(system_variant.value) - def system_variant_suffix(self, container_deployment: bool) -> Optional[str]: + def system_variant_suffix(self, system_variant: SystemVariant) -> Optional[str]: """Return a provider-local system variant suffix for GCP package variants. Args: - container_deployment: Whether the benchmark uses container deployment. + system_variant: Selected deployment variant. Returns: Short suffix for GCP package variants, otherwise ``None``. 
""" - deployment_type = self._resolve_deployment_type(container_deployment) + deployment_type = self._resolve_deployment_type(system_variant) if deployment_type == FunctionDeploymentType.FUNCTION_GEN1: return "gen1" if deployment_type == FunctionDeploymentType.FUNCTION_GEN2: @@ -2339,7 +2338,7 @@ def is_configuration_changed(self, cached_function: Function, benchmark: Benchma # Check if deployment config has changed cached_function = cast(GCPFunction, cached_function) - expected_deployment_type = self._resolve_deployment_type(benchmark.container_deployment) + expected_deployment_type = self._resolve_deployment_type(benchmark.system_variant) if cached_function.deployment_type != expected_deployment_type: self.logging.info( f"Deployment type has changed for {cached_function.name}: " @@ -2372,7 +2371,7 @@ def can_reuse_cached_function( str: if the cached function does not fit the requested deployment type. """ gcp_function = cast(GCPFunction, cached_function) - expected_deployment_type = self._resolve_deployment_type(benchmark.container_deployment) + expected_deployment_type = self._resolve_deployment_type(benchmark.system_variant) if gcp_function.deployment_type != expected_deployment_type: return ( @@ -2409,7 +2408,7 @@ def default_function_name( code_package.language_version, code_package.architecture, ) - deployment_type = self._resolve_deployment_type(code_package.container_deployment) + deployment_type = self._resolve_deployment_type(code_package.system_variant) if deployment_type == FunctionDeploymentType.CONTAINER: func_name = f"{func_name}-docker" elif deployment_type == FunctionDeploymentType.FUNCTION_GEN1: @@ -2540,7 +2539,7 @@ def create_function( self, code_package: Benchmark, func_name: str, - container_deployment: bool, + system_variant: SystemVariant, container_uri: str | None, ) -> GCPFunction: """Create a new GCP Cloud Function or update existing one. 
@@ -2553,14 +2552,14 @@ def create_function( Args: code_package: Benchmark package with code and configuration func_name: Name for the Cloud Function - container_deployment: Whether to use container deployment (unsupported) + system_variant: Selected deployment variant container_uri: Container image URI (unused for GCP) Returns: GCPFunction instance representing the deployed function Raises: - NotImplementedError: If container_deployment is True + NotImplementedError: If the deployment variant is unsupported RuntimeError: If function creation or IAM configuration fails """ @@ -2575,7 +2574,7 @@ def create_function( if architecture == "arm64": raise RuntimeError("GCP does not support arm64 deployments") - deployment_type = self._resolve_deployment_type(container_deployment) + deployment_type = self._resolve_deployment_type(system_variant) strategy = self._strategy_for_deployment_type(deployment_type) # Check if function/service already exists @@ -2635,7 +2634,7 @@ def create_function( ) strategy.allow_public_access(project_name, location, func_name) - self.update_function(function, code_package, container_deployment, container_uri) + self.update_function(function, code_package, system_variant, container_uri) # Add LibraryTrigger to a new function # Not supported on containers @@ -2713,7 +2712,7 @@ def update_function( self, function: Function, code_package: Benchmark, - container_deployment: bool, + system_variant: SystemVariant, container_uri: str | None, ) -> None: """Update an existing Cloud Function with new code and configuration. 
@@ -2725,11 +2724,11 @@ def update_function( Args: function: Existing function instance to update code_package: New benchmark package with updated code - container_deployment: Whether to use container deployment (unsupported) + system_variant: Selected deployment variant container_uri: Container image URI (unused) Raises: - NotImplementedError: If container_deployment is True + NotImplementedError: If the deployment variant is unsupported RuntimeError: If function update fails after maximum retries """ @@ -2746,7 +2745,7 @@ def update_function( # Update code using strategy strategy.update_code(function, code_package, envs, container_uri) - if container_deployment: + if system_variant.is_container: function.set_container_uri(container_uri) strategy.wait_for_deployment(function.name) From 8525df1bc366e831644949b8c876f6b22d9eb3e6 Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Wed, 29 Apr 2026 00:19:25 +0200 Subject: [PATCH 18/32] [docs] Update --- docs/modularity.md | 2 +- docs/platforms.md | 41 ++++++++++++++++++++++++++++++++++++----- 2 files changed, 37 insertions(+), 6 deletions(-) diff --git a/docs/modularity.md b/docs/modularity.md index 8e1cad8e..650ab932 100644 --- a/docs/modularity.md +++ b/docs/modularity.md @@ -375,7 +375,7 @@ This function has been retrieved from the cache and requires refreshing function In practice, this is often limited to updating logging handlers - see existing implementations for details. ```python - def update_function(self, function: Function, code_package: Benchmark, container_deployment: bool, container_uri: str): + def update_function(self, function: Function, code_package: Benchmark, system_variant: SystemVariant, container_uri: str): ``` This function updates the function's code and configuration in the platform. 
diff --git a/docs/platforms.md b/docs/platforms.md index e3332543..14aae20c 100644 --- a/docs/platforms.md +++ b/docs/platforms.md @@ -217,12 +217,11 @@ or in the JSON input configuration: ### Deployment Modes -SeBS models two GCP deployment targets: +SeBS models three GCP deployment targets: 1. `function-gen1`: the first Google Cloud Functions Gen1 path. -2. `container`: direct container deployment to Cloud Run. - -We plan also to add support for `function-gen2`, the current Google Cloud Functions Gen2 path. +2. `function-gen2`: Google Cloud Functions Gen2 package deployment. +3. `container`: direct container deployment to Cloud Run. These deployment types intentionally share a single GCP backend in SeBS, but they are not identical in packaging, naming, scaling, or performance behavior. On GCP, there are two different concurrency layers that should not be confused: @@ -259,6 +258,37 @@ Gen1 configuration currently exposes instance-scaling controls: Use Gen1 when you want the most established GCP path in SeBS and do not need container-level runtime tuning. +### Function Gen2 + +Gen2 reuses the same local ZIP packaging flow as Gen1, but deploys through the Cloud Functions v2 API. It is selected directly through the experiment-level `system_variant`: + +```json +"deployment": { + "name": "gcp", + "gcp": { + "region": "europe-west1", + "project_name": "your-gcp-project-id", + "credentials": "/path/to/project-credentials.json", + "configuration": { + "function-gen2": { + "vcpus": 1, + "gcp-concurrency": 80, + "worker-concurrency": 1, + "worker-threads": 8, + "min-instances": 0, + "max-instances": 20, + "cpu-boost": false, + "cpu-throttle": true + } + } + } +} +``` + +Set `experiments.system_variant` to one of `function-gen1`, `function-gen2`, or `container`. From the CLI and regression workflows, the same selection is exposed through the generic `--system-variant` option. 
+ +Gen1 and Gen2 package deployments use separate SeBS cache identities and separate cloud function names with short `-gen1` and `-gen2` suffixes. This avoids control-plane races when switching between package modes. In practice, regression and benchmark runs should still select one GCP package mode at a time for a given run. + ### Cloud Run Container Deployments Container deployments are the currently implemented Cloud Run-based path in SeBS. They are selected with container deployment and use a provider-specific function image built from `Dockerfile.function`. @@ -298,7 +328,8 @@ Cloud Run containers can [execute in two environments](https://docs.cloud.google The current GCP backend has the following practical limits: * Gen1 is the primary managed-functions deployment path today. -* Gen2 is planned and partially modeled in configuration, but not yet fully deployed through a dedicated strategy. +* Gen2 supports the ZIP package deployment path and HTTP triggers. +* Library-trigger direct invocation remains Gen1-only. * Cloud Run containers are implemented today and provide the most tuning control. * GCP deployments currently reject `arm64`, as arm64 instances are not available for GCR. * C++ packaging is not supported on GCP (but possible to be implemented on containers). 
From fa113288d14a21d577c40df19eb55013761ca1c3 Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Wed, 29 Apr 2026 00:29:21 +0200 Subject: [PATCH 19/32] [regression] Attempt to serialize gcp groups --- sebs/regression.py | 80 +++++++++++++++++++++++++++++++++++----------- 1 file changed, 61 insertions(+), 19 deletions(-) diff --git a/sebs/regression.py b/sebs/regression.py index 3dd62b26..193d12dd 100644 --- a/sebs/regression.py +++ b/sebs/regression.py @@ -74,7 +74,7 @@ # GCP-specific configurations architectures_gcp = ["x64"] -deployments_gcp = ["package", "container"] +deployments_gcp = ["function-gen1", "function-gen2", "container"] # Azure-specific configurations architectures_azure = ["x64"] @@ -92,6 +92,30 @@ LOGGING_REDACTOR: SensitiveDataFilter = SensitiveDataFilter() +def configure_regression_deployment( + config_copy: dict, deployment_name: str, deployment_type: str +) -> str: + """Inject provider-local deployment configuration for a regression variant.""" + config_copy["experiments"]["system_variant"] = deployment_type + return deployment_type + + +def execution_group_key(test: unittest.TestCase) -> tuple[str, str]: + """Return the execution group for a test case. + + GCP variants are split into separate groups so they do not execute at the same + time. All other providers share a single provider-wide group and may still run + concurrently with each other. + """ + deployment_name = test.deployment_name # type: ignore[attr-defined] + test_name = cast(unittest.TestCase, test)._testMethodName + deployment_type = getattr(test, test_name).test_deployment_type # type: ignore[attr-defined] + + if deployment_name == "gcp": + return deployment_name, deployment_type + return deployment_name, "all" + + class TestSequenceMeta(type): """Metaclass for dynamically generating regression test cases. 
@@ -196,7 +220,7 @@ def test(self): # Configure experiment settings self.experiment_config["architecture"] = architecture - self.experiment_config["container_deployment"] = deployment_type == "container" + self.experiment_config["system_variant"] = deployment_type # Get deployment client for the specific cloud provider deployment_client = self.get_deployment( @@ -352,7 +376,7 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # Create a copy of the config and set architecture and deployment type config_copy = copy.deepcopy(cloud_config) config_copy["experiments"]["architecture"] = architecture - config_copy["experiments"]["container_deployment"] = deployment_type == "container" + configure_regression_deployment(config_copy, deployment_name, deployment_type) # Create a log file name based on test parameters f = f"regression_{deployment_name}_{benchmark_name}_{architecture}_{deployment_type}.log" @@ -415,7 +439,7 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # Create a copy of the config and set architecture and deployment type config_copy = copy.deepcopy(cloud_config) config_copy["experiments"]["architecture"] = architecture - config_copy["experiments"]["container_deployment"] = deployment_type == "container" + configure_regression_deployment(config_copy, deployment_name, deployment_type) # Create a log file name based on test parameters f = f"regression_{deployment_name}_{benchmark_name}_{architecture}_{deployment_type}.log" @@ -470,7 +494,7 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # Create a copy of the config and set architecture and deployment type config_copy = copy.deepcopy(cloud_config) config_copy["experiments"]["architecture"] = architecture - config_copy["experiments"]["container_deployment"] = deployment_type == "container" + configure_regression_deployment(config_copy, deployment_name, deployment_type) f = 
f"regression_{deployment_name}_{benchmark_name}_{architecture}_{deployment_type}.log" deployment_client = self.client.get_deployment( @@ -530,7 +554,7 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # Create a copy of the config and set architecture and deployment type config_copy = copy.deepcopy(cloud_config) config_copy["experiments"]["architecture"] = architecture - config_copy["experiments"]["container_deployment"] = deployment_type == "container" + configure_regression_deployment(config_copy, deployment_name, deployment_type) f = f"regression_{deployment_name}_{benchmark_name}_{architecture}_{deployment_type}.log" deployment_client = self.client.get_deployment( @@ -616,7 +640,7 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # Create a copy of the config and set architecture and deployment type config_copy = copy.deepcopy(cloud_config) config_copy["experiments"]["architecture"] = architecture - config_copy["experiments"]["container_deployment"] = deployment_type == "container" + config_copy["experiments"]["system_variant"] = deployment_type # Create log file name and get deployment client f = f"regression_{deployment_name}_{benchmark_name}_" @@ -702,7 +726,7 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # Create a copy of the config and set architecture and deployment type config_copy = copy.deepcopy(cloud_config) config_copy["experiments"]["architecture"] = architecture - config_copy["experiments"]["container_deployment"] = deployment_type == "container" + config_copy["experiments"]["system_variant"] = deployment_type # Create log file name and get deployment client f = f"regression_{deployment_name}_{benchmark_name}_" @@ -781,7 +805,7 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # Create a copy of the config and set architecture and deployment type config_copy = copy.deepcopy(cloud_config) config_copy["experiments"]["architecture"] = 
architecture - config_copy["experiments"]["container_deployment"] = deployment_type == "container" + config_copy["experiments"]["system_variant"] = deployment_type # Create log file name and get deployment client f = f"regression_{deployment_name}_{benchmark_name}_" @@ -844,7 +868,7 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # Create a copy of the config and set architecture and deployment type config_copy = copy.deepcopy(cloud_config) config_copy["experiments"]["architecture"] = architecture - config_copy["experiments"]["container_deployment"] = deployment_type == "container" + configure_regression_deployment(config_copy, deployment_name, deployment_type) # Create log file name based on test parameters f = f"regression_{deployment_name}_{benchmark_name}_{architecture}_{deployment_type}.log" @@ -907,7 +931,7 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # Create a copy of the config and set architecture and deployment type config_copy = copy.deepcopy(cloud_config) config_copy["experiments"]["architecture"] = architecture - config_copy["experiments"]["container_deployment"] = deployment_type == "container" + configure_regression_deployment(config_copy, deployment_name, deployment_type) # Create log file name based on test parameters f = f"regression_{deployment_name}_{benchmark_name}_{architecture}_{deployment_type}.log" @@ -970,7 +994,7 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # Create a copy of the config and set architecture and deployment type config_copy = copy.deepcopy(cloud_config) config_copy["experiments"]["architecture"] = architecture - config_copy["experiments"]["container_deployment"] = deployment_type == "container" + configure_regression_deployment(config_copy, deployment_name, deployment_type) # Create log file name based on test parameters f = f"regression_{deployment_name}_{benchmark_name}_{architecture}_{deployment_type}.log" @@ -1037,7 +1061,7 @@ def 
get_deployment(self, benchmark_name, architecture, deployment_type): # Create a copy of the config and set architecture and deployment type config_copy = copy.deepcopy(cloud_config) config_copy["experiments"]["architecture"] = architecture - config_copy["experiments"]["container_deployment"] = deployment_type == "container" + config_copy["experiments"]["system_variant"] = deployment_type # Create log file name based on test parameters f = f"regression_{deployment_name}_{benchmark_name}_{architecture}_{deployment_type}.log" @@ -1095,7 +1119,7 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # Create a copy of the config and set architecture and deployment type config_copy = copy.deepcopy(cloud_config) config_copy["experiments"]["architecture"] = architecture - config_copy["experiments"]["container_deployment"] = deployment_type == "container" + config_copy["experiments"]["system_variant"] = deployment_type # Create log file name based on test parameters f = f"regression_{deployment_name}_{benchmark_name}_{architecture}_{deployment_type}.log" @@ -1149,7 +1173,7 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # Create a copy of the config and set architecture and deployment type config_copy = copy.deepcopy(cloud_config) config_copy["experiments"]["architecture"] = architecture - config_copy["experiments"]["container_deployment"] = deployment_type == "container" + config_copy["experiments"]["system_variant"] = deployment_type # Create log file name based on test parameters f = f"regression_{deployment_name}_{benchmark_name}_{architecture}_{deployment_type}.log" @@ -1280,7 +1304,7 @@ def filter_out_benchmarks( # Filter out image recognition on newer Python versions on GCP if (deployment_name == "gcp" and language == "python" and language_version in ["3.8", "3.9", "3.10", "3.11", "3.12"] - and deployment_type == "package"): + and deployment_type.startswith("package")): return "411.image-recognition" not in benchmark 
# fmt: on @@ -1425,13 +1449,31 @@ def regression_suite( else: print(f"Skip test {test_name}") - # Create a concurrent test suite for parallel execution - concurrent_suite = testtools.ConcurrentStreamTestSuite(lambda: ((test, None) for test in tests)) result = TracingStreamResult() # Run the tests result.startTestRun() - concurrent_suite.run(result) + gcp_grouped_tests: Dict[tuple[str, str], list] = {} + other_tests = [] + for test in tests: + group_key = execution_group_key(test) + if group_key[0] == "gcp": + gcp_grouped_tests.setdefault(group_key, []).append(test) + else: + other_tests.append(test) + + if other_tests: + concurrent_suite = testtools.ConcurrentStreamTestSuite( + lambda: ((test, None) for test in other_tests) + ) + concurrent_suite.run(result) + + for (_, deployment_type), group_tests in sorted(gcp_grouped_tests.items()): + print(f"Running regression group provider=gcp, system_variant={deployment_type}, tests={len(group_tests)}") + concurrent_suite = testtools.ConcurrentStreamTestSuite( + lambda group_tests=group_tests: ((test, None) for test in group_tests) + ) + concurrent_suite.run(result) result.stopTestRun() # Report results From 82330defa6b350e7736648fbb80bfd4cb7c6c8ba Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Wed, 29 Apr 2026 00:31:22 +0200 Subject: [PATCH 20/32] [gcp] No ARM on GCP --- configs/systems.json | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/configs/systems.json b/configs/systems.json index bf765298..d649e0bc 100644 --- a/configs/systems.json +++ b/configs/systems.json @@ -365,8 +365,7 @@ } }, "architecture": [ - "x64", - "arm64" + "x64" ], "deployments": [ "function-gen1", From eeb813b811cbb3721ebdd5e9ddafe6d85de23607 Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Wed, 29 Apr 2026 10:22:30 +0200 Subject: [PATCH 21/32] [ci] Bump regression tests for GCP - serialized --- .github/workflows/_regression-job.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/.github/workflows/_regression-job.yml b/.github/workflows/_regression-job.yml index d22cf270..296b2221 100644 --- a/.github/workflows/_regression-job.yml +++ b/.github/workflows/_regression-job.yml @@ -58,7 +58,7 @@ jobs: uv pip install . - name: Run regression tests - timeout-minutes: 10 + timeout-minutes: 20 run: | source .venv/bin/activate uv run sebs benchmark regression test \ From a3811fdcbd39eaf7eefc725fe55a90c12b4b2813 Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Wed, 29 Apr 2026 17:02:14 +0200 Subject: [PATCH 22/32] [dev] Linting --- sebs/regression.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/sebs/regression.py b/sebs/regression.py index 193d12dd..661e9c62 100644 --- a/sebs/regression.py +++ b/sebs/regression.py @@ -25,7 +25,7 @@ import testtools import threading from time import sleep -from typing import cast, Dict, Optional, Set, TYPE_CHECKING +from typing import cast, Dict, List, Optional, Set, TYPE_CHECKING from sebs.faas.function import Trigger from sebs.utils import ColoredWrapper, SensitiveDataFilter, LoggingBase @@ -1418,12 +1418,13 @@ def regression_suite( ) # Prepare the list of tests to run - tests = [] + tests: List[unittest.TestCase] = [] # mypy is confused here about the type for case in suite: for test in case: # type: ignore # Get the test method name - test_name = cast(unittest.TestCase, test)._testMethodName + test_case = cast(unittest.TestCase, test) + test_name = test_case._testMethodName # Remove unsupported benchmarks test_architecture = getattr(test, test_name).test_architecture # type: ignore @@ -1445,7 +1446,7 @@ def regression_suite( # Set up test instance with client and config test.client = sebs_client # type: ignore test.experiment_config = experiment_config.copy() # type: ignore - tests.append(test) + tests.append(test_case) else: print(f"Skip test {test_name}") @@ -1469,9 +1470,12 @@ def regression_suite( concurrent_suite.run(result) for (_, deployment_type), group_tests in 
sorted(gcp_grouped_tests.items()): - print(f"Running regression group provider=gcp, system_variant={deployment_type}, tests={len(group_tests)}") + print( + f"Running regression group provider=gcp, " + f"system_variant={deployment_type}, tests={len(group_tests)}" + ) concurrent_suite = testtools.ConcurrentStreamTestSuite( - lambda group_tests=group_tests: ((test, None) for test in group_tests) + lambda group_tests=group_tests: ((test, None) for test in group_tests) # type: ignore ) concurrent_suite.run(result) result.stopTestRun() From 9df3d12e3e8a862b772952985c209d8bbeb5d4ad Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Wed, 29 Apr 2026 17:11:35 +0200 Subject: [PATCH 23/32] [gcp] Fix regression --- sebs/regression.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sebs/regression.py b/sebs/regression.py index 661e9c62..58920c79 100644 --- a/sebs/regression.py +++ b/sebs/regression.py @@ -1304,7 +1304,7 @@ def filter_out_benchmarks( # Filter out image recognition on newer Python versions on GCP if (deployment_name == "gcp" and language == "python" and language_version in ["3.8", "3.9", "3.10", "3.11", "3.12"] - and deployment_type.startswith("package")): + and deployment_type == "function-gen1"): return "411.image-recognition" not in benchmark # fmt: on From 31eeead73b4111a50d3a32a4458213f012124b12 Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Wed, 29 Apr 2026 17:27:36 +0200 Subject: [PATCH 24/32] [gcp] Additional check for correctness --- sebs/experiments/config.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/sebs/experiments/config.py b/sebs/experiments/config.py index 20bbd2b6..d88ee6ea 100644 --- a/sebs/experiments/config.py +++ b/sebs/experiments/config.py @@ -26,6 +26,16 @@ class SystemVariant: property for logic that only cares about the package vs container split. 
""" + ALL_SYSTEM_VARIANTS = [ + # default everywhere but OpenWhisk and GCP + "package", + # Lambda, OpenWhisk, and GCP + "container", + # GCP specific + "function-gen1", + "function-gen2", + ] + def __init__(self, value: str): """Initialize the system variant. @@ -51,6 +61,8 @@ def serialize(self) -> str: @staticmethod def deserialize(value: str) -> SystemVariant: """Deserialize a deployment variant from a string.""" + if value not in SystemVariant.ALL_SYSTEM_VARIANTS: + raise ValueError(f"Invalid system variant: {value}") return SystemVariant(value) def __eq__(self, other: object) -> bool: From e72b864228a5e3f3e202ef6cb4df119e8ca5e0a3 Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Wed, 29 Apr 2026 17:30:28 +0200 Subject: [PATCH 25/32] [docs] Update signature --- docs/modularity.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/modularity.md b/docs/modularity.md index 650ab932..7a1e4478 100644 --- a/docs/modularity.md +++ b/docs/modularity.md @@ -375,7 +375,7 @@ This function has been retrieved from the cache and requires refreshing function In practice, this is often limited to updating logging handlers - see existing implementations for details. ```python - def update_function(self, function: Function, code_package: Benchmark, system_variant: SystemVariant, container_uri: str): + def update_function(self, function: Function, code_package: Benchmark, system_variant: SystemVariant, container_uri: str | None): ``` This function updates the function's code and configuration in the platform. 
From 2bc79d0b1746b092988e825908b9bcb5fc8e47af Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Wed, 29 Apr 2026 17:37:09 +0200 Subject: [PATCH 26/32] [gcp] Use previous language style always for storing container info --- sebs/benchmark.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sebs/benchmark.py b/sebs/benchmark.py index d584f479..f8e2ea6a 100644 --- a/sebs/benchmark.py +++ b/sebs/benchmark.py @@ -761,7 +761,7 @@ def query_cache(self) -> None: self._code_package = self._cache_client.get_container( deployment=self._deployment_name, benchmark=self._benchmark, - language=self.cache_language_key, + language=self.language, language_version=self.language_version, architecture=self.architecture, ) From 754ed43076d7b7afcb6fac5577c097525ff0c622 Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Wed, 29 Apr 2026 17:58:03 +0200 Subject: [PATCH 27/32] [gcp] Fix envs for gen2 deployment --- sebs/gcp/gcp.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sebs/gcp/gcp.py b/sebs/gcp/gcp.py index 02c4f817..06171c61 100644 --- a/sebs/gcp/gcp.py +++ b/sebs/gcp/gcp.py @@ -2036,9 +2036,11 @@ def generate_runtime_envs(self) -> Dict: Environment variables controlling Gunicorn worker settings. 
""" dep_config = self.config.deployment_config.function_gen2_config + # gen2 uses its own environment variables + # https://github.com/GoogleCloudPlatform/functions-framework-python/issues/241 return { - "GUNICORN_WORKERS": str(dep_config.worker_concurrency), - "GUNICORN_THREADS": str(dep_config.worker_threads), + "WORKERS": str(dep_config.worker_concurrency), + "THREADS": str(dep_config.worker_threads), } def is_deployed(self, func_name: str, versionId: int = -1) -> Tuple[bool, int]: From 3a8b8ac17aea7ef882a572ae42dd89930ce67947 Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Wed, 29 Apr 2026 20:34:20 +0200 Subject: [PATCH 28/32] [system] Make code package size computing consistent --- sebs/aws/aws.py | 2 +- sebs/azure/azure.py | 2 +- sebs/benchmark.py | 29 ++++++++--------------------- sebs/faas/system.py | 4 ++-- sebs/gcp/gcp.py | 2 +- sebs/local/local.py | 2 +- sebs/openwhisk/openwhisk.py | 4 ++-- 7 files changed, 16 insertions(+), 29 deletions(-) diff --git a/sebs/aws/aws.py b/sebs/aws/aws.py index b4bae0f4..dd7077d1 100644 --- a/sebs/aws/aws.py +++ b/sebs/aws/aws.py @@ -192,7 +192,7 @@ def package_code( architecture: str, benchmark: str, is_cached: bool, - ) -> Tuple[str, int]: + ) -> Tuple[str, float]: """ Package code for deployment to AWS Lambda. diff --git a/sebs/azure/azure.py b/sebs/azure/azure.py index 762b372e..6fbb7906 100644 --- a/sebs/azure/azure.py +++ b/sebs/azure/azure.py @@ -221,7 +221,7 @@ def package_code( architecture: str, benchmark: str, is_cached: bool, - ) -> Tuple[str, int]: + ) -> Tuple[str, float]: """Package function code for Azure Functions deployment. 
Creates the proper directory structure and configuration files diff --git a/sebs/benchmark.py b/sebs/benchmark.py index f8e2ea6a..5ddadfd6 100644 --- a/sebs/benchmark.py +++ b/sebs/benchmark.py @@ -443,7 +443,7 @@ def container_uri(self) -> str: return self._container_uri @property - def language(self) -> "Language": + def language(self) -> Language: """ Get the programming language for the benchmark. @@ -471,19 +471,6 @@ def language_variant(self) -> str: """ return self._language_variant - @property - def cache_language_key(self) -> str: - """ - Add language variant to the cache key so that different variants of - the same language don't conflict in cache. - """ - base_key = self._language.value - if self._language_variant != "default": - base_key = f"{base_key}_{self._language_variant}" - if self._system_variant_suffix: - return f"{base_key}_{self._system_variant_suffix}" - return base_key - @property def language_version(self) -> str: """ @@ -771,7 +758,7 @@ def query_cache(self) -> None: self._code_package = self._cache_client.get_code_package( deployment=self._deployment_name, benchmark=self._benchmark, - language=self.cache_language_key, + language=self.language, language_version=self.language_version, architecture=self.architecture, ) @@ -779,7 +766,7 @@ def query_cache(self) -> None: self._functions = self._cache_client.get_functions( deployment=self._deployment_name, benchmark=self._benchmark, - language=self.cache_language_key, + language=self.language, ) if self._code_package is not None: @@ -1150,7 +1137,7 @@ def add_deployment_package(self, output_dir: str) -> None: raise NotImplementedError @staticmethod - def directory_size(directory: str) -> int: + def directory_size(directory: str) -> float: """Calculate total size of all files in a directory. 
Recursively calculates the total size in bytes of all files @@ -1166,7 +1153,7 @@ def directory_size(directory: str) -> int: root = Path(directory) sizes = [f.stat().st_size for f in root.glob("**/*") if f.is_file()] - return sum(sizes) + return sum(sizes) / 1024.0 / 1024.0 def builder_image_name(self) -> Tuple[str, str]: """Image names of builder Docker images for preparing benchmarks. @@ -1356,7 +1343,7 @@ def ensure_image(name: str) -> None: self.logging.error(f"Docker mount volumes: {volumes}") raise e from None - def recalculate_code_size(self) -> int: + def recalculate_code_size(self) -> float: """Recalculate and update the code package size. Measures the current size of the output directory and updates @@ -1370,9 +1357,9 @@ def recalculate_code_size(self) -> int: def build( self, - package_build_step: Callable[[str, Language, str, str, str, bool], Tuple[str, int]], + package_build_step: Callable[[str, Language, str, str, str, bool], Tuple[str, float]], container_client: DockerContainer | None, - container_build_step: Callable[[str, Language, str, str, str, bool], Tuple[str, int]] + container_build_step: Callable[[str, Language, str, str, str, bool], Tuple[str, float]] | None, ) -> Tuple[bool, str | None, SystemVariant, str | None]: """Build the complete benchmark deployment package. diff --git a/sebs/faas/system.py b/sebs/faas/system.py index 3b8d1e0c..291e9ce7 100644 --- a/sebs/faas/system.py +++ b/sebs/faas/system.py @@ -289,7 +289,7 @@ def package_code( architecture: str, benchmark: str, is_cached: bool, - ) -> Tuple[str, int]: + ) -> Tuple[str, float]: """ Apply system-specific code packaging to prepare a deployment package. @@ -319,7 +319,7 @@ def package_code( def finalize_container_build( self, - ) -> Callable[[str, Language, str, str, str, bool], Tuple[str, int]] | None: + ) -> Callable[[str, Language, str, str, str, bool], Tuple[str, float]] | None: """Default behavior of container deployment is that no code package is needed. 
Thus, we return None to signal that. diff --git a/sebs/gcp/gcp.py b/sebs/gcp/gcp.py index 06171c61..5dba4d2e 100644 --- a/sebs/gcp/gcp.py +++ b/sebs/gcp/gcp.py @@ -2451,7 +2451,7 @@ def package_code( architecture: str, benchmark: str, is_cached: bool, - ) -> Tuple[str, int]: + ) -> Tuple[str, float]: """Package benchmark code for GCP Cloud Functions deployment. Transforms the benchmark code directory structure to meet GCP Cloud Functions diff --git a/sebs/local/local.py b/sebs/local/local.py index 31fa13c1..99c03c7a 100644 --- a/sebs/local/local.py +++ b/sebs/local/local.py @@ -184,7 +184,7 @@ def package_code( architecture: str, benchmark: str, is_cached: bool, - ) -> Tuple[str, int]: + ) -> Tuple[str, float]: """Package function code for local execution. Creates a compatible code package structure for local execution that diff --git a/sebs/openwhisk/openwhisk.py b/sebs/openwhisk/openwhisk.py index 4141ef61..7ed42953 100644 --- a/sebs/openwhisk/openwhisk.py +++ b/sebs/openwhisk/openwhisk.py @@ -205,7 +205,7 @@ def package_code( architecture: str, benchmark: str, is_cached: bool, - ) -> Tuple[str, int]: + ) -> Tuple[str, float]: """ Package benchmark code for OpenWhisk deployment. @@ -308,7 +308,7 @@ def package_code( def finalize_container_build( self, - ) -> Callable[[str, Language, str, str, str, bool], Tuple[str, int]] | None: + ) -> Callable[[str, Language, str, str, str, bool], Tuple[str, float]] | None: """ Regardless of Docker image status, we need to create .zip file to allow registration of function with OpenWhisk. From 0a908f95cf68693f03333b6397be9bbd51934bae Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Wed, 29 Apr 2026 23:28:12 +0200 Subject: [PATCH 29/32] [system] Simplify and update caching major change - our previous caching attempt didn't take into consideration system variants and language variants. 
additionally, we simplified the implementation significantly --- sebs/benchmark.py | 21 ++--- sebs/cache.py | 232 +++++++++++++++++++++++----------------------- 2 files changed, 122 insertions(+), 131 deletions(-) diff --git a/sebs/benchmark.py b/sebs/benchmark.py index 5ddadfd6..3a18db6d 100644 --- a/sebs/benchmark.py +++ b/sebs/benchmark.py @@ -526,6 +526,11 @@ def system_variant(self) -> SystemVariant: """Return the selected deployment variant for this benchmark.""" return self._system_variant + @property + def system_variant_suffix(self) -> str | None: + """Return the selected deployment variant for this benchmark.""" + return self._system_variant_suffix + @property # noqa: A003 def hash(self) -> str: """ @@ -745,23 +750,11 @@ def query_cache(self) -> None: whether the cache exists and if it's still valid (hash matches). """ if self.system_variant.is_container: - self._code_package = self._cache_client.get_container( - deployment=self._deployment_name, - benchmark=self._benchmark, - language=self.language, - language_version=self.language_version, - architecture=self.architecture, - ) + self._code_package = self._cache_client.get_container(self._deployment_name, self) if self._code_package is not None: self._container_uri = self._code_package["image-uri"] else: - self._code_package = self._cache_client.get_code_package( - deployment=self._deployment_name, - benchmark=self._benchmark, - language=self.language, - language_version=self.language_version, - architecture=self.architecture, - ) + self._code_package = self._cache_client.get_code_package(self._deployment_name, self) self._functions = self._cache_client.get_functions( deployment=self._deployment_name, diff --git a/sebs/cache.py b/sebs/cache.py index 7e769782..5df594bf 100644 --- a/sebs/cache.py +++ b/sebs/cache.py @@ -30,7 +30,7 @@ import shutil import threading import tempfile -from typing import Any, Callable, Dict, List, Mapping, Optional, TYPE_CHECKING # noqa +from typing import Any, Dict, List, 
Mapping, Optional, Tuple, TYPE_CHECKING # noqa from sebs.utils import LoggingBase, serialize @@ -97,6 +97,47 @@ def map_keys(obj: Dict[str, Any], val: Any, keys: List[str]) -> Dict[str, Any]: update(cfg, map_keys(cfg, val, keys)) +def keys_exist(obj: Dict, keys: List[Any]) -> bool: + """Find if a nested object exists in a dictionary. + + example: {key1: {key2: {key3: value}}} + for [key1, key2, key3] -> True + + Args: + obj: dictionary + keys: dynamic list of nested keys + + Returns: + true if the nested object exists + """ + for key in keys: + if isinstance(obj, Dict) and key in obj: + obj = obj[key] + else: + return False + return True + + +def keys_get(obj: Dict, keys: List[Any]) -> Any: + """Find if a nested object exists in a dictionary. + + example: {key1: {key2: {key3: value}}} + for [key1, key2, key3] -> True + + Args: + obj: dictionary + keys: dynamic list of nested keys + + Returns: + true if the nested object exists + """ + current = obj + for key in keys: + if isinstance(current, Dict) and key in current: + current = current[key] + return current + + class Cache(LoggingBase): """Persistent caching system for SeBS benchmark configurations and deployments. @@ -274,58 +315,42 @@ def get_benchmark_config(self, deployment: str, benchmark: str) -> Optional[Dict return None def get_code_package( - self, - deployment: str, - benchmark: str, - language: str, - language_version: str, - architecture: str, + self, deployment_name: str, code_package: "Benchmark" ) -> Optional[Dict[str, Any]]: """Access cached version of benchmark code package. Args: deployment (str): Deployment platform name. - benchmark (str): Benchmark name. - language (str): Programming language. - language_version (str): Language version. - architecture (str): Target architecture. + code_package (Benchmark): Benchmark package. Returns: Optional[Dict[str, Any]]: Code package configuration or None if not found. 
""" - cfg = self.get_benchmark_config(deployment, benchmark) + cfg = self.get_benchmark_config(deployment_name, code_package.benchmark) - key = f"{language_version}-{architecture}" - if cfg and language in cfg and key in cfg[language]["code_package"]: - return cfg[language]["code_package"][key] + base_keys, extra_keys = self.code_cache_keys(code_package) + if cfg and keys_exist(cfg, [*base_keys, *extra_keys]): + return keys_get(cfg, [*base_keys, *extra_keys]) else: return None def get_container( - self, - deployment: str, - benchmark: str, - language: str, - language_version: str, - architecture: str, + self, deployment_name: str, code_package: "Benchmark" ) -> Optional[Dict[str, Any]]: """Access cached container configuration for a benchmark. Args: deployment (str): Deployment platform name. - benchmark (str): Benchmark name. - language (str): Programming language. - language_version (str): Language version. - architecture (str): Target architecture. + code_package (Benchmark): Benchmark package. Returns: Optional[Dict[str, Any]]: Container configuration or None if not found. """ - cfg = self.get_benchmark_config(deployment, benchmark) + cfg = self.get_benchmark_config(deployment_name, code_package.benchmark) - key = f"{language_version}-{architecture}" - if cfg and language in cfg and key in cfg[language]["containers"]: - return cfg[language]["containers"][key] + base_keys, extra_keys = self.code_cache_keys(code_package) + if cfg and keys_exist(cfg, [*base_keys, *extra_keys]): + return keys_get(cfg, [*base_keys, *extra_keys]) else: return None @@ -713,6 +738,24 @@ def _update_resources( self._write_json_atomic(config_file, cached_config) + @staticmethod + def code_cache_keys(code_package: "Benchmark") -> Tuple[List[str], List[str]]: + """ + Add language and system variant suffixes to the package cache key so + differing build artifacts do not conflict in cache. 
+ """ + base_key = [code_package.language.value] + base_key.append( + "containers" if code_package.system_variant.is_container else "code_package" + ) + extra_keys = [] + extra_keys.append(code_package.language_variant) + if code_package.system_variant_suffix is not None: + extra_keys.append(code_package.system_variant_suffix) + extra_keys.append(code_package.language_version) + extra_keys.append(code_package.architecture) + return base_key, extra_keys + def add_code_package( self, deployment_name: str, @@ -733,23 +776,12 @@ def add_code_package( RuntimeError: If cached application already exists for the deployment. """ with self._lock: - language = code_package.cache_language_key - language_version = code_package.language_version - architecture = code_package.architecture - benchmark_dir = os.path.join(self.cache_dir, code_package.benchmark) os.makedirs(benchmark_dir, exist_ok=True) - package_type = "docker" if code_package.system_variant.is_container else "package" # Check if cache directory for this deployment exist - cached_dir = os.path.join( - benchmark_dir, - deployment_name, - language, - language_version, - architecture, - package_type, - ) + base_keys, extra_keys = self.code_cache_keys(code_package) + cached_dir = os.path.join(benchmark_dir, deployment_name, *base_keys, *extra_keys) if not os.path.exists(cached_dir): os.makedirs(cached_dir, exist_ok=True) @@ -784,61 +816,42 @@ def add_code_package( "modified": date, } - key = f"{language_version}-{architecture}" + config: Dict[str, Any] = { + "containers": {}, + "code_package": {}, + "functions": {}, + } if code_package.system_variant.is_container: image = self.docker_client.images.get(code_package.container_uri) language_config["image-uri"] = code_package.container_uri language_config["image-id"] = image.id - config = { - deployment_name: { - language: { - "containers": {key: language_config}, - "code_package": {}, - "functions": {}, - } - } - } + update_dict(config["containers"], language_config, 
extra_keys) else: - config = { - deployment_name: { - language: { - "code_package": {key: language_config}, - "containers": {}, - "functions": {}, - } - } - } + update_dict(config["code_package"], language_config, extra_keys) # make sure to not replace other entries if os.path.exists(os.path.join(benchmark_dir, "config.json")): with open(os.path.join(benchmark_dir, "config.json"), "r") as fp: cached_config = json.load(fp) - if deployment_name in cached_config: - # language known, platform known, extend dictionary - if language in cached_config[deployment_name]: - if code_package.system_variant.is_container: - cached_config[deployment_name][language]["containers"][ - key - ] = language_config - else: - cached_config[deployment_name][language]["code_package"][ - key - ] = language_config - - # language unknown, platform known - add new dictionary - else: - cached_config[deployment_name][language] = config[deployment_name][ - language - ] + + keys = [*base_keys, *extra_keys] + language = keys[0] + if language in cached_config[deployment_name]: + # language known - add code package, + # but do not overwrite existing entries + update_dict( + cached_config[deployment_name][language], language_config, keys[1:] + ) else: - # language unknown, platform unknown - add new dictionary - cached_config[deployment_name] = config[deployment_name] + # language unknown - add new dictionary + # everything else needs to be initialized + cached_config[deployment_name][language] = config + config = cached_config self._write_json_atomic(os.path.join(benchmark_dir, "config.json"), config) else: - # TODO: update raise RuntimeError( "Cached application {} for {} already exists!".format( code_package.benchmark, deployment_name @@ -861,21 +874,11 @@ def update_code_package( code_package (Benchmark): The benchmark code package to update. 
""" with self._lock: - language = code_package.cache_language_key - language_version = code_package.language_version - architecture = code_package.architecture benchmark_dir = os.path.join(self.cache_dir, code_package.benchmark) - package_type = "docker" if code_package.system_variant.is_container else "package" # Check if cache directory for this deployment exist - cached_dir = os.path.join( - benchmark_dir, - deployment_name, - language, - language_version, - architecture, - package_type, - ) + base_keys, extra_keys = self.code_cache_keys(code_package) + cached_dir = os.path.join(benchmark_dir, deployment_name, *base_keys, *extra_keys) config_location = os.path.join(benchmark_dir, "config.json") @@ -892,22 +895,12 @@ def update_code_package( A simple check of directory existence is insufficient, as we might have created a code package earlier (which creates a directory), but not a container. """ - key = f"{language_version}-{architecture}" - if code_package.system_variant.is_container: - main_key = "containers" - else: - main_key = "code_package" - - package_exists = True - try: - config[deployment_name][language][main_key][key] - except KeyError: - package_exists = False - + package_exists = keys_exist(config, [deployment_name, *base_keys, *extra_keys]) + if not package_exists: """ - We have no such cache entry - fallback. - However, we still have directory, a possible leftover after crash. - Whatever was left, we remove it since we have no information what is there. + We have no such cache entry - fallback. + However, we still have directory, a possible leftover after crash. + Whatever was left, we remove it since we have no information what is there. 
""" if os.path.exists(cached_dir): shutil.rmtree(cached_dir) @@ -936,16 +929,15 @@ def update_code_package( else: self.logging.info(f"Caching container pushed to: {code_package.container_uri}") - config[deployment_name][language][main_key][key]["date"]["modified"] = date - config[deployment_name][language][main_key][key]["hash"] = code_package.hash - config[deployment_name][language][main_key][key]["size"] = code_package.code_size + cached_config = keys_get(config, [deployment_name, *base_keys, *extra_keys]) + cached_config["date"]["modified"] = date + cached_config["hash"] = code_package.hash + cached_config["size"] = code_package.code_size if code_package.system_variant.is_container: image = self.docker_client.images.get(code_package.container_uri) - config[deployment_name][language][main_key][key]["image-id"] = image.id - config[deployment_name][language][main_key][key][ - "image-uri" - ] = code_package.container_uri + cached_config["image-id"] = image.id + cached_config["image-uri"] = code_package.container_uri self._write_json_atomic(os.path.join(benchmark_dir, "config.json"), config) else: @@ -976,7 +968,7 @@ def add_function( return with self._lock: benchmark_dir = os.path.join(self.cache_dir, code_package.benchmark) - language = code_package.cache_language_key + language = language_name cache_config = os.path.join(benchmark_dir, "config.json") if os.path.exists(cache_config): @@ -984,7 +976,13 @@ def add_function( with open(cache_config, "r") as fp: cached_config = json.load(fp) - if "functions" not in cached_config[deployment_name][language]: + if language not in cached_config[deployment_name]: + cached_config[deployment_name][language] = { + "functions": functions_config, + "code_package": {}, + "containers": {}, + } + elif "functions" not in cached_config[deployment_name][language]: cached_config[deployment_name][language]["functions"] = functions_config else: cached_config[deployment_name][language]["functions"].update( From 
fa42a0a8a0ec4e6fbb352d7802f51ed104577a98 Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Wed, 29 Apr 2026 23:48:02 +0200 Subject: [PATCH 30/32] [system] Fix bug in cache initialization --- sebs/cache.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sebs/cache.py b/sebs/cache.py index 5df594bf..e6921442 100644 --- a/sebs/cache.py +++ b/sebs/cache.py @@ -849,6 +849,10 @@ def add_code_package( cached_config[deployment_name][language] = config config = cached_config + else: + # entirely new entry + language = base_keys[0] + config = {deployment_name: {language: config}} self._write_json_atomic(os.path.join(benchmark_dir, "config.json"), config) else: From 4557bb8fcc193006ce28f2c491f5f3d9091c3bcc Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Thu, 30 Apr 2026 00:14:14 +0200 Subject: [PATCH 31/32] [gcp] Improve metrics polling for function v1 --- sebs/gcp/gcp.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/sebs/gcp/gcp.py b/sebs/gcp/gcp.py index 5dba4d2e..a0adea56 100644 --- a/sebs/gcp/gcp.py +++ b/sebs/gcp/gcp.py @@ -959,7 +959,7 @@ def download_metrics( interval = monitoring_v3.TimeInterval( { - "end_time": {"seconds": int(end_time_seconds) + 60}, + "end_time": {"seconds": int(end_time_seconds) + 300}, "start_time": {"seconds": int(start_time_seconds)}, } ) @@ -967,23 +967,27 @@ def download_metrics( for metric in available_metrics: metrics[metric] = [] + flt = ( + f'metric.type = "cloudfunctions.googleapis.com/function/{metric}" ' + f'AND resource.type = "cloud_function" ' + f'AND resource.labels.function_name = "{function_name}"' + ) list_request = monitoring_v3.ListTimeSeriesRequest( name=project_name, - filter='metric.type = "cloudfunctions.googleapis.com/function/{}"'.format(metric), + filter=flt, interval=interval, ) results = client.list_time_series(list_request) for result in results: - if result.resource.labels.get("function_name") == function_name: - for point in result.points: - metrics[metric] += 
[ - { - "mean_value": point.value.distribution_value.mean, - "executions_count": point.value.distribution_value.count, - } - ] + for point in result.points: + metrics[metric] += [ + { + "mean_value": point.value.distribution_value.mean, + "executions_count": point.value.distribution_value.count, + } + ] @staticmethod def _extract_trace_id(entry) -> Optional[str]: From b6b68476e7b31512224356376e4c4d54e9a76b3f Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Thu, 30 Apr 2026 00:30:15 +0200 Subject: [PATCH 32/32] [system] Update cache modification methods --- sebs/benchmark.py | 5 +---- sebs/cache.py | 43 +++++++++++++++++++++++++------------------ 2 files changed, 26 insertions(+), 22 deletions(-) diff --git a/sebs/benchmark.py b/sebs/benchmark.py index 3a18db6d..32cf62b9 100644 --- a/sebs/benchmark.py +++ b/sebs/benchmark.py @@ -1390,10 +1390,7 @@ def build( ) self._cache_client.update_container_uri( self._deployment_name, - self._benchmark, - self.language_name, - self.language_version, - self.architecture, + self, self._container_uri, ) self.logging.info( diff --git a/sebs/cache.py b/sebs/cache.py index e6921442..6deccd4f 100644 --- a/sebs/cache.py +++ b/sebs/cache.py @@ -366,6 +366,20 @@ def invalidate_all_container_uris(self, deployment: str) -> None: Args: deployment (str): Deployment platform name. """ + + def clear_nested_container_uris(obj: Dict[str, Any]) -> bool: + """Internal method - recursively check for all image-uri. 
+ Simpler method then walking all nested variants of containers.""" + modified = False + if "image-uri" in obj: + obj["image-uri"] = None + return True + + for value in obj.values(): + if isinstance(value, dict): + modified = clear_nested_container_uris(value) or modified + return modified + with self._lock: if not os.path.exists(self.cache_dir): return @@ -389,21 +403,13 @@ def invalidate_all_container_uris(self, deployment: str) -> None: containers = lang_cfg.get("containers") if containers is None: continue - for container_cfg in containers.values(): - container_cfg["image-uri"] = None - modified = True + modified = clear_nested_container_uris(containers) or modified if modified: self._write_json_atomic(config_path, config) def update_container_uri( - self, - deployment: str, - benchmark: str, - language: str, - language_version: str, - architecture: str, - uri: str, + self, deployment_name: str, code_package: "Benchmark", uri: str ) -> None: """Update the image-uri for a specific cached container entry. @@ -411,23 +417,24 @@ def update_container_uri( the registry to be accessible for cloud deployment. Args: - deployment (str): Deployment platform name. - benchmark (str): Benchmark name. - language (str): Programming language. - language_version (str): Language version. - architecture (str): Target architecture. + deployment_name (str): Deployment platform name. + code_package (Benchmark): Benchmark package identifying the cache entry. uri (str): New image URI to store. 
""" with self._lock: - config_path = os.path.join(self.cache_dir, benchmark, "config.json") + config_path = os.path.join(self.cache_dir, code_package.benchmark, "config.json") if not os.path.exists(config_path): return with open(config_path, "r") as fp: config = json.load(fp) - key = f"{language_version}-{architecture}" - config[deployment][language]["containers"][key]["image-uri"] = uri + base_keys, extra_keys = self.code_cache_keys(code_package) + keys = [deployment_name, *base_keys, *extra_keys] + if not keys_exist(config, keys): + return + + keys_get(config, keys)["image-uri"] = uri self._write_json_atomic(config_path, config)