diff --git a/.circleci/config.yml b/.circleci/config.yml deleted file mode 100644 index b22c4edc..00000000 --- a/.circleci/config.yml +++ /dev/null @@ -1,88 +0,0 @@ -version: 2.1 - -orbs: - python: circleci/python@2.1 - -jobs: - linting: - executor: - name: 'python/default' - tag: '3.10' - steps: - - checkout - - restore_cache: - key: deps1-{{ .Branch }}-{{ checksum "requirements.txt" }} - - run: - command: | - sudo apt update && sudo apt install libcurl4-openssl-dev - name: Install curl-config from Ubuntu APT - - run: - command: | - python3 install.py --aws --azure --gcp --no-local - name: Install pip dependencies - - run: - command: | - . python-venv/bin/activate - black sebs --check --config .black.toml - name: Python code formatting with black - - run: - command: | - . python-venv/bin/activate - flake8 sebs --config=.flake8.cfg --tee --output-file flake-reports - name: Python code lint with flake8 - - run: - command: | - . python-venv/bin/activate - mypy sebs --config-file=.mypy.ini - name: Python static code verification with mypy - - run: - command: | - . python-venv/bin/activate - interrogate -v --fail-under 100 sebs - name: Check for Python documentation coverage - - store_artifacts: - path: flake-reports - destination: flake-reports - test-aws: - executor: python/default - steps: - - checkout - - setup_remote_docker - - restore_cache: - key: deps1-{{ .Branch }}-{{ checksum "requirements.txt" }} - - run: - command: | - if [[ -d $HOME/docker ]]; - then - ls $HOME/docker/*.tar.gz | xargs -I {file} sh -c "zcat {file} | docker load"; - else - docker pull mcopik/serverless-benchmarks:build.aws.python.3.7 - docker pull mcopik/serverless-benchmarks:build.aws.nodejs.12.x - fi - name: Load Docker images - - run: - command: | - python3 install.py --aws - name: Install pip dependencies - - run: - command: | - mkdir -p $HOME/docker - docker images mcopik/serverless-benchmarks --filter='dangling=false' --format '{{.Repository}}:{{.Tag}} {{.ID}}' |\ - xargs -n 2 -t sh -c 'test -e $HOME/docker/$1.tar.gz || docker save $0 | gzip -2 > $HOME/docker/$1.tar.gz' - name: Save Docker images - - save_cache: - key: deps1-{{ .Branch }}-{{ checksum "requirements.txt" }} - paths: - - "sebs-virtualenv" - - $HOME/docker - - run: - command: | - . 
sebs-virtualenv/bin/activate - tests/test_runner.py --deployment aws - name: Execute AWS tests - -workflows: - main: - jobs: - - linting - diff --git a/.github/workflows/_regression-job.yml b/.github/workflows/_regression-job.yml new file mode 100644 index 00000000..d22cf270 --- /dev/null +++ b/.github/workflows/_regression-job.yml @@ -0,0 +1,121 @@ +name: Regression Job (Reusable) + +on: + workflow_call: + inputs: + platform: + required: true + type: string + language: + required: true + type: string + version: + required: true + type: string + architecture: + required: true + type: string + +jobs: + test: + runs-on: ${{ inputs.architecture == 'arm64' && 'ubuntu-24.04-arm' || 'ubuntu-latest' }} + + env: + RESOURCE_PREFIX: ci + + steps: + - name: Checkout code + uses: actions/checkout@v4 + with: + submodules: recursive + + - name: Setup GCP credentials + if: inputs.platform == 'gcp' + uses: google-github-actions/auth@v2 + with: + credentials_json: ${{ secrets.GCP_SERVICE_ACCOUNT_JSON }} + + - name: Setup Azure credentials + if: inputs.platform == 'azure' + run: | + echo "AZURE_SECRET_APPLICATION_ID=${{ secrets.AZURE_SECRET_APPLICATION_ID }}" >> $GITHUB_ENV + echo "AZURE_SECRET_TENANT=${{ secrets.AZURE_SECRET_TENANT }}" >> $GITHUB_ENV + echo "AZURE_SECRET_PASSWORD=${{ secrets.AZURE_SECRET_PASSWORD }}" >> $GITHUB_ENV + + - name: Setup AWS credentials + if: inputs.platform == 'aws' + run: | + echo "AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }}" >> $GITHUB_ENV + echo "AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }}" >> $GITHUB_ENV + echo "AWS_DEFAULT_REGION=${{ secrets.AWS_DEFAULT_REGION || 'us-east-1' }}" >> $GITHUB_ENV + + - name: Install uv + uses: astral-sh/setup-uv@v4 + + - name: Create virtual environment and install SeBS + run: | + uv venv + uv pip install . 
+ + - name: Run regression tests + timeout-minutes: 10 + run: | + source .venv/bin/activate + uv run sebs benchmark regression test \ + --config configs/example.json \ + --deployment ${{ inputs.platform }} \ + --language ${{ inputs.language }} \ + --language-version ${{ inputs.version }} \ + --architecture ${{ inputs.architecture }} \ + --selected-architecture \ + --resource-prefix ci \ + --filter-output + + - name: Generate test summary + if: always() + run: | + echo "Regression Test Summary" > test-summary.txt + echo "======================" >> test-summary.txt + echo "Platform: ${{ inputs.platform }}" >> test-summary.txt + echo "Language: ${{ inputs.language }}" >> test-summary.txt + echo "Version: ${{ inputs.version }}" >> test-summary.txt + echo "" >> test-summary.txt + if ls regression_*.json 1> /dev/null 2>&1; then + ls -1 regression_*.json | wc -l | xargs echo "Benchmarks tested:" >> test-summary.txt + echo "" >> test-summary.txt + echo "Results saved to artifacts/results/" >> test-summary.txt + else + echo "No benchmark results found" >> test-summary.txt + fi + + - name: Upload test summary + if: always() + uses: actions/upload-artifact@v4 + with: + name: test-summary-${{ inputs.platform }}-${{ inputs.language }}-${{ inputs.version }} + path: test-summary.txt + + - name: Collect and upload regression results + if: always() + run: | + mkdir -p results + if ls regression_*.json 1> /dev/null 2>&1; then + mv regression_*.json results/ || true + fi + + - name: Upload regression results + if: always() + uses: actions/upload-artifact@v4 + with: + name: results-${{ inputs.platform }}-${{ inputs.language }}-${{ inputs.version }} + path: results/ + if-no-files-found: ignore + + - name: Upload cache snapshot + if: always() + uses: actions/upload-artifact@v4 + with: + name: cache-snapshot-${{ inputs.platform }}-${{ inputs.language }}-${{ inputs.version }} + path: regression-cache/ + if-no-files-found: ignore + diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml new file mode 100644 index 00000000..479d9733 --- /dev/null +++ b/.github/workflows/lint.yml @@ -0,0 +1,55 @@ +name: Lint + +on: + push: + pull_request: + workflow_dispatch: + +jobs: + linting: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Python 3.10 + uses: actions/setup-python@v5 + with: + python-version: '3.10' + + - name: Install uv + uses: astral-sh/setup-uv@v4 + + - name: Install system dependencies + run: sudo apt update && sudo apt install -y libcurl4-openssl-dev + + - name: Cache uv dependencies + uses: actions/cache@v4 + with: + path: ~/.cache/uv + key: uv-${{ runner.os }}-${{ hashFiles('requirements.txt', 'pyproject.toml') }} + restore-keys: | + uv-${{ runner.os }}- + + - name: Install SeBS with dev dependencies + run: uv sync --extra dev + + - name: Python code formatting with black + run: uv run black sebs --check --config .black.toml + + - name: Python code lint with flake8 + run: uv run flake8 sebs --config=.flake8.cfg --tee --output-file flake-reports + + - name: Python static code verification with mypy + run: uv run mypy sebs --config-file=.mypy.ini + + - name: Check for Python documentation coverage + run: uv run interrogate -v --fail-under 100 sebs + + - name: Upload flake8 reports + if: always() + uses: actions/upload-artifact@v4 + with: + name: flake-reports + path: flake-reports diff --git a/.github/workflows/regression.yml b/.github/workflows/regression.yml new file mode 100644 index 00000000..a3d49105 --- /dev/null +++ 
b/.github/workflows/regression.yml @@ -0,0 +1,75 @@ +name: Regression Tests + +on: + push: + branches: + - master + - 'feature/**' + workflow_dispatch: + +jobs: + regression: + strategy: + matrix: + include: + - platform: aws + language: python + version: "3.11" + architecture: "x64" + - platform: aws + language: python + version: "3.11" + architecture: "arm64" + - platform: aws + language: nodejs + version: "16" + architecture: "x64" + - platform: aws + language: nodejs + version: "16" + architecture: "arm64" + - platform: aws + language: cpp + version: "all" + architecture: "x64" + - platform: aws + language: java + version: "17" + architecture: "x64" + - platform: aws + language: java + version: "17" + architecture: "arm64" + - platform: gcp + language: python + version: "3.11" + architecture: "x64" + - platform: gcp + language: nodejs + version: "20" + architecture: "x64" + - platform: gcp + language: java + version: "17" + architecture: "x64" + - platform: azure + language: python + version: "3.11" + architecture: "x64" + - platform: azure + language: nodejs + version: "20" + architecture: "x64" + - platform: azure + language: java + version: "17" + architecture: "x64" + fail-fast: false + + uses: ./.github/workflows/_regression-job.yml + with: + platform: ${{ matrix.platform }} + language: ${{ matrix.language }} + version: ${{ matrix.version }} + architecture: ${{ matrix.architecture }} + secrets: inherit diff --git a/README.md b/README.md index 6c38676a..b51db91a 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,6 @@ -[![CircleCI](https://circleci.com/gh/spcl/serverless-benchmarks.svg?style=shield)](https://circleci.com/gh/spcl/serverless-benchmarks) +[![Code Linting](https://github.com/spcl/serverless-benchmarks/actions/workflows/lint.yml/badge.svg)](https://github.com/spcl/serverless-benchmarks/actions) +[![Regression](https://github.com/spcl/serverless-benchmarks/actions/workflows/regression.yml/badge.svg)](https://github.com/spcl/serverless-benchmarks/actions) [![Documentation Status](https://readthedocs.org/projects/sebs/badge/?version=latest)](https://sebs.readthedocs.io/en/latest/?badge=latest) ![Release](https://img.shields.io/github/v/release/spcl/serverless-benchmarks) ![License](https://img.shields.io/github/license/spcl/serverless-benchmarks) diff --git a/benchmarks/300.utilities/311.compression/input.py b/benchmarks/300.utilities/311.compression/input.py index 10cf8986..6e67c045 100644 --- a/benchmarks/300.utilities/311.compression/input.py +++ b/benchmarks/300.utilities/311.compression/input.py @@ -32,8 +32,9 @@ def generate_input(data_dir, size, benchmarks_bucket, input_paths, output_paths, upload_files(data_dir, os.path.join(data_dir, dir), upload_func) input_config = {'object': {}, 'bucket': {}} - input_config['object']['key'] = datasets[0] + input_config["object"]["key"] = "acmart-master" input_config['bucket']['bucket'] = benchmarks_bucket input_config['bucket']['input'] = input_paths[0] input_config['bucket']['output'] = output_paths[0] + print(input_config) return input_config diff --git a/benchmarks/500.scientific/504.dna-visualisation/python/requirements.txt.3.11 b/benchmarks/500.scientific/504.dna-visualisation/python/requirements.txt.3.11 new file mode 100644 index 00000000..15a151be --- /dev/null +++ b/benchmarks/500.scientific/504.dna-visualisation/python/requirements.txt.3.11 @@ -0,0 +1,6 @@ +# Copyright 2020-2025 ETH Zurich and the SeBS authors. All rights reserved. 
+squiggle==0.3.1 +# Lambda images use glibc 2.26, for which there are no wheels on new packages +# we have to fix version to prevent compilation from source +numpy==2.2.6 +contourpy==1.3.2 diff --git a/benchmarks/wrappers/aws/python/setup.py b/benchmarks/wrappers/aws/python/setup.py index c34245e4..51d9c5f8 100644 --- a/benchmarks/wrappers/aws/python/setup.py +++ b/benchmarks/wrappers/aws/python/setup.py @@ -1,14 +1,9 @@ # Copyright 2020-2025 ETH Zurich and the SeBS authors. All rights reserved. from distutils.core import setup from glob import glob -from pkg_resources import parse_requirements - -with open('requirements.txt') as f: - requirements = [str(r) for r in parse_requirements(f)] setup( name='function', - install_requires=requirements, packages=['function'], package_dir={'function': '.'}, package_data={'function': glob('**', recursive=True)}, diff --git a/benchmarks/wrappers/aws/python/storage.py b/benchmarks/wrappers/aws/python/storage.py index 13fd471d..401947df 100644 --- a/benchmarks/wrappers/aws/python/storage.py +++ b/benchmarks/wrappers/aws/python/storage.py @@ -32,6 +32,9 @@ def download(self, bucket, file, filepath): def download_directory(self, bucket, prefix, path): objects = self.client.list_objects_v2(Bucket=bucket, Prefix=prefix) + # 'Contents' key is only present when objects are found + if 'Contents' not in objects: + raise RuntimeError(f"No objects found in bucket '{bucket}' with prefix '{prefix}'") for obj in objects['Contents']: file_name = obj['Key'] path_to_file = os.path.dirname(file_name) diff --git a/configs/example.json b/configs/example.json index f320982b..e4f79b21 100644 --- a/configs/example.json +++ b/configs/example.json @@ -4,7 +4,7 @@ "update_code": false, "update_storage": false, "download_results": false, - "architecture": "arm64", + "architecture": "x64", "container_deployment": true, "runtime": { "language": "python", diff --git a/configs/nodejs.json b/configs/nodejs.json index 4188b537..9abc0ce1 100644 --- a/configs/nodejs.json +++ b/configs/nodejs.json @@ -4,7 +4,7 @@ "update_code": false, "update_storage": false, "download_results": false, - "architecture": "arm64", + "architecture": "x64", "container_deployment": true, "runtime": { "language": "nodejs", diff --git a/configs/systems.json b/configs/systems.json index 27ad4959..671b99b2 100644 --- a/configs/systems.json +++ b/configs/systems.json @@ -1,7 +1,8 @@ { "general": { "docker_repository": "spcleth/serverless-benchmarks", - "SeBS_version": "1.2.0" + "SeBS_version": "1.2.1", + "previous_major_version": "1.2.0" }, "local": { "experiments": { diff --git a/dockerfiles/aws/java/Dockerfile.build b/dockerfiles/aws/java/Dockerfile.build index bd977d69..b2e1058c 100644 --- a/dockerfiles/aws/java/Dockerfile.build +++ b/dockerfiles/aws/java/Dockerfile.build @@ -2,6 +2,7 @@ ARG BASE_IMAGE FROM ${BASE_IMAGE} ARG VERSION ENV JAVA_VERSION=${VERSION} +ARG TARGETARCH # useradd, groupmod, build tooling RUN yum install -y shadow-utils unzip tar gzip zip @@ -13,7 +14,7 @@ ENV PATH=/opt/maven/bin:$PATH ENV GOSU_VERSION 1.14 # https://github.com/tianon/gosu/releases/tag/1.14 # key https://keys.openpgp.org/search?q=tianon%40debian.org -RUN curl -o /usr/local/bin/gosu -SL "https://github.com/tianon/gosu/releases/download/${GOSU_VERSION}/gosu-amd64" \ +RUN curl -o /usr/local/bin/gosu -SL "https://github.com/tianon/gosu/releases/download/${GOSU_VERSION}/gosu-${TARGETARCH}" \ && chmod +x /usr/local/bin/gosu RUN mkdir -p /sebs/ COPY dockerfiles/java_installer.sh /sebs/installer.sh diff --git 
a/dockerfiles/aws/nodejs/Dockerfile.build b/dockerfiles/aws/nodejs/Dockerfile.build index 29bc8985..c5ef797e 100755 --- a/dockerfiles/aws/nodejs/Dockerfile.build +++ b/dockerfiles/aws/nodejs/Dockerfile.build @@ -1,12 +1,13 @@ ARG BASE_IMAGE FROM ${BASE_IMAGE} +ARG TARGETARCH # useradd, groupmod RUN yum install -y shadow-utils cmake curl libcurl libcurl-devel ENV GOSU_VERSION 1.14 # https://github.com/tianon/gosu/releases/tag/1.14 # key https://keys.openpgp.org/search?q=tianon%40debian.org -RUN curl -o /usr/local/bin/gosu -SL "https://github.com/tianon/gosu/releases/download/${GOSU_VERSION}/gosu-amd64" \ +RUN curl -o /usr/local/bin/gosu -SL "https://github.com/tianon/gosu/releases/download/${GOSU_VERSION}/gosu-${TARGETARCH}" \ && chmod +x /usr/local/bin/gosu RUN mkdir -p /sebs/ COPY dockerfiles/nodejs_installer.sh /sebs/installer.sh diff --git a/dockerfiles/aws/python/Dockerfile.build b/dockerfiles/aws/python/Dockerfile.build index 3c6f2f74..e98b0f45 100755 --- a/dockerfiles/aws/python/Dockerfile.build +++ b/dockerfiles/aws/python/Dockerfile.build @@ -2,13 +2,14 @@ ARG BASE_IMAGE FROM ${BASE_IMAGE} ARG VERSION ENV PYTHON_VERSION=${VERSION} +ARG TARGETARCH # useradd, groupmod RUN yum install -y shadow-utils zip ENV GOSU_VERSION 1.14 # https://github.com/tianon/gosu/releases/tag/1.14 # key https://keys.openpgp.org/search?q=tianon%40debian.org -RUN curl -o /usr/local/bin/gosu -SL "https://github.com/tianon/gosu/releases/download/${GOSU_VERSION}/gosu-amd64" \ +RUN curl -o /usr/local/bin/gosu -SL "https://github.com/tianon/gosu/releases/download/${GOSU_VERSION}/gosu-${TARGETARCH}" \ && chmod +x /usr/local/bin/gosu RUN mkdir -p /sebs/ COPY dockerfiles/python_installer.sh /sebs/installer.sh diff --git a/dockerfiles/nodejs_installer.sh b/dockerfiles/nodejs_installer.sh index 1dd9fd65..01a3eaf9 100644 --- a/dockerfiles/nodejs_installer.sh +++ b/dockerfiles/nodejs_installer.sh @@ -1,5 +1,5 @@ #!/bin/bash - +set -e if [ -f /nvm/nvm.sh ]; then . /nvm/nvm.sh fi diff --git a/dockerfiles/python_installer.sh b/dockerfiles/python_installer.sh index 35f3228d..5525cf87 100644 --- a/dockerfiles/python_installer.sh +++ b/dockerfiles/python_installer.sh @@ -1,5 +1,5 @@ #!/bin/bash - +set -e cd /mnt/function PLATFORM_ARG="" @@ -9,20 +9,18 @@ fi if [[ "${TARGET_ARCHITECTURE}" == "arm64" ]] && [[ -f "requirements.txt.arm.${PYTHON_VERSION}" ]]; then - pip3 -q install ${PLATFORM_ARG} -r requirements.txt.arm.${PYTHON_VERSION} -t .python_packages/lib/site-packages + pip3 install ${PLATFORM_ARG} -r requirements.txt.arm.${PYTHON_VERSION} -t .python_packages/lib/site-packages elif [[ -f "requirements.txt.${PYTHON_VERSION}" ]]; then - pip3 -q install ${PLATFORM_ARG} -r requirements.txt.${PYTHON_VERSION} -t .python_packages/lib/site-packages + pip3 install ${PLATFORM_ARG} -r requirements.txt.${PYTHON_VERSION} -t .python_packages/lib/site-packages else - pip3 -q install ${PLATFORM_ARG} -r requirements.txt -t .python_packages/lib/site-packages + pip3 install ${PLATFORM_ARG} -r requirements.txt -t .python_packages/lib/site-packages fi if [[ -f "${SCRIPT_FILE}" ]]; then /bin/bash ${SCRIPT_FILE} .python_packages/lib/site-packages fi - - diff --git a/docs/platforms.md b/docs/platforms.md index 6c993592..9bda1fa7 100644 --- a/docs/platforms.md +++ b/docs/platforms.md @@ -3,7 +3,7 @@ SeBS supports three commercial serverless platforms: AWS Lambda, Azure Functions, and Google Cloud Functions. Furthermore, we support the open source FaaS system OpenWhisk. 
-The file `config/example.json` contains all parameters that users can change +The file `configs/example.json` contains all parameters that users can change to customize the deployment. Some of these parameters, such as cloud credentials or storage instance address, are required. @@ -33,6 +33,17 @@ However, special care is needed to build Docker containers: since installation o binaries based on ARM containers on x86 CPUs. To build multi-platform images, we recommend to follow official [Docker guidelines](https://docs.docker.com/build/building/multi-platform/#build-multi-platform-images) and provide static QEMU installation. On Ubuntu-based distributions, this requires installing an OS package and executing a single Docker command to provide seamless emulation of ARM containers. +### Multi-platform Docker Images + +Build images, which encapsulate package building, are available as both x64 and arm64 for Python and Node.js on AWS Lambda. +To rebuild multi-platform images, an additional flag is needed to enable the internal `docker buildx` command: + +```bash +sebs docker build --image-type build --language python --deployment aws --architecture x64 --language-version 3.11 --multi-platform +``` + +When rebuilding build images (not necessary for regular users, only for developers), make sure that your Docker installation supports multi-platform images, e.g., [you use `containerd` image store](https://docs.docker.com/engine/storage/containerd/) - old Docker installations might not change the storage type after an upgrade to Docker 29.0, where `containerd` is the default. + ## Cloud Account Identifiers SeBS ensures that all locally cached cloud resources are valid by storing a unique identifier associated with each cloud account. Furthermore, we store this identifier in experiment results to easily match results with the cloud account or subscription that was used to obtain them. We use non-sensitive identifiers such as account IDs on AWS, subscription IDs on Azure, and Google Cloud project IDs. @@ -52,7 +63,7 @@ Additionally, the account must have `AmazonAPIGatewayAdministrator` permission t automatically AWS HTTP trigger. You can provide a [role](https://docs.aws.amazon.com/lambda/latest/dg/lambda-intro-execution-role.html) with permissions to access AWS Lambda and S3; otherwise, one will be created automatically. -To use a user-defined lambda role, set the name in config JSON - see an example in `config/example.json`. +To use a user-defined lambda role, set the name in config JSON - see an example in `configs/example.json`. You can pass the credentials either using the default AWS-specific environment variables: @@ -208,7 +219,7 @@ or in the JSON input configuration: SeBS expects users to deploy and configure an OpenWhisk instance. Below, you will find example of instruction for deploying OpenWhisk instance. The configuration parameters of OpenWhisk for SeBS can be found -in `config/example.json` under the key `['deployment']['openwhisk']`. +in `configs/example.json` under the key `['deployment']['openwhisk']`. In the subsections below, we discuss the meaning and use of each parameter. To correctly deploy SeBS functions to OpenWhisk, following the subsections on *Toolchain* and *Docker* configuration is particularly important. @@ -282,7 +293,7 @@ and new language versions, Docker images must be placed in the registry. However, pushing the image to the default `spcleth/serverless-benchmarks` repository on Docker Hub requires permissions.
To use a different Docker Hub repository, change the key -`['general']['docker_repository']` in `config/systems.json`. +`['general']['docker_repository']` in `configs/systems.json`. Alternatively, OpenWhisk users can configure the FaaS platform to use a custom and private Docker registry and push new images there. diff --git a/docs/storage.md b/docs/storage.md index 35bde19f..2f6cc54c 100644 --- a/docs/storage.md +++ b/docs/storage.md @@ -19,13 +19,13 @@ You can start the necessary storage services using the `storage` command in SeBS ```bash # Start only object storage -sebs storage start object config/storage.json --output-json storage_object.json +sebs storage start object configs/storage.json --output-json storage_object.json # Start only NoSQL database -sebs storage start nosql config/storage.json --output-json storage_nosql.json +sebs storage start nosql configs/storage.json --output-json storage_nosql.json # Start both storage types -sebs storage start all config/storage.json --output-json storage.json +sebs storage start all configs/storage.json --output-json storage.json ``` The command deploys the requested storage services as Docker containers and generates a configuration file in JSON format. @@ -87,7 +87,7 @@ For example, for an external address `10.10.1.15` (a LAN-local address on CloudL ```bash # For a LAN-local address (e.g., on CloudLab) -jq --slurpfile file1 storage.json '.deployment.openwhisk.storage = $file1[0] | .deployment.openwhisk.storage.object.minio.address = "10.10.1.15:9011"' config/example.json > config/openwhisk.json +jq --slurpfile file1 storage.json '.deployment.openwhisk.storage = $file1[0] | .deployment.openwhisk.storage.object.minio.address = "10.10.1.15:9011"' configs/example.json > configs/openwhisk.json ``` You can validate the configuration of Minio with an HTTP request by using `curl`: @@ -112,7 +112,7 @@ Here, we again assume the external IP address of the system is `10.10.1.15`, and ```bash # For a LAN-local address (e.g., on CloudLab) -jq '.deployment.openwhisk.storage.nosql.scylladb.address = "10.10.1.15:9012"' config/openwhisk.json | sponge config/openwhisk.json +jq '.deployment.openwhisk.storage.nosql.scylladb.address = "10.10.1.15:9012"' configs/openwhisk.json | sponge configs/openwhisk.json ``` You can validate the configuration of ScyllaDB with an HTTP request by using `curl`: diff --git a/docs/usage.md b/docs/usage.md index f97a549c..9d64dab1 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -21,7 +21,7 @@ If you want to simply build a function deployment, such as a full code package o then use the command below. ```bash -sebs benchmark build 110.dynamic-html --config config/example.json --deployment aws +sebs benchmark build 110.dynamic-html --config configs/example.json --deployment aws ``` It will create a code package (local) or build and push a container, when `--container-deployment` flag is used (AWS only). @@ -33,7 +33,7 @@ This command builds, deploys, and executes serverless benchmarks in the cloud. The example below invokes the benchmark `110.dynamic-html` on AWS via the standard HTTP trigger. ```bash -sebs benchmark invoke 110.dynamic-html test --config config/example.json --deployment aws --verbose +sebs benchmark invoke 110.dynamic-html test --config configs/example.json --deployment aws --verbose ``` The results will be stored in `experiment.json`. 
@@ -64,13 +64,13 @@ Additionally, we provide a regression option to execute all benchmarks on a give The example below demonstrates how to run the regression suite with `test` input size on AWS. ```bash -sebs benchmark regression test --config config/example.json --deployment aws +sebs benchmark regression test --config configs/example.json --deployment aws ``` The regression can be executed on a single benchmark as well: ```bash -sebs benchmark regression test --config config/example.json --deployment aws --benchmark-name 120.uploader +sebs benchmark regression test --config configs/example.json --deployment aws --benchmark-name 120.uploader ``` ## Experiment @@ -78,7 +78,7 @@ sebs benchmark regression test --config config/example.json --deployment aws --b This command is used to execute benchmarks described in the paper. The example below runs the experiment **perf-cost**: ```bash -sebs experiment invoke perf-cost --config config/example.json --deployment aws +sebs experiment invoke perf-cost --config configs/example.json --deployment aws ``` The configuration specifies that benchmark **110.dynamic-html** is executed 50 times, with 50 concurrent invocations, and both cold and warm invocations are recorded. @@ -107,7 +107,7 @@ sebs experiment process perf-cost --config example.json --deployment aws You can remove all allocated cloud resources with the following command: ```bash -sebs resource clean --config config/example.json +sebs resource clean --config configs/example.json ``` This option is currently supported only on AWS, where it removes Lambda functions and associated HTTP APIs and CloudWatch logs, @@ -123,7 +123,7 @@ map the container's port to port defined in the configuration on host network, a instance configuration to file `out_storage.json` ```bash -sebs storage start all config/storage.json --output-json out_storage.json +sebs storage start all configs/storage.json --output-json out_storage.json ``` Then, we need to update the configuration of `local` deployment with information on the storage @@ -132,7 +132,7 @@ instance. 
The `.deployment.local` object in the configuration JSON must contain this automatically with a single command by using `jq`: ```bash -jq '.deployment.local.storage = input' config/example.json out_storage.json > config/local_deployment.json +jq '.deployment.local.storage = input' configs/example.json out_storage.json > configs/local_deployment.json ``` The output file will contain a JSON object that should look similar to this one: @@ -183,7 +183,7 @@ The output file will contain a JSON object that should look similar to this one: To launch Docker containers, use the following command - this example launches benchmark `110.dynamic-html` with size `test`: ```bash -sebs local start 110.dynamic-html test out_benchmark.json --config config/local_deployment.json --deployments 1 --remove-containers --architecture=x64 +sebs local start 110.dynamic-html test out_benchmark.json --config configs/local_deployment.json --deployments 1 --remove-containers --architecture=x64 ``` The output file `out_benchmark.json` will contain the information on containers deployed and the endpoints that can be used to invoke functions: diff --git a/sebs/aws/aws.py b/sebs/aws/aws.py index 293ed02c..ad68d967 100644 --- a/sebs/aws/aws.py +++ b/sebs/aws/aws.py @@ -140,7 +140,12 @@ def __init__( self.storage: Optional[S3] = None self.nosql_storage: Optional[DynamoDB] = None - def initialize(self, config: Dict[str, str] = {}, resource_prefix: Optional[str] = None): + def initialize( + self, + config: Dict[str, str] = {}, + resource_prefix: Optional[str] = None, + quiet: bool = False, + ): """ Initialize AWS resources. @@ -158,7 +163,7 @@ def initialize(self, config: Dict[str, str] = {}, resource_prefix: Optional[str] ) self.get_lambda_client() self.system_resources.initialize_session(self.session) - self.initialize_resources(select_prefix=resource_prefix) + self.initialize_resources(select_prefix=resource_prefix, quiet=quiet) self.ecr_client = ECRContainer( self.system_config, self.session, self.config, self.docker_client @@ -598,9 +603,11 @@ def default_function_name( """ # Create function name resource_id = resources.resources_id if resources else self.config.resources.resources_id + # Extract benchmark number (e.g., "110" from "110.dynamic-html") + benchmark_number = code_package.benchmark.split(".")[0] func_name = "sebs-{}-{}-{}-{}-{}".format( resource_id, - code_package.benchmark, + benchmark_number, code_package.language_name, code_package.language_version, code_package.architecture, @@ -636,13 +643,14 @@ def delete_function(self, func_name: str) -> None: self.logging.info("Deleting function {}".format(func_name)) try: self.client.delete_function(FunctionName=func_name) + self.config.resources.delete_function_url(func_name, self.session, self.cache_client) except Exception: self.logging.error("Function {} does not exist!".format(func_name)) @staticmethod def parse_aws_report( log: str, requests: Union[ExecutionResult, Dict[str, ExecutionResult]] - ) -> str: + ) -> str | None: """Parse AWS Lambda execution report from CloudWatch logs. 
Extracts execution metrics from AWS Lambda log entries and updates @@ -668,8 +676,11 @@ def parse_aws_report( aws_vals[split[0]] = split[1].split()[0] if "START RequestId" in aws_vals: request_id = aws_vals["START RequestId"] - else: + elif "REPORT RequestId" in aws_vals: request_id = aws_vals["REPORT RequestId"] + else: + return None + if isinstance(requests, ExecutionResult): output = cast(ExecutionResult, requests) else: @@ -854,6 +865,13 @@ def download_metrics( for result_part in val: if result_part["field"] == "@message": request_id = AWS.parse_aws_report(result_part["value"], requests) + + if request_id is None: + self.logging.error( + "Request incomplete, cannot identify ID! " + f"Request: {result_part['value']}" + ) + if request_id in requests: results_processed += 1 requests_ids.remove(request_id) diff --git a/sebs/aws/config.py b/sebs/aws/config.py index b56b61b7..83230641 100644 --- a/sebs/aws/config.py +++ b/sebs/aws/config.py @@ -655,7 +655,6 @@ def cleanup_function_urls( """ deleted: List[str] = [] - deleted_functions: List[str] = [] dry_run_tag = "[DRY-RUN] " if dry_run else "" dict_copy = self._function_urls.copy() @@ -664,14 +663,8 @@ def cleanup_function_urls( self.logging.info(f"{dry_run_tag}Deleting Function URL for: {func_name}") if not dry_run: - self.delete_function_url(func_name, boto3_session) + self.delete_function_url(func_name, boto3_session, cache_client) deleted.append(func_url.url) - deleted_functions.append(func_name) - - if not dry_run: - for func_name in deleted_functions: - cache_client.remove_config_key(["aws", "resources", "function-urls", func_name]) - self._function_urls.pop(func_name, None) return deleted @@ -776,7 +769,9 @@ def function_url( self._function_urls[func.name] = function_url_obj return function_url_obj - def delete_function_url(self, function_name: str, boto3_session: boto3.session.Session) -> bool: + def delete_function_url( + self, function_name: str, boto3_session: boto3.session.Session, cache_client: Cache + ) -> bool: """ Delete a Lambda Function URL for the given function. Returns True if deleted successfully, False if it didn't exist. @@ -1086,8 +1081,13 @@ def update_cache(self, cache: Cache) -> None: keys=["aws", "resources", "container_repository"], ) cache.update_config(val=self._lambda_role, keys=["aws", "resources", "lambda-role"]) + + # remove old entries before writing new data. + cache.remove_config_key(["aws", "resources", "http-apis"]) for name, api in self._http_apis.items(): cache.update_config(val=api.serialize(), keys=["aws", "resources", "http-apis", name]) + + cache.remove_config_key(["aws", "resources", "function-urls"]) for name, func_url in self._function_urls.items(): cache.update_config( val=func_url.serialize(), diff --git a/sebs/aws/triggers.py b/sebs/aws/triggers.py index 301f3606..bfc66e31 100644 --- a/sebs/aws/triggers.py +++ b/sebs/aws/triggers.py @@ -129,7 +129,16 @@ def sync_invoke(self, payload: dict) -> ExecutionResult: function_output = json.loads(ret["Payload"].read().decode("utf-8")) # AWS-specific parsing - AWS.parse_aws_report(log.decode("utf-8"), aws_result) + req_id = AWS.parse_aws_report(log.decode("utf-8"), aws_result) + if not req_id: + """ + This problem sometimes happens on very long cold starts - the execution + works but AWS returns too early. + """ + self.logging.error( + f"Unexpected AWS log format! Missing RequestID. 
Log: {log.decode('utf-8')}" + ) + # General benchmark output parsing # For some reason, the body is dict for NodeJS but a serialized JSON for Python if isinstance(function_output["body"], dict): diff --git a/sebs/azure/azure.py b/sebs/azure/azure.py index 13138431..798f29d1 100644 --- a/sebs/azure/azure.py +++ b/sebs/azure/azure.py @@ -33,6 +33,7 @@ import datetime import json +import random import re import os import shutil @@ -160,6 +161,7 @@ def initialize( self, config: Dict[str, str] = {}, resource_prefix: Optional[str] = None, + quiet: bool = False, ) -> None: """Initialize Azure system and start CLI container. @@ -170,7 +172,7 @@ def initialize( config: Additional configuration parameters resource_prefix: Optional prefix for resource naming """ - self.initialize_resources(select_prefix=resource_prefix) + self.initialize_resources(select_prefix=resource_prefix, quiet=quiet) self.allocate_shared_resource() def shutdown(self) -> None: @@ -335,6 +337,91 @@ def package_code( execute("zip -qu -r9 {}.zip * .".format(benchmark), shell=True, cwd=directory) return directory, code_size + def _execute_cli_with_retry( + self, + cmd: str, + max_retries: int = 5, + base_delay: float = 1.0, + max_delay: float = 32.0, + retryable_errors: Optional[Set[str]] = None, + ) -> bytes: + """Execute Azure CLI command with retry logic for transient errors. + + Handles transient CLI errors by retrying with exponential backoff + and jitter. Specific error patterns can be configured for retry. + + Args: + cmd: Azure CLI command to execute + max_retries: Maximum number of retry attempts (default: 5) + base_delay: Base delay in seconds for exponential backoff (default: 1.0) + max_delay: Maximum delay between retries in seconds (default: 32.0) + retryable_errors: Set of error patterns to trigger retries + (default: NotFound, TooManyRequests, find app with name) + + Returns: + Command output as bytes + + Raises: + RuntimeError: If the command fails with a non-retryable error or after + exhausting all retry attempts + """ + if retryable_errors is None: + retryable_errors = { + "NotFound", + "TooManyRequests", + "find app with name", + "ServiceUnavailable", + "InternalServerError", + } + + attempt = 0 + last_error = None + + while attempt <= max_retries: + try: + result = self.cli_instance.execute(cmd) + if attempt > 0: + self.logging.info(f"CLI command succeeded after {attempt} retries") + return result + except RuntimeError as e: + error_message = str(e) + last_error = e + + # Check if error is retryable + is_retryable = any(pattern in error_message for pattern in retryable_errors) + + if not is_retryable: + raise + + # Check if we have retries left + if attempt >= max_retries: + self.logging.error( + f"Max retries ({max_retries}) exhausted for CLI command, " + f"failing with error: {error_message}" + ) + raise + + # Calculate delay with exponential backoff and jitter + delay = min(base_delay * (2**attempt) + random.uniform(0, 1), max_delay) + + if attempt == 0: + self.logging.warning( + f"Transient CLI error, retrying (attempt {attempt + 1}/{max_retries}): " + f"{error_message[:100]}" + ) + else: + self.logging.info( + f"Retry {attempt + 1}/{max_retries} after {delay:.1f}s backoff" + ) + + time.sleep(delay) + attempt += 1 + + # This should not be reached, but just in case + if last_error: + raise last_error + raise RuntimeError("Unexpected state in retry logic") + def publish_function( self, function: Function, @@ -345,9 +432,9 @@ def publish_function( """Publish function code to Azure Functions. 
Deploys the packaged function code to Azure Functions using the - Azure Functions CLI tools. Handles retries and URL extraction. - Will repeat on failure, which is useful to handle delays in - Azure cache updates - it can take between 30 and 60 seconds. + Azure Functions CLI tools. Handles retries with exponential backoff + and jitter for transient errors. This is useful to handle delays in + Azure cache updates and service availability issues. Args: function: Function instance to publish @@ -361,62 +448,56 @@ def publish_function( Raises: RuntimeError: If function publication fails or URL cannot be found. """ - success = False - url = "" self.logging.info("Attempting publish of function {}".format(function.name)) - while not success: - try: - ret = self.cli_instance.execute( - f"bash -c 'cd {container_dest} " - "&& func azure functionapp publish {} --{} --no-build'".format( - function.name, self.AZURE_RUNTIMES[code_package.language_name] - ) - ) - url = "" - ret_str = ret.decode("utf-8") - for line in ret_str.split("\n"): - if "Invoke url:" in line: - url = line.split("Invoke url:")[1].strip() - break - - # We failed to find the URL the normal way - # Sometimes, the output does not include functions. - if url == "": - self.logging.warning( - "Couldnt find function URL in the output: {}".format(ret.decode("utf-8")) - ) - self.logging.info("Sleeping 30 seconds before attempting another query.") + publish_cmd = ( + f"bash -c 'cd {container_dest} " + "&& func azure functionapp publish {} --{} --no-build'".format( + function.name, self.AZURE_RUNTIMES[code_package.language_name] + ) + ) + + # Execute publish command with retry if requested + if repeat_on_failure: + ret = self._execute_cli_with_retry(publish_cmd) + else: + ret = self.cli_instance.execute(publish_cmd) - resource_group = self.config.resources.resource_group(self.cli_instance) - ret = self.cli_instance.execute( - "az functionapp function show --function-name handler " - f"--name {function.name} --resource-group {resource_group}" - ) - try: - url = json.loads(ret.decode("utf-8"))["invokeUrlTemplate"] - except json.decoder.JSONDecodeError: - raise RuntimeError( - f"Couldn't find the function URL in {ret.decode('utf-8')}" - ) + self.logging.debug(f"Function app publish of {function.name}, ret {ret.decode('utf-8')}") + + # Extract URL from publish output + url = "" + ret_str = ret.decode("utf-8") + for line in ret_str.split("\n"): + if "Invoke url:" in line: + url = line.split("Invoke url:")[1].strip() + break + + # Fallback: query function details if URL not found in publish output + if url == "": + self.logging.warning( + "Couldn't find function URL in the publish output: {}".format(ret.decode("utf-8")) + ) + self.logging.info("Querying function details to retrieve URL") + + resource_group = self.config.resources.resource_group(self.cli_instance) + query_cmd = ( + "az functionapp function show --function-name handler " + f"--name {function.name} --resource-group {resource_group}" + ) + + # Use retry for the query as well if repeat_on_failure is enabled + if repeat_on_failure: + ret = self._execute_cli_with_retry(query_cmd) + else: + ret = self.cli_instance.execute(query_cmd) + + self.logging.debug(f"Function query for {function.name}! 
Return {ret.decode('utf-8')}") + try: + url = json.loads(ret.decode("utf-8"))["invokeUrlTemplate"] + except json.decoder.JSONDecodeError: + raise RuntimeError(f"Couldn't find the function URL in {ret.decode('utf-8')}") - success = True - except RuntimeError as e: - error = str(e) - # app not found - # Azure changed the description as some point - if ("find app with name" in error or "NotFound" in error) and repeat_on_failure: - # Sleep because of problems when publishing immediately - # after creating function app. - time.sleep(30) - self.logging.info( - "Sleep 30 seconds for Azure to register function app {}".format( - function.name - ) - ) - # escape loop. we failed! - else: - raise e return url def update_function( @@ -587,6 +668,43 @@ def update_function_configuration(self, function: Function, code_package: Benchm "Updating function's memory and timeout configuration is not supported." ) + def delete_function(self, func_name: str) -> None: + """Delete an Azure Function App and its associated storage account. + + Args: + func_name: Name of the Azure Function App to delete + """ + self.logging.info(f"Deleting function app {func_name}") + + """ + For Azure, we need to retrieve the associated storage account. + Each function has its own storage account. + """ + all_functions = self.cache_client.get_all_functions(self.name()) + if func_name not in all_functions: + self.logging.error( + f"Failed to find function {func_name} in functions: {all_functions.keys()}." + ) + raise RuntimeError(f"Failed to find function {func_name} in cache.") + + function = cast(AzureFunction, self.function_type().deserialize(all_functions[func_name])) + + try: + self.cli_instance.execute( + f"az functionapp delete --name {func_name} " + f"--resource-group {self.config.resources.resource_group(self.cli_instance)}" + ) + self.logging.info(f"Function app {func_name} deleted successfully") + except RuntimeError as e: + self.logging.error(f"Failed to delete the function app {func_name}!") + raise e + + self.logging.info( + f"Deleting storage account {function.function_storage.account_name} " + f"associated with function {func_name}" + ) + self.config.resources.delete_storage_account(self.cli_instance, function.function_storage) + def _mount_function_code(self, code_package: Benchmark) -> str: """Mount function code package in Azure CLI container. @@ -709,7 +827,7 @@ def create_function( while True: try: # create function app - self.cli_instance.execute( + ret = self.cli_instance.execute( ( " az functionapp create --resource-group {resource_group} " " --os-type Linux --consumption-plan-location {region} " @@ -718,6 +836,7 @@ def create_function( " --functions-version 4 " ).format(**config) ) + self.logging.debug(f"Function app {func_name}, ret {ret.decode('utf-8')}") self.logging.info("Azure: Created function app {}".format(func_name)) break except RuntimeError as e: @@ -728,7 +847,7 @@ def create_function( ) # Rethrow -> another error else: - raise + raise e from None function = AzureFunction( name=func_name, benchmark=code_package.benchmark, diff --git a/sebs/azure/cli.py b/sebs/azure/cli.py index a1eade42..7719f9fa 100644 --- a/sebs/azure/cli.py +++ b/sebs/azure/cli.py @@ -129,6 +129,8 @@ def execute(self, cmd: str) -> bytes: RuntimeError: If command execution fails. 
""" exit_code, out = self.docker_instance.exec_run(cmd, user="docker_user") + # exec_run without stream=True always returns bytes + assert isinstance(out, bytes) if exit_code != 0: raise RuntimeError( "Command {} failed at Azure CLI docker!\n Output {}".format( diff --git a/sebs/azure/config.py b/sebs/azure/config.py index 44f1d4f2..2c5fb5e6 100644 --- a/sebs/azure/config.py +++ b/sebs/azure/config.py @@ -11,6 +11,7 @@ AzureResources: Manages Azure resource allocation and lifecycle AzureConfig: Combines credentials and resources for Azure deployment """ +from __future__ import annotations import json import logging @@ -444,6 +445,35 @@ def delete_resource_group(self, cli_instance: AzureCLI, name: str, wait: bool = self.logging.error(ret.decode()) raise RuntimeError("Failed to delete the resource group!") + def delete_storage_account( + self, cli_instance: AzureCLI, account: AzureResources.Storage + ) -> None: + """Delete Azure storage account. + + Args: + cli_instance: Azure CLI instance for executing deletion + account: Storage account to delete + + Raises: + RuntimeError: If deletion fails. + """ + + storage_account_name = account.account_name + try: + cli_instance.execute( + f"az storage account delete --name {storage_account_name} " + f"--resource-group {self._resource_group} --yes" + ) + self.logging.info(f"Storage account {storage_account_name} deleted successfully.") + + # delete the account from our list + self._storage_accounts = [ + acc for acc in self._storage_accounts if acc.account_name != storage_account_name + ] + except RuntimeError as e: + self.logging.error(f"Failed to delete the storage account {storage_account_name}!") + self.logging.error(f"Error: {e}") + def cosmosdb_account(self, cli_instance: AzureCLI) -> CosmosDBAccount: """Get or create CosmosDB account for NoSQL storage. @@ -600,7 +630,7 @@ def _create_storage_account( """Internal method to create storage account. Creates a new Azure storage account with the specified name. - This one can be usedboth for data storage and function storage. + This one can be used both for data storage and function storage. This method does NOT update cache or add to resource collections. Args: @@ -610,6 +640,23 @@ def _create_storage_account( Returns: New Storage instance for the created account. """ + + resource_group = self.resource_group(cli_instance) + ret = cli_instance.execute( + f"az storage account list --resource-group {resource_group} " + f"--query \"[?starts_with(name,'{account_name}') && location=='{self._region}']\"" + ) + try: + resp = json.loads(ret) + if len(resp) > 0: + self.logging.info(f"Using existing storage account {account_name}") + """ + List does not return connection string, so we need to query it separately. + """ + return AzureResources.Storage.from_allocation(account_name, cli_instance) + except json.JSONDecodeError: + pass + sku = "Standard_LRS" self.logging.info("Starting allocation of storage account {}.".format(account_name)) cli_instance.execute( @@ -674,8 +721,7 @@ def serialize(self) -> dict: Dictionary containing all resource configuration data. 
""" out = super().serialize() - if len(self._storage_accounts) > 0: - out["storage_accounts"] = [x.serialize() for x in self._storage_accounts] + out["storage_accounts"] = [x.serialize() for x in self._storage_accounts] if self._resource_group: out["resource_group"] = self._resource_group if self._cosmosdb_account: diff --git a/sebs/benchmark.py b/sebs/benchmark.py index 3a94ee07..9576713f 100644 --- a/sebs/benchmark.py +++ b/sebs/benchmark.py @@ -578,6 +578,7 @@ def __init__( output_dir: str, cache_client: Cache, docker_client: docker.client.DockerClient, + verbose: bool = False, ): """ Initialize a Benchmark instance. @@ -594,6 +595,7 @@ def __init__( output_dir: Directory for output files cache_client: Cache client for caching code packages docker_client: Docker client for building dependencies + verbose: Print verbose build logs. Raises: RuntimeError: If the benchmark is not found or doesn't support the language @@ -608,6 +610,7 @@ def __init__( self._language_variant = config.runtime.variant.value self._architecture = self._experiment_config.architecture self._container_deployment = config.container_deployment + self._verbose = verbose benchmark_path = find_benchmark(self.benchmark, "benchmarks") if not benchmark_path: @@ -1158,22 +1161,33 @@ def directory_size(directory: str) -> int: def builder_image_name(self) -> Tuple[str, str]: """Image names of builder Docker images for preparing benchmarks. - We are progressively replacing all unversioned image names with versioned ones. + Returns two image names for fallback behavior: + - Current version image (tagged with current SeBS version) + - Previous version image (tagged with previous major SeBS version) + + This allows new SeBS versions to use images from the previous stable + version without requiring a complete rebuild of all images. Returns: - Tuple of unversioned and versioned image names. + Tuple of (previous_version_image_name, current_version_image_name). """ - unversioned_image_name = "build.{deployment}.{language}.{runtime}".format( + base_image_name = "build.{deployment}.{language}.{runtime}".format( deployment=self._deployment_name, language=self.language_name, runtime=self.language_version, ) - image_name = "{base_image_name}-{sebs_version}".format( - base_image_name=unversioned_image_name, - sebs_version=self._system_config.version(), + # Current version image (try this first) + current_version_image_name = "{base}-{version}".format( + base=base_image_name, + version=self._system_config.version(), + ) + # Previous major version image (fallback) + previous_version_image_name = "{base}-{version}".format( + base=base_image_name, + version=self._system_config.previous_version(), ) - return unversioned_image_name, image_name + return previous_version_image_name, current_version_image_name def install_dependencies(self, output_dir: str) -> None: """Install benchmark dependencies using Docker. @@ -1184,10 +1198,11 @@ def install_dependencies(self, output_dir: str) -> None: Pulls a pre-built Docker image specific to the deployment, language, and runtime version. Mounts the output directory into the container and runs an installer script (`/sebs/installer.sh`) within the container. - Handles fallbacks to unversioned Docker images if versioned ones are not found. - Supports copying files to/from Docker for environments where volume mounting - is problematic (e.g., CircleCI). + Tries current SeBS version image first, falls back to previous major version + image if the current version image is not available. 
This allows new SeBS + versions to use images from the previous stable version without requiring + a complete rebuild. Args: output_dir: Directory containing the code package to build @@ -1208,7 +1223,7 @@ def install_dependencies(self, output_dir: str) -> None: ) else: repo_name = self._system_config.docker_repository() - unversioned_image_name, image_name = self.builder_image_name() + previous_version_image_name, current_version_image_name = self.builder_image_name() def ensure_image(name: str) -> None: """Internal implementation of checking for Docker image existence. @@ -1217,7 +1232,7 @@ def ensure_image(name: str) -> None: name: image name Raises: - RuntimeError: when neither versioned nor unversioned images exists. + RuntimeError: when image does not exist locally or cannot be pulled. """ try: self._docker_client.images.get(repo_name + ":" + name) @@ -1232,33 +1247,34 @@ def ensure_image(name: str) -> None: "Docker pull of image {}:{} failed!".format(repo_name, name) ) + # Try current version image first, fallback to previous version if not available + image_name = current_version_image_name try: - ensure_image(image_name) + ensure_image(current_version_image_name) except RuntimeError as e: self.logging.warning( "Failed to ensure image {}, falling back to {}: {}".format( - image_name, unversioned_image_name, e + current_version_image_name, previous_version_image_name, e ) ) try: - ensure_image(unversioned_image_name) + ensure_image(previous_version_image_name) + # update `image_name` to the fallback image name + image_name = previous_version_image_name except RuntimeError: raise - # update `image_name` in the context to the fallback image name - image_name = unversioned_image_name - - # Create set of mounted volumes unless Docker volumes are disabled - if not self._experiment_config.check_flag("docker_copy_build_files"): - volumes = {os.path.abspath(output_dir): {"bind": "/mnt/function", "mode": "rw"}} - package_script = os.path.abspath( - os.path.join(self._benchmark_path, self.language_name, "package.sh") - ) - # does this benchmark has package.sh script? - if os.path.exists(package_script): - volumes[package_script] = { - "bind": "/mnt/function/package.sh", - "mode": "ro", - } + + # Create set of mounted volumes + volumes = {os.path.abspath(output_dir): {"bind": "/mnt/function", "mode": "rw"}} + package_script = os.path.abspath( + os.path.join(self._benchmark_path, self.language_name, "package.sh") + ) + # does this benchmark has package.sh script? 
+ if os.path.exists(package_script): + volumes[package_script] = { + "bind": "/mnt/function/package.sh", + "mode": "ro", + } # run Docker container to install packages PACKAGE_FILES = { @@ -1274,91 +1290,50 @@ def ensure_image(name: str) -> None: "Docker build of benchmark dependencies in container " "of image {repo}:{image}".format(repo=repo_name, image=image_name) ) - uid = os.getuid() - # Standard, simplest build - if not self._experiment_config.check_flag("docker_copy_build_files"): - self.logging.info( - "Docker mount of benchmark code from path {path}".format( - path=os.path.abspath(output_dir) - ) - ) - container = self._docker_client.containers.run( - "{}:{}".format(repo_name, image_name), - volumes=volumes, - environment={ - "CONTAINER_UID": str(os.getuid()), - "CONTAINER_GID": str(os.getgid()), - "CONTAINER_USER": "docker_user", - "APP": self.benchmark, - "PLATFORM": self._deployment_name.upper(), - "TARGET_ARCHITECTURE": self._experiment_config._architecture, - }, - remove=False, - detach=True, - ) - try: - exit_code = container.wait() - stdout = container.logs() - if exit_code["StatusCode"] != 0: - error_log_path = os.path.join(output_dir, "error.log") - with open(error_log_path, "wb") as error_file: - error_file.write(stdout) - self.logging.error( - f"Build failed! Container exited with " - f"code {exit_code['StatusCode']}" - ) - self.logging.error(f"Logs saved to {error_log_path}") - raise RuntimeError("Package build failed!") - finally: - container.remove() - # Hack to enable builds on platforms where Docker mounted volumes - # are not supported. Example: CircleCI docker environment - else: - container = self._docker_client.containers.run( - "{}:{}".format(repo_name, image_name), - environment={"APP": self.benchmark}, - # user="1000:1000", - user=uid, - remove=True, - detach=True, - tty=True, - command="/bin/bash", + self.logging.info( + "Docker mount of benchmark code from path {path}".format( + path=os.path.abspath(output_dir) ) - # copy application files - import tarfile + ) + container = self._docker_client.containers.run( + "{}:{}".format(repo_name, image_name), + volumes=volumes, + environment={ + "CONTAINER_UID": str(os.getuid()), + "CONTAINER_GID": str(os.getgid()), + "CONTAINER_USER": "docker_user", + "APP": self.benchmark, + "PLATFORM": self._deployment_name.upper(), + "TARGET_ARCHITECTURE": self._experiment_config._architecture, + }, + remove=False, + detach=True, + ) + try: + exit_code = container.wait() + stdout = container.logs() + + error_log_path: str = "" + if exit_code["StatusCode"] != 0: + error_log_path = os.path.join(output_dir, "error.log") + elif self._verbose: + error_log_path = os.path.join(output_dir, "build.log") + + if exit_code["StatusCode"] != 0 or self._verbose: + with open(error_log_path, "wb") as error_file: + error_file.write(stdout) + + if exit_code["StatusCode"] != 0: + self.logging.error( + f"Build failed! 
Container exited with " + f"code {exit_code['StatusCode']}" + ) + self.logging.error(f"Logs saved to {error_log_path}") + raise RuntimeError("Package build failed!") - self.logging.info( - "Send benchmark code from path {path} to " - "Docker instance".format(path=os.path.abspath(output_dir)) - ) - tar_archive = os.path.join(output_dir, os.path.pardir, "function.tar") - with tarfile.open(tar_archive, "w") as tar: - for f in os.listdir(output_dir): - tar.add(os.path.join(output_dir, f), arcname=f) - with open(tar_archive, "rb") as data: - container.put_archive("/mnt/function", data.read()) - # do the build step - exit_code, stdout = container.exec_run( - cmd="/bin/bash /sebs/installer.sh", - user="docker_user", - stdout=True, - stderr=True, - ) - # copy updated code with package - data, stat = container.get_archive("/mnt/function") - with open(tar_archive, "wb") as output_filef: - for chunk in data: - output_filef.write(chunk) - with tarfile.open(tar_archive, "r") as tar: - tar.extractall(output_dir) - # docker packs the entire directory with basename function - for f in os.listdir(os.path.join(output_dir, "function")): - shutil.move( - os.path.join(output_dir, "function", f), - os.path.join(output_dir, f), - ) - shutil.rmtree(os.path.join(output_dir, "function")) - container.stop() + self.logging.debug(f"Build Build logs saved to {error_log_path}") + finally: + container.remove() # Pass to output information on optimizing builds. # Useful for AWS where packages have to obey size limits. @@ -1475,7 +1450,53 @@ def build( assert container_client is not None repo_name = self._system_config.docker_repository() - _, image_name = self.builder_image_name() + previous_version_image_name, current_version_image_name = self.builder_image_name() + + # Try current version build image first, fallback to previous version + image_name = current_version_image_name + current_available = False + try: + self._docker_client.images.get(f"{repo_name}:{current_version_image_name}") + current_available = True + except Exception: + # Not available locally, try to pull it + try: + self.logging.info( + f"Docker pull of build image {repo_name}:{current_version_image_name}" + ) + self._docker_client.images.pull(repo_name, current_version_image_name) + current_available = True + except Exception: + pass + + if not current_available: + # Current version not available, try previous version + try: + self._docker_client.images.get(f"{repo_name}:{previous_version_image_name}") + image_name = previous_version_image_name + self.logging.info( + f"Using previous version build image {previous_version_image_name} " + "(current version not available)" + ) + except Exception: + # Previous version not local, try to pull it + try: + self.logging.info( + f"Docker pull of build image {repo_name}:{previous_version_image_name}" + ) + self._docker_client.images.pull(repo_name, previous_version_image_name) + image_name = previous_version_image_name + self.logging.info( + f"Using previous version build image {previous_version_image_name} " + "(current version not available)" + ) + except Exception: + # Neither version available - use current and let build fail + self.logging.warning( + f"Neither current ({current_version_image_name}) nor previous " + f"({previous_version_image_name}) version build image available, " + "build may fail" + ) """ Generate custom Dockerfile for C++ benchmarks @@ -1496,6 +1517,10 @@ def build( self._benchmark_config._cpp_dependencies, dockerfile_template, self._system_config.version(), + 
previous_version=self._system_config.previous_version(), + docker_client=self._docker_client, + docker_repository=self._system_config.docker_repository(), + logger=self.logging, ) dockerfile_path = os.path.join(self._output_dir, "Dockerfile") with open(dockerfile_path, "w") as f: diff --git a/sebs/cli.py b/sebs/cli.py index 4e1cc558..e78396ce 100755 --- a/sebs/cli.py +++ b/sebs/cli.py @@ -11,6 +11,7 @@ import logging import functools import os +import sys import traceback from typing import cast, List, Optional @@ -455,33 +456,55 @@ def package( multiple=True, help="JSON configuration of deployed storage.", ) -@common_params @click.option( - "--cache", - default=os.path.join(os.path.curdir, "regression-cache"), - help="Location of experiments cache.", + "--selected-architecture/--all-architectures", + type=bool, + default=False, + help="Skip non-selected CPU architectures.", ) @click.option( - "--output-dir", - default=os.path.join(os.path.curdir, "regression-output"), - help="Output directory for results.", + "--filter-output/--no-filter-output", + type=bool, + default=False, + help="Filter resource IDs and URls from output.", ) -def regression(benchmark_input_size, benchmark_name, storage_configuration, **kwargs): +@common_params +def regression( + benchmark_input_size, + benchmark_name, + storage_configuration, + selected_architecture, + filter_output, + **kwargs, +): """Run regression test suite across benchmarks.""" + # for regression, deployment client is initialized locally # disable default initialization + + from pathlib import Path + + if Path(kwargs["cache"]) == Path("cache"): + kwargs["cache"] = os.path.join(os.path.curdir, "regression-cache") + (config, output_dir, logging_filename, sebs_client, _) = parse_common_params( initialize_deployment=False, storage_configuration=storage_configuration, **kwargs, ) - regression_suite( + architecture = config["experiments"]["architecture"] if selected_architecture else None + has_failures = regression_suite( sebs_client, config["experiments"], set((config["deployment"]["name"],)), config, + kwargs["resource_prefix"], benchmark_name, + architecture, + filter_output, ) + # Exit with non-zero code if any tests failed + sys.exit(1 if has_failures else 0) @cli.group() @@ -886,9 +909,19 @@ def resources_remove(resource, prefix, wait, dry_run, **kwargs): default=False, help="Simulate run without actual deletions.", ) +@click.option( + "--resource-type", + type=click.Choice(["functions"]), + default=None, + help="Clean up only specific resource type. If not specified, cleans up all resources.", +) @common_params -def resources_cleanup(resources_id, dry_run, **kwargs): - """Clean up cloud resources created by SeBS experiments.""" +def resources_cleanup(resources_id, dry_run, resource_type, **kwargs): + """Clean up cloud resources created by SeBS experiments. + + By default, cleans up all resources (functions, storage, etc.). + Use --resource-type to clean up only specific resource types. 
+ """ ( config, output_dir, @@ -898,10 +931,20 @@ def resources_cleanup(resources_id, dry_run, **kwargs): ) = parse_common_params(**kwargs) try: - result = deployment_client.cleanup_resources(dry_run=dry_run) - total = sum(len(v) for v in result.values()) - action = "found" if dry_run else "deleted" - sebs_client.logging.info(f"Total resources {action}: {total}") + if resource_type == "functions": + # Clean up only functions + deleted_functions = deployment_client.cleanup_functions(dry_run=dry_run) + action = "found" if dry_run else "deleted" + sebs_client.logging.info(f"Total functions {action}: {len(deleted_functions)}") + if deleted_functions: + for func_name in deleted_functions: + sebs_client.logging.info(f" - {func_name}") + else: + # Clean up all resources (original behavior) + result = deployment_client.cleanup_resources(dry_run=dry_run) + total = sum(len(v) for v in result.values()) + action = "found" if dry_run else "deleted" + sebs_client.logging.info(f"Total resources {action}: {total}") except NotImplementedError as e: sebs_client.logging.error(str(e)) @@ -944,6 +987,12 @@ def docker_cmd(): type=str, help="Optional Docker platform (e.g., linux/amd64) to override host architecture.", ) +@click.option( + "--multi-platform/--no-multi-platform", + default=False, + type=bool, + help="When true, build multi-platform images (requires QEMU support)", +) @click.option( "--dependency-type", default=None, @@ -959,6 +1008,7 @@ def docker_build( language_version, architecture, platform, + multi_platform, dependency_type, parallel, verbose, @@ -982,6 +1032,7 @@ def docker_build( architecture=architecture, dependency_type=dependency_type, platform=platform, + multi_platform=multi_platform, parallel=parallel, ) diff --git a/sebs/config.py b/sebs/config.py index c5d3fe9b..6f66e888 100644 --- a/sebs/config.py +++ b/sebs/config.py @@ -213,6 +213,18 @@ def version(self) -> str: """ return self._system_config["general"].get("SeBS_version", "unknown") + def previous_version(self) -> str: + """Get the previous major SeBS framework version. + + This is used as a fallback version for Docker images that haven't been + rebuilt for the current version. It allows new SeBS versions to use + images from the previous stable version without requiring a full rebuild. + + Returns: + str: The previous major version string, or 'unknown' if not configured. + """ + return self._system_config["general"].get("previous_major_version", "unknown") + def docker_image_name( self, system: str, diff --git a/sebs/cpp_dependencies.py b/sebs/cpp_dependencies.py index 58673ca0..6cdb8cd2 100644 --- a/sebs/cpp_dependencies.py +++ b/sebs/cpp_dependencies.py @@ -255,6 +255,10 @@ def generate_dockerfile( cpp_dependencies: list[CppDependencies], dockerfile_template: str, sebs_version: str, + previous_version: str | None = None, + docker_client=None, + docker_repository: str | None = None, + logger=None, ) -> str: """ Generate a custom Dockerfile for C++ Lambda functions with selective dependencies. @@ -263,9 +267,17 @@ def generate_dockerfile( 1. FROM statements for required dependency images 2. COPY statements to only copy needed libraries from each dependency. + Supports version fallback: if a dependency image doesn't exist with the current + SeBS version, falls back to the previous major version. 
+ Args: cpp_dependencies: List of explicit dependencies from benchmark config dockerfile_template: Content of Dockerfile.function template + sebs_version: Current SeBS version + previous_version: Previous major SeBS version for fallback + docker_client: Docker client for checking image availability + docker_repository: Docker repository name + logger: Logger instance for logging fallback information Returns: Complete Dockerfile content with placeholders replaced @@ -276,9 +288,61 @@ def generate_dockerfile( from_statements = [] for dep in required_deps: config = dep_dict[dep] + + # Determine which version to use (current or previous fallback) + version_to_use = sebs_version + if docker_client and docker_repository and previous_version: + current_image_tag = f"{config.docker_img}-{sebs_version}" + previous_image_tag = f"{config.docker_img}-{previous_version}" + current_image = f"{docker_repository}:{current_image_tag}" + previous_image = f"{docker_repository}:{previous_image_tag}" + + # Try current version first (check locally, then pull) + current_available = False + try: + docker_client.images.get(current_image) + current_available = True + except Exception: + # Not available locally, try to pull it + try: + docker_client.images.pull(docker_repository, current_image_tag) + current_available = True + except Exception: + pass + + if not current_available: + # Current version not available, try previous version + try: + docker_client.images.get(previous_image) + version_to_use = previous_version + if logger: + logger.info( + f"Using previous version {previous_version} for " + f"dependency {dep.value} (current version not available)" + ) + except Exception: + # Previous version not local, try to pull it + try: + docker_client.images.pull(docker_repository, previous_image_tag) + version_to_use = previous_version + if logger: + logger.info( + f"Using previous version {previous_version} for " + f"dependency {dep.value} (current version not available)" + ) + except Exception: + # Neither version available - use current and let build fail + if logger: + logger.warning( + f"Neither current ({sebs_version}) nor previous " + f"({previous_version}) version available for " + f"dependency {dep.value}, build may fail" + ) + pass + # Use the short name (e.g., "sdk") as the stage alias from_statements.append( - f"FROM ${{BASE_REPOSITORY}}:{config.docker_img}-{sebs_version} as {dep.value}" + f"FROM ${{BASE_REPOSITORY}}:{config.docker_img}-{version_to_use} as {dep.value}" ) copy_statements = [] diff --git a/sebs/docker_builder.py b/sebs/docker_builder.py index 420ad446..de81fa5b 100644 --- a/sebs/docker_builder.py +++ b/sebs/docker_builder.py @@ -17,7 +17,7 @@ from rich.progress import Progress, TaskID from sebs.config import SeBSConfig -from sebs.utils import LoggingBase +from sebs.utils import execute, LoggingBase class DockerImageBuilder(LoggingBase): @@ -77,6 +77,169 @@ def __init__( else: self.docker_client = docker_client + def _should_use_multiplatform( + self, system: str, image_type: str, language: Optional[str] = None + ) -> bool: + """Check if multi-platform build should be used for this image. 
+ + Multi-platform builds (x64 + arm64) are only enabled for: + - AWS platform + - Build images only + - Python or Node.js languages + + Args: + system: Deployment platform (e.g., 'aws', 'gcp') + image_type: Type of image (e.g., 'build', 'run') + language: Programming language (e.g., 'python', 'nodejs') + + Returns: + True if multi-platform build should be used, False otherwise + """ + return ( + system == "aws" and image_type == "build" and language in ["python", "nodejs", "java"] + ) + + def _execute_multiplatform_build( + self, + system: str, + language: str, + version: str, + image_name: str, + dockerfile: str, + buildargs: dict, + ) -> bool: + """Execute multi-platform build by building separate images and stitching them. + + Builds separate images for linux/amd64 and linux/arm64 with their respective + BASE_IMAGE values, then creates a multi-platform manifest to combine them. + + Args: + system: Deployment platform (e.g., 'aws') + language: Programming language (e.g., 'python', 'nodejs') + version: Language version (e.g., '3.11', '16') + image_name: Docker image tag (without architecture suffix) + dockerfile: Path to Dockerfile + buildargs: Base build args (BASE_IMAGE will be overridden per arch) + + Returns: + True if build succeeded, False otherwise + """ + self.logging.info(f"Building multi-platform image: {image_name}") + self.logging.info("Platforms: linux/amd64, linux/arm64") + self.logging.debug(f"Dockerfile: {dockerfile}") + + # Get architecture-specific base images from config + systems_config = self.config._system_config + language_config = systems_config[system]["languages"][language] + + if "base_images" not in language_config: + self.logging.error(f"No base_images found for {system}/{language}") + return False + + base_images = language_config["base_images"] + if "x64" not in base_images or "arm64" not in base_images: + self.logging.error(f"Missing x64 or arm64 base images for {system}/{language}") + return False + + if version not in base_images["x64"] or version not in base_images["arm64"]: + self.logging.error(f"Version {version} not found in base images") + return False + + amd64_base = base_images["x64"][version] + arm64_base = base_images["arm64"][version] + + self.logging.debug(f"AMD64 base image: {amd64_base}") + self.logging.debug(f"ARM64 base image: {arm64_base}") + + # Architecture-specific image tags + amd64_image = f"{image_name}-amd64" + arm64_image = f"{image_name}-arm64" + + # Build both architecture images + for platform, base_image, arch_image in [ + ("linux/amd64", amd64_base, amd64_image), + ("linux/arm64", arm64_base, arm64_image), + ]: + self.logging.info(f"Building {platform} image: {arch_image}") + + # Override BASE_IMAGE for this architecture + arch_buildargs = buildargs.copy() + arch_buildargs["BASE_IMAGE"] = base_image + + # Build command + cmd = [ + "docker", + "build", + "--platform", + platform, + "--provenance", + "false", + "-f", + dockerfile, + "-t", + arch_image, + "--push", + ] + + # Add build args + for key, value in arch_buildargs.items(): + cmd.extend(["--build-arg", f"{key}={value}"]) + + # Add context path + cmd.append(str(self.project_dir)) + + try: + execute(cmd, cwd=str(self.project_dir)) + self.logging.info(f"Successfully built {arch_image}") + except RuntimeError as exc: + self.logging.error(f"Build failed for {arch_image}") + self.logging.error(f"Exit: {exc}") + return False + + self.logging.info(f"Removing old multi-platform manifest: {image_name}") + manifest_cmd = [ + "docker", + "manifest", + "rm", + image_name, + ] + 
try: + execute(manifest_cmd) + self.logging.info(f"Successfully removed old manifest {image_name}") + except RuntimeError: + # ignore, manifest might not have existed + pass + + # Create multi-platform manifest + self.logging.info(f"Creating multi-platform manifest: {image_name}") + manifest_cmd = [ + "docker", + "manifest", + "create", + image_name, + amd64_image, + arm64_image, + ] + try: + execute(manifest_cmd) + self.logging.info(f"Successfully created manifest: {image_name}") + except RuntimeError as exc: + self.logging.error(f"Manifest creation failed for {image_name}") + self.logging.error(f"Exit: {exc}") + return False + + self.logging.info(f"Pushing multi-platform manifest: {image_name}") + push_cmd = ["docker", "manifest", "push", image_name] + try: + execute(push_cmd) + self.logging.info(f"Successfully created manifest: {image_name}") + except RuntimeError as exc: + self.logging.error(f"Manifest creation failed for {image_name}") + self.logging.error(f"Exit: {exc}") + return False + + return True + def _execute_build( self, system: str, @@ -85,11 +248,15 @@ def _execute_build( version: Optional[str] = None, version_name: Optional[str] = None, platform: Optional[str] = None, + multi_platform: bool = False, parallel: int = 1, ) -> bool: """ Execute the actual build operation for a single image. + For AWS Python/Node.js build images, uses multi-platform build (x64 + arm64). + For all other images, uses standard single-platform build. + Args: system: Deployment platform image_type: Type of image @@ -97,11 +264,13 @@ def _execute_build( version: Language version, optional version_name: Base image name for the version, optional platform: Docker platform override, optional + multi_platform: If we should build both x64 and arm64 parallel: Number of parallel workers, default 1 Returns: True if build succeeded, False otherwise """ + # Locate dockerfile if language: dockerfile = os.path.join( @@ -127,6 +296,16 @@ def _execute_build( if version_name: buildargs["BASE_IMAGE"] = version_name + # Check if multi-platform build should be used + if multi_platform and self._should_use_multiplatform(system, image_type, language): + if version is None or version_name is None or language is None: + self.logging.error("Multi-platform build requires version and language") + return False + return self._execute_multiplatform_build( + system, language, version, image_name, dockerfile, buildargs + ) + + # Standard single-platform build platform_arg = platform or os.environ.get("DOCKER_DEFAULT_PLATFORM") self.logging.debug(f"Building {image_name} from {dockerfile}") @@ -162,6 +341,9 @@ def _execute_push( """ Execute the actual push operation for a single image. + Multi-platform images (AWS Python/Node.js build images) are skipped + as they are already pushed during the build process. 
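Stripped of logging and config lookups, the stitching step above reduces to a short command sequence: one per-architecture build-and-push, then a manifest list that ties the two images together. A sketch using subprocess directly; the repository, tags, and base images are placeholders, and it assumes a BuildKit-enabled Docker with registry login so that --push, --provenance, and docker manifest succeed:

import subprocess

repo = "example/serverless-benchmarks"          # placeholder repository
tag = f"{repo}:build.aws.python.3.11"           # target multi-platform tag

# 1. Build and push one image per architecture, each with its own base image.
for platform, base_image, suffix in [
    ("linux/amd64", "python:3.11-slim", "amd64"),
    ("linux/arm64", "arm64v8/python:3.11-slim", "arm64"),
]:
    subprocess.run(
        ["docker", "build", "--platform", platform, "--provenance", "false",
         "--build-arg", f"BASE_IMAGE={base_image}",
         "-t", f"{tag}-{suffix}", "--push", "."],
        check=True,
    )

# 2. Drop any stale manifest, then create and push the combined manifest list.
subprocess.run(["docker", "manifest", "rm", tag], check=False)
subprocess.run(["docker", "manifest", "create", tag,
                f"{tag}-amd64", f"{tag}-arm64"], check=True)
subprocess.run(["docker", "manifest", "push", tag], check=True)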
+ Args: system: Deployment platform image_type: Type of image @@ -172,6 +354,14 @@ def _execute_push( Returns: True if push succeeded, False otherwise """ + # Skip multi-platform images - they're already pushed during build + if self._should_use_multiplatform(system, image_type, language): + image_name = self.config.docker_image_name(system, image_type, language, version) + self.logging.info( + f"Skipping push for multi-platform image (already pushed): {image_name}" + ) + return True + # Full Docker image tag image_name = self.config.docker_image_name(system, image_type, language, version) @@ -210,6 +400,7 @@ def _process_image( version: Optional[str] = None, version_name: Optional[str] = None, platform: Optional[str] = None, + multi_platform: bool = False, parallel: int = 1, ) -> bool: """ @@ -223,6 +414,7 @@ def _process_image( version: Language version, optional version_name: Base image name for the version, optional platform: Docker platform override, optional + multi_platform: If we should build both x64 and arm64 parallel: Number of parallel workers, default 1 Returns: @@ -239,7 +431,14 @@ def _process_image( # Execute the appropriate operation if operation == "build": return self._execute_build( - system, image_type, language, version, version_name, platform, parallel + system, + image_type, + language, + version, + version_name, + platform, + multi_platform, + parallel, ) elif operation == "push": return self._execute_push(system, image_type, language, version) @@ -256,6 +455,7 @@ def _process_language( language_version: Optional[str] = None, image_type: Optional[str] = None, platform: Optional[str] = None, + multi_platform: bool = False, parallel: int = 1, ) -> None: """ @@ -270,6 +470,7 @@ def _process_language( language_version: Specific version to process, processes all if None image_type: Specific image type to process, processes all if None platform: Docker platform override, optional + multi_platform: If we should build both x64 and arm64 parallel: Number of parallel workers, default 1 """ # Maps to language_version and Docker base image for that version @@ -300,6 +501,7 @@ def _process_language( language, *image_config, platform=platform, + multi_platform=multi_platform, parallel=parallel, ) else: @@ -310,6 +512,7 @@ def _process_language( language, *image_config, platform=platform, + multi_platform=multi_platform, parallel=parallel, ) @@ -324,6 +527,7 @@ def _process_system( architecture: str = "x64", dependency_type: Optional[str] = None, platform: Optional[str] = None, + multi_platform: bool = False, parallel: int = 1, ) -> None: """ @@ -339,6 +543,7 @@ def _process_system( architecture: Target architecture, default "x64" dependency_type: Specific dependency for cpp (opencv, boost, etc.), optional platform: Docker platform override, optional + multi_platform: If we should build both x64 and arm64 parallel: Number of parallel workers, default 1 """ if image_type == "manage": @@ -376,6 +581,7 @@ def _process_system( version, base_image, platform=platform, + multi_platform=multi_platform, parallel=parallel, ) else: @@ -391,6 +597,7 @@ def _process_system( version, base_image, platform=platform, + multi_platform=multi_platform, parallel=parallel, ) else: @@ -407,6 +614,7 @@ def _process_system( language_version=language_version, image_type=image_type, platform=platform, + multi_platform=multi_platform, parallel=parallel, ) else: @@ -423,13 +631,19 @@ def _process_system( language_version=language_version, image_type=image_type, platform=platform, + 
multi_platform=multi_platform, parallel=parallel, ) # No filters - process additional image types supported on the platform if "images" in system_config: for img_type, _ in system_config["images"].items(): self._process_image( - operation, system, img_type, platform=platform, parallel=parallel + operation, + system, + img_type, + platform=platform, + multi_platform=multi_platform, + parallel=parallel, ) def _process( @@ -442,6 +656,7 @@ def _process( architecture: str = "x64", dependency_type: Optional[str] = None, platform: Optional[str] = None, + multi_platform: bool = False, parallel: int = 1, ) -> None: """ @@ -459,6 +674,7 @@ def _process( architecture: Target architecture, default "x64" dependency_type: Specific dependency for cpp, optional platform: Docker platform override, optional + multi_platform: If we should build both x64 and arm64 parallel: Number of parallel workers, default 1 """ systems_config = self.config._system_config @@ -475,6 +691,7 @@ def _process( architecture=architecture, dependency_type=dependency_type, platform=platform, + multi_platform=multi_platform, parallel=parallel, ) else: @@ -491,6 +708,7 @@ def _process( architecture=architecture, dependency_type=dependency_type, platform=platform, + multi_platform=multi_platform, parallel=parallel, ) @@ -503,6 +721,7 @@ def build( architecture: str = "x64", dependency_type: Optional[str] = None, platform: Optional[str] = None, + multi_platform: bool = False, parallel: int = 1, ) -> None: """ @@ -522,6 +741,7 @@ def build( architecture: Target architecture, default "x64" dependency_type: Specific dependency for cpp, optional platform: Docker platform override, optional + multi_platform: If we should build both x64 and arm64 parallel: Number of parallel workers, default 1 """ self._process( @@ -533,6 +753,7 @@ def build( architecture=architecture, dependency_type=dependency_type, platform=platform, + multi_platform=multi_platform, parallel=parallel, ) diff --git a/sebs/faas/function.py b/sebs/faas/function.py index 651c1f65..f5602e17 100644 --- a/sebs/faas/function.py +++ b/sebs/faas/function.py @@ -553,6 +553,22 @@ class NodeJS(Enum): BUN = "bun" LLRT = "llrt" + class Java(Enum): + """Java runtime variants. + Currently only JDK. + """ + + DEFAULT = "default" + + class Cpp(Enum): + """Cpp runtime variants. + + Currently only one variant, + compiled with gcc. + """ + + DEFAULT = "default" + @classmethod def for_language(cls, language: Language, val: str) -> Enum: """Deserialize a variant string for the given language.""" @@ -574,6 +590,8 @@ def default(cls, language: Language) -> Enum: Variant._LANG_MAP = { Language.PYTHON: Variant.Python, Language.NODEJS: Variant.NodeJS, + Language.JAVA: Variant.Java, + Language.CPP: Variant.Cpp, } diff --git a/sebs/faas/system.py b/sebs/faas/system.py index 17134542..72bf5b0d 100644 --- a/sebs/faas/system.py +++ b/sebs/faas/system.py @@ -198,7 +198,7 @@ def cleanup_resources(self, dry_run: bool = False) -> dict: """ raise NotImplementedError(f"Resource cleanup not implemented for {self.name()}") - def initialize_resources(self, select_prefix: Optional[str]): + def initialize_resources(self, select_prefix: Optional[str], quiet: bool = False): """ Initialize cloud resources for the deployment. @@ -213,9 +213,10 @@ def initialize_resources(self, select_prefix: Optional[str]): """ # User provided resources or found in cache if self.config.resources.has_resources_id: - self.logging.info( - f"Using existing resource name: {self.config.resources.resources_id}." 
- ) + if not quiet: + self.logging.info( + f"Using existing resource name: {self.config.resources.resources_id}." + ) return # Now search for existing resources @@ -225,15 +226,16 @@ def initialize_resources(self, select_prefix: Optional[str]): if select_prefix is not None: for dep in deployments: if select_prefix in dep: - self.logging.info( - f"Using existing deployment {dep} that matches prefix {select_prefix}!" - ) + if not quiet: + self.logging.info( + f"Using existing deployment {dep} that matches prefix {select_prefix}!" + ) self.config.resources.resources_id = dep return # We warn users that we create a new resource ID # They can use them with a new config - if len(deployments) > 0: + if len(deployments) > 0 and not quiet: self.logging.warning( f"We found {len(deployments)} existing deployments! " "If you want to use any of them, please abort, and " @@ -248,12 +250,18 @@ def initialize_resources(self, select_prefix: Optional[str]): else: res_id = str(uuid.uuid1())[0:8] self.config.resources.resources_id = res_id - self.logging.info(f"Generating unique resource name {res_id}") + if not quiet: + self.logging.info(f"Generating unique resource name {res_id}") # Ensure that the bucket is created - this allocates the new resource self.system_resources.get_storage().get_bucket(Resources.StorageBucketType.BENCHMARKS) - def initialize(self, config: Dict[str, str] = {}, resource_prefix: Optional[str] = None): + def initialize( + self, + config: Dict[str, str] = {}, + resource_prefix: Optional[str] = None, + quiet: bool = False, + ): """ Initialize the system. diff --git a/sebs/gcp/cli.py b/sebs/gcp/cli.py index 964b3efa..b2b5fb5f 100644 --- a/sebs/gcp/cli.py +++ b/sebs/gcp/cli.py @@ -117,6 +117,8 @@ def execute(self, cmd: str) -> bytes: RuntimeError: If the command fails (non-zero exit code) """ exit_code, out = self.docker_instance.exec_run(cmd) + # exec_run without stream=True always returns bytes + assert isinstance(out, bytes) if exit_code != 0: raise RuntimeError( "Command {} failed at gcloud CLI docker!\n Output {}".format( diff --git a/sebs/gcp/gcp.py b/sebs/gcp/gcp.py index c33f3240..b419ae68 100644 --- a/sebs/gcp/gcp.py +++ b/sebs/gcp/gcp.py @@ -28,6 +28,7 @@ import docker import os import logging +import random import re import shutil import time @@ -69,6 +70,11 @@ class GCP(System): logging_handlers: Logging configuration for status reporting """ + """ + Google API is not the most robust - sometimes we need to retry REST operations. + """ + TRANSIENT_HTTP_CODES = frozenset({429, 503}) + def __init__( self, system_config: SeBSConfig, @@ -134,7 +140,10 @@ def function_type() -> "Type[Function]": return GCPFunction def initialize( - self, config: Dict[str, str] = {}, resource_prefix: Optional[str] = None + self, + config: Dict[str, str] = {}, + resource_prefix: Optional[str] = None, + quiet: bool = False, ) -> None: """Initialize the GCP system for function deployment and management. @@ -148,7 +157,7 @@ def initialize( resource_prefix: Optional prefix for resource naming to avoid conflicts """ self.function_client = build("cloudfunctions", "v1", cache_discovery=False) - self.initialize_resources(select_prefix=resource_prefix) + self.initialize_resources(select_prefix=resource_prefix, quiet=quiet) def get_function_client(self): """Get the Google Cloud Functions API client. 
@@ -160,6 +169,76 @@ def get_function_client(self): """ return self.function_client + def _execute_with_retry( + self, + request, + max_retries: int = 5, + base_delay: float = 1.0, + max_delay: float = 32.0, + ) -> Dict: + """Execute a googleapiclient request with retry logic for transient errors. + + Handles transient HTTP errors (503, 429) by retrying with exponential backoff + and jitter. Non-transient errors are raised immediately without retry. + + Args: + request: googleapiclient request object to execute + max_retries: Maximum number of retry attempts (default: 5) + base_delay: Base delay in seconds for exponential backoff (default: 1.0) + max_delay: Maximum delay between retries in seconds (default: 32.0) + + Returns: + Response dictionary from the API call + + Raises: + HttpError: If the request fails with a non-transient error or after + exhausting all retry attempts + """ + attempt = 0 + last_error = None + + while attempt <= max_retries: + try: + result = request.execute() + if attempt > 0: + self.logging.info(f"Request succeeded after {attempt} retries") + return result + except HttpError as e: + status_code = e.resp.status + last_error = e + + # Only retry on transient errors + if status_code not in GCP.TRANSIENT_HTTP_CODES: + raise + + # Check if we have retries left + if attempt >= max_retries: + self.logging.error( + f"Max retries ({max_retries}) exhausted, failing with status {status_code}" + ) + raise + + # Calculate delay with exponential backoff and jitter + delay = min(base_delay * (2**attempt) + random.uniform(0, 1), max_delay) + + if attempt == 0: + self.logging.warning( + f"Transient error {status_code}, retrying " + f"(attempt {attempt + 1}/{max_retries})" + ) + else: + self.logging.info( + f"Retry {attempt + 1}/{max_retries} after {delay:.1f}s backoff" + ) + + time.sleep(delay) + attempt += 1 + + # This should not be reached, but just in case + if last_error: + raise last_error + raise RuntimeError("Unexpected state in retry logic") + def default_function_name( self, code_package: Benchmark, resources: Optional[Resources] = None ) -> str: @@ -282,7 +361,7 @@ def _wait_for_build_and_poll( get_req = ( self.function_client.projects().locations().functions().get(name=full_func_name) ) - func_details = get_req.execute() + func_details = self._execute_with_retry(get_req) if "buildId" in func_details: previous_build_id = func_details["buildId"] except HttpError: @@ -303,7 +382,7 @@ def _wait_for_build_and_poll( get_req = ( self.function_client.projects().locations().functions().get(name=full_func_name) ) - func_details = get_req.execute() + func_details = self._execute_with_retry(get_req) # Check if there's a new build in progress if "buildId" in func_details: @@ -336,6 +415,9 @@ def _wait_for_active_status( After a build completes, the function may be in DEPLOY_IN_PROGRESS state for a short time. This function polls until the status becomes ACTIVE. + Furthermore, we handle HTTP errors: + * 503 / 429 transient backend errors — GCP Cloud Functions v1 + can periodically returns these; they are not deployment failures. 
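Seen on its own, the retry delay formula above produces an exponential schedule with up to one second of jitter, capped at max_delay. A small worked sketch of the same arithmetic, detached from the GCP client:

import random


def backoff_delays(max_retries: int = 5, base_delay: float = 1.0,
                   max_delay: float = 32.0) -> list[float]:
    """Delays between attempts: base_delay * 2**attempt plus up to 1 s of jitter,
    capped at max_delay (the formula used by _execute_with_retry)."""
    return [min(base_delay * (2 ** attempt) + random.uniform(0, 1), max_delay)
            for attempt in range(max_retries)]


# Roughly 1-2 s, 2-3 s, 4-5 s, 8-9 s, and 16-17 s of waiting before giving up.
print([round(delay, 1) for delay in backoff_delays()])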
Args: func_name: Name of the function to check @@ -352,18 +434,30 @@ def _wait_for_active_status( self.config.project_name, self.config.region, func_name ) begin = time.time() + last_status: Optional[str] = None self.logging.info(f"Waiting for function {func_name} to become ACTIVE...") while True: + + elapsed = time.time() - begin + if elapsed > timeout: + raise RuntimeError( + f"Timeout waiting for function {func_name} to become ACTIVE " + f"after {elapsed:.0f}s. Last status: {last_status}" + ) + get_req = ( self.function_client.projects().locations().functions().get(name=full_func_name) ) - func_details = get_req.execute() + func_details = self._execute_with_retry(get_req) status = func_details["status"] current_version = int(func_details["versionId"]) + if status != last_status: + last_status = status + if status == "ACTIVE": # Check version if specified if expected_version is not None and current_version != expected_version: @@ -382,12 +476,6 @@ def _wait_for_active_status( self.logging.error(f"Function {func_name} has unexpected status: {status}") raise RuntimeError(f"Function {func_name} deployment failed with status: {status}") - if time.time() - begin > timeout: - raise RuntimeError( - f"Timeout waiting for function {func_name} to become ACTIVE. " - f"Current status: {status}" - ) - time.sleep(2) def package_code( @@ -484,6 +572,44 @@ def package_code( bytes_size, ) + def _allow_public_access(self, func_name: str, full_func_name: str) -> None: + + """Configure GCP function to be publicly accessible. + + Args: + func_name: our function name + full_func_name: GCP name + + Raises: + RuntimeError: + """ + allow_unauthenticated_req = ( + self.function_client.projects() + .locations() + .functions() + .setIamPolicy( + resource=full_func_name, + body={ + "policy": { + "bindings": [ + { + "role": "roles/cloudfunctions.invoker", + "members": ["allUsers"], + } + ] + } + }, + ) + ) + try: + self._execute_with_retry(allow_unauthenticated_req) + except HttpError as e: + raise RuntimeError( + f"Failed to configure function {full_func_name} " + f"for unauthenticated invocations! Error: {e}" + ) + self.logging.info(f"Function {func_name} accepts now unauthenticated invocations!") + def create_function( self, code_package: Benchmark, @@ -542,7 +668,7 @@ def create_function( get_req = self.function_client.projects().locations().functions().get(name=full_func_name) try: - get_req.execute() + self._execute_with_retry(get_req) except HttpError: envs = self._generate_function_envs(code_package) @@ -572,7 +698,7 @@ def create_function( }, ) ) - create_req.execute() + self._execute_with_retry(create_req) self.logging.info( f"Function {func_name} is creating - GCP build&deployment is started!" ) @@ -585,46 +711,7 @@ def create_function( # Wait for deployment to become ACTIVE self._wait_for_active_status(func_name) - allow_unauthenticated_req = ( - self.function_client.projects() - .locations() - .functions() - .setIamPolicy( - resource=full_func_name, - body={ - "policy": { - "bindings": [ - { - "role": "roles/cloudfunctions.invoker", - "members": ["allUsers"], - } - ] - } - }, - ) - ) - - # Avoid infinite loop - MAX_RETRIES = 5 - counter = 0 - while counter < MAX_RETRIES: - try: - allow_unauthenticated_req.execute() - break - except HttpError: - - self.logging.info( - "Sleeping for 5 seconds because the created functions is not yet available!" - ) - time.sleep(5) - counter += 1 - else: - raise RuntimeError( - f"Failed to configure function {full_func_name} " - "for unauthenticated invocations!" 
- ) - - self.logging.info(f"Function {func_name} accepts now unauthenticated invocations!") + self._allow_public_access(func_name, full_func_name) function = GCPFunction( func_name, benchmark, code_package.hash, function_cfg, code_bucket @@ -640,6 +727,7 @@ def create_function( cfg=function_cfg, bucket=code_bucket, ) + self._allow_public_access(func_name, full_func_name) self.update_function(function, code_package, container_deployment, container_uri) # Add LibraryTrigger to a new function @@ -680,9 +768,9 @@ def create_trigger(self, function: Function, trigger_type: Trigger.TriggerType) get_req = ( self.function_client.projects().locations().functions().get(name=full_func_name) ) - func_details = get_req.execute() + func_details = self._execute_with_retry(get_req) invoke_url = func_details["httpsTrigger"]["url"] - self.logging.info(f"Function {function.name} - HTTP trigger ready at {invoke_url}") + self.logging.info(f"Function {function.name} - HTTP trigger URL: {invoke_url}") trigger = HTTPTrigger(invoke_url) else: @@ -781,7 +869,7 @@ def update_function( }, ) ) - res = req.execute() + res = self._execute_with_retry(req) self.logging.info(f"Function {function.name} code update initiated") @@ -815,7 +903,7 @@ def _update_envs(self, full_function_name: str, envs: Dict) -> Dict: get_req = ( self.function_client.projects().locations().functions().get(name=full_function_name) ) - response = get_req.execute() + response = self._execute_with_retry(get_req) # preserve old variables while adding new ones. # but for conflict, we select the new one @@ -916,7 +1004,7 @@ def update_function_configuration( ) ) - res = req.execute() + res = self._execute_with_retry(req) expected_version = int(res["metadata"]["versionId"]) self.logging.info(f"Function {function.name} configuration update initiated") @@ -928,6 +1016,31 @@ def update_function_configuration( return current_version + def delete_function(self, func_name: str) -> None: + """Delete a Google Cloud Function. + + Args: + func_name: Name of the function to delete + """ + self.logging.info(f"Deleting function {func_name}") + + full_func_name = GCP.get_full_function_name( + self.config.project_name, self.config.region, func_name + ) + + try: + delete_req = ( + self.function_client.projects().locations().functions().delete(name=full_func_name) + ) + self._execute_with_retry(delete_req) + self.logging.info(f"Function {func_name} deleted successfully") + except HttpError as e: + if e.resp.status == 404: + self.logging.error(f"Function {func_name} does not exist!") + else: + self.logging.error(f"Failed to delete function {func_name}: {e}") + raise + @staticmethod def get_full_function_name(project_name: str, location: str, func_name: str) -> str: """Generate the fully qualified function name for GCP API calls. 
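The fully qualified names passed to functions().get/delete above follow the Cloud Functions v1 resource-path convention. A sketch of that convention with made-up project and region values; this illustrates the expected format, not the SeBS implementation itself:

def full_function_name(project: str, location: str, func_name: str) -> str:
    # Cloud Functions v1 resource-name convention used by functions().get/delete.
    return f"projects/{project}/locations/{location}/functions/{func_name}"


# e.g. projects/my-project/locations/us-east1/functions/ci-110-dynamic-html
print(full_function_name("my-project", "us-east1", "ci-110-dynamic-html"))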
@@ -1204,7 +1317,7 @@ def is_deployed(self, func_name: str, versionId: int = -1) -> Tuple[bool, int]: name = GCP.get_full_function_name(self.config.project_name, self.config.region, func_name) function_client = self.get_function_client() status_req = function_client.projects().locations().functions().get(name=name) - status_res = status_req.execute() + status_res = self._execute_with_retry(status_req) if versionId == -1: return (status_res["status"] == "ACTIVE", status_res["versionId"]) else: @@ -1222,7 +1335,7 @@ def deployment_version(self, func: Function) -> int: name = GCP.get_full_function_name(self.config.project_name, self.config.region, func.name) function_client = self.get_function_client() status_req = function_client.projects().locations().functions().get(name=name) - status_res = status_req.execute() + status_res = self._execute_with_retry(status_req) return int(status_res["versionId"]) @staticmethod diff --git a/sebs/gcp/triggers.py b/sebs/gcp/triggers.py index 5a855166..2eaaba41 100644 --- a/sebs/gcp/triggers.py +++ b/sebs/gcp/triggers.py @@ -129,7 +129,7 @@ def sync_invoke(self, payload: Dict) -> ExecutionResult: .call(name=full_func_name, body={"data": json.dumps(payload)}) ) begin = datetime.datetime.now() - res = req.execute() + res = self.deployment_client._execute_with_retry(req) end = datetime.datetime.now() gcp_result = ExecutionResult.from_times(begin, end) diff --git a/sebs/openwhisk/openwhisk.py b/sebs/openwhisk/openwhisk.py index 5803906a..320510d7 100644 --- a/sebs/openwhisk/openwhisk.py +++ b/sebs/openwhisk/openwhisk.py @@ -105,7 +105,10 @@ def __init__( ) def initialize( - self, config: Dict[str, str] = {}, resource_prefix: Optional[str] = None + self, + config: Dict[str, str] = {}, + resource_prefix: Optional[str] = None, + quiet: bool = False, ) -> None: """ Initialize OpenWhisk system resources. @@ -114,7 +117,7 @@ def initialize( config: Additional configuration parameters (currently unused) resource_prefix: Optional prefix for resource naming """ - self.initialize_resources(select_prefix=resource_prefix) + self.initialize_resources(select_prefix=resource_prefix, quiet=quiet) @property def config(self) -> OpenWhiskConfig: diff --git a/sebs/regression.py b/sebs/regression.py index 53336a2a..7026f3f1 100644 --- a/sebs/regression.py +++ b/sebs/regression.py @@ -28,7 +28,7 @@ from typing import cast, Dict, Optional, Set, TYPE_CHECKING from sebs.faas.function import Trigger -from sebs.utils import ColoredWrapper +from sebs.utils import ColoredWrapper, SensitiveDataFilter, LoggingBase if TYPE_CHECKING: from sebs import SeBS @@ -80,6 +80,10 @@ # User-defined config passed during initialization, set in regression_suite() cloud_config: Optional[dict] = None +RESOURCE_PREFIX = "regr" +LOGGING_REDACTED = False +LOGGING_REDACTOR: SensitiveDataFilter = SensitiveDataFilter() + class TestSequenceMeta(type): """Metaclass for dynamically generating regression test cases. 
@@ -179,6 +183,9 @@ def test(self): logger = logging.getLogger(log_name) logger.setLevel(logging.INFO) logging_wrapper = ColoredWrapper(log_name, logger) + if LOGGING_REDACTED: + logger.addFilter(LOGGING_REDACTOR) + logging_wrapper.set_filter(LOGGING_REDACTOR) # Configure experiment settings self.experiment_config["architecture"] = architecture @@ -335,7 +342,16 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # Synchronize resource initialization with a lock with AWSTestSequencePython.lock: - deployment_client.initialize(resource_prefix="regr") + deployment_client.initialize(resource_prefix=RESOURCE_PREFIX, quiet=LOGGING_REDACTED) + if LOGGING_REDACTED: + LOGGING_REDACTOR.set_resource_id( + deployment_client.config.resources.resources_id, + deployment_client.config.credentials.account_id, + ) + LoggingBase.set_filtering_resource_id( + deployment_client.config.resources.resources_id, + deployment_client.config.credentials.account_id, + ) return deployment_client @@ -389,7 +405,16 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # Synchronize resource initialization with a lock with AWSTestSequenceNodejs.lock: - deployment_client.initialize(resource_prefix="regr") + deployment_client.initialize(resource_prefix=RESOURCE_PREFIX, quiet=LOGGING_REDACTED) + if LOGGING_REDACTED: + LOGGING_REDACTOR.set_resource_id( + deployment_client.config.resources.resources_id, + deployment_client.config.credentials.account_id, + ) + LoggingBase.set_filtering_resource_id( + deployment_client.config.resources.resources_id, + deployment_client.config.credentials.account_id, + ) return deployment_client @@ -432,7 +457,16 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): logging_filename=os.path.join(self.client.output_dir, f), ) with AWSTestSequenceCpp.lock: - deployment_client.initialize(resource_prefix="regr") + deployment_client.initialize(resource_prefix=RESOURCE_PREFIX, quiet=LOGGING_REDACTED) + if LOGGING_REDACTED: + LOGGING_REDACTOR.set_resource_id( + deployment_client.config.resources.resources_id, + deployment_client.config.credentials.account_id, + ) + LoggingBase.set_filtering_resource_id( + deployment_client.config.resources.resources_id, + deployment_client.config.credentials.account_id, + ) return deployment_client @@ -483,7 +517,16 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): logging_filename=os.path.join(self.client.output_dir, f), ) with AWSTestSequenceJava.lock: - deployment_client.initialize(resource_prefix="regr") + deployment_client.initialize(resource_prefix=RESOURCE_PREFIX, quiet=LOGGING_REDACTED) + if LOGGING_REDACTED: + LOGGING_REDACTOR.set_resource_id( + deployment_client.config.resources.resources_id, + deployment_client.config.credentials.account_id, + ) + LoggingBase.set_filtering_resource_id( + deployment_client.config.resources.resources_id, + deployment_client.config.credentials.account_id, + ) return deployment_client @@ -540,12 +583,14 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): ) # Initialize Azure CLI if not already done + needs_login = False if not hasattr(AzureTestSequencePython, "cli"): from sebs.azure.cli import AzureCLI AzureTestSequencePython.cli = AzureCLI( self.client.config, self.client.docker_client ) + needs_login = True # Create a copy of the config and set architecture and deployment type config_copy = copy.deepcopy(cloud_config) @@ -563,9 +608,14 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # 
Initialize CLI with login and setup resources deployment_client.system_resources.initialize_cli( - cli=AzureTestSequencePython.cli, login=True + cli=AzureTestSequencePython.cli, login=needs_login ) - deployment_client.initialize(resource_prefix="regr") + deployment_client.initialize(resource_prefix=RESOURCE_PREFIX, quiet=LOGGING_REDACTED) + if LOGGING_REDACTED: + LOGGING_REDACTOR.set_resource_id(deployment_client.config.resources.resources_id) + LoggingBase.set_filtering_resource_id( + deployment_client.config.resources.resources_id + ) return deployment_client @@ -619,12 +669,14 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): ) # Initialize Azure CLI if not already done + needs_login = False if not hasattr(AzureTestSequenceNodejs, "cli"): from sebs.azure.cli import AzureCLI AzureTestSequenceNodejs.cli = AzureCLI( self.client.config, self.client.docker_client ) + needs_login = True # Create a copy of the config and set architecture and deployment type config_copy = copy.deepcopy(cloud_config) @@ -641,8 +693,15 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): ) # Initialize CLI and setup resources (no login needed - reuses Python session) - deployment_client.system_resources.initialize_cli(cli=AzureTestSequenceNodejs.cli) - deployment_client.initialize(resource_prefix="regr") + deployment_client.system_resources.initialize_cli( + cli=AzureTestSequenceNodejs.cli, login=needs_login + ) + deployment_client.initialize(resource_prefix=RESOURCE_PREFIX, quiet=LOGGING_REDACTED) + if LOGGING_REDACTED: + LOGGING_REDACTOR.set_resource_id(deployment_client.config.resources.resources_id) + LoggingBase.set_filtering_resource_id( + deployment_client.config.resources.resources_id + ) return deployment_client @@ -716,7 +775,12 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): deployment_client.system_resources.initialize_cli( cli=AzureTestSequenceJava.cli, login=needs_login ) - deployment_client.initialize(resource_prefix="regr") + deployment_client.initialize(resource_prefix=RESOURCE_PREFIX, quiet=LOGGING_REDACTED) + if LOGGING_REDACTED: + LOGGING_REDACTOR.set_resource_id(deployment_client.config.resources.resources_id) + LoggingBase.set_filtering_resource_id( + deployment_client.config.resources.resources_id + ) return deployment_client @@ -770,7 +834,16 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # Synchronize resource initialization with a lock with GCPTestSequencePython.lock: - deployment_client.initialize(resource_prefix="regr") + deployment_client.initialize(resource_prefix=RESOURCE_PREFIX, quiet=LOGGING_REDACTED) + if LOGGING_REDACTED: + LOGGING_REDACTOR.set_resource_id( + deployment_client.config.resources.resources_id, + deployment_client.config.credentials.project_name, + ) + LoggingBase.set_filtering_resource_id( + deployment_client.config.resources.resources_id, + deployment_client.config.credentials.project_name, + ) return deployment_client @@ -824,7 +897,16 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # Synchronize resource initialization with a lock with GCPTestSequenceNodejs.lock: - deployment_client.initialize(resource_prefix="regr") + deployment_client.initialize(resource_prefix=RESOURCE_PREFIX, quiet=LOGGING_REDACTED) + if LOGGING_REDACTED: + LOGGING_REDACTOR.set_resource_id( + deployment_client.config.resources.resources_id, + deployment_client.config.credentials.project_name, + ) + LoggingBase.set_filtering_resource_id( + 
deployment_client.config.resources.resources_id, + deployment_client.config.credentials.project_name, + ) return deployment_client @@ -878,7 +960,16 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # Synchronize resource initialization with a lock with GCPTestSequenceJava.lock: - deployment_client.initialize(resource_prefix="regr") + deployment_client.initialize(resource_prefix=RESOURCE_PREFIX, quiet=LOGGING_REDACTED) + if LOGGING_REDACTED: + LOGGING_REDACTOR.set_resource_id( + deployment_client.config.resources.resources_id, + deployment_client.config.credentials.project_name, + ) + LoggingBase.set_filtering_resource_id( + deployment_client.config.resources.resources_id, + deployment_client.config.credentials.project_name, + ) return deployment_client @@ -936,7 +1027,7 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # Synchronize resource initialization with a lock with OpenWhiskTestSequencePython.lock: - deployment_client.initialize(resource_prefix="regr") + deployment_client.initialize(resource_prefix=RESOURCE_PREFIX, quiet=LOGGING_REDACTED) return deployment_client @@ -994,7 +1085,7 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # Synchronize resource initialization with a lock with OpenWhiskTestSequenceNodejs.lock: - deployment_client.initialize(resource_prefix="regr") + deployment_client.initialize(resource_prefix=RESOURCE_PREFIX, quiet=LOGGING_REDACTED) return deployment_client @@ -1048,7 +1139,7 @@ def get_deployment(self, benchmark_name, architecture, deployment_type): # Synchronize resource initialization with a lock with OpenWhiskTestSequenceJava.lock: - deployment_client.initialize(resource_prefix="regr") + deployment_client.initialize(resource_prefix=RESOURCE_PREFIX, quiet=LOGGING_REDACTED) return deployment_client @@ -1123,6 +1214,7 @@ def filter_out_benchmarks( language_version: str, architecture: str, deployment_type: str, + selected_architecture: str | None = None, ) -> bool: """Filter out benchmarks that are not supported on specific platforms. @@ -1142,13 +1234,18 @@ def filter_out_benchmarks( """ # fmt: off + # user can asks to use only a selected architecture + if selected_architecture is not None and selected_architecture != architecture: + return False + # Arm architecture currently not supported for C++ if (language == "cpp" and architecture == "arm64"): return False # Filter out image recognition on newer Python versions on AWS if (deployment_name == "aws" and language == "python" - and language_version in ["3.9", "3.10", "3.11"]): + and language_version in ["3.9", "3.10", "3.11"] + and deployment_type == "package"): return "411.image-recognition" not in benchmark # C++ code package is too large for this benchmark @@ -1174,7 +1271,10 @@ def regression_suite( experiment_config: dict, providers: Set[str], deployment_config: dict, + resource_prefix: str | None = None, benchmark_name: Optional[str] = None, + selected_architecture: str | None = None, + filter_output: bool = False, ): """Create and run a regression test suite for specified cloud providers. 
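The architecture pre-filter added to filter_out_benchmarks is a single early return; as a self-contained illustration of the rule (the helper name and arguments below are local to this example, not the SeBS signature):

def skip_for_architecture(test_architecture: str, selected: str | None) -> bool:
    """True when the user pinned one architecture and this test targets another."""
    return selected is not None and selected != test_architecture


assert skip_for_architecture("arm64", "x64") is True   # skipped under --selected-architecture
assert skip_for_architecture("x64", "x64") is False    # matching architecture runs
assert skip_for_architecture("arm64", None) is False   # no pin: every architecture runs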
@@ -1195,6 +1295,16 @@ def regression_suite( Raises: AssertionError: If a requested provider is not in the deployment config """ + + global RESOURCE_PREFIX + if resource_prefix is not None: + RESOURCE_PREFIX = resource_prefix + + global LOGGING_REDACTED + if filter_output: + LOGGING_REDACTED = True + LoggingBase.enable_filtering() + # Create the test suite suite = unittest.TestSuite() @@ -1279,6 +1389,7 @@ def regression_suite( language_version, test_architecture, test_deployment_type, + selected_architecture, ): print(f"Skip test {test_name} - not supported.") continue diff --git a/sebs/sebs.py b/sebs/sebs.py index a3dd89a9..5d40b103 100644 --- a/sebs/sebs.py +++ b/sebs/sebs.py @@ -368,6 +368,7 @@ def get_benchmark( self._output_dir, self.cache_client, self.docker_client, + self.verbose, ) # Set up logging diff --git a/sebs/storage/scylladb.py b/sebs/storage/scylladb.py index c47dcef5..6c532686 100644 --- a/sebs/storage/scylladb.py +++ b/sebs/storage/scylladb.py @@ -188,7 +188,9 @@ def start(self) -> None: if attempts == max_attempts: self.logging.error("Failed to launch ScyllaDB!") - self.logging.error(f"Last result of nodetool status: {out}") + # exec_run without stream=True always returns bytes + assert isinstance(out, bytes) + self.logging.error(f"Last result of nodetool status: {out.decode('utf-8')}") raise RuntimeError("Failed to launch ScyllaDB!") self.configure_connection() diff --git a/sebs/utils.py b/sebs/utils.py index 538194ae..868f5d34 100644 --- a/sebs/utils.py +++ b/sebs/utils.py @@ -19,9 +19,11 @@ import click import datetime import platform +import threading +import re from pathlib import Path -from typing import List, Optional +from typing import List, Optional, Pattern # Global constants PROJECT_DIR = Path(__file__).parent @@ -148,7 +150,7 @@ def execute(cmd, shell=False, cwd=None) -> str: Raises: RuntimeError: If command execution fails """ - if not shell: + if not shell and isinstance(cmd, str): cmd = cmd.split() ret = subprocess.run( cmd, shell=shell, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT @@ -280,6 +282,123 @@ def global_logging() -> None: logging.basicConfig(format=logging_format, datefmt=logging_date_format, level=logging.INFO) +class SensitiveDataFilter(logging.Filter): + """Logging filter that removes function URLs and resource ID from output. + + Attributes: + _DEFAULT_URL_PATTERNS: List of patterns for URLs of functions. + REDACTED: String to replace with. + """ + + _DEFAULT_URL_PATTERNS: tuple[str, ...] = ( + r"https?://[a-zA-Z0-9-]+\.execute-api\.[a-z0-9-]+\.amazonaws\.com[^\s\"']*", # API Gateway + r"https?://[a-zA-Z0-9-]+\.lambda-url\.[a-z0-9-]+\.on\.aws[^\s\"']*", # Lambda URLs + r"https?://[a-zA-Z0-9-]+\.azurewebsites\.net[^\s\"']*", # Azure Functions + r"https?://[a-z0-9-]+-[a-z0-9-]+\.[a-z0-9-]+\.run\.app[^\s\"']*", # GCP Cloud Run + r"https?://[a-zA-Z0-9-]+\.cloudfunctions\.net[^\s\"']*", # GCP Functions + r"https?://[a-zA-Z0-9-]+\.workers\.dev[^\s\"']*", # Cloudflare Workers + ) + + REDACTED = "[REDACTED]" + + """Redacts serverless endpoint URLs and configurable resource IDs from logs. + Resource IDs can be added/removed at runtime as deployments happen. + + This allows hiding exact resource names in the cloud from publically visible logs, + e.g., in CI workers. 
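To see what the default URL patterns catch, here is one of them applied in isolation; the endpoint below is an invented example, not a real deployment:

import re

API_GATEWAY_PATTERN = (
    r"https?://[a-zA-Z0-9-]+\.execute-api\.[a-z0-9-]+\.amazonaws\.com[^\s\"']*"
)

message = "Function deployed at https://abc123xyz.execute-api.us-east-1.amazonaws.com/prod/invoke"
print(re.sub(API_GATEWAY_PATTERN, "[REDACTED]", message))
# -> Function deployed at [REDACTED]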
+ """ + + def __init__(self) -> None: + """Initialize logging filter.""" + super().__init__() + self._url_re: Pattern[str] = re.compile("|".join(SensitiveDataFilter._DEFAULT_URL_PATTERNS)) + self._resource_id: Optional[str] = None + self._resource_re: Optional[Pattern[str]] = None + + self._lock = threading.Lock() + + def set_resource_id(self, resource_id: str, cloud_id: Optional[str] = None) -> None: + """Set filtering for a specific resource ID. + + The function is idempotent - we can only set the resource ID once. + It is also thread-safe, so multiple threads doing multithreading + can call it many times and we initialize only once. + Args: + resource_id: + """ + with self._lock: + if self._resource_id is not None: + return + + self._resource_id = resource_id + + from sebs.aws.aws import AWS + from sebs.gcp.gcp import GCP + + resources_ids = set( + [ + self._resource_id, + AWS.format_function_name(self._resource_id), + GCP.format_function_name(self._resource_id), + ] + ) + if cloud_id is not None: + resources_ids.add(cloud_id) + alternation = "|".join(re.escape(r) for r in resources_ids) + self._resource_re = re.compile(alternation) + + def _scrub(self, text: str) -> str: + """Replace secrets with redacted. + + Args: + text: logged messages + + Returns: + logged messages with secrets replaced + """ + text = self._url_re.sub(SensitiveDataFilter.REDACTED, text) + if self._resource_re is not None: + text = self._resource_re.sub(SensitiveDataFilter.REDACTED, text) + return text + + def filter_string(self, msg: str) -> str: + """Apply redaction to custom messages. + + Args: + msg: message + + Returns: + message with data redacted + """ + return self._scrub(msg) + + def filter(self, record: logging.LogRecord) -> bool: + """Apply redaction to logging messages. + + Args: + record: logging record + + Returns: + always true + """ + # Redact the format string itself (covers pre-formatted f-strings). + if isinstance(record.msg, str): + record.msg = self._scrub(record.msg) + + # Redact lazy-formatting args: logger.info("deployed %s", url). + if record.args: + if isinstance(record.args, dict): + record.args = { + k: self._scrub(v) if isinstance(v, str) else v for k, v in record.args.items() + } + else: + record.args = tuple( + self._scrub(a) if isinstance(a, str) else a for a in record.args + ) + # never drop the record, just rewrite it + return True + + class ColoredWrapper: """ Wrapper for logging with colored console output. @@ -318,6 +437,7 @@ def __init__(self, prefix, logger, verbose=True, propagte=False): self.propagte = propagte self.prefix = prefix self._logging = logger + self._filter: Optional[SensitiveDataFilter] = None def debug(self, message): """ @@ -384,11 +504,23 @@ def _print(self, message, color): color: ANSI color code to use """ timestamp = datetime.datetime.now().strftime("%H:%M:%S.%f") + + if self._filter is not None: + message = self._filter.filter_string(message) + click.echo( f"{color}{ColoredWrapper.BOLD}[{timestamp}]{ColoredWrapper.END} " f"{ColoredWrapper.BOLD}{self.prefix}{ColoredWrapper.END} {message}" ) + def set_filter(self, filter: SensitiveDataFilter): + """Set custom data filter. + + Args: + filter: + """ + self._filter = filter + class LoggingHandlers: """ @@ -439,6 +571,8 @@ class LoggingBase: logging: ColoredWrapper for formatted console output """ + REDACTION_FILTER: Optional[SensitiveDataFilter] = SensitiveDataFilter() + def __init__(self): """ Initialize the logging base with a unique identifier. 
@@ -456,6 +590,29 @@ def __init__(self): self._logging.setLevel(logging.INFO) self.wrapper = ColoredWrapper(self.log_name, self._logging) + if LoggingBase.REDACTION_FILTER is not None: + self._logging.addFilter(LoggingBase.REDACTION_FILTER) + self.wrapper.set_filter(LoggingBase.REDACTION_FILTER) + + @classmethod + def enable_filtering(cls) -> None: + """Enable sensitive data filtering for all loggers.""" + if cls.REDACTION_FILTER is None: + cls.REDACTION_FILTER = SensitiveDataFilter() + + @classmethod + def set_filtering_resource_id(cls, resource_id: str, cloud_id: Optional[str] = None) -> None: + """Add resource ID and cloud user IDs to logging filtering. + + Args: + resource_id: SeBS cloud ID + cloud_id: cloud-specific user ID (e.g., AWS account ID or GCP project name) + """ + assert ( + cls.REDACTION_FILTER is not None + ), "Filtering must be enabled before setting resource ID" + cls.REDACTION_FILTER.set_resource_id(resource_id, cloud_id) + @property def logging(self) -> ColoredWrapper: """ @@ -496,6 +653,10 @@ def logging_handlers(self, handlers: LoggingHandlers): propagte=handlers.handler is not None, ) + if LoggingBase.REDACTION_FILTER is not None: + self._logging.addFilter(LoggingBase.REDACTION_FILTER) + self.wrapper.set_filter(LoggingBase.REDACTION_FILTER) + if self._logging_handlers.handler is not None: self._logging.addHandler(self._logging_handlers.handler) diff --git a/sebs/version.py b/sebs/version.py index dc7990a0..b0eee8ab 100644 --- a/sebs/version.py +++ b/sebs/version.py @@ -1,3 +1,3 @@ # Copyright 2020-2025 ETH Zurich and the SeBS authors. All rights reserved. """Main SeBS version information.""" -__version__ = "1.2.0" +__version__ = "1.2.1" diff --git a/tests/aws/create_function.py b/tests/aws/create_function.py index 2c5b810c..d8fc270d 100644 --- a/tests/aws/create_function.py +++ b/tests/aws/create_function.py @@ -17,9 +17,6 @@ class AWSCreateFunction(unittest.TestCase): "update_code": False, "update_storage": False, "download_results": False, - "flags": { - "docker_copy_build_files": True - } }, }, "nodejs": { @@ -29,9 +26,6 @@ class AWSCreateFunction(unittest.TestCase): "update_code": False, "update_storage": False, "download_results": False, - "flags": { - "docker_copy_build_files": True - } } } } diff --git a/tests/aws/invoke_function_http.py b/tests/aws/invoke_function_http.py index 894759a2..9324c3d4 100644 --- a/tests/aws/invoke_function_http.py +++ b/tests/aws/invoke_function_http.py @@ -28,9 +28,6 @@ def test_invoke_sync_python(self): "update_code": False, "update_storage": False, "download_results": False, - "flags": { - "docker_copy_build_files": True - } }, } benchmark_name = "110.dynamic-html" @@ -56,9 +53,6 @@ def test_invoke_sync_nodejs(self): "update_code": False, "update_storage": False, "download_results": False, - "flags": { - "docker_copy_build_files": True - } }, } benchmark_name = "110.dynamic-html" diff --git a/tests/aws/invoke_function_sdk.py b/tests/aws/invoke_function_sdk.py index 4353edf5..f52bd40d 100644 --- a/tests/aws/invoke_function_sdk.py +++ b/tests/aws/invoke_function_sdk.py @@ -29,9 +29,6 @@ def test_invoke_sync_python(self): "update_code": False, "update_storage": False, "download_results": False, - "flags": { - "docker_copy_build_files": True - } }, } benchmark_name = "110.dynamic-html" @@ -55,9 +52,6 @@ def test_invoke_sync_nodejs(self): "update_code": False, "update_storage": False, "download_results": False, - "flags": { - "docker_copy_build_files": True - } }, } benchmark_name = "110.dynamic-html"