Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 94 additions & 27 deletions client/resource_group/controller/group_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,18 @@ type groupMetricsCollection struct {
tokenRequestCounter prometheus.Counter
runningKVRequestCounter prometheus.Gauge
consumeTokenHistogram prometheus.Observer

// Paging pre-charge observers, cached per-RG to avoid WithLabelValues
// on the hot path.
prechargeCounter prometheus.Counter
prechargeBytesCounter prometheus.Counter
actualBytesCounter prometheus.Counter
predictionResidualBytes prometheus.Observer
nonprechargeCounter prometheus.Counter
nonprechargeActualBytes prometheus.Counter
prechargeRU prometheus.Counter
settlementRU prometheus.Counter
settlementRUDelta prometheus.Observer
}

func initMetrics(oldName, name string) *groupMetricsCollection {
Expand All @@ -122,9 +134,44 @@ func initMetrics(oldName, name string) *groupMetricsCollection {
tokenRequestCounter: metrics.ResourceGroupTokenRequestCounter.WithLabelValues(oldName, name),
runningKVRequestCounter: metrics.GroupRunningKVRequestCounter.WithLabelValues(name),
consumeTokenHistogram: metrics.TokenConsumedHistogram.WithLabelValues(name),

prechargeCounter: metrics.PagingPrechargeCounter.WithLabelValues(name),
prechargeBytesCounter: metrics.PagingPrechargeBytesCounter.WithLabelValues(name),
actualBytesCounter: metrics.PagingActualBytesCounter.WithLabelValues(name),
predictionResidualBytes: metrics.PagingPredictionResidualBytes.WithLabelValues(name),

nonprechargeCounter: metrics.PagingNonprechargeCounter.WithLabelValues(name),
nonprechargeActualBytes: metrics.PagingNonprechargeActualBytes.WithLabelValues(name),
prechargeRU: metrics.PagingPrechargeRU.WithLabelValues(name),
settlementRU: metrics.PagingSettlementRU.WithLabelValues(name),
settlementRUDelta: metrics.PagingSettlementRUDelta.WithLabelValues(name),
}
}

// observePagingPrecharge requires bytesForEst > 0.
// prechargeRU is the total RU pre-acquired at BeforeKVRequest
// (base + ReadBytesCost * bytesForEst).
func (gmc *groupMetricsCollection) observePagingPrecharge(bytesForEst uint64, prechargeRU float64) {
gmc.prechargeCounter.Inc()
gmc.prechargeBytesCounter.Add(float64(bytesForEst))
gmc.prechargeRU.Add(prechargeRU)
}

// observePagingActual requires predicted > 0.
// settlementRU is the total RU finally consumed (base + CPU + ReadBytesCost * actual);
// vDelta is (settlement_ru - precharge_ru), the signed per-RPC RU adjustment.
func (gmc *groupMetricsCollection) observePagingActual(predicted, actual uint64, settlementRU, vDelta float64) {
gmc.actualBytesCounter.Add(float64(actual))
gmc.predictionResidualBytes.Observe(float64(actual) - float64(predicted))
gmc.settlementRU.Add(settlementRU)
gmc.settlementRUDelta.Observe(vDelta)
}

func (gmc *groupMetricsCollection) observePagingNonprecharge(actual uint64) {
gmc.nonprechargeCounter.Inc()
gmc.nonprechargeActualBytes.Add(float64(actual))
}

