Skip to content
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
769a5bc
add autogen support for langfuse trace
SajidAlamQB Jan 20, 2026
64346cf
Merge branch 'main' into feat/add-auto-gen-support-to-langfuse
SajidAlamQB Jan 23, 2026
90d0e4d
changes based on review
SajidAlamQB Jan 26, 2026
9dcde34
Update pyproject.toml
SajidAlamQB Jan 28, 2026
980b170
Update pyproject.toml
SajidAlamQB Jan 30, 2026
31bfa3a
Update pyproject.toml
SajidAlamQB Jan 30, 2026
96d8062
Update langfuse_trace_dataset.py
SajidAlamQB Jan 30, 2026
e472ca1
add to langfuse group
SajidAlamQB Jan 30, 2026
14269c1
update docstring
SajidAlamQB Jan 30, 2026
3ded6e4
Update test_langfuse_trace_dataset.py
SajidAlamQB Jan 30, 2026
b574b1f
Update langfuse_trace_dataset.py
SajidAlamQB Feb 2, 2026
7d08069
Update test_langfuse_trace_dataset.py
SajidAlamQB Feb 2, 2026
688f0a0
fix opentelemetry warning
SajidAlamQB Feb 3, 2026
e088349
add host to credentials for langfuse autogen
SajidAlamQB Feb 3, 2026
35ea57b
Merge branch 'main' into feat/add-auto-gen-support-to-langfuse
SajidAlamQB Feb 3, 2026
669ea55
Update langfuse_trace_dataset.py
SajidAlamQB Feb 4, 2026
fcecb67
lint and fix tests
SajidAlamQB Feb 4, 2026
58aa9eb
endpoint from user
SajidAlamQB Feb 4, 2026
56dc163
update docstring for self-hosted
SajidAlamQB Feb 4, 2026
49b2d07
Update langfuse_trace_dataset.py
SajidAlamQB Feb 5, 2026
aac306b
replace with openlit
SajidAlamQB Feb 5, 2026
94ffb36
lint
SajidAlamQB Feb 5, 2026
d2b61a7
fix tests
SajidAlamQB Feb 5, 2026
b40dfd5
fix ci
SajidAlamQB Feb 5, 2026
da65e0b
pin openlit <1.36.8
SajidAlamQB Feb 6, 2026
7507dc4
revert back to otlp
SajidAlamQB Feb 9, 2026
8611432
Merge branch 'main' into feat/add-auto-gen-support-to-langfuse
SajidAlamQB Feb 9, 2026
9127a97
Merge branch 'main' into feat/add-auto-gen-support-to-langfuse
SajidAlamQB Feb 10, 2026
511d1b4
docstring fix
SajidAlamQB Feb 10, 2026
8e71a2d
Merge branch 'feat/add-auto-gen-support-to-langfuse' of https://githu…
SajidAlamQB Feb 10, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kedro-datasets/RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## Bug fixes and other changes

- Fixed `ibis.TableDataset` `exists` method to account for `database` (i.e. the collection of tables, or schema).
- Added `autogen` mode to `LangfuseTraceDataset` for tracing AutoGen agent conversations with OpenTelemetry integration.
- `api.APIDataset` now stores the response received from a `PUT` or `POST` request via the `response_dataset` parameter.
- Added `autogen` mode to `OpikTraceDataset` for tracing AutoGen agent conversations with OpenTelemetry integration.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from kedro.io import AbstractDataset, DatasetError

REQUIRED_LANGFUSE_CREDENTIALS = {"public_key", "secret_key"}
REQUIRED_LANGFUSE_CREDENTIALS_AUTOGEN = {"endpoint"}
OPTIONAL_LANGFUSE_CREDENTIALS = {"host"}


Expand All @@ -18,6 +19,9 @@ class LangfuseTraceDataset(AbstractDataset):

- **langchain:** Returns a `CallbackHandler` for LangChain integration.
- **openai:** Returns a wrapped OpenAI client with automatic tracing.
- **autogen:** Returns a configured `Tracer` for AutoGen integration via OTLP.
Note: Langfuse's graph visualisation is in beta and may not render
complex multi-agent workflows correctly.
- **sdk:** Returns a raw Langfuse client for manual tracing.

Examples:
Expand Down Expand Up @@ -59,13 +63,37 @@ class LangfuseTraceDataset(AbstractDataset):
# Load tracing client
client = dataset.load()
response = client.chat.completions.create(...) # Automatically traced

