Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 161 additions & 67 deletions kedro-datasets/kedro_datasets_experimental/opik/opik_trace_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,89 +20,112 @@ class OpikTraceDataset(AbstractDataset):

**Modes:**

- `sdk`: Returns a simple namespace-like client exposing the `track` decorator for manual tracing.
- `openai`: Returns an OpenAI client automatically wrapped for Opik tracing.
- `langchain`: Returns an `OpikTracer` callback handler for LangChain integration.
- ``sdk``: Returns a simple namespace-like client exposing the ``track`` decorator for manual tracing.
- ``openai``: Returns an OpenAI client automatically wrapped for Opik tracing.
- ``langchain``: Returns an ``OpikTracer`` callback handler for LangChain integration.
- ``autogen``: Returns a configured ``Tracer`` for AutoGen integration via OTLP.

**Examples**

Using catalog YAML configuration:
```yaml
opik_trace:
type: kedro_datasets_experimental.opik.OpikTraceDataset
credentials: opik_credentials
mode: openai
```

.. code-block:: yaml

opik_trace:
type: kedro_datasets_experimental.opik.OpikTraceDataset
credentials: opik_credentials
mode: openai

Using Python API:
```python
from kedro_datasets_experimental.opik import OpikTraceDataset

# Example: OpenAI mode (traced completions)
dataset = OpikTraceDataset(
credentials={
"api_key": "opik_api_key", # pragma: allowlist secret
"workspace": "my-workspace",
"project_name": "kedro-demo",
"openai": {
"openai_api_key": "sk-...", # pragma: allowlist secret
"openai_api_base": "https://api.openai.com/v1",

.. code-block:: python

from kedro_datasets_experimental.opik import OpikTraceDataset

# Example: OpenAI mode (traced completions)
dataset = OpikTraceDataset(
credentials={
"api_key": "opik_api_key", # pragma: allowlist secret
"workspace": "my-workspace",
"project_name": "kedro-demo",
"openai": {
"openai_api_key": "sk-...", # pragma: allowlist secret
"openai_api_base": "https://api.openai.com/v1",
},
},
},
mode="openai",
)
client = dataset.load()
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Summarize Kedro in one sentence."},
],
)

# Example: SDK mode (manual tracing via decorator)
dataset = OpikTraceDataset(
credentials={
"api_key": "opik_api_key", # pragma: allowlist secret
"workspace": "my-workspace",
"project_name": "kedro-sdk-demo",
},
mode="sdk",
)
client = dataset.load()


@client.track(name="demo_workflow")
def multiply(x: int, y: int) -> int:
return x * y


print(multiply(3, 4))

# Example: LangChain mode
dataset = OpikTraceDataset(
credentials={
"api_key": "opik_api_key", # pragma: allowlist secret
"workspace": "my-workspace",
},
mode="langchain",
)
tracer = dataset.load()
# Use tracer in your LangChain Runnable or chain.run(callbacks=[tracer])
```
mode="openai",
)
client = dataset.load()
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Summarize Kedro in one sentence."},
],
)

# Example: SDK mode (manual tracing via decorator)
dataset = OpikTraceDataset(
credentials={
"api_key": "opik_api_key", # pragma: allowlist secret
"workspace": "my-workspace",
"project_name": "kedro-sdk-demo",
},
mode="sdk",
)
client = dataset.load()


@client.track(name="demo_workflow")
def multiply(x: int, y: int) -> int:
return x * y


print(multiply(3, 4))

# Example: LangChain mode
dataset = OpikTraceDataset(
credentials={
"api_key": "opik_api_key", # pragma: allowlist secret
"workspace": "my-workspace",
},
mode="langchain",
)
tracer = dataset.load()
# Use tracer in your LangChain Runnable or chain.run(callbacks=[tracer])

# Example: AutoGen mode (agent tracing via OpenTelemetry)
dataset = OpikTraceDataset(
credentials={
"api_key": "opik_api_key", # pragma: allowlist secret
"workspace": "my-workspace",
"project_name": "autogen-demo",
},
mode="autogen",
)
tracer = dataset.load() # Returns configured Tracer, ready to use

# Option 1: Automatic tracing (LLM calls traced automatically)
agent.invoke(context) # Traces sent to Opik

# Option 2: Add custom spans with business context (recommended)
with tracer.start_as_current_span("response_generation") as span:
span.set_attribute("intent", "claim_new")
span.set_attribute("user_id", "123")
agent.invoke(context) # Child spans nested under "response_generation"

