Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
332 changes: 210 additions & 122 deletions crates/cli/src/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use sightglass_data::{Format, Measurement, Phase};
use sightglass_recorder::measure::Measurements;
use sightglass_recorder::{bench_api::BenchApi, benchmark::benchmark, measure::MeasureType};
use std::{
fs,
io::{self, BufWriter, Write},
env, fs,
io::{self, BufWriter, Read, Write},
path::{Path, PathBuf},
process::Command,
process::Stdio,
Expand Down Expand Up @@ -116,85 +116,111 @@ impl BenchmarkCommand {
"iterations-per-process must be greater than zero"
);

if self.processes == 1 {
self.execute_in_current_process()
if env::var("__SIGHTGLASS_CHILD").is_ok() {
self.execute_child()
} else {
self.execute_in_multiple_processes()
self.execute_parent()
}
}

/// Execute benchmark(s) in the provided engine(s) using the current process.
pub fn execute_in_current_process(&self) -> Result<()> {
let mut output_file: Box<dyn Write> = if let Some(file) = self.output_file.as_ref() {
Box::new(BufWriter::new(fs::File::create(file)?))
} else {
Box::new(io::stdout())
};

let wasm_files: Vec<_> = self
.wasm_files
.iter()
.map(|f| f.display().to_string())
.collect();
let mut all_measurements = vec![];
/// Execute a single Wasm benchmark with a single Engine in the current
/// child process.
pub fn execute_child(&self) -> Result<()> {
// The parent process is responsible for ensuring that all these things
// are true for child processes.
assert_eq!(self.processes, 1);
assert_eq!(self.engines.len(), 1);
assert_eq!(self.wasm_files.len(), 1);
assert!(self.output_file.is_none());
assert!(self.raw);
assert_eq!(self.output_format, Format::Json);

let engine = &self.engines[0];
let engine_path = Path::new(engine);
assert!(
engine_path.is_file(),
"parent should have already built the engine, if necessary"
);

for engine in &self.engines {
let engine_path = get_built_engine(engine)?;
log::info!("Using benchmark engine: {}", engine_path.display());
let lib = libloading::Library::new(&engine_path)?;
let mut bench_api = unsafe { BenchApi::new(&lib)? };

for wasm_file in &wasm_files {
log::info!("Using Wasm benchmark: {}", wasm_file);

// Use the provided --working-dir, otherwise find the Wasm file's parent directory.
let working_dir = self.get_working_directory(&wasm_file)?;
log::info!("Using working directory: {}", working_dir.display());

// Read the Wasm bytes.
let bytes = fs::read(&wasm_file).context("Attempting to read Wasm bytes")?;
log::debug!("Wasm benchmark size: {} bytes", bytes.len());

let mut measurements = Measurements::new(this_arch(), engine, wasm_file);
let mut measure = self.measure.build();

// Run the benchmark (compilation, instantiation, and execution) several times in
// this process.
for i in 0..self.iterations_per_process {
let wasm_hash = {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
wasm_file.hash(&mut hasher);
hasher.finish()
};
let stdout = format!("stdout-{:x}-{}-{}.log", wasm_hash, std::process::id(), i);
let stdout = Path::new(&stdout);
let stderr = format!("stderr-{:x}-{}-{}.log", wasm_hash, std::process::id(), i);
let stderr = Path::new(&stderr);
let stdin = None;

benchmark(
&mut bench_api,
&working_dir,
stdout,
stderr,
stdin,
&bytes,
self.stop_after_phase.clone(),
&mut measure,
&mut measurements,
)?;

self.check_output(Path::new(wasm_file), stdout, stderr)?;
measurements.next_iteration();
}
log::info!("Using benchmark engine: {}", engine_path.display());
let lib = libloading::Library::new(&engine_path)?;
let mut bench_api = unsafe { BenchApi::new(&lib)? };

let wasm_file = self.wasm_files[0].display().to_string();
log::info!("Using Wasm benchmark: {}", wasm_file);

let working_dir = self.get_working_directory(&wasm_file)?;
log::info!("Using working directory: {}", working_dir.display());

let wasm_bytes = fs::read(&wasm_file).context("Attempting to read Wasm bytes")?;
log::debug!("Wasm benchmark size: {} bytes", wasm_bytes.len());

let mut measurements = Measurements::new(this_arch(), engine, &wasm_file);
let mut measure = self.measure.build();

// Run the benchmark (compilation, instantiation, and execution) several times in
// this process.
for i in 0..self.iterations_per_process {
let wasm_hash = {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
wasm_file.hash(&mut hasher);
hasher.finish()
};
let stdout = format!("stdout-{:x}-{}-{}.log", wasm_hash, std::process::id(), i);
let stdout = Path::new(&stdout);
let stderr = format!("stderr-{:x}-{}-{}.log", wasm_hash, std::process::id(), i);
let stderr = Path::new(&stderr);
let stdin = None;

self.wait_for_parent()?;
benchmark(
&mut bench_api,
&working_dir,
stdout,
stderr,
stdin,
&wasm_bytes,
self.stop_after_phase.clone(),
&mut measure,
&mut measurements,
)?;
self.notify_parent()?;

all_measurements.extend(measurements.finish());
}
self.check_output(Path::new(&wasm_file), stdout, stderr)?;
measurements.next_iteration();
}

self.write_results(&all_measurements, &mut output_file)?;
let measurements = measurements.finish();
let stdout = io::stdout();
let stdout = stdout.lock();
serde_json::to_writer(stdout, &measurements)?;
Ok(())
}

/// Wait for the parent process to write a byte to our (child process's)
/// stdin.
fn wait_for_parent(&self) -> Result<()> {
debug_assert!(env::var("__SIGHTGLASS_CHILD").is_ok());
let stdin = io::stdin();
let mut stdin = stdin.lock();
let mut buf = [0; 1];
stdin
.read_exact(&mut buf)
.context("failed to read a byte from stdin")?;
Ok(())
}

/// Notify the parent process that we (the child process) finished running
/// an iteration.
fn notify_parent(&self) -> Result<()> {
debug_assert!(env::var("__SIGHTGLASS_CHILD").is_ok());
let stdout = io::stdout();
let mut stdout = stdout.lock();
stdout
.write_all(&[b'\n'])
.context("failed to write a byte to stdout")?;
Ok(())
}

Expand Down Expand Up @@ -267,9 +293,9 @@ impl BenchmarkCommand {
Ok(())
}

/// Execute the benchmark(s) by spawning multiple processes. Each of the spawned processes will
/// run the `execute_in_current_process` function above.
fn execute_in_multiple_processes(&self) -> Result<()> {
/// Execute the benchmark(s) by spawning multiple processes. Each of the
/// spawned processes will run the `execute_child` function above.
fn execute_parent(&self) -> Result<()> {
let mut output_file: Box<dyn Write> = if let Some(file) = self.output_file.as_ref() {
Box::new(BufWriter::new(fs::File::create(file)?))
} else {
Expand All @@ -286,17 +312,19 @@ impl BenchmarkCommand {

let mut rng = SmallRng::seed_from_u64(0x1337_4242);

// Worklist that we randomly sample from.
// Worklist of benchmarking child processes that we randomly sample
// from.
let mut choices = vec![];

for engine in &self.engines {
// Ensure that each of our engines is built before we spawn any
// child processes (potentially in a different working directory,
// and therefore potentially invalidating relative paths used here).
// child processes.
let engine = get_built_engine(engine)?;

for wasm in &self.wasm_files {
choices.push((engine.clone(), wasm, self.processes));
for _ in 0..self.processes {
choices.push(Child::new(self, &this_exe, &engine, wasm)?);
}
}
}

Expand All @@ -305,58 +333,40 @@ impl BenchmarkCommand {

while !choices.is_empty() {
let index = rng.gen_range(0, choices.len());
let (engine, wasm, procs_left) = &mut choices[index];

let mut command = Command::new(&this_exe);
command
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::inherit())
.arg("benchmark")
.arg("--processes")
.arg("1")
.arg("--iterations-per-process")
.arg(self.iterations_per_process.to_string())
.arg("--engine")
.arg(&engine)
.arg("--measure")
.arg(self.measure.to_string())
.arg("--raw")
.arg("--output-format")
// Always use JSON when privately communicating with a
// subprocess.
.arg(Format::Json.to_string());

if self.small_workloads {
command.env("WASM_BENCH_USE_SMALL_WORKLOAD", "1");
}
let child = &mut choices[index];

if let Some(phase) = self.stop_after_phase {
command.arg("--stop-after").arg(phase.to_string());
}

command.arg("--").arg(&wasm);
log::info!(
"Running benchmark iteration in child process {}",
child.process.id()
);
child.run_one_iteration()?;

let output = command
.output()
.context("failed to run benchmark subprocess")?;
if child.iterations > 0 {
// This child process has more iterations to complete before the
// child prints its measurements to `stdout`.
continue;
}

let status = child
.process
.wait()
.context("failed to `wait` on a benchmarking child process")?;
anyhow::ensure!(
output.status.success(),
"benchmark subprocess did not exit successfully"
status.success(),
"benchmarking child process did not exit successfully"
);

// Parse the subprocess's output and add its measurements to our
// accumulation.
// Parse the benchmarking child's stdout and add its measurements to
// our accumulation.
let child_stdout = child.process.stdout.as_mut().unwrap();
measurements.extend(
serde_json::from_slice::<Vec<Measurement<'_>>>(&output.stdout)
serde_json::from_reader::<_, Vec<Measurement<'_>>>(child_stdout)
.context("failed to read benchmark subprocess's results")?,
);

*procs_left -= 1;
if *procs_left == 0 {
choices.swap_remove(index);
}
// We are all done with this benchmarking child process! Remove it
// from our worklist.
choices.swap_remove(index);
}

self.write_results(&measurements, &mut output_file)?;
Expand Down Expand Up @@ -394,6 +404,84 @@ impl BenchmarkCommand {
}
}

/// A benchmarking child process.
struct Child {
/// The child process itself.
process: std::process::Child,
/// How many iterations it still has left.
iterations: usize,
}

impl Child {
fn new(
benchmark: &BenchmarkCommand,
this_exe: &Path,
engine: &Path,
wasm: &Path,
) -> Result<Self> {
let mut command = Command::new(&this_exe);
command
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::inherit())
.env("__SIGHTGLASS_CHILD", "1")
.arg("benchmark")
.arg("--processes")
.arg("1")
.arg("--iterations-per-process")
.arg(benchmark.iterations_per_process.to_string())
.arg("--engine")
.arg(engine)
.arg("--measure")
.arg(benchmark.measure.to_string())
.arg("--raw")
.arg("--output-format")
.arg(Format::Json.to_string());

if benchmark.small_workloads {
command.env("WASM_BENCH_USE_SMALL_WORKLOAD", "1");
}

if let Some(phase) = benchmark.stop_after_phase {
command.arg("--stop-after").arg(phase.to_string());
}

command.arg("--").arg(&wasm);

let process = command
.spawn()
.context("failed to spawn benchmarking child process")?;

Ok(Child {
process,
iterations: benchmark.iterations_per_process,
})
}

fn run_one_iteration(&mut self) -> Result<()> {
assert!(self.iterations > 0);

// The child process is blocked on reading a byte from its
// stdin. Write a byte to the child process's stdin, so that it
// unblocks and then executes one benchmark iteration.
let child_stdin = self.process.stdin.as_mut().unwrap();
child_stdin
.write_all(&[b'\n'])
.context("failed to write to benchmarking child process's stdin")?;

// Now wait for the child process to finish its iteration. It will write
// a byte to its `stdout` when the iteration is complete.
let child_stdout = self.process.stdout.as_mut().unwrap();
let mut buf = [0; 1];
child_stdout
.read_exact(&mut buf)
.context("failed to read a byte from a benchmarking child process's stdout")?;

self.iterations -= 1;
Ok(())
}
}

fn this_arch() -> &'static str {
if cfg!(target_arch = "x86_64") {
"x86_64"
Expand Down
Loading