Skip to content

Feat/streaming op log: Implements streaming zst writer for tsv operation logs#5

Merged
russfellows merged 10 commits into
masterfrom
feat/streaming-op-log
Apr 19, 2026
Merged

Feat/streaming op log: Implements streaming zst writer for tsv operation logs#5
russfellows merged 10 commits into
masterfrom
feat/streaming-op-log

Conversation

@russfellows
Copy link
Copy Markdown
Owner

PR: bench: stream --full ops to csv.zst without in-memory accumulation

Branch: russfellows:feat/streaming-op-logminio:master
Depends on: PR minio#475 (russfellows:fix/default-tsv-output) — this branch is stacked on top
Author: Russ Fellows
Date: April 2026


Summary

When --full is used, warp currently accumulates every Operation struct in memory
for the entire benchmark duration, then writes them all to <benchdata>.csv.zst in a
single post-benchmark flush. Memory scales super-linearly with duration × concurrency:
a 1-hour / 32-concurrent run at 4 KiB objects projects to ~8 GB of extra heap.

This PR replaces that batch-write path with a background goroutine that streams
each operation to the zstd encoder as it completes, making --full memory overhead
constant at ~1.5 MB regardless of how long the benchmark runs or how many workers
are active.

The output file format and all downstream tooling (warp analyze --full,
polarWarp, custom log parsers) are
completely unchanged — the on-disk bytes are identical to the batch path.


Problem in Detail

The existing path (simplified):

// collector.go — ops slice grows for the entire benchmark
c.ops = append(c.ops, op)   // inside hot-path goroutine, for every operation

// benchmark.go — batch flush after b.Close()
ops := retrieveOps()
ops.CSV(enc, clientID, cmdLine)  // iterates the entire in-memory slice

Measured memory overhead per op grows as Go's GC holds the entire live slice:

Duration Concurrency Obj size ~Ops Extra RAM
30 s 8 64 KiB 30 K +9 MB
120 s 8 64 KiB 119 K +84 MB
60 s 32 4 KiB 152 K +124 MB
1 hr 32 4 KiB ~9 M ~8 GB (est.)

Solution

New type: pkg/bench/StreamingOpsWriter

type StreamingOpsWriter struct {
    ch       chan Operation  // capacity 1000, matches collector.rcv
    done     chan struct{}
    clientID string
    cmdLine  string
    err      error
}
  • NewStreamingOpsWriter(path, clientID, cmdLine) — creates the output file
    immediately (before the benchmark starts), writes the TSV header, and starts a
    background goroutine.
  • Receiver() chan<- Operation — write-only channel passed as an extra to
    NewNullCollector; the collector fan-outs every arriving op into it.
  • Wait() — called after Collector.Close() (which closes the channel);
    blocks until the goroutine has flushed and finalized the zstd frame.
  • Close() — standalone / test use only: closes the channel then waits.

The background goroutine uses a 256 KiB bufio.Writer wrapping a
zstd.SpeedBetterCompression encoder. On any write error it drains the channel
to avoid blocking benchmark goroutines, and surfaces the error via Wait().

addCollector refactor (cli/benchmark.go)

addCollector now accepts variadic fullExtra ...chan<- bench.Operation and
dispatches into three modes:

Condition Collector retrieveOps Notes
--discard-output NewNullCollector returns nil unchanged
--full + fullExtra provided NewNullCollector(extras+live+fullExtra) returns nil streaming path
--full, no fullExtra NewOpsCollector(extras+live) returns ops batch fallback (distributed mode)
default live collector only returns nil unchanged

runBench wiring (cli/benchmark.go)

In the local (non-distributed) --full path:

  1. StreamingOpsWriter is created before addCollector — the output file
    exists from the first moment of the benchmark.
  2. csvWriter.Receiver() is passed as fullExtra to addCollector.
  3. After c.Collector.Close(), csvWriter.Wait() flushes the final zstd frame.
  4. Because retrieveOps() now returns nil in streaming mode, the existing
    else if updates != nil branch naturally handles the json.zst aggregate
    file — both outputs are produced as before.

The distributed path (runClientBenchmark) is unchanged and continues to use
the batch collector.


Files Changed

File Change
pkg/bench/streaming_writer.go NewStreamingOpsWriter implementation (153 lines)
pkg/bench/streaming_writer_test.go New — 9 unit tests, all pass with -race (368 lines)
cli/benchmark.go Modified — addCollector signature + streaming wiring in runBench (+71 / -14)
cli/addcollector_test.go Modified — 4 new integration tests for streaming path (+190 lines)
README.md Updated --full memory note to reflect constant overhead
docs/Warp-streaming-log-Design.md New — full design document (542 lines)
docs/issue-streaming-full-writer.md New — GitHub issue draft used to track the work
docs/RNG_ANALYSIS.md Moved from repo root into docs/
.gitignore Added .env

Net: +1,539 lines inserted, −29 deleted across 9 files.


Tests

Unit tests — pkg/bench (9 tests, all pass with -race)

Test Validates
TestStreamingOpsWriter_FileCreatedOnNew File exists before any ops are sent
TestStreamingOpsWriter_OpCount All sent ops appear in output
TestStreamingOpsWriter_ClientIDStamped Non-empty clientID overwrites every row
TestStreamingOpsWriter_EmptyClientID_PreservesField Empty clientID preserves op's existing field
TestStreamingOpsWriter_EmptyRun Graceful close with zero ops sent
TestStreamingOpsWriter_ValuesPreserved All 14 TSV columns round-trip correctly
TestStreamingOpsWriter_MultipleOps_AllClientIDSet 100-op batch, all stamped
TestStreamingOpsWriter_Wait_CollectorPattern Channel closed externally; Wait() returns clean
TestStreamingOpsWriter_ConcurrentSends 8 goroutines × 100 ops, no races

Integration tests — cli (4 new tests in addcollector_test.go)

Test Validates
TestAddCollector_StreamingMode_OpsGoToChannel Ops route to channel, not to in-memory slice
TestAddCollector_StreamingMode_LiveCollectorAlsoReceives Live collector still receives ops in parallel
TestAddCollector_FullMode_BatchFallback_NoChannel Batch mode preserved when no extra channel provided
TestAddCollector_Streaming_EndToEnd Full pipeline: addCollector + StreamingOpsWritercsv.zst → parse + verify clientID

All 20 tests (11 cli + 9 bench + generator tests) pass with go test ./... and
with go test -race ./cli/... ./pkg/bench/....

Note: TestAddCollector_DefaultMode_UpdatesChannelFunctional fails under
-race in the unmodified upstream codebase due to a data race in
substrait-go/iceberg-go init() code. This is a pre-existing issue not
introduced by this PR.


Integration / End-to-End Validation

Tested against a live MinIO target (TLS, 4-node):

Run Op mix Concurrency Duration Ops captured Empty clientID idx gaps
PUT only put 4 20 s 15,055 0 0
Stress put 16 30 s 51,465 0 0
Mixed put/get/delete/stat 4 20 s 51,001 0 0

All output files were verified with:

  • warp analyze --full <file>.csv.zst — correct op counts and throughput
  • warp analyze <file>.csv.zst — re-aggregate path unchanged
  • warp analyze <file>.json.zst — aggregate JSON output still produced alongside
  • polarWarp — full parse on macOS,
    correct per-op-type latency percentiles and throughput buckets

Memory Overhead (Before vs After)

Before (batch path):

  • Memory scales linearly with ops_per_second × duration
  • 1-hr / 32-concurrent run at 4 KiB objects: ~8 GB projected

After (streaming path):

  • Goroutine stack + 256 KiB bufio.Writer + zstd encoder internal buffers
  • ~1.5 MB constant regardless of benchmark duration or concurrency

