Skip to content

Commit

Permalink
refine
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <[email protected]>
  • Loading branch information
nolouch committed Nov 27, 2024
1 parent 4f18553 commit 1af9276
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 20 deletions.
57 changes: 38 additions & 19 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,9 +783,9 @@ func (s *RegionRequestSender) SendReqCtx(
}
}()

var staleReadCollector *staleReadMetricsCollector
// var staleReadCollector *staleReadMetricsCollector
if req.StaleRead {
staleReadCollector = &staleReadMetricsCollector{}
// staleReadCollector = &staleReadMetricsCollector{}
defer func() {
if retryTimes == 0 {
metrics.StaleReadHitCounter.Add(1)
Expand Down Expand Up @@ -839,15 +839,22 @@ func (s *RegionRequestSender) SendReqCtx(
}
execDetails := bo.GetCtx().Value(util.ExecDetailsKey)
trafficCollector := &networkCollector{}
var isLocalTraffic bool
isTiflashTarget := et == tikvrpc.TiFlash
if staleReadCollector != nil && s.replicaSelector != nil && s.replicaSelector.target != nil {
isLocalTraffic = s.replicaSelector.target.store.IsLabelsMatch(s.replicaSelector.option.labels)
staleReadCollector.onReq(req, isLocalTraffic)
// patch the access location if it is not set under region request sender. which inculdes the coprocessor,
// txn relative tikv request.
// note: MPP not use this path. need specified in the MPP layer.
if s.replicaSelector != nil &&
s.replicaSelector.target != nil &&
req.AccessLocation == kv.AccessUnknown &&
len(s.replicaSelector.option.labels) != 0 {
if s.replicaSelector.target.store.IsLabelsMatch(s.replicaSelector.option.labels) {
req.AccessLocation = kv.AccessLocalZone
} else {
req.AccessLocation = kv.AccessCrossZone
}
}
if execDetails != nil {
detail := execDetails.(*util.ExecDetails)
trafficCollector.onReq(req, detail, isLocalTraffic, isTiflashTarget)
trafficCollector.onReq(req, detail)
}

logutil.Eventf(bo.GetCtx(), "send %s request to region %d at %s", req.Type, regionID.id, rpcCtx.Addr)
Expand Down Expand Up @@ -931,13 +938,10 @@ func (s *RegionRequestSender) SendReqCtx(
s.replicaSelector.onSendSuccess(req)
}
}
if staleReadCollector != nil {
staleReadCollector.onResp(req.Type, resp, isLocalTraffic)
}

if execDetails != nil {
detail := execDetails.(*util.ExecDetails)
trafficCollector.onResp(req.Type, resp, detail, isLocalTraffic, isTiflashTarget)
trafficCollector.onResp(req, resp, detail)
}
return resp, rpcCtx, retryTimes, nil
}
Expand Down Expand Up @@ -1771,7 +1775,7 @@ func (s *RegionRequestSender) onRegionError(
type staleReadMetricsCollector struct {
}

func (s *staleReadMetricsCollector) onReq(req *tikvrpc.Request, isLocalTraffic bool) {
func (s *staleReadMetricsCollector) onReq(req *tikvrpc.Request) {
size := 0
switch req.Type {
case tikvrpc.CmdGet:
Expand All @@ -1786,6 +1790,7 @@ func (s *staleReadMetricsCollector) onReq(req *tikvrpc.Request, isLocalTraffic b
// ignore non-read requests
return
}
isLocalTraffic := req.AccessLocation == kv.AccessLocalZone
size += req.Context.Size()
if isLocalTraffic {
metrics.StaleReadLocalOutBytes.Add(float64(size))
Expand All @@ -1796,9 +1801,9 @@ func (s *staleReadMetricsCollector) onReq(req *tikvrpc.Request, isLocalTraffic b
}
}

func (s *staleReadMetricsCollector) onResp(tp tikvrpc.CmdType, resp *tikvrpc.Response, isLocalTraffic bool) {
func (s *staleReadMetricsCollector) onResp(req *tikvrpc.Request, resp *tikvrpc.Response) {
size := 0
switch tp {
switch req.Type {
case tikvrpc.CmdGet:
size += resp.Resp.(*kvrpcpb.GetResponse).Size()
case tikvrpc.CmdBatchGet:
Expand All @@ -1811,6 +1816,7 @@ func (s *staleReadMetricsCollector) onResp(tp tikvrpc.CmdType, resp *tikvrpc.Res
// ignore non-read requests
return
}
isLocalTraffic := req.AccessLocation == kv.AccessLocalZone
if isLocalTraffic {
metrics.StaleReadLocalInBytes.Add(float64(size))
} else {
Expand All @@ -1819,9 +1825,10 @@ func (s *staleReadMetricsCollector) onResp(tp tikvrpc.CmdType, resp *tikvrpc.Res
}

type networkCollector struct {
staleReadMetricsCollector
}

func (s *networkCollector) onReq(req *tikvrpc.Request, details *util.ExecDetails, isLocalTraffic bool, isTiflashTarget bool) {
func (s *networkCollector) onReq(req *tikvrpc.Request, details *util.ExecDetails) {
size := 0
switch req.Type {
case tikvrpc.CmdGet:
Expand All @@ -1845,6 +1852,8 @@ func (s *networkCollector) onReq(req *tikvrpc.Request, details *util.ExecDetails
return
}
size += req.Context.Size()
isLocalTraffic := req.AccessLocation == kv.AccessInterZone

Check failure on line 1855 in internal/locate/region_request.go

View workflow job for this annotation

GitHub Actions / golangci

undefined: kv.AccessInterZone) (typecheck)

Check failure on line 1855 in internal/locate/region_request.go

View workflow job for this annotation

GitHub Actions / golangci

undefined: kv.AccessInterZone) (typecheck)

Check failure on line 1855 in internal/locate/region_request.go

View workflow job for this annotation

GitHub Actions / golangci

undefined: kv.AccessInterZone) (typecheck)

Check failure on line 1855 in internal/locate/region_request.go

View workflow job for this annotation

GitHub Actions / golangci

undefined: kv.AccessInterZone (typecheck)

Check failure on line 1855 in internal/locate/region_request.go

View workflow job for this annotation

GitHub Actions / race-test

undefined: kv.AccessInterZone

Check failure on line 1855 in internal/locate/region_request.go

View workflow job for this annotation

GitHub Actions / test

undefined: kv.AccessInterZone
isTiflashTarget := req.StoreTp == tikvrpc.TiFlash
var total, crossZone *int64
if isTiflashTarget {
total = &details.BytesSendMPPTotal
Expand All @@ -1858,11 +1867,15 @@ func (s *networkCollector) onReq(req *tikvrpc.Request, details *util.ExecDetails
if !isLocalTraffic {
atomic.AddInt64(crossZone, int64(size))
}
// stale read metrics
if req.StaleRead {
s.staleReadMetricsCollector.onReq(req)
}
}

func (s *networkCollector) onResp(tp tikvrpc.CmdType, resp *tikvrpc.Response, details *util.ExecDetails, isLocalTraffic bool, isTiflashTarget bool) {
func (s *networkCollector) onResp(req *tikvrpc.Request, resp *tikvrpc.Response, details *util.ExecDetails) {
size := 0
switch tp {
switch req.Type {
case tikvrpc.CmdGet:
size += resp.Resp.(*kvrpcpb.GetResponse).Size()
case tikvrpc.CmdBatchGet:
Expand Down Expand Up @@ -1891,6 +1904,8 @@ func (s *networkCollector) onResp(tp tikvrpc.CmdType, resp *tikvrpc.Response, de
return
}
var total, crossZone *int64

isTiflashTarget := req.StoreTp == tikvrpc.TiFlash
if isTiflashTarget {
total = &details.BytesReceivedMPPTotal
crossZone = &details.BytesReceivedMPPCrossZone
Expand All @@ -1900,9 +1915,13 @@ func (s *networkCollector) onResp(tp tikvrpc.CmdType, resp *tikvrpc.Response, de
}

atomic.AddInt64(total, int64(size))
if !isLocalTraffic {
if req.AccessLocation == kv.AccessCrossZone {
atomic.AddInt64(crossZone, int64(size))
}
// stale read metrics
if req.StaleRead {
s.staleReadMetricsCollector.onResp(req, resp)
}
}

func patchRequestSource(req *tikvrpc.Request, replicaType string) {
Expand Down
8 changes: 8 additions & 0 deletions kv/store_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,11 @@ func (r ReplicaReadType) String() string {
return fmt.Sprintf("unknown-%v", byte(r))
}
}

type AccessLocationType byte

const (
AccessUnknown AccessLocationType = iota
AccessLocalZone
AccessCrossZone
)
3 changes: 2 additions & 1 deletion tikvrpc/tikvrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,8 @@ type Request struct {
ReadType string
// InputRequestSource is the input source of the request, if it's not empty, the final RequestSource sent to store will be attached with the retry info.
InputRequestSource string

// AccessLocationAttr indicates the request is sent to a different zone.
AccessLocation kv.AccessLocationType
// rev represents the revision of the request, it's increased when `Req.Context` gets patched.
rev uint32
}
Expand Down

0 comments on commit 1af9276

Please sign in to comment.