Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions docs/en/api/05-sessions.md
Original file line number Diff line number Diff line change
Expand Up @@ -1231,6 +1231,7 @@ if task != nil {
"result": {
"session_id": "a1b2c3d4",
"archive_uri": "viking://user/alice/sessions/a1b2c3d4/history/archive_001",
"memory_diff_uri": "viking://user/alice/sessions/a1b2c3d4/history/archive_001/memory_diff.json",
"memories_extracted": {
"profile": 1,
"preferences": 2,
Expand Down Expand Up @@ -1379,15 +1380,15 @@ viking://user/{user_id}/sessions/{session_id}/
| +-- .abstract.md # Written in Phase 2 (background)
| +-- .overview.md # Written in Phase 2 (background)
| +-- .meta.json # Archive metadata
| +-- memory_diff.json # Written in Phase 2 (background, on memory changes)
| +-- memory_diff.json # Written when long-term memory extraction completes
| +-- .done # Phase 2 completion marker
| +-- .failed.json # Phase 2 failure marker
+-- archive_002/
```

### memory_diff.json Structure

Each commit writes a `memory_diff.json` to the archive directory, recording all memory changes for auditing and rollback:
When long-term memory extraction runs successfully, the commit writes a `memory_diff.json` to the archive directory, recording all memory changes for auditing and rollback:

```json
{
Expand Down Expand Up @@ -1436,7 +1437,7 @@ Each commit writes a `memory_diff.json` to the archive directory, recording all
| `summary.total_updates` | int | Number of modified memories |
| `summary.total_deletes` | int | Number of deleted memories |

An empty `memory_diff.json` (all counts zero) is written even when no memory operations occurred.
An empty `memory_diff.json` (all counts zero) is written when long-term memory extraction runs but produces no memory operations.

---

Expand Down
7 changes: 4 additions & 3 deletions docs/zh/api/05-sessions.md
Original file line number Diff line number Diff line change
Expand Up @@ -1205,6 +1205,7 @@ if task != nil {
"result": {
"session_id": "a1b2c3d4",
"archive_uri": "viking://user/alice/sessions/a1b2c3d4/history/archive_001",
"memory_diff_uri": "viking://user/alice/sessions/a1b2c3d4/history/archive_001/memory_diff.json",
"memories_extracted": {
"profile": 1,
"preferences": 2,
Expand Down Expand Up @@ -1351,15 +1352,15 @@ viking://user/{user_id}/sessions/{session_id}/
│ ├── .abstract.md # Phase 2 写入(后台)
│ ├── .overview.md # Phase 2 写入(后台)
│ ├── .meta.json # 归档元数据
│ ├── memory_diff.json # Phase 2 写入(后台,记忆变更时)
│ ├── memory_diff.json # 长记忆抽取完成时写入
│ ├── .done # Phase 2 完成标记
│ └── .failed.json # Phase 2 失败标记
└── archive_002/
```

### memory_diff.json 数据结构

每次提交会在归档目录写入 `memory_diff.json`,记录所有记忆变更,便于审计和回溯:
长记忆抽取成功运行时,会在归档目录写入 `memory_diff.json`,记录所有记忆变更,便于审计和回溯:

```json
{
Expand Down Expand Up @@ -1408,7 +1409,7 @@ viking://user/{user_id}/sessions/{session_id}/
| `summary.total_updates` | int | 修改记忆数 |
| `summary.total_deletes` | int | 删除记忆数 |

即使没有记忆操作,也会写入空结构的 `memory_diff.json`(所有计数为零)。
如果长记忆抽取已运行但没有产生记忆操作,也会写入空结构的 `memory_diff.json`(所有计数为零)。

---

Expand Down
65 changes: 42 additions & 23 deletions openviking/session/compressor_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,6 @@ async def extract_long_term_memories(
telemetry.set("memory.extract.skipped", 0)

from openviking.storage.transaction import get_lock_manager, init_lock_manager
from openviking.storage.viking_fs import get_viking_fs

# 初始化锁管理器(仅在有 AGFS 时使用锁机制)
viking_fs = get_viking_fs()
Expand Down Expand Up @@ -381,33 +380,36 @@ async def extract_long_term_memories(

if operations is None:
tracer.info("No memory operations generated")
return []

updater = self._get_or_create_updater(registry, transaction_handle)
result = MemoryUpdateResult()
else:
updater = self._get_or_create_updater(registry, transaction_handle)

# Apply operations with isolation_handler
result = await updater.apply_operations(
operations,
ctx,
extract_context=extract_context,
isolation_handler=isolation_handler,
)
# Apply operations with isolation_handler
result = await updater.apply_operations(
operations,
ctx,
extract_context=extract_context,
isolation_handler=isolation_handler,
)

tracer.info(
f"Applied memory operations: written={len(result.written_uris)}, "
f"edited={len(result.edited_uris)}, deleted={len(result.deleted_uris)}, "
f"errors={len(result.errors)}"
)
tracer.info(
f"Applied memory operations: written={len(result.written_uris)}, "
f"edited={len(result.edited_uris)}, deleted={len(result.deleted_uris)}, "
f"errors={len(result.errors)}"
)

# Write memory_diff.json to archive directory
if archive_uri and viking_fs:
memory_diff = await self._build_memory_diff(
result=result,
operations=operations,
viking_fs=viking_fs,
ctx=ctx,
archive_uri=archive_uri,
)
if operations is None:
memory_diff = self._empty_memory_diff(archive_uri)
else:
memory_diff = await self._build_memory_diff(
result=result,
operations=operations,
viking_fs=viking_fs,
ctx=ctx,
archive_uri=archive_uri,
)
await viking_fs.write_file(
uri=f"{archive_uri}/memory_diff.json",
content=json.dumps(memory_diff, ensure_ascii=False, indent=4),
Expand Down Expand Up @@ -474,6 +476,23 @@ async def extract_long_term_memories(
except Exception as e:
logger.warning(f"Failed to release transaction lock: {e}")

@staticmethod
def _empty_memory_diff(archive_uri: str = "") -> Dict[str, Any]:
    """Build a ``memory_diff.json`` payload that records no memory changes.

    Args:
        archive_uri: URI of the archive the diff belongs to; stored verbatim
            under the ``"archive_uri"`` key.

    Returns:
        A dict with ``archive_uri``, ``extracted_at`` (the current UTC time
        as an ISO-8601 string with a trailing ``"Z"``), empty ``adds`` /
        ``updates`` / ``deletes`` lists under ``operations``, and zeroed
        ``total_*`` counters under ``summary``.
    """
    # Function-scope import so the block does not depend on ``timezone``
    # being imported at module level.
    from datetime import datetime, timezone

    # datetime.utcnow() is deprecated since Python 3.12. Take a
    # timezone-aware UTC "now" and drop tzinfo so isoformat() emits no
    # "+00:00" offset, keeping the "<naive ISO timestamp>Z" format.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)

    return {
        "archive_uri": archive_uri,
        "extracted_at": now_utc.isoformat() + "Z",
        "operations": {
            "adds": [],
            "updates": [],
            "deletes": [],
        },
        "summary": {
            "total_adds": 0,
            "total_updates": 0,
            "total_deletes": 0,
        },
    }

@tracer(ignore_result=True)
async def extract_execution_memories(
self,
Expand Down
57 changes: 35 additions & 22 deletions openviking/session/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -1272,6 +1272,7 @@ async def _run_memory_extraction(
memories_extracted: Dict[str, int] = {}
extracted_skill_results: list[dict] = []
active_count_updated = 0
memory_diff_uri: Optional[str] = None
telemetry = OperationTelemetry(operation="session_commit_phase2", enabled=True)
archive_index = self._archive_index_from_uri(archive_uri)
redo_task_id: Optional[str] = None
Expand Down Expand Up @@ -1483,6 +1484,14 @@ async def _run_execution_memory_extraction() -> Any:
if extraction_error is not None:
raise extraction_error

if long_term_has_work and self._viking_fs:
candidate_memory_diff_uri = f"{archive_uri}/memory_diff.json"
if await self._viking_fs.exists(
candidate_memory_diff_uri,
ctx=self.ctx,
):
memory_diff_uri = candidate_memory_diff_uri

total_extracted = 0
for label, result in zip(extraction_labels[1:], _results[1:], strict=True):
if isinstance(result, dict):
Expand Down Expand Up @@ -1580,30 +1589,34 @@ async def _run_execution_memory_extraction() -> Any:
# would have raised above and marked the archive .failed.json.
await self._write_done_file(archive_uri, first_message_id, last_message_id)

await tracker.complete(
task_id,
{
"session_id": self.session_id,
"archive_uri": archive_uri,
"memories_extracted": memories_extracted,
"session_skills_extracted": len(extracted_skill_results),
"session_skill_uris": [
item.get("uri") or item.get("root_uri")
for item in extracted_skill_results
if isinstance(item, dict) and (item.get("uri") or item.get("root_uri"))
],
"active_count_updated": active_count_updated,
"token_usage": {
"llm": dict(self._meta.llm_token_usage),
"embedding": dict(self._meta.embedding_token_usage),
"total": {
"total_tokens": self._meta.llm_token_usage["total_tokens"]
+ self._meta.embedding_token_usage["total_tokens"],
"cached_tokens": self._meta.llm_token_usage["cached_tokens"],
"reasoning_tokens": self._meta.llm_token_usage["reasoning_tokens"],
},
result_payload = {
"session_id": self.session_id,
"archive_uri": archive_uri,
"memories_extracted": memories_extracted,
"session_skills_extracted": len(extracted_skill_results),
"session_skill_uris": [
item.get("uri") or item.get("root_uri")
for item in extracted_skill_results
if isinstance(item, dict) and (item.get("uri") or item.get("root_uri"))
],
"active_count_updated": active_count_updated,
"token_usage": {
"llm": dict(self._meta.llm_token_usage),
"embedding": dict(self._meta.embedding_token_usage),
"total": {
"total_tokens": self._meta.llm_token_usage["total_tokens"]
+ self._meta.embedding_token_usage["total_tokens"],
"cached_tokens": self._meta.llm_token_usage["cached_tokens"],
"reasoning_tokens": self._meta.llm_token_usage["reasoning_tokens"],
},
},
}
if memory_diff_uri:
result_payload["memory_diff_uri"] = memory_diff_uri

await tracker.complete(
task_id,
result_payload,
account_id=self.ctx.account_id,
user_id=self.ctx.user.user_id,
)
Expand Down
1 change: 1 addition & 0 deletions tests/server/test_api_sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,7 @@ async def test_compress_session(client: httpx.AsyncClient):
body = resp.json()
assert body["status"] == "ok"
assert body["result"]["status"] == "accepted"
assert "memory_diff_uri" not in body["result"]
assert "usage" not in body
assert "telemetry" not in body

Expand Down
15 changes: 15 additions & 0 deletions tests/session/test_session_commit.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ async def test_commit_success(self, session_with_messages: Session):
assert result.get("status") == "accepted"
assert "session_id" in result
assert result.get("task_id") is not None
assert "memory_diff_uri" not in result
assert "memories_extracted" not in result

async def test_commit_extracts_memories(
Expand All @@ -59,6 +60,17 @@ async def test_commit_extracts_memories(
# Wait for background memory extraction to complete
task_result = await _wait_for_task(task_id)
assert task_result["status"] == "completed"
assert (
task_result["result"]["memory_diff_uri"]
== f"{task_result['result']['archive_uri']}/memory_diff.json"
)
memory_diff = json.loads(
await session_with_messages._viking_fs.read_file(
task_result["result"]["memory_diff_uri"],
ctx=session_with_messages.ctx,
)
)
assert memory_diff["archive_uri"] == task_result["result"]["archive_uri"]
assert "memories_extracted" in task_result["result"]
memory_counts = task_result["result"]["memories_extracted"]
assert isinstance(memory_counts, dict)
Expand Down Expand Up @@ -96,6 +108,7 @@ async def test_commit_reports_session_skills_separately(
assert task_result["result"]["session_skill_uris"] == [
"viking://user/test/skills/code-review"
]
assert "memory_diff_uri" not in task_result["result"]
session_with_messages._session_compressor.extract_long_term_memories.assert_not_awaited()
session_with_messages._session_compressor.extract_execution_memories.assert_awaited_once()
call_kwargs = (
Expand Down Expand Up @@ -131,6 +144,7 @@ async def test_commit_skips_session_skills_without_execution_memory_type(
assert task_result["status"] == "completed"
assert task_result["result"]["memories_extracted"] == {}
assert task_result["result"]["session_skills_extracted"] == 0
assert "memory_diff_uri" not in task_result["result"]
session_with_messages._session_compressor.extract_long_term_memories.assert_awaited_once()
session_with_messages._session_compressor.extract_execution_memories.assert_not_awaited()

Expand All @@ -156,6 +170,7 @@ async def test_commit_skips_session_skill_extraction_when_disabled(
assert task_result["status"] == "completed"
assert task_result["result"]["session_skills_extracted"] == 0
assert task_result["result"]["session_skill_uris"] == []
assert "memory_diff_uri" not in task_result["result"]
session_with_messages._session_compressor.extract_long_term_memories.assert_awaited_once()
session_with_messages._session_compressor.extract_execution_memories.assert_awaited_once()
call_kwargs = (
Expand Down
Loading