xdsclient: fix new watcher to get both old good update and nack error (if exist) from the cache (#7851)
purnesh42H authored Nov 21, 2024
1 parent 87f0254 commit 44a5eb9
Showing 2 changed files with 187 additions and 22 deletions.
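
To illustrate the behavior being fixed, the sketch below (not part of the commit) shows a listener watcher: after this change, a watcher registered for a resource that already has a cached good update and a later NACKed update receives OnUpdate with the cached resource followed by OnError with the NACK error, instead of the cached update alone. The callback signatures and the xdsresource package are taken from the test file in this diff; the logging watcher and its package location are hypothetical, and since xdsresource is internal to grpc-go, code like this would have to live inside the grpc-go module.

package xdsclient_test // hypothetical location inside the grpc-go module

import (
	"log"

	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

// loggingListenerWatcher is an illustrative watcher, not part of this commit.
type loggingListenerWatcher struct{}

// OnUpdate fires first for a newly registered watcher when a cached good
// update exists.
func (w *loggingListenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
	log.Printf("listener update: %+v", update.Resource)
	onDone()
}

// With this fix, OnError also fires for a newly registered watcher when the
// most recent update for the resource was NACKed, carrying the NACK error
// kept alongside the cached resource.
func (w *loggingListenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
	log.Printf("listener error: %v", err)
	onDone()
}

// OnResourceDoesNotExist behavior is unchanged by this commit.
func (w *loggingListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
	log.Print("listener resource does not exist")
	onDone()
}

// Registration would look like:
//   cancel := xdsresource.WatchListener(client, ldsName, &loggingListenerWatcher{})
//   defer cancel()
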
11 changes: 10 additions & 1 deletion xds/internal/xdsclient/authority.go
@@ -633,14 +633,23 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w
// Always add the new watcher to the set of watchers.
state.watchers[watcher] = true

-// If we have a cached copy of the resource, notify the new watcher.
+// If we have a cached copy of the resource, notify the new watcher
+// immediately.
if state.cache != nil {
if a.logger.V(2) {
a.logger.Infof("Resource type %q with resource name %q found in cache: %s", rType.TypeName(), resourceName, state.cache.ToJSON())
}
resource := state.cache
a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnUpdate(resource, func() {}) })
}
// If last update was NACK'd, notify the new watcher of error
// immediately as well.
if state.md.Status == xdsresource.ServiceStatusNACKed {
if a.logger.V(2) {
a.logger.Infof("Resource type %q with resource name %q was NACKed: %s", rType.TypeName(), resourceName, state.cache.ToJSON())
}
a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnError(state.md.ErrState.Err, func() {}) })
}
// If the metadata field is updated to indicate that the management
// server does not have this resource, notify the new watcher.
if state.md.Status == xdsresource.ServiceStatusNotExist {
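
The ordering here matters: both notifications are scheduled on the authority's watcherCallbackSerializer, with the cached update scheduled before the NACK error, so a watcher registered after a good update and a later NACK sees OnUpdate before OnError. The sketch below is a simplified stand-in for that serializer (not the real grpc-go implementation) showing why scheduling two callbacks on a single FIFO worker yields that ordering guarantee.

package main

import (
	"fmt"
	"sync"
)

// serializer runs scheduled callbacks one at a time, in FIFO order, on a
// single goroutine. This is only a toy model of the client's callback
// serializer.
type serializer struct{ callbacks chan func() }

func newSerializer() *serializer {
	s := &serializer{callbacks: make(chan func(), 16)}
	go func() {
		for cb := range s.callbacks {
			cb() // callbacks run strictly in the order they were scheduled
		}
	}()
	return s
}

func (s *serializer) TrySchedule(cb func()) { s.callbacks <- cb }

func main() {
	var wg sync.WaitGroup
	wg.Add(2)
	s := newSerializer()
	// watchResource schedules the cached good update first and the NACK error
	// second, so a new watcher always observes them in that order.
	s.TrySchedule(func() { defer wg.Done(); fmt.Println("OnUpdate: cached listener resource") })
	s.TrySchedule(func() { defer wg.Done(); fmt.Println("OnError: error from the latest NACKed update") })
	wg.Wait()
}
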
198 changes: 177 additions & 21 deletions xds/internal/xdsclient/tests/lds_watchers_test.go
@@ -71,22 +71,47 @@ func newListenerWatcher() *listenerWatcher {
return &listenerWatcher{updateCh: testutils.NewChannel()}
}

-func (cw *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
-cw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource})
+func (lw *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
+lw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource})
onDone()
}

-func (cw *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
+func (lw *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
// When used with a go-control-plane management server that continuously
// resends resources which are NACKed by the xDS client, using a `Replace()`
// here and in OnResourceDoesNotExist() simplifies tests which will have
// access to the most recently received error.
-cw.updateCh.Replace(listenerUpdateErrTuple{err: err})
+lw.updateCh.Replace(listenerUpdateErrTuple{err: err})
onDone()
}

-func (cw *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
-cw.updateCh.Replace(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")})
+func (lw *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
+lw.updateCh.Replace(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")})
onDone()
}

type listenerWatcherMultiple struct {
updateCh *testutils.Channel
}

// TODO: delete this once `newListenerWatcher` is modified to handle multiple
// updates (https://github.com/grpc/grpc-go/issues/7864).
func newListenerWatcherMultiple(size int) *listenerWatcherMultiple {
return &listenerWatcherMultiple{updateCh: testutils.NewChannelWithSize(size)}
}

func (lw *listenerWatcherMultiple) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
lw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource})
onDone()
}

func (lw *listenerWatcherMultiple) OnError(err error, onDone xdsresource.OnDoneFunc) {
lw.updateCh.Send(listenerUpdateErrTuple{err: err})
onDone()
}

func (lw *listenerWatcherMultiple) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
lw.updateCh.Send(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")})
onDone()
}
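
The difference between the two watcher types is the channel buffering strategy: listenerWatcher uses Replace, which keeps only the most recent update or error (useful when the management server keeps resending a NACKed resource), while listenerWatcherMultiple uses Send on a channel sized for several values, so a test can observe both the cached good update and the following NACK error. The toy model below illustrates the two strategies; it is not the actual grpc-internal testutils.Channel implementation.

package main

import "fmt"

type channel struct{ ch chan any }

func newChannelWithSize(n int) *channel { return &channel{ch: make(chan any, n)} }

// Send buffers every value up to the channel's capacity; listenerWatcherMultiple
// relies on this to retain both the good update and the subsequent error.
func (c *channel) Send(v any) { c.ch <- v }

// Replace drains anything already buffered before sending, so only the most
// recent value survives; listenerWatcher relies on this because a NACKed
// resource may be resent repeatedly.
func (c *channel) Replace(v any) {
	for {
		select {
		case <-c.ch:
		default:
			c.ch <- v
			return
		}
	}
}

func main() {
	c := newChannelWithSize(2)
	c.Send("good update")
	c.Send("NACK error")
	fmt.Println(<-c.ch, "then", <-c.ch) // prints: good update then NACK error
}
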

@@ -155,6 +180,18 @@ func verifyListenerUpdate(ctx context.Context, updateCh *testutils.Channel, want
return nil
}

func verifyUnknownListenerError(ctx context.Context, updateCh *testutils.Channel, wantErr string) error {
u, err := updateCh.Receive(ctx)
if err != nil {
return fmt.Errorf("timeout when waiting for a listener error from the management server: %v", err)
}
gotErr := u.(listenerUpdateErrTuple).err
if gotErr == nil || !strings.Contains(gotErr.Error(), wantErr) {
return fmt.Errorf("update received with error: %v, want %q", gotErr, wantErr)
}
return nil
}

// TestLDSWatch covers the case where a single watcher exists for a single
// listener resource. The test verifies the following scenarios:
// 1. An update from the management server containing the resource being
@@ -953,8 +990,9 @@ func (s) TestLDSWatch_NewWatcherForRemovedResource(t *testing.T) {
}

// TestLDSWatch_NACKError covers the case where an update from the management
-// server is NACK'ed by the xdsclient. The test verifies that the error is
-// propagated to the watcher.
+// server is NACKed by the xdsclient. The test verifies that the error is
+// propagated to the existing watcher. After NACK, if a new watcher registers
+// for the resource, error is propagated to the new watcher as well.
func (s) TestLDSWatch_NACKError(t *testing.T) {
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})

@@ -992,19 +1030,141 @@ func (s) TestLDSWatch_NACKError(t *testing.T) {
}

-// Verify that the expected error is propagated to the watcher.
-u, err := lw.updateCh.Receive(ctx)
-if err != nil {
-t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err)
-}
-gotErr := u.(listenerUpdateErrTuple).err
-if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) {
-t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr)
-}
+// Verify that the expected error is propagated to the existing watcher.
+if err := verifyUnknownListenerError(ctx, lw.updateCh, wantListenerNACKErr); err != nil {
+t.Fatal(err)
}

// Verify that the expected error is propagated to the new watcher as well.
lw2 := newListenerWatcher()
ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2)
defer ldsCancel2()
// Verify that the expected error is propagated to the new watcher.
if err := verifyUnknownListenerError(ctx, lw2.updateCh, wantListenerNACKErr); err != nil {
t.Fatal(err)
}
}

// TestLDSWatch_ResourceCaching_NACKError covers the case where a watch is
// registered for a resource which is already present in the cache with an old
// good update as well as the latest NACK error. The test verifies that the new
// watcher receives both the good update and the error without a new resource
// request being sent to the management server.
func TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
firstRequestReceived := false
firstAckReceived := grpcsync.NewEvent()
secondAckReceived := grpcsync.NewEvent()
secondRequestReceived := grpcsync.NewEvent()

mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error {
// The first request has an empty version string.
if !firstRequestReceived && req.GetVersionInfo() == "" {
firstRequestReceived = true
return nil
}
// The first ack has a non-empty version string.
if !firstAckReceived.HasFired() && req.GetVersionInfo() != "" {
firstAckReceived.Fire()
return nil
}
// The second ack has a non-empty version string.
if !secondAckReceived.HasFired() && req.GetVersionInfo() != "" {
secondAckReceived.Fire()
return nil
}
// Any requests after the first request and two acks are not expected.
secondRequestReceived.Fire()
return nil
},
})

nodeID := uuid.New().String()
bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)

// Create an xDS client with the above bootstrap contents.
client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{
Name: t.Name(),
Contents: bc,
})
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer close()

// Register a watch for a listener resource and have the watch
// callback push the received update on to a channel.
lw1 := newListenerWatcher()
ldsCancel1 := xdsresource.WatchListener(client, ldsName, lw1)
defer ldsCancel1()

// Configure the management server to return a single listener
// resource, corresponding to the one we registered a watch for.
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), 1000*defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}

// Verify the contents of the received update.
wantUpdate := listenerUpdateErrTuple{
update: xdsresource.ListenerUpdate{
RouteConfigName: rdsName,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
}
if err := verifyListenerUpdate(ctx, lw1.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}

// Configure the management server to return a single listener resource
// which is expected to be NACKed by the client.
resources = e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{badListenerResource(t, ldsName)},
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}

// Verify that the expected error is propagated to the existing watcher.
if err := verifyUnknownListenerError(ctx, lw1.updateCh, wantListenerNACKErr); err != nil {
t.Fatal(err)
}

// Register another watch for the same resource. This should get the update
// and error from the cache.
lw2 := newListenerWatcherMultiple(2)
ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2)
defer ldsCancel2()
if err := verifyListenerUpdate(ctx, lw2.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
// Verify that the expected error is propagated to the new watcher as well.
if err := verifyUnknownListenerError(ctx, lw2.updateCh, wantListenerNACKErr); err != nil {
t.Fatal(err)
}

// No request should get sent out as part of this watch.
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
select {
case <-sCtx.Done():
case <-secondRequestReceived.Done():
t.Fatal("xdsClient sent out request instead of using update from cache")
default:
}
}

// TestLDSWatch_PartialValid covers the case where a response from the
// management server contains both valid and invalid resources and is expected
-// to be NACK'ed by the xdsclient. The test verifies that watchers corresponding
+// to be NACKed by the xdsclient. The test verifies that watchers corresponding
// to the valid resource receive the update, while watchers corresponding to the
// invalid resource receive an error.
func (s) TestLDSWatch_PartialValid(t *testing.T) {
@@ -1071,13 +1231,9 @@ func (s) TestLDSWatch_PartialValid(t *testing.T) {

// Verify that the expected error is propagated to the watcher which
// requested for the bad resource.
-u, err := lw1.updateCh.Receive(ctx)
-if err != nil {
-t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err)
-}
-gotErr := u.(listenerUpdateErrTuple).err
-if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) {
-t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr)
+// Verify that the expected error is propagated to the existing watcher.
+if err := verifyUnknownListenerError(ctx, lw1.updateCh, wantListenerNACKErr); err != nil {
+t.Fatal(err)
}

// Verify that the watcher watching the good resource receives a good
