Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
25da07d
Add environment file
tomvothecoder Jan 8, 2026
cb399d9
Add modules for experimentation
tomvothecoder Jan 8, 2026
f7c01ab
Extract benchmarking utilities to benchmark.py
tomvothecoder Jan 9, 2026
531913b
Add notes and findings
tomvothecoder Jan 9, 2026
09e0c8c
Add plotting code
tomvothecoder Jan 12, 2026
955d836
Add experiment results
tomvothecoder Jan 12, 2026
24af4af
Fix json netcdf map
tomvothecoder Jan 16, 2026
ea3996d
Refactor code to use pandas and parallelism
tomvothecoder Jan 16, 2026
0044481
Use python logger module
tomvothecoder Jan 16, 2026
d179490
Update FREQ_PATTERN
tomvothecoder Jan 26, 2026
3a5bfc5
Update main.py and benchmark.py for production runs
tomvothecoder Jan 26, 2026
83d4c43
Add latest metrics
tomvothecoder Jan 27, 2026
8424ec2
Update module name
tomvothecoder Jan 27, 2026
2fb2618
Add metric notebooks
tomvothecoder Jan 27, 2026
2bed6ea
Update title of cell
tomvothecoder Jan 27, 2026
a6a545f
Update takeaway
tomvothecoder Jan 27, 2026
693b21e
Fix display of dataframe
tomvothecoder Jan 27, 2026
f69eea6
Add single file metrics notebook
tomvothecoder Feb 5, 2026
9a4cb2b
Add 3hr notebook
tomvothecoder Feb 5, 2026
75d61d4
Add overall takeaway to single file notebook
tomvothecoder Feb 5, 2026
08f7b7b
Update notebook with key findings on reference counts
tomvothecoder Feb 9, 2026
55cb09c
Update notebook
tomvothecoder Feb 9, 2026
47b8072
Fix comment
tomvothecoder Feb 9, 2026
8920f1d
Update 3hr_outlier_metrics.ipynb
tomvothecoder Feb 13, 2026
77654a4
Add initial end-to-end .load() notebook
tomvothecoder Feb 19, 2026
2a46e1c
Add Steve scripts
tomvothecoder Mar 2, 2026
f56ba96
Enhance script for better benchmarking
tomvothecoder Mar 2, 2026
1a0d220
More improvements to head_to_head.py
tomvothecoder Mar 2, 2026
4914902
Update head_to_head.py
tomvothecoder Mar 2, 2026
e252d31
Update docstring
tomvothecoder Mar 2, 2026
b2b5f87
Add benchmark reuslts
tomvothecoder Mar 2, 2026
1e901c3
Add dask metrics
tomvothecoder Mar 2, 2026
5505aef
Remove outdated plots and scripts
tomvothecoder Mar 2, 2026
690355c
Fail fast and skip datasets with malformed cf metadata
tomvothecoder Mar 6, 2026
9f707ba
Remove dataset-level parallelism to prevent noise
tomvothecoder Mar 6, 2026
75c4f51
Add results of benchmark
tomvothecoder Mar 9, 2026
ab8c4e3
Add considerations section
tomvothecoder Mar 9, 2026
0b11faa
Update possible causes section
tomvothecoder Mar 9, 2026
925aa14
Update directories and add script for file-count
tomvothecoder Mar 13, 2026
058287c
Add dataset dir to kerchunk mapping
tomvothecoder Mar 16, 2026
de4fe28
Update with CLI args
tomvothecoder Mar 16, 2026
96c5c67
Add results
tomvothecoder Mar 19, 2026
085e7a2
Add overall summary
tomvothecoder Mar 19, 2026
f58bad7
Update sections by benchmark
tomvothecoder Mar 19, 2026
fd3ccdc
Add latest benchmark scripts
tomvothecoder Apr 16, 2026
1a8915e
Update benchmark script by bin
tomvothecoder Apr 16, 2026
c75dc30
Add results
tomvothecoder Apr 16, 2026
dcbd27f
Remove practical read section from summary.md
tomvothecoder Apr 16, 2026
5d6fa01
Add scripts for latest upscale benchmark
tomvothecoder Apr 23, 2026
4eb4e4a
Update prepare_datasets.py
tomvothecoder Apr 23, 2026
f522014
Improve kerchunk dataset preparation workflow
tomvothecoder Apr 23, 2026
e8f503c
Add loggers to prepare_datasets.py
tomvothecoder Apr 23, 2026
fb92ecc
Add latest benchmark results
tomvothecoder Apr 28, 2026
1d461c4
Add summary.md
tomvothecoder Apr 28, 2026
34b0ba0
Add daily crossover benchmark artifacts and QA
tomvothecoder May 27, 2026
5caa547
Add final total timing by bin plots
tomvothecoder May 27, 2026
a6f4de0
Update summaries
tomvothecoder May 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,6 @@ input/
*.nc
*.xml
*.sh.status

