Skip to content

Commit 62787b8

Browse files
committed
robustness: implement cluster client endpoint switching
Signed-off-by: Chun-Hung Tseng <henrytseng@google.com>
1 parent 50f4ea2 commit 62787b8

File tree

2 files changed

+44
-3
lines changed

2 files changed

+44
-3
lines changed

tests/robustness/client/client.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,10 @@ func (c *RecordingClient) Endpoints() []string {
279279
return c.client.Endpoints()
280280
}
281281

282+
func (c *RecordingClient) SetEndpoints(endpoints ...string) {
283+
c.client.SetEndpoints(endpoints...)
284+
}
285+
282286
func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, withPrefix bool, withProgressNotify bool, withPrevKV bool) clientv3.WatchChan {
283287
request := model.WatchRequest{
284288
Key: key,

tests/robustness/traffic/traffic.go

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ package traffic
1616

1717
import (
1818
"context"
19+
"math/rand"
20+
"slices"
1921
"sync"
2022
"testing"
2123
"time"
@@ -139,6 +141,14 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
139141
defer wg.Done()
140142
defer c.Close()
141143

144+
if profile.Watch != nil && profile.Watch.EndpointSwitchPeriod != nil {
145+
wg.Add(1)
146+
go func() {
147+
defer wg.Done()
148+
runEndpointSwitchLoop(ctx, c, endpoints, *profile.Watch.EndpointSwitchPeriod, finish)
149+
}()
150+
}
151+
142152
traffic.RunKeyValueLoop(ctx, RunTrafficLoopParam{
143153
Client: c,
144154
QPSLimiter: limiter,
@@ -177,6 +187,15 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
177187
go func(c *client.RecordingClient) {
178188
defer wg.Done()
179189
defer c.Close()
190+
191+
if profile.Watch != nil && profile.Watch.EndpointSwitchPeriod != nil {
192+
wg.Add(1)
193+
go func() {
194+
defer wg.Done()
195+
runEndpointSwitchLoop(ctx, c, endpoints, *profile.Watch.EndpointSwitchPeriod, finish)
196+
}()
197+
}
198+
180199
traffic.RunWatchLoop(ctx, RunWatchLoopParam{
181200
Config: *profile.Watch,
182201
Client: c,
@@ -375,9 +394,10 @@ type KeyValue struct {
375394
}
376395

377396
type Watch struct {
378-
MemberClientCount int
379-
ClusterClientCount int
380-
RevisionOffsetRange Range
397+
MemberClientCount int
398+
ClusterClientCount int
399+
RevisionOffsetRange Range
400+
EndpointSwitchPeriod *time.Duration
381401
}
382402

383403
type Compaction struct {
@@ -495,3 +515,20 @@ func CheckEmptyDatabaseAtStart(ctx context.Context, lg *zap.Logger, endpoints []
495515
}
496516
return nil
497517
}
518+
519+
func runEndpointSwitchLoop(ctx context.Context, c *client.RecordingClient, endpoints []string, period time.Duration, finish <-chan struct{}) {
520+
for {
521+
select {
522+
case <-ctx.Done():
523+
return
524+
case <-finish:
525+
return
526+
case <-time.After(period):
527+
shuffledEndpoints := slices.Clone(endpoints)
528+
rand.Shuffle(len(shuffledEndpoints), func(i, j int) {
529+
shuffledEndpoints[i], shuffledEndpoints[j] = shuffledEndpoints[j], shuffledEndpoints[i]
530+
})
531+
c.SetEndpoints(shuffledEndpoints...)
532+
}
533+
}
534+
}

0 commit comments

Comments
 (0)