From 48f34b12a18cd09544c82fb0251cbde30847ab21 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Tue, 21 Nov 2023 14:27:09 +0800 Subject: [PATCH 1/4] Integrate PD HTTP client into the label manager Signed-off-by: JmPotato --- DEPS.bzl | 12 +++--- go.mod | 2 +- go.sum | 4 +- pkg/ddl/attributes_sql_test.go | 8 ++-- pkg/ddl/label/BUILD.bazel | 2 + pkg/ddl/label/attributes.go | 41 +++++++++----------- pkg/ddl/label/attributes_test.go | 31 +++++++-------- pkg/ddl/label/rule.go | 38 ++++++++----------- pkg/ddl/label/rule_test.go | 11 +++--- pkg/ddl/main_test.go | 2 +- pkg/ddl/partition.go | 4 +- pkg/ddl/tests/serial/main_test.go | 2 +- pkg/domain/BUILD.bazel | 1 + pkg/domain/db_test.go | 4 +- pkg/domain/domain.go | 15 ++++++-- pkg/domain/infosync/info.go | 38 ++++++++++++++++--- pkg/domain/infosync/info_test.go | 6 +-- pkg/domain/infosync/label_manager.go | 56 +++++++++------------------- pkg/executor/infoschema_reader.go | 15 ++++---- pkg/kv/BUILD.bazel | 1 + pkg/kv/kv.go | 2 + pkg/server/stat_test.go | 2 +- pkg/store/gcworker/gc_worker.go | 2 +- 23 files changed, 155 insertions(+), 144 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index f75530ea23ccf..f4e3876b2ce85 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -7132,13 +7132,13 @@ def go_deps(): name = "com_github_tikv_pd_client", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/pd/client", - sha256 = "014bb8796797b8b5cecc22866a1aab8491e3718c540168ac91257cf7f220cc84", - strip_prefix = "github.com/tikv/pd/client@v0.0.0-20231117041718-dda748abe55d", + sha256 = "59cdbbd817215302d32ea46520891e71afb4fbc8b8c5e8543eb23043e19efdfc", + strip_prefix = "github.com/tikv/pd/client@v0.0.0-20231121064140-2349f011e4ef", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231117041718-dda748abe55d.zip", - "http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231117041718-dda748abe55d.zip", - "https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231117041718-dda748abe55d.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231117041718-dda748abe55d.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231121064140-2349f011e4ef.zip", + "http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231121064140-2349f011e4ef.zip", + "https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231121064140-2349f011e4ef.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231121064140-2349f011e4ef.zip", ], ) go_repository( diff --git a/go.mod b/go.mod index 412516e3c31fc..36d44a22c2c18 100644 --- a/go.mod +++ b/go.mod @@ -103,7 +103,7 @@ require ( github.com/tdakkota/asciicheck v0.2.0 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 github.com/tikv/client-go/v2 v2.0.8-0.20231116051730-1c2351c28173 - github.com/tikv/pd/client v0.0.0-20231117041718-dda748abe55d + github.com/tikv/pd/client v0.0.0-20231121064140-2349f011e4ef github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 github.com/twmb/murmur3 v1.1.6 github.com/uber/jaeger-client-go v2.22.1+incompatible diff --git a/go.sum b/go.sum index c8eb62ed5532a..d7f36145553c0 100644 --- a/go.sum +++ b/go.sum @@ -993,8 +993,8 @@ github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= github.com/tikv/client-go/v2 v2.0.8-0.20231116051730-1c2351c28173 h1:lmJzX0kqrV7kO21wrZPbtjkidzwbDCfXeQrhDWEi5dE= github.com/tikv/client-go/v2 v2.0.8-0.20231116051730-1c2351c28173/go.mod h1:BOGTSZtbMHEnGC4HOpbONdnTQF+E9nb2Io7c3P9sb7g= -github.com/tikv/pd/client v0.0.0-20231117041718-dda748abe55d h1:6isljjnUH8zzkJx2X8MUGh+5AlMv+pCEhCy5MSyuhSM= -github.com/tikv/pd/client v0.0.0-20231117041718-dda748abe55d/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ= +github.com/tikv/pd/client v0.0.0-20231121064140-2349f011e4ef h1:LA4n4tKcdK5tu1AesE8rEVHYaAeZMoKYqJ/qX9wrvyw= +github.com/tikv/pd/client v0.0.0-20231121064140-2349f011e4ef/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ= github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 h1:quvGphlmUVU+nhpFa4gg4yJyTRJ13reZMDHrKwYw53M= github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966/go.mod h1:27bSVNWSBOHm+qRp1T9qzaIpsWEP6TbUnei/43HK+PQ= github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs= diff --git a/pkg/ddl/attributes_sql_test.go b/pkg/ddl/attributes_sql_test.go index c7c240d80e2bd..b5215b9b6ef28 100644 --- a/pkg/ddl/attributes_sql_test.go +++ b/pkg/ddl/attributes_sql_test.go @@ -252,7 +252,7 @@ PARTITION BY RANGE (c) ( func TestFlashbackTable(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), keyspace.CodecV1, true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true) require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -310,7 +310,7 @@ PARTITION BY RANGE (c) ( func TestDropTable(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), keyspace.CodecV1, true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true) require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -363,7 +363,7 @@ PARTITION BY RANGE (c) ( func TestCreateWithSameName(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), keyspace.CodecV1, true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true) require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -427,7 +427,7 @@ PARTITION BY RANGE (c) ( func TestPartition(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), keyspace.CodecV1, true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true) require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") diff --git a/pkg/ddl/label/BUILD.bazel b/pkg/ddl/label/BUILD.bazel index 3e8f40d4c88ca..75a89a0767e97 100644 --- a/pkg/ddl/label/BUILD.bazel +++ b/pkg/ddl/label/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "//pkg/parser/ast", "//pkg/tablecodec", "//pkg/util/codec", + "@com_github_tikv_pd_client//http", "@in_gopkg_yaml_v2//:yaml_v2", ], ) @@ -32,6 +33,7 @@ go_test( "//pkg/parser/ast", "//pkg/testkit/testsetup", "@com_github_stretchr_testify//require", + "@com_github_tikv_pd_client//http", "@org_uber_go_goleak//:goleak", ], ) diff --git a/pkg/ddl/label/attributes.go b/pkg/ddl/label/attributes.go index b797f66e73168..b51642373cca1 100644 --- a/pkg/ddl/label/attributes.go +++ b/pkg/ddl/label/attributes.go @@ -17,6 +17,8 @@ package label import ( "fmt" "strings" + + pd "github.com/tikv/pd/client/http" ) const ( @@ -37,15 +39,9 @@ const ( AttributesDuplicated ) -// Label is used to describe attributes -type Label struct { - Key string `json:"key,omitempty"` - Value string `json:"value,omitempty"` -} - // NewLabel creates a new label for a given string. -func NewLabel(attr string) (Label, error) { - l := Label{} +func NewLabel(attr string) (pd.RegionLabel, error) { + l := pd.RegionLabel{} kv := strings.Split(attr, "=") if len(kv) != 2 { return l, fmt.Errorf("%w: %s", ErrInvalidAttributesFormat, attr) @@ -66,14 +62,14 @@ func NewLabel(attr string) (Label, error) { return l, nil } -// Restore converts a Attribute to a string. -func (l *Label) Restore() string { +// RestoreRegionLabel converts a Attribute to a string. +func RestoreRegionLabel(l *pd.RegionLabel) string { return l.Key + "=" + l.Value } // CompatibleWith will check if two constraints are compatible. // Return (compatible, duplicated). -func (l *Label) CompatibleWith(o *Label) AttributesCompatibility { +func CompatibleWith(l *pd.RegionLabel, o *pd.RegionLabel) AttributesCompatibility { if l.Key != o.Key { return AttributesCompatible } @@ -85,26 +81,23 @@ func (l *Label) CompatibleWith(o *Label) AttributesCompatibility { return AttributesIncompatible } -// Labels is a slice of Label. -type Labels []Label - // NewLabels creates a slice of Label for given attributes. -func NewLabels(attrs []string) (Labels, error) { - labels := make(Labels, 0, len(attrs)) +func NewLabels(attrs []string) ([]pd.RegionLabel, error) { + labels := make([]pd.RegionLabel, 0, len(attrs)) for _, attr := range attrs { label, err := NewLabel(attr) if err != nil { return nil, err } - if err := labels.Add(label); err != nil { + if err := Add(&labels, label); err != nil { return nil, err } } return labels, nil } -// Restore converts Attributes to a string. -func (labels *Labels) Restore() string { +// RestoreRegionLabels converts Attributes to a string. +func RestoreRegionLabels(labels *[]pd.RegionLabel) string { var sb strings.Builder for i, label := range *labels { switch label.Key { @@ -117,25 +110,25 @@ func (labels *Labels) Restore() string { sb.WriteByte(',') } sb.WriteByte('"') - sb.WriteString(label.Restore()) + sb.WriteString(RestoreRegionLabel(&label)) sb.WriteByte('"') } return sb.String() } // Add will add a new attribute, with validation of all attributes. -func (labels *Labels) Add(label Label) error { +func Add(labels *[]pd.RegionLabel, label pd.RegionLabel) error { for i := range *labels { l := (*labels)[i] - res := label.CompatibleWith(&l) + res := CompatibleWith(&label, &l) if res == AttributesCompatible { continue } if res == AttributesDuplicated { return nil } - s1 := label.Restore() - s2 := l.Restore() + s1 := RestoreRegionLabel(&label) + s2 := RestoreRegionLabel(&l) return fmt.Errorf("'%s' and '%s' are conflicted", s1, s2) } diff --git a/pkg/ddl/label/attributes_test.go b/pkg/ddl/label/attributes_test.go index fcf53db3706b8..a80794a72014d 100644 --- a/pkg/ddl/label/attributes_test.go +++ b/pkg/ddl/label/attributes_test.go @@ -18,20 +18,21 @@ import ( "testing" "github.com/stretchr/testify/require" + pd "github.com/tikv/pd/client/http" ) func TestNewLabel(t *testing.T) { type TestCase struct { name string input string - label Label + label pd.RegionLabel } tests := []TestCase{ { name: "normal", input: "merge_option=allow", - label: Label{ + label: pd.RegionLabel{ Key: "merge_option", Value: "allow", }, @@ -39,7 +40,7 @@ func TestNewLabel(t *testing.T) { { name: "normal with space", input: " merge_option=allow ", - label: Label{ + label: pd.RegionLabel{ Key: "merge_option", Value: "allow", }, @@ -58,7 +59,7 @@ func TestNewLabel(t *testing.T) { func TestRestoreLabel(t *testing.T) { type TestCase struct { name string - input Label + input pd.RegionLabel output string } @@ -83,7 +84,7 @@ func TestRestoreLabel(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - output := test.input.Restore() + output := RestoreRegionLabel(&test.input) require.Equal(t, test.output, output) }) } @@ -124,8 +125,8 @@ func TestNewLabels(t *testing.T) { func TestAddLabels(t *testing.T) { type TestCase struct { name string - labels Labels - label Label + labels []pd.RegionLabel + label pd.RegionLabel err bool } @@ -154,7 +155,7 @@ func TestAddLabels(t *testing.T) { }, { "duplicated attributes, skip", - append(labels, Label{ + append(labels, pd.RegionLabel{ Key: "merge_option", Value: "allow", }), @@ -171,7 +172,7 @@ func TestAddLabels(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - err = test.labels.Add(test.label) + err = Add(&test.labels, test.label) if test.err { require.Error(t, err) } else { @@ -185,7 +186,7 @@ func TestAddLabels(t *testing.T) { func TestRestoreLabels(t *testing.T) { type TestCase struct { name string - input Labels + input []pd.RegionLabel output string } @@ -203,29 +204,29 @@ func TestRestoreLabels(t *testing.T) { tests := []TestCase{ { "normal1", - Labels{}, + []pd.RegionLabel{}, "", }, { "normal2", - Labels{input1, input2}, + []pd.RegionLabel{input1, input2}, `"merge_option=allow","key=value"`, }, { "normal3", - Labels{input3, input4, input5}, + []pd.RegionLabel{input3, input4, input5}, "", }, { "normal4", - Labels{input1, input2, input3}, + []pd.RegionLabel{input1, input2, input3}, `"merge_option=allow","key=value"`, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - output := test.input.Restore() + output := RestoreRegionLabels(&test.input) require.Equal(t, test.output, output) }) } diff --git a/pkg/ddl/label/rule.go b/pkg/ddl/label/rule.go index e3d08999ee8ed..2d5316a7dfe89 100644 --- a/pkg/ddl/label/rule.go +++ b/pkg/ddl/label/rule.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/util/codec" + pd "github.com/tikv/pd/client/http" "gopkg.in/yaml.v2" ) @@ -53,13 +54,7 @@ var ( ) // Rule is used to establish the relationship between labels and a key range. -type Rule struct { - ID string `json:"id"` - Index int `json:"index"` - Labels Labels `json:"labels"` - RuleType string `json:"rule_type"` - Data []interface{} `json:"data"` -} +type Rule pd.LabelRule // NewRule creates a rule. func NewRule() *Rule { @@ -69,7 +64,7 @@ func NewRule() *Rule { // ApplyAttributesSpec will transfer attributes defined in AttributesSpec to the labels. func (r *Rule) ApplyAttributesSpec(spec *ast.AttributesSpec) error { if spec.Default { - r.Labels = []Label{} + r.Labels = []pd.RegionLabel{} return nil } // construct a string list @@ -129,26 +124,27 @@ func (r *Rule) Reset(dbName, tableName, partName string, ids ...int64) *Rule { } if !hasDBKey { - r.Labels = append(r.Labels, Label{Key: dbKey, Value: dbName}) + r.Labels = append(r.Labels, pd.RegionLabel{Key: dbKey, Value: dbName}) } if !hasTableKey { - r.Labels = append(r.Labels, Label{Key: tableKey, Value: tableName}) + r.Labels = append(r.Labels, pd.RegionLabel{Key: tableKey, Value: tableName}) } if isPartition && !hasPartitionKey { - r.Labels = append(r.Labels, Label{Key: partitionKey, Value: partName}) + r.Labels = append(r.Labels, pd.RegionLabel{Key: partitionKey, Value: partName}) } r.RuleType = ruleType - r.Data = []interface{}{} + dataSlice := make([]interface{}, 0, len(ids)) slices.Sort(ids) for i := 0; i < len(ids); i++ { data := map[string]string{ "start_key": hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTablePrefix(ids[i]))), "end_key": hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTablePrefix(ids[i]+1))), } - r.Data = append(r.Data, data) + dataSlice = append(dataSlice, data) } + r.Data = dataSlice // We may support more types later. r.Index = RuleIndexTable if isPartition { @@ -157,16 +153,14 @@ func (r *Rule) Reset(dbName, tableName, partName string, ids ...int64) *Rule { return r } -// RulePatch is the patch to update the label rules. -type RulePatch struct { - SetRules []*Rule `json:"sets"` - DeleteRules []string `json:"deletes"` -} - // NewRulePatch returns a patch of rules which need to be set or deleted. -func NewRulePatch(setRules []*Rule, deleteRules []string) *RulePatch { - return &RulePatch{ - SetRules: setRules, +func NewRulePatch(setRules []*Rule, deleteRules []string) *pd.LabelRulePatch { + labelRules := make([]*pd.LabelRule, 0, len(setRules)) + for _, rule := range setRules { + labelRules = append(labelRules, (*pd.LabelRule)(rule)) + } + return &pd.LabelRulePatch{ + SetRules: labelRules, DeleteRules: deleteRules, } } diff --git a/pkg/ddl/label/rule_test.go b/pkg/ddl/label/rule_test.go index f392e3dc58c9e..69928b3614e89 100644 --- a/pkg/ddl/label/rule_test.go +++ b/pkg/ddl/label/rule_test.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/ast" "github.com/stretchr/testify/require" + pd "github.com/tikv/pd/client/http" ) func TestApplyAttributesSpec(t *testing.T) { @@ -74,13 +75,13 @@ func TestReset(t *testing.T) { require.Equal(t, "t1", rule.Labels[2].Value) require.Equal(t, rule.Index, 2) - r := rule.Data[0].(map[string]string) + r := rule.Data.([]interface{})[0].(map[string]string) require.Equal(t, "7480000000000000ff0100000000000000f8", r["start_key"]) require.Equal(t, "7480000000000000ff0200000000000000f8", r["end_key"]) - r = rule.Data[1].(map[string]string) + r = rule.Data.([]interface{})[1].(map[string]string) require.Equal(t, "7480000000000000ff0200000000000000f8", r["start_key"]) require.Equal(t, "7480000000000000ff0300000000000000f8", r["end_key"]) - r = rule.Data[2].(map[string]string) + r = rule.Data.([]interface{})[2].(map[string]string) require.Equal(t, "7480000000000000ff0300000000000000f8", r["start_key"]) require.Equal(t, "7480000000000000ff0400000000000000f8", r["end_key"]) @@ -96,14 +97,14 @@ func TestReset(t *testing.T) { require.Equal(t, "p2", rule.Labels[3].Value) require.Equal(t, rule.Index, 3) - r = r2.Data[0].(map[string]string) + r = r2.Data.([]interface{})[0].(map[string]string) require.Equal(t, "7480000000000000ff0200000000000000f8", r["start_key"]) require.Equal(t, "7480000000000000ff0300000000000000f8", r["end_key"]) // default case spec = &ast.AttributesSpec{Default: true} rule, expected := NewRule(), NewRule() - expected.ID, expected.Labels = "schema/db3/t3/p3", []Label{} + expected.ID, expected.Labels = "schema/db3/t3/p3", []pd.RegionLabel{} require.NoError(t, rule.ApplyAttributesSpec(spec)) r3 := rule.Reset("db3", "t3", "p3", 3) require.Equal(t, r3, expected) diff --git a/pkg/ddl/main_test.go b/pkg/ddl/main_test.go index a763a5dff4ff1..405ea5594706a 100644 --- a/pkg/ddl/main_test.go +++ b/pkg/ddl/main_test.go @@ -54,7 +54,7 @@ func TestMain(m *testing.M) { conf.Experimental.AllowsExpressionIndex = true }) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, keyspace.CodecV1, true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, true) if err != nil { _, _ = fmt.Fprintf(os.Stderr, "ddl: infosync.GlobalInfoSyncerInit: %v\n", err) os.Exit(1) diff --git a/pkg/ddl/partition.go b/pkg/ddl/partition.go index 1e6316e6f77a5..7ebba13273101 100644 --- a/pkg/ddl/partition.go +++ b/pkg/ddl/partition.go @@ -1887,14 +1887,14 @@ func getTableInfoWithOriginalPartitions(t *model.TableInfo, oldIDs []int64, newI return nt } -func dropLabelRules(_ *ddlCtx, schemaName, tableName string, partNames []string) error { +func dropLabelRules(d *ddlCtx, schemaName, tableName string, partNames []string) error { deleteRules := make([]string, 0, len(partNames)) for _, partName := range partNames { deleteRules = append(deleteRules, fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, schemaName, tableName, partName)) } // delete batch rules patch := label.NewRulePatch([]*label.Rule{}, deleteRules) - return infosync.UpdateLabelRules(context.TODO(), patch) + return infosync.UpdateLabelRules(d.ctx, patch) } // onDropTablePartition deletes old partition meta. diff --git a/pkg/ddl/tests/serial/main_test.go b/pkg/ddl/tests/serial/main_test.go index bb582f02785d0..73100516be9f6 100644 --- a/pkg/ddl/tests/serial/main_test.go +++ b/pkg/ddl/tests/serial/main_test.go @@ -58,7 +58,7 @@ func TestMain(m *testing.M) { conf.Experimental.AllowsExpressionIndex = true }) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, keyspace.CodecV1, true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, true) if err != nil { _, _ = fmt.Fprintf(os.Stderr, "ddl: infosync.GlobalInfoSyncerInit: %v\n", err) os.Exit(1) diff --git a/pkg/domain/BUILD.bazel b/pkg/domain/BUILD.bazel index 0641165203a8a..7f70278cf5dd6 100644 --- a/pkg/domain/BUILD.bazel +++ b/pkg/domain/BUILD.bazel @@ -96,6 +96,7 @@ go_library( "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_client_go_v2//txnkv/transaction", "@com_github_tikv_pd_client//:client", + "@com_github_tikv_pd_client//http", "@com_github_tikv_pd_client//resource_group/controller", "@io_etcd_go_etcd_client_v3//:client", "@io_etcd_go_etcd_client_v3//concurrency", diff --git a/pkg/domain/db_test.go b/pkg/domain/db_test.go index bcb0d3f53c3dc..983e526745c93 100644 --- a/pkg/domain/db_test.go +++ b/pkg/domain/db_test.go @@ -74,7 +74,7 @@ func TestNormalSessionPool(t *testing.T) { domain, err := session.BootstrapSession(store) require.NoError(t, err) defer domain.Close() - info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, keyspace.CodecV1, true) + info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, true) require.NoError(t, err1) conf := config.GetGlobalConfig() conf.Socket = "" @@ -107,7 +107,7 @@ func TestAbnormalSessionPool(t *testing.T) { domain, err := session.BootstrapSession(store) require.NoError(t, err) defer domain.Close() - info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, keyspace.CodecV1, true) + info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, true) require.NoError(t, err1) conf := config.GetGlobalConfig() conf.Socket = "" diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index 60e5cc444ac16..d9bd664fd036f 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -90,6 +90,7 @@ import ( "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/txnkv/transaction" pd "github.com/tikv/pd/client" + pdhttp "github.com/tikv/pd/client/http" rmclient "github.com/tikv/pd/client/resource_group/controller" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" @@ -1186,11 +1187,11 @@ func (do *Domain) Init( } // step 1: prepare the info/schema syncer which domain reload needed. - pdCli := do.GetPDClient() + pdCli, pdHTTPCli := do.GetPDClient(), do.GetPDHTTPClient() skipRegisterToDashboard := config.GetGlobalConfig().SkipRegisterToDashboard do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID, - do.etcdClient, do.unprefixedEtcdCli, pdCli, do.Store().GetCodec(), - skipRegisterToDashboard) + do.etcdClient, do.unprefixedEtcdCli, pdCli, pdHTTPCli, + do.Store().GetCodec(), skipRegisterToDashboard) if err != nil { return err } @@ -1629,6 +1630,14 @@ func (do *Domain) GetPDClient() pd.Client { return nil } +// GetPDHTTPClient returns the PD HTTP client. +func (do *Domain) GetPDHTTPClient() pdhttp.Client { + if store, ok := do.store.(kv.StorageWithPD); ok { + return store.GetPDHTTPClient() + } + return nil +} + // LoadPrivilegeLoop create a goroutine loads privilege tables in a loop, it // should be called only once in BootstrapSession. func (do *Domain) LoadPrivilegeLoop(sctx sessionctx.Context) error { diff --git a/pkg/domain/infosync/info.go b/pkg/domain/infosync/info.go index 1278fd2469da7..cbb60fc3f573b 100644 --- a/pkg/domain/infosync/info.go +++ b/pkg/domain/infosync/info.go @@ -194,7 +194,7 @@ func GlobalInfoSyncerInit( id string, serverIDGetter func() uint64, etcdCli, unprefixedEtcdCli *clientv3.Client, - pdCli pd.Client, + pdCli pd.Client, pdHTTPCli pdhttp.Client, codec tikv.Codec, skipRegisterToDashBoard bool, ) (*InfoSyncer, error) { @@ -209,7 +209,10 @@ func GlobalInfoSyncerInit( if err != nil { return nil, err } - is.labelRuleManager = initLabelRuleManager(etcdCli) + if pdHTTPCli != nil { + pdHTTPCli = pdHTTPCli.WithRespHandler(pdResponseHandler) + } + is.labelRuleManager = initLabelRuleManager(pdHTTPCli) is.placementManager = initPlacementManager(etcdCli) is.scheduleManager = initScheduleManager(etcdCli) is.tiflashReplicaManager = initTiFlashReplicaManager(etcdCli, codec) @@ -244,11 +247,11 @@ func (is *InfoSyncer) GetSessionManager() util2.SessionManager { return is.managerMu.SessionManager } -func initLabelRuleManager(etcdCli *clientv3.Client) LabelRuleManager { - if etcdCli == nil { +func initLabelRuleManager(pdHTTPCli pdhttp.Client) LabelRuleManager { + if pdHTTPCli == nil { return &mockLabelManager{labelRules: map[string][]byte{}} } - return &PDLabelManager{etcdCli: etcdCli} + return &PDLabelManager{pdHTTPCli} } func initPlacementManager(etcdCli *clientv3.Client) PlacementManager { @@ -446,6 +449,29 @@ func MustGetTiFlashProgress(tableID int64, replicaCount uint64, tiFlashStores *m return progress, nil } +// pdResponseHandler will be injected into the PD HTTP client to handle the response, +// this is to maintain consistency with the logic in the `doRequest`. +func pdResponseHandler(resp *http.Response) error { + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + terror.Log(resp.Body.Close()) + return err + } + if resp.StatusCode != http.StatusOK { + logutil.BgLogger().Warn("response not 200", + zap.String("method", resp.Request.Method), + zap.String("host", resp.Request.URL.Host), + zap.String("url", resp.Request.URL.RequestURI()), + zap.Int("http status", resp.StatusCode), + ) + if resp.StatusCode != http.StatusNotFound && resp.StatusCode != http.StatusPreconditionFailed { + err = ErrHTTPServiceError.FastGen("%s", bodyBytes) + } + } + terror.Log(resp.Body.Close()) + return err +} + // TODO: replace with the unified PD HTTP client. func doRequest(ctx context.Context, apiName string, addrs []string, route, method string, body io.Reader) ([]byte, error) { var err error @@ -1052,7 +1078,7 @@ func PutLabelRule(ctx context.Context, rule *label.Rule) error { } // UpdateLabelRules synchronizes the label rule to PD. -func UpdateLabelRules(ctx context.Context, patch *label.RulePatch) error { +func UpdateLabelRules(ctx context.Context, patch *pdhttp.LabelRulePatch) error { if patch == nil || (len(patch.DeleteRules) == 0 && len(patch.SetRules) == 0) { return nil } diff --git a/pkg/domain/infosync/info_test.go b/pkg/domain/infosync/info_test.go index e56aebb054d7a..79c8ebf37e0a6 100644 --- a/pkg/domain/infosync/info_test.go +++ b/pkg/domain/infosync/info_test.go @@ -69,7 +69,7 @@ func TestTopology(t *testing.T) { require.NoError(t, err) }() - info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, client, nil, keyspace.CodecV1, false) + info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, client, nil, nil, keyspace.CodecV1, false) require.NoError(t, err) err = info.newTopologySessionAndStoreServerInfo(ctx, util2.NewSessionDefaultRetryCnt) @@ -154,7 +154,7 @@ func (is *InfoSyncer) ttlKeyExists(ctx context.Context) (bool, error) { } func TestPutBundlesRetry(t *testing.T) { - _, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, nil, nil, keyspace.CodecV1, false) + _, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, false) require.NoError(t, err) bundle, err := placement.NewBundleFromOptions(&model.PlacementSettings{PrimaryRegion: "r1", Regions: "r1,r2"}) @@ -218,7 +218,7 @@ func TestPutBundlesRetry(t *testing.T) { func TestTiFlashManager(t *testing.T) { ctx := context.Background() - _, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, nil, nil, keyspace.CodecV1, false) + _, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, nil, nil, nil, keyspace.CodecV1, false) tiflash := NewMockTiFlash() SetMockTiFlash(tiflash) diff --git a/pkg/domain/infosync/label_manager.go b/pkg/domain/infosync/label_manager.go index a6b95ce964991..0c5d0296136a4 100644 --- a/pkg/domain/infosync/label_manager.go +++ b/pkg/domain/infosync/label_manager.go @@ -15,79 +15,59 @@ package infosync import ( - "bytes" "context" "encoding/json" - "path" "sync" "github.com/pingcap/tidb/pkg/ddl/label" pd "github.com/tikv/pd/client/http" - clientv3 "go.etcd.io/etcd/client/v3" ) // LabelRuleManager manages label rules type LabelRuleManager interface { PutLabelRule(ctx context.Context, rule *label.Rule) error - UpdateLabelRules(ctx context.Context, patch *label.RulePatch) error + UpdateLabelRules(ctx context.Context, patch *pd.LabelRulePatch) error GetAllLabelRules(ctx context.Context) ([]*label.Rule, error) GetLabelRules(ctx context.Context, ruleIDs []string) (map[string]*label.Rule, error) } // PDLabelManager manages rules with pd type PDLabelManager struct { - etcdCli *clientv3.Client + pdHTTPCli pd.Client } // PutLabelRule implements PutLabelRule func (lm *PDLabelManager) PutLabelRule(ctx context.Context, rule *label.Rule) error { - r, err := json.Marshal(rule) - if err != nil { - return err - } - _, err = doRequest(ctx, "PutLabelRule", lm.etcdCli.Endpoints(), path.Join(pd.Config, "region-label", "rule"), "POST", bytes.NewReader(r)) - return err + return lm.pdHTTPCli.SetRegionLabelRule(ctx, (*pd.LabelRule)(rule)) } // UpdateLabelRules implements UpdateLabelRules -func (lm *PDLabelManager) UpdateLabelRules(ctx context.Context, patch *label.RulePatch) error { - r, err := json.Marshal(patch) - if err != nil { - return err - } - - _, err = doRequest(ctx, "UpdateLabelRules", lm.etcdCli.Endpoints(), path.Join(pd.Config, "region-label", "rules"), "PATCH", bytes.NewReader(r)) - return err +func (lm *PDLabelManager) UpdateLabelRules(ctx context.Context, patch *pd.LabelRulePatch) error { + return lm.pdHTTPCli.PatchRegionLabelRules(ctx, patch) } // GetAllLabelRules implements GetAllLabelRules func (lm *PDLabelManager) GetAllLabelRules(ctx context.Context) ([]*label.Rule, error) { - var rules []*label.Rule - res, err := doRequest(ctx, "GetAllLabelRules", lm.etcdCli.Endpoints(), path.Join(pd.Config, "region-label", "rules"), "GET", nil) - - if err == nil && res != nil { - err = json.Unmarshal(res, &rules) + labelRules, err := lm.pdHTTPCli.GetAllRegionLabelRules(ctx) + if err != nil { + return nil, err } - return rules, err + r := make([]*label.Rule, 0, len(labelRules)) + for _, labelRule := range labelRules { + r = append(r, (*label.Rule)(labelRule)) + } + return r, nil } // GetLabelRules implements GetLabelRules func (lm *PDLabelManager) GetLabelRules(ctx context.Context, ruleIDs []string) (map[string]*label.Rule, error) { - ids, err := json.Marshal(ruleIDs) + labelRules, err := lm.pdHTTPCli.GetAllRegionLabelRules(ctx) if err != nil { return nil, err } - - rules := []*label.Rule{} - res, err := doRequest(ctx, "GetLabelRules", lm.etcdCli.Endpoints(), path.Join(pd.Config, "region-label", "rules", "ids"), "GET", bytes.NewReader(ids)) - - if err == nil && res != nil { - err = json.Unmarshal(res, &rules) - } - - ruleMap := make(map[string]*label.Rule, len((rules))) - for _, r := range rules { - ruleMap[r.ID] = r + ruleMap := make(map[string]*label.Rule, len((labelRules))) + for _, r := range labelRules { + ruleMap[r.ID] = (*label.Rule)(r) } return ruleMap, err } @@ -113,7 +93,7 @@ func (mm *mockLabelManager) PutLabelRule(ctx context.Context, rule *label.Rule) } // UpdateLabelRules implements UpdateLabelRules -func (mm *mockLabelManager) UpdateLabelRules(ctx context.Context, patch *label.RulePatch) error { +func (mm *mockLabelManager) UpdateLabelRules(ctx context.Context, patch *pd.LabelRulePatch) error { mm.Lock() defer mm.Unlock() if patch == nil { diff --git a/pkg/executor/infoschema_reader.go b/pkg/executor/infoschema_reader.go index 9fabb6cfbb8b5..ad0ee0dc25da3 100644 --- a/pkg/executor/infoschema_reader.go +++ b/pkg/executor/infoschema_reader.go @@ -3200,7 +3200,7 @@ func (e *memtableRetriever) setDataForAttributes(ctx sessionctx.Context, is info rules = []*label.Rule{ { ID: "schema/test/test_label", - Labels: []label.Label{{Key: "merge_option", Value: "allow"}, {Key: "db", Value: "test"}, {Key: "table", Value: "test_label"}}, + Labels: []pd.RegionLabel{{Key: "merge_option", Value: "allow"}, {Key: "db", Value: "test"}, {Key: "table", Value: "test_label"}}, RuleType: "key-range", Data: convert(map[string]interface{}{ "start_key": "7480000000000000ff395f720000000000fa", @@ -3209,7 +3209,7 @@ func (e *memtableRetriever) setDataForAttributes(ctx sessionctx.Context, is info }, { ID: "invalidIDtest", - Labels: []label.Label{{Key: "merge_option", Value: "allow"}, {Key: "db", Value: "test"}, {Key: "table", Value: "test_label"}}, + Labels: []pd.RegionLabel{{Key: "merge_option", Value: "allow"}, {Key: "db", Value: "test"}, {Key: "table", Value: "test_label"}}, RuleType: "key-range", Data: convert(map[string]interface{}{ "start_key": "7480000000000000ff395f720000000000fa", @@ -3218,7 +3218,7 @@ func (e *memtableRetriever) setDataForAttributes(ctx sessionctx.Context, is info }, { ID: "schema/test/test_label", - Labels: []label.Label{{Key: "merge_option", Value: "allow"}, {Key: "db", Value: "test"}, {Key: "table", Value: "test_label"}}, + Labels: []pd.RegionLabel{{Key: "merge_option", Value: "allow"}, {Key: "db", Value: "test"}, {Key: "table", Value: "test_label"}}, RuleType: "key-range", Data: convert(map[string]interface{}{ "start_key": "aaaaa", @@ -3259,9 +3259,9 @@ func (e *memtableRetriever) setDataForAttributes(ctx sessionctx.Context, is info continue } - labels := rule.Labels.Restore() + labels := label.RestoreRegionLabels(&rule.Labels) var ranges []string - for _, data := range rule.Data { + for _, data := range rule.Data.([]interface{}) { if kv, ok := data.(map[string]interface{}); ok { startKey := kv["start_key"] endKey := kv["end_key"] @@ -3462,11 +3462,12 @@ func checkRule(rule *label.Rule) (dbName, tableName string, partitionName string } func decodeTableIDFromRule(rule *label.Rule) (tableID int64, err error) { - if len(rule.Data) == 0 { + datas := rule.Data.([]interface{}) + if len(datas) == 0 { err = fmt.Errorf("there is no data in rule %s", rule.ID) return } - data := rule.Data[0] + data := datas[0] dataMap, ok := data.(map[string]interface{}) if !ok { err = fmt.Errorf("get the label rules %s failed", rule.ID) diff --git a/pkg/kv/BUILD.bazel b/pkg/kv/BUILD.bazel index 99ba85562acdd..61433c6824390 100644 --- a/pkg/kv/BUILD.bazel +++ b/pkg/kv/BUILD.bazel @@ -53,6 +53,7 @@ go_library( "@com_github_tikv_client_go_v2//tikvrpc/interceptor", "@com_github_tikv_client_go_v2//util", "@com_github_tikv_pd_client//:client", + "@com_github_tikv_pd_client//http", "@org_uber_go_atomic//:atomic", "@org_uber_go_zap//:zap", ], diff --git a/pkg/kv/kv.go b/pkg/kv/kv.go index 6a2d547f284ae..f82c7919e5180 100644 --- a/pkg/kv/kv.go +++ b/pkg/kv/kv.go @@ -37,6 +37,7 @@ import ( "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/util" pd "github.com/tikv/pd/client" + pdhttp "github.com/tikv/pd/client/http" "go.uber.org/atomic" ) @@ -715,6 +716,7 @@ type EtcdBackend interface { // StorageWithPD is used to get pd client. type StorageWithPD interface { GetPDClient() pd.Client + GetPDHTTPClient() pdhttp.Client } // FnKeyCmp is the function for iterator the keys diff --git a/pkg/server/stat_test.go b/pkg/server/stat_test.go index 31f7983cd000d..5b0fca478780d 100644 --- a/pkg/server/stat_test.go +++ b/pkg/server/stat_test.go @@ -48,7 +48,7 @@ func TestUptime(t *testing.T) { }() require.NoError(t, err) - _, err = infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), keyspace.CodecV1, true) + _, err = infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), dom.GetPDHTTPClient(), keyspace.CodecV1, true) require.NoError(t, err) tidbdrv := NewTiDBDriver(store) diff --git a/pkg/store/gcworker/gc_worker.go b/pkg/store/gcworker/gc_worker.go index caedf9e9ec33e..b090f082e6582 100644 --- a/pkg/store/gcworker/gc_worker.go +++ b/pkg/store/gcworker/gc_worker.go @@ -2017,7 +2017,7 @@ func getGCRules(ids []int64, rules map[string]*label.Rule) []string { var gcRules []string for _, rule := range rules { find := false - for _, d := range rule.Data { + for _, d := range rule.Data.([]interface{}) { if r, ok := d.(map[string]interface{}); ok { nowRange := fmt.Sprintf("%s%s", r["start_key"], r["end_key"]) if _, ok := oldRange[nowRange]; ok { From 76d5266d163c037159d74dea984bdd282a6b89d2 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Tue, 21 Nov 2023 16:08:37 +0800 Subject: [PATCH 2/4] Fix PD HTTP client WithRespHandler Signed-off-by: JmPotato --- DEPS.bzl | 12 ++++++------ go.mod | 2 +- go.sum | 4 ++-- pkg/domain/infosync/info.go | 37 +++++++++++++++++++++---------------- 4 files changed, 30 insertions(+), 25 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index f4e3876b2ce85..3c72cb69803bb 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -7132,13 +7132,13 @@ def go_deps(): name = "com_github_tikv_pd_client", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/pd/client", - sha256 = "59cdbbd817215302d32ea46520891e71afb4fbc8b8c5e8543eb23043e19efdfc", - strip_prefix = "github.com/tikv/pd/client@v0.0.0-20231121064140-2349f011e4ef", + sha256 = "440821579da980d0405695b463da892608a59252a296cd7e52b4f97881c5fdb7", + strip_prefix = "github.com/tikv/pd/client@v0.0.0-20231121080541-8919bc11f770", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231121064140-2349f011e4ef.zip", - "http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231121064140-2349f011e4ef.zip", - "https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231121064140-2349f011e4ef.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231121064140-2349f011e4ef.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231121080541-8919bc11f770.zip", + "http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231121080541-8919bc11f770.zip", + "https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231121080541-8919bc11f770.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231121080541-8919bc11f770.zip", ], ) go_repository( diff --git a/go.mod b/go.mod index 36d44a22c2c18..170be161d2ddf 100644 --- a/go.mod +++ b/go.mod @@ -103,7 +103,7 @@ require ( github.com/tdakkota/asciicheck v0.2.0 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 github.com/tikv/client-go/v2 v2.0.8-0.20231116051730-1c2351c28173 - github.com/tikv/pd/client v0.0.0-20231121064140-2349f011e4ef + github.com/tikv/pd/client v0.0.0-20231121080541-8919bc11f770 github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 github.com/twmb/murmur3 v1.1.6 github.com/uber/jaeger-client-go v2.22.1+incompatible diff --git a/go.sum b/go.sum index d7f36145553c0..2004d8b946856 100644 --- a/go.sum +++ b/go.sum @@ -993,8 +993,8 @@ github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= github.com/tikv/client-go/v2 v2.0.8-0.20231116051730-1c2351c28173 h1:lmJzX0kqrV7kO21wrZPbtjkidzwbDCfXeQrhDWEi5dE= github.com/tikv/client-go/v2 v2.0.8-0.20231116051730-1c2351c28173/go.mod h1:BOGTSZtbMHEnGC4HOpbONdnTQF+E9nb2Io7c3P9sb7g= -github.com/tikv/pd/client v0.0.0-20231121064140-2349f011e4ef h1:LA4n4tKcdK5tu1AesE8rEVHYaAeZMoKYqJ/qX9wrvyw= -github.com/tikv/pd/client v0.0.0-20231121064140-2349f011e4ef/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ= +github.com/tikv/pd/client v0.0.0-20231121080541-8919bc11f770 h1:YSXDKT9+KngRSAShoSQVKD/CK1kR4X/9hutKkSK9gn0= +github.com/tikv/pd/client v0.0.0-20231121080541-8919bc11f770/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ= github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 h1:quvGphlmUVU+nhpFa4gg4yJyTRJ13reZMDHrKwYw53M= github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966/go.mod h1:27bSVNWSBOHm+qRp1T9qzaIpsWEP6TbUnei/43HK+PQ= github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs= diff --git a/pkg/domain/infosync/info.go b/pkg/domain/infosync/info.go index cbb60fc3f573b..8873d48f30543 100644 --- a/pkg/domain/infosync/info.go +++ b/pkg/domain/infosync/info.go @@ -451,32 +451,37 @@ func MustGetTiFlashProgress(tableID int64, replicaCount uint64, tiFlashStores *m // pdResponseHandler will be injected into the PD HTTP client to handle the response, // this is to maintain consistency with the logic in the `doRequest`. -func pdResponseHandler(resp *http.Response) error { +func pdResponseHandler(resp *http.Response, res interface{}) error { + defer func() { terror.Log(resp.Body.Close()) }() bodyBytes, err := io.ReadAll(resp.Body) if err != nil { - terror.Log(resp.Body.Close()) return err } - if resp.StatusCode != http.StatusOK { - logutil.BgLogger().Warn("response not 200", - zap.String("method", resp.Request.Method), - zap.String("host", resp.Request.URL.Host), - zap.String("url", resp.Request.URL.RequestURI()), - zap.Int("http status", resp.StatusCode), - ) - if resp.StatusCode != http.StatusNotFound && resp.StatusCode != http.StatusPreconditionFailed { - err = ErrHTTPServiceError.FastGen("%s", bodyBytes) + if resp.StatusCode == http.StatusOK { + if res != nil { + return json.Unmarshal(bodyBytes, res) } + return nil } - terror.Log(resp.Body.Close()) - return err + logutil.BgLogger().Warn("response not 200", + zap.String("method", resp.Request.Method), + zap.String("host", resp.Request.URL.Host), + zap.String("url", resp.Request.URL.RequestURI()), + zap.Int("http status", resp.StatusCode), + ) + if resp.StatusCode != http.StatusNotFound && resp.StatusCode != http.StatusPreconditionFailed { + return ErrHTTPServiceError.FastGen("%s", bodyBytes) + } + return nil } // TODO: replace with the unified PD HTTP client. func doRequest(ctx context.Context, apiName string, addrs []string, route, method string, body io.Reader) ([]byte, error) { - var err error - var req *http.Request - var res *http.Response + var ( + err error + req *http.Request + res *http.Response + ) for idx, addr := range addrs { url := util2.ComposeURL(addr, route) req, err = http.NewRequestWithContext(ctx, method, url, body) From e6bed4213b0f20aec25d6746054578786c59a52f Mon Sep 17 00:00:00 2001 From: JmPotato Date: Tue, 21 Nov 2023 17:29:46 +0800 Subject: [PATCH 3/4] Fix the HTTP handler tests Signed-off-by: JmPotato --- pkg/server/handler/tests/BUILD.bazel | 1 + pkg/server/handler/tests/http_handler_test.go | 15 ++++++--------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/pkg/server/handler/tests/BUILD.bazel b/pkg/server/handler/tests/BUILD.bazel index 7acaf6cb34af5..f992b3170f853 100644 --- a/pkg/server/handler/tests/BUILD.bazel +++ b/pkg/server/handler/tests/BUILD.bazel @@ -28,6 +28,7 @@ go_test( "//pkg/server/handler/optimizor", "//pkg/server/handler/tikvhandler", "//pkg/server/internal/testserverclient", + "//pkg/server/internal/testutil", "//pkg/server/internal/util", "//pkg/session", "//pkg/sessionctx", diff --git a/pkg/server/handler/tests/http_handler_test.go b/pkg/server/handler/tests/http_handler_test.go index 4113a3ef42675..7610e5d3d9bac 100644 --- a/pkg/server/handler/tests/http_handler_test.go +++ b/pkg/server/handler/tests/http_handler_test.go @@ -50,6 +50,7 @@ import ( "github.com/pingcap/tidb/pkg/server/handler/optimizor" "github.com/pingcap/tidb/pkg/server/handler/tikvhandler" "github.com/pingcap/tidb/pkg/server/internal/testserverclient" + "github.com/pingcap/tidb/pkg/server/internal/testutil" "github.com/pingcap/tidb/pkg/server/internal/util" "github.com/pingcap/tidb/pkg/session" "github.com/pingcap/tidb/pkg/sessionctx" @@ -449,13 +450,7 @@ func TestBinlogRecover(t *testing.T) { func (ts *basicHTTPHandlerTestSuite) startServer(t *testing.T) { var err error - ts.Port = uint(rand.Int31n(50000)) + 10000 - ts.StatusPort = ts.Port + 1 - ts.store, err = mockstore.NewMockStore( - mockstore.WithTiKVOptions( - tikv.WithPDHTTPClient([]string{ts.Addr()}), - ), - ) + ts.store, err = mockstore.NewMockStore() require.NoError(t, err) ts.domain, err = session.BootstrapSession(ts.store) require.NoError(t, err) @@ -463,12 +458,14 @@ func (ts *basicHTTPHandlerTestSuite) startServer(t *testing.T) { cfg := util.NewTestConfig() cfg.Store = "tikv" - cfg.Port = ts.Port - cfg.Status.StatusPort = ts.StatusPort + cfg.Port = 0 + cfg.Status.StatusPort = 0 cfg.Status.ReportStatus = true server, err := server2.NewServer(cfg, ts.tidbdrv) require.NoError(t, err) + ts.Port = testutil.GetPortFromTCPAddr(server.ListenAddr()) + ts.StatusPort = testutil.GetPortFromTCPAddr(server.StatusListenerAddr()) ts.server = server ts.server.SetDomain(ts.domain) go func() { From fb36395c7aef601085cadf0a173930bb99995bf8 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 22 Nov 2023 13:40:23 +0800 Subject: [PATCH 4/4] Check bodyBytes before unmarshal Signed-off-by: JmPotato --- pkg/domain/infosync/info.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/domain/infosync/info.go b/pkg/domain/infosync/info.go index 8873d48f30543..d60806c0f941b 100644 --- a/pkg/domain/infosync/info.go +++ b/pkg/domain/infosync/info.go @@ -458,7 +458,7 @@ func pdResponseHandler(resp *http.Response, res interface{}) error { return err } if resp.StatusCode == http.StatusOK { - if res != nil { + if res != nil && bodyBytes != nil { return json.Unmarshal(bodyBytes, res) } return nil