Skip to content

Commit adafeb9

Browse files
committed
commands: remove stop and start commands
They were significantly broken. They had no awareness of failover, and also did not communicate with the manager process so could do actions that contradict what the manager might be trying to do. If their functionality is still desired, they will need to be re-implemented as programs that communicate via the manager to perform their actions. They will also need failover awareness.
1 parent c55c715 commit adafeb9

File tree

6 files changed

+15
-301
lines changed

6 files changed

+15
-301
lines changed

src/commands/mod.rs

Lines changed: 11 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,7 @@ pub mod fence;
88
pub mod manage;
99
pub mod power;
1010
pub mod reset;
11-
pub mod start;
1211
pub mod status;
13-
pub mod stop;
1412
pub mod validate;
1513

1614
use {
@@ -26,8 +24,6 @@ use {
2624

2725
use clap::{Parser, Subcommand};
2826

29-
use crate::cluster::Cluster;
30-
3127
/// A `HandledError` represents an error that has already been handled. When you call a function
3228
/// that returns a `HandledError` or `HandledResult`, you don't need to do anything with that error,
3329
/// other than just be aware that it happened, and return it on to your caller.
@@ -93,8 +89,6 @@ pub struct Cli {
9389
#[derive(Subcommand, Debug)]
9490
pub enum Commands {
9591
Status(StatusArgs),
96-
Start,
97-
Stop,
9892
Discover(DiscoverArgs),
9993
Failback(FailbackArgs),
10094
Fence(FenceArgs),
@@ -109,31 +103,18 @@ pub enum Commands {
109103

110104
pub fn main(cli: &Cli) -> HandledResult<()> {
111105
match &cli.command {
112-
Commands::Discover(args) => return discover::discover(args),
113-
Commands::Failback(args) => return failback::failback(cli, args),
114-
Commands::Fence(args) => return fence::fence(cli, args),
115-
Commands::Power(args) => return power::power(cli, args),
116-
Commands::Validate => return validate::validate(cli),
117-
Commands::Status(args) => return status::status(cli, args),
118-
Commands::Manage(args) => return manage::manage(cli, args),
119-
Commands::Unmanage(args) => return manage::unmanage(cli, args),
120-
Commands::Activate(args) => return activate::activate(cli, args),
121-
Commands::Deactivate(args) => return activate::deactivate(cli, args),
122-
Commands::Reset(args) => return reset::reset(cli, args),
123-
_ => {}
106+
Commands::Discover(args) => discover::discover(args),
107+
Commands::Failback(args) => failback::failback(cli, args),
108+
Commands::Fence(args) => fence::fence(cli, args),
109+
Commands::Power(args) => power::power(cli, args),
110+
Commands::Validate => validate::validate(cli),
111+
Commands::Status(args) => status::status(cli, args),
112+
Commands::Manage(args) => manage::manage(cli, args),
113+
Commands::Unmanage(args) => manage::unmanage(cli, args),
114+
Commands::Activate(args) => activate::activate(cli, args),
115+
Commands::Deactivate(args) => activate::deactivate(cli, args),
116+
Commands::Reset(args) => reset::reset(cli, args),
124117
}
125-
126-
let rt = tokio::runtime::Runtime::new()
127-
.handle_err(|e| eprintln!("Error launching tokio runtime: {e}"))?;
128-
129-
rt.block_on(async {
130-
let cluster = Cluster::from_config(cli.config.clone())?;
131-
match &cli.command {
132-
Commands::Start => start::start(cluster).await,
133-
Commands::Stop => stop::stop(cluster).await,
134-
_ => unreachable!(),
135-
}
136-
})
137118
}
138119

139120
/// Convert multiple nodeset strings into a single, deduplicated NodeSet object.

src/commands/start.rs

Lines changed: 0 additions & 48 deletions
This file was deleted.

src/commands/stop.rs

Lines changed: 0 additions & 38 deletions
This file was deleted.

src/halo_capnp.rs

Lines changed: 3 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,9 @@
33

44
use std::{env, io};
55

6-
use {futures::AsyncReadExt, rustls::pki_types::ServerName};
6+
use futures::AsyncReadExt;
77

8-
use crate::{
9-
cluster,
10-
remote::ocf,
11-
resource::{Location, Resource},
12-
tls::get_connector,
13-
};
8+
use crate::{cluster, remote::ocf, resource::Resource};
149

1510
use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};
1611

@@ -57,7 +52,7 @@ impl From<capnp::Error> for AgentError {
5752
}
5853
}
5954

60-
/// Sends an OCF request to perform `op` to a remote agent, determined by `res` and `loc`.
55+
/// Sends an OCF request to perform `op` to the remote agent reached by `client`.
6156
///
6257
/// Returns a `Result` that contains whether an error occurred while attempting the remote
6358
/// operation, or contains the result of the operation if the request was succesful.
@@ -69,18 +64,6 @@ impl From<capnp::Error> for AgentError {
6964
///
7065
/// An `Err(_)` variant means that succesful communication did not occur, so it is unknown whether
7166
/// the operation was attempted or what the outcome was if it was attempted.
72-
pub async fn remote_ocf_operation(
73-
res: &Resource,
74-
loc: Location,
75-
op: ocf_resource_agent::Operation,
76-
) -> Result<AgentReply, AgentError> {
77-
let request = get_ocf_request(res, loc, op).await?;
78-
79-
let reply = request.send().promise.await?;
80-
81-
Ok(get_status(reply)?)
82-
}
83-
8467
pub async fn remote_ocf_operation_given_client(
8568
res: &Resource,
8669
client: &ocf_resource_agent::Client,
@@ -113,65 +96,6 @@ fn get_status(reply: OcfOperationResults) -> Result<AgentReply, capnp::Error> {
11396
})
11497
}
11598

116-
/// Create a capnp RPC client and set up the client to perform the operation() RPC.
117-
async fn get_ocf_request(
118-
res: &Resource,
119-
loc: Location,
120-
op: ocf_resource_agent::Operation,
121-
) -> io::Result<OperationRequest> {
122-
let hostname = match loc {
123-
Location::Home => res.home_node.address(),
124-
Location::Away => res
125-
.failover_node
126-
.as_ref()
127-
.expect("Called operation on failover node for resource without failover node")
128-
.address(),
129-
};
130-
let stream = tokio::net::TcpStream::connect(hostname).await?;
131-
stream.set_nodelay(true).expect("Setting nodelay failed.");
132-
133-
if res.args.mtls {
134-
// Create mtls connector
135-
let mtls_connector = get_connector().expect("TODO: handle error here");
136-
137-
// Set domain/hostname of server we intend to connect to
138-
let domain = ServerName::try_from(
139-
env::var("HALO_SERVER_DOMAIN_NAME").expect("HALO_SERVER_DOMAIN_NAME not set."),
140-
)
141-
.unwrap();
142-
143-
// Perform mtls handshake
144-
let mtls_stream = mtls_connector.connect(domain, stream).await?;
145-
146-
Ok(__get_ocf_request(mtls_stream, res, op))
147-
} else {
148-
Ok(__get_ocf_request(stream, res, op))
149-
}
150-
}
151-
152-
fn __get_ocf_request<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + 'static>(
153-
stream: S,
154-
res: &Resource,
155-
op: ocf_resource_agent::Operation,
156-
) -> OperationRequest {
157-
let (reader, writer) = tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split();
158-
let rpc_network = Box::new(twoparty::VatNetwork::new(
159-
futures::io::BufReader::new(reader),
160-
futures::io::BufWriter::new(writer),
161-
rpc_twoparty_capnp::Side::Client,
162-
Default::default(),
163-
));
164-
let mut rpc_system = RpcSystem::new(rpc_network, None);
165-
let client: ocf_resource_agent::Client = rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);
166-
167-
tokio::task::spawn_local(rpc_system);
168-
169-
let mut request = client.operation_request();
170-
prep_request(&mut request, res, op);
171-
172-
request
173-
}
174-
17599
/// Prepare a capnp operation RPC request.
176100
fn prep_request(request: &mut OperationRequest, res: &Resource, op: ocf_resource_agent::Operation) {
177101
let mut request = request.get();

src/resource.rs

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -472,34 +472,6 @@ impl Resource {
472472
remote_ocf_operation_given_client(self, client, ocf_resource_agent::Operation::Stop).await
473473
}
474474

475-
/// Perform a monitor RPC for this resource.
476-
pub async fn monitor(&self, loc: Location) -> Result<AgentReply, AgentError> {
477-
tokio::task::LocalSet::new()
478-
.run_until(async {
479-
remote_ocf_operation(self, loc, ocf_resource_agent::Operation::Monitor).await
480-
})
481-
.await
482-
}
483-
484-
/// Perform a start RPC for this resource.
485-
pub async fn start(&self, loc: Location) -> Result<AgentReply, AgentError> {
486-
tokio::task::LocalSet::new()
487-
.run_until(async {
488-
remote_ocf_operation(self, loc, ocf_resource_agent::Operation::Start).await
489-
})
490-
.await
491-
}
492-
493-
/// Perform a stop RPC for this resource.
494-
pub async fn stop(&self) -> Result<AgentReply, AgentError> {
495-
tokio::task::LocalSet::new()
496-
.run_until(async {
497-
remote_ocf_operation(self, Location::Home, ocf_resource_agent::Operation::Stop)
498-
.await
499-
})
500-
.await
501-
}
502-
503475
pub fn status(&self) -> ResourceStatus {
504476
self.status.lock().unwrap().clone()
505477
}

tests/simple.rs

Lines changed: 1 addition & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,7 @@
55
mod tests {
66
use std::sync::Arc;
77

8-
use tokio::runtime::Runtime;
9-
10-
use halo_lib::{
11-
halo_capnp::AgentReply, host::FenceCommand, remote::ocf, resource::Location, test_env::*,
12-
};
8+
use halo_lib::{host::FenceCommand, test_env::*};
139

1410
/// Create a TestEnvironment for a test.
1511
///
@@ -24,79 +20,6 @@ mod tests {
2420
)
2521
}
2622

27-
#[test]
28-
fn simple() {
29-
let mut env = test_env_helper("simple");
30-
31-
let agent = TestAgent::new(halo_lib::remote_port(), None);
32-
33-
let _agent = env.start_remote_agents(vec![agent]);
34-
35-
let cluster = env.cluster();
36-
37-
let rt = Runtime::new().unwrap();
38-
rt.block_on(async {
39-
for res in cluster.resources() {
40-
assert!(matches!(
41-
res.start(Location::Home).await,
42-
Ok(AgentReply::Success(ocf::Status::Success))
43-
));
44-
45-
env.assert_agent_next_line(&agent_expected_line("start", res));
46-
47-
assert!(matches!(
48-
res.monitor(Location::Home).await,
49-
Ok(AgentReply::Success(ocf::Status::Success))
50-
));
51-
52-
env.assert_agent_next_line(&agent_expected_line("monitor", res));
53-
54-
assert!(matches!(
55-
res.stop().await,
56-
Ok(AgentReply::Success(ocf::Status::Success))
57-
));
58-
env.assert_agent_next_line(&agent_expected_line("stop", res));
59-
}
60-
});
61-
}
62-
63-
#[test]
64-
fn multi_agent() {
65-
let mut env = test_env_helper("multiagent");
66-
67-
let _agents = env.start_remote_agents(vec![
68-
TestAgent::new(8001, Some("mds01".to_string())),
69-
TestAgent::new(8002, Some("oss01".to_string())),
70-
]);
71-
72-
let cluster = env.cluster();
73-
74-
let rt = Runtime::new().unwrap();
75-
rt.block_on(async {
76-
for res in cluster.resources() {
77-
assert!(matches!(
78-
res.start(Location::Home).await,
79-
Ok(AgentReply::Success(ocf::Status::Success))
80-
));
81-
82-
env.assert_agent_next_line(&agent_expected_line("start", res));
83-
84-
assert!(matches!(
85-
res.monitor(Location::Home).await,
86-
Ok(AgentReply::Success(ocf::Status::Success))
87-
));
88-
89-
env.assert_agent_next_line(&agent_expected_line("monitor", res));
90-
91-
assert!(matches!(
92-
res.stop().await,
93-
Ok(AgentReply::Success(ocf::Status::Success))
94-
));
95-
env.assert_agent_next_line(&agent_expected_line("stop", res));
96-
}
97-
});
98-
}
99-
10023
#[test]
10124
fn fencing() {
10225
let env = test_env_helper("fencing");

0 commit comments

Comments
 (0)