Skip to content
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 55 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ type client struct {
svrUrls []string
pdSvcDiscovery *pdServiceDiscovery
tokenDispatcher *tokenDispatcher
regionClient *regionClient

// For service mode switching.
serviceModeKeeper
Expand Down Expand Up @@ -511,6 +512,9 @@ func (c *client) setup() error {
// Close closes the client.
func (c *client) Close() {
c.cancel()
if c.regionClient != nil {
c.regionClient.close()
}
c.wg.Wait()

c.serviceModeKeeper.close()
Expand Down Expand Up @@ -619,6 +623,15 @@ func (c *client) scheduleUpdateTokenConnection() {
}
}

func (c *client) getRegionClient() *regionClient {
c.Lock()
defer c.Unlock()
if c.regionClient == nil {
c.regionClient = newRegionClient(c.ctx, c.option, c.pdSvcDiscovery)
}
return c.regionClient
}

// GetClusterID returns the ClusterID.
func (c *client) GetClusterID(context.Context) uint64 {
return c.pdSvcDiscovery.GetClusterID()
Expand Down Expand Up @@ -671,6 +684,12 @@ func (c *client) UpdateOption(option DynamicOption, value any) error {
return errors.New("[pd] invalid value type for TSOClientRPCConcurrency option, it should be int")
}
c.option.setTSOClientRPCConcurrency(value)
case EnableQueryRegion:
enable, ok := value.(bool)
if !ok {
return errors.New("[pd] invalid value type for EnableQueryRegion option, it should be bool")
}
c.option.setEnableQueryRegion(enable)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When EnableQueryRegion is turned off dynamically, the existing regionClient keeps its daemon and streams alive until Close. Should we close/release it on disable to actually stop the stream path?

default:
return errors.New("[pd] unsupported client option")
}
Expand Down Expand Up @@ -888,8 +907,19 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs
return handleRegionResponse(resp), nil
}

// GetRegion implements the RPCClient interface.
func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) {
if c.option.getEnableQueryRegion() {
region, err := c.getRegionClient().GetRegion(ctx, key, opts...)
if err == nil {
return region, nil
}
log.Warn("[pd] query region stream failed, fallback to unary GetRegion", errs.ZapError(err))
}
return c.getRegionUnary(ctx, key, opts...)
}

// getRegionUnary implements GetRegion through the unary API.
func (c *client) getRegionUnary(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetRegion", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand Down Expand Up @@ -926,8 +956,19 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOpt
return handleRegionResponse(resp), nil
}

// GetPrevRegion implements the RPCClient interface.
func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) {
if c.option.getEnableQueryRegion() {
region, err := c.getRegionClient().GetPrevRegion(ctx, key, opts...)
if err == nil {
return region, nil
}
log.Warn("[pd] query region stream failed, fallback to unary GetPrevRegion", errs.ZapError(err))
}
return c.getPrevRegionUnary(ctx, key, opts...)
}

// getPrevRegionUnary implements GetPrevRegion through the unary API.
func (c *client) getPrevRegionUnary(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetPrevRegion", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand Down Expand Up @@ -965,8 +1006,19 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegio
return handleRegionResponse(resp), nil
}

// GetRegionByID implements the RPCClient interface.
func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...GetRegionOption) (*Region, error) {
if c.option.getEnableQueryRegion() {
region, err := c.getRegionClient().GetRegionByID(ctx, regionID, opts...)
if err == nil {
return region, nil
}
log.Warn("[pd] query region stream failed, fallback to unary GetRegionByID", errs.ZapError(err))
}
return c.getRegionByIDUnary(ctx, regionID, opts...)
}

