Reconstructed unified diff — the source file's newlines had been collapsed into
four physical lines (breaks fell mid-command, mid-string, and mid-call), making
the patch inapplicable. Line structure was rebuilt from the hunk headers; all
old/new line counts and inter-hunk offsets were re-verified for consistency.

What this change set does: adds a robustness regression scenario
"Issue20221-event" that builds etcd at commit 44a094a2 with the watch
mod-revision verification panic removed (so the underlying event bug can
reproduce instead of tripping verify), and adds optional watch-client endpoint
switching traffic (Watch.EndpointSwitchPeriod / runEndpointSwitchLoop /
RecordingClient.SetEndpoints).

NOTE(review): blank-line placement, intra-line alignment spaces, and the final
`k8s-coverage:` context line are inferred from hunk arithmetic where the
collapse made them ambiguous — re-diff against upstream before applying.

diff --git a/tests/robustness/Makefile b/tests/robustness/Makefile
index 71e904246fa0..1c1b9cb5ae01 100644
--- a/tests/robustness/Makefile
+++ b/tests/robustness/Makefile
@@ -80,6 +80,11 @@ test-robustness-issue20221: /tmp/etcd-bc47e771-failpoints/bin
 	GO_TEST_FLAGS='-v -run=TestRobustnessRegression/Issue20221 -count 100 -failfast --bin-dir=/tmp/etcd-bc47e771-failpoints/bin' $(TOPLEVEL_MAKE) test-robustness && \
 	echo "Failed to reproduce" || echo "Successful reproduction"
 
+.PHONY: test-robustness-issue20221-event
+test-robustness-issue20221-event: /tmp/etcd-44a094a2-failpoints/bin
+	GO_TEST_FLAGS='-v -run=TestRobustnessRegression/Issue20221-event -count 100 -failfast --bin-dir=/tmp/etcd-44a094a2-failpoints/bin' $(TOPLEVEL_MAKE) test-robustness && \
+	echo "Failed to reproduce" || echo "Successful reproduction"
+
 # Etcd API usage by Kubernetes
 .PHONY: k8s-coverage
 k8s-coverage:
@@ -144,6 +149,19 @@ $(GOPATH)/bin/gofail: $(REPOSITORY_ROOT)/tools/mod/go.mod $(REPOSITORY_ROOT)/too
 	$(MAKE) gofail-enable; \
 	$(MAKE) build;
 
+/tmp/etcd-44a094a2-failpoints/bin: $(GOPATH)/bin/gofail
+	rm -rf /tmp/etcd-44a094a2-failpoints/
+	mkdir -p /tmp/etcd-44a094a2-failpoints/
+	cd /tmp/etcd-44a094a2-failpoints/; \
+	git init; \
+	git remote add origin https://github.com/etcd-io/etcd.git; \
+	git fetch --depth 1 origin 44a094a20a7a0ac16cf88c9f21b6ca90e9f85cf5; \
+	git checkout FETCH_HEAD; \
+	cp -r $(REPOSITORY_ROOT)/tests/robustness/patches/removeVerifyModRevision . && \
+	patch --fuzz=0 server/storage/mvcc/watchable_store.go ./removeVerifyModRevision/watchable_store.patch && \
+	$(MAKE) gofail-enable; \
+	$(MAKE) build;
+
 /tmp/etcd-release-3.6-failpoints/bin: $(GOPATH)/bin/gofail
 	rm -rf /tmp/etcd-release-3.6-failpoints/
 	mkdir -p /tmp/etcd-release-3.6-failpoints/
diff --git a/tests/robustness/client/client.go b/tests/robustness/client/client.go
index 19d75c2ec562..3d5befd97444 100644
--- a/tests/robustness/client/client.go
+++ b/tests/robustness/client/client.go
@@ -279,6 +279,10 @@ func (c *RecordingClient) Endpoints() []string {
 	return c.client.Endpoints()
 }
 
