-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathdeployment.py
More file actions
564 lines (454 loc) · 19.1 KB
/
deployment.py
File metadata and controls
564 lines (454 loc) · 19.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
"""Deployment environment management utilities."""
import asyncio
import copy
import json
import logging
from typing import Dict, Any
from datetime import datetime
from pathlib import Path
from runpod_flash.config import get_paths
from runpod_flash.core.resources.serverless import ServerlessResource
from runpod_flash.core.resources.app import FlashApp
from runpod_flash.core.resources.resource_manager import ResourceManager
from runpod_flash.runtime.resource_provisioner import create_resource_from_manifest
log = logging.getLogger(__name__)

# Manifest keys that are populated at deploy time rather than authored by the
# user: ServerlessResource.RUNTIME_FIELDS plus the endpoint identifiers. They
# are excluded when checking whether a resource's config has changed.
RUNTIME_RESOURCE_FIELDS = {*ServerlessResource.RUNTIME_FIELDS, "id", "endpoint_id"}
def _normalized_resource_attr(resource: Any, *names: str) -> str | None:
for name in names:
value = getattr(resource, name, None)
if isinstance(value, str) and value.strip():
return value
return None
def _manifest_without_ai_keys(manifest: Dict[str, Any]) -> Dict[str, Any]:
sanitized_manifest = copy.deepcopy(manifest)
resources = sanitized_manifest.get("resources")
if not isinstance(resources, dict):
return sanitized_manifest
for config in resources.values():
if isinstance(config, dict):
config.pop("aiKey", None)
return sanitized_manifest
def _resource_config_for_compare(config: Dict[str, Any]) -> Dict[str, Any]:
    """Return a deep copy of ``config`` without any RUNTIME_RESOURCE_FIELDS keys.

    The result is used to compare resource configs on user-managed settings
    only; the input ``config`` is not modified.
    """
    stripped = copy.deepcopy(config)
    # Materialize the intersection first so deleting keys is safe.
    for key in stripped.keys() & RUNTIME_RESOURCE_FIELDS:
        del stripped[key]
    return stripped
async def upload_build(app_name: str, build_path: str | Path):
    """Upload the build at ``build_path`` to the Flash app called ``app_name``.

    Args:
        app_name: Name used to resolve the app via ``FlashApp.from_name``.
        build_path: Location of the build passed to ``FlashApp.upload_build``.
    """
    flash_app = await FlashApp.from_name(app_name)
    await flash_app.upload_build(build_path)
def get_deployment_environments() -> Dict[str, Dict[str, Any]]:
    """Get all deployment environments from the local deployments file.

    Returns:
        Mapping of environment name -> environment record. An empty dict is
        returned when the file does not exist, cannot be decoded as JSON, or
        does not contain a JSON object at the top level.
    """
    deployments_file = get_paths().deployments_file
    try:
        with open(deployments_file, encoding="utf-8") as f:
            environments = json.load(f)
    except (json.JSONDecodeError, UnicodeDecodeError, FileNotFoundError):
        # Missing or corrupt file: behave as if no environments exist yet.
        return {}
    # Callers index and assign by environment name, so a valid-but-wrong
    # top-level value (e.g. a hand-edited list) is treated like a missing file.
    if not isinstance(environments, dict):
        return {}
    return environments
def save_deployment_environments(environments: Dict[str, Dict[str, Any]]):
    """Save deployment environments to the local deployments file.

    The JSON is first written to a sibling ``<name>.tmp`` file and then moved
    over the real file with ``Path.replace``. An interrupted or failed write
    therefore cannot leave a truncated deployments file behind. A truncated
    file would otherwise be read back as "no environments".

    Args:
        environments: Mapping of environment name -> environment record;
            must be JSON-serializable.
    """
    paths = get_paths()
    deployments_file = Path(paths.deployments_file)
    # Ensure .flash directory exists
    paths.ensure_flash_dir()
    tmp_file = deployments_file.with_name(deployments_file.name + ".tmp")
    try:
        with open(tmp_file, "w", encoding="utf-8") as f:
            json.dump(environments, f, indent=2)
        tmp_file.replace(deployments_file)
    except BaseException:
        # Don't leave a half-written temp file behind; re-raise the original error.
        tmp_file.unlink(missing_ok=True)
        raise
