Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
http_app = create_streamable_http_app(
server=mcp,
streamable_http_path="/mcp",
auth=mcp.auth,
json_response=True,
stateless_http=True,
)
81 changes: 13 additions & 68 deletions packages/gg_api_core/src/gg_api_core/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,20 @@

import httpx

from gg_api_core.host import is_self_hosted_instance
from gg_api_core.settings import get_settings

# Setup logger
logger = logging.getLogger(__name__)


class DownstreamUnauthorizedError(Exception):
"""Raised when the downstream GitGuardian API returns 401.

Bridged to an HTTP 401 + ``WWW-Authenticate`` response by middleware so
the MCP client can re-run the OAuth flow.
"""


class IncidentSeverity(str, Enum):
"""Enum for incident severity levels."""

Expand Down Expand Up @@ -277,10 +284,12 @@ def __init__(
self._token_info: Any | None = None

def _init_urls(self, gitguardian_url: str | None = None):
from .urls import derive_public_api_url

# Use provided raw URL or get from environment with default fallback
raw_url = gitguardian_url or get_settings().gitguardian_url

self.public_api_url = self._normalize_api_url(raw_url)
self.public_api_url = derive_public_api_url(raw_url)
logger.info(f"Using API URL: {self.public_api_url}")

# Extract the base URL for dashboard (needed for OAuth)
Expand All @@ -289,72 +298,6 @@ def _init_urls(self, gitguardian_url: str | None = None):
self.private_api_url = f"{self.dashboard_url}/api/v1"
logger.info(f"Using private API URL: {self.private_api_url}")

def _normalize_api_url(self, api_url: str) -> str:
"""
Normalize the API URL for different GitGuardian instance types.

Args:
api_url: Raw API URL or base URL

Returns:
str: Normalized API URL
"""
from urllib.parse import urlparse

# Strip trailing slashes
api_url = api_url.rstrip("/")

try:
parsed = urlparse(api_url)

# Special handling for localhost and 127.0.0.1 - always treat as self-hosted
# regardless of SAAS_HOSTNAMES list (used for local development)
is_localhost = parsed.netloc.startswith("localhost") or parsed.netloc.startswith("127.0.0.1")

# Check if this is a SaaS URL (dashboard or API)
if not is_localhost and not is_self_hosted_instance(api_url):
# Convert dashboard URLs to API URLs with /v1 suffix
if "dashboard" in parsed.netloc:
api_netloc = parsed.netloc.replace("dashboard", "api")
normalized_url = f"{parsed.scheme}://{api_netloc}/v1"
logger.debug(f"Normalized SaaS dashboard URL: {api_url} -> {normalized_url}")
return normalized_url
# For API URLs, ensure they have /v1 suffix
elif not parsed.path.endswith("/v1"):
normalized_url = f"{api_url}/v1"
logger.debug(f"Normalized SaaS API URL: {api_url} -> {normalized_url}")
return normalized_url
else:
logger.debug(f"SaaS API URL already has /v1: {api_url}")
return api_url

# Check if this already has the API path structure
path = parsed.path.lower()
if path.endswith("/v1") or path.endswith("/exposed/v1"):
logger.debug(f"API URL already has API path: {api_url}")
return api_url

# This appears to be a self-hosted base URL - append the API path
if not path or path == "/" or not path.startswith("/exposed"):
normalized_url = f"{api_url}/exposed/v1"
logger.info(f"Normalized self-hosted base URL: {api_url} -> {normalized_url}")
return normalized_url

# If it has /exposed but no /v1, append /v1
if path.startswith("/exposed") and not path.endswith("/v1"):
normalized_url = f"{api_url}/v1"
logger.info(f"Normalized self-hosted API URL: {api_url} -> {normalized_url}")
return normalized_url

# Default: return as-is
logger.debug(f"Using API URL as provided: {api_url}")
return api_url

except Exception as e:
logger.warning(f"Failed to parse API URL '{api_url}': {e}")
logger.warning("Using API URL as provided")
return api_url

def _get_dashboard_url(self) -> str:
"""
Get the GitGuardian dashboard URL by deriving it from the API URL.
Expand Down Expand Up @@ -603,6 +546,8 @@ async def _request(self, method: str, endpoint: str, **kwargs) -> Any:
logger.exception(f"HTTP error occurred: {e.response.status_code} - {e.response.reason_phrase}")
logger.debug(f"Error response content: {e.response.text}")
logger.debug(f"Failed URL: {url}")
if e.response.status_code == 401:
raise DownstreamUnauthorizedError(f"GitGuardian API returned 401 for {url}") from e
raise
except httpx.RequestError as e:
logger.exception(f"Request error occurred: {str(e)}")
Expand Down
96 changes: 60 additions & 36 deletions packages/gg_api_core/src/gg_api_core/mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@

from fastmcp import FastMCP
from fastmcp.exceptions import ValidationError
from fastmcp.server.dependencies import get_http_headers
from fastmcp.server.dependencies import get_access_token
from fastmcp.server.middleware import Middleware
from fastmcp.tools import Tool

from gg_api_core.client import GitGuardianClient
from gg_api_core.client import DownstreamUnauthorizedError, GitGuardianClient
from gg_api_core.icons import get_gitguardian_icons
from gg_api_core.oauth_proxy_auth import (
PassThroughTokenVerifier,
create_oauth_proxy,
mark_downstream_unauthorized,
)
from gg_api_core.settings import get_settings
from gg_api_core.utils import get_client

Expand All @@ -31,6 +36,9 @@ class AuthenticationMode(Enum):
PERSONAL_ACCESS_TOKEN_ENV_VAR = "PERSONAL_ACCESS_TOKEN_ENV_VAR"
# Use per-request Authorization header
AUTHORIZATION_HEADER = "AUTHORIZATION_HEADER"
# Use FastMCP OAuthProxy : declares the MCP server as a Protected Resource (RFC 9728)
# whose Authorization Server is api.gitguardian.com
OAUTH_PROXY = "OAUTH_PROXY"


class CachedTokenInfoMixin:
Expand Down Expand Up @@ -92,6 +100,22 @@ async def get_token_info(self) -> dict[str, Any]:
return self._token_info


class DownstreamUnauthorizedMiddleware(Middleware):
"""Flag the request when a tool surfaces a downstream 401.

The exception still propagates so FastMCP serializes a JSON-RPC error
body for clients that ignore the HTTP status. The ASGI middleware
rewrites the status to 401 based on the flag set here.
"""

async def on_message(self, context, call_next):
try:
return await call_next(context)
except DownstreamUnauthorizedError:
mark_downstream_unauthorized()
raise


class ScopeFilteringMiddleware(Middleware):
"""Middleware to filter tools based on token scopes."""

Expand Down Expand Up @@ -143,6 +167,7 @@ def __init__(self, *args, default_scopes: list[str] | None = None, **kwargs):
self._tool_scopes: dict[str, set[str]] = {}

self.add_middleware(ScopeFilteringMiddleware(self))
self.add_middleware(DownstreamUnauthorizedMiddleware())

def clear_cache(self) -> None:
"""Clear cached data. Override in subclasses that cache."""
Expand Down Expand Up @@ -325,60 +350,59 @@ def get_personal_access_token(self) -> str:
return self.personal_access_token


class GitGuardianAuthorizationHeaderMCP(AbstractGitGuardianFastMCP):
"""GitGuardian MCP server using per-request Authorization header (HTTP/SSE mode)."""
class _BearerTokenMCP(AbstractGitGuardianFastMCP):
"""Base for modes where the bearer token is installed in the request scope.

authentication_mode = AuthenticationMode.AUTHORIZATION_HEADER
Requires an ``auth=`` provider that populates an ``AccessToken`` on the
request — :class:`PassThroughTokenVerifier` for raw header mode, or
:class:`GitGuardianOAuthThinProxy` for OAuth proxy mode. Both classes
below differ only in :attr:`authentication_mode`.
"""

def get_personal_access_token(self) -> str:
headers = get_http_headers(include={"authorization"})
if not headers:
raise ValidationError("No HTTP headers available - Authorization header required")

auth_header = headers.get("authorization")
if not auth_header:
raise ValidationError("Missing Authorization header")

token = self._default_extract_token(auth_header)
if not token:
raise ValidationError("Invalid Authorization header format")
access_token = get_access_token()
if not access_token:
raise ValidationError("No access token available - bearer authentication required")
return access_token.token

return token
async def get_token_info(self) -> dict[str, Any]:
return await self._fetch_token_info_from_api()

@staticmethod
def _default_extract_token(auth_header: str) -> str | None:
"""Extract token from Authorization header.

Supports formats:
- Bearer <token>
- Token <token>
- <token> (raw)
"""
auth_header = auth_header.strip()
class GitGuardianAuthorizationHeaderMCP(_BearerTokenMCP):
"""GitGuardian MCP server using per-request Authorization header (HTTP/SSE mode)."""

if auth_header.lower().startswith("bearer "):
return auth_header[7:].strip()
authentication_mode = AuthenticationMode.AUTHORIZATION_HEADER

if auth_header.lower().startswith("token "):
return auth_header[6:].strip()

if auth_header:
return auth_header
class GitGuardianOAuthProxyMCP(_BearerTokenMCP):
"""GitGuardian MCP server with thin OAuth proxy to the GG dashboard.

return None
Same-origin OAuth endpoints proxy auth requests to the GG dashboard.
The MCP client gets the real GG PAT directly as Bearer token.
"""

async def get_token_info(self) -> dict[str, Any]:
return await self._fetch_token_info_from_api()
authentication_mode = AuthenticationMode.OAUTH_PROXY


def get_mcp_server(*args, **kwargs) -> AbstractGitGuardianFastMCP:
kwargs.setdefault("icons", get_gitguardian_icons())

settings = get_settings()

if settings.is_oauth_proxy_enabled:
oauth_proxy = create_oauth_proxy(
base_url=settings.mcp_base_url,
gg_url=settings.gitguardian_url,
gg_api_url=settings.gitguardian_api_url,
advertised_scopes=settings.effective_scopes,
)
return GitGuardianOAuthProxyMCP(*args, auth=oauth_proxy, **kwargs)

if settings.is_oauth_enabled:
return GitGuardianLocalOAuthMCP(*args, **kwargs)

if personal_access_token := settings.gitguardian_personal_access_token:
return GitGuardianPATEnvMCP(*args, personal_access_token=personal_access_token, **kwargs)

return GitGuardianAuthorizationHeaderMCP(*args, **kwargs)
return GitGuardianAuthorizationHeaderMCP(*args, auth=PassThroughTokenVerifier(), **kwargs)
Loading
Loading