riotai/json_to_netcdf_maps/json_to_netcdf_table.csv
riotai/json_to_netcdf_maps/json_to_netcdf.json
Empty file added riotai/__init__.py
Empty file.
386 changes: 386 additions & 0 deletions riotai/benchmark.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,386 @@
"""
benchmark.py

Utilities for benchmarking xarray dataset open performance using
Kerchunk JSON references versus raw NetCDF collections.

Designed for climate / CMIP / E3SM-style datasets on HPC or cloud storage.
"""

from __future__ import annotations

from concurrent.futures import ProcessPoolExecutor, as_completed
import logging
import random
import time
import warnings
from typing import Callable, Mapping, TypedDict

import numpy as np
from tqdm import tqdm
import xarray as xr
import xcdat as xc
import pandas as pd

# Module-level logger named after this module. No handlers or levels are
# configured in this file.
logger = logging.getLogger(__name__)

# Number of parallel worker processes to use per frequency.
# Tuned to limit filesystem metadata pressure for file-heavy datasets
# (e.g., daily or sub-daily data) while allowing more parallelism for monthly
# data.
#
# Keys are frequency names; the value is used as ``max_workers`` for the
# ``ProcessPoolExecutor`` created in ``_benchmark_frequency``.
WORKERS_BY_FREQUENCY = {
    "Amon": 4,
    "Imon": 4,
    "ImonAnt": 4,
    "ImonGre": 4,
    "day": 1,
    "AERhr": 1,
    "CFsubhr": 1,
    "3hr": 1,
    "E1hr": 1,
}


# Path to a Kerchunk JSON reference file.
JsonPath = str
# List of NetCDF file paths paired with one Kerchunk JSON reference.
NetCDFFileList = list[str]
# Maps each Kerchunk JSON path to its list of NetCDF file paths.
DatasetMapping = Mapping[JsonPath, NetCDFFileList]


class RawMetric(TypedDict):
    """Per-dataset benchmark record, as built by ``_benchmark_single_item``."""

    # Frequency name the dataset was benchmarked under.
    frequency: str
    # Path to the Kerchunk JSON reference file.
    json: str
    # Number of entries in the dataset's NetCDF file list.
    num_netcdf_files: int
    # Number of time steps, as reported by ``_get_dims_and_timesteps``.
    timesteps: int
    # Dimension name -> size, taken from the dataset's ``sizes``.
    dims: dict[str, int]
    # Wall-clock seconds spent in ``_open_kerchunk``.
    kerchunk_time: float
    # Wall-clock seconds spent in ``_open_netcdf``.
    netcdf_time: float


class AggMetric(TypedDict):
    """Per-frequency summary record, as built by ``_get_agg_metrics``."""

    # Frequency name.
    freq: str
    # Number of datasets that produced a timing record.
    n: int
    # Requested sample size.
    sample_size_target: int
    # min(1.0, number of sampled items / requested sample size).
    coverage: float
    # "exhaustive" when fewer items were sampled than requested, else "random".
    sampling: str
    # Mean NetCDF file count over the sampled items, truncated to int.
    mean_netcdf_files: int
    # Median NetCDF file count over the sampled items, truncated to int.
    median_netcdf_files: int
    # Median / mean open times in seconds for each access path.
    kerchunk_median: float
    netcdf_median: float
    kerchunk_mean: float
    netcdf_mean: float


