Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,13 @@ Applied, Rejected, Obstructed}` with receipt evidence and typed contract

### Fixed

- `warp-core` WSC retained-evidence recovery now rejects conflicting duplicate
retained material digests and reading ids instead of letting recovery or the
recovered retention index silently overwrite evidence identity collisions.
- `warp-core` WSC causal-history and retained-evidence recovery now rejects
envelopes whose recorded basis digest does not match the canonical digest of
recovered records, returning typed `BasisDigestMismatch` obstruction evidence
instead of admitting stale or forged envelope bindings.
- `echo-cli wal doctor` now inspects a real filesystem WAL root through
Echo's read-only filesystem WAL doctor instead of reporting a fresh empty
in-memory store. The command accepts an optional WAL root path, defaults to
Expand Down
229 changes: 198 additions & 31 deletions crates/warp-core/src/causal_wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2081,6 +2081,89 @@ pub struct RecoveredSubmissionIndex {
}

impl RecoveredSubmissionIndex {
/// Builds a recovered index from accepted submission records.
///
/// Every recovered record starts as accepted pending because no scheduler
/// decision material is supplied by this constructor.
///
/// # Errors
///
/// Returns [`WalRecoveryIndexError::SubmissionEnvelopeConflict`] when one
/// submission id is associated with conflicting canonical envelope digests.
pub fn from_acceptance_records<I>(records: I) -> Result<Self, WalRecoveryIndexError>
where
I: IntoIterator<Item = SubmissionAcceptanceRecord>,
{
let mut index = Self::default();
for record in records {
index.insert_acceptance_record(record)?;
}
Ok(index)
}

/// Builds a recovered index from accepted submission and tick receipt records.
///
/// Accepted submissions with no matching receipt remain accepted pending.
/// Matching receipt records move accepted submissions to their decided or
/// obstructed posture without inventing retries.
///
/// # Errors
///
/// Returns [`WalRecoveryIndexError::SubmissionEnvelopeConflict`] when one
/// submission id is associated with conflicting canonical envelope digests.
pub fn from_acceptance_and_receipt_records<I, J>(
acceptances: I,
receipts: J,
) -> Result<Self, WalRecoveryIndexError>
where
I: IntoIterator<Item = SubmissionAcceptanceRecord>,
J: IntoIterator<Item = TickReceiptRecord>,
{
let mut index = Self::from_acceptance_records(acceptances)?;
for receipt in receipts {
index.apply_tick_receipt_record(receipt);
}
Ok(index)
}

fn insert_acceptance_record(
&mut self,
record: SubmissionAcceptanceRecord,
) -> Result<(), WalRecoveryIndexError> {
if let Some(existing) = self.submissions.get(&record.submission_id) {
if existing.acceptance.canonical_envelope_digest != record.canonical_envelope_digest {
return Err(WalRecoveryIndexError::SubmissionEnvelopeConflict {
submission_id: record.submission_id,
});
}
}
self.envelope_to_submissions
.entry(record.canonical_envelope_digest)
.or_default()
.insert(record.submission_id);
self.submissions
.entry(record.submission_id)
.or_insert(RecoveredSubmissionEntry {
acceptance: record,
posture: RecoveredSubmissionPosture::AcceptedPending,
receipt_digest: None,
});
Ok(())
}

fn apply_tick_receipt_record(&mut self, receipt: TickReceiptRecord) {
if let Some(entry) = self.submissions.get_mut(&receipt.submission_id) {
entry.posture = match receipt.decision {
WalTickDecision::Applied => RecoveredSubmissionPosture::DecidedApplied,
WalTickDecision::RejectedFootprintConflict => {
RecoveredSubmissionPosture::DecidedRejected
}
WalTickDecision::Obstructed => RecoveredSubmissionPosture::Obstructed,
};
entry.receipt_digest = Some(receipt.receipt_digest);
}
}

/// Returns a recovered submission entry.
#[must_use]
pub fn get(&self, submission_id: &Hash) -> Option<&RecoveredSubmissionEntry> {
Expand Down Expand Up @@ -2231,6 +2314,48 @@ pub struct RecoveredReceiptIndex {
pub decisions_by_receipt: BTreeMap<Hash, WalTickDecision>,
}

impl RecoveredReceiptIndex {
/// Builds a recovered receipt index from receipt and correlation records.
///
/// Tick receipt records carry decision posture. Correlation records can
/// restore ticket/submission/receipt lookup handles when decision material
/// is not present in the same source.
#[must_use]
pub fn from_receipt_correlation_records<I, J>(receipts: I, correlations: J) -> Self
where
I: IntoIterator<Item = TickReceiptRecord>,
J: IntoIterator<Item = WalReceiptCorrelationRecord>,
{
let mut index = Self::default();
for receipt in receipts {
index
.receipt_by_submission
.insert(receipt.submission_id, receipt.receipt_digest);
index
.receipt_by_ticket
.insert(receipt.ticket_digest, receipt.receipt_digest);
index
.ticket_by_submission
.insert(receipt.submission_id, receipt.ticket_digest);
index
.decisions_by_receipt
.insert(receipt.receipt_digest, receipt.decision);
}
for correlation in correlations {
index
.receipt_by_submission
.insert(correlation.submission_id, correlation.receipt_digest);
index
.receipt_by_ticket
.insert(correlation.ticket_digest, correlation.receipt_digest);
index
.ticket_by_submission
.insert(correlation.submission_id, correlation.ticket_digest);
}
index
}
}

/// Recovered retained material and reading index.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct RecoveredRetentionIndex {
Expand All @@ -2244,6 +2369,77 @@ pub struct RecoveredRetentionIndex {
pub readings_by_semantic_coordinate: BTreeMap<Hash, BTreeSet<Hash>>,
}

/// Error returned while building a recovered retention index.
#[derive(Clone, Copy, Debug, Error, PartialEq, Eq)]
pub enum RecoveredRetentionIndexError {
/// A material digest mapped to conflicting retained material records.
#[error("retained material digest maps to conflicting records")]
ConflictingMaterialDigest {
/// Conflicting retained material digest.
material_digest: Hash,
},
/// A reading id mapped to conflicting reading reference records.
#[error("retained reading id maps to conflicting records")]
ConflictingReadingId {
/// Conflicting reading id.
reading_id: Hash,
},
}

impl RecoveredRetentionIndex {
/// Builds a recovered retention index from retained material and reading records.
///
/// # Errors
///
/// Returns an error when retained evidence records carry conflicting
/// material digests or reading ids.
pub fn from_retention_records<I, J>(
materials: I,
readings: J,
) -> Result<Self, RecoveredRetentionIndexError>
Comment on lines +2396 to +2399
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Route committed WAL recovery through retention checks

When conflicting retained evidence comes from committed WAL frames rather than a WSC envelope, this new checking constructor is bypassed: recover_retention_index still builds RecoveredRetentionIndex::default() and inserts directly into material_by_digest/reading_by_id, so a later frame silently overwrites an earlier record with the same material digest or reading id. That leaves WAL doctor/recovery accepting exactly the identity collision this API now rejects; please route WAL recovery through these checks or share the duplicate-detection logic.

Useful? React with 👍 / 👎.

where
I: IntoIterator<Item = RetainedMaterialRecord>,
J: IntoIterator<Item = ReadingRefRecord>,
{
let mut index = Self::default();
for record in materials {
if let Some(existing) = index.material_by_digest.get(&record.material_digest) {
if existing != &record {
return Err(RecoveredRetentionIndexError::ConflictingMaterialDigest {
material_digest: record.material_digest,
});
}
} else {
index
.material_by_digest
.insert(record.material_digest, record);
}
index
.material_by_semantic_coordinate
.entry(record.semantic_coordinate_digest)
.or_default()
.insert(record.material_digest);
}
for record in readings {
if let Some(existing) = index.reading_by_id.get(&record.reading_id) {
if existing != &record {
return Err(RecoveredRetentionIndexError::ConflictingReadingId {
reading_id: record.reading_id,
});
}
} else {
index.reading_by_id.insert(record.reading_id, record);
}
index
.readings_by_semantic_coordinate
.entry(record.semantic_coordinate_digest)
.or_default()
.insert(record.reading_id);
}
Ok(index)
}
}

/// Retained material obstruction.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RetainedMaterialObstruction {
Expand Down Expand Up @@ -4058,41 +4254,12 @@ pub fn recover_submission_index(
let record = SubmissionAcceptanceRecord::from_payload_bytes(
&frame.payload.canonical_bytes,
)?;
if let Some(existing) = index.submissions.get(&record.submission_id) {
if existing.acceptance.canonical_envelope_digest
!= record.canonical_envelope_digest
{
return Err(WalRecoveryIndexError::SubmissionEnvelopeConflict {
submission_id: record.submission_id,
});
}
}
index
.envelope_to_submissions
.entry(record.canonical_envelope_digest)
.or_default()
.insert(record.submission_id);
index.submissions.entry(record.submission_id).or_insert(
RecoveredSubmissionEntry {
acceptance: record,
posture: RecoveredSubmissionPosture::AcceptedPending,
receipt_digest: None,
},
);
index.insert_acceptance_record(record)?;
}
WalRecordKind::TickReceiptRecorded => {
let receipt =
TickReceiptRecord::from_payload_bytes(&frame.payload.canonical_bytes)?;
if let Some(entry) = index.submissions.get_mut(&receipt.submission_id) {
entry.posture = match receipt.decision {
WalTickDecision::Applied => RecoveredSubmissionPosture::DecidedApplied,
WalTickDecision::RejectedFootprintConflict => {
RecoveredSubmissionPosture::DecidedRejected
}
WalTickDecision::Obstructed => RecoveredSubmissionPosture::Obstructed,
};
entry.receipt_digest = Some(receipt.receipt_digest);
}
index.apply_tick_receipt_record(receipt);
}
_ => {}
}
Expand Down
11 changes: 11 additions & 0 deletions crates/warp-core/src/wsc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@

pub mod build;
pub mod read;
pub mod store;
pub mod types;
pub mod validate;
pub mod view;
Expand All @@ -68,6 +69,16 @@ pub mod write;
// Re-exports for convenient access
pub use build::build_one_warp_input;
pub use read::ReadError;
pub use store::{
accepted_submission_records_from_wsc_envelope, accepted_submission_records_from_wsc_store,
accepted_submission_records_to_wsc_envelope, receipt_correlation_records_from_wsc_envelope,
receipt_correlation_records_from_wsc_store, receipt_correlation_records_to_wsc_envelope,
retention_records_from_wsc_envelope, retention_records_from_wsc_store,
retention_records_to_wsc_envelope, validate_wsc_causal_history_store, InMemoryWscStore,
WscReceiptCorrelationRecords, WscRetentionRecords, WscStoreEnvelope, WscStoreEnvelopeId,
WscStoreObstruction, WscStoreObstructionKind, WscStorePort, WscStoreRecordKind,
WscStoreSubject, WscStoreWriteReceipt,
};
pub use validate::validate_wsc;
pub use view::{AttachmentRef, WarpView, WscFile};
pub use write::{write_wsc_one_warp, OneWarpInput};
Loading
Loading