Merged
api/db/services/dialog_service.py (2 changes: 1 addition & 1 deletion)
@@ -782,7 +782,7 @@ def decorate_answer(answer):
return {"answer": think + answer, "reference": refs, "prompt": re.sub(r"\n", " \n", prompt), "created_at": time.time()}

if langfuse_tracer:
langfuse_generation = langfuse_tracer.start_generation(
langfuse_generation = langfuse_tracer.start_observation(as_type="generation",
trace_context=trace_context, name="chat", model=llm_model_config["llm_name"],
input={"prompt": prompt, "prompt4citation": prompt4citation, "messages": msg}
)
Comment on lines 784 to 788
⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify whether `decorate_answer` directly closes over `langfuse_generation`.
python - <<'PY'
import ast
from pathlib import Path

path = Path("api/db/services/dialog_service.py")
tree = ast.parse(path.read_text())

async_chat = next(
    node for node in tree.body
    if isinstance(node, ast.AsyncFunctionDef) and node.name == "async_chat"
)
decorate_answer = next(
    node for node in async_chat.body
    if isinstance(node, ast.FunctionDef) and node.name == "decorate_answer"
)

names = {n.id for n in ast.walk(decorate_answer) if isinstance(n, ast.Name)}
string_check = any(
    isinstance(n, ast.Constant) and n.value == "langfuse_generation"
    for n in ast.walk(decorate_answer)
)

print("direct_name_reference =", "langfuse_generation" in names)
print("string_check_present =", string_check)
PY

Repository: infiniflow/ragflow

Length of output: 118


🏁 Script executed:

#!/bin/bash
# First, verify the file exists and check the relevant lines
head -n 800 api/db/services/dialog_service.py | tail -n 50

Repository: infiniflow/ragflow

Length of output: 2836


🏁 Script executed:

#!/bin/bash
# Search for the cleanup path that checks langfuse_generation
rg -n "langfuse_generation" api/db/services/dialog_service.py -A 2 -B 2

Repository: infiniflow/ragflow

Length of output: 1006


Fix the langfuse observation finalization guard condition.

The cleanup path at line 771 uses `"langfuse_generation" in locals()`, which can fail even though the variable is directly referenced on lines 774–775: inside a nested function, a variable from the enclosing scope does not reliably appear in `locals()` unless its closure cell has been bound. Guard on the variable itself instead:

Suggested fix
     langfuse_tracer = None
+    langfuse_generation = None
     trace_context = {}
@@
-        if langfuse_tracer and "langfuse_generation" in locals():
+        if langfuse_generation is not None:
             langfuse_output = "\n" + re.sub(r"^.*?(### Query:.*)", r"\1", prompt, flags=re.DOTALL)
             langfuse_output = {"time_elapsed:": re.sub(r"\n", "  \n", langfuse_output), "created_at": time.time()}
             langfuse_generation.update(output=langfuse_output)
             langfuse_generation.end()
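
To make the failure mode concrete, here is a minimal self-contained sketch of the two guard styles in the nested-function shape decorate_answer uses. The names are illustrative, not taken from the PR; the point is that the in locals() check depends on whether an unbound closure cell is surfaced by locals() (a CPython implementation detail), while initializing the variable up front reduces the guard to a plain None test.

def fragile(tracing_enabled: bool):
    if tracing_enabled:
        generation = object()  # stand-in for langfuse_tracer.start_observation(...)

    def cleanup():
        # `generation` shows up in locals() here only while its closure cell
        # is bound; with tracing disabled the name is silently absent.
        if "generation" in locals() and generation is not None:
            print("finalizing observation")
        else:
            print("nothing to finalize")

    cleanup()


def robust(tracing_enabled: bool):
    generation = None  # always bound, so the guard below is unambiguous
    if tracing_enabled:
        generation = object()

    def cleanup():
        if generation is not None:
            print("finalizing observation")
        else:
            print("nothing to finalize")

    cleanup()


fragile(True)   # finalizing observation
fragile(False)  # nothing to finalize (works today, but leans on unbound-cell semantics)
robust(True)    # finalizing observation
robust(False)   # nothing to finalize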
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/db/services/dialog_service.py` around lines 779 - 783, the cleanup guard
should not rely on "langfuse_generation" appearing in locals(). Initialize
langfuse_generation = None before the if langfuse_tracer block (where
langfuse_tracer.start_observation is called), then have the finalization/cleanup
test the variable directly, e.g. if langfuse_generation is not None: (or by
truthiness), to decide whether to call update/end on it. This keeps references to
langfuse_generation (created by start_observation) safe in the nested cleanup
code.
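
For reference, every call site in this PR applies the same mechanical rewrite: start_generation(...) becomes start_observation(as_type="generation", ...) with the remaining keyword arguments carried over unchanged. A minimal before/after sketch under that assumption; the client, model name, and payload below are placeholders, not code from the repository.

def start_chat_generation(client, trace_context, model_name, payload):
    # Before (the spelling this PR removes):
    #   generation = client.start_generation(
    #       trace_context=trace_context, name="chat", model=model_name, input=payload)
    # After:
    generation = client.start_observation(
        trace_context=trace_context,
        as_type="generation",
        name="chat",
        model=model_name,
        input=payload,
    )
    return generation  # callers still finalize via generation.update(...) / generation.end()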

api/db/services/llm_service.py (24 changes: 12 additions & 12 deletions)
@@ -94,7 +94,7 @@ def bind_tools(self, toolcall_session, tools):
 
     def encode(self, texts: list):
         if self.langfuse:
-            generation = self.langfuse.start_generation(trace_context=self.trace_context, name="encode", model=self.model_config["llm_name"], input={"texts": texts})
+            generation = self.langfuse.start_observation(trace_context=self.trace_context, as_type="generation", name="encode", model=self.model_config["llm_name"], input={"texts": texts})
 
         safe_texts = []
         for text in texts:
@@ -119,7 +119,7 @@ def encode(self, texts: list):
 
     def encode_queries(self, query: str):
         if self.langfuse:
-            generation = self.langfuse.start_generation(trace_context=self.trace_context, name="encode_queries", model=self.model_config["llm_name"], input={"query": query})
+            generation = self.langfuse.start_observation(trace_context=self.trace_context, as_type="generation", name="encode_queries", model=self.model_config["llm_name"], input={"query": query})
 
         emd, used_tokens = self.mdl.encode_queries(query)
         if self.model_config["llm_factory"] == "Builtin":
@@ -135,7 +135,7 @@ def encode_queries(self, query: str):
 
     def similarity(self, query: str, texts: list):
         if self.langfuse:
-            generation = self.langfuse.start_generation(trace_context=self.trace_context, name="similarity", model=self.model_config["llm_name"], input={"query": query, "texts": texts})
+            generation = self.langfuse.start_observation(trace_context=self.trace_context, as_type="generation", name="similarity", model=self.model_config["llm_name"], input={"query": query, "texts": texts})
 
         sim, used_tokens = self.mdl.similarity(query, texts)
         if not TenantLLMService.increase_usage_by_id(self.model_config["id"], used_tokens):
@@ -149,7 +149,7 @@ def similarity(self, query: str, texts: list):
 
     def describe(self, image, max_tokens=300):
         if self.langfuse:
-            generation = self.langfuse.start_generation(trace_context=self.trace_context, name="describe", metadata={"model": self.model_config["llm_name"]})
+            generation = self.langfuse.start_observation(trace_context=self.trace_context, as_type="generation", name="describe", metadata={"model": self.model_config["llm_name"]})
 
         txt, used_tokens = self.mdl.describe(image)
         if not TenantLLMService.increase_usage_by_id(self.model_config["id"], used_tokens):
@@ -163,7 +163,7 @@ def describe(self, image, max_tokens=300):
 
     def describe_with_prompt(self, image, prompt):
         if self.langfuse:
-            generation = self.langfuse.start_generation(trace_context=self.trace_context, name="describe_with_prompt", metadata={"model": self.model_config["llm_name"], "prompt": prompt})
+            generation = self.langfuse.start_observation(trace_context=self.trace_context, as_type="generation", name="describe_with_prompt", metadata={"model": self.model_config["llm_name"], "prompt": prompt})
 
         txt, used_tokens = self.mdl.describe_with_prompt(image, prompt)
         if not TenantLLMService.increase_usage_by_id(self.model_config["id"], used_tokens):
@@ -177,7 +177,7 @@ def describe_with_prompt(self, image, prompt):
 
     def transcription(self, audio):
         if self.langfuse:
-            generation = self.langfuse.start_generation(trace_context=self.trace_context, name="transcription", metadata={"model": self.model_config["llm_name"]})
+            generation = self.langfuse.start_observation(trace_context=self.trace_context, as_type="generation", name="transcription", metadata={"model": self.model_config["llm_name"]})
 
         txt, used_tokens = self.mdl.transcription(audio)
         if not TenantLLMService.increase_usage_by_id(self.model_config["id"], used_tokens):
@@ -194,7 +194,7 @@ def stream_transcription(self, audio):
         supports_stream = hasattr(mdl, "stream_transcription") and callable(getattr(mdl, "stream_transcription"))
         if supports_stream:
             if self.langfuse:
-                generation = self.langfuse.start_generation(
+                generation = self.langfuse.start_observation(as_type="generation",
                     trace_context=self.trace_context,
                     name="stream_transcription",
                     metadata={"model": self.model_config["llm_name"]},
@@ -228,7 +228,7 @@ def stream_transcription(self, audio):
             return
 
         if self.langfuse:
-            generation = self.langfuse.start_generation(
+            generation = self.langfuse.start_observation(as_type="generation",
                 trace_context=self.trace_context,
                 name="stream_transcription",
                 metadata={"model": self.model_config["llm_name"]},
@@ -253,7 +253,7 @@ def stream_transcription(self, audio):
 
     def tts(self, text: str) -> Generator[bytes, None, None]:
         if self.langfuse:
-            generation = self.langfuse.start_generation(trace_context=self.trace_context, name="tts", input={"text": text})
+            generation = self.langfuse.start_observation(trace_context=self.trace_context, as_type="generation", name="tts", input={"text": text})
 
         for chunk in self.mdl.tts(text):
             if isinstance(chunk, int):
@@ -376,7 +376,7 @@ async def async_chat(self, system: str, history: list, gen_conf: dict = {}, **kw
 
         generation = None
         if self.langfuse:
-            generation = self.langfuse.start_generation(trace_context=self.trace_context, name="chat", model=self.model_config["llm_name"], input={"system": system, "history": history})
+            generation = self.langfuse.start_observation(trace_context=self.trace_context, as_type="generation", name="chat", model=self.model_config["llm_name"], input={"system": system, "history": history})
 
         chat_partial = partial(base_fn, system, history, gen_conf)
         use_kwargs = self._clean_param(chat_partial, **kwargs)
@@ -417,7 +417,7 @@ async def async_chat_streamly(self, system: str, history: list, gen_conf: dict =
 
         generation = None
         if self.langfuse:
-            generation = self.langfuse.start_generation(trace_context=self.trace_context, name="chat_streamly", model=self.model_config["llm_name"], input={"system": system, "history": history})
+            generation = self.langfuse.start_observation(trace_context=self.trace_context, as_type="generation", name="chat_streamly", model=self.model_config["llm_name"], input={"system": system, "history": history})
 
         if stream_fn:
             chat_partial = partial(stream_fn, system, history, gen_conf)
@@ -460,7 +460,7 @@ async def async_chat_streamly_delta(self, system: str, history: list, gen_conf:
 
         generation = None
         if self.langfuse:
-            generation = self.langfuse.start_generation(trace_context=self.trace_context, name="chat_streamly", model=self.model_config["llm_name"], input={"system": system, "history": history})
+            generation = self.langfuse.start_observation(trace_context=self.trace_context, as_type="generation", name="chat_streamly", model=self.model_config["llm_name"], input={"system": system, "history": history})
 
         if stream_fn:
             chat_partial = partial(stream_fn, system, history, gen_conf)