diff --git a/README.md b/README.md index cfb12bf4..ad710bba 100644 --- a/README.md +++ b/README.md @@ -242,4 +242,5 @@ However, they can be missing some of the features from the `master` branch. * [Alexander Schlieper (ETH Zurich)](https://github.com/xSurus) - improved support for Java benchmarks. * [Laurin Jahns (ETH Zurich)](https://github.com/userlaurin) - support for language variants. * [Sharayu Rasal](https://github.com/Sharayu1418) - help with function URLs on AWS. -* [Livio D'Agostini](https://github.com/ldzgch) - new implementations of benchmarks in Node.js. +* [Livio D'Agostini (ETH Zurich)](https://github.com/ldzgch) - new implementations of benchmarks in Node.js. +* [toooadi (ETH Zurich)](https://github.com/toooadi) - container support for Google Cloud. diff --git a/benchmarks/wrappers/gcp/java/src/main/java/org/serverlessbench/Handler.java b/benchmarks/wrappers/gcp/java/src/main/java/org/serverlessbench/Handler.java index b118b37f..2fa8a3ed 100644 --- a/benchmarks/wrappers/gcp/java/src/main/java/org/serverlessbench/Handler.java +++ b/benchmarks/wrappers/gcp/java/src/main/java/org/serverlessbench/Handler.java @@ -17,12 +17,17 @@ public class Handler implements HttpFunction { private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final String FUNCTION_EXECUTION_ID_HEADER = "Function-Execution-Id"; + private static final String TRACE_CONTEXT_HEADER = "X-Cloud-Trace-Context"; @Override public void service(HttpRequest request, HttpResponse response) throws IOException { Instant beginTs = Instant.now(); + String requestId = request.getFirstHeader(TRACE_CONTEXT_HEADER) + .or(() -> request.getFirstHeader(FUNCTION_EXECUTION_ID_HEADER)) + .orElse(""); // Normalize request from GCP HTTP format Map normalized = normalizeRequest(request); @@ -51,7 +56,7 @@ public void service(HttpRequest request, HttpResponse response) body.put("is_cold", ColdStartTracker.isCold()); body.put("container_id", containerId); body.put("cold_start_var", 
coldStartVar); - body.put("request_id", request.getFirstHeader("Function-Execution-Id").orElse("")); + body.put("request_id", requestId); // Write JSON response response.setContentType("application/json"); diff --git a/benchmarks/wrappers/gcp/nodejs/handler.js b/benchmarks/wrappers/gcp/nodejs/handler.js index de455097..a4fcb37c 100644 --- a/benchmarks/wrappers/gcp/nodejs/handler.js +++ b/benchmarks/wrappers/gcp/nodejs/handler.js @@ -10,6 +10,7 @@ if('NOSQL_STORAGE_DATABASE' in process.env) { exports.handler = async function(req, res) { var begin = Date.now()/1000; var start = process.hrtime(); + var requestId = req.headers["x-cloud-trace-context"] || req.headers["function-execution-id"]; var func = require('./function/function') var ret = func.handler(req.body); return ret.then( @@ -32,7 +33,7 @@ exports.handler = async function(req, res) { results_time: 0, result: result, is_cold: is_cold, - request_id: req.headers["function-execution-id"] + request_id: requestId }); }, (error) => { diff --git a/benchmarks/wrappers/gcp/python/handler.py b/benchmarks/wrappers/gcp/python/handler.py index 59199764..dabc82a5 100644 --- a/benchmarks/wrappers/gcp/python/handler.py +++ b/benchmarks/wrappers/gcp/python/handler.py @@ -11,11 +11,9 @@ os.environ['NOSQL_STORAGE_DATABASE'] ) - def handler(req): income_timestamp = datetime.datetime.now().timestamp() - req_id = req.headers.get('Function-Execution-Id') - + req_id = req.headers.get('X-Cloud-Trace-Context') or req.headers.get('Function-Execution-Id') req_json = req.get_json() req_json['request-id'] = req_id diff --git a/benchmarks/wrappers/gcp/python/setup.py b/benchmarks/wrappers/gcp/python/setup.py new file mode 100644 index 00000000..8a89dc5b --- /dev/null +++ b/benchmarks/wrappers/gcp/python/setup.py @@ -0,0 +1,9 @@ +from distutils.core import setup +from glob import glob + +setup( + name='function', + packages=['function'], + package_dir={'function': '.'}, + package_data={'function': glob('**', recursive=True)}, +) diff --git 
a/configs/cpp.json b/configs/cpp.json index 32d15863..e4715b6f 100644 --- a/configs/cpp.json +++ b/configs/cpp.json @@ -66,7 +66,34 @@ "gcp": { "region": "europe-west1", "project_name": "", - "credentials": "" + "credentials": "", + "configuration": { + "function-gen1": { + "min-instances": 0, + "max-instances": 20 + }, + "function-gen2": { + "vcpus": 1, + "gcp-concurrency": 80, + "worker-concurrency": 1, + "worker-threads": 8, + "min-instances": 0, + "max-instances": 20, + "cpu-boost": false, + "cpu-throttle": true + }, + "container": { + "environment": "gen2", + "vcpus": 1, + "gcp-concurrency": 80, + "worker-concurrency": 1, + "worker-threads": 8, + "min-instances": 0, + "max-instances": 20, + "cpu-boost": false, + "cpu-throttle": true + } + } }, "local": { "storage": { diff --git a/configs/java.json b/configs/java.json index d74f7378..5727d06a 100644 --- a/configs/java.json +++ b/configs/java.json @@ -58,7 +58,34 @@ "gcp": { "region": "europe-west1", "project_name": "", - "credentials": "" + "credentials": "", + "configuration": { + "function-gen1": { + "min-instances": 0, + "max-instances": 20 + }, + "function-gen2": { + "vcpus": 1, + "gcp-concurrency": 80, + "worker-concurrency": 1, + "worker-threads": 8, + "min-instances": 0, + "max-instances": 20, + "cpu-boost": false, + "cpu-throttle": true + }, + "container": { + "environment": "gen2", + "vcpus": 1, + "gcp-concurrency": 80, + "worker-concurrency": 1, + "worker-threads": 8, + "min-instances": 0, + "max-instances": 20, + "cpu-boost": false, + "cpu-throttle": true + } + } }, "local": { "storage": { diff --git a/configs/nodejs.json b/configs/nodejs.json index 151a9c8e..8bdbefe1 100644 --- a/configs/nodejs.json +++ b/configs/nodejs.json @@ -66,7 +66,34 @@ "gcp": { "region": "europe-west1", "project_name": "", - "credentials": "" + "credentials": "", + "configuration": { + "function-gen1": { + "min-instances": 0, + "max-instances": 20 + }, + "function-gen2": { + "vcpus": 1, + "gcp-concurrency": 80, + 
"worker-concurrency": 1, + "worker-threads": 8, + "min-instances": 0, + "max-instances": 20, + "cpu-boost": false, + "cpu-throttle": true + }, + "container": { + "environment": "gen2", + "vcpus": 1, + "gcp-concurrency": 80, + "worker-concurrency": 1, + "worker-threads": 8, + "min-instances": 0, + "max-instances": 20, + "cpu-boost": false, + "cpu-throttle": true + } + } }, "local": { "storage": { diff --git a/configs/python.json b/configs/python.json index 076bd705..f184d235 100644 --- a/configs/python.json +++ b/configs/python.json @@ -66,7 +66,34 @@ "gcp": { "region": "europe-west1", "project_name": "", - "credentials": "" + "credentials": "", + "configuration": { + "function-gen1": { + "min-instances": 0, + "max-instances": 20 + }, + "function-gen2": { + "vcpus": 1, + "gcp-concurrency": 80, + "worker-concurrency": 1, + "worker-threads": 8, + "min-instances": 0, + "max-instances": 20, + "cpu-boost": false, + "cpu-throttle": true + }, + "container": { + "environment": "gen2", + "vcpus": 1, + "gcp-concurrency": 80, + "worker-concurrency": 1, + "worker-threads": 8, + "min-instances": 0, + "max-instances": 20, + "cpu-boost": false, + "cpu-throttle": true + } + } }, "local": { "storage": { diff --git a/configs/systems.json b/configs/systems.json index 8f62eddb..019b2ad8 100644 --- a/configs/systems.json +++ b/configs/systems.json @@ -71,8 +71,12 @@ } } }, - "architecture": ["x64"], - "deployments": ["package"] + "architecture": [ + "x64" + ], + "deployments": [ + "package" + ] }, "aws": { "languages": { @@ -176,8 +180,14 @@ } } }, - "architecture": ["x64", "arm64"], - "deployments": ["package", "container"] + "architecture": [ + "x64", + "arm64" + ], + "deployments": [ + "package", + "container" + ] }, "azure": { "languages": { @@ -262,17 +272,21 @@ "username": "docker_user" } }, - "architecture": ["x64"], - "deployments": ["package"] + "architecture": [ + "x64" + ], + "deployments": [ + "package" + ] }, "gcp": { "languages": { "python": { "base_images": { "x64": { - 
"3.10": "ubuntu:22.04", - "3.11": "ubuntu:22.04", - "3.12": "ubuntu:22.04" + "3.10": "us-central1-docker.pkg.dev/serverless-runtimes/google-22/runtimes/python310@sha256:88ac475abc994f60c6d156e2fcce443396e581fc984e154b5f8608e3298a7d18", + "3.11": "us-central1-docker.pkg.dev/serverless-runtimes/google-22/runtimes/python311@sha256:3c346ccf7b0de6c3df6cda05378e96532de1cfb82aaf1e823dc1cc0ca674f61c", + "3.12": "us-central1-docker.pkg.dev/serverless-runtimes/google-22/runtimes/python312@sha256:5aa81b77d8361b4b82e3d59f08c834ffb354ccbb7ddf4eff8bb22ce26dee477f" } }, "images": [ @@ -283,15 +297,16 @@ "files": [ "handler.py", "storage.py", - "nosql.py" + "nosql.py", + "setup.py" ], "packages": [], "module_packages": { "storage": [ - "google-cloud-storage" + "google-cloud-storage==3.10.1" ], "nosql": [ - "google-cloud-datastore" + "google-cloud-datastore==2.24.0" ] } } @@ -299,8 +314,9 @@ "nodejs": { "base_images": { "x64": { - "18": "ubuntu:22.04", - "20": "ubuntu:22.04" + "20": "us-central1-docker.pkg.dev/serverless-runtimes/google-22/runtimes/nodejs20@sha256:504eb1f482bb62a9c3524bfebada43db93105d7fe6447dc7c96a9f4423e79908", + "22": "us-central1-docker.pkg.dev/serverless-runtimes/google-22/runtimes/nodejs22@sha256:56a74d24510c0f4d0da44e13226a150821a84440ddfbf89b52ea620ca729db07", + "24": "us-central1-docker.pkg.dev/serverless-runtimes/google-24/runtimes/nodejs24@sha256:8309735602e6228412ba362959f63eebcac9fe5e9a114eb1e63b7febef13030c" } }, "images": [ @@ -314,8 +330,8 @@ "nosql.js" ], "packages": { - "@google-cloud/datastore": "^9.1.0", - "@google-cloud/storage": "^4.0.0", + "@google-cloud/datastore": "9.1.0", + "@google-cloud/storage": "4.0.0", "uuid": "3.4.0" } } @@ -324,7 +340,7 @@ "base_images": { "x64": { "11": "us-central1-docker.pkg.dev/serverless-runtimes/google-18-full/runtimes/java11:deprecated-public-image-java11_20260217_11_0_RC00", - "17": "us-central1-docker.pkg.dev/serverless-runtimes/google-22-full/runtimes/java17:java17_20260215_17_0_RC00" + "17": 
"us-central1-docker.pkg.dev/serverless-runtimes/google-22/runtimes/java17:java17_20260405_17_0_RC00" } }, "images": [ @@ -348,8 +364,14 @@ "username": "docker_user" } }, - "architecture": ["x64"], - "deployments": ["package"] + "architecture": [ + "x64", + "arm64" + ], + "deployments": [ + "package", + "container" + ] }, "openwhisk": { "languages": { @@ -430,7 +452,11 @@ } } }, - "architecture": ["x64"], - "deployments": ["container"] + "architecture": [ + "x64" + ], + "deployments": [ + "container" + ] } } diff --git a/dockerfiles/gcp/java/Dockerfile.function b/dockerfiles/gcp/java/Dockerfile.function new file mode 100644 index 00000000..22a64970 --- /dev/null +++ b/dockerfiles/gcp/java/Dockerfile.function @@ -0,0 +1,46 @@ +ARG BASE_IMAGE +FROM $BASE_IMAGE as builder +ARG VERSION +ENV JAVA_VERSION=${VERSION} +ENV JAVA_INVOKER_VERSION=2.0.1 + +WORKDIR /workspace + +COPY . function/ +USER root + +# Install Maven 3.x (maven package may be old, install from Apache directly) +RUN curl -fsSL https://archive.apache.org/dist/maven/maven-3/3.9.6/binaries/apache-maven-3.9.6-bin.tar.gz | tar -xz -C /opt && \ + ln -s /opt/apache-maven-3.9.6 /opt/maven && \ + ln -s /opt/maven/bin/mvn /usr/local/bin/mvn +ENV PATH=/opt/maven/bin:$PATH + +RUN POM_PATH=$(find /workspace/function -maxdepth 3 -name "pom.xml" | head -n1) && \ + if [ "$JAVA_VERSION" = "11" ]; then \ + export JAVA_INVOKER_VERSION="1.4.3"; \ + else \ + export JAVA_INVOKER_VERSION="2.0.1"; \ + fi && \ + if [ -n "${POM_PATH}" ]; then \ + mvn -f "${POM_PATH}" -DskipTests clean package && \ + POM_DIR=$(dirname "${POM_PATH}") && \ + cp "${POM_DIR}/target/function.jar" /workspace/function.jar && \ + mvn -q dependency:copy \ + -Dartifact=com.google.cloud.functions.invoker:java-function-invoker:${JAVA_INVOKER_VERSION} \ + -DoutputDirectory=/workspace; \ + else \ + echo "No pom.xml found!" 
&& \ + exit 1; \ + fi && \ + mv /workspace/java-function-invoker-${JAVA_INVOKER_VERSION}.jar /workspace/java-function-invoker.jar + +FROM $BASE_IMAGE + +WORKDIR /workspace + +COPY --from=builder /workspace/function.jar /workspace/function.jar +COPY --from=builder /workspace/java-function-invoker.jar /workspace/java-function-invoker.jar + +ENV JAVA_TOOL_OPTIONS="-XX:+TieredCompilation -XX:TieredStopAtLevel=1" + +CMD ["sh", "-c", "exec java -jar /workspace/java-function-invoker.jar --classpath /workspace/function.jar --target org.serverlessbench.Handler --port ${PORT:-8080}"] diff --git a/dockerfiles/gcp/nodejs/Dockerfile.build b/dockerfiles/gcp/nodejs/Dockerfile.build index 477f236b..7bfe22e1 100755 --- a/dockerfiles/gcp/nodejs/Dockerfile.build +++ b/dockerfiles/gcp/nodejs/Dockerfile.build @@ -1,12 +1,10 @@ ARG BASE_IMAGE FROM ${BASE_IMAGE} ARG VERSION -ENV NVM_DIR=/nvm -#RUN install_node --ignore-verification-failure v${VERSION} -RUN apt-get update && apt-get install -y gosu wget -RUN mkdir -p ${NVM_DIR} && wget -qO- https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.7/install.sh | bash -RUN . ${NVM_DIR}/nvm.sh && nvm install ${VERSION} && nvm alias default ${VERSION} && nvm use default +COPY --from=tianon/gosu:1.19-debian /usr/local/bin/gosu /usr/local/bin/gosu + +USER root RUN mkdir -p /sebs/ COPY dockerfiles/nodejs_installer.sh /sebs/installer.sh diff --git a/dockerfiles/gcp/nodejs/Dockerfile.function b/dockerfiles/gcp/nodejs/Dockerfile.function new file mode 100644 index 00000000..cc2f5212 --- /dev/null +++ b/dockerfiles/gcp/nodejs/Dockerfile.function @@ -0,0 +1,17 @@ +ARG BASE_IMAGE +FROM $BASE_IMAGE + +WORKDIR /workspace + +USER root + +COPY . 
/workspace/ +RUN mkdir function/ && mv function.js storage.js nosql.js function/ + +ENV NODE_ENV=production + +RUN npm install --no-package-lock --omit=dev \ + && npm install --no-save --no-package-lock --omit=dev @google-cloud/functions-framework@5.0.2 \ + && npm cache clean --force + +CMD ["node", "./node_modules/@google-cloud/functions-framework/build/src/main.js", "--target=handler", "--source=handler.js"] diff --git a/dockerfiles/gcp/python/Dockerfile.build b/dockerfiles/gcp/python/Dockerfile.build index 88554d23..094d5223 100755 --- a/dockerfiles/gcp/python/Dockerfile.build +++ b/dockerfiles/gcp/python/Dockerfile.build @@ -5,11 +5,12 @@ ENV PYTHON_VERSION=${VERSION} ENV DEBIAN_FRONTEND="noninteractive" ENV TZ="Europe/Zurich" +COPY --from=tianon/gosu:1.19-debian /usr/local/bin/gosu /usr/local/bin/gosu + +USER root + RUN apt-get update\ - && apt-get install -y --no-install-recommends gosu gcc build-essential libxml2 libxml2-dev zlib1g-dev software-properties-common gpg-agent zip\ - && add-apt-repository -y ppa:deadsnakes/ppa\ - && apt-get update\ - && apt-get install -y python${PYTHON_VERSION} python${PYTHON_VERSION}-venv python${PYTHON_VERSION}-dev\ + && apt-get install -y --no-install-recommends gcc build-essential libxml2 libxml2-dev zlib1g-dev gpg-agent zip\ && apt-get purge -y --auto-remove #RUN export PATH=/opt/python3.7/bin:/opt/python3.6/bin:/opt/python3.5/bin:/opt/python3.4/bin:$PATH diff --git a/dockerfiles/gcp/python/Dockerfile.function b/dockerfiles/gcp/python/Dockerfile.function new file mode 100644 index 00000000..6eff1f53 --- /dev/null +++ b/dockerfiles/gcp/python/Dockerfile.function @@ -0,0 +1,38 @@ +ARG BASE_IMAGE +FROM $BASE_IMAGE +ARG VERSION +ENV PYTHON_VERSION=${VERSION} +ARG TARGET_ARCHITECTURE + +COPY . 
function/ + +ENV PLATFORM_ARG="" + +USER root + +RUN pip install --no-cache-dir functions-framework==3.10.1 gunicorn==25.3.0 \ + && pip cache purge + +RUN touch function/__init__.py \ + && if test -f "function/requirements.txt.${PYTHON_VERSION}"; then \ + pip install --no-cache-dir ${PLATFORM_ARG} --target . \ + -r function/requirements.txt \ + -r function/requirements.txt.${PYTHON_VERSION} \ + function/ && \ + pip cache purge; \ + else \ + pip install --no-cache-dir ${PLATFORM_ARG} --target . \ + -r function/requirements.txt \ + function/ && \ + pip cache purge; \ + fi + +RUN printf '%s\n' \ + 'from functions_framework import create_app' \ + 'app = create_app(target="handler", source="function/handler.py")' \ + > /ff_app.py + + +ENV PYTHONPATH="/" + +CMD ["sh", "-c", "exec gunicorn --bind :${PORT:-8080} --workers ${GUNICORN_WORKERS:-1} --threads ${GUNICORN_THREADS:-8} --timeout 0 ff_app:app"] diff --git a/docs/platforms.md b/docs/platforms.md index 9bda1fa7..e3332543 100644 --- a/docs/platforms.md +++ b/docs/platforms.md @@ -209,11 +209,100 @@ or in the JSON input configuration: "name": "gcp", "gcp": { "region": "europe-west1", + "project_name": "your-gcp-project-id", "credentials": "/path/to/project-credentials.json" } } ``` +### Deployment Modes + +SeBS models two GCP deployment targets: + +1. `function-gen1`: the first Google Cloud Functions Gen1 path. +2. `container`: direct container deployment to Cloud Run. + +We also plan to add support for `function-gen2`, the current Google Cloud Functions Gen2 path. +These deployment types intentionally share a single GCP backend in SeBS, but they are not identical in packaging, naming, scaling, or performance behavior. 
+ +On GCP, there are two different concurrency layers that should not be confused: +* platform concurrency: how many requests GCP may send to one instance (`gcp-concurrency`) +* runtime concurrency: how many requests the language server is prepared to process internally (`worker-concurrency`, `worker-threads`) +This design is intentional. A single Cloud Run concurrency number is not enough to reason about performance if the application server is underprovisioned or overprovisioned relative to the platform. + +### Function Gen1 + +Gen1 is the currently implemented Google-managed function deployment path in SeBS. The packaging flow is ZIP-based: +* benchmark sources are moved into a `function/` subdirectory, +* the language wrapper file is renamed to the GCP-required entrypoint name (`main.py` for Python, `index.js` for Node.js), +* the whole directory is archived and uploaded to Cloud Storage, +* Cloud Functions Gen1 is then updated from the uploaded archive. + +Gen1 configuration currently exposes instance-scaling controls: + +```json +"deployment": { + "name": "gcp", + "gcp": { + "region": "europe-west1", + "project_name": "your-gcp-project-id", + "credentials": "/path/to/project-credentials.json", + "configuration": { + "function-gen1": { + "min-instances": 0, + "max-instances": 20 + } + } + } +} +``` + +Use Gen1 when you want the most established GCP path in SeBS and do not need container-level runtime tuning. + +### Cloud Run Container Deployments + +Container deployments are the currently implemented Cloud Run-based path in SeBS. They are selected with the `container` deployment type and use a provider-specific function image built from `Dockerfile.function`. 
+At the deployment level, SeBS configures Cloud Run service properties: + +```json +"deployment": { + "name": "gcp", + "gcp": { + "region": "europe-west1", + "project_name": "your-gcp-project-id", + "credentials": "/path/to/project-credentials.json", + "configuration": { + "container": { + "environment": "gen2", + "vcpus": 1, + "gcp-concurrency": 80, + "worker-concurrency": 80, + "worker-threads": 8, + "min-instances": 0, + "max-instances": 20, + "cpu-boost": false, + "cpu-throttle": true + } + } + } +} +``` + +For Python, SeBS uses [`functions-framework`](https://github.com/GoogleCloudPlatform/functions-framework-python) behind `gunicorn` rather than the framework's default development server, as [recommended by Cloud Run performance guidance](https://docs.cloud.google.com/run/docs/tips/python). +For Node.js, SeBS uses [`@google-cloud/functions-framework`](https://github.com/GoogleCloudPlatform/functions-framework-nodejs) started directly with `node` rather than via `npm start`, [as recommended by Cloud Run performance guidance](https://docs.cloud.google.com/run/docs/tips/nodejs). +For Java, we use [`java-function-invoker`](https://mvnrepository.com/artifact/com.google.cloud.functions.invoker/java-function-invoker), and we disable tiered compilation to speed up startup time, [as recommended by Cloud Run performance guidance](https://docs.cloud.google.com/run/docs/tips/java). + +Cloud Run containers can [execute in two environments](https://docs.cloud.google.com/run/docs/configuring/execution-environment): gVisor-based gen1, and VM-based gen2. + +### Current Limitations + +The current GCP backend has the following practical limits: +* Gen1 is the primary managed-functions deployment path today. +* Gen2 is planned and partially modeled in configuration, but not yet fully deployed through a dedicated strategy. +* Cloud Run containers are implemented today and provide the most tuning control. 
+* GCP deployments currently reject `arm64`, as arm64 instances are not available for GCR. +* C++ packaging is not supported on GCP (though it could be implemented via containers). + ## OpenWhisk SeBS expects users to deploy and configure an OpenWhisk instance. diff --git a/sebs/aws/aws.py b/sebs/aws/aws.py index ad68d967..c001e651 100644 --- a/sebs/aws/aws.py +++ b/sebs/aws/aws.py @@ -634,7 +634,7 @@ def format_function_name(func_name: str) -> str: func_name = func_name.replace(".", "_") return func_name - def delete_function(self, func_name: str) -> None: + def delete_function(self, func_name: str, function: Dict) -> None: """Delete an AWS Lambda function. Args: diff --git a/sebs/azure/azure.py b/sebs/azure/azure.py index 798f29d1..9a55085e 100644 --- a/sebs/azure/azure.py +++ b/sebs/azure/azure.py @@ -668,7 +668,7 @@ def update_function_configuration(self, function: Function, code_package: Benchm "Updating function's memory and timeout configuration is not supported." ) - def delete_function(self, func_name: str) -> None: + def delete_function(self, func_name: str, function: Dict) -> None: """Delete an Azure Function App and its associated storage account. Args: @@ -680,14 +680,7 @@ def delete_function(self, func_name: str) -> None: For Azure, we need to retrieve the associated storage account. Each function has its own storage account. """ - all_functions = self.cache_client.get_all_functions(self.name()) - if func_name not in all_functions: - self.logging.error( - f"Failed to find function {func_name} in functions: {all_functions.keys()}." 
- ) - raise RuntimeError(f"Failed to find function {func_name} in cache.") - - function = cast(AzureFunction, self.function_type().deserialize(all_functions[func_name])) + function_obj = cast(AzureFunction, self.function_type().deserialize(function)) try: self.cli_instance.execute( @@ -700,10 +693,12 @@ def delete_function(self, func_name: str) -> None: raise e self.logging.info( - f"Deleting storage account {function.function_storage.account_name} " + f"Deleting storage account {function_obj.function_storage.account_name} " f"associated with function {func_name}" ) - self.config.resources.delete_storage_account(self.cli_instance, function.function_storage) + self.config.resources.delete_storage_account( + self.cli_instance, function_obj.function_storage + ) def _mount_function_code(self, code_package: Benchmark) -> str: """Mount function code package in Azure CLI container. diff --git a/sebs/faas/system.py b/sebs/faas/system.py index 72bf5b0d..d76d3e48 100644 --- a/sebs/faas/system.py +++ b/sebs/faas/system.py @@ -719,7 +719,7 @@ def name() -> str: """ pass - def delete_function(self, func_name: str) -> None: + def delete_function(self, func_name: str, function: Dict) -> None: """Delete cloud deployment of a function. 
Args: @@ -741,7 +741,7 @@ def cleanup_functions(self, dry_run: bool) -> List[str]: for name, func in functions.items(): if not dry_run: - self.delete_function(name) + self.delete_function(name, func) deleted.append(name) if dry_run: diff --git a/sebs/gcp/config.py b/sebs/gcp/config.py index 857bf105..533bc9f9 100644 --- a/sebs/gcp/config.py +++ b/sebs/gcp/config.py @@ -18,10 +18,13 @@ resources = GCPResources() config = GCPConfig(credentials, resources) """ +from __future__ import annotations import json import os from typing import cast, Dict, List, Optional, Tuple +import time +from googleapiclient.errors import HttpError from sebs.cache import Cache from sebs.faas.config import Config, Credentials, Resources @@ -178,6 +181,321 @@ def update_cache(self, cache: Cache) -> None: cache.update_config(val=self._project_id, keys=["gcp", "credentials", "project_id"]) +class GCPFunctionGen1Config: + """Configuration for Cloud Functions Gen1 deployments.""" + + def __init__(self, min_instances: int = 0, max_instances: int = 20): + """Initialize Gen1 scaling settings. + + Args: + min_instances: Minimum number of instances to keep warm. + max_instances: Maximum number of instances allowed. + """ + self.min_instances = min_instances + self.max_instances = max_instances + + def serialize(self) -> Dict: + """Serialize the Gen1 configuration. + + Returns: + Dictionary containing the serialized Gen1 settings. + """ + return {"min-instances": self.min_instances, "max-instances": self.max_instances} + + @staticmethod + def deserialize(dct: Dict) -> "GCPFunctionGen1Config": + """Deserialize a Gen1 configuration. + + Args: + dct: Serialized Gen1 configuration. + + Returns: + Deserialized Gen1 configuration object. + """ + return GCPFunctionGen1Config( + min_instances=dct.get("min-instances", 0), max_instances=dct.get("max-instances", 20) + ) + + def __eq__(self, other: object) -> bool: + """Compare two Gen1 configurations. + + Args: + other: Configuration object to compare against. 
+ + Returns: + True if both objects have the same Gen1 settings, otherwise False. + """ + if not isinstance(other, GCPFunctionGen1Config): + return False + return ( + self.min_instances == other.min_instances and self.max_instances == other.max_instances + ) + + +class GCPFunctionGen2Config: + """Configuration for Cloud Functions Gen2 deployments.""" + + def __init__( + self, + vcpus: int = 1, + gcp_concurrency: int = 80, + worker_concurrency: int = 80, + worker_threads: int = 8, + min_instances: int = 0, + max_instances: int = 20, + cpu_boost: bool = False, + cpu_throttle: bool = True, + ): + """Initialize Gen2 runtime and scaling settings. + + Args: + vcpus: Number of virtual CPUs assigned to the function. + gcp_concurrency: Cloud Functions concurrency setting. + worker_concurrency: Worker count used by the runtime server. + worker_threads: Thread count used by the runtime server. + min_instances: Minimum number of warm instances. + max_instances: Maximum number of instances allowed. + cpu_boost: Whether startup CPU boost is enabled. + cpu_throttle: Whether CPU throttling is enabled when idle. + """ + self.vcpus = vcpus + self.gcp_concurrency = gcp_concurrency + self.worker_concurrency = worker_concurrency + self.worker_threads = worker_threads + self.min_instances = min_instances + self.max_instances = max_instances + self.cpu_boost = cpu_boost + self.cpu_throttle = cpu_throttle + + def serialize(self) -> Dict: + """Serialize the Gen2 configuration. + + Returns: + Dictionary containing the serialized Gen2 settings. + """ + return { + "vcpus": self.vcpus, + "gcp-concurrency": self.gcp_concurrency, + "worker-concurrency": self.worker_concurrency, + "worker-threads": self.worker_threads, + "min-instances": self.min_instances, + "max-instances": self.max_instances, + "cpu-boost": self.cpu_boost, + "cpu-throttle": self.cpu_throttle, + } + + @staticmethod + def deserialize(dct: Dict) -> GCPFunctionGen2Config: + """Deserialize a Gen2 configuration. 
+ + Args: + dct: Serialized Gen2 configuration. + + Returns: + Deserialized Gen2 configuration object. + """ + return GCPFunctionGen2Config( + vcpus=dct["vcpus"], + gcp_concurrency=dct["gcp-concurrency"], + worker_concurrency=dct["worker-concurrency"], + worker_threads=dct["worker-threads"], + min_instances=dct["min-instances"], + max_instances=dct["max-instances"], + cpu_boost=dct["cpu-boost"], + cpu_throttle=dct["cpu-throttle"], + ) + + def __eq__(self, other: object) -> bool: + """Compare two Gen2 configurations. + + Args: + other: Configuration object to compare against. + + Returns: + True if both objects have the same Gen2 settings, otherwise False. + """ + if not isinstance(other, GCPFunctionGen2Config): + return False + return ( + self.vcpus == other.vcpus + and self.gcp_concurrency == other.gcp_concurrency + and self.worker_concurrency == other.worker_concurrency + and self.worker_threads == other.worker_threads + and self.min_instances == other.min_instances + and self.max_instances == other.max_instances + and self.cpu_boost == other.cpu_boost + and self.cpu_throttle == other.cpu_throttle + ) + + +class GCPContainerConfig(GCPFunctionGen2Config): + """Configuration for Cloud Run container deployments.""" + + def __init__( + self, + environment: str = "gen1", + vcpus: int = 1, + gcp_concurrency: int = 80, + worker_concurrency: int = 80, + worker_threads: int = 8, + min_instances: int = 0, + max_instances: int = 20, + cpu_boost: bool = False, + cpu_throttle: bool = True, + ): + """Initialize Cloud Run container settings. + + Args: + environment: Cloud Run execution environment name. + vcpus: Number of virtual CPUs assigned to the container. + gcp_concurrency: Cloud Run request concurrency limit. + worker_concurrency: Worker count used by the runtime server. + worker_threads: Thread count used by the runtime server. + min_instances: Minimum number of warm instances. + max_instances: Maximum number of instances allowed. 
+ cpu_boost: Whether startup CPU boost is enabled. + cpu_throttle: Whether CPU throttling is enabled when idle. + """ + super().__init__( + vcpus, + gcp_concurrency, + worker_concurrency, + worker_threads, + min_instances, + max_instances, + cpu_boost, + cpu_throttle, + ) + self.environment = environment + + def serialize(self) -> Dict: + """Serialize the container configuration. + + Returns: + Dictionary containing the serialized container settings. + """ + return {**super().serialize(), "environment": self.environment} + + @staticmethod + def deserialize(dct: Dict) -> GCPContainerConfig: + """Deserialize a container configuration. + + Args: + dct: Serialized container configuration. + + Returns: + Deserialized container configuration object. + """ + return GCPContainerConfig( + environment=dct["environment"], + vcpus=dct["vcpus"], + gcp_concurrency=dct["gcp-concurrency"], + worker_concurrency=dct["worker-concurrency"], + worker_threads=dct["worker-threads"], + min_instances=dct["min-instances"], + max_instances=dct["max-instances"], + cpu_boost=dct["cpu-boost"], + cpu_throttle=dct["cpu-throttle"], + ) + + def __eq__(self, other: object) -> bool: + """Compare two container configurations. + + Args: + other: Configuration object to compare against. + + Returns: + True if both objects have the same container settings, otherwise False. + """ + if not isinstance(other, GCPContainerConfig): + return False + return ( + self.environment == other.environment + and self.vcpus == other.vcpus + and self.gcp_concurrency == other.gcp_concurrency + and self.worker_concurrency == other.worker_concurrency + and self.worker_threads == other.worker_threads + and self.min_instances == other.min_instances + and self.max_instances == other.max_instances + and self.cpu_boost == other.cpu_boost + and self.cpu_throttle == other.cpu_throttle + ) + + +class GCPConfiguration: + """User-provided configuration of workloads on the GCP. 
+ + Currently, this class primarily inherits functionality from the base `Resources` + class, as we do not need more GCP-specific resources beyond standard storage buckets. + + Attributes: + Inherits all attributes from the base Resources class + """ + + def __init__(self) -> None: + """Initialize default GCP deployment configuration.""" + self._function_gen1_config = GCPFunctionGen1Config() + self._function_gen2_config = GCPFunctionGen2Config() + self._container_config = GCPContainerConfig() + + @staticmethod + def initialize(config: GCPConfiguration, dct: Dict) -> GCPConfiguration: + """Populate a configuration object from serialized data. + + Args: + config: Configuration object to populate. + dct: Serialized deployment configuration. + + Returns: + The populated configuration object. + """ + + config._function_gen1_config = GCPFunctionGen1Config.deserialize(dct["function-gen1"]) + config._function_gen2_config = GCPFunctionGen2Config.deserialize(dct["function-gen2"]) + config._container_config = GCPContainerConfig.deserialize(dct["container"]) + + return config + + def serialize(self) -> Dict: + """Serialize resources to dictionary for cache storage. + + Returns: + Dictionary representation of resources for cache storage + """ + out = {} + out["function-gen1"] = self._function_gen1_config.serialize() + out["function-gen2"] = self._function_gen2_config.serialize() + out["container"] = self._container_config.serialize() + return out + + @property + def function_gen1_config(self) -> GCPFunctionGen1Config: + """Get the Gen1 deployment configuration. + + Returns: + Gen1 deployment configuration object. + """ + return self._function_gen1_config + + @property + def function_gen2_config(self) -> GCPFunctionGen2Config: + """Get the Gen2 deployment configuration. + + Returns: + Gen2 deployment configuration object. + """ + return self._function_gen2_config + + @property + def container_config(self) -> GCPContainerConfig: + """Get the Cloud Run container configuration. 
+ + Returns: + Cloud Run container deployment configuration object. + """ + return self._container_config + + class GCPResources(Resources): """Resource manager for serverless resources on Google Cloud Platform. @@ -191,6 +509,7 @@ class GCPResources(Resources): def __init__(self) -> None: """Initialize GCP resources manager.""" super().__init__(name="gcp") + self._container_repository: Optional[str] = None @staticmethod def initialize(res: Resources, dct: Dict) -> "GCPResources": @@ -205,6 +524,7 @@ def initialize(res: Resources, dct: Dict) -> "GCPResources": """ ret = cast(GCPResources, res) super(GCPResources, GCPResources).initialize(ret, dct) + return ret def serialize(self) -> Dict: @@ -213,7 +533,9 @@ def serialize(self) -> Dict: Returns: Dictionary representation of resources for cache storage """ - return super().serialize() + out = super().serialize() + out["container_repository"] = self._container_repository + return out @staticmethod def deserialize(config: Dict, cache: Cache, handlers: LoggingHandlers) -> "Resources": @@ -258,6 +580,73 @@ def update_cache(self, cache: Cache) -> None: """ super().update_cache(cache) + @property + def container_repository(self) -> str: + """Get the Artifact Registry repository name for containers.""" + assert self._container_repository is not None, "Container repository has not been set yet!" 
+ return self._container_repository + + def check_container_repository_exists(self, config: Config, ar_client): + """Check whether the configured container repository exists.""" + try: + credentials = cast(GCPCredentials, config.credentials) + parent = f"projects/{credentials.project_name}/locations/{config.region}" + repo_full_name = f"{parent}/repositories/{self._container_repository}" + self.logging.info("Checking if container repository exists...") + ar_client.projects().locations().repositories().get(name=repo_full_name).execute() + return True + except HttpError as e: + if e.resp.status == 404: + self.logging.info("Container repository does not exist.") + return False + else: + raise e + + def create_container_repository(self, ar_client, parent): + """Create the Artifact Registry repository for benchmark containers.""" + request_body = {"format": "DOCKER", "description": "Container repository for SEBS"} + self._container_repository = f"sebs-benchmarks-{self._resources_id}" + operation = ( + ar_client.projects() + .locations() + .repositories() + .create(parent=parent, body=request_body, repositoryId=self._container_repository) + .execute() + ) + + attempt = 0 + max_retries = 10 + + while attempt < max_retries: + # Operations for AR are global or location specific + op_name = operation["name"] + op = ar_client.projects().locations().operations().get(name=op_name).execute() + + if op.get("done"): + if "error" in op: + raise Exception(f"Failed to create repo: {op['error']}") + self.logging.info("Repository created successfully.") + break + time.sleep(2) + attempt += 1 + + if attempt == max_retries: + raise TimeoutError("Timed out waiting for container repository creation.") + + def get_container_repository(self, config: Config, ar_client): + """Get or create the Artifact Registry repository for container images.""" + if self._container_repository is not None: + return self._container_repository + + self._container_repository = 
f"sebs-benchmarks-{self._resources_id}" + if self.check_container_repository_exists(config, ar_client): + return self._container_repository + + credentials = cast(GCPCredentials, config.credentials) + parent = f"projects/{credentials.project_name}/locations/{config.region}" + self.create_container_repository(ar_client, parent) + return self._container_repository + class GCPConfig(Config): """Main configuration class for Google Cloud Platform deployment. @@ -289,6 +678,8 @@ def __init__(self, credentials: GCPCredentials, resources: GCPResources) -> None self._credentials = credentials self._resources = resources + self._deployment_config = GCPConfiguration() + @property def region(self) -> str: """Get the GCP region for resource deployment. @@ -325,6 +716,15 @@ def resources(self) -> GCPResources: """ return self._resources + @property + def deployment_config(self) -> GCPConfiguration: + """Get the deployment configuration for GCP workloads. + + Returns: + GCPConfiguration instance containing workload deployment settings + """ + return self._deployment_config + @staticmethod def deserialize(config: Dict, cache: Cache, handlers: LoggingHandlers) -> "Config": """Deserialize GCP configuration from dictionary and cache. @@ -354,6 +754,10 @@ def deserialize(config: Dict, cache: Cache, handlers: LoggingHandlers) -> "Confi config_obj.logging.info("Using user-provided config for GCP") GCPConfig.initialize(config_obj, config) + # deployment configuration is never loaded from cache - always fresh! 
+ if "configuration" in config: + GCPConfiguration.initialize(config_obj.deployment_config, config["configuration"]) + # mypy makes a mistake here updated_keys: List[Tuple[str, List[str]]] = [("region", ["gcp", "region"])] # type: ignore # for each attribute here, check if its version is different than the one provided by diff --git a/sebs/gcp/container.py b/sebs/gcp/container.py new file mode 100644 index 00000000..81146791 --- /dev/null +++ b/sebs/gcp/container.py @@ -0,0 +1,234 @@ +"""Cloud Run container helpers for Google Cloud Platform deployments.""" + +import docker +from typing import cast, Tuple + +from sebs.gcp.config import GCPConfig, GCPCredentials +from sebs.config import SeBSConfig +from sebs.faas.container import DockerContainer +from googleapiclient.discovery import build +from google.oauth2 import service_account +from googleapiclient.errors import HttpError +from google.auth.transport.requests import Request + + +class GCRContainer(DockerContainer): + """Cloud Run container helper for building and pushing GCP images.""" + + @staticmethod + def name(): + """Return the deployment name used for GCP container images. + + Returns: + The string ``"gcp"``. + """ + return "gcp" + + @staticmethod + def typename() -> str: + """Return the runtime type name for serialization. + + Returns: + Runtime type identifier used by SeBS serialization. + """ + return "GCP.GCRContainer" + + def __init__( + self, + system_config: SeBSConfig, + config: GCPConfig, + docker_client: docker.client.DockerClient, + ): + """Initialize the GCP container helper. + + Args: + system_config: SeBS system configuration. + config: GCP deployment configuration. + docker_client: Docker client used for local image operations. 
+ """ + super().__init__(system_config, docker_client) + self.config: GCPConfig = config + self.creds = service_account.Credentials.from_service_account_file( + self.config.credentials.gcp_credentials, + scopes=["https://www.googleapis.com/auth/cloud-platform"], + ) + self.ar_client = build("artifactregistry", "v1", credentials=self.creds) + + def registry_name( + self, benchmark: str, language_name: str, language_version: str, architecture: str + ) -> Tuple[str, str, str, str]: + """Build the Artifact Registry path for a benchmark image. + + Args: + benchmark: Benchmark name. + language_name: Programming language name. + language_version: Runtime version string. + architecture: Target CPU architecture. + + Returns: + Tuple of registry name, repository name, image tag, and full image URI. + """ + + project_id = self.config.credentials.project_name + region = self.config.region + registry_name = f"{region}-docker.pkg.dev/{project_id}" + repository_name = self.config.resources.get_container_repository( + self.config, self.ar_client + ) + + image_tag = self.system_config.benchmark_image_tag( + self.name(), benchmark, language_name, language_version, architecture + ) + image_uri = f"{registry_name}/{repository_name}/{benchmark}:{image_tag}" + + return registry_name, repository_name, image_tag, image_uri + + def find_image(self, repository_name, image_tag) -> bool: + """Check whether a tagged image already exists in Artifact Registry. + + Args: + repository_name: Artifact Registry repository name. + image_tag: Docker image tag to look up. + + Returns: + True if the image tag exists in the repository, otherwise False. 
+ """ + try: + credentials = cast(GCPCredentials, self.config.credentials) + parent = ( + f"projects/{credentials.project_name}" + f"/locations/{self.config.region}" + f"/repositories/{repository_name}" + ) + response = ( + self.ar_client.projects() + .locations() + .repositories() + .dockerImages() + .list(parent=parent) + .execute() + ) + if "dockerImages" in response: + for image in response["dockerImages"]: + if image_tag in image.get("tags", []): + return True + except HttpError as e: + if e.resp.status == 404: + return False + raise e + return False + + def push_to_registry( + self, + benchmark: str, + language_name: str, + language_version: str, + architecture: str, + ) -> str: + """Push a benchmark image and resolve it to an immutable digest URI. + + Args: + benchmark: Benchmark name. + language_name: Programming language name. + language_version: Runtime version string. + architecture: Target CPU architecture. + + Returns: + Immutable image URI if Docker exposes a digest, otherwise the tag URI. + """ + image_uri = super().push_to_registry( + benchmark, language_name, language_version, architecture + ) + return self.resolve_image_uri(image_uri) + + def build_base_image( + self, + directory: str, + language, + language_version: str, + architecture: str, + benchmark: str, + is_cached: bool, + builder_image: str, + ) -> Tuple[bool, str, float]: + """Build the benchmark image and resolve the final image URI. + + Args: + directory: Benchmark source directory. + language: Benchmark language enum. + language_version: Runtime version string. + architecture: Target CPU architecture. + benchmark: Benchmark name. + is_cached: Whether the build can reuse a cached image. + builder_image: Builder image to use for the build stage. + + Returns: + Tuple of rebuild flag, image URI, and image size in MB. 
+ """ + rebuilt, image_uri, size_mb = super().build_base_image( + directory, + language, + language_version, + architecture, + benchmark, + is_cached, + builder_image, + ) + return rebuilt, self.resolve_image_uri(image_uri), size_mb + + def resolve_image_uri(self, image_uri: str) -> str: + """Resolve a tag URI to an immutable digest URI when Docker exposes one. + + Args: + image_uri: Image URI to inspect. + + Returns: + Digest URI if available, otherwise the original tag URI. + """ + if "@sha256:" in image_uri: + return image_uri + + repository = image_uri.rsplit(":", 1)[0] + try: + image = self.docker_client.images.get(image_uri) + except docker.errors.ImageNotFound: + self.logging.warning( + f"Could not inspect pushed image {image_uri}; deploying mutable tag reference." + ) + return image_uri + + repo_digests = image.attrs.get("RepoDigests", []) + for digest_uri in repo_digests: + if digest_uri.split("@", 1)[0] == repository: + self.logging.info(f"Resolved image {image_uri} to digest {digest_uri}") + return digest_uri + + self.logging.warning( + f"No registry digest found for {image_uri}; deploying mutable tag reference." + ) + return image_uri + + def push_image(self, repository_uri, image_tag): + """Authenticate to Artifact Registry and push the built image. + + Args: + repository_uri: Artifact Registry repository URI. + image_tag: Docker image tag to push. + + Raises: + RuntimeError: If the push operation fails. 
+ """ + self.logging.info("Authenticating Docker against Artifact Registry...") + self.creds.refresh(Request()) + auth_token = self.creds.token + + try: + self.docker_client.login( + username="oauth2accesstoken", password=auth_token, registry=repository_uri + ) + super().push_image(repository_uri, image_tag) + self.logging.info(f"Successfully pushed the image to registry {repository_uri}.") + except docker.errors.DockerException as e: + self.logging.error(f"Failed to push the image to registry {repository_uri}.") + self.logging.error(f"Error: {str(e)}") + raise RuntimeError("Couldn't push to registry.") diff --git a/sebs/gcp/function.py b/sebs/gcp/function.py index 6f1c8ec8..5a71135c 100644 --- a/sebs/gcp/function.py +++ b/sebs/gcp/function.py @@ -14,13 +14,57 @@ config = FunctionConfig(memory=256, timeout=60, runtime="python39") function = GCPFunction("my-function", "benchmark-name", "hash123", config) """ +from __future__ import annotations -from typing import cast, Dict, Optional +from enum import Enum +from typing import cast, Dict, Optional, Union from sebs.faas.config import Resources from sebs.faas.function import Function, FunctionConfig from sebs.gcp.storage import GCPStorage +from sebs.gcp.config import GCPFunctionGen1Config, GCPFunctionGen2Config, GCPContainerConfig + + +class FunctionDeploymentType(str, Enum): + """Enumeration of deployment methods on GCP. + + - FUNCTION_GEN1: Original Google Cloud Functions. + - FUNCTION_GEN2: Google Cloud Functions gen2, based on Cloud Run. + - CONTAINER: Google Cloud Run containers. + """ + + FUNCTION_GEN1 = "function-gen1" + FUNCTION_GEN2 = "function-gen2" + CONTAINER = "container" + + @property + def is_container(self) -> bool: + """Return whether the deployment type is container-based. + + Returns: + True if the deployment uses Cloud Run containers, otherwise False. 
+ """ + return self == FunctionDeploymentType.CONTAINER + + @staticmethod + def deserialize(val: str) -> FunctionDeploymentType: + """Deserialize a string value to a FunctionDeploymentEngine enum. + + Args: + val: String value to convert to enum + + Returns: + FunctionDeploymentEngine: Corresponding enum value + + Raises: + Exception: If the value doesn't match any enum member + """ + for member in FunctionDeploymentType: + if member.value == val: + return member + raise Exception(f"Unknown GCP function deployment type {val}") + class GCPFunction(Function): """Represents a Google Cloud Function with GCP-specific functionality. @@ -38,7 +82,10 @@ def __init__( benchmark: str, code_package_hash: str, cfg: FunctionConfig, + deployment_type: FunctionDeploymentType, + deployment_config: Union[GCPFunctionGen1Config, GCPFunctionGen2Config, GCPContainerConfig], bucket: Optional[str] = None, + container_uri: Optional[str] = None, ) -> None: """Initialize a GCP Cloud Function instance. @@ -47,10 +94,15 @@ def __init__( benchmark: Name of the benchmark this function implements code_package_hash: Hash of the code package for version tracking cfg: Function configuration (memory, timeout, etc.) + deployment_type: Type of deployment (function-gen1, container-gen1, etc.) bucket: Optional Cloud Storage bucket name for code storage + deployment_config: Deployment-specific configuration """ super().__init__(benchmark, name, code_package_hash, cfg) self.bucket = bucket + self._container_uri = container_uri + self._deployment_type = deployment_type + self._deployment_config = deployment_config @staticmethod def typename() -> str: @@ -61,6 +113,26 @@ def typename() -> str: """ return "GCP.GCPFunction" + @property + def deployment_type(self) -> FunctionDeploymentType: + """Get the deployment type for this function. + + Returns: + Deployment type enum for the function. 
+ """ + return self._deployment_type + + @property + def deployment_config( + self, + ) -> Union[GCPFunctionGen1Config, GCPFunctionGen2Config, GCPContainerConfig]: + """Get the deployment-specific configuration for this function. + + Returns: + Deployment-specific configuration object. + """ + return self._deployment_config + def serialize(self) -> Dict: """Serialize function to dictionary for cache storage. Adds code bucket in cloud storage. @@ -68,10 +140,14 @@ def serialize(self) -> Dict: Returns: Dictionary containing function state including bucket information """ - return { + out = { **super().serialize(), + "container-uri": self._container_uri, "bucket": self.bucket, + "deployment_type": self.deployment_type, + "deployment_config": self._deployment_config.serialize(), } + return out @staticmethod def deserialize(cached_config: Dict) -> "GCPFunction": @@ -91,14 +167,33 @@ def deserialize(cached_config: Dict) -> "GCPFunction": """ from sebs.faas.function import Trigger from sebs.gcp.triggers import LibraryTrigger, HTTPTrigger + from sebs.gcp.config import ( + GCPFunctionGen1Config, + GCPFunctionGen2Config, + GCPContainerConfig, + ) cfg = FunctionConfig.deserialize(cached_config["config"]) + deployment_type = FunctionDeploymentType.deserialize(cached_config["deployment_type"]) + + dep_cfg_dict = cached_config["deployment_config"] + deployment_config: Union[GCPFunctionGen1Config, GCPFunctionGen2Config, GCPContainerConfig] + if deployment_type == FunctionDeploymentType.FUNCTION_GEN1: + deployment_config = GCPFunctionGen1Config.deserialize(dep_cfg_dict) + elif deployment_type == FunctionDeploymentType.FUNCTION_GEN2: + deployment_config = GCPFunctionGen2Config.deserialize(dep_cfg_dict) + else: + deployment_config = GCPContainerConfig.deserialize(dep_cfg_dict) + ret = GCPFunction( cached_config["name"], cached_config["benchmark"], cached_config["hash"], cfg, + deployment_type, + deployment_config, cached_config["bucket"], + cached_config["container-uri"], ) for 
trigger in cached_config["triggers"]: trigger_type = cast( @@ -109,6 +204,22 @@ def deserialize(cached_config: Dict) -> "GCPFunction": ret.add_trigger(trigger_type.deserialize(trigger)) return ret + def container_uri(self) -> str | None: + """Return the container image URI if this function uses one. + + Returns: + Container image URI, or ``None`` for non-container deployments. + """ + return self._container_uri + + def set_container_uri(self, container_uri: str | None) -> None: + """Update the container image URI for this function. + + Args: + container_uri: Container image URI to store, or ``None``. + """ + self._container_uri = container_uri + def code_bucket(self, benchmark: str, storage_client: GCPStorage) -> str: """Get or create the Cloud Storage bucket for function code. diff --git a/sebs/gcp/gcp.py b/sebs/gcp/gcp.py index ff941c83..833661e4 100644 --- a/sebs/gcp/gcp.py +++ b/sebs/gcp/gcp.py @@ -35,7 +35,7 @@ import math import zipfile from datetime import datetime, timezone -from typing import cast, Dict, Optional, Tuple, List, Type +from typing import Any, cast, Dict, Optional, Tuple, List, Type, Protocol, Union from googleapiclient.discovery import build from googleapiclient.errors import HttpError @@ -44,18 +44,1561 @@ from sebs.cache import Cache from sebs.config import SeBSConfig -from sebs.benchmark import Benchmark +from sebs.benchmark import Benchmark, BenchmarkConfig from sebs.faas.function import Function, FunctionConfig, Trigger from sebs.faas.config import Resources from sebs.faas.system import System -from sebs.gcp.config import GCPConfig +from sebs.gcp.config import ( + GCPConfig, + GCPFunctionGen1Config, + GCPFunctionGen2Config, + GCPContainerConfig, +) from sebs.gcp.resources import GCPSystemResources from sebs.gcp.storage import GCPStorage -from sebs.gcp.function import GCPFunction -from sebs.utils import LoggingHandlers +from sebs.gcp.function import GCPFunction, FunctionDeploymentType +from sebs.gcp.container import GCRContainer +from 
sebs.utils import LoggingHandlers, ColoredWrapper from sebs.sebs_types import Language +class DeploymentStrategy(Protocol): + """Protocol defining the interface for GCP deployment strategies. + + Different deployment types (Cloud Function Gen1, Cloud Run, etc.) implement + this protocol to handle their specific deployment, update, and management logic. + """ + + """ + Google API is not the most robust - sometimes we need to retry REST operations. + """ + TRANSIENT_HTTP_CODES: frozenset[int] = frozenset({429, 503}) + + @staticmethod + def _execute_with_retry( + logging: ColoredWrapper, + request, + max_retries: int = 5, + base_delay: float = 1.0, + max_delay: float = 32.0, + ) -> Dict: + """Execute a googleapiclient request with retry logic for transient errors. + + Handles transient HTTP errors (503, 429) by retrying with exponential backoff + and jitter. Non-transient errors are raised immediately without retry. + + Args: + request: googleapiclient request object to execute + max_retries: Maximum number of retry attempts (default: 5) + base_delay: Base delay in seconds for exponential backoff (default: 1.0) + max_delay: Maximum delay between retries in seconds (default: 32.0) + + Returns: + Response dictionary from the API call + + Raises: + HttpError: If the request fails with a non-transient error or after + exhausting all retry attempts + """ + attempt = 0 + last_error = None + + while attempt <= max_retries: + try: + result = request.execute() + if attempt > 0: + logging.info(f"Request succeeded after {attempt} retries") + return result + except HttpError as e: + status_code = e.resp.status + last_error = e + + # Only retry on transient errors + if status_code not in DeploymentStrategy.TRANSIENT_HTTP_CODES: + raise + + # Check if we have retries left + if attempt >= max_retries: + logging.error( + f"Max retries ({max_retries}) exhausted, failing with status {status_code}" + ) + raise + + # Calculate delay with exponential backoff and jitter + delay = 
min(base_delay * (2**attempt) + random.uniform(0, 1), max_delay) + + if attempt == 0: + logging.warning( + f"Transient error {status_code}, retrying " + f"(attempt {attempt + 1}/{max_retries})" + ) + else: + logging.info(f"Retry {attempt + 1}/{max_retries} after {delay:.1f}s backoff") + + time.sleep(delay) + attempt += 1 + + # This should not be reached, but just in case + if last_error: + raise last_error + raise RuntimeError("Unexpected state in retry logic") + + def create( + self, + func_name: str, + code_package: Benchmark, + function_cfg: FunctionConfig, + envs: Dict, + container_uri: str | None, + ) -> None: + """Create function/service without waiting for deployment to complete. + + Args: + func_name: Name for the function/service + code_package: Benchmark package with code + function_cfg: Function configuration (memory, timeout, etc.) + envs: Environment variables + container_uri: Container image URI (for container deployments) + """ + ... + + def update_code( + self, + function: "GCPFunction", + code_package: Benchmark, + envs: Dict, + container_uri: str | None, + ) -> None: + """Update function/service code without waiting for deployment to complete. + + Args: + function: Existing function instance + code_package: New benchmark package + envs: Environment variables + container_uri: Container image URI (for container deployments) + """ + ... + + def update_config( + self, + function: "GCPFunction", + envs: Dict, + ) -> int: + """Update function/service configuration (memory, timeout, env vars). + + Args: + function: Function instance to update + envs: Environment variables + + Returns: + Version number after update + """ + ... + + def wait_for_deployment( + self, + func_name: str, + ) -> None: + """Wait for deployment to complete (build polling for Gen1, operation wait for Run). + + Args: + func_name: Name of the function/service to wait for + """ + ... 
+ + def allow_public_access(self, project_name: str, location: str, func_name: str) -> None: + """Set IAM policy for public access. + + Args: + func_name: Function/service name + full_resource_name: Full GCP resource name + """ + ... + + def create_trigger( + self, + func_name: str, + ) -> str: + """Create HTTP trigger and return the invoke URL. + + Args: + func_name: Function/service name + + Returns: + HTTP trigger URL + """ + ... + + def update_envs( + self, + full_function_name: str, + envs: Dict, + ) -> Dict: + """Merge new environment variables with existing ones. + + Args: + full_function_name: Full GCP resource name + envs: New environment variables to add/update + + Returns: + Merged environment variables dictionary + """ + ... + + def generate_runtime_envs(self) -> Dict: + """Generate deployment-runtime environment variables.""" + ... + + def is_deployed( + self, + func_name: str, + versionId: int = -1, + ) -> Tuple[bool, int]: + """Check if function/service is deployed. + + Args: + func_name: Function/service name + versionId: Optional specific version ID to verify (-1 to check any) + + Returns: + Tuple of (is_deployed, current_version_id) + """ + ... + + def delete_function( + self, + func_name: str, + ) -> None: + """Delete the function/service. + + Args: + func_name: Function/service name to delete + """ + ... + + @staticmethod + def get_full_function_name(project_name: str, location: str, func_name: str) -> str: + """Generate the fully qualified function name for GCP API calls. + + Args: + project_name: GCP project ID + location: GCP region/location + func_name: Function name + + Returns: + Fully qualified function name in GCP format + """ + ... + + def function_exists(self, project_name: str, location: str, func_name: str) -> Any: + """Check whether the function or service exists. + + Args: + project_name: GCP project ID. + location: GCP region/location. + func_name: Function or service name. 
+ + Returns: + True if the resource exists, otherwise False. + """ + ... + + def download_execution_metrics( + self, + function_name: str, + start_time: int, + end_time: int, + requests: Dict, + ) -> None: + """Populate provider execution times for completed invocations. + + Args: + function_name: Function or service name. + start_time: Start timestamp for metric collection. + end_time: End timestamp for metric collection. + requests: Invocation results keyed by request ID. + """ + ... + + def download_metrics( + self, function_name: str, start_time: int, end_time: int, metrics: Dict + ) -> None: + """Populate metrics with data from cloud monitoring. + + Args: + function_name: Function or service name. + start_time: Start timestamp for metric collection. + end_time: End timestamp for metric collection. + metrics: Dictionary mapping metric names to found values. + """ + ... + + +class CloudFunctionGen1Strategy(DeploymentStrategy): + """Deployment strategy for Google Cloud Functions Gen1.""" + + def __init__(self, storage: GCPStorage, config: GCPConfig, logging_handlers: ColoredWrapper): + """Initialize strategy with reference to config + and main GCP instance loggers. + + Args: + storage: GCP storage instance + config: GPC configuration + logging: main logging handlers for status reporting + """ + super().__init__() + self.storage = storage + self.config = config + self.logging = logging_handlers + + self.function_client = build("cloudfunctions", "v1", cache_discovery=False) + + @staticmethod + def get_full_function_name(project_name: str, location: str, func_name: str) -> str: + """Build the fully qualified Cloud Functions resource name. + + Args: + project_name: GCP project ID. + location: GCP region/location. + func_name: Function name. + + Returns: + Fully qualified Cloud Functions resource name. 
+ """ + return f"projects/{project_name}/locations/{location}/functions/{func_name}" + + def function_exists(self, project_name: str, location: str, func_name: str) -> Any: + """Check whether the Cloud Function exists. + + Args: + project_name: GCP project ID. + location: GCP region/location. + func_name: Function name. + + Returns: + True if the function exists, otherwise False. + """ + full_resource_name = self.get_full_function_name(project_name, location, func_name) + get_req = ( + self.function_client.projects().locations().functions().get(name=full_resource_name) + ) + + try: + self._execute_with_retry(self.logging, get_req) + return True + except HttpError as e: + if e.resp.status == 404: + return False + raise RuntimeError(f"Error checking function existence: {e}") from None + + def create( + self, + func_name: str, + code_package: Benchmark, + function_cfg: FunctionConfig, + envs: Dict, + container_uri: str | None, + ) -> None: + """Create a Cloud Function Gen1.""" + project_name = self.config.project_name + location = self.config.region + full_func_name = self.get_full_function_name(project_name, location, func_name) + language_runtime = code_package.language_version + timeout = code_package.benchmark_config.timeout + memory = code_package.benchmark_config.memory + architecture = function_cfg.architecture.value + + package = code_package.code_location + if package is None: + raise RuntimeError("Code location is not set for GCP deployment") + + code_package_name = cast(str, os.path.basename(package)) + code_package_name = f"{architecture}-{code_package_name}" + code_bucket = self.storage.get_bucket(Resources.StorageBucketType.DEPLOYMENT) + code_prefix = os.path.join(code_package.benchmark, code_package_name) + self.storage.upload(code_bucket, package, code_prefix) + + self.logging.info("Uploading function {} code to {}".format(func_name, code_bucket)) + + dep_config = self.config.deployment_config.function_gen1_config + + function_body = { + "name": 
full_func_name, + "entryPoint": ( + "org.serverlessbench.Handler" + if code_package.language == Language.JAVA + else "handler" + ), + "runtime": code_package.language_name + language_runtime.replace(".", ""), + "availableMemoryMb": memory, + "timeout": str(timeout) + "s", + "httpsTrigger": {}, + "ingressSettings": "ALLOW_ALL", + "sourceArchiveUrl": "gs://" + code_bucket + "/" + code_prefix, + "environmentVariables": envs, + "minInstances": dep_config.min_instances, + "maxInstances": dep_config.max_instances, + } + + create_req = ( + self.function_client.projects() + .locations() + .functions() + .create( + location="projects/{project_name}/locations/{location}".format( + project_name=project_name, location=location + ), + body=function_body, # type: ignore[arg-type] + ) + ) + + self._execute_with_retry(self.logging, create_req) + self.logging.info(f"Function {func_name} is creating - GCP build&deployment is started!") + + def update_code( + self, + function: "GCPFunction", + code_package: Benchmark, + envs: Dict, + container_uri: str | None, + ) -> None: + """Update Cloud Function Gen1 code.""" + if code_package.code_location is None: + raise RuntimeError("Code location is not set for GCP deployment") + + language_runtime = code_package.language_version + function_cfg = FunctionConfig.from_benchmark(code_package) + architecture = function_cfg.architecture.value + code_package_name = os.path.basename(code_package.code_location) + code_package_name = f"{architecture}-{code_package_name}" + + bucket = function.code_bucket(code_package.benchmark, self.storage) + self.storage.upload(bucket, code_package.code_location, code_package_name) + + self.logging.info(f"Uploaded new code package to {bucket}/{code_package_name}") + full_func_name = self.get_full_function_name( + self.config.project_name, self.config.region, function.name + ) + + dep_config = self.config.deployment_config.function_gen1_config + + req = ( + self.function_client.projects() + .locations() + 
.functions() + .patch( + name=full_func_name, + body={ + "name": full_func_name, + "entryPoint": ( + "org.serverlessbench.Handler" + if code_package.language == Language.JAVA + else "handler" + ), + "runtime": code_package.language_name + language_runtime.replace(".", ""), + "availableMemoryMb": function.config.memory, + "timeout": str(function.config.timeout) + "s", + "httpsTrigger": {}, + "sourceArchiveUrl": "gs://" + bucket + "/" + code_package_name, + "environmentVariables": envs, + "minInstances": dep_config.min_instances, + "maxInstances": dep_config.max_instances, + }, + ) + ) + + self._execute_with_retry(self.logging, req) + self.logging.info(f"Function {function.name} code update initiated") + + def update_config(self, function: "GCPFunction", envs: Dict) -> int: + """Update Cloud Function Gen1 configuration.""" + full_func_name = self.get_full_function_name( + self.config.project_name, self.config.region, function.name + ) + + dep_config = self.config.deployment_config.function_gen1_config + + body = { + "availableMemoryMb": function.config.memory, + "timeout": str(function.config.timeout) + "s", + "minInstances": dep_config.min_instances, + "maxInstances": dep_config.max_instances, + } + + if len(envs) > 0: + body["environmentVariables"] = envs + update_mask = "availableMemoryMb,timeout,environmentVariables,minInstances,maxInstances" + else: + update_mask = "availableMemoryMb,timeout,minInstances,maxInstances" + + req = ( + self.function_client.projects() + .locations() + .functions() + .patch( + name=full_func_name, + updateMask=update_mask, + body=body, # type: ignore[arg-type] + ) + ) + + res = self._execute_with_retry(self.logging, req) + expected_version = int(res["metadata"]["versionId"]) + + self.logging.info(f"Function {function.name} configuration update initiated") + + # Wait for deployment to become ACTIVE with expected version + # Configuration updates don't trigger builds but still need deployment time + current_version = 
self._wait_for_active_status(function.name, expected_version, timeout=60) + self.logging.info("Published new function configuration.") + + return current_version + + def wait_for_deployment( + self, + func_name: str, + ) -> None: + """Wait for Cloud Function Gen1 deployment via build polling.""" + # Poll build status until completion or failure + build_found = self._wait_for_build_and_poll(func_name) + if not build_found: + raise RuntimeError(f"No build operation found for {func_name}!") + + # Wait for deployment to become ACTIVE + self._wait_for_active_status(func_name) + + def allow_public_access(self, project_name: str, location: str, func_name: str) -> None: + """Set IAM policy for public access on Cloud Function Gen1.""" + + full_resource_name = self.get_full_function_name(project_name, location, func_name) + + allow_unauthenticated_req = ( + self.function_client.projects() + .locations() + .functions() + .setIamPolicy( + resource=full_resource_name, + body={ + "policy": { + "bindings": [ + { + "role": "roles/cloudfunctions.invoker", + "members": ["allUsers"], + } + ] + } + }, + ) + ) + + try: + self._execute_with_retry(self.logging, allow_unauthenticated_req) + except HttpError as e: + raise RuntimeError( + f"Failed to configure function {full_resource_name} " + f"for unauthenticated invocations! Error: {e}" + ) + + def create_trigger(self, func_name: str) -> str: + """Create HTTP trigger and return the invoke URL for Cloud Function. 
+ + Args: + func_name: Function name + + Returns: + HTTP trigger URL + """ + project_name = self.config.project_name + location = self.config.region + full_func_name = self.get_full_function_name(project_name, location, func_name) + get_req = self.function_client.projects().locations().functions().get(name=full_func_name) + func_details = self._execute_with_retry(self.logging, get_req) + invoke_url = func_details["httpsTrigger"]["url"] + self.logging.info(f"Function {func_name} - HTTP trigger URL: {invoke_url}") + return invoke_url + + def update_envs(self, full_function_name: str, envs: Dict) -> Dict: + """Merge new environment variables with existing Cloud Function environment. + + Args: + full_function_name: Fully qualified function name + envs: New environment variables to add/update + + Returns: + Merged environment variables dictionary + """ + get_req = ( + self.function_client.projects().locations().functions().get(name=full_function_name) + ) + response = self._execute_with_retry(self.logging, get_req) + + # preserve old variables while adding new ones. + # but for conflict, we select the new one + if "environmentVariables" in response: + envs = {**response["environmentVariables"], **envs} # type: ignore[typeddict-item] + + return envs + + def generate_runtime_envs(self) -> Dict: + """Return runtime environment variables for Gen1 deployments. + + Returns: + Empty dictionary because Gen1 does not require runtime overrides. + """ + return {} + + def is_deployed(self, func_name: str, versionId: int = -1) -> Tuple[bool, int]: + """Check if Cloud Function is deployed and optionally verify its version. 
+ + Args: + func_name: Name of the function to check + versionId: Optional specific version ID to verify (-1 to check any) + + Returns: + Tuple of (is_deployed, current_version_id) + """ + name = self.get_full_function_name(self.config.project_name, self.config.region, func_name) + status_req = self.function_client.projects().locations().functions().get(name=name) + status_res = self._execute_with_retry(self.logging, status_req) + if versionId == -1: + return (status_res["status"] == "ACTIVE", status_res["versionId"]) + else: + return (status_res["versionId"] == versionId, status_res["versionId"]) + + def delete_function(self, func_name: str) -> None: + """Delete a Google Cloud Function. + + Args: + func_name: Name of the function to delete + """ + self.logging.info(f"Deleting function {func_name}") + + full_func_name = self.get_full_function_name( + self.config.project_name, self.config.region, func_name + ) + + try: + delete_req = ( + self.function_client.projects().locations().functions().delete(name=full_func_name) + ) + self._execute_with_retry(self.logging, delete_req) + self.logging.info(f"Function {func_name} deleted successfully") + except HttpError as e: + if e.resp.status == 404: + self.logging.error(f"Function {func_name} does not exist!") + else: + self.logging.error(f"Failed to delete function {func_name}: {e}") + raise + + def download_execution_metrics( + self, + function_name: str, + start_time: int, + end_time: int, + requests: Dict, + ) -> None: + """Download execution times for Cloud Functions Gen1 from Cloud Logging.""" + + from google.api_core import exceptions + from time import sleep + import google.cloud.logging as gcp_logging + + def wrapper(gen): + """Yield entries while backing off on transient quota errors.""" + while True: + try: + yield next(gen) + except StopIteration: + break + except exceptions.ResourceExhausted: + self.logging.info("Google Cloud resources exhausted, sleeping 30s") + sleep(30) + + timestamps = [] + for timestamp in 
[start_time, end_time + 5]: + utc_date = datetime.fromtimestamp(timestamp, tz=timezone.utc) + timestamps.append(utc_date.strftime("%Y-%m-%dT%H:%M:%SZ")) + + logging_client = gcp_logging.Client() + logger = logging_client.logger("cloudfunctions.googleapis.com%2Fcloud-functions") + invocations = logger.list_entries( + filter_=( + f'resource.labels.function_name = "{function_name}" ' + f'timestamp >= "{timestamps[0]}" ' + f'timestamp <= "{timestamps[1]}"' + ), + page_size=1000, + ) + + invocations_processed = 0 + if hasattr(invocations, "pages"): + pages = list(wrapper(invocations.pages)) + else: + pages = [list(wrapper(invocations))] + + entries = 0 + for page in pages: + for invoc in page: + entries += 1 + if "execution took" not in invoc.payload: + continue + trace_id = self._extract_trace_id(invoc) + if trace_id is None or trace_id not in requests: + continue + + regex_result = re.search(r"\d+ ms", invoc.payload) + assert regex_result + exec_time = regex_result.group().split()[0] + requests[trace_id].provider_times.execution = int(exec_time) * 1000 + invocations_processed += 1 + + self.logging.info( + f"GCP Gen1: Received {entries} entries, found time metrics for " + f"{invocations_processed} out of {len(requests.keys())} invocations." + ) + + def download_metrics( + self, function_name: str, start_time: int, end_time: int, metrics: Dict + ) -> None: + """ + Download monitoring metrics for Cloud Functions Gen1. + Use metrics to find estimated values for maximum memory used, active instances + and network traffic. 
+ https://cloud.google.com/monitoring/api/metrics_gcp#gcp-cloudfunctions + """ + + # Set expected metrics here + available_metrics = ["execution_times", "user_memory_bytes", "network_egress"] + + client = monitoring_v3.MetricServiceClient() + project_name = client.common_project_path(self.config.project_name) + + end_time_nanos, end_time_seconds = math.modf(end_time) + start_time_nanos, start_time_seconds = math.modf(start_time) + + interval = monitoring_v3.TimeInterval( + { + "end_time": {"seconds": int(end_time_seconds) + 60}, + "start_time": {"seconds": int(start_time_seconds)}, + } + ) + + for metric in available_metrics: + + metrics[metric] = [] + + list_request = monitoring_v3.ListTimeSeriesRequest( + name=project_name, + filter='metric.type = "cloudfunctions.googleapis.com/function/{}"'.format(metric), + interval=interval, + ) + + results = client.list_time_series(list_request) + for result in results: + if result.resource.labels.get("function_name") == function_name: + for point in result.points: + metrics[metric] += [ + { + "mean_value": point.value.distribution_value.mean, + "executions_count": point.value.distribution_value.count, + } + ] + + @staticmethod + def _extract_trace_id(entry) -> Optional[str]: + """Extract the trace ID from a Cloud Functions log entry. + + Args: + entry: Log entry to inspect. + + Returns: + Trace ID if present, otherwise ``None``. + """ + trace = getattr(entry, "trace", None) + if not isinstance(trace, str) or "/traces/" not in trace: + return None + return trace.rsplit("/traces/", 1)[1] + + def _wait_for_build_and_poll( + self, func_name: str, timeout: int = 300, poll_interval: int = 2 + ) -> bool: + """Wait for build to start, get build name, and poll until completion. + + Since GCP operations typically don't immediately return a build name, this function + waits for the build to start, retrieves the build name from the function's + metadata, and then polls the build status. 
+ + Args: + func_name: Name of the function being built + timeout: Maximum time to wait in seconds (default: 300) + poll_interval: Seconds between polling attempts (default: 2) + + Returns: + True if a build was found and completed successfully, False if no build was found + + Raises: + RuntimeError: If build fails + """ + full_func_name = self.get_full_function_name( + self.config.project_name, self.config.region, func_name + ) + begin = time.time() + build_name = None + previous_build_id = None + + # First, try to get the current build ID to compare against + try: + get_req = ( + self.function_client.projects().locations().functions().get(name=full_func_name) + ) + func_details = self._execute_with_retry(self.logging, get_req) + if "buildId" in func_details: + previous_build_id = func_details["buildId"] + except HttpError: + pass + + # Wait for build to start and get build name + self.logging.info(f"Waiting for build to start for function {func_name}...") + while build_name is None: + if time.time() - begin > timeout: + self.logging.warning( + f"No build found for {func_name} after {timeout}s - " + "might be a configuration-only update" + ) + return False + + try: + # Get function details to find the build + get_req = ( + self.function_client.projects().locations().functions().get(name=full_func_name) + ) + func_details = self._execute_with_retry(self.logging, get_req) + + # Check if there's a new build in progress + if "buildId" in func_details: + build_id = func_details["buildId"] + # Only consider it a new build if it's different from the previous one + if previous_build_id is None or build_id != previous_build_id: + # Construct build name from build ID + build_name = ( + f"projects/{self.config.project_name}/locations/" + f"{self.config.region}/builds/{build_id}" + ) + self.logging.info(f"Found build {build_id} for function {func_name}!") + break + except HttpError as e: + self.logging.debug(f"Error getting function details: {e}") + + 
time.sleep(poll_interval) + + # Now poll the build status + if build_name: + self._poll_build_status(build_name, func_name, timeout) + return True + + return False + + def _wait_for_active_status( + self, func_name: str, expected_version: Optional[int] = None, timeout: int = 60 + ) -> int: + """Wait for function to reach ACTIVE status after build completes. + + After a build completes, the function may be in DEPLOY_IN_PROGRESS state + for a short time. This function polls until the status becomes ACTIVE. + Furthermore, we handle HTTP errors: + * 503 / 429 transient backend errors — GCP Cloud Functions v1 + can periodically returns these; they are not deployment failures. + + Args: + func_name: Name of the function to check + expected_version: Optional version ID to verify (None to skip version check) + timeout: Maximum time to wait in seconds (default: 60) + + Returns: + Current version ID of the function + + Raises: + RuntimeError: If deployment fails or timeout is reached + """ + full_func_name = self.get_full_function_name( + self.config.project_name, self.config.region, func_name + ) + begin = time.time() + last_status: Optional[str] = None + + self.logging.info(f"Waiting for function {func_name} to become ACTIVE...") + + while True: + + elapsed = time.time() - begin + if elapsed > timeout: + raise RuntimeError( + f"Timeout waiting for function {func_name} to become ACTIVE " + f"after {elapsed:.0f}s. 
Last status: {last_status}" + ) + + get_req = ( + self.function_client.projects().locations().functions().get(name=full_func_name) + ) + func_details = self._execute_with_retry(self.logging, get_req) + + status = func_details["status"] + current_version = int(func_details["versionId"]) + + if status != last_status: + last_status = status + + if status == "ACTIVE": + # Check version if specified + if expected_version is not None and current_version != expected_version: + self.logging.warning( + f"Function {func_name} is ACTIVE but version mismatch: " + f"expected {expected_version}, got {current_version}" + ) + # Continue waiting as version might still be updating + else: + self.logging.info(f"Function {func_name} is ACTIVE (version {current_version})") + return current_version + elif status == "DEPLOY_IN_PROGRESS": + self.logging.debug(f"Function {func_name} deployment in progress...") + else: + # Unexpected status + self.logging.error(f"Function {func_name} has unexpected status: {status}") + raise RuntimeError( + f"Function {func_name} deployment failed with status: {status}" + ) from None + + time.sleep(2) + + def _poll_build_status(self, build_name: str, func_name: str, timeout: int = 300) -> None: + """Poll build operation until completion or failure. + + Monitors a Cloud Build operation, waiting for it to complete successfully + or fail. Provides detailed error information if the build fails. 
+ + Args: + build_name: Fully qualified build name from GCP API + func_name: Function name for logging purposes + timeout: Maximum time to wait in seconds (default: 300) + + Raises: + RuntimeError: If build fails or timeout is reached + """ + build_client = cloudbuild_v1.CloudBuildClient() + begin = time.time() + + while True: + build_status = build_client.get_build(name=build_name) + + if build_status.status == cloudbuild_v1.Build.Status.SUCCESS: + self.logging.info(f"Function {func_name} - build completed successfully!") + break + elif build_status.status == cloudbuild_v1.Build.Status.FAILURE: + self.logging.error(f"Failed to build function: {func_name}") + self.logging.error(f"Reasons: {build_status.failure_info.detail}") + self.logging.error(f"URL for detailed error: {build_status.log_url}") + raise RuntimeError(f"Build failed for function {func_name}!") from None + elif build_status.status in ( + cloudbuild_v1.Build.Status.CANCELLED, + cloudbuild_v1.Build.Status.TIMEOUT, + ): + self.logging.error(f"Build was cancelled or timed out for function: {func_name}") + self.logging.error(f"URL for detailed error: {build_status.log_url}") + raise RuntimeError(f"Build failed for function {func_name}!") from None + + if time.time() - begin > timeout: + self.logging.error( + f"Failed to build function: {func_name} after {timeout} seconds!" + ) + raise RuntimeError(f"Build timeout for function {func_name}!") from None + + time.sleep(3) + + +class RunContainerStrategy(DeploymentStrategy): + """Deployment strategy for Google Cloud Run containers (Gen1).""" + + def __init__(self, config: GCPConfig, logging_handlers: ColoredWrapper): + """Initialize strategy with reference to config + and main GCP instance loggers. 
+ + Args: + config: GPC configuration + logging: main logging handlers for status reporting + """ + # Container-based functions are created via run-client + self.run_client = build("run", "v2", cache_discovery=False) + self.config = config + self.logging = logging_handlers + + @staticmethod + def get_full_function_name(project_name: str, location: str, service_name: str) -> str: + """Build the fully qualified Cloud Run service resource name. + + Args: + project_name: GCP project ID. + location: GCP region/location. + service_name: Cloud Run service name. + + Returns: + Fully qualified Cloud Run service resource name. + """ + return f"projects/{project_name}/locations/{location}/services/{service_name}" + + def function_exists(self, project_name: str, location: str, func_name: str) -> Any: + """Check whether the Cloud Run service exists. + + Args: + project_name: GCP project ID. + location: GCP region/location. + func_name: Cloud Run service name. + + Returns: + True if the service exists, otherwise False. + """ + full_resource_name = self.get_full_function_name(project_name, location, func_name) + get_req = self.run_client.projects().locations().services().get(name=full_resource_name) + + try: + self._execute_with_retry(self.logging, get_req) + return True + except HttpError: + return False + + def _transform_service_envs(self, envs: dict) -> list: + """Convert environment variables into the Cloud Run API format. + + Args: + envs: Key-value environment mapping. + + Returns: + List of Cloud Run environment variable descriptors. + """ + return [{"name": k, "value": v} for k, v in envs.items()] + + def _service_body( + self, + benchmark_config: BenchmarkConfig | FunctionConfig, + envs_list: Dict, + container_uri: str, + ) -> Dict: + """Build the Cloud Run service body for create and update requests. + + Args: + benchmark_config: Benchmark or function configuration providing memory and timeout. + envs_list: Environment variables to inject into the container. 
+ container_uri: Container image URI to deploy. + + Returns: + Cloud Run service body payload. + """ + + dep_config = self.config.deployment_config.container_config + + timeout = benchmark_config.timeout + memory = benchmark_config.memory + + execution_environment = f"EXECUTION_ENVIRONMENT_{dep_config.environment.upper()}" + + return { + "template": { + "containers": [ + { + "image": container_uri, # type: ignore[typeddict-item] + "ports": [{"containerPort": 8080}], + "env": self._transform_service_envs(envs_list), + "resources": { + "limits": { + "memory": f"{memory if memory >= 512 else 512}Mi", + "cpu": str(dep_config.vcpus), + }, + "cpuIdle": dep_config.cpu_throttle, + "startupCpuBoost": dep_config.cpu_boost, + }, + } + ], + "timeout": f"{timeout}s", + "maxInstanceRequestConcurrency": dep_config.gcp_concurrency, + "execution_environment": execution_environment, + }, + "scaling": { + "minInstanceCount": dep_config.min_instances, + "maxInstanceCount": dep_config.max_instances, + }, + "ingress": "INGRESS_TRAFFIC_ALL", + } + + def create( + self, + func_name: str, + code_package: Benchmark, + function_cfg: FunctionConfig, + envs: Dict, + container_uri: str | None, + ) -> None: + """Create a Cloud Run service.""" + if container_uri is None: + raise RuntimeError("Container URI is required for Cloud Run deployment") + + project_name = self.config.project_name + location = self.config.region + + self.logging.info( + f"Deploying GCP Cloud Run container service {func_name} from {container_uri}" + ) + + parent = f"projects/{project_name}/locations/{location}" + service_body = self._service_body(code_package.benchmark_config, envs, container_uri) + create_req = ( + self.run_client.projects() + .locations() + .services() + .create( + parent=parent, + serviceId=func_name, + body=service_body, # type: ignore[arg-type] + ) + ) + + self._operation_response = create_req.execute() + self.logging.info( + f"Creating Cloud Run service {func_name}, waiting for operation 
completion..." + ) + + def update_code( + self, + function: GCPFunction, + code_package: Benchmark, + envs: Dict, + container_uri: str | None, + ) -> None: + """Update Cloud Run service code.""" + if container_uri is None: + raise RuntimeError("Container URI is required for Cloud Run deployment") + + full_service_name = self.get_full_function_name( + self.config.project_name, self.config.region, function.name + ) + + self.logging.info(f"Updating Cloud Run service {function.name} with image: {container_uri}") + + service_body = self._service_body(code_package.benchmark_config, envs, container_uri) + + # We are using the broad "template" for updateMask. + # We noticed that when using selective updates with `template.containers`, + # GCP would not create a new revision when using the same image tag - + # even when the tag now links to a different digest. + req = ( + self.run_client.projects() + .locations() + .services() + .patch( # type: ignore[arg-type] + name=full_service_name, + body=service_body, # type: ignore[arg-type] + updateMask="template", + ) + ) + + self._operation_response = req.execute() + self.logging.info( + f"Patch request sent for Cloud Run service {function.name}, waiting for operation..." + ) + + def update_config(self, function: GCPFunction, envs: Dict) -> int: + """Update Cloud Run service configuration.""" + + full_func_name = self.get_full_function_name( + self.config.project_name, self.config.region, function.name + ) + + # We are using the broad "template" for updateMask. + # We noticed that when using selective updates with `template.containers`, + # GCP would not create a new revision when using the same image tag - + # even when the tag now links to a different digest. 
+ + container_uri = function.container_uri() + if container_uri is None: + raise RuntimeError("Container URI is required for Cloud Run deployment") + + service_body = self._service_body(function.config, envs, container_uri) + req = ( + self.run_client.projects() + .locations() + .services() + .patch( # type: ignore[arg-type] + name=full_func_name, + body=service_body, # type: ignore[arg-type] + updateMask="template", + ) + ) + + self._operation_response = req.execute() + self.logging.info( + f"Patch request sent for Cloud Run config {function.name}, waiting for operation..." + ) + + self.wait_for_deployment(function.name) + + return 0 + + def wait_for_deployment( + self, + func_name: str, + ) -> None: + """Wait for Cloud Run deployment via operation wait.""" + if not hasattr(self, "_operation_response"): + raise RuntimeError("No operation to wait for - create/update not called") + + op_name = self._operation_response["name"] + self.logging.info(f"Waiting for operation: {op_name}") + op_res = self.run_client.projects().locations().operations().wait(name=op_name).execute() + + if "error" in op_res: + raise RuntimeError(f"Cloud Run deployment failed: {op_res['error']}") + + # Get service details to check revision + full_service_name = self.get_full_function_name( + self.config.project_name, self.config.region, func_name + ) + svc = ( + self.run_client.projects().locations().services().get(name=full_service_name).execute() + ) + + latest_revision = svc.get("latestReadyRevision", "unknown") + self.logging.info( + f"Cloud Run service {func_name} deployed. 
" f"Latest revision: {latest_revision}" + ) + + delattr(self, "_operation_response") + + def allow_public_access(self, project_name: str, location: str, func_name: str) -> None: + """Set IAM policy for public access on Cloud Run.""" + + full_resource_name = self.get_full_function_name(project_name, location, func_name) + + allow_unauthenticated_req = ( + self.run_client.projects() + .locations() + .services() + .setIamPolicy( + resource=full_resource_name, + body={ + "policy": {"bindings": [{"role": "roles/run.invoker", "members": ["allUsers"]}]} + }, + ) + ) + + try: + self._execute_with_retry(self.logging, allow_unauthenticated_req) + except HttpError as e: + raise RuntimeError( + f"Failed to configure function {full_resource_name} " + f"for unauthenticated invocations! Error: {e}" + ) + + def create_trigger(self, func_name: str) -> str: + """Create HTTP trigger and return the invoke URL for Cloud Run service. + + Args: + func_name: Service name + + Returns: + HTTP trigger URL + """ + project_name = self.config.project_name + location = self.config.region + service_id = func_name.lower() + full_service_name = self.get_full_function_name(project_name, location, service_id) + + self.logging.info(f"Waiting for service {full_service_name} to be ready...") + deployed = False + begin = time.time() + while not deployed: + svc = ( + self.run_client.projects() + .locations() + .services() + .get(name=full_service_name) + .execute() + ) + condition = svc.get("terminalCondition", {}) + if condition.get("type") == "Ready" and condition.get("state") == "CONDITION_SUCCEEDED": + deployed = True + else: + time.sleep(3) + + if time.time() - begin > 300: + self.logging.error(f"Failed to deploy service: {func_name}") + raise RuntimeError("Deployment timeout!") + + self.logging.info(f"Service {func_name} - deployed!") + invoke_url = svc["uri"] + return invoke_url + + def update_envs(self, full_function_name: str, envs: Dict) -> Dict: + """Merge new environment variables with 
existing Cloud Run service environment. + + Args: + full_function_name: Fully qualified service name + envs: New environment variables to add/update + + Returns: + Merged environment variables dictionary + """ + get_req = ( + self.run_client.projects().locations().services().get(name=full_function_name) + ) # type: ignore + response = get_req.execute() + + existing_envs = {} + if "template" in response and "containers" in response["template"]: + container = response["template"]["containers"][0] + if "env" in container: + for e in container["env"]: + existing_envs[e["name"]] = e["value"] + + # Merge: new overrides old + envs = {**existing_envs, **envs} + return envs + + def generate_runtime_envs(self) -> Dict: + """Return runtime environment variables for Cloud Run deployments. + + Returns: + Runtime environment variables for Gunicorn worker configuration. + """ + dep_config = self.config.deployment_config.container_config + return { + "GUNICORN_WORKERS": str(dep_config.worker_concurrency), + "GUNICORN_THREADS": str(dep_config.worker_threads), + } + + def is_deployed(self, func_name: str, versionId: int = -1) -> Tuple[bool, int]: + """Check if Cloud Run service is deployed. + + Args: + func_name: Name of the service to check + versionId: Ignored for Cloud Run (always returns 0) + + Returns: + Tuple of (is_deployed, 0) + """ + service_name = func_name.replace("_", "-").lower() + name = self.get_full_function_name( + self.config.project_name, self.config.region, service_name + ) + try: + svc = self.run_client.projects().locations().services().get(name=name).execute() + conditions = svc.get("terminalCondition", {}) + is_ready = conditions.get("type", "") == "Ready" + return (is_ready, 0) + except HttpError: + return (False, -1) + + def delete_function(self, func_name: str) -> None: + """Delete a Cloud Run service. 
+ + Args: + func_name: Name of the service to delete + """ + self.logging.info(f"Deleting Cloud Run service {func_name}") + + service_name = func_name.replace("_", "-").lower() + full_service_name = self.get_full_function_name( + self.config.project_name, self.config.region, service_name + ) + + try: + delete_req = ( + self.run_client.projects().locations().services().delete(name=full_service_name) + ) + delete_req.execute() + self.logging.info(f"Cloud Run service {func_name} deleted successfully") + except HttpError as e: + if e.resp.status == 404: + self.logging.error(f"Cloud Run service {func_name} does not exist!") + else: + self.logging.error(f"Failed to delete Cloud Run service {func_name}: {e}") + raise + + def download_execution_metrics( + self, + function_name: str, + start_time: int, + end_time: int, + requests: Dict, + ) -> None: + """Download execution times for Cloud Run from request logs.""" + import google.cloud.logging as gcp_logging + + service_name = function_name.replace("_", "-").lower() + + timestamps = [] + for timestamp in [start_time, end_time + 1]: + utc_date = datetime.fromtimestamp(timestamp, tz=timezone.utc) + timestamps.append(utc_date.strftime("%Y-%m-%dT%H:%M:%SZ")) + + logging_client = gcp_logging.Client() + entries = logging_client.list_entries( + filter_=( + 'resource.type = "cloud_run_revision" ' + 'logName = "projects/' + f'{self.config.project_name}/logs/run.googleapis.com%2Frequests" ' + f'resource.labels.service_name = "{service_name}" ' + f'timestamp >= "{timestamps[0]}" ' + f'timestamp <= "{timestamps[1]}"' + ), + page_size=1000, + ) + + found_metrics = 0 + total_entries = 0 + for entry in entries: + total_entries += 1 + trace_id = self._extract_trace_id(entry) + if trace_id is None or trace_id not in requests: + continue + + execution_time_us = self._extract_latency_us(entry) + if execution_time_us is None: + continue + + requests[trace_id].provider_times.execution = execution_time_us + found_metrics += 1 + + 
self.logging.info( + f"GCP Cloud Run: Received {total_entries} log entries, found time metrics for " + f"{found_metrics} out of {len(requests.keys())} invocations." + ) + + def download_metrics( + self, function_name: str, start_time: int, end_time: int, metrics: Dict + ) -> None: + """ + Download monitoring metrics for Cloud Functions Gen1. + Use metrics to find estimated values for maximum memory used, active instances + and network traffic. + https://cloud.google.com/monitoring/api/metrics_gcp#gcp-cloudfunctions + """ + # Set expected metrics here + available_metrics = ["execution_times", "user_memory_bytes", "network_egress"] + # (metric_path, kind) — kind is "distribution" or "int64" + available_metrics = [ + ("container/billable_instance_time", "delta", "double"), # seconds + ("container/instance_count", "gauge", "int64"), + ("container/max_request_concurrencies", "delta", "distribution"), + ("container/memory/utilizations", "delta", "distribution"), # fraction + ("container/cpu/utilizations", "delta", "distribution"), # fraction + ("container/cpu/allocation_time", "delta", "double"), # seconds + ("container/memory/allocation_time", "delta", "double"), # gigabyte-seconds + ("container/network/sent_bytes_count", "delta", "int64"), # bytes (delta) + ("container/network/received_bytes_count", "delta", "int64"), # bytes (delta) + ("container/startup_latencies", "delta", "distribution"), # ms, cold start + ("request_count", "delta", "int64"), + ("request_latencies", "distribution", "distribution"), # ms + ("request_latency/e2e_latencies", "delta", "distribution"), # ms + ("request_latency/ingress_to_region", "delta", "distribution"), # ms + ("request_latency/pending", "delta", "distribution"), # ms + ("request_latency/response_egress", "delta", "distribution"), # ms + ("request_latency/routing", "delta", "distribution"), # ms + ("request_latency/user_execution", "delta", "distribution"), # ms + ] + + client = monitoring_v3.MetricServiceClient() + project_name = 
client.common_project_path(self.config.project_name) + + _, end_time_seconds = math.modf(end_time) + _, start_time_seconds = math.modf(start_time) + interval = monitoring_v3.TimeInterval( + { + # some metrics are reported with a delay + "end_time": {"seconds": int(end_time_seconds) + 300}, + "start_time": {"seconds": int(start_time_seconds)}, + } + ) + + for metric, kind, value_type in available_metrics: + metrics[metric] = [] + # Filter on resource.type AND service_name server-side — much faster than + # pulling every revision in the project and filtering client-side. + flt = ( + f'metric.type = "run.googleapis.com/{metric}" ' + f'AND resource.type = "cloud_run_revision" ' + f'AND resource.labels.service_name = "{function_name}"' + ) + list_request = monitoring_v3.ListTimeSeriesRequest( + name=project_name, + filter=flt, + interval=interval, + view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL, + ) + for result in client.list_time_series(list_request): + revision = result.resource.labels.get("revision_name") + for point in result.points: + if value_type == "distribution": + metrics[metric].append( + { + "kind": kind, + "revision": revision, + "mean_value": point.value.distribution_value.mean, + "squared_deviations": point.value.distribution_value.sum_of_squared_deviation, + "count": point.value.distribution_value.count, + "ts": point.interval.end_time.timestamp(), + } + ) + else: + if value_type == "int64": + value = point.value.int64_value + else: + value = point.value.double_value + metrics[metric].append( + { + "revision": revision, + "value": value, + "kind": kind, + "ts": point.interval.end_time.timestamp(), + } + ) + + @staticmethod + def _extract_trace_id(entry) -> Optional[str]: + """Extract the trace ID from a Cloud Run log entry. + + Args: + entry: Log entry to inspect. + + Returns: + Trace ID if present, otherwise ``None``. 
+ """ + trace = getattr(entry, "trace", None) + if not isinstance(trace, str) or "/traces/" not in trace: + return None + return trace.rsplit("/traces/", 1)[1] + + @staticmethod + def _extract_latency_us(entry) -> Optional[int]: + """Extract request latency from a Cloud Run log entry in microseconds. + + Args: + entry: Log entry to inspect. + + Returns: + Request latency in microseconds, or ``None`` if unavailable. + """ + http_request = getattr(entry, "http_request", None) + if http_request is None: + return None + + latency = http_request.get("latency") + if not isinstance(latency, str): + return None + + try: + return int(float(latency[:-1]) * 1_000_000) + except (ValueError, TypeError): + return None + + class GCP(System): """Google Cloud Platform serverless system implementation. @@ -70,11 +1613,6 @@ class GCP(System): logging_handlers: Logging configuration for status reporting """ - """ - Google API is not the most robust - sometimes we need to retry REST operations. - """ - TRANSIENT_HTTP_CODES = frozenset({429, 503}) - def __init__( self, system_config: SeBSConfig, @@ -156,88 +1694,91 @@ def initialize( config: Additional system-specific configuration parameters resource_prefix: Optional prefix for resource naming to avoid conflicts """ - self.function_client = build("cloudfunctions", "v1", cache_discovery=False) + self.initialize_resources(select_prefix=resource_prefix, quiet=quiet) - def get_function_client(self): - """Get the Google Cloud Functions API client. + storage = cast(GCPStorage, self._system_resources.get_storage()) - The client is initialized during the `initialize` call. 
+ # Initialize deployment strategies + self.cloud_function_gen1_strategy = CloudFunctionGen1Strategy( + storage, self._config, self.logging + ) + self.run_container_strategy = RunContainerStrategy(self._config, self.logging) + + self.gcr_client = GCRContainer(self.system_config, self.config, self.docker_client) + + @property + def container_client(self) -> GCRContainer | None: + """Get the GCP-specific container manager that uses Artifact Registry. Returns: - Initialized Cloud Functions API client + Container manager instance. """ - return self.function_client + return self.gcr_client - def _execute_with_retry( - self, - request, - max_retries: int = 5, - base_delay: float = 1.0, - max_delay: float = 32.0, - ) -> Dict: - """Execute a googleapiclient request with retry logic for transient errors. + def get_function_client(self): + """Get the Cloud Functions v1 API client. - Handles transient HTTP errors (503, 429) by retrying with exponential backoff - and jitter. Non-transient errors are raised immediately without retry. + Returns: + Google Cloud Functions v1 API client + """ + return self.cloud_function_gen1_strategy.function_client - Args: - request: googleapiclient request object to execute - max_retries: Maximum number of retry attempts (default: 5) - base_delay: Base delay in seconds for exponential backoff (default: 1.0) - max_delay: Maximum delay between retries in seconds (default: 32.0) + def get_run_client(self): + """Get the Cloud Run v2 API client. Returns: - Response dictionary from the API call + Google Cloud Run v2 API client + """ + return self.run_container_strategy.run_client - Raises: - HttpError: If the request fails with a non-transient error or after - exhausting all retry attempts + def _get_deployment_config( + self, deployment_type: FunctionDeploymentType + ) -> Union[GCPFunctionGen1Config, GCPFunctionGen2Config, GCPContainerConfig]: + """Return the deployment config that matches the requested deployment type. 
+ + Args: + deployment_type: Deployment type to resolve. + + Returns: + Deployment configuration object for the requested type. """ - attempt = 0 - last_error = None + if deployment_type.is_container: + return self.config.deployment_config.container_config + else: + if deployment_type == FunctionDeploymentType.FUNCTION_GEN1: + return self.config.deployment_config.function_gen1_config + else: + return self.config.deployment_config.function_gen2_config - while attempt <= max_retries: - try: - result = request.execute() - if attempt > 0: - self.logging.info(f"Request succeeded after {attempt} retries") - return result - except HttpError as e: - status_code = e.resp.status - last_error = e + def is_configuration_changed(self, cached_function: Function, benchmark: Benchmark) -> bool: + """ + Override the default implementation. - # Only retry on transient errors - if status_code not in GCP.TRANSIENT_HTTP_CODES: - raise + In addition to checking for timeout and language runtime - the shared config + - we also check if the GCP-specific config has changed. 
- # Check if we have retries left - if attempt >= max_retries: - self.logging.error( - f"Max retries ({max_retries}) exhausted, failing with status {status_code}" - ) - raise + Args: + cached_function: Previously cached function configuration + benchmark: Current benchmark configuration to compare against - # Calculate delay with exponential backoff and jitter - delay = min(base_delay * (2**attempt) + random.uniform(0, 1), max_delay) + Returns: + True if configuration has changed and function needs updating + """ + changed = super().is_configuration_changed(cached_function, benchmark) - if attempt == 0: - self.logging.warning( - f"Transient error {status_code}, retrying " - f"(attempt {attempt + 1}/{max_retries})" - ) - else: - self.logging.info( - f"Retry {attempt + 1}/{max_retries} after {delay:.1f}s backoff" - ) + # Check if deployment config has changed + cached_function = cast(GCPFunction, cached_function) + current_dep_config = self._get_deployment_config(cached_function.deployment_type) - time.sleep(delay) - attempt += 1 + if cached_function.deployment_config != current_dep_config: + self.logging.info( + f"Deployment config has changed for {cached_function.name}, " + "will update configuration." 
+ ) + changed = True - # This should not be reached, but just in case - if last_error: - raise last_error - raise RuntimeError("Unexpected state in retry logic") + return changed def default_function_name( self, code_package: Benchmark, resources: Optional[Resources] = None @@ -257,13 +1798,22 @@ def default_function_name( """ # Create function name resource_id = resources.resources_id if resources else self.config.resources.resources_id - func_name = "sebs-{}-{}-{}-{}".format( + # Extract benchmark number (e.g., "110" from "110.dynamic-html") + benchmark_number = code_package.benchmark.split(".")[0] + func_name = "sebs-{}-{}-{}-{}-{}".format( resource_id, - code_package.benchmark, + benchmark_number, code_package.language_name, code_package.language_version, + code_package.architecture, + ) + if code_package.container_deployment: + func_name = f"{func_name}-docker" + return ( + GCP.format_function_name(func_name) + if not code_package.container_deployment + else func_name.replace(".", "-") ) - return GCP.format_function_name(func_name) @staticmethod def format_function_name(func_name: str) -> str: @@ -285,201 +1835,6 @@ def format_function_name(func_name: str) -> str: func_name = func_name.replace(".", "_") return func_name - def _poll_build_status(self, build_name: str, func_name: str, timeout: int = 300) -> None: - """Poll build operation until completion or failure. - - Monitors a Cloud Build operation, waiting for it to complete successfully - or fail. Provides detailed error information if the build fails. 
- - Args: - build_name: Fully qualified build name from GCP API - func_name: Function name for logging purposes - timeout: Maximum time to wait in seconds (default: 300) - - Raises: - RuntimeError: If build fails or timeout is reached - """ - build_client = cloudbuild_v1.CloudBuildClient() - begin = time.time() - - while True: - build_status = build_client.get_build(name=build_name) - - if build_status.status == cloudbuild_v1.Build.Status.SUCCESS: - self.logging.info(f"Function {func_name} - build completed successfully!") - break - elif build_status.status == cloudbuild_v1.Build.Status.FAILURE: - self.logging.error(f"Failed to build function: {func_name}") - self.logging.error(f"Reasons: {build_status.failure_info.detail}") - self.logging.error(f"URL for detailed error: {build_status.log_url}") - raise RuntimeError(f"Build failed for function {func_name}!") from None - elif build_status.status in ( - cloudbuild_v1.Build.Status.CANCELLED, - cloudbuild_v1.Build.Status.TIMEOUT, - ): - self.logging.error(f"Build was cancelled or timed out for function: {func_name}") - self.logging.error(f"URL for detailed error: {build_status.log_url}") - raise RuntimeError(f"Build failed for function {func_name}!") from None - - if time.time() - begin > timeout: - self.logging.error( - f"Failed to build function: {func_name} after {timeout} seconds!" - ) - raise RuntimeError(f"Build timeout for function {func_name}!") from None - - time.sleep(3) - - def _wait_for_build_and_poll( - self, func_name: str, timeout: int = 300, poll_interval: int = 2 - ) -> bool: - """Wait for build to start, get build name, and poll until completion. - - Since GCP operations typically don't immediately return a build name, this function - waits for the build to start, retrieves the build name from the function's - metadata, and then polls the build status. 
- - Args: - func_name: Name of the function being built - timeout: Maximum time to wait in seconds (default: 300) - poll_interval: Seconds between polling attempts (default: 2) - - Returns: - True if a build was found and completed successfully, False if no build was found - - Raises: - RuntimeError: If build fails - """ - full_func_name = GCP.get_full_function_name( - self.config.project_name, self.config.region, func_name - ) - begin = time.time() - build_name = None - previous_build_id = None - - # First, try to get the current build ID to compare against - try: - get_req = ( - self.function_client.projects().locations().functions().get(name=full_func_name) - ) - func_details = self._execute_with_retry(get_req) - if "buildId" in func_details: - previous_build_id = func_details["buildId"] - except HttpError: - pass - - # Wait for build to start and get build name - self.logging.info(f"Waiting for build to start for function {func_name}...") - while build_name is None: - if time.time() - begin > timeout: - self.logging.warning( - f"No build found for {func_name} after {timeout}s - " - "might be a configuration-only update" - ) - return False - - try: - # Get function details to find the build - get_req = ( - self.function_client.projects().locations().functions().get(name=full_func_name) - ) - func_details = self._execute_with_retry(get_req) - - # Check if there's a new build in progress - if "buildId" in func_details: - build_id = func_details["buildId"] - # Only consider it a new build if it's different from the previous one - if previous_build_id is None or build_id != previous_build_id: - # Construct build name from build ID - build_name = ( - f"projects/{self.config.project_name}/locations/" - f"{self.config.region}/builds/{build_id}" - ) - self.logging.info(f"Found build {build_id} for function {func_name}!") - break - except HttpError as e: - self.logging.debug(f"Error getting function details: {e}") - - time.sleep(poll_interval) - - # Now poll the build 
status - if build_name: - self._poll_build_status(build_name, func_name, timeout) - return True - - return False - - def _wait_for_active_status( - self, func_name: str, expected_version: Optional[int] = None, timeout: int = 60 - ) -> int: - """Wait for function to reach ACTIVE status after build completes. - - After a build completes, the function may be in DEPLOY_IN_PROGRESS state - for a short time. This function polls until the status becomes ACTIVE. - Furthermore, we handle HTTP errors: - * 503 / 429 transient backend errors — GCP Cloud Functions v1 - can periodically returns these; they are not deployment failures. - - Args: - func_name: Name of the function to check - expected_version: Optional version ID to verify (None to skip version check) - timeout: Maximum time to wait in seconds (default: 60) - - Returns: - Current version ID of the function - - Raises: - RuntimeError: If deployment fails or timeout is reached - """ - full_func_name = GCP.get_full_function_name( - self.config.project_name, self.config.region, func_name - ) - begin = time.time() - last_status: Optional[str] = None - - self.logging.info(f"Waiting for function {func_name} to become ACTIVE...") - - while True: - - elapsed = time.time() - begin - if elapsed > timeout: - raise RuntimeError( - f"Timeout waiting for function {func_name} to become ACTIVE " - f"after {elapsed:.0f}s. 
Last status: {last_status}" - ) - - get_req = ( - self.function_client.projects().locations().functions().get(name=full_func_name) - ) - func_details = self._execute_with_retry(get_req) - - status = func_details["status"] - current_version = int(func_details["versionId"]) - - if status != last_status: - last_status = status - - if status == "ACTIVE": - # Check version if specified - if expected_version is not None and current_version != expected_version: - self.logging.warning( - f"Function {func_name} is ACTIVE but version mismatch: " - f"expected {expected_version}, got {current_version}" - ) - # Continue waiting as version might still be updating - else: - self.logging.info(f"Function {func_name} is ACTIVE (version {current_version})") - return current_version - elif status == "DEPLOY_IN_PROGRESS": - self.logging.debug(f"Function {func_name} deployment in progress...") - else: - # Unexpected status - self.logging.error(f"Function {func_name} has unexpected status: {status}") - raise RuntimeError( - f"Function {func_name} deployment failed with status: {status}" - ) from None - - time.sleep(2) - def package_code( self, directory: str, @@ -574,51 +1929,13 @@ def package_code( bytes_size, ) - def _allow_public_access(self, func_name: str, full_func_name: str) -> None: - - """Configure GCP function to be publicly accessible. - - Args: - func_name: our function name - full_func_name: GCP name - - Raises: - RuntimeError: - """ - allow_unauthenticated_req = ( - self.function_client.projects() - .locations() - .functions() - .setIamPolicy( - resource=full_func_name, - body={ - "policy": { - "bindings": [ - { - "role": "roles/cloudfunctions.invoker", - "members": ["allUsers"], - } - ] - } - }, - ) - ) - try: - self._execute_with_retry(allow_unauthenticated_req) - except HttpError as e: - raise RuntimeError( - f"Failed to configure function {full_func_name} " - f"for unauthenticated invocations! 
Error: {e}" - ) - self.logging.info(f"Function {func_name} accepts now unauthenticated invocations!") - def create_function( self, code_package: Benchmark, func_name: str, container_deployment: bool, container_uri: str | None, - ) -> "GCPFunction": + ) -> GCPFunction: """Create a new GCP Cloud Function or update existing one. Deploys a benchmark as a Cloud Function, handling code upload to Cloud Storage, @@ -637,89 +1954,76 @@ def create_function( Raises: NotImplementedError: If container_deployment is True - RuntimeError: If function creation or IAM configuration fails - """ - - if container_deployment: - raise NotImplementedError("Container deployment is not supported in GCP") - - package = code_package.code_location - if package is None: - raise RuntimeError("Code location is not set for GCP deployment") + RuntimeError: If function creation or IAM configuration fails + """ benchmark = code_package.benchmark - language_runtime = code_package.language_version - timeout = code_package.benchmark_config.timeout - memory = code_package.benchmark_config.memory - code_bucket: Optional[str] = None - storage_client = self._system_resources.get_storage() location = self.config.region project_name = self.config.project_name function_cfg = FunctionConfig.from_benchmark(code_package) architecture = function_cfg.architecture.value + code_bucket: Optional[str] = None + dep_config: Union[GCPFunctionGen1Config, GCPFunctionGen2Config, GCPContainerConfig] - code_package_name = os.path.basename(package) - code_package_name = f"{architecture}-{code_package_name}" - code_bucket = storage_client.get_bucket(Resources.StorageBucketType.DEPLOYMENT) - code_prefix = os.path.join(benchmark, code_package_name) - storage_client.upload(code_bucket, package, code_prefix) + if architecture == "arm64": + raise RuntimeError("GCP does not support arm64 deployments") - self.logging.info("Uploading function {} code to {}".format(func_name, code_bucket)) + # Select deployment strategy + strategy = ( + 
self.run_container_strategy + if container_deployment + else self.cloud_function_gen1_strategy + ) - full_func_name = GCP.get_full_function_name(project_name, location, func_name) - get_req = self.function_client.projects().locations().functions().get(name=full_func_name) + # Check if function/service already exists + function_exists = strategy.function_exists(project_name, location, func_name) - try: - self._execute_with_retry(get_req) - except HttpError: + deployment_type = ( + FunctionDeploymentType.CONTAINER + if container_deployment + else FunctionDeploymentType.FUNCTION_GEN1 + ) - envs = self._generate_function_envs(code_package) + dep_config = self._get_deployment_config(deployment_type) + if not function_exists: + # Create new function/service + envs = { + **self._generate_function_envs(code_package), + **strategy.generate_runtime_envs(), + } - create_req = ( - self.function_client.projects() - .locations() - .functions() - .create( - location="projects/{project_name}/locations/{location}".format( - project_name=project_name, location=location - ), - body={ - "name": full_func_name, - "entryPoint": ( - "org.serverlessbench.Handler" - if code_package.language == Language.JAVA - else "handler" - ), - "runtime": code_package.language_name + language_runtime.replace(".", ""), - "availableMemoryMb": memory, - "timeout": str(timeout) + "s", - "httpsTrigger": {}, - "ingressSettings": "ALLOW_ALL", - "sourceArchiveUrl": "gs://" + code_bucket + "/" + code_prefix, - "environmentVariables": envs, - }, + # Get code bucket for non-container deployments + + strategy.create(func_name, code_package, function_cfg, envs, container_uri) + strategy.wait_for_deployment(func_name) + strategy.allow_public_access(project_name, location, func_name) + + if not container_deployment: + storage_client = self._system_resources.get_storage() + code_bucket = storage_client.get_bucket(Resources.StorageBucketType.DEPLOYMENT) + function = GCPFunction( + func_name, + benchmark, + 
code_package.hash, + function_cfg, + deployment_type, + dep_config, + code_bucket, + None, + ) + else: + function = GCPFunction( + func_name, + benchmark, + code_package.hash, + function_cfg, + deployment_type, + dep_config, + None, + container_uri, ) - ) - self._execute_with_retry(create_req) - self.logging.info( - f"Function {func_name} is creating - GCP build&deployment is started!" - ) - - # Poll build status until completion or failure - build_found = self._wait_for_build_and_poll(func_name) - if not build_found: - raise RuntimeError(f"No build operation found for {func_name}!") - - # Wait for deployment to become ACTIVE - self._wait_for_active_status(func_name) - - self._allow_public_access(func_name, full_func_name) - - function = GCPFunction( - func_name, benchmark, code_package.hash, function_cfg, code_bucket - ) else: - # if result is not empty, then function does exists + # Function/service exists, update it self.logging.info("Function {} exists on GCP, update the instance.".format(func_name)) function = GCPFunction( @@ -727,17 +2031,23 @@ def create_function( benchmark=benchmark, code_package_hash=code_package.hash, cfg=function_cfg, + deployment_type=deployment_type, bucket=code_bucket, + deployment_config=dep_config, + container_uri=container_uri, ) - self._allow_public_access(func_name, full_func_name) + + strategy.allow_public_access(project_name, location, func_name) self.update_function(function, code_package, container_deployment, container_uri) # Add LibraryTrigger to a new function - from sebs.gcp.triggers import LibraryTrigger + # Not supported on containers + if not container_deployment: + from sebs.gcp.triggers import LibraryTrigger - trigger = LibraryTrigger(func_name, self) - trigger.logging_handlers = self.logging_handlers - function.add_trigger(trigger) + trigger = LibraryTrigger(func_name, self, function.deployment_type) + trigger.logging_handlers = self.logging_handlers + function.add_trigger(trigger) return function @@ -760,19 
+2070,21 @@ def create_trigger(self, function: Function, trigger_type: Trigger.TriggerType) RuntimeError: If trigger type is not supported """ from sebs.gcp.triggers import HTTPTrigger + from sebs.gcp.function import GCPFunction if trigger_type == Trigger.TriggerType.HTTP: - - # Get the HTTPS trigger URL - location = self.config.region - project_name = self.config.project_name - full_func_name = GCP.get_full_function_name(project_name, location, function.name) - get_req = ( - self.function_client.projects().locations().functions().get(name=full_func_name) + gcp_function = cast(GCPFunction, function) + self.logging.info(f"Function {function.name} - waiting for deployment...") + + # Select deployment strategy + strategy = ( + self.run_container_strategy + if gcp_function.deployment_type.is_container + else self.cloud_function_gen1_strategy ) - func_details = self._execute_with_retry(get_req) - invoke_url = func_details["httpsTrigger"]["url"] - self.logging.info(f"Function {function.name} - HTTP trigger URL: {invoke_url}") + + # Get trigger URL from strategy + invoke_url = strategy.create_trigger(function.name) trigger = HTTPTrigger(invoke_url) else: @@ -796,8 +2108,11 @@ def cached_function(self, function: Function) -> None: from sebs.faas.function import Trigger from sebs.gcp.triggers import LibraryTrigger + func = cast(GCPFunction, function) + for trigger in function.triggers(Trigger.TriggerType.LIBRARY): gcp_trigger = cast(LibraryTrigger, trigger) + gcp_trigger.deployment_type = func.deployment_type gcp_trigger.logging_handlers = self.logging_handlers gcp_trigger.deployment_client = self @@ -825,94 +2140,26 @@ def update_function( RuntimeError: If function update fails after maximum retries """ - if container_deployment: - raise NotImplementedError("Container deployment is not supported in GCP") - - if code_package.code_location is None: - raise RuntimeError("Code location is not set for GCP deployment") - function = cast(GCPFunction, function) - language_runtime 
= code_package.language_version - - function_cfg = FunctionConfig.from_benchmark(code_package) - architecture = function_cfg.architecture.value - code_package_name = os.path.basename(code_package.code_location) - storage = cast(GCPStorage, self._system_resources.get_storage()) - code_package_name = f"{architecture}-{code_package_name}" - - bucket = function.code_bucket(code_package.benchmark, storage) - storage.upload(bucket, code_package.code_location, code_package_name) - - envs = self._generate_function_envs(code_package) - - self.logging.info(f"Uploaded new code package to {bucket}/{code_package_name}") - full_func_name = GCP.get_full_function_name( - self.config.project_name, self.config.region, function.name - ) - req = ( - self.function_client.projects() - .locations() - .functions() - .patch( - name=full_func_name, - body={ - "name": full_func_name, - "entryPoint": ( - "org.serverlessbench.Handler" - if code_package.language == Language.JAVA - else "handler" - ), - "runtime": code_package.language_name + language_runtime.replace(".", ""), - "availableMemoryMb": function.config.memory, - "timeout": str(function.config.timeout) + "s", - "httpsTrigger": {}, - "sourceArchiveUrl": "gs://" + bucket + "/" + code_package_name, - "environmentVariables": envs, - }, - ) - ) - res = self._execute_with_retry(req) - - self.logging.info(f"Function {function.name} code update initiated") - - # Patch does not return buildName, need to wait for build to start - expected_version = int(res["metadata"]["versionId"]) - build_found = self._wait_for_build_and_poll(function.name) - if not build_found: - self.logging.warning( - f"No build operation found for {function.name} - " - "this is unexpected for code updates" - ) - - # Wait for deployment to become ACTIVE with expected version - self._wait_for_active_status(function.name, expected_version) - self.logging.info("Published new function code and configuration.") - - def _update_envs(self, full_function_name: str, envs: Dict) -> 
Dict: - """Merge new environment variables with existing function environment. - - Retrieves current function environment variables and merges them with - new variables, with new variables taking precedence on conflicts. - - Args: - full_function_name: Fully qualified function name - envs: New environment variables to add/update - - Returns: - Merged environment variables dictionary - """ - get_req = ( - self.function_client.projects().locations().functions().get(name=full_function_name) + # Select deployment strategy + strategy = ( + self.run_container_strategy + if container_deployment + else self.cloud_function_gen1_strategy ) - response = self._execute_with_retry(get_req) - # preserve old variables while adding new ones. - # but for conflict, we select the new one - if "environmentVariables" in response: - envs = {**response["environmentVariables"], **envs} + # Generate environment variables + envs = { + **self._generate_function_envs(code_package), + **strategy.generate_runtime_envs(), + } - return envs + # Update code using strategy + strategy.update_code(function, code_package, envs, container_uri) + if container_deployment: + function.set_container_uri(container_uri) + strategy.wait_for_deployment(function.name) def _generate_function_envs(self, code_package: Benchmark) -> Dict: """Generate environment variables for function based on benchmark requirements. 
@@ -951,6 +2198,7 @@ def update_function_configuration( function: Function instance to update code_package: Benchmark package with configuration requirements env_variables: Additional environment variables to set + container_uri: Container image URI (for container deployments) Returns: Version ID of the updated function @@ -962,100 +2210,55 @@ def update_function_configuration( assert code_package.has_input_processed function = cast(GCPFunction, function) - full_func_name = GCP.get_full_function_name( + + # Select deployment strategy + strategy = ( + self.run_container_strategy + if code_package.container_deployment + else self.cloud_function_gen1_strategy + ) + + # Get full resource name for env merging + full_func_name = strategy.get_full_function_name( self.config.project_name, self.config.region, function.name ) - envs = self._generate_function_envs(code_package) + # Prepare environment variables + envs = { + **self._generate_function_envs(code_package), + **strategy.generate_runtime_envs(), + } envs = {**envs, **env_variables} + # GCP might overwrite existing variables # If we modify them, we need to first read existing ones and append. 
if len(envs) > 0: - envs = self._update_envs(full_func_name, envs) - - if len(envs) > 0: - - req = ( - self.function_client.projects() - .locations() - .functions() - .patch( - name=full_func_name, - updateMask="availableMemoryMb,timeout,environmentVariables", - body={ - "availableMemoryMb": function.config.memory, - "timeout": str(function.config.timeout) + "s", - "environmentVariables": envs, - }, - ) - ) - - else: - - req = ( - self.function_client.projects() - .locations() - .functions() - .patch( - name=full_func_name, - updateMask="availableMemoryMb,timeout", - body={ - "availableMemoryMb": function.config.memory, - "timeout": str(function.config.timeout) + "s", - }, - ) - ) + envs = strategy.update_envs(full_func_name, envs) - res = self._execute_with_retry(req) - expected_version = int(res["metadata"]["versionId"]) - - self.logging.info(f"Function {function.name} configuration update initiated") + # Update configuration using strategy + res = strategy.update_config(function, envs) - # Wait for deployment to become ACTIVE with expected version - # Configuration updates don't trigger builds but still need deployment time - current_version = self._wait_for_active_status(function.name, expected_version, timeout=60) - self.logging.info("Published new function configuration.") + current_dep_config = self._get_deployment_config(function.deployment_type) + function._deployment_config = current_dep_config - return current_version + return res - def delete_function(self, func_name: str) -> None: - """Delete a Google Cloud Function. + def delete_function(self, func_name: str, function: Dict) -> None: + """Delete a Google Cloud Function or Cloud Run service. 
Args: - func_name: Name of the function to delete + func_name: Name of the function/service to delete """ - self.logging.info(f"Deleting function {func_name}") - - full_func_name = GCP.get_full_function_name( - self.config.project_name, self.config.region, func_name + # Select deployment strategy based on function name + # v1 functions don't allow hyphens, new functions don't allow underscores + gcp_function = GCPFunction.deserialize(function) + strategy = ( + self.run_container_strategy + if gcp_function.deployment_type.is_container + else self.cloud_function_gen1_strategy ) - try: - delete_req = ( - self.function_client.projects().locations().functions().delete(name=full_func_name) - ) - self._execute_with_retry(delete_req) - self.logging.info(f"Function {func_name} deleted successfully") - except HttpError as e: - if e.resp.status == 404: - self.logging.error(f"Function {func_name} does not exist!") - else: - self.logging.error(f"Failed to delete function {func_name}: {e}") - raise - - @staticmethod - def get_full_function_name(project_name: str, location: str, func_name: str) -> str: - """Generate the fully qualified function name for GCP API calls. - - Args: - project_name: GCP project ID - location: GCP region/location - func_name: Function name - - Returns: - Fully qualified function name in GCP format - """ - return f"projects/{project_name}/locations/{location}/functions/{func_name}" + strategy.delete_function(func_name) def shutdown(self) -> None: """Shutdown the GCP system and clean up resources. @@ -1087,128 +2290,20 @@ def download_metrics( metrics: Dictionary to populate with collected metrics """ - from google.api_core import exceptions - from time import sleep - - def wrapper(gen): - """Generator function to extract all results from GCP API paginated responses. - If we exhaust resource, we sleep 30 seconds before a retry. 
- - Args: - gen: generator of HTTP responses - - Yields: - each HTTP response - """ - while True: - try: - yield next(gen) - except StopIteration: - break - except exceptions.ResourceExhausted: - self.logging.info("Google Cloud resources exhausted, sleeping 30s") - sleep(30) - - """ - Use GCP's logging system to find execution time of each function invocation. - - There shouldn't be problem of waiting for complete results, - since logs appear very quickly here. - """ - import google.cloud.logging as gcp_logging - - logging_client = gcp_logging.Client() - logger = logging_client.logger("cloudfunctions.googleapis.com%2Fcloud-functions") - - """ - GCP accepts only single date format: 'YYYY-MM-DDTHH:MM:SSZ'. - Thus, we first convert timestamp to UTC timezone. - Then, we generate correct format. - - Add 1 second to end time to ensure that removing - milliseconds doesn't affect query. - """ - timestamps = [] - for timestamp in [start_time, end_time + 1]: - utc_date = datetime.fromtimestamp(timestamp, tz=timezone.utc) - timestamps.append(utc_date.strftime("%Y-%m-%dT%H:%M:%SZ")) - - invocations = logger.list_entries( - filter_=( - f'resource.labels.function_name = "{function_name}" ' - f'timestamp >= "{timestamps[0]}" ' - f'timestamp <= "{timestamps[1]}"' - ), - page_size=1000, - ) - invocations_processed = 0 - if hasattr(invocations, "pages"): - pages = list(wrapper(invocations.pages)) - else: - pages = [list(wrapper(invocations))] - entries = 0 - for page in pages: # invocations.pages: - for invoc in page: - entries += 1 - if "execution took" in invoc.payload: - execution_id = invoc.labels["execution_id"] - # might happen that we get invocation from another experiment - if execution_id not in requests: - continue - # find number of miliseconds - regex_result = re.search(r"\d+ ms", invoc.payload) - assert regex_result - exec_time = regex_result.group().split()[0] - # convert into microseconds - requests[execution_id].provider_times.execution = int(exec_time) * 1000 - 
invocations_processed += 1 - self.logging.info( - f"GCP: Received {entries} entries, found time metrics for {invocations_processed} " - f"out of {len(requests.keys())} invocations." - ) - - """ - Use metrics to find estimated values for maximum memory used, active instances - and network traffic. - https://cloud.google.com/monitoring/api/metrics_gcp#gcp-cloudfunctions - """ - - # Set expected metrics here - available_metrics = ["execution_times", "user_memory_bytes", "network_egress"] - - client = monitoring_v3.MetricServiceClient() - project_name = client.common_project_path(self.config.project_name) + functions = self.cache_client.get_all_functions(self.name()) + if function_name not in functions: + raise RuntimeError(f"Function {function_name} not found in cache!") - end_time_nanos, end_time_seconds = math.modf(end_time) - start_time_nanos, start_time_seconds = math.modf(start_time) + function = GCPFunction.deserialize(functions[function_name]) - interval = monitoring_v3.TimeInterval( - { - "end_time": {"seconds": int(end_time_seconds) + 60}, - "start_time": {"seconds": int(start_time_seconds)}, - } + strategy = ( + self.run_container_strategy + if function.deployment_type.is_container + else self.cloud_function_gen1_strategy ) + strategy.download_execution_metrics(function_name, start_time, end_time, requests) - for metric in available_metrics: - - metrics[metric] = [] - - list_request = monitoring_v3.ListTimeSeriesRequest( - name=project_name, - filter='metric.type = "cloudfunctions.googleapis.com/function/{}"'.format(metric), - interval=interval, - ) - - results = client.list_time_series(list_request) - for result in results: - if result.resource.labels.get("function_name") == function_name: - for point in result.points: - metrics[metric] += [ - { - "mean_value": point.value.distribution_value.mean, - "executions_count": point.value.distribution_value.count, - } - ] + strategy.download_metrics(function_name, start_time, end_time, metrics) def 
_enforce_cold_start(self, function: Function, code_package: Benchmark) -> int: """Force a cold start by updating function configuration. @@ -1252,7 +2347,7 @@ def enforce_cold_start(self, functions: List[Function], code_package: Benchmark) deployment_done = False while not deployment_done: for versionId, func in new_versions: - is_deployed, last_version = self.is_deployed(func.name, versionId) + is_deployed, last_version = self.is_deployed(func, versionId) if not is_deployed: undeployed_functions.append((versionId, func)) deployed = len(new_versions) - len(undeployed_functions) @@ -1292,7 +2387,7 @@ def get_functions(self, code_package: Benchmark, function_names: List[str]) -> L deployment_done = False while not deployment_done: for func in undeployed_functions_before: - is_deployed, last_version = self.is_deployed(func.name) + is_deployed, last_version = self.is_deployed(func) if not is_deployed: undeployed_functions.append(func) deployed = len(undeployed_functions_before) - len(undeployed_functions) @@ -1307,7 +2402,7 @@ def get_functions(self, code_package: Benchmark, function_names: List[str]) -> L return functions - def is_deployed(self, func_name: str, versionId: int = -1) -> Tuple[bool, int]: + def is_deployed(self, function: Function, versionId: int = -1) -> Tuple[bool, int]: """Check if a function is deployed and optionally verify its version. 
Args: func_name: Name of the function to check @@ -1316,29 +2411,16 @@ def is_deployed(self, func_name: str, versionId: int = -1) -> Tuple[bool, int]: Returns: Tuple of (is_deployed, current_version_id) """ - name = GCP.get_full_function_name(self.config.project_name, self.config.region, func_name) - function_client = self.get_function_client() - status_req = function_client.projects().locations().functions().get(name=name) - status_res = self._execute_with_retry(status_req) - if versionId == -1: - return (status_res["status"] == "ACTIVE", status_res["versionId"]) - else: - return (status_res["versionId"] == versionId, status_res["versionId"]) - - def deployment_version(self, func: Function) -> int: - """Get the current deployment version ID of a function. - - Args: - func: Function instance to check + # Select deployment strategy based on function name + # v1 functions don't allow hyphens, new functions don't allow underscores + gcp_function = cast(GCPFunction, function) + strategy = ( + self.run_container_strategy + if gcp_function.deployment_type.is_container + else self.cloud_function_gen1_strategy + ) - Returns: - Current version ID of the function - """ - name = GCP.get_full_function_name(self.config.project_name, self.config.region, func.name) - function_client = self.get_function_client() - status_req = function_client.projects().locations().functions().get(name=name) - status_res = self._execute_with_retry(status_req) - return int(status_res["versionId"]) + return strategy.is_deployed(function.name, versionId) @staticmethod def helper_zip(base_directory: str, path: str, archive: zipfile.ZipFile) -> None: diff --git a/sebs/gcp/triggers.py b/sebs/gcp/triggers.py index 2eaaba41..a31f04b1 100644 --- a/sebs/gcp/triggers.py +++ b/sebs/gcp/triggers.py @@ -23,13 +23,29 @@ import concurrent.futures import datetime import json -import time from typing import Dict, Optional # noqa from sebs.gcp.gcp import GCP +from sebs.gcp.function import FunctionDeploymentType from 
sebs.faas.function import ExecutionResult, Trigger +def normalize_request_id(res: ExecutionResult) -> None: + """Normalize the request identifier to the trace ID format used by GCP logs. + + Args: + res: Execution result object to update in place. + """ + # In GCP, we used to return the request id directly. + # However, containers have function request id, but they + # are not visible in logs - we cannot use them for query. + # + # However, both functions and containers have trace id, + # which needs to be slightly converted. + if "/" in res.request_id: + res.request_id = res.request_id.split("/", 1)[0] + + class LibraryTrigger(Trigger): """Direct Cloud Functions API trigger for synchronous invocation. @@ -40,18 +56,26 @@ class LibraryTrigger(Trigger): Attributes: name: Function name to invoke _deployment_client: GCP client for API operations + _deployment_type: Type of deployment (gen1 function or container) """ - def __init__(self, fname: str, deployment_client: Optional[GCP] = None) -> None: + def __init__( + self, + fname: str, + deployment_client: Optional[GCP] = None, + deployment_type: Optional[FunctionDeploymentType] = None, + ) -> None: """Initialize library trigger for direct function invocation. Args: fname: Name of the Cloud Function to invoke deployment_client: Optional GCP client for API operations + deployment_type: Optional deployment type (gen1 function or container) """ super().__init__() self.name = fname self._deployment_client = deployment_client + self._deployment_type = deployment_type @staticmethod def typename() -> str: @@ -84,6 +108,25 @@ def deployment_client(self, deployment_client: GCP) -> None: """ self._deployment_client = deployment_client + @property + def deployment_type(self) -> FunctionDeploymentType: + """Get the deployment type associated with this trigger. + + Returns: + Deployment type currently associated with the trigger. 
+ """ + assert self._deployment_type + return self._deployment_type + + @deployment_type.setter + def deployment_type(self, deployment_type: FunctionDeploymentType) -> None: + """Set the deployment type associated with this trigger. + + Args: + deployment_type: Deployment type to associate with the trigger. + """ + self._deployment_type = deployment_type + @staticmethod def trigger_type() -> Trigger.TriggerType: """Get the trigger type for this implementation. @@ -96,31 +139,57 @@ def trigger_type() -> Trigger.TriggerType: def sync_invoke(self, payload: Dict) -> ExecutionResult: """Synchronously invoke the Cloud Function using the API. - Waits for function deployment, then invokes via Cloud Functions API. - Measures execution time and handles errors. + Waits for function deployment, then invokes via appropriate API based on + deployment type: + - FUNCTION_GEN1: Cloud Functions v1 API + - CONTAINER: Cloud Run service via HTTP + - FUNCTION_GEN2: Not yet supported Args: payload: Input data to send to the function Returns: ExecutionResult with timing, output, and error information + + Raises: + NotImplementedError: If deployment type is not supported """ self.logging.info(f"Invoke function {self.name}") - # Verify that the function is deployed - deployed = False - while not deployed: - if self.deployment_client.is_deployed(self.name): - deployed = True - else: - time.sleep(5) + # Check deployment type and invoke accordingly + if self._deployment_type == FunctionDeploymentType.FUNCTION_GEN1: + return self._invoke_gen1_function(payload) + elif self._deployment_type == FunctionDeploymentType.CONTAINER: + raise NotImplementedError( + "LibraryTrigger does not yet support CONTAINER deployment type. " + "Use HTTPTrigger instead." + ) + elif self._deployment_type == FunctionDeploymentType.FUNCTION_GEN2: + raise NotImplementedError( + "LibraryTrigger does not yet support FUNCTION_GEN2 deployment type. " + "Use HTTPTrigger instead." 
+ ) + else: + raise NotImplementedError( + f"LibraryTrigger does not support deployment type: {self._deployment_type}. " + "Please specify deployment_type as FUNCTION_GEN1 or CONTAINER." + ) + + def _invoke_gen1_function(self, payload: Dict) -> ExecutionResult: + """Invoke a Cloud Functions Gen1 function using the v1 API. + + Args: + payload: Input data to send to the function - # GCP's fixed style for a function name + Returns: + ExecutionResult with timing, output, and error information + """ config = self.deployment_client.config full_func_name = ( - f"projects/{config.project_name}/locations/" f"{config.region}/functions/{self.name}" + f"projects/{config.project_name}/locations/{config.region}/functions/{self.name}" ) + function_client = self.deployment_client.get_function_client() req = ( function_client.projects() @@ -128,44 +197,53 @@ def sync_invoke(self, payload: Dict) -> ExecutionResult: .functions() .call(name=full_func_name, body={"data": json.dumps(payload)}) ) + begin = datetime.datetime.now() - res = self.deployment_client._execute_with_retry(req) + res = req.execute() end = datetime.datetime.now() gcp_result = ExecutionResult.from_times(begin, end) gcp_result.request_id = res["executionId"] + if "error" in res.keys() and res["error"] != "": - self.logging.error("Invocation of {} failed!".format(self.name)) - self.logging.error("Input: {}".format(payload)) + self.logging.error(f"Invocation of {self.name} failed!") + self.logging.error(f"Input: {payload}") gcp_result.stats.failure = True return gcp_result output = json.loads(res["result"]) gcp_result.parse_benchmark_output(output) + + normalize_request_id(gcp_result) + return gcp_result - def async_invoke(self, payload: Dict): + def async_invoke(self, payload: Dict) -> concurrent.futures.Future: """Asynchronously invoke the Cloud Function. - Note: This method is not currently implemented for GCP's LibraryTrigger. - GCP's `functions.call` API is synchronous. 
Asynchronous behavior could - need to be implemented using a thread pool or similar mechanism if desired. + Uses a ThreadPoolExecutor to run sync_invoke in the background. Args: payload: Input data to send to the function - Raises: - NotImplementedError: Async invocation not implemented for library triggers + Returns: + Future object for the async invocation """ - raise NotImplementedError() + pool = concurrent.futures.ThreadPoolExecutor() + fut = pool.submit(self.sync_invoke, payload) + return fut def serialize(self) -> Dict: """Serialize trigger to dictionary for cache storage. Returns: - Dictionary containing trigger type and name + Dictionary containing trigger type, name, and deployment type """ - return {"type": "Library", "name": self.name} + return { + "type": "Library", + "name": self.name, + "deployment_type": self._deployment_type.value if self._deployment_type else None, + } @staticmethod def deserialize(obj: Dict) -> Trigger: @@ -177,7 +255,11 @@ def deserialize(obj: Dict) -> Trigger: Returns: Reconstructed LibraryTrigger instance """ - return LibraryTrigger(obj["name"]) + deployment_type = None + if "deployment_type" in obj and obj["deployment_type"] is not None: + deployment_type = FunctionDeploymentType.deserialize(obj["deployment_type"]) + + return LibraryTrigger(obj["name"], deployment_type=deployment_type) class HTTPTrigger(Trigger): @@ -226,9 +308,9 @@ def sync_invoke(self, payload: Dict) -> ExecutionResult: Returns: ExecutionResult from the HTTP invocation """ - - self.logging.debug(f"Invoke function {self.url}") - return self._http_invoke(payload, self.url) + res = self._http_invoke(payload, self.url) + normalize_request_id(res) + return res def async_invoke(self, payload: Dict) -> concurrent.futures.Future: """Asynchronously invoke the Cloud Function via HTTP.