diff --git a/.gitignore b/.gitignore index 48c8ef66..07fa0ac6 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,7 @@ dist # qreleaser state file .release-state.sh + +# local secrets — never commit +.env +warp-test diff --git a/README.md b/README.md index 42d08564..c4db4edb 100644 --- a/README.md +++ b/README.md @@ -118,6 +118,15 @@ flag warp writes **both** output files: warp put --host=... --full ``` +> **Note:** `--full` streams every individual operation directly to the `.csv.zst` file +> as it completes, using a background goroutine. Memory overhead is constant (~1.5 MB) +> regardless of benchmark duration or concurrency — earlier releases buffered all ops in +> memory, which scaled to several GB for long high-concurrency runs. +> +> Analysis time differs by file type: a `.csv.zst` file takes ~5× longer to analyze with +> `--full` and ~13× longer to re-aggregate without it, compared to a `.json.zst` aggregate +> (which analyzes in under 0.2 s regardless of run length). The `.json.zst` aggregate is +> always written alongside the `.csv.zst` when `--full` is used, so both are available. ### Analyzing Results To get accurate per-operation statistics from a `.csv.zst` file, pass `--full` to `warp analyze`: diff --git a/cli/addcollector_test.go b/cli/addcollector_test.go index ec46fb4f..9b83e37e 100644 --- a/cli/addcollector_test.go +++ b/cli/addcollector_test.go @@ -1,6 +1,6 @@ package cli -// Tests for addCollector behaviour. +// Tests for addCollector behavior. // // The invariants we care about: // @@ -17,9 +17,11 @@ package cli import ( "context" "flag" + "os" "testing" "time" + "github.com/klauspost/compress/zstd" mcli "github.com/minio/cli" "github.com/minio/warp/pkg/aggregate" "github.com/minio/warp/pkg/bench" @@ -95,8 +97,8 @@ func TestAddCollector_DefaultMode_NoOpsStored(t *testing.T) { } // Send an operation to the collector and close it. 
- sendOp(t, b.Common.Collector) - b.Common.Collector.Close() + sendOp(t, b.Collector) + b.Collector.Close() // Without --full, retrieveOps is EmptyOpsCollector → must return empty. ops := retrieveOps() @@ -125,9 +127,9 @@ func TestAddCollector_FullMode_OpsAreCollected(t *testing.T) { const numOps = 5 for i := 0; i < numOps; i++ { - sendOp(t, b.Common.Collector) + sendOp(t, b.Collector) } - b.Common.Collector.Close() + b.Collector.Close() ops := retrieveOps() if len(ops) != numOps { @@ -151,11 +153,11 @@ func TestAddCollector_FullMode_LiveCollectorAlsoReceivesOps(t *testing.T) { const numOps = 3 for i := 0; i < numOps; i++ { - sendOp(t, b.Common.Collector) + sendOp(t, b.Collector) } // Close flushes bench.OpsCollector and, via the extra channel, also // signals the live collector to finish computing its aggregate. - b.Common.Collector.Close() + b.Collector.Close() // Sanity-check: per-transaction ops are present. ops := retrieveOps() @@ -197,9 +199,9 @@ func TestAddCollector_DefaultMode_UpdatesChannelFunctional(t *testing.T) { // Send ops then close. const numOps = 4 for i := 0; i < numOps; i++ { - sendOp(t, b.Common.Collector) + sendOp(t, b.Collector) } - b.Common.Collector.Close() + b.Collector.Close() // The updates channel is buffered (capacity 1000); a write must not block. // A Reset request is a no-op for the live collector and safe to use here. 
@@ -222,7 +224,7 @@ func TestAddCollector_DiscardOutput_NullCollector(t *testing.T) { t.Run("full=false", func(t *testing.T) { ctx := makeCtx(false) b := &stubBench{} - b.Common.DiscardOutput = true + b.DiscardOutput = true retrieveOps, updates := addCollector(ctx, b) @@ -239,7 +241,7 @@ func TestAddCollector_DiscardOutput_NullCollector(t *testing.T) { t.Run("full=true", func(t *testing.T) { ctx := makeCtx(true) b := &stubBench{} - b.Common.DiscardOutput = true + b.DiscardOutput = true retrieveOps, updates := addCollector(ctx, b) @@ -258,7 +260,7 @@ func TestAddCollector_DiscardOutput_NullCollector(t *testing.T) { // --------------------------------------------------------------------------- // This test validates the core contract: --full must not disable any existing -// behaviour, only add the per-transaction csv.zst path on top. +// behavior, only add the per-transaction csv.zst path on top. // We verify this by confirming both the ops slice AND the live aggregate are // non-empty after the same set of operations. func TestAddCollector_FullMode_IsAdditive(t *testing.T) { @@ -275,9 +277,9 @@ func TestAddCollector_FullMode_IsAdditive(t *testing.T) { // Send ops. const numOps = 6 for i := 0; i < numOps; i++ { - sendOp(t, b.Common.Collector) + sendOp(t, b.Collector) } - b.Common.Collector.Close() + b.Collector.Close() // Per-transaction store must be full. 
ops := retrieveOps() @@ -315,8 +317,8 @@ func TestAddCollector_FullMode_OpValuesPreserved(t *testing.T) { Size: 8192, Thread: 3, } - b.Common.Collector.Receiver() <- op - b.Common.Collector.Close() + b.Collector.Receiver() <- op + b.Collector.Close() ops := retrieveOps() if len(ops) != 1 { @@ -339,3 +341,191 @@ func TestAddCollector_FullMode_OpValuesPreserved(t *testing.T) { t.Errorf("Thread: got %d want %d", got.Thread, op.Thread) } } + +// --------------------------------------------------------------------------- +// Stage 2 tests: streaming mode via addCollector's fullExtra parameter +// --------------------------------------------------------------------------- + +// --------------------------------------------------------------------------- +// Test 8: streaming mode — ops go to the channel, not to in-memory store +// --------------------------------------------------------------------------- + +// When a streaming channel is passed to addCollector and --full is set: +// - retrieveOps must return empty (ops are streamed, not buffered in RAM) +// - the streaming channel must receive every op +func TestAddCollector_StreamingMode_OpsGoToChannel(t *testing.T) { + ctx := makeCtx(true) // --full + + streamCh := make(chan bench.Operation, 100) + b := &stubBench{} + + retrieveOps, updates := addCollector(ctx, b, streamCh) + if updates == nil { + t.Fatal("expected non-nil updates in streaming mode") + } + + const numOps = 8 + for i := 0; i < numOps; i++ { + sendOp(t, b.Collector) + } + // Collector.Close() closes streamCh (it is in the fan-out extra list). + b.Collector.Close() + + // In streaming mode, retrieveOps must return empty (no in-memory accumulation). + ops := retrieveOps() + if len(ops) != 0 { + t.Errorf("streaming mode: expected 0 in-memory ops, got %d", len(ops)) + } + + // All ops must have arrived on the streaming channel. 
+ var received int + for range streamCh { + received++ + } + if received != numOps { + t.Errorf("streaming channel: expected %d ops, got %d", numOps, received) + } +} + +// --------------------------------------------------------------------------- +// Test 9: streaming mode — live collector still receives every op +// --------------------------------------------------------------------------- + +// --full with streaming must be additive: both the streaming channel AND the +// live aggregate collector receive every op. +func TestAddCollector_StreamingMode_LiveCollectorAlsoReceives(t *testing.T) { + ctx := makeCtx(true) + + streamCh := make(chan bench.Operation, 100) + b := &stubBench{} + + _, updates := addCollector(ctx, b, streamCh) + + const numOps = 5 + for i := 0; i < numOps; i++ { + sendOp(t, b.Collector) + } + b.Collector.Close() + + // Drain streaming channel so the test doesn't block. + var streamed int + for range streamCh { + streamed++ + } + if streamed != numOps { + t.Errorf("streaming channel: expected %d ops, got %d", numOps, streamed) + } + + // Live collector must also have received all ops. + final := requestFinal(t, updates) + if final == nil { + t.Fatal("live aggregate is nil; streaming mode must not disable live collector") + } + if final.Total.TotalRequests == 0 { + t.Errorf("live TotalRequests == 0; ops were not forwarded to live collector in streaming mode") + } +} + +// --------------------------------------------------------------------------- +// Test 10: backward-compat — batch fallback when no streaming channel given +// --------------------------------------------------------------------------- + +// Passing no fullExtra channel with --full keeps the existing batch behavior: +// retrieveOps returns all ops (needed for distributed agent path). +func TestAddCollector_FullMode_BatchFallback_NoChannel(t *testing.T) { + ctx := makeCtx(true) + b := &stubBench{} + + // No extra channel → existing batch mode. 
+ retrieveOps, updates := addCollector(ctx, b) + if updates == nil { + t.Fatal("expected non-nil updates in batch fallback mode") + } + + const numOps = 4 + for i := 0; i < numOps; i++ { + sendOp(t, b.Collector) + } + b.Collector.Close() + + ops := retrieveOps() + if len(ops) != numOps { + t.Errorf("batch fallback: expected %d in-memory ops, got %d", numOps, len(ops)) + } +} + +// --------------------------------------------------------------------------- +// Test 11: end-to-end — streaming path writes correct csv.zst file +// --------------------------------------------------------------------------- + +// Full pipeline: addCollector with a StreamingOpsWriter → send ops → close +// collector (which closes writer channel) → wait writer → read back file. +func TestAddCollector_Streaming_EndToEnd(t *testing.T) { + dir := t.TempDir() + path := dir + "/out.csv.zst" + const wantClientID = "e2eID" + + csvWriter, err := bench.NewStreamingOpsWriter(path, wantClientID, "") + if err != nil { + t.Fatalf("NewStreamingOpsWriter: %v", err) + } + + ctx := makeCtx(true) + b := &stubBench{} + + retrieveOps, updates := addCollector(ctx, b, csvWriter.Receiver()) + if updates == nil { + t.Fatal("expected non-nil updates in streaming end-to-end test") + } + + const numOps = 12 + now := time.Now().Truncate(time.Millisecond) + for i := 0; i < numOps; i++ { + b.Collector.Receiver() <- bench.Operation{ + OpType: "GET", + Thread: uint32(i % 4), + Start: now, + End: now.Add(time.Duration(i+1) * time.Millisecond), + Size: int64(1024 * (i + 1)), + } + } + // Collector.Close() closes csvWriter.Receiver() as an extra channel. + b.Collector.Close() + + // Streaming mode: no in-memory ops. + ops := retrieveOps() + if len(ops) != 0 { + t.Errorf("expected 0 in-memory ops in streaming mode, got %d", len(ops)) + } + + // Wait for writer to flush to disk. + if err := csvWriter.Wait(); err != nil { + t.Fatalf("csvWriter.Wait: %v", err) + } + + // Read back and validate. 
+ f, err := os.Open(path) + if err != nil { + t.Fatalf("open csv file: %v", err) + } + defer f.Close() + + dec, err := zstd.NewReader(f) + if err != nil { + t.Fatalf("zstd.NewReader: %v", err) + } + defer dec.Close() + + written, err := bench.OperationsFromCSV(dec, false, 0, 0, func(string, ...any) {}) + if err != nil { + t.Fatalf("OperationsFromCSV: %v", err) + } + if len(written) != numOps { + t.Errorf("csv file: expected %d ops, got %d", numOps, len(written)) + } + for i, op := range written { + if op.ClientID != wantClientID { + t.Errorf("op[%d] ClientID: got %q, want %q", i, op.ClientID, wantClientID) + } + } +} diff --git a/cli/benchmark.go b/cli/benchmark.go index 7b125110..8b3d9e42 100644 --- a/cli/benchmark.go +++ b/cli/benchmark.go @@ -132,7 +132,30 @@ func runBench(ctx *cli.Context, b bench.Benchmark) error { go ui.Run() } - retrieveOps, updates := addCollector(ctx, b) + // Generate the output file name and client ID here — before addCollector — + // so the streaming writer (if --full) can be wired into the collector + // fan-out from the start of the benchmark. + fileName := ctx.String("benchdata") + cID := pRandASCII(4) + if fileName == "" { + fileName = fmt.Sprintf("%s-%s-%s-%s", appName, ctx.Command.Name, time.Now().Format("2006-01-02[150405]"), cID) + } + + // When --full is set, create the streaming csv.zst writer immediately. + // The file exists on disk from this point so a partial record is available + // even if the benchmark is interrupted before it finishes. + var csvWriter *bench.StreamingOpsWriter + if ctx.Bool("full") { + var werr error + csvWriter, werr = bench.NewStreamingOpsWriter(fileName+".csv.zst", cID, commandLine(ctx)) + fatalIf(probe.NewError(werr), "Unable to create benchmark data file") + } + + var fullExtra []chan<- bench.Operation + if csvWriter != nil { + fullExtra = []chan<- bench.Operation{csvWriter.Receiver()} + } + retrieveOps, updates := addCollector(ctx, b, fullExtra...) 
c.UpdateStatus = ui.SetSubText monitor := api.NewBenchmarkMonitor(ctx.String(serverFlagName), updates) @@ -216,12 +239,6 @@ func runBench(ctx *cli.Context, b bench.Benchmark) error { close(start) }() - fileName := ctx.String("benchdata") - cID := pRandASCII(4) - if fileName == "" { - fileName = fmt.Sprintf("%s-%s-%s-%s", appName, ctx.Command.Name, time.Now().Format("2006-01-02[150405]"), cID) - } - prof, err := startProfiling(ctx2, ctx) fatalIf(probe.NewError(err), "Unable to start profile.") monitor.InfoLn("Starting benchmark in", time.Until(tStart).Round(time.Second)) @@ -232,6 +249,16 @@ func runBench(ctx *cli.Context, b bench.Benchmark) error { ctx2 = context.Background() prof.stop(ctx2, ctx, fileName+".profiles.zip") + // Flush the streaming writer — Collector.Close() above closed its channel; + // Wait() blocks until the background goroutine has flushed to disk. + if csvWriter != nil { + if werr := csvWriter.Wait(); werr != nil { + monitor.Errorln("Error finalizing benchmark data:", werr) + } else { + monitor.InfoLn(fmt.Sprintf("\nBenchmark data written to %q\n", fileName+".csv.zst")) + } + } + // Previous context is canceled, create a new... monitor.InfoLn("Saving benchmark data") if ops := retrieveOps(); len(ops) > 0 { @@ -596,7 +623,13 @@ func runClientBenchmark(ctx *cli.Context, b bench.Benchmark, cb *clientBenchmark return nil } -func addCollector(ctx *cli.Context, b bench.Benchmark) (bench.OpsCollector, chan<- aggregate.UpdateReq) { +// addCollector sets up the operation collector for the benchmark. +// +// The optional fullExtra channels are only meaningful when --full is set. +// When fullExtra is non-empty, ops are streamed to those channels instead of +// being accumulated in memory (streaming mode). When fullExtra is empty and +// --full is set, ops are accumulated in memory for batch write (legacy mode). 
+func addCollector(ctx *cli.Context, b bench.Benchmark, fullExtra ...chan<- bench.Operation) (bench.OpsCollector, chan<- aggregate.UpdateReq) { common := b.GetCommon() if common.DiscardOutput { common.Collector = bench.NewNullCollector(common.ExtraOut...) @@ -605,13 +638,25 @@ func addCollector(ctx *cli.Context, b bench.Benchmark) (bench.OpsCollector, chan // Always create the live aggregating collector for real-time display and autoterm. updates := make(chan aggregate.UpdateReq, 1000) if ctx.Bool("full") { - // --full additionally collects every individual operation so that a - // per-transaction csv.zst file is written after the benchmark. - // Each op is forwarded to the live collector via the extra channel, - // so live display and autoterm continue to work normally. + // --full collects every individual operation for csv.zst output. + // Each op is also forwarded to the live collector so real-time display + // and autoterm continue to work normally. liveC := aggregate.LiveCollector(context.Background(), updates, pRandASCII(4), nil) + // Build the fan-out target list: common extras + live collector. + extras := append(append([]chan<- bench.Operation{}, common.ExtraOut...), liveC.Receiver()) + if len(fullExtra) > 0 { + // Streaming mode: ops flow to the streaming writer channels; no + // in-memory accumulation. Collector.Close() will close the + // streaming writer's channel; caller should call writer.Wait() + // afterwards. + extras = append(extras, fullExtra...) + common.Collector = bench.NewNullCollector(extras...) + return bench.EmptyOpsCollector, updates + } + // Batch fallback mode: accumulate ops in memory (used when no + // streaming writer is wired in, e.g. distributed agent path). var retrieveOps bench.OpsCollector - common.Collector, retrieveOps = bench.NewOpsCollector(append(common.ExtraOut, liveC.Receiver())...) + common.Collector, retrieveOps = bench.NewOpsCollector(extras...) 
return retrieveOps, updates } // Default: live aggregating collector only; no per-transaction file. diff --git a/cli/cli.go b/cli/cli.go index d4923796..1d0cfa70 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -109,7 +109,7 @@ func init() { mergeCmd, clientCmd, runCmd, - replayCmd, // New entry for our replay command + replayCmd, // New entry for our replay command } appCmds = append(append(appCmds, a...), b...) benchCmds = a diff --git a/cli/merge.go b/cli/merge.go index e0275f49..6b56e0ed 100644 --- a/cli/merge.go +++ b/cli/merge.go @@ -129,7 +129,7 @@ func mergeJSON(ctx *cli.Context, args []string) error { func mergeCSV(ctx *cli.Context, args []string) error { zstdDec, _ := zstd.NewReader(nil) defer zstdDec.Close() - var allOps bench.Operations + allOps := make(bench.Operations, 0, len(args)) threads := uint32(0) log := console.Printf if globalQuiet { diff --git a/cli/replay.go b/cli/replay.go index 53030741..36ad06c2 100644 --- a/cli/replay.go +++ b/cli/replay.go @@ -275,15 +275,8 @@ func mainReplay(c *cli.Context) error { } objID := extractObjectID(entry) // implement for your TSV format - resolved := entry.Endpoint - if cfg != nil { - // a) config-level wildcard mapping - if m, _ := cfg.Resolve(entry.Endpoint); m != "" { - resolved = m - } - } // start from the trace’s original host - resolved = entry.Endpoint + resolved := entry.Endpoint if cfg != nil { // a) host_mapping with round-robin “one→many” if targets, ok := cfg.HostMapping[entry.Endpoint]; ok && len(targets) > 0 { diff --git a/docs/RNG_ANALYSIS.md b/docs/RNG_ANALYSIS.md new file mode 100644 index 00000000..ba94fb75 --- /dev/null +++ b/docs/RNG_ANALYSIS.md @@ -0,0 +1,150 @@ +# RNG Data Generation Analysis + +## Overview + +This fork of [minio/warp](https://github.com/minio/warp) includes a **bug fix** to the +pseudo-random data generator and a local copy of the fixed library in +`pkg/generator/rngfix/`. 
This document records the analysis of the generator's behavior, +the bug that was found, and why the fixed output is genuinely non-deduplicatable. + +--- + +## Bug: `ResetSize()` Always Produced Identical Output + +### Root cause + +`minio/pkg/v3/rng` v3.6.1 contained a typo in `init()`. A local variable `tmp` (zero-filled +on the stack) was used instead of the struct field `r.tmp` (populated from the PRNG) when +deriving the four 64-bit XOR sub-keys (`r.subxor[0..3]`): + +```go +// Buggy upstream code (init() in reader.go): +var tmp [32]byte // ← local, always zero +_, err := io.ReadFull(r.o.rng, r.tmp[:]) // ← fills r.tmp (struct field) — ignored! +r.subxor[0] = binary.LittleEndian.Uint64(tmp[:8]) // ← reads local zero +r.subxor[1] = binary.LittleEndian.Uint64(tmp[8:16]) // ← reads local zero +r.subxor[2] = binary.LittleEndian.Uint64(tmp[16:24]) // ← reads local zero +r.subxor[3] = binary.LittleEndian.Uint64(tmp[24:32]) // ← reads local zero +``` + +Because `subxor` was always `[0, 0, 0, 0]`, every call to `ResetSize()` produced an +**identical byte stream**, regardless of the PRNG seed. 
+ +### Impact + +Measured on a 10 GB file split into 160 × 64 MiB chunks: + +| Metric | Buggy upstream | Fixed (`rngfix`) | +|---|---|---| +| Unique 64 MiB chunks | **1 / 160** | **160 / 160** | +| Dedup savings (64 MiB blocks) | 99.375% | 0.000% | +| Unique 4 KB blocks | **16,384 / 2,621,440** | **2,621,440 / 2,621,440** | +| Dedup savings (4 KB blocks) | 99.375% | 0.000% | + +GitHub issue filed: [minio/warp#471](https://github.com/minio/warp/issues/471) + +### Fix (in `pkg/generator/rngfix/reader.go`) + +Remove the dead local variable and read from `r.tmp` (the struct field): + +```go +// Fixed: +_, err := io.ReadFull(r.o.rng, r.tmp[:]) +r.subxor[0] = binary.LittleEndian.Uint64(r.tmp[:8]) +r.subxor[1] = binary.LittleEndian.Uint64(r.tmp[8:16]) +r.subxor[2] = binary.LittleEndian.Uint64(r.tmp[16:24]) +r.subxor[3] = binary.LittleEndian.Uint64(r.tmp[24:32]) +``` + +--- + +## How the Generator Works (Fixed) + +### Data structure + +``` +Reader { + buf [16384]byte // 16 KB seed — filled once from PRNG, never changes + subxor [4]uint64 // 4 × 64-bit XOR keys — re-randomized on every ResetSize() + offset int64 // current read position +} +``` + +### Output generation (`Read()`) + +For every 32-byte aligned slice of output: + +``` +blockN = offset / 16384 // changes every 16 KB of output +scrambleBase = scrambleU64(blockN) // xxh3-style mixing of blockN +keys[i] = scrambleBase ^ subxor[i] ^ (blockN × prime) + +output[32 bytes] = buf[offset % 16384 : +32] XOR keys (via SSE2 PXOR) +``` + +The `xorSlice` inner loop is SSE2 assembly (`MOVOU` + `PXOR`), processing 32 bytes per +iteration at ~46 GB/s single-core. + +### Why every 4 KB block is unique + +`bufferLog = 14` means `blockN` (and therefore `keys`) rotate once per **16 KB** of output. +This does **not** mean only 32 bytes per 16 KB window are unique. 
The full 16 KB of `r.buf` +seed data is indexed positionally with `offset & bufferMask`, so every 32-byte slot within +a 16 KB window reads from a **different position** in the 16 KB seed. The XOR keys are +identical within a window, but the seed inputs are different at every position. + +Concretely: + +- **Within a 16 KB window**: 512 × 32-byte output blocks differ because `buf` offsets differ. +- **Across 16 KB windows**: `blockN` changes, rotating `keys`, so even the same `buf` + position produces different output. +- **Across `ResetSize()` calls**: `subxor` is fully re-randomized from the PRNG, so the + entire stream is uncorrelated with any previous call. + +This gives **three independent sources of variation**, leaving no repeated blocks for deduplication to find at +any tested granularity — confirmed empirically down to 4 KB blocks across a 10 GB test file. + +### Why the 4:1 dedup hypothesis was incorrect + +One might expect that a 16 KB buffer would produce 4:1 redundancy at 4 KB granularity — +i.e., four identical 4 KB blocks per 16 KB window. This would be true only if `r.buf` +itself repeated with a 4 KB period. It does not: `r.buf` is a **16 KB block of independent random +seed data**, so every 4 KB sub-block within a 16 KB window is already distinct. + +--- + +## Performance + +Measured on this machine with the fixed generator: + +**Generator speed (in-memory, CPU-bound):** + +| Metric | Value | +|---|---| +| Single-core throughput | ~46 GB/s | +| 8-goroutine throughput | ~373 GB/s | + +**10 GB test file write (I/O-bound):** + +| Metric | Value | +|---|---| +| File write throughput | ~1.36 GB/s | +| Bottleneck | NVMe write speed | +| Dedup savings at 4 KB | 0.000% | +| Dedup savings at 64 MiB | 0.000% | + +The generator itself (SSE2 PXOR loop in `xorSlice`) runs at ~46 GB/s single-core — roughly +35× faster than the NVMe write speed. When writing to disk, the generator is completely +invisible in profiling; the workload is entirely I/O bound.
In warp's normal use case +(writing objects to S3/GCS/Azure), the network is the bottleneck, not the generator. + +--- + +## Files Changed + +| File | Change | +|---|---| +| `pkg/generator/rngfix/` | New package — fixed copy of `minio/pkg/v3/rng` | +| `pkg/generator/random.go` | Uses `rngfix` instead of upstream `rng` | +| `cmd/datagen/main.go` | Standalone 10 GB test generator, uses `rngfix` | +| `pkg/generator/generator_bench_test.go` | Microbenchmarks for the fixed generator | diff --git a/docs/Warp-streaming-log-Design.md b/docs/Warp-streaming-log-Design.md new file mode 100644 index 00000000..42d159cc --- /dev/null +++ b/docs/Warp-streaming-log-Design.md @@ -0,0 +1,542 @@ +# Warp Streaming Op-Log Design + +**Status**: Proposed +**Branch context**: `fix/default-tsv-output` (PR #475 on `minio/warp`) +**Author**: Russ Fellows +**Date**: April 2026 + +--- + +## 1. Problem Statement + +When `--full` is used, warp accumulates every individual `Operation` struct in memory +inside `collector.ops []Operation` for the entire duration of the benchmark. Only after +`b.Close()` returns does `ops.CSV(enc, ...)` iterate through the slice and flush it to +a zstd-compressed file. + +This batch-write design causes memory to scale with operation count — and worse, with +concurrency and duration combined: + +| Duration | Concurrency | Obj size | Ops | Extra RAM (`--full` vs no-full) | Bytes/op | +|----------|-------------|----------|-----|---------------------------------|----------| +| 30 s | 8 | 64 KiB | ~30K | +9 MB | ~310 B/op | +| 120 s | 8 | 64 KiB | ~119K | +84 MB | ~740 B/op | +| 60 s | 32 | 4 KiB | ~152K | +124 MB | ~850 B/op | +| 1 hr | 32 | 4 KiB | ~9M | ~8 GB (projected) | — | + +The per-op overhead grows super-linearly because Go's GC arena does not release +allocations between operations — the entire slice stays live until the process exits. 
+ +**Goal**: Replace the post-benchmark batch write with a background goroutine that +streams operations to the zstd encoder as they arrive, making memory usage constant +(~2–3 MB) regardless of benchmark duration or concurrency. + +--- + +## 2. Background: the Rust Reference Implementation + +The design mirrors two existing Rust implementations in the sibling projects that already +use an identical TSV format (modeled directly on warp's `.csv.zst` format). + +### 2.1 `s3dlio/src/s3_logger.rs` + +Implements `Logger` — a `SyncSender` + background `thread::spawn` writer: + +```rust +// Background writer thread — runs for the whole process lifetime +thread::spawn(move || { + for (idx, mut entry) in (0_u64..).zip(receiver.into_iter()) { + if entry.operation == SHUTDOWN_OP { break; } + entry.idx = idx; + let line = entry.to_log_line(clock_offset); + encoder.write_all(line.as_bytes())?; + } + drop(encoder); // finalises the zstd frame on drop + let _ = done_tx.send(()); +}); +``` + +Key design decisions: +- **Bounded channel** (default 256, tunable via `S3DLIO_OPLOG_BUF` env var) +- **`try_send` by default** (drops on overflow to avoid stalling I/O); lossless mode + available via `S3DLIO_OPLOG_LOSSLESS=1` +- **`BufWriter::with_capacity(256 KiB)`** wrapping the zstd encoder +- **zstd level 1** (bias to speed) +- File opened at logger creation time; header written immediately + +### 2.2 `sai3-bench/src/perf_log.rs` + +Implements `PerfLogWriter` — a synchronous writer called from a periodic ticker: + +```rust +pub fn write_entry(&mut self, entry: &PerfLogEntry) -> Result<()> { + writeln!(self.writer, "{}", entry.to_tsv())?; + Ok(()) +} +``` + +Key design decisions: +- `BufWriter::with_capacity(64 KiB)` wrapping optional zstd encoder +- zstd level 3 +- `auto_finish()` encoder — frame finalised automatically on drop +- File and header created at `PerfLogWriter::new()` + +### 2.3 What warp needs + +warp's `--full` path is more like s3dlio's `Logger` (high-frequency, concurrent 
ops +arriving from worker goroutines) than sai3-bench's `PerfLogWriter` (periodic 1-second +intervals). The channel-based goroutine model from s3dlio is the right approach. + +**Critical difference from s3dlio**: warp must be **lossless** — dropping ops would +produce an incomplete benchmark record. `try_send` is not acceptable; all sends must +be blocking. + +--- + +## 3. Current Code Architecture + +### 3.1 Operation struct (`pkg/bench/ops.go`) + +```go +type Operation struct { + OpType string + Thread uint32 + File string + Endpoint string + Size int64 + ObjPerOp int + ClientID string // ← always empty at construction time; set post-hoc + Start time.Time + End time.Time + FirstByte *time.Time + Err string + Categories bench.Categories +} +``` + +At construction in every benchmark worker (e.g. `put.go`, `multipart.go`, etc.), +`ClientID` is **always left empty**. It is stamped retroactively via +`ops.SetClientID(cID)` after the benchmark completes. + +### 3.2 CSV serialisation (`pkg/bench/ops.go`) + +```go +// Header +"idx\tthread\top\tclient_id\tn_objects\tbytes\tendpoint\tfile\terror\tstart\tfirst_byte\tend\tduration_ns\tcat\n" + +// Per-row (WriteCSV) +fmt.Fprintf(w, "%d\t%d\t%s\t%s\t%d\t%d\t%s\t%s\t%s\t%s\t%s\t%s\t%d\t%d\n", + i, o.Thread, o.OpType, o.ClientID, o.ObjPerOp, o.Size, + csvEscapeString(o.Endpoint), o.File, csvEscapeString(o.Err), + o.Start.Format(time.RFC3339Nano), ttfb, + o.End.Format(time.RFC3339Nano), o.End.Sub(o.Start)/time.Nanosecond, + o.Categories) +``` + +### 3.3 Current `--full` write path (local benchmark, `cli/benchmark.go`) + +``` +b.Start(ctx2, start) // benchmark runs; ops accumulate in memory +c.Collector.Close() // drains channel; all ops now in []Operation slice +ops := retrieveOps() // returns the slice (~84–124 MB at moderate load) +ops.SortByStartTime() // in-place sort of entire slice +ops.SetClientID(cID) // stamps client_id on every element +ops.CSV(enc, cmdLine) // writes entire slice to zstd encoder at once +``` + +`cID` 
is generated via `pRandASCII(4)` **before** `b.Start()` is called. It is also +embedded in the output filename suffix. This is important: `cID` is available at +the start of the benchmark, not just at the end. + +### 3.4 Current `--full` write path (distributed, `cli/benchserver.go`) + +In distributed mode: +1. Each agent runs its own local benchmark and holds ops in memory +2. After the benchmark, the controller calls `conns.downloadOps()` — each agent + serialises its ops and sends them over gRPC; the controller receives them all + into a combined `allOps` slice in memory +3. Controller calls `allOps.SortByStartTime()` then `allOps.CSV(enc, cmdLine)` + +The distributed path has an additional constraint: ops from multiple agents arrive +in bulk transfers, not as a real-time stream. Streaming at the controller is still +beneficial (avoids holding all agents' ops simultaneously), but the architecture +is different from the local path. + +### 3.5 Collector fan-out (`cli/benchmark.go` `addCollector()`) + +```go +// --full path: +liveC := aggregate.LiveCollector(ctx, updates, pRandASCII(4), nil) // separate ID +common.Collector, retrieveOps = bench.NewOpsCollector( + append(common.ExtraOut, liveC.Receiver())..., +) +``` + +`NewOpsCollector` fans out each incoming op to: (a) the live display collector, and +(b) appends to `collector.ops` in memory. With streaming, the memory accumulation +in (b) is eliminated — a `StreamingOpsWriter` replaces the in-memory buffer. + +Note: `pRandASCII(4)` passed to `LiveCollector` is a **separate, independent** random +string used only for per-client breakdown in the real-time display. It is not written +to the CSV and has no relation to `cID`. + +--- + +## 4. Proposed Design + +### 4.1 New type: `StreamingOpsWriter` (`pkg/bench/streaming_writer.go`) + +A background-goroutine writer that consumes `Operation` values from a channel and +writes them directly to a zstd-compressed TSV file as they arrive. 
+ +```go +package bench + +import ( + "bufio" + "fmt" + "os" + "sync/atomic" + "time" + + "github.com/klauspost/compress/zstd" +) + +// StreamingOpsWriter writes Operations to a zstd-compressed TSV file as they +// arrive, without buffering them in memory. It is the streaming equivalent of +// Operations.CSV() but suitable for use during a live benchmark run. +type StreamingOpsWriter struct { + ch chan Operation // ops arrive here from benchmark workers (via fan-out) + done chan struct{} // closed when background goroutine has fully flushed + clientID string // stamped on every row (known before benchmark starts) + cmdLine string // written as trailing # comment + err error // first error from background goroutine; read after Close() +} + +// NewStreamingOpsWriter creates the output file immediately, writes the TSV header, +// and starts a background goroutine to consume and write operations. +// +// path - full path to the .csv.zst file (created immediately) +// clientID - value for the client_id TSV column (known before benchmark start) +// cmdLine - written as a trailing comment after all ops are flushed +func NewStreamingOpsWriter(path, clientID, cmdLine string) (*StreamingOpsWriter, error) { + f, err := os.Create(path) + if err != nil { + return nil, err + } + + enc, err := zstd.NewWriter(f, zstd.WithEncoderLevel(zstd.SpeedBetterCompression)) + if err != nil { + f.Close() + return nil, err + } + + bw := bufio.NewWriterSize(enc, 256*1024) + + // Write header immediately so the file is valid even if the benchmark is + // interrupted before any ops complete. 
+ if _, err := bw.WriteString("idx\tthread\top\tclient_id\tn_objects\tbytes\tendpoint\tfile\terror\tstart\tfirst_byte\tend\tduration_ns\tcat\n"); err != nil { + enc.Close() + f.Close() + return nil, err + } + + w := &StreamingOpsWriter{ + ch: make(chan Operation, 1000), // matches collector.rcv buffer size + done: make(chan struct{}), + clientID: clientID, + cmdLine: cmdLine, + } + + go func() { + defer close(w.done) + var idx int + for op := range w.ch { + op.ClientID = w.clientID // stamp here, same as SetClientID did before + if err := op.WriteCSV(bw, idx); err != nil { + w.err = err + // Drain channel so senders are not blocked + for range w.ch { + } + break + } + idx++ + } + // Write trailing command-line comment (same as Operations.CSV) + if w.err == nil && len(w.cmdLine) > 0 { + for _, line := range strings.SplitSeq(w.cmdLine, "\n") { + if _, err := fmt.Fprintf(bw, "# %s\n", line); err != nil { + w.err = err + break + } + } + } + if w.err == nil { + w.err = bw.Flush() + } + enc.Close() // finalises the zstd frame + f.Close() + }() + + return w, nil +} + +// Receiver returns the send-only channel for incoming Operations. +// This channel should be passed as an `extra` channel to NewNullCollector or +// NewOpsCollector so that each completed operation is forwarded here. +func (w *StreamingOpsWriter) Receiver() chan<- Operation { + return w.ch +} + +// Close signals that no more operations will arrive and blocks until the +// background goroutine has fully flushed and closed the file. +// Returns the first write error encountered during streaming, or nil. +func (w *StreamingOpsWriter) Close() error { + close(w.ch) + <-w.done + return w.err +} +``` + +### 4.2 Changes to `cli/benchmark.go` — local benchmark path + +**Before** (current): +```go +fileName := ctx.String("benchdata") +cID := pRandASCII(4) +if fileName == "" { + fileName = fmt.Sprintf(...) +} +// ... b.Start() runs, ops accumulate in memory ... 
+ops := retrieveOps() +ops.SortByStartTime() +ops.SetClientID(cID) +if len(ops) > 0 { + f, _ := os.Create(fileName + ".csv.zst") + enc, _ := zstd.NewWriter(f, ...) + ops.CSV(enc, commandLine(ctx)) +} +``` + +**After** (streaming): +```go +fileName := ctx.String("benchdata") +cID := pRandASCII(4) +if fileName == "" { + fileName = fmt.Sprintf(...) +} + +// Create streaming writer BEFORE b.Start() — file exists from t=0, +// even if benchmark is interrupted. +var csvWriter *bench.StreamingOpsWriter +if ctx.Bool("full") { + var err error + csvWriter, err = bench.NewStreamingOpsWriter( + fileName+".csv.zst", cID, commandLine(ctx)) + if err != nil { + monitor.Errorln("Unable to create benchmark data file:", err) + } +} + +// ... b.Start() runs — ops flow through channel to csvWriter in real time ... + +// After benchmark ends: +if csvWriter != nil { + if err := csvWriter.Close(); err != nil { + monitor.Errorln("Error finalising benchmark data:", err) + } else { + monitor.InfoLn(fmt.Sprintf("\nBenchmark data written to %q\n", fileName+".csv.zst")) + } +} +// json.zst write path unchanged +``` + +### 4.3 Changes to `cli/benchmark.go` `addCollector()` + +**Before** (current): +```go +liveC := aggregate.LiveCollector(ctx, updates, pRandASCII(4), nil) +common.Collector, retrieveOps = bench.NewOpsCollector( + append(common.ExtraOut, liveC.Receiver())..., +) +return retrieveOps, updates +``` + +**After** (streaming): +```go +liveC := aggregate.LiveCollector(ctx, updates, pRandASCII(4), nil) +// NewNullCollector discards ops after forwarding — no in-memory accumulation. +// csvWriter.Receiver() and liveC.Receiver() both receive every op. +common.Collector = bench.NewNullCollector( + append(common.ExtraOut, liveC.Receiver(), csvWriter.Receiver())..., +) +return bench.EmptyOpsCollector, updates +``` + +`retrieveOps` is no longer needed; `EmptyOpsCollector` is returned as a sentinel. +`csvWriter` is passed in as a parameter or via closure. 
+ +### 4.4 Changes to `cli/benchserver.go` — distributed path + +In distributed mode, the streaming opportunity is different: ops arrive as bulk +downloads from agents (not as a real-time stream). The streaming writer can still be +used to write each agent's batch as it arrives, avoiding holding all agents' ops +simultaneously in RAM: + +```go +if ctx.Bool("full") { + csvWriter, err := bench.NewStreamingOpsWriter( + fileName+".csv.zst", "", commandLine(ctx)) + // ... + for _, agentOps := range conns.downloadOpsStreaming() { + agentOps.SortByStartTime() + for _, op := range agentOps { + csvWriter.Receiver() <- op + } + } + csvWriter.Close() +} +``` + +Note: distributed mode does not use a `clientID` in the same way — agents set +their own `ClientID` before sending ops. The `StreamingOpsWriter` should accept an +empty `clientID` string and skip the stamp when empty. + +**Simpler alternative for distributed**: keep the existing batch write for +`benchserver.go` in the initial implementation and only stream the local path. +The memory savings are largest for local benchmarks. This avoids changes to the +gRPC download path and reduces implementation risk. + +### 4.5 `SortByStartTime` — no longer possible, and not needed + +Streaming writes in arrival order, not start-time order. This is acceptable: + +- `warp analyze` uses `StreamOperationsFromCSV` which processes ops via + `bench.OperationsFromCSV` — this function does not require sorted input +- `warp replay` uses `StreamOperationsFromCSV` similarly +- Downstream tools (polarWarp, s3dlio replay) handle unsorted input + +If sorted output is required for some reason, it could be done as a post-processing +step in `warp analyze` (already partially supported). Do not add an in-memory sort +back into the streaming path. + +--- + +## 5. Memory Impact Summary + +With streaming, the entire `[]Operation` slice is eliminated. 
The only memory overhead is: + +- Channel buffer: 1000 ops × ~200 B (struct size, not GC-arena size) ≈ **200 KB** +- zstd write buffer: `bufio.NewWriterSize(..., 256*1024)` ≈ **256 KB** +- zstd encoder internal state: **~1 MB** (fixed, level-dependent) + +Total constant overhead: **~1.5 MB**, regardless of benchmark duration or concurrency. + +Comparison: + +| Scenario | Current batch | Streaming | Savings | +|----------|--------------|-----------|---------| +| 30s / 8c / 64KiB | +9 MB | ~1.5 MB | ~7.5 MB | +| 120s / 8c / 64KiB | +84 MB | ~1.5 MB | ~82.5 MB | +| 60s / 32c / 4KiB | +124 MB | ~1.5 MB | ~122.5 MB | +| 1hr / 32c (proj.) | ~8 GB | ~1.5 MB | ~8 GB | + +--- + +## 6. Analysis Time Impact + +Measured on a 120s / 8c / 64KiB run (119K ops): + +| Input | Analysis time | vs json.zst | +|-------|--------------|-------------| +| `.json.zst` (pre-aggregated) | 0.133 s | 1× baseline | +| `.csv.zst` with `warp analyze --full` | 0.651 s | 4.9× slower | +| `.csv.zst` without `--full` (re-agg) | 1.739 s | 13× slower | + +Streaming does not change these numbers — the on-disk format is identical. The +analysis time difference is inherent to the data volume in `.csv.zst` vs the +pre-aggregated `.json.zst`. + +--- + +## 7. 
Files to Create / Modify + +| File | Action | Notes | +|------|--------|-------| +| `pkg/bench/streaming_writer.go` | **Create** | New `StreamingOpsWriter` type | +| `cli/benchmark.go` | **Modify** | Two locations: `addCollector()` and local write block | +| `cli/benchserver.go` | **Modify (optional)** | Distributed write block; can defer to v2 | +| `cli/addcollector_test.go` | **Modify** | Add tests for streaming path | +| `README.md` | **Modify** | Update memory note to reflect new behaviour | + +No changes required to: +- `pkg/bench/ops.go` — `WriteCSV` and `StreamOperationsFromCSV` used as-is +- `pkg/bench/collector.go` — `NewNullCollector` used as-is; no new collector type needed +- `pkg/bench/csv.go` — TSV escape helpers used as-is +- `pkg/aggregate/` — `LiveCollector` unchanged + +--- + +## 8. Implementation Notes for Next Agent + +1. **Read `pkg/bench/ops.go` lines 1048–1100** before writing the streaming writer. + `WriteCSV` already exists and takes `(w io.Writer, i int)` — use it directly. + Do not reimplement the serialisation. + +2. **`cID` is generated before `b.Start()`** in both benchmark code paths (lines 220 + and 520 of `cli/benchmark.go`). Pass it to `NewStreamingOpsWriter` immediately after + generation. The writer must stamp `op.ClientID = w.clientID` in the goroutine loop + because all `Operation` structs arrive with `ClientID == ""`. + +3. **The channel buffer size of 1000** matches `collector.rcv`. If benchmark workers + produce ops faster than the zstd encoder can flush them, the channel will block + worker goroutines. This is the correct backpressure behaviour — it prevents silent + data loss. Do not use non-blocking sends (`select` with a `default` case). + +4. **`strings.SplitSeq`** is used in the existing `ops.CSV()` for the trailing comment + block (Go 1.24+). Use the same pattern in the streaming writer. + +5. **Error handling in the goroutine**: use a sticky `w.err` field.
If a write fails, + drain the channel so worker goroutines are not blocked indefinitely, then surface + the error via `Close()`. Use `fatalIf` in the caller (consistent with existing + write-error handling in `benchmark.go`). + +6. **`addCollector()` refactor**: the function currently returns `(OpsCollector, chan UpdateReq)`. + With streaming, `OpsCollector` is no longer meaningful for `--full`. Two options: + - Return `bench.EmptyOpsCollector` (already exists as a sentinel) + - Add a `*StreamingOpsWriter` return value to `addCollector()` + + The second option is cleaner but changes the function signature. The first option + requires passing `csvWriter` as a parameter to `addCollector()`. Either works; + be consistent with whichever you choose. + +7. **Test additions** (`cli/addcollector_test.go`): add at minimum: + - `TestStreamingWriter_OpCount` — sends N ops, verifies N rows in output + - `TestStreamingWriter_ClientID` — verifies `clientID` is stamped correctly + - `TestStreamingWriter_FileCreatedEarly` — verifies file exists before `Close()` + +8. **Do not modify** `benchserver.go` in the first implementation. Keep the scope + minimal for the PR. The local benchmark path has 100% of the memory impact for + single-node usage. + +9. **Verify** with `go vet ./...` and `go test ./cli/ -run TestAddCollector -v` + after implementation. Check for data races with `go test -race ./cli/`. + +10. **The existing README memory note** (added in this PR) should be updated after + implementation to reflect that streaming eliminates the memory scaling concern + for local benchmarks. + +--- + +## 9. Relationship to PR #475 + +This streaming writer is a follow-on improvement to PR #475 ("Restore per-transaction +.csv.zst output via --full flag"). It is **not part of PR #475** — that PR establishes +the `--full` flag semantics and restores the feature. 
The streaming implementation +should be a separate PR after #475 is merged, titled something like: + +> `bench: stream --full ops directly to csv.zst, eliminating in-memory buffering` + +The memory measurements and README note in PR #475 accurately describe the current +(batch) behaviour. Once streaming is implemented, the README note should be updated +to reflect the improvement. diff --git a/docs/issue-streaming-full-writer.md b/docs/issue-streaming-full-writer.md new file mode 100644 index 00000000..6ee145be --- /dev/null +++ b/docs/issue-streaming-full-writer.md @@ -0,0 +1,67 @@ +# Feature: Stream `--full` ops to disk rather than buffering in memory + +## Background + +PR #475 restores the per-transaction `.csv.zst` output file behind a `--full` flag. +With `--full`, every individual operation is buffered in a `[]Operation` slice in RAM +for the entire benchmark duration, then written to disk in a single batch when the +benchmark ends. + +## Problem + +Memory overhead scales with both operation count and concurrency. +Measured on a local MinIO server (`put`, 64 KiB objects): + +| Duration | Concurrency | Extra RAM (`--full` vs default) | +|----------|-------------|---------------------------------| +| 30 s | 8 | +9 MB (~310 B/op) | +| 120 s | 8 | +84 MB (~740 B/op) | +| 60 s | 32 | +124 MB (~850 B/op) | +| 1 hr | 32 | ~8 GB (projected) | + +The per-op cost is not constant: it rises with run length (~310 → ~740 B/op at concurrency 8) and with +concurrency (~850 B/op at 32), and Go's GC can reclaim none of it because the entire slice stays live until the process exits. At high concurrency or long durations +`--full` becomes impractical on machines with limited RAM, precisely the scenarios +where per-transaction data is most useful for diagnosing behaviour. + +## Proposed Fix + +Replace the post-benchmark batch write with a **background goroutine** that writes +each operation to a zstd encoder as it completes — a streaming writer. + +The approach is straightforward: + +1. Open the `.csv.zst` file and write the TSV header **before** `b.Start()`. +2.
Spin up a goroutine that reads from a channel and calls `op.WriteCSV()` for each + arriving operation. +3. After `b.Close()`, signal the goroutine and wait for the zstd frame to be finalised. + +No changes to the on-disk format or the `warp analyze` read paths. +The `--full` flag semantics are unchanged. + +## Memory Impact After Fix + +The entire `[]Operation` slice is eliminated. Remaining overhead is constant: + +- Channel buffer (1 000 ops): ~200 KB +- zstd write buffer (256 KiB): ~256 KB +- zstd encoder state: ~1 MB + +**Total: ~1.5 MB constant, regardless of duration or concurrency.** + +## Analysis Time (unchanged) + +Streaming does not affect read-side performance. For reference: + +| Input | Analysis time | +|-------|--------------| +| `.json.zst` (default, pre-aggregated) | 0.13 s | +| `.csv.zst` with `warp analyze --full` | 0.65 s | +| `.csv.zst` without `--full` (re-agg) | 1.74 s | + +## Scope + +- New file `pkg/bench/streaming_writer.go` (~60 lines) +- Minor changes to `cli/benchmark.go` and `addCollector()` +- No changes to `pkg/bench/ops.go`, `pkg/bench/collector.go`, or any read path +- `benchserver.go` (distributed mode) can follow in a separate PR diff --git a/go.mod b/go.mod index 4e9e0bbe..4a5cc057 100644 --- a/go.mod +++ b/go.mod @@ -406,4 +406,5 @@ require ( mvdan.cc/gofumpt v0.9.2 // indirect mvdan.cc/unparam v0.0.0-20250301125049-0df0534333a4 // indirect ) -replace github.com/minio/warp => ./ \ No newline at end of file + +replace github.com/minio/warp => ./ diff --git a/go.sum b/go.sum index 7cb90acb..8cbb16ff 100644 --- a/go.sum +++ b/go.sum @@ -714,8 +714,6 @@ github.com/minio/cli v1.24.2 h1:J+fCUh9mhPLjN3Lj/YhklXvxj8mnyE/D6FpFduXJ2jg= github.com/minio/cli v1.24.2/go.mod h1:bYxnK0uS629N3Bq+AOZZ+6lwF77Sodk4+UL9vNuXhOY= github.com/minio/crc64nvme v1.1.1 h1:8dwx/Pz49suywbO+auHCBpCtlW1OfpcLN7wYgVR6wAI= github.com/minio/crc64nvme v1.1.1/go.mod h1:eVfm2fAzLlxMdUGc0EEBGSMmPwmXD5XiNRpnu9J3bvg= -github.com/minio/madmin-go/v4 v4.6.3 
h1:5F3hAs554YzjToJ7Gw/h++GoLttAU2tWDpl/d1FguRw= -github.com/minio/madmin-go/v4 v4.6.3/go.mod h1:JTPi1NPoXVxPUTmYYNEfBYBBcofa/ZQIvbHqQAaQ41E= github.com/minio/madmin-go/v4 v4.10.0 h1:sa3Gqg5EGTixN0eCEJwSsmWFIwxd449sNw60CaafX6E= github.com/minio/madmin-go/v4 v4.10.0/go.mod h1:x0dnglymF0va4vh4qJ/dU8Pf+kSCIfR8TL2xyRNIEgY= github.com/minio/mc v0.0.0-20251106162529-77f82e18b540 h1:OAeamQLGQyf7sT/JEocLpAfMTU2Me5Jx3c1MjyS/mNo= @@ -728,8 +726,6 @@ github.com/minio/mux v1.9.0 h1:dWafQFyEfGhJvK6AwLOt83bIG5bxKxKJnKMCi0XAaoA= github.com/minio/mux v1.9.0/go.mod h1:1pAare17ZRL5GpmNL+9YmqHoWnLmMZF9C/ioUCfy0BQ= github.com/minio/pkg/v3 v3.6.1 h1:gaNT80BS/iuIany5ylTkVmfN4s6UYY30OtImFv4GQA8= github.com/minio/pkg/v3 v3.6.1/go.mod h1:fYlexVD0GMD0XNeBHeefFI6YBE0Oo8oDbDPWm3Jd68I= -github.com/minio/warp v1.4.0 h1:drunww6WC3hGVkm2gpGVcQk7NwA/sJgnED0rqU9CESY= -github.com/minio/warp v1.4.0/go.mod h1:SXc6BLPULhdK/23vvdPuDlATcdZ64bVg+iBroFGNElo= github.com/minio/websocket v1.6.0 h1:CPvnQvNvlVaQmvw5gtJNyYQhg4+xRmrPNhBbv8BdpAE= github.com/minio/websocket v1.6.0/go.mod h1:COH1CePZfHT9Ec1O7vZjTlX5uEPpyYnrifPNbu665DM= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= diff --git a/main.go b/main.go index ff76fcad..841aeaa4 100644 --- a/main.go +++ b/main.go @@ -19,6 +19,7 @@ package main import ( "os" + "github.com/russfellows/warp-replay/cli" ) diff --git a/pkg/aggregate/collector.go b/pkg/aggregate/collector.go index 48d96876..203216d8 100644 --- a/pkg/aggregate/collector.go +++ b/pkg/aggregate/collector.go @@ -38,8 +38,9 @@ func LiveCollector(ctx context.Context, updates chan UpdateReq, clientID string, updates = make(chan UpdateReq, 1000) } c.updates = updates + rcv := c.rcv go func() { - final := Live(c.rcv, updates, clientID, extra) + final := Live(rcv, updates, clientID, extra) for { select { case <-ctx.Done(): diff --git a/pkg/bench/streaming_writer.go b/pkg/bench/streaming_writer.go new file mode 100644 index 00000000..2c04a918 --- /dev/null +++ 
b/pkg/bench/streaming_writer.go @@ -0,0 +1,153 @@ +/* + * Warp (C) 2019-2026 MinIO, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +package bench + +import ( + "bufio" + "fmt" + "os" + "strings" + + "github.com/klauspost/compress/zstd" +) + +// StreamingOpsWriter writes Operations to a zstd-compressed TSV file as they +// arrive, without buffering them in memory. It is the streaming equivalent of +// Operations.CSV(), suitable for use during a live benchmark run. +// +// Usage with a Collector (production): +// 1. Create with NewStreamingOpsWriter. +// 2. Pass Receiver() as an extra channel to bench.NewNullCollector. +// 3. After Collector.Close() (which closes the receiver channel), call Wait(). +// +// Standalone usage (tests): +// 1. Create with NewStreamingOpsWriter. +// 2. Send ops to Receiver(). +// 3. Call Close() to flush and finalize. +type StreamingOpsWriter struct { + ch chan Operation + done chan struct{} + clientID string + cmdLine string + err error +} + +// NewStreamingOpsWriter creates the output file immediately, writes the TSV +// header, and starts a background goroutine to consume and write operations.
+// +// - path – full path to the output .csv.zst file (created immediately) +// - clientID – stamped on every row's client_id column; leave empty to +// preserve any ClientID already set on arriving Operations +// - cmdLine – written as a trailing "# …" comment after all ops are flushed +func NewStreamingOpsWriter(path, clientID, cmdLine string) (*StreamingOpsWriter, error) { + f, err := os.Create(path) + if err != nil { + return nil, err + } + + enc, err := zstd.NewWriter(f, zstd.WithEncoderLevel(zstd.SpeedBetterCompression)) + if err != nil { + f.Close() + return nil, err + } + + bw := bufio.NewWriterSize(enc, 256*1024) + + // Write the header immediately so the file is valid even if the benchmark + // is interrupted before any ops complete. + const header = "idx\tthread\top\tclient_id\tn_objects\tbytes\tendpoint\tfile\terror\tstart\tfirst_byte\tend\tduration_ns\tcat\n" + if _, err := bw.WriteString(header); err != nil { + enc.Close() + f.Close() + return nil, err + } + + w := &StreamingOpsWriter{ + ch: make(chan Operation, 1000), + done: make(chan struct{}), + clientID: clientID, + cmdLine: cmdLine, + } + + go func() { + defer close(w.done) + + var idx int + for op := range w.ch { + if w.clientID != "" { + op.ClientID = w.clientID + } + if err := op.WriteCSV(bw, idx); err != nil { + w.err = err + // Drain remaining ops so senders are not blocked indefinitely. + for range w.ch { //nolint:revive // intentionally empty drain loop + } + break + } + idx++ + } + + // Write trailing command-line comment (mirrors Operations.CSV behavior). 
+ if w.err == nil && len(w.cmdLine) > 0 { + for txt := range strings.SplitSeq(w.cmdLine, "\n") { + if _, err := fmt.Fprintf(bw, "# %s\n", txt); err != nil { + w.err = err + break + } + } + } + + if w.err == nil { + w.err = bw.Flush() + } + if err := enc.Close(); err != nil && w.err == nil { + w.err = err + } + if err := f.Close(); err != nil && w.err == nil { + w.err = err + } + }() + + return w, nil +} + +// Receiver returns the send-only channel for incoming Operations. +// Pass this to bench.NewNullCollector as an extra channel for collector-integrated +// use; the collector's Close() will close the channel when the benchmark ends. +func (w *StreamingOpsWriter) Receiver() chan<- Operation { + return w.ch +} + +// Close stops accepting operations and waits for all buffered operations to be +// flushed to disk. For standalone / test use only. When the writer is wired +// through a Collector, use Wait() instead — Collector.Close() already closes +// the channel; calling Close() afterwards would panic on double-close. +func (w *StreamingOpsWriter) Close() error { + close(w.ch) + return w.Wait() +} + +// Wait blocks until the background goroutine has fully flushed all operations +// and closed the output file. Call this after the Collector's Close() method +// has been called: Collector.Close() closes all extra channels it holds +// (including the one returned by Receiver()), which signals the goroutine to +// finish. +func (w *StreamingOpsWriter) Wait() error { + <-w.done + return w.err +} diff --git a/pkg/bench/streaming_writer_test.go b/pkg/bench/streaming_writer_test.go new file mode 100644 index 00000000..221b53e2 --- /dev/null +++ b/pkg/bench/streaming_writer_test.go @@ -0,0 +1,368 @@ +/* + * Warp (C) 2019-2026 MinIO, Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +package bench + +import ( + "os" + "testing" + "time" + + "github.com/klauspost/compress/zstd" +) + +// --------------------------------------------------------------------------- +// helpers +// --------------------------------------------------------------------------- + +// openCSVZst opens a .csv.zst file and returns its decoded Operations. +// Registered as a test helper so failures show the caller's file/line. +func openCSVZst(t *testing.T, path string) Operations { + t.Helper() + f, err := os.Open(path) + if err != nil { + t.Fatalf("open %s: %v", path, err) + } + defer f.Close() + + dec, err := zstd.NewReader(f) + if err != nil { + t.Fatalf("zstd.NewReader: %v", err) + } + defer dec.Close() + + ops, err := OperationsFromCSV(dec, false, 0, 0, func(string, ...any) {}) + if err != nil { + t.Fatalf("OperationsFromCSV(%s): %v", path, err) + } + return ops +} + +// makeOp returns a simple PUT operation with predictable field values.
+func makeOp(thread uint32, size int64) Operation { + now := time.Now().Truncate(time.Millisecond) + return Operation{ + OpType: "PUT", + Thread: thread, + Start: now, + End: now.Add(42 * time.Millisecond), + Size: size, + Endpoint: "localhost:9000", + File: "test/obj", + } +} + +// --------------------------------------------------------------------------- +// Test 1: file is created at construction time, before any ops arrive +// --------------------------------------------------------------------------- + +func TestStreamingWriter_FileCreatedOnNew(t *testing.T) { + path := t.TempDir() + "/test.csv.zst" + + w, err := NewStreamingOpsWriter(path, "c1", "warp test") + if err != nil { + t.Fatalf("NewStreamingOpsWriter: %v", err) + } + defer w.Close() //nolint:errcheck + + // File must exist before any ops are sent and before Close() is called. + if _, err := os.Stat(path); err != nil { + t.Errorf("file not created at construction time: %v", err) + } +} + +// --------------------------------------------------------------------------- +// Test 2: correct number of ops written +// --------------------------------------------------------------------------- + +func TestStreamingWriter_OpCount(t *testing.T) { + path := t.TempDir() + "/ops.csv.zst" + + w, err := NewStreamingOpsWriter(path, "c1", "") + if err != nil { + t.Fatalf("NewStreamingOpsWriter: %v", err) + } + + const numOps = 50 + for i := 0; i < numOps; i++ { + w.Receiver() <- makeOp(uint32(i%4), int64(i*1024)) + } + if err := w.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + + got := openCSVZst(t, path) + if len(got) != numOps { + t.Errorf("expected %d ops, got %d", numOps, len(got)) + } +} + +// --------------------------------------------------------------------------- +// Test 3: clientID is stamped on every row +// --------------------------------------------------------------------------- + +func TestStreamingWriter_ClientIDStamped(t *testing.T) { + path := t.TempDir() + "/cid.csv.zst" + const wantID 
= "XYZW" + + w, err := NewStreamingOpsWriter(path, wantID, "") + if err != nil { + t.Fatalf("NewStreamingOpsWriter: %v", err) + } + + w.Receiver() <- makeOp(0, 512) + if err := w.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + + ops := openCSVZst(t, path) + if len(ops) != 1 { + t.Fatalf("expected 1 op, got %d", len(ops)) + } + if ops[0].ClientID != wantID { + t.Errorf("ClientID: got %q, want %q", ops[0].ClientID, wantID) + } +} + +// --------------------------------------------------------------------------- +// Test 4: empty clientID preserves any ClientID set on the arriving Operation +// --------------------------------------------------------------------------- + +func TestStreamingWriter_EmptyClientID_PreservesField(t *testing.T) { + path := t.TempDir() + "/nocid.csv.zst" + + // Pass empty clientID — writer must not overwrite op.ClientID. + w, err := NewStreamingOpsWriter(path, "", "") + if err != nil { + t.Fatalf("NewStreamingOpsWriter: %v", err) + } + + op := makeOp(0, 1024) + op.ClientID = "already-set" + w.Receiver() <- op + + if err := w.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + + ops := openCSVZst(t, path) + if len(ops) != 1 { + t.Fatalf("expected 1 op, got %d", len(ops)) + } + if ops[0].ClientID != "already-set" { + t.Errorf("ClientID: got %q, want %q", ops[0].ClientID, "already-set") + } +} + +// --------------------------------------------------------------------------- +// Test 5: zero ops — empty writer produces a valid (header-only) file +// --------------------------------------------------------------------------- + +func TestStreamingWriter_EmptyRun(t *testing.T) { + path := t.TempDir() + "/empty.csv.zst" + + w, err := NewStreamingOpsWriter(path, "c1", "mycmd") + if err != nil { + t.Fatalf("NewStreamingOpsWriter: %v", err) + } + if err := w.Close(); err != nil { + t.Fatalf("Close on empty writer: %v", err) + } + + ops := openCSVZst(t, path) + if len(ops) != 0 { + t.Errorf("expected 0 ops from empty writer, got %d", 
len(ops)) + } +} + +// --------------------------------------------------------------------------- +// Test 6: operation field values are preserved end-to-end +// --------------------------------------------------------------------------- + +func TestStreamingWriter_ValuesPreserved(t *testing.T) { + path := t.TempDir() + "/vals.csv.zst" + + w, err := NewStreamingOpsWriter(path, "ID42", "") + if err != nil { + t.Fatalf("NewStreamingOpsWriter: %v", err) + } + + now := time.Now().Truncate(time.Millisecond) + fb := now.Add(10 * time.Millisecond) + sent := Operation{ + OpType: "STAT", + Thread: 7, + Start: now, + End: now.Add(123 * time.Millisecond), + FirstByte: &fb, + Size: 65536, + ObjPerOp: 2, + Endpoint: "s3.example.com:443", + File: "bucket/prefix/obj", + Err: "", + } + w.Receiver() <- sent + + if err := w.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + + ops := openCSVZst(t, path) + if len(ops) != 1 { + t.Fatalf("expected 1 op, got %d", len(ops)) + } + got := ops[0] + + if got.ClientID != "ID42" { + t.Errorf("ClientID: got %q, want %q", got.ClientID, "ID42") + } + if got.OpType != sent.OpType { + t.Errorf("OpType: got %q, want %q", got.OpType, sent.OpType) + } + if got.Thread != sent.Thread { + t.Errorf("Thread: got %d, want %d", got.Thread, sent.Thread) + } + if !got.Start.Equal(sent.Start) { + t.Errorf("Start: got %v, want %v", got.Start, sent.Start) + } + if !got.End.Equal(sent.End) { + t.Errorf("End: got %v, want %v", got.End, sent.End) + } + if got.FirstByte == nil || !got.FirstByte.Equal(fb) { + t.Errorf("FirstByte: got %v, want %v", got.FirstByte, fb) + } + if got.Size != sent.Size { + t.Errorf("Size: got %d, want %d", got.Size, sent.Size) + } +} + +// --------------------------------------------------------------------------- +// Test 7: multiple ops, all with correct clientID +// --------------------------------------------------------------------------- + +func TestStreamingWriter_MultipleOps_AllClientIDSet(t *testing.T) { + path := t.TempDir() 
+ "/multi.csv.zst" + const wantID = "AAAA" + + w, err := NewStreamingOpsWriter(path, wantID, "") + if err != nil { + t.Fatalf("NewStreamingOpsWriter: %v", err) + } + + const n = 20 + for i := 0; i < n; i++ { + op := makeOp(uint32(i), int64(i*512)) + op.OpType = "GET" + w.Receiver() <- op + } + if err := w.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + + ops := openCSVZst(t, path) + if len(ops) != n { + t.Fatalf("expected %d ops, got %d", n, len(ops)) + } + for i, op := range ops { + if op.ClientID != wantID { + t.Errorf("op[%d] ClientID: got %q, want %q", i, op.ClientID, wantID) + } + } +} + +// --------------------------------------------------------------------------- +// Test 8: Wait() can be used as an alternative to Close() (collector pattern) +// --------------------------------------------------------------------------- + +// Simulate the collector-integrated pattern: +// 1. Pass Receiver() to a NullCollector as an extra channel. +// 2. Close the collector (which closes the extra channel = Receiver()). +// 3. Call Wait() to drain. + +func TestStreamingWriter_Wait_CollectorPattern(t *testing.T) { + path := t.TempDir() + "/wait.csv.zst" + + w, err := NewStreamingOpsWriter(path, "cWait", "") + if err != nil { + t.Fatalf("NewStreamingOpsWriter: %v", err) + } + + // Use NewNullCollector to simulate what addCollector does in streaming mode. + col := NewNullCollector(w.Receiver()) + + const numOps = 15 + for i := 0; i < numOps; i++ { + col.Receiver() <- makeOp(0, 1024) + } + // Collector.Close() closes its extra channels, including w.Receiver(). + col.Close() + + // Wait() (not Close()) must be called here — Close() would double-close. 
+ if err := w.Wait(); err != nil { + t.Fatalf("Wait: %v", err) + } + + ops := openCSVZst(t, path) + if len(ops) != numOps { + t.Errorf("expected %d ops after Wait, got %d", numOps, len(ops)) + } + for _, op := range ops { + if op.ClientID != "cWait" { + t.Errorf("ClientID mismatch: %q", op.ClientID) + } + } +} + +// --------------------------------------------------------------------------- +// Test 9: data-race check — concurrent sends are safe +// --------------------------------------------------------------------------- + +func TestStreamingWriter_ConcurrentSends(t *testing.T) { + if testing.Short() { + t.Skip("skipping concurrent test in short mode") + } + path := t.TempDir() + "/race.csv.zst" + + w, err := NewStreamingOpsWriter(path, "raceID", "") + if err != nil { + t.Fatalf("NewStreamingOpsWriter: %v", err) + } + + const goroutines = 8 + const opsPerGoroutine = 50 + done := make(chan struct{}) + + for g := 0; g < goroutines; g++ { + go func(g int) { + defer func() { done <- struct{}{} }() + for i := 0; i < opsPerGoroutine; i++ { + w.Receiver() <- makeOp(uint32(g), int64(g*i+1)) + } + }(g) + } + for g := 0; g < goroutines; g++ { + <-done + } + if err := w.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + + ops := openCSVZst(t, path) + if want := goroutines * opsPerGoroutine; len(ops) != want { + t.Errorf("expected %d ops, got %d", want, len(ops)) + } +} diff --git a/pkg/config/config.go b/pkg/config/config.go index 9fdf5cff..0e45f29f 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -11,11 +11,11 @@ import ( // ReplayConfig defines the structure of our YAML configuration. 
type ReplayConfig struct { - DefaultS3Targets []string `yaml:"default_s3_targets"` -// 1 to 1 host mapping -// HostMapping map[string]string `yaml:"host_mapping"` -// 1 to many host mapping - HostMapping map[string][]string `yaml:"host_mapping"` + DefaultS3Targets []string `yaml:"default_s3_targets"` + // 1 to 1 host mapping + // HostMapping map[string]string `yaml:"host_mapping"` + // 1 to many host mapping + HostMapping map[string][]string `yaml:"host_mapping"` StateTrackingConfig `yaml:"state_tracking"` // Nested struct } @@ -26,14 +26,14 @@ type StateTrackingConfig struct { ProducerOperations []string `yaml:"producer_operations"` } -// compiledHostMapping stores direct and regex-compiled mappings. -type compiledHostMapping struct { - //Direct map[string]string - Direct map[string][]string +// CompiledHostMapping stores direct and regex-compiled mappings. +type CompiledHostMapping struct { + // Direct map[string]string + Direct map[string][]string Wildcard []struct { Pattern *regexp.Regexp Targets []string - //Target string + // Target string } } @@ -50,53 +50,52 @@ func LoadConfig(filePath string) (*ReplayConfig, error) { } // Default state tracking settings if not specified - if cfg.StateTrackingConfig.RetentionWindowSeconds == 0 { - cfg.StateTrackingConfig.RetentionWindowSeconds = 600 // Default 10 minutes + if cfg.RetentionWindowSeconds == 0 { + cfg.RetentionWindowSeconds = 600 // Default 10 minutes } - if len(cfg.StateTrackingConfig.ProducerOperations) == 0 { - cfg.StateTrackingConfig.ProducerOperations = []string{"PutObject", "CreateBucket"} // Common S3 producers + if len(cfg.ProducerOperations) == 0 { + cfg.ProducerOperations = []string{"PutObject", "CreateBucket"} // Common S3 producers } - return &cfg, nil } // NewCompiledHostMapping processes the raw host_mapping from config // to separate direct matches from wildcard/regex patterns and compile regexes. 
-//func NewCompiledHostMapping(rawMappings map[string]string) (*compiledHostMapping, error) { -func NewCompiledHostMapping(rawMappings map[string][]string) (*compiledHostMapping, error) { - cm := &compiledHostMapping{ - //Direct: make(map[string]string), - Direct: make(map[string][]string), +// func NewCompiledHostMapping(rawMappings map[string]string) (*CompiledHostMapping, error) { +func NewCompiledHostMapping(rawMappings map[string][]string) (*CompiledHostMapping, error) { + cm := &CompiledHostMapping{ + // Direct: make(map[string]string), + Direct: make(map[string][]string), Wildcard: []struct { Pattern *regexp.Regexp - Targets []string - //Target string + Targets []string + // Target string }{}, } -// for original, target := range rawMappings { + // for original, target := range rawMappings { for original, targets := range rawMappings { // Heuristic: If it contains common regex metacharacters, treat as regex. // Otherwise, treat as a direct string match. if containsRegexMetachar(original) { // Prepend and append ^$ to ensure full string match regexPattern := "^" + original + "$" - + re, err := regexp.Compile(regexPattern) if err != nil { return nil, fmt.Errorf("invalid regex pattern '%s' in host_mapping: %w", original, err) } cm.Wildcard = append(cm.Wildcard, struct { Pattern *regexp.Regexp - Targets []string + Targets []string }{Pattern: re, Targets: targets}) - //cm.Wildcard = append(cm.Wildcard, struct { - //Pattern *regexp.Regexp - //Target string - //}{Pattern: re, Target: target}) + // cm.Wildcard = append(cm.Wildcard, struct { + // Pattern *regexp.Regexp + // Target string + // }{Pattern: re, Target: target}) } else { - //cm.Direct[original] = target + // cm.Direct[original] = target cm.Direct[original] = targets } } @@ -107,40 +106,39 @@ func NewCompiledHostMapping(rawMappings map[string][]string) (*compiledHostMappi // keep the old cfg.Resolve(host) code path. It recompiles the mapping // once per call; that’s fine for replay workloads. 
func (cfg *ReplayConfig) Resolve(originalHost string) (string, error) { - mapper, err := NewCompiledHostMapping(cfg.HostMapping) - if err != nil { - return "", err - } - rr := 0 // single-shot call – round-robin index local - return mapper.ResolveTarget(originalHost, cfg.DefaultS3Targets, &rr), nil + mapper, err := NewCompiledHostMapping(cfg.HostMapping) + if err != nil { + return "", err + } + rr := 0 // single-shot call – round-robin index local + return mapper.ResolveTarget(originalHost, cfg.DefaultS3Targets, &rr), nil } // ResolveTarget resolves the new target URL based on original host, // using direct mapping, then regex mapping, then default targets (round-robin). -//func (cm *compiledHostMapping) ResolveTarget(originalHost string, defaultTargets []string, roundRobinIdx *int) string { -func (cm *compiledHostMapping) ResolveTarget(originalHost string, defaultTargets []string, roundRobinIdx *int) string { +// func (cm *CompiledHostMapping) ResolveTarget(originalHost string, defaultTargets []string, roundRobinIdx *int) string { +func (cm *CompiledHostMapping) ResolveTarget(originalHost string, defaultTargets []string, roundRobinIdx *int) string { // 1. Check explicit direct mappings - //if target, ok := cm.Direct[originalHost]; ok { + // if target, ok := cm.Direct[originalHost]; ok { // return target //} - if targets, ok := cm.Direct[originalHost]; ok && len(targets) > 0 { - i := *roundRobinIdx % len(targets) - target := targets[i] - *roundRobinIdx = (*roundRobinIdx + 1) % len(targets) - return target - } - + if targets, ok := cm.Direct[originalHost]; ok && len(targets) > 0 { + i := *roundRobinIdx % len(targets) + target := targets[i] + *roundRobinIdx = (*roundRobinIdx + 1) % len(targets) + return target + } // 2. Check regex mappings // Iterate through wildcard patterns in the order they were defined in YAML. // The first match wins. This is important if you have overlapping patterns. 
for _, mapping := range cm.Wildcard { if mapping.Pattern.MatchString(originalHost) { - // return mapping.Target - i := *roundRobinIdx % len(mapping.Targets) - target := mapping.Targets[i] - *roundRobinIdx = (*roundRobinIdx + 1) % len(mapping.Targets) - return target + // return mapping.Target + i := *roundRobinIdx % len(mapping.Targets) + target := mapping.Targets[i] + *roundRobinIdx = (*roundRobinIdx + 1) % len(mapping.Targets) + return target } } diff --git a/pkg/generator/rngfix/unsafe_enabled.go b/pkg/generator/rngfix/unsafe_enabled.go index c8d41fb8..430d579c 100644 --- a/pkg/generator/rngfix/unsafe_enabled.go +++ b/pkg/generator/rngfix/unsafe_enabled.go @@ -25,15 +25,15 @@ import ( "unsafe" ) -const unsafeEnabled = true +const unsafeEnabled = true //nolint:unused -func load64(b []byte, i int) uint64 { +func load64(b []byte, i int) uint64 { //nolint:unused // return binary.LittleEndian.Uint64(b[i:]) // return *(*uint64)(unsafe.Pointer(&b[i])) return *(*uint64)(unsafe.Pointer(uintptr(unsafe.Pointer(&b[0])) + uintptr(i)*unsafe.Sizeof(b[0]))) } -func store64(b []byte, i int, v uint64) { +func store64(b []byte, i int, v uint64) { //nolint:unused // binary.LittleEndian.PutUint64(b, v) *(*uint64)(unsafe.Pointer(uintptr(unsafe.Pointer(&b[0])) + uintptr(i)*unsafe.Sizeof(b[0]))) = v } diff --git a/pkg/generator/rngfix/xor.go b/pkg/generator/rngfix/xor.go index efb2bbe3..a7f73fa0 100644 --- a/pkg/generator/rngfix/xor.go +++ b/pkg/generator/rngfix/xor.go @@ -19,7 +19,7 @@ package rngfix // xor32Go will do out[x] = in[x] ^ v[x mod 32] // len(in) must >= len(out) and length must be a multiple of 32. -func xor32Go(in, out []byte, v *[4]uint64) { +func xor32Go(in, out []byte, v *[4]uint64) { //nolint:unused if unsafeEnabled { // Faster with "unsafe", slower without. 
var i int diff --git a/pkg/state/state.go b/pkg/state/state.go index a58d9170..9d106ba3 100644 --- a/pkg/state/state.go +++ b/pkg/state/state.go @@ -1,4 +1,5 @@ -// pkg/state/state.go +// Package state provides a small TTL-based mapping used to keep +// object-to-target endpoint assignments stable during a replay window. package state import ( @@ -6,7 +7,8 @@ import ( "time" ) -type StateManager struct { +// Manager tracks object-to-endpoint assignments with a TTL. +type Manager struct { ttl time.Duration mu sync.RWMutex m map[string]entry @@ -17,8 +19,8 @@ type entry struct { expire time.Time } -func New(ttl time.Duration) *StateManager { - return &StateManager{ +func New(ttl time.Duration) *Manager { + return &Manager{ ttl: ttl, m: make(map[string]entry), } @@ -26,7 +28,7 @@ func New(ttl time.Duration) *StateManager { // LookupOrSet returns the cached target for objectID if still valid, // otherwise stores `target` and returns it. -func (s *StateManager) LookupOrSet(objID, target string) string { +func (s *Manager) LookupOrSet(objID, target string) string { now := time.Now() s.mu.Lock() defer s.mu.Unlock() @@ -37,4 +39,3 @@ func (s *StateManager) LookupOrSet(objID, target string) string { s.m[objID] = entry{target: target, expire: now.Add(s.ttl)} return target } -