From a4ed532fd6c77aa487139744cda57807d7a60294 Mon Sep 17 00:00:00 2001 From: James Ross Date: Mon, 1 Jun 2026 21:41:16 -0700 Subject: [PATCH 01/13] feat: add generic WSC store port --- crates/warp-core/src/wsc/mod.rs | 6 + crates/warp-core/src/wsc/store.rs | 429 ++++++++++++++++++++++ crates/warp-core/tests/wsc_store_tests.rs | 109 ++++++ 3 files changed, 544 insertions(+) create mode 100644 crates/warp-core/src/wsc/store.rs create mode 100644 crates/warp-core/tests/wsc_store_tests.rs diff --git a/crates/warp-core/src/wsc/mod.rs b/crates/warp-core/src/wsc/mod.rs index fcc76aa9..2f6172a5 100644 --- a/crates/warp-core/src/wsc/mod.rs +++ b/crates/warp-core/src/wsc/mod.rs @@ -60,6 +60,7 @@ pub mod build; pub mod read; +pub mod store; pub mod types; pub mod validate; pub mod view; @@ -68,6 +69,11 @@ pub mod write; // Re-exports for convenient access pub use build::build_one_warp_input; pub use read::ReadError; +pub use store::{ + InMemoryWscStore, WscStoreEnvelope, WscStoreEnvelopeId, WscStoreObstruction, + WscStoreObstructionKind, WscStorePort, WscStoreRecordKind, WscStoreSubject, + WscStoreWriteReceipt, +}; pub use validate::validate_wsc; pub use view::{AttachmentRef, WarpView, WscFile}; pub use write::{write_wsc_one_warp, OneWarpInput}; diff --git a/crates/warp-core/src/wsc/store.rs b/crates/warp-core/src/wsc/store.rs new file mode 100644 index 00000000..166bcb68 --- /dev/null +++ b/crates/warp-core/src/wsc/store.rs @@ -0,0 +1,429 @@ +// SPDX-License-Identifier: Apache-2.0 +// © James Ross Ω FLYING•ROBOTS +//! Generic WSC storage port and deterministic envelope format. + +use std::collections::BTreeMap; + +use blake3::Hasher; + +use crate::ident::Hash; + +use super::validate::validate_wsc; +use super::view::WscFile; + +const WSC_STORE_ENVELOPE_MAGIC: &[u8; 8] = b"ECWSCST1"; +const WSC_STORE_ENVELOPE_VERSION: u16 = 1; +const WSC_STORE_ENVELOPE_ID_DOMAIN: &[u8] = b"echo:wsc_store:envelope_id:v1\0"; +const WSC_STORE_BYTES_DOMAIN: &[u8] = b"echo:wsc_store:wsc_bytes:v1\0"; +const HEADER_LEN: usize = 124; + +/// Stable identifier for a WSC store envelope. +#[repr(transparent)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct WscStoreEnvelopeId(Hash); + +impl WscStoreEnvelopeId { + /// Builds an envelope id from a canonical digest. + #[must_use] + pub const fn from_hash(hash: Hash) -> Self { + Self(hash) + } + + /// Returns the canonical digest bytes. + #[must_use] + pub const fn as_hash(self) -> Hash { + self.0 + } +} + +/// Generic kind of WSC material stored by Echo. +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub enum WscStoreRecordKind { + /// Materialized causal snapshot. + Snapshot, + /// Causal-history material. + CausalHistory, + /// Retained evidence material. + RetainedEvidence, +} + +impl WscStoreRecordKind { + const fn code(self) -> u16 { + match self { + Self::Snapshot => 1, + Self::CausalHistory => 2, + Self::RetainedEvidence => 3, + } + } + + const fn from_code(code: u16) -> Option { + match code { + 1 => Some(Self::Snapshot), + 2 => Some(Self::CausalHistory), + 3 => Some(Self::RetainedEvidence), + _ => None, + } + } +} + +/// Subject named by a WSC store obstruction. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum WscStoreSubject { + /// Envelope identity was implicated. + Envelope { + /// Envelope id. + envelope_id: WscStoreEnvelopeId, + }, + /// Encoded bytes were malformed near an offset. + EnvelopeBytes { + /// Byte offset implicated by the obstruction. + offset: usize, + }, + /// Encoded bytes carried a digest mismatch. + EnvelopeDigest { + /// Expected digest recorded by the envelope. + expected: Hash, + /// Actual digest computed from the payload. + actual: Hash, + }, + /// WSC payload was invalid. + WscPayload { + /// Digest of the invalid WSC payload. + digest: Hash, + }, +} + +/// Generic WSC store obstruction kind. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum WscStoreObstructionKind { + /// Requested envelope was absent. + MissingEnvelope, + /// Envelope header or structural fields were malformed. + InvalidEnvelope, + /// WSC payload failed WSC parsing or validation. + InvalidWsc, + /// Encoded envelope digest did not match its payload. + DigestMismatch, + /// Existing envelope id maps to different material. + DuplicateEnvelopeMismatch, +} + +/// Typed obstruction returned instead of hidden fallback or invented success. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct WscStoreObstruction { + /// Obstruction kind. + pub kind: WscStoreObstructionKind, + /// Obstruction subject. + pub subject: WscStoreSubject, +} + +impl WscStoreObstruction { + fn invalid_envelope(offset: usize) -> Self { + Self { + kind: WscStoreObstructionKind::InvalidEnvelope, + subject: WscStoreSubject::EnvelopeBytes { offset }, + } + } + + fn invalid_wsc(wsc_digest: Hash) -> Self { + Self { + kind: WscStoreObstructionKind::InvalidWsc, + subject: WscStoreSubject::WscPayload { digest: wsc_digest }, + } + } + + fn digest_mismatch(expected: Hash, actual: Hash) -> Self { + Self { + kind: WscStoreObstructionKind::DigestMismatch, + subject: WscStoreSubject::EnvelopeDigest { expected, actual }, + } + } + + fn missing_envelope(envelope_id: WscStoreEnvelopeId) -> Self { + Self { + kind: WscStoreObstructionKind::MissingEnvelope, + subject: WscStoreSubject::Envelope { envelope_id }, + } + } + + fn duplicate_mismatch(envelope_id: WscStoreEnvelopeId) -> Self { + Self { + kind: WscStoreObstructionKind::DuplicateEnvelopeMismatch, + subject: WscStoreSubject::Envelope { envelope_id }, + } + } +} + +/// Deterministic WSC store envelope. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct WscStoreEnvelope { + id: WscStoreEnvelopeId, + record_kind: WscStoreRecordKind, + basis_digest: Hash, + schema_hash: Hash, + tick: u64, + wsc_digest: Hash, + wsc_len: u64, + wsc_bytes: Vec, +} + +impl WscStoreEnvelope { + /// Builds and validates a WSC store envelope. + /// + /// # Errors + /// + /// Returns [`WscStoreObstructionKind::InvalidWsc`] when the payload is not + /// valid WSC material. + pub fn validated( + record_kind: WscStoreRecordKind, + basis_digest: Hash, + wsc_bytes: Vec, + ) -> Result { + let wsc_digest = digest_wsc_bytes(&wsc_bytes); + let file = WscFile::from_bytes(wsc_bytes.clone()) + .map_err(|_| WscStoreObstruction::invalid_wsc(wsc_digest))?; + validate_wsc(&file).map_err(|_| WscStoreObstruction::invalid_wsc(wsc_digest))?; + let schema_hash = *file.schema_hash(); + let tick = file.tick(); + let wsc_len = u64::try_from(wsc_bytes.len()) + .map_err(|_| WscStoreObstruction::invalid_envelope(HEADER_LEN))?; + let id = derive_envelope_id( + record_kind, + &basis_digest, + &schema_hash, + tick, + &wsc_digest, + wsc_len, + ); + Ok(Self { + id, + record_kind, + basis_digest, + schema_hash, + tick, + wsc_digest, + wsc_len, + wsc_bytes, + }) + } + + /// Decodes and validates a deterministic WSC store envelope. + /// + /// # Errors + /// + /// Returns a typed WSC store obstruction for malformed envelopes, digest + /// mismatch, or invalid WSC payloads. + pub fn decode(bytes: &[u8]) -> Result { + let magic = read_array::<8>(bytes, 0)?; + if &magic != WSC_STORE_ENVELOPE_MAGIC { + return Err(WscStoreObstruction::invalid_envelope(0)); + } + let version = u16::from_le_bytes(read_array::<2>(bytes, 8)?); + if version != WSC_STORE_ENVELOPE_VERSION { + return Err(WscStoreObstruction::invalid_envelope(8)); + } + let record_kind_code = u16::from_le_bytes(read_array::<2>(bytes, 10)?); + let record_kind = WscStoreRecordKind::from_code(record_kind_code) + .ok_or_else(|| WscStoreObstruction::invalid_envelope(10))?; + let schema_hash = read_array::<32>(bytes, 12)?; + let basis_digest = read_array::<32>(bytes, 44)?; + let wsc_digest = read_array::<32>(bytes, 76)?; + let tick = u64::from_le_bytes(read_array::<8>(bytes, 108)?); + let wsc_len = u64::from_le_bytes(read_array::<8>(bytes, 116)?); + let payload_start = 124usize; + let payload_len = + usize::try_from(wsc_len).map_err(|_| WscStoreObstruction::invalid_envelope(116))?; + let payload_end = payload_start + .checked_add(payload_len) + .ok_or_else(|| WscStoreObstruction::invalid_envelope(payload_start))?; + let payload = bytes + .get(payload_start..payload_end) + .ok_or_else(|| WscStoreObstruction::invalid_envelope(payload_start))?; + if payload_end != bytes.len() { + return Err(WscStoreObstruction::invalid_envelope(payload_end)); + } + let actual_digest = digest_wsc_bytes(payload); + if actual_digest != wsc_digest { + return Err(WscStoreObstruction::digest_mismatch( + wsc_digest, + actual_digest, + )); + } + let envelope = Self::validated(record_kind, basis_digest, payload.to_vec())?; + if envelope.schema_hash != schema_hash + || envelope.tick != tick + || envelope.wsc_len != wsc_len + { + return Err(WscStoreObstruction::invalid_envelope(12)); + } + Ok(envelope) + } + + /// Encodes this envelope into deterministic bytes. + #[must_use] + pub fn encode(&self) -> Vec { + let mut bytes = Vec::with_capacity(HEADER_LEN + self.wsc_bytes.len()); + bytes.extend_from_slice(WSC_STORE_ENVELOPE_MAGIC); + bytes.extend_from_slice(&WSC_STORE_ENVELOPE_VERSION.to_le_bytes()); + bytes.extend_from_slice(&self.record_kind.code().to_le_bytes()); + bytes.extend_from_slice(&self.schema_hash); + bytes.extend_from_slice(&self.basis_digest); + bytes.extend_from_slice(&self.wsc_digest); + bytes.extend_from_slice(&self.tick.to_le_bytes()); + bytes.extend_from_slice(&self.wsc_len.to_le_bytes()); + bytes.extend_from_slice(&self.wsc_bytes); + bytes + } + + /// Returns the envelope id. + #[must_use] + pub const fn id(&self) -> WscStoreEnvelopeId { + self.id + } + + /// Returns the generic record kind. + #[must_use] + pub const fn record_kind(&self) -> WscStoreRecordKind { + self.record_kind + } + + /// Returns the basis digest. + #[must_use] + pub const fn basis_digest(&self) -> &Hash { + &self.basis_digest + } + + /// Returns the WSC schema hash. + #[must_use] + pub const fn schema_hash(&self) -> &Hash { + &self.schema_hash + } + + /// Returns the WSC tick. + #[must_use] + pub const fn tick(&self) -> u64 { + self.tick + } + + /// Returns the WSC payload digest. + #[must_use] + pub const fn wsc_digest(&self) -> &Hash { + &self.wsc_digest + } + + /// Returns the WSC bytes. + #[must_use] + pub fn wsc_bytes(&self) -> &[u8] { + &self.wsc_bytes + } +} + +/// Receipt returned after a WSC envelope write. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct WscStoreWriteReceipt { + /// Written envelope id. + pub envelope_id: WscStoreEnvelopeId, + /// WSC payload digest. + pub wsc_digest: Hash, + /// Encoded envelope byte length. + pub encoded_len: u64, +} + +/// Generic WSC store port. +pub trait WscStorePort { + /// Writes a validated WSC envelope. + fn write_envelope( + &mut self, + envelope: WscStoreEnvelope, + ) -> Result; + + /// Reads a WSC envelope by id. + fn read_envelope( + &self, + envelope_id: WscStoreEnvelopeId, + ) -> Result; + + /// Lists known envelope ids in deterministic order. + fn list_envelopes(&self) -> Vec; +} + +/// In-memory WSC store implementation for tests and adapters. +#[derive(Debug, Default)] +pub struct InMemoryWscStore { + envelopes: BTreeMap, +} + +impl WscStorePort for InMemoryWscStore { + fn write_envelope( + &mut self, + envelope: WscStoreEnvelope, + ) -> Result { + let envelope_id = envelope.id(); + if let Some(existing) = self.envelopes.get(&envelope_id) { + if existing != &envelope { + return Err(WscStoreObstruction::duplicate_mismatch(envelope_id)); + } + } + let encoded_len = u64::try_from(envelope.encode().len()) + .map_err(|_| WscStoreObstruction::invalid_envelope(HEADER_LEN))?; + let receipt = WscStoreWriteReceipt { + envelope_id, + wsc_digest: envelope.wsc_digest, + encoded_len, + }; + self.envelopes.insert(envelope_id, envelope); + Ok(receipt) + } + + fn read_envelope( + &self, + envelope_id: WscStoreEnvelopeId, + ) -> Result { + self.envelopes + .get(&envelope_id) + .cloned() + .ok_or_else(|| WscStoreObstruction::missing_envelope(envelope_id)) + } + + fn list_envelopes(&self) -> Vec { + self.envelopes.keys().copied().collect() + } +} + +fn read_array(bytes: &[u8], offset: usize) -> Result<[u8; N], WscStoreObstruction> { + let end = offset + .checked_add(N) + .ok_or_else(|| WscStoreObstruction::invalid_envelope(offset))?; + let slice = bytes + .get(offset..end) + .ok_or_else(|| WscStoreObstruction::invalid_envelope(offset))?; + let mut out = [0; N]; + out.copy_from_slice(slice); + Ok(out) +} + +fn derive_envelope_id( + record_kind: WscStoreRecordKind, + basis_digest: &Hash, + schema_hash: &Hash, + tick: u64, + wsc_digest: &Hash, + wsc_len: u64, +) -> WscStoreEnvelopeId { + let mut hasher = Hasher::new(); + hasher.update(WSC_STORE_ENVELOPE_ID_DOMAIN); + hasher.update(&record_kind.code().to_le_bytes()); + hasher.update(basis_digest); + hasher.update(schema_hash); + hasher.update(&tick.to_le_bytes()); + hasher.update(wsc_digest); + hasher.update(&wsc_len.to_le_bytes()); + WscStoreEnvelopeId(hasher.finalize().into()) +} + +fn digest_wsc_bytes(bytes: &[u8]) -> Hash { + let mut hasher = Hasher::new(); + hasher.update(WSC_STORE_BYTES_DOMAIN); + hasher.update(bytes); + hasher.finalize().into() +} diff --git a/crates/warp-core/tests/wsc_store_tests.rs b/crates/warp-core/tests/wsc_store_tests.rs new file mode 100644 index 00000000..6cc57f44 --- /dev/null +++ b/crates/warp-core/tests/wsc_store_tests.rs @@ -0,0 +1,109 @@ +// SPDX-License-Identifier: Apache-2.0 +// © James Ross Ω FLYING•ROBOTS +//! WSC store port contract tests. + +#![allow(clippy::expect_used)] + +use warp_core::wsc::{ + write_wsc_one_warp, InMemoryWscStore, OneWarpInput, WscStoreEnvelope, WscStoreObstructionKind, + WscStorePort, WscStoreRecordKind, WscStoreSubject, +}; + +#[test] +fn wsc_store_envelope_round_trips_deterministically() { + let bytes = fixture_wsc_bytes(7); + let basis_digest = [9; 32]; + let envelope = WscStoreEnvelope::validated( + WscStoreRecordKind::CausalHistory, + basis_digest, + bytes.clone(), + ) + .expect("valid WSC envelope"); + + let encoded_a = envelope.encode(); + let encoded_b = envelope.encode(); + assert_eq!(encoded_a, encoded_b); + + let decoded = WscStoreEnvelope::decode(&encoded_a).expect("decoded envelope"); + assert_eq!(decoded, envelope); + assert_eq!(decoded.wsc_bytes(), bytes.as_slice()); + assert_eq!(decoded.basis_digest(), &basis_digest); +} + +#[test] +fn in_memory_wsc_store_writes_reads_and_lists_envelopes() { + let envelope = + WscStoreEnvelope::validated(WscStoreRecordKind::Snapshot, [3; 32], fixture_wsc_bytes(11)) + .expect("valid WSC envelope"); + let id = envelope.id(); + let mut store = InMemoryWscStore::default(); + + let receipt = store + .write_envelope(envelope.clone()) + .expect("write envelope"); + assert_eq!(receipt.envelope_id, id); + assert_eq!(store.list_envelopes(), vec![id]); + assert_eq!(store.read_envelope(id), Ok(envelope)); +} + +#[test] +fn in_memory_wsc_store_missing_envelope_returns_typed_obstruction() { + let store = InMemoryWscStore::default(); + let missing_id = WscStoreEnvelope::validated( + WscStoreRecordKind::RetainedEvidence, + [4; 32], + fixture_wsc_bytes(13), + ) + .expect("valid WSC envelope") + .id(); + + let obstruction = store + .read_envelope(missing_id) + .expect_err("missing envelope obstructs"); + assert_eq!(obstruction.kind, WscStoreObstructionKind::MissingEnvelope); + assert_eq!( + obstruction.subject, + WscStoreSubject::Envelope { + envelope_id: missing_id + } + ); +} + +#[test] +fn wsc_store_decode_rejects_digest_mismatch() { + let envelope = + WscStoreEnvelope::validated(WscStoreRecordKind::Snapshot, [5; 32], fixture_wsc_bytes(17)) + .expect("valid WSC envelope"); + let mut encoded = envelope.encode(); + let last = encoded.last_mut().expect("encoded envelope byte"); + *last ^= 0xff; + + let obstruction = WscStoreEnvelope::decode(&encoded).expect_err("digest mismatch obstructs"); + assert_eq!(obstruction.kind, WscStoreObstructionKind::DigestMismatch); +} + +#[test] +fn wsc_store_module_has_no_jedit_nouns() { + let source = include_str!("../src/wsc/store.rs"); + assert!(!source.to_lowercase().contains("jedit")); +} + +fn fixture_wsc_bytes(tick: u64) -> Vec { + let input = OneWarpInput { + warp_id: [1; 32], + root_node_id: [2; 32], + nodes: vec![warp_core::wsc::types::NodeRow { + node_id: [2; 32], + node_type: [3; 32], + }], + edges: vec![], + out_index: vec![warp_core::wsc::types::Range::default()], + out_edges: vec![], + node_atts_index: vec![warp_core::wsc::types::Range::default()], + node_atts: vec![], + edge_atts_index: vec![], + edge_atts: vec![], + blobs: vec![], + }; + write_wsc_one_warp(&input, [8; 32], tick).expect("fixture WSC bytes") +} From 9893662468b91cb94d191eff9f39df5cadde41ae Mon Sep 17 00:00:00 2001 From: James Ross Date: Mon, 1 Jun 2026 21:49:30 -0700 Subject: [PATCH 02/13] feat: persist accepted submissions in WSC --- crates/warp-core/src/causal_wal.rs | 67 ++++++--- crates/warp-core/src/wsc/mod.rs | 1 + crates/warp-core/src/wsc/store.rs | 164 +++++++++++++++++++++- crates/warp-core/tests/wsc_store_tests.rs | 35 +++++ 4 files changed, 245 insertions(+), 22 deletions(-) diff --git a/crates/warp-core/src/causal_wal.rs b/crates/warp-core/src/causal_wal.rs index 6ad80e1b..db313043 100644 --- a/crates/warp-core/src/causal_wal.rs +++ b/crates/warp-core/src/causal_wal.rs @@ -2081,6 +2081,51 @@ pub struct RecoveredSubmissionIndex { } impl RecoveredSubmissionIndex { + /// Builds a recovered index from accepted submission records. + /// + /// Every recovered record starts as accepted pending because no scheduler + /// decision material is supplied by this constructor. + /// + /// # Errors + /// + /// Returns [`WalRecoveryIndexError::SubmissionEnvelopeConflict`] when one + /// submission id is associated with conflicting canonical envelope digests. + pub fn from_acceptance_records(records: I) -> Result + where + I: IntoIterator, + { + let mut index = Self::default(); + for record in records { + index.insert_acceptance_record(record)?; + } + Ok(index) + } + + fn insert_acceptance_record( + &mut self, + record: SubmissionAcceptanceRecord, + ) -> Result<(), WalRecoveryIndexError> { + if let Some(existing) = self.submissions.get(&record.submission_id) { + if existing.acceptance.canonical_envelope_digest != record.canonical_envelope_digest { + return Err(WalRecoveryIndexError::SubmissionEnvelopeConflict { + submission_id: record.submission_id, + }); + } + } + self.envelope_to_submissions + .entry(record.canonical_envelope_digest) + .or_default() + .insert(record.submission_id); + self.submissions + .entry(record.submission_id) + .or_insert(RecoveredSubmissionEntry { + acceptance: record, + posture: RecoveredSubmissionPosture::AcceptedPending, + receipt_digest: None, + }); + Ok(()) + } + /// Returns a recovered submission entry. #[must_use] pub fn get(&self, submission_id: &Hash) -> Option<&RecoveredSubmissionEntry> { @@ -4058,27 +4103,7 @@ pub fn recover_submission_index( let record = SubmissionAcceptanceRecord::from_payload_bytes( &frame.payload.canonical_bytes, )?; - if let Some(existing) = index.submissions.get(&record.submission_id) { - if existing.acceptance.canonical_envelope_digest - != record.canonical_envelope_digest - { - return Err(WalRecoveryIndexError::SubmissionEnvelopeConflict { - submission_id: record.submission_id, - }); - } - } - index - .envelope_to_submissions - .entry(record.canonical_envelope_digest) - .or_default() - .insert(record.submission_id); - index.submissions.entry(record.submission_id).or_insert( - RecoveredSubmissionEntry { - acceptance: record, - posture: RecoveredSubmissionPosture::AcceptedPending, - receipt_digest: None, - }, - ); + index.insert_acceptance_record(record)?; } WalRecordKind::TickReceiptRecorded => { let receipt = diff --git a/crates/warp-core/src/wsc/mod.rs b/crates/warp-core/src/wsc/mod.rs index 2f6172a5..b8d3ca29 100644 --- a/crates/warp-core/src/wsc/mod.rs +++ b/crates/warp-core/src/wsc/mod.rs @@ -70,6 +70,7 @@ pub mod write; pub use build::build_one_warp_input; pub use read::ReadError; pub use store::{ + accepted_submission_records_from_wsc_envelope, accepted_submission_records_to_wsc_envelope, InMemoryWscStore, WscStoreEnvelope, WscStoreEnvelopeId, WscStoreObstruction, WscStoreObstructionKind, WscStorePort, WscStoreRecordKind, WscStoreSubject, WscStoreWriteReceipt, diff --git a/crates/warp-core/src/wsc/store.rs b/crates/warp-core/src/wsc/store.rs index 166bcb68..c3f67efe 100644 --- a/crates/warp-core/src/wsc/store.rs +++ b/crates/warp-core/src/wsc/store.rs @@ -5,16 +5,35 @@ use std::collections::BTreeMap; use blake3::Hasher; +use bytes::Bytes; -use crate::ident::Hash; +use crate::attachment::{AtomPayload, AttachmentValue}; +use crate::causal_wal::SubmissionAcceptanceRecord; +use crate::graph::GraphStore; +use crate::ident::{make_node_id, make_type_id, make_warp_id, EdgeId, Hash, NodeId}; +use crate::record::{EdgeRecord, NodeRecord}; +use super::build::build_one_warp_input; +use super::types::AttRow; use super::validate::validate_wsc; use super::view::WscFile; +use super::write::write_wsc_one_warp; const WSC_STORE_ENVELOPE_MAGIC: &[u8; 8] = b"ECWSCST1"; const WSC_STORE_ENVELOPE_VERSION: u16 = 1; const WSC_STORE_ENVELOPE_ID_DOMAIN: &[u8] = b"echo:wsc_store:envelope_id:v1\0"; const WSC_STORE_BYTES_DOMAIN: &[u8] = b"echo:wsc_store:wsc_bytes:v1\0"; +const WSC_ACCEPTED_SUBMISSION_BASIS_DOMAIN: &[u8] = + b"echo:wsc_store:accepted_submission_basis:v1\0"; +const WSC_ACCEPTED_SUBMISSION_NODE_DOMAIN: &[u8] = b"echo:wsc_store:accepted_submission_node:v1\0"; +const WSC_ACCEPTED_SUBMISSION_EDGE_DOMAIN: &[u8] = b"echo:wsc_store:accepted_submission_edge:v1\0"; +const WSC_ACCEPTED_SUBMISSION_SCHEMA: &str = "echo/wsc-store/accepted-submissions/v1"; +const WSC_ACCEPTED_SUBMISSION_WARP: &str = "echo/wsc-store/accepted-submissions"; +const WSC_ACCEPTED_SUBMISSION_ROOT: &str = "echo/wsc-store/accepted-submissions/root"; +const WSC_ACCEPTED_SUBMISSION_NODE_TYPE: &str = "echo/wsc-store/accepted-submissions/node/v1"; +const WSC_ACCEPTED_SUBMISSION_EDGE_TYPE: &str = "echo/wsc-store/accepted-submissions/member/v1"; +const WSC_ACCEPTED_SUBMISSION_ATTACHMENT_TYPE: &str = + "echo/wsc-store/accepted-submissions/record/v1"; const HEADER_LEN: usize = 124; /// Stable identifier for a WSC store envelope. @@ -390,6 +409,97 @@ impl WscStorePort for InMemoryWscStore { } } +/// Builds a generic WSC envelope for accepted submission records. +/// +/// Duplicate identical records are represented once. A duplicate submission id +/// with different material is a typed obstruction. +/// +/// # Errors +/// +/// Returns [`WscStoreObstructionKind::DuplicateEnvelopeMismatch`] for +/// conflicting duplicate submission ids or [`WscStoreObstructionKind::InvalidWsc`] +/// when generated WSC material fails validation. +pub fn accepted_submission_records_to_wsc_envelope( + records: &[SubmissionAcceptanceRecord], +) -> Result { + let records = canonical_accepted_submission_records(records)?; + let mut store = GraphStore::new(make_warp_id(WSC_ACCEPTED_SUBMISSION_WARP)); + let root = make_node_id(WSC_ACCEPTED_SUBMISSION_ROOT); + store.insert_node( + root, + NodeRecord { + ty: make_type_id(WSC_ACCEPTED_SUBMISSION_NODE_TYPE), + }, + ); + for record in &records { + let node = accepted_submission_node_id(&record.submission_id); + store.insert_node( + node, + NodeRecord { + ty: make_type_id(WSC_ACCEPTED_SUBMISSION_NODE_TYPE), + }, + ); + store.insert_edge( + root, + EdgeRecord { + id: accepted_submission_edge_id(&record.submission_id), + from: root, + to: node, + ty: make_type_id(WSC_ACCEPTED_SUBMISSION_EDGE_TYPE), + }, + ); + store.set_node_attachment( + node, + Some(AttachmentValue::Atom(AtomPayload::new( + make_type_id(WSC_ACCEPTED_SUBMISSION_ATTACHMENT_TYPE), + Bytes::from(record.to_payload_bytes()), + ))), + ); + } + let basis_digest = accepted_submission_basis_digest(&records); + let input = build_one_warp_input(&store, root); + let wsc_bytes = write_wsc_one_warp(&input, make_type_id(WSC_ACCEPTED_SUBMISSION_SCHEMA).0, 0) + .map_err(|_| WscStoreObstruction::invalid_wsc(basis_digest))?; + WscStoreEnvelope::validated(WscStoreRecordKind::CausalHistory, basis_digest, wsc_bytes) +} + +/// Recovers accepted submission records from a generic WSC envelope. +/// +/// # Errors +/// +/// Returns a typed WSC store obstruction when the envelope is not accepted +/// submission causal-history material or when record payloads are malformed. +pub fn accepted_submission_records_from_wsc_envelope( + envelope: &WscStoreEnvelope, +) -> Result, WscStoreObstruction> { + if envelope.record_kind() != WscStoreRecordKind::CausalHistory { + return Err(WscStoreObstruction::invalid_envelope(0)); + } + let wsc_digest = *envelope.wsc_digest(); + let file = WscFile::from_bytes(envelope.wsc_bytes().to_vec()) + .map_err(|_| WscStoreObstruction::invalid_wsc(wsc_digest))?; + validate_wsc(&file).map_err(|_| WscStoreObstruction::invalid_wsc(wsc_digest))?; + if file.schema_hash() != &make_type_id(WSC_ACCEPTED_SUBMISSION_SCHEMA).0 { + return Err(WscStoreObstruction::invalid_wsc(wsc_digest)); + } + let view = file + .warp_view(0) + .map_err(|_| WscStoreObstruction::invalid_wsc(wsc_digest))?; + let mut records = Vec::new(); + for node_index in 0..view.nodes().len() { + for attachment in view.node_attachments(node_index) { + if attachment.type_or_warp != make_type_id(WSC_ACCEPTED_SUBMISSION_ATTACHMENT_TYPE).0 { + return Err(WscStoreObstruction::invalid_wsc(wsc_digest)); + } + let payload = atom_payload_bytes(&view, attachment, wsc_digest)?; + let record = SubmissionAcceptanceRecord::from_payload_bytes(payload) + .map_err(|_| WscStoreObstruction::invalid_wsc(wsc_digest))?; + records.push(record); + } + } + canonical_accepted_submission_records(&records) +} + fn read_array(bytes: &[u8], offset: usize) -> Result<[u8; N], WscStoreObstruction> { let end = offset .checked_add(N) @@ -427,3 +537,55 @@ fn digest_wsc_bytes(bytes: &[u8]) -> Hash { hasher.update(bytes); hasher.finalize().into() } + +fn canonical_accepted_submission_records( + records: &[SubmissionAcceptanceRecord], +) -> Result, WscStoreObstruction> { + let mut by_submission = BTreeMap::new(); + for record in records { + if let Some(existing) = by_submission.get(&record.submission_id) { + if existing != record { + return Err(WscStoreObstruction::duplicate_mismatch( + WscStoreEnvelopeId::from_hash(record.submission_id), + )); + } + } + by_submission.insert(record.submission_id, *record); + } + Ok(by_submission.into_values().collect()) +} + +fn accepted_submission_basis_digest(records: &[SubmissionAcceptanceRecord]) -> Hash { + let mut hasher = Hasher::new(); + hasher.update(WSC_ACCEPTED_SUBMISSION_BASIS_DOMAIN); + for record in records { + hasher.update(&record.to_payload_bytes()); + } + hasher.finalize().into() +} + +fn accepted_submission_node_id(submission_id: &Hash) -> NodeId { + let mut hasher = Hasher::new(); + hasher.update(WSC_ACCEPTED_SUBMISSION_NODE_DOMAIN); + hasher.update(submission_id); + NodeId(hasher.finalize().into()) +} + +fn accepted_submission_edge_id(submission_id: &Hash) -> EdgeId { + let mut hasher = Hasher::new(); + hasher.update(WSC_ACCEPTED_SUBMISSION_EDGE_DOMAIN); + hasher.update(submission_id); + EdgeId(hasher.finalize().into()) +} + +fn atom_payload_bytes<'a>( + view: &'a super::view::WarpView<'a>, + attachment: &AttRow, + wsc_digest: Hash, +) -> Result<&'a [u8], WscStoreObstruction> { + if !attachment.is_atom() { + return Err(WscStoreObstruction::invalid_wsc(wsc_digest)); + } + view.blob_for_attachment(attachment) + .ok_or_else(|| WscStoreObstruction::invalid_wsc(wsc_digest)) +} diff --git a/crates/warp-core/tests/wsc_store_tests.rs b/crates/warp-core/tests/wsc_store_tests.rs index 6cc57f44..45e708a1 100644 --- a/crates/warp-core/tests/wsc_store_tests.rs +++ b/crates/warp-core/tests/wsc_store_tests.rs @@ -4,7 +4,11 @@ #![allow(clippy::expect_used)] +use warp_core::causal_wal::{ + RecoveredSubmissionIndex, SubmissionAcceptanceRecord, SubmissionRetryPosture, +}; use warp_core::wsc::{ + accepted_submission_records_from_wsc_envelope, accepted_submission_records_to_wsc_envelope, write_wsc_one_warp, InMemoryWscStore, OneWarpInput, WscStoreEnvelope, WscStoreObstructionKind, WscStorePort, WscStoreRecordKind, WscStoreSubject, }; @@ -88,6 +92,28 @@ fn wsc_store_module_has_no_jedit_nouns() { assert!(!source.to_lowercase().contains("jedit")); } +#[test] +fn accepted_submission_records_round_trip_through_wsc_envelope() { + let duplicate = submission_acceptance(1, 11); + let envelope = accepted_submission_records_to_wsc_envelope(&[ + submission_acceptance(2, 22), + duplicate, + duplicate, + ]) + .expect("accepted submission WSC envelope"); + + let recovered = + accepted_submission_records_from_wsc_envelope(&envelope).expect("recovered records"); + assert_eq!(recovered, vec![duplicate, submission_acceptance(2, 22)]); + + let index = + RecoveredSubmissionIndex::from_acceptance_records(recovered).expect("recovered index"); + assert_eq!( + index.retry_posture([1; 32], [11; 32]), + SubmissionRetryPosture::AlreadyAcceptedPending + ); +} + fn fixture_wsc_bytes(tick: u64) -> Vec { let input = OneWarpInput { warp_id: [1; 32], @@ -107,3 +133,12 @@ fn fixture_wsc_bytes(tick: u64) -> Vec { }; write_wsc_one_warp(&input, [8; 32], tick).expect("fixture WSC bytes") } + +fn submission_acceptance(submission_byte: u8, envelope_byte: u8) -> SubmissionAcceptanceRecord { + SubmissionAcceptanceRecord { + submission_id: [submission_byte; 32], + canonical_envelope_digest: [envelope_byte; 32], + idempotency_key_digest: None, + acceptance_evidence_digest: [submission_byte ^ envelope_byte; 32], + } +} From 68cc77cb21f73363a8d8cd8731c7990cc88a5d09 Mon Sep 17 00:00:00 2001 From: James Ross Date: Mon, 1 Jun 2026 21:56:31 -0700 Subject: [PATCH 03/13] feat: persist receipt correlation in WSC --- crates/warp-core/src/causal_wal.rs | 42 +++++ crates/warp-core/src/wsc/mod.rs | 7 +- crates/warp-core/src/wsc/store.rs | 214 +++++++++++++++++++++- crates/warp-core/tests/wsc_store_tests.rs | 54 +++++- 4 files changed, 312 insertions(+), 5 deletions(-) diff --git a/crates/warp-core/src/causal_wal.rs b/crates/warp-core/src/causal_wal.rs index db313043..04a9a1af 100644 --- a/crates/warp-core/src/causal_wal.rs +++ b/crates/warp-core/src/causal_wal.rs @@ -2276,6 +2276,48 @@ pub struct RecoveredReceiptIndex { pub decisions_by_receipt: BTreeMap, } +impl RecoveredReceiptIndex { + /// Builds a recovered receipt index from receipt and correlation records. + /// + /// Tick receipt records carry decision posture. Correlation records can + /// restore ticket/submission/receipt lookup handles when decision material + /// is not present in the same source. + #[must_use] + pub fn from_receipt_correlation_records(receipts: I, correlations: J) -> Self + where + I: IntoIterator, + J: IntoIterator, + { + let mut index = Self::default(); + for receipt in receipts { + index + .receipt_by_submission + .insert(receipt.submission_id, receipt.receipt_digest); + index + .receipt_by_ticket + .insert(receipt.ticket_digest, receipt.receipt_digest); + index + .ticket_by_submission + .insert(receipt.submission_id, receipt.ticket_digest); + index + .decisions_by_receipt + .insert(receipt.receipt_digest, receipt.decision); + } + for correlation in correlations { + index + .receipt_by_submission + .insert(correlation.submission_id, correlation.receipt_digest); + index + .receipt_by_ticket + .insert(correlation.ticket_digest, correlation.receipt_digest); + index + .ticket_by_submission + .insert(correlation.submission_id, correlation.ticket_digest); + } + index + } +} + /// Recovered retained material and reading index. #[derive(Clone, Debug, Default, PartialEq, Eq)] pub struct RecoveredRetentionIndex { diff --git a/crates/warp-core/src/wsc/mod.rs b/crates/warp-core/src/wsc/mod.rs index b8d3ca29..c3c7496c 100644 --- a/crates/warp-core/src/wsc/mod.rs +++ b/crates/warp-core/src/wsc/mod.rs @@ -71,9 +71,10 @@ pub use build::build_one_warp_input; pub use read::ReadError; pub use store::{ accepted_submission_records_from_wsc_envelope, accepted_submission_records_to_wsc_envelope, - InMemoryWscStore, WscStoreEnvelope, WscStoreEnvelopeId, WscStoreObstruction, - WscStoreObstructionKind, WscStorePort, WscStoreRecordKind, WscStoreSubject, - WscStoreWriteReceipt, + receipt_correlation_records_from_wsc_envelope, receipt_correlation_records_to_wsc_envelope, + InMemoryWscStore, WscReceiptCorrelationRecords, WscStoreEnvelope, WscStoreEnvelopeId, + WscStoreObstruction, WscStoreObstructionKind, WscStorePort, WscStoreRecordKind, + WscStoreSubject, WscStoreWriteReceipt, }; pub use validate::validate_wsc; pub use view::{AttachmentRef, WarpView, WscFile}; diff --git a/crates/warp-core/src/wsc/store.rs b/crates/warp-core/src/wsc/store.rs index c3f67efe..b8c0ad82 100644 --- a/crates/warp-core/src/wsc/store.rs +++ b/crates/warp-core/src/wsc/store.rs @@ -8,7 +8,9 @@ use blake3::Hasher; use bytes::Bytes; use crate::attachment::{AtomPayload, AttachmentValue}; -use crate::causal_wal::SubmissionAcceptanceRecord; +use crate::causal_wal::{ + SubmissionAcceptanceRecord, TickReceiptRecord, WalReceiptCorrelationRecord, +}; use crate::graph::GraphStore; use crate::ident::{make_node_id, make_type_id, make_warp_id, EdgeId, Hash, NodeId}; use crate::record::{EdgeRecord, NodeRecord}; @@ -34,6 +36,18 @@ const WSC_ACCEPTED_SUBMISSION_NODE_TYPE: &str = "echo/wsc-store/accepted-submiss const WSC_ACCEPTED_SUBMISSION_EDGE_TYPE: &str = "echo/wsc-store/accepted-submissions/member/v1"; const WSC_ACCEPTED_SUBMISSION_ATTACHMENT_TYPE: &str = "echo/wsc-store/accepted-submissions/record/v1"; +const WSC_RECEIPT_CORRELATION_BASIS_DOMAIN: &[u8] = + b"echo:wsc_store:receipt_correlation_basis:v1\0"; +const WSC_RECEIPT_CORRELATION_NODE_DOMAIN: &[u8] = b"echo:wsc_store:receipt_correlation_node:v1\0"; +const WSC_RECEIPT_CORRELATION_EDGE_DOMAIN: &[u8] = b"echo:wsc_store:receipt_correlation_edge:v1\0"; +const WSC_RECEIPT_CORRELATION_SCHEMA: &str = "echo/wsc-store/receipt-correlations/v1"; +const WSC_RECEIPT_CORRELATION_WARP: &str = "echo/wsc-store/receipt-correlations"; +const WSC_RECEIPT_CORRELATION_ROOT: &str = "echo/wsc-store/receipt-correlations/root"; +const WSC_RECEIPT_CORRELATION_NODE_TYPE: &str = "echo/wsc-store/receipt-correlations/node/v1"; +const WSC_RECEIPT_CORRELATION_EDGE_TYPE: &str = "echo/wsc-store/receipt-correlations/member/v1"; +const WSC_TICK_RECEIPT_ATTACHMENT_TYPE: &str = "echo/wsc-store/receipt-correlations/receipt/v1"; +const WSC_RECEIPT_CORRELATION_ATTACHMENT_TYPE: &str = + "echo/wsc-store/receipt-correlations/correlation/v1"; const HEADER_LEN: usize = 124; /// Stable identifier for a WSC store envelope. @@ -348,6 +362,15 @@ pub struct WscStoreWriteReceipt { pub encoded_len: u64, } +/// Receipt and correlation records recovered from WSC material. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct WscReceiptCorrelationRecords { + /// Tick receipt records with decision posture. + pub receipts: Vec, + /// Ticket/submission/receipt correlation records. + pub correlations: Vec, +} + /// Generic WSC store port. pub trait WscStorePort { /// Writes a validated WSC envelope. @@ -500,6 +523,98 @@ pub fn accepted_submission_records_from_wsc_envelope( canonical_accepted_submission_records(&records) } +/// Builds a generic WSC envelope for receipt and ticket correlation records. +/// +/// # Errors +/// +/// Returns a typed obstruction when generated WSC material fails validation. +pub fn receipt_correlation_records_to_wsc_envelope( + receipts: &[TickReceiptRecord], + correlations: &[WalReceiptCorrelationRecord], +) -> Result { + let receipts = canonical_tick_receipts(receipts); + let correlations = canonical_receipt_correlations(correlations); + let mut store = GraphStore::new(make_warp_id(WSC_RECEIPT_CORRELATION_WARP)); + let root = make_node_id(WSC_RECEIPT_CORRELATION_ROOT); + store.insert_node( + root, + NodeRecord { + ty: make_type_id(WSC_RECEIPT_CORRELATION_NODE_TYPE), + }, + ); + for receipt in &receipts { + insert_receipt_material_node( + &mut store, + root, + receipt_node_id(&receipt.receipt_digest), + WSC_TICK_RECEIPT_ATTACHMENT_TYPE, + receipt.to_payload_bytes(), + ); + } + for correlation in &correlations { + insert_receipt_material_node( + &mut store, + root, + correlation_node_id(&correlation.submission_id, &correlation.ticket_digest), + WSC_RECEIPT_CORRELATION_ATTACHMENT_TYPE, + correlation.to_payload_bytes(), + ); + } + let basis_digest = receipt_correlation_basis_digest(&receipts, &correlations); + let input = build_one_warp_input(&store, root); + let wsc_bytes = write_wsc_one_warp(&input, make_type_id(WSC_RECEIPT_CORRELATION_SCHEMA).0, 0) + .map_err(|_| WscStoreObstruction::invalid_wsc(basis_digest))?; + WscStoreEnvelope::validated(WscStoreRecordKind::CausalHistory, basis_digest, wsc_bytes) +} + +/// Recovers receipt and ticket correlation records from a generic WSC envelope. +/// +/// # Errors +/// +/// Returns a typed WSC store obstruction when the envelope is not receipt +/// correlation material or when record payloads are malformed. +pub fn receipt_correlation_records_from_wsc_envelope( + envelope: &WscStoreEnvelope, +) -> Result { + if envelope.record_kind() != WscStoreRecordKind::CausalHistory { + return Err(WscStoreObstruction::invalid_envelope(0)); + } + let wsc_digest = *envelope.wsc_digest(); + let file = WscFile::from_bytes(envelope.wsc_bytes().to_vec()) + .map_err(|_| WscStoreObstruction::invalid_wsc(wsc_digest))?; + validate_wsc(&file).map_err(|_| WscStoreObstruction::invalid_wsc(wsc_digest))?; + if file.schema_hash() != &make_type_id(WSC_RECEIPT_CORRELATION_SCHEMA).0 { + return Err(WscStoreObstruction::invalid_wsc(wsc_digest)); + } + let view = file + .warp_view(0) + .map_err(|_| WscStoreObstruction::invalid_wsc(wsc_digest))?; + let mut receipts = Vec::new(); + let mut correlations = Vec::new(); + for node_index in 0..view.nodes().len() { + for attachment in view.node_attachments(node_index) { + let payload = atom_payload_bytes(&view, attachment, wsc_digest)?; + if attachment.type_or_warp == make_type_id(WSC_TICK_RECEIPT_ATTACHMENT_TYPE).0 { + let receipt = TickReceiptRecord::from_payload_bytes(payload) + .map_err(|_| WscStoreObstruction::invalid_wsc(wsc_digest))?; + receipts.push(receipt); + } else if attachment.type_or_warp + == make_type_id(WSC_RECEIPT_CORRELATION_ATTACHMENT_TYPE).0 + { + let correlation = WalReceiptCorrelationRecord::from_payload_bytes(payload) + .map_err(|_| WscStoreObstruction::invalid_wsc(wsc_digest))?; + correlations.push(correlation); + } else { + return Err(WscStoreObstruction::invalid_wsc(wsc_digest)); + } + } + } + Ok(WscReceiptCorrelationRecords { + receipts: canonical_tick_receipts(&receipts), + correlations: canonical_receipt_correlations(&correlations), + }) +} + fn read_array(bytes: &[u8], offset: usize) -> Result<[u8; N], WscStoreObstruction> { let end = offset .checked_add(N) @@ -589,3 +704,100 @@ fn atom_payload_bytes<'a>( view.blob_for_attachment(attachment) .ok_or_else(|| WscStoreObstruction::invalid_wsc(wsc_digest)) } + +fn canonical_tick_receipts(records: &[TickReceiptRecord]) -> Vec { + let mut by_receipt = BTreeMap::new(); + for record in records { + by_receipt.insert(record.receipt_digest, *record); + } + by_receipt.into_values().collect() +} + +fn canonical_receipt_correlations( + records: &[WalReceiptCorrelationRecord], +) -> Vec { + let mut by_correlation = BTreeMap::new(); + for record in records { + by_correlation.insert( + ( + record.submission_id, + record.ticket_digest, + record.receipt_digest, + ), + *record, + ); + } + by_correlation.into_values().collect() +} + +fn insert_receipt_material_node( + store: &mut GraphStore, + root: NodeId, + node: NodeId, + attachment_type: &str, + payload_bytes: Vec, +) { + store.insert_node( + node, + NodeRecord { + ty: make_type_id(WSC_RECEIPT_CORRELATION_NODE_TYPE), + }, + ); + store.insert_edge( + root, + EdgeRecord { + id: receipt_material_edge_id(&node.0), + from: root, + to: node, + ty: make_type_id(WSC_RECEIPT_CORRELATION_EDGE_TYPE), + }, + ); + store.set_node_attachment( + node, + Some(AttachmentValue::Atom(AtomPayload::new( + make_type_id(attachment_type), + Bytes::from(payload_bytes), + ))), + ); +} + +fn receipt_correlation_basis_digest( + receipts: &[TickReceiptRecord], + correlations: &[WalReceiptCorrelationRecord], +) -> Hash { + let mut hasher = Hasher::new(); + hasher.update(WSC_RECEIPT_CORRELATION_BASIS_DOMAIN); + for receipt in receipts { + hasher.update(b"receipt"); + hasher.update(&receipt.to_payload_bytes()); + } + for correlation in correlations { + hasher.update(b"correlation"); + hasher.update(&correlation.to_payload_bytes()); + } + hasher.finalize().into() +} + +fn receipt_node_id(receipt_digest: &Hash) -> NodeId { + let mut hasher = Hasher::new(); + hasher.update(WSC_RECEIPT_CORRELATION_NODE_DOMAIN); + hasher.update(b"receipt"); + hasher.update(receipt_digest); + NodeId(hasher.finalize().into()) +} + +fn correlation_node_id(submission_id: &Hash, ticket_digest: &Hash) -> NodeId { + let mut hasher = Hasher::new(); + hasher.update(WSC_RECEIPT_CORRELATION_NODE_DOMAIN); + hasher.update(b"correlation"); + hasher.update(submission_id); + hasher.update(ticket_digest); + NodeId(hasher.finalize().into()) +} + +fn receipt_material_edge_id(node_id: &Hash) -> EdgeId { + let mut hasher = Hasher::new(); + hasher.update(WSC_RECEIPT_CORRELATION_EDGE_DOMAIN); + hasher.update(node_id); + EdgeId(hasher.finalize().into()) +} diff --git a/crates/warp-core/tests/wsc_store_tests.rs b/crates/warp-core/tests/wsc_store_tests.rs index 45e708a1..7e06e365 100644 --- a/crates/warp-core/tests/wsc_store_tests.rs +++ b/crates/warp-core/tests/wsc_store_tests.rs @@ -5,10 +5,12 @@ #![allow(clippy::expect_used)] use warp_core::causal_wal::{ - RecoveredSubmissionIndex, SubmissionAcceptanceRecord, SubmissionRetryPosture, + RecoveredReceiptIndex, RecoveredSubmissionIndex, SubmissionAcceptanceRecord, + SubmissionRetryPosture, TickReceiptRecord, WalReceiptCorrelationRecord, WalTickDecision, }; use warp_core::wsc::{ accepted_submission_records_from_wsc_envelope, accepted_submission_records_to_wsc_envelope, + receipt_correlation_records_from_wsc_envelope, receipt_correlation_records_to_wsc_envelope, write_wsc_one_warp, InMemoryWscStore, OneWarpInput, WscStoreEnvelope, WscStoreObstructionKind, WscStorePort, WscStoreRecordKind, WscStoreSubject, }; @@ -114,6 +116,30 @@ fn accepted_submission_records_round_trip_through_wsc_envelope() { ); } +#[test] +fn receipt_correlation_records_round_trip_through_wsc_envelope() { + let receipt = tick_receipt(7, 17, 27, WalTickDecision::Applied); + let correlation = receipt_correlation(7, 17, 27); + let envelope = receipt_correlation_records_to_wsc_envelope(&[receipt], &[correlation]) + .expect("receipt correlation WSC envelope"); + + let recovered = receipt_correlation_records_from_wsc_envelope(&envelope) + .expect("recovered receipt correlations"); + assert_eq!(recovered.receipts, vec![receipt]); + assert_eq!(recovered.correlations, vec![correlation]); + + let index = RecoveredReceiptIndex::from_receipt_correlation_records( + recovered.receipts, + recovered.correlations, + ); + assert_eq!(index.receipt_by_submission.get(&[7; 32]), Some(&[27; 32])); + assert_eq!(index.receipt_by_ticket.get(&[17; 32]), Some(&[27; 32])); + assert_eq!( + index.decisions_by_receipt.get(&[27; 32]), + Some(&WalTickDecision::Applied) + ); +} + fn fixture_wsc_bytes(tick: u64) -> Vec { let input = OneWarpInput { warp_id: [1; 32], @@ -142,3 +168,29 @@ fn submission_acceptance(submission_byte: u8, envelope_byte: u8) -> SubmissionAc acceptance_evidence_digest: [submission_byte ^ envelope_byte; 32], } } + +fn tick_receipt( + submission_byte: u8, + ticket_byte: u8, + receipt_byte: u8, + decision: WalTickDecision, +) -> TickReceiptRecord { + TickReceiptRecord { + submission_id: [submission_byte; 32], + ticket_digest: [ticket_byte; 32], + receipt_digest: [receipt_byte; 32], + decision, + } +} + +fn receipt_correlation( + submission_byte: u8, + ticket_byte: u8, + receipt_byte: u8, +) -> WalReceiptCorrelationRecord { + WalReceiptCorrelationRecord { + submission_id: [submission_byte; 32], + ticket_digest: [ticket_byte; 32], + receipt_digest: [receipt_byte; 32], + } +} From 1d64cd0b9722309dac97bb12402ecbe91a77a50d Mon Sep 17 00:00:00 2001 From: James Ross Date: Mon, 1 Jun 2026 22:04:16 -0700 Subject: [PATCH 04/13] feat: persist retention refs in WSC --- crates/warp-core/src/causal_wal.rs | 31 ++++ crates/warp-core/src/wsc/mod.rs | 3 +- crates/warp-core/src/wsc/store.rs | 199 +++++++++++++++++++++- crates/warp-core/tests/wsc_store_tests.rs | 86 +++++++++- 4 files changed, 313 insertions(+), 6 deletions(-) diff --git a/crates/warp-core/src/causal_wal.rs b/crates/warp-core/src/causal_wal.rs index 04a9a1af..206c8843 100644 --- a/crates/warp-core/src/causal_wal.rs +++ b/crates/warp-core/src/causal_wal.rs @@ -2331,6 +2331,37 @@ pub struct RecoveredRetentionIndex { pub readings_by_semantic_coordinate: BTreeMap>, } +impl RecoveredRetentionIndex { + /// Builds a recovered retention index from retained material and reading records. + #[must_use] + pub fn from_retention_records(materials: I, readings: J) -> Self + where + I: IntoIterator, + J: IntoIterator, + { + let mut index = Self::default(); + for record in materials { + index + .material_by_semantic_coordinate + .entry(record.semantic_coordinate_digest) + .or_default() + .insert(record.material_digest); + index + .material_by_digest + .insert(record.material_digest, record); + } + for record in readings { + index + .readings_by_semantic_coordinate + .entry(record.semantic_coordinate_digest) + .or_default() + .insert(record.reading_id); + index.reading_by_id.insert(record.reading_id, record); + } + index + } +} + /// Retained material obstruction. #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub struct RetainedMaterialObstruction { diff --git a/crates/warp-core/src/wsc/mod.rs b/crates/warp-core/src/wsc/mod.rs index c3c7496c..199e2203 100644 --- a/crates/warp-core/src/wsc/mod.rs +++ b/crates/warp-core/src/wsc/mod.rs @@ -72,7 +72,8 @@ pub use read::ReadError; pub use store::{ accepted_submission_records_from_wsc_envelope, accepted_submission_records_to_wsc_envelope, receipt_correlation_records_from_wsc_envelope, receipt_correlation_records_to_wsc_envelope, - InMemoryWscStore, WscReceiptCorrelationRecords, WscStoreEnvelope, WscStoreEnvelopeId, + retention_records_from_wsc_envelope, retention_records_to_wsc_envelope, InMemoryWscStore, + WscReceiptCorrelationRecords, WscRetentionRecords, WscStoreEnvelope, WscStoreEnvelopeId, WscStoreObstruction, WscStoreObstructionKind, WscStorePort, WscStoreRecordKind, WscStoreSubject, WscStoreWriteReceipt, }; diff --git a/crates/warp-core/src/wsc/store.rs b/crates/warp-core/src/wsc/store.rs index b8c0ad82..df742eca 100644 --- a/crates/warp-core/src/wsc/store.rs +++ b/crates/warp-core/src/wsc/store.rs @@ -9,7 +9,8 @@ use bytes::Bytes; use crate::attachment::{AtomPayload, AttachmentValue}; use crate::causal_wal::{ - SubmissionAcceptanceRecord, TickReceiptRecord, WalReceiptCorrelationRecord, + ReadingRefRecord, RetainedMaterialRecord, SubmissionAcceptanceRecord, TickReceiptRecord, + WalReceiptCorrelationRecord, }; use crate::graph::GraphStore; use crate::ident::{make_node_id, make_type_id, make_warp_id, EdgeId, Hash, NodeId}; @@ -48,6 +49,16 @@ const WSC_RECEIPT_CORRELATION_EDGE_TYPE: &str = "echo/wsc-store/receipt-correlat const WSC_TICK_RECEIPT_ATTACHMENT_TYPE: &str = "echo/wsc-store/receipt-correlations/receipt/v1"; const WSC_RECEIPT_CORRELATION_ATTACHMENT_TYPE: &str = "echo/wsc-store/receipt-correlations/correlation/v1"; +const WSC_RETENTION_BASIS_DOMAIN: &[u8] = b"echo:wsc_store:retention_basis:v1\0"; +const WSC_RETENTION_NODE_DOMAIN: &[u8] = b"echo:wsc_store:retention_node:v1\0"; +const WSC_RETENTION_EDGE_DOMAIN: &[u8] = b"echo:wsc_store:retention_edge:v1\0"; +const WSC_RETENTION_SCHEMA: &str = "echo/wsc-store/retention/v1"; +const WSC_RETENTION_WARP: &str = "echo/wsc-store/retention"; +const WSC_RETENTION_ROOT: &str = "echo/wsc-store/retention/root"; +const WSC_RETENTION_NODE_TYPE: &str = "echo/wsc-store/retention/node/v1"; +const WSC_RETENTION_EDGE_TYPE: &str = "echo/wsc-store/retention/member/v1"; +const WSC_RETAINED_MATERIAL_ATTACHMENT_TYPE: &str = "echo/wsc-store/retention/material/v1"; +const WSC_READING_REF_ATTACHMENT_TYPE: &str = "echo/wsc-store/retention/reading/v1"; const HEADER_LEN: usize = 124; /// Stable identifier for a WSC store envelope. @@ -371,6 +382,15 @@ pub struct WscReceiptCorrelationRecords { pub correlations: Vec, } +/// Retained material and reading records recovered from WSC material. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct WscRetentionRecords { + /// Retained material references with evidence posture. + pub materials: Vec, + /// Retained reading references with semantic coordinates. + pub readings: Vec, +} + /// Generic WSC store port. pub trait WscStorePort { /// Writes a validated WSC envelope. @@ -615,6 +635,102 @@ pub fn receipt_correlation_records_from_wsc_envelope( }) } +/// Builds a generic WSC envelope for retained material and reading records. +/// +/// Duplicate identical records are represented once. +/// +/// # Errors +/// +/// Returns a typed obstruction when generated WSC material fails validation. +pub fn retention_records_to_wsc_envelope( + materials: &[RetainedMaterialRecord], + readings: &[ReadingRefRecord], +) -> Result { + let materials = canonical_retained_material_records(materials); + let readings = canonical_reading_ref_records(readings); + let mut store = GraphStore::new(make_warp_id(WSC_RETENTION_WARP)); + let root = make_node_id(WSC_RETENTION_ROOT); + store.insert_node( + root, + NodeRecord { + ty: make_type_id(WSC_RETENTION_NODE_TYPE), + }, + ); + for material in &materials { + insert_retention_record_node( + &mut store, + root, + retention_node_id(b"material", &material.to_payload_bytes()), + WSC_RETAINED_MATERIAL_ATTACHMENT_TYPE, + material.to_payload_bytes(), + ); + } + for reading in &readings { + insert_retention_record_node( + &mut store, + root, + retention_node_id(b"reading", &reading.to_payload_bytes()), + WSC_READING_REF_ATTACHMENT_TYPE, + reading.to_payload_bytes(), + ); + } + let basis_digest = retention_basis_digest(&materials, &readings); + let input = build_one_warp_input(&store, root); + let wsc_bytes = write_wsc_one_warp(&input, make_type_id(WSC_RETENTION_SCHEMA).0, 0) + .map_err(|_| WscStoreObstruction::invalid_wsc(basis_digest))?; + WscStoreEnvelope::validated( + WscStoreRecordKind::RetainedEvidence, + basis_digest, + wsc_bytes, + ) +} + +/// Recovers retained material and reading records from a generic WSC envelope. +/// +/// # Errors +/// +/// Returns a typed WSC store obstruction when the envelope is not retained +/// evidence material or when record payloads are malformed. +pub fn retention_records_from_wsc_envelope( + envelope: &WscStoreEnvelope, +) -> Result { + if envelope.record_kind() != WscStoreRecordKind::RetainedEvidence { + return Err(WscStoreObstruction::invalid_envelope(0)); + } + let wsc_digest = *envelope.wsc_digest(); + let file = WscFile::from_bytes(envelope.wsc_bytes().to_vec()) + .map_err(|_| WscStoreObstruction::invalid_wsc(wsc_digest))?; + validate_wsc(&file).map_err(|_| WscStoreObstruction::invalid_wsc(wsc_digest))?; + if file.schema_hash() != &make_type_id(WSC_RETENTION_SCHEMA).0 { + return Err(WscStoreObstruction::invalid_wsc(wsc_digest)); + } + let view = file + .warp_view(0) + .map_err(|_| WscStoreObstruction::invalid_wsc(wsc_digest))?; + let mut materials = Vec::new(); + let mut readings = Vec::new(); + for node_index in 0..view.nodes().len() { + for attachment in view.node_attachments(node_index) { + let payload = atom_payload_bytes(&view, attachment, wsc_digest)?; + if attachment.type_or_warp == make_type_id(WSC_RETAINED_MATERIAL_ATTACHMENT_TYPE).0 { + let material = RetainedMaterialRecord::from_payload_bytes(payload) + .map_err(|_| WscStoreObstruction::invalid_wsc(wsc_digest))?; + materials.push(material); + } else if attachment.type_or_warp == make_type_id(WSC_READING_REF_ATTACHMENT_TYPE).0 { + let reading = ReadingRefRecord::from_payload_bytes(payload) + .map_err(|_| WscStoreObstruction::invalid_wsc(wsc_digest))?; + readings.push(reading); + } else { + return Err(WscStoreObstruction::invalid_wsc(wsc_digest)); + } + } + } + Ok(WscRetentionRecords { + materials: canonical_retained_material_records(&materials), + readings: canonical_reading_ref_records(&readings), + }) +} + fn read_array(bytes: &[u8], offset: usize) -> Result<[u8; N], WscStoreObstruction> { let end = offset .checked_add(N) @@ -801,3 +917,84 @@ fn receipt_material_edge_id(node_id: &Hash) -> EdgeId { hasher.update(node_id); EdgeId(hasher.finalize().into()) } + +fn canonical_retained_material_records( + records: &[RetainedMaterialRecord], +) -> Vec { + let mut by_payload = BTreeMap::new(); + for record in records { + by_payload.insert(record.to_payload_bytes(), *record); + } + by_payload.into_values().collect() +} + +fn canonical_reading_ref_records(records: &[ReadingRefRecord]) -> Vec { + let mut by_payload = BTreeMap::new(); + for record in records { + by_payload.insert(record.to_payload_bytes(), *record); + } + by_payload.into_values().collect() +} + +fn insert_retention_record_node( + store: &mut GraphStore, + root: NodeId, + node: NodeId, + attachment_type: &str, + payload_bytes: Vec, +) { + store.insert_node( + node, + NodeRecord { + ty: make_type_id(WSC_RETENTION_NODE_TYPE), + }, + ); + store.insert_edge( + root, + EdgeRecord { + id: retention_edge_id(&node.0), + from: root, + to: node, + ty: make_type_id(WSC_RETENTION_EDGE_TYPE), + }, + ); + store.set_node_attachment( + node, + Some(AttachmentValue::Atom(AtomPayload::new( + make_type_id(attachment_type), + Bytes::from(payload_bytes), + ))), + ); +} + +fn retention_basis_digest( + materials: &[RetainedMaterialRecord], + readings: &[ReadingRefRecord], +) -> Hash { + let mut hasher = Hasher::new(); + hasher.update(WSC_RETENTION_BASIS_DOMAIN); + for material in materials { + hasher.update(b"material"); + hasher.update(&material.to_payload_bytes()); + } + for reading in readings { + hasher.update(b"reading"); + hasher.update(&reading.to_payload_bytes()); + } + hasher.finalize().into() +} + +fn retention_node_id(role: &[u8], payload_bytes: &[u8]) -> NodeId { + let mut hasher = Hasher::new(); + hasher.update(WSC_RETENTION_NODE_DOMAIN); + hasher.update(role); + hasher.update(payload_bytes); + NodeId(hasher.finalize().into()) +} + +fn retention_edge_id(node_id: &Hash) -> EdgeId { + let mut hasher = Hasher::new(); + hasher.update(WSC_RETENTION_EDGE_DOMAIN); + hasher.update(node_id); + EdgeId(hasher.finalize().into()) +} diff --git a/crates/warp-core/tests/wsc_store_tests.rs b/crates/warp-core/tests/wsc_store_tests.rs index 7e06e365..55b6e5c3 100644 --- a/crates/warp-core/tests/wsc_store_tests.rs +++ b/crates/warp-core/tests/wsc_store_tests.rs @@ -5,14 +5,17 @@ #![allow(clippy::expect_used)] use warp_core::causal_wal::{ - RecoveredReceiptIndex, RecoveredSubmissionIndex, SubmissionAcceptanceRecord, - SubmissionRetryPosture, TickReceiptRecord, WalReceiptCorrelationRecord, WalTickDecision, + EvidenceMaterialPosture, ReadingRefRecord, RecoveredReceiptIndex, RecoveredRetentionIndex, + RecoveredSubmissionIndex, RetainedMaterialKind, RetainedMaterialRecord, + SubmissionAcceptanceRecord, SubmissionRetryPosture, TickReceiptRecord, + WalReceiptCorrelationRecord, WalTickDecision, }; use warp_core::wsc::{ accepted_submission_records_from_wsc_envelope, accepted_submission_records_to_wsc_envelope, receipt_correlation_records_from_wsc_envelope, receipt_correlation_records_to_wsc_envelope, - write_wsc_one_warp, InMemoryWscStore, OneWarpInput, WscStoreEnvelope, WscStoreObstructionKind, - WscStorePort, WscStoreRecordKind, WscStoreSubject, + retention_records_from_wsc_envelope, retention_records_to_wsc_envelope, write_wsc_one_warp, + InMemoryWscStore, OneWarpInput, WscStoreEnvelope, WscStoreObstructionKind, WscStorePort, + WscStoreRecordKind, WscStoreSubject, }; #[test] @@ -140,6 +143,51 @@ fn receipt_correlation_records_round_trip_through_wsc_envelope() { ); } +#[test] +fn retention_records_round_trip_through_wsc_envelope() { + let material = retained_material( + 31, + 41, + RetainedMaterialKind::ReadingEnvelope, + EvidenceMaterialPosture::Present, + ); + let missing_material = retained_material( + 32, + 42, + RetainedMaterialKind::ReadingPayload, + EvidenceMaterialPosture::Missing, + ); + let reading = reading_ref(51, 41, 61, 71, EvidenceMaterialPosture::Present); + let envelope = retention_records_to_wsc_envelope( + &[missing_material, material, material], + &[reading, reading], + ) + .expect("retention WSC envelope"); + + let recovered = retention_records_from_wsc_envelope(&envelope).expect("recovered retention"); + assert_eq!(recovered.materials, vec![material, missing_material]); + assert_eq!(recovered.readings, vec![reading]); + + let index = + RecoveredRetentionIndex::from_retention_records(recovered.materials, recovered.readings); + assert_eq!(index.material_by_digest.get(&[31; 32]), Some(&material)); + assert_eq!( + index.material_by_digest.get(&[32; 32]), + Some(&missing_material) + ); + assert_eq!(index.reading_by_id.get(&[51; 32]), Some(&reading)); + assert!(index + .material_by_semantic_coordinate + .get(&[41; 32]) + .expect("material semantic coordinate") + .contains(&[31; 32])); + assert!(index + .readings_by_semantic_coordinate + .get(&[41; 32]) + .expect("reading semantic coordinate") + .contains(&[51; 32])); +} + fn fixture_wsc_bytes(tick: u64) -> Vec { let input = OneWarpInput { warp_id: [1; 32], @@ -194,3 +242,33 @@ fn receipt_correlation( receipt_digest: [receipt_byte; 32], } } + +fn retained_material( + material_byte: u8, + semantic_byte: u8, + kind: RetainedMaterialKind, + posture: EvidenceMaterialPosture, +) -> RetainedMaterialRecord { + RetainedMaterialRecord { + material_digest: [material_byte; 32], + semantic_coordinate_digest: [semantic_byte; 32], + kind, + posture, + } +} + +fn reading_ref( + reading_byte: u8, + semantic_byte: u8, + payload_byte: u8, + envelope_byte: u8, + posture: EvidenceMaterialPosture, +) -> ReadingRefRecord { + ReadingRefRecord { + reading_id: [reading_byte; 32], + semantic_coordinate_digest: [semantic_byte; 32], + payload_digest: [payload_byte; 32], + envelope_digest: [envelope_byte; 32], + posture, + } +} From c1c213534d0866e668e6b17c2eb584e31566419a Mon Sep 17 00:00:00 2001 From: James Ross Date: Mon, 1 Jun 2026 22:07:17 -0700 Subject: [PATCH 05/13] test: cover retention WSC obstruction posture --- crates/warp-core/tests/wsc_store_tests.rs | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/crates/warp-core/tests/wsc_store_tests.rs b/crates/warp-core/tests/wsc_store_tests.rs index 55b6e5c3..1e315d1f 100644 --- a/crates/warp-core/tests/wsc_store_tests.rs +++ b/crates/warp-core/tests/wsc_store_tests.rs @@ -4,11 +4,13 @@ #![allow(clippy::expect_used)] +use std::collections::BTreeSet; + use warp_core::causal_wal::{ - EvidenceMaterialPosture, ReadingRefRecord, RecoveredReceiptIndex, RecoveredRetentionIndex, - RecoveredSubmissionIndex, RetainedMaterialKind, RetainedMaterialRecord, - SubmissionAcceptanceRecord, SubmissionRetryPosture, TickReceiptRecord, - WalReceiptCorrelationRecord, WalTickDecision, + retained_material_obstructions, EvidenceMaterialPosture, MissingMaterialScope, + ReadingRefRecord, RecoveredReceiptIndex, RecoveredRetentionIndex, RecoveredSubmissionIndex, + RetainedMaterialKind, RetainedMaterialRecord, SubmissionAcceptanceRecord, + SubmissionRetryPosture, TickReceiptRecord, WalReceiptCorrelationRecord, WalTickDecision, }; use warp_core::wsc::{ accepted_submission_records_from_wsc_envelope, accepted_submission_records_to_wsc_envelope, @@ -186,6 +188,15 @@ fn retention_records_round_trip_through_wsc_envelope() { .get(&[41; 32]) .expect("reading semantic coordinate") .contains(&[51; 32])); + + let available_material = BTreeSet::from([[31; 32]]); + let obstructions = retained_material_obstructions(&index, &available_material); + assert_eq!(obstructions.len(), 1); + let obstruction = obstructions[0]; + assert_eq!(obstruction.material_digest, [32; 32]); + assert_eq!(obstruction.kind, RetainedMaterialKind::ReadingPayload); + assert_eq!(obstruction.scope, MissingMaterialScope::Reading); + assert_eq!(obstruction.posture, EvidenceMaterialPosture::Missing); } fn fixture_wsc_bytes(tick: u64) -> Vec { From 8580ac67deb89cb0f64cd6304fe3c8549e76bfc2 Mon Sep 17 00:00:00 2001 From: James Ross Date: Mon, 1 Jun 2026 22:12:59 -0700 Subject: [PATCH 06/13] feat: add WSC commit markers --- crates/warp-core/src/wsc/store.rs | 197 +++++++++++++++++++--- crates/warp-core/tests/wsc_store_tests.rs | 46 +++++ 2 files changed, 224 insertions(+), 19 deletions(-) diff --git a/crates/warp-core/src/wsc/store.rs b/crates/warp-core/src/wsc/store.rs index df742eca..dd51c5df 100644 --- a/crates/warp-core/src/wsc/store.rs +++ b/crates/warp-core/src/wsc/store.rs @@ -26,6 +26,7 @@ const WSC_STORE_ENVELOPE_MAGIC: &[u8; 8] = b"ECWSCST1"; const WSC_STORE_ENVELOPE_VERSION: u16 = 1; const WSC_STORE_ENVELOPE_ID_DOMAIN: &[u8] = b"echo:wsc_store:envelope_id:v1\0"; const WSC_STORE_BYTES_DOMAIN: &[u8] = b"echo:wsc_store:wsc_bytes:v1\0"; +const WSC_STORE_COMMIT_MARKER_DOMAIN: &[u8] = b"echo:wsc_store:commit_marker:v1\0"; const WSC_ACCEPTED_SUBMISSION_BASIS_DOMAIN: &[u8] = b"echo:wsc_store:accepted_submission_basis:v1\0"; const WSC_ACCEPTED_SUBMISSION_NODE_DOMAIN: &[u8] = b"echo:wsc_store:accepted_submission_node:v1\0"; @@ -150,6 +151,10 @@ pub enum WscStoreObstructionKind { DigestMismatch, /// Existing envelope id maps to different material. DuplicateEnvelopeMismatch, + /// Envelope material exists without a matching commit marker, or vice versa. + IncompleteEnvelopeWrite, + /// Commit marker does not match the envelope material. + CommitMarkerMismatch, } /// Typed obstruction returned instead of hidden fallback or invented success. @@ -196,6 +201,20 @@ impl WscStoreObstruction { subject: WscStoreSubject::Envelope { envelope_id }, } } + + fn incomplete_write(envelope_id: WscStoreEnvelopeId) -> Self { + Self { + kind: WscStoreObstructionKind::IncompleteEnvelopeWrite, + subject: WscStoreSubject::Envelope { envelope_id }, + } + } + + fn commit_marker_mismatch(envelope_id: WscStoreEnvelopeId) -> Self { + Self { + kind: WscStoreObstructionKind::CommitMarkerMismatch, + subject: WscStoreSubject::Envelope { envelope_id }, + } + } } /// Deterministic WSC store envelope. @@ -367,12 +386,77 @@ impl WscStoreEnvelope { pub struct WscStoreWriteReceipt { /// Written envelope id. pub envelope_id: WscStoreEnvelopeId, + /// Commit marker digest proving the envelope was published. + pub commit_marker_digest: Hash, /// WSC payload digest. pub wsc_digest: Hash, /// Encoded envelope byte length. pub encoded_len: u64, } +/// Commit marker for a completed WSC envelope write. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct WscStoreCommitMarker { + envelope_id: WscStoreEnvelopeId, + record_kind: WscStoreRecordKind, + basis_digest: Hash, + schema_hash: Hash, + tick: u64, + wsc_digest: Hash, + encoded_len: u64, + marker_digest: Hash, +} + +impl WscStoreCommitMarker { + fn from_envelope(envelope: &WscStoreEnvelope) -> Result { + let encoded_len = u64::try_from(envelope.encode().len()) + .map_err(|_| WscStoreObstruction::invalid_envelope(HEADER_LEN))?; + let marker_digest = derive_commit_marker_digest(envelope, encoded_len); + Ok(Self { + envelope_id: envelope.id(), + record_kind: envelope.record_kind(), + basis_digest: *envelope.basis_digest(), + schema_hash: *envelope.schema_hash(), + tick: envelope.tick(), + wsc_digest: *envelope.wsc_digest(), + encoded_len, + marker_digest, + }) + } + + /// Returns the envelope id committed by this marker. + #[must_use] + pub const fn envelope_id(&self) -> WscStoreEnvelopeId { + self.envelope_id + } + + /// Returns the marker digest. + #[must_use] + pub const fn marker_digest(&self) -> Hash { + self.marker_digest + } + + fn write_receipt(self) -> WscStoreWriteReceipt { + WscStoreWriteReceipt { + envelope_id: self.envelope_id, + commit_marker_digest: self.marker_digest, + wsc_digest: self.wsc_digest, + encoded_len: self.encoded_len, + } + } + + fn matches_envelope(self, envelope: &WscStoreEnvelope) -> bool { + self.envelope_id == envelope.id() + && self.record_kind == envelope.record_kind() + && self.basis_digest == *envelope.basis_digest() + && self.schema_hash == *envelope.schema_hash() + && self.tick == envelope.tick() + && self.wsc_digest == *envelope.wsc_digest() + && self.encoded_len == u64::try_from(envelope.encode().len()).unwrap_or(u64::MAX) + && self.marker_digest == derive_commit_marker_digest(envelope, self.encoded_len) + } +} + /// Receipt and correlation records recovered from WSC material. #[derive(Clone, Debug, PartialEq, Eq)] pub struct WscReceiptCorrelationRecords { @@ -412,43 +496,105 @@ pub trait WscStorePort { /// In-memory WSC store implementation for tests and adapters. #[derive(Debug, Default)] pub struct InMemoryWscStore { - envelopes: BTreeMap, + staged_envelopes: BTreeMap, + commit_markers: BTreeMap, } -impl WscStorePort for InMemoryWscStore { - fn write_envelope( +impl InMemoryWscStore { + /// Stages an envelope without publishing its commit marker. + /// + /// This models the pre-commit phase of an atomic write. Callers that read + /// through [`WscStorePort`] will not observe the staged envelope until + /// [`Self::commit_staged_envelope`] publishes the matching marker. + /// + /// # Errors + /// + /// Returns a typed obstruction when the same envelope id already maps to + /// different staged material. + pub fn stage_envelope_without_commit_marker( &mut self, envelope: WscStoreEnvelope, - ) -> Result { + ) -> Result { let envelope_id = envelope.id(); - if let Some(existing) = self.envelopes.get(&envelope_id) { + if let Some(existing) = self.staged_envelopes.get(&envelope_id) { if existing != &envelope { return Err(WscStoreObstruction::duplicate_mismatch(envelope_id)); } } - let encoded_len = u64::try_from(envelope.encode().len()) - .map_err(|_| WscStoreObstruction::invalid_envelope(HEADER_LEN))?; - let receipt = WscStoreWriteReceipt { - envelope_id, - wsc_digest: envelope.wsc_digest, - encoded_len, - }; - self.envelopes.insert(envelope_id, envelope); - Ok(receipt) + if let Some(marker) = self.commit_markers.get(&envelope_id) { + if !marker.matches_envelope(&envelope) { + return Err(WscStoreObstruction::commit_marker_mismatch(envelope_id)); + } + } + self.staged_envelopes.insert(envelope_id, envelope); + Ok(envelope_id) + } + + /// Publishes the commit marker for a staged envelope. + /// + /// # Errors + /// + /// Returns a typed obstruction when the staged envelope is absent or when + /// an existing marker does not match the staged material. + pub fn commit_staged_envelope( + &mut self, + envelope_id: WscStoreEnvelopeId, + ) -> Result { + let envelope = self + .staged_envelopes + .get(&envelope_id) + .ok_or_else(|| WscStoreObstruction::incomplete_write(envelope_id))?; + let marker = WscStoreCommitMarker::from_envelope(envelope)?; + if let Some(existing) = self.commit_markers.get(&envelope_id) { + if existing != &marker { + return Err(WscStoreObstruction::commit_marker_mismatch(envelope_id)); + } + return Ok(existing.write_receipt()); + } + self.commit_markers.insert(envelope_id, marker); + Ok(marker.write_receipt()) + } +} + +impl WscStorePort for InMemoryWscStore { + fn write_envelope( + &mut self, + envelope: WscStoreEnvelope, + ) -> Result { + let envelope_id = envelope.id(); + self.stage_envelope_without_commit_marker(envelope)?; + self.commit_staged_envelope(envelope_id) } fn read_envelope( &self, envelope_id: WscStoreEnvelopeId, ) -> Result { - self.envelopes - .get(&envelope_id) - .cloned() - .ok_or_else(|| WscStoreObstruction::missing_envelope(envelope_id)) + match ( + self.staged_envelopes.get(&envelope_id), + self.commit_markers.get(&envelope_id), + ) { + (Some(envelope), Some(marker)) if marker.matches_envelope(envelope) => { + Ok(envelope.clone()) + } + (Some(_), Some(_)) => Err(WscStoreObstruction::commit_marker_mismatch(envelope_id)), + (Some(_), None) | (None, Some(_)) => { + Err(WscStoreObstruction::incomplete_write(envelope_id)) + } + (None, None) => Err(WscStoreObstruction::missing_envelope(envelope_id)), + } } fn list_envelopes(&self) -> Vec { - self.envelopes.keys().copied().collect() + self.commit_markers + .iter() + .filter_map(|(envelope_id, marker)| { + self.staged_envelopes + .get(envelope_id) + .filter(|envelope| marker.matches_envelope(envelope)) + .map(|_| *envelope_id) + }) + .collect() } } @@ -762,6 +908,19 @@ fn derive_envelope_id( WscStoreEnvelopeId(hasher.finalize().into()) } +fn derive_commit_marker_digest(envelope: &WscStoreEnvelope, encoded_len: u64) -> Hash { + let mut hasher = Hasher::new(); + hasher.update(WSC_STORE_COMMIT_MARKER_DOMAIN); + hasher.update(&envelope.id().as_hash()); + hasher.update(&envelope.record_kind().code().to_le_bytes()); + hasher.update(envelope.basis_digest()); + hasher.update(envelope.schema_hash()); + hasher.update(&envelope.tick().to_le_bytes()); + hasher.update(envelope.wsc_digest()); + hasher.update(&encoded_len.to_le_bytes()); + hasher.finalize().into() +} + fn digest_wsc_bytes(bytes: &[u8]) -> Hash { let mut hasher = Hasher::new(); hasher.update(WSC_STORE_BYTES_DOMAIN); diff --git a/crates/warp-core/tests/wsc_store_tests.rs b/crates/warp-core/tests/wsc_store_tests.rs index 1e315d1f..a35b3a19 100644 --- a/crates/warp-core/tests/wsc_store_tests.rs +++ b/crates/warp-core/tests/wsc_store_tests.rs @@ -57,6 +57,52 @@ fn in_memory_wsc_store_writes_reads_and_lists_envelopes() { assert_eq!(store.read_envelope(id), Ok(envelope)); } +#[test] +fn in_memory_wsc_store_ignores_uncommitted_staged_write() { + let envelope = + WscStoreEnvelope::validated(WscStoreRecordKind::Snapshot, [6; 32], fixture_wsc_bytes(19)) + .expect("valid WSC envelope"); + let id = envelope.id(); + let mut store = InMemoryWscStore::default(); + + let staged_id = store + .stage_envelope_without_commit_marker(envelope) + .expect("staged WSC envelope"); + + assert_eq!(staged_id, id); + assert!(store.list_envelopes().is_empty()); + let obstruction = store + .read_envelope(id) + .expect_err("uncommitted staged write obstructs"); + assert_eq!( + obstruction.kind, + WscStoreObstructionKind::IncompleteEnvelopeWrite + ); +} + +#[test] +fn in_memory_wsc_store_commits_staged_write_idempotently() { + let envelope = + WscStoreEnvelope::validated(WscStoreRecordKind::Snapshot, [7; 32], fixture_wsc_bytes(23)) + .expect("valid WSC envelope"); + let id = envelope.id(); + let mut store = InMemoryWscStore::default(); + + store + .stage_envelope_without_commit_marker(envelope.clone()) + .expect("staged WSC envelope"); + let committed = store + .commit_staged_envelope(id) + .expect("committed staged WSC envelope"); + let committed_again = store + .write_envelope(envelope.clone()) + .expect("idempotent write"); + + assert_eq!(committed, committed_again); + assert_eq!(store.list_envelopes(), vec![id]); + assert_eq!(store.read_envelope(id), Ok(envelope)); +} + #[test] fn in_memory_wsc_store_missing_envelope_returns_typed_obstruction() { let store = InMemoryWscStore::default(); From 7fe747a1358122de6599baec70f56ef29bca630a Mon Sep 17 00:00:00 2001 From: James Ross Date: Mon, 1 Jun 2026 22:18:05 -0700 Subject: [PATCH 07/13] feat: recover pending submissions from WSC --- crates/warp-core/src/wsc/mod.rs | 13 +++---- crates/warp-core/src/wsc/store.rs | 39 +++++++++++++++++++++ crates/warp-core/tests/wsc_store_tests.rs | 41 +++++++++++++++++++---- 3 files changed, 80 insertions(+), 13 deletions(-) diff --git a/crates/warp-core/src/wsc/mod.rs b/crates/warp-core/src/wsc/mod.rs index 199e2203..8767b99b 100644 --- a/crates/warp-core/src/wsc/mod.rs +++ b/crates/warp-core/src/wsc/mod.rs @@ -70,12 +70,13 @@ pub mod write; pub use build::build_one_warp_input; pub use read::ReadError; pub use store::{ - accepted_submission_records_from_wsc_envelope, accepted_submission_records_to_wsc_envelope, - receipt_correlation_records_from_wsc_envelope, receipt_correlation_records_to_wsc_envelope, - retention_records_from_wsc_envelope, retention_records_to_wsc_envelope, InMemoryWscStore, - WscReceiptCorrelationRecords, WscRetentionRecords, WscStoreEnvelope, WscStoreEnvelopeId, - WscStoreObstruction, WscStoreObstructionKind, WscStorePort, WscStoreRecordKind, - WscStoreSubject, WscStoreWriteReceipt, + accepted_submission_records_from_wsc_envelope, accepted_submission_records_from_wsc_store, + accepted_submission_records_to_wsc_envelope, receipt_correlation_records_from_wsc_envelope, + receipt_correlation_records_to_wsc_envelope, retention_records_from_wsc_envelope, + retention_records_to_wsc_envelope, InMemoryWscStore, WscReceiptCorrelationRecords, + WscRetentionRecords, WscStoreEnvelope, WscStoreEnvelopeId, WscStoreObstruction, + WscStoreObstructionKind, WscStorePort, WscStoreRecordKind, WscStoreSubject, + WscStoreWriteReceipt, }; pub use validate::validate_wsc; pub use view::{AttachmentRef, WarpView, WscFile}; diff --git a/crates/warp-core/src/wsc/store.rs b/crates/warp-core/src/wsc/store.rs index dd51c5df..75979843 100644 --- a/crates/warp-core/src/wsc/store.rs +++ b/crates/warp-core/src/wsc/store.rs @@ -689,6 +689,34 @@ pub fn accepted_submission_records_from_wsc_envelope( canonical_accepted_submission_records(&records) } +/// Recovers accepted submission records from committed WSC store envelopes. +/// +/// Incomplete staged writes are not visible through [`WscStorePort::list_envelopes`], +/// and any incomplete envelope read returns a typed obstruction. +/// +/// # Errors +/// +/// Returns a typed WSC store obstruction when a committed accepted-submission +/// envelope is malformed or conflicting duplicate submission material is found. +pub fn accepted_submission_records_from_wsc_store

( + store: &P, +) -> Result, WscStoreObstruction> +where + P: WscStorePort + ?Sized, +{ + let mut records = Vec::new(); + for envelope_id in store.list_envelopes() { + let envelope = store.read_envelope(envelope_id)?; + if envelope.record_kind() != WscStoreRecordKind::CausalHistory + || !envelope_has_schema(&envelope, WSC_ACCEPTED_SUBMISSION_SCHEMA)? + { + continue; + } + records.extend(accepted_submission_records_from_wsc_envelope(&envelope)?); + } + canonical_accepted_submission_records(&records) +} + /// Builds a generic WSC envelope for receipt and ticket correlation records. /// /// # Errors @@ -980,6 +1008,17 @@ fn atom_payload_bytes<'a>( .ok_or_else(|| WscStoreObstruction::invalid_wsc(wsc_digest)) } +fn envelope_has_schema( + envelope: &WscStoreEnvelope, + schema: &str, +) -> Result { + let wsc_digest = *envelope.wsc_digest(); + let file = WscFile::from_bytes(envelope.wsc_bytes().to_vec()) + .map_err(|_| WscStoreObstruction::invalid_wsc(wsc_digest))?; + validate_wsc(&file).map_err(|_| WscStoreObstruction::invalid_wsc(wsc_digest))?; + Ok(file.schema_hash() == &make_type_id(schema).0) +} + fn canonical_tick_receipts(records: &[TickReceiptRecord]) -> Vec { let mut by_receipt = BTreeMap::new(); for record in records { diff --git a/crates/warp-core/tests/wsc_store_tests.rs b/crates/warp-core/tests/wsc_store_tests.rs index a35b3a19..1f67e699 100644 --- a/crates/warp-core/tests/wsc_store_tests.rs +++ b/crates/warp-core/tests/wsc_store_tests.rs @@ -9,15 +9,16 @@ use std::collections::BTreeSet; use warp_core::causal_wal::{ retained_material_obstructions, EvidenceMaterialPosture, MissingMaterialScope, ReadingRefRecord, RecoveredReceiptIndex, RecoveredRetentionIndex, RecoveredSubmissionIndex, - RetainedMaterialKind, RetainedMaterialRecord, SubmissionAcceptanceRecord, - SubmissionRetryPosture, TickReceiptRecord, WalReceiptCorrelationRecord, WalTickDecision, + RecoveredSubmissionPosture, RetainedMaterialKind, RetainedMaterialRecord, + SubmissionAcceptanceRecord, SubmissionRetryPosture, TickReceiptRecord, + WalReceiptCorrelationRecord, WalTickDecision, }; use warp_core::wsc::{ - accepted_submission_records_from_wsc_envelope, accepted_submission_records_to_wsc_envelope, - receipt_correlation_records_from_wsc_envelope, receipt_correlation_records_to_wsc_envelope, - retention_records_from_wsc_envelope, retention_records_to_wsc_envelope, write_wsc_one_warp, - InMemoryWscStore, OneWarpInput, WscStoreEnvelope, WscStoreObstructionKind, WscStorePort, - WscStoreRecordKind, WscStoreSubject, + accepted_submission_records_from_wsc_envelope, accepted_submission_records_from_wsc_store, + accepted_submission_records_to_wsc_envelope, receipt_correlation_records_from_wsc_envelope, + receipt_correlation_records_to_wsc_envelope, retention_records_from_wsc_envelope, + retention_records_to_wsc_envelope, write_wsc_one_warp, InMemoryWscStore, OneWarpInput, + WscStoreEnvelope, WscStoreObstructionKind, WscStorePort, WscStoreRecordKind, WscStoreSubject, }; #[test] @@ -167,6 +168,32 @@ fn accepted_submission_records_round_trip_through_wsc_envelope() { ); } +#[test] +fn pending_submission_recovers_from_committed_wsc_store_without_decision() { + let pending = submission_acceptance(3, 33); + let envelope = + accepted_submission_records_to_wsc_envelope(&[pending]).expect("accepted WSC envelope"); + let mut store = InMemoryWscStore::default(); + store + .write_envelope(envelope) + .expect("committed accepted WSC envelope"); + + let recovered_records = + accepted_submission_records_from_wsc_store(&store).expect("recovered accepted records"); + let recovered = + RecoveredSubmissionIndex::from_acceptance_records(recovered_records).expect("index"); + + let entry = recovered + .get(&pending.submission_id) + .expect("recovered pending submission"); + assert_eq!(entry.posture, RecoveredSubmissionPosture::AcceptedPending); + assert_eq!(entry.receipt_digest, None); + assert_eq!( + recovered.retry_posture(pending.submission_id, pending.canonical_envelope_digest), + SubmissionRetryPosture::AlreadyAcceptedPending + ); +} + #[test] fn receipt_correlation_records_round_trip_through_wsc_envelope() { let receipt = tick_receipt(7, 17, 27, WalTickDecision::Applied); From 6b17054dd651b15f6dc9459c3aaecfa0b2fbfbbb Mon Sep 17 00:00:00 2001 From: James Ross Date: Tue, 2 Jun 2026 01:18:04 -0700 Subject: [PATCH 08/13] feat: recover decided submissions from WSC --- crates/warp-core/src/causal_wal.rs | 49 +++++++++--- crates/warp-core/src/wsc/mod.rs | 10 +-- crates/warp-core/src/wsc/store.rs | 31 ++++++++ crates/warp-core/tests/wsc_store_tests.rs | 90 ++++++++++++++++++++++- 4 files changed, 162 insertions(+), 18 deletions(-) diff --git a/crates/warp-core/src/causal_wal.rs b/crates/warp-core/src/causal_wal.rs index 206c8843..f7cbdf3e 100644 --- a/crates/warp-core/src/causal_wal.rs +++ b/crates/warp-core/src/causal_wal.rs @@ -2101,6 +2101,31 @@ impl RecoveredSubmissionIndex { Ok(index) } + /// Builds a recovered index from accepted submission and tick receipt records. + /// + /// Accepted submissions with no matching receipt remain accepted pending. + /// Matching receipt records move accepted submissions to their decided or + /// obstructed posture without inventing retries. + /// + /// # Errors + /// + /// Returns [`WalRecoveryIndexError::SubmissionEnvelopeConflict`] when one + /// submission id is associated with conflicting canonical envelope digests. + pub fn from_acceptance_and_receipt_records( + acceptances: I, + receipts: J, + ) -> Result + where + I: IntoIterator, + J: IntoIterator, + { + let mut index = Self::from_acceptance_records(acceptances)?; + for receipt in receipts { + index.apply_tick_receipt_record(receipt); + } + Ok(index) + } + fn insert_acceptance_record( &mut self, record: SubmissionAcceptanceRecord, @@ -2126,6 +2151,19 @@ impl RecoveredSubmissionIndex { Ok(()) } + fn apply_tick_receipt_record(&mut self, receipt: TickReceiptRecord) { + if let Some(entry) = self.submissions.get_mut(&receipt.submission_id) { + entry.posture = match receipt.decision { + WalTickDecision::Applied => RecoveredSubmissionPosture::DecidedApplied, + WalTickDecision::RejectedFootprintConflict => { + RecoveredSubmissionPosture::DecidedRejected + } + WalTickDecision::Obstructed => RecoveredSubmissionPosture::Obstructed, + }; + entry.receipt_digest = Some(receipt.receipt_digest); + } + } + /// Returns a recovered submission entry. #[must_use] pub fn get(&self, submission_id: &Hash) -> Option<&RecoveredSubmissionEntry> { @@ -4181,16 +4219,7 @@ pub fn recover_submission_index( WalRecordKind::TickReceiptRecorded => { let receipt = TickReceiptRecord::from_payload_bytes(&frame.payload.canonical_bytes)?; - if let Some(entry) = index.submissions.get_mut(&receipt.submission_id) { - entry.posture = match receipt.decision { - WalTickDecision::Applied => RecoveredSubmissionPosture::DecidedApplied, - WalTickDecision::RejectedFootprintConflict => { - RecoveredSubmissionPosture::DecidedRejected - } - WalTickDecision::Obstructed => RecoveredSubmissionPosture::Obstructed, - }; - entry.receipt_digest = Some(receipt.receipt_digest); - } + index.apply_tick_receipt_record(receipt); } _ => {} } diff --git a/crates/warp-core/src/wsc/mod.rs b/crates/warp-core/src/wsc/mod.rs index 8767b99b..5f4261ac 100644 --- a/crates/warp-core/src/wsc/mod.rs +++ b/crates/warp-core/src/wsc/mod.rs @@ -72,11 +72,11 @@ pub use read::ReadError; pub use store::{ accepted_submission_records_from_wsc_envelope, accepted_submission_records_from_wsc_store, accepted_submission_records_to_wsc_envelope, receipt_correlation_records_from_wsc_envelope, - receipt_correlation_records_to_wsc_envelope, retention_records_from_wsc_envelope, - retention_records_to_wsc_envelope, InMemoryWscStore, WscReceiptCorrelationRecords, - WscRetentionRecords, WscStoreEnvelope, WscStoreEnvelopeId, WscStoreObstruction, - WscStoreObstructionKind, WscStorePort, WscStoreRecordKind, WscStoreSubject, - WscStoreWriteReceipt, + receipt_correlation_records_from_wsc_store, receipt_correlation_records_to_wsc_envelope, + retention_records_from_wsc_envelope, retention_records_to_wsc_envelope, InMemoryWscStore, + WscReceiptCorrelationRecords, WscRetentionRecords, WscStoreEnvelope, WscStoreEnvelopeId, + WscStoreObstruction, WscStoreObstructionKind, WscStorePort, WscStoreRecordKind, + WscStoreSubject, WscStoreWriteReceipt, }; pub use validate::validate_wsc; pub use view::{AttachmentRef, WarpView, WscFile}; diff --git a/crates/warp-core/src/wsc/store.rs b/crates/warp-core/src/wsc/store.rs index 75979843..f308b61c 100644 --- a/crates/warp-core/src/wsc/store.rs +++ b/crates/warp-core/src/wsc/store.rs @@ -809,6 +809,37 @@ pub fn receipt_correlation_records_from_wsc_envelope( }) } +/// Recovers receipt and ticket correlation records from committed WSC store envelopes. +/// +/// # Errors +/// +/// Returns a typed WSC store obstruction when a committed receipt-correlation +/// envelope is malformed. +pub fn receipt_correlation_records_from_wsc_store