// getRegionByIDUnary implements GetRegionByID through the unary API.
func (c *client) getRegionByIDUnary(ctx context.Context, regionID uint64, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetRegionByID", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand Down
1 change: 1 addition & 0 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ var (
ErrClientGetServingEndpoint = errors.Normalize("get serving endpoint failed", errors.RFCCodeText("PD:client:ErrClientGetServingEndpoint"))
ErrClientFindGroupByKeyspaceID = errors.Normalize("can't find keyspace group by keyspace id", errors.RFCCodeText("PD:client:ErrClientFindGroupByKeyspaceID"))
ErrClientWatchGCSafePointV2Stream = errors.Normalize("watch gc safe point v2 stream failed", errors.RFCCodeText("PD:client:ErrClientWatchGCSafePointV2Stream"))
ErrClientRouterConnectionTimeout = errors.Normalize("router connection is not ready until timeout", errors.RFCCodeText("PD:client:ErrClientRouterConnectionTimeout"))
ErrCircuitBreakerOpen = errors.Normalize("circuit breaker is open", errors.RFCCodeText("PD:client:ErrCircuitBreakerOpen"))
)

Expand Down
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
github.com/pingcap/kvproto v0.0.0-20251212013835-ed676560b3b4
github.com/pingcap/kvproto v0.0.0-20260518092652-f96c651c7702
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.20.5
github.com/stretchr/testify v1.9.0
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4=
github.com/pingcap/kvproto v0.0.0-20251212013835-ed676560b3b4 h1:uXlwBh9XoxQVfzI9vDkY6X4AusnuQA3ei1SHJ0484h4=
github.com/pingcap/kvproto v0.0.0-20251212013835-ed676560b3b4/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20260518092652-f96c651c7702 h1:rCT/GGlOh2MK5ULmCbA+ZIGP/e2q4UHIUjeTVsQYKMg=
github.com/pingcap/kvproto v0.0.0-20260518092652-f96c651c7702/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
67 changes: 67 additions & 0 deletions client/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ var (
EstimateTSOLatencyGauge *prometheus.GaugeVec
// CircuitBreakerCounters is a vector for different circuit breaker counters
CircuitBreakerCounters *prometheus.CounterVec
// QueryRegionBestBatchSize is the histogram of the best batch size of query region requests.
QueryRegionBestBatchSize prometheus.Histogram
// QueryRegionBatchSize is the histogram of the batch size of query region requests.
QueryRegionBatchSize *prometheus.HistogramVec
// QueryRegionBatchSendLatency is the histogram of the latency of sending query region requests.
QueryRegionBatchSendLatency prometheus.Histogram
)

func initMetrics(constLabels prometheus.Labels) {
Expand Down Expand Up @@ -178,6 +184,36 @@ func initMetrics(constLabels prometheus.Labels) {
Help: "Circuit breaker counters",
ConstLabels: constLabels,
}, []string{"name", "event"})

QueryRegionBestBatchSize = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "pd_client",
Subsystem: "request",
Name: "handle_query_region_best_batch_size",
Help: "Bucketed histogram of the best batch size of handled query region requests.",
ConstLabels: constLabels,
Buckets: prometheus.ExponentialBuckets(1, 2, 13),
})

QueryRegionBatchSize = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "pd_client",
Subsystem: "request",
Name: "handle_query_region_batch_size",
Help: "Bucketed histogram of the batch size of handled query region requests.",
ConstLabels: constLabels,
Buckets: []float64{1, 2, 4, 8, 10, 14, 18, 22, 26, 30, 35, 40, 45, 50, 60, 70, 80, 90, 100, 110, 120, 140, 160, 180, 200, 500, 1000},
}, []string{"type"})

QueryRegionBatchSendLatency = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "pd_client",
Subsystem: "request",
Name: "query_region_batch_send_latency",
ConstLabels: constLabels,
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
Help: "query region batch send latency",
})
}