def create_deployment_environment(name: str, config: Dict[str, Any]):
    """Create (or overwrite) the deployment environment record ``name``.

    This is a mock: it only stores an ``"idle"`` record with empty deployment
    fields in the local deployments file and provisions nothing remotely.
    """
    environments = get_deployment_environments()
    record: Dict[str, Any] = dict(
        status="idle",
        config=config,
        created_at=datetime.now().isoformat(),
        current_version=None,
        last_deployed=None,
        url=None,
        version_history=[],
    )
    environments[name] = record
    save_deployment_environments(environments)
def remove_deployment_environment(name: str):
    """Remove the deployment environment ``name``.

    Does nothing (and does not rewrite the file) if no such environment exists.
    """
    environments = get_deployment_environments()
    if name not in environments:
        return
    environments.pop(name)
    save_deployment_environments(environments)
async def provision_resources_for_build(
    app: FlashApp, build_id: str, environment_name: str, show_progress: bool = True
) -> Dict[str, str]:
    """Provision every resource in a build's manifest before environment activation.

    All resources are deployed concurrently. Each resource's manifest entry is
    then annotated with its ``endpoint_id`` and ``aiKey`` (when available). The
    manifest's ``resources_endpoints`` is replaced with the new mapping, and the
    updated manifest is written back to the build.

    Args:
        app: FlashApp instance
        build_id: ID of the build whose manifest lists the resources
        environment_name: Name of environment (used only in progress output)
        show_progress: Whether to print progress to stdout

    Returns:
        Mapping of resource_name -> endpoint_url. Resources whose deployed
        object has no ``endpoint_url`` attribute are skipped with a warning.

    Raises:
        RuntimeError: If deploying any resource fails
    """
    manifest = await app.get_build_manifest(build_id)
    if not manifest or "resources" not in manifest:
        log.warning(f"No resources in manifest for build {build_id}")
        return {}

    manager = ResourceManager()
    resource_configs = manifest["resources"]
    python_version = manifest.get("python_version")
    pending = [
        (name, create_resource_from_manifest(name, cfg, python_version=python_version))
        for name, cfg in resource_configs.items()
    ]

    if show_progress:
        print(
            f"Provisioning {len(pending)} resources for environment '{environment_name}'..."
        )

    # Deploy everything concurrently; any single failure aborts the whole call.
    try:
        deployed = await asyncio.gather(
            *(manager.get_or_deploy_resource(resource) for _, resource in pending)
        )
    except Exception as e:
        log.error(f"Provisioning failed: {e}")
        raise RuntimeError(f"Failed to provision resources: {e}") from e

    endpoints = {}
    lb_url = None
    for (name, _), resource in zip(pending, deployed):
        if not hasattr(resource, "endpoint_url"):
            log.warning(f"Resource {name} has no endpoint_url attribute")
            continue
        url = resource.endpoint_url
        endpoints[name] = url

        entry = resource_configs[name]
        endpoint_id = _normalized_resource_attr(resource, "endpoint_id", "id")
        if endpoint_id:
            entry["endpoint_id"] = endpoint_id
        ai_key = _normalized_resource_attr(resource, "aiKey", "ai_key")
        if ai_key:
            entry["aiKey"] = ai_key

        # Remember the load-balanced endpoint so it can be highlighted at the end.
        if entry.get("is_load_balanced"):
            lb_url = url
        if show_progress:
            print(f" ✓ {name}: {url}")

    manifest["resources_endpoints"] = endpoints
    await app.update_build_manifest(build_id, manifest)

    if not show_progress:
        return endpoints

    print("✓ All resources provisioned and manifest updated")
    if lb_url:
        banner = "=" * 60
        print()
        print(banner)
        print(f"Load Balancer Endpoint: {lb_url}")
        print(banner)
    return endpoints