( + store: &P, +) -> Result +where + P: WscStorePort + ?Sized, +{ + let mut receipts = Vec::new(); + let mut correlations = Vec::new(); + for envelope_id in store.list_envelopes() { + let envelope = store.read_envelope(envelope_id)?; + if envelope.record_kind() != WscStoreRecordKind::CausalHistory + || !envelope_has_schema(&envelope, WSC_RECEIPT_CORRELATION_SCHEMA)? + { + continue; + } + let recovered = receipt_correlation_records_from_wsc_envelope(&envelope)?; + receipts.extend(recovered.receipts); + correlations.extend(recovered.correlations); + } + Ok(WscReceiptCorrelationRecords { + receipts: canonical_tick_receipts(&receipts), + correlations: canonical_receipt_correlations(&correlations), + }) +} + /// Builds a generic WSC envelope for retained material and reading records. /// /// Duplicate identical records are represented once. diff --git a/crates/warp-core/tests/wsc_store_tests.rs b/crates/warp-core/tests/wsc_store_tests.rs index 1f67e699..b904b18a 100644 --- a/crates/warp-core/tests/wsc_store_tests.rs +++ b/crates/warp-core/tests/wsc_store_tests.rs @@ -16,9 +16,10 @@ use warp_core::causal_wal::{ use warp_core::wsc::{ accepted_submission_records_from_wsc_envelope, accepted_submission_records_from_wsc_store, accepted_submission_records_to_wsc_envelope, receipt_correlation_records_from_wsc_envelope, - receipt_correlation_records_to_wsc_envelope, retention_records_from_wsc_envelope, - retention_records_to_wsc_envelope, write_wsc_one_warp, InMemoryWscStore, OneWarpInput, - WscStoreEnvelope, WscStoreObstructionKind, WscStorePort, WscStoreRecordKind, WscStoreSubject, + receipt_correlation_records_from_wsc_store, receipt_correlation_records_to_wsc_envelope, + retention_records_from_wsc_envelope, retention_records_to_wsc_envelope, write_wsc_one_warp, + InMemoryWscStore, OneWarpInput, WscStoreEnvelope, WscStoreObstructionKind, WscStorePort, + WscStoreRecordKind, WscStoreSubject, }; #[test] @@ -218,6 +219,89 @@ fn receipt_correlation_records_round_trip_through_wsc_envelope() { ); } +#[test] +fn decided_submissions_recover_from_committed_wsc_store() { + let applied = submission_acceptance(8, 38); + let rejected = submission_acceptance(9, 39); + let applied_receipt = tick_receipt(8, 18, 28, WalTickDecision::Applied); + let rejected_receipt = tick_receipt(9, 19, 29, WalTickDecision::RejectedFootprintConflict); + let mut store = InMemoryWscStore::default(); + store + .write_envelope( + accepted_submission_records_to_wsc_envelope(&[rejected, applied]) + .expect("accepted WSC envelope"), + ) + .expect("committed accepted WSC envelope"); + store + .write_envelope( + receipt_correlation_records_to_wsc_envelope( + &[rejected_receipt, applied_receipt], + &[ + receipt_correlation(8, 18, 28), + receipt_correlation(9, 19, 29), + ], + ) + .expect("receipt WSC envelope"), + ) + .expect("committed receipt WSC envelope"); + + let accepted = + accepted_submission_records_from_wsc_store(&store).expect("recovered accepted records"); + let receipt_records = + receipt_correlation_records_from_wsc_store(&store).expect("recovered receipt records"); + let submissions = RecoveredSubmissionIndex::from_acceptance_and_receipt_records( + accepted, + receipt_records.receipts.clone(), + ) + .expect("decided submission index"); + let receipts = RecoveredReceiptIndex::from_receipt_correlation_records( + receipt_records.receipts, + receipt_records.correlations, + ); + + let applied_entry = submissions + .get(&applied.submission_id) + .expect("recovered applied submission"); + assert_eq!( + applied_entry.posture, + RecoveredSubmissionPosture::DecidedApplied + ); + assert_eq!( + applied_entry.receipt_digest, + Some(applied_receipt.receipt_digest) + ); + assert_eq!( + submissions.retry_posture(applied.submission_id, applied.canonical_envelope_digest), + SubmissionRetryPosture::AlreadyDecidedApplied + ); + + let rejected_entry = submissions + .get(&rejected.submission_id) + .expect("recovered rejected submission"); + assert_eq!( + rejected_entry.posture, + RecoveredSubmissionPosture::DecidedRejected + ); + assert_eq!( + rejected_entry.receipt_digest, + Some(rejected_receipt.receipt_digest) + ); + assert_eq!( + submissions.retry_posture(rejected.submission_id, rejected.canonical_envelope_digest), + SubmissionRetryPosture::AlreadyDecidedRejected + ); + assert_eq!( + receipts.receipt_by_submission.get(&applied.submission_id), + Some(&applied_receipt.receipt_digest) + ); + assert_eq!( + receipts + .decisions_by_receipt + .get(&rejected_receipt.receipt_digest), + Some(&WalTickDecision::RejectedFootprintConflict) + ); +} + #[test] fn retention_records_round_trip_through_wsc_envelope() { let material = retained_material( From 0ae88f5c3b4a9452ab3faa93ea2084567ed86fee Mon Sep 17 00:00:00 2001 From: James Ross Date: Tue, 2 Jun 2026 01:25:19 -0700 Subject: [PATCH 09/13] feat: reject incomplete WSC causal history --- crates/warp-core/src/wsc/mod.rs | 9 +- crates/warp-core/src/wsc/store.rs | 103 +++++++++++++++++++++- crates/warp-core/tests/wsc_store_tests.rs | 75 +++++++++++++++- 3 files changed, 179 insertions(+), 8 deletions(-) diff --git a/crates/warp-core/src/wsc/mod.rs b/crates/warp-core/src/wsc/mod.rs index 5f4261ac..9459c0b2 100644 --- a/crates/warp-core/src/wsc/mod.rs +++ b/crates/warp-core/src/wsc/mod.rs @@ -73,10 +73,11 @@ pub use store::{ accepted_submission_records_from_wsc_envelope, accepted_submission_records_from_wsc_store, accepted_submission_records_to_wsc_envelope, receipt_correlation_records_from_wsc_envelope, receipt_correlation_records_from_wsc_store, receipt_correlation_records_to_wsc_envelope, - retention_records_from_wsc_envelope, retention_records_to_wsc_envelope, InMemoryWscStore, - WscReceiptCorrelationRecords, WscRetentionRecords, WscStoreEnvelope, WscStoreEnvelopeId, - WscStoreObstruction, WscStoreObstructionKind, WscStorePort, WscStoreRecordKind, - WscStoreSubject, WscStoreWriteReceipt, + retention_records_from_wsc_envelope, retention_records_to_wsc_envelope, + validate_wsc_causal_history_store, InMemoryWscStore, WscReceiptCorrelationRecords, + WscRetentionRecords, WscStoreEnvelope, WscStoreEnvelopeId, WscStoreObstruction, + WscStoreObstructionKind, WscStorePort, WscStoreRecordKind, WscStoreSubject, + WscStoreWriteReceipt, }; pub use validate::validate_wsc; pub use view::{AttachmentRef, WarpView, WscFile}; diff --git a/crates/warp-core/src/wsc/store.rs b/crates/warp-core/src/wsc/store.rs index f308b61c..4b54711e 100644 --- a/crates/warp-core/src/wsc/store.rs +++ b/crates/warp-core/src/wsc/store.rs @@ -2,7 +2,7 @@ // © James Ross Ω FLYING•ROBOTS //! Generic WSC storage port and deterministic envelope format. -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet}; use blake3::Hasher; use bytes::Bytes; @@ -136,6 +136,11 @@ pub enum WscStoreSubject { /// Digest of the invalid WSC payload. digest: Hash, }, + /// Causal-history material was inconsistent. + CausalHistory { + /// Digest naming the inconsistent causal-history subject. + subject_digest: Hash, + }, } /// Generic WSC store obstruction kind. @@ -155,6 +160,8 @@ pub enum WscStoreObstructionKind { IncompleteEnvelopeWrite, /// Commit marker does not match the envelope material. CommitMarkerMismatch, + /// Committed causal-history records are missing required partner material. + IncompleteCausalHistory, } /// Typed obstruction returned instead of hidden fallback or invented success. @@ -215,6 +222,13 @@ impl WscStoreObstruction { subject: WscStoreSubject::Envelope { envelope_id }, } } + + fn incomplete_causal_history(subject_digest: Hash) -> Self { + Self { + kind: WscStoreObstructionKind::IncompleteCausalHistory, + subject: WscStoreSubject::CausalHistory { subject_digest }, + } + } } /// Deterministic WSC store envelope. @@ -840,6 +854,29 @@ where }) } +/// Validates committed WSC causal-history records for required partner material. +/// +/// Accepted submissions may remain pending without receipts. Receipt records +/// and receipt-correlation records, however, require a committed accepted +/// submission and a matching receipt/correlation pair. +/// +/// # Errors +/// +/// Returns [`WscStoreObstructionKind::IncompleteCausalHistory`] when committed +/// records reference missing partner material. +pub fn validate_wsc_causal_history_store

