Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
316 changes: 136 additions & 180 deletions app/core/clients/rate_limit_reset_credits.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
)
from app.core.clients.headers import build_chatgpt_auth_headers
from app.core.clients.http import lease_retry_client
from app.core.clients.usage import (
_retry_delay_seconds,
_safe_codex_json,
)
from app.core.config.settings import get_settings
from app.core.types import JsonObject
from app.core.upstream_proxy import ResolvedUpstreamRoute
Expand Down Expand Up @@ -125,39 +129,40 @@ async def fetch_reset_credits(

try:
if route is not None:
return await _fetch_reset_credits_via_codex(
data = await _fetch_reset_credits_via_codex(
url=url,
route=route,
headers=headers,
timeout_seconds=timeout_seconds or settings.usage_fetch_timeout_seconds,
retries=retries,
codex_client=codex_client,
)
async with lease_retry_client(client) as retry_client:
async with retry_client.request(
"GET",
url,
headers=headers,
timeout=timeout,
retry_options=retry_options,
) as resp:
data = await _safe_json(resp)
if resp.status >= 400:
code = _extract_error_code(data)
message = _extract_error_message(data) or f"Reset credits fetch failed ({resp.status})"
logger.warning(
"Reset credits fetch failed request_id=%s status=%s code=%s message=%s",
get_request_id(),
resp.status,
code,
message,
)
raise ResetCreditFetchError(resp.status, message, code=code)
try:
return ResetCreditsResponse.model_validate(_success_payload(data))
except (ValueError, ValidationError) as exc:
logger.warning("Reset credits fetch invalid payload request_id=%s", get_request_id())
raise ResetCreditFetchError(502, "Invalid reset credits payload") from exc
else:
async with lease_retry_client(client) as retry_client:
async with retry_client.request(
"GET",
url,
headers=headers,
timeout=timeout,
retry_options=retry_options,
) as resp:
data = await _safe_json(resp)
if resp.status >= 400:
code = _extract_error_code(data)
message = _extract_error_message(data) or f"Reset credits fetch failed ({resp.status})"
logger.warning(
"Reset credits fetch failed request_id=%s status=%s code=%s message=%s",
get_request_id(),
resp.status,
code,
message,
)
raise ResetCreditFetchError(resp.status, message, code=code)
try:
return ResetCreditsResponse.model_validate(_success_payload(data))
except (ValueError, ValidationError) as exc:
logger.warning("Reset credits fetch invalid payload request_id=%s", get_request_id())
raise ResetCreditFetchError(502, "Invalid reset credits payload") from exc
except (aiohttp.ClientError, asyncio.TimeoutError, CodexTransportError) as exc:
logger.warning("Reset credits fetch error request_id=%s error=%s", get_request_id(), exc)
raise ResetCreditFetchError(0, f"Reset credits fetch failed: {exc}") from exc
Expand All @@ -180,6 +185,8 @@ async def consume_reset_credit(
usage_base = base_url or settings.upstream_base_url
url = _consume_url(usage_base)
timeout = aiohttp.ClientTimeout(total=timeout_seconds or settings.usage_fetch_timeout_seconds)
# Consume is non-idempotent, so omitted max_retries must not inherit the
# fetch retry budget and risk replaying a successful upstream redemption.
retries = max_retries if max_retries is not None else 0
headers = build_chatgpt_auth_headers(
access_token,
Expand Down Expand Up @@ -237,123 +244,6 @@ async def consume_reset_credit(
raise ConsumeResetCreditError(0, f"Reset credits consume failed: {exc}") from exc


async def _fetch_reset_credits_via_codex(
    *,
    url: str,
    route: ResolvedUpstreamRoute,
    headers: dict[str, str],
    timeout_seconds: float,
    retries: int,
    codex_client: CodexClient | None,
) -> ResetCreditsResponse:
    """Fetch reset credits through the Codex transport, retrying transient failures.

    Args:
        url: Reset-credits endpoint to GET.
        route: Resolved upstream route forwarded to the Codex client.
        headers: Request headers forwarded unchanged.
        timeout_seconds: Per-request timeout forwarded to the Codex client.
        retries: Number of additional attempts after the first; values below
            zero still result in exactly one attempt.
        codex_client: Client to reuse. When ``None`` a temporary client is
            created for this call and closed before returning.

    Returns:
        The validated ``ResetCreditsResponse``.

    Raises:
        ResetCreditFetchError: On a status >= 400 (after retries are spent for
            retryable statuses) or, with status 502, when the payload fails
            validation.
        CodexTransportError: When the transport fails on the final attempt.
    """
    attempts = max(1, retries + 1)
    # Ownership and the active client are derived from the same `is None` test,
    # so a caller-supplied client is always the one used and a temporary client
    # is never created without also being closed below.
    owns_codex_client = codex_client is None
    active_codex_client = CodexClient(create_codex_session()) if codex_client is None else codex_client
    try:
        for attempt in range(attempts):
            try:
                data, status = await _request_json_via_codex(
                    active_codex_client,
                    "GET",
                    url,
                    route=route,
                    headers=headers,
                    timeout_seconds=timeout_seconds,
                )
            except CodexTransportError:
                # Back off and retry unless this was the last attempt.
                if attempt < attempts - 1:
                    await asyncio.sleep(_retry_delay_seconds(attempt))
                    continue
                raise
            if status in RETRYABLE_STATUS and attempt < attempts - 1:
                await asyncio.sleep(_retry_delay_seconds(attempt))
                continue
            if status >= 400:
                code = _extract_error_code(data)
                message = _extract_error_message(data) or f"Reset credits fetch failed ({status})"
                raise ResetCreditFetchError(status, message, code=code)
            try:
                return ResetCreditsResponse.model_validate(_success_payload(data))
            except (ValueError, ValidationError) as exc:
                raise ResetCreditFetchError(502, "Invalid reset credits payload") from exc
    finally:
        # Only close a client this call created; a shared client stays open.
        if owns_codex_client:
            close = getattr(active_codex_client, "close", None)
            if callable(close):
                await close()
    # The final loop iteration always returns or raises; this is a safety net.
    raise RuntimeError("unreachable reset credits fetch retry state")


async def _consume_reset_credit_via_codex(
    *,
    url: str,
    route: ResolvedUpstreamRoute,
    headers: dict[str, str],
    body: JsonObject,
    timeout_seconds: float,
    retries: int,
    codex_client: CodexClient | None,
) -> ConsumeResetCreditResponse:
    """Consume a reset credit by POSTing through the Codex transport.

    Args:
        url: Consume endpoint to POST to.
        route: Resolved upstream route forwarded to the Codex client.
        headers: Request headers forwarded unchanged.
        body: JSON body sent with the POST.
        timeout_seconds: Per-request timeout forwarded to the Codex client.
        retries: Number of additional attempts after the first; values below
            zero still result in exactly one attempt. Any value above zero
            re-sends the POST on transport errors and retryable statuses.
        codex_client: Client to reuse. When ``None`` a temporary client is
            created for this call and closed before returning.

    Returns:
        The validated ``ConsumeResetCreditResponse``.

    Raises:
        ConsumeResetCreditError: On a status >= 400 (after retries are spent
            for retryable statuses) or, with status 502, when the payload
            fails validation.
        CodexTransportError: When the transport fails on the final attempt.
    """
    attempts = max(1, retries + 1)
    # Ownership and the active client are derived from the same `is None` test,
    # so a caller-supplied client is always the one used and a temporary client
    # is never created without also being closed below.
    owns_codex_client = codex_client is None
    active_codex_client = CodexClient(create_codex_session()) if codex_client is None else codex_client
    try:
        for attempt in range(attempts):
            try:
                data, status = await _request_json_via_codex(
                    active_codex_client,
                    "POST",
                    url,
                    route=route,
                    headers=headers,
                    json_body=body,
                    timeout_seconds=timeout_seconds,
                )
            except CodexTransportError:
                # Back off and retry unless this was the last attempt.
                if attempt < attempts - 1:
                    await asyncio.sleep(_retry_delay_seconds(attempt))
                    continue
                raise
            if status in RETRYABLE_STATUS and attempt < attempts - 1:
                await asyncio.sleep(_retry_delay_seconds(attempt))
                continue
            if status >= 400:
                code = _extract_error_code(data)
                message = _extract_error_message(data) or f"Reset credits consume failed ({status})"
                raise ConsumeResetCreditError(status, message, code=code)
            try:
                return ConsumeResetCreditResponse.model_validate(_success_payload(data))
            except (ValueError, ValidationError) as exc:
                raise ConsumeResetCreditError(502, "Invalid reset credits consume payload") from exc
    finally:
        # Only close a client this call created; a shared client stays open.
        if owns_codex_client:
            close = getattr(active_codex_client, "close", None)
            if callable(close):
                await close()
    # The final loop iteration always returns or raises; this is a safety net.
    raise RuntimeError("unreachable reset credits consume retry state")


async def _request_json_via_codex(
    codex_client: CodexClient,
    method: str,
    url: str,
    *,
    route: ResolvedUpstreamRoute,
    headers: dict[str, str],
    timeout_seconds: float,
    json_body: JsonObject | None = None,
) -> tuple[JsonObject, int]:
    """Send one request through the Codex client and return ``(payload, status)``.

    The ``json`` keyword is only forwarded when ``json_body`` is provided. The
    payload is decoded with ``_safe_codex_json`` and the status is read with
    ``_codex_response_status``.
    """
    # Keep "json" out of the call entirely when there is no body to send.
    body_kwargs: dict[str, object] = {} if json_body is None else {"json": json_body}
    response = await codex_client.request(
        method,
        url,
        route=route,
        headers=headers,
        timeout=timeout_seconds,
        **body_kwargs,
    )
    payload = await _safe_codex_json(response)
    return payload, _codex_response_status(response)


def build_snapshot(response: ResetCreditsResponse) -> RateLimitResetCreditsSnapshot:
"""Project an upstream list response into the cached snapshot shape."""
nearest = _nearest_available_expires_at(response.credits)
Expand Down Expand Up @@ -382,41 +272,6 @@ def _consume_url(base_url: str) -> str:
return f"{_reset_credits_url(base_url)}/consume"


def _codex_response_status(response: object) -> int:
value = getattr(response, "status_code", getattr(response, "status", None))
if value is None:
return 0
return int(value)


async def _safe_codex_json(response: object) -> JsonObject:
try:
json_method = getattr(response, "json", None)
if callable(json_method):
data = json_method()
if asyncio.iscoroutine(data):
data = await data
return data if isinstance(data, dict) else {"error": {"message": str(data)}}
except Exception:
pass
content = getattr(response, "content", None)
if isinstance(content, bytes):
return {"error": {"message": content.decode("utf-8", errors="replace").strip()}}
if isinstance(content, str):
return {"error": {"message": content.strip()}}
text_method = getattr(response, "text", None)
if callable(text_method):
try:
text = text_method()
if asyncio.iscoroutine(text):
text = await text
if isinstance(text, str):
return {"error": {"message": text.strip()}}
except Exception:
pass
return {"error": {"message": ""}}


async def _safe_json(resp: aiohttp.ClientResponse) -> JsonObject:
try:
data = await resp.json(content_type=None)
Expand Down Expand Up @@ -478,5 +333,106 @@ def _retry_options(attempts: int) -> ExponentialRetry:
)


def _retry_delay_seconds(attempt: int) -> float:
    """Return the backoff delay for ``attempt``: start timeout doubled per attempt, capped at the max."""
    uncapped_delay = RETRY_START_TIMEOUT * (2.0**attempt)
    return min(RETRY_MAX_TIMEOUT, uncapped_delay)
async def _fetch_reset_credits_via_codex(
    *,
    url: str,
    route: ResolvedUpstreamRoute,
    headers: dict[str, str],
    timeout_seconds: float,
    retries: int,
    codex_client: CodexClient | None,
) -> JsonObject:
    """Fetch the raw reset-credits JSON through the Codex transport, with retries.

    Args:
        url: Reset-credits endpoint to GET.
        route: Resolved upstream route forwarded to the Codex client.
        headers: Request headers forwarded unchanged.
        timeout_seconds: Per-request timeout forwarded to the Codex client.
        retries: Number of additional attempts after the first; values below
            zero still result in exactly one attempt.
        codex_client: Client to reuse. When ``None`` a temporary client is
            created for this call and closed before returning.

    Returns:
        The decoded JSON object of a response whose status is below 400. The
        payload is not validated here.

    Raises:
        ResetCreditFetchError: On a status >= 400 once retries are spent for
            retryable statuses.
        CodexTransportError: When the transport fails on the final attempt.
    """
    attempts = max(1, retries + 1)
    # Ownership and the active client are derived from the same `is None` test,
    # so a caller-supplied client is always the one used and a temporary client
    # is never created without also being closed below.
    owns_codex_client = codex_client is None
    active_codex_client = CodexClient(create_codex_session()) if codex_client is None else codex_client
    try:
        for attempt in range(attempts):
            try:
                resp = await active_codex_client.request(
                    "GET",
                    url,
                    route=route,
                    headers=headers,
                    timeout=timeout_seconds,
                )
            except CodexTransportError:
                # Back off and retry unless this was the last attempt.
                if attempt < attempts - 1:
                    await asyncio.sleep(_retry_delay_seconds(attempt))
                    continue
                raise

            data = await _safe_codex_json(resp)
            status = _codex_response_status(resp)
            if status in RETRYABLE_STATUS and attempt < attempts - 1:
                await asyncio.sleep(_retry_delay_seconds(attempt))
                continue
            if status >= 400:
                code = _extract_error_code(data)
                message = _extract_error_message(data) or f"Reset credits fetch failed ({status})"
                raise ResetCreditFetchError(status, message, code=code)
            # Defensive: guarantee a dict even if the decoder hands back something else.
            return data if isinstance(data, dict) else {"error": {"message": str(data)}}
    finally:
        # Only close a client this call created; a shared client stays open.
        if owns_codex_client:
            close = getattr(active_codex_client, "close", None)
            if callable(close):
                await close()
    # The final loop iteration always returns or raises; this is a safety net.
    raise RuntimeError("unreachable reset credits fetch retry state")


async def _consume_reset_credit_via_codex(
    *,
    url: str,
    route: ResolvedUpstreamRoute,
    headers: dict[str, str],
    body: dict[str, str],
    timeout_seconds: float,
    retries: int,
    codex_client: CodexClient | None,
) -> ConsumeResetCreditResponse:
    """Consume a reset credit by POSTing through the Codex transport.

    Args:
        url: Consume endpoint to POST to.
        route: Resolved upstream route forwarded to the Codex client.
        headers: Request headers forwarded unchanged.
        body: JSON body sent with the POST.
        timeout_seconds: Per-request timeout forwarded to the Codex client.
        retries: Number of additional attempts after the first; values below
            zero still result in exactly one attempt. Any value above zero
            re-sends the POST on transport errors and retryable statuses.
        codex_client: Client to reuse. When ``None`` a temporary client is
            created for this call and closed before returning.

    Returns:
        The validated ``ConsumeResetCreditResponse``.

    Raises:
        ConsumeResetCreditError: On a status >= 400 (after retries are spent
            for retryable statuses) or, with status 502, when the payload
            fails validation.
        CodexTransportError: When the transport fails on the final attempt.
    """
    attempts = max(1, retries + 1)
    # Ownership and the active client are derived from the same `is None` test,
    # so a caller-supplied client is always the one used and a temporary client
    # is never created without also being closed below.
    owns_codex_client = codex_client is None
    active_codex_client = CodexClient(create_codex_session()) if codex_client is None else codex_client
    try:
        for attempt in range(attempts):
            try:
                resp = await active_codex_client.request(
                    "POST",
                    url,
                    route=route,
                    headers=headers,
                    json=body,
                    timeout=timeout_seconds,
                )
            except CodexTransportError:
                # Back off and retry unless this was the last attempt.
                if attempt < attempts - 1:
                    await asyncio.sleep(_retry_delay_seconds(attempt))
                    continue
                raise

            data = await _safe_codex_json(resp)
            status = _codex_response_status(resp)
            if status in RETRYABLE_STATUS and attempt < attempts - 1:
                await asyncio.sleep(_retry_delay_seconds(attempt))
                continue
            if status >= 400:
                code = _extract_error_code(data)
                message = _extract_error_message(data) or f"Reset credits consume failed ({status})"
                raise ConsumeResetCreditError(status, message, code=code)
            try:
                return ConsumeResetCreditResponse.model_validate(_success_payload(data))
            except (ValueError, ValidationError) as exc:
                logger.warning("Reset credits consume invalid payload request_id=%s", get_request_id())
                raise ConsumeResetCreditError(502, "Invalid reset credits consume payload") from exc
    finally:
        # Only close a client this call created; a shared client stays open.
        if owns_codex_client:
            close = getattr(active_codex_client, "close", None)
            if callable(close):
                await close()
    # The final loop iteration always returns or raises; this is a safety net.
    raise RuntimeError("unreachable reset credits consume retry state")


def _codex_response_status(response: object) -> int:
value = getattr(response, "status_code", getattr(response, "status", None))
if value is None:
return 0
return int(value)
7 changes: 7 additions & 0 deletions app/core/usage/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ class CreditsPayload(BaseModel):
balance: str | None = None


class RateLimitResetCreditsSummary(BaseModel):
    """Summary block for rate-limit reset credits.

    Fields not declared on this model are ignored during validation
    (``extra="ignore"``).
    """

    model_config = ConfigDict(extra="ignore")

    # Optional count; defaults to None when the field is absent.
    # NOTE(review): presumably the number of reset credits currently available
    # to the account — confirm against the upstream payload.
    available_count: int | None = None


class AdditionalRateLimitPayload(BaseModel):
model_config = ConfigDict(extra="ignore")

Expand All @@ -44,4 +50,5 @@ class UsagePayload(BaseModel):
seat_type: str | None = None
rate_limit: RateLimitPayload | None = None
credits: CreditsPayload | None = None
rate_limit_reset_credits: RateLimitResetCreditsSummary | None = None
additional_rate_limits: list[AdditionalRateLimitPayload] | None = None
Loading