diff --git a/tests/performance/commands/__init__.py b/tests/performance/commands/__init__.py new file mode 100644 index 00000000..b6cde4e3 --- /dev/null +++ b/tests/performance/commands/__init__.py @@ -0,0 +1,16 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""End-to-end performance tests that drive the CLI in-process.""" diff --git a/tests/performance/commands/conftest.py b/tests/performance/commands/conftest.py new file mode 100644 index 00000000..141077bc --- /dev/null +++ b/tests/performance/commands/conftest.py @@ -0,0 +1,144 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Shared fixtures + summary table for E2E performance tests. 
+
+Tests in this directory inject the ``record_result`` fixture and call it
+once per parameterization. After the session finishes,
+:func:`pytest_terminal_summary` prints a formatted table of every recorded
+row — handy when running roofline + low-QPS together.
+"""
+
+from __future__ import annotations
+
+import os
+import platform
+from typing import Any
+
+import pytest
+
+
+class _Collected:
+    """Module-level singleton holding rows recorded during the session."""
+
+    # Intentionally a shared class attribute: every test appends to the same
+    # list and the terminal-summary hook reads it once at the end.
+    rows: list[dict[str, Any]] = []
+
+
+@pytest.fixture
+def record_result():
+    """Record a result row that will appear in the end-of-session summary.
+
+    Pass keyword fields you want in the table — anything missing renders as
+    ``—`` in the output.
+
+    Usage::
+
+        def test_foo(record_result):
+            record_result(
+                "max_throughput", stream=False,
+                qps=44426.0, total=2_000_000, elapsed=45.02, failed=0,
+            )
+    """
+
+    def _record(label: str, **fields: Any) -> None:
+        _Collected.rows.append({"label": label, **fields})
+
+    return _record
+
+
+# -----------------------------------------------------------------------------
+# Host info + table rendering
+# -----------------------------------------------------------------------------
+
+
+def _host_info() -> dict[str, str]:
+    """Return ``host``, ``arch``, ``cpu`` and ``cores`` strings for the header."""
+    cpu = "unknown"
+    try:
+        # Explicit encoding + errors="replace": a stray byte in cpuinfo must
+        # not turn an informational lookup into a UnicodeDecodeError.
+        with open("/proc/cpuinfo", encoding="utf-8", errors="replace") as f:
+            for line in f:
+                if line.startswith("model name"):
+                    cpu = line.split(":", 1)[1].strip()
+                    break
+    except OSError:
+        # CPU model is informational; missing /proc/cpuinfo (non-Linux,
+        # restricted container) just leaves it as "unknown".
+        pass
+    return {
+        "host": platform.node(),
+        "arch": platform.machine(),
+        "cpu": cpu,
+        # os.cpu_count() may return None; render that as "?".
+        "cores": str(os.cpu_count() or "?"),
+    }
+
+
+def _fmt_cell(value: Any, kind: str) -> str:
+    """Render one table cell for column ``kind``; ``None`` renders as ``—``.
+
+    ``kind`` is one of ``label``, ``stream``, ``qps``, ``total``, ``elapsed``
+    or ``failed``; unknown kinds (and ``label``) fall through to ``str(value)``.
+    """
+    if value is None:
+        return "—"
+    if kind == "stream":
+        return "on " if value else "off"
+    # Conversions go through float() first so numeric strings ("100.0")
+    # don't crash int(). Any conversion failure — including the OverflowError
+    # raised by int(float("inf")) — falls back to str(value) so the
+    # end-of-session summary never blows up the pytest run.
+    try:
+        if kind == "qps":
+            v = float(value)
+            return f"{v:>9,.0f}" if v >= 100 else f"{v:>9.2f}"
+        if kind == "total":
+            return f"{int(float(value)):>10,}"
+        if kind == "elapsed":
+            return f"{float(value):>7.2f}s"
+        if kind == "failed":
+            return f"{int(float(value)):>4}"
+    except (TypeError, ValueError, OverflowError):
+        return str(value)
+    return str(value)
+
+
+def pytest_terminal_summary(terminalreporter, exitstatus, config) -> None:  # noqa: ARG001
+    """Print the host header and one table row per ``record_result`` call.
+
+    Writes nothing when no test recorded a row.
+    """
+    rows = _Collected.rows
+    if not rows:
+        return
+
+    tr = terminalreporter
+    tr.write_sep("=", "E2E Performance Summary")
+
+    info = _host_info()
+    tr.write_line(
+        f"Host: {info['host']} Arch: {info['arch']} Cores: {info['cores']}"
+    )
+    tr.write_line(f"CPU: {info['cpu']}")
+    tr.write_line("")
+
+    headers = ["Test", "Stream", "QPS", "Total", "Elapsed", "Failed"]
+    # Each column name is both the row-dict key and the _fmt_cell kind.
+    columns = ["label", "stream", "qps", "total", "elapsed", "failed"]
+
+    body = [
+        [_fmt_cell(r.get(col), col) for col in columns]
+        for r in rows
+    ]
+
+    # Column width = widest of the header and every rendered cell.
+    widths = [
+        max(len(h), max((len(row[i]) for row in body), default=0))
+        for i, h in enumerate(headers)
+    ]
+    fmt = " ".join(f"{{:<{w}}}" for w in widths)
+    sep = " ".join("-" * w for w in widths)
+
+    tr.write_line(fmt.format(*headers))
+    tr.write_line(sep)
+    for row in body:
+        tr.write_line(fmt.format(*row))
diff --git a/tests/performance/commands/test_e2e_perf.py b/tests/performance/commands/test_e2e_perf.py
new file mode 100644
index 00000000..57a2e90d
--- /dev/null
+++ b/tests/performance/commands/test_e2e_perf.py
@@ -0,0 +1,278 @@
+# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""End-to-end performance tests for the benchmark CLI. + +Two families of tests, both driving the cyclopts ``inference-endpoint`` +app in-process and parameterized on stream/non-stream: + +* **Roofline** against :class:`MaxThroughputServer` (instant pre-compiled + responses). Measures peak QPS for each load pattern + (``max_throughput``, ``concurrency``, ``poisson``). Prints numbers + rather than asserting on them. + +* **Low-QPS correctness** against :class:`VariableResponseServer` + (realistic TTFT + per-token TPOT). Asserts zero ``failed`` requests at + 5 QPS for 20 s — guards keep-alive / idle-pool / slow-response + regressions. + +Both families are marked ``performance`` and are therefore CI-skipped; +run them explicitly when investigating throughput regressions or +benchmarking a new machine. Results from every parametrized case are +written via the ``record_result`` fixture and rendered as a single +summary table by ``conftest.py`` after the session completes. 
+
+Run::
+
+    pytest -vs -m performance --no-cov tests/performance/commands/test_e2e_perf.py
+"""
+
+from __future__ import annotations
+
+import pytest
+from inference_endpoint.testing.max_throughput_server import MaxThroughputServer
+from inference_endpoint.testing.variable_throughput_server import VariableResponseServer
+
+from .utils import run_cli
+
+# =============================================================================
+# Roofline tests — MaxThroughputServer, every load pattern, stream + non-stream
+# =============================================================================
+
+
+@pytest.fixture(scope="module", params=[False, True], ids=["nonstream", "stream"])
+def max_tput_server(request):
+    """Stub server returning fixed pre-compiled responses (roofline target)."""
+    with MaxThroughputServer(
+        port=0,
+        num_workers=4,
+        stream=request.param,
+        stream_interval=10,
+        quiet=True,
+    ) as srv:
+        yield srv
+
+
+@pytest.mark.performance
+@pytest.mark.xdist_group(name="serial_performance")
+def test_max_throughput_roofline(max_tput_server, tmp_path, record_result):
+    """Offline burst — issue 2,000,000 queries at t=0."""
+    results = run_cli(
+        [
+            "offline",
+            "--load-pattern",
+            "max_throughput",
+            "--num-samples",
+            "2000000",
+        ],
+        tmp_path,
+        max_tput_server,
+    )
+    r = results["results"]
+    assert r["failed"] == 0, f"failed={r['failed']} (expected 0)"
+    record_result(
+        "max_throughput (2M burst)",
+        stream=max_tput_server.stream,
+        qps=r["qps"],
+        total=r["total"],
+        elapsed=r["elapsed_time"],
+        failed=r["failed"],
+    )
+    print(
+        f"\n max_throughput stream={max_tput_server.stream}: "
+        f"QPS={r['qps']:>10,.0f} total={r['total']:>9,} "
+        f"elapsed={r['elapsed_time']:6.2f}s"
+    )
+
+
+@pytest.mark.performance
+@pytest.mark.xdist_group(name="serial_performance")
+@pytest.mark.parametrize("concurrency", [1000, 4000, 16000])
+def test_concurrency_roofline(max_tput_server, concurrency, tmp_path, record_result):
+    """Online concurrency — N in-flight requests for fixed duration."""
+    results = run_cli(
+        [
+            "online",
+            "--load-pattern",
+            "concurrency",
+            "--concurrency",
+            str(concurrency),
+            "--duration",
+            "10s",
+            "--runtime.max-duration-ms",
+            "12000",
+            # Headroom so wall time, not sample count, is the limit.
+            "--num-samples",
+            "10000000",
+        ],
+        tmp_path,
+        max_tput_server,
+    )
+    r = results["results"]
+    assert r["failed"] == 0, f"failed={r['failed']} (expected 0)"
+    record_result(
+        f"concurrency c={concurrency:,}",
+        stream=max_tput_server.stream,
+        qps=r["qps"],
+        total=r["total"],
+        elapsed=r["elapsed_time"],
+        failed=r["failed"],
+    )
+    print(
+        f"\n concurrency c={concurrency:>5} stream={max_tput_server.stream}: "
+        f"QPS={r['qps']:>10,.0f} total={r['total']:>9,} "
+        f"elapsed={r['elapsed_time']:6.2f}s"
+    )
+
+
+@pytest.mark.performance
+@pytest.mark.xdist_group(name="serial_performance")
+def test_poisson_binary_search_max_qps(max_tput_server, tmp_path, record_result):
+    """Binary search for the largest 10k-multiple target_qps the server sustains.
+
+    The summary row reports the winning target as ``qps`` and the *actual*
+    failed-request count of that winning run (``—`` when no target was
+    sustained), rather than a hard-coded zero.
+    """
+    STEP = 10_000
+    LO, HI = 10_000, 250_000  # search space (inclusive)
+    PASS_RATIO = 0.95  # achieved/target threshold for "sustained"
+
+    # Standard binary search over candidate targets so the LO boundary is
+    # actually exercised: with ``while lo < hi`` we could converge to
+    # ``lo == hi == LO/STEP`` without ever issuing a run at LO, leaving
+    # ``max_sustained`` reported as 0 even if LO is sustainable.
+    history: list[tuple[int, float, bool, int]] = []
+    best_sustained, best_failed = 0, None
+    lo, hi = LO // STEP, HI // STEP  # integer bounds in units of STEP
+    while lo <= hi:
+        mid = (lo + hi) // 2
+        target = mid * STEP
+        results = run_cli(
+            [
+                "online",
+                "--load-pattern",
+                "poisson",
+                "--target-qps",
+                str(target),
+                "--duration",
+                "10s",
+                "--runtime.max-duration-ms",
+                "12000",
+                # Headroom so wall time, not sample count, is the limit.
+                "--num-samples",
+                str(max(100_000, target * 15)),
+            ],
+            tmp_path / f"qps_{target}",
+            max_tput_server,
+        )
+        r = results["results"]
+        achieved = r["qps"]
+        sustained = achieved >= target * PASS_RATIO
+        history.append((target, achieved, sustained, r["failed"]))
+        if sustained:
+            # Remember the failure count of the run that defines the result.
+            best_sustained, best_failed = target, r["failed"]
+            lo = mid + 1
+        else:
+            hi = mid - 1
+
+    max_sustained = best_sustained
+    record_result(
+        "poisson max_sustained",
+        stream=max_tput_server.stream,
+        qps=max_sustained,
+        failed=best_failed,
+    )
+    print(
+        f"\n poisson binary search stream={max_tput_server.stream}: "
+        f"max_sustained={max_sustained:>7,} QPS (PASS_RATIO={PASS_RATIO})"
+    )
+    for t, a, s, f in history:
+        print(f" target={t:>7,} achieved={a:>10,.0f} sustained={s} failed={f}")
+
+
+# =============================================================================
+# Low-QPS correctness — VariableResponseServer, 5 QPS, no network errors
+# =============================================================================
+
+
+@pytest.fixture(scope="module", params=[False, True], ids=["nonstream", "stream"])
+def variable_server(request):
+    """Realistic LLM stub: ~100-char responses, 50ms TTFT, 10ms/token TPOT."""
+    with VariableResponseServer(
+        host="127.0.0.1",
+        port=0,
+        output_len_mean=100,
+        output_len_spread=0.2,
+        inter_token_latency=10.0,
+        inter_token_spread=0.1,
+        first_chunk_latency=0.05,
+        first_chunk_spread=0.1,
+        stream=request.param,
+        stream_interval=20,
+        num_workers=2,
+        quiet=True,
+    ) as srv:
+        yield srv
+
+
+@pytest.mark.performance
+@pytest.mark.xdist_group(name="serial_performance")
+def test_low_qps_no_network_errors(variable_server, tmp_path, record_result):
+    """Sustain 5 QPS Poisson for 20 s — must complete with zero failed requests.
+
+    Low QPS spaces requests far enough apart that idle connections may
+    sit past ``TCP_KEEPIDLE`` (1 s in :class:`_SocketConfig`). A regression
+    in keep-alive probing, idle pool eviction, or slow-response handling
+    surfaces here as non-zero ``failed`` count.
+    """
+    TARGET_QPS = 5
+    DURATION_S = 20
+
+    results = run_cli(
+        [
+            "online",
+            "--load-pattern",
+            "poisson",
+            "--target-qps",
+            str(TARGET_QPS),
+            "--duration",
+            f"{DURATION_S}s",
+            # 2x Poisson expectation so wall time (--duration) always caps
+            # the run; without headroom, variance in inter-arrivals can
+            # finish the test early before the full idle-connection window.
+            "--num-samples",
+            str(TARGET_QPS * DURATION_S * 2),
+            # Low QPS needs neither many workers nor pre-warmed connections;
+            # using auto defaults makes startup slow and flaky against a stub
+            # that has TTFT + per-token delays.
+            "--workers",
+            "4",
+            "--client.warmup-connections",
+            "0",
+        ],
+        tmp_path,
+        variable_server,
+    )
+    r = results["results"]
+    assert r["failed"] == 0, f"failed={r['failed']} of {r['total']}"
+    record_result(
+        f"low_qps target={TARGET_QPS}",
+        stream=variable_server.stream,
+        qps=r["qps"],
+        total=r["total"],
+        elapsed=r["elapsed_time"],
+        failed=r["failed"],
+    )
+    print(
+        f"\n low_qps target={TARGET_QPS} stream={variable_server.stream}: "
+        f"achieved={r['qps']:.2f} QPS total={r['total']} "
+        f"failed={r['failed']} elapsed={r['elapsed_time']:.2f}s"
+    )
diff --git a/tests/performance/commands/utils.py b/tests/performance/commands/utils.py
new file mode 100644
index 00000000..f9713013
--- /dev/null
+++ b/tests/performance/commands/utils.py
@@ -0,0 +1,86 @@
+# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Shared helpers for E2E command tests."""
+
+from __future__ import annotations
+
+import json
+import os
+from pathlib import Path
+from typing import Protocol
+
+DATASET = str(
+    Path(__file__).resolve().parents[2] / "assets" / "datasets" / "ds_samples.jsonl"
+)
+
+
+class StubServer(Protocol):
+    """Duck-type for the stub servers (MaxThroughputServer, VariableResponseServer).
+
+    Named ``StubServer`` rather than ``TestServer`` so pytest doesn't try to
+    collect it as a test class (any class whose name starts with ``Test`` is
+    a collection candidate).
+    """
+
+    url: str
+    stream: bool
+
+
+def _override_flag(args: list[str], flag: str, value: str) -> list[str]:
+    """Return a copy of ``args`` where ``flag`` appears exactly once, set to ``value``.
+
+    Every existing ``flag VALUE`` pair or ``flag=VALUE`` token is dropped and a
+    single ``flag value`` pair is appended, so an override truly replaces a
+    value the caller already passed instead of repeating the option.
+    """
+    kept: list[str] = []
+    skip_value = False
+    for token in args:
+        if skip_value:
+            # This token is the value that belonged to a removed ``flag``.
+            skip_value = False
+            continue
+        if token == flag:
+            skip_value = True
+            continue
+        if token.startswith(f"{flag}="):
+            continue
+        kept.append(token)
+    return [*kept, flag, value]
+
+
+def run_cli(
+    extra_args: list[str],
+    report_dir: Path,
+    server: StubServer,
+    *,
+    dataset: str = DATASET,
+) -> dict:
+    """Invoke ``inference-endpoint`` in-process via cyclopts; return results.json.
+
+    Client ``--streaming`` is coupled to the server's response mode: the stub
+    server always returns the same pre-compiled bytes (JSON or SSE),
+    regardless of what the client sent in the request body. Mismatched modes
+    produce ``DecodeError: JSON is malformed`` on every response.
+
+    Env overrides (useful in containers where cpu_affinity is restricted):
+        ROOFLINE_NUM_WORKERS — override --workers (default: auto)
+        ROOFLINE_INIT_TIMEOUT — override --client.worker-initialization-timeout
+
+    An override replaces any value for the same flag already present in
+    ``extra_args`` rather than passing the flag a second time.
+
+    Args:
+        extra_args: Sub-command and options placed right after ``benchmark``.
+        report_dir: Directory for the report; created if missing.
+        server: Stub server providing ``url`` and ``stream``.
+        dataset: Path passed to ``--dataset``.
+
+    Returns:
+        The parsed contents of ``<report_dir>/results.json``.
+
+    Raises:
+        SystemExit: Re-raised when the CLI exits with a non-zero code.
+    """
+    from inference_endpoint.main import app
+
+    report_dir.mkdir(parents=True, exist_ok=True)
+    args = [
+        "benchmark",
+        *extra_args,
+        "--endpoints",
+        server.url,
+        "--streaming",
+        "on" if server.stream else "off",
+        "--model",
+        "max-tp",
+        "--dataset",
+        dataset,
+        "--report-dir",
+        str(report_dir),
+    ]
+    if nw := os.environ.get("ROOFLINE_NUM_WORKERS"):
+        args = _override_flag(args, "--workers", nw)
+    if to := os.environ.get("ROOFLINE_INIT_TIMEOUT"):
+        args = _override_flag(args, "--client.worker-initialization-timeout", to)
+    try:
+        app(args)
+    except SystemExit as e:
+        # The CLI app may exit via SystemExit; only a clean exit is swallowed.
+        if e.code not in (None, 0):
+            raise
+    # JSON is UTF-8 by specification; don't depend on the locale encoding.
+    return json.loads((report_dir / "results.json").read_text(encoding="utf-8"))