**Notes**

- Opik configuration is global within the Python process.
Using multiple `OpikTraceDataset` instances with different projects in the same session
Using multiple ``OpikTraceDataset`` instances with different projects in the same session
may cause all traces to log to the first configured project.
- To switch projects, restart the Python process or reload the Opik module.
"""

def __init__(
self,
credentials: dict[str, Any],
mode: Literal["sdk", "openai", "langchain"] = "sdk",
mode: Literal["sdk", "openai", "langchain", "autogen"] = "sdk",
**trace_kwargs: Any,
):
self._credentials = credentials
Expand All @@ -111,7 +134,9 @@ def __init__(
self._cached_client = None

self._validate_opik_credentials()
self._configure_opik()
# Skip Opik SDK configuration for autogen mode (uses OTLP directly)
if self._mode != "autogen":
self._configure_opik()

def _validate_opik_credentials(self) -> None:
"""Validate Opik credentials before configuring the environment."""
Expand Down Expand Up @@ -178,6 +203,62 @@ def _build_openai_client_params(self) -> dict[str, str]:

return params

def _build_autogen_tracer(self) -> Any:
"""Build and return a configured Tracer for AutoGen integration with Opik.

Sets up OpenTelemetry TracerProvider with OTLP exporter to Opik,
configures it as the global provider, and returns a ready-to-use Tracer.

Returns:
Tracer configured to export traces to Opik.

Raises:
DatasetError: If required OpenTelemetry dependencies are not installed.
"""
try:
from opentelemetry import trace # noqa: PLC0415
from opentelemetry.sdk.trace import TracerProvider # noqa: PLC0415
from opentelemetry.sdk.trace.export import BatchSpanProcessor # noqa: PLC0415
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter # noqa: PLC0415
except ImportError as exc:
raise DatasetError(
"AutoGen mode requires OpenTelemetry. "
"Install with: pip install opentelemetry-sdk opentelemetry-exporter-otlp-proto-http"
) from exc

# Build headers for Opik authentication
headers = {
"Authorization": self._credentials["api_key"],
"Comet-Workspace": self._credentials["workspace"],
}

# Add project name if specified
project_name = self._credentials.get("project_name")
if project_name:
headers["projectName"] = project_name

# Use Opik's OTLP endpoint (or custom url_override)
base_url = self._credentials.get("url_override", "https://www.comet.com")
Comment thread
SajidAlamQB marked this conversation as resolved.
Outdated
endpoint = f"{base_url}/opik/api/v1/private/otel/v1/traces"
Comment thread
SajidAlamQB marked this conversation as resolved.
Outdated

exporter = OTLPSpanExporter(
endpoint=endpoint,
headers=headers
)

processor = BatchSpanProcessor(exporter)

# Use existing provider if already set, otherwise create a new one.
existing_provider = trace.get_tracer_provider()
if hasattr(existing_provider, "add_span_processor"):
existing_provider.add_span_processor(processor)
else:
provider = TracerProvider()
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

return trace.get_tracer("opik.autogen")

def _describe(self) -> dict[str, Any]:
"""Describe dataset configuration with credentials redacted."""
creds = self._credentials.copy()
Expand All @@ -190,7 +271,18 @@ def _describe(self) -> dict[str, Any]:
}

def load(self) -> Any:
"""Load the appropriate tracing client based on the configured mode."""
"""Load the appropriate tracing client based on the configured mode.

Returns:
Tracing client object based on mode:
- sdk mode: SDKClient wrapper exposing the track decorator
- openai mode: Wrapped OpenAI client with automatic tracing
- langchain mode: OpikTracer callback handler
- autogen mode: Configured Tracer for OpenTelemetry integration

Raises:
DatasetError: If mode-specific dependencies are missing or credentials are invalid.
"""
if self._cached_client is not None:
return self._cached_client

Expand All @@ -200,6 +292,8 @@ def load(self) -> Any:
self._cached_client = self._load_openai_client()
elif self._mode == "langchain":
self._cached_client = self._load_langchain_tracer()
elif self._mode == "autogen":
self._cached_client = self._build_autogen_tracer()
else:
raise DatasetError(f"Unsupported mode '{self._mode}' for OpikTraceDataset")

Expand Down
Loading
Loading