Fix: serialize GraphRAG entity resolution merges to avoid graph mutation races #14237
Conversation
📝 WalkthroughWalkthroughTwo files were modified to fix concurrent graph mutation errors during entity resolution's merge phase. The entity resolution module now serializes merge operations using a dedicated asyncio lock, while the extractor module materializes a graph neighbors iterator into a list before iteration to ensure stable iteration semantics when the graph is modified. Changes
Estimated code review effort🎯 2 (Simple) | ⏱️ ~8 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rag/graphrag/general/extractor.py (1)
322-337:⚠️ Potential issue | 🟠 MajorUpdate
node0_neighborsafter adding a new edge.The
list(...)snapshot fixes the iterator crash, butnode0_neighborsis still stale after the firstgraph.add_edge(nodes[0], neighbor, **edge1_attrs). If two merged nodes both point to the same external neighbor thatnodes[0]did not originally have, the later one will hit theelsebranch again and overwrite the existing edge attrs instead of aggregating them.💡 Proposed fix
for neighbor in list(graph.neighbors(node1)): change.removed_edges.add(get_from_to(node1, neighbor)) if neighbor not in nodes_set: edge1_attrs = graph.get_edge_data(node1, neighbor) if neighbor in node0_neighbors: # Merge two edges change.added_updated_edges.add(get_from_to(nodes[0], neighbor)) edge0_attrs = graph.get_edge_data(nodes[0], neighbor) edge0_attrs["weight"] += edge1_attrs["weight"] edge0_attrs["description"] += f"{GRAPH_FIELD_SEP}{edge1_attrs['description']}" for attr in ["keywords", "source_id"]: edge0_attrs[attr] = sorted(set(edge0_attrs[attr] + edge1_attrs[attr])) edge0_attrs["description"] = await self._handle_entity_relation_summary(f"({nodes[0]}, {neighbor})", edge0_attrs["description"], task_id=task_id) graph.add_edge(nodes[0], neighbor, **edge0_attrs) else: graph.add_edge(nodes[0], neighbor, **edge1_attrs) + node0_neighbors.add(neighbor)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rag/graphrag/general/extractor.py` around lines 322 - 337, The loop creates new edges to nodes[0] but never updates the node0_neighbors set, so when a later neighbor also connects to the same external node you end up overwriting instead of merging; after any graph.add_edge(nodes[0], neighbor, ...) (both in the merge and in the else branch where edge1_attrs is copied) update node0_neighbors (e.g., node0_neighbors.add(neighbor) or re-fetch neighbors via set(graph.neighbors(nodes[0]))) so subsequent iterations see the newly-added neighbor and go through the merge logic.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@rag/graphrag/general/extractor.py`:
- Around line 322-337: The loop creates new edges to nodes[0] but never updates
the node0_neighbors set, so when a later neighbor also connects to the same
external node you end up overwriting instead of merging; after any
graph.add_edge(nodes[0], neighbor, ...) (both in the merge and in the else
branch where edge1_attrs is copied) update node0_neighbors (e.g.,
node0_neighbors.add(neighbor) or re-fetch neighbors via
set(graph.neighbors(nodes[0]))) so subsequent iterations see the newly-added
neighbor and go through the merge logic.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: c24eaddb-739a-4c04-89a2-b685f23fdf1b
📒 Files selected for processing (2)
rag/graphrag/entity_resolution.pyrag/graphrag/general/extractor.py
6ba3i
left a comment
There was a problem hiding this comment.
Thanks for the fix! I traced the path for #14236 and I agree with the main diagnosis: the crash happens in the merge phase, where multiple merge coroutines can mutate the same shared networkx graph at the same time.
Because of that, I think the fix here is the merge serialization in rag/graphrag/entity_resolution.py.
I would recommend keeping:
merge_lock = asyncio.Lock()async with merge_lock:inlimited_merge_nodes(...)
and dropping the list(graph.neighbors(node1)) change in rag/graphrag/general/extractor.py.
My reasoning is that once merge coroutines are serialized, that snapshot is no longer needed for the reported crash, so removing it keeps the patch smaller and more tightly scoped to the actual issue.
Suggested minimal diff:
diff --git a/rag/graphrag/entity_resolution.py b/rag/graphrag/entity_resolution.py
index 6c3c48aeb..d75898ae2 100644
--- a/rag/graphrag/entity_resolution.py
+++ b/rag/graphrag/entity_resolution.py
@@ -158,9 +158,10 @@ class EntityResolution(Extractor):
change = GraphChange()
connect_graph = nx.Graph()
connect_graph.add_edges_from(resolution_result)
+ merge_lock = asyncio.Lock()
async def limited_merge_nodes(graph, nodes, change):
- async with semaphore:
+ async with merge_lock:
await self._merge_graph_nodes(graph, nodes, change, task_id)Thanks again for your contribution !
|
@6ba3i Thanks for the review. I updated the fix to follow your suggestion and kept the patch minimal. The current change keeps only the merge serialization in rag/graphrag/entity_resolution.py: merge_lock = asyncio.Lock() I would appreciate your feedback. Thanks :) |
6ba3i
left a comment
There was a problem hiding this comment.
thank you for the modifications ! seems to be correct but do you mind sharing some logs of pre and post fix ? following that i'll be ready to recommend a merge.
Thank you @spider-yamet
|
@6ba3i Thanks for the follow-up. I ran a focused local repro of the merge-phase race and got the following before/after behavior.
|
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #14237 +/- ##
==========================================
- Coverage 98.11% 95.97% -2.15%
==========================================
Files 10 10
Lines 690 695 +5
Branches 108 111 +3
==========================================
- Hits 677 667 -10
- Misses 4 11 +7
- Partials 9 17 +8 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|

What problem does this PR solve?
This PR fixes the merge-phase crash reported in #14236 during GraphRAG entity resolution.
The issue happens after candidate pair resolution completes, when multiple merge coroutines mutate the same shared
networkxgraph concurrently. In_merge_graph_nodes, the code iterates overgraph.neighbors(node1)and also awaits during edge/description merging. That allows another coroutine to modify the graph adjacency structure in between, which can triggerRuntimeError: dictionary keys changed during iterationand can also lead to unsafe shared-graph mutation.This change keeps the PR scoped to that single issue by:
graph.neighbors(node1)withlist(...)before iterationTogether, these changes prevent concurrent mutation of the shared graph during the merge phase and make the merge loop safe against live-view invalidation.
Fixes #14236
Type of change