From 44a5eb9231ec6e753dab5ded7cda6f81788fc3f7 Mon Sep 17 00:00:00 2001 From: Purnesh Dixit Date: Fri, 22 Nov 2024 01:02:44 +0530 Subject: [PATCH] xdsclient: fix new watcher to get both old good update and nack error (if exist) from the cache (#7851) --- xds/internal/xdsclient/authority.go | 12 +- .../xdsclient/tests/lds_watchers_test.go | 198 ++++++++++++++++-- 2 files changed, 188 insertions(+), 22 deletions(-) diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go index 04bd278d2c47..24673a8d9077 100644 --- a/xds/internal/xdsclient/authority.go +++ b/xds/internal/xdsclient/authority.go @@ -633,7 +633,8 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w // Always add the new watcher to the set of watchers. state.watchers[watcher] = true - // If we have a cached copy of the resource, notify the new watcher. + // If we have a cached copy of the resource, notify the new watcher + // immediately. if state.cache != nil { if a.logger.V(2) { a.logger.Infof("Resource type %q with resource name %q found in cache: %s", rType.TypeName(), resourceName, state.cache.ToJSON()) @@ -641,6 +642,15 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w resource := state.cache a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnUpdate(resource, func() {}) }) } + // If the last update was NACKed, notify the new watcher of the error + // immediately as well, using a copy taken before the callback runs. + if state.md.Status == xdsresource.ServiceStatusNACKed { + err := state.md.ErrState.Err + if a.logger.V(2) { + a.logger.Infof("Resource type %q with resource name %q was NACKed: %v", rType.TypeName(), resourceName, err) + } + a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnError(err, func() {}) }) + } // If the metadata field is updated to indicate that the management // server does not have this resource, notify the new watcher. 
if state.md.Status == xdsresource.ServiceStatusNotExist { diff --git a/xds/internal/xdsclient/tests/lds_watchers_test.go b/xds/internal/xdsclient/tests/lds_watchers_test.go index 38e1f1760383..7b49b9b17b74 100644 --- a/xds/internal/xdsclient/tests/lds_watchers_test.go +++ b/xds/internal/xdsclient/tests/lds_watchers_test.go @@ -71,22 +71,47 @@ func newListenerWatcher() *listenerWatcher { return &listenerWatcher{updateCh: testutils.NewChannel()} } -func (cw *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { - cw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource}) +func (lw *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { + lw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource}) onDone() } -func (cw *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { +func (lw *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { // When used with a go-control-plane management server that continuously // resends resources which are NACKed by the xDS client, using a `Replace()` // here and in OnResourceDoesNotExist() simplifies tests which will have // access to the most recently received error. 
- cw.updateCh.Replace(listenerUpdateErrTuple{err: err}) + lw.updateCh.Replace(listenerUpdateErrTuple{err: err}) onDone() } -func (cw *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - cw.updateCh.Replace(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")}) +func (lw *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { + lw.updateCh.Replace(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")}) + onDone() +} + +type listenerWatcherMultiple struct { + updateCh *testutils.Channel +} + +// TODO: delete this once `newListenerWatcher` is modified to handle multiple +// updates (https://github.com/grpc/grpc-go/issues/7864). +func newListenerWatcherMultiple(size int) *listenerWatcherMultiple { + return &listenerWatcherMultiple{updateCh: testutils.NewChannelWithSize(size)} +} + +func (lw *listenerWatcherMultiple) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { + lw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource}) + onDone() +} + +func (lw *listenerWatcherMultiple) OnError(err error, onDone xdsresource.OnDoneFunc) { + lw.updateCh.Send(listenerUpdateErrTuple{err: err}) + onDone() +} + +func (lw *listenerWatcherMultiple) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { + lw.updateCh.Send(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")}) onDone() } @@ -155,6 +180,18 @@ func verifyListenerUpdate(ctx context.Context, updateCh *testutils.Channel, want return nil } +func verifyUnknownListenerError(ctx context.Context, updateCh *testutils.Channel, wantErr string) error { + u, err := updateCh.Receive(ctx) + if err != nil { + return fmt.Errorf("timeout when waiting for a listener error from the management server: %v", err) + } + gotErr := 
u.(listenerUpdateErrTuple).err + if gotErr == nil || !strings.Contains(gotErr.Error(), wantErr) { + return fmt.Errorf("update received with error: %v, want %q", gotErr, wantErr) + } + return nil +} + // TestLDSWatch covers the case where a single watcher exists for a single // listener resource. The test verifies the following scenarios: // 1. An update from the management server containing the resource being @@ -953,8 +990,9 @@ func (s) TestLDSWatch_NewWatcherForRemovedResource(t *testing.T) { } // TestLDSWatch_NACKError covers the case where an update from the management -// server is NACK'ed by the xdsclient. The test verifies that the error is -// propagated to the watcher. +// server is NACKed by the xdsclient. The test verifies that the error is +// propagated to the existing watcher. After the NACK, if a new watcher registers +// for the resource, the error is propagated to the new watcher as well. func (s) TestLDSWatch_NACKError(t *testing.T) { mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{}) @@ -992,19 +1030,141 @@ func (s) TestLDSWatch_NACKError(t *testing.T) { } // Verify that the expected error is propagated to the watcher. - u, err := lw.updateCh.Receive(ctx) + // Verify that the expected error is propagated to the existing watcher. + if err := verifyUnknownListenerError(ctx, lw.updateCh, wantListenerNACKErr); err != nil { + t.Fatal(err) + } + + // Verify that the expected error is propagated to the new watcher as well. + lw2 := newListenerWatcher() + ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2) + defer ldsCancel2() + // The new watcher must receive the same NACK error. + if err := verifyUnknownListenerError(ctx, lw2.updateCh, wantListenerNACKErr); err != nil { + t.Fatal(err) + } +} + +// TestLDSWatch_ResourceCaching_NACKError covers the case where a watch is +// registered for a resource which is already present in the cache with an old +// good update as well as the latest NACK error. 
The test verifies that new watcher +// receives both good update and error without a new resource request being +// sent to the management server. +func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { + firstRequestReceived := false + firstAckReceived := grpcsync.NewEvent() + secondAckReceived := grpcsync.NewEvent() + secondRequestReceived := grpcsync.NewEvent() + + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ + OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error { + // The first request has an empty version string. + if !firstRequestReceived && req.GetVersionInfo() == "" { + firstRequestReceived = true + return nil + } + // The first ack has a non-empty version string. + if !firstAckReceived.HasFired() && req.GetVersionInfo() != "" { + firstAckReceived.Fire() + return nil + } + // The second ack has a non-empty version string. + if !secondAckReceived.HasFired() && req.GetVersionInfo() != "" { + secondAckReceived.Fire() + return nil + } + // Any requests after the first request and two acks, are not expected. + secondRequestReceived.Fire() + return nil + }, + }) + + nodeID := uuid.New().String() + bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) + + // Create an xDS client with the above bootstrap contents. + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) if err != nil { - t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err) + t.Fatalf("Failed to create xDS client: %v", err) } - gotErr := u.(listenerUpdateErrTuple).err - if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) { - t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr) + defer close() + + // Register a watch for a listener resource and have the watch + // callback push the received update on to a channel. 
+ lw1 := newListenerWatcher() + ldsCancel1 := xdsresource.WatchListener(client, ldsName, lw1) + defer ldsCancel1() + + // Configure the management server to return a single listener + // resource, corresponding to the one we registered a watch for. + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)}, + SkipValidation: true, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) + } + + // Verify the contents of the received update. + wantUpdate := listenerUpdateErrTuple{ + update: xdsresource.ListenerUpdate{ + RouteConfigName: rdsName, + HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, + }, + } + if err := verifyListenerUpdate(ctx, lw1.updateCh, wantUpdate); err != nil { + t.Fatal(err) + } + + // Configure the management server to return a single listener resource + // which is expected to be NACKed by the client. + resources = e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{badListenerResource(t, ldsName)}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) + } + + // Verify that the expected error is propagated to the existing watcher. + if err := verifyUnknownListenerError(ctx, lw1.updateCh, wantListenerNACKErr); err != nil { + t.Fatal(err) + } + + // Register another watch for the same resource. This should get the update + // and error from the cache. 
+ lw2 := newListenerWatcherMultiple(2) + ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2) + defer ldsCancel2() + if err := verifyListenerUpdate(ctx, lw2.updateCh, wantUpdate); err != nil { + t.Fatal(err) + } + // Verify that the new watcher also receives the NACK error. + if err := verifyUnknownListenerError(ctx, lw2.updateCh, wantListenerNACKErr); err != nil { + t.Fatal(err) + } + + // No request should get sent out as part of this watch. + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + select { + case <-sCtx.Done(): + case <-secondRequestReceived.Done(): + t.Fatal("xdsClient sent out request instead of using update from cache") + default: } } // TestLDSWatch_PartialValid covers the case where a response from the // management server contains both valid and invalid resources and is expected -// to be NACK'ed by the xdsclient. The test verifies that watchers corresponding +// to be NACKed by the xdsclient. The test verifies that watchers corresponding // to the valid resource receive the update, while watchers corresponding to the // invalid resource receive an error. func (s) TestLDSWatch_PartialValid(t *testing.T) { @@ -1071,13 +1231,9 @@ func (s) TestLDSWatch_PartialValid(t *testing.T) { // Verify that the expected error is propagated to the watcher which // requested for the bad resource. - u, err := lw1.updateCh.Receive(ctx) - if err != nil { - t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err) - } - gotErr := u.(listenerUpdateErrTuple).err - if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) { - t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr) + // Here, lw1 is that watcher. + if err := verifyUnknownListenerError(ctx, lw1.updateCh, wantListenerNACKErr); err != nil { + t.Fatal(err) } // Verify that the watcher watching the good resource receives a good