async def reconcile_and_provision_resources(
    app: FlashApp,
    build_id: str,
    environment_name: str,
    local_manifest: Dict[str, Any],
    environment_id: str | None = None,
    show_progress: bool = True,
) -> Dict[str, str]:
    """Reconcile local manifest with State Manager and provision resources.

    Compares the local manifest to the State Manager manifest for ``build_id``
    to determine:
    - NEW resources to provision (present only in the local manifest)
    - CHANGED resources to update (user-managed config differs, or the State
      Manager manifest records no endpoint for them)
    - REMOVED resources to delete (present only in the State Manager manifest;
      deletion is not implemented yet and is only logged)

    ``local_manifest`` is updated in place with ``endpoint_id``, ``aiKey`` and
    ``resources_endpoints`` entries. It is then written to
    ``.flash/flash_manifest.json`` with ``aiKey`` values stripped, and uploaded
    (including ``aiKey`` values) as the build's State Manager manifest.

    Args:
        app: FlashApp instance
        build_id: ID of the build
        environment_name: Name of environment (accepted for context; not used
            by this function)
        local_manifest: Local manifest dictionary (mutated in place)
        environment_id: Optional environment ID for endpoint provisioning
        show_progress: Whether to emit the reconciliation summary (logged at
            debug level)

    Returns:
        Mapping of resource_name -> endpoint_url (the updated manifest's
        ``resources_endpoints``; empty if none were recorded)

    Raises:
        ValueError: If RUNPOD_API_KEY is missing when resources make remote calls
        RuntimeError: If provisioning fails, times out after 10 minutes, or a
            load-balanced resource ends up without an endpoint
    """
    # Validate RUNPOD_API_KEY is available if any resource makes remote calls
    has_remote_callers = any(
        config.get("makes_remote_calls", False)
        for config in local_manifest.get("resources", {}).values()
    )
    from runpod_flash.core.credentials import get_api_key

    if has_remote_callers and not get_api_key():
        raise ValueError(
            "RUNPOD_API_KEY is required when deploying resources that make "
            "remote calls. Set it via 'flash login' or in your environment."
        )

    # Load State Manager manifest for comparison
    try:
        state_manifest = await app.get_build_manifest(build_id)
    except Exception as e:
        log.warning(f"Could not fetch State Manager manifest: {e}")
        state_manifest = None
    # The manifest may be empty/None (e.g. first deployment); treat that as
    # "no state" instead of crashing on the .get() calls below.
    if not state_manifest:
        state_manifest = {}
    state_resource_configs = state_manifest.get("resources") or {}
    state_endpoints = state_manifest.get("resources_endpoints") or {}

    # Reconcile: Determine actions
    local_resources = set(local_manifest.get("resources", {}).keys())
    state_resources = set(state_resource_configs.keys())

    to_provision = local_resources - state_resources  # New resources
    to_update = local_resources & state_resources  # Existing resources
    to_delete = state_resources - local_resources  # Removed resources

    if show_progress:
        log.debug(
            f"Reconciliation: {len(to_provision)} new, "
            f"{len(to_update)} existing, {len(to_delete)} to remove"
        )

    manager = ResourceManager()
    manifest_python_version = local_manifest.get("python_version")
    # Entries are (action_type, resource_name, resource). Deployment coroutines
    # are only created once every resource object was built successfully, so a
    # failure while building cannot leave earlier coroutines un-awaited.
    actions = []

    # Provision new resources
    for resource_name in sorted(to_provision):
        resource = create_resource_from_manifest(
            resource_name,
            local_manifest["resources"][resource_name],
            flash_environment_id=environment_id,
            python_version=manifest_python_version,
        )
        actions.append(("provision", resource_name, resource))

    # Update existing resources (check if config changed OR if endpoint missing)
    for resource_name in sorted(to_update):
        local_config = local_manifest["resources"][resource_name]
        state_config = state_resource_configs.get(resource_name) or {}

        # Compare only user-managed config fields (exclude runtime metadata)
        local_json = json.dumps(_resource_config_for_compare(local_config), sort_keys=True)
        state_json = json.dumps(_resource_config_for_compare(state_config), sort_keys=True)
        has_endpoint = resource_name in state_endpoints

        if local_json != state_json or not has_endpoint:
            # Config changed OR no endpoint - need to provision/update
            resource = create_resource_from_manifest(
                resource_name,
                local_config,
                flash_environment_id=environment_id,
                python_version=manifest_python_version,
            )
            actions.append(("update", resource_name, resource))
            continue

        # Config unchanged AND endpoint exists - reuse existing endpoint info
        if "endpoint_id" in state_config:
            local_config["endpoint_id"] = state_config["endpoint_id"]
        if "aiKey" in state_config:
            local_config["aiKey"] = state_config["aiKey"]
        local_manifest.setdefault("resources_endpoints", {})[resource_name] = (
            state_endpoints[resource_name]
        )

    # Delete removed resources
    for resource_name in sorted(to_delete):
        log.debug(f"Resource {resource_name} marked for deletion (not implemented yet)")

    # Execute all actions in parallel with timeout
    if actions:
        try:
            provisioning_results = await asyncio.wait_for(
                asyncio.gather(
                    *(
                        manager.get_or_deploy_resource(resource)
                        for _, _, resource in actions
                    )
                ),
                timeout=600,  # 10 minutes
            )
        except asyncio.TimeoutError as e:
            raise RuntimeError(
                "Resource provisioning timed out after 10 minutes. "
                "Check RunPod dashboard for partial deployments."
            ) from e
        except Exception as e:
            log.error(f"Provisioning failed: {e}")
            raise RuntimeError(f"Failed to provision resources: {e}") from e

        # Update local manifest with deployment info
        local_manifest.setdefault("resources_endpoints", {})
        for (action_type, resource_name, _), deployed_resource in zip(
            actions, provisioning_results
        ):
            endpoint_id = _normalized_resource_attr(
                deployed_resource, "endpoint_id", "id"
            )
            endpoint_url = getattr(deployed_resource, "endpoint_url", None)
            if isinstance(endpoint_url, str):
                endpoint_url = endpoint_url.strip() or None
            else:
                endpoint_url = None
            ai_key = _normalized_resource_attr(deployed_resource, "aiKey", "ai_key")

            if endpoint_id:
                local_manifest["resources"][resource_name]["endpoint_id"] = endpoint_id
            if endpoint_url:
                local_manifest["resources_endpoints"][resource_name] = endpoint_url
            if ai_key:
                local_manifest["resources"][resource_name]["aiKey"] = ai_key

            log.debug(
                f"{'Provisioned' if action_type == 'provision' else 'Updated'}: "
                f"{resource_name} -> {endpoint_url}"
            )

    # Validate load balancer was provisioned
    lb_resources = [
        name
        for name, config in local_manifest.get("resources", {}).items()
        if config.get("is_load_balanced", False)
    ]
    if lb_resources:
        recorded_endpoints = local_manifest.get("resources_endpoints", {})
        missing = [name for name in lb_resources if name not in recorded_endpoints]
        if missing:
            provisioned = list(recorded_endpoints.keys())
            raise RuntimeError(
                f"Load balancer resource(s) {missing} not provisioned. "
                f"Successfully provisioned: {provisioned}"
            )

    # aiKey values are secrets: keep them out of the on-disk copy only.
    local_manifest_for_disk = _manifest_without_ai_keys(local_manifest)

    # Write updated manifest back to local file
    manifest_path = Path.cwd() / ".flash" / "flash_manifest.json"
    manifest_path.parent.mkdir(parents=True, exist_ok=True)
    manifest_path.write_text(json.dumps(local_manifest_for_disk, indent=2))
    log.debug(f"Local manifest updated at {manifest_path.relative_to(Path.cwd())}")

    # Overwrite State Manager manifest with local manifest
    await app.update_build_manifest(build_id, local_manifest)

    return local_manifest.get("resources_endpoints", {})
