From 09923f0ee6b1c47024b1c25cde93371bd18c0d72 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Sun, 1 Mar 2026 10:58:02 +0100 Subject: [PATCH 1/3] Cleanup the request debug logs Signed-off-by: Marek Siarkowicz --- server/etcdserver/api/v3rpc/interceptor.go | 203 +++++++++------------ 1 file changed, 88 insertions(+), 115 deletions(-) diff --git a/server/etcdserver/api/v3rpc/interceptor.go b/server/etcdserver/api/v3rpc/interceptor.go index 697d0b075ed9..1c6e875928bf 100644 --- a/server/etcdserver/api/v3rpc/interceptor.go +++ b/server/etcdserver/api/v3rpc/interceptor.go @@ -77,139 +77,112 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor { func newLogUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor { return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + lg := s.Logger() + debug := lg.Core().Enabled(zap.DebugLevel) + if debug { + logUnaryRequest(ctx, lg.Debug, info, req) + } + startTime := time.Now() resp, err := handler(ctx, req) - lg := s.Logger() - if lg != nil { // acquire stats if debug level is enabled or RequestInfo is expensive - defer logUnaryRequestStats(ctx, lg, s.Cfg.WarningUnaryRequestDuration, info, startTime, req, resp) + duration := time.Since(startTime) + + if duration > s.Cfg.WarningUnaryRequestDuration { + var fields []zap.Field + if !debug { + fields = requestLogFields(req) + } + logUnaryResponseStats(ctx, lg.Warn, duration, info, startTime, resp, fields...) 
+ } else if debug { + logUnaryResponseStats(ctx, lg.Debug, duration, info, startTime, resp) } return resp, err } } -func logUnaryRequestStats(ctx context.Context, lg *zap.Logger, warnLatency time.Duration, info *grpc.UnaryServerInfo, startTime time.Time, req any, resp any) { - duration := time.Since(startTime) - var enabledDebugLevel, expensiveRequest bool - if lg.Core().Enabled(zap.DebugLevel) { - enabledDebugLevel = true - } - if duration > warnLatency { - expensiveRequest = true - } - if !enabledDebugLevel && !expensiveRequest { - return - } - remote := "No remote client info." +func logUnaryRequest(ctx context.Context, log func(msg string, fields ...zap.Field), info *grpc.UnaryServerInfo, req any) { + remote := "" peerInfo, ok := peer.FromContext(ctx) if ok { remote = peerInfo.Addr.String() } - responseType := info.FullMethod - var reqCount, respCount int64 - var reqSize, respSize int - var reqContent string - switch _resp := resp.(type) { - case *pb.RangeResponse: - _req, ok := req.(*pb.RangeRequest) - if ok { - reqCount = 0 - reqSize = _req.Size() - reqContent = _req.String() - } - if _resp != nil { - respCount = _resp.GetCount() - respSize = _resp.Size() - } - case *pb.PutResponse: - _req, ok := req.(*pb.PutRequest) - if ok { - reqCount = 1 - reqSize = _req.Size() - reqContent = pb.NewLoggablePutRequest(_req).String() - // redact value field from request content, see PR #9821 - } - if _resp != nil { - respCount = 0 - respSize = _resp.Size() - } - case *pb.DeleteRangeResponse: - _req, ok := req.(*pb.DeleteRangeRequest) - if ok { - reqCount = 0 - reqSize = _req.Size() - reqContent = _req.String() - } - if _resp != nil { - respCount = _resp.GetDeleted() - respSize = _resp.Size() - } - case *pb.TxnResponse: - _req, ok := req.(*pb.TxnRequest) - if ok && _resp != nil { - if _resp.GetSucceeded() { // determine the 'actual' count and size of request based on success or failure - reqCount = int64(len(_req.GetSuccess())) - reqSize = 0 - for _, r := range 
_req.GetSuccess() { - reqSize += r.Size() - } - } else { - reqCount = int64(len(_req.GetFailure())) - reqSize = 0 - for _, r := range _req.GetFailure() { - reqSize += r.Size() - } - } - reqContent = pb.NewLoggableTxnRequest(_req).String() - // redact value field from request content, see PR #9821 - } - if _resp != nil { - respCount = 0 - respSize = _resp.Size() - } - default: - reqCount = -1 - reqSize = -1 - respCount = -1 - respSize = -1 + var size int + if msg, ok := req.(Sizer); ok { + size = msg.Size() } - - if enabledDebugLevel { - logGenericRequestStats(lg, startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent) - } else if expensiveRequest { - logExpensiveRequestStats(lg, startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent) + fields := []zap.Field{ + zap.String("method", info.FullMethod), + zap.String("remote", remote), + zap.Int("size", size), } + fields = append(fields, requestLogFields(req)...) + log("request", fields...) 
} -func logGenericRequestStats(lg *zap.Logger, startTime time.Time, duration time.Duration, remote string, responseType string, - reqCount int64, reqSize int, respCount int64, respSize int, reqContent string, -) { - lg.Debug("request stats", - zap.Time("start time", startTime), - zap.Duration("time spent", duration), - zap.String("remote", remote), - zap.String("response type", responseType), - zap.Int64("request count", reqCount), - zap.Int("request size", reqSize), - zap.Int64("response count", respCount), - zap.Int("response size", respSize), - zap.String("request content", reqContent), - ) +type Sizer interface { + Size() int } -func logExpensiveRequestStats(lg *zap.Logger, startTime time.Time, duration time.Duration, remote string, responseType string, - reqCount int64, reqSize int, respCount int64, respSize int, reqContent string, -) { - lg.Warn("request stats", - zap.Time("start time", startTime), - zap.Duration("time spent", duration), - zap.String("remote", remote), - zap.String("response type", responseType), - zap.Int64("request count", reqCount), - zap.Int("request size", reqSize), - zap.Int64("response count", respCount), - zap.Int("response size", respSize), - zap.String("request content", reqContent), +func requestLogFields(req any) []zap.Field { + fields := []zap.Field{} + switch _req := req.(type) { + case *pb.RangeRequest: + fields = append(fields, + zap.String("range_begin", string(_req.GetKey())), + zap.String("range_end", string(_req.GetRangeEnd())), + zap.Int64("range_revision", _req.GetRevision()), + zap.Int64("range_limit", _req.GetLimit()), + zap.Bool("range_count_only", _req.GetCountOnly()), + zap.Bool("range_keys_only", _req.GetKeysOnly()), + ) + case *pb.PutRequest: + fields = append(fields, + zap.String("put_key", string(_req.GetKey())), + ) + case *pb.DeleteRangeRequest: + fields = append(fields, + zap.String("delete_range_begin", string(_req.GetKey())), + zap.String("delete_range_end", string(_req.GetRangeEnd())), + ) + case 
*pb.TxnRequest: + fields = append(fields, + zap.Int("txn_compare_len", len(_req.GetCompare())), + zap.Int("txn_success_len", len(_req.GetSuccess())), + zap.Int("txn_failure_len", len(_req.GetFailure())), + ) + default: + } + return fields +} + +func logUnaryResponseStats(ctx context.Context, log func(msg string, fields ...zap.Field), duration time.Duration, info *grpc.UnaryServerInfo, startTime time.Time, resp any, fields ...zap.Field) { + var size int + if msg, ok := resp.(Sizer); ok { + size = msg.Size() + } + fields = append(fields, + zap.String("method", info.FullMethod), + zap.Duration("duration", duration), + zap.Int("size", size), ) + switch _resp := resp.(type) { + case *pb.RangeResponse: + // "size" is already set above through the Sizer assertion; appending it + // again here would put a duplicate "size" key into the same log entry. + fields = append(fields, + zap.Int64("count", _resp.GetCount()), + ) + case *pb.PutResponse: + // No extra fields: "size" is already set above. + case *pb.DeleteRangeResponse: + fields = append(fields, + zap.Int64("delete_range_deleted", _resp.GetDeleted()), + ) + case *pb.TxnResponse: + // No extra fields. + default: + } + log("response", fields...) 
} func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor { From 89ef589e49695d3170d0e2068117c83ce2b096c0 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Sun, 1 Mar 2026 11:29:02 +0100 Subject: [PATCH 2/3] Add request id passed via context and surface it in debug logs Signed-off-by: Marek Siarkowicz --- server/etcdserver/api/v3rpc/interceptor.go | 8 +++++++- server/etcdserver/requestid/requestid.go | 17 +++++++++++++++ server/etcdserver/server.go | 4 ++++ server/etcdserver/util.go | 4 +++- server/etcdserver/v3_server.go | 24 +++++++++++++++++----- 5 files changed, 50 insertions(+), 7 deletions(-) create mode 100644 server/etcdserver/requestid/requestid.go diff --git a/server/etcdserver/api/v3rpc/interceptor.go b/server/etcdserver/api/v3rpc/interceptor.go index 1c6e875928bf..eb003c70af7c 100644 --- a/server/etcdserver/api/v3rpc/interceptor.go +++ b/server/etcdserver/api/v3rpc/interceptor.go @@ -30,6 +30,7 @@ import ( "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/server/v3/etcdserver/api" + "go.etcd.io/etcd/server/v3/etcdserver/requestid" "go.etcd.io/raft/v3" ) @@ -77,6 +78,8 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor { func newLogUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor { return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + requestID := s.RequestID() + ctx = requestid.NewContext(ctx, requestID) lg := s.Logger() debug := lg.Core().Enabled(zap.DebugLevel) if debug { @@ -106,12 +109,14 @@ func logUnaryRequest(ctx context.Context, log func(msg string, fields ...zap.Fie if ok { remote = peerInfo.Addr.String() } + requestID := requestid.FromContext(ctx) var size int if msg, ok := req.(Sizer); ok { size = msg.Size() } fields := []zap.Field{ - zap.String("method", info.FullMethod), + zap.String("method", info.FullMethod), + zap.Uint64("request_id", requestID), 
zap.String("remote", remote), zap.Int("size", size), } @@ -164,6 +169,7 @@ func logUnaryResponseStats(ctx context.Context, log func(msg string, fields ...z zap.String("method", info.FullMethod), zap.Duration("duration", duration), zap.Int("size", size), + zap.Uint64("request_id", requestid.FromContext(ctx)), ) switch _resp := resp.(type) { case *pb.RangeResponse: diff --git a/server/etcdserver/requestid/requestid.go b/server/etcdserver/requestid/requestid.go new file mode 100644 index 000000000000..3404d33d50b4 --- /dev/null +++ b/server/etcdserver/requestid/requestid.go @@ -0,0 +1,17 @@ +package requestid + +import "context" + +type requestIDKey struct{} + +func NewContext(ctx context.Context, requestID uint64) context.Context { + return context.WithValue(ctx, requestIDKey{}, requestID) +} + +func FromContext(ctx context.Context) uint64 { + val := ctx.Value(requestIDKey{}) + if val == nil { + return 0 + } + return val.(uint64) +} diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index b1f80094bbc5..2e208e303f44 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -2464,3 +2464,7 @@ func addFeatureGateMetrics(fg featuregate.FeatureGate, guageVec *prometheus.Gaug guageVec.With(prometheus.Labels{"name": string(feature), "stage": string(featureSpec.PreRelease)}).Set(metricVal) } } + +func (s *EtcdServer) RequestID() uint64 { + return s.reqIDGen.Next() +} diff --git a/server/etcdserver/util.go b/server/etcdserver/util.go index b23ab682e494..5bde39ec3930 100644 --- a/server/etcdserver/util.go +++ b/server/etcdserver/util.go @@ -83,6 +83,7 @@ func longestConnected(tp rafthttp.Transporter, membs []types.ID) (types.ID, bool type notifier struct { c chan struct{} err error + readIndexID uint64 } func newNotifier() *notifier { @@ -91,7 +92,8 @@ func newNotifier() *notifier { } } -func (nc *notifier) notify(err error) { +func (nc *notifier) notify(err error, readIndexID uint64) { nc.err = err + nc.readIndexID = readIndexID 
close(nc.c) } diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go index 00a3b8fbeb6b..3864c353910e 100644 --- a/server/etcdserver/v3_server.go +++ b/server/etcdserver/v3_server.go @@ -36,6 +36,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/membership" apply2 "go.etcd.io/etcd/server/v3/etcdserver/apply" "go.etcd.io/etcd/server/v3/etcdserver/errors" + "go.etcd.io/etcd/server/v3/etcdserver/requestid" "go.etcd.io/etcd/server/v3/etcdserver/txn" "go.etcd.io/etcd/server/v3/features" "go.etcd.io/etcd/server/v3/lease" @@ -830,9 +831,15 @@ func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.In if ci > ai+maxGapBetweenApplyAndCommitIndex { return nil, errors.ErrTooManyRequests } + lg := s.Logger() + requestID := requestid.FromContext(ctx) + if requestID == 0 { + lg.Warn("request ID not found in context, log correlation might not work, generating new") + requestID = s.reqIDGen.Next() + } r.Header = &pb.RequestHeader{ - ID: s.reqIDGen.Next(), + ID: requestID, } // check authinfo if it is not InternalAuthenticateRequest @@ -877,6 +884,7 @@ func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.In defer cancel() span := trace.SpanFromContext(ctx) + lg.Debug("Send raft proposal", zap.String("request_type", reqType), zap.Uint64("request_id", requestID)) span.AddEvent("Send raft proposal") err = s.r.Propose(cctx, data) if err != nil { @@ -890,6 +898,7 @@ func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.In select { case x := <-ch: span.AddEvent("Receive raft result") + lg.Debug("Receive raft result", zap.String("request_type", reqType), zap.Uint64("request_id", requestID)) return x.(*apply2.Result), nil case <-cctx.Done(): proposalsFailed.Inc() @@ -997,7 +1006,7 @@ func (s *EtcdServer) linearizableReadLoop() { return } if err != nil { - nr.notify(err) + nr.notify(err, requestID) continue } @@ -1016,7 +1025,7 @@ func (s *EtcdServer) linearizableReadLoop() { } } // unblock 
all l-reads requested at indices before confirmedIndex - nr.notify(nil) + nr.notify(nil, requestID) trace.Step("applied index is now lower than readState.Index") trace.LogAllStepsIfLong(traceThreshold) @@ -1114,16 +1123,19 @@ func uint64ToBigEndianBytes(number uint64) []byte { } func (s *EtcdServer) sendReadIndex(requestIndex uint64) error { + lg := s.Logger() + timeout := s.Cfg.ReqTimeout() + lg.Debug("sending read index request", zap.Uint64("read-request-id", requestIndex), zap.Duration("timeout", timeout)) ctxToSend := uint64ToBigEndianBytes(requestIndex) - cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) + + cctx, cancel := context.WithTimeout(context.Background(), timeout) err := s.r.ReadIndex(cctx, ctxToSend) cancel() if errorspkg.Is(err, raft.ErrStopped) { return err } if err != nil { - lg := s.Logger() lg.Warn("failed to get read index from Raft", zap.Error(err)) readIndexFailed.Inc() return err @@ -1149,6 +1161,8 @@ func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error { // wait for read state notification select { case <-nc.c: + requestID := requestid.FromContext(ctx) + s.Logger().Debug("linearizable read notify", zap.Uint64("read-index-id", nc.readIndexID), zap.Uint64("request-id", requestID)) return nc.err case <-ctx.Done(): return ctx.Err() From afd8dffbf85a6db8ca001f3e4be003051f01bcd5 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Mon, 2 Mar 2026 10:34:52 +0100 Subject: [PATCH 3/3] Add raft communication debug logs Signed-off-by: Marek Siarkowicz --- server/etcdserver/api/rafthttp/http.go | 1 + server/etcdserver/api/rafthttp/pipeline.go | 1 + server/etcdserver/api/rafthttp/stream.go | 2 ++ server/etcdserver/api/rafthttp/transport.go | 39 +++++++++++++++++++++ server/etcdserver/v3_server.go | 6 ++-- 5 files changed, 46 insertions(+), 3 deletions(-) diff --git a/server/etcdserver/api/rafthttp/http.go b/server/etcdserver/api/rafthttp/http.go index 2610240e5ace..b903026f397e 100644 --- 
a/server/etcdserver/api/rafthttp/http.go +++ b/server/etcdserver/api/rafthttp/http.go @@ -137,6 +137,7 @@ func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(len(b))) + logRaftCommunication(h.lg, h.localID, m, types.ID(m.From), "receive") if err := h.r.Process(context.TODO(), m); err != nil { var writerErr writerToResponse switch { diff --git a/server/etcdserver/api/rafthttp/pipeline.go b/server/etcdserver/api/rafthttp/pipeline.go index 0790b58d03be..0359f566f2a6 100644 --- a/server/etcdserver/api/rafthttp/pipeline.go +++ b/server/etcdserver/api/rafthttp/pipeline.go @@ -96,6 +96,7 @@ func (p *pipeline) handle() { for { select { case m := <-p.msgc: + logRaftCommunication(p.tr.Logger, p.tr.ID, m, p.peerID, "send") start := time.Now() err := p.post(pbutil.MustMarshal(&m)) end := time.Now() diff --git a/server/etcdserver/api/rafthttp/stream.go b/server/etcdserver/api/rafthttp/stream.go index cd4e11e1f21e..23a6da43b315 100644 --- a/server/etcdserver/api/rafthttp/stream.go +++ b/server/etcdserver/api/rafthttp/stream.go @@ -201,6 +201,7 @@ func (cw *streamWriter) run() { heartbeatc, msgc = nil, nil case m := <-msgc: + logRaftCommunication(cw.lg, cw.localID, m, cw.peerID, "send") err := enc.encode(&m) if err == nil { unflushed += m.Size() @@ -497,6 +498,7 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error { cr.mu.Unlock() return err } + logRaftCommunication(cr.lg, cr.tr.ID, m, cr.peerID, "receive") // gofail: var raftDropHeartbeat struct{} // continue labelRaftDropHeartbeat diff --git a/server/etcdserver/api/rafthttp/transport.go b/server/etcdserver/api/rafthttp/transport.go index b376d578b6c4..bf59df4d529d 100644 --- a/server/etcdserver/api/rafthttp/transport.go +++ b/server/etcdserver/api/rafthttp/transport.go @@ -16,6 +16,7 @@ package rafthttp import ( "context" + "encoding/binary" "net/http" "sync" "time" @@ -24,6 +25,7 @@ import ( "go.uber.org/zap" 
"golang.org/x/time/rate" + pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/client/pkg/v3/transport" "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/server/v3/etcdserver/api/snap" @@ -451,3 +453,40 @@ func (t *Transport) ActivePeers() (cnt int) { } return cnt } + +// logRaftCommunication debug-logs a raft message and the request ID decoded from it (0 if none); heartbeats without an 8-byte context are skipped. +func logRaftCommunication(lg *zap.Logger, localID types.ID, m raftpb.Message, remote types.ID, direction string) { + if lg == nil || !lg.Core().Enabled(zap.DebugLevel) { + return + } + var requestID uint64 + switch m.Type { + case raftpb.MsgBeat, raftpb.MsgHeartbeat, raftpb.MsgHeartbeatResp: + if len(m.Context) != 8 { + return + } + requestID = binary.BigEndian.Uint64(m.Context) + case raftpb.MsgReadIndex, raftpb.MsgReadIndexResp: + if len(m.Entries) > 0 && len(m.Entries[0].Data) == 8 { + requestID = binary.BigEndian.Uint64(m.Entries[0].Data) + } + case raftpb.MsgProp: + if len(m.Entries) > 0 { + r := pb.InternalRaftRequest{} + if err := r.Unmarshal(m.Entries[0].Data); err == nil && r.Header != nil { // Header is a pointer and stays nil when the entry carries no header + requestID = r.Header.ID + } + } + } + lg.Debug( + "Raft communication", + zap.String("direction", direction), + zap.String("message-type", m.Type.String()), + zap.String("local-member-id", localID.String()), + zap.String("remote-peer-id", remote.String()), + zap.Uint64("term", m.Term), + zap.Uint64("index", m.Index), + zap.Uint64("commit", m.Commit), + zap.Uint64("request-id", requestID), + ) +} diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go index 3864c353910e..8e2e217e3482 100644 --- a/server/etcdserver/v3_server.go +++ b/server/etcdserver/v3_server.go @@ -1122,11 +1122,11 @@ func uint64ToBigEndianBytes(number uint64) []byte { return byteResult } -func (s *EtcdServer) sendReadIndex(requestIndex uint64) error { +func (s *EtcdServer) sendReadIndex(requestID uint64) error { lg := s.Logger() timeout := s.Cfg.ReqTimeout() - lg.Debug("sending read index request", zap.Uint64("read-request-id", requestIndex), zap.Duration("timeout", timeout)) - ctxToSend := 
uint64ToBigEndianBytes(requestIndex) + lg.Debug("sending read index request", zap.Uint64("read-request-id", requestID), zap.Duration("timeout", timeout)) + ctxToSend := uint64ToBigEndianBytes(requestID) cctx, cancel := context.WithTimeout(context.Background(), timeout)