# AutoGen mode Langfuse cloud
dataset = LangfuseTraceDataset(
credentials={
"public_key": "pk_...",
"secret_key": "sk_...", # pragma: allowlist secret
"endpoint": "https://cloud.langfuse.com/api/public/otel/v1/traces",
},
mode="autogen",
)
tracer = dataset.load()

# AutoGen mode self-hosted
dataset = LangfuseTraceDataset(
credentials={
"public_key": "pk_...",
"secret_key": "sk_...", # pragma: allowlist secret
"host": "http://localhost:3000",
"endpoint": "http://localhost:3000/api/public/otel/v1/traces",
},
mode="autogen",
)
tracer = dataset.load()
# Use with AutoGen's runtime logging
```
"""

def __init__(
self,
credentials: dict[str, Any],
mode: Literal["langchain", "openai", "sdk"] = "sdk",
mode: Literal["langchain", "openai", "autogen", "sdk"] = "sdk",
**trace_kwargs: Any
):
"""Initialize LangfuseTraceDataset and configure environment variables.
Expand All @@ -77,9 +105,14 @@ def __init__(
Args:
credentials: Dictionary with Langfuse credentials. Required keys:
{public_key, secret_key}. Optional keys: {host} (defaults to
Langfuse cloud if not provided). For OpenAI mode, may also include
openai section with {openai_api_key, openai_api_base}.
mode: Tracing mode - "langchain", "openai", or "sdk" (default).
Langfuse cloud if not provided). For autogen mode, {endpoint} is
required — the full OTLP endpoint URL (e.g.
`https://cloud.langfuse.com/api/public/otel/v1/traces`).
For self-hosted Langfuse, provide `host` alongside `endpoint`
so that environment variables are configured correctly for all modes.
For OpenAI mode, may also include openai section with
{openai_api_key, openai_api_base}.
mode: Tracing mode - "langchain", "openai", "autogen", or "sdk" (default).
**trace_kwargs: Additional kwargs passed to the tracing client.

Raises:
Expand Down Expand Up @@ -108,6 +141,25 @@ def __init__(
... mode="openai"
... )

# AutoGen mode cloud
dataset = LangfuseTraceDataset(
... credentials={
... "public_key": "pk_...", "secret_key": "sk_...", # pragma: allowlist secret
... "endpoint": "https://cloud.langfuse.com/api/public/otel/v1/traces",
... },
... mode="autogen"
... )

# AutoGen mode self-hosted
dataset = LangfuseTraceDataset(
... credentials={
... "public_key": "pk_...", "secret_key": "sk_...", # pragma: allowlist secret
... "host": "http://localhost:3000",
... "endpoint": "http://localhost:3000/api/public/otel/v1/traces",
... },
... mode="autogen"
... )

Note:
Sets LANGFUSE_SECRET_KEY, LANGFUSE_PUBLIC_KEY, and LANGFUSE_HOST
environment variables from the provided credentials. Also sets
Expand Down Expand Up @@ -150,6 +202,16 @@ def _validate_langfuse_credentials(self) -> None:
if not self._credentials[key] or not str(self._credentials[key]).strip():
raise DatasetError(f"Langfuse credential '{key}' cannot be empty if provided")

# AutoGen mode has additional required credentials
if self._mode == "autogen":
for key in REQUIRED_LANGFUSE_CREDENTIALS_AUTOGEN:
if not self._credentials.get(key):
raise DatasetError(
f"AutoGen mode requires '{key}' in credentials "
f"(e.g. 'https://cloud.langfuse.com/api/public/otel/v1/traces'). "
f"Provide the full OTLP endpoint URL for trace export."
)