def validate_local_manifest() -> Dict[str, Any]:
    """Validate that the local manifest exists and is valid.

    Reads ``.flash/flash_manifest.json`` relative to the current working
    directory.

    Returns:
        Loaded manifest dictionary (a JSON object with a ``resources`` key)

    Raises:
        FileNotFoundError: If manifest not found
        ValueError: If the manifest is not decodable JSON, is not a JSON
            object, or has no ``resources`` section
    """
    manifest_path = Path.cwd() / ".flash" / "flash_manifest.json"
    if not manifest_path.exists():
        raise FileNotFoundError(
            f"Manifest not found at {manifest_path}. "
            "Run 'flash deploy' to build and deploy your project."
        )

    try:
        with open(manifest_path, encoding="utf-8") as f:
            manifest = json.load(f)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        raise ValueError(f"Invalid manifest JSON at {manifest_path}: {e}") from e

    # A top-level JSON array or string could satisfy a bare `in` test (e.g.
    # ["resources"]), so require an object before checking for the key.
    if not isinstance(manifest, dict) or "resources" not in manifest:
        raise ValueError(
            f"Invalid manifest at {manifest_path}: missing 'resources' section"
        )

    return manifest
async def deploy_from_uploaded_build(
    app: FlashApp,
    build_id: str,
    env_name: str,
    local_manifest: Dict[str, Any],
) -> Dict[str, Any]:
    """Deploy an already-uploaded build to the environment ``env_name``.

    First looks up the environment and asks the app to deploy the build to it.
    It then reconciles and provisions the manifest's resources (without
    progress output). Provisioning errors are logged and re-raised unchanged.

    Args:
        app: FlashApp instance (already resolved)
        build_id: ID of the uploaded build
        env_name: Target environment name
        local_manifest: Validated local manifest dict

    Returns:
        The result of ``app.deploy_build_to_environment``, extended with
        ``resources_endpoints`` and ``local_manifest`` keys
    """
    environment = await app.get_environment_by_name(env_name)
    deployment = await app.deploy_build_to_environment(
        build_id, environment_name=env_name
    )

    try:
        endpoints = await reconcile_and_provision_resources(
            app,
            build_id,
            env_name,
            local_manifest,
            environment_id=environment.get("id"),
            show_progress=False,
        )
        log.debug(f"Provisioned {len(endpoints)} resources for {env_name}")
    except Exception as exc:
        log.error(f"Resource provisioning failed: {exc}")
        raise

    deployment["resources_endpoints"] = endpoints
    deployment["local_manifest"] = local_manifest
    return deployment
