diff --git a/client/resource_group/controller/group_controller.go b/client/resource_group/controller/group_controller.go index 4fca2091c00..8394533b227 100644 --- a/client/resource_group/controller/group_controller.go +++ b/client/resource_group/controller/group_controller.go @@ -106,6 +106,18 @@ type groupMetricsCollection struct { tokenRequestCounter prometheus.Counter runningKVRequestCounter prometheus.Gauge consumeTokenHistogram prometheus.Observer + + // Paging pre-charge observers, cached per-RG to avoid WithLabelValues + // on the hot path. + prechargeCounter prometheus.Counter + prechargeBytesCounter prometheus.Counter + actualBytesCounter prometheus.Counter + predictionResidualBytes prometheus.Observer + nonprechargeCounter prometheus.Counter + nonprechargeActualBytes prometheus.Counter + prechargeRU prometheus.Counter + settlementRU prometheus.Counter + settlementRUDelta prometheus.Observer } func initMetrics(oldName, name string) *groupMetricsCollection { @@ -122,9 +134,44 @@ func initMetrics(oldName, name string) *groupMetricsCollection { tokenRequestCounter: metrics.ResourceGroupTokenRequestCounter.WithLabelValues(oldName, name), runningKVRequestCounter: metrics.GroupRunningKVRequestCounter.WithLabelValues(name), consumeTokenHistogram: metrics.TokenConsumedHistogram.WithLabelValues(name), + + prechargeCounter: metrics.PagingPrechargeCounter.WithLabelValues(name), + prechargeBytesCounter: metrics.PagingPrechargeBytesCounter.WithLabelValues(name), + actualBytesCounter: metrics.PagingActualBytesCounter.WithLabelValues(name), + predictionResidualBytes: metrics.PagingPredictionResidualBytes.WithLabelValues(name), + + nonprechargeCounter: metrics.PagingNonprechargeCounter.WithLabelValues(name), + nonprechargeActualBytes: metrics.PagingNonprechargeActualBytes.WithLabelValues(name), + prechargeRU: metrics.PagingPrechargeRU.WithLabelValues(name), + settlementRU: metrics.PagingSettlementRU.WithLabelValues(name), + settlementRUDelta: metrics.PagingSettlementRUDelta.WithLabelValues(name), } } +// observePagingPrecharge requires bytesForEst > 0. +// prechargeRU is the total RU pre-acquired at BeforeKVRequest +// (base + ReadBytesCost * bytesForEst). +func (gmc *groupMetricsCollection) observePagingPrecharge(bytesForEst uint64, prechargeRU float64) { + gmc.prechargeCounter.Inc() + gmc.prechargeBytesCounter.Add(float64(bytesForEst)) + gmc.prechargeRU.Add(prechargeRU) +} + +// observePagingActual requires predicted > 0. +// settlementRU is the total RU finally consumed (base + CPU + ReadBytesCost * actual); +// vDelta is (settlement_ru - precharge_ru), the signed per-RPC RU adjustment. +func (gmc *groupMetricsCollection) observePagingActual(predicted, actual uint64, settlementRU, vDelta float64) { + gmc.actualBytesCounter.Add(float64(actual)) + gmc.predictionResidualBytes.Observe(float64(actual) - float64(predicted)) + gmc.settlementRU.Add(settlementRU) + gmc.settlementRUDelta.Observe(vDelta) +} + +func (gmc *groupMetricsCollection) observePagingNonprecharge(actual uint64) { + gmc.nonprechargeCounter.Inc() + gmc.nonprechargeActualBytes.Add(float64(actual)) +} + type tokenCounter struct { fillRate uint64 @@ -576,6 +623,9 @@ func (gc *groupCostController) onRequestWaitImpl( gc.metrics.successfulRequestDuration.Observe(d.Seconds()) waitDuration += d } + if bytesForEst := estimatedReadBytes(info); bytesForEst > 0 { + gc.metrics.observePagingPrecharge(bytesForEst, getRUValueFromConsumption(delta)) + } gc.mu.Lock() // Calculate the penalty of the store @@ -601,23 +651,30 @@ func (gc *groupCostController) onResponseImpl( for _, calc := range gc.calculators { calc.AfterKVRequest(delta, req, resp) } + // `count` is the full per-request consumption (BeforeKVRequest + AfterKVRequest). + count := &rmpb.Consumption{} + *count = *delta + for _, calc := range gc.calculators { + calc.BeforeKVRequest(count, req) + } + if bytesForEst := estimatedReadBytes(req); bytesForEst > 0 { + gc.metrics.observePagingActual(bytesForEst, resp.ReadBytes(), + getRUValueFromConsumption(count), getRUValueFromConsumption(delta)) + } else if !req.IsWrite() { + gc.metrics.observePagingNonprecharge(resp.ReadBytes()) + } if !gc.burstable.Load() { counter := gc.run.requestUnitTokens if v := getRUValueFromConsumption(delta); v > 0 { counter.limiter.RemoveTokens(time.Now(), v) + } else if v < 0 { + // Paging over-estimate: refund the excess pre-charge. + counter.limiter.RefundTokens(time.Now(), -v) } } gc.mu.Lock() - // Record the consumption of the request add(gc.mu.consumption, delta) - // Record the consumption of the request by store - count := &rmpb.Consumption{} - *count = *delta - // As the penalty is only counted when the request is completed, so here needs to calculate the write cost which is added in `BeforeKVRequest` - for _, calc := range gc.calculators { - calc.BeforeKVRequest(count, req) - } add(gc.mu.storeCounter[req.StoreID()], count) add(gc.mu.globalCounter, count) gc.mu.Unlock() @@ -632,33 +689,43 @@ func (gc *groupCostController) onResponseWaitImpl( for _, calc := range gc.calculators { calc.AfterKVRequest(delta, req, resp) } + // `count` is the full per-request consumption (BeforeKVRequest + AfterKVRequest). + count := &rmpb.Consumption{} + *count = *delta + for _, calc := range gc.calculators { + calc.BeforeKVRequest(count, req) + } + if bytesForEst := estimatedReadBytes(req); bytesForEst > 0 { + gc.metrics.observePagingActual(bytesForEst, resp.ReadBytes(), + getRUValueFromConsumption(count), getRUValueFromConsumption(delta)) + } else if !req.IsWrite() { + gc.metrics.observePagingNonprecharge(resp.ReadBytes()) + } var waitDuration time.Duration if !gc.burstable.Load() { - allowDebt := delta.ReadBytes+delta.WriteBytes < bigRequestThreshold || !gc.isThrottled.Load() - d, err := gc.acquireTokens(ctx, delta, &waitDuration, allowDebt) - if err != nil { - if errs.ErrClientResourceGroupThrottled.Equal(err) { - gc.metrics.failedRequestCounterWithThrottled.Inc() - gc.metrics.failedLimitReserveDuration.Observe(d.Seconds()) - } else { - gc.metrics.failedRequestCounterWithOthers.Inc() + v := getRUValueFromConsumption(delta) + if v > 0 { + allowDebt := delta.ReadBytes+delta.WriteBytes < bigRequestThreshold || !gc.isThrottled.Load() + d, err := gc.acquireTokens(ctx, delta, &waitDuration, allowDebt) + if err != nil { + if errs.ErrClientResourceGroupThrottled.Equal(err) { + gc.metrics.failedRequestCounterWithThrottled.Inc() + gc.metrics.failedLimitReserveDuration.Observe(d.Seconds()) + } else { + gc.metrics.failedRequestCounterWithOthers.Inc() + } + return nil, waitDuration, err } - return nil, waitDuration, err + gc.metrics.successfulRequestDuration.Observe(d.Seconds()) + waitDuration += d + } else if v < 0 { + // Paging over-estimate: refund the excess pre-charge. + gc.run.requestUnitTokens.limiter.RefundTokens(time.Now(), -v) } - gc.metrics.successfulRequestDuration.Observe(d.Seconds()) - waitDuration += d } gc.mu.Lock() - // Record the consumption of the request add(gc.mu.consumption, delta) - // Record the consumption of the request by store - count := &rmpb.Consumption{} - *count = *delta - // As the penalty is only counted when the request is completed, so here needs to calculate the write cost which is added in `BeforeKVRequest` - for _, calc := range gc.calculators { - calc.BeforeKVRequest(count, req) - } add(gc.mu.storeCounter[req.StoreID()], count) add(gc.mu.globalCounter, count) gc.mu.Unlock() diff --git a/client/resource_group/controller/group_controller_test.go b/client/resource_group/controller/group_controller_test.go index f916f5e56c4..82301c16e36 100644 --- a/client/resource_group/controller/group_controller_test.go +++ b/client/resource_group/controller/group_controller_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" @@ -27,6 +28,12 @@ import ( "github.com/tikv/pd/client/errs" ) +func counterValue(re *require.Assertions, c interface{ Write(*dto.Metric) error }) float64 { + var m dto.Metric + re.NoError(c.Write(&m)) + return m.GetCounter().GetValue() +} + func createTestGroupCostController(re *require.Assertions) *groupCostController { group := &rmpb.ResourceGroup{ Name: "test", @@ -208,6 +215,237 @@ func TestOnResponseWaitConsumption(t *testing.T) { verify() } +func TestPredictedReadBytesPreCharge(t *testing.T) { + re := require.New(t) + cfg := DefaultRUConfig() + kvCalc := newKVCalculator(cfg) + + // BeforeKVRequest with a PredictedReadBytes hint should pre-charge + // baseCost + predictedReadBytes * ReadBytesCost. + predictedReadBytes := uint64(256 * 1024) // learned EMA estimate + req := &TestRequestInfo{ + isWrite: false, + predictedReadBytes: predictedReadBytes, + } + precharge := &rmpb.Consumption{} + kvCalc.BeforeKVRequest(precharge, req) + + baseCost := float64(cfg.ReadBaseCost) + float64(cfg.ReadPerBatchBaseCost)*defaultAvgBatchProportion + hintCost := float64(cfg.ReadBytesCost) * float64(predictedReadBytes) + re.InDelta(baseCost+hintCost, precharge.RRU, 1e-6, + "BeforeKVRequest should pre-charge based on PredictedReadBytes") + + // AfterKVRequest should subtract the same hint basis, preserving + // precharge + settle == baseCost + actualCost. + actualReadBytes := uint64(300 * 1024) // close to the prediction + resp := &TestResponseInfo{ + readBytes: actualReadBytes, + kvCPU: 10 * time.Millisecond, + succeed: true, + } + settle := &rmpb.Consumption{} + kvCalc.AfterKVRequest(settle, req, resp) + + actualReadCost := float64(cfg.ReadBytesCost) * float64(actualReadBytes) + cpuCost := float64(cfg.CPUMsCost) * 10.0 + re.InDelta(actualReadCost+cpuCost-hintCost, settle.RRU, 1e-6, + "AfterKVRequest should settle using the same hint basis as BeforeKVRequest") + + totalRRU := precharge.RRU + settle.RRU + re.InDelta(baseCost+actualReadCost+cpuCost, totalRRU, 1e-6, + "Total RRU across precharge+settle should equal baseCost + actualCost") +} + +func TestPagingPreChargeTokenRefund(t *testing.T) { + re := require.New(t) + gc := createTestGroupCostController(re) + + initialTokens := float64(100000) + gc.run.requestUnitTokens.limiter.Reconfigure(time.Now(), tokenBucketReconfigureArgs{ + newTokens: initialTokens, + newFillRate: 0, + newBurst: 0, + }) + + predictedReadBytes := uint64(4 * 1024 * 1024) // 4 MB pre-charge + actualReadBytes := uint64(1 * 1024 * 1024) // 1 MB actual + + req := &TestRequestInfo{ + isWrite: false, + predictedReadBytes: predictedReadBytes, + } + resp := &TestResponseInfo{ + readBytes: actualReadBytes, + succeed: true, + } + + _, _, _, _, err := gc.onRequestWaitImpl(context.TODO(), req) + re.NoError(err) + tokensAfterPreCharge := gc.run.requestUnitTokens.limiter.AvailableTokens(time.Now()) + + _, _, err = gc.onResponseWaitImpl(context.TODO(), req, resp) + re.NoError(err) + tokensAfterSettlement := gc.run.requestUnitTokens.limiter.AvailableTokens(time.Now()) + + // Refund (pre-charge - actual) should exceed the actual read cost, + // so the limiter ends settlement with more tokens than after pre-charge. + cfg := DefaultRUConfig() + preChargeCost := float64(cfg.ReadBytesCost) * float64(predictedReadBytes) + actualCost := float64(cfg.ReadBytesCost) * float64(actualReadBytes) + expectedRefund := preChargeCost - actualCost + re.Positive(expectedRefund, "sanity: pre-charge should exceed actual cost") + re.InDelta(tokensAfterPreCharge+expectedRefund, tokensAfterSettlement, 1.0, + "limiter should be refunded the excess pre-charged tokens") + + gc.mu.Lock() + netRRU := gc.mu.consumption.RRU + gc.mu.Unlock() + baseCost := float64(cfg.ReadBaseCost) + float64(cfg.ReadPerBatchBaseCost)*defaultAvgBatchProportion + re.InDelta(baseCost+actualCost, netRRU, 1e-6, + "net consumption should equal baseCost + actualReadCost") +} + +func TestPagingPreChargeNoRefundWhenActualExceedsEstimate(t *testing.T) { + re := require.New(t) + gc := createTestGroupCostController(re) + + initialTokens := float64(100000) + gc.run.requestUnitTokens.limiter.Reconfigure(time.Now(), tokenBucketReconfigureArgs{ + newTokens: initialTokens, + newFillRate: 0, + newBurst: 0, + }) + + predictedReadBytes := uint64(1 * 1024 * 1024) // 1 MB pre-charge + actualReadBytes := uint64(4 * 1024 * 1024) // 4 MB actual (exceeds estimate) + + req := &TestRequestInfo{ + isWrite: false, + predictedReadBytes: predictedReadBytes, + } + resp := &TestResponseInfo{ + readBytes: actualReadBytes, + succeed: true, + } + + _, _, _, _, err := gc.onRequestWaitImpl(context.TODO(), req) + re.NoError(err) + tokensAfterPreCharge := gc.run.requestUnitTokens.limiter.AvailableTokens(time.Now()) + + _, _, err = gc.onResponseWaitImpl(context.TODO(), req, resp) + re.NoError(err) + tokensAfterSettlement := gc.run.requestUnitTokens.limiter.AvailableTokens(time.Now()) + + re.Less(tokensAfterSettlement, tokensAfterPreCharge, + "when actual exceeds pre-charge, settlement should consume tokens") +} + +func TestOnResponseImplPagingRefund(t *testing.T) { + re := require.New(t) + gc := createTestGroupCostController(re) + + initialTokens := float64(100000) + gc.run.requestUnitTokens.limiter.Reconfigure(time.Now(), tokenBucketReconfigureArgs{ + newTokens: initialTokens, + newFillRate: 0, + newBurst: 0, + }) + + predictedReadBytes := uint64(4 * 1024 * 1024) // 4 MB pre-charge + actualReadBytes := uint64(512 * 1024) // 512 KB actual + + req := &TestRequestInfo{ + isWrite: false, + predictedReadBytes: predictedReadBytes, + } + resp := &TestResponseInfo{ + readBytes: actualReadBytes, + succeed: true, + } + + _, _, _, _, err := gc.onRequestWaitImpl(context.TODO(), req) + re.NoError(err) + tokensAfterPreCharge := gc.run.requestUnitTokens.limiter.AvailableTokens(time.Now()) + + _, err = gc.onResponseImpl(req, resp) + re.NoError(err) + tokensAfterSettlement := gc.run.requestUnitTokens.limiter.AvailableTokens(time.Now()) + + re.Greater(tokensAfterSettlement, tokensAfterPreCharge, + "onResponseImpl should refund excess pre-charged tokens") +} + +func TestPagingPreChargeRefundOnFailedRead(t *testing.T) { + re := require.New(t) + gc := createTestGroupCostController(re) + + initialTokens := float64(100000) + gc.run.requestUnitTokens.limiter.Reconfigure(time.Now(), tokenBucketReconfigureArgs{ + newTokens: initialTokens, + newFillRate: 0, + newBurst: 0, + }) + + predictedReadBytes := uint64(4 * 1024 * 1024) // 4 MB pre-charge + + req := &TestRequestInfo{ + isWrite: false, + predictedReadBytes: predictedReadBytes, + } + // Failed response: no bytes read, no CPU consumed, succeed=false. + resp := &TestResponseInfo{ + readBytes: 0, + kvCPU: 0, + succeed: false, + } + + _, _, _, _, err := gc.onRequestWaitImpl(context.TODO(), req) + re.NoError(err) + tokensAfterPreCharge := gc.run.requestUnitTokens.limiter.AvailableTokens(time.Now()) + + _, _, err = gc.onResponseWaitImpl(context.TODO(), req, resp) + re.NoError(err) + tokensAfterSettlement := gc.run.requestUnitTokens.limiter.AvailableTokens(time.Now()) + + // On a failed read, AfterKVRequest still runs: paging settlement subtracts + // ReadBytesCost*predicted and calculateReadCost adds 0, yielding a negative + // delta that flows through RefundTokens. ReadBaseCost is not refunded, + // matching existing behavior for non-paging read failures. + cfg := DefaultRUConfig() + expectedRefund := float64(cfg.ReadBytesCost) * float64(predictedReadBytes) + re.InDelta(tokensAfterPreCharge+expectedRefund, tokensAfterSettlement, 1.0, + "failed read with paging hint should refund ReadBytesCost*predicted") +} + +func TestNoPreChargeWithoutPredictedReadBytes(t *testing.T) { + re := require.New(t) + cfg := DefaultRUConfig() + kvCalc := newKVCalculator(cfg) + baseCost := float64(cfg.ReadBaseCost) + float64(cfg.ReadPerBatchBaseCost)*defaultAvgBatchProportion + + // Without a PredictedReadBytes hint, BeforeKVRequest must not + // pre-charge; AfterKVRequest bills actual read bytes only. + req := &TestRequestInfo{isWrite: false} + precharge := &rmpb.Consumption{} + kvCalc.BeforeKVRequest(precharge, req) + re.InDelta(baseCost, precharge.RRU, 1e-6, + "Without a hint, BeforeKVRequest should only charge baseCost") + + actualReadBytes := uint64(2 * 1024 * 1024) + resp := &TestResponseInfo{ + readBytes: actualReadBytes, + kvCPU: 10 * time.Millisecond, + succeed: true, + } + settle := &rmpb.Consumption{} + kvCalc.AfterKVRequest(settle, req, resp) + + actualReadCost := float64(cfg.ReadBytesCost) * float64(actualReadBytes) + cpuCost := float64(cfg.CPUMsCost) * 10.0 + re.InDelta(actualReadCost+cpuCost, settle.RRU, 1e-6, + "AfterKVRequest should bill actual read cost only when nothing was pre-charged") +} + func TestResourceGroupThrottledError(t *testing.T) { re := require.New(t) gc := createTestGroupCostController(re) @@ -221,6 +459,30 @@ func TestResourceGroupThrottledError(t *testing.T) { re.True(errs.ErrClientResourceGroupThrottled.Equal(err)) } +func TestPagingPrechargeNotObservedOnThrottle(t *testing.T) { + re := require.New(t) + gc := createTestGroupCostController(re) + + // 4 GiB predicted ~= 65536 RU at default ReadCostPerByte (1/64KiB), + // exceeding the LTBMaxWaitDuration budget at the default FillRate so + // acquireTokens returns ErrClientResourceGroupThrottled. + req := &TestRequestInfo{ + isWrite: false, + predictedReadBytes: 4 * 1024 * 1024 * 1024, + } + + before := counterValue(re, gc.metrics.prechargeCounter) + _, _, _, _, err := gc.onRequestWaitImpl(context.TODO(), req) + re.Error(err) + re.True(errs.ErrClientResourceGroupThrottled.Equal(err)) + after := counterValue(re, gc.metrics.prechargeCounter) + + // Throttled requests never reach OnResponse for settlement, so the + // precharge counter must not be incremented either. + re.Equal(before, after, + "throttled paging request should not inflate PagingPrechargeCounter") +} + func TestAcquireTokensSignalAwareWait(t *testing.T) { re := require.New(t) diff --git a/client/resource_group/controller/limiter.go b/client/resource_group/controller/limiter.go index 86e1440946e..b451ab244bb 100644 --- a/client/resource_group/controller/limiter.go +++ b/client/resource_group/controller/limiter.go @@ -325,6 +325,20 @@ func (lim *Limiter) RemoveTokens(now time.Time, amount float64) { lim.maybeNotify() } +// RefundTokens adds tokens back to the limiter. +// +// No burst cap is applied here +func (lim *Limiter) RefundTokens(now time.Time, amount float64) { + lim.mu.Lock() + defer lim.mu.Unlock() + if lim.burst < 0 || lim.fillRate == Inf { + return + } + _, tokens := lim.getTokens(now) + lim.last = now + lim.tokens = tokens + amount +} + type tokenBucketReconfigureArgs struct { newTokens float64 newFillRate float64 diff --git a/client/resource_group/controller/limiter_test.go b/client/resource_group/controller/limiter_test.go index f5b2c144757..8bd04eecdd4 100644 --- a/client/resource_group/controller/limiter_test.go +++ b/client/resource_group/controller/limiter_test.go @@ -139,6 +139,40 @@ func TestReconfig(t *testing.T) { re.Equal(int64(-1), lim.GetBurst()) } +func TestRefundTokens(t *testing.T) { + re := require.New(t) + nc := make(chan notifyMsg, 1) + lim := NewLimiter(t0, 0, 0, 100, nc) + + // Consume some tokens. + lim.RemoveTokens(t0, 30) + checkTokens(re, lim, t0, 70) + + // Refund part of them. + lim.RefundTokens(t0, 20) + checkTokens(re, lim, t0, 90) + + // Refund beyond the initial amount - allowed (no burst cap when burst==0). + lim.RefundTokens(t0, 50) + checkTokens(re, lim, t0, 140) + + // With burst > 0, refund up to burst stays. + limBurst := NewLimiter(t0, 0, 100, 50, nc) + limBurst.RemoveTokens(t0, 30) + checkTokens(re, limBurst, t0, 20) + limBurst.RefundTokens(t0, 80) // tokens = 20 + 80 = 100, burst = 100 + checkTokens(re, limBurst, t0, 100) + + // Refund beyond burst - capped by getTokens. + limBurst.RefundTokens(t0, 50) // tokens = 100 + 50 = 150, capped to 100 + checkTokens(re, limBurst, t0, 100) + + // Burstable (burst < 0): RefundTokens is a no-op. + limUnlimited := NewLimiter(t0, 0, -1, 100, nc) + limUnlimited.RefundTokens(t0, 50) + checkTokens(re, limUnlimited, t0, 100) +} + func TestNotify(t *testing.T) { nc := make(chan notifyMsg, 1) lim := NewLimiter(t0, 1, 0, 0, nc) diff --git a/client/resource_group/controller/metrics/metrics.go b/client/resource_group/controller/metrics/metrics.go index 01231aceeaf..9130a02a692 100644 --- a/client/resource_group/controller/metrics/metrics.go +++ b/client/resource_group/controller/metrics/metrics.go @@ -53,6 +53,27 @@ var ( FailedTokenRequestDuration prometheus.Observer // SuccessfulTokenRequestDuration comments placeholder, WithLabelValues is a heavy operation, define variable to avoid call it every time. SuccessfulTokenRequestDuration prometheus.Observer + + // PagingPrechargeCounter counts paging read requests that triggered pre-charge (hint > 0). + PagingPrechargeCounter *prometheus.CounterVec + // PagingNonprechargeCounter counts paging read requests that skipped pre-charge (hint absent or zero). + PagingNonprechargeCounter *prometheus.CounterVec + + // PagingPrechargeBytesCounter accumulates bytes used as the pre-charge basis (sum of predicted hints). + PagingPrechargeBytesCounter *prometheus.CounterVec + // PagingActualBytesCounter accumulates actual bytes read by pre-charged paging requests. + PagingActualBytesCounter *prometheus.CounterVec + // PagingNonprechargeActualBytes accumulates actual bytes read by paging requests that skipped pre-charge. + PagingNonprechargeActualBytes *prometheus.CounterVec + // PagingPredictionResidualBytes records the distribution of (actual - predicted) read bytes for pre-charged requests. + PagingPredictionResidualBytes *prometheus.HistogramVec + + // PagingPrechargeRU accumulates RU pre-acquired at BeforeKVRequest for pre-charged paging read requests. + PagingPrechargeRU *prometheus.CounterVec + // PagingSettlementRU accumulates total RU finally consumed by pre-charged paging read requests (base + CPU + bytes cost). + PagingSettlementRU *prometheus.CounterVec + // PagingSettlementRUDelta records the distribution of per-RPC signed RU delta (settlement_ru - precharge_ru) for pre-charged paging read requests. + PagingSettlementRUDelta *prometheus.HistogramVec ) func init() { @@ -156,6 +177,97 @@ func initMetrics(constLabels prometheus.Labels) { // WithLabelValues is a heavy operation, define variable to avoid call it every time. FailedTokenRequestDuration = TokenRequestDuration.WithLabelValues("fail") SuccessfulTokenRequestDuration = TokenRequestDuration.WithLabelValues("success") + + PagingPrechargeCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: requestSubsystem, + Name: "paging_precharge_total", + Help: "Counter of RC paging pre-charge events (PredictedReadBytes hint present and > 0).", + ConstLabels: constLabels, + }, []string{newResourceGroupNameLabel}) + + PagingNonprechargeCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: requestSubsystem, + Name: "paging_nonprecharge_total", + Help: "Counter of paging read RPCs where no PredictedReadBytes hint was provided (hint absent or zero). These RPCs skip pre-charge and settle entirely on actual read bytes at response time.", + ConstLabels: constLabels, + }, []string{newResourceGroupNameLabel}) + + PagingPrechargeBytesCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: requestSubsystem, + Name: "paging_precharge_bytes_total", + Help: "Sum of bytes used as the RC paging pre-charge basis (predicted hint).", + ConstLabels: constLabels, + }, []string{newResourceGroupNameLabel}) + + PagingActualBytesCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: requestSubsystem, + Name: "paging_actual_bytes_total", + Help: "Sum of actual bytes read by paging KV requests that were pre-charged.", + ConstLabels: constLabels, + }, []string{newResourceGroupNameLabel}) + + PagingNonprechargeActualBytes = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: requestSubsystem, + Name: "paging_nonprecharge_actual_bytes_total", + Help: "Sum of actual bytes read by paging RPCs that skipped pre-charge (hint absent or zero). Quantifies read volume settled without pre-charge throttling.", + ConstLabels: constLabels, + }, []string{newResourceGroupNameLabel}) + + PagingPredictionResidualBytes = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: requestSubsystem, + Name: "paging_prediction_residual_bytes", + // Signed residual = actual - predicted. Buckets span ±64MB to + // absorb large first-page responses when the predictor has no + // prior observation (predicted=0) and workload shifts that + // leave the prediction above actual. Factor-4 spacing keeps + // resolution near zero. + Buckets: []float64{-67108864, -16777216, -4194304, -1048576, -262144, -65536, -16384, -4096, 0, 4096, 16384, 65536, 262144, 1048576, 4194304, 16777216, 67108864}, + Help: "Histogram of (actual_read_bytes - predicted_read_bytes) for pre-charged paging requests. Shows predictor accuracy.", + ConstLabels: constLabels, + }, []string{newResourceGroupNameLabel}) + + PagingPrechargeRU = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: requestSubsystem, + Name: "paging_precharge_ru_total", + Help: "Sum of RU pre-acquired at BeforeKVRequest for pre-charged paging read requests (read base cost + ReadBytesCost * predicted_bytes).", + ConstLabels: constLabels, + }, []string{newResourceGroupNameLabel}) + + PagingSettlementRU = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: requestSubsystem, + Name: "paging_settlement_ru_total", + Help: "Sum of total RU finally consumed by pre-charged paging read requests (read base cost + CPUMsCost * kv_cpu_ms + ReadBytesCost * actual_bytes). Equals precharge_ru + settlement_ru_delta.", + ConstLabels: constLabels, + }, []string{newResourceGroupNameLabel}) + + PagingSettlementRUDelta = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: requestSubsystem, + Name: "paging_settlement_ru_delta", + // v = settlement_ru - precharge_ru = CPUMsCost * kv_cpu_ms + ReadBytesCost * (actual - predicted). + // Negative => flows through RefundTokens; positive => flows through RemoveTokens / acquireTokens. + // Factor-4 spacing over ±1024 RU covers CPU (±tens of RU) plus bytes residuals up to ±64MB. + Buckets: []float64{-1024, -256, -64, -16, -4, -1, -0.25, -0.0625, 0, 0.0625, 0.25, 1, 4, 16, 64, 256, 1024}, + Help: "Per-RPC signed settlement RU delta (settlement_ru - precharge_ru) for pre-charged paging read requests. Negative means refund, positive means extra debit.", + ConstLabels: constLabels, + }, []string{newResourceGroupNameLabel}) } // InitAndRegisterMetrics initializes and register metrics. @@ -171,4 +283,13 @@ func InitAndRegisterMetrics(constLabels prometheus.Labels) { prometheus.MustRegister(ResourceGroupTokenRequestCounter) prometheus.MustRegister(LowTokenRequestNotifyCounter) prometheus.MustRegister(TokenConsumedHistogram) + prometheus.MustRegister(PagingPrechargeCounter) + prometheus.MustRegister(PagingNonprechargeCounter) + prometheus.MustRegister(PagingPrechargeBytesCounter) + prometheus.MustRegister(PagingActualBytesCounter) + prometheus.MustRegister(PagingNonprechargeActualBytes) + prometheus.MustRegister(PagingPredictionResidualBytes) + prometheus.MustRegister(PagingPrechargeRU) + prometheus.MustRegister(PagingSettlementRU) + prometheus.MustRegister(PagingSettlementRUDelta) } diff --git a/client/resource_group/controller/model.go b/client/resource_group/controller/model.go index 54721969811..62730dcbcad 100644 --- a/client/resource_group/controller/model.go +++ b/client/resource_group/controller/model.go @@ -52,6 +52,20 @@ type RequestInfo interface { StoreID() uint64 RequestSize() uint64 AccessLocationType() AccessLocationType + // PredictedReadBytes returns the read-bytes hint used for RC paging + // pre-charge in BeforeKVRequest and settled symmetrically in + // AfterKVRequest. Return 0 to opt out; writes always return 0. + PredictedReadBytes() uint64 +} + +// estimatedReadBytes returns the predicted read-bytes hint for read requests. +// Writes always return 0 so paging pre-charge / settlement / metrics stay +// gated to reads. +func estimatedReadBytes(req RequestInfo) uint64 { + if req.IsWrite() { + return 0 + } + return req.PredictedReadBytes() } // ResponseInfo is the interface of the response information provider. A response should be @@ -104,6 +118,10 @@ func (kc *KVCalculator) BeforeKVRequest(consumption *rmpb.Consumption, req Reque // Read bytes could not be known before the request is executed, // so we only add the base cost here. consumption.RRU += float64(kc.ReadBaseCost) + float64(kc.ReadPerBatchBaseCost)*defaultAvgBatchProportion + // Paging pre-charge + if bytesForEst := estimatedReadBytes(req); bytesForEst > 0 { + consumption.RRU += float64(kc.ReadBytesCost) * float64(bytesForEst) + } } if req.AccessLocationType() == AccessCrossZone { if req.IsWrite() { @@ -138,6 +156,10 @@ func (kc *KVCalculator) AfterKVRequest(consumption *rmpb.Consumption, req Reques if !req.IsWrite() { // For now, we can only collect the KV CPU cost for a read request. kc.calculateCPUCost(consumption, res) + // Paging settlement + if bytesForEst := estimatedReadBytes(req); bytesForEst > 0 { + consumption.RRU -= float64(kc.ReadBytesCost) * float64(bytesForEst) + } } else if !res.Succeed() { // If the write request is not successfully returned, we need to pay back the WRU cost. kc.payBackWriteCost(consumption, req) diff --git a/client/resource_group/controller/testutil.go b/client/resource_group/controller/testutil.go index a055b32fa6f..c14d9eea342 100644 --- a/client/resource_group/controller/testutil.go +++ b/client/resource_group/controller/testutil.go @@ -22,11 +22,12 @@ import "time" // TestRequestInfo is used to test the request info interface. type TestRequestInfo struct { - isWrite bool - writeBytes uint64 - numReplicas int64 - storeID uint64 - accessType AccessLocationType + isWrite bool + writeBytes uint64 + numReplicas int64 + storeID uint64 + accessType AccessLocationType + predictedReadBytes uint64 } // NewTestRequestInfo creates a new TestRequestInfo. @@ -70,6 +71,11 @@ func (tri *TestRequestInfo) AccessLocationType() AccessLocationType { return tri.accessType } +// PredictedReadBytes implements the RequestInfo interface. +func (tri *TestRequestInfo) PredictedReadBytes() uint64 { + return tri.predictedReadBytes +} + // TestResponseInfo is used to test the response info interface. type TestResponseInfo struct { readBytes uint64