feat: add concurrency_modifier support for QB endpoints #301
Conversation
Add max_concurrency int field (default 1) to ResourceConfig dataclass and its from_dict() parser. Enables manifest to carry per-resource concurrency config for QB workers.
Read _max_concurrency from Endpoint facade before unwrapping to internal resource config in _extract_deployment_config(). Only set in manifest when value > 1 (default 1 means no override needed). For LB endpoints, max_concurrency is meaningless (uvicorn handles concurrency), so warn and remove the field in build() to keep the manifest schema consistent and avoid silent misconfiguration.
Add DEPLOYED_ASYNC_HANDLER_TEMPLATE and DEPLOYED_ASYNC_CLASS_HANDLER_TEMPLATE for max_concurrency > 1 with async handlers. Update _generate_handler to read max_concurrency from resource_data and select template accordingly. Sync handlers with max_concurrency > 1 get concurrency_modifier injected via string replacement. Includes 10 new tests covering all template selection paths and warning logs.
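The template selection described above can be sketched as follows. The two async template names come from the commit message; the function shape, the base template names, and the injection note are a hypothetical simplification of `_generate_handler`, not the actual implementation.

```python
def select_template(is_class: bool, is_async: bool, max_concurrency: int) -> str:
    """Hypothetical sketch of the template selection described above."""
    if is_class:
        if max_concurrency > 1 and is_async:
            return "DEPLOYED_ASYNC_CLASS_HANDLER_TEMPLATE"
        # Sync class handlers with max_concurrency > 1 instead get
        # concurrency_modifier injected via string replacement.
        return "DEPLOYED_CLASS_HANDLER_TEMPLATE"
    if max_concurrency > 1 and is_async:
        return "DEPLOYED_ASYNC_HANDLER_TEMPLATE"
    return "DEPLOYED_HANDLER_TEMPLATE"
```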
The concurrency feature accidentally tightened the name validation guard and removed auto-derive in __call__ and the _route name check, breaking nameless QB decorator mode and LB stub routing.
Promptless prepared a documentation update related to this change (triggered by runpod/flash#301). Review at https://app.gopromptless.ai/suggestions/640ef899-880f-42f8-a055-db3680e8a632
Pull request overview
Adds max_concurrency support intended to control concurrent job execution for queue-based (QB) endpoints, and propagates it through manifest generation and handler code generation.
Changes:
- Add `max_concurrency` to `Endpoint` (with validation) and to runtime `ResourceConfig`.
- Propagate `max_concurrency` into the build manifest (and warn/strip for load-balanced endpoints).
- Generate handlers with `concurrency_modifier` when `max_concurrency > 1` (new async templates + sync injection path), plus tests.
Reviewed changes
Copilot reviewed 7 out of 8 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| uv.lock | Bumps runpod-flash version metadata. |
| tests/unit/test_endpoint.py | Adds unit tests for Endpoint.max_concurrency validation/defaults. |
| tests/unit/test_concurrency_manifest.py | Adds tests for ResourceConfig.max_concurrency and manifest behavior for QB vs LB. |
| tests/unit/cli/commands/build_utils/test_handler_generator.py | Adds coverage for handler codegen with/without concurrency_modifier and warning behavior. |
| src/runpod_flash/runtime/models.py | Adds max_concurrency field to ResourceConfig and from_dict handling. |
| src/runpod_flash/endpoint.py | Adds max_concurrency parameter + validation and stores it on the endpoint instance. |
| src/runpod_flash/cli/commands/build_utils/manifest.py | Extracts max_concurrency during build; warns/strips for LB endpoints. |
| src/runpod_flash/cli/commands/build_utils/handler_generator.py | Adds async handler templates and injects concurrency_modifier for max_concurrency > 1. |
```python
# Extract max_concurrency from Endpoint facade before unwrapping.
# max_concurrency is a Flash concept that lives on Endpoint,
# not on the internal resource config (LiveServerless, etc.).
if hasattr(resource_config, "_max_concurrency"):
    mc = resource_config._max_concurrency
    if mc > 1:
        config["max_concurrency"] = mc
```
max_concurrency extraction currently only works when resource_config is an Endpoint instance (e.g., referenced via a module-level variable and found via config_variable). For the common inline decorator form (@Endpoint(..., max_concurrency=...)), Endpoint.__call__() passes the internal resource config into @remote, so __remote_config__["resource_config"] is not an Endpoint and the ephemeral Endpoint instance is not reachable during manifest build. As a result, max_concurrency will be silently dropped from the manifest for inline-decorated QB endpoints.
Suggested fix: persist max_concurrency into the decorated object’s __remote_config__ (e.g., a top-level key like max_concurrency) during decoration, and have _extract_deployment_config() read it from remote_cfg when present, rather than relying on accessing a private field on the Endpoint facade.
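A minimal sketch of that suggested fix, with hypothetical helper names (`decorate`, `extract_deployment_config`) standing in for `Endpoint.__call__` and `_extract_deployment_config()`:

```python
def decorate(obj, resource_config, max_concurrency=1):
    # Persist max_concurrency into __remote_config__ at decoration time,
    # so the manifest build never needs the ephemeral Endpoint facade.
    remote_cfg = {"resource_config": resource_config}
    if max_concurrency > 1:
        remote_cfg["max_concurrency"] = max_concurrency
    obj.__remote_config__ = remote_cfg
    return obj


def extract_deployment_config(remote_cfg):
    # Read the top-level key when present; the default of 1 needs no override.
    config = {}
    mc = remote_cfg.get("max_concurrency", 1)
    if mc > 1:
        config["max_concurrency"] = mc
    return config
```

This keeps the manifest build working for both the module-level-variable form and the inline decorator form, since both paths leave `__remote_config__` on the decorated object.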
```python
is_class = (
    func.is_class if hasattr(func, "is_class") else func.get("is_class", False)
)
is_async = (
    func.is_async if hasattr(func, "is_async") else func.get("is_async", False)
)

# ...

import_statement = (
    f"{name} = importlib.import_module('{module}').{name}"
    if module and name
    else "# No function to import"
)

# ...

if max_concurrency > 100:
    logger.warning(
        "High max_concurrency=%d for resource '%s'. Ensure your handler "
        "and GPU can support this level of concurrent execution.",
        max_concurrency,
        resource_name,
    )

# ...

if max_concurrency > 1 and not is_async:
    logger.warning(
        "max_concurrency=%d set on sync handler '%s'. "
        "Only async handlers benefit from concurrent execution. "
        "Consider making the handler async.",
        max_concurrency,
        resource_name,
    )

# ...

if is_class:
    class_methods = (
        func.class_methods
        if hasattr(func, "class_methods")
        else func.get("class_methods", [])
    )
    methods_dict = {m: m for m in class_methods} if class_methods else {}
    return DEPLOYED_CLASS_HANDLER_TEMPLATE.format(

# ...

    if max_concurrency > 1 and is_async:
        return DEPLOYED_ASYNC_CLASS_HANDLER_TEMPLATE.format(
            resource_name=resource_name,
            timestamp=timestamp,
            import_statement=import_statement,
            class_name=name or "None",
            methods_dict=repr(methods_dict),
            max_concurrency=max_concurrency,
        )
```
is_async is used to decide whether to emit the new async class handler template, but the build scanner currently never marks class-based resources as async (it only sets is_async for non-class functions). This makes the max_concurrency > 1 and is_async async-class branch effectively unreachable in real builds, and will also trigger the “sync handler” warning even when the class methods are actually async def.
Suggested fix: either (a) update the scanner/manifest to set is_async=True for class resources when any public method is coroutine, or (b) make handler generation detect/handle async class methods without relying on the manifest-level is_async flag (e.g., generate an async handler that awaits coroutine results conditionally).
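Option (b) can be sketched without relying on the manifest flag at all. `dispatch` is a hypothetical stand-in for the generated class handler's method call, not code from this PR:

```python
import asyncio
import inspect


async def dispatch(method, payload):
    # Call the method, then await only if it actually returned an awaitable.
    # This lets sync and async class methods share one handler path.
    result = method(**payload)
    if inspect.isawaitable(result):
        result = await result
    return result


class Worker:
    def sync_gen(self, x):
        return x * 2

    async def async_gen(self, x):
        return x * 3
```

Usage: `asyncio.run(dispatch(Worker().sync_gen, {"x": 2}))` and `asyncio.run(dispatch(Worker().async_gen, {"x": 2}))` both return plain results, so no coroutine objects leak out of the handler.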
```python
def test_async_class_handler_with_concurrency():
    """max_concurrency > 1 + async class produces async class handler."""
    with tempfile.TemporaryDirectory() as tmpdir:
        build_dir = Path(tmpdir)
        manifest = {
            "version": "1.0",
            "generated_at": "2026-01-02T10:00:00Z",
            "project_name": "test_app",
            "resources": {
                "vllm_worker": {
                    "resource_type": "Endpoint",
                    "max_concurrency": 10,
                    "functions": [
                        {
                            "name": "VLLMWorker",
                            "module": "workers.vllm",
                            "is_async": True,
                            "is_class": True,
                            "class_methods": ["generate"],
                        }
```
These tests construct manifests with is_class=True and is_async=True, but the runtime scanner currently always sets is_async=False for class-based resources (it only computes async-ness for non-class functions). This makes the tests unrepresentative of the real build output and can give a false sense of coverage for the async-class concurrency path.
Suggested fix: either update the scanner/manifest generation to propagate async class detection (so is_async=True is possible for classes), or adjust the tests to match actual scanner behavior (and assert the expected sync-class template behavior/warnings).
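The scanner-side option can be sketched with a small helper; `class_is_async` is a hypothetical name, not the actual scanner code:

```python
import inspect


def class_is_async(cls) -> bool:
    # Treat a class resource as async when any public method is a
    # coroutine function (option (a) from the review comment above).
    return any(
        inspect.iscoroutinefunction(member)
        for name, member in inspect.getmembers(cls, inspect.isfunction)
        if not name.startswith("_")
    )


class SyncWorker:
    def generate(self, prompt):
        return prompt


class AsyncWorker:
    async def generate(self, prompt):
        return prompt
```

With this, `class_is_async(AsyncWorker)` is `True` and `class_is_async(SyncWorker)` is `False`, so the manifest could carry a truthful `is_async` for class resources.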
runpod-Henrik left a comment
PR #301 — feat: add concurrency_modifier support for QB endpoints
Henrik's AI-Powered Bug Finder
1. Question: What happens if is_async is incorrectly detected?
If the scanner marks a sync function as is_async=True (or vice versa), the wrong template is selected:
- `is_async=True` on a sync function → generated handler does `await func(**job_input)` on a non-coroutine → runtime `TypeError` on every job
- `is_async=False` on an async function → sync template used, handler calls `func(**job_input)` without `await` → function returns a coroutine object, not a result
Is is_async detection in the scanner reliable for all QB worker patterns? If the scanner can be fooled (e.g., a function decorated with @functools.wraps wrapping an async function), users get a silently broken deployment.
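The `@functools.wraps` concern is demonstrable with the stdlib alone. A sync wrapper around an async function is reported as sync by a naive check, even though calling it yields a coroutine:

```python
import functools
import inspect


async def real_handler(x):
    return x * 2


@functools.wraps(real_handler)
def wrapper(*args, **kwargs):
    # Returns a coroutine, so the wrapper *behaves* asynchronously...
    return real_handler(*args, **kwargs)


# ...but a check on the wrapper itself reports it as sync:
print(inspect.iscoroutinefunction(wrapper))                  # False
# Following __wrapped__ (set by functools.wraps) recovers the truth:
print(inspect.iscoroutinefunction(inspect.unwrap(wrapper)))  # True
```

If the scanner only checks the outermost callable, it would pick the sync template here and the deployed handler would return coroutine objects instead of results.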
2. Test coverage gap: no live concurrency test
All tests verify that the string concurrency_modifier appears in generated code. None verify that the RunPod platform actually honours it — i.e., that a deployed worker with max_concurrency=5 accepts 5 simultaneous jobs without queuing them. Noted as a gap, not a blocker.
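A local proxy (not a platform test) can at least confirm that an async handler overlaps work under concurrent dispatch, on the assumption that `concurrency_modifier`'s benefit comes from the event loop interleaving awaits. The handler below is illustrative, not taken from the PR:

```python
import asyncio
import time


async def handler(job):
    # Simulate I/O-bound work; CPU-bound handlers would not overlap this way.
    await asyncio.sleep(0.1)
    return job["input"] * 2


async def run_batch(n):
    start = time.monotonic()
    results = await asyncio.gather(*(handler({"input": i}) for i in range(n)))
    return results, time.monotonic() - start


results, elapsed = asyncio.run(run_batch(5))
print(results)  # [0, 2, 4, 6, 8]
# Five 0.1s jobs overlap into roughly 0.1s total rather than ~0.5s serialized.
```

A true end-to-end check would still require deploying a worker with `max_concurrency=5` and observing five in-flight jobs on the platform.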
Nits
- `_METHODS = {methods_dict}` maps `{m: m}` — values are never read, only keys. Same pattern as the existing sync class template, so not new, but worth cleaning up.
- The high-concurrency warning threshold of `> 100` is arbitrary and undocumented. Worth a comment explaining the rationale.
Verdict: PASS WITH NITS
Core implementation is correct — validation, manifest propagation, template selection, and LB stripping all work as described. The is_async detection question is the main open item but depends on the scanner which is outside this PR's scope.
🤖 Reviewed by Henrik's AI-Powered Bug Finder
Pull request overview
Copilot reviewed 7 out of 8 changed files in this pull request and generated 1 comment.
```python
return code.replace(
    'runpod.serverless.start({"handler": handler})',
    "runpod.serverless.start({\n"
    '    "handler": handler,\n'
    f'    "concurrency_modifier": lambda current: {max_concurrency},\n'
    "})",
)
```
_inject_concurrency_modifier relies on a hard-coded string replacement of runpod.serverless.start({"handler": handler}). If the template formatting/spacing changes (or if the start call is refactored), this will silently fail and omit concurrency_modifier without any error.
Suggested fix: generate the runpod.serverless.start(...) block via template placeholders (or structured formatting) instead of str.replace, or at minimum assert that the replacement occurred (e.g., check the expected substring was present and raise if not).
Suggested change:

```python
start_call = 'runpod.serverless.start({"handler": handler})'
replacement = (
    "runpod.serverless.start({\n"
    '    "handler": handler,\n'
    f'    "concurrency_modifier": lambda current: {max_concurrency},\n'
    "})"
)
if code.count(start_call) != 1:
    raise ValueError(
        "Unable to inject concurrency_modifier: expected exactly one "
        f"occurrence of {start_call!r} in generated handler code."
    )
return code.replace(start_call, replacement, 1)
```
Summary
- Add `max_concurrency` parameter to `Endpoint` for controlling concurrent job execution on QB endpoints
- Propagate `max_concurrency` through `ResourceConfig` and manifest, with LB endpoint warning/stripping
- Generate handlers with `concurrency_modifier` for `max_concurrency > 1` + async handlers
- Inject `concurrency_modifier` into sync handler templates when `max_concurrency > 1` (with warning)

Linear
AE-2415
Changes
- `endpoint.py`: new `max_concurrency` param with validation (>= 1)
- `runtime/models.py`: `max_concurrency` field on `ResourceConfig`
- `manifest.py`: propagate from `Endpoint` facade, warn and strip for LB endpoints
- `handler_generator.py`: new `DEPLOYED_ASYNC_HANDLER_TEMPLATE` and `DEPLOYED_ASYNC_CLASS_HANDLER_TEMPLATE` with `concurrency_modifier`, plus `_inject_concurrency_modifier` for sync fallback

Test plan
- `Endpoint.max_concurrency` validation (default, explicit, zero, negative)
- `ResourceConfig.max_concurrency` round-trip through dict
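The round-trip item above can be sketched against a simplified stand-in for `ResourceConfig`. Field names follow the PR; the dataclass shape here is an assumption, not the real class:

```python
from dataclasses import asdict, dataclass


@dataclass
class ResourceConfig:
    # Simplified stand-in; the real class carries more fields.
    name: str
    max_concurrency: int = 1

    @classmethod
    def from_dict(cls, data: dict) -> "ResourceConfig":
        # A missing key falls back to the default of 1, mirroring from_dict().
        return cls(
            name=data["name"],
            max_concurrency=data.get("max_concurrency", 1),
        )


cfg = ResourceConfig.from_dict({"name": "vllm_worker", "max_concurrency": 10})
assert asdict(cfg)["max_concurrency"] == 10
assert ResourceConfig.from_dict({"name": "w"}).max_concurrency == 1
```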