def _describe(self) -> dict[str, Any]:
"""Return a description of the dataset for Kedro's internal use.

Expand Down Expand Up @@ -203,6 +265,60 @@ def _build_openai_client_params(self) -> dict[str, str]:

return client_params

def _build_autogen_tracer(self) -> Any:
    """Create the OpenTelemetry ``Tracer`` used for AutoGen tracing in Langfuse.

    An OTLP/HTTP span exporter is pointed at the ``endpoint`` credential and
    authenticated with an HTTP Basic header derived from ``public_key`` and
    ``secret_key``. The exporter is wrapped in a ``BatchSpanProcessor`` which
    is attached to the current global tracer provider when that provider
    exposes ``add_span_processor``; otherwise a new ``TracerProvider`` is
    created, given the processor, and installed as the global provider.

    Returns:
        The tracer obtained from ``trace.get_tracer("langfuse.autogen")``.

    Raises:
        DatasetError: If the OpenTelemetry SDK or the OTLP HTTP exporter
            cannot be imported.
    """
    try:
        from opentelemetry import trace  # noqa: PLC0415
        from opentelemetry.exporter.otlp.proto.http.trace_exporter import (  # noqa: PLC0415
            OTLPSpanExporter,
        )
        from opentelemetry.sdk.trace import TracerProvider  # noqa: PLC0415
        from opentelemetry.sdk.trace.export import (  # noqa: PLC0415
            BatchSpanProcessor,
        )
    except ImportError as exc:
        raise DatasetError(
            "AutoGen mode requires OpenTelemetry. "
            "Install with: pip install opentelemetry-sdk opentelemetry-exporter-otlp-proto-http"
        ) from exc

    import base64  # noqa: PLC0415

    # Basic-auth token: base64("<public_key>:<secret_key>").
    public_key = self._credentials["public_key"]
    secret_key = self._credentials["secret_key"]
    basic_token = base64.b64encode(f"{public_key}:{secret_key}".encode()).decode()

    # The endpoint comes straight from the user-supplied credentials.
    otlp_endpoint = self._credentials["endpoint"]

    span_processor = BatchSpanProcessor(
        OTLPSpanExporter(
            endpoint=otlp_endpoint,
            headers={"Authorization": f"Basic {basic_token}"},
        )
    )

    # Reuse the global provider when it can accept processors; otherwise
    # build a new SDK provider and register it globally.
    active_provider = trace.get_tracer_provider()
    if not hasattr(active_provider, "add_span_processor"):
        new_provider = TracerProvider()
        new_provider.add_span_processor(span_processor)
        trace.set_tracer_provider(new_provider)
    else:
        active_provider.add_span_processor(span_processor)

    return trace.get_tracer("langfuse.autogen")

def load(self) -> Any:
"""Load appropriate tracing client based on configured mode.

Expand All @@ -214,10 +330,11 @@ def load(self) -> Any:
Tracing client object based on mode:
- langchain mode: CallbackHandler for LangChain integration
- openai mode: Wrapped OpenAI client with automatic tracing
- autogen mode: Configured Tracer for OpenTelemetry integration
- sdk mode: Raw Langfuse client for manual tracing

Raises:
DatasetError: If OpenAI mode is used but OpenAI credentials are missing or invalid.
DatasetError: If mode-specific dependencies are missing or credentials are invalid.

Examples:
# LangChain mode
Expand All @@ -230,6 +347,18 @@ def load(self) -> Any:
client = dataset.load()
response = client.chat.completions.create(model="gpt-4", messages=[...])

# AutoGen mode
Comment thread
SajidAlamQB marked this conversation as resolved.
dataset = LangfuseTraceDataset(credentials=creds, mode="autogen")
tracer = dataset.load() # Returns configured Tracer

# Option 1: Automatic tracing (LLM calls traced automatically)
agent.invoke(context) # Traces sent to Langfuse

# Option 2: Add custom spans with context
with tracer.start_as_current_span("response_generation") as span:
span.set_attribute("intent", "claim_new")
agent.invoke(context) # Child spans nested under parent

# SDK mode
dataset = LangfuseTraceDataset(credentials=creds, mode="sdk")
langfuse = dataset.load()
Expand All @@ -241,19 +370,21 @@ def load(self) -> Any:

# Create and cache the appropriate client
if self._mode == "langchain":
from langfuse.langchain import CallbackHandler # noqa PLC0415
self._cached_client = CallbackHandler()
from langfuse.langchain import CallbackHandler # noqa: PLC0415
self._cached_client = CallbackHandler(**self._trace_kwargs)
Comment thread
SajidAlamQB marked this conversation as resolved.
elif self._mode == "openai":
from langfuse.openai import OpenAI # noqa PLC0415
from langfuse.openai import OpenAI # noqa: PLC0415
client_params = self._build_openai_client_params()
self._cached_client = OpenAI(**client_params)
elif self._mode == "autogen":
self._cached_client = self._build_autogen_tracer()
else:
try:
from langfuse import get_client # noqa PLC0415
from langfuse import get_client # noqa: PLC0415
self._cached_client = get_client()
except ImportError:
from langfuse import Langfuse # noqa PLC0415
self._cached_client = Langfuse()
from langfuse import Langfuse # noqa: PLC0415
self._cached_client = Langfuse(**self._trace_kwargs)

return self._cached_client

Expand Down
Loading