Skip to content
Merged
19 changes: 17 additions & 2 deletions api/apps/kb_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,14 @@ def delete_kb_task():
if not pipeline_task_type or pipeline_task_type not in [PipelineTaskType.GRAPH_RAG, PipelineTaskType.RAPTOR, PipelineTaskType.MINDMAP]:
return get_error_data_result(message="Invalid task type")

# "wipe" controls whether the task's persisted artefacts (knowledge graph,
# raptor summaries, ...) are deleted from the doc store. Default is true
# to preserve the historical behaviour; pass wipe=false to cancel the
# running task while keeping prior progress so it can be resumed.
wipe_arg = (request.args.get("wipe", "true") or "true").strip().lower()
wipe = wipe_arg not in ("false", "0", "no", "off")
logging.info("unbind_task: kb=%s task_type=%s wipe=%s", kb_id, pipeline_task_type, wipe)

def cancel_task(task_id):
REDIS_CONN.set(f"{task_id}-cancel", "x")

Expand All @@ -828,13 +836,20 @@ def cancel_task(task_id):
task_id = kb.graphrag_task_id
kb_task_finish_at = "graphrag_task_finish_at"
cancel_task(task_id)
settings.docStoreConn.delete({"knowledge_graph_kwd": ["graph", "subgraph", "entity", "relation"]}, search.index_name(kb.tenant_id), kb_id)
if wipe:
settings.docStoreConn.delete({"knowledge_graph_kwd": ["graph", "subgraph", "entity", "relation", "community_report"]}, search.index_name(kb.tenant_id), kb_id)
# Wiping the graph invalidates any phase-completion markers
# for resolution / community detection.
from rag.graphrag.phase_markers import clear_phase_markers
clear_phase_markers(kb_id)
logging.info("unbind_task: cleared GraphRAG artefacts and phase markers for kb=%s", kb_id)
case PipelineTaskType.RAPTOR:
kb_task_id_field = "raptor_task_id"
task_id = kb.raptor_task_id
kb_task_finish_at = "raptor_task_finish_at"
cancel_task(task_id)
settings.docStoreConn.delete({"raptor_kwd": ["raptor"]}, search.index_name(kb.tenant_id), kb_id)
if wipe:
settings.docStoreConn.delete({"raptor_kwd": ["raptor"]}, search.index_name(kb.tenant_id), kb_id)
case PipelineTaskType.MINDMAP:
kb_task_id_field = "mindmap_task_id"
task_id = kb.mindmap_task_id
Expand Down
6 changes: 5 additions & 1 deletion api/apps/services/dataset_api_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,8 +398,12 @@ def delete_knowledge_graph(dataset_id: str, tenant_id: str):
return False, "No authorization."
_, kb = KnowledgebaseService.get_by_id(dataset_id)
from rag.nlp import search
settings.docStoreConn.delete({"knowledge_graph_kwd": ["graph", "subgraph", "entity", "relation"]},
from rag.graphrag.phase_markers import clear_phase_markers
settings.docStoreConn.delete({"knowledge_graph_kwd": ["graph", "subgraph", "entity", "relation", "community_report"]},
search.index_name(kb.tenant_id), dataset_id)
# Wiping the graph invalidates any phase-completion markers used to
# short-circuit resolution / community detection on resume.
clear_phase_markers(dataset_id)

return True, True

Expand Down
8 changes: 7 additions & 1 deletion rag/graphrag/entity_resolution.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,14 @@ async def limited_resolve_candidate(candidate_batch, result_set, result_lock):
connect_graph = nx.Graph()
connect_graph.add_edges_from(resolution_result)

# Merges must be serialized: _merge_graph_nodes mutates the shared
# nx.Graph (add_edge/remove_node) which is not thread-safe and can
# cause "dictionary keys changed during iteration" even with the
# neighbor snapshot, when two components share a node.
merge_semaphore = asyncio.Semaphore(1)

async def limited_merge_nodes(graph, nodes, change):
async with semaphore:
async with merge_semaphore:
await self._merge_graph_nodes(graph, nodes, change, task_id)
Comment thread
prpercival marked this conversation as resolved.
Outdated

tasks = []
Expand Down
9 changes: 8 additions & 1 deletion rag/graphrag/general/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,10 @@ async def _merge_graph_nodes(self, graph: nx.Graph, nodes: list[str], change: Gr
node1_attrs = graph.nodes[node1]
node0_attrs["description"] += f"{GRAPH_FIELD_SEP}{node1_attrs['description']}"
node0_attrs["source_id"] = sorted(set(node0_attrs["source_id"] + node1_attrs["source_id"]))
for neighbor in graph.neighbors(node1):
# Snapshot neighbors before mutation; otherwise networkx raises
# "dictionary keys changed during iteration" when concurrent merges
# or graph.add_edge/remove_node below touch the same adjacency dict.
for neighbor in list(graph.neighbors(node1)):
change.removed_edges.add(get_from_to(node1, neighbor))
if neighbor not in nodes_set:
edge1_attrs = graph.get_edge_data(node1, neighbor)
Expand All @@ -335,6 +338,10 @@ async def _merge_graph_nodes(self, graph: nx.Graph, nodes: list[str], change: Gr
graph.add_edge(nodes[0], neighbor, **edge0_attrs)
else:
graph.add_edge(nodes[0], neighbor, **edge1_attrs)
# Track the redirected neighbour so a later node1 in this
# merge that also points to it takes the merge branch
# above instead of overwriting the edge we just added.
node0_neighbors.add(neighbor)
graph.remove_node(node1)
node0_attrs["description"] = await self._handle_entity_relation_summary(nodes[0], node0_attrs["description"], task_id=task_id)
graph.nodes[nodes[0]].update(node0_attrs)
Expand Down
109 changes: 100 additions & 9 deletions rag/graphrag/general/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@
from rag.graphrag.general.extractor import Extractor
from rag.graphrag.general.graph_extractor import GraphExtractor as GeneralKGExt
from rag.graphrag.light.graph_extractor import GraphExtractor as LightKGExt
from rag.graphrag.phase_markers import (
PHASE_COMMUNITY,
PHASE_RESOLUTION,
clear_phase_markers,
has_phase_marker,
set_phase_marker,
)
from rag.graphrag.utils import (
GraphChange,
chunk_id,
Expand Down Expand Up @@ -354,8 +361,16 @@ async def build_one(doc_id: str):
raise TaskCanceledException(f"Task {row['id']} was cancelled")

ok_docs = [d for d in doc_ids if d in subgraphs]
if not ok_docs:
callback(msg=f"[GraphRAG] kb:{kb_id} no subgraphs generated successfully, end.")
final_graph = None

# Determine whether the resolution/community phases still need to run on
# this KB. Markers from a prior task let us skip already-completed phases
# even when no new docs are merged this round (the resume path).
resolution_pending = with_resolution and not has_phase_marker(kb_id, PHASE_RESOLUTION)
community_pending = with_community and not has_phase_marker(kb_id, PHASE_COMMUNITY)

if not ok_docs and not resolution_pending and not community_pending:
callback(msg=f"[GraphRAG] kb:{kb_id} no subgraphs to merge and no phases pending, end.")
now = asyncio.get_running_loop().time()
return {"ok_docs": [], "failed_docs": failed_docs, "total_docs": len(doc_ids), "total_chunks": total_chunks, "seconds": now - start}

Expand All @@ -369,7 +384,6 @@ async def build_one(doc_id: str):

try:
union_nodes: set = set()
final_graph = None

for doc_id in ok_docs:
sg = subgraphs[doc_id]
Expand All @@ -386,10 +400,17 @@ async def build_one(doc_id: str):
if new_graph is not None:
final_graph = new_graph

if final_graph is None:
if ok_docs and final_graph is None:
callback(msg=f"[GraphRAG] kb:{kb_id} merge finished (no in-memory graph returned).")
else:
elif ok_docs:
callback(msg=f"[GraphRAG] kb:{kb_id} merge finished, graph ready.")
# New content was merged into the global graph; any prior
# resolution/community results are now stale and must be redone
# on this or a future run. Clear phase markers accordingly.
clear_phase_markers(kb_id)
resolution_pending = with_resolution
community_pending = with_community
callback(msg=f"[GraphRAG] kb:{kb_id} cleared phase markers after merge.")
finally:
kb_lock.release()

Expand All @@ -398,6 +419,11 @@ async def build_one(doc_id: str):
callback(msg=f"[GraphRAG] KB merge done in {now - start:.2f}s. ok={len(ok_docs)} / total={len(doc_ids)}")
return {"ok_docs": ok_docs, "failed_docs": failed_docs, "total_docs": len(doc_ids), "total_chunks": total_chunks, "seconds": now - start}

if not resolution_pending and not community_pending:
now = asyncio.get_running_loop().time()
callback(msg=f"[GraphRAG] kb:{kb_id} all requested phases already complete; nothing to do.")
return {"ok_docs": ok_docs, "failed_docs": failed_docs, "total_docs": len(doc_ids), "total_chunks": total_chunks, "seconds": now - start}

if has_canceled(row["id"]):
callback(msg=f"Task {row['id']} cancelled before resolution/community extraction.")
raise TaskCanceledException(f"Task {row['id']} was cancelled")
Expand All @@ -406,11 +432,26 @@ async def build_one(doc_id: str):
callback(msg=f"[GraphRAG] kb:{kb_id} post-merge lock acquired for resolution/community")

try:
# Resume path: no docs were merged this round but pending phases
# require the previously-persisted graph. Load it from the doc store.
if final_graph is None:
final_graph = await get_graph(tenant_id, kb_id)
if final_graph is None:
callback(msg=f"[GraphRAG] kb:{kb_id} no persisted graph found; cannot run resolution/community.")
now = asyncio.get_running_loop().time()
return {"ok_docs": ok_docs, "failed_docs": failed_docs, "total_docs": len(doc_ids), "total_chunks": total_chunks, "seconds": now - start}
callback(msg=f"[GraphRAG] kb:{kb_id} loaded persisted graph for resume.")

subgraph_nodes = set()
for sg in subgraphs.values():
subgraph_nodes.update(set(sg.nodes()))
# On a pure-resume run (no new docs) the union of "newly added" nodes
# is empty, but resolution still needs *some* anchor set. Fall back to
# all graph nodes so candidate pairing actually finds something.
if not subgraph_nodes:
subgraph_nodes = set(final_graph.nodes())

if with_resolution:
if resolution_pending:
await resolve_entities(
final_graph,
subgraph_nodes,
Expand All @@ -422,8 +463,11 @@ async def build_one(doc_id: str):
callback,
task_id=row["id"],
)
set_phase_marker(kb_id, PHASE_RESOLUTION)
elif with_resolution:
callback(msg=f"[GraphRAG] kb:{kb_id} resolution already completed previously, skipping.")

if with_community:
if community_pending:
await extract_community(
final_graph,
tenant_id,
Expand All @@ -434,6 +478,9 @@ async def build_one(doc_id: str):
callback,
task_id=row["id"],
)
set_phase_marker(kb_id, PHASE_COMMUNITY)
elif with_community:
callback(msg=f"[GraphRAG] kb:{kb_id} community detection already completed previously, skipping.")
finally:
kb_lock.release()

Expand Down Expand Up @@ -632,8 +679,17 @@ async def extract_community(
"report": rep,
"evidences": "\n".join([f.get("explanation", "") for f in stru["findings"]]),
}
# Deterministic id derived from (kb_id, community title) so reruns of
# extract_community produce stable ids. Combined with insert-then-
# prune below, this means a crash mid-insert leaves the prior set of
# community reports intact -- never the partial-delete state the old
# delete-then-insert order produced.
chunk_payload_for_id = {
"content_with_weight": f"community_report::{stru['title']}",
"kb_id": kb_id,
}
chunk = {
"id": get_uuid(),
"id": chunk_id(chunk_payload_for_id),
"docnm_kwd": stru["title"],
"title_tks": rag_tokenizer.tokenize(stru["title"]),
"content_with_weight": json.dumps(obj, ensure_ascii=False),
Expand All @@ -649,14 +705,49 @@ async def extract_community(
chunk["content_sm_ltks"] = rag_tokenizer.fine_grained_tokenize(chunk["content_ltks"])
chunks.append(chunk)

await thread_pool_exec(settings.docStoreConn.delete,{"knowledge_graph_kwd": "community_report", "kb_id": kb_id},search.index_name(tenant_id),kb_id,)
new_ids: set[str] = {c["id"] for c in chunks}

# Snapshot existing community_report ids BEFORE inserting so we can
# delete exactly the stale set afterwards. If the search fails we fall
# back to the prior delete-everything-then-insert behaviour rather than
# leaving an inconsistent mix.
old_ids: list[str] = []
try:
existing_res = await thread_pool_exec(
settings.docStoreConn.search,
["id"], [], {"knowledge_graph_kwd": ["community_report"]}, [], OrderByExpr(),
0, 10000, search.index_name(tenant_id), [kb_id],
)
existing_fields = settings.docStoreConn.get_fields(existing_res, ["id"])
old_ids = list(existing_fields.keys())
except Exception:
logging.exception("Failed to enumerate existing community reports for kb %s; falling back to delete-then-insert.", kb_id)
await thread_pool_exec(settings.docStoreConn.delete, {"knowledge_graph_kwd": "community_report", "kb_id": kb_id}, search.index_name(tenant_id), kb_id)
old_ids = []

es_bulk_size = 4
for b in range(0, len(chunks), es_bulk_size):
doc_store_result = await thread_pool_exec(settings.docStoreConn.insert,chunks[b : b + es_bulk_size],search.index_name(tenant_id),kb_id,)
if doc_store_result:
error_message = f"Insert chunk error: {doc_store_result}, please check log file and Elasticsearch/Infinity status!"
raise Exception(error_message)

# Now that all new reports are persisted, prune stale rows. Anything in
# old_ids that is not also in new_ids is no longer current (community
# composition changed across runs). A failure here just leaves stale
# rows; the new rows are already in place.
stale_ids = [i for i in old_ids if i not in new_ids]
if stale_ids:
try:
await thread_pool_exec(
settings.docStoreConn.delete,
{"knowledge_graph_kwd": ["community_report"], "id": stale_ids},
search.index_name(tenant_id),
kb_id,
)
except Exception:
logging.exception("Failed to prune %d stale community reports for kb %s", len(stale_ids), kb_id)

if task_id and has_canceled(task_id):
callback(msg=f"Task {task_id} cancelled after community indexing.")
raise TaskCanceledException(f"Task {task_id} was cancelled")
Expand Down
85 changes: 85 additions & 0 deletions rag/graphrag/phase_markers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""GraphRAG phase-completion markers.

Markers let a re-run of GraphRAG skip phases that already completed in a
prior (possibly cancelled or crashed) task on the same KB.

Markers are stored in Redis under ``graphrag:phase:{kb_id}:{phase}`` with a
7-day TTL. They are intentionally KB-scoped (not task-scoped) so they
survive task cancellation and the creation of a new task on resume.

Invalidation rules (callers responsibility):
* ``clear_phase_markers`` is invoked by ``run_graphrag_for_kb`` whenever new
document content is merged into the global graph -- the merged graph has
changed, so prior resolution and community results are stale.
* ``clear_phase_markers`` is invoked by the unbind-task endpoint when the
caller asks to wipe the graph.
"""

from __future__ import annotations

import logging

from rag.utils.redis_conn import REDIS_CONN


PHASE_RESOLUTION = "resolution_done"
PHASE_COMMUNITY = "community_done"

ALL_PHASES = (PHASE_RESOLUTION, PHASE_COMMUNITY)

# 7 days is well above any expected single GraphRAG run on typical hardware
# and keeps stale markers self-pruning if invalidation paths are missed.
_DEFAULT_TTL_SECONDS = 7 * 24 * 3600


def _phase_key(kb_id: str, phase: str) -> str:
return f"graphrag:phase:{kb_id}:{phase}"


def has_phase_marker(kb_id: str, phase: str) -> bool:
"""Return True iff the marker for (kb_id, phase) exists."""
if not kb_id or not phase:
return False
try:
return bool(REDIS_CONN.exist(_phase_key(kb_id, phase)))
except Exception:
# Markers are an optimization; a Redis miss must NEVER block a run.
logging.exception("has_phase_marker(%s, %s) failed", kb_id, phase)
return False


def set_phase_marker(kb_id: str, phase: str, ttl: int = _DEFAULT_TTL_SECONDS) -> bool:
"""Persist a marker indicating the named phase has completed for kb_id."""
if not kb_id or not phase:
return False
try:
return bool(REDIS_CONN.set(_phase_key(kb_id, phase), "1", ttl))
except Exception:
logging.exception("set_phase_marker(%s, %s) failed", kb_id, phase)
return False


def clear_phase_markers(kb_id: str, phases: tuple[str, ...] = ALL_PHASES) -> None:
"""Drop the named phase markers for kb_id (no-op on miss)."""
if not kb_id:
return
for phase in phases:
try:
REDIS_CONN.delete(_phase_key(kb_id, phase))
except Exception:
logging.exception("clear_phase_markers(%s, %s) failed", kb_id, phase)
Loading