From 78aa51be7ea153a6773d421c0ac9852e3c5cd22c Mon Sep 17 00:00:00 2001 From: Arjan Singh Bal <46515553+arjan-bal@users.noreply.github.com> Date: Tue, 3 Dec 2024 11:13:29 +0530 Subject: [PATCH] pickfirst: Stop test servers without closing listeners (#7872) --- .../pickfirstleaf/pickfirstleaf_ext_test.go | 99 +++++++++++-------- 1 file changed, 56 insertions(+), 43 deletions(-) diff --git a/balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go b/balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go index 69461625fbea..9e835a6731b8 100644 --- a/balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go +++ b/balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go @@ -67,20 +67,54 @@ func Test(t *testing.T) { grpctest.RunSubTests(t, s{}) } +// testServer is a server that can be stopped and resumed without closing +// the listener. This guarantees the same port number (and address) is used +// after restart. When a server is stopped, it accepts and closes all TCP +// connections from clients. +type testServer struct { + stubserver.StubServer + lis *testutils.RestartableListener +} + +func (s *testServer) stop() { + s.lis.Stop() +} + +func (s *testServer) resume() { + s.lis.Restart() +} + +func newTestServer(t *testing.T) *testServer { + l, err := testutils.LocalTCPListener() + if err != nil { + t.Fatalf("Failed to create listener: %v", err) + } + rl := testutils.NewRestartableListener(l) + ss := stubserver.StubServer{ + EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, + Listener: rl, + } + return &testServer{ + StubServer: ss, + lis: rl, + } +} + // setupPickFirstLeaf performs steps required for pick_first tests. It starts a // bunch of backends exporting the TestService, and creates a ClientConn to them.
func setupPickFirstLeaf(t *testing.T, backendCount int, opts ...grpc.DialOption) (*grpc.ClientConn, *manual.Resolver, *backendManager) { t.Helper() r := manual.NewBuilderWithScheme("whatever") - backends := make([]*stubserver.StubServer, backendCount) + backends := make([]*testServer, backendCount) addrs := make([]resolver.Address, backendCount) for i := 0; i < backendCount; i++ { - backend := stubserver.StartTestService(t, nil) + server := newTestServer(t) + backend := stubserver.StartTestService(t, &server.StubServer) t.Cleanup(func() { backend.Stop() }) - backends[i] = backend + backends[i] = server addrs[i] = resolver.Address{Addr: backend.Address} } @@ -264,8 +298,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_DisjointLists(t *testing.T) { stateSubscriber := &ccStateSubscriber{} internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, stateSubscriber) - bm.backends[0].S.Stop() - bm.backends[0].S = nil + bm.backends[0].stop() r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[0], addrs[1]}}) var bal *stateStoringBalancer select { @@ -287,8 +320,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_DisjointLists(t *testing.T) { t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } - bm.backends[2].S.Stop() - bm.backends[2].S = nil + bm.backends[2].stop() r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[2], addrs[3]}}) if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[3]); err != nil { @@ -327,8 +359,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_ActiveBackendInUpdatedList(t *testing stateSubscriber := &ccStateSubscriber{} internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, stateSubscriber) - bm.backends[0].S.Stop() - bm.backends[0].S = nil + bm.backends[0].stop() r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[0], addrs[1]}}) var bal *stateStoringBalancer select { @@ -350,8 +381,7 @@ func (s) 
TestPickFirstLeaf_ResolverUpdates_ActiveBackendInUpdatedList(t *testing t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } - bm.backends[2].S.Stop() - bm.backends[2].S = nil + bm.backends[2].stop() r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[2], addrs[1]}}) // Verify that the ClientConn stays in READY. @@ -391,8 +421,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_InActiveBackendInUpdatedList(t *testi stateSubscriber := &ccStateSubscriber{} internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, stateSubscriber) - bm.backends[0].S.Stop() - bm.backends[0].S = nil + bm.backends[0].stop() r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[0], addrs[1]}}) var bal *stateStoringBalancer select { @@ -414,11 +443,9 @@ func (s) TestPickFirstLeaf_ResolverUpdates_InActiveBackendInUpdatedList(t *testi t.Errorf("SubConn states mismatch (-want +got):\n%s", diff) } - bm.backends[2].S.Stop() - bm.backends[2].S = nil - if err := bm.backends[0].StartServer(); err != nil { - t.Fatalf("Failed to re-start test backend: %v", err) - } + bm.backends[2].stop() + bm.backends[0].resume() + r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[0], addrs[2]}}) if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil { @@ -456,8 +483,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_IdenticalLists(t *testing.T) { stateSubscriber := &ccStateSubscriber{} internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, stateSubscriber) - bm.backends[0].S.Stop() - bm.backends[0].S = nil + bm.backends[0].stop() r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[0], addrs[1]}}) var bal *stateStoringBalancer select { @@ -554,14 +580,11 @@ func (s) TestPickFirstLeaf_StopConnectedServer_FirstServerRestart(t *testing.T) } // Shut down the connected server. 
- bm.backends[0].S.Stop() - bm.backends[0].S = nil + bm.backends[0].stop() testutils.AwaitState(ctx, t, cc, connectivity.Idle) // Start the new target server. - if err := bm.backends[0].StartServer(); err != nil { - t.Fatalf("Failed to start server: %v", err) - } + bm.backends[0].resume() if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil { t.Fatal(err) @@ -620,14 +643,11 @@ func (s) TestPickFirstLeaf_StopConnectedServer_SecondServerRestart(t *testing.T) } // Shut down the connected server. - bm.backends[1].S.Stop() - bm.backends[1].S = nil + bm.backends[1].stop() testutils.AwaitState(ctx, t, cc, connectivity.Idle) // Start the new target server. - if err := bm.backends[1].StartServer(); err != nil { - t.Fatalf("Failed to start server: %v", err) - } + bm.backends[1].resume() if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil { t.Fatal(err) @@ -692,14 +712,11 @@ func (s) TestPickFirstLeaf_StopConnectedServer_SecondServerToFirst(t *testing.T) } // Shut down the connected server. - bm.backends[1].S.Stop() - bm.backends[1].S = nil + bm.backends[1].stop() testutils.AwaitState(ctx, t, cc, connectivity.Idle) // Start the new target server. - if err := bm.backends[0].StartServer(); err != nil { - t.Fatalf("Failed to start server: %v", err) - } + bm.backends[0].resume() if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil { t.Fatal(err) @@ -763,14 +780,11 @@ func (s) TestPickFirstLeaf_StopConnectedServer_FirstServerToSecond(t *testing.T) } // Shut down the connected server. - bm.backends[0].S.Stop() - bm.backends[0].S = nil + bm.backends[0].stop() testutils.AwaitState(ctx, t, cc, connectivity.Idle) // Start the new target server. 
- if err := bm.backends[1].StartServer(); err != nil { - t.Fatalf("Failed to start server: %v", err) - } + bm.backends[1].resume() if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil { t.Fatal(err) @@ -1308,14 +1322,13 @@ type scState struct { } type backendManager struct { - backends []*stubserver.StubServer + backends []*testServer } func (b *backendManager) stopAllExcept(index int) { for idx, b := range b.backends { if idx != index { - b.S.Stop() - b.S = nil + b.stop() } } }