diff --git a/changes/11613.feature.md b/changes/11613.feature.md new file mode 100644 index 00000000000..bc3b3cd9116 --- /dev/null +++ b/changes/11613.feature.md @@ -0,0 +1 @@ +Add three-stage PROVISIONING sub-status pipeline (PENDING→STARTING→WARMING_UP→RUNNING) for model service routes with ReplicaID typed identifier. diff --git a/src/ai/backend/common/identifier/replica.py b/src/ai/backend/common/identifier/replica.py new file mode 100644 index 00000000000..99755c21214 --- /dev/null +++ b/src/ai/backend/common/identifier/replica.py @@ -0,0 +1,6 @@ +from typing import NewType +from uuid import UUID + +__all__ = ("ReplicaID",) + +ReplicaID = NewType("ReplicaID", UUID) diff --git a/src/ai/backend/manager/api/adapters/scheduling_history/adapter.py b/src/ai/backend/manager/api/adapters/scheduling_history/adapter.py index 5ad530b0b4d..b1d7887801f 100644 --- a/src/ai/backend/manager/api/adapters/scheduling_history/adapter.py +++ b/src/ai/backend/manager/api/adapters/scheduling_history/adapter.py @@ -25,6 +25,7 @@ SessionHistoryNode, ) from ai.backend.common.dto.manager.v2.scheduling_history.types import SubStepResultInfo +from ai.backend.common.identifier.replica import ReplicaID from ai.backend.manager.api.adapter_options.pagination.pagination import PaginationSpec from ai.backend.manager.api.adapters.base import BaseAdapter from ai.backend.manager.data.deployment.types import DeploymentHistoryData, RouteHistoryData @@ -537,7 +538,7 @@ async def route_scoped_search( input: AdminSearchRouteHistoriesInput, ) -> AdminSearchRouteHistoriesPayload: """Search route histories scoped to a route.""" - scope = RouteHistorySearchScope(route_id=route_id) + scope = RouteHistorySearchScope(route_id=ReplicaID(route_id)) querier = self._build_route_querier(input) action_result = ( await self._processors.scheduling_history.search_route_scoped_history.wait_for_complete( diff --git a/src/ai/backend/manager/data/deployment/types.py b/src/ai/backend/manager/data/deployment/types.py index 
cdebb6d1d8c..f164cb412f7 100644 --- a/src/ai/backend/manager/data/deployment/types.py +++ b/src/ai/backend/manager/data/deployment/types.py @@ -110,6 +110,7 @@ class RouteHandlerCategory(enum.StrEnum): LIFECYCLE = "lifecycle" HEALTH = "health" + SYNC = "sync" class DeploymentHandlerCategory(enum.StrEnum): @@ -291,9 +292,6 @@ class RouteTransitionTarget: class RouteStatusTransitions: """Status transitions for route handlers. - Route handlers have success/failure/stale outcomes (no expired/give_up). - Each outcome can change lifecycle status, health status, or both. - Attributes: success: Target state when handler succeeds, None means no change failure: Target state when handler fails, None means no change diff --git a/src/ai/backend/manager/models/routing/row.py b/src/ai/backend/manager/models/routing/row.py index 7813fa905fc..537d4bf48fd 100644 --- a/src/ai/backend/manager/models/routing/row.py +++ b/src/ai/backend/manager/models/routing/row.py @@ -13,6 +13,7 @@ from sqlalchemy.orm import Mapped, mapped_column, relationship, selectinload from ai.backend.common.identifier.deployment import DeploymentID +from ai.backend.common.identifier.replica import ReplicaID from ai.backend.common.types import SessionId from ai.backend.logging import BraceStyleAdapter from ai.backend.manager.data.deployment.types import ( @@ -53,7 +54,7 @@ class RoutingRow(Base): # type: ignore[misc] sa.UniqueConstraint("endpoint", "session", name="uq_routings_endpoint_session"), ) - id: Mapped[uuid.UUID] = mapped_column( + id: Mapped[ReplicaID] = mapped_column( "id", GUID, primary_key=True, server_default=sa.text("uuid_generate_v4()") ) endpoint: Mapped[DeploymentID] = mapped_column( @@ -237,30 +238,6 @@ async def get( raise NoResultFound return row - def __init__( - self, - id: uuid.UUID, - endpoint: DeploymentID, - session: uuid.UUID | None, - session_owner: uuid.UUID, - domain: str, - project: uuid.UUID, - revision: uuid.UUID, - status: RouteStatus = RouteStatus.PROVISIONING, - traffic_ratio: 
float = 1.0, - traffic_status: RouteTrafficStatus = RouteTrafficStatus.ACTIVE, - ) -> None: - self.id = id - self.endpoint = endpoint - self.session = session - self.session_owner = session_owner - self.domain = domain - self.project = project - self.status = status - self.traffic_ratio = traffic_ratio - self.revision = revision - self.traffic_status = traffic_status - def delegate_ownership(self, user_uuid: uuid.UUID) -> None: self.session_owner = user_uuid diff --git a/src/ai/backend/manager/models/scheduling_history/row.py b/src/ai/backend/manager/models/scheduling_history/row.py index 28c89cc076e..09bd2aa902d 100644 --- a/src/ai/backend/manager/models/scheduling_history/row.py +++ b/src/ai/backend/manager/models/scheduling_history/row.py @@ -7,6 +7,7 @@ from sqlalchemy.orm import Mapped, mapped_column from ai.backend.common.data.model_deployment.types import ModelDeploymentStatus +from ai.backend.common.identifier.replica import ReplicaID from ai.backend.common.types import KernelId, SessionId from ai.backend.manager.data.deployment.types import ( DeploymentHandlerCategory, @@ -248,7 +249,7 @@ class RouteHistoryRow(Base): # type: ignore[misc] id: Mapped[uuid.UUID] = mapped_column( "id", GUID, primary_key=True, server_default=sa.text("uuid_generate_v4()") ) - route_id: Mapped[uuid.UUID] = mapped_column("route_id", GUID, nullable=False, index=True) + route_id: Mapped[ReplicaID] = mapped_column("route_id", GUID, nullable=False, index=True) deployment_id: Mapped[uuid.UUID] = mapped_column( "deployment_id", GUID, nullable=False, index=True ) diff --git a/src/ai/backend/manager/repositories/deployment/creators/route.py b/src/ai/backend/manager/repositories/deployment/creators/route.py index 4287d34812e..d6a4641f766 100644 --- a/src/ai/backend/manager/repositories/deployment/creators/route.py +++ b/src/ai/backend/manager/repositories/deployment/creators/route.py @@ -39,13 +39,13 @@ class RouteCreatorSpec(CreatorSpec[RoutingRow]): @override def build_row(self) -> 
RoutingRow: return RoutingRow( - id=uuid.uuid4(), endpoint=self.deployment_id, session=None, session_owner=self.session_owner_id, domain=self.domain, project=self.project_id, status=RouteStatus.PROVISIONING, + sub_status=RouteSubStatus.PENDING, traffic_ratio=self.traffic_ratio, revision=self.revision_id, traffic_status=self.traffic_status, diff --git a/src/ai/backend/manager/repositories/deployment/db_source/db_source.py b/src/ai/backend/manager/repositories/deployment/db_source/db_source.py index 656405e4c63..c5f364cf920 100644 --- a/src/ai/backend/manager/repositories/deployment/db_source/db_source.py +++ b/src/ai/backend/manager/repositories/deployment/db_source/db_source.py @@ -29,6 +29,7 @@ from ai.backend.common.identifier.deployment_preset import DeploymentPresetID from ai.backend.common.identifier.deployment_revision import DeploymentRevisionID from ai.backend.common.identifier.image import ImageID +from ai.backend.common.identifier.replica import ReplicaID from ai.backend.common.identifier.resource_group import ResourceGroupName from ai.backend.common.identifier.runtime_variant import RuntimeVariantID from ai.backend.common.identifier.vfolder import VFolderUUID @@ -179,6 +180,8 @@ ProjectDeploymentSearchScope, RouteData, RouteServiceDiscoveryInfo, + RouteSessionInfo, + RouteSessionKernelInfo, ) from ai.backend.manager.repositories.scheduler.types.session_creation import ( ContainerUserContext, @@ -1208,7 +1211,7 @@ async def get_endpoint_id_by_session( async def fetch_route_service_discovery_info( self, - route_ids: set[uuid.UUID], + route_ids: set[ReplicaID], ) -> list[RouteServiceDiscoveryInfo]: """Fetch service discovery information for routes. 
@@ -1770,9 +1773,9 @@ async def update_route_status_bulk_with_history( async def _get_last_route_histories_by_category( self, db_sess: SASession, - route_ids: list[uuid.UUID], + route_ids: list[ReplicaID], category: RouteHandlerCategory, - ) -> dict[uuid.UUID, RouteHistoryRow]: + ) -> dict[ReplicaID, RouteHistoryRow]: """Get last history records per route filtered by handler category.""" if not route_ids: return {} @@ -1796,8 +1799,8 @@ async def _get_last_route_histories_by_category( async def _get_last_route_histories_bulk( self, db_sess: SASession, - route_ids: list[uuid.UUID], - ) -> dict[uuid.UUID, RouteHistoryRow]: + route_ids: list[ReplicaID], + ) -> dict[ReplicaID, RouteHistoryRow]: """Get last history records for multiple routes efficiently.""" if not route_ids: return {} @@ -1927,15 +1930,15 @@ async def fetch_kernel_connection_info( async def update_route_replica_info( self, - updates: dict[uuid.UUID, tuple[str, int]], + updates: dict[ReplicaID, RouteSessionKernelInfo], ) -> None: """Update replica_host and replica_port for routes.""" async with self._begin_session_read_committed() as db_sess: - for route_id, (host, port) in updates.items(): + for route_id, kernel in updates.items(): query = ( sa.update(RoutingRow) .where(RoutingRow.id == route_id) - .values(replica_host=host, replica_port=port) + .values(replica_host=kernel.replica_host, replica_port=kernel.replica_port) ) await db_sess.execute(query) @@ -2129,8 +2132,8 @@ async def fetch_deployment_context( async def fetch_session_statuses_by_route_ids( self, - route_ids: set[uuid.UUID], - ) -> Mapping[uuid.UUID, SessionStatus | None]: + route_ids: set[ReplicaID], + ) -> Mapping[ReplicaID, SessionStatus | None]: """Fetch session statuses for multiple routes. 
Args: @@ -2158,12 +2161,83 @@ async def fetch_session_statuses_by_route_ids( rows = result.all() # 결과를 매핑으로 변환 - status_map: dict[uuid.UUID, SessionStatus | None] = {} + status_map: dict[ReplicaID, SessionStatus | None] = {} for route_id, session_status in rows: - status_map[route_id] = session_status + status_map[ReplicaID(route_id)] = session_status return status_map + async def fetch_route_session_kernel_infos( + self, + route_ids: set[ReplicaID], + ) -> Mapping[ReplicaID, RouteSessionInfo | None]: + """Fetch session status and kernel connection info for multiple routes. + + Args: + route_ids: Set of route IDs to fetch information for + + Returns: + Mapping of route_id to RouteSessionInfo: + - None → route has no session linked + - RouteSessionInfo(status=TERMINAL, kernel=None) → session terminated + - RouteSessionInfo(status=RUNNING, kernel=RouteSessionKernelInfo(host, port)) → ready + - RouteSessionInfo(status=PREPARING, kernel=None) → not yet running + """ + if not route_ids: + return {} + + async with self._begin_readonly_session_read_committed() as db_sess: + query = ( + sa.select( + RoutingRow.id, + SessionRow.status, + KernelRow.kernel_host, + KernelRow.service_ports, + ) + .select_from(RoutingRow) + .outerjoin(SessionRow, RoutingRow.session == SessionRow.id) + .outerjoin( + KernelRow, + sa.and_( + KernelRow.session_id == RoutingRow.session, + KernelRow.cluster_role == "main", + ), + ) + .where(RoutingRow.id.in_(route_ids)) + ) + + result = await db_sess.execute(query) + rows = result.all() + + info_map: dict[ReplicaID, RouteSessionInfo | None] = {} + for row in rows: + route_id = ReplicaID(row.id) + if row.status is None: + info_map[route_id] = None + continue + + kernel: RouteSessionKernelInfo | None = None + if row.kernel_host and row.service_ports: + inference_port: int | None = None + for port_info in row.service_ports: + if port_info.get("is_inference", False): + host_ports = port_info.get("host_ports", []) + if host_ports: + inference_port = 
host_ports[0] + break + if inference_port is not None: + kernel = RouteSessionKernelInfo( + replica_host=row.kernel_host, + replica_port=inference_port, + ) + + info_map[route_id] = RouteSessionInfo( + status=row.status, + kernel=kernel, + ) + + return info_map + async def fetch_route_connection_infos( self, *, diff --git a/src/ai/backend/manager/repositories/deployment/repository.py b/src/ai/backend/manager/repositories/deployment/repository.py index fa1f6959b0f..4f1111ab4a7 100644 --- a/src/ai/backend/manager/repositories/deployment/repository.py +++ b/src/ai/backend/manager/repositories/deployment/repository.py @@ -21,6 +21,7 @@ from ai.backend.common.identifier.deployment_preset import DeploymentPresetID from ai.backend.common.identifier.deployment_revision import DeploymentRevisionID from ai.backend.common.identifier.image import ImageID +from ai.backend.common.identifier.replica import ReplicaID from ai.backend.common.identifier.resource_group import ResourceGroupName from ai.backend.common.identifier.runtime_variant import RuntimeVariantID from ai.backend.common.identifier.vfolder import VFolderUUID @@ -104,7 +105,13 @@ from .db_source import DeploymentDBSource from .storage_source import DeploymentStorageSource -from .types import ProjectDeploymentSearchScope, RouteData, RouteServiceDiscoveryInfo +from .types import ( + ProjectDeploymentSearchScope, + RouteData, + RouteServiceDiscoveryInfo, + RouteSessionInfo, + RouteSessionKernelInfo, +) log = BraceStyleAdapter(logging.getLogger(__name__)) @@ -758,7 +765,7 @@ async def fetch_kernel_connection_info( @deployment_repository_resilience.apply() async def update_route_replica_info( self, - updates: dict[uuid.UUID, tuple[str, int]], + updates: dict[ReplicaID, RouteSessionKernelInfo], ) -> None: """Update replica_host and replica_port for routes.""" await self._db_source.update_route_replica_info(updates) @@ -1105,11 +1112,27 @@ async def calculate_desired_replicas_for_deployment( 
@deployment_repository_resilience.apply() async def fetch_session_statuses_by_route_ids( self, - route_ids: set[uuid.UUID], - ) -> Mapping[uuid.UUID, SessionStatus | None]: + route_ids: set[ReplicaID], + ) -> Mapping[ReplicaID, SessionStatus | None]: """Fetch session IDs for multiple routes.""" return await self._db_source.fetch_session_statuses_by_route_ids(route_ids) + @deployment_repository_resilience.apply() + async def fetch_route_session_kernel_infos( + self, + route_ids: set[ReplicaID], + ) -> Mapping[ReplicaID, RouteSessionInfo | None]: + """Fetch session status and kernel connection info for multiple routes. + + Returns: + Mapping of route_id to RouteSessionInfo: + - None → route has no session linked + - RouteSessionInfo(status=TERMINAL, kernel=None) → session terminated + - RouteSessionInfo(status=RUNNING, kernel=RouteSessionKernelInfo(host, port)) → ready + - RouteSessionInfo(status=PREPARING, kernel=None) → not yet running + """ + return await self._db_source.fetch_route_session_kernel_infos(route_ids) + @deployment_repository_resilience.apply() async def fetch_route_connection_infos( self, @@ -1150,7 +1173,7 @@ async def get_endpoint_id_by_session( @deployment_repository_resilience.apply() async def fetch_route_service_discovery_info( self, - route_ids: set[uuid.UUID], + route_ids: set[ReplicaID], ) -> list[RouteServiceDiscoveryInfo]: """Fetch service discovery information for routes. 
diff --git a/src/ai/backend/manager/repositories/deployment/types/__init__.py b/src/ai/backend/manager/repositories/deployment/types/__init__.py index 741b2a06b16..69a0d6434ba 100644 --- a/src/ai/backend/manager/repositories/deployment/types/__init__.py +++ b/src/ai/backend/manager/repositories/deployment/types/__init__.py @@ -7,6 +7,8 @@ ProjectDeploymentSearchScope, RouteData, RouteServiceDiscoveryInfo, + RouteSessionInfo, + RouteSessionKernelInfo, ) __all__ = [ @@ -16,4 +18,6 @@ "ProjectDeploymentSearchScope", "RouteData", "RouteServiceDiscoveryInfo", + "RouteSessionInfo", + "RouteSessionKernelInfo", ] diff --git a/src/ai/backend/manager/repositories/deployment/types/endpoint.py b/src/ai/backend/manager/repositories/deployment/types/endpoint.py index 30470007a0a..d8779fde3a1 100644 --- a/src/ai/backend/manager/repositories/deployment/types/endpoint.py +++ b/src/ai/backend/manager/repositories/deployment/types/endpoint.py @@ -14,6 +14,7 @@ from ai.backend.common.data.endpoint.types import EndpointLifecycle from ai.backend.common.identifier.deployment import DeploymentID from ai.backend.common.identifier.deployment_revision import DeploymentRevisionID +from ai.backend.common.identifier.replica import ReplicaID from ai.backend.common.types import SessionId from ai.backend.manager.data.deployment.types import ( RouteHealthStatus, @@ -21,6 +22,7 @@ RouteSubStatus, RouteTrafficStatus, ) +from ai.backend.manager.data.session.types import SessionStatus from ai.backend.manager.errors.resource import ProjectNotFound from ai.backend.manager.models.endpoint.row import EndpointRow from ai.backend.manager.models.group.row import GroupRow @@ -67,7 +69,7 @@ class EndpointData: class RouteData: """Data structure for model service route.""" - route_id: uuid.UUID + route_id: ReplicaID deployment_id: DeploymentID session_id: SessionId | None status: RouteStatus @@ -84,11 +86,27 @@ class RouteData: error_data: dict[str, Any] = field(default_factory=dict) +@dataclass(frozen=True) 
+class RouteSessionKernelInfo: + """Kernel connection info — only present when session is RUNNING with inference port.""" + + replica_host: str + replica_port: int + + +@dataclass(frozen=True) +class RouteSessionInfo: + """Session state for a STARTING route. kernel is None when not yet RUNNING or no port.""" + + status: SessionStatus + kernel: RouteSessionKernelInfo | None + + @dataclass class RouteServiceDiscoveryInfo: """Service discovery information for a model service route.""" - route_id: uuid.UUID + route_id: ReplicaID deployment_id: DeploymentID endpoint_name: str runtime_variant: str diff --git a/src/ai/backend/manager/repositories/scheduling_history/creators.py b/src/ai/backend/manager/repositories/scheduling_history/creators.py index 732822aba97..8d093b8db42 100644 --- a/src/ai/backend/manager/repositories/scheduling_history/creators.py +++ b/src/ai/backend/manager/repositories/scheduling_history/creators.py @@ -4,10 +4,10 @@ from dataclasses import dataclass, field from typing import override -from uuid import UUID from ai.backend.common.data.endpoint.types import EndpointLifecycle from ai.backend.common.identifier.deployment import DeploymentID +from ai.backend.common.identifier.replica import ReplicaID from ai.backend.common.types import KernelId, SessionId from ai.backend.manager.data.deployment.types import ( DeploymentHandlerCategory, @@ -127,7 +127,7 @@ def build_row(self) -> DeploymentHistoryRow: class RouteHistoryCreatorSpec(CreatorSpec[RouteHistoryRow]): """CreatorSpec for route history.""" - route_id: UUID + route_id: ReplicaID deployment_id: DeploymentID category: RouteHandlerCategory phase: str # RouteLifecycleType value diff --git a/src/ai/backend/manager/repositories/scheduling_history/types.py b/src/ai/backend/manager/repositories/scheduling_history/types.py index 9441f45d96c..dca6a6b4ba1 100644 --- a/src/ai/backend/manager/repositories/scheduling_history/types.py +++ b/src/ai/backend/manager/repositories/scheduling_history/types.py @@ 
-7,6 +7,7 @@ from uuid import UUID from ai.backend.common.data.filter_specs import UUIDEqualMatchSpec +from ai.backend.common.identifier.replica import ReplicaID from ai.backend.manager.errors.deployment import EndpointNotFound from ai.backend.manager.errors.kernel import SessionNotFound from ai.backend.manager.errors.service import RouteNotFound @@ -100,7 +101,7 @@ class RouteHistorySearchScope(SearchScope): Used for entity-scoped queries where route_id is the scope parameter. """ - route_id: UUID + route_id: ReplicaID """Required. The route to search history for.""" def to_condition(self) -> QueryCondition: diff --git a/src/ai/backend/manager/sokovan/deployment/route/coordinator.py b/src/ai/backend/manager/sokovan/deployment/route/coordinator.py index 9aa9a3abcea..7c4cba36905 100644 --- a/src/ai/backend/manager/sokovan/deployment/route/coordinator.py +++ b/src/ai/backend/manager/sokovan/deployment/route/coordinator.py @@ -40,14 +40,16 @@ ProvisioningRouteHandler, RouteEvictionHandler, RouteHandler, + RunningRouteHandler, ServiceDiscoverySyncHandler, + StartingRouteHandler, TerminatingRouteHandler, + WarmingUpRouteHandler, ) from ai.backend.manager.sokovan.deployment.route.handlers.observer import ( RouteHealthObserver, RouteObserver, ) -from ai.backend.manager.sokovan.deployment.route.handlers.running import RunningRouteHandler from ai.backend.manager.sokovan.deployment.route.recorder import RouteRecorderContext from ai.backend.manager.sokovan.deployment.route.types import ( RouteExecutionResult, @@ -150,6 +152,14 @@ def _init_handlers(self, executor: RouteExecutor) -> Mapping[RouteLifecycleType, route_executor=executor, event_producer=self._event_producer, ), + RouteLifecycleType.CHECK_STARTING: StartingRouteHandler( + route_executor=executor, + event_producer=self._event_producer, + ), + RouteLifecycleType.CHECK_WARMING_UP: WarmingUpRouteHandler( + route_executor=executor, + event_producer=self._event_producer, + ), RouteLifecycleType.RUNNING: 
RunningRouteHandler( route_executor=executor, event_producer=self._event_producer, @@ -429,7 +439,21 @@ def _create_task_specs() -> list[RouteTaskSpec]: long_interval=60.0, initial_delay=10.0, ), - # Check running routes frequently with both short and long cycles + # Check STARTING routes: wait for replica host/port + RouteTaskSpec( + RouteLifecycleType.CHECK_STARTING, + short_interval=5.0, + long_interval=60.0, + initial_delay=10.0, + ), + # Check WARMING_UP routes: initial health probe + RouteTaskSpec( + RouteLifecycleType.CHECK_WARMING_UP, + short_interval=5.0, + long_interval=60.0, + initial_delay=15.0, + ), + # Check RUNNING routes session liveness RouteTaskSpec( RouteLifecycleType.RUNNING, short_interval=10.0, diff --git a/src/ai/backend/manager/sokovan/deployment/route/executor.py b/src/ai/backend/manager/sokovan/deployment/route/executor.py index 5ab9e42016d..6e3c54528cf 100644 --- a/src/ai/backend/manager/sokovan/deployment/route/executor.py +++ b/src/ai/backend/manager/sokovan/deployment/route/executor.py @@ -2,7 +2,7 @@ import logging from collections.abc import Mapping, Sequence -from typing import Any +from typing import cast from uuid import UUID from ai.backend.common.clients.http_client.client_pool import ClientPool @@ -25,6 +25,7 @@ from ai.backend.common.exception import BackendAIError from ai.backend.common.identifier.deployment import DeploymentID from ai.backend.common.identifier.deployment_revision import DeploymentRevisionID +from ai.backend.common.identifier.replica import ReplicaID from ai.backend.common.service_discovery import ServiceDiscovery from ai.backend.common.service_discovery.service_discovery import ModelServiceMetadata from ai.backend.common.types import SessionId @@ -37,6 +38,7 @@ RouteStatus, RouteTrafficStatus, ) +from ai.backend.manager.data.session.types import SessionStatus from ai.backend.manager.errors.deployment import ( EndpointNotFound, RouteSessionNotFound, @@ -45,7 +47,11 @@ from 
ai.backend.manager.models.routing.conditions import RouteConditions from ai.backend.manager.repositories.base import BatchQuerier, NoPagination from ai.backend.manager.repositories.deployment import DeploymentRepository -from ai.backend.manager.repositories.deployment.types import RouteData +from ai.backend.manager.repositories.deployment.types import ( + RouteData, + RouteSessionInfo, + RouteSessionKernelInfo, +) from ai.backend.manager.sokovan.deployment.deployment_draft_builder import ( DeploymentSessionDraftBuilder, ) @@ -218,6 +224,82 @@ async def terminate_routes(self, routes: Sequence[RouteData]) -> RouteExecutionR errors=[], ) + async def check_starting_routes(self, routes: Sequence[RouteData]) -> RouteExecutionResult: + """Check if STARTING routes have their sessions ready. + + Queries session status and kernel connection info for routes whose sessions + are being provisioned. Transitions routes to: + - success (replica info ready): session is RUNNING with an inference port + - error (session terminated): session reached a terminal status + - skip (still starting): session is not yet RUNNING + + Args: + routes: Routes in STARTING state to check + + Returns: + Result containing routes that are ready (success) or failed (error) + """ + if not routes: + return RouteExecutionResult(successes=[], errors=[]) + + route_ids = {ReplicaID(route.route_id) for route in routes} + session_infos: dict[ReplicaID, RouteSessionInfo | None] = dict( + await self._deployment_repo.fetch_route_session_kernel_infos(route_ids) + ) + + successes: list[RouteData] = [] + errors: list[RouteExecutionError] = [] + updates: dict[ReplicaID, RouteSessionKernelInfo] = {} + + for route in routes: + replica_id = ReplicaID(route.route_id) + info = session_infos.get(replica_id) + + if info is None: + errors.append( + RouteExecutionError( + route_info=route, + reason="Session not found", + error_detail="Route has no session linked", + error_code=None, + ) + ) + continue + + if 
info.status.is_terminal(): + errors.append( + RouteExecutionError( + route_info=route, + reason="Session terminated", + error_detail=f"Session reached terminal status: {info.status}", + error_code=None, + ) + ) + continue + + if info.kernel is not None: + updates[replica_id] = info.kernel + successes.append(route) + elif info.status == SessionStatus.RUNNING: + errors.append( + RouteExecutionError( + route_info=route, + reason="Session running but no inference port available", + error_detail=f"Session status: {info.status}, kernel has no inference port", + error_code=None, + ) + ) + # else: session not yet RUNNING → skip (stay in STARTING) + + if updates: + await self._deployment_repo.update_route_replica_info(updates) + await self._initialize_health_records(successes, updates) + + return RouteExecutionResult( + successes=successes, + errors=errors, + ) + async def check_running_routes(self, routes: Sequence[RouteData]) -> RouteExecutionResult: """Check health status of running routes. @@ -254,98 +336,36 @@ async def check_running_routes(self, routes: Sequence[RouteData]) -> RouteExecut ) ) - # Phase 3: Populate replica connection info for routes missing it - routes_missing_replica = [r for r in successes if not r.replica_host] - if routes_missing_replica: - with RouteRecorderContext.shared_phase("populate_replica_info"): - with RouteRecorderContext.shared_step("fetch_kernel_connection_info"): - await self._populate_replica_info(routes_missing_replica) - - # Phase 4: Ensure RouteHealthRecords exist in Valkey for routes with replica info - routes_with_replica = [r for r in successes if r.replica_host and r.replica_port] - if routes_with_replica: - await self._ensure_health_records(routes_with_replica) - return RouteExecutionResult( successes=successes, errors=errors, ) - async def _populate_replica_info(self, routes: Sequence[RouteData]) -> None: - """Fetch kernel host/port, store on route, and initialize RouteHealthRecords in Valkey.""" - session_ids = [r.session_id 
for r in routes if r.session_id] - if not session_ids: - return - - kernel_info = await self._deployment_repo.fetch_kernel_connection_info(session_ids) - updates: dict[UUID, tuple[str, int]] = {} - populated_routes: list[RouteData] = [] - for route in routes: - if route.session_id and route.session_id in kernel_info: - info = kernel_info[route.session_id] - if info[0] and info[1]: - updates[route.route_id] = info - populated_routes.append(route) - - if updates: - await self._deployment_repo.update_route_replica_info(updates) - - if populated_routes: - await self._initialize_health_records(populated_routes, updates) - - async def _ensure_health_records(self, routes: Sequence[RouteData]) -> None: - """Ensure RouteHealthRecords exist in Valkey for routes that already have replica info. - - Routes may already have replica_host/port in DB (set by a previous cycle or legacy code) - but lack a RouteHealthRecord in Valkey. This method checks and initializes missing records. - """ - route_id_strs = [str(r.route_id) for r in routes] - existing = await self._valkey_schedule.get_route_health_records_batch(route_id_strs) - missing = [r for r in routes if existing.get(str(r.route_id)) is None] - if not missing: - return - log.warning( - "RouteHealthRecord missing in Valkey for {} routes, re-initializing: {}", - len(missing), - [str(r.route_id)[:8] for r in missing], - ) - replica_info = { - r.route_id: (r.replica_host, r.replica_port) - for r in missing - if r.replica_host and r.replica_port - } - await self._initialize_health_records(missing, replica_info) - async def _initialize_health_records( self, routes: Sequence[RouteData], - replica_info: Mapping[UUID, tuple[str, int]], + replica_info: Mapping[ReplicaID, RouteSessionKernelInfo], ) -> None: """Create RouteHealthRecords in Valkey for routes that just got replica info.""" revision_ids = {r.revision_id for r in routes} health_configs = await self._deployment_repo.fetch_health_check_configs_by_revision_ids( revision_ids ) - 
redis_time = await self._valkey_schedule.get_redis_time() - - # Read existing running_at values that were set when routes transitioned to RUNNING - # These may be in partial hashes (only running_at field), so read raw field directly - running_at_map = await self._valkey_schedule.get_route_running_at_batch([ - str(r.route_id) for r in routes - ]) + route_id_strs = [str(r.route_id) for r in routes] + existing_running_at = await self._valkey_schedule.get_route_running_at_batch(route_id_strs) + current_time = await self._valkey_schedule.get_redis_time() records: list[RouteHealthRecord] = [] for route in routes: - host, port = replica_info[route.route_id] + kernel = replica_info[route.route_id] health_config = health_configs.get(route.revision_id) health_path = health_config.path if health_config else "/" initial_delay = health_config.initial_delay if health_config else 60.0 created_at = int(route.created_at.timestamp()) - # Use running_at from Valkey (set at RUNNING transition), fallback to redis_time route_id_str = str(route.route_id) - running_at = running_at_map.get(route_id_str) or redis_time + running_at = existing_running_at.get(route_id_str) or current_time initial_delay_until = running_at + int(initial_delay) records.append( @@ -354,8 +374,8 @@ async def _initialize_health_records( created_at=created_at, initial_delay_until=initial_delay_until, health_path=health_path, - inference_port=port, - replica_host=host, + inference_port=kernel.replica_port, + replica_host=kernel.replica_host, running_at=running_at, ) ) @@ -364,6 +384,52 @@ async def _initialize_health_records( await self._valkey_schedule.initialize_route_health_records_batch(records) log.debug("Initialized {} RouteHealthRecords in Valkey", len(records)) + async def check_warming_up_health(self, routes: Sequence[RouteData]) -> RouteExecutionResult: + """Check health of PROVISIONING+WARMING_UP routes for initial activation. 
+ + - success: health probe passed, or no health check configured → RUNNING+ACTIVE + - failure: initial_delay exceeded without a passing probe → TERMINATING + - (no transition): still within initial_delay → route stays WARMING_UP + """ + route_id_strs = [str(r.route_id) for r in routes] + revision_ids = {r.revision_id for r in routes} + + records = await self._valkey_schedule.get_route_health_records_batch(route_id_strs) + health_configs = await self._deployment_repo.fetch_health_check_configs_by_revision_ids( + revision_ids + ) + current_time = await self._valkey_schedule.get_redis_time() + + successes: list[RouteData] = [] + errors: list[RouteExecutionError] = [] + + for route in routes: + health_config = health_configs.get(route.revision_id) + if health_config is None: + successes.append(route) + continue + + route_id_str = str(route.route_id) + record = records.get(route_id_str) + + if record is None: + continue + + if record.last_check > 0 and not record.is_stale(current_time) and record.healthy: + successes.append(route) + continue + + if current_time > record.initial_delay_until: + errors.append( + RouteExecutionError( + route_info=route, + reason="Route warming-up timed out waiting for healthy probe", + error_detail=f"Elapsed {current_time - record.initial_delay_until}s after initial_delay", + ) + ) + + return RouteExecutionResult(successes=successes, errors=errors) + async def check_route_health(self, routes: Sequence[RouteData]) -> RouteExecutionResult: """ Check health status of routes and push newly-healthy ones to AppProxy. 
@@ -895,7 +961,9 @@ async def unregister_routes_now( ) continue - route_ids = [route.route_id for route in routes_by_endpoint[endpoint_id]] + route_ids = cast( + list[UUID], [route.route_id for route in routes_by_endpoint[endpoint_id]] + ) items_by_target.setdefault((target.addr, target.api_token), []).append( UnregisterRoutesItem( deployment_id=endpoint_id, @@ -1065,7 +1133,7 @@ async def _provision_route( def _verify_route_session_status( self, route: RouteData, - session_statuses: Mapping[UUID, Any], + session_statuses: Mapping[ReplicaID, SessionStatus | None], ) -> None: """Verify that route's session is in a valid state.""" pool = RouteRecorderContext.current_pool() diff --git a/src/ai/backend/manager/sokovan/deployment/route/handlers/__init__.py b/src/ai/backend/manager/sokovan/deployment/route/handlers/__init__.py index c2cc03a1cdd..237517fb9d0 100644 --- a/src/ai/backend/manager/sokovan/deployment/route/handlers/__init__.py +++ b/src/ai/backend/manager/sokovan/deployment/route/handlers/__init__.py @@ -5,8 +5,11 @@ from .health_check import HealthCheckRouteHandler from .provisioning import ProvisioningRouteHandler from .route_eviction import RouteEvictionHandler +from .running import RunningRouteHandler from .service_discovery_sync import ServiceDiscoverySyncHandler +from .starting import StartingRouteHandler from .terminating import TerminatingRouteHandler +from .warming_up import WarmingUpRouteHandler __all__ = [ "AppProxySyncRouteHandler", @@ -14,6 +17,9 @@ "ProvisioningRouteHandler", "RouteEvictionHandler", "RouteHandler", + "RunningRouteHandler", "ServiceDiscoverySyncHandler", + "StartingRouteHandler", "TerminatingRouteHandler", + "WarmingUpRouteHandler", ] diff --git a/src/ai/backend/manager/sokovan/deployment/route/handlers/appproxy_sync.py b/src/ai/backend/manager/sokovan/deployment/route/handlers/appproxy_sync.py index 8c4b7087424..bda30a65756 100644 --- a/src/ai/backend/manager/sokovan/deployment/route/handlers/appproxy_sync.py +++ 
b/src/ai/backend/manager/sokovan/deployment/route/handlers/appproxy_sync.py @@ -52,7 +52,7 @@ def lock_id(self) -> LockID | None: @classmethod def category(cls) -> RouteHandlerCategory: - return RouteHandlerCategory.HEALTH + return RouteHandlerCategory.SYNC @classmethod def target_statuses(cls) -> RouteTargetStatuses: diff --git a/src/ai/backend/manager/sokovan/deployment/route/handlers/provisioning.py b/src/ai/backend/manager/sokovan/deployment/route/handlers/provisioning.py index cb150854af3..b7c28b4870c 100644 --- a/src/ai/backend/manager/sokovan/deployment/route/handlers/provisioning.py +++ b/src/ai/backend/manager/sokovan/deployment/route/handlers/provisioning.py @@ -7,9 +7,9 @@ from ai.backend.logging import BraceStyleAdapter from ai.backend.manager.data.deployment.types import ( RouteHandlerCategory, - RouteHealthStatus, RouteStatus, RouteStatusTransitions, + RouteSubStatus, RouteTargetStatuses, RouteTransitionTarget, ) @@ -24,7 +24,7 @@ class ProvisioningRouteHandler(RouteHandler): - """Handler for provisioning routes (PROVISIONING -> UNHEALTHY).""" + """Handler for provisioning routes: enqueues sessions (PROVISIONING+PENDING → STARTING).""" def __init__( self, @@ -52,18 +52,19 @@ def category(cls) -> RouteHandlerCategory: def target_statuses(cls) -> RouteTargetStatuses: return RouteTargetStatuses( lifecycle=[RouteStatus.PROVISIONING], - health=list(RouteHealthStatus), + sub_status=[RouteSubStatus.PENDING], ) @classmethod def status_transitions(cls) -> RouteStatusTransitions: - """Provisioning → RUNNING + NOT_CHECKED on success, FAILED_TO_START on failure.""" + """Session enqueued → sub_status=STARTING; enqueue failed → FAILED_TO_START.""" return RouteStatusTransitions( success=RouteTransitionTarget( - status=RouteStatus.RUNNING, - health_status=RouteHealthStatus.NOT_CHECKED, + sub_status=RouteSubStatus.STARTING, + ), + failure=RouteTransitionTarget( + status=RouteStatus.FAILED_TO_START, ), - 
failure=RouteTransitionTarget(status=RouteStatus.FAILED_TO_START), stale=None, ) diff --git a/src/ai/backend/manager/sokovan/deployment/route/handlers/running.py b/src/ai/backend/manager/sokovan/deployment/route/handlers/running.py index 27fadf497d1..59fec85817a 100644 --- a/src/ai/backend/manager/sokovan/deployment/route/handlers/running.py +++ b/src/ai/backend/manager/sokovan/deployment/route/handlers/running.py @@ -24,7 +24,7 @@ class RunningRouteHandler(RouteHandler): - """Handler for checking running routes (HEALTHY/UNHEALTHY).""" + """Checks that RUNNING routes still have live sessions; terminates dead ones.""" def __init__( self, @@ -51,8 +51,7 @@ def category(cls) -> RouteHandlerCategory: @classmethod def target_statuses(cls) -> RouteTargetStatuses: return RouteTargetStatuses( - lifecycle=[RouteStatus.RUNNING, RouteStatus.FAILED_TO_START], - health=list(RouteHealthStatus), + lifecycle=[RouteStatus.RUNNING], ) @classmethod diff --git a/src/ai/backend/manager/sokovan/deployment/route/handlers/service_discovery_sync.py b/src/ai/backend/manager/sokovan/deployment/route/handlers/service_discovery_sync.py index 8e5e74b7d30..73c2d74381b 100644 --- a/src/ai/backend/manager/sokovan/deployment/route/handlers/service_discovery_sync.py +++ b/src/ai/backend/manager/sokovan/deployment/route/handlers/service_discovery_sync.py @@ -45,7 +45,7 @@ def lock_id(self) -> LockID | None: @classmethod def category(cls) -> RouteHandlerCategory: - return RouteHandlerCategory.HEALTH + return RouteHandlerCategory.SYNC @classmethod def target_statuses(cls) -> RouteTargetStatuses: diff --git a/src/ai/backend/manager/sokovan/deployment/route/handlers/starting.py b/src/ai/backend/manager/sokovan/deployment/route/handlers/starting.py new file mode 100644 index 00000000000..16cd3211705 --- /dev/null +++ b/src/ai/backend/manager/sokovan/deployment/route/handlers/starting.py @@ -0,0 +1,82 @@ +"""Handler for PROVISIONING+STARTING routes: checks host/port readiness.""" + +import logging +from 
collections.abc import Sequence + +from ai.backend.common.events.dispatcher import EventProducer +from ai.backend.logging import BraceStyleAdapter +from ai.backend.manager.data.deployment.types import ( + RouteHandlerCategory, + RouteStatus, + RouteStatusTransitions, + RouteSubStatus, + RouteTargetStatuses, + RouteTransitionTarget, +) +from ai.backend.manager.repositories.deployment.types import RouteData +from ai.backend.manager.sokovan.deployment.route.executor import RouteExecutor +from ai.backend.manager.sokovan.deployment.route.types import RouteExecutionResult + +from .base import RouteHandler + +log = BraceStyleAdapter(logging.getLogger(__name__)) + + +class StartingRouteHandler(RouteHandler): + """Checks if PROVISIONING+STARTING routes have replica host/port available. + + Success (→ WARMING_UP): session is running and host/port is populated. + Failure (→ TERMINATING): session terminated or not found. + Routes running but without host/port yet are silently retried next tick. 
+ """ + + def __init__( + self, + route_executor: RouteExecutor, + event_producer: EventProducer, + ) -> None: + self._route_executor = route_executor + self._event_producer = event_producer + + @classmethod + def name(cls) -> str: + return "check-starting-routes" + + @property + def lock_id(self) -> None: + return None + + @classmethod + def category(cls) -> RouteHandlerCategory: + return RouteHandlerCategory.LIFECYCLE + + @classmethod + def target_statuses(cls) -> RouteTargetStatuses: + return RouteTargetStatuses( + lifecycle=[RouteStatus.PROVISIONING], + sub_status=[RouteSubStatus.STARTING], + ) + + @classmethod + def status_transitions(cls) -> RouteStatusTransitions: + """Host/port ready → sub_status=WARMING_UP; session dead → TERMINATING.""" + return RouteStatusTransitions( + success=RouteTransitionTarget( + sub_status=RouteSubStatus.WARMING_UP, + ), + failure=RouteTransitionTarget( + status=RouteStatus.TERMINATING, + ), + stale=None, + ) + + async def execute(self, routes: Sequence[RouteData]) -> RouteExecutionResult: + log.debug("Checking {} starting routes for host/port readiness", len(routes)) + return await self._route_executor.check_starting_routes(routes) + + async def post_process(self, result: RouteExecutionResult) -> None: + log.info( + "Starting check: {} routes ready (→ warming_up), {} failed", + len(result.successes), + len(result.errors), + ) diff --git a/src/ai/backend/manager/sokovan/deployment/route/handlers/warming_up.py b/src/ai/backend/manager/sokovan/deployment/route/handlers/warming_up.py new file mode 100644 index 00000000000..dc816010dcf --- /dev/null +++ b/src/ai/backend/manager/sokovan/deployment/route/handlers/warming_up.py @@ -0,0 +1,85 @@ +"""Handler for PROVISIONING+WARMING_UP routes: initial health check to activate traffic.""" + +import logging +from collections.abc import Sequence + +from ai.backend.common.events.dispatcher import EventProducer +from ai.backend.logging import BraceStyleAdapter +from 
ai.backend.manager.data.deployment.types import ( + RouteHandlerCategory, + RouteStatus, + RouteStatusTransitions, + RouteSubStatus, + RouteTargetStatuses, + RouteTrafficStatus, + RouteTransitionTarget, +) +from ai.backend.manager.repositories.deployment.types import RouteData +from ai.backend.manager.sokovan.deployment.route.executor import RouteExecutor +from ai.backend.manager.sokovan.deployment.route.types import RouteExecutionResult + +from .base import RouteHandler + +log = BraceStyleAdapter(logging.getLogger(__name__)) + + +class WarmingUpRouteHandler(RouteHandler): + """Runs initial health probe for PROVISIONING+WARMING_UP routes. + + Success (→ RUNNING+ACTIVE): health probe passed, or no health check is configured. + Failure (→ TERMINATING): initial_delay elapsed without a passing probe; until then the route stays WARMING_UP. + """ + + def __init__( + self, + route_executor: RouteExecutor, + event_producer: EventProducer, + ) -> None: + self._route_executor = route_executor + self._event_producer = event_producer + + @classmethod + def name(cls) -> str: + return "check-warming-up-routes" + + @property + def lock_id(self) -> None: + return None + + @classmethod + def category(cls) -> RouteHandlerCategory: + return RouteHandlerCategory.LIFECYCLE + + @classmethod + def target_statuses(cls) -> RouteTargetStatuses: + return RouteTargetStatuses( + lifecycle=[RouteStatus.PROVISIONING], + sub_status=[RouteSubStatus.WARMING_UP], + ) + + @classmethod + def status_transitions(cls) -> RouteStatusTransitions: + """Health probe passes → RUNNING + traffic=ACTIVE (health_status is not changed here). + Failure (initial_delay exceeded without a healthy probe) → TERMINATING; otherwise stays WARMING_UP (retry). 
+ """ + return RouteStatusTransitions( + success=RouteTransitionTarget( + status=RouteStatus.RUNNING, + traffic_status=RouteTrafficStatus.ACTIVE, + ), + failure=RouteTransitionTarget( + status=RouteStatus.TERMINATING, + ), + stale=None, + ) + + async def execute(self, routes: Sequence[RouteData]) -> RouteExecutionResult: + log.debug("Checking {} warming-up routes for initial health", len(routes)) + return await self._route_executor.check_warming_up_health(routes) + + async def post_process(self, result: RouteExecutionResult) -> None: + log.info( + "Warming-up check: {} routes activated (→ running), {} still probing", + len(result.successes), + len(result.errors) + len(result.stale), + ) diff --git a/src/ai/backend/manager/sokovan/deployment/route/types.py b/src/ai/backend/manager/sokovan/deployment/route/types.py index 62617977813..3cbd0dbd981 100644 --- a/src/ai/backend/manager/sokovan/deployment/route/types.py +++ b/src/ai/backend/manager/sokovan/deployment/route/types.py @@ -10,6 +10,8 @@ class RouteLifecycleType(StrEnum): """Types of route lifecycle operations.""" PROVISIONING = "provisioning" + CHECK_STARTING = "check_starting" + CHECK_WARMING_UP = "check_warming_up" RUNNING = "running" HEALTH_CHECK = "health_check" ROUTE_EVICTION = "route_eviction" diff --git a/tests/unit/manager/repositories/deployment/test_deployment_repository.py b/tests/unit/manager/repositories/deployment/test_deployment_repository.py index 8487bd3690e..875f206a39f 100644 --- a/tests/unit/manager/repositories/deployment/test_deployment_repository.py +++ b/tests/unit/manager/repositories/deployment/test_deployment_repository.py @@ -21,6 +21,7 @@ from ai.backend.common.identifier.deployment import DeploymentID from ai.backend.common.identifier.deployment_revision import DeploymentRevisionID from ai.backend.common.identifier.image import ImageID +from ai.backend.common.identifier.replica import ReplicaID from ai.backend.common.identifier.runtime_variant import RuntimeVariantID from 
ai.backend.common.identifier.vfolder import VFolderUUID from ai.backend.common.types import ( @@ -736,7 +737,9 @@ async def test_fetch_single_route_with_inference_port( """Test fetching service discovery info for a single route with inference port.""" kernel_id, kernel_host, inference_port = test_kernel_with_inference_port - result = await deployment_repository.fetch_route_service_discovery_info({test_route_id}) + result = await deployment_repository.fetch_route_service_discovery_info({ + ReplicaID(test_route_id) + }) assert len(result) == 1 info = result[0] @@ -777,7 +780,9 @@ async def test_fetch_route_without_inference_port( db_sess.add(route) await db_sess.flush() - result = await deployment_repository.fetch_route_service_discovery_info({route_id}) + result = await deployment_repository.fetch_route_service_discovery_info({ + ReplicaID(route_id) + }) # Should return empty list because kernel has no inference port assert len(result) == 0 @@ -796,7 +801,7 @@ async def test_fetch_nonexistent_route_ids( deployment_repository: DeploymentRepository, ) -> None: """Test that nonexistent route IDs return empty list.""" - nonexistent_id = uuid.uuid4() + nonexistent_id = ReplicaID(uuid.uuid4()) result = await deployment_repository.fetch_route_service_discovery_info({nonexistent_id}) @@ -815,7 +820,7 @@ async def test_fetch_multiple_routes( ) -> None: """Test fetching service discovery info for multiple routes.""" # Create 3 sets of endpoint/session/kernel/route - route_ids = set() + route_ids: set[ReplicaID] = set() endpoint_ids = [] async with db_with_cleanup.begin_session() as db_sess: @@ -969,7 +974,7 @@ async def test_fetch_multiple_routes( db_sess.add(kernel) # Create route - route_id = uuid.uuid4() + route_id = ReplicaID(uuid.uuid4()) route = RoutingRow( id=route_id, endpoint=endpoint_id, diff --git a/tests/unit/manager/repositories/deployment/test_deployment_repository_history.py b/tests/unit/manager/repositories/deployment/test_deployment_repository_history.py 
index 09f8348d59a..2ca8dcc19b1 100644 --- a/tests/unit/manager/repositories/deployment/test_deployment_repository_history.py +++ b/tests/unit/manager/repositories/deployment/test_deployment_repository_history.py @@ -14,6 +14,7 @@ from ai.backend.common.data.endpoint.types import EndpointLifecycle from ai.backend.common.identifier.deployment import DeploymentID from ai.backend.common.identifier.image import ImageID +from ai.backend.common.identifier.replica import ReplicaID from ai.backend.common.types import AccessKey, BinarySize, ResourceSlot from ai.backend.manager.data.auth.hash import PasswordHashAlgorithm from ai.backend.manager.data.deployment.types import RouteHandlerCategory, RouteStatus @@ -882,7 +883,7 @@ async def test_updates_status_and_creates_history_atomically( ] history_specs = [ RouteHistoryCreatorSpec( - route_id=test_provisioning_route_id, + route_id=ReplicaID(test_provisioning_route_id), deployment_id=test_endpoint_id, category=RouteHandlerCategory.LIFECYCLE, phase="provisioning", @@ -1507,7 +1508,7 @@ async def test_merge_same_phase_error_to_status( ] history_specs = [ RouteHistoryCreatorSpec( - route_id=route_id, + route_id=ReplicaID(route_id), deployment_id=endpoint_id, category=RouteHandlerCategory.LIFECYCLE, phase="provisioning", # Same @@ -1550,7 +1551,7 @@ async def test_no_merge_different_to_status( ] history_specs = [ RouteHistoryCreatorSpec( - route_id=route_id, + route_id=ReplicaID(route_id), deployment_id=endpoint_id, category=RouteHandlerCategory.LIFECYCLE, phase="provisioning", # Same diff --git a/tests/unit/manager/services/scheduling_history/test_scheduling_history_service.py b/tests/unit/manager/services/scheduling_history/test_scheduling_history_service.py index e9a9dc67564..4f9165a235d 100644 --- a/tests/unit/manager/services/scheduling_history/test_scheduling_history_service.py +++ b/tests/unit/manager/services/scheduling_history/test_scheduling_history_service.py @@ -12,6 +12,7 @@ import pytest from dateutil.tz import tzutc 
+from ai.backend.common.identifier.replica import ReplicaID from ai.backend.common.types import SessionId from ai.backend.manager.data.deployment.types import ( DeploymentHandlerCategory, @@ -282,7 +283,7 @@ async def test_scope_filters_by_route_id( mock_repository: MagicMock, querier: BatchQuerier, ) -> None: - route_id = uuid4() + route_id = ReplicaID(uuid4()) history_item = _make_route_history() mock_repository.search_route_scoped_history.return_value = RouteHistoryListResult( items=[history_item], diff --git a/tests/unit/manager/sokovan/deployment/executor/conftest.py b/tests/unit/manager/sokovan/deployment/executor/conftest.py index 2717fbfd124..1e142fd480d 100644 --- a/tests/unit/manager/sokovan/deployment/executor/conftest.py +++ b/tests/unit/manager/sokovan/deployment/executor/conftest.py @@ -13,6 +13,7 @@ from ai.backend.common.data.endpoint.types import EndpointLifecycle, ScalingState from ai.backend.common.identifier.deployment import DeploymentID from ai.backend.common.identifier.deployment_revision import DeploymentRevisionID +from ai.backend.common.identifier.replica import ReplicaID from ai.backend.common.types import RuntimeVariant from ai.backend.manager.data.deployment.types import ( DeploymentInfo, @@ -178,7 +179,7 @@ def _create_route_data( ) -> RouteData: """Create RouteData for tests.""" return RouteData( - route_id=route_id or uuid4(), + route_id=ReplicaID(route_id) if route_id is not None else ReplicaID(uuid4()), deployment_id=DeploymentID(endpoint_id or uuid4()), session_id=None, status=status, diff --git a/tests/unit/manager/sokovan/deployment/route/executor/conftest.py b/tests/unit/manager/sokovan/deployment/route/executor/conftest.py index 659c06f01cd..f7dd03c678d 100644 --- a/tests/unit/manager/sokovan/deployment/route/executor/conftest.py +++ b/tests/unit/manager/sokovan/deployment/route/executor/conftest.py @@ -12,6 +12,7 @@ from ai.backend.common.data.endpoint.types import EndpointLifecycle, ScalingState from 
ai.backend.common.identifier.deployment import DeploymentID from ai.backend.common.identifier.deployment_revision import DeploymentRevisionID +from ai.backend.common.identifier.replica import ReplicaID from ai.backend.common.types import SessionId from ai.backend.manager.data.deployment.types import ( DeploymentInfo, @@ -193,7 +194,7 @@ def _create_route_data( ) -> RouteData: """Create RouteData for tests.""" return RouteData( - route_id=route_id or uuid4(), + route_id=ReplicaID(route_id) if route_id is not None else ReplicaID(uuid4()), deployment_id=deployment_id or DeploymentID(uuid4()), session_id=session_id, status=status, diff --git a/tests/unit/manager/sokovan/deployment/route/executor/test_check_route_health_register.py b/tests/unit/manager/sokovan/deployment/route/executor/test_check_route_health_register.py index ee115f8b725..c05810bf3ae 100644 --- a/tests/unit/manager/sokovan/deployment/route/executor/test_check_route_health_register.py +++ b/tests/unit/manager/sokovan/deployment/route/executor/test_check_route_health_register.py @@ -18,6 +18,7 @@ from ai.backend.common.identifier.deployment import DeploymentID from ai.backend.common.identifier.deployment_revision import DeploymentRevisionID +from ai.backend.common.identifier.replica import ReplicaID from ai.backend.common.types import SessionId from ai.backend.manager.data.deployment.types import ( RouteHealthStatus, @@ -32,7 +33,7 @@ def _route(health_status: RouteHealthStatus) -> RouteData: return RouteData( - route_id=uuid4(), + route_id=ReplicaID(uuid4()), deployment_id=DeploymentID(uuid4()), session_id=SessionId(uuid4()), status=RouteStatus.RUNNING, diff --git a/tests/unit/manager/sokovan/deployment/route/executor/test_initial_delay.py b/tests/unit/manager/sokovan/deployment/route/executor/test_initial_delay.py index 561d2be5aca..27dee701f0e 100644 --- a/tests/unit/manager/sokovan/deployment/route/executor/test_initial_delay.py +++ 
b/tests/unit/manager/sokovan/deployment/route/executor/test_initial_delay.py @@ -21,13 +21,14 @@ from ai.backend.common.config import ModelHealthCheck from ai.backend.common.identifier.deployment import DeploymentID from ai.backend.common.identifier.deployment_revision import DeploymentRevisionID +from ai.backend.common.identifier.replica import ReplicaID from ai.backend.common.types import SessionId from ai.backend.manager.data.deployment.types import ( RouteHealthStatus, RouteStatus, RouteTrafficStatus, ) -from ai.backend.manager.repositories.deployment.types import RouteData +from ai.backend.manager.repositories.deployment.types import RouteData, RouteSessionKernelInfo from ai.backend.manager.sokovan.deployment.route.executor import RouteExecutor from ai.backend.manager.sokovan.deployment.route.handlers.observer.health_check import ( RouteHealthObserver, @@ -39,7 +40,7 @@ def _make_route( session_id: SessionId | None = None, ) -> RouteData: return RouteData( - route_id=uuid4(), + route_id=ReplicaID(uuid4()), deployment_id=DeploymentID(uuid4()), session_id=session_id or SessionId(uuid4()), status=RouteStatus.RUNNING, @@ -86,7 +87,7 @@ async def test_running_at_present_uses_running_at( await route_executor._initialize_health_records( [route], - {route.route_id: ("10.0.0.1", 8000)}, + {route.route_id: RouteSessionKernelInfo(replica_host="10.0.0.1", replica_port=8000)}, ) call_args = mock_valkey_schedule.initialize_route_health_records_batch.call_args @@ -117,7 +118,7 @@ async def test_running_at_none_falls_back_to_redis_time( await route_executor._initialize_health_records( [route], - {route.route_id: ("10.0.0.1", 8000)}, + {route.route_id: RouteSessionKernelInfo(replica_host="10.0.0.1", replica_port=8000)}, ) call_args = mock_valkey_schedule.initialize_route_health_records_batch.call_args @@ -153,7 +154,7 @@ async def test_created_at_expired_but_running_at_not_expired( await route_executor._initialize_health_records( [route], - {route.route_id: ("10.0.0.1", 
8000)}, + {route.route_id: RouteSessionKernelInfo(replica_host="10.0.0.1", replica_port=8000)}, ) call_args = mock_valkey_schedule.initialize_route_health_records_batch.call_args diff --git a/tests/unit/manager/sokovan/deployment/route/executor/test_register_unregister_routes.py b/tests/unit/manager/sokovan/deployment/route/executor/test_register_unregister_routes.py index 2ab0c106747..123cb2c432e 100644 --- a/tests/unit/manager/sokovan/deployment/route/executor/test_register_unregister_routes.py +++ b/tests/unit/manager/sokovan/deployment/route/executor/test_register_unregister_routes.py @@ -34,6 +34,7 @@ ) from ai.backend.common.identifier.deployment import DeploymentID from ai.backend.common.identifier.deployment_revision import DeploymentRevisionID +from ai.backend.common.identifier.replica import ReplicaID from ai.backend.common.types import SessionId from ai.backend.manager.data.deployment.types import ( RouteHealthStatus, @@ -53,7 +54,7 @@ def _route_with_replica( session_id: SessionId | None = None, ) -> RouteData: return RouteData( - route_id=uuid4(), + route_id=ReplicaID(uuid4()), deployment_id=endpoint_id, session_id=session_id if session_id is not None else SessionId(uuid4()), status=RouteStatus.RUNNING, diff --git a/tests/unit/manager/sokovan/deployment/route/executor/test_route_executor.py b/tests/unit/manager/sokovan/deployment/route/executor/test_route_executor.py index 48ff19a0f8c..6b0f31bc3e1 100644 --- a/tests/unit/manager/sokovan/deployment/route/executor/test_route_executor.py +++ b/tests/unit/manager/sokovan/deployment/route/executor/test_route_executor.py @@ -29,6 +29,7 @@ from ai.backend.common.dto.appproxy_coordinator.v2.endpoint.types import UpdatedRoutesItem from ai.backend.common.identifier.deployment import DeploymentID from ai.backend.common.identifier.deployment_revision import DeploymentRevisionID +from ai.backend.common.identifier.replica import ReplicaID from ai.backend.common.types import SessionId from 
ai.backend.manager.data.deployment.types import ( RouteHealthStatus, @@ -578,7 +579,7 @@ async def test_orphan_revision_route_marked_for_cleanup( current_revision_id = DeploymentRevisionID(uuid4()) deploying_revision_id = DeploymentRevisionID(uuid4()) orphan_route = RouteData( - route_id=uuid4(), + route_id=ReplicaID(uuid4()), deployment_id=deployment_id, session_id=SessionId(uuid4()), status=RouteStatus.RUNNING, @@ -622,7 +623,7 @@ async def test_provisioning_route_for_deploying_revision_kept( deployment_id = DeploymentID(uuid4()) deploying_revision_id = DeploymentRevisionID(uuid4()) provisioning_route = RouteData( - route_id=uuid4(), + route_id=ReplicaID(uuid4()), deployment_id=deployment_id, session_id=None, status=RouteStatus.PROVISIONING, @@ -666,7 +667,7 @@ async def test_orphan_check_skipped_when_no_known_revisions( """ deployment_id = DeploymentID(uuid4()) bootstrap_route = RouteData( - route_id=uuid4(), + route_id=ReplicaID(uuid4()), deployment_id=deployment_id, session_id=SessionId(uuid4()), status=RouteStatus.RUNNING, @@ -920,7 +921,7 @@ async def test_multiple_routes_synced( def _route_for_endpoint(endpoint_id: DeploymentID) -> RouteData: return RouteData( - route_id=uuid4(), + route_id=ReplicaID(uuid4()), deployment_id=endpoint_id, session_id=SessionId(uuid4()), status=RouteStatus.RUNNING, diff --git a/tests/unit/manager/sokovan/deployment/route/executor/test_terminate_routes_drain.py b/tests/unit/manager/sokovan/deployment/route/executor/test_terminate_routes_drain.py index 8c946a16cb5..100ca19f379 100644 --- a/tests/unit/manager/sokovan/deployment/route/executor/test_terminate_routes_drain.py +++ b/tests/unit/manager/sokovan/deployment/route/executor/test_terminate_routes_drain.py @@ -18,6 +18,7 @@ from ai.backend.common.identifier.deployment import DeploymentID from ai.backend.common.identifier.deployment_revision import DeploymentRevisionID +from ai.backend.common.identifier.replica import ReplicaID from ai.backend.common.types import SessionId 
from ai.backend.manager.data.deployment.types import ( RouteHealthStatus, @@ -35,7 +36,7 @@ def _terminating_route() -> RouteData: return RouteData( - route_id=uuid4(), + route_id=ReplicaID(uuid4()), deployment_id=DeploymentID(uuid4()), session_id=SessionId(uuid4()), status=RouteStatus.TERMINATING, @@ -153,7 +154,7 @@ async def test_routes_without_session_still_unregister( """ with_session = _terminating_route() without_session = RouteData( - route_id=uuid4(), + route_id=ReplicaID(uuid4()), deployment_id=DeploymentID(uuid4()), session_id=None, status=RouteStatus.TERMINATING, diff --git a/tests/unit/manager/sokovan/deployment/route/handlers/test_health_check_handler.py b/tests/unit/manager/sokovan/deployment/route/handlers/test_health_check_handler.py index 3bf5fb71d48..0602c89e260 100644 --- a/tests/unit/manager/sokovan/deployment/route/handlers/test_health_check_handler.py +++ b/tests/unit/manager/sokovan/deployment/route/handlers/test_health_check_handler.py @@ -17,6 +17,7 @@ from ai.backend.common.identifier.deployment import DeploymentID from ai.backend.common.identifier.deployment_revision import DeploymentRevisionID +from ai.backend.common.identifier.replica import ReplicaID from ai.backend.common.types import SessionId from ai.backend.manager.data.deployment.types import ( RouteHealthStatus, @@ -32,7 +33,7 @@ def _route(health_status: RouteHealthStatus) -> RouteData: return RouteData( - route_id=uuid4(), + route_id=ReplicaID(uuid4()), deployment_id=DeploymentID(uuid4()), session_id=SessionId(uuid4()), status=RouteStatus.RUNNING, diff --git a/tests/unit/manager/sokovan/deployment/route/handlers/test_terminating_handler.py b/tests/unit/manager/sokovan/deployment/route/handlers/test_terminating_handler.py index 66f455a9307..9ac3e535b82 100644 --- a/tests/unit/manager/sokovan/deployment/route/handlers/test_terminating_handler.py +++ b/tests/unit/manager/sokovan/deployment/route/handlers/test_terminating_handler.py @@ -16,6 +16,7 @@ from 
ai.backend.common.identifier.deployment import DeploymentID from ai.backend.common.identifier.deployment_revision import DeploymentRevisionID +from ai.backend.common.identifier.replica import ReplicaID from ai.backend.common.types import SessionId from ai.backend.manager.data.deployment.types import ( RouteHealthStatus, @@ -31,7 +32,7 @@ def _terminating_route() -> RouteData: return RouteData( - route_id=uuid4(), + route_id=ReplicaID(uuid4()), deployment_id=DeploymentID(uuid4()), session_id=SessionId(uuid4()), status=RouteStatus.TERMINATING, diff --git a/tests/unit/manager/sokovan/deployment/route/test_coordinator_history.py b/tests/unit/manager/sokovan/deployment/route/test_coordinator_history.py index 74cdf80e5a6..59a05cceae1 100644 --- a/tests/unit/manager/sokovan/deployment/route/test_coordinator_history.py +++ b/tests/unit/manager/sokovan/deployment/route/test_coordinator_history.py @@ -12,6 +12,7 @@ from ai.backend.common.identifier.deployment import DeploymentID from ai.backend.common.identifier.deployment_revision import DeploymentRevisionID +from ai.backend.common.identifier.replica import ReplicaID from ai.backend.manager.data.deployment.types import ( RouteHandlerCategory, RouteHealthStatus, @@ -44,7 +45,7 @@ def sample_route_data() -> RouteData: """Sample RouteData for testing.""" return RouteData( - route_id=uuid4(), + route_id=ReplicaID(uuid4()), deployment_id=DeploymentID(uuid4()), session_id=None, status=RouteStatus.PROVISIONING, @@ -389,7 +390,7 @@ async def test_records_history_on_stale( mock_deployment_repository.search_route_datas_with_last_history = AsyncMock( return_value=[ RouteData( - route_id=uuid4(), + route_id=ReplicaID(uuid4()), deployment_id=DeploymentID(uuid4()), session_id=None, status=RouteStatus.RUNNING,