diff --git a/.jules/bolt.md b/.jules/bolt.md index e880f274..6cdcdc7e 100644 --- a/.jules/bolt.md +++ b/.jules/bolt.md @@ -1,7 +1,3 @@ -## 2024-05-22 - Async File I/O in Synchronous Methods -**Learning:** Even small JSON files (1000 items) can cause significant blocking (23ms) when written synchronously in a tight loop. `dataclasses.asdict` is relatively slow (5ms) but the main bottleneck was redundant disk writes. Offloading to `ThreadPoolExecutor` and removing redundant saves in `tick()` improved performance by 5x (23ms -> 4.7ms). -**Action:** When optimizing "save on every change" patterns, check if intermediate saves (like in `tick`) are necessary and offload the actual I/O to a background thread to unblock the main execution path. - -## 2024-05-23 - Synchronous Audit Logging Bottleneck -**Learning:** `ToolOrchestrator._audit_action` was performing synchronous file I/O (open/write/close) for every tool invocation. This introduced ~68ms latency per 1000 calls. Moving this to a background thread with `queue.Queue` reduced it to ~3ms (20x improvement). -**Action:** For high-frequency logging or audit trails, always use an asynchronous writer or background thread to decouple I/O latency from the main execution path. +## 2026-05-29 - Graph Similarity Search N+1 Query Optimization +**Learning:** The entity similarity search performed N+1 queries by manually fetching related entities in a Python loop to calculate Jaccard similarity. +**Action:** Replaced the loop with a single SQL query using CTEs to compute intersections and total counts, calculating similarity on the database side to avoid N+1 queries. diff --git a/src/ippoc/mnemosyne/graph/manager.py b/src/ippoc/mnemosyne/graph/manager.py index b80121c8..f4d8a9d2 100644 --- a/src/ippoc/mnemosyne/graph/manager.py +++ b/src/ippoc/mnemosyne/graph/manager.py @@ -1,38 +1,57 @@ from typing import List, Dict, Any, Tuple, Optional from collections import defaultdict -from sqlalchemy import Column, Integer, String, Float, ForeignKey, text, DateTime, bindparam +from sqlalchemy import ( + Column, + Integer, + String, + Float, + ForeignKey, + text, + DateTime, + bindparam, +) from sqlalchemy.orm import relationship from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import sessionmaker, declarative_base import os import logging +import json from datetime import datetime logger = logging.getLogger(__name__) Base = declarative_base() + class Entity(Base): """A Node in the Knowledge Graph""" + __tablename__ = "kg_entities" id = Column(Integer, primary_key=True) name = Column(String, unique=True, index=True) - type = Column(String) # Person, Location, Concept - metadata_ = Column("metadata", String) # JSON string + type = Column(String) # Person, Location, Concept + metadata_ = Column("metadata", String) # JSON string + class Relation(Base): """An Edge in the Knowledge Graph""" + __tablename__ = "kg_relations" id = Column(Integer, primary_key=True) source_id = Column(Integer, ForeignKey("kg_entities.id"), index=True) target_id = Column(Integer, ForeignKey("kg_entities.id"), index=True) - relation = Column(String) # e.g. "authored", "is_located_in" + relation = Column(String) # e.g. "authored", "is_located_in" weight = Column(Float, default=1.0) + class GraphManager: def __init__(self, db_url: str = None): - self.db_url = db_url or os.getenv("DATABASE_URL", "postgresql+asyncpg://user:pass@localhost:5432/ippoc") + self.db_url = db_url or os.getenv( + "DATABASE_URL", "postgresql+asyncpg://user:pass@localhost:5432/ippoc" + ) self.engine = create_async_engine(self.db_url, echo=False) - self.Session = sessionmaker(self.engine, class_=AsyncSession, expire_on_commit=False) + self.Session = sessionmaker( + self.engine, class_=AsyncSession, expire_on_commit=False + ) self._initialized = False async def init_db(self): @@ -42,7 +61,14 @@ async def init_db(self): await conn.run_sync(Base.metadata.create_all) self._initialized = True - async def add_triple(self, source: str, relation: str, target: str, source_type="Concept", target_type="Concept"): + async def add_triple( + self, + source: str, + relation: str, + target: str, + source_type="Concept", + target_type="Concept", + ): """ Adds (Source) -> [Relation] -> (Target) to the graph. Idempotent (get_or_create). @@ -50,7 +76,10 @@ async def add_triple(self, source: str, relation: str, target: str, source_type= async with self.Session() as session: # Helper to get/create entity async def get_or_create(name, type_): - res = await session.execute(text("SELECT id FROM kg_entities WHERE name = :name"), {"name": name}) + res = await session.execute( + text("SELECT id FROM kg_entities WHERE name = :name"), + {"name": name}, + ) row = res.fetchone() if row: return row[0] @@ -61,12 +90,14 @@ async def get_or_create(name, type_): s_id = await get_or_create(source, source_type) t_id = await get_or_create(target, target_type) - + # Add relation # Check if exists res = await session.execute( - text("SELECT id FROM kg_relations WHERE source_id=:s AND target_id=:t AND relation=:r"), - {"s": s_id, "t": t_id, "r": relation} + text( + "SELECT id FROM kg_relations WHERE source_id=:s AND target_id=:t AND relation=:r" + ), + {"s": s_id, "t": t_id, "r": relation}, ) if not res.fetchone(): rel = Relation(source_id=s_id, target_id=t_id, relation=relation) @@ -83,7 +114,9 @@ async def get_neighbors(self, entity_name: str) -> List[str]: await self.init_db() async with self.Session() as session: # 1. Find Entity ID - res = await session.execute(text("SELECT id FROM kg_entities WHERE name = :n"), {"n": entity_name}) + res = await session.execute( + text("SELECT id FROM kg_entities WHERE name = :n"), {"n": entity_name} + ) row = res.fetchone() if not row: return [] @@ -91,61 +124,67 @@ async def get_neighbors(self, entity_name: str) -> List[str]: # 2. Find outgoing edges stmt = text(""" - SELECT e.name, r.relation - FROM kg_relations r - JOIN kg_entities e ON r.target_id = e.id + SELECT e.name, r.relation + FROM kg_relations r + JOIN kg_entities e ON r.target_id = e.id WHERE r.source_id = :eid """) out = await session.execute(stmt, {"eid": eid}) - + return [f"-[{row[1]}]-> {row[0]}" for row in out.fetchall()] - - async def find_relationship_path(self, source_entity: str, target_entity: str, max_depth: int = 3) -> List[Dict[str, Any]]: + + async def find_relationship_path( + self, source_entity: str, target_entity: str, max_depth: int = 3 + ) -> List[Dict[str, Any]]: """ Find paths between two entities in the knowledge graph. - + Args: source_entity: Starting entity name target_entity: Target entity name max_depth: Maximum path depth to search - + Returns: List of relationship paths with metadata """ await self.init_db() paths = [] - + try: async with self.Session() as session: # Get entity IDs source_res = await session.execute( - text("SELECT id FROM kg_entities WHERE name = :name"), - {"name": source_entity} + text("SELECT id FROM kg_entities WHERE name = :name"), + {"name": source_entity}, ) source_row = source_res.fetchone() if not source_row: return [] source_id = source_row[0] - + target_res = await session.execute( - text("SELECT id FROM kg_entities WHERE name = :name"), - {"name": target_entity} + text("SELECT id FROM kg_entities WHERE name = :name"), + {"name": target_entity}, ) target_row = target_res.fetchone() if not target_row: return [] target_id = target_row[0] - + # Use Recursive CTE for optimized path finding - paths = await self._find_paths_cte(session, source_id, target_id, max_depth) - + paths = await self._find_paths_cte( + session, source_id, target_id, max_depth + ) + return paths - + except Exception as e: logger.error(f"Path finding failed: {e}") return [] - - async def _find_paths_cte(self, session: AsyncSession, source_id: int, target_id: int, max_depth: int) -> List[Dict[str, Any]]: + + async def _find_paths_cte( + self, session: AsyncSession, source_id: int, target_id: int, max_depth: int + ) -> List[Dict[str, Any]]: """Recursive CTE based path finding - Faster than BFS and avoids N+1 queries""" # Recursive CTE query # We use simple string concatenation for path tracking which is portable between Postgres and SQLite @@ -179,11 +218,10 @@ async def _find_paths_cte(self, session: AsyncSession, source_id: int, target_id LIMIT 10 """) - result = await session.execute(cte_query, { - "source_id": source_id, - "target_id": target_id, - "max_depth": max_depth - }) + result = await session.execute( + cte_query, + {"source_id": source_id, "target_id": target_id, "max_depth": max_depth}, + ) rows = result.fetchall() @@ -193,13 +231,13 @@ async def _find_paths_cte(self, session: AsyncSession, source_id: int, target_id # Collect all unique node IDs to fetch names in bulk all_node_ids = set() parsed_rows = [] - + for row in rows: # Parse path IDs and relations # path_ids is "id1,id2,id3" - ids = [int(x) for x in row[0].split(',')] + ids = [int(x) for x in row[0].split(",")] # path_rels is "rel1,rel2" - rels = row[1].split(',') + rels = row[1].split(",") # Basic cycle check: if IDs are not unique, skip cyclic path if len(ids) != len(set(ids)): @@ -207,7 +245,7 @@ async def _find_paths_cte(self, session: AsyncSession, source_id: int, target_id all_node_ids.update(ids) parsed_rows.append((ids, rels)) - + if not parsed_rows: return [] @@ -217,7 +255,7 @@ async def _find_paths_cte(self, session: AsyncSession, source_id: int, target_id name_res = await session.execute(name_stmt, {"ids": list(all_node_ids)}) id_to_name = {row.id: row.name for row in name_res} - + paths = [] # Construct result objects for ids, rels in parsed_rows: @@ -233,32 +271,36 @@ async def _find_paths_cte(self, session: AsyncSession, source_id: int, target_id nodes.append(name) if valid_path: - paths.append({ - "nodes": nodes, - "relations": rels, - "length": len(rels), - "confidence": 1.0 - (len(rels) * 0.1) - }) + paths.append( + { + "nodes": nodes, + "relations": rels, + "length": len(rels), + "confidence": 1.0 - (len(rels) * 0.1), + } + ) return paths - - async def get_entity_context(self, entity_name: str, context_types: List[str] = None) -> Dict[str, Any]: + + async def get_entity_context( + self, entity_name: str, context_types: List[str] = None + ) -> Dict[str, Any]: """ Get comprehensive context for an entity including relationships and metadata. - + Args: entity_name: Entity to get context for context_types: Types of context to include ['relationships', 'attributes', 'history'] - + Returns: Dictionary with entity context information """ if context_types is None: - context_types = ['relationships', 'attributes'] - + context_types = ["relationships", "attributes"] + await self.init_db() context = {"entity": entity_name} - + try: async with self.Session() as session: # Get entity details @@ -269,21 +311,23 @@ async def get_entity_context(self, entity_name: str, context_types: List[str] = """) entity_res = await session.execute(entity_stmt, {"name": entity_name}) entity_row = entity_res.fetchone() - + if not entity_row: return {"error": f"Entity '{entity_name}' not found"} - + entity_id, entity_type, metadata_str = entity_row context["type"] = entity_type - + # Parse metadata try: - context["metadata"] = json.loads(metadata_str) if metadata_str else {} - except: + context["metadata"] = ( + json.loads(metadata_str) if metadata_str else {} + ) + except Exception: context["metadata"] = {} - + # Get relationships if requested - if 'relationships' in context_types: + if "relationships" in context_types: # Incoming relationships incoming_stmt = text(""" SELECT e.name, r.relation @@ -291,12 +335,14 @@ async def get_entity_context(self, entity_name: str, context_types: List[str] = JOIN kg_entities e ON r.source_id = e.id WHERE r.target_id = :entity_id """) - incoming_res = await session.execute(incoming_stmt, {"entity_id": entity_id}) + incoming_res = await session.execute( + incoming_stmt, {"entity_id": entity_id} + ) context["incoming_relations"] = [ - {"from": row[0], "relation": row[1]} + {"from": row[0], "relation": row[1]} for row in incoming_res.fetchall() ] - + # Outgoing relationships outgoing_stmt = text(""" SELECT e.name, r.relation @@ -304,14 +350,16 @@ async def get_entity_context(self, entity_name: str, context_types: List[str] = JOIN kg_entities e ON r.target_id = e.id WHERE r.source_id = :entity_id """) - outgoing_res = await session.execute(outgoing_stmt, {"entity_id": entity_id}) + outgoing_res = await session.execute( + outgoing_stmt, {"entity_id": entity_id} + ) context["outgoing_relations"] = [ - {"to": row[0], "relation": row[1]} + {"to": row[0], "relation": row[1]} for row in outgoing_res.fetchall() ] - + # Get attributes if requested - if 'attributes' in context_types: + if "attributes" in context_types: # This would query attribute nodes connected to the entity attr_stmt = text(""" SELECT e.name, r.relation @@ -320,54 +368,62 @@ async def get_entity_context(self, entity_name: str, context_types: List[str] = WHERE r.source_id = :entity_id AND r.relation IN ('has_attribute', 'described_as', 'characterized_by') """) - attr_res = await session.execute(attr_stmt, {"entity_id": entity_id}) + attr_res = await session.execute( + attr_stmt, {"entity_id": entity_id} + ) context["attributes"] = [ {"attribute": row[0], "type": row[1]} for row in attr_res.fetchall() ] - + context["timestamp"] = datetime.now().isoformat() - + return context - + except Exception as e: logger.error(f"Entity context retrieval failed: {e}") return {"error": str(e)} - - async def find_similar_entities(self, entity_name: str, similarity_threshold: float = 0.7) -> List[Dict[str, Any]]: + + async def find_similar_entities( + self, entity_name: str, similarity_threshold: float = 0.7 + ) -> List[Dict[str, Any]]: """ Find entities similar to the given entity based on shared relationships. - + Args: entity_name: Reference entity similarity_threshold: Minimum similarity score (0.0 to 1.0) - + Returns: List of similar entities with similarity scores """ await self.init_db() similar_entities = [] - + try: async with self.Session() as session: # Get reference entity ID ref_id_stmt = text("SELECT id FROM kg_entities WHERE name = :name") ref_id_res = await session.execute(ref_id_stmt, {"name": entity_name}) ref_row = ref_id_res.fetchone() - + if not ref_row: return [] ref_id = ref_row[0] - + # Get reference entity relation count - ref_count_stmt = text("SELECT COUNT(*) FROM kg_relations WHERE source_id = :ref_id") - ref_count_res = await session.execute(ref_count_stmt, {"ref_id": ref_id}) + ref_count_stmt = text( + "SELECT COUNT(*) FROM kg_relations WHERE source_id = :ref_id" + ) + ref_count_res = await session.execute( + ref_count_stmt, {"ref_id": ref_id} + ) ref_total = ref_count_res.scalar() - + if ref_total == 0: return [] - # Intersection-first optimization using CTEs + # Optimization: Intersection-first using CTEs # 1. Identify candidates (entities sharing >=1 relation) -> O(Neighbors) # 2. Count totals for candidates only # 3. Calculate Jaccard in SQL @@ -408,21 +464,26 @@ async def find_similar_entities(self, entity_name: str, similarity_threshold: fl ORDER BY similarity DESC """) - res = await session.execute(stmt, { - "ref_id": ref_id, - "ref_total": ref_total, - "threshold": similarity_threshold - }) + res = await session.execute( + stmt, + { + "ref_id": ref_id, + "ref_total": ref_total, + "threshold": similarity_threshold, + }, + ) for row in res.fetchall(): - similar_entities.append({ - "entity": row[0], - "similarity": row[3], - "shared_relations": row[1] - }) - + similar_entities.append( + { + "entity": row[0], + "similarity": row[3], + "shared_relations": row[1], + } + ) + return similar_entities - + except Exception as e: logger.error(f"Similar entity search failed: {e}") return [] @@ -441,7 +502,10 @@ async def delete_entity(self, entity_name: str) -> int: await self.init_db() async with self.Session() as session: # Find entity ID - res = await session.execute(text("SELECT id FROM kg_entities WHERE name = :n"), {"n": entity_name}) + res = await session.execute( + text("SELECT id FROM kg_entities WHERE name = :n"), + {"n": entity_name}, + ) row = res.fetchone() if not row: return 0 @@ -449,7 +513,9 @@ async def delete_entity(self, entity_name: str) -> int: eid = row[0] # Delete relations where source or target is this entity - stmt = text("DELETE FROM kg_relations WHERE source_id = :eid OR target_id = :eid") + stmt = text( + "DELETE FROM kg_relations WHERE source_id = :eid OR target_id = :eid" + ) result = await session.execute(stmt, {"eid": eid}) deleted_relations = result.rowcount @@ -460,7 +526,9 @@ async def delete_entity(self, entity_name: str) -> int: await session.commit() total = 1 + deleted_relations - logger.info(f"Deleted entity '{entity_name}' and {deleted_relations} relations") + logger.info( + f"Deleted entity '{entity_name}' and {deleted_relations} relations" + ) return total except Exception as e: