Skip to content

Commit

Permalink
add new TestSnapshotByMockingPartition using blackholing.
Browse files Browse the repository at this point in the history
Signed-off-by: Siyuan Zhang <[email protected]>
  • Loading branch information
siyuanfoundation committed Apr 6, 2024
1 parent f018db0 commit 3108ce6
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 25 deletions.
152 changes: 127 additions & 25 deletions tests/e2e/etcd_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !cluster_proxy

package e2e

import (
Expand Down Expand Up @@ -58,18 +60,32 @@ func clusterTestCases(size int) []clusterTestCase {
}

if fileutil.Exist(e2e.BinPath.EtcdLastRelease) {
tcs = append(tcs, clusterTestCase{
name: "LastVersion",
config: e2e.NewConfig(e2e.WithClusterSize(size), e2e.WithVersion(e2e.LastVersion)),
})
if size > 2 {
tcs = append(tcs, clusterTestCase{
name: "MinorityLastVersion",
config: e2e.NewConfig(e2e.WithClusterSize(size), e2e.WithVersion(e2e.MinorityLastVersion)),
tcs = append(tcs,
clusterTestCase{
name: "LastVersion",
config: e2e.NewConfig(e2e.WithClusterSize(size), e2e.WithVersion(e2e.LastVersion)),
}, clusterTestCase{
name: "QuorumLastVersion",
config: e2e.NewConfig(e2e.WithClusterSize(size), e2e.WithVersion(e2e.QuorumLastVersion)),
})
name: "LastVersionPeerTLS",
config: e2e.NewConfigPeerTLS().With(e2e.WithClusterSize(size), e2e.WithVersion(e2e.LastVersion)),
},
)
if size > 2 {
tcs = append(tcs,
clusterTestCase{
name: "MinorityLastVersion",
config: e2e.NewConfig(e2e.WithClusterSize(size), e2e.WithVersion(e2e.MinorityLastVersion)),
}, clusterTestCase{
name: "QuorumLastVersion",
config: e2e.NewConfig(e2e.WithClusterSize(size), e2e.WithVersion(e2e.QuorumLastVersion)),
},
clusterTestCase{
name: "MinorityLastVersionPeerTLS",
config: e2e.NewConfigPeerTLS().With(e2e.WithClusterSize(size), e2e.WithVersion(e2e.MinorityLastVersion)),
}, clusterTestCase{
name: "QuorumLastVersionPeerTLS",
config: e2e.NewConfigPeerTLS().With(e2e.WithClusterSize(size), e2e.WithVersion(e2e.QuorumLastVersion)),
},
)
}
}
return tcs
Expand Down Expand Up @@ -112,12 +128,7 @@ func snapshotTestByAddingMember(t *testing.T, clusterConfig *e2e.EtcdProcessClus
}()

t.Log("Writing 20 keys to the cluster (more than SnapshotCount entries to trigger at least a snapshot)")
for i := 0; i < 20; i++ {
key := fmt.Sprintf("key-%d", i)
value := fmt.Sprintf("value-%d", i)
err = epc.Etcdctl().Put(context.TODO(), key, value, config.PutOptions{})
require.NoError(t, err, "failed to put %q, error: %v", key, err)
}
writeKVs(t, epc.Etcdctl(), 0, 20)

t.Log("Start a new etcd instance, which will receive a snapshot from the leader.")
newCfg := *epc.Cfg
Expand Down Expand Up @@ -161,12 +172,7 @@ func snapshotTestByRestartingMember(t *testing.T, clusterConfig *e2e.EtcdProcess
require.NoError(t, err)

t.Log("Writing 20 keys to the cluster (more than SnapshotCount entries to trigger at least a snapshot.)")
for i := 0; i < 20; i++ {
key := fmt.Sprintf("key-%d", i)
value := fmt.Sprintf("value-%d", i)
err = epc.Etcdctl().Put(context.TODO(), key, value, config.PutOptions{})
require.NoError(t, err, "failed to put %q, error: %v", key, err)
}
writeKVs(t, epc.Etcdctl(), 0, 20)

t.Log("Verify logs to check leader has saved snapshot")
leaderEPC := epc.Procs[epc.WaitLeader(t)]
Expand All @@ -179,13 +185,90 @@ func snapshotTestByRestartingMember(t *testing.T, clusterConfig *e2e.EtcdProcess
assertKVHash(t, clusterConfig.ClusterSize, epc)

// assert process logs to check snapshot be sent
if clusterConfig.Version == e2e.CurrentVersion || clusterConfig.Version == e2e.MinorityLastVersion {
leaderEPC = epc.Procs[epc.WaitLeader(t)]
if leaderEPC.Config().ExecPath == e2e.BinPath.Etcd {
t.Log("Verify logs to check snapshot be sent from leader to follower")
e2e.AssertProcessLogs(t, leaderEPC, "sent database snapshot")
}
}

// TestSnapshotByMockingPartition runs snapshotTestByMockingPartition as a
// subtest for every 3-member cluster test case whose config has IsPeerTLS set.
// The member at index 2 is always the one that gets partitioned.
func TestSnapshotByMockingPartition(t *testing.T) {
	const partitionedIdx = 2
	for _, testCase := range clusterTestCases(3) {
		// Only peer-TLS configurations are exercised by this test.
		if testCase.config.IsPeerTLS {
			t.Run(testCase.name, func(t *testing.T) {
				snapshotTestByMockingPartition(t, testCase.config, partitionedIdx)
			})
		}
	}
}

// snapshotTestByMockingPartition checks that a member cut off from its peers
// converges with the rest of the cluster once connectivity is restored.
//
// It starts a cluster from clusterConfig with a snapshot count and snapshot
// catch-up entries of 10 and with peer proxies enabled, asks the cluster to
// move leadership to the member at mockPartitionNodeIndex (an index into
// epc.Procs), blackholes that member's peer proxy in both directions, writes
// 20 keys through the leader elected among the remaining members, and then
// lifts the blackhole. While partitioned, the new leader must be at revision
// 21 and the partitioned member at revision 1. Afterwards assertKVHash must
// succeed, and if the final leader runs the current etcd binary its logs must
// contain "sent database snapshot".
func snapshotTestByMockingPartition(t *testing.T, clusterConfig *e2e.EtcdProcessClusterConfig, mockPartitionNodeIndex int) {
	e2e.BeforeTest(t)

	t.Logf("Create an etcd cluster with %d member\n", clusterConfig.ClusterSize)
	epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t,
		e2e.WithConfig(clusterConfig),
		e2e.WithSnapshotCount(10),
		e2e.WithSnapshotCatchUpEntries(10),
		e2e.WithPeerProxy(true),
	)
	require.NoError(t, err, "failed to start etcd cluster: %v", err)
	defer func() {
		require.NoError(t, epc.Close(), "failed to close etcd cluster")
	}()

	// WaitLeader's result is used as an index into epc.Procs, not as a member ID.
	leaderIdx := epc.WaitLeader(t)
	partitionedMember := epc.Procs[mockPartitionNodeIndex]
	if leaderIdx != mockPartitionNodeIndex {
		partitionedMemberID, err := epc.MemberId(mockPartitionNodeIndex)
		require.NoError(t, err)
		// Per the original author's note: if the partitioned member is not the
		// leader, Blackhole would not block all of its communication with the
		// other members, so leadership is moved to it first.
		t.Logf("Move leader to Proc[%d]: %d\n", mockPartitionNodeIndex, partitionedMemberID)
		require.NoError(t, epc.Etcdctl().MoveLeader(context.TODO(), partitionedMemberID))
		epc.WaitLeader(t)
	}

	// Mock the partition by dropping peer traffic in both directions.
	proxy := partitionedMember.PeerProxy()
	t.Logf("Blackholing traffic from and to member %q", partitionedMember.Config().Name)
	proxy.BlackholeTx()
	proxy.BlackholeRx()
	time.Sleep(2 * time.Second)

	t.Log("Wait for new leader election with remaining members")
	leaderEPC := epc.Procs[waitLeader(t, epc, mockPartitionNodeIndex)]
	t.Log("Writing 20 keys to the cluster (more than SnapshotCount entries to trigger at least a snapshot.)")
	writeKVs(t, leaderEPC.Etcdctl(), 0, 20)
	e2e.AssertProcessLogs(t, leaderEPC, "saved snapshot")
	// 20 puts on top of the initial revision 1 give 21 on the leader; the
	// partitioned member must not have applied any of them.
	assertRevision(t, leaderEPC, 21)
	assertRevision(t, partitionedMember, 1)

	// Wait for some time before restoring the network.
	time.Sleep(1 * time.Second)
	t.Logf("Unblackholing traffic from and to member %q", partitionedMember.Config().Name)
	proxy.UnblackholeTx()
	proxy.UnblackholeRx()

	assertKVHash(t, clusterConfig.ClusterSize, epc)

	// Assert on the process logs to check that a snapshot was sent. The leader
	// is looked up once so that the binary check and the log assertion refer
	// to the same process.
	leaderEPC = epc.Procs[epc.WaitLeader(t)]
	if leaderEPC.Config().ExecPath == e2e.BinPath.Etcd {
		t.Log("Verify logs to check snapshot be sent from leader to follower")
		e2e.AssertProcessLogs(t, leaderEPC, "sent database snapshot")
	}
}

// writeKVs puts the pair "key-<i>" / "value-<i>" through etcdctl for every i
// in [startIdx, endIdx), failing the test on the first put that returns an error.
func writeKVs(t *testing.T, etcdctl *e2e.EtcdctlV3, startIdx, endIdx int) {
	ctx := context.TODO()
	for n := startIdx; n < endIdx; n++ {
		k, v := fmt.Sprintf("key-%d", n), fmt.Sprintf("value-%d", n)
		putErr := etcdctl.Put(ctx, k, v, config.PutOptions{})
		require.NoError(t, putErr, "failed to put %q, error: %v", k, putErr)
	}
}

func assertKVHash(t *testing.T, clusterSize int, epc *e2e.EtcdProcessCluster) {
if clusterSize < 2 {
return
Expand All @@ -212,3 +295,22 @@ func assertKVHash(t *testing.T, clusterSize int, epc *e2e.EtcdProcessCluster) {
return true
}, 10*time.Second, 500*time.Millisecond)
}

// waitLeader waits, for at most 30 seconds, until the members of epc other
// than the one at index excludeNode agree on a leader, and returns that
// leader's index in epc.Procs.
//
// NOTE(review): this assumes WaitMembersForLeader returns an index into the
// slice it is given (its body is not visible here) — confirm. Under that
// assumption the result must be mapped back to epc.Procs, because the two
// index spaces only coincide when excludeNode is the last process.
func waitLeader(t testing.TB, epc *e2e.EtcdProcessCluster, excludeNode int) int {
	membs := make([]e2e.EtcdProcess, 0, len(epc.Procs))
	// procIdx[k] is the position in epc.Procs of membs[k].
	procIdx := make([]int, 0, len(epc.Procs))
	for i, proc := range epc.Procs {
		if i == excludeNode {
			continue
		}
		membs = append(membs, proc)
		procIdx = append(procIdx, i)
	}
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	return procIdx[epc.WaitMembersForLeader(ctx, t, membs)]
}

// assertRevision queries the status of member through its etcdctl client and
// asserts that the header revision of the first response equals
// expectedRevision. The test is stopped if the status call fails or returns
// no responses.
func assertRevision(t testing.TB, member e2e.EtcdProcess, expectedRevision int64) {
	// Attribute failures to the calling test line rather than to this helper.
	t.Helper()
	responses, err := member.Etcdctl().Status(context.TODO())
	require.NoError(t, err)
	// Guard the index below so an empty result fails cleanly instead of panicking.
	require.NotEmpty(t, responses, "status returned no responses")
	assert.Equal(t, expectedRevision, responses[0].Header.Revision, "revision mismatch")
}
10 changes: 10 additions & 0 deletions tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1057,3 +1057,13 @@ func (epc *EtcdProcessCluster) WaitMembersForLeader(ctx context.Context, t testi
t.Fatal("impossible path of execution")
return -1
}

// MemberId returns the member ID of the i-th process in epc.Procs. The ID is
// looked up with findMemberIDByEndpoint, using the cluster's member list and
// that process's client URL.
//
// It returns an error if i is outside [0, len(epc.Procs)), if the member list
// cannot be fetched, or if findMemberIDByEndpoint fails.
func (epc *EtcdProcessCluster) MemberId(i int) (uint64, error) {
	// Report a bad index as an error instead of panicking on epc.Procs[i].
	if i < 0 || i >= len(epc.Procs) {
		return 0, fmt.Errorf("member index %d out of range [0, %d)", i, len(epc.Procs))
	}
	memberList, err := epc.Etcdctl().MemberList(context.Background(), false)
	if err != nil {
		return 0, fmt.Errorf("failed to get member list: %w", err)
	}
	return findMemberIDByEndpoint(memberList.Members, epc.Procs[i].Config().ClientURL)
}
12 changes: 12 additions & 0 deletions tests/framework/e2e/etcdctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,18 @@ func (ctl *EtcdctlV3) RoleDelete(ctx context.Context, role string) (*clientv3.Au
return &resp, err
}

// MoveLeader spawns the "move-leader" command with transfereeID rendered as
// lowercase hexadecimal and expects "Leadership transferred" in its output.
// It returns the spawn error, or otherwise the result of that expectation.
func (ctl *EtcdctlV3) MoveLeader(ctx context.Context, transfereeID uint64) error {
	hexID := fmt.Sprintf("%x", transfereeID)
	proc, spawnErr := SpawnCmd(ctl.cmdArgs("move-leader", hexID), nil)
	if spawnErr != nil {
		return spawnErr
	}
	defer proc.Close()

	_, expectErr := proc.ExpectWithContext(ctx, expect.ExpectedResponse{Value: "Leadership transferred"})
	return expectErr
}

func (ctl *EtcdctlV3) spawnJSONCmd(ctx context.Context, output any, args ...string) error {
args = append(args, "-w", "json")
cmd, err := SpawnCmd(append(ctl.cmdArgs(), args...), nil)
Expand Down

0 comments on commit 3108ce6

Please sign in to comment.