Skip to content

Commit

Permalink
Fix the failpoints
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Nov 19, 2024
1 parent 3b56132 commit fd62812
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 46 deletions.
14 changes: 1 addition & 13 deletions client/sd/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,9 +553,6 @@ func (c *pdServiceDiscovery) updateMemberLoop() {
case <-ticker.C:
case <-c.checkMembershipCh:
}
failpoint.Inject("skipUpdateMember", func() {
failpoint.Continue()
})
if err := bo.Exec(ctx, c.updateMember); err != nil {
log.Error("[pd] failed to update member", zap.Strings("urls", c.GetServiceURLs()), errs.ZapError(err))
}
Expand Down Expand Up @@ -840,9 +837,6 @@ func (c *pdServiceDiscovery) initClusterID() error {
clusterID = members.GetHeader().GetClusterId()
continue
}
failpoint.Inject("skipClusterIDCheck", func() {
failpoint.Continue()
})
// All URLs passed in should have the same cluster ID.
if members.GetHeader().GetClusterId() != clusterID {
return errors.WithStack(errs.ErrUnmatchedClusterID)
Expand Down Expand Up @@ -885,13 +879,7 @@ func (c *pdServiceDiscovery) checkServiceModeChanged() error {
}

func (c *pdServiceDiscovery) updateMember() error {
for i, url := range c.GetServiceURLs() {
failpoint.Inject("skipFirstUpdateMember", func() {
if i == 0 {
failpoint.Continue()
}
})

for _, url := range c.GetServiceURLs() {
members, err := c.getMembers(c.ctx, url, UpdateMemberTimeout)
// Check the cluster ID.
updatedClusterID := members.GetHeader().GetClusterId()
Expand Down
8 changes: 4 additions & 4 deletions client/sd/pd_service_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,12 @@ func (suite *serviceClientTestSuite) TestServiceClient() {
re.False(follower.IsConnectedToLeader())
re.True(leader.IsConnectedToLeader())

re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork1", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/sd/unreachableNetwork1", "return(true)"))
follower.(*pdServiceClient).checkNetworkAvailable(suite.ctx)
leader.(*pdServiceClient).checkNetworkAvailable(suite.ctx)
re.False(follower.Available())
re.False(leader.Available())
re.NoError(failpoint.Disable("github.com/tikv/pd/client/unreachableNetwork1"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/sd/unreachableNetwork1"))

follower.(*pdServiceClient).checkNetworkAvailable(suite.ctx)
leader.(*pdServiceClient).checkNetworkAvailable(suite.ctx)
Expand Down Expand Up @@ -237,7 +237,7 @@ func (suite *serviceClientTestSuite) TestServiceClient() {
followerAPIClient := newPDServiceAPIClient(follower, regionAPIErrorFn)
leaderAPIClient := newPDServiceAPIClient(leader, regionAPIErrorFn)

re.NoError(failpoint.Enable("github.com/tikv/pd/client/fastCheckAvailable", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/sd/fastCheckAvailable", "return(true)"))

re.True(followerAPIClient.Available())
re.True(leaderAPIClient.Available())
Expand Down Expand Up @@ -269,7 +269,7 @@ func (suite *serviceClientTestSuite) TestServiceClient() {
re.True(followerAPIClient.Available())
re.True(leaderAPIClient.Available())

re.NoError(failpoint.Disable("github.com/tikv/pd/client/fastCheckAvailable"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/sd/fastCheckAvailable"))
}

func (suite *serviceClientTestSuite) TestServiceClientBalancer() {
Expand Down
6 changes: 3 additions & 3 deletions client/utils/mics.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2016 TiKV Project Authors.
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -15,12 +15,12 @@
package utils

const (
// DefaultKeyspaceID is the default key space id.
// DefaultKeyspaceID is the default keyspace ID.
// Valid keyspace id range is [0, 0xFFFFFF](uint24max, or 16777215)
// ​0 is reserved for default keyspace with the name "DEFAULT", It's initialized
// when PD bootstrap and reserved for users who haven't been assigned keyspace.
DefaultKeyspaceID = uint32(0)
// NullKeyspaceID is used for api v1 or legacy path where is keyspace agnostic.
// NullKeyspaceID is used for API v1 or legacy path where is keyspace agnostic.
NullKeyspaceID = uint32(0xFFFFFFFF)
// DefaultKeyspaceGroupID is the default key space group id.
// We also reserved 0 for the keyspace group for the same purpose.
Expand Down
24 changes: 12 additions & 12 deletions tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func TestTSOFollowerProxy(t *testing.T) {

func TestTSOFollowerProxyWithTSOService(t *testing.T) {
re := require.New(t)
re.NoError(failpoint.Enable("github.com/tikv/pd/client/fastUpdateServiceMode", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/sd/fastUpdateServiceMode", `return(true)`))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestAPICluster(ctx, 1)
Expand All @@ -346,7 +346,7 @@ func TestTSOFollowerProxyWithTSOService(t *testing.T) {
// TSO service does not support the follower proxy, so enabling it should fail.
err = cli.UpdateOption(opt.EnableTSOFollowerProxy, true)
re.Error(err)
re.NoError(failpoint.Disable("github.com/tikv/pd/client/fastUpdateServiceMode"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/sd/fastUpdateServiceMode"))
}

// TestUnavailableTimeAfterLeaderIsReady is used to test https://github.com/tikv/pd/issues/5207
Expand Down Expand Up @@ -497,13 +497,13 @@ func (suite *followerForwardAndHandleTestSuite) TestGetRegionByFollowerForwardin

cli := setupCli(ctx, re, suite.endpoints, opt.WithForwardingOption(true))
defer cli.Close()
re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork1", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/sd/unreachableNetwork1", "return(true)"))
time.Sleep(200 * time.Millisecond)
r, err := cli.GetRegion(context.Background(), []byte("a"))
re.NoError(err)
re.NotNil(r)

re.NoError(failpoint.Disable("github.com/tikv/pd/client/unreachableNetwork1"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/sd/unreachableNetwork1"))
time.Sleep(200 * time.Millisecond)
r, err = cli.GetRegion(context.Background(), []byte("a"))
re.NoError(err)
Expand Down Expand Up @@ -719,7 +719,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetRegionFromFollower() {

// because we can't check whether this request is processed by followers from response,
// we can disable forward and make network problem for leader.
re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork1", fmt.Sprintf("return(\"%s\")", leader.GetAddr())))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/sd/unreachableNetwork1", fmt.Sprintf("return(\"%s\")", leader.GetAddr())))
time.Sleep(150 * time.Millisecond)
cnt = 0
for range 100 {
Expand All @@ -730,11 +730,11 @@ func (suite *followerForwardAndHandleTestSuite) TestGetRegionFromFollower() {
re.Equal(resp.Meta.Id, suite.regionID)
}
re.Equal(100, cnt)
re.NoError(failpoint.Disable("github.com/tikv/pd/client/unreachableNetwork1"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/sd/unreachableNetwork1"))

// make network problem for follower.
follower := cluster.GetServer(cluster.GetFollower())
re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork1", fmt.Sprintf("return(\"%s\")", follower.GetAddr())))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/sd/unreachableNetwork1", fmt.Sprintf("return(\"%s\")", follower.GetAddr())))
time.Sleep(100 * time.Millisecond)
cnt = 0
for range 100 {
Expand All @@ -745,7 +745,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetRegionFromFollower() {
re.Equal(resp.Meta.Id, suite.regionID)
}
re.Equal(100, cnt)
re.NoError(failpoint.Disable("github.com/tikv/pd/client/unreachableNetwork1"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/sd/unreachableNetwork1"))

// follower client failed will retry by leader service client.
re.NoError(failpoint.Enable("github.com/tikv/pd/server/followerHandleError", "return(true)"))
Expand All @@ -761,8 +761,8 @@ func (suite *followerForwardAndHandleTestSuite) TestGetRegionFromFollower() {
re.NoError(failpoint.Disable("github.com/tikv/pd/server/followerHandleError"))

// test after being healthy
re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork1", fmt.Sprintf("return(\"%s\")", leader.GetAddr())))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/fastCheckAvailable", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/sd/unreachableNetwork1", fmt.Sprintf("return(\"%s\")", leader.GetAddr())))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/sd/fastCheckAvailable", "return(true)"))
time.Sleep(100 * time.Millisecond)
cnt = 0
for range 100 {
Expand All @@ -773,8 +773,8 @@ func (suite *followerForwardAndHandleTestSuite) TestGetRegionFromFollower() {
re.Equal(resp.Meta.Id, suite.regionID)
}
re.Equal(100, cnt)
re.NoError(failpoint.Disable("github.com/tikv/pd/client/unreachableNetwork1"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/fastCheckAvailable"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/sd/unreachableNetwork1"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/sd/fastCheckAvailable"))
}

func (suite *followerForwardAndHandleTestSuite) TestGetTSFuture() {
Expand Down
8 changes: 4 additions & 4 deletions tests/integrations/mcs/tso/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func NewAPIServerForward(re *require.Assertions) APIServerForward {
re.NoError(suite.pdLeader.BootstrapCluster())
suite.addRegions()

re.NoError(failpoint.Enable("github.com/tikv/pd/client/usePDServiceMode", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/sd/usePDServiceMode", "return(true)"))
suite.pdClient, err = pd.NewClientWithContext(context.Background(),
[]string{suite.backendEndpoints}, pd.SecurityOption{}, opt.WithMaxErrorRetry(1))
re.NoError(err)
Expand All @@ -265,7 +265,7 @@ func (suite *APIServerForward) ShutDown() {
}
suite.cluster.Destroy()
suite.cancel()
re.NoError(failpoint.Disable("github.com/tikv/pd/client/usePDServiceMode"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/sd/usePDServiceMode"))
}

func TestForwardTSORelated(t *testing.T) {
Expand Down Expand Up @@ -591,7 +591,7 @@ func (suite *CommonTestSuite) TestBootstrapDefaultKeyspaceGroup() {
// If `EnableTSODynamicSwitching` is disabled, the PD should not provide TSO service after the TSO server is stopped.
func TestTSOServiceSwitch(t *testing.T) {
re := require.New(t)
re.NoError(failpoint.Enable("github.com/tikv/pd/client/fastUpdateServiceMode", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/sd/fastUpdateServiceMode", `return(true)`))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -663,7 +663,7 @@ func TestTSOServiceSwitch(t *testing.T) {

// Verify PD is now providing TSO service and timestamps are monotonically increasing
re.NoError(checkTSOMonotonic(ctx, pdClient, &globalLastTS, 10))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/fastUpdateServiceMode"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/sd/fastUpdateServiceMode"))
}

func checkTSOMonotonic(ctx context.Context, pdClient pd.Client, globalLastTS *uint64, count int) error {
Expand Down
20 changes: 10 additions & 10 deletions tests/integrations/tso/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,11 @@ func (suite *tsoClientTestSuite) TestDiscoverTSOServiceWithLegacyPath() {
failpointValue := fmt.Sprintf(`return(%d)`, keyspaceID)
// Simulate the case that the server has lower version than the client and returns no tso addrs
// in the GetClusterInfo RPC.
re.NoError(failpoint.Enable("github.com/tikv/pd/client/serverReturnsNoTSOAddrs", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/unexpectedCallOfFindGroupByKeyspaceID", failpointValue))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/sd/serverReturnsNoTSOAddrs", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/sd/unexpectedCallOfFindGroupByKeyspaceID", failpointValue))
defer func() {
re.NoError(failpoint.Disable("github.com/tikv/pd/client/serverReturnsNoTSOAddrs"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/unexpectedCallOfFindGroupByKeyspaceID"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/sd/serverReturnsNoTSOAddrs"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/sd/unexpectedCallOfFindGroupByKeyspaceID"))
}()

ctx, cancel := context.WithCancel(suite.ctx)
Expand Down Expand Up @@ -318,14 +318,14 @@ func (suite *tsoClientTestSuite) TestGetMinTS() {
}
wg.Wait()

re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork1", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/sd/unreachableNetwork1", "return(true)"))
time.Sleep(time.Second)
testutil.Eventually(re, func() bool {
var err error
_, _, err = suite.clients[0].GetMinTS(suite.ctx)
return err == nil
})
re.NoError(failpoint.Disable("github.com/tikv/pd/client/unreachableNetwork1"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/sd/unreachableNetwork1"))
}

// More details can be found in this issue: https://github.com/tikv/pd/issues/4884
Expand Down Expand Up @@ -485,10 +485,10 @@ func TestMixedTSODeployment(t *testing.T) {
re := require.New(t)

re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/skipUpdateServiceMode", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/sd/skipUpdateServiceMode", "return(true)"))
defer func() {
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/skipUpdateServiceMode"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/sd/skipUpdateServiceMode"))
}()

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -548,7 +548,7 @@ func TestUpgradingAPIandTSOClusters(t *testing.T) {
backendEndpoints := pdLeader.GetAddr()

// Create a pd client in PD mode to let the API leader to forward requests to the TSO cluster.
re.NoError(failpoint.Enable("github.com/tikv/pd/client/usePDServiceMode", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/sd/usePDServiceMode", "return(true)"))
pdClient, err := pd.NewClientWithContext(context.Background(),
[]string{backendEndpoints}, pd.SecurityOption{}, opt.WithMaxErrorRetry(1))
re.NoError(err)
Expand Down Expand Up @@ -576,7 +576,7 @@ func TestUpgradingAPIandTSOClusters(t *testing.T) {
tsoCluster.Destroy()
apiCluster.Destroy()
cancel()
re.NoError(failpoint.Disable("github.com/tikv/pd/client/usePDServiceMode"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/sd/usePDServiceMode"))
}

func checkTSO(
Expand Down

0 comments on commit fd62812

Please sign in to comment.