Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions kedro-agentic-workflows/conf/base/genai-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,14 @@ intent_prompt_langfuse:
intent_tracer_langfuse:
type: kedro_datasets_experimental.langfuse.LangfuseTraceDataset
credentials: langfuse_credentials
mode: langchain # langchain | openai | sdk
mode: langchain # langchain | openai | sdk | autogen

# intent_tracer_opik:
# type: kedro_datasets_experimental.opik.OpikTraceDataset
# credentials: opik_credentials
# mode: openai # langchain | openai | sdk
# mode: openai # langchain | openai | sdk | autogen

autogen_tracer_langfuse:
type: kedro_datasets_experimental.langfuse.LangfuseTraceDataset
credentials: langfuse_credentials
mode: autogen
2 changes: 1 addition & 1 deletion kedro-agentic-workflows/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ ipython>=8.10
jupyterlab>=3.0
notebook
kedro~=1.0.0
kedro-datasets[pandas-sqltabledataset,pandas-sqlquerydataset,yaml-yamldataset,opik,langchain-chatopenaidataset, langfuse]~=9.0.0
kedro-datasets[pandas-sqltabledataset,pandas-sqlquerydataset,yaml-yamldataset,opik,langchain-chatopenaidataset, langfuse, langfuse-langfusetracedataset-autogen]~=9.0.0
kedro-viz>=12.2.0
langgraph~=1.0.0
scikit-learn~=1.7.2
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from datetime import datetime
import logging
from typing import Any

from kedro.pipeline import LLMContext
from langchain_core.messages import AIMessage
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from sqlalchemy import text, Engine

from .agent import ResponseGenerationAgentAutogen
Expand All @@ -11,16 +14,40 @@
logger = logging.getLogger(__name__)


def setup_autogen_tracing(span_processor: Any) -> TracerProvider:
    """Set up OpenTelemetry tracing with Langfuse for AutoGen.

    Builds a fresh ``TracerProvider``, attaches the Langfuse span
    processor to it, and installs it as the process-global tracer
    provider so spans emitted by AutoGen agents are exported to
    Langfuse. Must be called before running any AutoGen agents.

    Args:
        span_processor: LangfuseSpanProcessor from the
            LangfuseTraceDataset (the ``autogen_tracer_langfuse``
            catalog entry).

    Returns:
        TracerProvider: The configured tracer provider, returned so
        downstream pipeline nodes can declare a dependency on tracing
        being initialised before they run.
    """
    provider = TracerProvider()
    provider.add_span_processor(span_processor)
    # NOTE(review): OpenTelemetry treats set_tracer_provider() as
    # one-shot — a second call is ignored with a warning, so only the
    # first provider installed in this process wins. Confirm this node
    # runs once per session/run.
    trace.set_tracer_provider(provider)

    logger.info("Langfuse tracing configured for AutoGen agents")
    return provider


def generate_response(
response_generation_context: LLMContext,
intent_detection_result: dict,
user_context: dict,
session_config: dict,
tracer_provider: TracerProvider,
Comment thread
SajidAlamQB marked this conversation as resolved.
Outdated
) -> dict:
"""
Run the ResponseGenerationAgent to produce a final answer.
Accepts intent detection result + user context and session config.
"""
# Get a tracer for creating spans
tracer = trace.get_tracer(__name__)
Comment thread
SajidAlamQB marked this conversation as resolved.
Outdated

if intent_detection_result["intent"] == "clarification_needed":
message = (
"Failed to recognize intent. Please try to describe your problem briefly."
Expand All @@ -30,17 +57,37 @@ def generate_response(
result = {"messages": [AIMessage(content=message)]}

else:
agent = ResponseGenerationAgentAutogen(context=response_generation_context)
agent.compile()
# Wrap agent execution in a span for tracing
with tracer.start_as_current_span("response_generation") as span:
# Log input context to the span
span.set_attribute("intent", intent_detection_result["intent"])
span.set_attribute(
"intent_reason", intent_detection_result.get("reason", "")
)
span.set_attribute(
"user_id",
user_context.get("profile", {}).get("user_id", "unknown"),
)

agent = ResponseGenerationAgentAutogen(context=response_generation_context)
agent.compile()

context = {
"messages": [],
"intent": intent_detection_result["intent"],
"intent_generator_summary": intent_detection_result["reason"],
"user_context": user_context,
}

context = {
"messages": [],
"intent": intent_detection_result["intent"],
"intent_generator_summary": intent_detection_result["reason"],
"user_context": user_context,
}
result = agent.invoke(context, session_config)

result = agent.invoke(context, session_config)
# Log output to the span
if result.get("messages"):
span.set_attribute(
"response", result["messages"][-1].content[:500]
) # Truncate for safety
span.set_attribute("claim_created", result.get("claim_created", False))
span.set_attribute("escalated", result.get("escalated", False))

for m in result["messages"]:
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,20 @@
from .nodes import (
generate_response,
log_response_and_end_session,
setup_autogen_tracing,
)
from .tools import build_lookup_docs, build_get_user_claims, build_create_claim


def create_pipeline(**kwargs) -> Pipeline:
return pipeline(
[
node(
func=setup_autogen_tracing,
inputs="autogen_tracer_langfuse",
outputs="tracer_provider",
name="setup_tracing_node",
),
llm_context_node(
name="response_agent_context_node",
outputs="response_generation_context",
Expand All @@ -28,6 +35,7 @@ def create_pipeline(**kwargs) -> Pipeline:
"intent_detection_result",
"user_context",
"session_config",
"tracer_provider",
],
outputs="final_response",
name="generate_response_node",
Expand Down
Loading