+func (c *RecordingClient) SetEndpoints(endpoints ...string) {
+	c.client.SetEndpoints(endpoints...)
+}
+
 func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, withPrefix bool, withProgressNotify bool, withPrevKV bool) clientv3.WatchChan {
 	request := model.WatchRequest{
 		Key: key,
diff --git a/tests/robustness/patches/removeVerifyModRevision/watchable_store.patch b/tests/robustness/patches/removeVerifyModRevision/watchable_store.patch
new file mode 100644
index 000000000000..7c986299c79e
--- /dev/null
+++ b/tests/robustness/patches/removeVerifyModRevision/watchable_store.patch
@@ -0,0 +1,35 @@
+diff --git a/server/storage/mvcc/watchable_store.go b/server/storage/mvcc/watchable_store.go
+index 1996ee6..ed9fe1c 100644
+--- a/server/storage/mvcc/watchable_store.go
++++ b/server/storage/mvcc/watchable_store.go
+@@ -15,14 +15,12 @@
+ package mvcc
+ 
+ import (
+-	"fmt"
+ 	"sync"
+ 	"time"
+ 
+ 	"go.uber.org/zap"
+ 
+ 	"go.etcd.io/etcd/api/v3/mvccpb"
+-	"go.etcd.io/etcd/client/pkg/v3/verify"
+ 	clientv3 "go.etcd.io/etcd/client/v3"
+ 	"go.etcd.io/etcd/pkg/v3/traceutil"
+ 	"go.etcd.io/etcd/server/v3/lease"
+@@ -610,15 +608,6 @@ func (w *watcher) send(wr WatchResponse) bool {
+ 		wr.Events = ne
+ 	}
+ 
+-	verify.Verify(func() {
+-		if w.startRev > 0 {
+-			for _, ev := range wr.Events {
+-				if ev.Kv.ModRevision < w.startRev {
+-					panic(fmt.Sprintf("Event.ModRevision(%d) is less than the w.startRev(%d) for watchID: %d", ev.Kv.ModRevision, w.startRev, w.id))
+-				}
+-			}
+-		}
+-	})
+
+ 	// if all events are filtered out, we should send nothing.
+ 	if !progressEvent && len(wr.Events) == 0 {
diff --git a/tests/robustness/scenarios/scenarios.go b/tests/robustness/scenarios/scenarios.go
index c6fe324951a1..8887a7ecf991 100644
--- a/tests/robustness/scenarios/scenarios.go
+++ b/tests/robustness/scenarios/scenarios.go
@@ -331,6 +331,31 @@ func Regression(t *testing.T) []TestScenario {
 			e2e.WithSnapshotCatchUpEntries(10),
 		),
 	})
+	clusterClientSwitchingInterval := 500 * time.Millisecond
+	scenarios = append(scenarios, TestScenario{
+		Name:      "Issue20221-event",
+		Failpoint: failpoint.BlackholeUntilSnapshot,
+		Watch: client.WatchConfig{
+			RequestProgress: true,
+		},
+		Profile: traffic.Profile{
+			KeyValue: &traffic.KeyValueHigh,
+			Watch: &traffic.Watch{
+				MemberClientCount:    6,
+				ClusterClientCount:   2,
+				RevisionOffsetRange:  traffic.Range{Min: 10, Max: 100},
+				EndpointSwitchPeriod: &clusterClientSwitchingInterval,
+			},
+		},
+		Traffic: traffic.EtcdPut,
+		Cluster: *e2e.NewConfig(
+			e2e.WithSnapshotCount(10),
+			e2e.WithPeerProxy(true),
+			e2e.WithIsPeerTLS(true),
+			e2e.WithWatchProcessNotifyInterval(10*time.Millisecond),
+			e2e.WithSnapshotCatchUpEntries(10),
+		),
+	})
 	if v.Compare(version.V3_5) >= 0 {
 		opts := []e2e.EPClusterOption{
 			e2e.WithSnapshotCount(100),
diff --git a/tests/robustness/traffic/traffic.go b/tests/robustness/traffic/traffic.go
index 4d22f376a82d..d2a40dceb336 100644
--- a/tests/robustness/traffic/traffic.go
+++ b/tests/robustness/traffic/traffic.go
@@ -16,6 +16,8 @@ package traffic
 
 import (
 	"context"
+	"math/rand"
+	"slices"
 	"sync"
 	"testing"
 	"time"
@@ -139,6 +141,14 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
 			defer wg.Done()
 			defer c.Close()
 
+			if profile.Watch != nil && profile.Watch.EndpointSwitchPeriod != nil {
+				wg.Add(1)
+				go func() {
+					defer wg.Done()
+					runEndpointSwitchLoop(ctx, c, endpoints, *profile.Watch.EndpointSwitchPeriod, finish)
+				}()
+			}
+
 			traffic.RunKeyValueLoop(ctx, RunTrafficLoopParam{
 				Client:     c,
 				QPSLimiter: limiter,
@@ -177,6 +187,15 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
 		go func(c *client.RecordingClient) {
 			defer wg.Done()
 			defer c.Close()
+
+			if profile.Watch != nil && profile.Watch.EndpointSwitchPeriod != nil {
+				wg.Add(1)
+				go func() {
+					defer wg.Done()
+					runEndpointSwitchLoop(ctx, c, endpoints, *profile.Watch.EndpointSwitchPeriod, finish)
+				}()
+			}
+
 			traffic.RunWatchLoop(ctx, RunWatchLoopParam{
 				Config: *profile.Watch,
 				Client: c,
@@ -375,9 +394,10 @@ type KeyValue struct {
 }
 
 type Watch struct {
-	MemberClientCount   int
-	ClusterClientCount  int
-	RevisionOffsetRange Range
+	MemberClientCount    int
+	ClusterClientCount   int
+	RevisionOffsetRange  Range
+	EndpointSwitchPeriod *time.Duration
 }
 
 type Compaction struct {
@@ -495,3 +515,20 @@ func CheckEmptyDatabaseAtStart(ctx context.Context, lg *zap.Logger, endpoints [
 	}
 	return nil
 }
+
+func runEndpointSwitchLoop(ctx context.Context, c *client.RecordingClient, endpoints []string, period time.Duration, finish <-chan struct{}) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-finish:
+			return
+		case <-time.After(period):
+			shuffledEndpoints := slices.Clone(endpoints)
+			rand.Shuffle(len(shuffledEndpoints), func(i, j int) {
+				shuffledEndpoints[i], shuffledEndpoints[j] = shuffledEndpoints[j], shuffledEndpoints[i]
+			})
+			c.SetEndpoints(shuffledEndpoints...)
+		}
+	}
+}