Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion components/controller/src/communication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ where
vec![Message::Notice(Notice::Event(message.clone()))]
};

join_all(
let sends = join_all(
itertools::iproduct!(messages_to_send, witness_prefixes).map(
|(message, witness_id)| {
self.send_message_to(
Expand All @@ -224,6 +224,10 @@ where
)
.await;

for res in sends {
res?;
}

Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ where
None
}
});
join_all(to_notify).await;

let results = join_all(to_notify).await;
for res in results {
res?;
}
self.to_notify.clear();

Ok(n)
Expand Down
146 changes: 146 additions & 0 deletions components/controller/tests/notify_witness_publish_errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
//! Regression test for witness notification error handling.
//!
//! `Communication::publish` must propagate `send_message` failures, and
//! `notify_witnesses` must return `Err` and leave `to_notify` unchanged so the
//! caller can retry.

use std::{collections::HashMap, sync::Arc};

use keri_controller::{config::ControllerConfig, controller::Controller, error::ControllerError};
use keri_core::{
actor::{error::ActorError, possible_response::PossibleResponse},
event_message::signed_event_message::{Message, Op},
oobi::{LocationScheme, Oobi, Role, Scheme},
prefix::{BasicPrefix, IdentifierPrefix, SelfSigningPrefix},
query::query_event::SignedQueryMessage,
signer::{CryptoBox, KeyManager},
transport::test::{TestActor, TestTransport},
};
use tempfile::Builder;
use url::Host;
use witness::{WitnessEscrowConfig, WitnessListener};

/// [`TestActor`] that performs real OOBI resolution like [`WitnessListener`] but
/// always fails witness-bound `send_message` (the path used by `publish`).
struct FailingSendWitness<S: keri_core::oobi_manager::storage::OobiStorageBackend> {
inner: Arc<WitnessListener<S>>,
}

#[async_trait::async_trait]
impl<S> TestActor for FailingSendWitness<S>
where
S: keri_core::oobi_manager::storage::OobiStorageBackend + Send + Sync + 'static,
{
async fn send_message(&self, _msg: Message) -> Result<(), ActorError> {
Err(ActorError::DbError(
"simulated witness transport failure (publish path)".into(),
))
}

async fn send_query(&self, query: SignedQueryMessage) -> Result<PossibleResponse, ActorError> {
TestActor::send_query(self.inner.as_ref(), query).await
}

async fn request_loc_scheme(&self, eid: IdentifierPrefix) -> Result<Vec<Op>, ActorError> {
TestActor::request_loc_scheme(self.inner.as_ref(), eid).await
}

async fn request_end_role(
&self,
cid: IdentifierPrefix,
role: Role,
eid: IdentifierPrefix,
) -> Result<Vec<u8>, ActorError> {
TestActor::request_end_role(self.inner.as_ref(), cid, role, eid).await
}

async fn resolve_oobi(&self, msg: Oobi) -> Result<(), ActorError> {
TestActor::resolve_oobi(self.inner.as_ref(), msg).await
}
}

#[async_std::test]
async fn notify_witnesses_propagates_errors_and_keeps_queue_when_publish_fails() -> Result<(), ControllerError> {
let root = Builder::new().prefix("notify-wit-bug-db").tempdir().unwrap();

let witness = {
let seed = "AK8F6AAiYDpXlWdj2O5F5-6wNCCNJh2A4XOlqwR_HwwH";
let witness_root = Builder::new().prefix("notify-wit-bug-wit-db").tempdir().unwrap();
Arc::new(
WitnessListener::setup_with_redb(
url::Url::parse("http://witness1/").unwrap(),
witness_root.path(),
Some(seed.to_string()),
WitnessEscrowConfig::default(),
)
.unwrap(),
)
};

let wit_id = witness.get_prefix();
let wit_location = LocationScheme {
eid: IdentifierPrefix::Basic(wit_id.clone()),
scheme: Scheme::Http,
url: url::Url::parse("http://witness1/").unwrap(),
};

let transport = {
let mut actors: keri_core::transport::test::TestActorMap = HashMap::new();
actors.insert(
(Host::Domain("witness1".to_string()), 80),
Arc::new(FailingSendWitness {
inner: witness.clone(),
}),
);
TestTransport::new(actors)
};

let controller = Controller::new(ControllerConfig {
db_path: root.path().to_owned(),
transport: Box::new(transport),
..Default::default()
})?;

let km = CryptoBox::new()?;
let pk = BasicPrefix::Ed25519(km.public_key());
let npk = BasicPrefix::Ed25519(km.next_public_key());

let icp_event = controller
.incept(
vec![pk],
vec![npk],
vec![wit_location.clone()],
1,
)
.await?;

let signature = SelfSigningPrefix::Ed25519Sha512(km.sign(icp_event.as_bytes())?);
let mut identifier = controller.finalize_incept(icp_event.as_bytes(), &signature)?;

assert!(
!identifier.to_notify.is_empty(),
"fixture should leave at least one partially witnessed event to publish"
);

let outcome = identifier.notify_witnesses().await;
assert!(
outcome.is_err(),
"notify_witnesses must return Err when witness send_message fails (got {outcome:?})"
);

assert!(
!identifier.to_notify.is_empty(),
"to_notify must be retained when publish fails so the caller can retry (got empty queue)"
);

let kel = witness
.witness_data
.event_storage
.get_kel_messages_with_receipts_all(&identifier.id())?;
assert!(
kel.unwrap_or_default().is_empty(),
"witness should not have accepted the event when send always fails"
);

Ok(())
}
77 changes: 55 additions & 22 deletions components/controller/tests/test_kel_managing.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
use std::{collections::HashMap, sync::Arc};

use cesrox::primitives::codes::self_addressing::SelfAddressing;
use keri_core::{
actor::prelude::HashFunction,
oobi::LocationScheme,
prefix::{BasicPrefix, SelfSigningPrefix},
oobi::{LocationScheme, Scheme},
prefix::{BasicPrefix, IdentifierPrefix, SelfSigningPrefix},
signer::{CryptoBox, KeyManager},
transport::test::TestTransport,
};
use tempfile::Builder;
use url::Host;

use keri_controller::{config::ControllerConfig, controller::Controller, error::ControllerError};
use witness::{WitnessEscrowConfig, WitnessListener};

#[async_std::test]
async fn test_kel_managing() -> Result<(), ControllerError> {
Expand Down Expand Up @@ -77,27 +82,56 @@ async fn test_kel_managing() -> Result<(), ControllerError> {
async fn test_kel_managing_with_witness() -> Result<(), ControllerError> {
let root = Builder::new().prefix("test-db").tempdir().unwrap();

let first_witness_id: BasicPrefix = "BNJJhjUnhlw-lsbYdehzLsX1hJMG9QJlK_wJ5AunJLrM"
.parse()
.unwrap();
// OOBI (Out-Of-Band Introduction) specifies the way how actors can be found.
let first_witness_oobi: LocationScheme = serde_json::from_str(&format!(
r#"{{"eid":{:?},"scheme":"http","url":"http://w1.ea.argo.colossi.network/"}}"#,
first_witness_id
))
.unwrap();

let second_witness_id: BasicPrefix = "BFdfJEbC7__AqZdrXkxmW3THnZmIeJzCGpzJpgN6Ettw"
.parse()
.unwrap();
let second_witness_oobi: LocationScheme = serde_json::from_str(&format!(
r#"{{"eid":{:?},"scheme":"http","url":"http://w2.ea.argo.colossi.network/"}}"#,
second_witness_id
))
.unwrap();
let witness1 = {
let seed = "AK8F6AAiYDpXlWdj2O5F5-6wNCCNJh2A4XOlqwR_HwwH";
let witness_root = Builder::new().prefix("test-kel-wit1-db").tempdir().unwrap();
Arc::new(
WitnessListener::setup_with_redb(
url::Url::parse("http://witness1/").unwrap(),
witness_root.path(),
Some(seed.to_string()),
WitnessEscrowConfig::default(),
)
.unwrap(),
)
};
let witness2 = {
let seed = "AJZ7ZLd7unQ4IkMUwE69NXcvDO9rrmmRH_Xk3TPu9BpP";
let witness_root = Builder::new().prefix("test-kel-wit2-db").tempdir().unwrap();
Arc::new(
WitnessListener::setup_with_redb(
url::Url::parse("http://witness2/").unwrap(),
witness_root.path(),
Some(seed.to_string()),
WitnessEscrowConfig::default(),
)
.unwrap(),
)
};

let first_witness_id = witness1.get_prefix();
let first_witness_oobi = LocationScheme {
eid: IdentifierPrefix::Basic(first_witness_id.clone()),
scheme: Scheme::Http,
url: url::Url::parse("http://witness1/").unwrap(),
};
let second_witness_id = witness2.get_prefix();
let second_witness_oobi = LocationScheme {
eid: IdentifierPrefix::Basic(second_witness_id.clone()),
scheme: Scheme::Http,
url: url::Url::parse("http://witness2/").unwrap(),
};

let transport = {
let mut actors: keri_core::transport::test::TestActorMap = HashMap::new();
actors.insert((Host::Domain("witness1".to_string()), 80), witness1.clone());
actors.insert((Host::Domain("witness2".to_string()), 80), witness2.clone());
TestTransport::new(actors)
};

let controller = Controller::new(ControllerConfig {
db_path: root.path().to_owned(),
transport: Box::new(transport),
..Default::default()
})?;

Expand All @@ -118,8 +152,7 @@ async fn test_kel_managing_with_witness() -> Result<(), ControllerError> {

let mut identifier = controller.finalize_incept(inception_event.as_bytes(), &signature)?;

let i = identifier.notify_witnesses().await?;
dbg!(i);
assert_eq!(identifier.notify_witnesses().await?, 1);

let queries_to_sign = identifier.query_mailbox(
identifier.id(),
Expand Down
Loading