Skip to content

Commit

Permalink
client/http: implement the marshaler interfaces for Rule/RuleOp (#7462)
Browse files Browse the repository at this point in the history
ref #7300

Implement the marshaler interfaces for `Rule` and `RuleOP` to make sure we could set/get the correct start/end key.

Ref https://github.com/pingcap/tidb/blob/46d4231c8b0ade353b98572e7c2a015bddf940f4/pkg/ddl/placement/rule.go#L76-L91.

Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato authored Nov 30, 2023
1 parent beedacb commit 862eee1
Show file tree
Hide file tree
Showing 9 changed files with 582 additions and 24 deletions.
14 changes: 13 additions & 1 deletion client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
Regions = "/pd/api/v1/regions"
regionsByKey = "/pd/api/v1/regions/key"
RegionsByStoreIDPrefix = "/pd/api/v1/regions/store"
regionsReplicated = "/pd/api/v1/regions/replicated"
EmptyRegions = "/pd/api/v1/regions/check/empty-region"
AccelerateSchedule = "/pd/api/v1/regions/accelerate-schedule"
AccelerateScheduleInBatch = "/pd/api/v1/regions/accelerate-schedule/batch"
Expand Down Expand Up @@ -95,9 +96,20 @@ func RegionsByStoreID(storeID uint64) string {
return fmt.Sprintf("%s/%d", RegionsByStoreIDPrefix, storeID)
}

// RegionsReplicatedByKeyRange returns the path of PD HTTP API to get replicated regions with given start key and end key.
func RegionsReplicatedByKeyRange(keyRange *KeyRange) string {
startKeyStr, endKeyStr := keyRange.EscapeAsHexStr()
return fmt.Sprintf("%s?startKey=%s&endKey=%s",
regionsReplicated, startKeyStr, endKeyStr)
}

// RegionStatsByKeyRange returns the path of PD HTTP API to get region stats by start key and end key.
func RegionStatsByKeyRange(keyRange *KeyRange) string {
func RegionStatsByKeyRange(keyRange *KeyRange, onlyCount bool) string {
startKeyStr, endKeyStr := keyRange.EscapeAsUTF8Str()
if onlyCount {
return fmt.Sprintf("%s?start_key=%s&end_key=%s&count",
StatsRegion, startKeyStr, endKeyStr)
}
return fmt.Sprintf("%s?start_key=%s&end_key=%s",
StatsRegion, startKeyStr, endKeyStr)
}
Expand Down
21 changes: 18 additions & 3 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,11 @@ type Client interface {
GetRegions(context.Context) (*RegionsInfo, error)
GetRegionsByKeyRange(context.Context, *KeyRange, int) (*RegionsInfo, error)
GetRegionsByStoreID(context.Context, uint64) (*RegionsInfo, error)
GetRegionsReplicatedStateByKeyRange(context.Context, *KeyRange) (string, error)
GetHotReadRegions(context.Context) (*StoreHotPeersInfos, error)
GetHotWriteRegions(context.Context) (*StoreHotPeersInfos, error)
GetHistoryHotRegions(context.Context, *HistoryHotRegionsRequest) (*HistoryHotRegions, error)
GetRegionStatusByKeyRange(context.Context, *KeyRange) (*RegionStats, error)
GetRegionStatusByKeyRange(context.Context, *KeyRange, bool) (*RegionStats, error)
GetStores(context.Context) (*StoresInfo, error)
/* Config-related interfaces */
GetScheduleConfig(context.Context) (map[string]interface{}, error)
Expand Down Expand Up @@ -356,6 +357,19 @@ func (c *client) GetRegionsByStoreID(ctx context.Context, storeID uint64) (*Regi
return &regions, nil
}

// GetRegionsReplicatedStateByKeyRange gets the regions replicated state info by key range.
// The keys in the key range should be encoded in the hex bytes format (without encoding to the UTF-8 bytes).
func (c *client) GetRegionsReplicatedStateByKeyRange(ctx context.Context, keyRange *KeyRange) (string, error) {
var state string
err := c.requestWithRetry(ctx,
"GetRegionsReplicatedStateByKeyRange", RegionsReplicatedByKeyRange(keyRange),
http.MethodGet, http.NoBody, &state)
if err != nil {
return "", err
}
return state, nil
}

// GetHotReadRegions gets the hot read region statistics info.
func (c *client) GetHotReadRegions(ctx context.Context) (*StoreHotPeersInfos, error) {
var hotReadRegions StoreHotPeersInfos
Expand Down Expand Up @@ -398,11 +412,12 @@ func (c *client) GetHistoryHotRegions(ctx context.Context, req *HistoryHotRegion
}

// GetRegionStatusByKeyRange gets the region status by key range.
// If the `onlyCount` flag is true, the result will only include the count of regions.
// The keys in the key range should be encoded in the UTF-8 bytes format.
func (c *client) GetRegionStatusByKeyRange(ctx context.Context, keyRange *KeyRange) (*RegionStats, error) {
func (c *client) GetRegionStatusByKeyRange(ctx context.Context, keyRange *KeyRange, onlyCount bool) (*RegionStats, error) {
var regionStats RegionStats
err := c.requestWithRetry(ctx,
"GetRegionStatusByKeyRange", RegionStatsByKeyRange(keyRange),
"GetRegionStatusByKeyRange", RegionStatsByKeyRange(keyRange, onlyCount),
http.MethodGet, http.NoBody, &regionStats,
)
if err != nil {
Expand Down
121 changes: 121 additions & 0 deletions client/http/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package http

import (
"encoding/hex"

"github.com/pingcap/errors"
)

const (
encGroupSize = 8
encMarker = byte(0xFF)
encPad = byte(0x0)
)

var pads = make([]byte, encGroupSize)

// encodeBytes guarantees the encoded value is in ascending order for comparison,
// encoding with the following rule:
//
// [group1][marker1]...[groupN][markerN]
// group is 8 bytes slice which is padding with 0.
// marker is `0xFF - padding 0 count`
//
// For example:
//
// [] -> [0, 0, 0, 0, 0, 0, 0, 0, 247]
// [1, 2, 3] -> [1, 2, 3, 0, 0, 0, 0, 0, 250]
// [1, 2, 3, 0] -> [1, 2, 3, 0, 0, 0, 0, 0, 251]
// [1, 2, 3, 4, 5, 6, 7, 8] -> [1, 2, 3, 4, 5, 6, 7, 8, 255, 0, 0, 0, 0, 0, 0, 0, 0, 247]
//
// Refer: https://github.com/facebook/mysql-5.6/wiki/MyRocks-record-format#memcomparable-format
func encodeBytes(data []byte) []byte {
// Allocate more space to avoid unnecessary slice growing.
// Assume that the byte slice size is about `(len(data) / encGroupSize + 1) * (encGroupSize + 1)` bytes,
// that is `(len(data) / 8 + 1) * 9` in our implement.
dLen := len(data)
result := make([]byte, 0, (dLen/encGroupSize+1)*(encGroupSize+1))
for idx := 0; idx <= dLen; idx += encGroupSize {
remain := dLen - idx
padCount := 0
if remain >= encGroupSize {
result = append(result, data[idx:idx+encGroupSize]...)
} else {
padCount = encGroupSize - remain
result = append(result, data[idx:]...)
result = append(result, pads[:padCount]...)
}

marker := encMarker - byte(padCount)
result = append(result, marker)
}
return result
}

func decodeBytes(b []byte) ([]byte, error) {
buf := make([]byte, 0, len(b))
for {
if len(b) < encGroupSize+1 {
return nil, errors.New("insufficient bytes to decode value")
}

groupBytes := b[:encGroupSize+1]

group := groupBytes[:encGroupSize]
marker := groupBytes[encGroupSize]

padCount := encMarker - marker
if padCount > encGroupSize {
return nil, errors.Errorf("invalid marker byte, group bytes %q", groupBytes)
}

realGroupSize := encGroupSize - padCount
buf = append(buf, group[:realGroupSize]...)
b = b[encGroupSize+1:]

if padCount != 0 {
// Check validity of padding bytes.
for _, v := range group[realGroupSize:] {
if v != encPad {
return nil, errors.Errorf("invalid padding byte, group bytes %q", groupBytes)
}
}
break
}
}
return buf, nil
}

// rawKeyToKeyHexStr converts a raw key to a hex string after encoding.
func rawKeyToKeyHexStr(key []byte) string {
if len(key) == 0 {
return ""
}
return hex.EncodeToString(encodeBytes(key))
}

// keyHexStrToRawKey converts a hex string to a raw key after decoding.
func keyHexStrToRawKey(hexKey string) ([]byte, error) {
if len(hexKey) == 0 {
return make([]byte, 0), nil
}
key, err := hex.DecodeString(hexKey)
if err != nil {
return nil, err
}
return decodeBytes(key)
}
64 changes: 64 additions & 0 deletions client/http/codec_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package http

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestBytesCodec(t *testing.T) {
inputs := []struct {
enc []byte
dec []byte
}{
{[]byte{}, []byte{0, 0, 0, 0, 0, 0, 0, 0, 247}},
{[]byte{0}, []byte{0, 0, 0, 0, 0, 0, 0, 0, 248}},
{[]byte{1, 2, 3}, []byte{1, 2, 3, 0, 0, 0, 0, 0, 250}},
{[]byte{1, 2, 3, 0}, []byte{1, 2, 3, 0, 0, 0, 0, 0, 251}},
{[]byte{1, 2, 3, 4, 5, 6, 7}, []byte{1, 2, 3, 4, 5, 6, 7, 0, 254}},
{[]byte{0, 0, 0, 0, 0, 0, 0, 0}, []byte{0, 0, 0, 0, 0, 0, 0, 0, 255, 0, 0, 0, 0, 0, 0, 0, 0, 247}},
{[]byte{1, 2, 3, 4, 5, 6, 7, 8}, []byte{1, 2, 3, 4, 5, 6, 7, 8, 255, 0, 0, 0, 0, 0, 0, 0, 0, 247}},
{[]byte{1, 2, 3, 4, 5, 6, 7, 8, 9}, []byte{1, 2, 3, 4, 5, 6, 7, 8, 255, 9, 0, 0, 0, 0, 0, 0, 0, 248}},
}

for _, input := range inputs {
b := encodeBytes(input.enc)
require.Equal(t, input.dec, b)

d, err := decodeBytes(b)
require.NoError(t, err)
require.Equal(t, input.enc, d)
}

// Test error decode.
errInputs := [][]byte{
{1, 2, 3, 4},
{0, 0, 0, 0, 0, 0, 0, 247},
{0, 0, 0, 0, 0, 0, 0, 0, 246},
{0, 0, 0, 0, 0, 0, 0, 1, 247},
{1, 2, 3, 4, 5, 6, 7, 8, 0},
{1, 2, 3, 4, 5, 6, 7, 8, 255, 1},
{1, 2, 3, 4, 5, 6, 7, 8, 255, 1, 2, 3, 4, 5, 6, 7, 8},
{1, 2, 3, 4, 5, 6, 7, 8, 255, 1, 2, 3, 4, 5, 6, 7, 8, 255},
{1, 2, 3, 4, 5, 6, 7, 8, 255, 1, 2, 3, 4, 5, 6, 7, 8, 0},
}

for _, input := range errInputs {
_, err := decodeBytes(input)
require.Error(t, err)
}
}
Loading

0 comments on commit 862eee1

Please sign in to comment.