(store: &P) -> Result<(), WscStoreObstruction> +where + P: WscStorePort + ?Sized, +{ + let acceptances = accepted_submission_records_from_wsc_store(store)?; + let receipt_records = receipt_correlation_records_from_wsc_store(store)?; + validate_wsc_causal_history_records( + &acceptances, + &receipt_records.receipts, + &receipt_records.correlations, + ) +} + /// Builds a generic WSC envelope for retained material and reading records. /// /// Duplicate identical records are represented once. @@ -1075,6 +1112,70 @@ fn canonical_receipt_correlations( by_correlation.into_values().collect() } +fn validate_wsc_causal_history_records( + acceptances: &[SubmissionAcceptanceRecord], + receipts: &[TickReceiptRecord], + correlations: &[WalReceiptCorrelationRecord], +) -> Result<(), WscStoreObstruction> { + let accepted_submissions: BTreeSet = acceptances + .iter() + .map(|record| record.submission_id) + .collect(); + let receipt_keys: BTreeSet<(Hash, Hash, Hash)> = receipts + .iter() + .map(|record| { + ( + record.submission_id, + record.ticket_digest, + record.receipt_digest, + ) + }) + .collect(); + let correlation_keys: BTreeSet<(Hash, Hash, Hash)> = correlations + .iter() + .map(|record| { + ( + record.submission_id, + record.ticket_digest, + record.receipt_digest, + ) + }) + .collect(); + for receipt in receipts { + if !accepted_submissions.contains(&receipt.submission_id) { + return Err(WscStoreObstruction::incomplete_causal_history( + receipt.receipt_digest, + )); + } + if !correlation_keys.contains(&( + receipt.submission_id, + receipt.ticket_digest, + receipt.receipt_digest, + )) { + return Err(WscStoreObstruction::incomplete_causal_history( + receipt.receipt_digest, + )); + } + } + for correlation in correlations { + if !accepted_submissions.contains(&correlation.submission_id) { + return Err(WscStoreObstruction::incomplete_causal_history( + correlation.receipt_digest, + )); + } + if !receipt_keys.contains(&( + correlation.submission_id, + correlation.ticket_digest, + correlation.receipt_digest, + )) { + return Err(WscStoreObstruction::incomplete_causal_history( + correlation.receipt_digest, + )); + } + } + Ok(()) +} + fn insert_receipt_material_node( store: &mut GraphStore, root: NodeId, diff --git a/crates/warp-core/tests/wsc_store_tests.rs b/crates/warp-core/tests/wsc_store_tests.rs index b904b18a..4ca5a8d2 100644 --- a/crates/warp-core/tests/wsc_store_tests.rs +++ b/crates/warp-core/tests/wsc_store_tests.rs @@ -17,9 +17,9 @@ use warp_core::wsc::{ accepted_submission_records_from_wsc_envelope, accepted_submission_records_from_wsc_store, accepted_submission_records_to_wsc_envelope, receipt_correlation_records_from_wsc_envelope, receipt_correlation_records_from_wsc_store, receipt_correlation_records_to_wsc_envelope, - retention_records_from_wsc_envelope, retention_records_to_wsc_envelope, write_wsc_one_warp, - InMemoryWscStore, OneWarpInput, WscStoreEnvelope, WscStoreObstructionKind, WscStorePort, - WscStoreRecordKind, WscStoreSubject, + retention_records_from_wsc_envelope, retention_records_to_wsc_envelope, + validate_wsc_causal_history_store, write_wsc_one_warp, InMemoryWscStore, OneWarpInput, + WscStoreEnvelope, WscStoreObstructionKind, WscStorePort, WscStoreRecordKind, WscStoreSubject, }; #[test] @@ -302,6 +302,75 @@ fn decided_submissions_recover_from_committed_wsc_store() { ); } +#[test] +fn wsc_causal_history_rejects_receipt_without_committed_acceptance() { + let acceptance = submission_acceptance(10, 40); + let receipt = tick_receipt(10, 20, 30, WalTickDecision::Applied); + let mut store = InMemoryWscStore::default(); + store + .stage_envelope_without_commit_marker( + accepted_submission_records_to_wsc_envelope(&[acceptance]) + .expect("accepted WSC envelope"), + ) + .expect("staged accepted WSC envelope"); + store + .write_envelope( + receipt_correlation_records_to_wsc_envelope( + &[receipt], + &[receipt_correlation(10, 20, 30)], + ) + .expect("receipt WSC envelope"), + ) + .expect("committed receipt WSC envelope"); + + let obstruction = + validate_wsc_causal_history_store(&store).expect_err("half-accepted history obstructs"); + + assert_eq!( + obstruction.kind, + WscStoreObstructionKind::IncompleteCausalHistory + ); + assert_eq!( + obstruction.subject, + WscStoreSubject::CausalHistory { + subject_digest: receipt.receipt_digest + } + ); +} + +#[test] +fn wsc_causal_history_rejects_receipt_without_correlation() { + let acceptance = submission_acceptance(11, 41); + let receipt = tick_receipt(11, 21, 31, WalTickDecision::Applied); + let mut store = InMemoryWscStore::default(); + store + .write_envelope( + accepted_submission_records_to_wsc_envelope(&[acceptance]) + .expect("accepted WSC envelope"), + ) + .expect("committed accepted WSC envelope"); + store + .write_envelope( + receipt_correlation_records_to_wsc_envelope(&[receipt], &[]) + .expect("receipt WSC envelope"), + ) + .expect("committed receipt WSC envelope"); + + let obstruction = + validate_wsc_causal_history_store(&store).expect_err("missing correlation obstructs"); + + assert_eq!( + obstruction.kind, + WscStoreObstructionKind::IncompleteCausalHistory + ); + assert_eq!( + obstruction.subject, + WscStoreSubject::CausalHistory { + subject_digest: receipt.receipt_digest + } + ); +} + #[test] fn retention_records_round_trip_through_wsc_envelope() { let material = retained_material( From aa63c1d8305908a44cc5bab080ffc9ba85c54ff8 Mon Sep 17 00:00:00 2001 From: James Ross Date: Tue, 2 Jun 2026 13:19:56 -0700 Subject: [PATCH 10/13] Fix: tighten WSC store recovery evidence --- crates/warp-core/src/wsc/mod.rs | 10 +- crates/warp-core/src/wsc/store.rs | 108 +++++++++++++++++++--- crates/warp-core/tests/wsc_store_tests.rs | 47 +++++++++- 3 files changed, 146 insertions(+), 19 deletions(-) diff --git a/crates/warp-core/src/wsc/mod.rs b/crates/warp-core/src/wsc/mod.rs index 9459c0b2..2581a8dd 100644 --- a/crates/warp-core/src/wsc/mod.rs +++ b/crates/warp-core/src/wsc/mod.rs @@ -73,11 +73,11 @@ pub use store::{ accepted_submission_records_from_wsc_envelope, accepted_submission_records_from_wsc_store, accepted_submission_records_to_wsc_envelope, receipt_correlation_records_from_wsc_envelope, receipt_correlation_records_from_wsc_store, receipt_correlation_records_to_wsc_envelope, - retention_records_from_wsc_envelope, retention_records_to_wsc_envelope, - validate_wsc_causal_history_store, InMemoryWscStore, WscReceiptCorrelationRecords, - WscRetentionRecords, WscStoreEnvelope, WscStoreEnvelopeId, WscStoreObstruction, - WscStoreObstructionKind, WscStorePort, WscStoreRecordKind, WscStoreSubject, - WscStoreWriteReceipt, + retention_records_from_wsc_envelope, retention_records_from_wsc_store, + retention_records_to_wsc_envelope, validate_wsc_causal_history_store, InMemoryWscStore, + WscReceiptCorrelationRecords, WscRetentionRecords, WscStoreEnvelope, WscStoreEnvelopeId, + WscStoreObstruction, WscStoreObstructionKind, WscStorePort, WscStoreRecordKind, + WscStoreSubject, WscStoreWriteReceipt, }; pub use validate::validate_wsc; pub use view::{AttachmentRef, WarpView, WscFile}; diff --git a/crates/warp-core/src/wsc/store.rs b/crates/warp-core/src/wsc/store.rs index 4b54711e..9a0785e2 100644 --- a/crates/warp-core/src/wsc/store.rs +++ b/crates/warp-core/src/wsc/store.rs @@ -735,13 +735,14 @@ where /// /// # Errors /// -/// Returns a typed obstruction when generated WSC material fails validation. +/// Returns a typed obstruction when generated WSC material fails validation or +/// when duplicate receipt/correlation keys map to conflicting material. pub fn receipt_correlation_records_to_wsc_envelope( receipts: &[TickReceiptRecord], correlations: &[WalReceiptCorrelationRecord], ) -> Result { - let receipts = canonical_tick_receipts(receipts); - let correlations = canonical_receipt_correlations(correlations); + let receipts = canonical_tick_receipts(receipts)?; + let correlations = canonical_receipt_correlations(correlations)?; let mut store = GraphStore::new(make_warp_id(WSC_RECEIPT_CORRELATION_WARP)); let root = make_node_id(WSC_RECEIPT_CORRELATION_ROOT); store.insert_node( @@ -818,8 +819,8 @@ pub fn receipt_correlation_records_from_wsc_envelope( } } Ok(WscReceiptCorrelationRecords { - receipts: canonical_tick_receipts(&receipts), - correlations: canonical_receipt_correlations(&correlations), + receipts: canonical_tick_receipts(&receipts)?, + correlations: canonical_receipt_correlations(&correlations)?, }) } @@ -849,8 +850,8 @@ where correlations.extend(recovered.correlations); } Ok(WscReceiptCorrelationRecords { - receipts: canonical_tick_receipts(&receipts), - correlations: canonical_receipt_correlations(&correlations), + receipts: canonical_tick_receipts(&receipts)?, + correlations: canonical_receipt_correlations(&correlations)?, }) } @@ -973,6 +974,37 @@ pub fn retention_records_from_wsc_envelope( }) } +/// Recovers retained material and reading records from committed WSC store envelopes. +/// +/// # Errors +/// +/// Returns a typed WSC store obstruction when a committed retention envelope is +/// malformed. +pub fn retention_records_from_wsc_store

( + store: &P, +) -> Result +where + P: WscStorePort + ?Sized, +{ + let mut materials = Vec::new(); + let mut readings = Vec::new(); + for envelope_id in store.list_envelopes() { + let envelope = store.read_envelope(envelope_id)?; + if envelope.record_kind() != WscStoreRecordKind::RetainedEvidence + || !envelope_has_schema(&envelope, WSC_RETENTION_SCHEMA)? + { + continue; + } + let recovered = retention_records_from_wsc_envelope(&envelope)?; + materials.extend(recovered.materials); + readings.extend(recovered.readings); + } + Ok(WscRetentionRecords { + materials: canonical_retained_material_records(&materials), + readings: canonical_reading_ref_records(&readings), + }) +} + fn read_array(bytes: &[u8], offset: usize) -> Result<[u8; N], WscStoreObstruction> { let end = offset .checked_add(N) @@ -1087,19 +1119,70 @@ fn envelope_has_schema( Ok(file.schema_hash() == &make_type_id(schema).0) } -fn canonical_tick_receipts(records: &[TickReceiptRecord]) -> Vec { +fn canonical_tick_receipts( + records: &[TickReceiptRecord], +) -> Result, WscStoreObstruction> { let mut by_receipt = BTreeMap::new(); + let mut by_submission = BTreeMap::new(); + let mut by_ticket = BTreeMap::new(); for record in records { + if let Some(existing) = by_receipt.get(&record.receipt_digest) { + if existing != record { + return Err(WscStoreObstruction::duplicate_mismatch( + WscStoreEnvelopeId::from_hash(record.receipt_digest), + )); + } + } + if let Some(existing) = by_submission.get(&record.submission_id) { + if existing != record { + return Err(WscStoreObstruction::duplicate_mismatch( + WscStoreEnvelopeId::from_hash(record.submission_id), + )); + } + } + if let Some(existing) = by_ticket.get(&record.ticket_digest) { + if existing != record { + return Err(WscStoreObstruction::duplicate_mismatch( + WscStoreEnvelopeId::from_hash(record.ticket_digest), + )); + } + } by_receipt.insert(record.receipt_digest, *record); + by_submission.insert(record.submission_id, *record); + by_ticket.insert(record.ticket_digest, *record); } - by_receipt.into_values().collect() + Ok(by_receipt.into_values().collect()) } fn canonical_receipt_correlations( records: &[WalReceiptCorrelationRecord], -) -> Vec { +) -> Result, WscStoreObstruction> { let mut by_correlation = BTreeMap::new(); + let mut by_submission = BTreeMap::new(); + let mut by_ticket = BTreeMap::new(); + let mut by_receipt = BTreeMap::new(); for record in records { + if let Some(existing) = by_submission.get(&record.submission_id) { + if existing != record { + return Err(WscStoreObstruction::duplicate_mismatch( + WscStoreEnvelopeId::from_hash(record.submission_id), + )); + } + } + if let Some(existing) = by_ticket.get(&record.ticket_digest) { + if existing != record { + return Err(WscStoreObstruction::duplicate_mismatch( + WscStoreEnvelopeId::from_hash(record.ticket_digest), + )); + } + } + if let Some(existing) = by_receipt.get(&record.receipt_digest) { + if existing != record { + return Err(WscStoreObstruction::duplicate_mismatch( + WscStoreEnvelopeId::from_hash(record.receipt_digest), + )); + } + } by_correlation.insert( ( record.submission_id, @@ -1108,8 +1191,11 @@ fn canonical_receipt_correlations( ), *record, ); + by_submission.insert(record.submission_id, *record); + by_ticket.insert(record.ticket_digest, *record); + by_receipt.insert(record.receipt_digest, *record); } - by_correlation.into_values().collect() + Ok(by_correlation.into_values().collect()) } fn validate_wsc_causal_history_records( diff --git a/crates/warp-core/tests/wsc_store_tests.rs b/crates/warp-core/tests/wsc_store_tests.rs index 4ca5a8d2..5510446e 100644 --- a/crates/warp-core/tests/wsc_store_tests.rs +++ b/crates/warp-core/tests/wsc_store_tests.rs @@ -17,9 +17,10 @@ use warp_core::wsc::{ accepted_submission_records_from_wsc_envelope, accepted_submission_records_from_wsc_store, accepted_submission_records_to_wsc_envelope, receipt_correlation_records_from_wsc_envelope, receipt_correlation_records_from_wsc_store, receipt_correlation_records_to_wsc_envelope, - retention_records_from_wsc_envelope, retention_records_to_wsc_envelope, - validate_wsc_causal_history_store, write_wsc_one_warp, InMemoryWscStore, OneWarpInput, - WscStoreEnvelope, WscStoreObstructionKind, WscStorePort, WscStoreRecordKind, WscStoreSubject, + retention_records_from_wsc_envelope, retention_records_from_wsc_store, + retention_records_to_wsc_envelope, validate_wsc_causal_history_store, write_wsc_one_warp, + InMemoryWscStore, OneWarpInput, WscStoreEnvelope, WscStoreObstructionKind, WscStorePort, + WscStoreRecordKind, WscStoreSubject, }; #[test] @@ -219,6 +220,23 @@ fn receipt_correlation_records_round_trip_through_wsc_envelope() { ); } +#[test] +fn receipt_correlation_records_reject_conflicting_duplicate_receipt() { + let receipt = tick_receipt(7, 17, 27, WalTickDecision::Applied); + let conflicting_receipt = tick_receipt(8, 18, 27, WalTickDecision::Obstructed); + + let obstruction = receipt_correlation_records_to_wsc_envelope( + &[receipt, conflicting_receipt], + &[receipt_correlation(7, 17, 27)], + ) + .expect_err("conflicting duplicate receipt obstructs"); + + assert_eq!( + obstruction.kind, + WscStoreObstructionKind::DuplicateEnvelopeMismatch + ); +} + #[test] fn decided_submissions_recover_from_committed_wsc_store() { let applied = submission_acceptance(8, 38); @@ -425,6 +443,29 @@ fn retention_records_round_trip_through_wsc_envelope() { assert_eq!(obstruction.posture, EvidenceMaterialPosture::Missing); } +#[test] +fn retention_records_recover_from_committed_wsc_store() { + let material = retained_material( + 33, + 43, + RetainedMaterialKind::ReadingEnvelope, + EvidenceMaterialPosture::Present, + ); + let reading = reading_ref(53, 43, 63, 73, EvidenceMaterialPosture::Present); + let mut store = InMemoryWscStore::default(); + store + .write_envelope( + retention_records_to_wsc_envelope(&[material], &[reading]) + .expect("retention WSC envelope"), + ) + .expect("committed retention WSC envelope"); + + let recovered = retention_records_from_wsc_store(&store).expect("recovered retention"); + + assert_eq!(recovered.materials, vec![material]); + assert_eq!(recovered.readings, vec![reading]); +} + fn fixture_wsc_bytes(tick: u64) -> Vec { let input = OneWarpInput { warp_id: [1; 32], From 20f7562d01ef0c70416f38629ae27eac4cbe2cc5 Mon Sep 17 00:00:00 2001 From: James Ross Date: Tue, 2 Jun 2026 14:23:49 -0700 Subject: [PATCH 11/13] Fix: reject conflicting retained WSC evidence --- CHANGELOG.md | 3 + crates/warp-core/src/causal_wal.rs | 54 +++++++- crates/warp-core/src/wsc/store.rs | 48 +++++-- crates/warp-core/tests/wsc_store_tests.rs | 157 +++++++++++++++++++++- 4 files changed, 238 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dda17ce8..94bbe528 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -587,6 +587,9 @@ Applied, Rejected, Obstructed}` with receipt evidence and typed contract ### Fixed +- `warp-core` WSC retained-evidence recovery now rejects conflicting duplicate + retained material digests and reading ids instead of letting recovery or the + recovered retention index silently overwrite evidence identity collisions. - `echo-cli wal doctor` now inspects a real filesystem WAL root through Echo's read-only filesystem WAL doctor instead of reporting a fresh empty in-memory store. The command accepts an optional WAL root path, defaults to diff --git a/crates/warp-core/src/causal_wal.rs b/crates/warp-core/src/causal_wal.rs index f7cbdf3e..6714bdeb 100644 --- a/crates/warp-core/src/causal_wal.rs +++ b/crates/warp-core/src/causal_wal.rs @@ -2369,34 +2369,74 @@ pub struct RecoveredRetentionIndex { pub readings_by_semantic_coordinate: BTreeMap>, } +/// Error returned while building a recovered retention index. +#[derive(Clone, Copy, Debug, Error, PartialEq, Eq)] +pub enum RecoveredRetentionIndexError { + /// A material digest mapped to conflicting retained material records. + #[error("retained material digest maps to conflicting records")] + ConflictingMaterialDigest { + /// Conflicting retained material digest. + material_digest: Hash, + }, + /// A reading id mapped to conflicting reading reference records. + #[error("retained reading id maps to conflicting records")] + ConflictingReadingId { + /// Conflicting reading id. + reading_id: Hash, + }, +} + impl RecoveredRetentionIndex { /// Builds a recovered retention index from retained material and reading records. - #[must_use] - pub fn from_retention_records(materials: I, readings: J) -> Self + /// + /// # Errors + /// + /// Returns an error when retained evidence records carry conflicting + /// material digests or reading ids. + pub fn from_retention_records( + materials: I, + readings: J, + ) -> Result where I: IntoIterator, J: IntoIterator, { let mut index = Self::default(); for record in materials { + if let Some(existing) = index.material_by_digest.get(&record.material_digest) { + if existing != &record { + return Err(RecoveredRetentionIndexError::ConflictingMaterialDigest { + material_digest: record.material_digest, + }); + } + } else { + index + .material_by_digest + .insert(record.material_digest, record); + } index .material_by_semantic_coordinate .entry(record.semantic_coordinate_digest) .or_default() .insert(record.material_digest); - index - .material_by_digest - .insert(record.material_digest, record); } for record in readings { + if let Some(existing) = index.reading_by_id.get(&record.reading_id) { + if existing != &record { + return Err(RecoveredRetentionIndexError::ConflictingReadingId { + reading_id: record.reading_id, + }); + } + } else { + index.reading_by_id.insert(record.reading_id, record); + } index .readings_by_semantic_coordinate .entry(record.semantic_coordinate_digest) .or_default() .insert(record.reading_id); - index.reading_by_id.insert(record.reading_id, record); } - index + Ok(index) } } diff --git a/crates/warp-core/src/wsc/store.rs b/crates/warp-core/src/wsc/store.rs index 9a0785e2..9ca35be3 100644 --- a/crates/warp-core/src/wsc/store.rs +++ b/crates/warp-core/src/wsc/store.rs @@ -884,13 +884,14 @@ where /// /// # Errors /// -/// Returns a typed obstruction when generated WSC material fails validation. +/// Returns a typed obstruction when retained evidence identities conflict or +/// generated WSC material fails validation. pub fn retention_records_to_wsc_envelope( materials: &[RetainedMaterialRecord], readings: &[ReadingRefRecord], ) -> Result { - let materials = canonical_retained_material_records(materials); - let readings = canonical_reading_ref_records(readings); + let materials = canonical_retained_material_records(materials)?; + let readings = canonical_reading_ref_records(readings)?; let mut store = GraphStore::new(make_warp_id(WSC_RETENTION_WARP)); let root = make_node_id(WSC_RETENTION_ROOT); store.insert_node( @@ -933,7 +934,8 @@ pub fn retention_records_to_wsc_envelope( /// # Errors /// /// Returns a typed WSC store obstruction when the envelope is not retained -/// evidence material or when record payloads are malformed. +/// evidence material, when record payloads are malformed, or when retained +/// evidence identities conflict. pub fn retention_records_from_wsc_envelope( envelope: &WscStoreEnvelope, ) -> Result { @@ -969,8 +971,8 @@ pub fn retention_records_from_wsc_envelope( } } Ok(WscRetentionRecords { - materials: canonical_retained_material_records(&materials), - readings: canonical_reading_ref_records(&readings), + materials: canonical_retained_material_records(&materials)?, + readings: canonical_reading_ref_records(&readings)?, }) } @@ -979,7 +981,7 @@ pub fn retention_records_from_wsc_envelope( /// # Errors /// /// Returns a typed WSC store obstruction when a committed retention envelope is -/// malformed. +/// malformed or when retained evidence identities conflict. pub fn retention_records_from_wsc_store

( store: &P, ) -> Result @@ -1000,8 +1002,8 @@ where readings.extend(recovered.readings); } Ok(WscRetentionRecords { - materials: canonical_retained_material_records(&materials), - readings: canonical_reading_ref_records(&readings), + materials: canonical_retained_material_records(&materials)?, + readings: canonical_reading_ref_records(&readings)?, }) } @@ -1336,20 +1338,40 @@ fn receipt_material_edge_id(node_id: &Hash) -> EdgeId { fn canonical_retained_material_records( records: &[RetainedMaterialRecord], -) -> Vec { +) -> Result, WscStoreObstruction> { let mut by_payload = BTreeMap::new(); + let mut by_material_digest = BTreeMap::new(); for record in records { + if let Some(existing) = by_material_digest.get(&record.material_digest) { + if existing != record { + return Err(WscStoreObstruction::duplicate_mismatch( + WscStoreEnvelopeId::from_hash(record.material_digest), + )); + } + } by_payload.insert(record.to_payload_bytes(), *record); + by_material_digest.insert(record.material_digest, *record); } - by_payload.into_values().collect() + Ok(by_payload.into_values().collect()) } -fn canonical_reading_ref_records(records: &[ReadingRefRecord]) -> Vec { +fn canonical_reading_ref_records( + records: &[ReadingRefRecord], +) -> Result, WscStoreObstruction> { let mut by_payload = BTreeMap::new(); + let mut by_reading_id = BTreeMap::new(); for record in records { + if let Some(existing) = by_reading_id.get(&record.reading_id) { + if existing != record { + return Err(WscStoreObstruction::duplicate_mismatch( + WscStoreEnvelopeId::from_hash(record.reading_id), + )); + } + } by_payload.insert(record.to_payload_bytes(), *record); + by_reading_id.insert(record.reading_id, *record); } - by_payload.into_values().collect() + Ok(by_payload.into_values().collect()) } fn insert_retention_record_node( diff --git a/crates/warp-core/tests/wsc_store_tests.rs b/crates/warp-core/tests/wsc_store_tests.rs index 5510446e..5a3bb54d 100644 --- a/crates/warp-core/tests/wsc_store_tests.rs +++ b/crates/warp-core/tests/wsc_store_tests.rs @@ -8,9 +8,9 @@ use std::collections::BTreeSet; use warp_core::causal_wal::{ retained_material_obstructions, EvidenceMaterialPosture, MissingMaterialScope, - ReadingRefRecord, RecoveredReceiptIndex, RecoveredRetentionIndex, RecoveredSubmissionIndex, - RecoveredSubmissionPosture, RetainedMaterialKind, RetainedMaterialRecord, - SubmissionAcceptanceRecord, SubmissionRetryPosture, TickReceiptRecord, + ReadingRefRecord, RecoveredReceiptIndex, RecoveredRetentionIndex, RecoveredRetentionIndexError, + RecoveredSubmissionIndex, RecoveredSubmissionPosture, RetainedMaterialKind, + RetainedMaterialRecord, SubmissionAcceptanceRecord, SubmissionRetryPosture, TickReceiptRecord, WalReceiptCorrelationRecord, WalTickDecision, }; use warp_core::wsc::{ @@ -415,7 +415,8 @@ fn retention_records_round_trip_through_wsc_envelope() { assert_eq!(recovered.readings, vec![reading]); let index = - RecoveredRetentionIndex::from_retention_records(recovered.materials, recovered.readings); + RecoveredRetentionIndex::from_retention_records(recovered.materials, recovered.readings) + .expect("recovered retention index"); assert_eq!(index.material_by_digest.get(&[31; 32]), Some(&material)); assert_eq!( index.material_by_digest.get(&[32; 32]), @@ -466,6 +467,154 @@ fn retention_records_recover_from_committed_wsc_store() { assert_eq!(recovered.readings, vec![reading]); } +#[test] +fn retention_records_from_committed_wsc_store_rejects_conflicting_material_digest() { + let material = retained_material( + 34, + 44, + RetainedMaterialKind::ReadingEnvelope, + EvidenceMaterialPosture::Present, + ); + let conflicting_material = retained_material( + 34, + 45, + RetainedMaterialKind::ReadingPayload, + EvidenceMaterialPosture::Missing, + ); + let mut store = InMemoryWscStore::default(); + store + .write_envelope( + retention_records_to_wsc_envelope(&[material], &[]).expect("retention WSC envelope"), + ) + .expect("committed retention WSC envelope"); + store + .write_envelope( + retention_records_to_wsc_envelope(&[conflicting_material], &[]) + .expect("conflicting retention WSC envelope"), + ) + .expect("committed conflicting retention WSC envelope"); + + let obstruction = retention_records_from_wsc_store(&store) + .expect_err("conflicting material digest obstructs"); + + assert_eq!( + obstruction.kind, + WscStoreObstructionKind::DuplicateEnvelopeMismatch + ); +} + +#[test] +fn retention_records_from_committed_wsc_store_rejects_conflicting_reading_id() { + let reading = reading_ref(54, 44, 64, 74, EvidenceMaterialPosture::Present); + let conflicting_reading = reading_ref(54, 45, 65, 75, EvidenceMaterialPosture::Missing); + let mut store = InMemoryWscStore::default(); + store + .write_envelope( + retention_records_to_wsc_envelope(&[], &[reading]).expect("retention WSC envelope"), + ) + .expect("committed retention WSC envelope"); + store + .write_envelope( + retention_records_to_wsc_envelope(&[], &[conflicting_reading]) + .expect("conflicting retention WSC envelope"), + ) + .expect("committed conflicting retention WSC envelope"); + + let obstruction = + retention_records_from_wsc_store(&store).expect_err("conflicting reading id obstructs"); + + assert_eq!( + obstruction.kind, + WscStoreObstructionKind::DuplicateEnvelopeMismatch + ); +} + +#[test] +fn retention_records_reject_conflicting_duplicate_material_digest() { + let material = retained_material( + 34, + 44, + RetainedMaterialKind::ReadingEnvelope, + EvidenceMaterialPosture::Present, + ); + let conflicting_material = retained_material( + 34, + 45, + RetainedMaterialKind::ReadingPayload, + EvidenceMaterialPosture::Missing, + ); + + let obstruction = retention_records_to_wsc_envelope(&[material, conflicting_material], &[]) + .expect_err("conflicting duplicate material digest obstructs"); + + assert_eq!( + obstruction.kind, + WscStoreObstructionKind::DuplicateEnvelopeMismatch + ); +} + +#[test] +fn retention_records_reject_conflicting_duplicate_reading_id() { + let reading = reading_ref(54, 44, 64, 74, EvidenceMaterialPosture::Present); + let conflicting_reading = reading_ref(54, 45, 65, 75, EvidenceMaterialPosture::Missing); + + let obstruction = retention_records_to_wsc_envelope(&[], &[reading, conflicting_reading]) + .expect_err("conflicting duplicate reading id obstructs"); + + assert_eq!( + obstruction.kind, + WscStoreObstructionKind::DuplicateEnvelopeMismatch + ); +} + +#[test] +fn recovered_retention_index_rejects_conflicting_material_digest() { + let material = retained_material( + 35, + 45, + RetainedMaterialKind::ReadingEnvelope, + EvidenceMaterialPosture::Present, + ); + let conflicting_material = retained_material( + 35, + 46, + RetainedMaterialKind::ReadingPayload, + EvidenceMaterialPosture::Missing, + ); + + let error = RecoveredRetentionIndex::from_retention_records( + [material, conflicting_material], + Vec::::new(), + ) + .expect_err("conflicting material digest obstructs"); + + assert_eq!( + error, + RecoveredRetentionIndexError::ConflictingMaterialDigest { + material_digest: [35; 32] + } + ); +} + +#[test] +fn recovered_retention_index_rejects_conflicting_reading_id() { + let reading = reading_ref(55, 45, 65, 75, EvidenceMaterialPosture::Present); + let conflicting_reading = reading_ref(55, 46, 66, 76, EvidenceMaterialPosture::Missing); + + let error = RecoveredRetentionIndex::from_retention_records( + Vec::::new(), + [reading, conflicting_reading], + ) + .expect_err("conflicting reading id obstructs"); + + assert_eq!( + error, + RecoveredRetentionIndexError::ConflictingReadingId { + reading_id: [55; 32] + } + ); +} + fn fixture_wsc_bytes(tick: u64) -> Vec { let input = OneWarpInput { warp_id: [1; 32], From 93b8faf1eb5f1bbf6add386f40aaee91ef116c3a Mon Sep 17 00:00:00 2001 From: James Ross Date: Tue, 2 Jun 2026 18:01:18 -0700 Subject: [PATCH 12/13] Fix: reject WSC basis digest mismatches --- CHANGELOG.md | 4 + crates/warp-core/src/wsc/store.rs | 54 +++++++++++-- crates/warp-core/tests/wsc_store_tests.rs | 98 +++++++++++++++++++++++ 3 files changed, 148 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 94bbe528..5f2b6e54 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -590,6 +590,10 @@ Applied, Rejected, Obstructed}` with receipt evidence and typed contract - `warp-core` WSC retained-evidence recovery now rejects conflicting duplicate retained material digests and reading ids instead of letting recovery or the recovered retention index silently overwrite evidence identity collisions. +- `warp-core` WSC causal-history and retained-evidence recovery now rejects + envelopes whose recorded basis digest does not match the canonical digest of + recovered records, returning typed `BasisDigestMismatch` obstruction evidence + instead of admitting stale or forged envelope bindings. - `echo-cli wal doctor` now inspects a real filesystem WAL root through Echo's read-only filesystem WAL doctor instead of reporting a fresh empty in-memory store. The command accepts an optional WAL root path, defaults to diff --git a/crates/warp-core/src/wsc/store.rs b/crates/warp-core/src/wsc/store.rs index 9ca35be3..07ce9b07 100644 --- a/crates/warp-core/src/wsc/store.rs +++ b/crates/warp-core/src/wsc/store.rs @@ -154,6 +154,8 @@ pub enum WscStoreObstructionKind { InvalidWsc, /// Encoded envelope digest did not match its payload. DigestMismatch, + /// Envelope basis digest did not match recovered canonical records. + BasisDigestMismatch, /// Existing envelope id maps to different material. DuplicateEnvelopeMismatch, /// Envelope material exists without a matching commit marker, or vice versa. @@ -195,6 +197,13 @@ impl WscStoreObstruction { } } + fn basis_digest_mismatch(expected: Hash, actual: Hash) -> Self { + Self { + kind: WscStoreObstructionKind::BasisDigestMismatch, + subject: WscStoreSubject::EnvelopeDigest { expected, actual }, + } + } + fn missing_envelope(envelope_id: WscStoreEnvelopeId) -> Self { Self { kind: WscStoreObstructionKind::MissingEnvelope, @@ -671,7 +680,8 @@ pub fn accepted_submission_records_to_wsc_envelope( /// # Errors /// /// Returns a typed WSC store obstruction when the envelope is not accepted -/// submission causal-history material or when record payloads are malformed. +/// submission causal-history material, when record payloads are malformed, or +/// when the envelope basis digest does not match recovered canonical records. pub fn accepted_submission_records_from_wsc_envelope( envelope: &WscStoreEnvelope, ) -> Result, WscStoreObstruction> { @@ -700,7 +710,15 @@ pub fn accepted_submission_records_from_wsc_envelope( records.push(record); } } - canonical_accepted_submission_records(&records) + let records = canonical_accepted_submission_records(&records)?; + let basis_digest = accepted_submission_basis_digest(&records); + if envelope.basis_digest() != &basis_digest { + return Err(WscStoreObstruction::basis_digest_mismatch( + *envelope.basis_digest(), + basis_digest, + )); + } + Ok(records) } /// Recovers accepted submission records from committed WSC store envelopes. @@ -781,7 +799,8 @@ pub fn receipt_correlation_records_to_wsc_envelope( /// # Errors /// /// Returns a typed WSC store obstruction when the envelope is not receipt -/// correlation material or when record payloads are malformed. +/// correlation material, when record payloads are malformed, or when the +/// envelope basis digest does not match recovered canonical records. pub fn receipt_correlation_records_from_wsc_envelope( envelope: &WscStoreEnvelope, ) -> Result { @@ -818,9 +837,18 @@ pub fn receipt_correlation_records_from_wsc_envelope( } } } + let receipts = canonical_tick_receipts(&receipts)?; + let correlations = canonical_receipt_correlations(&correlations)?; + let basis_digest = receipt_correlation_basis_digest(&receipts, &correlations); + if envelope.basis_digest() != &basis_digest { + return Err(WscStoreObstruction::basis_digest_mismatch( + *envelope.basis_digest(), + basis_digest, + )); + } Ok(WscReceiptCorrelationRecords { - receipts: canonical_tick_receipts(&receipts)?, - correlations: canonical_receipt_correlations(&correlations)?, + receipts, + correlations, }) } @@ -935,7 +963,8 @@ pub fn retention_records_to_wsc_envelope( /// /// Returns a typed WSC store obstruction when the envelope is not retained /// evidence material, when record payloads are malformed, or when retained -/// evidence identities conflict. +/// evidence identities conflict, or when the envelope basis digest does not +/// match recovered canonical records. pub fn retention_records_from_wsc_envelope( envelope: &WscStoreEnvelope, ) -> Result { @@ -970,9 +999,18 @@ pub fn retention_records_from_wsc_envelope( } } } + let materials = canonical_retained_material_records(&materials)?; + let readings = canonical_reading_ref_records(&readings)?; + let basis_digest = retention_basis_digest(&materials, &readings); + if envelope.basis_digest() != &basis_digest { + return Err(WscStoreObstruction::basis_digest_mismatch( + *envelope.basis_digest(), + basis_digest, + )); + } Ok(WscRetentionRecords { - materials: canonical_retained_material_records(&materials)?, - readings: canonical_reading_ref_records(&readings)?, + materials, + readings, }) } diff --git a/crates/warp-core/tests/wsc_store_tests.rs b/crates/warp-core/tests/wsc_store_tests.rs index 5a3bb54d..160b3c4b 100644 --- a/crates/warp-core/tests/wsc_store_tests.rs +++ b/crates/warp-core/tests/wsc_store_tests.rs @@ -170,6 +170,33 @@ fn accepted_submission_records_round_trip_through_wsc_envelope() { ); } +#[test] +fn accepted_submission_records_reject_basis_digest_mismatch() { + let envelope = accepted_submission_records_to_wsc_envelope(&[submission_acceptance(4, 14)]) + .expect("accepted submission WSC envelope"); + let forged = WscStoreEnvelope::validated( + envelope.record_kind(), + [99; 32], + envelope.wsc_bytes().to_vec(), + ) + .expect("basis-forged accepted submission WSC envelope"); + + let obstruction = accepted_submission_records_from_wsc_envelope(&forged) + .expect_err("basis mismatch obstructs accepted submission recovery"); + + assert_eq!( + obstruction.kind, + WscStoreObstructionKind::BasisDigestMismatch + ); + assert_eq!( + obstruction.subject, + WscStoreSubject::EnvelopeDigest { + expected: [99; 32], + actual: *envelope.basis_digest() + } + ); +} + #[test] fn pending_submission_recovers_from_committed_wsc_store_without_decision() { let pending = submission_acceptance(3, 33); @@ -220,6 +247,36 @@ fn receipt_correlation_records_round_trip_through_wsc_envelope() { ); } +#[test] +fn receipt_correlation_records_reject_basis_digest_mismatch() { + let envelope = receipt_correlation_records_to_wsc_envelope( + &[tick_receipt(12, 22, 32, WalTickDecision::Applied)], + &[receipt_correlation(12, 22, 32)], + ) + .expect("receipt correlation WSC envelope"); + let forged = WscStoreEnvelope::validated( + envelope.record_kind(), + [98; 32], + envelope.wsc_bytes().to_vec(), + ) + .expect("basis-forged receipt correlation WSC envelope"); + + let obstruction = receipt_correlation_records_from_wsc_envelope(&forged) + .expect_err("basis mismatch obstructs receipt correlation recovery"); + + assert_eq!( + obstruction.kind, + WscStoreObstructionKind::BasisDigestMismatch + ); + assert_eq!( + obstruction.subject, + WscStoreSubject::EnvelopeDigest { + expected: [98; 32], + actual: *envelope.basis_digest() + } + ); +} + #[test] fn receipt_correlation_records_reject_conflicting_duplicate_receipt() { let receipt = tick_receipt(7, 17, 27, WalTickDecision::Applied); @@ -444,6 +501,47 @@ fn retention_records_round_trip_through_wsc_envelope() { assert_eq!(obstruction.posture, EvidenceMaterialPosture::Missing); } +#[test] +fn retention_records_reject_basis_digest_mismatch() { + let envelope = retention_records_to_wsc_envelope( + &[retained_material( + 36, + 46, + RetainedMaterialKind::ReadingEnvelope, + EvidenceMaterialPosture::Present, + )], + &[reading_ref( + 56, + 46, + 66, + 76, + EvidenceMaterialPosture::Present, + )], + ) + .expect("retention WSC envelope"); + let forged = WscStoreEnvelope::validated( + envelope.record_kind(), + [97; 32], + envelope.wsc_bytes().to_vec(), + ) + .expect("basis-forged retention WSC envelope"); + + let obstruction = retention_records_from_wsc_envelope(&forged) + .expect_err("basis mismatch obstructs retention recovery"); + + assert_eq!( + obstruction.kind, + WscStoreObstructionKind::BasisDigestMismatch + ); + assert_eq!( + obstruction.subject, + WscStoreSubject::EnvelopeDigest { + expected: [97; 32], + actual: *envelope.basis_digest() + } + ); +} + #[test] fn retention_records_recover_from_committed_wsc_store() { let material = retained_material( From f32c53d3f161e31a2101e17f1c1fc3e2b1ef6447 Mon Sep 17 00:00:00 2001 From: James Ross Date: Tue, 2 Jun 2026 18:05:36 -0700 Subject: [PATCH 13/13] Fix: clarify WSC digest obstruction docs --- crates/warp-core/src/wsc/store.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/crates/warp-core/src/wsc/store.rs b/crates/warp-core/src/wsc/store.rs index 07ce9b07..e5a032e7 100644 --- a/crates/warp-core/src/wsc/store.rs +++ b/crates/warp-core/src/wsc/store.rs @@ -124,7 +124,7 @@ pub enum WscStoreSubject { /// Byte offset implicated by the obstruction. offset: usize, }, - /// Encoded bytes carried a digest mismatch. + /// Envelope digest evidence mismatched. EnvelopeDigest { /// Expected digest recorded by the envelope. expected: Hash, @@ -729,7 +729,8 @@ pub fn accepted_submission_records_from_wsc_envelope( /// # Errors /// /// Returns a typed WSC store obstruction when a committed accepted-submission -/// envelope is malformed or conflicting duplicate submission material is found. +/// envelope is malformed, basis-mismatched, or conflicting duplicate submission +/// material is found. pub fn accepted_submission_records_from_wsc_store

( store: &P, ) -> Result, WscStoreObstruction> @@ -857,7 +858,7 @@ pub fn receipt_correlation_records_from_wsc_envelope( /// # Errors /// /// Returns a typed WSC store obstruction when a committed receipt-correlation -/// envelope is malformed. +/// envelope is malformed or basis-mismatched. pub fn receipt_correlation_records_from_wsc_store

( store: &P, ) -> Result @@ -1019,7 +1020,7 @@ pub fn retention_records_from_wsc_envelope( /// # Errors /// /// Returns a typed WSC store obstruction when a committed retention envelope is -/// malformed or when retained evidence identities conflict. +/// malformed, basis-mismatched, or when retained evidence identities conflict. pub fn retention_records_from_wsc_store

( store: &P, ) -> Result