diff --git a/docs/en/api/05-sessions.md b/docs/en/api/05-sessions.md index 7837639dad..a523fc2d90 100644 --- a/docs/en/api/05-sessions.md +++ b/docs/en/api/05-sessions.md @@ -1231,6 +1231,7 @@ if task != nil { "result": { "session_id": "a1b2c3d4", "archive_uri": "viking://user/alice/sessions/a1b2c3d4/history/archive_001", + "memory_diff_uri": "viking://user/alice/sessions/a1b2c3d4/history/archive_001/memory_diff.json", "memories_extracted": { "profile": 1, "preferences": 2, @@ -1379,7 +1380,7 @@ viking://user/{user_id}/sessions/{session_id}/ | +-- .abstract.md # Written in Phase 2 (background) | +-- .overview.md # Written in Phase 2 (background) | +-- .meta.json # Archive metadata - | +-- memory_diff.json # Written in Phase 2 (background, on memory changes) + | +-- memory_diff.json # Written when long-term memory extraction completes | +-- .done # Phase 2 completion marker | +-- .failed.json # Phase 2 failure marker +-- archive_002/ @@ -1387,7 +1388,7 @@ viking://user/{user_id}/sessions/{session_id}/ ### memory_diff.json Structure -Each commit writes a `memory_diff.json` to the archive directory, recording all memory changes for auditing and rollback: +When long-term memory extraction runs successfully, the commit writes a `memory_diff.json` to the archive directory, recording all memory changes for auditing and rollback: ```json { @@ -1436,7 +1437,7 @@ Each commit writes a `memory_diff.json` to the archive directory, recording all | `summary.total_updates` | int | Number of modified memories | | `summary.total_deletes` | int | Number of deleted memories | -An empty `memory_diff.json` (all counts zero) is written even when no memory operations occurred. +An empty `memory_diff.json` (all counts zero) is written when long-term memory extraction runs but produces no memory operations. --- diff --git a/docs/zh/api/05-sessions.md b/docs/zh/api/05-sessions.md index ea54e3af4d..4ed7ed3acd 100644 --- a/docs/zh/api/05-sessions.md +++ b/docs/zh/api/05-sessions.md @@ -1205,6 +1205,7 @@ if task != nil { "result": { "session_id": "a1b2c3d4", "archive_uri": "viking://user/alice/sessions/a1b2c3d4/history/archive_001", + "memory_diff_uri": "viking://user/alice/sessions/a1b2c3d4/history/archive_001/memory_diff.json", "memories_extracted": { "profile": 1, "preferences": 2, @@ -1351,7 +1352,7 @@ viking://user/{user_id}/sessions/{session_id}/ │ ├── .abstract.md # Phase 2 写入(后台) │ ├── .overview.md # Phase 2 写入(后台) │ ├── .meta.json # 归档元数据 - │ ├── memory_diff.json # Phase 2 写入(后台,记忆变更时) + │ ├── memory_diff.json # 长记忆抽取完成时写入 │ ├── .done # Phase 2 完成标记 │ └── .failed.json # Phase 2 失败标记 └── archive_002/ @@ -1359,7 +1360,7 @@ viking://user/{user_id}/sessions/{session_id}/ ### memory_diff.json 数据结构 -每次提交会在归档目录写入 `memory_diff.json`,记录所有记忆变更,便于审计和回溯: +长记忆抽取成功运行时,会在归档目录写入 `memory_diff.json`,记录所有记忆变更,便于审计和回溯: ```json { @@ -1408,7 +1409,7 @@ viking://user/{user_id}/sessions/{session_id}/ | `summary.total_updates` | int | 修改记忆数 | | `summary.total_deletes` | int | 删除记忆数 | -即使没有记忆操作,也会写入空结构的 `memory_diff.json`(所有计数为零)。 +如果长记忆抽取已运行但没有产生记忆操作,也会写入空结构的 `memory_diff.json`(所有计数为零)。 --- diff --git a/openviking/session/compressor_v2.py b/openviking/session/compressor_v2.py index 9ba691b22b..5d220d87bb 100644 --- a/openviking/session/compressor_v2.py +++ b/openviking/session/compressor_v2.py @@ -283,7 +283,6 @@ async def extract_long_term_memories( telemetry.set("memory.extract.skipped", 0) from openviking.storage.transaction import get_lock_manager, init_lock_manager - from openviking.storage.viking_fs import get_viking_fs # 初始化锁管理器(仅在有 AGFS 时使用锁机制) viking_fs = get_viking_fs() @@ -381,33 +380,36 @@ async def extract_long_term_memories( if operations is None: tracer.info("No memory operations generated") - return [] - - updater = self._get_or_create_updater(registry, transaction_handle) + result = MemoryUpdateResult() + else: + updater = self._get_or_create_updater(registry, transaction_handle) - # Apply operations with isolation_handler - result = await updater.apply_operations( - operations, - ctx, - extract_context=extract_context, - isolation_handler=isolation_handler, - ) + # Apply operations with isolation_handler + result = await updater.apply_operations( + operations, + ctx, + extract_context=extract_context, + isolation_handler=isolation_handler, + ) - tracer.info( - f"Applied memory operations: written={len(result.written_uris)}, " - f"edited={len(result.edited_uris)}, deleted={len(result.deleted_uris)}, " - f"errors={len(result.errors)}" - ) + tracer.info( + f"Applied memory operations: written={len(result.written_uris)}, " + f"edited={len(result.edited_uris)}, deleted={len(result.deleted_uris)}, " + f"errors={len(result.errors)}" + ) # Write memory_diff.json to archive directory if archive_uri and viking_fs: - memory_diff = await self._build_memory_diff( - result=result, - operations=operations, - viking_fs=viking_fs, - ctx=ctx, - archive_uri=archive_uri, - ) + if operations is None: + memory_diff = self._empty_memory_diff(archive_uri) + else: + memory_diff = await self._build_memory_diff( + result=result, + operations=operations, + viking_fs=viking_fs, + ctx=ctx, + archive_uri=archive_uri, + ) await viking_fs.write_file( uri=f"{archive_uri}/memory_diff.json", content=json.dumps(memory_diff, ensure_ascii=False, indent=4), @@ -474,6 +476,23 @@ async def extract_long_term_memories( except Exception as e: logger.warning(f"Failed to release transaction lock: {e}") + @staticmethod + def _empty_memory_diff(archive_uri: str = "") -> Dict[str, Any]: + return { + "archive_uri": archive_uri, + "extracted_at": datetime.utcnow().isoformat() + "Z", + "operations": { + "adds": [], + "updates": [], + "deletes": [], + }, + "summary": { + "total_adds": 0, + "total_updates": 0, + "total_deletes": 0, + }, + } + @tracer(ignore_result=True) async def extract_execution_memories( self, diff --git a/openviking/session/session.py b/openviking/session/session.py index 9585e7393f..c36df46891 100644 --- a/openviking/session/session.py +++ b/openviking/session/session.py @@ -1272,6 +1272,7 @@ async def _run_memory_extraction( memories_extracted: Dict[str, int] = {} extracted_skill_results: list[dict] = [] active_count_updated = 0 + memory_diff_uri: Optional[str] = None telemetry = OperationTelemetry(operation="session_commit_phase2", enabled=True) archive_index = self._archive_index_from_uri(archive_uri) redo_task_id: Optional[str] = None @@ -1483,6 +1484,14 @@ async def _run_execution_memory_extraction() -> Any: if extraction_error is not None: raise extraction_error + if long_term_has_work and self._viking_fs: + candidate_memory_diff_uri = f"{archive_uri}/memory_diff.json" + if await self._viking_fs.exists( + candidate_memory_diff_uri, + ctx=self.ctx, + ): + memory_diff_uri = candidate_memory_diff_uri + total_extracted = 0 for label, result in zip(extraction_labels[1:], _results[1:], strict=True): if isinstance(result, dict): @@ -1580,30 +1589,34 @@ async def _run_execution_memory_extraction() -> Any: # would have raised above and marked the archive .failed.json. await self._write_done_file(archive_uri, first_message_id, last_message_id) - await tracker.complete( - task_id, - { - "session_id": self.session_id, - "archive_uri": archive_uri, - "memories_extracted": memories_extracted, - "session_skills_extracted": len(extracted_skill_results), - "session_skill_uris": [ - item.get("uri") or item.get("root_uri") - for item in extracted_skill_results - if isinstance(item, dict) and (item.get("uri") or item.get("root_uri")) - ], - "active_count_updated": active_count_updated, - "token_usage": { - "llm": dict(self._meta.llm_token_usage), - "embedding": dict(self._meta.embedding_token_usage), - "total": { - "total_tokens": self._meta.llm_token_usage["total_tokens"] - + self._meta.embedding_token_usage["total_tokens"], - "cached_tokens": self._meta.llm_token_usage["cached_tokens"], - "reasoning_tokens": self._meta.llm_token_usage["reasoning_tokens"], - }, + result_payload = { + "session_id": self.session_id, + "archive_uri": archive_uri, + "memories_extracted": memories_extracted, + "session_skills_extracted": len(extracted_skill_results), + "session_skill_uris": [ + item.get("uri") or item.get("root_uri") + for item in extracted_skill_results + if isinstance(item, dict) and (item.get("uri") or item.get("root_uri")) + ], + "active_count_updated": active_count_updated, + "token_usage": { + "llm": dict(self._meta.llm_token_usage), + "embedding": dict(self._meta.embedding_token_usage), + "total": { + "total_tokens": self._meta.llm_token_usage["total_tokens"] + + self._meta.embedding_token_usage["total_tokens"], + "cached_tokens": self._meta.llm_token_usage["cached_tokens"], + "reasoning_tokens": self._meta.llm_token_usage["reasoning_tokens"], }, }, + } + if memory_diff_uri: + result_payload["memory_diff_uri"] = memory_diff_uri + + await tracker.complete( + task_id, + result_payload, account_id=self.ctx.account_id, user_id=self.ctx.user.user_id, ) diff --git a/tests/server/test_api_sessions.py b/tests/server/test_api_sessions.py index 5aaed5880c..8fe4ecba05 100644 --- a/tests/server/test_api_sessions.py +++ b/tests/server/test_api_sessions.py @@ -771,6 +771,7 @@ async def test_compress_session(client: httpx.AsyncClient): body = resp.json() assert body["status"] == "ok" assert body["result"]["status"] == "accepted" + assert "memory_diff_uri" not in body["result"] assert "usage" not in body assert "telemetry" not in body diff --git a/tests/session/test_session_commit.py b/tests/session/test_session_commit.py index 5bc75be1f9..c8424a8c22 100644 --- a/tests/session/test_session_commit.py +++ b/tests/session/test_session_commit.py @@ -47,6 +47,7 @@ async def test_commit_success(self, session_with_messages: Session): assert result.get("status") == "accepted" assert "session_id" in result assert result.get("task_id") is not None + assert "memory_diff_uri" not in result assert "memories_extracted" not in result async def test_commit_extracts_memories( @@ -59,6 +60,17 @@ async def test_commit_extracts_memories( # Wait for background memory extraction to complete task_result = await _wait_for_task(task_id) assert task_result["status"] == "completed" + assert ( + task_result["result"]["memory_diff_uri"] + == f"{task_result['result']['archive_uri']}/memory_diff.json" + ) + memory_diff = json.loads( + await session_with_messages._viking_fs.read_file( + task_result["result"]["memory_diff_uri"], + ctx=session_with_messages.ctx, + ) + ) + assert memory_diff["archive_uri"] == task_result["result"]["archive_uri"] assert "memories_extracted" in task_result["result"] memory_counts = task_result["result"]["memories_extracted"] assert isinstance(memory_counts, dict) @@ -96,6 +108,7 @@ async def test_commit_reports_session_skills_separately( assert task_result["result"]["session_skill_uris"] == [ "viking://user/test/skills/code-review" ] + assert "memory_diff_uri" not in task_result["result"] session_with_messages._session_compressor.extract_long_term_memories.assert_not_awaited() session_with_messages._session_compressor.extract_execution_memories.assert_awaited_once() call_kwargs = ( @@ -131,6 +144,7 @@ async def test_commit_skips_session_skills_without_execution_memory_type( assert task_result["status"] == "completed" assert task_result["result"]["memories_extracted"] == {} assert task_result["result"]["session_skills_extracted"] == 0 + assert "memory_diff_uri" not in task_result["result"] session_with_messages._session_compressor.extract_long_term_memories.assert_awaited_once() session_with_messages._session_compressor.extract_execution_memories.assert_not_awaited() @@ -156,6 +170,7 @@ async def test_commit_skips_session_skill_extraction_when_disabled( assert task_result["status"] == "completed" assert task_result["result"]["session_skills_extracted"] == 0 assert task_result["result"]["session_skill_uris"] == [] + assert "memory_diff_uri" not in task_result["result"] session_with_messages._session_compressor.extract_long_term_memories.assert_awaited_once() session_with_messages._session_compressor.extract_execution_memories.assert_awaited_once() call_kwargs = (