Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions pkg/keyspace/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,21 @@ func MakeKeyspacePrefix(mode byte, id uint32) []byte {
return prefix
}

// ParseKeyspacePrefix parses a raw keyspace prefix from key.
// It returns false for keys that do not start with a known keyspace mode byte.
func ParseKeyspacePrefix(key []byte) (mode byte, id uint32, ok bool) {
if len(key) < KeyspacePrefixLen {
return 0, 0, false
}
mode = key[0]
if mode != RawKeyspaceModePrefix && mode != TxnKeyspaceModePrefix {
return 0, 0, false
}
idBytes := [KeyspacePrefixLen]byte{0, key[1], key[2], key[3]}
id = binary.BigEndian.Uint32(idBytes[:])
return mode, id, true
}

// RegionBound represents the region boundary of the given keyspace.
// For a keyspace with id ['a', 'b', 'c'], it has four boundaries:
//
Expand Down
38 changes: 38 additions & 0 deletions pkg/keyspace/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,44 @@ func TestMaxKeyspaceLabelRuleSplitKeys(t *testing.T) {
)
}

func TestParseKeyspacePrefix(t *testing.T) {
re := require.New(t)

testCases := []struct {
name string
key []byte
mode byte
id uint32
}{
{
name: "raw",
key: []byte{'r', 0x01, 0x02, 0x03},
mode: RawKeyspaceModePrefix,
id: 0x010203,
},
{
name: "txn with suffix",
key: []byte{'x', 0xff, 0xff, 0xff, 't'},
mode: TxnKeyspaceModePrefix,
id: constant.MaxValidKeyspaceID,
},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(_ *testing.T) {
mode, id, ok := ParseKeyspacePrefix(testCase.key)
re.True(ok)
re.Equal(testCase.mode, mode)
re.Equal(testCase.id, id)
})
}

_, _, ok := ParseKeyspacePrefix([]byte{'x', 0x01, 0x02})
re.False(ok)
_, _, ok = ParseKeyspacePrefix([]byte{'t', 0x01, 0x02, 0x03})
re.False(ok)
}

