From 541981cc21b2c6093847525e146b50c620aa8f1d Mon Sep 17 00:00:00 2001 From: JmPotato Date: Thu, 30 May 2024 14:23:50 +0800 Subject: [PATCH] Fix the multierr bug Signed-off-by: JmPotato --- client/errs/errs.go | 5 ++- client/go.mod | 2 +- client/go.sum | 3 +- client/http/client.go | 39 +++++++++--------- client/http/request_info.go | 4 +- client/retry/backoff.go | 16 ++------ client/retry/backoff_test.go | 2 +- tests/integrations/client/http_client_test.go | 41 +++++++++++++++++++ 8 files changed, 74 insertions(+), 38 deletions(-) diff --git a/client/errs/errs.go b/client/errs/errs.go index ee29e4c4349..2c25e009849 100644 --- a/client/errs/errs.go +++ b/client/errs/errs.go @@ -22,8 +22,11 @@ import ( "go.uber.org/zap/zapcore" ) -// IsLeaderChange will determine whether there is a leader change. +// IsLeaderChange will determine whether there is a leader/primary change. func IsLeaderChange(err error) bool { + if err == nil { + return false + } if err == ErrClientTSOStreamClosed { return true } diff --git a/client/go.mod b/client/go.mod index 89799796521..543f013fd11 100644 --- a/client/go.mod +++ b/client/go.mod @@ -16,7 +16,6 @@ require ( github.com/stretchr/testify v1.8.2 go.uber.org/atomic v1.10.0 go.uber.org/goleak v1.1.11 - go.uber.org/multierr v1.11.0 go.uber.org/zap v1.24.0 golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4 google.golang.org/grpc v1.62.1 @@ -34,6 +33,7 @@ require ( github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.46.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect + go.uber.org/multierr v1.7.0 // indirect golang.org/x/net v0.23.0 // indirect golang.org/x/sys v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/client/go.sum b/client/go.sum index 54942bb0bb8..a26571171ad 100644 --- a/client/go.sum +++ b/client/go.sum @@ -88,9 +88,8 @@ go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec= go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= -go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= -go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= diff --git a/client/http/client.go b/client/http/client.go index c90efcaa869..3abd8828e28 100644 --- a/client/http/client.go +++ b/client/http/client.go @@ -124,18 +124,20 @@ func (ci *clientInner) requestWithRetry( isLeader bool statusCode int err error - logFields = append(reqInfo.logFields(), - zap.String("source", ci.source), - zap.String("server-url", serverURL), - zap.Bool("is-leader", isLeader), - zap.Int("status-code", statusCode), - zap.Error(err)) + logFields = append(reqInfo.logFields(), zap.String("source", ci.source)) ) execFunc := func() error { defer func() { - // Handle some special status codes and errors to increase the success rate of the following requests. - ci.handleHTTPStatusCodeAndErr(statusCode, err) - log.Debug("[pd] http request finished", logFields...) + // - If the status code is 503, it indicates that there may be PD leader/follower changes. + // - If the error message contains the leader/primary change information, it indicates that there may be PD leader/primary change. + if statusCode == http.StatusServiceUnavailable || errs.IsLeaderChange(err) { + ci.sd.ScheduleCheckMemberChanged() + } + log.Info("[pd] http request finished", append(logFields, + zap.String("server-url", serverURL), + zap.Bool("is-leader", isLeader), + zap.Int("status-code", statusCode), + zap.Error(err))...) }() // It will try to send the request to the PD leader first and then try to send the request to the other PD followers. clients := ci.sd.GetAllServiceClients() @@ -154,7 +156,11 @@ func (ci *clientInner) requestWithRetry( if err == nil || noNeedRetry(statusCode) { return err } - log.Debug("[pd] http request url failed", logFields...) + log.Info("[pd] http request url failed", append(logFields, + zap.String("server-url", serverURL), + zap.Bool("is-leader", isLeader), + zap.Int("status-code", statusCode), + zap.Error(err))...) } if skipNum == len(clients) { return errs.ErrClientNoTargetMember @@ -174,14 +180,6 @@ func (ci *clientInner) requestWithRetry( return bo.Exec(ctx, execFunc) } -func (ci *clientInner) handleHTTPStatusCodeAndErr(code int, err error) { - // - If the status code is 503, it indicates that there may be PD leader/follower changes. - // - If the error message contains the leader/primary change information, it indicates that there may be PD leader/primary change. - if code == http.StatusServiceUnavailable || errs.IsLeaderChange(err) { - ci.sd.ScheduleCheckMemberChanged() - } -} - func noNeedRetry(statusCode int) bool { return statusCode == http.StatusNotFound || statusCode == http.StatusForbidden || @@ -245,11 +243,14 @@ func (ci *clientInner) doRequest( if readErr != nil { logFields = append(logFields, zap.NamedError("read-body-error", err)) } else { + bs = bytes.TrimSpace(bs) logFields = append(logFields, zap.ByteString("body", bs)) } log.Error("[pd] request failed with a non-200 status", logFields...) - return resp.StatusCode, errors.Errorf("request pd http api failed with status: '%s'", resp.Status) + return resp.StatusCode, errors.Errorf( + "request pd http api failed with status: '%s', body: '%s'", resp.Status, bs, + ) } if res == nil { diff --git a/client/http/request_info.go b/client/http/request_info.go index b95c8ad7fa4..3fb91c6ca97 100644 --- a/client/http/request_info.go +++ b/client/http/request_info.go @@ -161,10 +161,10 @@ func (ri *requestInfo) getURL(addr string) string { func (ri *requestInfo) logFields() []zap.Field { return []zap.Field{ - zap.String("callerID", ri.callerID), + zap.String("caller-id", ri.callerID), zap.String("name", ri.name), zap.String("uri", ri.uri), zap.String("method", ri.method), - zap.String("targetURL", ri.targetURL), + zap.String("target-url", ri.targetURL), } } diff --git a/client/retry/backoff.go b/client/retry/backoff.go index 6c72b68ab9d..4f0a8eca925 100644 --- a/client/retry/backoff.go +++ b/client/retry/backoff.go @@ -24,12 +24,9 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" - "go.uber.org/multierr" "go.uber.org/zap" ) -const maxRecordErrorCount = 20 - // Option is used to customize the backoffer. type Option func(*Backoffer) @@ -69,18 +66,13 @@ func (bo *Backoffer) Exec( ) error { defer bo.resetBackoff() var ( - allErrors error - err error - after *time.Timer + err error + after *time.Timer ) fnName := getFunctionName(fn) for { err = fn() bo.attempt++ - if bo.attempt < maxRecordErrorCount { - // multierr.Append will ignore nil error. - allErrors = multierr.Append(allErrors, err) - } if !bo.isRetryable(err) { break } @@ -100,7 +92,7 @@ func (bo *Backoffer) Exec( select { case <-ctx.Done(): after.Stop() - return multierr.Append(allErrors, errors.Trace(ctx.Err())) + return errors.Trace(ctx.Err()) case <-after.C: failpoint.Inject("backOffExecute", func() { testBackOffExecuteFlag = true @@ -115,7 +107,7 @@ func (bo *Backoffer) Exec( } } } - return allErrors + return err } // InitialBackoffer make the initial state for retrying. diff --git a/client/retry/backoff_test.go b/client/retry/backoff_test.go index 8dd44033b55..35f6fca43a7 100644 --- a/client/retry/backoff_test.go +++ b/client/retry/backoff_test.go @@ -87,7 +87,7 @@ func TestBackoffer(t *testing.T) { return expectedErr }) re.InDelta(total, time.Since(start), float64(250*time.Millisecond)) - re.ErrorContains(err, "test; test; test; test") + re.ErrorContains(err, "test") re.ErrorIs(err, expectedErr) re.Equal(4, execCount) re.True(isBackofferReset(bo)) diff --git a/tests/integrations/client/http_client_test.go b/tests/integrations/client/http_client_test.go index fa109946e4b..b873e689354 100644 --- a/tests/integrations/client/http_client_test.go +++ b/tests/integrations/client/http_client_test.go @@ -21,6 +21,7 @@ import ( "net/url" "sort" "strings" + "sync" "testing" "time" @@ -757,3 +758,43 @@ func (suite *httpClientTestSuite) TestGetHealthStatus() { re.Equal("pd2", healths[1].Name) re.True(healths[0].Health && healths[1].Health) } + +func (suite *httpClientTestSuite) TestRetryOnLeaderChange() { + re := suite.Require() + ctx, cancel := context.WithCancel(suite.ctx) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + bo := retry.InitialBackoffer(100*time.Millisecond, time.Second, 0) + client := suite.client.WithBackoffer(bo) + for { + healths, err := client.GetHealthStatus(ctx) + if err != nil && strings.Contains(err.Error(), "context canceled") { + return + } + re.NoError(err) + re.Len(healths, 2) + select { + case <-ctx.Done(): + return + default: + } + } + }() + + leader := suite.cluster.GetLeaderServer() + re.NotNil(leader) + for i := 0; i < 3; i++ { + leader.ResignLeader() + re.NotEmpty(suite.cluster.WaitLeader()) + leader = suite.cluster.GetLeaderServer() + re.NotNil(leader) + } + + // Cancel the context to stop the goroutine. + cancel() + wg.Wait() +}