Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kedro-datasets/RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- Fixed `ibis.TableDataset` `exists` method to account for `database` (i.e. the collection of tables, or schema).
- `api.APIDataset` now stores the response received from a `PUT` or `POST` request via the `response_dataset` parameter.
- Added `autogen` mode to `OpikTraceDataset` for tracing AutoGen agent conversations with OpenTelemetry integration.

## Community contributions

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class OpikTraceDataset(AbstractDataset):
- `sdk`: Returns a simple namespace-like client exposing the `track` decorator for manual tracing.
- `openai`: Returns an OpenAI client automatically wrapped for Opik tracing.
- `langchain`: Returns an `OpikTracer` callback handler for LangChain integration.
- `autogen`: Returns a configured OpenTelemetry `Tracer` for AutoGen integration via OTLP.
Comment thread
SajidAlamQB marked this conversation as resolved.
Outdated

**Examples**

Expand Down Expand Up @@ -89,6 +90,27 @@ def multiply(x: int, y: int) -> int:
)
tracer = dataset.load()
# Use tracer in your LangChain Runnable or chain.run(callbacks=[tracer])

# Example: AutoGen mode (agent tracing via OpenTelemetry)
dataset = OpikTraceDataset(
credentials={
"api_key": "opik_api_key", # pragma: allowlist secret
"workspace": "my-workspace",
"project_name": "autogen-demo",
"url_override": "https://www.comet.com",
},
mode="autogen",
)
tracer = dataset.load() # Returns configured Tracer, ready to use

# Option 1: Automatic tracing (LLM calls traced automatically)
agent.invoke(context) # Traces sent to Opik