func TestValidateName(t *testing.T) {
re := require.New(t)
testCases := []struct {
Expand Down
14 changes: 13 additions & 1 deletion pkg/schedule/checker/split_scatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package checker

import (
"bytes"
"context"
"fmt"
"sort"
Expand All @@ -24,6 +25,7 @@ import (

"github.com/pingcap/log"

"github.com/tikv/pd/pkg/keyspace"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/filter"
"github.com/tikv/pd/pkg/schedule/operator"
Expand Down Expand Up @@ -89,6 +91,16 @@ type splitScatterRangeHint struct {
scatterGroup string
}

func (c *splitScatterController) hasSplitScatterTxnKeyspaceBounds(keyspaceID uint32) bool {
regionBound := keyspace.MakeRegionBound(keyspaceID)
return c.hasRegionStartKey(regionBound.TxnLeftBound) && c.hasRegionStartKey(regionBound.TxnRightBound)
}

func (c *splitScatterController) hasRegionStartKey(key []byte) bool {
region := c.cluster.GetRegionByKey(key)
return region != nil && bytes.Equal(region.GetStartKey(), key)
}

func (c *splitScatterController) collectTopPendingSplitScatter(limit int) []splitScatterPendingItem {
if limit <= 0 {
return nil
Expand Down Expand Up @@ -343,7 +355,7 @@ func (c *splitScatterController) dispatchSplitScatterRegions() {
splitScatterDispatchRegionMissingCounter.Inc()
continue
}
rangeHint := resolveSplitScatterRangeHint(region)
rangeHint := resolveSplitScatterRangeHintWithKeyspaceValidator(region, c.hasSplitScatterTxnKeyspaceBounds)
scatterGroup := pending.group
if rangeHint.scatterGroup != "" {
scatterGroup = rangeHint.scatterGroup
Expand Down
91 changes: 82 additions & 9 deletions pkg/schedule/checker/split_scatter_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,52 @@ import (

"github.com/tikv/pd/pkg/codec"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/keyspace"
)

var (
splitScatterTablePrefix = []byte{'t'}
splitScatterIndexPrefix = []byte("_i")
)

func resolveSplitScatterRangeHint(region *core.RegionInfo) splitScatterRangeHint {
_, decoded, err := codec.DecodeBytes(region.GetStartKey())
if err != nil || !bytes.HasPrefix(decoded, splitScatterTablePrefix) {
type splitScatterKeyspaceValidator func(uint32) bool

type splitScatterDecodedKey struct {
rawKey []byte
keyspacePrefix []byte
keyspaceID uint32
}

func (key splitScatterDecodedKey) hasKeyspace() bool {
return len(key.keyspacePrefix) > 0
}

func resolveSplitScatterRangeHintWithKeyspaceValidator(
region *core.RegionInfo,
validateKeyspace splitScatterKeyspaceValidator,
) splitScatterRangeHint {
decodedKey, err := decodeSplitScatterRegionKey(region.GetStartKey(), validateKeyspace)
if err != nil {
return splitScatterRangeHint{}
}
rest := decoded[len(splitScatterTablePrefix):]
rawKey := decodedKey.rawKey

if !bytes.HasPrefix(rawKey, splitScatterTablePrefix) {
return splitScatterRangeHint{}
}
rest := rawKey[len(splitScatterTablePrefix):]
rest, tableID, err := codec.DecodeInt(rest)
if err != nil {
return splitScatterRangeHint{}
}

tablePrefix := append([]byte(nil), codec.GenerateTableKey(tableID)...)
tableGroup := makeSplitScatterTableGroup(tableID)
if decodedKey.hasKeyspace() {
tableGroup = makeSplitScatterKeyspaceTableGroup(decodedKey.keyspaceID, tableID)
}
if !bytes.HasPrefix(rest, splitScatterIndexPrefix) {
return splitScatterPrefixRangeWithGroup(tablePrefix, tableGroup)
return splitScatterPrefixRangeWithKeyspaceGroup(decodedKey.keyspacePrefix, tablePrefix, tableGroup)
}

indexRest := rest[len(splitScatterIndexPrefix):]
Expand All @@ -52,7 +76,10 @@ func resolveSplitScatterRangeHint(region *core.RegionInfo) splitScatterRangeHint

indexPrefix := codec.GenerateIndexKey(tableID, indexID)
indexGroup := makeSplitScatterIndexGroup(tableID, indexID)
indexRange := splitScatterPrefixRangeWithGroup(indexPrefix, indexGroup)
if decodedKey.hasKeyspace() {
indexGroup = makeSplitScatterKeyspaceIndexGroup(decodedKey.keyspaceID, tableID, indexID)
}
indexRange := splitScatterPrefixRangeWithKeyspaceGroup(decodedKey.keyspacePrefix, indexPrefix, indexGroup)
endKey := region.GetEndKey()

// We intentionally over-approximate ambiguous table-key ranges. If PD can
Expand All @@ -63,17 +90,48 @@ func resolveSplitScatterRangeHint(region *core.RegionInfo) splitScatterRangeHint
// Both endKey and indexRange.endKey are MemComparable-encoded, so
// bytes.Compare correctly reflects the key ordering.
if len(endKey) == 0 || len(indexRange.startKey) == 0 || len(indexRange.endKey) == 0 || bytes.Compare(endKey, indexRange.endKey) > 0 {
return splitScatterPrefixRangeWithGroup(tablePrefix, tableGroup)
return splitScatterPrefixRangeWithKeyspaceGroup(decodedKey.keyspacePrefix, tablePrefix, tableGroup)
}
return indexRange
}

func decodeSplitScatterRegionKey(
regionKey []byte,
validateKeyspace splitScatterKeyspaceValidator,
) (splitScatterDecodedKey, error) {
_, rawKey, err := codec.DecodeBytes(regionKey)
if err != nil {
return splitScatterDecodedKey{}, err
}
decodedKey := splitScatterDecodedKey{rawKey: rawKey}
mode, keyspaceID, ok := keyspace.ParseKeyspacePrefix(rawKey)
if !ok || mode != keyspace.TxnKeyspaceModePrefix {
return decodedKey, nil
}

// Split-scatter range hints are table/index-scoped, so only TiDB txn
// keyspace keys from a known keyspace range are decoded. With only a
// region key, an API V2 txn prefix can be indistinguishable from a
// classic/raw user key that starts with the same bytes.
if validateKeyspace == nil || !validateKeyspace(keyspaceID) {
return decodedKey, nil
}
decodedKey.rawKey = rawKey[keyspace.KeyspacePrefixLen:]
decodedKey.keyspacePrefix = keyspace.MakeKeyspacePrefix(mode, keyspaceID)
decodedKey.keyspaceID = keyspaceID
return decodedKey, nil
}

func splitScatterPrefixRange(rawPrefix []byte) splitScatterRangeHint {
return splitScatterPrefixRangeWithGroup(rawPrefix, "")
}

func splitScatterPrefixRangeWithGroup(rawPrefix []byte, scatterGroup string) splitScatterRangeHint {
startKey := append([]byte(nil), codec.EncodeBytes(rawPrefix)...)
return splitScatterPrefixRangeWithKeyspaceGroup(nil, rawPrefix, scatterGroup)
}

func splitScatterPrefixRangeWithKeyspaceGroup(keyspacePrefix, rawPrefix []byte, scatterGroup string) splitScatterRangeHint {
startKey := codec.EncodeBytes(appendKeyspacePrefix(keyspacePrefix, rawPrefix))
endRawPrefix := splitScatterNextPrefix(rawPrefix)
if len(endRawPrefix) == 0 {
// Current callers use TiDB table/index prefixes, which always start
Expand All @@ -84,11 +142,18 @@ func splitScatterPrefixRangeWithGroup(rawPrefix []byte, scatterGroup string) spl
}
return splitScatterRangeHint{
startKey: startKey,
endKey: append([]byte(nil), codec.EncodeBytes(endRawPrefix)...),
endKey: codec.EncodeBytes(appendKeyspacePrefix(keyspacePrefix, endRawPrefix)),
scatterGroup: scatterGroup,
}
}

func appendKeyspacePrefix(keyspacePrefix, rawKey []byte) []byte {
key := make([]byte, 0, len(keyspacePrefix)+len(rawKey))
key = append(key, keyspacePrefix...)
key = append(key, rawKey...)
return key
}

func makeSplitScatterTableGroup(tableID int64) string {
return fmt.Sprintf("split-scatter-table-%d", tableID)
}
Expand All @@ -97,6 +162,14 @@ func makeSplitScatterIndexGroup(tableID, indexID int64) string {
return fmt.Sprintf("split-scatter-index-%d-%d", tableID, indexID)
}

func makeSplitScatterKeyspaceTableGroup(keyspaceID uint32, tableID int64) string {
return fmt.Sprintf("split-scatter-keyspace-%d-table-%d", keyspaceID, tableID)
}

func makeSplitScatterKeyspaceIndexGroup(keyspaceID uint32, tableID, indexID int64) string {
return fmt.Sprintf("split-scatter-keyspace-%d-index-%d-%d", keyspaceID, tableID, indexID)
}

func splitScatterNextPrefix(key []byte) []byte {
next := append([]byte(nil), key...)
for i := len(next) - 1; i >= 0; i-- {
Expand Down
Loading
Loading