Skip to content

Commit

Permalink
Encapsulate more PD HTTP interfaces
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Nov 20, 2023
1 parent 1e1817d commit d9e988c
Show file tree
Hide file tree
Showing 4 changed files with 261 additions and 21 deletions.
15 changes: 14 additions & 1 deletion client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (
regionsByKey = "/pd/api/v1/regions/key"
RegionsByStoreIDPrefix = "/pd/api/v1/regions/store"
EmptyRegions = "/pd/api/v1/regions/check/empty-region"
accelerateSchedule = "/pd/api/v1/regions/accelerate-schedule"
AccelerateSchedule = "/pd/api/v1/regions/accelerate-schedule"
store = "/pd/api/v1/store"
Stores = "/pd/api/v1/stores"
StatsRegion = "/pd/api/v1/stats/region"
Expand All @@ -45,7 +45,10 @@ const (
PlacementRule = "/pd/api/v1/config/rule"
PlacementRules = "/pd/api/v1/config/rules"
placementRulesByGroup = "/pd/api/v1/config/rules/group"
PlacementRuleBundle = "/pd/api/v1/config/placement-rule"
RegionLabelRule = "/pd/api/v1/config/region-label/rule"
RegionLabelRules = "/pd/api/v1/config/region-label/rules"
RegionLabelRulesByIDs = "/pd/api/v1/config/region-label/rules/ids"
// Scheduler
Schedulers = "/pd/api/v1/schedulers"
scatterRangeScheduler = "/pd/api/v1/schedulers/scatter-range-"
Expand Down Expand Up @@ -123,6 +126,16 @@ func PlacementRuleByGroupAndID(group, id string) string {
return fmt.Sprintf("%s/%s/%s", PlacementRule, group, id)
}

// PlacementRuleBundleByGroup returns the path of PD HTTP API to get placement rule bundle by group.
func PlacementRuleBundleByGroup(group string) string {
return fmt.Sprintf("%s/%s", PlacementRuleBundle, group)
}

// PlacementRuleBundleWithPartialParameter returns the path of PD HTTP API to get placement rule bundle with partial parameter.
func PlacementRuleBundleWithPartialParameter(partial bool) string {
return fmt.Sprintf("%s?partial=%t", PlacementRuleBundle, partial)
}

// SchedulerByName returns the scheduler API with the given scheduler name.
func SchedulerByName(name string) string {
return fmt.Sprintf("%s/%s", Schedulers, name)
Expand Down
147 changes: 131 additions & 16 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (

// Client is a PD (Placement Driver) HTTP client.
type Client interface {
/* Meta-related interfaces */
GetRegionByID(context.Context, uint64) (*RegionInfo, error)
GetRegionByKey(context.Context, []byte) (*RegionInfo, error)
GetRegions(context.Context) (*RegionsInfo, error)
Expand All @@ -51,11 +52,24 @@ type Client interface {
GetHotWriteRegions(context.Context) (*StoreHotPeersInfos, error)
GetRegionStatusByKeyRange(context.Context, []byte, []byte) (*RegionStats, error)
GetStores(context.Context) (*StoresInfo, error)
/* Rule-related interfaces */
GetAllPlacementRuleBundles(context.Context) ([]*GroupBundle, error)
GetPlacementRuleBundleByGroup(context.Context, string) (*GroupBundle, error)
GetPlacementRulesByGroup(context.Context, string) ([]*Rule, error)
SetPlacementRule(context.Context, *Rule) error
SetPlacementRuleBundles(context.Context, []*GroupBundle, bool) error
DeletePlacementRule(context.Context, string, string) error
GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error)
GetAllRegionLabelRules(context.Context) ([]*LabelRule, error)
GetRegionLabelRulesByIDs(context.Context, []string) ([]*LabelRule, error)
SetRegionLabelRule(context.Context, *LabelRule) error
PatchRegionLabelRules(context.Context, *LabelRulePatch) error
/* Scheduling-related interfaces */
AccelerateSchedule(context.Context, []byte, []byte) error
/* Other interfaces */
GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error)

/* Client-related methods */
WithRespHandler(func(resp *http.Response) error) Client
Close()
}

Expand All @@ -66,6 +80,8 @@ type client struct {
tlsConf *tls.Config
cli *http.Client

respHandler func(resp *http.Response) error

requestCounter *prometheus.CounterVec
executionDuration *prometheus.HistogramVec
}
Expand Down Expand Up @@ -143,6 +159,13 @@ func (c *client) Close() {
log.Info("[pd] http client closed")
}

// WithRespHandler sets the client with the given HTTP response handler.
// This allows the caller to customize how the response is handled, including error handling logic.
func (c *client) WithRespHandler(handler func(resp *http.Response) error) Client {
c.respHandler = handler
return c

Check warning on line 166 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L165-L166

Added lines #L165 - L166 were not covered by tests
}

func (c *client) reqCounter(name, status string) {
if c.requestCounter == nil {
return
Expand Down Expand Up @@ -204,6 +227,12 @@ func (c *client) request(
}
c.execDuration(name, time.Since(start))
c.reqCounter(name, resp.Status)

// Give away the response handling to the caller if the handler is set.
if c.respHandler != nil {
return c.respHandler(resp)

Check warning on line 233 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L233

Added line #L233 was not covered by tests
}

defer func() {
err = resp.Body.Close()
if err != nil {
Expand Down Expand Up @@ -345,6 +374,30 @@ func (c *client) GetStores(ctx context.Context) (*StoresInfo, error) {
return &stores, nil
}

// GetAllPlacementRuleBundles gets all placement rules bundles.
func (c *client) GetAllPlacementRuleBundles(ctx context.Context) ([]*GroupBundle, error) {
var bundles []*GroupBundle
err := c.requestWithRetry(ctx,
"GetPlacementRuleBundle", PlacementRuleBundle,
http.MethodGet, nil, &bundles)
if err != nil {
return nil, err

Check warning on line 384 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L384

Added line #L384 was not covered by tests
}
return bundles, nil
}

// GetPlacementRuleBundleByGroup gets the placement rules bundle by group.
func (c *client) GetPlacementRuleBundleByGroup(ctx context.Context, group string) (*GroupBundle, error) {
var bundle GroupBundle
err := c.requestWithRetry(ctx,
"GetPlacementRuleBundleByGroup", PlacementRuleBundleByGroup(group),
http.MethodGet, nil, &bundle)
if err != nil {
return nil, err

Check warning on line 396 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L396

Added line #L396 was not covered by tests
}
return &bundle, nil
}

// GetPlacementRulesByGroup gets the placement rules by group.
func (c *client) GetPlacementRulesByGroup(ctx context.Context, group string) ([]*Rule, error) {
var rules []*Rule
Expand All @@ -368,13 +421,90 @@ func (c *client) SetPlacementRule(ctx context.Context, rule *Rule) error {
http.MethodPost, bytes.NewBuffer(ruleJSON), nil)
}

// SetPlacementRuleBundles sets the placement rule bundles.
// If `partial` is false, all old configurations will be over-written and dropped.
func (c *client) SetPlacementRuleBundles(ctx context.Context, bundles []*GroupBundle, partial bool) error {
bundlesJSON, err := json.Marshal(bundles)
if err != nil {
return errors.Trace(err)

Check warning on line 429 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L429

Added line #L429 was not covered by tests
}
return c.requestWithRetry(ctx,
"SetPlacementRuleBundles", PlacementRuleBundleWithPartialParameter(partial),
http.MethodPost, bytes.NewBuffer(bundlesJSON), nil)
}

// DeletePlacementRule deletes the placement rule.
func (c *client) DeletePlacementRule(ctx context.Context, group, id string) error {
return c.requestWithRetry(ctx,
"DeletePlacementRule", PlacementRuleByGroupAndID(group, id),
http.MethodDelete, nil, nil)
}

// GetAllRegionLabelRules gets all region label rules.
func (c *client) GetAllRegionLabelRules(ctx context.Context) ([]*LabelRule, error) {
var labelRules []*LabelRule
err := c.requestWithRetry(ctx,
"GetAllRegionLabelRules", RegionLabelRules,
http.MethodGet, nil, &labelRules)
if err != nil {
return nil, err

Check warning on line 450 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L450

Added line #L450 was not covered by tests
}
return labelRules, nil
}

// GetRegionLabelRulesByIDs gets the region label rules by IDs.
func (c *client) GetRegionLabelRulesByIDs(ctx context.Context, ruleIDs []string) ([]*LabelRule, error) {
idsJSON, err := json.Marshal(ruleIDs)
if err != nil {
return nil, errors.Trace(err)

Check warning on line 459 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L459

Added line #L459 was not covered by tests
}
var labelRules []*LabelRule
err = c.requestWithRetry(ctx,
"GetRegionLabelRulesByIDs", RegionLabelRulesByIDs,
http.MethodGet, bytes.NewBuffer(idsJSON), &labelRules)
if err != nil {
return nil, err

Check warning on line 466 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L466

Added line #L466 was not covered by tests
}
return labelRules, nil
}

// SetRegionLabelRule sets the region label rule.
func (c *client) SetRegionLabelRule(ctx context.Context, labelRule *LabelRule) error {
labelRuleJSON, err := json.Marshal(labelRule)
if err != nil {
return errors.Trace(err)

Check warning on line 475 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L475

Added line #L475 was not covered by tests
}
return c.requestWithRetry(ctx,
"SetRegionLabelRule", RegionLabelRule,
http.MethodPost, bytes.NewBuffer(labelRuleJSON), nil)
}

// PatchRegionLabelRules patches the region label rules.
func (c *client) PatchRegionLabelRules(ctx context.Context, labelRulePatch *LabelRulePatch) error {
labelRulePatchJSON, err := json.Marshal(labelRulePatch)
if err != nil {
return errors.Trace(err)

Check warning on line 486 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L486

Added line #L486 was not covered by tests
}
return c.requestWithRetry(ctx,
"PatchRegionLabelRules", RegionLabelRules,
http.MethodPatch, bytes.NewBuffer(labelRulePatchJSON), nil)
}

// AccelerateSchedule accelerates the scheduling of the regions within the given key range.
func (c *client) AccelerateSchedule(ctx context.Context, startKey, endKey []byte) error {
input := map[string]string{
"start_key": url.QueryEscape(string(startKey)),
"end_key": url.QueryEscape(string(endKey)),
}
inputJSON, err := json.Marshal(input)
if err != nil {
return errors.Trace(err)

Check warning on line 501 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L501

Added line #L501 was not covered by tests
}
return c.requestWithRetry(ctx,
"AccelerateSchedule", AccelerateSchedule,
http.MethodPost, bytes.NewBuffer(inputJSON), nil)
}

// GetMinResolvedTSByStoresIDs get min-resolved-ts by stores IDs.
func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uint64) (uint64, map[uint64]uint64, error) {
uri := MinResolvedTSPrefix
Expand Down Expand Up @@ -406,18 +536,3 @@ func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uin
}
return resp.MinResolvedTS, resp.StoresMinResolvedTS, nil
}

// AccelerateSchedule accelerates the scheduling of the regions within the given key range.
func (c *client) AccelerateSchedule(ctx context.Context, startKey, endKey []byte) error {
input := map[string]string{
"start_key": url.QueryEscape(string(startKey)),
"end_key": url.QueryEscape(string(endKey)),
}
inputJSON, err := json.Marshal(input)
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"AccelerateSchedule", accelerateSchedule,
http.MethodPost, bytes.NewBuffer(inputJSON), nil)
}
31 changes: 31 additions & 0 deletions client/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,3 +246,34 @@ type Rule struct {
Version uint64 `json:"version,omitempty"` // only set at runtime, add 1 each time rules updated, begin from 0.
CreateTimestamp uint64 `json:"create_timestamp,omitempty"` // only set at runtime, recorded rule create timestamp
}

// GroupBundle represents a rule group and all rules belong to the group.
type GroupBundle struct {
ID string `json:"group_id"`
Index int `json:"group_index"`
Override bool `json:"group_override"`
Rules []*Rule `json:"rules"`
}

// RegionLabel is the label of a region.
type RegionLabel struct {
Key string `json:"key"`
Value string `json:"value"`
TTL string `json:"ttl,omitempty"`
StartAt string `json:"start_at,omitempty"`
}

// LabelRule is the rule to assign labels to a region.
type LabelRule struct {
ID string `json:"id"`
Index int `json:"index"`
Labels []RegionLabel `json:"labels"`
RuleType string `json:"rule_type"`
Data interface{} `json:"data"`
}

// LabelRulePatch is the patch to update the label rules.
type LabelRulePatch struct {
SetRules []*LabelRule `json:"sets"`
DeleteRules []string `json:"deletes"`
}
Loading

0 comments on commit d9e988c

Please sign in to comment.