Skip to content

Commit

Permalink
Revert "Send mulitiple apis in a XDS update"
Browse files Browse the repository at this point in the history
  • Loading branch information
tharindu1st authored Jan 11, 2024
1 parent 531aec8 commit 34ce286
Show file tree
Hide file tree
Showing 7 changed files with 198 additions and 333 deletions.
2 changes: 2 additions & 0 deletions adapter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module github.com/wso2/apk/adapter

go 1.19

replace github.com/wso2/apk/common-go-libs => ../common-go-libs

require (
github.com/envoyproxy/go-control-plane v0.11.2-0.20230802074621-eea0b3bd0f81
github.com/fsnotify/fsnotify v1.6.0
Expand Down
80 changes: 43 additions & 37 deletions adapter/internal/discovery/xds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ var (
listenerToRouteArrayMap map[string][]*routev3.Route // Listener -> Routes map

// Common Enforcer Label as map key
// This doesn't have a usage yet. It will be used to handle multiple enforcer labels in future.
// TODO(amali) This doesn't have a usage yet. It will be used to handle multiple enforcer labels in future.
enforcerLabelMap map[string]*EnforcerInternalAPI // Enforcer Label -> EnforcerInternalAPI struct map

// KeyManagerList to store data
Expand Down Expand Up @@ -260,28 +260,26 @@ func deleteAPI(apiIdentifier string, environments []string, organizationID strin
existingLabels := orgAPIMap[organizationID][apiIdentifier].envoyLabels
toBeDelEnvs, toBeKeptEnvs := getEnvironmentsToBeDeleted(existingLabels, environments)

var isAllowedToDelete bool
updatedLabelsMap := make(map[string]struct{})
for _, val := range toBeDelEnvs {
updatedLabelsMap[val] = struct{}{}
if stringutils.StringInSlice(val, existingLabels) {
isAllowedToDelete = true
}
}
if isAllowedToDelete {
// do not delete from all environments, hence do not clear routes, clusters, endpoints, enforcerAPIs
orgAPIMap[organizationID][apiIdentifier].envoyLabels = toBeKeptEnvs
if len(toBeKeptEnvs) != 0 {
UpdateXdsCacheOnAPIChange(updatedLabelsMap)
isAllowedToDelete := stringutils.StringInSlice(val, existingLabels)
if isAllowedToDelete {
// do not delete from all environments, hence do not clear routes, clusters, endpoints, enforcerAPIs
orgAPIMap[organizationID][apiIdentifier].envoyLabels = toBeKeptEnvs
updateXdsCacheOnAPIChange(toBeDelEnvs, []string{})
if len(toBeKeptEnvs) != 0 {
return nil
}
logger.LoggerXds.Infof("API identifier: %v does not have any gateways. Hence deleting the API from label : %s. API_UUID: %v",
apiIdentifier, val, apiUUID)
cleanMapResources(apiIdentifier, organizationID, toBeDelEnvs)
return nil
}
}

//clean maps of routes, clusters, endpoints, enforcerAPIs
if len(environments) == 0 || isAllowedToDelete {
if len(environments) == 0 {
cleanMapResources(apiIdentifier, organizationID, toBeDelEnvs)
}
UpdateXdsCacheOnAPIChange(updatedLabelsMap)
return nil
}

Expand All @@ -290,6 +288,11 @@ func cleanMapResources(apiIdentifier string, organizationID string, toBeDelEnvs
delete(orgAPIMap[organizationID], apiIdentifier)
}

//updateXdsCacheOnAPIAdd is called after cleaning maps of routes, clusters, endpoints, enforcerAPIs.
//Therefore resources that belongs to the deleting API do not exist. Caches updated only with
//resources that belongs to the remaining APIs
updateXdsCacheOnAPIChange(toBeDelEnvs, []string{})

deleteBasepathForVHost(organizationID, apiIdentifier)
//TODO: (SuKSW) clean any remaining in label wise maps, if this is the last API of that label
logger.LoggerXds.Infof("Deleted API %v of organization %v", apiIdentifier, organizationID)
Expand All @@ -307,24 +310,33 @@ func deleteBasepathForVHost(organizationID, apiIdentifier string) {
}
}

// UpdateXdsCacheOnAPIChange when this method is called, openAPIEnvoy map is updated.
// when this method is called, openAPIEnvoy map is updated.
// Old labels refers to the previously assigned labels
// New labels refers to the the updated labels
func UpdateXdsCacheOnAPIChange(labels map[string]struct{}) bool {
func updateXdsCacheOnAPIChange(oldLabels []string, newLabels []string) bool {
revisionStatus := false
// TODO: (VirajSalaka) check possible optimizations, Since the number of labels are low by design it should not be an issue
for newLabel := range labels {
for _, newLabel := range newLabels {
listeners, clusters, routes, endpoints, apis := GenerateEnvoyResoucesForGateway(newLabel)
UpdateEnforcerApis(newLabel, apis, "")
success := UpdateXdsCacheWithLock(newLabel, endpoints, clusters, routes, listeners)
logger.LoggerXds.Debugf("Xds Cache is updated for the label : %v", newLabel)
logger.LoggerXds.Debugf("Xds Cache is updated for the newly added label : %v", newLabel)
if success {
// if even one label was updated with latest revision, we take the revision as deployed.
// (other labels also will get updated successfully)
revisionStatus = success
continue
}
}
for _, oldLabel := range oldLabels {
if !stringutils.StringInSlice(oldLabel, newLabels) {
listeners, clusters, routes, endpoints, apis := GenerateEnvoyResoucesForGateway(oldLabel)
GenerateEnvoyResoucesForGateway(oldLabel)
UpdateEnforcerApis(oldLabel, apis, "")
UpdateXdsCacheWithLock(oldLabel, endpoints, clusters, routes, listeners)
logger.LoggerXds.Debugf("Xds Cache is updated for the already existing label : %v", oldLabel)
}
}
return revisionStatus
}

Expand Down Expand Up @@ -672,8 +684,7 @@ func RemoveAPIFromOrgAPIMap(uuid string, orgID string) {
}

// UpdateAPICache updates the xDS cache related to the API Lifecycle event.
func UpdateAPICache(vHosts []string, newLabels []string, listener string, sectionName string,
adapterInternalAPI model.AdapterInternalAPI) (map[string]struct{}, error) {
func UpdateAPICache(vHosts []string, newLabels []string, listener string, sectionName string, adapterInternalAPI model.AdapterInternalAPI) error {
mutexForInternalMapUpdate.Lock()
defer mutexForInternalMapUpdate.Unlock()

Expand All @@ -688,45 +699,37 @@ func UpdateAPICache(vHosts []string, newLabels []string, listener string, sectio
orgIDAPIvHostsMap[adapterInternalAPI.GetOrganizationID()] = vHostsMap
}

updatedLabelsMap := make(map[string]struct{}, 0)

// Remove internal mappigs for old vHosts
for _, oldvhost := range oldvHosts {
apiIdentifier := GenerateIdentifierForAPIWithUUID(oldvhost, adapterInternalAPI.UUID)
var oldLabels []string
if orgMap, orgExists := orgAPIMap[adapterInternalAPI.GetOrganizationID()]; orgExists {
if _, apiExists := orgMap[apiIdentifier]; apiExists {
for _, oldLabel := range orgMap[apiIdentifier].envoyLabels {
updatedLabelsMap[oldLabel] = struct{}{}
}
oldLabels = orgMap[apiIdentifier].envoyLabels
delete(orgAPIMap[adapterInternalAPI.GetOrganizationID()], apiIdentifier)
}
}
updateXdsCacheOnAPIChange(oldLabels, newLabels)
}

// Create internal mappings for new vHosts
// Create internal mappigs for new vHosts
for _, vHost := range vHosts {
logger.LoggerAPKOperator.Debugf("Creating internal mapping for vhost: %s", vHost)
apiUUID := adapterInternalAPI.UUID
apiIdentifier := GenerateIdentifierForAPIWithUUID(vHost, apiUUID)
var oldLabels []string
var orgExists bool

// get changing label set
if _, orgExists = orgAPIMap[adapterInternalAPI.GetOrganizationID()]; orgExists {
if _, apiExists := orgAPIMap[adapterInternalAPI.GetOrganizationID()][apiIdentifier]; apiExists {
for _, oldLabel := range orgAPIMap[adapterInternalAPI.GetOrganizationID()][apiIdentifier].envoyLabels {
updatedLabelsMap[oldLabel] = struct{}{}
}
oldLabels = orgAPIMap[adapterInternalAPI.GetOrganizationID()][apiIdentifier].envoyLabels
}
}
for _, newLabel := range newLabels {
updatedLabelsMap[newLabel] = struct{}{}
}

routes, clusters, endpoints, err := oasParser.GetRoutesClustersEndpoints(adapterInternalAPI, nil,
vHost, adapterInternalAPI.GetOrganizationID())

if err != nil {
return nil, fmt.Errorf("error while deploying API. Name: %s Version: %s, OrgID: %s, API_UUID: %v, Error: %s",
return fmt.Errorf("error while deploying API. Name: %s Version: %s, OrgID: %s, API_UUID: %v, Error: %s",
adapterInternalAPI.GetTitle(), adapterInternalAPI.GetVersion(), adapterInternalAPI.GetOrganizationID(),
apiUUID, err.Error())
}
Expand All @@ -751,8 +754,11 @@ func UpdateAPICache(vHosts []string, newLabels []string, listener string, sectio
} else {
listenerToRouteArrayMap[listener] = routes
}

revisionStatus := updateXdsCacheOnAPIChange(oldLabels, newLabels)
logger.LoggerXds.Infof("Deployed Revision: %v:%v, API_UUID: %v", apiIdentifier, revisionStatus, apiUUID)
}
return updatedLabelsMap, nil
return nil
}

// UpdateGatewayCache updates the xDS cache related to the Gateway Lifecycle event.
Expand Down
31 changes: 9 additions & 22 deletions adapter/internal/oasparser/envoyconf/routes_with_clusters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/stretchr/testify/assert"

"github.com/wso2/apk/adapter/internal/dataholder"
"github.com/wso2/apk/adapter/internal/discovery/xds"
envoy "github.com/wso2/apk/adapter/internal/oasparser/envoyconf"
"github.com/wso2/apk/adapter/internal/operator/constants"
"github.com/wso2/apk/adapter/internal/operator/synchronizer"
Expand Down Expand Up @@ -119,7 +118,7 @@ func TestCreateRoutesWithClustersWithExactAndRegularExpressionRules(t *testing.T
}

dataholder.UpdateGateway(gateway)
xds.SanitizeGateway("default-gateway", true)

httpRouteState.HTTPRouteCombined = &httpRoute

backendMapping := make(map[string]*v1alpha1.ResolvedBackend)
Expand All @@ -131,8 +130,7 @@ func TestCreateRoutesWithClustersWithExactAndRegularExpressionRules(t *testing.T

apiState.ProdHTTPRoute = &httpRouteState

adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.")
adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object")
routes, clusters, _, _ := envoy.CreateRoutesWithClusters(*adapterInternalAPI, nil, "prod.gw.wso2.com", "carbon.super")
assert.Equal(t, 3, len(clusters), "Number of production clusters created is incorrect.")
Expand Down Expand Up @@ -183,9 +181,8 @@ func TestGenerateAdapterInternalAPIForDefaultCase(t *testing.T) {
apiState := generateSampleAPI("test-api-1", "1.0.0", "/test-api/1.0.0")
httpRouteState := synchronizer.HTTPRouteState{}
httpRouteState = *apiState.ProdHTTPRoute
xds.SanitizeGateway("default-gateway", true)
adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.")

adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object")
assert.Equal(t, "Default", adapterInternalAPI.GetEnvironment(), "Environment is incorrect.")
}
Expand All @@ -196,10 +193,8 @@ func TestGenerateAdapterInternalAPIForSpecificEnvironment(t *testing.T) {
httpRouteState := synchronizer.HTTPRouteState{}
httpRouteState = *apiState.ProdHTTPRoute
apiState.APIDefinition.Spec.Environment = "dev"
xds.SanitizeGateway("default-gateway", true)

adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.")
adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object")
assert.Equal(t, "dev", adapterInternalAPI.GetEnvironment(), "Environment is incorrect.")
}
Expand Down Expand Up @@ -346,10 +341,8 @@ func TestCreateRoutesWithClustersWithMultiplePathPrefixRules(t *testing.T) {
httpRouteState.BackendMapping = backendMapping

apiState.ProdHTTPRoute = &httpRouteState
xds.SanitizeGateway("default-gateway", true)

adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.")
adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object")
routes, clusters, _, _ := envoy.CreateRoutesWithClusters(*adapterInternalAPI, nil, "prod.gw.wso2.com", "carbon.super")
assert.Equal(t, 3, len(clusters), "Number of production clusters created is incorrect.")
Expand Down Expand Up @@ -480,10 +473,8 @@ func TestCreateRoutesWithClustersWithBackendTLSConfigs(t *testing.T) {
httpRouteState.BackendMapping = backendMapping

apiState.ProdHTTPRoute = &httpRouteState
xds.SanitizeGateway("default-gateway", true)

adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.")
adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object")
_, clusters, _, _ := envoy.CreateRoutesWithClusters(*adapterInternalAPI, nil, "prod.gw.wso2.com", "carbon.super")
assert.Equal(t, 2, len(clusters), "Number of production clusters created is incorrect.")
Expand Down Expand Up @@ -808,10 +799,8 @@ func TestCreateRoutesWithClustersDifferentBackendRefs(t *testing.T) {
httpRouteState.BackendMapping = backendMapping

apiState.ProdHTTPRoute = &httpRouteState
xds.SanitizeGateway("default-gateway", true)

adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.")
adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object")
_, clusters, _, _ := envoy.CreateRoutesWithClusters(*adapterInternalAPI, nil, "prod.gw.wso2.com", "carbon.super")
assert.Equal(t, 3, len(clusters), "Number of production clusters created is incorrect.")
Expand Down Expand Up @@ -894,10 +883,8 @@ func TestCreateRoutesWithClustersSameBackendRefs(t *testing.T) {
httpRouteState.BackendMapping = backendMapping

apiState.ProdHTTPRoute = &httpRouteState
xds.SanitizeGateway("default-gateway", true)

adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.")
adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object")
_, clusters, _, _ := envoy.CreateRoutesWithClusters(*adapterInternalAPI, nil, "prod.gw.wso2.com", "carbon.super")
assert.Equal(t, 2, len(clusters), "Number of production clusters created is incorrect.")
Expand Down
14 changes: 4 additions & 10 deletions adapter/internal/operator/controllers/dp/api_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func (apiReconciler *APIReconciler) Reconcile(ctx context.Context, req ctrl.Requ
apiReconciler.ods.DeleteCachedAPI(req.NamespacedName)
loggers.LoggerAPKOperator.Infof("Delete event received for API : %s with API UUID : %v, hence deleted from API cache",
req.NamespacedName.String(), string(apiCR.ObjectMeta.UID))
*apiReconciler.ch <- synchronizer.APIEvent{EventType: constants.Delete, Events: []synchronizer.APIState{apiState}}
*apiReconciler.ch <- synchronizer.APIEvent{EventType: constants.Delete, Event: apiState}
return ctrl.Result{}, nil
}
loggers.LoggerAPKOperator.Warnf("Api CR related to the reconcile request with key: %s returned error. Assuming API with API UUID : %v is already deleted, hence ignoring the error : %v",
Expand Down Expand Up @@ -272,20 +272,14 @@ func (apiReconciler *APIReconciler) applyStartupAPIs() {
loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2605, logging.CRITICAL, "Unable to list APIs: %v", err))
return
}
combinedapiEvent := &synchronizer.APIEvent{
EventType: constants.Create,
Events: make([]synchronizer.APIState, 0),
}
for _, api := range apisList {
if apiState, err := apiReconciler.resolveAPIRefs(ctx, api); err != nil {
loggers.LoggerAPKOperator.Warnf("Error retrieving ref CRs for API : %s in namespace : %s with API UUID : %v, %v",
api.Name, api.Namespace, string(api.ObjectMeta.UID), err)
} else if apiState != nil {
combinedapiEvent.Events = append(combinedapiEvent.Events, apiState.Events...)
*apiReconciler.ch <- *apiState
}
}
// Send all the API events to the channel
*apiReconciler.ch <- *combinedapiEvent
xds.SetReady()
loggers.LoggerAPKOperator.Info("Initial APIs were deployed successfully")
}
Expand Down Expand Up @@ -411,13 +405,13 @@ func (apiReconciler *APIReconciler) resolveAPIRefs(ctx context.Context, api dpv1

if !api.Status.DeploymentStatus.Accepted {
apiReconciler.ods.AddAPIState(apiRef, apiState)
return &synchronizer.APIEvent{EventType: constants.Create, Events: []synchronizer.APIState{*apiState}, UpdatedEvents: []string{}}, nil
return &synchronizer.APIEvent{EventType: constants.Create, Event: *apiState, UpdatedEvents: []string{}}, nil
} else if cachedAPI, events, updated :=
apiReconciler.ods.UpdateAPIState(apiRef, apiState); updated {
apiReconciler.removeOldOwnerRefs(ctx, cachedAPI)
loggers.LoggerAPI.Infof("API CR %s with API UUID : %v is updated on %v", apiRef.String(),
string(api.ObjectMeta.UID), events)
return &synchronizer.APIEvent{EventType: constants.Update, Events: []synchronizer.APIState{cachedAPI}, UpdatedEvents: events}, nil
return &synchronizer.APIEvent{EventType: constants.Update, Event: cachedAPI, UpdatedEvents: events}, nil
}

return nil, nil
Expand Down
Loading

0 comments on commit 34ce286

Please sign in to comment.