type tokenCounter struct {
fillRate uint64

Expand Down Expand Up @@ -576,6 +623,9 @@ func (gc *groupCostController) onRequestWaitImpl(
gc.metrics.successfulRequestDuration.Observe(d.Seconds())
waitDuration += d
}
if bytesForEst := estimatedReadBytes(info); bytesForEst > 0 {
gc.metrics.observePagingPrecharge(bytesForEst, getRUValueFromConsumption(delta))
}

gc.mu.Lock()
// Calculate the penalty of the store
Expand All @@ -601,23 +651,30 @@ func (gc *groupCostController) onResponseImpl(
for _, calc := range gc.calculators {
calc.AfterKVRequest(delta, req, resp)
}
// `count` is the full per-request consumption (BeforeKVRequest + AfterKVRequest).
count := &rmpb.Consumption{}
*count = *delta
for _, calc := range gc.calculators {
calc.BeforeKVRequest(count, req)
}
if bytesForEst := estimatedReadBytes(req); bytesForEst > 0 {
gc.metrics.observePagingActual(bytesForEst, resp.ReadBytes(),
getRUValueFromConsumption(count), getRUValueFromConsumption(delta))
} else if !req.IsWrite() {
gc.metrics.observePagingNonprecharge(resp.ReadBytes())
}
if !gc.burstable.Load() {
counter := gc.run.requestUnitTokens
if v := getRUValueFromConsumption(delta); v > 0 {
counter.limiter.RemoveTokens(time.Now(), v)
} else if v < 0 {
// Paging over-estimate: refund the excess pre-charge.
counter.limiter.RefundTokens(time.Now(), -v)
}
}

gc.mu.Lock()
// Record the consumption of the request
add(gc.mu.consumption, delta)
// Record the consumption of the request by store
count := &rmpb.Consumption{}
*count = *delta
// As the penalty is only counted when the request is completed, so here needs to calculate the write cost which is added in `BeforeKVRequest`
for _, calc := range gc.calculators {
calc.BeforeKVRequest(count, req)
}
add(gc.mu.storeCounter[req.StoreID()], count)
add(gc.mu.globalCounter, count)
gc.mu.Unlock()
Expand All @@ -632,33 +689,43 @@ func (gc *groupCostController) onResponseWaitImpl(
for _, calc := range gc.calculators {
calc.AfterKVRequest(delta, req, resp)
}
// `count` is the full per-request consumption (BeforeKVRequest + AfterKVRequest).
count := &rmpb.Consumption{}
*count = *delta
for _, calc := range gc.calculators {
calc.BeforeKVRequest(count, req)
}
if bytesForEst := estimatedReadBytes(req); bytesForEst > 0 {
gc.metrics.observePagingActual(bytesForEst, resp.ReadBytes(),
getRUValueFromConsumption(count), getRUValueFromConsumption(delta))
} else if !req.IsWrite() {
gc.metrics.observePagingNonprecharge(resp.ReadBytes())
}
var waitDuration time.Duration
if !gc.burstable.Load() {
allowDebt := delta.ReadBytes+delta.WriteBytes < bigRequestThreshold || !gc.isThrottled.Load()
d, err := gc.acquireTokens(ctx, delta, &waitDuration, allowDebt)
if err != nil {
if errs.ErrClientResourceGroupThrottled.Equal(err) {
gc.metrics.failedRequestCounterWithThrottled.Inc()
gc.metrics.failedLimitReserveDuration.Observe(d.Seconds())
} else {
gc.metrics.failedRequestCounterWithOthers.Inc()
v := getRUValueFromConsumption(delta)
if v > 0 {
allowDebt := delta.ReadBytes+delta.WriteBytes < bigRequestThreshold || !gc.isThrottled.Load()
d, err := gc.acquireTokens(ctx, delta, &waitDuration, allowDebt)
if err != nil {
if errs.ErrClientResourceGroupThrottled.Equal(err) {
gc.metrics.failedRequestCounterWithThrottled.Inc()
gc.metrics.failedLimitReserveDuration.Observe(d.Seconds())
} else {
gc.metrics.failedRequestCounterWithOthers.Inc()
}
return nil, waitDuration, err
}
return nil, waitDuration, err
gc.metrics.successfulRequestDuration.Observe(d.Seconds())
waitDuration += d
} else if v < 0 {
// Paging over-estimate: refund the excess pre-charge.
gc.run.requestUnitTokens.limiter.RefundTokens(time.Now(), -v)
}
gc.metrics.successfulRequestDuration.Observe(d.Seconds())
waitDuration += d
}

gc.mu.Lock()
// Record the consumption of the request
add(gc.mu.consumption, delta)
// Record the consumption of the request by store
count := &rmpb.Consumption{}
*count = *delta
// As the penalty is only counted when the request is completed, so here needs to calculate the write cost which is added in `BeforeKVRequest`
for _, calc := range gc.calculators {
calc.BeforeKVRequest(count, req)
}
add(gc.mu.storeCounter[req.StoreID()], count)
add(gc.mu.globalCounter, count)
gc.mu.Unlock()
Expand Down
Loading
Loading