-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathhandler.py
More file actions
139 lines (109 loc) · 4.76 KB
/
handler.py
File metadata and controls
139 lines (109 loc) · 4.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import importlib.util
import logging
import os
from pathlib import Path
from typing import Any, Dict, Optional
from logger import setup_logging
from unpack_volume import maybe_unpack
from version import format_version_banner
# Import-time bootstrap. The statements below run in this exact order each
# time the module is imported; later steps rely on the earlier ones.
# Configure logging first so everything after this point can emit log records.
setup_logging()
logger = logging.getLogger(__name__)
# Unpack Flash deployment artifacts if running in Flash mode.
# This is a no-op for Live Serverless and local development.
maybe_unpack()
# Emit the version banner only after unpacking, so that the bundled
# runpod_flash package is already on sys.path when the banner is built.
logger.info(format_version_banner())
def _is_deployed_mode() -> bool:
    """Report whether this worker runs as a Flash-deployed endpoint.

    Deployed mode is signalled by the FLASH_RESOURCE_NAME environment
    variable being set to a non-empty value. When it is unset or empty the
    worker is treated as not deployed (i.e. Live Serverless).

    Returns:
        True if FLASH_RESOURCE_NAME holds a non-empty string, else False.
    """
    resource_name = os.environ.get("FLASH_RESOURCE_NAME")
    return resource_name is not None and resource_name != ""
def _load_generated_handler() -> Optional[Any]:
    """Import the build-generated handler module and return its entry point.

    The resource name is read from the FLASH_RESOURCE_NAME environment
    variable and mapped to ``/app/handler_<resource_name>.py``, the file the
    flash build pipeline is expected to have produced. Generated handlers
    take plain JSON input, without FunctionRequest/cloudpickle serialization.

    When FLASH_RESOURCE_NAME is set, every failure is fatal: the
    FunctionRequest fallback is only valid for Live Serverless workers.

    Returns:
        The callable ``handler`` attribute of the generated module, or None
        when FLASH_RESOURCE_NAME is unset or empty.

    Raises:
        RuntimeError: If the handler path resolves outside /app, the file is
            missing, no module spec can be created, executing the module
            fails, or the module lacks a callable ``handler`` attribute.
    """
    resource_name = os.getenv("FLASH_RESOURCE_NAME")
    if not resource_name:
        return None

    handler_file = Path(f"/app/handler_{resource_name}.py")

    # Refuse resource names that, once resolved, point outside /app.
    app_root = Path("/app").resolve()
    resolved_file = handler_file.resolve()
    if not resolved_file.is_relative_to(app_root):
        message = (
            f"FLASH_RESOURCE_NAME '{resource_name}' resolves outside /app. "
            f"This is a security violation. Check the endpoint environment variables."
        )
        raise RuntimeError(message)

    if not handler_file.exists():
        message = (
            f"Generated handler {handler_file} not found for resource '{resource_name}'. "
            f"The build artifact is incomplete. Redeploy with 'flash deploy'."
        )
        raise RuntimeError(message)

    module_spec = importlib.util.spec_from_file_location(
        f"handler_{resource_name}", handler_file
    )
    if not (module_spec and module_spec.loader):
        message = (
            f"Failed to create module spec for {handler_file}. "
            f"The file may be corrupted. Redeploy with 'flash deploy'."
        )
        raise RuntimeError(message)

    module = importlib.util.module_from_spec(module_spec)
    try:
        module_spec.loader.exec_module(module)
    except Exception as e:
        # Choose the most specific diagnosis; the checks are ordered the same
        # way the dedicated except clauses would be matched.
        if isinstance(e, ImportError):
            message = (
                f"Generated handler {handler_file} failed to import: {e}. "
                f"This usually means a dependency was built for the wrong Python version. "
                f"Redeploy with 'flash deploy'."
            )
        elif isinstance(e, SyntaxError):
            message = (
                f"Generated handler {handler_file} has a syntax error: {e}. "
                f"This indicates a bug in the flash build pipeline."
            )
        else:
            message = (
                f"Generated handler {handler_file} failed to load: {e} ({type(e).__name__}). "
                f"Redeploy with 'flash deploy'."
            )
        raise RuntimeError(message) from e

    generated = getattr(module, "handler", None)
    if generated is None:
        message = (
            f"Generated handler {handler_file} has no 'handler' function. "
            f"This indicates a bug in the flash build pipeline."
        )
        raise RuntimeError(message)
    if not callable(generated):
        message = (
            f"Generated handler {handler_file} has a 'handler' attribute "
            f"but it is not callable ({type(generated).__name__}). "
            f"This indicates a bug in the flash build pipeline."
        )
        raise RuntimeError(message)

    logger.info("Loaded generated handler from %s", handler_file)
    return generated
# The handler is selected once, at import time.
# Live Serverless mode: the FunctionRequest handler is the only path.
# Deployed mode: the generated handler is mandatory; a failure to load it
# propagates out of this module.
if not _is_deployed_mode():
    from runpod_flash.protos.remote_execution import FunctionRequest, FunctionResponse
    from remote_executor import RemoteExecutor

    async def handler(event: Dict[str, Any]) -> Dict[str, Any]:
        """Serve one Live Serverless job using the FunctionRequest protocol.

        Args:
            event: Job payload; its "input" entry (defaulting to an empty
                dict) is expanded into a FunctionRequest.

        Returns:
            The FunctionResponse dumped to a dict. Any exception raised while
            building the executor or request, or while executing, is reported
            as an unsuccessful response instead of being raised.
        """
        result: FunctionResponse
        try:
            # The executor is created before the request is parsed.
            runner = RemoteExecutor()
            request = FunctionRequest(**event.get("input", {}))
            result = await runner.ExecuteFunction(request)
        except Exception as error:
            result = FunctionResponse(
                success=False,
                error=f"Error in handler: {str(error)}",
            )

        return result.model_dump()  # type: ignore[no-any-return]
else:
    handler = _load_generated_handler()
# Script entry point: pass the selected handler to the RunPod serverless
# worker. runpod is imported here, not at module top, because it is only
# available on the RunPod platform.
if __name__ == "__main__":
    import runpod

    worker_config = {"handler": handler}
    runpod.serverless.start(worker_config)