diff --git a/kedro-datasets/RELEASE.md b/kedro-datasets/RELEASE.md index 53a794d64..25094d069 100755 --- a/kedro-datasets/RELEASE.md +++ b/kedro-datasets/RELEASE.md @@ -4,6 +4,7 @@ - Fixed `ibis.TableDataset` `exists` method to account for `database` (i.e. the collection of tables, or schema). - `api.APIDataset` now stores the response received from a `PUT` or `POST` request via the `response_dataset` parameter. +- Added `autogen` mode to `OpikTraceDataset` for tracing AutoGen agent conversations with OpenTelemetry integration. ## Community contributions diff --git a/kedro-datasets/kedro_datasets_experimental/opik/opik_trace_dataset.py b/kedro-datasets/kedro_datasets_experimental/opik/opik_trace_dataset.py index 5b3ab5570..a4398c6fa 100644 --- a/kedro-datasets/kedro_datasets_experimental/opik/opik_trace_dataset.py +++ b/kedro-datasets/kedro_datasets_experimental/opik/opik_trace_dataset.py @@ -8,6 +8,7 @@ logger = logging.getLogger(__name__) REQUIRED_OPIK_CREDENTIALS = {"api_key", "workspace"} +REQUIRED_OPIK_CREDENTIALS_AUTOGEN = {"endpoint"} OPTIONAL_OPIK_CREDENTIALS = {"project_name", "url_override"} @@ -23,6 +24,7 @@ class OpikTraceDataset(AbstractDataset): - `sdk`: Returns a simple namespace-like client exposing the `track` decorator for manual tracing. - `openai`: Returns an OpenAI client automatically wrapped for Opik tracing. - `langchain`: Returns an `OpikTracer` callback handler for LangChain integration. + - `autogen`: Returns a configured `Tracer` for AutoGen integration via OTLP (OpenTelemetry Protocol). **Examples** @@ -89,6 +91,40 @@ def multiply(x: int, y: int) -> int: ) tracer = dataset.load() # Use tracer in your LangChain Runnable or chain.run(callbacks=[tracer]) + + # Example: AutoGen mode Opik cloud + dataset = OpikTraceDataset( + credentials={ + "api_key": "opik_api_key", # pragma: allowlist secret + "workspace": "my-workspace", + "project_name": "autogen-demo", + "endpoint": "https://www.comet.com/opik/api/v1/private/otel/v1/traces", + }, + mode="autogen", + ) + tracer = dataset.load() # Returns configured Tracer, ready to use + + # Option 1: Automatic tracing (LLM calls traced automatically) + agent.invoke(context) # Traces sent to Opik + + # Option 2: Add custom spans with business context (recommended) + with tracer.start_as_current_span("response_generation") as span: + span.set_attribute("intent", "claim_new") + span.set_attribute("user_id", "123") + agent.invoke(context) # Child spans nested under "response_generation" + + # Example: AutoGen mode self-hosted + dataset = OpikTraceDataset( + credentials={ + "api_key": "opik_api_key", # pragma: allowlist secret + "workspace": "my-workspace", + "project_name": "autogen-demo", + "url_override": "http://localhost:5173", + "endpoint": "http://localhost:5173/opik/api/v1/private/otel/v1/traces", + }, + mode="autogen", + ) + tracer = dataset.load() ``` **Notes** @@ -102,7 +138,7 @@ def multiply(x: int, y: int) -> int: def __init__( self, credentials: dict[str, Any], - mode: Literal["sdk", "openai", "langchain"] = "sdk", + mode: Literal["sdk", "openai", "langchain", "autogen"] = "sdk", **trace_kwargs: Any, ): self._credentials = credentials @@ -111,7 +147,9 @@ def __init__( self._cached_client = None self._validate_opik_credentials() - self._configure_opik() + # Use OTLP directly + if self._mode != "autogen": + self._configure_opik() def _validate_opik_credentials(self) -> None: """Validate Opik credentials before configuring the environment.""" @@ -123,6 +161,16 @@ def _validate_opik_credentials(self) -> None: if key in self._credentials and not str(self._credentials[key]).strip(): raise DatasetError(f"Optional Opik credential '{key}' cannot be empty if provided") + # AutoGen mode has additional required credentials + if self._mode == "autogen": + for key in REQUIRED_OPIK_CREDENTIALS_AUTOGEN: + if not self._credentials.get(key): + raise DatasetError( + f"AutoGen mode requires '{key}' in credentials " + f"(e.g. 'https://www.comet.com/opik/api/v1/private/otel/v1/traces'). " + f"Provide the full OTLP endpoint URL for trace export." + ) + def _configure_opik(self) -> None: """Initialize Opik global configuration with awareness of project switching. @@ -178,6 +226,65 @@ def _build_openai_client_params(self) -> dict[str, str]: return params + def _build_autogen_tracer(self) -> Any: + """Build and return a configured Tracer for AutoGen integration with Opik. + + Sets up OpenTelemetry TracerProvider with OTLP exporter to Opik, + configures it as the global provider, and returns a ready-to-use Tracer. + + Returns: + Tracer configured to export traces to Opik. + + Raises: + DatasetError: If required OpenTelemetry dependencies are not installed. + """ + try: + from opentelemetry import trace # noqa: PLC0415 + from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( # noqa: PLC0415 + OTLPSpanExporter, + ) + from opentelemetry.sdk.trace import TracerProvider # noqa: PLC0415 + from opentelemetry.sdk.trace.export import ( # noqa: PLC0415 + BatchSpanProcessor, + ) + except ImportError as exc: + raise DatasetError( + "AutoGen mode requires OpenTelemetry. " + "Install with: pip install opentelemetry-sdk opentelemetry-exporter-otlp-proto-http" + ) from exc + + # Build headers for Opik authentication + headers = { + "Authorization": self._credentials["api_key"], + "Comet-Workspace": self._credentials["workspace"], + } + + # Add project name if specified + project_name = self._credentials.get("project_name") + if project_name: + headers["projectName"] = project_name + + # Endpoint is provided by user and validated in _validate_opik_credentials + endpoint = self._credentials["endpoint"] + + exporter = OTLPSpanExporter( + endpoint=endpoint, + headers=headers + ) + + processor = BatchSpanProcessor(exporter) + + # Use existing provider if already set, otherwise create a new one. + existing_provider = trace.get_tracer_provider() + if hasattr(existing_provider, "add_span_processor"): + existing_provider.add_span_processor(processor) + else: + provider = TracerProvider() + provider.add_span_processor(processor) + trace.set_tracer_provider(provider) + + return trace.get_tracer("opik.autogen") + def _describe(self) -> dict[str, Any]: """Describe dataset configuration with credentials redacted.""" creds = self._credentials.copy() @@ -200,6 +307,8 @@ def load(self) -> Any: self._cached_client = self._load_openai_client() elif self._mode == "langchain": self._cached_client = self._load_langchain_tracer() + elif self._mode == "autogen": + self._cached_client = self._build_autogen_tracer() else: raise DatasetError(f"Unsupported mode '{self._mode}' for OpikTraceDataset") diff --git a/kedro-datasets/kedro_datasets_experimental/tests/opik/test_opik_trace_dataset.py b/kedro-datasets/kedro_datasets_experimental/tests/opik/test_opik_trace_dataset.py index b78bc2ae3..6c6d1ddfe 100644 --- a/kedro-datasets/kedro_datasets_experimental/tests/opik/test_opik_trace_dataset.py +++ b/kedro-datasets/kedro_datasets_experimental/tests/opik/test_opik_trace_dataset.py @@ -1,18 +1,25 @@ import os import sys -from unittest.mock import patch +from unittest.mock import MagicMock, patch import pytest from kedro.io import DatasetError from kedro_datasets_experimental.opik.opik_trace_dataset import OpikTraceDataset +OPIK_AUTOGEN_ENDPOINT = "https://www.comet.com/opik/api/v1/private/otel/v1/traces" + @pytest.fixture def base_credentials(): return {"api_key": "test-key", "workspace": "test-workspace"} # pragma: allowlist secret +@pytest.fixture +def autogen_credentials(base_credentials): + return base_credentials | {"endpoint": OPIK_AUTOGEN_ENDPOINT} + + @patch("kedro_datasets_experimental.opik.opik_trace_dataset.configure") def test_init_with_valid_credentials(configure_mock, base_credentials): """Test that dataset initializes correctly with valid credentials and calls configure.""" @@ -79,8 +86,16 @@ def test_load_openai_client(openai_mock, track_openai_mock, configure_mock, base @patch("kedro_datasets_experimental.opik.opik_trace_dataset.configure") -def test_openai_missing_credentials_raises(configure_mock, base_credentials): +def test_openai_missing_credentials_raises(configure_mock, base_credentials, mocker): """Test that missing OpenAI API key raises DatasetError.""" + # Mock openai and opik.integrations.openai to avoid real imports + mock_openai = MagicMock() + mock_opik_openai = MagicMock() + mocker.patch.dict("sys.modules", { + "openai": mock_openai, + "opik.integrations.openai": mock_opik_openai, + }) + creds = base_credentials | {"openai": {}} dataset = OpikTraceDataset(creds, mode="openai") with pytest.raises(DatasetError, match="Missing or empty OpenAI API key"): @@ -88,8 +103,16 @@ def test_openai_missing_credentials_raises(configure_mock, base_credentials): @patch("kedro_datasets_experimental.opik.opik_trace_dataset.configure") -def test_openai_missing_section_raises(configure_mock, base_credentials): +def test_openai_missing_section_raises(configure_mock, base_credentials, mocker): """Test that missing OpenAI section raises DatasetError.""" + # Mock openai and opik.integrations.openai to avoid real imports + mock_openai = MagicMock() + mock_opik_openai = MagicMock() + mocker.patch.dict("sys.modules", { + "openai": mock_openai, + "opik.integrations.openai": mock_opik_openai, + }) + dataset = OpikTraceDataset(base_credentials, mode="openai") with pytest.raises(DatasetError, match="Missing 'openai' section in OpikTraceDataset credentials."): dataset.load() @@ -137,3 +160,113 @@ def test_save_not_implemented(configure_mock, base_credentials): dataset = OpikTraceDataset(base_credentials) with pytest.raises(DatasetError): dataset.save("data") + + +# AutoGen mode tests +class TestOpikTraceDatasetAutogenMode: + """Tests for AutoGen mode in OpikTraceDataset.""" + + def test_autogen_mode_returns_tracer(self, mocker, autogen_credentials): + """Test AutoGen mode returns configured Tracer.""" + mock_tracer = MagicMock() + mocker.patch( + "kedro_datasets_experimental.opik.opik_trace_dataset.OpikTraceDataset._build_autogen_tracer", + return_value=mock_tracer + ) + + dataset = OpikTraceDataset(autogen_credentials, mode="autogen") + + result = dataset.load() + assert result == mock_tracer + + def test_autogen_mode_caching(self, mocker, autogen_credentials): + """Test that AutoGen mode caches the tracer.""" + mock_tracer = MagicMock() + build_tracer_mock = mocker.patch( + "kedro_datasets_experimental.opik.opik_trace_dataset.OpikTraceDataset._build_autogen_tracer", + return_value=mock_tracer + ) + + dataset = OpikTraceDataset(autogen_credentials, mode="autogen") + + # Call load twice + result1 = dataset.load() + result2 = dataset.load() + + # Should only build tracer once due to caching + build_tracer_mock.assert_called_once() + assert result1 is result2 + + def test_autogen_mode_skips_opik_configure(self, mocker, autogen_credentials): + """Test that AutoGen mode does not call Opik SDK configure.""" + configure_mock = mocker.patch("kedro_datasets_experimental.opik.opik_trace_dataset.configure") + + # Mock the tracer builder to avoid actual OpenTelemetry imports + mocker.patch( + "kedro_datasets_experimental.opik.opik_trace_dataset.OpikTraceDataset._build_autogen_tracer", + return_value=MagicMock() + ) + + OpikTraceDataset(autogen_credentials, mode="autogen") + + # configure should not be called for autogen mode + configure_mock.assert_not_called() + + def test_autogen_mode_missing_endpoint(self, base_credentials): + """Test that autogen mode raises error when endpoint is missing.""" + with pytest.raises(DatasetError, match="AutoGen mode requires 'endpoint'"): + OpikTraceDataset(base_credentials, mode="autogen") + + def test_autogen_mode_empty_endpoint(self, base_credentials): + """Test that autogen mode raises error when endpoint is empty.""" + creds = base_credentials | {"endpoint": ""} + with pytest.raises(DatasetError, match="AutoGen mode requires 'endpoint'"): + OpikTraceDataset(creds, mode="autogen") + + def test_autogen_mode_endpoint_not_required_for_other_modes(self, mocker, base_credentials): + """Test that endpoint is not required for non-autogen modes.""" + mocker.patch("kedro_datasets_experimental.opik.opik_trace_dataset.configure") + + # Endpoint is only required for autogen mode + dataset = OpikTraceDataset(base_credentials, mode="sdk") + assert dataset._mode == "sdk" + + def test_autogen_mode_import_error(self, mocker, autogen_credentials): + """Test AutoGen mode raises DatasetError when OpenTelemetry not installed.""" + + def raise_import_error(): + raise DatasetError( + "AutoGen mode requires OpenTelemetry. " + "Install with: pip install opentelemetry-sdk opentelemetry-exporter-otlp-proto-http" + ) + + mocker.patch( + "kedro_datasets_experimental.opik.opik_trace_dataset.OpikTraceDataset._build_autogen_tracer", + side_effect=raise_import_error + ) + + dataset = OpikTraceDataset(autogen_credentials, mode="autogen") + + with pytest.raises(DatasetError, match="AutoGen mode requires OpenTelemetry"): + dataset.load() + + def test_describe_autogen_mode(self, mocker, autogen_credentials): + """Test _describe returns correct format for autogen mode.""" + # Mock the tracer builder to avoid actual OpenTelemetry imports + mocker.patch( + "kedro_datasets_experimental.opik.opik_trace_dataset.OpikTraceDataset._build_autogen_tracer", + return_value=MagicMock() + ) + + dataset = OpikTraceDataset(autogen_credentials, mode="autogen") + desc = dataset._describe() + assert desc["mode"] == "autogen" + assert all(v == "***" for v in desc["credentials"].values()) + + +@pytest.fixture(autouse=True) +def clean_env(): + """Clean up environment variables after each test.""" + yield + if "OPIK_PROJECT_NAME" in os.environ: + del os.environ["OPIK_PROJECT_NAME"] diff --git a/kedro-datasets/pyproject.toml b/kedro-datasets/pyproject.toml index 063019cad..405159d69 100644 --- a/kedro-datasets/pyproject.toml +++ b/kedro-datasets/pyproject.toml @@ -218,7 +218,8 @@ langfuse-langfusetracedataset = ["langfuse>=2.0.0"] langfuse = ["kedro-datasets[langfuse-langfusepromptdataset,langfuse-langfusetracedataset]", "openai>=2.3.0", "langchain>=0.2.0, <1.0"] opik-opikpromptdataset = ["opik>=1.8.0"] opik-opiktracedataset = ["opik>=1.8.0"] -opik = ["kedro-datasets[opik-opikpromptdataset, opik-opiktracedataset]", "openai>=2.3.0", "langchain>=0.2.0"] +opik-opiktracedataset-autogen = ["opik>=1.8.0", "opentelemetry-sdk", "opentelemetry-exporter-otlp-proto-http"] +opik = ["kedro-datasets[opik-opikpromptdataset, opik-opiktracedataset, opik-opiktracedataset-autogen]", "openai>=2.3.0", "langchain>=0.2.0"] netcdf-netcdfdataset = ["h5netcdf>=1.2.0","netcdf4>=1.6.4","xarray>=2023.1.0"] netcdf = ["kedro-datasets[netcdf-netcdfdataset]"]