# Option 2: Add custom spans with business context (recommended)
with tracer.start_as_current_span("response_generation") as span:
span.set_attribute("intent", "claim_new")
span.set_attribute("user_id", "123")
agent.invoke(context) # Child spans nested under "response_generation"
```

**Notes**
Expand All @@ -102,7 +124,7 @@ def multiply(x: int, y: int) -> int:
def __init__(
self,
credentials: dict[str, Any],
mode: Literal["sdk", "openai", "langchain"] = "sdk",
mode: Literal["sdk", "openai", "langchain", "autogen"] = "sdk",
**trace_kwargs: Any,
):
self._credentials = credentials
Expand All @@ -111,7 +133,9 @@ def __init__(
self._cached_client = None

self._validate_opik_credentials()
self._configure_opik()
# AutoGen mode exports traces via OTLP directly, so the Opik SDK does not need configuring.
if self._mode != "autogen":
self._configure_opik()

def _validate_opik_credentials(self) -> None:
"""Validate Opik credentials before configuring the environment."""
Expand All @@ -123,6 +147,13 @@ def _validate_opik_credentials(self) -> None:
if key in self._credentials and not str(self._credentials[key]).strip():
raise DatasetError(f"Optional Opik credential '{key}' cannot be empty if provided")

# AutoGen mode requires 'url_override' to construct the OTLP endpoint
if self._mode == "autogen" and not self._credentials.get("url_override"):
raise DatasetError(
"AutoGen mode requires 'url_override' in credentials (e.g. 'https://www.comet.com'). "
"This is needed to construct the OTLP endpoint for trace export."
)

def _configure_opik(self) -> None:
"""Initialize Opik global configuration with awareness of project switching.

Expand Down Expand Up @@ -178,6 +209,66 @@ def _build_openai_client_params(self) -> dict[str, str]:

return params

def _build_autogen_tracer(self) -> Any:
    """Build and return a configured Tracer for AutoGen integration with Opik.

    Sets up an OpenTelemetry ``TracerProvider`` with an OTLP HTTP exporter
    pointed at Opik, registers it as the global provider (or reuses an
    already-registered SDK provider), and returns a ready-to-use Tracer.

    Returns:
        Tracer configured to export traces to Opik.

    Raises:
        DatasetError: If required OpenTelemetry dependencies are not installed.
    """
    try:
        from opentelemetry import trace  # noqa: PLC0415
        from opentelemetry.exporter.otlp.proto.http.trace_exporter import (  # noqa: PLC0415
            OTLPSpanExporter,
        )
        from opentelemetry.sdk.trace import TracerProvider  # noqa: PLC0415
        from opentelemetry.sdk.trace.export import (  # noqa: PLC0415
            BatchSpanProcessor,
        )
    except ImportError as exc:
        raise DatasetError(
            "AutoGen mode requires OpenTelemetry. "
            "Install with: pip install opentelemetry-sdk opentelemetry-exporter-otlp-proto-http"
        ) from exc

    # Headers Opik uses to authenticate and route OTLP trace exports.
    headers = {
        "Authorization": self._credentials["api_key"],
        "Comet-Workspace": self._credentials["workspace"],
    }

    # Route traces to a specific Opik project when one is configured.
    project_name = self._credentials.get("project_name")
    if project_name:
        headers["projectName"] = project_name

    # url_override presence is validated in _validate_opik_credentials; strip
    # any trailing slash so the joined endpoint never contains "//".
    base_url = self._credentials["url_override"].rstrip("/")
    endpoint = f"{base_url}/opik/api/v1/private/otel/v1/traces"

    exporter = OTLPSpanExporter(
        endpoint=endpoint,
        headers=headers
    )

    processor = BatchSpanProcessor(exporter)

    # Use existing provider if already set, otherwise create a new one.
    # The default no-op ProxyTracerProvider lacks add_span_processor, which
    # is what the hasattr check distinguishes.
    existing_provider = trace.get_tracer_provider()
    if hasattr(existing_provider, "add_span_processor"):
        existing_provider.add_span_processor(processor)
    else:
        provider = TracerProvider()
        provider.add_span_processor(processor)
        trace.set_tracer_provider(provider)

    return trace.get_tracer("opik.autogen")

def _describe(self) -> dict[str, Any]:
"""Describe dataset configuration with credentials redacted."""
creds = self._credentials.copy()
Expand All @@ -200,6 +291,8 @@ def load(self) -> Any:
self._cached_client = self._load_openai_client()
elif self._mode == "langchain":
self._cached_client = self._load_langchain_tracer()
elif self._mode == "autogen":
self._cached_client = self._build_autogen_tracer()
else:
raise DatasetError(f"Unsupported mode '{self._mode}' for OpikTraceDataset")

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os
import sys
from unittest.mock import patch
from unittest.mock import MagicMock, patch

import pytest
from kedro.io import DatasetError
Expand All @@ -13,6 +13,11 @@ def base_credentials():
return {"api_key": "test-key", "workspace": "test-workspace"} # pragma: allowlist secret


@pytest.fixture
def autogen_credentials(base_credentials):
    """Base credentials extended with the url_override required by autogen mode."""
    return {**base_credentials, "url_override": "https://www.comet.com"}


@patch("kedro_datasets_experimental.opik.opik_trace_dataset.configure")
def test_init_with_valid_credentials(configure_mock, base_credentials):
"""Test that dataset initializes correctly with valid credentials and calls configure."""
Expand Down Expand Up @@ -79,17 +84,33 @@ def test_load_openai_client(openai_mock, track_openai_mock, configure_mock, base


@patch("kedro_datasets_experimental.opik.opik_trace_dataset.configure")
def test_openai_missing_credentials_raises(configure_mock, base_credentials):
def test_openai_missing_credentials_raises(configure_mock, base_credentials, mocker):
"""Test that missing OpenAI API key raises DatasetError."""
# Mock openai and opik.integrations.openai to avoid real imports
mock_openai = MagicMock()
mock_opik_openai = MagicMock()
mocker.patch.dict("sys.modules", {
"openai": mock_openai,
"opik.integrations.openai": mock_opik_openai,
})

creds = base_credentials | {"openai": {}}
dataset = OpikTraceDataset(creds, mode="openai")
with pytest.raises(DatasetError, match="Missing or empty OpenAI API key"):
dataset.load()


@patch("kedro_datasets_experimental.opik.opik_trace_dataset.configure")
def test_openai_missing_section_raises(configure_mock, base_credentials):
def test_openai_missing_section_raises(configure_mock, base_credentials, mocker):
"""Test that missing OpenAI section raises DatasetError."""
# Mock openai and opik.integrations.openai to avoid real imports
mock_openai = MagicMock()
mock_opik_openai = MagicMock()
mocker.patch.dict("sys.modules", {
"openai": mock_openai,
"opik.integrations.openai": mock_opik_openai,
})

dataset = OpikTraceDataset(base_credentials, mode="openai")
with pytest.raises(DatasetError, match="Missing 'openai' section in OpikTraceDataset credentials."):
dataset.load()
Expand Down Expand Up @@ -137,3 +158,99 @@ def test_save_not_implemented(configure_mock, base_credentials):
dataset = OpikTraceDataset(base_credentials)
with pytest.raises(DatasetError):
dataset.save("data")


# AutoGen mode tests
class TestOpikTraceDatasetAutogenMode:
    """Tests for AutoGen mode in OpikTraceDataset."""

    # Patch target for the tracer builder, shared by every test below.
    _BUILD_TRACER = (
        "kedro_datasets_experimental.opik.opik_trace_dataset."
        "OpikTraceDataset._build_autogen_tracer"
    )

    def test_autogen_mode_returns_tracer(self, mocker, autogen_credentials):
        """Test AutoGen mode returns configured Tracer."""
        fake_tracer = MagicMock()
        mocker.patch(self._BUILD_TRACER, return_value=fake_tracer)

        dataset = OpikTraceDataset(autogen_credentials, mode="autogen")

        assert dataset.load() == fake_tracer

    def test_autogen_mode_caching(self, mocker, autogen_credentials):
        """Test that AutoGen mode caches the tracer."""
        fake_tracer = MagicMock()
        builder = mocker.patch(self._BUILD_TRACER, return_value=fake_tracer)

        dataset = OpikTraceDataset(autogen_credentials, mode="autogen")

        first = dataset.load()
        second = dataset.load()

        # A second load must reuse the cached tracer, not rebuild it.
        builder.assert_called_once()
        assert first is second

    def test_autogen_mode_skips_opik_configure(self, mocker, autogen_credentials):
        """Test that AutoGen mode does not call Opik SDK configure."""
        configure_spy = mocker.patch(
            "kedro_datasets_experimental.opik.opik_trace_dataset.configure"
        )
        # Stub the builder so construction never imports OpenTelemetry.
        mocker.patch(self._BUILD_TRACER, return_value=MagicMock())

        OpikTraceDataset(autogen_credentials, mode="autogen")

        configure_spy.assert_not_called()

    def test_autogen_mode_missing_url_override(self, base_credentials):
        """Test that autogen mode raises error when url_override is missing."""
        with pytest.raises(DatasetError, match="AutoGen mode requires 'url_override'"):
            OpikTraceDataset(base_credentials, mode="autogen")

    def test_autogen_mode_import_error(self, mocker, autogen_credentials):
        """Test AutoGen mode raises DatasetError when OpenTelemetry not installed."""
        failure = DatasetError(
            "AutoGen mode requires OpenTelemetry. "
            "Install with: pip install opentelemetry-sdk opentelemetry-exporter-otlp-proto-http"
        )
        mocker.patch(self._BUILD_TRACER, side_effect=failure)

        dataset = OpikTraceDataset(autogen_credentials, mode="autogen")

        with pytest.raises(DatasetError, match="AutoGen mode requires OpenTelemetry"):
            dataset.load()

    def test_describe_autogen_mode(self, mocker, autogen_credentials):
        """Test _describe returns correct format for autogen mode."""
        # Stub the builder so construction never imports OpenTelemetry.
        mocker.patch(self._BUILD_TRACER, return_value=MagicMock())

        dataset = OpikTraceDataset(autogen_credentials, mode="autogen")
        description = dataset._describe()

        assert description["mode"] == "autogen"
        assert all(value == "***" for value in description["credentials"].values())


@pytest.fixture(autouse=True)
def clean_env():
    """Clean up environment variables after each test."""
    yield
    # pop() with a default replaces the membership-check-then-del idiom and
    # is safe even if the variable was never set.
    os.environ.pop("OPIK_PROJECT_NAME", None)
3 changes: 2 additions & 1 deletion kedro-datasets/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ langfuse-langfusetracedataset = ["langfuse>=2.0.0"]
langfuse = ["kedro-datasets[langfuse-langfusepromptdataset,langfuse-langfusetracedataset]", "openai>=2.3.0", "langchain>=0.2.0, <1.0"]
opik-opikpromptdataset = ["opik>=1.8.0"]
opik-opiktracedataset = ["opik>=1.8.0"]
opik = ["kedro-datasets[opik-opikpromptdataset, opik-opiktracedataset]", "openai>=2.3.0", "langchain>=0.2.0"]
opik-opiktracedataset-autogen = ["opik", "opentelemetry-sdk", "opentelemetry-exporter-otlp-proto-http"]
Comment thread
SajidAlamQB marked this conversation as resolved.
Outdated
opik = ["kedro-datasets[opik-opikpromptdataset, opik-opiktracedataset, opik-opiktracedataset-autogen]", "openai>=2.3.0", "langchain>=0.2.0"]
Comment thread
SajidAlamQB marked this conversation as resolved.

netcdf-netcdfdataset = ["h5netcdf>=1.2.0","netcdf4>=1.6.4","xarray>=2023.1.0"]
netcdf = ["kedro-datasets[netcdf-netcdfdataset]"]
Expand Down