3 changes: 3 additions & 0 deletions src/twinkle/server/gateway/server.py
@@ -93,6 +93,9 @@ def build_server_app(deploy_options: dict[str, Any],
     async def verify_token(request: Request, call_next):
         return await verify_request_token(request=request, call_next=call_next)
 
+    from twinkle.server.utils.metrics import create_metrics_middleware
+    app.middleware('http')(create_metrics_middleware('Gateway'))
+
     def get_self() -> GatewayServer:
         return serve.get_replica_context().servable_object
14 changes: 14 additions & 0 deletions src/twinkle/server/gateway/twinkle_gateway_handlers.py
@@ -125,3 +125,17 @@ async def get_checkpoint_path(request: Request, run_id: str, checkpoint_id: str)
 
     ckpt_dir = checkpoint_manager.get_ckpt_dir(run_id, checkpoint_id)
     return types.CheckpointPathResponse(path=str(ckpt_dir), twinkle_path=checkpoint.twinkle_path)
+
+@app.get('/twinkle/status')
+async def status(
+    request: Request,
+    self: GatewayServer = Depends(self_fn),
+) -> dict:
+    cleanup_stats = await self.state.get_cleanup_stats()
+    return {
+        'resources': cleanup_stats['resource_counts'],
+        'cleanup': {
+            'running': cleanup_stats['cleanup_running'],
+            'expiration_timeout': cleanup_stats['expiration_timeout'],
+        },
+    }
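For a quick smoke test of the new endpoint, something like the following works; the host/port and auth header are assumptions about the deployment, not part of this diff:

import requests  # hypothetical client-side check, not part of this PR

resp = requests.get(
    'http://localhost:8000/twinkle/status',       # assumed gateway address
    headers={'Authorization': 'Bearer <token>'},  # assumed auth scheme
    timeout=10,
)
resp.raise_for_status()
body = resp.json()
# Expected shape per the handler above:
# {'resources': {...}, 'cleanup': {'running': ..., 'expiration_timeout': ...}}
print(body['cleanup']['running'])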
7 changes: 6 additions & 1 deletion src/twinkle/server/launcher.py
@@ -128,7 +128,12 @@ def _init_ray(self) -> None:
         # Use runtime_env to apply patches in worker processes
         # This is required because Ray Serve's ProxyActor runs in separate processes
         runtime_env = get_runtime_env_for_patches()
-        ray.init(namespace=namespace, runtime_env=runtime_env)
+        # Connect to existing cluster if available, otherwise start local instance
+        ray.init(
+            address='auto',
+            namespace=namespace,
+            runtime_env=runtime_env,
+        )
         logger.info(f'Ray initialized with namespace={namespace}')
Comment on lines +131 to 137

Copilot AI Apr 7, 2026

ray.init(address='auto', ...) does not "start local instance" when no cluster is running; it raises an error if it can't connect. If this launcher is used in single-node mode, initialization will fail. Consider trying address='auto' first and falling back to a plain ray.init(namespace=..., runtime_env=...) (or making the address configurable) so local startup still works.
Suggested change
-        # Connect to existing cluster if available, otherwise start local instance
-        ray.init(
-            address='auto',
-            namespace=namespace,
-            runtime_env=runtime_env,
-        )
-        logger.info(f'Ray initialized with namespace={namespace}')
+        # Try to connect to an existing cluster first, and fall back to
+        # starting a local Ray instance when no cluster is available.
+        try:
+            ray.init(
+                address='auto',
+                namespace=namespace,
+                runtime_env=runtime_env,
+            )
+            logger.info(f'Connected to existing Ray cluster with namespace={namespace}')
+        except Exception as exc:
+            logger.info(
+                'Failed to connect to an existing Ray cluster with '
+                f"address='auto'; starting a local Ray instance instead: {exc}")
+            ray.init(
+                namespace=namespace,
+                runtime_env=runtime_env,
+            )
+            logger.info(f'Started local Ray instance with namespace={namespace}')

         self._ray_initialized = True
5 changes: 4 additions & 1 deletion src/twinkle/server/model/app.py
@@ -81,7 +81,7 @@ def __init__(self,
         self._replica_registered = False
 
         # Initialize mixins
-        self._init_task_queue(TaskQueueConfig.from_dict(queue_config))
+        self._init_task_queue(TaskQueueConfig.from_dict(queue_config), deployment_name='Model')
         self._init_adapter_manager(**adapter_config)
         # Note: countdown task is started lazily in _ensure_sticky()
 
@@ -164,6 +164,9 @@ def build_model_app(model_id: str,
     async def verify_token(request: Request, call_next):
         return await verify_request_token(request=request, call_next=call_next)
 
+    from twinkle.server.utils.metrics import create_metrics_middleware
+    app.middleware('http')(create_metrics_middleware('Model'))
+
     def get_self() -> ModelManagement:
         return serve.get_replica_context().servable_object
3 changes: 3 additions & 0 deletions src/twinkle/server/processor/app.py
@@ -124,6 +124,9 @@ def build_processor_app(ncpu_proc_per_node: int,
     async def verify_token(request: Request, call_next):
         return await verify_request_token(request=request, call_next=call_next)
 
+    from twinkle.server.utils.metrics import create_metrics_middleware
+    app.middleware('http')(create_metrics_middleware('Processor'))
+
     def get_self() -> ProcessorManagement:
         return serve.get_replica_context().servable_object
5 changes: 4 additions & 1 deletion src/twinkle/server/sampler/app.py
@@ -80,7 +80,7 @@ def __init__(self,
         self.state: ServerStateProxy = get_server_state()
 
         # Initialize task queue mixin
-        self._init_task_queue(TaskQueueConfig.from_dict(queue_config))
+        self._init_task_queue(TaskQueueConfig.from_dict(queue_config), deployment_name='Sampler')
 
     @serve.multiplexed(max_num_models_per_replica=5)
     async def _sticky_entry(self, sticky_key: str):
@@ -135,6 +135,9 @@ def build_sampler_app(model_id: str,
     async def verify_token(request: Request, call_next):
         return await verify_request_token(request=request, call_next=call_next)
 
+    from twinkle.server.utils.metrics import create_metrics_middleware
+    app.middleware('http')(create_metrics_middleware('Sampler'))
+
     def get_self() -> SamplerManagement:
         return serve.get_replica_context().servable_object
237 changes: 237 additions & 0 deletions src/twinkle/server/utils/metrics.py
@@ -0,0 +1,237 @@
# Copyright (c) ModelScope Contributors. All rights reserved.
"""
Central metrics module for Twinkle server observability.

Provides ray.util.metrics instruments that feed both the Ray Dashboard
(port 8265) and Prometheus (via /api/prometheus).

All metric names use the ``twinkle_`` prefix. Metric instances are
cached per deployment to avoid duplicate registration.

Public entry-points:

* ``create_metrics_middleware(deployment)`` – FastAPI HTTP middleware
* ``get_task_metrics(deployment)`` – task-queue / rate-limit gauges
* ``get_resource_metrics()`` – ServerState resource gauges
"""
from __future__ import annotations

import time
from collections import namedtuple
from typing import Any, Callable

from ray.util.metrics import Counter, Gauge, Histogram

from twinkle.utils.logger import get_logger

logger = get_logger()

# ---------------------------------------------------------------------------
# Histogram bucket boundaries (seconds) – shared by all histograms
# ---------------------------------------------------------------------------
_HISTOGRAM_BOUNDARIES = [
    0.01,
    0.05,
    0.1,
    0.25,
    0.5,
    1.0,
    2.5,
    5.0,
    10.0,
    30.0,
    60.0,
    120.0,
    300.0,
]

# ---------------------------------------------------------------------------
# Lazy caches – populated on first call per deployment / globally
# ---------------------------------------------------------------------------
_task_metrics_cache: dict[str, Any] = {}
_resource_metrics_cache: dict[str, Any] | None = None
_request_metrics_cache: dict[str, Any] = {}

# ---------------------------------------------------------------------------
# Named tuples for structured metric access
# ---------------------------------------------------------------------------
TaskMetrics = namedtuple('TaskMetrics', [
    'queue_depth',
    'tasks_total',
    'execution_seconds',
    'queue_wait_seconds',
    'rate_limit_rejections',
    'rate_limiter_active_tokens',
])

ResourceMetrics = namedtuple('ResourceMetrics', [
    'active_sessions',
    'active_models',
    'active_sampling_sessions',
    'active_futures',
])

_RequestMetrics = namedtuple('_RequestMetrics', [
    'requests_total',
    'request_duration_seconds',
])

# ---------------------------------------------------------------------------
# A. Request-level metrics (FastAPI middleware)
# ---------------------------------------------------------------------------


def _get_request_metrics(deployment: str) -> _RequestMetrics:
    """Return (or create) per-deployment HTTP request metrics."""
    if deployment in _request_metrics_cache:
        return _request_metrics_cache[deployment]

    metrics = _RequestMetrics(
        requests_total=Counter(
            'twinkle_requests_total',
            description='Total HTTP requests.',
            tag_keys=('deployment', 'method', 'status'),
        ),
        request_duration_seconds=Histogram(
            'twinkle_request_duration_seconds',
            description='End-to-end HTTP request latency in seconds.',
            boundaries=_HISTOGRAM_BOUNDARIES,
            tag_keys=('deployment', 'method'),
        ),
    )
    _request_metrics_cache[deployment] = metrics
    return metrics


def create_metrics_middleware(deployment: str) -> Callable:
    """Return a FastAPI ``http`` middleware that records request metrics.

    Usage inside a ``build_*_app()`` function::

        from twinkle.server.utils.metrics import create_metrics_middleware
        metrics_mw = create_metrics_middleware("Model")
        app.middleware('http')(metrics_mw)

    Because FastAPI executes middleware in LIFO order, registering this
    **after** ``verify_token`` means it wraps the outermost layer and
    captures full end-to-end latency including auth.
    """

    async def metrics_middleware(request: Any, call_next: Callable) -> Any:
        start = time.monotonic()
        response = await call_next(request)
        elapsed = time.monotonic() - start
        status = str(response.status_code)
        # Note: the 'method' tag carries the request path (endpoint),
        # not the HTTP verb.
        method = request.url.path
        m = _get_request_metrics(deployment)
        m.requests_total.inc(tags={
            'deployment': deployment,
            'method': method,
            'status': status,
        })
        m.request_duration_seconds.observe(
            elapsed, tags={
                'deployment': deployment,
                'method': method,
            })
        return response

    return metrics_middleware


# ---------------------------------------------------------------------------
# B. Task-queue metrics
# ---------------------------------------------------------------------------


def get_task_metrics(deployment: str) -> TaskMetrics:
    """Return (or create) per-deployment task-queue metrics.

    Returns a :class:`TaskMetrics` namedtuple with:

    - ``queue_depth`` – Gauge
    - ``tasks_total`` – Counter
    - ``execution_seconds`` – Histogram
    - ``queue_wait_seconds`` – Histogram
    - ``rate_limit_rejections`` – Counter
    - ``rate_limiter_active_tokens`` – Gauge
    """
    if deployment in _task_metrics_cache:
        return _task_metrics_cache[deployment]

    metrics = TaskMetrics(
        queue_depth=Gauge(
            'twinkle_task_queue_depth',
            description='Current number of queued tasks.',
            tag_keys=('deployment', ),
        ),
        tasks_total=Counter(
            'twinkle_tasks_total',
            description='Total task completions.',
            tag_keys=('deployment', 'task_type', 'status'),
        ),
        execution_seconds=Histogram(
            'twinkle_task_execution_seconds',
            description='Pure task execution time in seconds.',
            boundaries=_HISTOGRAM_BOUNDARIES,
            tag_keys=('deployment', 'task_type'),
        ),
        queue_wait_seconds=Histogram(
            'twinkle_task_queue_wait_seconds',
            description='Time from enqueue to execution start in seconds.',
            boundaries=_HISTOGRAM_BOUNDARIES,
            tag_keys=('deployment', 'task_type'),
        ),
        rate_limit_rejections=Counter(
            'twinkle_rate_limit_rejections_total',
            description='Total rate-limit rejections.',
            tag_keys=('deployment', ),
        ),
        rate_limiter_active_tokens=Gauge(
            'twinkle_rate_limiter_active_tokens',
            description='Number of tokens tracked by the rate limiter.',
            tag_keys=('deployment', ),
        ),
    )
    _task_metrics_cache[deployment] = metrics
    return metrics


# ---------------------------------------------------------------------------
# C. Resource gauges (ServerState actor, updated every 15 s)
# ---------------------------------------------------------------------------


def get_resource_metrics() -> ResourceMetrics:
    """Return (or create) global resource gauge metrics.

    Returns a :class:`ResourceMetrics` namedtuple with:

    - ``active_sessions`` – Gauge
    - ``active_models`` – Gauge
    - ``active_sampling_sessions`` – Gauge
    - ``active_futures`` – Gauge
    """
    global _resource_metrics_cache
    if _resource_metrics_cache is not None:
        return _resource_metrics_cache

    metrics = ResourceMetrics(
        active_sessions=Gauge(
            'twinkle_active_sessions',
            description='Current active session count.',
        ),
        active_models=Gauge(
            'twinkle_active_models',
            description='Current registered model count.',
        ),
        active_sampling_sessions=Gauge(
            'twinkle_active_sampling_sessions',
            description='Current sampling session count.',
        ),
        active_futures=Gauge(
            'twinkle_active_futures',
            description='Current future/request count.',
        ),
    )
    _resource_metrics_cache = metrics
    return metrics
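As a usage sketch (not part of this diff), a task-queue consumer could record these instruments around task execution. The `task.enqueued_at`, `task.task_type`, and `task.run()` attributes here are hypothetical; only `get_task_metrics` and its tag keys come from the file above:

import time

from twinkle.server.utils.metrics import get_task_metrics

async def run_task(deployment: str, task):
    m = get_task_metrics(deployment)
    tags = {'deployment': deployment, 'task_type': task.task_type}
    # Time spent waiting in the queue before execution started.
    m.queue_wait_seconds.observe(time.monotonic() - task.enqueued_at, tags=tags)
    start = time.monotonic()
    try:
        result = await task.run()
        m.tasks_total.inc(tags={**tags, 'status': 'ok'})
        return result
    except Exception:
        m.tasks_total.inc(tags={**tags, 'status': 'error'})
        raise
    finally:
        m.execution_seconds.observe(time.monotonic() - start, tags=tags)

Similarly, the ServerState refresh mentioned in the section header (every 15 s) could push counts into the gauges; `get_resource_counts()` and its keys are assumptions:

counts = await self.get_resource_counts()  # hypothetical ServerState helper
rm = get_resource_metrics()
rm.active_sessions.set(counts['sessions'])
rm.active_models.set(counts['models'])
rm.active_sampling_sessions.set(counts['sampling_sessions'])
rm.active_futures.set(counts['futures'])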
15 changes: 15 additions & 0 deletions src/twinkle/server/utils/rate_limiter.py
@@ -41,6 +41,8 @@ def __init__(
         window_seconds: float = 1.0,
         token_cleanup_multiplier: float = 10.0,
         token_cleanup_interval: float = 60.0,
+        active_tokens_gauge=None,
+        deployment_name: str = '',
     ):
         """Initialize the rate limiter.
 
@@ -53,6 +55,8 @@ def __init__(
                 will be removed. Default is 10.0 (10x the window).
             token_cleanup_interval: How often to run the cleanup task in seconds.
                 Default is 60.0 (every minute).
+            active_tokens_gauge: Optional ray.util.metrics Gauge for tracking active token count.
+            deployment_name: Deployment name for metrics labels.
         """
         self.rps_limit = rps_limit
         self.tps_limit = tps_limit
@@ -72,6 +76,10 @@ def __init__(
         self._cleanup_task: asyncio.Task | None = None
         self._cleanup_started = False
 
+        # Metrics gauge for active token count
+        self._active_tokens_gauge = active_tokens_gauge
+        self._deployment_name = deployment_name
+
     def _cleanup_old_requests(self, token: str, current_time: float) -> None:
         """Remove requests outside the sliding window.

@@ -122,6 +130,10 @@ async def _cleanup_inactive_tokens(self) -> None:
                 logger.debug(f'[RateLimiter] Cleaned up {len(tokens_to_remove)} inactive tokens. '
                              f'Active tokens remaining: {len(self._token_requests)}')
 
+                if self._active_tokens_gauge is not None:
+                    tags = {'deployment': self._deployment_name} if self._deployment_name else {}
+                    self._active_tokens_gauge.set(len(self._token_requests), tags=tags)
+
             except asyncio.CancelledError:
                 logger.debug('[RateLimiter] Cleanup task cancelled')
                 break
@@ -193,6 +205,9 @@ async def check_and_record(self, token: str, input_tokens: int) -> tuple[bool, s
 
         # Record this request
         self._token_requests[token].append((current_time, input_tokens))
+        if self._active_tokens_gauge is not None:
+            tags = {'deployment': self._deployment_name} if self._deployment_name else {}
+            self._active_tokens_gauge.set(len(self._token_requests), tags=tags)
         return True, None
 
     def get_stats(self, token: str) -> dict[str, Any]:
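To light up the new gauge, whichever component constructs the rate limiter can pass in the instrument returned by get_task_metrics. A minimal wiring sketch; the RateLimiter class name, its import path, and the rps/tps values are assumptions (only the two new keyword arguments come from this diff):

from twinkle.server.utils.metrics import get_task_metrics
from twinkle.server.utils.rate_limiter import RateLimiter  # assumed import path

deployment = 'Model'
limiter = RateLimiter(
    rps_limit=10,       # illustrative limits
    tps_limit=100_000,
    # Gauge declared in metrics.py with tag_keys=('deployment',)
    active_tokens_gauge=get_task_metrics(deployment).rate_limiter_active_tokens,
    deployment_name=deployment,
)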