Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,7 @@ dist

# qreleaser state file
.release-state.sh

# local secrets — never commit
.env
warp-test
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,15 @@ flag warp writes **both** output files:
warp put --host=... --full
```

> **Note:** `--full` streams every individual operation directly to the `.csv.zst` file
> as it completes, using a background goroutine. Memory overhead is constant (~1.5 MB)
> regardless of benchmark duration or concurrency — earlier releases buffered all ops in
> memory, which scaled to several GB for long high-concurrency runs.
>
> Analysis time depends on the file type. A `.json.zst` aggregate analyzes in under 0.2 s
> regardless of run length. Compared to that baseline, a `.csv.zst` file takes ~5× longer
> to analyze with `--full` and ~13× longer to re-aggregate without it. The `.json.zst`
> aggregate is always written alongside the `.csv.zst` when `--full` is used, so both are
> available.

### Analyzing Results

To get accurate per-operation statistics from a `.csv.zst` file, pass `--full` to `warp analyze`:
Expand Down
222 changes: 206 additions & 16 deletions cli/addcollector_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package cli

// Tests for addCollector behaviour.
// Tests for addCollector behavior.
//
// The invariants we care about:
//
Expand All @@ -17,9 +17,11 @@ package cli
import (
"context"
"flag"
"os"
"testing"
"time"

"github.com/klauspost/compress/zstd"
mcli "github.com/minio/cli"
"github.com/minio/warp/pkg/aggregate"
"github.com/minio/warp/pkg/bench"
Expand Down Expand Up @@ -95,8 +97,8 @@ func TestAddCollector_DefaultMode_NoOpsStored(t *testing.T) {
}

// Send an operation to the collector and close it.
sendOp(t, b.Common.Collector)
b.Common.Collector.Close()
sendOp(t, b.Collector)
b.Collector.Close()

// Without --full, retrieveOps is EmptyOpsCollector → must return empty.
ops := retrieveOps()
Expand Down Expand Up @@ -125,9 +127,9 @@ func TestAddCollector_FullMode_OpsAreCollected(t *testing.T) {

const numOps = 5
for i := 0; i < numOps; i++ {
sendOp(t, b.Common.Collector)
sendOp(t, b.Collector)
}
b.Common.Collector.Close()
b.Collector.Close()

ops := retrieveOps()
if len(ops) != numOps {
Expand All @@ -151,11 +153,11 @@ func TestAddCollector_FullMode_LiveCollectorAlsoReceivesOps(t *testing.T) {

const numOps = 3
for i := 0; i < numOps; i++ {
sendOp(t, b.Common.Collector)
sendOp(t, b.Collector)
}
// Close flushes bench.OpsCollector and, via the extra channel, also
// signals the live collector to finish computing its aggregate.
b.Common.Collector.Close()
b.Collector.Close()

// Sanity-check: per-transaction ops are present.
ops := retrieveOps()
Expand Down Expand Up @@ -197,9 +199,9 @@ func TestAddCollector_DefaultMode_UpdatesChannelFunctional(t *testing.T) {
// Send ops then close.
const numOps = 4
for i := 0; i < numOps; i++ {
sendOp(t, b.Common.Collector)
sendOp(t, b.Collector)
}
b.Common.Collector.Close()
b.Collector.Close()

// The updates channel is buffered (capacity 1000); a write must not block.
// A Reset request is a no-op for the live collector and safe to use here.
Expand All @@ -222,7 +224,7 @@ func TestAddCollector_DiscardOutput_NullCollector(t *testing.T) {
t.Run("full=false", func(t *testing.T) {
ctx := makeCtx(false)
b := &stubBench{}
b.Common.DiscardOutput = true
b.DiscardOutput = true

retrieveOps, updates := addCollector(ctx, b)

Expand All @@ -239,7 +241,7 @@ func TestAddCollector_DiscardOutput_NullCollector(t *testing.T) {
t.Run("full=true", func(t *testing.T) {
ctx := makeCtx(true)
b := &stubBench{}
b.Common.DiscardOutput = true
b.DiscardOutput = true

retrieveOps, updates := addCollector(ctx, b)

Expand All @@ -258,7 +260,7 @@ func TestAddCollector_DiscardOutput_NullCollector(t *testing.T) {
// ---------------------------------------------------------------------------

// This test validates the core contract: --full must not disable any existing
// behaviour, only add the per-transaction csv.zst path on top.
// behavior, only add the per-transaction csv.zst path on top.
// We verify this by confirming both the ops slice AND the live aggregate are
// non-empty after the same set of operations.
func TestAddCollector_FullMode_IsAdditive(t *testing.T) {
Expand All @@ -275,9 +277,9 @@ func TestAddCollector_FullMode_IsAdditive(t *testing.T) {
// Send ops.
const numOps = 6
for i := 0; i < numOps; i++ {
sendOp(t, b.Common.Collector)
sendOp(t, b.Collector)
}
b.Common.Collector.Close()
b.Collector.Close()

// Per-transaction store must be full.
ops := retrieveOps()
Expand Down Expand Up @@ -315,8 +317,8 @@ func TestAddCollector_FullMode_OpValuesPreserved(t *testing.T) {
Size: 8192,
Thread: 3,
}
b.Common.Collector.Receiver() <- op
b.Common.Collector.Close()
b.Collector.Receiver() <- op
b.Collector.Close()

ops := retrieveOps()
if len(ops) != 1 {
Expand All @@ -339,3 +341,191 @@ func TestAddCollector_FullMode_OpValuesPreserved(t *testing.T) {
t.Errorf("Thread: got %d want %d", got.Thread, op.Thread)
}
}

// ---------------------------------------------------------------------------
// Stage 2 tests: streaming mode via addCollector's fullExtra parameter
// ---------------------------------------------------------------------------

// ---------------------------------------------------------------------------
// Test 8: streaming mode — ops go to the channel, not to in-memory store
// ---------------------------------------------------------------------------

// When a streaming channel is passed to addCollector and --full is set:
// - retrieveOps must return empty (ops are streamed, not buffered in RAM)
// - the streaming channel must receive every op
func TestAddCollector_StreamingMode_OpsGoToChannel(t *testing.T) {
ctx := makeCtx(true) // --full

streamCh := make(chan bench.Operation, 100)
b := &stubBench{}

retrieveOps, updates := addCollector(ctx, b, streamCh)
if updates == nil {
t.Fatal("expected non-nil updates in streaming mode")
}

const numOps = 8
for i := 0; i < numOps; i++ {
sendOp(t, b.Collector)
}
// Collector.Close() closes streamCh (it is in the fan-out extra list).
b.Collector.Close()

// In streaming mode, retrieveOps must return empty (no in-memory accumulation).
ops := retrieveOps()
if len(ops) != 0 {
t.Errorf("streaming mode: expected 0 in-memory ops, got %d", len(ops))
}

// All ops must have arrived on the streaming channel.
var received int
for range streamCh {
received++
}
if received != numOps {
t.Errorf("streaming channel: expected %d ops, got %d", numOps, received)
}
}

// ---------------------------------------------------------------------------
// Test 9: streaming mode — live collector still receives every op
// ---------------------------------------------------------------------------

// --full with streaming must be additive: both the streaming channel AND the
// live aggregate collector receive every op.
func TestAddCollector_StreamingMode_LiveCollectorAlsoReceives(t *testing.T) {
ctx := makeCtx(true)

streamCh := make(chan bench.Operation, 100)
b := &stubBench{}

_, updates := addCollector(ctx, b, streamCh)

const numOps = 5
for i := 0; i < numOps; i++ {
sendOp(t, b.Collector)
}
b.Collector.Close()

// Drain streaming channel so the test doesn't block.
var streamed int
for range streamCh {
streamed++
}
if streamed != numOps {
t.Errorf("streaming channel: expected %d ops, got %d", numOps, streamed)
}

// Live collector must also have received all ops.
final := requestFinal(t, updates)
if final == nil {
t.Fatal("live aggregate is nil; streaming mode must not disable live collector")
}
if final.Total.TotalRequests == 0 {
t.Errorf("live TotalRequests == 0; ops were not forwarded to live collector in streaming mode")
}
}

// ---------------------------------------------------------------------------
// Test 10: backward-compat — batch fallback when no streaming channel given
// ---------------------------------------------------------------------------

// Passing no fullExtra channel with --full keeps the existing batch behavior:
// retrieveOps returns all ops (needed for distributed agent path).
func TestAddCollector_FullMode_BatchFallback_NoChannel(t *testing.T) {
ctx := makeCtx(true)
b := &stubBench{}

// No extra channel → existing batch mode.
retrieveOps, updates := addCollector(ctx, b)
if updates == nil {
t.Fatal("expected non-nil updates in batch fallback mode")
}

const numOps = 4
for i := 0; i < numOps; i++ {
sendOp(t, b.Collector)
}
b.Collector.Close()

ops := retrieveOps()
if len(ops) != numOps {
t.Errorf("batch fallback: expected %d in-memory ops, got %d", numOps, len(ops))
}
}

// ---------------------------------------------------------------------------
// Test 11: end-to-end — streaming path writes correct csv.zst file
// ---------------------------------------------------------------------------

// TestAddCollector_Streaming_EndToEnd drives the whole streaming pipeline:
// addCollector is wired to a StreamingOpsWriter, ops are sent, the collector
// is closed (which closes the writer's channel), the writer is awaited, and
// the resulting csv.zst file is read back and checked for the expected op
// count and client ID.
func TestAddCollector_Streaming_EndToEnd(t *testing.T) {
	const (
		wantClientID = "e2eID"
		numOps       = 12
	)
	path := t.TempDir() + "/out.csv.zst"

	csvWriter, err := bench.NewStreamingOpsWriter(path, wantClientID, "")
	if err != nil {
		t.Fatalf("NewStreamingOpsWriter: %v", err)
	}

	b := &stubBench{}
	retrieveOps, updates := addCollector(makeCtx(true), b, csvWriter.Receiver())
	if updates == nil {
		t.Fatal("expected non-nil updates in streaming end-to-end test")
	}

	// seq is 1-based: op number seq lasts seq milliseconds and carries
	// seq KiB, with threads cycling through 0..3.
	start := time.Now().Truncate(time.Millisecond)
	for seq := 1; seq <= numOps; seq++ {
		op := bench.Operation{
			OpType: "GET",
			Thread: uint32((seq - 1) % 4),
			Start:  start,
			End:    start.Add(time.Duration(seq) * time.Millisecond),
			Size:   int64(1024 * seq),
		}
		b.Collector.Receiver() <- op
	}
	// Closing the collector closes csvWriter.Receiver() too, since it was
	// registered as an extra channel.
	b.Collector.Close()

	// Streaming mode keeps nothing in memory.
	if inMemory := len(retrieveOps()); inMemory != 0 {
		t.Errorf("expected 0 in-memory ops in streaming mode, got %d", inMemory)
	}

	// Block until the writer has finished flushing to disk.
	if err := csvWriter.Wait(); err != nil {
		t.Fatalf("csvWriter.Wait: %v", err)
	}

	// Re-open the file and decode it to validate what was written.
	f, err := os.Open(path)
	if err != nil {
		t.Fatalf("open csv file: %v", err)
	}
	defer f.Close()

	dec, err := zstd.NewReader(f)
	if err != nil {
		t.Fatalf("zstd.NewReader: %v", err)
	}
	defer dec.Close()

	discardLog := func(string, ...any) {}
	written, err := bench.OperationsFromCSV(dec, false, 0, 0, discardLog)
	if err != nil {
		t.Fatalf("OperationsFromCSV: %v", err)
	}
	if got := len(written); got != numOps {
		t.Errorf("csv file: expected %d ops, got %d", numOps, got)
	}
	for i, rec := range written {
		if rec.ClientID == wantClientID {
			continue
		}
		t.Errorf("op[%d] ClientID: got %q, want %q", i, rec.ClientID, wantClientID)
	}
}
Loading
Loading