Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send mulitiple apis in a XDS update #1938

Merged
merged 2 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions adapter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ module github.com/wso2/apk/adapter

go 1.19

replace github.com/wso2/apk/common-go-libs => ../common-go-libs

require (
github.com/envoyproxy/go-control-plane v0.11.2-0.20230802074621-eea0b3bd0f81
github.com/fsnotify/fsnotify v1.6.0
Expand Down
80 changes: 37 additions & 43 deletions adapter/internal/discovery/xds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ var (
listenerToRouteArrayMap map[string][]*routev3.Route // Listener -> Routes map

// Common Enforcer Label as map key
// TODO(amali) This doesn't have a usage yet. It will be used to handle multiple enforcer labels in future.
// This doesn't have a usage yet. It will be used to handle multiple enforcer labels in future.
enforcerLabelMap map[string]*EnforcerInternalAPI // Enforcer Label -> EnforcerInternalAPI struct map

// KeyManagerList to store data
Expand Down Expand Up @@ -260,26 +260,28 @@ func deleteAPI(apiIdentifier string, environments []string, organizationID strin
existingLabels := orgAPIMap[organizationID][apiIdentifier].envoyLabels
toBeDelEnvs, toBeKeptEnvs := getEnvironmentsToBeDeleted(existingLabels, environments)

var isAllowedToDelete bool
updatedLabelsMap := make(map[string]struct{})
for _, val := range toBeDelEnvs {
isAllowedToDelete := stringutils.StringInSlice(val, existingLabels)
if isAllowedToDelete {
// do not delete from all environments, hence do not clear routes, clusters, endpoints, enforcerAPIs
orgAPIMap[organizationID][apiIdentifier].envoyLabels = toBeKeptEnvs
updateXdsCacheOnAPIChange(toBeDelEnvs, []string{})
if len(toBeKeptEnvs) != 0 {
return nil
}
logger.LoggerXds.Infof("API identifier: %v does not have any gateways. Hence deleting the API from label : %s. API_UUID: %v",
apiIdentifier, val, apiUUID)
cleanMapResources(apiIdentifier, organizationID, toBeDelEnvs)
updatedLabelsMap[val] = struct{}{}
if stringutils.StringInSlice(val, existingLabels) {
isAllowedToDelete = true
}
}
if isAllowedToDelete {
// do not delete from all environments, hence do not clear routes, clusters, endpoints, enforcerAPIs
orgAPIMap[organizationID][apiIdentifier].envoyLabels = toBeKeptEnvs
if len(toBeKeptEnvs) != 0 {
UpdateXdsCacheOnAPIChange(updatedLabelsMap)
return nil
}
}

//clean maps of routes, clusters, endpoints, enforcerAPIs
if len(environments) == 0 {
if len(environments) == 0 || isAllowedToDelete {
cleanMapResources(apiIdentifier, organizationID, toBeDelEnvs)
}
UpdateXdsCacheOnAPIChange(updatedLabelsMap)
return nil
}

Expand All @@ -288,11 +290,6 @@ func cleanMapResources(apiIdentifier string, organizationID string, toBeDelEnvs
delete(orgAPIMap[organizationID], apiIdentifier)
}

//updateXdsCacheOnAPIAdd is called after cleaning maps of routes, clusters, endpoints, enforcerAPIs.
//Therefore resources that belongs to the deleting API do not exist. Caches updated only with
//resources that belongs to the remaining APIs
updateXdsCacheOnAPIChange(toBeDelEnvs, []string{})

deleteBasepathForVHost(organizationID, apiIdentifier)
//TODO: (SuKSW) clean any remaining in label wise maps, if this is the last API of that label
logger.LoggerXds.Infof("Deleted API %v of organization %v", apiIdentifier, organizationID)
Expand All @@ -310,33 +307,24 @@ func deleteBasepathForVHost(organizationID, apiIdentifier string) {
}
}

// when this method is called, openAPIEnvoy map is updated.
// UpdateXdsCacheOnAPIChange when this method is called, openAPIEnvoy map is updated.
// Old labels refers to the previously assigned labels
// New labels refers to the the updated labels
func updateXdsCacheOnAPIChange(oldLabels []string, newLabels []string) bool {
func UpdateXdsCacheOnAPIChange(labels map[string]struct{}) bool {
revisionStatus := false
// TODO: (VirajSalaka) check possible optimizations, Since the number of labels are low by design it should not be an issue
for _, newLabel := range newLabels {
for newLabel := range labels {
listeners, clusters, routes, endpoints, apis := GenerateEnvoyResoucesForGateway(newLabel)
UpdateEnforcerApis(newLabel, apis, "")
success := UpdateXdsCacheWithLock(newLabel, endpoints, clusters, routes, listeners)
logger.LoggerXds.Debugf("Xds Cache is updated for the newly added label : %v", newLabel)
logger.LoggerXds.Debugf("Xds Cache is updated for the label : %v", newLabel)
if success {
// if even one label was updated with latest revision, we take the revision as deployed.
// (other labels also will get updated successfully)
revisionStatus = success
continue
}
}
for _, oldLabel := range oldLabels {
if !stringutils.StringInSlice(oldLabel, newLabels) {
listeners, clusters, routes, endpoints, apis := GenerateEnvoyResoucesForGateway(oldLabel)
GenerateEnvoyResoucesForGateway(oldLabel)
UpdateEnforcerApis(oldLabel, apis, "")
UpdateXdsCacheWithLock(oldLabel, endpoints, clusters, routes, listeners)
logger.LoggerXds.Debugf("Xds Cache is updated for the already existing label : %v", oldLabel)
}
}
return revisionStatus
}

Expand Down Expand Up @@ -684,7 +672,8 @@ func RemoveAPIFromOrgAPIMap(uuid string, orgID string) {
}

// UpdateAPICache updates the xDS cache related to the API Lifecycle event.
func UpdateAPICache(vHosts []string, newLabels []string, listener string, sectionName string, adapterInternalAPI model.AdapterInternalAPI) error {
func UpdateAPICache(vHosts []string, newLabels []string, listener string, sectionName string,
adapterInternalAPI model.AdapterInternalAPI) (map[string]struct{}, error) {
mutexForInternalMapUpdate.Lock()
defer mutexForInternalMapUpdate.Unlock()

Expand All @@ -699,37 +688,45 @@ func UpdateAPICache(vHosts []string, newLabels []string, listener string, sectio
orgIDAPIvHostsMap[adapterInternalAPI.GetOrganizationID()] = vHostsMap
}

updatedLabelsMap := make(map[string]struct{}, 0)

// Remove internal mappigs for old vHosts
for _, oldvhost := range oldvHosts {
apiIdentifier := GenerateIdentifierForAPIWithUUID(oldvhost, adapterInternalAPI.UUID)
var oldLabels []string
if orgMap, orgExists := orgAPIMap[adapterInternalAPI.GetOrganizationID()]; orgExists {
if _, apiExists := orgMap[apiIdentifier]; apiExists {
oldLabels = orgMap[apiIdentifier].envoyLabels
for _, oldLabel := range orgMap[apiIdentifier].envoyLabels {
updatedLabelsMap[oldLabel] = struct{}{}
}
delete(orgAPIMap[adapterInternalAPI.GetOrganizationID()], apiIdentifier)
}
}
updateXdsCacheOnAPIChange(oldLabels, newLabels)
}

// Create internal mappigs for new vHosts
// Create internal mappings for new vHosts
for _, vHost := range vHosts {
logger.LoggerAPKOperator.Debugf("Creating internal mapping for vhost: %s", vHost)
apiUUID := adapterInternalAPI.UUID
apiIdentifier := GenerateIdentifierForAPIWithUUID(vHost, apiUUID)
var oldLabels []string
var orgExists bool

// get changing label set
if _, orgExists = orgAPIMap[adapterInternalAPI.GetOrganizationID()]; orgExists {
if _, apiExists := orgAPIMap[adapterInternalAPI.GetOrganizationID()][apiIdentifier]; apiExists {
oldLabels = orgAPIMap[adapterInternalAPI.GetOrganizationID()][apiIdentifier].envoyLabels
for _, oldLabel := range orgAPIMap[adapterInternalAPI.GetOrganizationID()][apiIdentifier].envoyLabels {
updatedLabelsMap[oldLabel] = struct{}{}
}
}
}
for _, newLabel := range newLabels {
updatedLabelsMap[newLabel] = struct{}{}
}

routes, clusters, endpoints, err := oasParser.GetRoutesClustersEndpoints(adapterInternalAPI, nil,
vHost, adapterInternalAPI.GetOrganizationID())

if err != nil {
return fmt.Errorf("error while deploying API. Name: %s Version: %s, OrgID: %s, API_UUID: %v, Error: %s",
return nil, fmt.Errorf("error while deploying API. Name: %s Version: %s, OrgID: %s, API_UUID: %v, Error: %s",
adapterInternalAPI.GetTitle(), adapterInternalAPI.GetVersion(), adapterInternalAPI.GetOrganizationID(),
apiUUID, err.Error())
}
Expand All @@ -754,11 +751,8 @@ func UpdateAPICache(vHosts []string, newLabels []string, listener string, sectio
} else {
listenerToRouteArrayMap[listener] = routes
}

revisionStatus := updateXdsCacheOnAPIChange(oldLabels, newLabels)
logger.LoggerXds.Infof("Deployed Revision: %v:%v, API_UUID: %v", apiIdentifier, revisionStatus, apiUUID)
}
return nil
return updatedLabelsMap, nil
}

// UpdateGatewayCache updates the xDS cache related to the Gateway Lifecycle event.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/wso2/apk/adapter/internal/dataholder"
"github.com/wso2/apk/adapter/internal/discovery/xds"
envoy "github.com/wso2/apk/adapter/internal/oasparser/envoyconf"
"github.com/wso2/apk/adapter/internal/operator/constants"
"github.com/wso2/apk/adapter/internal/operator/synchronizer"
Expand Down Expand Up @@ -118,7 +119,7 @@ func TestCreateRoutesWithClustersWithExactAndRegularExpressionRules(t *testing.T
}

dataholder.UpdateGateway(gateway)

xds.SanitizeGateway("default-gateway", true)
httpRouteState.HTTPRouteCombined = &httpRoute

backendMapping := make(map[string]*v1alpha1.ResolvedBackend)
Expand All @@ -130,7 +131,8 @@ func TestCreateRoutesWithClustersWithExactAndRegularExpressionRules(t *testing.T

apiState.ProdHTTPRoute = &httpRouteState

adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.")
assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object")
routes, clusters, _, _ := envoy.CreateRoutesWithClusters(*adapterInternalAPI, nil, "prod.gw.wso2.com", "carbon.super")
assert.Equal(t, 3, len(clusters), "Number of production clusters created is incorrect.")
Expand Down Expand Up @@ -181,8 +183,9 @@ func TestGenerateAdapterInternalAPIForDefaultCase(t *testing.T) {
apiState := generateSampleAPI("test-api-1", "1.0.0", "/test-api/1.0.0")
httpRouteState := synchronizer.HTTPRouteState{}
httpRouteState = *apiState.ProdHTTPRoute

adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
xds.SanitizeGateway("default-gateway", true)
adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.")
assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object")
assert.Equal(t, "Default", adapterInternalAPI.GetEnvironment(), "Environment is incorrect.")
}
Expand All @@ -193,8 +196,10 @@ func TestGenerateAdapterInternalAPIForSpecificEnvironment(t *testing.T) {
httpRouteState := synchronizer.HTTPRouteState{}
httpRouteState = *apiState.ProdHTTPRoute
apiState.APIDefinition.Spec.Environment = "dev"
xds.SanitizeGateway("default-gateway", true)

adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.")
assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object")
assert.Equal(t, "dev", adapterInternalAPI.GetEnvironment(), "Environment is incorrect.")
}
Expand Down Expand Up @@ -341,8 +346,10 @@ func TestCreateRoutesWithClustersWithMultiplePathPrefixRules(t *testing.T) {
httpRouteState.BackendMapping = backendMapping

apiState.ProdHTTPRoute = &httpRouteState
xds.SanitizeGateway("default-gateway", true)

adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.")
assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object")
routes, clusters, _, _ := envoy.CreateRoutesWithClusters(*adapterInternalAPI, nil, "prod.gw.wso2.com", "carbon.super")
assert.Equal(t, 3, len(clusters), "Number of production clusters created is incorrect.")
Expand Down Expand Up @@ -473,8 +480,10 @@ func TestCreateRoutesWithClustersWithBackendTLSConfigs(t *testing.T) {
httpRouteState.BackendMapping = backendMapping

apiState.ProdHTTPRoute = &httpRouteState
xds.SanitizeGateway("default-gateway", true)

adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.")
assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object")
_, clusters, _, _ := envoy.CreateRoutesWithClusters(*adapterInternalAPI, nil, "prod.gw.wso2.com", "carbon.super")
assert.Equal(t, 2, len(clusters), "Number of production clusters created is incorrect.")
Expand Down Expand Up @@ -799,8 +808,10 @@ func TestCreateRoutesWithClustersDifferentBackendRefs(t *testing.T) {
httpRouteState.BackendMapping = backendMapping

apiState.ProdHTTPRoute = &httpRouteState
xds.SanitizeGateway("default-gateway", true)

adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.")
assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object")
_, clusters, _, _ := envoy.CreateRoutesWithClusters(*adapterInternalAPI, nil, "prod.gw.wso2.com", "carbon.super")
assert.Equal(t, 3, len(clusters), "Number of production clusters created is incorrect.")
Expand Down Expand Up @@ -883,8 +894,10 @@ func TestCreateRoutesWithClustersSameBackendRefs(t *testing.T) {
httpRouteState.BackendMapping = backendMapping

apiState.ProdHTTPRoute = &httpRouteState
xds.SanitizeGateway("default-gateway", true)

adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.")
assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object")
_, clusters, _, _ := envoy.CreateRoutesWithClusters(*adapterInternalAPI, nil, "prod.gw.wso2.com", "carbon.super")
assert.Equal(t, 2, len(clusters), "Number of production clusters created is incorrect.")
Expand Down
14 changes: 10 additions & 4 deletions adapter/internal/operator/controllers/dp/api_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func (apiReconciler *APIReconciler) Reconcile(ctx context.Context, req ctrl.Requ
apiReconciler.ods.DeleteCachedAPI(req.NamespacedName)
loggers.LoggerAPKOperator.Infof("Delete event received for API : %s with API UUID : %v, hence deleted from API cache",
req.NamespacedName.String(), string(apiCR.ObjectMeta.UID))
*apiReconciler.ch <- synchronizer.APIEvent{EventType: constants.Delete, Event: apiState}
*apiReconciler.ch <- synchronizer.APIEvent{EventType: constants.Delete, Events: []synchronizer.APIState{apiState}}
return ctrl.Result{}, nil
}
loggers.LoggerAPKOperator.Warnf("Api CR related to the reconcile request with key: %s returned error. Assuming API with API UUID : %v is already deleted, hence ignoring the error : %v",
Expand Down Expand Up @@ -270,14 +270,20 @@ func (apiReconciler *APIReconciler) applyStartupAPIs() {
loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2605, logging.CRITICAL, "Unable to list APIs: %v", err))
return
}
combinedapiEvent := &synchronizer.APIEvent{
EventType: constants.Create,
Events: make([]synchronizer.APIState, 0),
}
for _, api := range apisList {
if apiState, err := apiReconciler.resolveAPIRefs(ctx, api); err != nil {
loggers.LoggerAPKOperator.Warnf("Error retrieving ref CRs for API : %s in namespace : %s with API UUID : %v, %v",
api.Name, api.Namespace, string(api.ObjectMeta.UID), err)
} else if apiState != nil {
*apiReconciler.ch <- *apiState
combinedapiEvent.Events = append(combinedapiEvent.Events, apiState.Events...)
}
}
// Send all the API events to the channel
*apiReconciler.ch <- *combinedapiEvent
xds.SetReady()
loggers.LoggerAPKOperator.Info("Initial APIs were deployed successfully")
}
Expand Down Expand Up @@ -394,13 +400,13 @@ func (apiReconciler *APIReconciler) resolveAPIRefs(ctx context.Context, api dpv1

if !api.Status.DeploymentStatus.Accepted {
apiReconciler.ods.AddAPIState(apiRef, apiState)
return &synchronizer.APIEvent{EventType: constants.Create, Event: *apiState, UpdatedEvents: []string{}}, nil
return &synchronizer.APIEvent{EventType: constants.Create, Events: []synchronizer.APIState{*apiState}, UpdatedEvents: []string{}}, nil
} else if cachedAPI, events, updated :=
apiReconciler.ods.UpdateAPIState(apiRef, apiState); updated {
apiReconciler.removeOldOwnerRefs(ctx, cachedAPI)
loggers.LoggerAPI.Infof("API CR %s with API UUID : %v is updated on %v", apiRef.String(),
string(api.ObjectMeta.UID), events)
return &synchronizer.APIEvent{EventType: constants.Update, Event: cachedAPI, UpdatedEvents: events}, nil
return &synchronizer.APIEvent{EventType: constants.Update, Events: []synchronizer.APIState{cachedAPI}, UpdatedEvents: events}, nil
}

return nil, nil
Expand Down
Loading
Loading