diff --git a/kedro-datasets/RELEASE.md b/kedro-datasets/RELEASE.md index 25094d069..e3fb652d0 100755 --- a/kedro-datasets/RELEASE.md +++ b/kedro-datasets/RELEASE.md @@ -3,6 +3,7 @@ ## Bug fixes and other changes - Fixed `ibis.TableDataset` `exists` method to account for `database` (i.e. the collection of tables, or schema). +- Added `autogen` mode to `LangfuseTraceDataset` for tracing AutoGen agent conversations with OpenTelemetry integration. - `api.APIDataset` now stores the response received from a `PUT` or `POST` request via the `response_dataset` parameter. - Added `autogen` mode to `OpikTraceDataset` for tracing AutoGen agent conversations with OpenTelemetry integration. diff --git a/kedro-datasets/kedro_datasets_experimental/langfuse/langfuse_trace_dataset.py b/kedro-datasets/kedro_datasets_experimental/langfuse/langfuse_trace_dataset.py index 934b0fbcb..996126bf4 100644 --- a/kedro-datasets/kedro_datasets_experimental/langfuse/langfuse_trace_dataset.py +++ b/kedro-datasets/kedro_datasets_experimental/langfuse/langfuse_trace_dataset.py @@ -4,6 +4,7 @@ from kedro.io import AbstractDataset, DatasetError REQUIRED_LANGFUSE_CREDENTIALS = {"public_key", "secret_key"} +REQUIRED_LANGFUSE_CREDENTIALS_AUTOGEN = {"endpoint"} OPTIONAL_LANGFUSE_CREDENTIALS = {"host"} @@ -18,6 +19,9 @@ class LangfuseTraceDataset(AbstractDataset): - **langchain:** Returns a `CallbackHandler` for LangChain integration. - **openai:** Returns a wrapped OpenAI client with automatic tracing. + - **autogen:** Returns a configured `Tracer` for AutoGen integration via OTLP. + Note: Langfuse's graph visualisation is in beta and may not render + complex multi-agent workflows correctly. - **sdk:** Returns a raw Langfuse client for manual tracing. Examples: @@ -59,13 +63,37 @@ class LangfuseTraceDataset(AbstractDataset): # Load tracing client client = dataset.load() response = client.chat.completions.create(...) 
# Automatically traced + + # AutoGen mode Langfuse cloud + dataset = LangfuseTraceDataset( + credentials={ + "public_key": "pk_...", + "secret_key": "sk_...", # pragma: allowlist secret + "endpoint": "https://cloud.langfuse.com/api/public/otel/v1/traces", + }, + mode="autogen", + ) + tracer = dataset.load() + + # AutoGen mode self-hosted + dataset = LangfuseTraceDataset( + credentials={ + "public_key": "pk_...", + "secret_key": "sk_...", # pragma: allowlist secret + "host": "http://localhost:3000", + "endpoint": "http://localhost:3000/api/public/otel/v1/traces", + }, + mode="autogen", + ) + tracer = dataset.load() + # Use with AutoGen's runtime logging ``` """ def __init__( self, credentials: dict[str, Any], - mode: Literal["langchain", "openai", "sdk"] = "sdk", + mode: Literal["langchain", "openai", "autogen", "sdk"] = "sdk", **trace_kwargs: Any ): """Initialize LangfuseTraceDataset and configure environment variables. @@ -75,43 +103,66 @@ def __init__( during initialization for use by all tracing modes. Args: - credentials: Dictionary with Langfuse credentials. Required keys: - {public_key, secret_key}. Optional keys: {host} (defaults to - Langfuse cloud if not provided). For OpenAI mode, may also include - openai section with {openai_api_key, openai_api_base}. - mode: Tracing mode - "langchain", "openai", or "sdk" (default). + credentials: Dictionary with Langfuse credentials. Required: {public_key, secret_key}. + Optional: {host} (defaults to Langfuse cloud if not provided). + For autogen mode, {endpoint} is required — the full OTLP endpoint URL + (e.g. https://cloud.langfuse.com/api/public/otel/v1/traces). + For OpenAI mode, include openai section with {openai_api_key, openai_api_base}. + mode: Tracing mode - "langchain", "openai", "autogen", or "sdk" (default). **trace_kwargs: Additional kwargs passed to the tracing client. Raises: DatasetError: If required Langfuse credentials are missing or empty. 
Examples: - # Basic SDK mode (using default Langfuse cloud) - dataset = LangfuseTraceDataset( + >>> # Basic SDK mode (using default Langfuse cloud) + >>> dataset = LangfuseTraceDataset( ... credentials={"public_key": "pk_...", "secret_key": "sk_..."} # pragma: allowlist secret ... ) - # With custom host - dataset = LangfuseTraceDataset( + >>> # With custom host + >>> dataset = LangfuseTraceDataset( ... credentials={ - ... "public_key": "pk_...", "secret_key": "sk_...", # pragma: allowlist secret + ... "public_key": "pk_...", + ... "secret_key": "sk_...", # pragma: allowlist secret ... "host": "https://custom.langfuse.com" ... } ... ) - # OpenAI mode with API key - dataset = LangfuseTraceDataset( + >>> # OpenAI mode with API key + >>> dataset = LangfuseTraceDataset( ... credentials={ - ... "public_key": "pk_...", "secret_key": "sk_...", # pragma: allowlist secret - ... "openai": {"openai_api_key": "sk-...", "openai_api_base": "..."} # pragma: allowlist secret + ... "public_key": "pk_...", + ... "secret_key": "sk_...", # pragma: allowlist secret + ... "openai": {"openai_api_key": "sk-...", "openai_api_base": "..."} # pragma: allowlist secret ... }, ... mode="openai" ... ) + >>> # AutoGen mode cloud + >>> dataset = LangfuseTraceDataset( + ... credentials={ + ... "public_key": "pk_...", + ... "secret_key": "sk_...", # pragma: allowlist secret + ... "endpoint": "https://cloud.langfuse.com/api/public/otel/v1/traces", + ... }, + ... mode="autogen" + ... ) + + >>> # AutoGen mode self-hosted + >>> dataset = LangfuseTraceDataset( + ... credentials={ + ... "public_key": "pk_...", + ... "secret_key": "sk_...", # pragma: allowlist secret + ... "host": "http://localhost:3000", + ... "endpoint": "http://localhost:3000/api/public/otel/v1/traces", + ... }, + ... mode="autogen" + ... ) + Note: Sets LANGFUSE_SECRET_KEY, LANGFUSE_PUBLIC_KEY, and LANGFUSE_HOST - environment variables from the provided credentials. Also sets - OPENAI_API_KEY if provided for OpenAI mode compatibility. 
+ environment variables from the provided credentials. """ self._credentials = credentials self._mode = mode @@ -150,6 +201,16 @@ def _validate_langfuse_credentials(self) -> None: if not self._credentials[key] or not str(self._credentials[key]).strip(): raise DatasetError(f"Langfuse credential '{key}' cannot be empty if provided") + # AutoGen mode has additional required credentials + if self._mode == "autogen": + for key in REQUIRED_LANGFUSE_CREDENTIALS_AUTOGEN: + if not self._credentials.get(key): + raise DatasetError( + f"AutoGen mode requires '{key}' in credentials " + f"(e.g. 'https://cloud.langfuse.com/api/public/otel/v1/traces'). " + f"Provide the full OTLP endpoint URL for trace export." + ) + def _describe(self) -> dict[str, Any]: """Return a description of the dataset for Kedro's internal use. @@ -203,6 +264,60 @@ def _build_openai_client_params(self) -> dict[str, str]: return client_params + def _build_autogen_tracer(self) -> Any: + """Build and return a configured Tracer for AutoGen integration with Langfuse. + + Sets up OpenTelemetry TracerProvider with OTLP exporter to Langfuse, + configures it as the global provider, and returns a ready-to-use Tracer. + + Returns: + Tracer configured to export traces to Langfuse. + + Raises: + DatasetError: If required OpenTelemetry dependencies are not installed. + """ + try: + from opentelemetry import trace # noqa: PLC0415 + from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( # noqa: PLC0415 + OTLPSpanExporter, + ) + from opentelemetry.sdk.trace import TracerProvider # noqa: PLC0415 + from opentelemetry.sdk.trace.export import ( # noqa: PLC0415 + BatchSpanProcessor, + ) + except ImportError as exc: + raise DatasetError( + "AutoGen mode requires OpenTelemetry. 
" + "Install with: pip install opentelemetry-sdk opentelemetry-exporter-otlp-proto-http" + ) from exc + + import base64 # noqa: PLC0415 + + auth = base64.b64encode( + f"{self._credentials['public_key']}:{self._credentials['secret_key']}".encode() + ).decode() + + # Endpoint is provided by user and validated in _validate_langfuse_credentials + endpoint = self._credentials["endpoint"] + + exporter = OTLPSpanExporter( + endpoint=endpoint, + headers={"Authorization": f"Basic {auth}"} + ) + + processor = BatchSpanProcessor(exporter) + + # Use existing provider if already set, otherwise create a new one. + existing_provider = trace.get_tracer_provider() + if hasattr(existing_provider, "add_span_processor"): + existing_provider.add_span_processor(processor) + else: + provider = TracerProvider() + provider.add_span_processor(processor) + trace.set_tracer_provider(provider) + + return trace.get_tracer("langfuse.autogen") + def load(self) -> Any: """Load appropriate tracing client based on configured mode. @@ -214,10 +329,11 @@ def load(self) -> Any: Tracing client object based on mode: - langchain mode: CallbackHandler for LangChain integration - openai mode: Wrapped OpenAI client with automatic tracing + - autogen mode: Configured Tracer for OpenTelemetry integration - sdk mode: Raw Langfuse client for manual tracing Raises: - DatasetError: If OpenAI mode is used but OpenAI credentials are missing or invalid. + DatasetError: If mode-specific dependencies are missing or credentials are invalid. 
Examples: # LangChain mode @@ -230,6 +346,18 @@ def load(self) -> Any: client = dataset.load() response = client.chat.completions.create(model="gpt-4", messages=[...]) + # AutoGen mode + dataset = LangfuseTraceDataset(credentials=creds, mode="autogen") + tracer = dataset.load() # Returns configured Tracer + + # Option 1: Automatic tracing (LLM calls traced automatically) + agent.invoke(context) # Traces sent to Langfuse + + # Option 2: Add custom spans with context + with tracer.start_as_current_span("response_generation") as span: + span.set_attribute("intent", "claim_new") + agent.invoke(context) # Child spans nested under parent + # SDK mode dataset = LangfuseTraceDataset(credentials=creds, mode="sdk") langfuse = dataset.load() @@ -241,19 +369,21 @@ def load(self) -> Any: # Create and cache the appropriate client if self._mode == "langchain": - from langfuse.langchain import CallbackHandler # noqa PLC0415 - self._cached_client = CallbackHandler() + from langfuse.langchain import CallbackHandler # noqa: PLC0415 + self._cached_client = CallbackHandler(**self._trace_kwargs) elif self._mode == "openai": - from langfuse.openai import OpenAI # noqa PLC0415 + from langfuse.openai import OpenAI # noqa: PLC0415 client_params = self._build_openai_client_params() self._cached_client = OpenAI(**client_params) + elif self._mode == "autogen": + self._cached_client = self._build_autogen_tracer() else: try: - from langfuse import get_client # noqa PLC0415 + from langfuse import get_client # noqa: PLC0415 self._cached_client = get_client() except ImportError: - from langfuse import Langfuse # noqa PLC0415 - self._cached_client = Langfuse() + from langfuse import Langfuse # noqa: PLC0415 + self._cached_client = Langfuse(**self._trace_kwargs) return self._cached_client diff --git a/kedro-datasets/kedro_datasets_experimental/tests/langfuse/test_langfuse_trace_dataset.py b/kedro-datasets/kedro_datasets_experimental/tests/langfuse/test_langfuse_trace_dataset.py index 
a1866da28..959febec1 100644 --- a/kedro-datasets/kedro_datasets_experimental/tests/langfuse/test_langfuse_trace_dataset.py +++ b/kedro-datasets/kedro_datasets_experimental/tests/langfuse/test_langfuse_trace_dataset.py @@ -8,6 +8,8 @@ from kedro_datasets_experimental.langfuse import LangfuseTraceDataset +LANGFUSE_AUTOGEN_ENDPOINT = "https://cloud.langfuse.com/api/public/otel/v1/traces" + class TestLangfuseTraceDataset: def test_missing_credentials(self): @@ -168,10 +170,15 @@ def test_openai_mode(self, mocker): mock_openai_class.assert_called_once_with(api_key="sk-test") # pragma: allowlist secret assert result == mock_openai_instance - def test_openai_mode_missing_credentials(self): + def test_openai_mode_missing_credentials(self, mocker): """Test OpenAI mode raises error when OpenAI credentials missing.""" + mocker.patch.dict("os.environ", {}, clear=True) + + mock_openai_module = MagicMock() + mocker.patch.dict("sys.modules", {"langfuse.openai": mock_openai_module}) + dataset = LangfuseTraceDataset( - credentials={"public_key": "pk_test", "secret_key": "sk_test"}, # pragma: allowlist secret + credentials={"public_key": "pk_test", "secret_key": "sk_test"}, # pragma: allowlist secret mode="openai" ) @@ -187,3 +194,155 @@ def test_describe_method(self): description = dataset._describe() assert description == {"mode": "langchain", "credentials": "***"} + + # --- AutoGen mode tests --- + + def test_autogen_mode(self, mocker): + """Test AutoGen mode returns configured Tracer.""" + mocker.patch.dict("os.environ", {}, clear=True) + + mock_tracer = MagicMock() + mocker.patch( + "kedro_datasets_experimental.langfuse.langfuse_trace_dataset.LangfuseTraceDataset._build_autogen_tracer", + return_value=mock_tracer + ) + + dataset = LangfuseTraceDataset( + credentials={ + "public_key": "pk_test", + "secret_key": "sk_test", # pragma: allowlist secret + "endpoint": LANGFUSE_AUTOGEN_ENDPOINT, + }, + mode="autogen" + ) + + result = dataset.load() + assert result == mock_tracer + + 
def test_autogen_mode_caching(self, mocker): + """Test that AutoGen mode caches the tracer.""" + mocker.patch.dict("os.environ", {}, clear=True) + + mock_tracer = MagicMock() + build_tracer_mock = mocker.patch( + "kedro_datasets_experimental.langfuse.langfuse_trace_dataset.LangfuseTraceDataset._build_autogen_tracer", + return_value=mock_tracer + ) + + dataset = LangfuseTraceDataset( + credentials={ + "public_key": "pk_test", + "secret_key": "sk_test", # pragma: allowlist secret + "endpoint": LANGFUSE_AUTOGEN_ENDPOINT, + }, + mode="autogen" + ) + + # Call load twice + result1 = dataset.load() + result2 = dataset.load() + + # Should only build tracer once due to caching + build_tracer_mock.assert_called_once() + assert result1 is result2 + + def test_autogen_mode_sets_environment_variables(self, mocker): + """Test that AutoGen mode correctly sets Langfuse environment variables.""" + mocker.patch.dict("os.environ", {}, clear=True) + + # Mock the tracer builder to avoid actual OpenTelemetry imports + mocker.patch( + "kedro_datasets_experimental.langfuse.langfuse_trace_dataset.LangfuseTraceDataset._build_autogen_tracer", + return_value=MagicMock() + ) + + LangfuseTraceDataset( + credentials={ + "public_key": "pk_test_autogen", + "secret_key": "sk_test_autogen", # pragma: allowlist secret + "endpoint": LANGFUSE_AUTOGEN_ENDPOINT, + }, + mode="autogen" + ) + + assert os.environ["LANGFUSE_PUBLIC_KEY"] == "pk_test_autogen" + assert os.environ["LANGFUSE_SECRET_KEY"] == "sk_test_autogen" # pragma: allowlist secret + + def test_autogen_mode_missing_endpoint(self): + """Test that autogen mode raises error when endpoint is missing.""" + with pytest.raises(DatasetError, match="AutoGen mode requires 'endpoint'"): + LangfuseTraceDataset( + credentials={"public_key": "pk_test", "secret_key": "sk_test"}, # pragma: allowlist secret + mode="autogen" + ) + + def test_autogen_mode_empty_endpoint(self): + """Test that autogen mode raises error when endpoint is empty.""" + with 
pytest.raises(DatasetError, match="AutoGen mode requires 'endpoint'"): + LangfuseTraceDataset( + credentials={ + "public_key": "pk_test", + "secret_key": "sk_test", # pragma: allowlist secret + "endpoint": "", + }, + mode="autogen" + ) + + def test_autogen_mode_endpoint_not_required_for_other_modes(self): + """Test that endpoint is not required for non-autogen modes.""" + # Endpoint is only required for autogen mode + dataset = LangfuseTraceDataset( + credentials={"public_key": "pk_test", "secret_key": "sk_test"}, # pragma: allowlist secret + mode="sdk" + ) + assert dataset._mode == "sdk" + + def test_autogen_mode_import_error(self, mocker): + """Test AutoGen mode raises DatasetError when OpenTelemetry not installed.""" + mocker.patch.dict("os.environ", {}, clear=True) + + # Simulate missing OpenTelemetry: the patched builder raises the DatasetError that load() should propagate + def raise_import_error(): + raise DatasetError( + "AutoGen mode requires OpenTelemetry. " + "Install with: pip install opentelemetry-sdk opentelemetry-exporter-otlp-proto-http" + ) + + mocker.patch( + "kedro_datasets_experimental.langfuse.langfuse_trace_dataset.LangfuseTraceDataset._build_autogen_tracer", + side_effect=raise_import_error + ) + + dataset = LangfuseTraceDataset( + credentials={ + "public_key": "pk_test", + "secret_key": "sk_test", # pragma: allowlist secret + "endpoint": LANGFUSE_AUTOGEN_ENDPOINT, + }, + mode="autogen" + ) + + with pytest.raises(DatasetError, match="AutoGen mode requires OpenTelemetry"): + dataset.load() + + def test_describe_method_autogen_mode(self, mocker): + """Test _describe returns correct format for autogen mode.""" + mocker.patch.dict("os.environ", {}, clear=True) + + # Mock the tracer builder to avoid actual OpenTelemetry imports + mocker.patch( + "kedro_datasets_experimental.langfuse.langfuse_trace_dataset.LangfuseTraceDataset._build_autogen_tracer", + return_value=MagicMock() + ) + + dataset = LangfuseTraceDataset( + credentials={ + "public_key": "pk_test", + "secret_key": "sk_test", # 
pragma: allowlist secret + "endpoint": LANGFUSE_AUTOGEN_ENDPOINT, + }, + mode="autogen" + ) + + description = dataset._describe() + assert description == {"mode": "autogen", "credentials": "***"} diff --git a/kedro-datasets/pyproject.toml b/kedro-datasets/pyproject.toml index 736fc8f58..4b94bea34 100644 --- a/kedro-datasets/pyproject.toml +++ b/kedro-datasets/pyproject.toml @@ -215,7 +215,8 @@ langchain-langchainpromptdataset = ["langchain>=0.3.0"] langchain = ["kedro-datasets[langchain-chatopenaidataset,langchain-openaiembeddingsdataset,langchain-chatanthropicdataset,langchain-chatcoheredataset, langchain-langchainpromptdataset]"] langfuse-langfusepromptdataset = ["langfuse>=2.0.0"] langfuse-langfusetracedataset = ["langfuse>=2.0.0"] -langfuse = ["kedro-datasets[langfuse-langfusepromptdataset,langfuse-langfusetracedataset]", "openai>=2.3.0", "langchain>=0.2.0, <1.0"] +langfuse-langfusetracedataset-autogen = ["langfuse>=2.0.0", "opentelemetry-sdk", "opentelemetry-exporter-otlp-proto-http"] +langfuse = ["kedro-datasets[langfuse-langfusepromptdataset,langfuse-langfusetracedataset,langfuse-langfusetracedataset-autogen]", "openai>=2.3.0", "langchain>=0.2.0, <1.0"] opik-opikpromptdataset = ["opik>=1.8.0"] opik-opiktracedataset = ["opik>=1.8.0"] opik-opiktracedataset-autogen = ["opik>=1.8.0", "opentelemetry-sdk", "opentelemetry-exporter-otlp-proto-http"]