diff --git a/tests/robustness/client/client.go b/tests/robustness/client/client.go index 19d75c2ec562..3d5befd97444 100644 --- a/tests/robustness/client/client.go +++ b/tests/robustness/client/client.go @@ -279,6 +279,10 @@ func (c *RecordingClient) Endpoints() []string { return c.client.Endpoints() } +func (c *RecordingClient) SetEndpoints(endpoints ...string) { + c.client.SetEndpoints(endpoints...) +} + func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, withPrefix bool, withProgressNotify bool, withPrevKV bool) clientv3.WatchChan { request := model.WatchRequest{ Key: key, diff --git a/tests/robustness/traffic/traffic.go b/tests/robustness/traffic/traffic.go index 4d22f376a82d..d2a40dceb336 100644 --- a/tests/robustness/traffic/traffic.go +++ b/tests/robustness/traffic/traffic.go @@ -16,6 +16,8 @@ package traffic import ( "context" + "math/rand" + "slices" "sync" "testing" "time" @@ -139,6 +141,14 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 defer wg.Done() defer c.Close() + if profile.Watch != nil && profile.Watch.EndpointSwitchPeriod != nil { + wg.Add(1) + go func() { + defer wg.Done() + runEndpointSwitchLoop(ctx, c, endpoints, *profile.Watch.EndpointSwitchPeriod, finish) + }() + } + traffic.RunKeyValueLoop(ctx, RunTrafficLoopParam{ Client: c, QPSLimiter: limiter, @@ -177,6 +187,15 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 go func(c *client.RecordingClient) { defer wg.Done() defer c.Close() + + if profile.Watch != nil && profile.Watch.EndpointSwitchPeriod != nil { + wg.Add(1) + go func() { + defer wg.Done() + runEndpointSwitchLoop(ctx, c, endpoints, *profile.Watch.EndpointSwitchPeriod, finish) + }() + } + traffic.RunWatchLoop(ctx, RunWatchLoopParam{ Config: *profile.Watch, Client: c, @@ -375,9 +394,10 @@ type KeyValue struct { } type Watch struct { - MemberClientCount int - ClusterClientCount int - RevisionOffsetRange Range + MemberClientCount int + ClusterClientCount int + RevisionOffsetRange Range + EndpointSwitchPeriod *time.Duration } type Compaction struct { @@ -495,3 +515,20 @@ func CheckEmptyDatabaseAtStart(ctx context.Context, lg *zap.Logger, endpoints [] } return nil } + +func runEndpointSwitchLoop(ctx context.Context, c *client.RecordingClient, endpoints []string, period time.Duration, finish <-chan struct{}) { + for { + select { + case <-ctx.Done(): + return + case <-finish: + return + case <-time.After(period): + shuffledEndpoints := slices.Clone(endpoints) + rand.Shuffle(len(shuffledEndpoints), func(i, j int) { + shuffledEndpoints[i], shuffledEndpoints[j] = shuffledEndpoints[j], shuffledEndpoints[i] + }) + c.SetEndpoints(shuffledEndpoints...) + } + } +}