Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 158 additions & 31 deletions crates/warp-core/src/causal_wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2081,6 +2081,89 @@ pub struct RecoveredSubmissionIndex {
}

impl RecoveredSubmissionIndex {
/// Builds a recovered index from accepted submission records.
///
/// Every recovered record starts as accepted pending because no scheduler
/// decision material is supplied by this constructor.
///
/// # Errors
///
/// Returns [`WalRecoveryIndexError::SubmissionEnvelopeConflict`] when one
/// submission id is associated with conflicting canonical envelope digests.
pub fn from_acceptance_records<I>(records: I) -> Result<Self, WalRecoveryIndexError>
where
I: IntoIterator<Item = SubmissionAcceptanceRecord>,
{
let mut index = Self::default();
for record in records {
index.insert_acceptance_record(record)?;
}
Ok(index)
}

/// Builds a recovered index from accepted submission and tick receipt records.
///
/// Accepted submissions with no matching receipt remain accepted pending.
/// Matching receipt records move accepted submissions to their decided or
/// obstructed posture without inventing retries.
///
/// # Errors
///
/// Returns [`WalRecoveryIndexError::SubmissionEnvelopeConflict`] when one
/// submission id is associated with conflicting canonical envelope digests.
pub fn from_acceptance_and_receipt_records<I, J>(
acceptances: I,
receipts: J,
) -> Result<Self, WalRecoveryIndexError>
where
I: IntoIterator<Item = SubmissionAcceptanceRecord>,
J: IntoIterator<Item = TickReceiptRecord>,
{
let mut index = Self::from_acceptance_records(acceptances)?;
for receipt in receipts {
index.apply_tick_receipt_record(receipt);
}
Ok(index)
}

fn insert_acceptance_record(
&mut self,
record: SubmissionAcceptanceRecord,
) -> Result<(), WalRecoveryIndexError> {
if let Some(existing) = self.submissions.get(&record.submission_id) {
if existing.acceptance.canonical_envelope_digest != record.canonical_envelope_digest {
return Err(WalRecoveryIndexError::SubmissionEnvelopeConflict {
submission_id: record.submission_id,
});
}
}
self.envelope_to_submissions
.entry(record.canonical_envelope_digest)
.or_default()
.insert(record.submission_id);
self.submissions
.entry(record.submission_id)
.or_insert(RecoveredSubmissionEntry {
acceptance: record,
posture: RecoveredSubmissionPosture::AcceptedPending,
receipt_digest: None,
});
Ok(())
}

fn apply_tick_receipt_record(&mut self, receipt: TickReceiptRecord) {
if let Some(entry) = self.submissions.get_mut(&receipt.submission_id) {
entry.posture = match receipt.decision {
WalTickDecision::Applied => RecoveredSubmissionPosture::DecidedApplied,
WalTickDecision::RejectedFootprintConflict => {
RecoveredSubmissionPosture::DecidedRejected
}
WalTickDecision::Obstructed => RecoveredSubmissionPosture::Obstructed,
};
entry.receipt_digest = Some(receipt.receipt_digest);
}
}

/// Returns a recovered submission entry.
#[must_use]
pub fn get(&self, submission_id: &Hash) -> Option<&RecoveredSubmissionEntry> {
Expand Down Expand Up @@ -2231,6 +2314,48 @@ pub struct RecoveredReceiptIndex {
pub decisions_by_receipt: BTreeMap<Hash, WalTickDecision>,
}

impl RecoveredReceiptIndex {
/// Builds a recovered receipt index from receipt and correlation records.
///
/// Tick receipt records carry decision posture. Correlation records can
/// restore ticket/submission/receipt lookup handles when decision material
/// is not present in the same source.
#[must_use]
pub fn from_receipt_correlation_records<I, J>(receipts: I, correlations: J) -> Self
where
I: IntoIterator<Item = TickReceiptRecord>,
J: IntoIterator<Item = WalReceiptCorrelationRecord>,
{
let mut index = Self::default();
for receipt in receipts {
index
.receipt_by_submission
.insert(receipt.submission_id, receipt.receipt_digest);
index
.receipt_by_ticket
.insert(receipt.ticket_digest, receipt.receipt_digest);
index
.ticket_by_submission
.insert(receipt.submission_id, receipt.ticket_digest);
index
.decisions_by_receipt
.insert(receipt.receipt_digest, receipt.decision);
}
for correlation in correlations {
index
.receipt_by_submission
.insert(correlation.submission_id, correlation.receipt_digest);
index
.receipt_by_ticket
.insert(correlation.ticket_digest, correlation.receipt_digest);
index
.ticket_by_submission
.insert(correlation.submission_id, correlation.ticket_digest);
}
index
}
}

/// Recovered retained material and reading index.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct RecoveredRetentionIndex {
Expand All @@ -2244,6 +2369,37 @@ pub struct RecoveredRetentionIndex {
pub readings_by_semantic_coordinate: BTreeMap<Hash, BTreeSet<Hash>>,
}

impl RecoveredRetentionIndex {
/// Builds a recovered retention index from retained material and reading records.
#[must_use]
pub fn from_retention_records<I, J>(materials: I, readings: J) -> Self
where
I: IntoIterator<Item = RetainedMaterialRecord>,
J: IntoIterator<Item = ReadingRefRecord>,
{
let mut index = Self::default();
for record in materials {
index
.material_by_semantic_coordinate
.entry(record.semantic_coordinate_digest)
.or_default()
.insert(record.material_digest);
index
.material_by_digest
.insert(record.material_digest, record);
}
for record in readings {
index
.readings_by_semantic_coordinate
.entry(record.semantic_coordinate_digest)
.or_default()
.insert(record.reading_id);
index.reading_by_id.insert(record.reading_id, record);
}
index
}
}

/// Retained material obstruction.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RetainedMaterialObstruction {
Expand Down Expand Up @@ -4058,41 +4214,12 @@ pub fn recover_submission_index(
let record = SubmissionAcceptanceRecord::from_payload_bytes(
&frame.payload.canonical_bytes,
)?;
if let Some(existing) = index.submissions.get(&record.submission_id) {
if existing.acceptance.canonical_envelope_digest
!= record.canonical_envelope_digest
{
return Err(WalRecoveryIndexError::SubmissionEnvelopeConflict {
submission_id: record.submission_id,
});
}
}
index
.envelope_to_submissions
.entry(record.canonical_envelope_digest)
.or_default()
.insert(record.submission_id);
index.submissions.entry(record.submission_id).or_insert(
RecoveredSubmissionEntry {
acceptance: record,
posture: RecoveredSubmissionPosture::AcceptedPending,
receipt_digest: None,
},
);
index.insert_acceptance_record(record)?;
}
WalRecordKind::TickReceiptRecorded => {
let receipt =
TickReceiptRecord::from_payload_bytes(&frame.payload.canonical_bytes)?;
if let Some(entry) = index.submissions.get_mut(&receipt.submission_id) {
entry.posture = match receipt.decision {
WalTickDecision::Applied => RecoveredSubmissionPosture::DecidedApplied,
WalTickDecision::RejectedFootprintConflict => {
RecoveredSubmissionPosture::DecidedRejected
}
WalTickDecision::Obstructed => RecoveredSubmissionPosture::Obstructed,
};
entry.receipt_digest = Some(receipt.receipt_digest);
}
index.apply_tick_receipt_record(receipt);
}
_ => {}
}
Expand Down
11 changes: 11 additions & 0 deletions crates/warp-core/src/wsc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@

pub mod build;
pub mod read;
pub mod store;
pub mod types;
pub mod validate;
pub mod view;
Expand All @@ -68,6 +69,16 @@ pub mod write;
// Re-exports for convenient access
pub use build::build_one_warp_input;
pub use read::ReadError;
pub use store::{
accepted_submission_records_from_wsc_envelope, accepted_submission_records_from_wsc_store,
accepted_submission_records_to_wsc_envelope, receipt_correlation_records_from_wsc_envelope,
receipt_correlation_records_from_wsc_store, receipt_correlation_records_to_wsc_envelope,
retention_records_from_wsc_envelope, retention_records_from_wsc_store,
retention_records_to_wsc_envelope, validate_wsc_causal_history_store, InMemoryWscStore,
WscReceiptCorrelationRecords, WscRetentionRecords, WscStoreEnvelope, WscStoreEnvelopeId,
WscStoreObstruction, WscStoreObstructionKind, WscStorePort, WscStoreRecordKind,
WscStoreSubject, WscStoreWriteReceipt,
};
pub use validate::validate_wsc;
pub use view::{AttachmentRef, WarpView, WscFile};
pub use write::{write_wsc_one_warp, OneWarpInput};
Loading
Loading