// CmdDurationXXX and CmdFailedDurationXXX are the durations of the client commands.
Expand Down Expand Up @@ -207,6 +243,9 @@ var (
CmdDurationPut prometheus.Observer
CmdDurationUpdateGCSafePointV2 prometheus.Observer
CmdDurationUpdateServiceSafePointV2 prometheus.Observer
CmdDurationQueryRegionAsyncWait prometheus.Observer
CmdDurationQueryRegionWait prometheus.Observer
CmdDurationQueryRegion prometheus.Observer

CmdFailDurationGetRegion prometheus.Observer
CmdFailDurationTSO prometheus.Observer
Expand All @@ -226,11 +265,23 @@ var (
CmdFailedDurationUpdateGCSafePointV2 prometheus.Observer
CmdFailedDurationUpdateServiceSafePointV2 prometheus.Observer
CmdFailedDurationGetAllKeyspaces prometheus.Observer
CmdFailedDurationQueryRegionAsyncWait prometheus.Observer
CmdFailedDurationQueryRegionWait prometheus.Observer
CmdFailedDurationQueryRegion prometheus.Observer

// RequestDurationTSO records the durations of the successful TSO requests.
RequestDurationTSO prometheus.Observer
// RequestFailedDurationTSO records the durations of the failed TSO requests.
RequestFailedDurationTSO prometheus.Observer
// RequestDurationQueryRegion records the durations of the successful query region requests.
RequestDurationQueryRegion prometheus.Observer
// RequestFailedDurationQueryRegion records the durations of the failed query region requests.
RequestFailedDurationQueryRegion prometheus.Observer

QueryRegionBatchSizeTotal prometheus.Observer
QueryRegionBatchSizeByKeys prometheus.Observer
QueryRegionBatchSizeByPrevKeys prometheus.Observer
QueryRegionBatchSizeByIDs prometheus.Observer
)

func initCmdDurations() {
Expand Down Expand Up @@ -260,6 +311,9 @@ func initCmdDurations() {
CmdDurationPut = cmdDuration.WithLabelValues("put")
CmdDurationUpdateGCSafePointV2 = cmdDuration.WithLabelValues("update_gc_safe_point_v2")
CmdDurationUpdateServiceSafePointV2 = cmdDuration.WithLabelValues("update_service_safe_point_v2")
CmdDurationQueryRegionAsyncWait = cmdDuration.WithLabelValues("query_region_async_wait")
CmdDurationQueryRegionWait = cmdDuration.WithLabelValues("query_region_wait")
CmdDurationQueryRegion = cmdDuration.WithLabelValues("query_region")

CmdFailDurationGetRegion = cmdFailedDuration.WithLabelValues("get_region")
CmdFailDurationTSO = cmdFailedDuration.WithLabelValues("tso")
Expand All @@ -279,9 +333,19 @@ func initCmdDurations() {
CmdFailedDurationUpdateGCSafePointV2 = cmdFailedDuration.WithLabelValues("update_gc_safe_point_v2")
CmdFailedDurationUpdateServiceSafePointV2 = cmdFailedDuration.WithLabelValues("update_service_safe_point_v2")
CmdFailedDurationGetAllKeyspaces = cmdFailedDuration.WithLabelValues("get_all_keyspaces")
CmdFailedDurationQueryRegionAsyncWait = cmdFailedDuration.WithLabelValues("query_region_async_wait")
CmdFailedDurationQueryRegionWait = cmdFailedDuration.WithLabelValues("query_region_wait")
CmdFailedDurationQueryRegion = cmdFailedDuration.WithLabelValues("query_region")

RequestDurationTSO = requestDuration.WithLabelValues("tso")
RequestFailedDurationTSO = requestDuration.WithLabelValues("tso-failed")
RequestDurationQueryRegion = requestDuration.WithLabelValues("query_region")
RequestFailedDurationQueryRegion = requestDuration.WithLabelValues("query_region-failed")

QueryRegionBatchSizeTotal = QueryRegionBatchSize.WithLabelValues("total")
QueryRegionBatchSizeByKeys = QueryRegionBatchSize.WithLabelValues("by_keys")
QueryRegionBatchSizeByPrevKeys = QueryRegionBatchSize.WithLabelValues("by_prev_keys")
QueryRegionBatchSizeByIDs = QueryRegionBatchSize.WithLabelValues("by_ids")
}

func registerMetrics() {
Expand All @@ -294,4 +358,7 @@ func registerMetrics() {
prometheus.MustRegister(RequestForwarded)
prometheus.MustRegister(EstimateTSOLatencyGauge)
prometheus.MustRegister(CircuitBreakerCounters)
prometheus.MustRegister(QueryRegionBestBatchSize)
prometheus.MustRegister(QueryRegionBatchSize)
prometheus.MustRegister(QueryRegionBatchSendLatency)
}
31 changes: 31 additions & 0 deletions client/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
defaultMaxTSOBatchWaitInterval time.Duration = 0
defaultEnableTSOFollowerProxy = false
defaultEnableFollowerHandle = false
defaultEnableQueryRegion = false
defaultTSOClientRPCConcurrency = 1
)

Expand All @@ -46,6 +47,9 @@ const (
EnableFollowerHandle
// TSOClientRPCConcurrency controls the amount of ongoing TSO RPC requests at the same time in a single TSO client.
TSOClientRPCConcurrency
// EnableQueryRegion is the QueryRegion option.
// It is stored as bool.
EnableQueryRegion

dynamicOptionCount
)
Expand All @@ -66,6 +70,7 @@ type option struct {
dynamicOptions [dynamicOptionCount]atomic.Value

enableTSOFollowerProxyCh chan struct{}
enableFollowerHandleCh chan struct{}
}

// newOption creates a new PD client option with the default values set.
Expand All @@ -74,13 +79,15 @@ func newOption() *option {
timeout: defaultPDTimeout,
maxRetryTimes: maxInitClusterRetries,
enableTSOFollowerProxyCh: make(chan struct{}, 1),
enableFollowerHandleCh: make(chan struct{}, 1),
initMetrics: true,
}

co.dynamicOptions[MaxTSOBatchWaitInterval].Store(defaultMaxTSOBatchWaitInterval)
co.dynamicOptions[EnableTSOFollowerProxy].Store(defaultEnableTSOFollowerProxy)
co.dynamicOptions[EnableFollowerHandle].Store(defaultEnableFollowerHandle)
co.dynamicOptions[TSOClientRPCConcurrency].Store(defaultTSOClientRPCConcurrency)
co.dynamicOptions[EnableQueryRegion].Store(defaultEnableQueryRegion)
return co
}

Expand All @@ -102,6 +109,10 @@ func (o *option) setEnableFollowerHandle(enable bool) {
old := o.getEnableFollowerHandle()
if enable != old {
o.dynamicOptions[EnableFollowerHandle].Store(enable)
select {
case o.enableFollowerHandleCh <- struct{}{}:
default:
}
}
}

Expand All @@ -110,6 +121,19 @@ func (o *option) getEnableFollowerHandle() bool {
return o.dynamicOptions[EnableFollowerHandle].Load().(bool)
}

// setEnableQueryRegion sets the QueryRegion option.
func (o *option) setEnableQueryRegion(enable bool) {
old := o.getEnableQueryRegion()
if enable != old {
o.dynamicOptions[EnableQueryRegion].Store(enable)
}
}

// getEnableQueryRegion gets the QueryRegion option.
func (o *option) getEnableQueryRegion() bool {
return o.dynamicOptions[EnableQueryRegion].Load().(bool)
}

// getMaxTSOBatchWaitInterval gets the max TSO batch wait interval option.
func (o *option) getMaxTSOBatchWaitInterval() time.Duration {
return o.dynamicOptions[MaxTSOBatchWaitInterval].Load().(time.Duration)
Expand Down Expand Up @@ -239,6 +263,13 @@ func WithTSOServerProxyOption(useTSOServerProxy bool) ClientOption {
}
}

// WithEnableQueryRegion enables the QueryRegion streaming client.
func WithEnableQueryRegion() ClientOption {
return func(c *client) {
c.option.setEnableQueryRegion(true)
}
}

// WithMaxErrorRetry configures the client max retry times when connect meets error.
func WithMaxErrorRetry(count int) ClientOption {
return func(c *client) {
Expand Down
12 changes: 12 additions & 0 deletions client/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func TestDynamicOptionChange(t *testing.T) {
re.Equal(defaultMaxTSOBatchWaitInterval, o.getMaxTSOBatchWaitInterval())
re.Equal(defaultEnableTSOFollowerProxy, o.getEnableTSOFollowerProxy())
re.Equal(defaultEnableFollowerHandle, o.getEnableFollowerHandle())
re.Equal(defaultEnableQueryRegion, o.getEnableQueryRegion())

// Check the invalid value setting.
re.Error(o.setMaxTSOBatchWaitInterval(time.Second))
Expand Down Expand Up @@ -59,8 +60,19 @@ func TestDynamicOptionChange(t *testing.T) {

expectBool = true
o.setEnableFollowerHandle(expectBool)
testutil.Eventually(re, func() bool {
<-o.enableFollowerHandleCh
return true
})
re.Equal(expectBool, o.getEnableFollowerHandle())
expectBool = false
o.setEnableFollowerHandle(expectBool)
re.Equal(expectBool, o.getEnableFollowerHandle())

expectBool = true
o.setEnableQueryRegion(expectBool)
re.Equal(expectBool, o.getEnableQueryRegion())
expectBool = false
o.setEnableQueryRegion(expectBool)
re.Equal(expectBool, o.getEnableQueryRegion())
}
Loading
Loading