Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions tests/robustness/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ test-robustness-issue20221: /tmp/etcd-bc47e771-failpoints/bin
GO_TEST_FLAGS='-v -run=TestRobustnessRegression/Issue20221 -count 100 -failfast --bin-dir=/tmp/etcd-bc47e771-failpoints/bin' $(TOPLEVEL_MAKE) test-robustness && \
echo "Failed to reproduce" || echo "Successful reproduction"

.PHONY: test-robustness-issue20221-event
test-robustness-issue20221-event: /tmp/etcd-44a094a2-failpoints/bin
GO_TEST_FLAGS='-v -run=TestRobustnessRegression/Issue20221-event -count 100 -failfast --bin-dir=/tmp/etcd-44a094a2-failpoints/bin' $(TOPLEVEL_MAKE) test-robustness && \
echo "Failed to reproduce" || echo "Successful reproduction"

# Etcd API usage by Kubernetes

.PHONY: k8s-coverage
Expand Down Expand Up @@ -144,6 +149,19 @@ $(GOPATH)/bin/gofail: $(REPOSITORY_ROOT)/tools/mod/go.mod $(REPOSITORY_ROOT)/too
$(MAKE) gofail-enable; \
$(MAKE) build;

/tmp/etcd-44a094a2-failpoints/bin: $(GOPATH)/bin/gofail
rm -rf /tmp/etcd-44a094a2-failpoints/
mkdir -p /tmp/etcd-44a094a2-failpoints/
cd /tmp/etcd-44a094a2-failpoints/; \
git init; \
git remote add origin https://github.com/etcd-io/etcd.git; \
git fetch --depth 1 origin 44a094a20a7a0ac16cf88c9f21b6ca90e9f85cf5; \
git checkout FETCH_HEAD; \
cp -r $(REPOSITORY_ROOT)/tests/robustness/patches/removeVerifyModRevision . && \
patch --fuzz=0 server/storage/mvcc/watchable_store.go ./removeVerifyModRevision/watchable_store.patch && \
$(MAKE) gofail-enable; \
$(MAKE) build;

/tmp/etcd-release-3.6-failpoints/bin: $(GOPATH)/bin/gofail
rm -rf /tmp/etcd-release-3.6-failpoints/
mkdir -p /tmp/etcd-release-3.6-failpoints/
Expand Down
4 changes: 4 additions & 0 deletions tests/robustness/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,10 @@ func (c *RecordingClient) Endpoints() []string {
return c.client.Endpoints()
}

func (c *RecordingClient) SetEndpoints(endpoints ...string) {
c.client.SetEndpoints(endpoints...)
}

func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, withPrefix bool, withProgressNotify bool, withPrevKV bool) clientv3.WatchChan {
request := model.WatchRequest{
Key: key,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
diff --git a/server/storage/mvcc/watchable_store.go b/server/storage/mvcc/watchable_store.go
index 1996ee6..ed9fe1c 100644
--- a/server/storage/mvcc/watchable_store.go
+++ b/server/storage/mvcc/watchable_store.go
@@ -15,14 +15,12 @@
package mvcc

import (
- "fmt"
"sync"
"time"

"go.uber.org/zap"

"go.etcd.io/etcd/api/v3/mvccpb"
- "go.etcd.io/etcd/client/pkg/v3/verify"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/pkg/v3/traceutil"
"go.etcd.io/etcd/server/v3/lease"
@@ -610,15 +608,6 @@ func (w *watcher) send(wr WatchResponse) bool {
wr.Events = ne
}

- verify.Verify(func() {
- if w.startRev > 0 {
- for _, ev := range wr.Events {
- if ev.Kv.ModRevision < w.startRev {
- panic(fmt.Sprintf("Event.ModRevision(%d) is less than the w.startRev(%d) for watchID: %d", ev.Kv.ModRevision, w.startRev, w.id))
- }
- }
- }
- })

// if all events are filtered out, we should send nothing.
if !progressEvent && len(wr.Events) == 0 {
25 changes: 25 additions & 0 deletions tests/robustness/scenarios/scenarios.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,31 @@ func Regression(t *testing.T) []TestScenario {
e2e.WithSnapshotCatchUpEntries(10),
),
})
clusterClientSwitchingInterval := 500 * time.Millisecond
scenarios = append(scenarios, TestScenario{
Name: "Issue20221-event",
Failpoint: failpoint.BlackholeUntilSnapshot,
Watch: client.WatchConfig{
RequestProgress: true,
},
Profile: traffic.Profile{
KeyValue: &traffic.KeyValueHigh,
Watch: &traffic.Watch{
MemberClientCount: 6,
ClusterClientCount: 2,
RevisionOffsetRange: traffic.Range{Min: 10, Max: 100},
EndpointSwitchPeriod: &clusterClientSwitchingInterval,
},
},
Traffic: traffic.EtcdPut,
Cluster: *e2e.NewConfig(
e2e.WithSnapshotCount(10),
e2e.WithPeerProxy(true),
e2e.WithIsPeerTLS(true),
e2e.WithWatchProcessNotifyInterval(10*time.Millisecond),
e2e.WithSnapshotCatchUpEntries(10),
),
})
if v.Compare(version.V3_5) >= 0 {
opts := []e2e.EPClusterOption{
e2e.WithSnapshotCount(100),
Expand Down
43 changes: 40 additions & 3 deletions tests/robustness/traffic/traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package traffic

import (
"context"
"math/rand"
"slices"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -139,6 +141,14 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
defer wg.Done()
defer c.Close()

if profile.Watch != nil && profile.Watch.EndpointSwitchPeriod != nil {
wg.Add(1)
go func() {
defer wg.Done()
runEndpointSwitchLoop(ctx, c, endpoints, *profile.Watch.EndpointSwitchPeriod, finish)
}()
}

traffic.RunKeyValueLoop(ctx, RunTrafficLoopParam{
Client: c,
QPSLimiter: limiter,
Expand Down Expand Up @@ -177,6 +187,15 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
go func(c *client.RecordingClient) {
defer wg.Done()
defer c.Close()

if profile.Watch != nil && profile.Watch.EndpointSwitchPeriod != nil {
wg.Add(1)
go func() {
defer wg.Done()
runEndpointSwitchLoop(ctx, c, endpoints, *profile.Watch.EndpointSwitchPeriod, finish)
}()
}

traffic.RunWatchLoop(ctx, RunWatchLoopParam{
Config: *profile.Watch,
Client: c,
Expand Down Expand Up @@ -375,9 +394,10 @@ type KeyValue struct {
}

type Watch struct {
MemberClientCount int
ClusterClientCount int
RevisionOffsetRange Range
MemberClientCount int
ClusterClientCount int
RevisionOffsetRange Range
EndpointSwitchPeriod *time.Duration
}

type Compaction struct {
Expand Down Expand Up @@ -495,3 +515,20 @@ func CheckEmptyDatabaseAtStart(ctx context.Context, lg *zap.Logger, endpoints []
}
return nil
}

func runEndpointSwitchLoop(ctx context.Context, c *client.RecordingClient, endpoints []string, period time.Duration, finish <-chan struct{}) {
for {
select {
case <-ctx.Done():
return
case <-finish:
return
case <-time.After(period):
shuffledEndpoints := slices.Clone(endpoints)
rand.Shuffle(len(shuffledEndpoints), func(i, j int) {
shuffledEndpoints[i], shuffledEndpoints[j] = shuffledEndpoints[j], shuffledEndpoints[i]
})
c.SetEndpoints(shuffledEndpoints...)
}
}
}