def benchmark_all_frequencies(
    freq_json_netcdf_map: Mapping[str, DatasetMapping],
    sample_size: int = 40,
    warmup: bool = True,
    rng: random.Random | None = None,
    freqs: list[str] | None = None,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Benchmark all frequencies in the provided mapping.

    Parameters
    ----------
    freq_json_netcdf_map : Mapping[str, DatasetMapping]
        Mapping from frequency (str) to a mapping of Kerchunk JSON file paths (str)
        to lists of NetCDF file paths (list of str).
    sample_size : int, optional
        Number of samples to benchmark per frequency (default is 40).
    warmup : bool, optional
        Whether to perform a warm-up open before benchmarking (default is True).
    rng : random.Random or None, optional
        Random number generator for reproducible sampling (default is None,
        in which case the sampler falls back to its own fixed seed).
    freqs : list of str or None, optional
        Frequencies to benchmark. When None (the default) every frequency in
        ``freq_json_netcdf_map`` is benchmarked; otherwise frequencies not
        listed here are skipped.

    Returns
    -------
    tuple of pd.DataFrame
        DataFrames containing raw and aggregate benchmark metrics.
    """
    # Build the filter once. ``None`` means "no filtering"; the original check
    # tested ``frequency is not None`` and therefore raised TypeError on
    # ``frequency not in None`` whenever ``freqs`` was left at its default.
    selected_freqs = None if freqs is None else set(freqs)

    all_raw_metrics: list[RawMetric] = []
    all_agg_metrics: list[AggMetric] = []

    for frequency, dataset_mapping in freq_json_netcdf_map.items():
        if selected_freqs is not None and frequency not in selected_freqs:
            continue

        sampled_dataset_items = _sample_items(dataset_mapping, sample_size, rng)
        raw_metrics, sampled_dataset_items = _benchmark_frequency(
            frequency, sampled_dataset_items, warmup
        )

        all_raw_metrics.extend(raw_metrics)

        agg_metrics = _get_agg_metrics(
            raw_metrics,
            frequency,
            sample_size,
            sampled_dataset_items,
        )

        # ``_get_agg_metrics`` returns None when no dataset produced a timing;
        # keep such placeholders out of the records handed to pandas.
        if agg_metrics is not None:
            all_agg_metrics.append(agg_metrics)

    df_raw_metrics = pd.DataFrame(all_raw_metrics)
    df_agg_metrics = pd.DataFrame(all_agg_metrics)

    return df_raw_metrics, df_agg_metrics


# -----------------------------------------------------------------------------
# Benchmark core
# -----------------------------------------------------------------------------
def _benchmark_frequency(
    freq: str,
    sampled_items: list[tuple[str, list[str]]],
    warmup: bool = True,
) -> tuple[list[RawMetric], list[tuple[str, list[str]]]]:
    """
    Benchmark Kerchunk vs NetCDF open performance for a single frequency.

    Each sampled dataset is submitted to a process pool whose size comes from
    ``WORKERS_BY_FREQUENCY``. A frequency that is missing from that table is
    run with a single worker instead of failing on the lookup.

    Parameters
    ----------
    freq : str
        Frequency name (e.g., "Amon", "day", "AERhr").
    sampled_items : list of (str, list of str)
        Sampled (json_file, netcdf_files) pairs to benchmark.
    warmup : bool, default=True
        Whether to perform an unrecorded warm-up open.

    Returns
    -------
    tuple
        (raw_metrics, sampled_items). Items whose worker returned None are
        omitted from ``raw_metrics``; ``sampled_items`` is returned unchanged.

    Notes
    -----
    An exception raised inside a worker is re-raised here by
    ``future.result()``.
    """
    logger.info(f"\n=== Benchmarking {freq} ===")

    raw_metrics: list[RawMetric] = []

    # Nothing sampled: skip the warm-up and avoid spawning an idle pool.
    if not sampled_items:
        return raw_metrics, sampled_items

    # Warm-up to avoid cache effects, remains serial.
    if warmup:
        _run_warmup(sampled_items, freq)

    max_workers = WORKERS_BY_FREQUENCY.get(freq)
    if max_workers is None:
        # Unknown frequency: use the most conservative setting in the table
        # rather than raising KeyError after the warm-up already ran.
        max_workers = 1
        logger.warning(
            "No worker count configured for frequency %r; using 1 worker.", freq
        )

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(
                _benchmark_single_item,
                freq,
                json_file,
                netcdf_files,
            )
            for json_file, netcdf_files in sampled_items
        ]

        # Collect in completion order; result order is not the sample order.
        for future in tqdm(
            as_completed(futures),
            total=len(futures),
            desc="Comparing I/O speed",
            mininterval=5,
        ):
            result = future.result()
            if result is not None:
                raw_metrics.append(result)

    return raw_metrics, sampled_items


def _benchmark_single_item(
    freq: str,
    json_file: str,
    netcdf_files: list[str],
) -> RawMetric | None:
    """
    Time the Kerchunk open and the NetCDF open for one dataset.

    Any failure (either open, or reading the dataset's dimensions afterwards)
    is logged with its traceback and the dataset is skipped, so a single bad
    dataset does not abort the whole run or disappear without a trace.

    Parameters
    ----------
    freq : str
        Frequency name recorded in the result.
    json_file : str
        Path to the Kerchunk JSON reference file.
    netcdf_files : list of str
        NetCDF file paths paired with ``json_file``.

    Returns
    -------
    RawMetric or None
        The timing record, or None when the dataset was skipped.
    """
    try:
        kc_time = _time_call(_open_kerchunk, json_file)
    except Exception:
        logger.warning(
            "Skipping %s: Kerchunk open failed.", json_file, exc_info=True
        )
        return None

    try:
        # UserWarnings are silenced only for the duration of the NetCDF open.
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category=UserWarning)
            nc_time = _time_call(_open_netcdf, netcdf_files)
    except Exception:
        logger.warning(
            "Skipping %s: NetCDF open failed (%d files).",
            json_file,
            len(netcdf_files),
            exc_info=True,
        )
        return None

    # Not timed: metadata lookup used only to describe the workload.
    try:
        dims, timesteps = _get_dims_and_timesteps(json_file)
    except Exception:
        logger.warning(
            "Skipping %s: could not read dimensions/time steps.",
            json_file,
            exc_info=True,
        )
        return None

    return {
        "frequency": freq,
        "json": json_file,
        "num_netcdf_files": len(netcdf_files),
        "timesteps": timesteps,
        "dims": dims,
        "kerchunk_time": kc_time,
        "netcdf_time": nc_time,
    }


def _sample_items(
    mapping: DatasetMapping,
    sample_size: int,
    rng: random.Random | None = None,
) -> list[tuple[str, list[str]]]:
    """
    Draw a random subset of (json_file, netcdf_files) pairs from a mapping.

    The pairs are sorted before sampling so the draw does not depend on the
    mapping's iteration order. At most ``sample_size`` pairs are returned;
    when the mapping holds fewer, all of them are returned (in sampled order).

    Parameters
    ----------
    mapping : DatasetMapping
        Kerchunk JSON path -> list of NetCDF file paths.
    sample_size : int
        Maximum number of pairs to draw.
    rng : random.Random or None, optional
        Generator to sample with. When None, a fresh ``random.Random(42)`` is
        created on every call for reproducibility.

    Returns
    -------
    list of (str, list of str)
        The sampled pairs.
    """
    generator = rng if rng is not None else random.Random(42)

    candidates = list(mapping.items())
    candidates.sort()

    draw_count = min(sample_size, len(candidates))
    return generator.sample(candidates, draw_count)


def _run_warmup(sampled_items: list[tuple[str, list[str]]], freq: str) -> None:
    """
    Perform one unrecorded open of the first sampled dataset.

    Both the Kerchunk and the NetCDF open are attempted, in that order, on the
    first entry of ``sampled_items``. A failure in either is logged and
    swallowed; if the Kerchunk open fails the NetCDF open is not attempted.

    Parameters
    ----------
    sampled_items : list of (str, list of str)
        Sampled (json_file, netcdf_files) pairs; must be non-empty.
    freq : str
        Frequency name, used only in the failure log message.
    """
    logger.info(" * Performing warm-up open...")
    first_json, first_netcdf_list = sampled_items[0]

    warmup_steps = (
        (_open_kerchunk, first_json),
        (_open_netcdf, first_netcdf_list),
    )

    try:
        for opener, target in warmup_steps:
            opener(target)
    except Exception as e:
        logger.info(f" * Warm-up failed for {freq}: {e}")


def _get_dims_and_timesteps(json_file: str) -> tuple[dict[str, int], int]:
    """
    Get the dimension sizes and number of time steps of a Kerchunk dataset.

    Parameters
    ----------
    json_file : str
        Path to the Kerchunk JSON reference file.

    Returns
    -------
    tuple
        (dimension sizes, number of time steps). The number of time steps is
        0 when no time ("T") axis coordinate can be identified.
    """
    with xr.open_dataset(json_file, engine="kerchunk") as ds:
        # Read everything needed while the dataset is still open.
        dims = dict(ds.sizes)

        try:
            time_dim = xc.get_dim_coords(ds, "T")
        except KeyError:
            # xcdat signals "no coordinate for this axis" with KeyError rather
            # than returning None, so the zero fallback has to live here.
            time_dim = None

        # NOTE(review): if several time coordinates exist xcdat may return a
        # Dataset, in which case len() is not a step count - confirm.
        timesteps = len(time_dim) if time_dim is not None else 0

    return dims, timesteps


def _time_call(fn: Callable[..., None], *args) -> float:
    """
    Return how long a single call to ``fn(*args)`` takes.

    The callable's return value is discarded. If the callable raises, the
    exception propagates and no duration is returned.

    Parameters
    ----------
    fn : callable
        Function to execute.
    *args
        Positional arguments forwarded to ``fn``.

    Returns
    -------
    float
        Elapsed wall-clock time in seconds, measured with
        ``time.perf_counter``.
    """
    started_at = time.perf_counter()
    fn(*args)
    finished_at = time.perf_counter()

    return finished_at - started_at


def _open_kerchunk(json_file: str) -> None:
    """
    Open a Kerchunk JSON reference file via ``xc.open_dataset`` and close it.

    The dataset is opened with ``engine="kerchunk"`` and closed immediately;
    this function itself never accesses any variable's data.

    Parameters
    ----------
    json_file : str
        Path to the Kerchunk JSON reference file.

    Returns
    -------
    None
    """
    opened = xc.open_dataset(json_file, engine="kerchunk")
    # Equivalent to leaving an empty ``with`` block: release the handle now.
    opened.close()


def _open_netcdf(netcdf_files: list[str]) -> None:
    """
    Open a collection of NetCDF files via ``xc.open_mfdataset`` and close it.

    Uses minimal metadata options to avoid benchmarking merge or conflict
    resolution overhead. The combined dataset is closed immediately; this
    function itself never accesses any variable's data.

    Parameters
    ----------
    netcdf_files : list of str
        List of NetCDF file paths to open.

    Returns
    -------
    None
    """
    open_options = {
        "combine": "by_coords",
        "compat": "override",
        "coords": "minimal",
        "data_vars": "minimal",
        "parallel": False,
        "chunks": {},
    }

    combined = xc.open_mfdataset(netcdf_files, **open_options)
    # Equivalent to leaving an empty ``with`` block: release the handles now.
    combined.close()


def _get_agg_metrics(
    raw_metrics: list[RawMetric],
    freq: str,
    sample_size: int,
    sampled_items: list[tuple[str, list[str]]],
) -> AggMetric | None:
    """
    Summarize the raw timing records of one frequency.

    Parameters
    ----------
    raw_metrics : list of dict
        Raw benchmark records; only ``kerchunk_time`` and ``netcdf_time`` are
        read from each.
    freq : str
        Frequency name.
    sample_size : int
        Target sample size.
    sampled_items : list
        Actual sampled (json, netcdf_files) pairs. File-count statistics and
        coverage are computed over these, not over ``raw_metrics``.

    Returns
    -------
    dict or None
        Aggregate metrics including mean and median times, or None when
        ``raw_metrics`` is empty.
    """
    kerchunk_secs = [record["kerchunk_time"] for record in raw_metrics]
    netcdf_secs = [record["netcdf_time"] for record in raw_metrics]

    # No successful timings: there is nothing to aggregate.
    if not (kerchunk_secs and netcdf_secs):
        return None

    sampled_count = len(sampled_items)
    coverage = float(min(1.0, sampled_count / sample_size))
    sampling_mode = "random" if sampled_count >= sample_size else "exhaustive"

    # File-count statistics are truncated (not rounded) to integers.
    if sampled_items:
        file_counts = [len(paths) for _, paths in sampled_items]
        mean_file_count = int(np.mean(file_counts))
        median_file_count = int(np.median(file_counts))
    else:
        mean_file_count = 0
        median_file_count = 0

    # Key order is kept as-is because it determines the column order of the
    # DataFrame built from these records.
    summary = {
        # --- Sample size / statistical context ---
        "freq": freq,
        "n": len(kerchunk_secs),
        "sample_size_target": sample_size,
        "coverage": coverage,
        "sampling": sampling_mode,
        # --- Workload characterization ---
        "mean_netcdf_files": mean_file_count,
        "median_netcdf_files": median_file_count,
        # --- Metrics ---
        "kerchunk_median": float(np.median(kerchunk_secs)),
        "netcdf_median": float(np.median(netcdf_secs)),
        "kerchunk_mean": float(np.mean(kerchunk_secs)),
        "netcdf_mean": float(np.mean(netcdf_secs)),
    }

    return summary
1 change: 1 addition & 0 deletions riotai/findings.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
## I/O Comparison
Loading