diff --git a/.github/workflows/_regression-job.yml b/.github/workflows/_regression-job.yml index d22cf270..296b2221 100644 --- a/.github/workflows/_regression-job.yml +++ b/.github/workflows/_regression-job.yml @@ -58,7 +58,7 @@ jobs: uv pip install . - name: Run regression tests - timeout-minutes: 10 + timeout-minutes: 20 run: | source .venv/bin/activate uv run sebs benchmark regression test \ diff --git a/configs/cpp.json b/configs/cpp.json index e4715b6f..6fe14263 100644 --- a/configs/cpp.json +++ b/configs/cpp.json @@ -5,7 +5,7 @@ "update_storage": false, "download_results": false, "architecture": "x64", - "container_deployment": true, + "system_variant": "package", "runtime": { "language": "cpp", "version": "all" diff --git a/configs/example.json b/configs/example.json index efcafb71..a3f3f9b2 100644 --- a/configs/example.json +++ b/configs/example.json @@ -4,8 +4,8 @@ "update_code": false, "update_storage": false, "download_results": false, - "architecture": "x64", - "container_deployment": true, + "architecture": "x64", + "system_variant": "package", "runtime": { "language": "python", "version": "3.11" diff --git a/configs/java.json b/configs/java.json index 5727d06a..f1b32df2 100644 --- a/configs/java.json +++ b/configs/java.json @@ -5,10 +5,10 @@ "update_storage": false, "download_results": false, "architecture": "x64", - "container_deployment": false, + "system_variant": "package", "runtime": { - "language": "java", - "version": "11" + "language": "java", + "version": "11" }, "type": "invocation-overhead", "perf-cost": { diff --git a/configs/nodejs.json b/configs/nodejs.json index 8bdbefe1..2560a5b8 100644 --- a/configs/nodejs.json +++ b/configs/nodejs.json @@ -5,7 +5,7 @@ "update_storage": false, "download_results": false, "architecture": "x64", - "container_deployment": true, + "system_variant": "package", "runtime": { "language": "nodejs", "version": "20" diff --git a/configs/python.json b/configs/python.json index f184d235..951cc760 100644 --- 
a/configs/python.json +++ b/configs/python.json @@ -5,7 +5,7 @@ "update_storage": false, "download_results": false, "architecture": "x64", - "container_deployment": true, + "system_variant": "package", "runtime": { "language": "python", "version": "3.11" diff --git a/configs/systems.json b/configs/systems.json index 019b2ad8..d649e0bc 100644 --- a/configs/systems.json +++ b/configs/systems.json @@ -365,11 +365,11 @@ } }, "architecture": [ - "x64", - "arm64" + "x64" ], "deployments": [ - "package", + "function-gen1", + "function-gen2", "container" ] }, diff --git a/docs/modularity.md b/docs/modularity.md index 8e1cad8e..7a1e4478 100644 --- a/docs/modularity.md +++ b/docs/modularity.md @@ -375,7 +375,7 @@ This function has been retrieved from the cache and requires refreshing function In practice, this is often limited to updating logging handlers - see existing implementations for details. ```python - def update_function(self, function: Function, code_package: Benchmark, container_deployment: bool, container_uri: str): + def update_function(self, function: Function, code_package: Benchmark, system_variant: SystemVariant, container_uri: str | None): ``` This function updates the function's code and configuration in the platform. diff --git a/docs/platforms.md b/docs/platforms.md index e3332543..14aae20c 100644 --- a/docs/platforms.md +++ b/docs/platforms.md @@ -217,12 +217,11 @@ or in the JSON input configuration: ### Deployment Modes -SeBS models two GCP deployment targets: +SeBS models three GCP deployment targets: 1. `function-gen1`: the first Google Cloud Functions Gen1 path. -2. `container`: direct container deployment to Cloud Run. - -We plan also to add support for `function-gen2`, the current Google Cloud Functions Gen2 path. +2. `function-gen2`: Google Cloud Functions Gen2 package deployment. +3. `container`: direct container deployment to Cloud Run. 
These deployment types intentionally share a single GCP backend in SeBS, but they are not identical in packaging, naming, scaling, or performance behavior. On GCP, there are two different concurrency layers that should not be confused: @@ -259,6 +258,37 @@ Gen1 configuration currently exposes instance-scaling controls: Use Gen1 when you want the most established GCP path in SeBS and do not need container-level runtime tuning. +### Function Gen2 + +Gen2 reuses the same local ZIP packaging flow as Gen1, but deploys through the Cloud Functions v2 API. It is selected directly through the experiment-level `system_variant`: + +```json +"deployment": { + "name": "gcp", + "gcp": { + "region": "europe-west1", + "project_name": "your-gcp-project-id", + "credentials": "/path/to/project-credentials.json", + "configuration": { + "function-gen2": { + "vcpus": 1, + "gcp-concurrency": 80, + "worker-concurrency": 1, + "worker-threads": 8, + "min-instances": 0, + "max-instances": 20, + "cpu-boost": false, + "cpu-throttle": true + } + } + } +} +``` + +Set `experiments.system_variant` to one of `function-gen1`, `function-gen2`, or `container`. From the CLI and regression workflows, the same selection is exposed through the generic `--system-variant` option. + +Gen1 and Gen2 package deployments use separate SeBS cache identities and separate cloud function names with short `-gen1` and `-gen2` suffixes. This avoids control-plane races when switching between package modes. In practice, regression and benchmark runs should still select one GCP package mode at a time for a given run. + ### Cloud Run Container Deployments Container deployments are the currently implemented Cloud Run-based path in SeBS. They are selected with container deployment and use a provider-specific function image built from `Dockerfile.function`. 
@@ -298,7 +328,8 @@ Cloud Run containers can [execute in two environments](https://docs.cloud.google The current GCP backend has the following practical limits: * Gen1 is the primary managed-functions deployment path today. -* Gen2 is planned and partially modeled in configuration, but not yet fully deployed through a dedicated strategy. +* Gen2 supports the ZIP package deployment path and HTTP triggers. +* Library-trigger direct invocation remains Gen1-only. * Cloud Run containers are implemented today and provide the most tuning control. * GCP deployments currently reject `arm64`, as arm64 instances are not available for GCR. * C++ packaging is not supported on GCP (but possible to be implemented on containers). diff --git a/sebs/aws/aws.py b/sebs/aws/aws.py index c001e651..dd7077d1 100644 --- a/sebs/aws/aws.py +++ b/sebs/aws/aws.py @@ -29,6 +29,7 @@ from sebs.benchmark import Benchmark from sebs.cache import Cache from sebs.config import SeBSConfig +from sebs.experiments.config import SystemVariant from sebs.utils import LoggingHandlers from sebs.faas.function import Function, ExecutionResult, Trigger, FunctionConfig from sebs.faas.system import System @@ -191,7 +192,7 @@ def package_code( architecture: str, benchmark: str, is_cached: bool, - ) -> Tuple[str, int]: + ) -> Tuple[str, float]: """ Package code for deployment to AWS Lambda. 
@@ -321,7 +322,7 @@ def create_function( self, code_package: Benchmark, func_name: str, - container_deployment: bool, + system_variant: SystemVariant, container_uri: str | None, ) -> "LambdaFunction": """ @@ -333,8 +334,8 @@ def create_function( Args: code_package: Benchmark code package func_name: Name of the function - container_deployment: Whether to use container deployment - container_uri: URI of the container image (if container_deployment=True) + system_variant: Selected deployment variant + container_uri: URI of the container image (if container deployment is selected) Returns: LambdaFunction: The created or updated Lambda function @@ -366,7 +367,7 @@ def create_function( self.config.resources.lambda_role(self.session), function_cfg, ) - self.update_function(lambda_function, code_package, container_deployment, container_uri) + self.update_function(lambda_function, code_package, system_variant, container_uri) lambda_function.updated_code = True # TODO: get configuration of REST API except self.client.exceptions.ResourceNotFoundException: @@ -379,7 +380,7 @@ def create_function( "Code": {}, } - if container_deployment: + if system_variant.is_container: create_function_params["PackageType"] = "Image" create_function_params["Code"] = {"ImageUri": container_uri} self.logging.info( @@ -467,7 +468,7 @@ def update_function( self, function: Function, code_package: Benchmark, - container_deployment: bool, + system_variant: SystemVariant, container_uri: str | None, ): """ @@ -480,13 +481,13 @@ def update_function( Args: function: The function to update code_package: Benchmark code package - container_deployment: Whether to use container deployment - container_uri: URI of the container image (if container_deployment=True) + system_variant: Selected deployment variant + container_uri: URI of the container image (if container deployment is selected) """ name = function.name function = cast(LambdaFunction, function) - if container_deployment: + if 
system_variant.is_container: self.client.update_function_code(FunctionName=name, ImageUri=container_uri) else: code_size = code_package.code_size @@ -612,7 +613,7 @@ def default_function_name( code_package.language_version, code_package.architecture, ) - if code_package.container_deployment: + if code_package.system_variant.is_container: func_name = f"{func_name}-docker" return AWS.format_function_name(func_name) diff --git a/sebs/azure/azure.py b/sebs/azure/azure.py index 9a55085e..6fbb7906 100644 --- a/sebs/azure/azure.py +++ b/sebs/azure/azure.py @@ -54,6 +54,7 @@ from sebs.benchmark import Benchmark from sebs.cache import Cache from sebs.config import SeBSConfig +from sebs.experiments.config import SystemVariant from sebs.utils import LoggingHandlers, execute from sebs.faas.function import Function, FunctionConfig, ExecutionResult from sebs.faas.system import System @@ -220,7 +221,7 @@ def package_code( architecture: str, benchmark: str, is_cached: bool, - ) -> Tuple[str, int]: + ) -> Tuple[str, float]: """Package function code for Azure Functions deployment. Creates the proper directory structure and configuration files @@ -237,7 +238,7 @@ def package_code( architecture: Target architecture (currently unused) benchmark: Name of the benchmark is_cached: Whether the package is from cache - container_deployment: Whether to use container deployment + system_variant: Selected deployment variant Returns: Tuple of (directory_path, code_size_bytes, container_uri) @@ -504,7 +505,7 @@ def update_function( self, function: Function, code_package: Benchmark, - container_deployment: bool, + system_variant: SystemVariant, container_uri: str | None, ) -> None: """Update existing Azure Function with new code. 
@@ -517,14 +518,14 @@ def update_function( Args: function: Function instance to update code_package: New benchmark code package - container_deployment: Whether using container deployment + system_variant: Selected deployment variant container_uri: Container URI (unused for Azure) Raises: NotImplementedError: If container deployment is requested. """ - if container_deployment: + if system_variant.is_container: raise NotImplementedError("Container deployment is not supported in Azure") assert code_package.has_input_processed @@ -752,7 +753,7 @@ def create_function( self, code_package: Benchmark, func_name: str, - container_deployment: bool, + system_variant: SystemVariant, container_uri: str | None, ) -> AzureFunction: """Create new Azure Function. @@ -764,7 +765,7 @@ def create_function( Args: code_package: Benchmark code package to deploy func_name: Name for the Azure Function App - container_deployment: Whether to use container deployment + system_variant: Selected deployment variant container_uri: Container URI (unused for Azure) Returns: @@ -775,7 +776,7 @@ def create_function( RuntimeError: If function creation fails. 
""" - if container_deployment: + if system_variant.is_container: raise NotImplementedError("Container deployment is not supported in Azure") language = code_package.language_name @@ -852,7 +853,7 @@ def create_function( ) # update existing function app - self.update_function(function, code_package, container_deployment, container_uri) + self.update_function(function, code_package, system_variant, container_uri) self.cache_client.add_function( deployment_name=self.name(), diff --git a/sebs/benchmark.py b/sebs/benchmark.py index e0173e79..32cf62b9 100644 --- a/sebs/benchmark.py +++ b/sebs/benchmark.py @@ -33,7 +33,7 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: - from sebs.experiments.config import Config as ExperimentConfig + from sebs.experiments.config import Config as ExperimentConfig, SystemVariant class LanguageSpec: @@ -286,7 +286,7 @@ class Benchmark(LoggingBase): uses_storage: Whether the benchmark uses cloud storage uses_nosql: Whether the benchmark uses NoSQL databases architecture: CPU architecture of the deployment target - container_deployment: Whether using container deployment + system_variant: Deployment variant selected for the target provider """ @@ -443,7 +443,7 @@ def container_uri(self) -> str: return self._container_uri @property - def language(self) -> "Language": + def language(self) -> Language: """ Get the programming language for the benchmark. @@ -471,16 +471,6 @@ def language_variant(self) -> str: """ return self._language_variant - @property - def cache_language_key(self) -> str: - """ - Add language variant to the cache key so that different variants of - the same language don't conflict in cache. 
- """ - if self._language_variant == "default": - return self._language.value - return f"{self._language.value}_{self._language_variant}" - @property def language_version(self) -> str: """ @@ -532,14 +522,14 @@ def architecture(self) -> str: return self._architecture @property - def container_deployment(self) -> bool: - """ - Check if using container deployment. + def system_variant(self) -> SystemVariant: + """Return the selected deployment variant for this benchmark.""" + return self._system_variant - Returns: - bool: True if using container deployment, False otherwise - """ - return self._container_deployment + @property + def system_variant_suffix(self) -> str | None: + """Return the optional provider-local system variant suffix, or None if unset.""" + return self._system_variant_suffix @property # noqa: A003 def hash(self) -> str: @@ -579,6 +569,7 @@ def __init__( output_dir: str, cache_client: Cache, docker_client: docker.client.DockerClient, + system_variant_suffix: Optional[str] = None, verbose: bool = False, ): """ @@ -596,6 +587,7 @@ def __init__( output_dir: Directory for output files cache_client: Cache client for caching code packages docker_client: Docker client for building dependencies + system_variant_suffix: Optional provider-local system variant suffix verbose: Print verbose build logs. 
Raises: @@ -610,7 +602,8 @@ def __init__( assert config.runtime.variant is not None self._language_variant = config.runtime.variant.value self._architecture = self._experiment_config.architecture - self._container_deployment = config.container_deployment + self._system_variant = config.system_variant + self._system_variant_suffix = system_variant_suffix self._verbose = verbose benchmark_path = find_benchmark(self.benchmark, "benchmarks") @@ -639,7 +632,15 @@ def __init__( self._language_variant, self._language_version, self._architecture, - "container" if self._container_deployment else "package", + ( + "container" + if self._system_variant.is_container + else ( + f"package_{self._system_variant_suffix}" + if self._system_variant_suffix + else "package" + ) + ), ) self._container_uri: Optional[str] = None @@ -748,29 +749,17 @@ def query_cache(self) -> None: and deployment combination. Updates the cache status fields based on whether the cache exists and if it's still valid (hash matches). 
""" - if self.container_deployment: - self._code_package = self._cache_client.get_container( - deployment=self._deployment_name, - benchmark=self._benchmark, - language=self.cache_language_key, - language_version=self.language_version, - architecture=self.architecture, - ) + if self.system_variant.is_container: + self._code_package = self._cache_client.get_container(self._deployment_name, self) if self._code_package is not None: self._container_uri = self._code_package["image-uri"] else: - self._code_package = self._cache_client.get_code_package( - deployment=self._deployment_name, - benchmark=self._benchmark, - language=self.cache_language_key, - language_version=self.language_version, - architecture=self.architecture, - ) + self._code_package = self._cache_client.get_code_package(self._deployment_name, self) self._functions = self._cache_client.get_functions( deployment=self._deployment_name, benchmark=self._benchmark, - language=self.cache_language_key, + language=self.language, ) if self._code_package is not None: @@ -1141,7 +1130,7 @@ def add_deployment_package(self, output_dir: str) -> None: raise NotImplementedError @staticmethod - def directory_size(directory: str) -> int: + def directory_size(directory: str) -> float: """Calculate total size of all files in a directory. Recursively calculates the total size in bytes of all files @@ -1157,7 +1146,7 @@ def directory_size(directory: str) -> int: root = Path(directory) sizes = [f.stat().st_size for f in root.glob("**/*") if f.is_file()] - return sum(sizes) + return sum(sizes) / 1024.0 / 1024.0 def builder_image_name(self) -> Tuple[str, str]: """Image names of builder Docker images for preparing benchmarks. @@ -1347,7 +1336,7 @@ def ensure_image(name: str) -> None: self.logging.error(f"Docker mount volumes: {volumes}") raise e from None - def recalculate_code_size(self) -> int: + def recalculate_code_size(self) -> float: """Recalculate and update the code package size. 
Measures the current size of the output directory and updates @@ -1361,11 +1350,11 @@ def recalculate_code_size(self) -> int: def build( self, - package_build_step: Callable[[str, Language, str, str, str, bool], Tuple[str, int]], + package_build_step: Callable[[str, Language, str, str, str, bool], Tuple[str, float]], container_client: DockerContainer | None, - container_build_step: Callable[[str, Language, str, str, str, bool], Tuple[str, int]] + container_build_step: Callable[[str, Language, str, str, str, bool], Tuple[str, float]] | None, - ) -> Tuple[bool, str | None, bool, str | None]: + ) -> Tuple[bool, str | None, SystemVariant, str | None]: """Build the complete benchmark deployment package. Orchestrates the entire build process for a benchmark, including: @@ -1385,12 +1374,12 @@ def build( Tuple containing: - bool: Whether a new build was performed (False if cached) - str: Path to the built code package - - bool: Whether this is a container deployment + - SystemVariant: Selected deployment variant - str: Container URI (empty string if not container deployment) """ # Skip build if files are up to date and user didn't enforce rebuild if self.is_cached and self.is_cached_valid: - if self.container_deployment: + if self.system_variant.is_container: if self._container_uri is None: assert container_client is not None self._container_uri = container_client.push_to_registry( @@ -1401,10 +1390,7 @@ def build( ) self._cache_client.update_container_uri( self._deployment_name, - self._benchmark, - self.language_name, - self.language_version, - self.architecture, + self, self._container_uri, ) self.logging.info( @@ -1412,12 +1398,12 @@ def build( self.benchmark, self.container_uri ) ) - return False, None, self.container_deployment, self.container_uri + return False, None, self.system_variant, self.container_uri else: self.logging.info( "Using cached benchmark {} at {}".format(self.benchmark, self.code_location) ) - return False, self.code_location, 
self.container_deployment, None + return False, self.code_location, self.system_variant, None msg = ( "no cached code package/container." @@ -1447,7 +1433,7 @@ def build( """ self._container_uri = None - if self.container_deployment: + if self.system_variant.is_container: assert container_client is not None repo_name = self._system_config.docker_repository() @@ -1594,7 +1580,7 @@ def build( return ( True, self._code_location, - self._container_deployment, + self._system_variant, self._container_uri, ) @@ -1732,7 +1718,7 @@ def code_package_modify(self, filename: str, data: bytes) -> None: Raises: NotImplementedError: If the code package is not a ZIP archive """ - if not self.container_deployment and self.code_package_is_archive(): + if not self.system_variant.is_container and self.code_package_is_archive(): assert self.code_location is not None self._update_zip(self.code_location, filename, data) new_size = self.code_package_recompute_size() / 1024.0 / 1024.0 @@ -1750,7 +1736,7 @@ def code_package_is_archive(self) -> bool: bool: True if package is a ZIP archive, False if it's a directory """ - if self.container_deployment: + if self.system_variant.is_container: return False code_location = self.code_location @@ -1769,7 +1755,7 @@ def code_package_recompute_size(self) -> float: Returns: float: Updated package size in bytes """ - if self.container_deployment: + if self.system_variant.is_container: raise NotImplementedError() if self.code_location is None: diff --git a/sebs/cache.py b/sebs/cache.py index 78f4a8f4..6deccd4f 100644 --- a/sebs/cache.py +++ b/sebs/cache.py @@ -29,7 +29,8 @@ import os import shutil import threading -from typing import Any, Callable, Dict, List, Mapping, Optional, TYPE_CHECKING # noqa +import tempfile +from typing import Any, Dict, List, Mapping, Optional, Tuple, TYPE_CHECKING # noqa from sebs.utils import LoggingBase, serialize @@ -96,6 +97,47 @@ def map_keys(obj: Dict[str, Any], val: Any, keys: List[str]) -> Dict[str, Any]: update(cfg, 
map_keys(cfg, val, keys)) +def keys_exist(obj: Dict, keys: List[Any]) -> bool: + """Find if a nested object exists in a dictionary. + + example: {key1: {key2: {key3: value}}} + for [key1, key2, key3] -> True + + Args: + obj: dictionary + keys: dynamic list of nested keys + + Returns: + true if the nested object exists + """ + for key in keys: + if isinstance(obj, Dict) and key in obj: + obj = obj[key] + else: + return False + return True + + +def keys_get(obj: Dict, keys: List[Any]) -> Any: + """Return the nested object stored under a list of keys. + + example: {key1: {key2: {key3: value}}} + for [key1, key2, key3] -> value + + Args: + obj: dictionary + keys: dynamic list of nested keys + + Returns: + the nested object found by following the keys + """ + current = obj + for key in keys: + if isinstance(current, Dict) and key in current: + current = current[key] + return current + + class Cache(LoggingBase): """Persistent caching system for SeBS benchmark configurations and deployments. @@ -113,6 +155,9 @@ class Cache(LoggingBase): docker_client (docker.DockerClient): Docker client for container operations. """ + _lock_registry_guard = threading.Lock() + _lock_registry: Dict[str, threading.RLock] = {} + def __init__(self, cache_dir: str, docker_client: docker.DockerClient) -> None: """Initialize the Cache with directory and Docker client. 
@@ -131,7 +176,7 @@ def __init__(self, cache_dir: str, docker_client: docker.DockerClient) -> None: self.cache_dir = os.path.abspath(cache_dir) self.ignore_functions: bool = False self.ignore_storage: bool = False - self._lock = threading.RLock() + self._lock = self._cache_dir_lock(self.cache_dir) if not os.path.exists(self.cache_dir): os.makedirs(self.cache_dir, exist_ok=True) else: @@ -146,6 +191,48 @@ def typename() -> str: """ return "Cache" + @classmethod + def _cache_dir_lock(cls, cache_dir: str) -> threading.RLock: + """Return a shared lock for all Cache instances pointing at one cache dir.""" + with cls._lock_registry_guard: + if cache_dir not in cls._lock_registry: + cls._lock_registry[cache_dir] = threading.RLock() + return cls._lock_registry[cache_dir] + + @staticmethod + def _write_json_atomic(path: str, data: Any) -> None: + """Atomically replace a JSON file after fully writing it to a temp file.""" + directory = os.path.dirname(path) + os.makedirs(directory, exist_ok=True) + fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".tmp-", suffix=".json") + try: + with os.fdopen(fd, "w") as fp: + json.dump(data, fp, indent=2) + fp.flush() + os.fsync(fp.fileno()) + os.replace(tmp_path, path) + except Exception: + if os.path.exists(tmp_path): + os.unlink(tmp_path) + raise + + @staticmethod + def _write_serialized_atomic(path: str, data: Dict[str, Any]) -> None: + """Atomically replace a JSON file using the SeBS serializer.""" + directory = os.path.dirname(path) + os.makedirs(directory, exist_ok=True) + fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".tmp-", suffix=".json") + try: + with os.fdopen(fd, "w") as fp: + fp.write(serialize(data)) + fp.flush() + os.fsync(fp.fileno()) + os.replace(tmp_path, path) + except Exception: + if os.path.exists(tmp_path): + os.unlink(tmp_path) + raise + def load_config(self) -> None: """Load cached cloud configurations from disk. @@ -200,12 +287,12 @@ def shutdown(self) -> None: JSON files in the cache directory. 
""" if self.config_updated: - for cloud in ["azure", "aws", "gcp", "openwhisk", "local"]: - if cloud in self.cached_config: - cloud_config_file = os.path.join(self.cache_dir, "{}.json".format(cloud)) - self.logging.info("Update cached config {}".format(cloud_config_file)) - with open(cloud_config_file, "w") as out: - json.dump(self.cached_config[cloud], out, indent=2) + with self._lock: + for cloud in ["azure", "aws", "gcp", "openwhisk", "local"]: + if cloud in self.cached_config: + cloud_config_file = os.path.join(self.cache_dir, "{}.json".format(cloud)) + self.logging.info("Update cached config {}".format(cloud_config_file)) + self._write_json_atomic(cloud_config_file, self.cached_config[cloud]) def get_benchmark_config(self, deployment: str, benchmark: str) -> Optional[Dict[str, Any]]: """Access cached configuration of a benchmark. @@ -217,68 +304,53 @@ def get_benchmark_config(self, deployment: str, benchmark: str) -> Optional[Dict Returns: Optional[Dict[str, Any]]: Benchmark configuration or None if not found. """ - benchmark_dir = os.path.join(self.cache_dir, benchmark) - if os.path.exists(benchmark_dir): - config_file = os.path.join(benchmark_dir, "config.json") - if os.path.exists(config_file): - with open(config_file, "r") as fp: - cfg = json.load(fp) - return cfg[deployment] if deployment in cfg else None + with self._lock: + benchmark_dir = os.path.join(self.cache_dir, benchmark) + if os.path.exists(benchmark_dir): + config_file = os.path.join(benchmark_dir, "config.json") + if os.path.exists(config_file): + with open(config_file, "r") as fp: + cfg = json.load(fp) + return cfg[deployment] if deployment in cfg else None return None def get_code_package( - self, - deployment: str, - benchmark: str, - language: str, - language_version: str, - architecture: str, + self, deployment_name: str, code_package: "Benchmark" ) -> Optional[Dict[str, Any]]: """Access cached version of benchmark code package. Args: deployment (str): Deployment platform name. 
- benchmark (str): Benchmark name. - language (str): Programming language. - language_version (str): Language version. - architecture (str): Target architecture. + code_package (Benchmark): Benchmark package. Returns: Optional[Dict[str, Any]]: Code package configuration or None if not found. """ - cfg = self.get_benchmark_config(deployment, benchmark) + cfg = self.get_benchmark_config(deployment_name, code_package.benchmark) - key = f"{language_version}-{architecture}" - if cfg and language in cfg and key in cfg[language]["code_package"]: - return cfg[language]["code_package"][key] + base_keys, extra_keys = self.code_cache_keys(code_package) + if cfg and keys_exist(cfg, [*base_keys, *extra_keys]): + return keys_get(cfg, [*base_keys, *extra_keys]) else: return None def get_container( - self, - deployment: str, - benchmark: str, - language: str, - language_version: str, - architecture: str, + self, deployment_name: str, code_package: "Benchmark" ) -> Optional[Dict[str, Any]]: """Access cached container configuration for a benchmark. Args: deployment (str): Deployment platform name. - benchmark (str): Benchmark name. - language (str): Programming language. - language_version (str): Language version. - architecture (str): Target architecture. + code_package (Benchmark): Benchmark package. Returns: Optional[Dict[str, Any]]: Container configuration or None if not found. 
""" - cfg = self.get_benchmark_config(deployment, benchmark) + cfg = self.get_benchmark_config(deployment_name, code_package.benchmark) - key = f"{language_version}-{architecture}" - if cfg and language in cfg and key in cfg[language]["containers"]: - return cfg[language]["containers"][key] + base_keys, extra_keys = self.code_cache_keys(code_package) + if cfg and keys_exist(cfg, [*base_keys, *extra_keys]): + return keys_get(cfg, [*base_keys, *extra_keys]) else: return None @@ -294,6 +366,20 @@ def invalidate_all_container_uris(self, deployment: str) -> None: Args: deployment (str): Deployment platform name. """ + + def clear_nested_container_uris(obj: Dict[str, Any]) -> bool: + """Internal method - recursively clear all image-uri entries. + Simpler method than walking all nested variants of containers.""" + modified = False + if "image-uri" in obj: + obj["image-uri"] = None + return True + + for value in obj.values(): + if isinstance(value, dict): + modified = clear_nested_container_uris(value) or modified + return modified + with self._lock: if not os.path.exists(self.cache_dir): return @@ -317,22 +403,13 @@ def invalidate_all_container_uris(self, deployment: str) -> None: containers = lang_cfg.get("containers") if containers is None: continue - for container_cfg in containers.values(): - container_cfg["image-uri"] = None - modified = True + modified = clear_nested_container_uris(containers) or modified if modified: - with open(config_path, "w") as fp: - json.dump(config, fp, indent=2) + self._write_json_atomic(config_path, config) def update_container_uri( - self, - deployment: str, - benchmark: str, - language: str, - language_version: str, - architecture: str, - uri: str, + self, deployment_name: str, code_package: "Benchmark", uri: str ) -> None: """Update the image-uri for a specific cached container entry. @@ -340,26 +417,26 @@ def update_container_uri( the registry to be accessible for cloud deployment. Args: - deployment (str): Deployment platform name. 
- benchmark (str): Benchmark name. - language (str): Programming language. - language_version (str): Language version. - architecture (str): Target architecture. + deployment_name (str): Deployment platform name. + code_package (Benchmark): Benchmark package identifying the cache entry. uri (str): New image URI to store. """ with self._lock: - config_path = os.path.join(self.cache_dir, benchmark, "config.json") + config_path = os.path.join(self.cache_dir, code_package.benchmark, "config.json") if not os.path.exists(config_path): return with open(config_path, "r") as fp: config = json.load(fp) - key = f"{language_version}-{architecture}" - config[deployment][language]["containers"][key]["image-uri"] = uri + base_keys, extra_keys = self.code_cache_keys(code_package) + keys = [deployment_name, *base_keys, *extra_keys] + if not keys_exist(config, keys): + return + + keys_get(config, keys)["image-uri"] = uri - with open(config_path, "w") as fp: - json.dump(config, fp, indent=2) + self._write_json_atomic(config_path, config) def get_functions( self, deployment: str, benchmark: str, language: str @@ -398,25 +475,26 @@ def get_all_functions(self, deployment: str) -> Dict[str, Any]: if not os.path.exists(self.cache_dir) or self.ignore_functions: return result - for entry in os.listdir(self.cache_dir): - config_path = os.path.join(self.cache_dir, entry, "config.json") - if not os.path.exists(config_path): - continue - - with open(config_path, "r") as fp: - config = json.load(fp) + with self._lock: + for entry in os.listdir(self.cache_dir): + config_path = os.path.join(self.cache_dir, entry, "config.json") + if not os.path.exists(config_path): + continue - dep_cfg = config.get(deployment) - if dep_cfg is None: - continue + with open(config_path, "r") as fp: + config = json.load(fp) - for lang_cfg in dep_cfg.values(): - if lang_cfg is None: + dep_cfg = config.get(deployment) + if dep_cfg is None: continue - functions = lang_cfg.get("functions") - if functions is not None: - 
result.update(functions) + for lang_cfg in dep_cfg.values(): + if lang_cfg is None: + continue + + functions = lang_cfg.get("functions") + if functions is not None: + result.update(functions) return result @@ -461,21 +539,22 @@ def get_nosql_configs(self, deployment: str) -> Dict[str, Any]: if not os.path.exists(self.cache_dir): return result - for entry in os.listdir(self.cache_dir): - config_path = os.path.join(self.cache_dir, entry, "config.json") - if not os.path.exists(config_path): - continue + with self._lock: + for entry in os.listdir(self.cache_dir): + config_path = os.path.join(self.cache_dir, entry, "config.json") + if not os.path.exists(config_path): + continue - with open(config_path, "r") as fp: - config = json.load(fp) + with open(config_path, "r") as fp: + config = json.load(fp) - dep_cfg = config.get(deployment) - if dep_cfg is None: - continue + dep_cfg = config.get(deployment) + if dep_cfg is None: + continue - nosql = dep_cfg.get("nosql") - if nosql is not None: - result.update(nosql) + nosql = dep_cfg.get("nosql") + if nosql is not None: + result.update(nosql) return result @@ -549,8 +628,7 @@ def remove_function(self, deployment: str, benchmark: str, language: str, functi self.logging.info(f"Deleting function {function_name} from cache") del lang_cfg["functions"][function_name] - with open(config_path, "w") as fp: - json.dump(config, fp, indent=2) + self._write_json_atomic(config_path, config) def remove_storage(self, deployment: str): """Remove storage config entries across all benchmarks for a deployment. @@ -589,8 +667,7 @@ def _remove_resource_config(self, deployment: str, resource: str): if deployment in config and resource in config[deployment]: del config[deployment][resource] - with open(config_path, "w") as fp: - json.dump(config, fp, indent=2) + self._write_json_atomic(config_path, config) def get_config_key(self, keys: List[str]) -> Optional[Any]: """Return the value at a nested key path in the cached configuration. 
@@ -666,8 +743,25 @@ def _update_resources( else: cached_config[deployment] = {resource: config} - with open(config_file, "w") as fp: - json.dump(cached_config, fp, indent=2) + self._write_json_atomic(config_file, cached_config) + + @staticmethod + def code_cache_keys(code_package: "Benchmark") -> Tuple[List[str], List[str]]: + """ + Add language and system variant suffixes to the package cache key so + differing build artifacts do not conflict in cache. + """ + base_key = [code_package.language.value] + base_key.append( + "containers" if code_package.system_variant.is_container else "code_package" + ) + extra_keys = [] + extra_keys.append(code_package.language_variant) + if code_package.system_variant_suffix is not None: + extra_keys.append(code_package.system_variant_suffix) + extra_keys.append(code_package.language_version) + extra_keys.append(code_package.architecture) + return base_key, extra_keys def add_code_package( self, @@ -689,23 +783,12 @@ def add_code_package( RuntimeError: If cached application already exists for the deployment. 
""" with self._lock: - language = code_package.cache_language_key - language_version = code_package.language_version - architecture = code_package.architecture - benchmark_dir = os.path.join(self.cache_dir, code_package.benchmark) os.makedirs(benchmark_dir, exist_ok=True) - package_type = "docker" if code_package.container_deployment else "package" # Check if cache directory for this deployment exist - cached_dir = os.path.join( - benchmark_dir, - deployment_name, - language, - language_version, - architecture, - package_type, - ) + base_keys, extra_keys = self.code_cache_keys(code_package) + cached_dir = os.path.join(benchmark_dir, deployment_name, *base_keys, *extra_keys) if not os.path.exists(cached_dir): os.makedirs(cached_dir, exist_ok=True) @@ -740,62 +823,46 @@ def add_code_package( "modified": date, } - key = f"{language_version}-{architecture}" - if code_package.container_deployment: + config: Dict[str, Any] = { + "containers": {}, + "code_package": {}, + "functions": {}, + } + if code_package.system_variant.is_container: image = self.docker_client.images.get(code_package.container_uri) language_config["image-uri"] = code_package.container_uri language_config["image-id"] = image.id - config = { - deployment_name: { - language: { - "containers": {key: language_config}, - "code_package": {}, - "functions": {}, - } - } - } + update_dict(config["containers"], language_config, extra_keys) else: - config = { - deployment_name: { - language: { - "code_package": {key: language_config}, - "containers": {}, - "functions": {}, - } - } - } + update_dict(config["code_package"], language_config, extra_keys) # make sure to not replace other entries if os.path.exists(os.path.join(benchmark_dir, "config.json")): with open(os.path.join(benchmark_dir, "config.json"), "r") as fp: cached_config = json.load(fp) - if deployment_name in cached_config: - # language known, platform known, extend dictionary - if language in cached_config[deployment_name]: - if 
code_package.container_deployment: - cached_config[deployment_name][language]["containers"][ - key - ] = language_config - else: - cached_config[deployment_name][language]["code_package"][ - key - ] = language_config - - # language unknown, platform known - add new dictionary - else: - cached_config[deployment_name][language] = config[deployment_name][ - language - ] + + keys = [*base_keys, *extra_keys] + language = keys[0] + if language in cached_config[deployment_name]: + # language known - add code package, + # but do not overwrite existing entries + update_dict( + cached_config[deployment_name][language], language_config, keys[1:] + ) else: - # language unknown, platform unknown - add new dictionary - cached_config[deployment_name] = config[deployment_name] + # language unknown - add new dictionary + # everything else needs to be initialized + cached_config[deployment_name][language] = config + config = cached_config - with open(os.path.join(benchmark_dir, "config.json"), "w") as fp: - json.dump(config, fp, indent=2) + else: + # entirely new entry + language = base_keys[0] + config = {deployment_name: {language: config}} + self._write_json_atomic(os.path.join(benchmark_dir, "config.json"), config) else: - # TODO: update raise RuntimeError( "Cached application {} for {} already exists!".format( code_package.benchmark, deployment_name @@ -818,21 +885,11 @@ def update_code_package( code_package (Benchmark): The benchmark code package to update. 
""" with self._lock: - language = code_package.cache_language_key - language_version = code_package.language_version - architecture = code_package.architecture benchmark_dir = os.path.join(self.cache_dir, code_package.benchmark) - package_type = "docker" if code_package.container_deployment else "package" # Check if cache directory for this deployment exist - cached_dir = os.path.join( - benchmark_dir, - deployment_name, - language, - language_version, - architecture, - package_type, - ) + base_keys, extra_keys = self.code_cache_keys(code_package) + cached_dir = os.path.join(benchmark_dir, deployment_name, *base_keys, *extra_keys) config_location = os.path.join(benchmark_dir, "config.json") @@ -849,22 +906,12 @@ def update_code_package( A simple check of directory existence is insufficient, as we might have created a code package earlier (which creates a directory), but not a container. """ - key = f"{language_version}-{architecture}" - if code_package.container_deployment: - main_key = "containers" - else: - main_key = "code_package" - - package_exists = True - try: - config[deployment_name][language][main_key][key] - except KeyError: - package_exists = False - + package_exists = keys_exist(config, [deployment_name, *base_keys, *extra_keys]) + if not package_exists: """ - We have no such cache entry - fallback. - However, we still have directory, a possible leftover after crash. - Whatever was left, we remove it since we have no information what is there. + We have no such cache entry - fallback. + However, we still have directory, a possible leftover after crash. + Whatever was left, we remove it since we have no information what is there. 
""" if os.path.exists(cached_dir): shutil.rmtree(cached_dir) @@ -893,19 +940,17 @@ def update_code_package( else: self.logging.info(f"Caching container pushed to: {code_package.container_uri}") - config[deployment_name][language][main_key][key]["date"]["modified"] = date - config[deployment_name][language][main_key][key]["hash"] = code_package.hash - config[deployment_name][language][main_key][key]["size"] = code_package.code_size + cached_config = keys_get(config, [deployment_name, *base_keys, *extra_keys]) + cached_config["date"]["modified"] = date + cached_config["hash"] = code_package.hash + cached_config["size"] = code_package.code_size - if code_package.container_deployment: + if code_package.system_variant.is_container: image = self.docker_client.images.get(code_package.container_uri) - config[deployment_name][language][main_key][key]["image-id"] = image.id - config[deployment_name][language][main_key][key][ - "image-uri" - ] = code_package.container_uri + cached_config["image-id"] = image.id + cached_config["image-uri"] = code_package.container_uri - with open(os.path.join(benchmark_dir, "config.json"), "w") as fp: - json.dump(config, fp, indent=2) + self._write_json_atomic(os.path.join(benchmark_dir, "config.json"), config) else: self.add_code_package(deployment_name, code_package) @@ -934,7 +979,7 @@ def add_function( return with self._lock: benchmark_dir = os.path.join(self.cache_dir, code_package.benchmark) - language = code_package.cache_language_key + language = language_name cache_config = os.path.join(benchmark_dir, "config.json") if os.path.exists(cache_config): @@ -942,15 +987,20 @@ def add_function( with open(cache_config, "r") as fp: cached_config = json.load(fp) - if "functions" not in cached_config[deployment_name][language]: + if language not in cached_config[deployment_name]: + cached_config[deployment_name][language] = { + "functions": functions_config, + "code_package": {}, + "containers": {}, + } + elif "functions" not in 
cached_config[deployment_name][language]: cached_config[deployment_name][language]["functions"] = functions_config else: cached_config[deployment_name][language]["functions"].update( functions_config ) config = cached_config - with open(cache_config, "w") as fp: - fp.write(serialize(config)) + self._write_serialized_atomic(cache_config, config) else: raise RuntimeError( "Can't cache function {} for a non-existing code package!".format(function.name) @@ -986,8 +1036,7 @@ def update_function(self, function: "Function") -> None: cached_config[deployment][language]["functions"][ name ] = function.serialize() - with open(cache_config, "w") as fp: - fp.write(serialize(cached_config)) + self._write_serialized_atomic(cache_config, cached_config) else: raise RuntimeError( "Can't cache function {} for a non-existing code package!".format(function.name) diff --git a/sebs/cli.py b/sebs/cli.py index ea4d026c..ec7bab57 100755 --- a/sebs/cli.py +++ b/sebs/cli.py @@ -124,9 +124,10 @@ def common_params(func): help="Target architecture", ) @click.option( - "--container-deployment/--no-container-deployment", - default=False, - help="Deploy functions as container images (AWS only).", + "--system-variant", + default=None, + type=str, + help="Optional system-specific deployment variant interpreted by the selected platform.", ) @click.option( "--resource-prefix", @@ -157,7 +158,7 @@ def parse_common_params( language_version, language_variant, architecture, - container_deployment, + system_variant, resource_prefix: Optional[str] = None, initialize_deployment: bool = True, ignore_cache: bool = False, @@ -186,7 +187,13 @@ def parse_common_params( update_nested_dict(config_obj, ["experiments", "update_code"], update_code) update_nested_dict(config_obj, ["experiments", "update_storage"], update_storage) update_nested_dict(config_obj, ["experiments", "architecture"], architecture) - update_nested_dict(config_obj, ["experiments", "container_deployment"], container_deployment) + 
update_nested_dict(config_obj, ["experiments", "system_variant"], system_variant) + + selected_deployment = config_obj.get("deployment", {}).get("name") + if selected_deployment and "system_variant" not in config_obj.get("experiments", {}): + config_obj["experiments"]["system_variant"] = sebs_client.config.default_system_variant( + selected_deployment + ) # set the path the configuration was loaded from update_nested_dict(config_obj, ["deployment", "local", "path"], config) @@ -695,7 +702,7 @@ def start( update_storage=False, deployment="local", storage_configuration=storage_configuration, - container_deployment=False, + system_variant="package", architecture=architecture, **kwargs, ) diff --git a/sebs/config.py b/sebs/config.py index 6f66e888..e23d460f 100644 --- a/sebs/config.py +++ b/sebs/config.py @@ -166,27 +166,25 @@ def supported_variants(self, deployment_name: str, language_name: str) -> List[s languages = self._system_config.get(deployment_name, {}).get("languages", {}) return languages.get(language_name, {}).get("supported_variants", ["default"]) - def supported_package_deployment(self, deployment_name: str) -> bool: - """Check if package-based deployment is supported for a platform. + def supported_system_variants(self, deployment_name: str) -> List[str]: + """Return the supported deployment variants for a platform.""" + return self._system_config[deployment_name]["deployments"] - Args: - deployment_name (str): Name of the deployment platform (e.g., 'aws', 'azure'). - - Returns: - bool: True if package deployment is supported, False otherwise. - """ - return "package" in self._system_config[deployment_name]["deployments"] + def default_system_variant(self, deployment_name: str) -> str: + """Return the default deployment variant for a platform. - def supported_container_deployment(self, deployment_name: str) -> bool: - """Check if container-based deployment is supported for a platform. + The default is the first declared variant in ``systems.json``. 
Args: - deployment_name (str): Name of the deployment platform (e.g., 'aws', 'azure'). + deployment_name: Name of the deployment platform. Returns: - bool: True if container deployment is supported, False otherwise. + Default deployment variant for the platform. """ - return "container" in self._system_config[deployment_name]["deployments"] + variants = self.supported_system_variants(deployment_name) + if not variants: + raise RuntimeError(f"Deployment {deployment_name} has no configured system variants.") + return variants[0] def benchmark_base_images( self, deployment_name: str, language_name: str, architecture: str diff --git a/sebs/experiments/config.py b/sebs/experiments/config.py index edde88de..d88ee6ea 100644 --- a/sebs/experiments/config.py +++ b/sebs/experiments/config.py @@ -12,12 +12,66 @@ The Config class handles serialization and deserialization of experiment configurations, allowing them to be loaded from and saved to configuration files. """ +from __future__ import annotations from typing import Dict from sebs.faas.function import Runtime +class SystemVariant: + """Deployment variant selected for an experiment. + + The variant is provider-specific, but exposes a shared ``is_container`` + property for logic that only cares about the package vs container split. + """ + + ALL_SYSTEM_VARIANTS = [ + # default everywhere but OpenWhisk and GCP + "package", + # Lambda, OpenWhisk, and GCP + "container", + # GCP specific + "function-gen1", + "function-gen2", + ] + + def __init__(self, value: str): + """Initialize the system variant. + + Args: + value: Provider-specific deployment variant name. 
+ """ + self._value = value + + @property + def value(self) -> str: + """Get the provider-specific deployment variant name.""" + return self._value + + @property + def is_container(self) -> bool: + """Return whether this deployment variant uses containers.""" + return self._value == "container" + + def serialize(self) -> str: + """Serialize the deployment variant to a string.""" + return self._value + + @staticmethod + def deserialize(value: str) -> SystemVariant: + """Deserialize a deployment variant from a string.""" + if value not in SystemVariant.ALL_SYSTEM_VARIANTS: + raise ValueError(f"Invalid system variant: {value}") + return SystemVariant(value) + + def __eq__(self, other: object) -> bool: + """Compare two system variants.""" + if not isinstance(other, SystemVariant): + return False + return self.value == other.value + + class Config: """Configuration class for benchmark experiments. @@ -28,7 +82,7 @@ class Config: Attributes: _update_code: Whether to update function code _update_storage: Whether to update storage resources - _container_deployment: Whether to use container-based deployment + _system_variant: Deployment variant selected for the target provider _download_results: Whether to download experiment results _architecture: CPU architecture (e.g., "x64", "arm64") _flags: Dictionary of boolean flags for custom settings @@ -40,7 +94,7 @@ def __init__(self): """Initialize a new experiment configuration with default values.""" self._update_code: bool = False self._update_storage: bool = False - self._container_deployment: bool = False + self._system_variant = SystemVariant("package") self._download_results: bool = False self._architecture: str = "x64" self._flags: Dict[str, bool] = {} @@ -107,13 +161,9 @@ def architecture(self) -> str: return self._architecture @property - def container_deployment(self) -> bool: - """Get whether to use container-based deployment. 
- - Returns: - True if container-based deployment should be used, False otherwise - """ - return self._container_deployment + def system_variant(self) -> SystemVariant: + """Get the selected deployment variant.""" + return self._system_variant def experiment_settings(self, name: str) -> dict: """Get settings for a specific experiment. @@ -146,7 +196,7 @@ def serialize(self) -> dict: "flags": self._flags, "experiments": self._experiment_configs, "architecture": self._architecture, - "container_deployment": self._container_deployment, + "system_variant": self._system_variant.serialize(), } return out @@ -173,7 +223,11 @@ def deserialize(config: dict) -> "Config": cfg._update_code = config["update_code"] cfg._update_storage = config["update_storage"] cfg._download_results = config["download_results"] - cfg._container_deployment = config["container_deployment"] + if "system_variant" in config: + cfg._system_variant = SystemVariant.deserialize(config["system_variant"]) + else: + legacy_is_container = config.get("container_deployment", False) + cfg._system_variant = SystemVariant("container" if legacy_is_container else "package") cfg._runtime = Runtime.deserialize(config["runtime"]) cfg._flags = config["flags"] if "flags" in config else {} cfg._architecture = config["architecture"] diff --git a/sebs/experiments/invocation_overhead.py b/sebs/experiments/invocation_overhead.py index 150a4790..b1df8aae 100644 --- a/sebs/experiments/invocation_overhead.py +++ b/sebs/experiments/invocation_overhead.py @@ -92,7 +92,10 @@ def before_sample(self, size: int, input_benchmark: dict) -> None: arr = bytearray((random.getrandbits(8) for i in range(size))) self._benchmark.code_package_modify("randomdata.bin", bytes(arr)) function = self._deployment_client.get_function(self._benchmark) - self._deployment_client.update_function(function, self._benchmark, False, "") + # FIXME: we might want a change in the future - support containers + self._deployment_client.update_function( + function, 
self._benchmark, self._benchmark.system_variant, "" + ) class PayloadSize: diff --git a/sebs/experiments/perf_cost.py b/sebs/experiments/perf_cost.py index e8b2489a..e4421559 100644 --- a/sebs/experiments/perf_cost.py +++ b/sebs/experiments/perf_cost.py @@ -176,8 +176,10 @@ def run(self) -> None: self._deployment_client.update_function( self._function, self._benchmark, - self._benchmark.container_deployment, - self._benchmark.container_uri if self._benchmark.container_deployment else "", + self._benchmark.system_variant, + self._benchmark.container_uri + if self._benchmark.system_variant.is_container + else "", ) self._sebs_client.cache_client.update_function(self._function) # Run experiment with this memory configuration diff --git a/sebs/faas/system.py b/sebs/faas/system.py index d76d3e48..291e9ce7 100644 --- a/sebs/faas/system.py +++ b/sebs/faas/system.py @@ -20,6 +20,7 @@ from sebs.benchmark import Benchmark from sebs.cache import Cache from sebs.config import SeBSConfig +from sebs.experiments.config import SystemVariant from sebs.faas.container import DockerContainer from sebs.faas.resources import SystemResources from sebs.faas.config import Resources @@ -154,6 +155,10 @@ def container_client(self) -> DockerContainer | None: """ return None + def system_variant_suffix(self, system_variant: SystemVariant) -> Optional[str]: + """Return an optional provider-local system variant suffix.""" + return None + @property def system_resources(self) -> SystemResources: """ @@ -284,7 +289,7 @@ def package_code( architecture: str, benchmark: str, is_cached: bool, - ) -> Tuple[str, int]: + ) -> Tuple[str, float]: """ Apply system-specific code packaging to prepare a deployment package. 
@@ -314,7 +319,7 @@ def package_code( def finalize_container_build( self, - ) -> Callable[[str, Language, str, str, str, bool], Tuple[str, int]] | None: + ) -> Callable[[str, Language, str, str, str, bool], Tuple[str, float]] | None: """Default behavior of container deployment is that no code package is needed. Thus, we return None to signal that. @@ -330,7 +335,7 @@ def create_function( self, code_package: Benchmark, func_name: str, - container_deployment: bool, + system_variant: SystemVariant, container_uri: str | None, ) -> Function: """ @@ -341,14 +346,14 @@ def create_function( Args: code_package: Benchmark containing the function code func_name: Name of the function - container_deployment: Whether to deploy as a container + system_variant: Selected deployment variant container_uri: URI of the container image Returns: Function: Created function instance Raises: - NotImplementedError: If container deployment is requested but not supported + NotImplementedError: If the deployment variant is not supported """ pass @@ -372,7 +377,7 @@ def update_function( self, function: Function, code_package: Benchmark, - container_deployment: bool, + system_variant: SystemVariant, container_uri: str | None, ): """ @@ -381,11 +386,11 @@ def update_function( Args: function: Existing function instance to update code_package: New benchmark containing the function code - container_deployment: Whether to deploy as a container + system_variant: Selected deployment variant container_uri: URI of the container image Raises: - NotImplementedError: If container deployment is requested but not supported + NotImplementedError: If the deployment variant is not supported """ pass @@ -413,14 +418,14 @@ def build_function(self, code_package: Benchmark, func_name: Optional[str] = Non if not func_name: func_name = self.default_function_name(code_package) - _, code_package_loc, container_deployment, container_uri = code_package.build( + _, code_package_loc, system_variant, container_uri = 
code_package.build( self.package_code, self.container_client, self.finalize_container_build() ) if code_package_loc is not None: self.logging.info( f"Created code package for function {func_name} at {code_package_loc}" ) - if container_deployment: + if system_variant.is_container: self.logging.info( f"Created container deployment for function {func_name}: {container_uri}" ) @@ -468,7 +473,7 @@ def get_function(self, code_package: Benchmark, func_name: Optional[str] = None) func_name = self.default_function_name(code_package) # Build the code package - rebuilt, _, container_deployment, container_uri = code_package.build( + rebuilt, _, system_variant, container_uri = code_package.build( self.package_code, self.container_client, self.finalize_container_build() ) @@ -489,6 +494,14 @@ def get_function(self, code_package: Benchmark, func_name: Optional[str] = None) ) self.logging.error(e) is_function_cached = False + else: + err_msg = self.can_reuse_cached_function(function, code_package) + if err_msg is not None: + self.logging.info( + f"Cached function {func_name} is not compatible with the current " + f"deployment configuration and will be replaced. Reason: {err_msg}" + ) + is_function_cached = False # Create new function if not cached or deserialize failed if not is_function_cached: @@ -498,9 +511,7 @@ def get_function(self, code_package: Benchmark, func_name: Optional[str] = None) else "function {} not found in cache.".format(func_name) ) self.logging.info("Creating new function! 
Reason: " + msg) - function = self.create_function( - code_package, func_name, container_deployment, container_uri - ) + function = self.create_function(code_package, func_name, system_variant, container_uri) self.cache_client.add_function( deployment_name=self.name(), language_name=code_package.language_name, @@ -512,7 +523,7 @@ def get_function(self, code_package: Benchmark, func_name: Optional[str] = None) else: assert function is not None self.cached_function(function) - if code_package.container_deployment: + if code_package.system_variant.is_container: self.logging.info( f"Using cached function {func_name} container {code_package.container_uri}" ) @@ -541,7 +552,7 @@ def get_function(self, code_package: Benchmark, func_name: Optional[str] = None) ) # Update function code - self.update_function(function, code_package, container_deployment, container_uri) + self.update_function(function, code_package, system_variant, container_uri) function.code_package_hash = code_package.hash function.updated_code = True self.cache_client.add_function( @@ -614,6 +625,20 @@ def is_configuration_changed(self, cached_function: Function, benchmark: Benchma return changed + def can_reuse_cached_function( + self, cached_function: Function, benchmark: Benchmark + ) -> Optional[str]: + """Check whether a cached function can be reused as-is. + + Args: + cached_function: Cached function selected from SeBS cache. + benchmark: Benchmark requesting the function. + + Returns: + string explaining why the function cannot be reused, or None if it can be reused. 
+ """ + return None + @abstractmethod def default_function_name( self, code_package: Benchmark, resources: Optional[Resources] = None diff --git a/sebs/gcp/config.py b/sebs/gcp/config.py index 5143431d..3d05c664 100644 --- a/sebs/gcp/config.py +++ b/sebs/gcp/config.py @@ -22,7 +22,7 @@ import json import os -from typing import cast, Dict, List, Optional, Tuple +from typing import cast, Any, Dict, List, Optional, Tuple import time from googleapiclient.errors import HttpError @@ -462,7 +462,7 @@ def serialize(self) -> Dict: Returns: Dictionary representation of resources for cache storage """ - out = {} + out: Dict[str, Any] = {} out["function-gen1"] = self._function_gen1_config.serialize() out["function-gen2"] = self._function_gen2_config.serialize() out["container"] = self._container_config.serialize() diff --git a/sebs/gcp/gcp.py b/sebs/gcp/gcp.py index ff3dcc05..a0adea56 100644 --- a/sebs/gcp/gcp.py +++ b/sebs/gcp/gcp.py @@ -33,6 +33,7 @@ import shutil import time import math +import urllib.request import zipfile from datetime import datetime, timezone from typing import Any, cast, Dict, Optional, Tuple, List, Type, Protocol, Union @@ -45,6 +46,7 @@ from sebs.cache import Cache from sebs.config import SeBSConfig from sebs.benchmark import Benchmark, BenchmarkConfig +from sebs.experiments.config import SystemVariant from sebs.faas.function import Function, FunctionConfig, Trigger from sebs.faas.config import Resources from sebs.faas.system import System @@ -334,6 +336,185 @@ def download_metrics( ... 
+class CloudRunMetricsHelper: + """Helpers shared by Cloud Run services and Cloud Functions gen2.""" + + @staticmethod + def download_execution_metrics( + logging: ColoredWrapper, + project_name: str, + service_name: str, + start_time: int, + end_time: int, + requests: Dict, + label: str, + ) -> None: + """Download execution times from Cloud Run request logs.""" + import google.cloud.logging as gcp_logging + + timestamps = [] + for timestamp in [start_time, end_time + 1]: + utc_date = datetime.fromtimestamp(timestamp, tz=timezone.utc) + timestamps.append(utc_date.strftime("%Y-%m-%dT%H:%M:%SZ")) + + logging_client = gcp_logging.Client() + entries = logging_client.list_entries( + filter_=( + 'resource.type = "cloud_run_revision" ' + 'logName = "projects/' + f'{project_name}/logs/run.googleapis.com%2Frequests" ' + f'resource.labels.service_name = "{service_name}" ' + f'timestamp >= "{timestamps[0]}" ' + f'timestamp <= "{timestamps[1]}"' + ), + page_size=1000, + ) + + found_metrics = 0 + total_entries = 0 + for entry in entries: + total_entries += 1 + trace_id = CloudRunMetricsHelper.extract_trace_id(entry) + if trace_id is None or trace_id not in requests: + continue + + execution_time_us = CloudRunMetricsHelper.extract_latency_us(entry) + if execution_time_us is None: + continue + + requests[trace_id].provider_times.execution = execution_time_us + found_metrics += 1 + + logging.info( + f"{label}: Received {total_entries} log entries, found time metrics for " + f"{found_metrics} out of {len(requests.keys())} invocations." 
+ ) + + @staticmethod + def download_metrics( + project_name: str, + service_name: str, + start_time: int, + end_time: int, + metrics: Dict, + ) -> None: + """Download Cloud Run monitoring metrics.""" + available_metrics = [ + ("container/billable_instance_time", "delta", "double"), + ("container/instance_count", "gauge", "int64"), + ("container/max_request_concurrencies", "delta", "distribution"), + ("container/memory/utilizations", "delta", "distribution"), + ("container/cpu/utilizations", "delta", "distribution"), + ("container/cpu/allocation_time", "delta", "double"), + ("container/memory/allocation_time", "delta", "double"), + ("container/network/sent_bytes_count", "delta", "int64"), + ("container/network/received_bytes_count", "delta", "int64"), + ("container/startup_latencies", "delta", "distribution"), + ("request_count", "delta", "int64"), + ("request_latencies", "distribution", "distribution"), + ("request_latency/e2e_latencies", "delta", "distribution"), + ("request_latency/ingress_to_region", "delta", "distribution"), + ("request_latency/pending", "delta", "distribution"), + ("request_latency/response_egress", "delta", "distribution"), + ("request_latency/routing", "delta", "distribution"), + ("request_latency/user_execution", "delta", "distribution"), + ] + + client = monitoring_v3.MetricServiceClient() + project_path = client.common_project_path(project_name) + + _, end_time_seconds = math.modf(end_time) + _, start_time_seconds = math.modf(start_time) + interval = monitoring_v3.TimeInterval( + { + "end_time": {"seconds": int(end_time_seconds) + 300}, + "start_time": {"seconds": int(start_time_seconds)}, + } + ) + + for metric, kind, value_type in available_metrics: + metrics[metric] = [] + flt = ( + f'metric.type = "run.googleapis.com/{metric}" ' + f'AND resource.type = "cloud_run_revision" ' + f'AND resource.labels.service_name = "{service_name}"' + ) + list_request = monitoring_v3.ListTimeSeriesRequest( + name=project_path, + filter=flt, + 
interval=interval, + view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL, + ) + for result in client.list_time_series(list_request): + revision = result.resource.labels.get("revision_name") + for point in result.points: + if value_type == "distribution": + sq_dev = point.value.distribution_value.sum_of_squared_deviation + metrics[metric].append( + { + "kind": kind, + "revision": revision, + "mean_value": point.value.distribution_value.mean, + "squared_deviations": sq_dev, + "count": point.value.distribution_value.count, + "ts": point.interval.end_time.timestamp(), + } + ) + else: + value: int | float + value = ( + point.value.int64_value + if value_type == "int64" + else point.value.double_value + ) + metrics[metric].append( + { + "revision": revision, + "value": value, + "kind": kind, + "ts": point.interval.end_time.timestamp(), + } + ) + + @staticmethod + def extract_trace_id(entry) -> Optional[str]: + """Extract the trace ID from a Cloud Run log entry. + + Args: + entry: Log entry to inspect. + + Returns: + Trace ID if present, otherwise ``None``. + """ + trace = getattr(entry, "trace", None) + if not isinstance(trace, str) or "/traces/" not in trace: + return None + return trace.rsplit("/traces/", 1)[1] + + @staticmethod + def extract_latency_us(entry) -> Optional[int]: + """Extract request latency from a Cloud Run log entry. + + Args: + entry: Log entry to inspect. + + Returns: + Request latency in microseconds, or ``None`` if unavailable. 
+ """ + http_request = getattr(entry, "http_request", None) + if http_request is None: + return None + + latency = http_request.get("latency") + if not isinstance(latency, str): + return None + + try: + return int(float(latency[:-1]) * 1_000_000) + except (ValueError, TypeError): + return None + + class CloudFunctionGen1Strategy(DeploymentStrategy): """Deployment strategy for Google Cloud Functions Gen1.""" @@ -778,7 +959,7 @@ def download_metrics( interval = monitoring_v3.TimeInterval( { - "end_time": {"seconds": int(end_time_seconds) + 60}, + "end_time": {"seconds": int(end_time_seconds) + 300}, "start_time": {"seconds": int(start_time_seconds)}, } ) @@ -786,23 +967,27 @@ def download_metrics( for metric in available_metrics: metrics[metric] = [] + flt = ( + f'metric.type = "cloudfunctions.googleapis.com/function/{metric}" ' + f'AND resource.type = "cloud_function" ' + f'AND resource.labels.function_name = "{function_name}"' + ) list_request = monitoring_v3.ListTimeSeriesRequest( name=project_name, - filter='metric.type = "cloudfunctions.googleapis.com/function/{}"'.format(metric), + filter=flt, interval=interval, ) results = client.list_time_series(list_request) for result in results: - if result.resource.labels.get("function_name") == function_name: - for point in result.points: - metrics[metric] += [ - { - "mean_value": point.value.distribution_value.mean, - "executions_count": point.value.distribution_value.count, - } - ] + for point in result.points: + metrics[metric] += [ + { + "mean_value": point.value.distribution_value.mean, + "executions_count": point.value.distribution_value.count, + } + ] @staticmethod def _extract_trace_id(entry) -> Optional[str]: @@ -1429,174 +1614,521 @@ def download_execution_metrics( requests: Dict, ) -> None: """Download execution times for Cloud Run from request logs.""" - import google.cloud.logging as gcp_logging - service_name = function_name.replace("_", "-").lower() + CloudRunMetricsHelper.download_execution_metrics( + 
self.logging, + self.config.project_name, + service_name, + start_time, + end_time, + requests, + "GCP Cloud Run", + ) - timestamps = [] - for timestamp in [start_time, end_time + 1]: - utc_date = datetime.fromtimestamp(timestamp, tz=timezone.utc) - timestamps.append(utc_date.strftime("%Y-%m-%dT%H:%M:%SZ")) + def download_metrics( + self, function_name: str, start_time: int, end_time: int, metrics: Dict + ) -> None: + """Download monitoring metrics for a Cloud Run service. - logging_client = gcp_logging.Client() - entries = logging_client.list_entries( - filter_=( - 'resource.type = "cloud_run_revision" ' - 'logName = "projects/' - f'{self.config.project_name}/logs/run.googleapis.com%2Frequests" ' - f'resource.labels.service_name = "{service_name}" ' - f'timestamp >= "{timestamps[0]}" ' - f'timestamp <= "{timestamps[1]}"' - ), - page_size=1000, + Args: + function_name: Name of the deployed service. + start_time: Start timestamp for metric collection. + end_time: End timestamp for metric collection. + metrics: Dictionary to populate with monitoring samples. + """ + service_name = function_name.replace("_", "-").lower() + CloudRunMetricsHelper.download_metrics( + self.config.project_name, service_name, start_time, end_time, metrics ) - found_metrics = 0 - total_entries = 0 - for entry in entries: - total_entries += 1 - trace_id = self._extract_trace_id(entry) - if trace_id is None or trace_id not in requests: - continue - execution_time_us = self._extract_latency_us(entry) - if execution_time_us is None: - continue +class CloudFunctionGen2Strategy(DeploymentStrategy): + """Deployment strategy for Google Cloud Functions Gen2 package deployments.""" - requests[trace_id].provider_times.execution = execution_time_us - found_metrics += 1 + def __init__(self, config: GCPConfig, logging_handlers: ColoredWrapper): + """Initialize the Gen2 deployment strategy. 
- self.logging.info( - f"GCP Cloud Run: Received {total_entries} log entries, found time metrics for " - f"{found_metrics} out of {len(requests.keys())} invocations." - ) + Args: + config: GCP deployment configuration. + logging_handlers: Logging wrapper used for status reporting. + """ + self.config = config + self.logging = logging_handlers + self.function_client = build("cloudfunctions", "v2", cache_discovery=False) + self.run_client = build("run", "v2", cache_discovery=False) - def download_metrics( - self, function_name: str, start_time: int, end_time: int, metrics: Dict - ) -> None: + @staticmethod + def get_full_function_name(project_name: str, location: str, func_name: str) -> str: + """Build the fully qualified Cloud Functions Gen2 resource name. + + Args: + project_name: GCP project ID. + location: GCP region/location. + func_name: Function name. + + Returns: + Fully qualified Cloud Functions Gen2 resource name. """ - Download monitoring metrics for Cloud Functions Gen1. - Use metrics to find estimated values for maximum memory used, active instances - and network traffic. - https://cloud.google.com/monitoring/api/metrics_gcp#gcp-cloudfunctions + return f"projects/{project_name}/locations/{location}/functions/{func_name}" + + def function_exists(self, project_name: str, location: str, func_name: str) -> Any: + """Check whether the Cloud Functions Gen2 resource exists. + + Args: + project_name: GCP project ID. + location: GCP region/location. + func_name: Function name. + + Returns: + True if the function exists, otherwise False. 
""" - # (metric_path, kind) — kind is "distribution" or "int64" - available_metrics = [ - ("container/billable_instance_time", "delta", "double"), # seconds - ("container/instance_count", "gauge", "int64"), - ("container/max_request_concurrencies", "delta", "distribution"), - ("container/memory/utilizations", "delta", "distribution"), # fraction - ("container/cpu/utilizations", "delta", "distribution"), # fraction - ("container/cpu/allocation_time", "delta", "double"), # seconds - ("container/memory/allocation_time", "delta", "double"), # gigabyte-seconds - ("container/network/sent_bytes_count", "delta", "int64"), # bytes (delta) - ("container/network/received_bytes_count", "delta", "int64"), # bytes (delta) - ("container/startup_latencies", "delta", "distribution"), # ms, cold start - ("request_count", "delta", "int64"), - ("request_latencies", "distribution", "distribution"), # ms - ("request_latency/e2e_latencies", "delta", "distribution"), # ms - ("request_latency/ingress_to_region", "delta", "distribution"), # ms - ("request_latency/pending", "delta", "distribution"), # ms - ("request_latency/response_egress", "delta", "distribution"), # ms - ("request_latency/routing", "delta", "distribution"), # ms - ("request_latency/user_execution", "delta", "distribution"), # ms - ] + full_resource_name = self.get_full_function_name(project_name, location, func_name) + get_req = ( + self.function_client.projects().locations().functions().get(name=full_resource_name) + ) + try: + self._execute_with_retry(self.logging, get_req) + return True + except HttpError as e: + if e.resp.status == 404: + return False + raise RuntimeError(f"Error checking function existence: {e}") from None - client = monitoring_v3.MetricServiceClient() - project_name = client.common_project_path(self.config.project_name) + def _entry_point(self, code_package: Benchmark) -> str: + """Resolve the runtime entry point for a benchmark package. 
- _, end_time_seconds = math.modf(end_time) - _, start_time_seconds = math.modf(start_time) - interval = monitoring_v3.TimeInterval( - { - # some metrics are reported with a delay - "end_time": {"seconds": int(end_time_seconds) + 300}, - "start_time": {"seconds": int(start_time_seconds)}, - } + Args: + code_package: Benchmark package being deployed. + + Returns: + Entry point name expected by the deployed runtime. + """ + return ( + "org.serverlessbench.Handler" if code_package.language == Language.JAVA else "handler" ) - for metric, kind, value_type in available_metrics: - metrics[metric] = [] - # Filter on resource.type AND service_name server-side — much faster than - # pulling every revision in the project and filtering client-side. - flt = ( - f'metric.type = "run.googleapis.com/{metric}" ' - f'AND resource.type = "cloud_run_revision" ' - f'AND resource.labels.service_name = "{function_name}"' - ) - list_request = monitoring_v3.ListTimeSeriesRequest( - name=project_name, - filter=flt, - interval=interval, - view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL, + def _runtime(self, code_package: Benchmark) -> str: + """Resolve the Cloud Functions Gen2 runtime identifier. + + Args: + code_package: Benchmark package being deployed. + + Returns: + Runtime identifier string for the GCP API. + """ + return code_package.language_name + code_package.language_version.replace(".", "") + + def _service_config( + self, benchmark_config: BenchmarkConfig | FunctionConfig, envs: Dict + ) -> Dict: + """Build the Gen2 service configuration payload. + + Args: + benchmark_config: Benchmark or function configuration with memory and timeout. + envs: Environment variables to configure on the service. + + Returns: + Service configuration payload for Cloud Functions Gen2. 
+ """ + dep_config = self.config.deployment_config.function_gen2_config + return { + "availableMemory": f"{benchmark_config.memory}Mi", + "timeoutSeconds": benchmark_config.timeout, + "environmentVariables": envs, + "minInstanceCount": dep_config.min_instances, + "maxInstanceCount": dep_config.max_instances, + "availableCpu": str(dep_config.vcpus), + "maxInstanceRequestConcurrency": dep_config.gcp_concurrency, + "ingressSettings": "ALLOW_ALL", + "allTrafficOnLatestRevision": True, + } + + def _build_body( + self, + func_name: str, + code_package: Benchmark, + envs: Dict, + storage_source: Dict, + ) -> Dict: + """Build the full Cloud Functions Gen2 create or patch payload. + + Args: + func_name: Function name. + code_package: Benchmark package being deployed. + envs: Environment variables for the service. + storage_source: Uploaded source archive descriptor. + + Returns: + Full function resource payload. + """ + return { + "name": self.get_full_function_name( + self.config.project_name, self.config.region, func_name + ), + "buildConfig": { + "runtime": self._runtime(code_package), + "entryPoint": self._entry_point(code_package), + "source": {"storageSource": storage_source}, + }, + "serviceConfig": self._service_config(code_package.benchmark_config, envs), + } + + def _generate_upload_url(self) -> Dict: + """Request a signed upload URL for a Gen2 source archive. + + Returns: + Upload metadata returned by the Cloud Functions Gen2 API. + """ + parent = f"projects/{self.config.project_name}/locations/{self.config.region}" + req = ( + self.function_client.projects() + .locations() + .functions() + .generateUploadUrl(parent=parent, body={"environment": "GEN_2"}) + ) + return self._execute_with_retry(self.logging, req) + + def _upload_zip_archive(self, package_path: str) -> Dict: + """Upload a ZIP archive to the signed Gen2 source upload URL. + + Args: + package_path: Path to the ZIP archive to upload. 
+ + Returns: + Storage source descriptor referencing the uploaded archive. + """ + upload_info = self._generate_upload_url() + with open(package_path, "rb") as package_fp: + request = urllib.request.Request( + upload_info["uploadUrl"], + data=package_fp.read(), + method="PUT", + headers={"Content-Type": "application/zip"}, ) - for result in client.list_time_series(list_request): - revision = result.resource.labels.get("revision_name") - for point in result.points: - if value_type == "distribution": - sq_dev = point.value.distribution_value.sum_of_squared_deviation - metrics[metric].append( - { - "kind": kind, - "revision": revision, - "mean_value": point.value.distribution_value.mean, - "squared_deviations": sq_dev, - "count": point.value.distribution_value.count, - "ts": point.interval.end_time.timestamp(), - } - ) - else: - value: int | float - if value_type == "int64": - value = point.value.int64_value - else: - value = point.value.double_value - metrics[metric].append( + with urllib.request.urlopen(request) as response: + if response.status not in (200, 201): + raise RuntimeError( + f"Upload of package archive failed with HTTP {response.status}" + ) + return cast(Dict, upload_info["storageSource"]) + + def create( + self, + func_name: str, + code_package: Benchmark, + function_cfg: FunctionConfig, + envs: Dict, + container_uri: str | None, + ) -> None: + """Create a new Cloud Functions Gen2 deployment. + + Args: + func_name: Function name to create. + code_package: Benchmark package with code to deploy. + function_cfg: Function configuration. + envs: Environment variables for the function service. + container_uri: Unused for package deployments. 
+ """ + if code_package.code_location is None: + raise RuntimeError("Code location is not set for GCP deployment") + + parent = f"projects/{self.config.project_name}/locations/{self.config.region}" + storage_source = self._upload_zip_archive(code_package.code_location) + function_body = self._build_body(func_name, code_package, envs, storage_source) + create_req = ( + self.function_client.projects() + .locations() + .functions() + .create( + parent=parent, functionId=func_name, body=function_body # type: ignore[arg-type] + ) + ) + self._operation_response = self._execute_with_retry(self.logging, create_req) + self.logging.info(f"Function {func_name} is creating through Cloud Functions Gen2") + + def update_code( + self, + function: GCPFunction, + code_package: Benchmark, + envs: Dict, + container_uri: str | None, + ) -> None: + """Update the code of an existing Cloud Functions Gen2 deployment. + + Args: + function: Existing deployed function. + code_package: New benchmark package to upload. + envs: Environment variables for the updated service. + container_uri: Unused for package deployments. 
+ """ + if code_package.code_location is None: + raise RuntimeError("Code location is not set for GCP deployment") + + full_func_name = self.get_full_function_name( + self.config.project_name, self.config.region, function.name + ) + storage_source = self._upload_zip_archive(code_package.code_location) + function_body = self._build_body(function.name, code_package, envs, storage_source) + req = ( + self.function_client.projects() + .locations() + .functions() + .patch( + name=full_func_name, + body=function_body, # type: ignore[arg-type] + updateMask="buildConfig.runtime,buildConfig.entryPoint," + "buildConfig.source.storageSource,serviceConfig", + ) + ) + self._operation_response = self._execute_with_retry(self.logging, req) + self.logging.info(f"Function {function.name} code update initiated for Gen2") + + def update_config(self, function: GCPFunction, envs: Dict) -> int: + """Update configuration of an existing Cloud Functions Gen2 deployment. + + Args: + function: Deployed function to update. + envs: Full environment variable map to apply. + + Returns: + Placeholder version value for interface compatibility. + """ + full_func_name = self.get_full_function_name( + self.config.project_name, self.config.region, function.name + ) + body = {"serviceConfig": self._service_config(function.config, envs)} + req = ( + self.function_client.projects() + .locations() + .functions() + .patch( + name=full_func_name, + body=body, # type: ignore[arg-type] + updateMask="serviceConfig", + ) + ) + self._operation_response = self._execute_with_retry(self.logging, req) + self.wait_for_deployment(function.name) + return 0 + + def wait_for_deployment(self, func_name: str) -> None: + """Wait for the active create or patch operation to complete. + + Args: + func_name: Function name being deployed. 
+ """ + if not hasattr(self, "_operation_response"): + raise RuntimeError("No operation to wait for - create/update not called") + + op_name = self._operation_response["name"] + begin = time.time() + while True: + op_req = self.function_client.projects().locations().operations().get(name=op_name) + op_res = self._execute_with_retry(self.logging, op_req) + if op_res.get("done"): + if "error" in op_res: + raise RuntimeError(f"Cloud Functions Gen2 deployment failed: {op_res['error']}") + break + if time.time() - begin > 600: + raise RuntimeError(f"Timeout waiting for Cloud Functions Gen2 operation {op_name}") + time.sleep(3) + + self._wait_for_active_status(func_name) + delattr(self, "_operation_response") + + def _wait_for_active_status(self, func_name: str, timeout: int = 300) -> None: + """Poll the Gen2 function until it reaches the ACTIVE state. + + Args: + func_name: Function name to monitor. + timeout: Maximum wait time in seconds. + """ + full_func_name = self.get_full_function_name( + self.config.project_name, self.config.region, func_name + ) + begin = time.time() + last_state: Optional[str] = None + while True: + req = self.function_client.projects().locations().functions().get(name=full_func_name) + func_details = self._execute_with_retry(self.logging, req) + state = func_details.get("state") + if state != last_state: + self.logging.info(f"Function {func_name} state: {state}") + last_state = cast(Optional[str], state) + + if state == "ACTIVE" and func_details.get("serviceConfig", {}).get("uri"): + return + if state in ("FAILED", "UNKNOWN"): + raise RuntimeError( + f"Function {func_name} deployment failed with state {state}: " + f"{func_details.get('stateMessages', [])}" + ) + if time.time() - begin > timeout: + raise RuntimeError( + "Timeout waiting for function " + f"{func_name} to become ACTIVE. 
Last state: {state}" + ) + time.sleep(3) + + def allow_public_access(self, project_name: str, location: str, func_name: str) -> None: + """Grant public invocation access to the underlying Cloud Run service. + + Args: + project_name: GCP project ID. + location: GCP region/location. + func_name: Function name whose backing service should be public. + """ + service_name = func_name.replace("_", "-").lower() + full_service_name = f"projects/{project_name}/locations/{location}/services/{service_name}" + req = ( + self.run_client.projects() + .locations() + .services() + .setIamPolicy( + resource=full_service_name, + body={ + "policy": { + "bindings": [ { - "revision": revision, - "value": value, - "kind": kind, - "ts": point.interval.end_time.timestamp(), + "role": "roles/run.invoker", + "members": ["allUsers"], } - ) + ] + } + }, + ) + ) + self._execute_with_retry(self.logging, req) - @staticmethod - def _extract_trace_id(entry) -> Optional[str]: - """Extract the trace ID from a Cloud Run log entry. + def create_trigger(self, func_name: str) -> str: + """Return the HTTPS trigger URL for a Gen2 function. Args: - entry: Log entry to inspect. + func_name: Function name. Returns: - Trace ID if present, otherwise ``None``. + Public invoke URL for the function. """ - trace = getattr(entry, "trace", None) - if not isinstance(trace, str) or "/traces/" not in trace: - return None - return trace.rsplit("/traces/", 1)[1] + full_func_name = self.get_full_function_name( + self.config.project_name, self.config.region, func_name + ) + req = self.function_client.projects().locations().functions().get(name=full_func_name) + func_details = self._execute_with_retry(self.logging, req) + invoke_url = func_details["serviceConfig"]["uri"] + self.logging.info(f"Function {func_name} - HTTP trigger URL: {invoke_url}") + return invoke_url - @staticmethod - def _extract_latency_us(entry) -> Optional[int]: - """Extract request latency from a Cloud Run log entry in microseconds. 
+ def update_envs(self, full_function_name: str, envs: Dict) -> Dict: + """Merge new environment variables with existing Gen2 service variables. Args: - entry: Log entry to inspect. + full_function_name: Fully qualified function name. + envs: New environment variables to add or override. Returns: - Request latency in microseconds, or ``None`` if unavailable. + Merged environment variables dictionary. """ - http_request = getattr(entry, "http_request", None) - if http_request is None: - return None + req = self.function_client.projects().locations().functions().get(name=full_function_name) + response = self._execute_with_retry(self.logging, req) + existing_envs = response.get("serviceConfig", {}).get("environmentVariables", {}) + return {**existing_envs, **envs} - latency = http_request.get("latency") - if not isinstance(latency, str): - return None + def generate_runtime_envs(self) -> Dict: + """Return runtime environment overrides for Gen2 package deployments. + + Returns: + Environment variables controlling Gunicorn worker settings. + """ + dep_config = self.config.deployment_config.function_gen2_config + # gen2 uses its own environment variables + # https://github.com/GoogleCloudPlatform/functions-framework-python/issues/241 + return { + "WORKERS": str(dep_config.worker_concurrency), + "THREADS": str(dep_config.worker_threads), + } + + def is_deployed(self, func_name: str, versionId: int = -1) -> Tuple[bool, int]: + """Check whether a Gen2 function is deployed and ready. + + Args: + func_name: Function name to inspect. + versionId: Unused for Gen2 deployments. + Returns: + Tuple of readiness flag and placeholder version value. 
+ """ + full_func_name = self.get_full_function_name( + self.config.project_name, self.config.region, func_name + ) try: - return int(float(latency[:-1]) * 1_000_000) - except (ValueError, TypeError): - return None + req = self.function_client.projects().locations().functions().get(name=full_func_name) + func_details = self._execute_with_retry(self.logging, req) + is_ready = func_details.get("state") == "ACTIVE" and "uri" in func_details.get( + "serviceConfig", {} + ) + return (is_ready, 0) + except HttpError: + return (False, -1) + + def delete_function(self, func_name: str) -> None: + """Delete a Cloud Functions Gen2 deployment. + + Args: + func_name: Function name to delete. + """ + full_func_name = self.get_full_function_name( + self.config.project_name, self.config.region, func_name + ) + try: + req = ( + self.function_client.projects().locations().functions().delete(name=full_func_name) + ) + self._execute_with_retry(self.logging, req) + except HttpError as e: + if e.resp.status != 404: + raise + + def download_execution_metrics( + self, + function_name: str, + start_time: int, + end_time: int, + requests: Dict, + ) -> None: + """Download execution timings for a Gen2 function from request logs. + + Args: + function_name: Function name whose backing service is queried. + start_time: Start timestamp for log collection. + end_time: End timestamp for log collection. + requests: Invocation results keyed by request ID. + """ + service_name = function_name.replace("_", "-").lower() + CloudRunMetricsHelper.download_execution_metrics( + self.logging, + self.config.project_name, + service_name, + start_time, + end_time, + requests, + "GCP Cloud Functions Gen2", + ) + + def download_metrics( + self, function_name: str, start_time: int, end_time: int, metrics: Dict + ) -> None: + """Download monitoring metrics for a Gen2 function. + + Args: + function_name: Function name whose backing service is queried. + start_time: Start timestamp for metric collection. 
+ end_time: End timestamp for metric collection. + metrics: Dictionary to populate with monitoring samples. + """ + service_name = function_name.replace("_", "-").lower() + CloudRunMetricsHelper.download_metrics( + self.config.project_name, service_name, start_time, end_time, metrics + ) class GCP(System): @@ -1703,6 +2235,7 @@ def initialize( self.cloud_function_gen1_strategy = CloudFunctionGen1Strategy( storage, self._config, self.logging ) + self.cloud_function_gen2_strategy = CloudFunctionGen2Strategy(self._config, self.logging) self.run_container_strategy = RunContainerStrategy(self._config, self.logging) self.gcr_client = GCRContainer(self.system_config, self.config, self.docker_client) @@ -1732,6 +2265,48 @@ def get_run_client(self): """ return self.run_container_strategy.run_client + def _resolve_deployment_type(self, system_variant: SystemVariant) -> FunctionDeploymentType: + """Resolve the effective GCP deployment type for a benchmark. + + Args: + system_variant: Experiment deployment variant. + + Returns: + Effective deployment type for GCP. + """ + return FunctionDeploymentType.deserialize(system_variant.value) + + def system_variant_suffix(self, system_variant: SystemVariant) -> Optional[str]: + """Return a provider-local system variant suffix for GCP package variants. + + Args: + system_variant: Selected deployment variant. + + Returns: + Short suffix for GCP package variants, otherwise ``None``. + """ + deployment_type = self._resolve_deployment_type(system_variant) + if deployment_type == FunctionDeploymentType.FUNCTION_GEN1: + return "gen1" + if deployment_type == FunctionDeploymentType.FUNCTION_GEN2: + return "gen2" + return None + + def _strategy_for_deployment_type(self, deployment_type: FunctionDeploymentType): + """Return the deployment strategy for a resolved GCP deployment type. + + Args: + deployment_type: Effective deployment type to dispatch. + + Returns: + Deployment strategy object handling the requested mode. 
+ """ + if deployment_type == FunctionDeploymentType.CONTAINER: + return self.run_container_strategy + if deployment_type == FunctionDeploymentType.FUNCTION_GEN2: + return self.cloud_function_gen2_strategy + return self.cloud_function_gen1_strategy + def _get_deployment_config( self, deployment_type: FunctionDeploymentType ) -> Union[GCPFunctionGen1Config, GCPFunctionGen2Config, GCPContainerConfig]: @@ -1769,6 +2344,15 @@ def is_configuration_changed(self, cached_function: Function, benchmark: Benchma # Check if deployment config has changed cached_function = cast(GCPFunction, cached_function) + expected_deployment_type = self._resolve_deployment_type(benchmark.system_variant) + if cached_function.deployment_type != expected_deployment_type: + self.logging.info( + f"Deployment type has changed for {cached_function.name}: " + f"cached function uses {cached_function.deployment_type.value}, " + f"requested deployment is {expected_deployment_type.value}." + ) + changed = True + current_dep_config = self._get_deployment_config(cached_function.deployment_type) if cached_function.deployment_config != current_dep_config: @@ -1780,6 +2364,29 @@ def is_configuration_changed(self, cached_function: Function, benchmark: Benchma return changed + def can_reuse_cached_function( + self, cached_function: Function, benchmark: Benchmark + ) -> Optional[str]: + """Check whether a cached GCP function matches the requested deployment mode. + + Args: + cached_function: Cached function selected from SeBS cache. + benchmark: Benchmark requesting the function. + + Returns: + str: if the cached function does not fit the requested deployment type. 
+ """ + gcp_function = cast(GCPFunction, cached_function) + expected_deployment_type = self._resolve_deployment_type(benchmark.system_variant) + + if gcp_function.deployment_type != expected_deployment_type: + return ( + f"cached deployment type {gcp_function.deployment_type.value} " + f"does not match requested deployment type {expected_deployment_type.value}" + ) + + return None + def default_function_name( self, code_package: Benchmark, resources: Optional[Resources] = None ) -> str: @@ -1807,11 +2414,16 @@ def default_function_name( code_package.language_version, code_package.architecture, ) - if code_package.container_deployment: + deployment_type = self._resolve_deployment_type(code_package.system_variant) + if deployment_type == FunctionDeploymentType.CONTAINER: func_name = f"{func_name}-docker" + elif deployment_type == FunctionDeploymentType.FUNCTION_GEN1: + func_name = f"{func_name}-gen1" + elif deployment_type == FunctionDeploymentType.FUNCTION_GEN2: + func_name = f"{func_name}-gen2" return ( GCP.format_function_name(func_name) - if not code_package.container_deployment + if deployment_type != FunctionDeploymentType.CONTAINER else func_name.replace(".", "-") ) @@ -1843,7 +2455,7 @@ def package_code( architecture: str, benchmark: str, is_cached: bool, - ) -> Tuple[str, int]: + ) -> Tuple[str, float]: """Package benchmark code for GCP Cloud Functions deployment. Transforms the benchmark code directory structure to meet GCP Cloud Functions @@ -1933,7 +2545,7 @@ def create_function( self, code_package: Benchmark, func_name: str, - container_deployment: bool, + system_variant: SystemVariant, container_uri: str | None, ) -> GCPFunction: """Create a new GCP Cloud Function or update existing one. 
@@ -1946,14 +2558,14 @@ def create_function( Args: code_package: Benchmark package with code and configuration func_name: Name for the Cloud Function - container_deployment: Whether to use container deployment (unsupported) + system_variant: Selected deployment variant container_uri: Container image URI (unused for GCP) Returns: GCPFunction instance representing the deployed function Raises: - NotImplementedError: If container_deployment is True + NotImplementedError: If the deployment variant is unsupported RuntimeError: If function creation or IAM configuration fails """ @@ -1968,22 +2580,12 @@ def create_function( if architecture == "arm64": raise RuntimeError("GCP does not support arm64 deployments") - # Select deployment strategy - strategy = ( - self.run_container_strategy - if container_deployment - else self.cloud_function_gen1_strategy - ) + deployment_type = self._resolve_deployment_type(system_variant) + strategy = self._strategy_for_deployment_type(deployment_type) # Check if function/service already exists function_exists = strategy.function_exists(project_name, location, func_name) - deployment_type = ( - FunctionDeploymentType.CONTAINER - if container_deployment - else FunctionDeploymentType.FUNCTION_GEN1 - ) - dep_config = self._get_deployment_config(deployment_type) if not function_exists: # Create new function/service @@ -1998,7 +2600,7 @@ def create_function( strategy.wait_for_deployment(func_name) strategy.allow_public_access(project_name, location, func_name) - if not container_deployment: + if not deployment_type.is_container: storage_client = self._system_resources.get_storage() code_bucket = storage_client.get_bucket(Resources.StorageBucketType.DEPLOYMENT) function = GCPFunction( @@ -2038,11 +2640,11 @@ def create_function( ) strategy.allow_public_access(project_name, location, func_name) - self.update_function(function, code_package, container_deployment, container_uri) + self.update_function(function, code_package, system_variant, 
container_uri) # Add LibraryTrigger to a new function # Not supported on containers - if not container_deployment: + if deployment_type == FunctionDeploymentType.FUNCTION_GEN1: from sebs.gcp.triggers import LibraryTrigger trigger = LibraryTrigger(func_name, self, function.deployment_type) @@ -2077,11 +2679,7 @@ def create_trigger(self, function: Function, trigger_type: Trigger.TriggerType) self.logging.info(f"Function {function.name} - waiting for deployment...") # Select deployment strategy - strategy = ( - self.run_container_strategy - if gcp_function.deployment_type.is_container - else self.cloud_function_gen1_strategy - ) + strategy = self._strategy_for_deployment_type(gcp_function.deployment_type) # Get trigger URL from strategy invoke_url = strategy.create_trigger(function.name) @@ -2120,7 +2718,7 @@ def update_function( self, function: Function, code_package: Benchmark, - container_deployment: bool, + system_variant: SystemVariant, container_uri: str | None, ) -> None: """Update an existing Cloud Function with new code and configuration. 
@@ -2132,22 +2730,18 @@ def update_function( Args: function: Existing function instance to update code_package: New benchmark package with updated code - container_deployment: Whether to use container deployment (unsupported) + system_variant: Selected deployment variant container_uri: Container image URI (unused) Raises: - NotImplementedError: If container_deployment is True + NotImplementedError: If the deployment variant is unsupported RuntimeError: If function update fails after maximum retries """ function = cast(GCPFunction, function) # Select deployment strategy - strategy = ( - self.run_container_strategy - if container_deployment - else self.cloud_function_gen1_strategy - ) + strategy = self._strategy_for_deployment_type(function.deployment_type) # Generate environment variables envs = { @@ -2157,7 +2751,7 @@ def update_function( # Update code using strategy strategy.update_code(function, code_package, envs, container_uri) - if container_deployment: + if system_variant.is_container: function.set_container_uri(container_uri) strategy.wait_for_deployment(function.name) @@ -2212,11 +2806,7 @@ def update_function_configuration( function = cast(GCPFunction, function) # Select deployment strategy - strategy = ( - self.run_container_strategy - if code_package.container_deployment - else self.cloud_function_gen1_strategy - ) + strategy = self._strategy_for_deployment_type(function.deployment_type) # Get full resource name for env merging full_func_name = strategy.get_full_function_name( @@ -2252,11 +2842,7 @@ def delete_function(self, func_name: str, function: Dict) -> None: # Select deployment strategy based on function name # v1 functions don't allow hyphens, new functions don't allow underscores gcp_function = GCPFunction.deserialize(function) - strategy = ( - self.run_container_strategy - if gcp_function.deployment_type.is_container - else self.cloud_function_gen1_strategy - ) + strategy = self._strategy_for_deployment_type(gcp_function.deployment_type) 
strategy.delete_function(func_name) @@ -2296,11 +2882,7 @@ def download_metrics( function = GCPFunction.deserialize(functions[function_name]) - strategy = ( - self.run_container_strategy - if function.deployment_type.is_container - else self.cloud_function_gen1_strategy - ) + strategy = self._strategy_for_deployment_type(function.deployment_type) strategy.download_execution_metrics(function_name, start_time, end_time, requests) strategy.download_metrics(function_name, start_time, end_time, metrics) @@ -2414,11 +2996,7 @@ def is_deployed(self, function: Function, versionId: int = -1) -> Tuple[bool, in # Select deployment strategy based on function name # v1 functions don't allow hyphens, new functions don't allow underscores gcp_function = cast(GCPFunction, function) - strategy = ( - self.run_container_strategy - if gcp_function.deployment_type.is_container - else self.cloud_function_gen1_strategy - ) + strategy = self._strategy_for_deployment_type(gcp_function.deployment_type) return strategy.is_deployed(function.name, versionId) diff --git a/sebs/local/local.py b/sebs/local/local.py index 302cc95c..99c03c7a 100644 --- a/sebs/local/local.py +++ b/sebs/local/local.py @@ -29,6 +29,7 @@ from sebs.cache import Cache from sebs.config import SeBSConfig +from sebs.experiments.config import SystemVariant from sebs.storage.resources import SelfHostedSystemResources from sebs.utils import LoggingHandlers, is_linux from sebs.local.config import LocalConfig @@ -183,7 +184,7 @@ def package_code( architecture: str, benchmark: str, is_cached: bool, - ) -> Tuple[str, int]: + ) -> Tuple[str, float]: """Package function code for local execution. Creates a compatible code package structure for local execution that @@ -431,7 +432,7 @@ def create_function( self, code_package: Benchmark, func_name: str, - container_deployment: bool, + system_variant: SystemVariant, container_uri: str | None, ) -> "LocalFunction": """Create a new function deployment. 
In practice, it starts a new Docker container. @@ -439,7 +440,7 @@ def create_function( Args: code_package: Benchmark code package to deploy func_name: Name for the function - container_deployment: Whether to use container deployment (unsupported) + system_variant: Selected deployment variant container_uri: Container URI (unused for local) Returns: @@ -448,7 +449,7 @@ def create_function( Raises: NotImplementedError: If container deployment is requested """ - if container_deployment: + if system_variant.is_container: raise NotImplementedError("Container deployment is not supported in Local") return self._start_container(code_package, func_name, None) @@ -456,7 +457,7 @@ def update_function( self, function: Function, code_package: Benchmark, - container_deployment: bool, + system_variant: SystemVariant, container_uri: str | None, ) -> None: """Update an existing function with new code. @@ -466,10 +467,10 @@ def update_function( Args: function: Existing function to update code_package: New benchmark code package - container_deployment: Whether to use container deployment (unused) + system_variant: Selected deployment variant container_uri: Container URI (unused) """ - if container_deployment: + if system_variant.is_container: raise NotImplementedError("Container deployment is not supported in Local") func = cast(LocalFunction, function) diff --git a/sebs/openwhisk/openwhisk.py b/sebs/openwhisk/openwhisk.py index 320510d7..7ed42953 100644 --- a/sebs/openwhisk/openwhisk.py +++ b/sebs/openwhisk/openwhisk.py @@ -16,6 +16,7 @@ from sebs.benchmark import Benchmark from sebs.cache import Cache +from sebs.experiments.config import SystemVariant from sebs.faas import System from sebs.faas.function import Function, ExecutionResult, Trigger from sebs.openwhisk.container import OpenWhiskContainer @@ -204,7 +205,7 @@ def package_code( architecture: str, benchmark: str, is_cached: bool, - ) -> Tuple[str, int]: + ) -> Tuple[str, float]: """ Package benchmark code for OpenWhisk 
deployment. @@ -307,7 +308,7 @@ def package_code( def finalize_container_build( self, - ) -> Callable[[str, Language, str, str, str, bool], Tuple[str, int]] | None: + ) -> Callable[[str, Language, str, str, str, bool], Tuple[str, float]] | None: """ Regardless of Docker image status, we need to create .zip file to allow registration of function with OpenWhisk. @@ -368,7 +369,7 @@ def create_function( self, code_package: Benchmark, func_name: str, - container_deployment: bool, + system_variant: SystemVariant, container_uri: str | None, ) -> "OpenWhiskFunction": """ @@ -381,7 +382,7 @@ def create_function( Args: code_package: Benchmark configuration and code package func_name: Name for the OpenWhisk action - container_deployment: Whether to use container-based deployment + system_variant: Selected deployment variant container_uri: URI of the Docker image for the function Returns: @@ -391,7 +392,7 @@ def create_function( RuntimeError: If WSK CLI is not accessible or function creation fails """ - if not container_deployment: + if not system_variant.is_container: raise RuntimeError("Non-container deployment is not supported in OpenWhisk!") self.logging.info("Creating function as an action in OpenWhisk.") @@ -425,7 +426,7 @@ def create_function( ) # Update function - we don't know what version is stored self.logging.info(f"Retrieved existing OpenWhisk action {func_name}.") - self.update_function(res, code_package, container_deployment, container_uri) + self.update_function(res, code_package, system_variant, container_uri) else: try: self.logging.info(f"Creating new OpenWhisk action {func_name}") @@ -502,7 +503,7 @@ def update_function( self, function: Function, code_package: Benchmark, - container_deployment: bool, + system_variant: SystemVariant, container_uri: str | None, ) -> None: """ @@ -511,13 +512,13 @@ def update_function( Args: function: Existing function to update code_package: New benchmark configuration and code package - container_deployment: Whether to use 
container-based deployment + system_variant: Selected deployment variant container_uri: URI of the new Docker image Raises: RuntimeError: If WSK CLI is not accessible or update fails """ - if not container_deployment: + if not system_variant.is_container: raise RuntimeError( "Code location must be set for OpenWhisk action! " "OpenWhisk requires container deployment with a code package." diff --git a/sebs/regression.py b/sebs/regression.py index 3dd62b26..58920c79 100644 --- a/sebs/regression.py +++ b/sebs/regression.py @@ -25,7 +25,7 @@ import testtools import threading from time import sleep -from typing import cast, Dict, Optional, Set, TYPE_CHECKING +from typing import cast, Dict, List, Optional, Set, TYPE_CHECKING from sebs.faas.function import Trigger from sebs.utils import ColoredWrapper, SensitiveDataFilter, LoggingBase @@ -74,7 +74,7 @@ # GCP-specific configurations architectures_gcp = ["x64"] -deployments_gcp = ["package", "container"] +deployments_gcp = ["function-gen1", "function-gen2", "container"] # Azure-specific configurations architectures_azure = ["x64"] @@ -92,6 +92,30 @@ LOGGING_REDACTOR: SensitiveDataFilter = SensitiveDataFilter() +def configure_regression_deployment( + config_copy: dict, deployment_name: str, deployment_type: str +) -> str: + """Inject provider-local deployment configuration for a regression variant.""" + config_copy["experiments"]["system_variant"] = deployment_type + return deployment_type + + +def execution_group_key(test: unittest.TestCase) -> tuple[str, str]: + """Return the execution group for a test case. + + GCP variants are split into separate groups so they do not execute at the same + time. All other providers share a single provider-wide group and may still run + concurrently with each other. 
+ """ + deployment_name = test.deployment_name # type: ignore[attr-defined] + test_name = cast(unittest.TestCase, test)._testMethodName + deployment_type = getattr(test, test_name).test_deployment_type # type: ignore[attr-defined] + + if deployment_name == "gcp": + return deployment_name, deployment_type + return deployment_name, "all" + + class TestSequenceMeta(type): """Metaclass for dynamically generating regression test cases. @@ -196,7 +220,7 @@ def test(self): # Configure experiment settings self.experiment_config["architecture"] = architecture - self.experiment_config["container_deployment"] = deployment_type == "container" + self.experiment_config["system_variant"] = deployment_type # Get deployment client for the specific cloud provider deployment_client = self.get_deployment( @@ -352,7 +376,7 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # Create a copy of the config and set architecture and deployment type config_copy = copy.deepcopy(cloud_config) config_copy["experiments"]["architecture"] = architecture - config_copy["experiments"]["container_deployment"] = deployment_type == "container" + configure_regression_deployment(config_copy, deployment_name, deployment_type) # Create a log file name based on test parameters f = f"regression_{deployment_name}_{benchmark_name}_{architecture}_{deployment_type}.log" @@ -415,7 +439,7 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # Create a copy of the config and set architecture and deployment type config_copy = copy.deepcopy(cloud_config) config_copy["experiments"]["architecture"] = architecture - config_copy["experiments"]["container_deployment"] = deployment_type == "container" + configure_regression_deployment(config_copy, deployment_name, deployment_type) # Create a log file name based on test parameters f = f"regression_{deployment_name}_{benchmark_name}_{architecture}_{deployment_type}.log" @@ -470,7 +494,7 @@ def get_deployment(self, benchmark_name, 
architecture, deployment_type): # Create a copy of the config and set architecture and deployment type config_copy = copy.deepcopy(cloud_config) config_copy["experiments"]["architecture"] = architecture - config_copy["experiments"]["container_deployment"] = deployment_type == "container" + configure_regression_deployment(config_copy, deployment_name, deployment_type) f = f"regression_{deployment_name}_{benchmark_name}_{architecture}_{deployment_type}.log" deployment_client = self.client.get_deployment( @@ -530,7 +554,7 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # Create a copy of the config and set architecture and deployment type config_copy = copy.deepcopy(cloud_config) config_copy["experiments"]["architecture"] = architecture - config_copy["experiments"]["container_deployment"] = deployment_type == "container" + configure_regression_deployment(config_copy, deployment_name, deployment_type) f = f"regression_{deployment_name}_{benchmark_name}_{architecture}_{deployment_type}.log" deployment_client = self.client.get_deployment( @@ -616,7 +640,7 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # Create a copy of the config and set architecture and deployment type config_copy = copy.deepcopy(cloud_config) config_copy["experiments"]["architecture"] = architecture - config_copy["experiments"]["container_deployment"] = deployment_type == "container" + config_copy["experiments"]["system_variant"] = deployment_type # Create log file name and get deployment client f = f"regression_{deployment_name}_{benchmark_name}_" @@ -702,7 +726,7 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # Create a copy of the config and set architecture and deployment type config_copy = copy.deepcopy(cloud_config) config_copy["experiments"]["architecture"] = architecture - config_copy["experiments"]["container_deployment"] = deployment_type == "container" + config_copy["experiments"]["system_variant"] = 
deployment_type # Create log file name and get deployment client f = f"regression_{deployment_name}_{benchmark_name}_" @@ -781,7 +805,7 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # Create a copy of the config and set architecture and deployment type config_copy = copy.deepcopy(cloud_config) config_copy["experiments"]["architecture"] = architecture - config_copy["experiments"]["container_deployment"] = deployment_type == "container" + config_copy["experiments"]["system_variant"] = deployment_type # Create log file name and get deployment client f = f"regression_{deployment_name}_{benchmark_name}_" @@ -844,7 +868,7 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # Create a copy of the config and set architecture and deployment type config_copy = copy.deepcopy(cloud_config) config_copy["experiments"]["architecture"] = architecture - config_copy["experiments"]["container_deployment"] = deployment_type == "container" + configure_regression_deployment(config_copy, deployment_name, deployment_type) # Create log file name based on test parameters f = f"regression_{deployment_name}_{benchmark_name}_{architecture}_{deployment_type}.log" @@ -907,7 +931,7 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # Create a copy of the config and set architecture and deployment type config_copy = copy.deepcopy(cloud_config) config_copy["experiments"]["architecture"] = architecture - config_copy["experiments"]["container_deployment"] = deployment_type == "container" + configure_regression_deployment(config_copy, deployment_name, deployment_type) # Create log file name based on test parameters f = f"regression_{deployment_name}_{benchmark_name}_{architecture}_{deployment_type}.log" @@ -970,7 +994,7 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # Create a copy of the config and set architecture and deployment type config_copy = copy.deepcopy(cloud_config) 
config_copy["experiments"]["architecture"] = architecture - config_copy["experiments"]["container_deployment"] = deployment_type == "container" + configure_regression_deployment(config_copy, deployment_name, deployment_type) # Create log file name based on test parameters f = f"regression_{deployment_name}_{benchmark_name}_{architecture}_{deployment_type}.log" @@ -1037,7 +1061,7 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # Create a copy of the config and set architecture and deployment type config_copy = copy.deepcopy(cloud_config) config_copy["experiments"]["architecture"] = architecture - config_copy["experiments"]["container_deployment"] = deployment_type == "container" + config_copy["experiments"]["system_variant"] = deployment_type # Create log file name based on test parameters f = f"regression_{deployment_name}_{benchmark_name}_{architecture}_{deployment_type}.log" @@ -1095,7 +1119,7 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # Create a copy of the config and set architecture and deployment type config_copy = copy.deepcopy(cloud_config) config_copy["experiments"]["architecture"] = architecture - config_copy["experiments"]["container_deployment"] = deployment_type == "container" + config_copy["experiments"]["system_variant"] = deployment_type # Create log file name based on test parameters f = f"regression_{deployment_name}_{benchmark_name}_{architecture}_{deployment_type}.log" @@ -1149,7 +1173,7 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # Create a copy of the config and set architecture and deployment type config_copy = copy.deepcopy(cloud_config) config_copy["experiments"]["architecture"] = architecture - config_copy["experiments"]["container_deployment"] = deployment_type == "container" + config_copy["experiments"]["system_variant"] = deployment_type # Create log file name based on test parameters f = 
f"regression_{deployment_name}_{benchmark_name}_{architecture}_{deployment_type}.log" @@ -1280,7 +1304,7 @@ def filter_out_benchmarks( # Filter out image recognition on newer Python versions on GCP if (deployment_name == "gcp" and language == "python" and language_version in ["3.8", "3.9", "3.10", "3.11", "3.12"] - and deployment_type == "package"): + and deployment_type == "function-gen1"): return "411.image-recognition" not in benchmark # fmt: on @@ -1394,12 +1418,13 @@ def regression_suite( ) # Prepare the list of tests to run - tests = [] + tests: List[unittest.TestCase] = [] # mypy is confused here about the type for case in suite: for test in case: # type: ignore # Get the test method name - test_name = cast(unittest.TestCase, test)._testMethodName + test_case = cast(unittest.TestCase, test) + test_name = test_case._testMethodName # Remove unsupported benchmarks test_architecture = getattr(test, test_name).test_architecture # type: ignore @@ -1421,17 +1446,38 @@ def regression_suite( # Set up test instance with client and config test.client = sebs_client # type: ignore test.experiment_config = experiment_config.copy() # type: ignore - tests.append(test) + tests.append(test_case) else: print(f"Skip test {test_name}") - # Create a concurrent test suite for parallel execution - concurrent_suite = testtools.ConcurrentStreamTestSuite(lambda: ((test, None) for test in tests)) result = TracingStreamResult() # Run the tests result.startTestRun() - concurrent_suite.run(result) + gcp_grouped_tests: Dict[tuple[str, str], list] = {} + other_tests = [] + for test in tests: + group_key = execution_group_key(test) + if group_key[0] == "gcp": + gcp_grouped_tests.setdefault(group_key, []).append(test) + else: + other_tests.append(test) + + if other_tests: + concurrent_suite = testtools.ConcurrentStreamTestSuite( + lambda: ((test, None) for test in other_tests) + ) + concurrent_suite.run(result) + + for (_, deployment_type), group_tests in sorted(gcp_grouped_tests.items()): + 
print( + f"Running regression group provider=gcp, " + f"system_variant={deployment_type}, tests={len(group_tests)}" + ) + concurrent_suite = testtools.ConcurrentStreamTestSuite( + lambda group_tests=group_tests: ((test, None) for test in group_tests) # type: ignore + ) + concurrent_suite.run(result) result.stopTestRun() # Report results diff --git a/sebs/sebs.py b/sebs/sebs.py index 5d40b103..d99aac4c 100644 --- a/sebs/sebs.py +++ b/sebs/sebs.py @@ -227,17 +227,12 @@ def get_deployment( ) ) - # Validate deployment type - container - if config["experiments"][ - "container_deployment" - ] and not self._config.supported_container_deployment(name): - raise RuntimeError(f"Container deployment is not supported in {name}.") - - # Validate deployment type - package - if not config["experiments"][ - "container_deployment" - ] and not self._config.supported_package_deployment(name): - raise RuntimeError(f"Code package deployment is not supported in {name}.") + if "system_variant" not in config["experiments"]: + config["experiments"]["system_variant"] = self._config.default_system_variant(name) + + selected_variant = config["experiments"]["system_variant"] + if selected_variant not in self._config.supported_system_variants(name): + raise RuntimeError(f"System variant {selected_variant} is not supported in {name}.") # Set up logging and create deployment configuration handlers = self.generate_logging_handlers(logging_filename) @@ -368,6 +363,7 @@ def get_benchmark( self._output_dir, self.cache_client, self.docker_client, + deployment.system_variant_suffix(config.system_variant), self.verbose, )