Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ public static void main(String[] args) throws IOException {
// Write CSV data
try (FileWriter writer = new FileWriter(outputFile)) {
// Write CSV header
writer.write("dataset,QPS,QPS StdDev,Mean Latency,Recall@10,Index Construction Time,Avg Nodes Visited\n");
writer.write("dataset,QPS,QPS StdDev,Mean Latency,Recall@10,Index Construction Time,Avg Nodes Visited," +
"max build heap,max query heap,file size\n");

// Write one row per dataset with average metrics
for (Map.Entry<String, SummaryStats> entry : statsByDataset.entrySet()) {
Expand All @@ -207,7 +208,10 @@ public static void main(String[] args) throws IOException {
writer.write(datasetStats.getAvgLatency() + ",");
writer.write(datasetStats.getAvgRecall() + ",");
writer.write(datasetStats.getIndexConstruction() + ",");
writer.write(datasetStats.getAvgNodesVisited() + "\n");
writer.write(datasetStats.getAvgNodesVisited() + ",");
writer.write(datasetStats.getBuildMaxHeap() + ",");
writer.write(datasetStats.getQueryMaxHeap() + ",");
writer.write(datasetStats.getTotalDisk() + "\n");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -898,11 +898,17 @@ public static List<BenchResult> runAllAndCollectResults(
}

if (!missing.isEmpty()) {
// Start peak memory tracking before building
diagnostics.startBuildMemoryTracking();

// At least one index needs to be built (b/c not in cache or cache is disabled)
// We pass the handles map so buildOnDisk knows exactly where to write
var newIndexes = buildOnDisk(missing, m, ef, neighborOverflow, addHierarchy, refineFinalGraph,
ds, outputDir, compressor, handles, null);
indexes.putAll(newIndexes);

// Stop peak memory tracking after building
diagnostics.stopBuildMemoryTracking();
}

ImmutableGraphIndex index = indexes.get(features);
Expand All @@ -912,6 +918,7 @@ public static List<BenchResult> runAllAndCollectResults(
diagnostics.printDiskStatistics("Graph Index Build");
var buildSnapshot = diagnostics.getLatestSystemSnapshot();
DiskUsageMonitor.MultiDirectorySnapshot buildDiskSnapshot = diagnostics.getLatestDiskSnapshot();
var buildPeakMemory = diagnostics.getBuildPeakMemory();

try (ConfiguredSystem cs = new ConfiguredSystem(ds, index, cvArg, features)) {
int queryRuns = 2;
Expand Down Expand Up @@ -954,13 +961,12 @@ public static List<BenchResult> runAllAndCollectResults(
allMetrics.put("Index Build Time", indexBuildTimes.get(ds.getName()));
}

// Add memory metrics if available
if (buildSnapshot != null) {
allMetrics.put("Heap Memory Used (MB)", buildSnapshot.memoryStats.heapUsed / 1024.0 / 1024.0);
allMetrics.put("Heap Memory Max (MB)", buildSnapshot.memoryStats.heapMax / 1024.0 / 1024.0);
allMetrics.put("Off-Heap Direct (MB)", buildSnapshot.memoryStats.directBufferMemory / 1024.0 / 1024.0);
allMetrics.put("Off-Heap Mapped (MB)", buildSnapshot.memoryStats.mappedBufferMemory / 1024.0 / 1024.0);
allMetrics.put("Total Off-Heap (MB)", buildSnapshot.memoryStats.getTotalOffHeapMemory() / 1024.0 / 1024.0);
// Add peak memory metrics from build phase if available
if (buildPeakMemory != null) {
allMetrics.put("Heap Memory Max (MB)", buildPeakMemory.peakHeapUsed / 1024.0 / 1024.0);
allMetrics.put("Off-Heap Direct (MB)", buildPeakMemory.peakDirectBufferMemory / 1024.0 / 1024.0);
allMetrics.put("Off-Heap Mapped (MB)", buildPeakMemory.peakMappedBufferMemory / 1024.0 / 1024.0);
allMetrics.put("Total Off-Heap (MB)", buildPeakMemory.getTotalPeakOffHeapMemory() / 1024.0 / 1024.0);
}

// Add disk metrics if available
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,27 +88,33 @@ public List<Metric> run(
}

diagnostics.capturePrePhaseSnapshot("Query");

// Start peak memory tracking before running queries
diagnostics.startQueryMemoryTracking();

for (var benchmark : benchmarks) {
var metrics = benchmark.runBenchmark(cs, topK, rerankK, usePruning, queryRuns);
results.addAll(metrics);
}

// Stop peak memory tracking after running queries
diagnostics.stopQueryMemoryTracking();

// Capture memory and disk usage after running queries
diagnostics.capturePostPhaseSnapshot("Query");

// Add memory and disk metrics to results
var systemSnapshot = diagnostics.getLatestSystemSnapshot();
// Add peak memory metrics to results
var queryPeakMemory = diagnostics.getQueryPeakMemory();
var diskSnapshot = diagnostics.getLatestDiskSnapshot();

if (systemSnapshot != null) {
// Max heap usage in MB
if (queryPeakMemory != null) {
// Peak heap usage in MB during queries
results.add(Metric.of("search.system.max_heap_mb", "Max heap usage (MB)", ".1f",
systemSnapshot.memoryStats.heapUsed / (1024.0 * 1024.0)));
queryPeakMemory.peakHeapUsed / (1024.0 * 1024.0)));

// Max off-heap usage (direct + mapped) in MB
// Peak off-heap usage (direct + mapped) in MB during queries
results.add(Metric.of("search.system.max_offheap_mb", "Max offheap usage (MB)", ".1f",
systemSnapshot.memoryStats.getTotalOffHeapMemory() / (1024.0 * 1024.0)));
queryPeakMemory.getTotalPeakOffHeapMemory() / (1024.0 * 1024.0)));
}

if (diskSnapshot != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ public class BenchmarkDiagnostics implements AutoCloseable {
private final List<DiskUsageMonitor.MultiDirectorySnapshot> diskSnapshots;
private final List<PerformanceAnalyzer.TimingAnalysis> timingAnalyses;
private boolean diskMonitorStarted = false;

// Peak memory tracking for build and query phases
private SystemMonitor.PeakMemoryStats buildPeakMemory = null;
private SystemMonitor.PeakMemoryStats queryPeakMemory = null;
private boolean trackingBuildMemory = false;
private boolean trackingQueryMemory = false;

public BenchmarkDiagnostics(DiagnosticLevel level) {
this.level = level;
Expand Down Expand Up @@ -100,6 +106,74 @@ public void startMonitoring(String label, Path directory) throws IOException {
diskUsageMonitor.addDirectory(label, directory);
}
}

/**
* Starts tracking peak memory usage for the build phase.
* Call this before starting index construction.
*/
public void startBuildMemoryTracking() {
if (!trackingBuildMemory) {
systemMonitor.startPeakMemoryTracking();
trackingBuildMemory = true;
}
}

/**
* Stops tracking peak memory usage for the build phase and captures the peak values.
* Call this after index construction completes.
*/
public void stopBuildMemoryTracking() {
if (trackingBuildMemory) {
buildPeakMemory = systemMonitor.stopPeakMemoryTracking();
trackingBuildMemory = false;

if (level != DiagnosticLevel.NONE) {
System.out.printf("[Build] Peak Memory: %s%n", buildPeakMemory);
}
}
}

/**
* Starts tracking peak memory usage for the query phase.
* Call this before starting query execution.
*/
public void startQueryMemoryTracking() {
if (!trackingQueryMemory) {
systemMonitor.startPeakMemoryTracking();
trackingQueryMemory = true;
}
}

/**
* Stops tracking peak memory usage for the query phase and captures the peak values.
* Call this after query execution completes.
*/
public void stopQueryMemoryTracking() {
if (trackingQueryMemory) {
queryPeakMemory = systemMonitor.stopPeakMemoryTracking();
trackingQueryMemory = false;

if (level != DiagnosticLevel.NONE) {
System.out.printf("[Query] Peak Memory: %s%n", queryPeakMemory);
}
}
}

/**
* Gets the peak memory statistics captured during the build phase.
* Returns null if build memory tracking was not performed.
*/
public SystemMonitor.PeakMemoryStats getBuildPeakMemory() {
return buildPeakMemory;
}

/**
* Gets the peak memory statistics captured during the query phase.
* Returns null if query memory tracking was not performed.
*/
public SystemMonitor.PeakMemoryStats getQueryPeakMemory() {
return queryPeakMemory;
}

/**
* Captures system state before starting a benchmark phase
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@

import java.lang.management.*;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
* Utility class for monitoring system resources during benchmark execution.
* Tracks GC activity, memory usage (on-heap and off-heap), CPU load, and thread statistics.
*
* Supports continuous peak memory tracking via a background sampling thread.
*/
public class SystemMonitor {

Expand All @@ -32,6 +35,13 @@ public class SystemMonitor {
private final com.sun.management.OperatingSystemMXBean sunOsBean;
private final List<MemoryPoolMXBean> memoryPoolBeans;
private final List<BufferPoolMXBean> bufferPoolBeans;

// Peak memory tracking
private final AtomicLong peakHeapUsed = new AtomicLong(0);
private final AtomicLong peakDirectBufferMemory = new AtomicLong(0);
private final AtomicLong peakMappedBufferMemory = new AtomicLong(0);
private volatile Thread samplingThread;
private volatile boolean sampling = false;

public SystemMonitor() {
this.memoryBean = ManagementFactory.getMemoryMXBean();
Expand All @@ -42,6 +52,93 @@ public SystemMonitor() {
this.memoryPoolBeans = ManagementFactory.getMemoryPoolMXBeans();
this.bufferPoolBeans = ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
}

/**
* Starts continuous memory sampling in a background thread.
* Samples memory usage every 50ms and tracks peak values.
*/
public void startPeakMemoryTracking() {
if (sampling) {
return; // Already sampling
}

// Reset peak values
peakHeapUsed.set(0);
peakDirectBufferMemory.set(0);
peakMappedBufferMemory.set(0);

sampling = true;
samplingThread = new Thread(() -> {
while (sampling) {
try {
MemoryStats current = captureMemoryStats();

// Update peak values atomically
updatePeak(peakHeapUsed, current.heapUsed);
updatePeak(peakDirectBufferMemory, current.directBufferMemory);
updatePeak(peakMappedBufferMemory, current.mappedBufferMemory);

Thread.sleep(50); // Sample every 50ms
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}, "SystemMonitor-PeakMemorySampler");
samplingThread.setDaemon(true);
samplingThread.start();
}

/**
* Stops continuous memory sampling and returns the peak values observed.
*/
public PeakMemoryStats stopPeakMemoryTracking() {
sampling = false;
if (samplingThread != null) {
try {
samplingThread.join(1000); // Wait up to 1 second
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
samplingThread = null;
}

return new PeakMemoryStats(
peakHeapUsed.get(),
peakDirectBufferMemory.get(),
peakMappedBufferMemory.get()
);
}

/**
* Gets current peak memory values without stopping tracking.
*/
public PeakMemoryStats getCurrentPeakMemory() {
return new PeakMemoryStats(
peakHeapUsed.get(),
peakDirectBufferMemory.get(),
peakMappedBufferMemory.get()
);
}

/**
* Resets peak memory tracking values.
*/
public void resetPeakMemory() {
peakHeapUsed.set(0);
peakDirectBufferMemory.set(0);
peakMappedBufferMemory.set(0);
}

private void updatePeak(AtomicLong peak, long current) {
long currentPeak;
do {
currentPeak = peak.get();
if (current <= currentPeak) {
break;
}
} while (!peak.compareAndSet(currentPeak, current));
}

/**
* Captures current system state snapshot
Expand Down Expand Up @@ -167,6 +264,33 @@ public void logDetailedGCStats(String phase) {
gcBean.getName(), gcBean.getCollectionCount(), gcBean.getCollectionTime());
}
}
/**
* Container for peak memory statistics captured during continuous monitoring.
*/
public static class PeakMemoryStats {
public final long peakHeapUsed;
public final long peakDirectBufferMemory;
public final long peakMappedBufferMemory;

public PeakMemoryStats(long peakHeapUsed, long peakDirectBufferMemory, long peakMappedBufferMemory) {
this.peakHeapUsed = peakHeapUsed;
this.peakDirectBufferMemory = peakDirectBufferMemory;
this.peakMappedBufferMemory = peakMappedBufferMemory;
}

public long getTotalPeakOffHeapMemory() {
return peakDirectBufferMemory + peakMappedBufferMemory;
}

@Override
public String toString() {
return String.format("PeakMemory[heap=%d MB, direct=%d MB, mapped=%d MB]",
peakHeapUsed / 1024 / 1024,
peakDirectBufferMemory / 1024 / 1024,
peakMappedBufferMemory / 1024 / 1024);
}
}


// Inner classes for data structures
public static class SystemSnapshot {
Expand Down
Loading
Loading