Merged
api/apps/restful_apis/dataset_api.py (8 changes: 7 additions & 1 deletion)
@@ -603,8 +603,14 @@ def delete_index(tenant_id, dataset_id, index_type):
index_type = index_type.lower()
if index_type not in dataset_api_service._VALID_INDEX_TYPES:
return get_error_argument_result(f"Invalid index type '{index_type}'")
# `wipe` controls whether the persisted index artefacts (graph rows /
# raptor summaries) are removed. Default true preserves historical
# behaviour; pass wipe=false to cancel the running task while keeping
# prior progress so it can be resumed later.
wipe_arg = (request.args.get("wipe", "true") or "true").strip().lower()
wipe = wipe_arg not in ("false", "0", "no", "off")
try:
success, result = dataset_api_service.delete_index(dataset_id, tenant_id, index_type)
success, result = dataset_api_service.delete_index(dataset_id, tenant_id, index_type, wipe=wipe)
if success:
return get_result(data=result)
else:
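Note: the endpoint parses `wipe` permissively, so only an explicit negative disables wiping. A minimal sketch of the parsing behaviour (the helper name is ours; the endpoint inlines this logic):

```python
def parse_wipe(raw: str | None) -> bool:
    # Mirror the endpoint: default to "true" when the query parameter is
    # absent or empty, then treat only explicit negatives as False.
    arg = (raw or "true").strip().lower()
    return arg not in ("false", "0", "no", "off")

assert parse_wipe(None) is True      # ?wipe omitted -> wipe artefacts
assert parse_wipe("FALSE") is False  # negatives are case-insensitive
assert parse_wipe("0") is False
assert parse_wipe("maybe") is True   # unrecognized values keep the default
```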
api/apps/services/dataset_api_service.py (29 changes: 22 additions & 7 deletions)
@@ -446,8 +446,12 @@ def delete_knowledge_graph(dataset_id: str, tenant_id: str):
return False, "No authorization."
_, kb = KnowledgebaseService.get_by_id(dataset_id)
from rag.nlp import search

settings.docStoreConn.delete({"knowledge_graph_kwd": ["graph", "subgraph", "entity", "relation"]}, search.index_name(kb.tenant_id), dataset_id)
from rag.graphrag.phase_markers import clear_phase_markers
settings.docStoreConn.delete({"knowledge_graph_kwd": ["graph", "subgraph", "entity", "relation", "community_report"]},
search.index_name(kb.tenant_id), dataset_id)
# Wiping the graph invalidates any phase-completion markers used to
# short-circuit resolution / community detection on resume.
clear_phase_markers(dataset_id)

return True, True

@@ -770,13 +774,17 @@ def get_ingestion_log(dataset_id: str, tenant_id: str, log_id: str):
return True, log.to_dict()


def delete_index(dataset_id: str, tenant_id: str, index_type: str):
def delete_index(dataset_id: str, tenant_id: str, index_type: str, wipe: bool = True):
"""
Delete an indexing task (graph/raptor/mindmap) for a dataset.

:param dataset_id: dataset ID
:param tenant_id: tenant ID
:param index_type: one of "graph", "raptor", "mindmap"
:param wipe: when True (default), the persisted artefacts (graph rows,
raptor summaries) are removed from the doc store and any GraphRAG
phase-completion markers are cleared. Pass False to cancel the
running task while keeping prior progress so it can be resumed.
:return: (success, result) or (success, error_message)
"""
if index_type not in _VALID_INDEX_TYPES:
@@ -796,6 +804,8 @@ def delete_index(dataset_id: str, tenant_id: str, index_type: str):
task_finish_at_field = f"{task_id_field.replace('_task_id', '_task_finish_at')}"
task_id = getattr(kb, task_id_field, None)

logging.info("delete_index: dataset=%s index_type=%s wipe=%s", dataset_id, index_type, wipe)

if task_id:
from rag.utils.redis_conn import REDIS_CONN

@@ -805,11 +815,16 @@
logging.exception(e)
TaskService.delete_by_id(task_id)

if index_type == "graph":
if wipe and index_type == "graph":
from rag.nlp import search

settings.docStoreConn.delete({"knowledge_graph_kwd": ["graph", "subgraph", "entity", "relation"]}, search.index_name(kb.tenant_id), dataset_id)
elif index_type == "raptor":
from rag.graphrag.phase_markers import clear_phase_markers
settings.docStoreConn.delete({"knowledge_graph_kwd": ["graph", "subgraph", "entity", "relation", "community_report"]},
search.index_name(kb.tenant_id), dataset_id)
# Wiping the graph invalidates any phase-completion markers used to
# short-circuit resolution / community detection on resume.
clear_phase_markers(dataset_id)
logging.info("delete_index: cleared GraphRAG artefacts and phase markers for dataset=%s", dataset_id)
elif wipe and index_type == "raptor":
from rag.nlp import search

settings.docStoreConn.delete({"raptor_kwd": ["raptor"]}, search.index_name(kb.tenant_id), dataset_id)
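Note: a hedged usage sketch of the new service parameter (ids are placeholders). Passing `wipe=False` cancels the running task but keeps persisted progress and phase markers so a later run can resume; the default cancels and clears everything:

```python
# Cancel the running graph build but keep prior progress resumable.
ok, result = delete_index("ds_123", "tenant_1", "graph", wipe=False)

# Default behaviour: cancel AND remove graph rows, community reports,
# and the GraphRAG phase-completion markers.
ok, result = delete_index("ds_123", "tenant_1", "graph")
```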
rag/graphrag/general/extractor.py (9 changes: 8 additions & 1 deletion)
@@ -319,7 +319,10 @@ async def _merge_graph_nodes(self, graph: nx.Graph, nodes: list[str], change: Gr
node1_attrs = graph.nodes[node1]
node0_attrs["description"] += f"{GRAPH_FIELD_SEP}{node1_attrs['description']}"
node0_attrs["source_id"] = sorted(set(node0_attrs["source_id"] + node1_attrs["source_id"]))
for neighbor in graph.neighbors(node1):
# Snapshot neighbors before mutating the graph: graph.neighbors(node1)
# iterates node1's adjacency dict directly, so concurrent merges or the
# graph.add_edge/remove_node calls below can resize it mid-iteration,
# raising RuntimeError ("dictionary changed size during iteration").
for neighbor in list(graph.neighbors(node1)):
change.removed_edges.add(get_from_to(node1, neighbor))
if neighbor not in nodes_set:
edge1_attrs = graph.get_edge_data(node1, neighbor)
@@ -335,6 +338,10 @@
graph.add_edge(nodes[0], neighbor, **edge0_attrs)
else:
graph.add_edge(nodes[0], neighbor, **edge1_attrs)
# Track the redirected neighbor so a later node1 in this merge loop
# that also points to it takes the merge branch above instead of
# overwriting the edge we just added.
node0_neighbors.add(neighbor)
graph.remove_node(node1)
node0_attrs["description"] = await self._handle_entity_relation_summary(nodes[0], node0_attrs["description"], task_id=task_id)
graph.nodes[nodes[0]].update(node0_attrs)
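Note: the snapshot guards a standard Python pitfall rather than anything networkx-specific: `graph.neighbors()` returns a live iterator over the node's adjacency dict, so mutating the graph mid-iteration raises `RuntimeError`. A minimal reproduction:

```python
import networkx as nx

G = nx.Graph()
G.add_edges_from([("a", "b"), ("a", "c")])

try:
    for n in G.neighbors("a"):   # live iterator over G.adj["a"]
        G.add_edge("a", "d")     # resizes G.adj["a"] mid-iteration
except RuntimeError as e:
    print(e)                     # "dictionary changed size during iteration"

for n in list(G.neighbors("a")):  # snapshot first: mutation is now safe
    G.add_edge("a", "e")
```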
rag/graphrag/general/index.py (118 changes: 102 additions & 16 deletions)
@@ -23,19 +23,26 @@
from api.db.services.document_service import DocumentService
from api.db.services.task_service import has_canceled
from common.exceptions import TaskCanceledException
from common.misc_utils import get_uuid
from common.connection_utils import timeout
from rag.graphrag.entity_resolution import EntityResolution
from rag.graphrag.general.community_reports_extractor import CommunityReportsExtractor
from rag.graphrag.general.extractor import Extractor
from rag.graphrag.general.graph_extractor import GraphExtractor as GeneralKGExt
from rag.graphrag.light.graph_extractor import GraphExtractor as LightKGExt
from rag.graphrag.phase_markers import (
PHASE_COMMUNITY,
PHASE_RESOLUTION,
clear_phase_markers,
has_phase_marker,
set_phase_marker,
)
from rag.graphrag.utils import (
GraphChange,
chunk_id,
does_graph_contains,
get_graph,
graph_merge,
insert_chunks_bounded,
set_graph,
tidy_graph,
)
@@ -354,8 +361,16 @@ async def build_one(doc_id: str):
raise TaskCanceledException(f"Task {row['id']} was cancelled")

ok_docs = [d for d in doc_ids if d in subgraphs]
if not ok_docs:
callback(msg=f"[GraphRAG] kb:{kb_id} no subgraphs generated successfully, end.")
final_graph = None

# Determine whether the resolution/community phases still need to run on
# this KB. Markers from a prior task let us skip already-completed phases
# even when no new docs are merged this round (the resume path).
resolution_pending = with_resolution and not has_phase_marker(kb_id, PHASE_RESOLUTION)
community_pending = with_community and not has_phase_marker(kb_id, PHASE_COMMUNITY)

if not ok_docs and not resolution_pending and not community_pending:
callback(msg=f"[GraphRAG] kb:{kb_id} no subgraphs to merge and no phases pending, end.")
now = asyncio.get_running_loop().time()
return {"ok_docs": [], "failed_docs": failed_docs, "total_docs": len(doc_ids), "total_chunks": total_chunks, "seconds": now - start}

@@ -369,7 +384,6 @@

try:
union_nodes: set = set()
final_graph = None

for doc_id in ok_docs:
sg = subgraphs[doc_id]
@@ -386,10 +400,17 @@
if new_graph is not None:
final_graph = new_graph

if final_graph is None:
if ok_docs and final_graph is None:
callback(msg=f"[GraphRAG] kb:{kb_id} merge finished (no in-memory graph returned).")
else:
elif ok_docs:
callback(msg=f"[GraphRAG] kb:{kb_id} merge finished, graph ready.")
# New content was merged into the global graph; any prior
# resolution/community results are now stale and must be redone
# on this or a future run. Clear phase markers accordingly.
clear_phase_markers(kb_id)
resolution_pending = with_resolution
community_pending = with_community
callback(msg=f"[GraphRAG] kb:{kb_id} cleared phase markers after merge.")
finally:
kb_lock.release()

@@ -398,6 +419,11 @@
callback(msg=f"[GraphRAG] KB merge done in {now - start:.2f}s. ok={len(ok_docs)} / total={len(doc_ids)}")
return {"ok_docs": ok_docs, "failed_docs": failed_docs, "total_docs": len(doc_ids), "total_chunks": total_chunks, "seconds": now - start}

if not resolution_pending and not community_pending:
now = asyncio.get_running_loop().time()
callback(msg=f"[GraphRAG] kb:{kb_id} all requested phases already complete; nothing to do.")
return {"ok_docs": ok_docs, "failed_docs": failed_docs, "total_docs": len(doc_ids), "total_chunks": total_chunks, "seconds": now - start}

if has_canceled(row["id"]):
callback(msg=f"Task {row['id']} cancelled before resolution/community extraction.")
raise TaskCanceledException(f"Task {row['id']} was cancelled")
@@ -406,11 +432,26 @@
callback(msg=f"[GraphRAG] kb:{kb_id} post-merge lock acquired for resolution/community")

try:
# Resume path: no docs were merged this round but pending phases
# require the previously-persisted graph. Load it from the doc store.
if final_graph is None:
final_graph = await get_graph(tenant_id, kb_id)
if final_graph is None:
callback(msg=f"[GraphRAG] kb:{kb_id} no persisted graph found; cannot run resolution/community.")
now = asyncio.get_running_loop().time()
return {"ok_docs": ok_docs, "failed_docs": failed_docs, "total_docs": len(doc_ids), "total_chunks": total_chunks, "seconds": now - start}
callback(msg=f"[GraphRAG] kb:{kb_id} loaded persisted graph for resume.")

subgraph_nodes = set()
for sg in subgraphs.values():
subgraph_nodes.update(set(sg.nodes()))
# On a pure-resume run (no new docs) the union of "newly added" nodes
# is empty, but resolution still needs *some* anchor set. Fall back to
# all graph nodes so candidate pairing actually finds something.
if not subgraph_nodes:
subgraph_nodes = set(final_graph.nodes())

if with_resolution:
if resolution_pending:
await resolve_entities(
final_graph,
subgraph_nodes,
@@ -422,8 +463,11 @@
callback,
task_id=row["id"],
)
set_phase_marker(kb_id, PHASE_RESOLUTION)
elif with_resolution:
callback(msg=f"[GraphRAG] kb:{kb_id} resolution already completed previously, skipping.")

if with_community:
if community_pending:
await extract_community(
final_graph,
tenant_id,
@@ -434,6 +478,9 @@
callback,
task_id=row["id"],
)
set_phase_marker(kb_id, PHASE_COMMUNITY)
elif with_community:
callback(msg=f"[GraphRAG] kb:{kb_id} community detection already completed previously, skipping.")
finally:
kb_lock.release()

@@ -632,8 +679,17 @@ async def extract_community(
"report": rep,
"evidences": "\n".join([f.get("explanation", "") for f in stru["findings"]]),
}
# Deterministic id derived from (kb_id, community title) so reruns of
# extract_community produce stable ids. Combined with insert-then-
# prune below, this means a crash mid-insert leaves the prior set of
# community reports intact -- never the partial-delete state the old
# delete-then-insert order produced.
chunk_payload_for_id = {
"content_with_weight": f"community_report::{stru['title']}",
"kb_id": kb_id,
}
chunk = {
"id": get_uuid(),
"id": chunk_id(chunk_payload_for_id),
"docnm_kwd": stru["title"],
"title_tks": rag_tokenizer.tokenize(stru["title"]),
"content_with_weight": json.dumps(obj, ensure_ascii=False),
@@ -649,13 +705,43 @@
chunk["content_sm_ltks"] = rag_tokenizer.fine_grained_tokenize(chunk["content_ltks"])
chunks.append(chunk)

await thread_pool_exec(settings.docStoreConn.delete,{"knowledge_graph_kwd": "community_report", "kb_id": kb_id},search.index_name(tenant_id),kb_id,)
es_bulk_size = 4
for b in range(0, len(chunks), es_bulk_size):
doc_store_result = await thread_pool_exec(settings.docStoreConn.insert,chunks[b : b + es_bulk_size],search.index_name(tenant_id),kb_id,)
if doc_store_result:
error_message = f"Insert chunk error: {doc_store_result}, please check log file and Elasticsearch/Infinity status!"
raise Exception(error_message)
new_ids: set[str] = {c["id"] for c in chunks}

# Snapshot existing community_report ids BEFORE inserting so we can
# delete exactly the stale set afterwards. If the search fails we fall
# back to the prior delete-everything-then-insert behaviour rather than
# leaving an inconsistent mix.
old_ids: list[str] = []
try:
existing_res = await thread_pool_exec(
settings.docStoreConn.search,
["id"], [], {"knowledge_graph_kwd": ["community_report"]}, [], OrderByExpr(),
0, 10000, search.index_name(tenant_id), [kb_id],
)
existing_fields = settings.docStoreConn.get_fields(existing_res, ["id"])
old_ids = list(existing_fields.keys())
except Exception:
logging.exception("Failed to enumerate existing community reports for kb %s; falling back to delete-then-insert.", kb_id)
await thread_pool_exec(settings.docStoreConn.delete, {"knowledge_graph_kwd": "community_report", "kb_id": kb_id}, search.index_name(tenant_id), kb_id)
old_ids = []

await insert_chunks_bounded(chunks, tenant_id, kb_id, callback=callback, label="Insert community reports")

# Now that all new reports are persisted, prune stale rows. Anything in
# old_ids that is not also in new_ids is no longer current (community
# composition changed across runs). A failure here just leaves stale
# rows; the new rows are already in place.
stale_ids = [i for i in old_ids if i not in new_ids]
if stale_ids:
try:
await thread_pool_exec(
settings.docStoreConn.delete,
{"knowledge_graph_kwd": ["community_report"], "id": stale_ids},
search.index_name(tenant_id),
kb_id,
)
except Exception:
logging.exception("Failed to prune %d stale community reports for kb %s", len(stale_ids), kb_id)

if task_id and has_canceled(task_id):
callback(msg=f"Task {task_id} cancelled after community indexing.")
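Note: the reordering in `extract_community` (deterministic ids, insert before prune) follows a general crash-safety pattern: every failure mode leaves at least one complete, readable set of reports in the store. A language-level sketch of the invariant, with a plain dict standing in for the doc store (names are ours):

```python
def refresh(store: dict[str, dict], new_rows: dict[str, dict]) -> None:
    # 1. Snapshot the ids that exist before this run.
    old_ids = set(store)
    # 2. Insert the full new set first. Deterministic ids mean a rerun
    #    overwrites rather than duplicates; a crash here leaves the old
    #    set intact alongside whatever new rows already landed.
    store.update(new_rows)
    # 3. Only after every new row is persisted, prune ids that are no
    #    longer current. A crash here merely leaves stale-but-valid rows.
    for rid in old_ids - set(new_rows):
        del store[rid]
```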
rag/graphrag/phase_markers.py (85 changes: 85 additions & 0 deletions)
@@ -0,0 +1,85 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""GraphRAG phase-completion markers.

Markers let a re-run of GraphRAG skip phases that already completed in a
prior (possibly cancelled or crashed) task on the same KB.

Markers are stored in Redis under ``graphrag:phase:{kb_id}:{phase}`` with a
7-day TTL. They are intentionally KB-scoped (not task-scoped) so they
survive task cancellation and the creation of a new task on resume.

Invalidation rules (caller's responsibility):
* ``clear_phase_markers`` is invoked by ``run_graphrag_for_kb`` whenever new
document content is merged into the global graph -- the merged graph has
changed, so prior resolution and community results are stale.
* ``clear_phase_markers`` is invoked by the unbind-task endpoint when the
caller asks to wipe the graph.
"""

from __future__ import annotations

import logging

from rag.utils.redis_conn import REDIS_CONN


PHASE_RESOLUTION = "resolution_done"
PHASE_COMMUNITY = "community_done"

ALL_PHASES = (PHASE_RESOLUTION, PHASE_COMMUNITY)

# 7 days is well above any expected single GraphRAG run on typical hardware
# and keeps stale markers self-pruning if invalidation paths are missed.
_DEFAULT_TTL_SECONDS = 7 * 24 * 3600


def _phase_key(kb_id: str, phase: str) -> str:
return f"graphrag:phase:{kb_id}:{phase}"


def has_phase_marker(kb_id: str, phase: str) -> bool:
"""Return True iff the marker for (kb_id, phase) exists."""
if not kb_id or not phase:
return False
try:
return bool(REDIS_CONN.exist(_phase_key(kb_id, phase)))
except Exception:
# Markers are an optimization; a Redis miss must NEVER block a run.
logging.exception("has_phase_marker(%s, %s) failed", kb_id, phase)
return False


def set_phase_marker(kb_id: str, phase: str, ttl: int = _DEFAULT_TTL_SECONDS) -> bool:
"""Persist a marker indicating the named phase has completed for kb_id."""
if not kb_id or not phase:
return False
try:
return bool(REDIS_CONN.set(_phase_key(kb_id, phase), "1", ttl))
except Exception:
logging.exception("set_phase_marker(%s, %s) failed", kb_id, phase)
return False


def clear_phase_markers(kb_id: str, phases: tuple[str, ...] = ALL_PHASES) -> None:
"""Drop the named phase markers for kb_id (no-op on miss)."""
if not kb_id:
return
for phase in phases:
try:
REDIS_CONN.delete(_phase_key(kb_id, phase))
except Exception:
logging.exception("clear_phase_markers(%s, %s) failed", kb_id, phase)