def rollback_deployment(name: str, target_version: str):
    """Mark ``target_version`` as the current version of environment ``name``.

    Mock implementation: only the local deployments file is updated. The
    environment's ``current_version``, ``last_deployed`` and every history
    entry's ``is_current`` flag are rewritten.

    Raises:
        ValueError: If the environment or the target version does not exist.
    """
    environments = get_deployment_environments()
    if name not in environments:
        raise ValueError(f"Environment {name} not found")

    env = environments[name]
    history = env["version_history"]
    match = next(
        (entry for entry in history if entry["version"] == target_version), None
    )
    if not match:
        raise ValueError(f"Version {target_version} not found")

    env["current_version"] = target_version
    env["last_deployed"] = datetime.now().isoformat()
    for entry in history:
        entry["is_current"] = entry["version"] == target_version

    save_deployment_environments(environments)
def get_environment_info(name: str) -> Dict[str, Any]:
    """Get detailed information about an environment.

    Returns a shallow copy of the stored environment record. Mock runtime
    metrics are added when its status is ``"active"``. A single-entry
    ``version_history`` is synthesized when the record has a current version
    but no history.

    Args:
        name: Environment name.

    Returns:
        The copied and enriched environment record.

    Raises:
        ValueError: If no environment named ``name`` exists.
    """
    environments = get_deployment_environments()
    if name not in environments:
        raise ValueError(f"Environment {name} not found")

    env_info = environments[name].copy()

    # Add mock metrics and additional info
    if env_info.get("status") == "active":
        env_info.update(
            {
                "uptime": "99.9%",
                "requests_24h": 145234,
                "avg_response_time": "245ms",
                "error_rate": "0.02%",
                "cpu_usage": "45%",
                "memory_usage": "62%",
            }
        )

    # Ensure version history exists and is properly formatted
    if "version_history" not in env_info:
        env_info["version_history"] = []

    # Add sample version history if empty
    current_version = env_info.get("current_version")
    if not env_info["version_history"] and current_version:
        env_info["version_history"] = [
            {
                "version": current_version,
                # Records are created with last_deployed=None, and dict.get's
                # default only applies to *missing* keys, so fall back with `or`.
                "deployed_at": env_info.get("last_deployed")
                or datetime.now().isoformat(),
                "description": "Initial deployment",
                "is_current": True,
            }
        ]

    return env_info