Backward Compatibility

  • Output file format is byte-for-byte identical to the existing batch path
  • --full flag semantics are unchanged from the user's perspective
  • All existing warp analyze subcommands work without modification
  • The distributed benchmark path (--warp-client) is untouched
  • No changes to any public API types

Relationship to PR minio#475

This PR is stacked on fix/default-tsv-output (PR minio#475), which restored the
--full per-transaction .csv.zst output that was inadvertently removed in a
previous refactor. This PR then replaces that restored batch write with the
streaming implementation. If PR minio#475 lands first, this PR can be rebased directly
onto master with no conflicts.

StreamingOpsWriter writes Operations to a zstd-compressed TSV file as
they arrive via a channel, without ever accumulating them in a slice.
Memory overhead is constant (~1.5 MB) regardless of benchmark duration
or concurrency, compared to the current batch design that grows O(n).

Key design points:
- Channel buffer matches collector.rcv (1000) for back-pressure parity
- clientID stamped per-row in goroutine (ops arrive with empty ClientID)
- Empty clientID preserves any ClientID already set on the Operation
- Two close patterns: Close() for standalone/test use; Wait() for use
  after a Collector has closed the channel (avoids double-close)
- 256 KiB bufio.Writer wrapping zstd SpeedBetterCompression encoder
- 9 unit tests covering: file creation, op count, clientID stamping,
  empty-clientID passthrough, zero-op run, field preservation,
  collector-integrated Wait() pattern, and concurrent-send race check

All tests pass with -race.
addCollector now accepts optional fullExtra ...chan<- bench.Operation.
When --full is set and a streaming channel is provided, ops are fanned
out to that channel via NewNullCollector (no in-memory accumulation).
When no extra channel is provided and --full is set, the existing batch
mode (NewOpsCollector) is used unchanged — preserving backward
compatibility for the distributed agent code path.

New / updated tests (cli/addcollector_test.go):
  Test 8: streaming mode ops go to channel, not memory
  Test 9: streaming mode live collector still receives every op
  Test 10: batch fallback when no streaming channel is given
  Test 11: end-to-end — StreamingOpsWriter wired through addCollector
           writes correct csv.zst file with clientID stamped

All 11 addCollector tests pass.
Changes to cli/benchmark.go (local/standalone benchmark path):
- Compute fileName and cID before addCollector so the streaming writer
  can be created and wired into the collector fan-out before b.Start().
- When --full is set, create a StreamingOpsWriter immediately; the
  output file exists on disk from that point (partial record available
  on interrupt).
- Pass csvWriter.Receiver() to addCollector as a fullExtra channel.
  addCollector uses NewNullCollector (no in-memory accumulation) when
  a streaming channel is provided.
- After Collector.Close() (which closes the writer channel), call
  csvWriter.Wait() to block until the goroutine has flushed to disk.
- The existing batch path (if ops := retrieveOps(); len(ops) > 0) still
  handles the non-streaming --full case and the non-full case unchanged.
  In streaming mode retrieveOps returns empty, so execution falls through
  to the else-if branch that writes json.zst from the live aggregate.

The distributed agent path (runClientBenchmark) retains batch mode;
it calls addCollector with no extra channel, which keeps using
NewOpsCollector.  This is a deferred improvement.

README.md: update --full memory note to reflect constant ~1.5 MB
overhead rather than the previous O(n) batch behaviour.
@russfellows russfellows force-pushed the feat/streaming-op-log branch from 85ae59d to c5999e7 Compare April 19, 2026 20:03
Capture rcv channel by value before launching the goroutine so the
goroutine closure doesn't access c.rcv via a field read. Without this,
Close() can write c.rcv = nil (under mu) while the goroutine is still
reading c.rcv to pass to Live() — a data race detected by -race.

Fixes: TestAddCollector_DefaultMode_UpdatesChannelFunctional failing
under go test -race ./...
@russfellows russfellows merged commit 150b005 into master Apr 19, 2026
7 checks passed
@russfellows russfellows deleted the feat/streaming-op-log branch April 19, 2026 21:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant