Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 106 additions & 69 deletions api/etcdserverpb/raft_internal.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions api/etcdserverpb/raft_internal.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ message RequestHeader {
string username = 2;
// auth_revision is a revision number of auth.authStore. It is not related to mvcc
uint64 auth_revision = 3 [(versionpb.etcd_version_field) = "3.1"];
uint64 requester_member_id = 4 [(versionpb.etcd_version_field) = "3.7"];
}

// An InternalRaftRequest is the union of all requests which can be
Expand Down
21 changes: 0 additions & 21 deletions server/etcdserver/apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ func Apply(lg *zap.Logger, e *raftpb.Entry, uberApply UberApplier, w wait.Wait,

needResult := w.IsRegistered(id)
if needResult || !noSideEffect(&raftReq) {
if !needResult && raftReq.Txn != nil {
removeNeedlessRangeReqs(raftReq.Txn)
}
return uberApply.Apply(&raftReq, shouldApplyV3), id
}
return nil, id
Expand All @@ -50,21 +47,3 @@ func Apply(lg *zap.Logger, e *raftpb.Entry, uberApply UberApplier, w wait.Wait,
func noSideEffect(r *pb.InternalRaftRequest) bool {
return r.Range != nil || r.AuthUserGet != nil || r.AuthRoleGet != nil || r.AuthStatus != nil
}

func removeNeedlessRangeReqs(txn *pb.TxnRequest) {
f := func(ops []*pb.RequestOp) []*pb.RequestOp {
j := 0
for i := 0; i < len(ops); i++ {
if _, ok := ops[i].Request.(*pb.RequestOp_RequestRange); ok {
continue
}
ops[j] = ops[i]
j++
}

return ops[:j]
}

txn.Success = f(txn.Success)
txn.Failure = f(txn.Failure)
}
4 changes: 2 additions & 2 deletions server/etcdserver/apply/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,11 @@ func (aa *authApplierV3) DeleteRange(r *pb.DeleteRangeRequest) (*pb.DeleteRangeR
return aa.applierV3.DeleteRange(r)
}

func (aa *authApplierV3) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
func (aa *authApplierV3) Txn(rt *pb.TxnRequest, header *pb.RequestHeader) (*pb.TxnResponse, *traceutil.Trace, error) {
if err := txn.CheckTxnAuth(aa.as, &aa.authInfo, rt); err != nil {
return nil, nil, err
}
return aa.applierV3.Txn(rt)
return aa.applierV3.Txn(rt, header)
}

func (aa *authApplierV3) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
Expand Down
2 changes: 1 addition & 1 deletion server/etcdserver/apply/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ func TestAuthApplierV3_Txn(t *testing.T) {
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
setAuthInfo(authApplier, tc.userName)
_, _, err := authApplier.Txn(tc.request)
_, _, err := authApplier.Txn(tc.request, nil)
require.Equalf(t, tc.expectError, err, "Range returned unexpected error (or lack thereof), expected: %v, got: %v", tc.expectError, err)
})
}
Expand Down
14 changes: 10 additions & 4 deletions server/etcdserver/apply/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,18 @@ import (
)

type applierV3backend struct {
options ApplierOptions
options ApplierOptions
memberID uint64
}

func newApplierV3Backend(opts ApplierOptions) applierV3 {
memberID := uint64(0)
if opts.RaftStatus != nil {
memberID = uint64(opts.RaftStatus.MemberID())
}
return &applierV3backend{
options: opts,
options: opts,
memberID: memberID,
}
}

Expand All @@ -59,8 +65,8 @@ func (a *applierV3backend) Range(r *pb.RangeRequest) (*pb.RangeResponse, *traceu
return mvcctxn.Range(context.TODO(), a.options.Logger, a.options.KV, r)
}

func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
return mvcctxn.Txn(context.TODO(), a.options.Logger, rt, a.options.TxnModeWriteWithSharedBuffer, a.options.KV, a.options.Lessor)
func (a *applierV3backend) Txn(rt *pb.TxnRequest, header *pb.RequestHeader) (*pb.TxnResponse, *traceutil.Trace, error) {
return mvcctxn.Txn(context.TODO(), a.options.Logger, rt, a.options.TxnModeWriteWithSharedBuffer, a.options.KV, a.options.Lessor, header, a.memberID)
}

func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) {
Expand Down
4 changes: 2 additions & 2 deletions server/etcdserver/apply/capped.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ func (a *applierV3Capped) Put(_ *pb.PutRequest) (*pb.PutResponse, *traceutil.Tra
return nil, nil, errors.ErrNoSpace
}

func (a *applierV3Capped) Txn(r *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
func (a *applierV3Capped) Txn(r *pb.TxnRequest, header *pb.RequestHeader) (*pb.TxnResponse, *traceutil.Trace, error) {
if a.q.Cost(r) > 0 {
return nil, nil, errors.ErrNoSpace
}
return a.applierV3.Txn(r)
return a.applierV3.Txn(r, header)
}

func (a *applierV3Capped) LeaseGrant(_ *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
Expand Down
2 changes: 1 addition & 1 deletion server/etcdserver/apply/corrupt.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (a *applierV3Corrupt) DeleteRange(_ *pb.DeleteRangeRequest) (*pb.DeleteRang
return nil, nil, errors.ErrCorrupt
}

func (a *applierV3Corrupt) Txn(_ *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
func (a *applierV3Corrupt) Txn(_ *pb.TxnRequest, _ *pb.RequestHeader) (*pb.TxnResponse, *traceutil.Trace, error) {
return nil, nil, errors.ErrCorrupt
}

Expand Down
2 changes: 1 addition & 1 deletion server/etcdserver/apply/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type applierV3 interface {
Put(p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error)
Range(r *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error)
DeleteRange(dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, *traceutil.Trace, error)
Txn(rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error)
Txn(rt *pb.TxnRequest, header *pb.RequestHeader) (*pb.TxnResponse, *traceutil.Trace, error)
Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error)

LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
Expand Down
4 changes: 2 additions & 2 deletions server/etcdserver/apply/quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ func (a *quotaApplierV3) Put(p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trac
return resp, trace, err
}

func (a *quotaApplierV3) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
func (a *quotaApplierV3) Txn(rt *pb.TxnRequest, header *pb.RequestHeader) (*pb.TxnResponse, *traceutil.Trace, error) {
ok := a.q.Available(rt)
resp, trace, err := a.applierV3.Txn(rt)
resp, trace, err := a.applierV3.Txn(rt, header)
if err == nil && !ok {
err = errors.ErrNoSpace
}
Expand Down
2 changes: 1 addition & 1 deletion server/etcdserver/apply/uber_applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (a *uberApplier) dispatch(r *pb.InternalRaftRequest, shouldApplyV3 membersh
ar.Resp, ar.Trace, ar.Err = a.applyV3.DeleteRange(r.DeleteRange)
case r.Txn != nil:
op = "Txn"
ar.Resp, ar.Trace, ar.Err = a.applyV3.Txn(r.Txn)
ar.Resp, ar.Trace, ar.Err = a.applyV3.Txn(r.Txn, r.Header)
case r.Compaction != nil:
op = "Compaction"
ar.Resp, ar.Physc, ar.Trace, ar.Err = a.applyV3.Compaction(r.Compaction)
Expand Down
23 changes: 21 additions & 2 deletions server/etcdserver/txn/range.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,32 @@ func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, r *pb.RangeRequest)
}(time.Now())
txnRead := kv.Read(mvcc.ConcurrentReadTxMode, trace)
defer txnRead.End()
resp, err = executeRange(ctx, lg, txnRead, r)
resp, err = executeRange(ctx, lg, txnRead, r, nil, 0)
return resp, trace, err
}

func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *pb.RangeRequest, header *pb.RequestHeader, thisMemberId uint64) (*pb.RangeResponse, error) {
trace := traceutil.Get(ctx)

// Skip range execution on non-requester members only if:
// 1. Request comes from raft (header != nil)
// 2. Requester member is known (RequesterMemberId != 0)
// 3. This member has a valid ID (thisMemberId != 0)
// 4. Range specifies a revision (Revision != 0)
//
// This optimization avoids executing expensive range queries on
// members that didn't receive the request, while ensuring all members
// validate via checkRange() for consistency.
if header != nil && header.RequesterMemberId != 0 && thisMemberId != 0 &&
r.Revision != 0 && thisMemberId != header.RequesterMemberId {
// Non-requester member, skip range execution
return &pb.RangeResponse{
Header: &pb.ResponseHeader{Revision: txnRead.Rev()},
Kvs: nil,
Count: 0,
}, nil
}

limit := rangeLimit(r)
ro := mvcc.RangeOptions{
Limit: limit,
Expand Down
14 changes: 7 additions & 7 deletions server/etcdserver/txn/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"go.etcd.io/etcd/server/v3/storage/mvcc"
)

func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWithSharedBuffer bool, kv mvcc.KV, lessor lease.Lessor) (txnResp *pb.TxnResponse, trace *traceutil.Trace, err error) {
func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWithSharedBuffer bool, kv mvcc.KV, lessor lease.Lessor, header *pb.RequestHeader, thisMemberId uint64) (txnResp *pb.TxnResponse, trace *traceutil.Trace, err error) {
ctx, trace = traceutil.EnsureTrace(ctx, lg, "transaction")
isWrite := !IsTxnReadonly(rt)
// When the transaction contains write operations, we use ReadTx instead of
Expand Down Expand Up @@ -68,7 +68,7 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
} else {
txnWrite = mvcc.NewReadOnlyTxnWrite(txnRead)
}
txnResp, err = txn(ctx, lg, txnWrite, rt, isWrite, txnPath)
txnResp, err = txn(ctx, lg, txnWrite, rt, isWrite, txnPath, header, thisMemberId)
txnWrite.End()

trace.AddField(
Expand All @@ -78,9 +78,9 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
return txnResp, trace, err
}

func txn(ctx context.Context, lg *zap.Logger, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, isWrite bool, txnPath []bool) (*pb.TxnResponse, error) {
func txn(ctx context.Context, lg *zap.Logger, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, isWrite bool, txnPath []bool, header *pb.RequestHeader, thisMemberId uint64) (*pb.TxnResponse, error) {
txnResp, _ := newTxnResp(rt, txnPath)
_, err := executeTxn(ctx, lg, txnWrite, rt, txnPath, txnResp)
_, err := executeTxn(ctx, lg, txnWrite, rt, txnPath, txnResp, header, thisMemberId)
if err != nil {
if isWrite {
// CAUTION: When a txn performing write operations starts, we always expect it to be successful.
Expand Down Expand Up @@ -132,7 +132,7 @@ func newTxnResp(rt *pb.TxnRequest, txnPath []bool) (txnResp *pb.TxnResponse, txn
return txnResp, txnCount
}

func executeTxn(ctx context.Context, lg *zap.Logger, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int, err error) {
func executeTxn(ctx context.Context, lg *zap.Logger, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse, header *pb.RequestHeader, thisMemberId uint64) (txns int, err error) {
trace := traceutil.Get(ctx)
reqs := rt.Success
if !txnPath[0] {
Expand All @@ -147,7 +147,7 @@ func executeTxn(ctx context.Context, lg *zap.Logger, txnWrite mvcc.TxnWrite, rt
traceutil.Field{Key: "req_type", Value: "range"},
traceutil.Field{Key: "range_begin", Value: string(tv.RequestRange.Key)},
traceutil.Field{Key: "range_end", Value: string(tv.RequestRange.RangeEnd)})
resp, err := executeRange(ctx, lg, txnWrite, tv.RequestRange)
resp, err := executeRange(ctx, lg, txnWrite, tv.RequestRange, header, thisMemberId)
if err != nil {
return 0, fmt.Errorf("applyTxn: failed Range: %w", err)
}
Expand All @@ -173,7 +173,7 @@ func executeTxn(ctx context.Context, lg *zap.Logger, txnWrite mvcc.TxnWrite, rt
respi.(*pb.ResponseOp_ResponseDeleteRange).ResponseDeleteRange = resp
case *pb.RequestOp_RequestTxn:
resp := respi.(*pb.ResponseOp_ResponseTxn).ResponseTxn
applyTxns, err := executeTxn(ctx, lg, txnWrite, tv.RequestTxn, txnPath[1:], resp)
applyTxns, err := executeTxn(ctx, lg, txnWrite, tv.RequestTxn, txnPath[1:], resp, header, thisMemberId)
if err != nil {
// don't wrap the error. It's a recursive call and err should be already wrapped
return 0, err
Expand Down
6 changes: 3 additions & 3 deletions server/etcdserver/txn/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func TestCheckTxn(t *testing.T) {

ctx, cancel := context.WithCancel(t.Context())
defer cancel()
_, _, err := Txn(ctx, zaptest.NewLogger(t), tc.txn, false, s, lessor)
_, _, err := Txn(ctx, zaptest.NewLogger(t), tc.txn, false, s, lessor, nil, 0)

gotErr := ""
if err != nil {
Expand Down Expand Up @@ -333,7 +333,7 @@ func TestReadonlyTxnError(t *testing.T) {
},
}

_, _, err := Txn(ctx, zaptest.NewLogger(t), txn, false, s, &lease.FakeLessor{})
_, _, err := Txn(ctx, zaptest.NewLogger(t), txn, false, s, &lease.FakeLessor{}, nil, 0)
if err == nil || !strings.Contains(err.Error(), "applyTxn: failed Range: rangeKeys: context cancelled: context canceled") {
t.Fatalf("Expected context canceled error, got %v", err)
}
Expand Down Expand Up @@ -376,7 +376,7 @@ func TestWriteTxnPanicWithoutApply(t *testing.T) {
// we verify the following properties below:
// 1. server panics after a write txn aply fails (invariant: server should never try to move on from a failed write)
// 2. no writes from the txn are applied to the backend (invariant: failed write should have no side-effect on DB state besides panic)
assert.Panicsf(t, func() { Txn(ctx, zaptest.NewLogger(t), txn, false, s, &lease.FakeLessor{}) }, "Expected panic in Txn with writes")
assert.Panicsf(t, func() { Txn(ctx, zaptest.NewLogger(t), txn, false, s, &lease.FakeLessor{}, nil, 0) }, "Expected panic in Txn with writes")
dbHashAfter, err := computeFileHash(bePath)
require.NoErrorf(t, err, "failed to compute DB file hash after txn")
require.Equalf(t, dbHashBefore, dbHashAfter, "mismatch in DB hash before and after failed write txn")
Expand Down
5 changes: 3 additions & 2 deletions server/etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
}(time.Now())

get := func() {
resp, _, err = txn.Txn(ctx, s.Logger(), r, s.Cfg.ServerFeatureGate.Enabled(features.TxnModeWriteWithSharedBuffer), s.KV(), s.lessor)
resp, _, err = txn.Txn(ctx, s.Logger(), r, s.Cfg.ServerFeatureGate.Enabled(features.TxnModeWriteWithSharedBuffer), s.KV(), s.lessor, nil, 0)
}
if serr := s.doSerialize(ctx, chk, get); serr != nil {
return nil, serr
Expand Down Expand Up @@ -832,7 +832,8 @@ func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.In
}

r.Header = &pb.RequestHeader{
ID: s.reqIDGen.Next(),
ID: s.reqIDGen.Next(),
RequesterMemberId: uint64(s.memberID),
}

// check authinfo if it is not InternalAuthenticateRequest
Expand Down