Skip to content

Commit 4f6863f

Browse files
committed
robustness: implement cluster client endpoint switching
Signed-off-by: Chun-Hung Tseng <henrytseng@google.com>
1 parent d949439 commit 4f6863f

File tree

2 files changed

+44
-3
lines changed

2 files changed

+44
-3
lines changed

tests/robustness/client/client.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,10 @@ func (c *RecordingClient) Endpoints() []string {
279279
return c.client.Endpoints()
280280
}
281281

282+
func (c *RecordingClient) SetEndpoints(endpoints ...string) {
283+
c.client.SetEndpoints(endpoints...)
284+
}
285+
282286
func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, withPrefix bool, withProgressNotify bool, withPrevKV bool) clientv3.WatchChan {
283287
request := model.WatchRequest{
284288
Key: key,

tests/robustness/traffic/traffic.go

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ package traffic
1616

1717
import (
1818
"context"
19+
"math/rand"
20+
"slices"
1921
"sync"
2022
"testing"
2123
"time"
@@ -148,6 +150,14 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
148150
defer wg.Done()
149151
defer c.Close()
150152

153+
if profile.Watch != nil && profile.Watch.EndpointSwitchPeriod != nil {
154+
wg.Add(1)
155+
go func() {
156+
defer wg.Done()
157+
runEndpointSwitchLoop(ctx, c, endpoints, *profile.Watch.EndpointSwitchPeriod, finish)
158+
}()
159+
}
160+
151161
traffic.RunKeyValueLoop(ctx, RunTrafficLoopParam{
152162
Client: c,
153163
QPSLimiter: limiter,
@@ -186,6 +196,15 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
186196
go func(c *client.RecordingClient) {
187197
defer wg.Done()
188198
defer c.Close()
199+
200+
if profile.Watch != nil && profile.Watch.EndpointSwitchPeriod != nil {
201+
wg.Add(1)
202+
go func() {
203+
defer wg.Done()
204+
runEndpointSwitchLoop(ctx, c, endpoints, *profile.Watch.EndpointSwitchPeriod, finish)
205+
}()
206+
}
207+
189208
traffic.RunWatchLoop(ctx, RunWatchLoopParam{
190209
Config: profile.Watch,
191210
Client: c,
@@ -384,9 +403,10 @@ type KeyValue struct {
384403
}
385404

386405
type Watch struct {
387-
MemberClientCount int
388-
ClusterClientCount int
389-
RivisionOffsetRange *Range
406+
MemberClientCount int
407+
ClusterClientCount int
408+
RivisionOffsetRange *Range
409+
EndpointSwitchPeriod *time.Duration
390410
}
391411

392412
type Compaction struct {
@@ -506,3 +526,20 @@ func CheckEmptyDatabaseAtStart(ctx context.Context, lg *zap.Logger, endpoints []
506526
}
507527
return nil
508528
}
529+
530+
func runEndpointSwitchLoop(ctx context.Context, c *client.RecordingClient, endpoints []string, period time.Duration, finish <-chan struct{}) {
531+
for {
532+
select {
533+
case <-ctx.Done():
534+
return
535+
case <-finish:
536+
return
537+
case <-time.After(period + time.Duration(rand.Int63n(int64(period)))):
538+
shuffledEndpoints := slices.Clone(endpoints)
539+
rand.Shuffle(len(shuffledEndpoints), func(i, j int) {
540+
shuffledEndpoints[i], shuffledEndpoints[j] = shuffledEndpoints[j], shuffledEndpoints[i]
541+
})
542+
c.SetEndpoints(shuffledEndpoints...)
543+
}
544+
}
545+
}

0 commit comments

Comments
 (0)