Skip to content

Commit

Permalink
send mulitiple apis in xds update
Browse files Browse the repository at this point in the history
  • Loading branch information
AmaliMatharaarachchi committed Jan 9, 2024
1 parent a59c8ca commit 63f7968
Show file tree
Hide file tree
Showing 7 changed files with 335 additions and 166 deletions.
2 changes: 0 additions & 2 deletions adapter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ module github.com/wso2/apk/adapter

go 1.19

replace github.com/wso2/apk/common-go-libs => ../common-go-libs

require (
github.com/envoyproxy/go-control-plane v0.11.2-0.20230802074621-eea0b3bd0f81
github.com/fsnotify/fsnotify v1.6.0
Expand Down
51 changes: 40 additions & 11 deletions adapter/internal/discovery/xds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ var (
listenerToRouteArrayMap map[string][]*routev3.Route // Listener -> Routes map

// Common Enforcer Label as map key
// TODO(amali) This doesn't have a usage yet. It will be used to handle multiple enforcer labels in future.
// This doesn't have a usage yet. It will be used to handle multiple enforcer labels in future.
enforcerLabelMap map[string]*EnforcerInternalAPI // Enforcer Label -> EnforcerInternalAPI struct map

// KeyManagerList to store data
Expand Down Expand Up @@ -331,7 +331,6 @@ func updateXdsCacheOnAPIChange(oldLabels []string, newLabels []string) bool {
for _, oldLabel := range oldLabels {
if !stringutils.StringInSlice(oldLabel, newLabels) {
listeners, clusters, routes, endpoints, apis := GenerateEnvoyResoucesForGateway(oldLabel)
GenerateEnvoyResoucesForGateway(oldLabel)
UpdateEnforcerApis(oldLabel, apis, "")
UpdateXdsCacheWithLock(oldLabel, endpoints, clusters, routes, listeners)
logger.LoggerXds.Debugf("Xds Cache is updated for the already existing label : %v", oldLabel)
Expand All @@ -340,6 +339,27 @@ func updateXdsCacheOnAPIChange(oldLabels []string, newLabels []string) bool {
return revisionStatus
}

// UpdateXdsCacheOnAPIChange1 when this method is called, openAPIEnvoy map is updated.
// Old labels refers to the previously assigned labels
// New labels refers to the the updated labels
func UpdateXdsCacheOnAPIChange1(labels map[string]struct{}) bool {
revisionStatus := false
// TODO: (VirajSalaka) check possible optimizations, Since the number of labels are low by design it should not be an issue
for newLabel := range labels {
listeners, clusters, routes, endpoints, apis := GenerateEnvoyResoucesForGateway(newLabel)
UpdateEnforcerApis(newLabel, apis, "")
success := UpdateXdsCacheWithLock(newLabel, endpoints, clusters, routes, listeners)
logger.LoggerXds.Debugf("Xds Cache is updated for the label : %v", newLabel)
if success {
// if even one label was updated with latest revision, we take the revision as deployed.
// (other labels also will get updated successfully)
revisionStatus = success
continue
}
}
return revisionStatus
}

// SetReady Method to set the status after the last api is fected and updated in router.
func SetReady() {
logger.LoggerXds.Infof("Finished deploying startup APIs. Deploying the readiness endpoint...")
Expand Down Expand Up @@ -684,7 +704,8 @@ func RemoveAPIFromOrgAPIMap(uuid string, orgID string) {
}

// UpdateAPICache updates the xDS cache related to the API Lifecycle event.
func UpdateAPICache(vHosts []string, newLabels []string, listener string, sectionName string, adapterInternalAPI model.AdapterInternalAPI) error {
func UpdateAPICache(vHosts []string, newLabels []string, listener string, sectionName string,
adapterInternalAPI model.AdapterInternalAPI) (map[string]struct{}, error) {
mutexForInternalMapUpdate.Lock()
defer mutexForInternalMapUpdate.Unlock()

Expand All @@ -699,37 +720,46 @@ func UpdateAPICache(vHosts []string, newLabels []string, listener string, sectio
orgIDAPIvHostsMap[adapterInternalAPI.GetOrganizationID()] = vHostsMap
}

updatedLabelsMap := make(map[string]struct{}, 0)

// Remove internal mappigs for old vHosts
for _, oldvhost := range oldvHosts {
apiIdentifier := GenerateIdentifierForAPIWithUUID(oldvhost, adapterInternalAPI.UUID)
var oldLabels []string
if orgMap, orgExists := orgAPIMap[adapterInternalAPI.GetOrganizationID()]; orgExists {
if _, apiExists := orgMap[apiIdentifier]; apiExists {
oldLabels = orgMap[apiIdentifier].envoyLabels
for _, oldLabel := range orgMap[apiIdentifier].envoyLabels {
updatedLabelsMap[oldLabel] = struct{}{}
}
delete(orgAPIMap[adapterInternalAPI.GetOrganizationID()], apiIdentifier)
}
}
updateXdsCacheOnAPIChange(oldLabels, newLabels)
}

// Create internal mappigs for new vHosts
// Create internal mappings for new vHosts
for _, vHost := range vHosts {
logger.LoggerAPKOperator.Debugf("Creating internal mapping for vhost: %s", vHost)
apiUUID := adapterInternalAPI.UUID
apiIdentifier := GenerateIdentifierForAPIWithUUID(vHost, apiUUID)
var oldLabels []string
var orgExists bool

// get changing label set
if _, orgExists = orgAPIMap[adapterInternalAPI.GetOrganizationID()]; orgExists {
if _, apiExists := orgAPIMap[adapterInternalAPI.GetOrganizationID()][apiIdentifier]; apiExists {
oldLabels = orgAPIMap[adapterInternalAPI.GetOrganizationID()][apiIdentifier].envoyLabels
for _, oldLabel := range orgAPIMap[adapterInternalAPI.GetOrganizationID()][apiIdentifier].envoyLabels {
updatedLabelsMap[oldLabel] = struct{}{}
}
}
}
for _, newLabel := range newLabels {
updatedLabelsMap[newLabel] = struct{}{}
}

routes, clusters, endpoints, err := oasParser.GetRoutesClustersEndpoints(adapterInternalAPI, nil,
vHost, adapterInternalAPI.GetOrganizationID())

if err != nil {
return fmt.Errorf("error while deploying API. Name: %s Version: %s, OrgID: %s, API_UUID: %v, Error: %s",
return nil, fmt.Errorf("error while deploying API. Name: %s Version: %s, OrgID: %s, API_UUID: %v, Error: %s",
adapterInternalAPI.GetTitle(), adapterInternalAPI.GetVersion(), adapterInternalAPI.GetOrganizationID(),
apiUUID, err.Error())
}
Expand All @@ -754,11 +784,10 @@ func UpdateAPICache(vHosts []string, newLabels []string, listener string, sectio
} else {
listenerToRouteArrayMap[listener] = routes
}

revisionStatus := updateXdsCacheOnAPIChange(oldLabels, newLabels)
logger.LoggerXds.Infof("Deployed Revision: %v:%v, API_UUID: %v", apiIdentifier, revisionStatus, apiUUID)
}
return nil
return updatedLabelsMap, nil
}

// UpdateGatewayCache updates the xDS cache related to the Gateway Lifecycle event.
Expand Down
31 changes: 22 additions & 9 deletions adapter/internal/oasparser/envoyconf/routes_with_clusters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/wso2/apk/adapter/internal/dataholder"
"github.com/wso2/apk/adapter/internal/discovery/xds"
envoy "github.com/wso2/apk/adapter/internal/oasparser/envoyconf"
"github.com/wso2/apk/adapter/internal/operator/constants"
"github.com/wso2/apk/adapter/internal/operator/synchronizer"
Expand Down Expand Up @@ -118,7 +119,7 @@ func TestCreateRoutesWithClustersWithExactAndRegularExpressionRules(t *testing.T
}

dataholder.UpdateGateway(gateway)

xds.SanitizeGateway("default-gateway", true)
httpRouteState.HTTPRouteCombined = &httpRoute

backendMapping := make(map[string]*v1alpha1.ResolvedBackend)
Expand All @@ -130,7 +131,8 @@ func TestCreateRoutesWithClustersWithExactAndRegularExpressionRules(t *testing.T

apiState.ProdHTTPRoute = &httpRouteState

adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.")
assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object")
routes, clusters, _, _ := envoy.CreateRoutesWithClusters(*adapterInternalAPI, nil, "prod.gw.wso2.com", "carbon.super")
assert.Equal(t, 3, len(clusters), "Number of production clusters created is incorrect.")
Expand Down Expand Up @@ -181,8 +183,9 @@ func TestGenerateAdapterInternalAPIForDefaultCase(t *testing.T) {
apiState := generateSampleAPI("test-api-1", "1.0.0", "/test-api/1.0.0")
httpRouteState := synchronizer.HTTPRouteState{}
httpRouteState = *apiState.ProdHTTPRoute

adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
xds.SanitizeGateway("default-gateway", true)
adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.")
assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object")
assert.Equal(t, "Default", adapterInternalAPI.GetEnvironment(), "Environment is incorrect.")
}
Expand All @@ -193,8 +196,10 @@ func TestGenerateAdapterInternalAPIForSpecificEnvironment(t *testing.T) {
httpRouteState := synchronizer.HTTPRouteState{}
httpRouteState = *apiState.ProdHTTPRoute
apiState.APIDefinition.Spec.Environment = "dev"
xds.SanitizeGateway("default-gateway", true)

adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.")
assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object")
assert.Equal(t, "dev", adapterInternalAPI.GetEnvironment(), "Environment is incorrect.")
}
Expand Down Expand Up @@ -341,8 +346,10 @@ func TestCreateRoutesWithClustersWithMultiplePathPrefixRules(t *testing.T) {
httpRouteState.BackendMapping = backendMapping

apiState.ProdHTTPRoute = &httpRouteState
xds.SanitizeGateway("default-gateway", true)

adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.")
assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object")
routes, clusters, _, _ := envoy.CreateRoutesWithClusters(*adapterInternalAPI, nil, "prod.gw.wso2.com", "carbon.super")
assert.Equal(t, 3, len(clusters), "Number of production clusters created is incorrect.")
Expand Down Expand Up @@ -473,8 +480,10 @@ func TestCreateRoutesWithClustersWithBackendTLSConfigs(t *testing.T) {
httpRouteState.BackendMapping = backendMapping

apiState.ProdHTTPRoute = &httpRouteState
xds.SanitizeGateway("default-gateway", true)

adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.")
assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object")
_, clusters, _, _ := envoy.CreateRoutesWithClusters(*adapterInternalAPI, nil, "prod.gw.wso2.com", "carbon.super")
assert.Equal(t, 2, len(clusters), "Number of production clusters created is incorrect.")
Expand Down Expand Up @@ -799,8 +808,10 @@ func TestCreateRoutesWithClustersDifferentBackendRefs(t *testing.T) {
httpRouteState.BackendMapping = backendMapping

apiState.ProdHTTPRoute = &httpRouteState
xds.SanitizeGateway("default-gateway", true)

adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.")
assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object")
_, clusters, _, _ := envoy.CreateRoutesWithClusters(*adapterInternalAPI, nil, "prod.gw.wso2.com", "carbon.super")
assert.Equal(t, 3, len(clusters), "Number of production clusters created is incorrect.")
Expand Down Expand Up @@ -883,8 +894,10 @@ func TestCreateRoutesWithClustersSameBackendRefs(t *testing.T) {
httpRouteState.BackendMapping = backendMapping

apiState.ProdHTTPRoute = &httpRouteState
xds.SanitizeGateway("default-gateway", true)

adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production)
assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.")
assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object")
_, clusters, _, _ := envoy.CreateRoutesWithClusters(*adapterInternalAPI, nil, "prod.gw.wso2.com", "carbon.super")
assert.Equal(t, 2, len(clusters), "Number of production clusters created is incorrect.")
Expand Down
14 changes: 10 additions & 4 deletions adapter/internal/operator/controllers/dp/api_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func (apiReconciler *APIReconciler) Reconcile(ctx context.Context, req ctrl.Requ
apiReconciler.ods.DeleteCachedAPI(req.NamespacedName)
loggers.LoggerAPKOperator.Infof("Delete event received for API : %s with API UUID : %v, hence deleted from API cache",
req.NamespacedName.String(), string(apiCR.ObjectMeta.UID))
*apiReconciler.ch <- synchronizer.APIEvent{EventType: constants.Delete, Event: apiState}
*apiReconciler.ch <- synchronizer.APIEvent{EventType: constants.Delete, Events: []synchronizer.APIState{apiState}}
return ctrl.Result{}, nil
}
loggers.LoggerAPKOperator.Warnf("Api CR related to the reconcile request with key: %s returned error. Assuming API with API UUID : %v is already deleted, hence ignoring the error : %v",
Expand Down Expand Up @@ -270,14 +270,20 @@ func (apiReconciler *APIReconciler) applyStartupAPIs() {
loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2605, logging.CRITICAL, "Unable to list APIs: %v", err))
return
}
combinedapiEvent := &synchronizer.APIEvent{
EventType: constants.Create,
Events: make([]synchronizer.APIState, 0),
}
for _, api := range apisList {
if apiState, err := apiReconciler.resolveAPIRefs(ctx, api); err != nil {
loggers.LoggerAPKOperator.Warnf("Error retrieving ref CRs for API : %s in namespace : %s with API UUID : %v, %v",
api.Name, api.Namespace, string(api.ObjectMeta.UID), err)
} else if apiState != nil {
*apiReconciler.ch <- *apiState
combinedapiEvent.Events = append(combinedapiEvent.Events, apiState.Events...)
}
}
// Send all the API events to the channel
*apiReconciler.ch <- *combinedapiEvent
xds.SetReady()
loggers.LoggerAPKOperator.Info("Initial APIs were deployed successfully")
}
Expand Down Expand Up @@ -394,13 +400,13 @@ func (apiReconciler *APIReconciler) resolveAPIRefs(ctx context.Context, api dpv1

if !api.Status.DeploymentStatus.Accepted {
apiReconciler.ods.AddAPIState(apiRef, apiState)
return &synchronizer.APIEvent{EventType: constants.Create, Event: *apiState, UpdatedEvents: []string{}}, nil
return &synchronizer.APIEvent{EventType: constants.Create, Events: []synchronizer.APIState{*apiState}, UpdatedEvents: []string{}}, nil
} else if cachedAPI, events, updated :=
apiReconciler.ods.UpdateAPIState(apiRef, apiState); updated {
apiReconciler.removeOldOwnerRefs(ctx, cachedAPI)
loggers.LoggerAPI.Infof("API CR %s with API UUID : %v is updated on %v", apiRef.String(),
string(api.ObjectMeta.UID), events)
return &synchronizer.APIEvent{EventType: constants.Update, Event: cachedAPI, UpdatedEvents: events}, nil
return &synchronizer.APIEvent{EventType: constants.Update, Events: []synchronizer.APIState{cachedAPI}, UpdatedEvents: events}, nil
}

return nil, nil
Expand Down
52 changes: 16 additions & 36 deletions adapter/internal/operator/synchronizer/gql_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,43 +27,14 @@ import (
"github.com/wso2/apk/adapter/internal/discovery/xds/common"
"github.com/wso2/apk/adapter/internal/loggers"
"github.com/wso2/apk/adapter/internal/oasparser/model"
"github.com/wso2/apk/adapter/internal/operator/constants"
"github.com/wso2/apk/adapter/pkg/logging"
"github.com/wso2/apk/common-go-libs/apis/dp/v1alpha2"
"k8s.io/apimachinery/pkg/types"
gwapiv1b1 "sigs.k8s.io/gateway-api/apis/v1beta1"
)

// deployGQLAPIInGateway deploys the related API in CREATE and UPDATE events.
func deployGQLAPIInGateway(apiState APIState) error {
var err error
if len(apiState.OldOrganizationID) != 0 {
xds.RemoveAPIFromOrgAPIMap(string((*apiState.APIDefinition).ObjectMeta.UID), apiState.OldOrganizationID)
}
if apiState.ProdGQLRoute == nil {
var adapterInternalAPI model.AdapterInternalAPI
adapterInternalAPI.SetInfoAPICR(*apiState.APIDefinition)
xds.RemoveAPICacheForEnv(adapterInternalAPI, constants.Production)
}
if apiState.SandGQLRoute == nil {
var adapterInternalAPI model.AdapterInternalAPI
adapterInternalAPI.SetInfoAPICR(*apiState.APIDefinition)
xds.RemoveAPICacheForEnv(adapterInternalAPI, constants.Sandbox)
}
if apiState.ProdGQLRoute != nil {
_, err = generateGQLAdapterInternalAPI(apiState, apiState.ProdGQLRoute, constants.Production)
}
if err != nil {
return err
}
if apiState.SandGQLRoute != nil {
_, err = generateGQLAdapterInternalAPI(apiState, apiState.SandGQLRoute, constants.Sandbox)
}
return err
}

// generateGQLAdapterInternalAPI this will populate a AdapterInternalAPI representation for an GQLRoute
func generateGQLAdapterInternalAPI(apiState APIState, gqlRoute *GQLRouteState, envType string) (*model.AdapterInternalAPI, error) {
func generateGQLAdapterInternalAPI(apiState APIState, gqlRoute *GQLRouteState, envType string) (*model.AdapterInternalAPI, map[string]struct{}, error) {
var adapterInternalAPI model.AdapterInternalAPI
adapterInternalAPI.SetIsDefaultVersion(apiState.APIDefinition.Spec.IsDefaultVersion)
adapterInternalAPI.SetInfoAPICR(*apiState.APIDefinition)
Expand Down Expand Up @@ -92,9 +63,12 @@ func generateGQLAdapterInternalAPI(apiState APIState, gqlRoute *GQLRouteState, e
ResourceRateLimitPolicies: apiState.ResourceRateLimitPolicies,
}
if err := adapterInternalAPI.SetInfoGQLRouteCR(gqlRoute.GQLRouteCombined, resourceParams); err != nil {
loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2631, logging.MAJOR,
"Error setting GQLRoute CR info to adapterInternalAPI. %v", err))
return nil, err
loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2631, logging.MAJOR, "Error setting GQLRoute CR info to adapterInternalAPI. %v", err))
return nil, nil, err
}
if err := adapterInternalAPI.Validate(); err != nil {
loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2632, logging.MAJOR, "Error validating adapterInternalAPI intermediate representation. %v", err))
return nil, nil, err
}
vHosts := getVhostsForGQLAPI(gqlRoute.GQLRouteCombined)
labels := getLabelsForGQLAPI(gqlRoute.GQLRouteCombined)
Expand All @@ -103,19 +77,25 @@ func generateGQLAdapterInternalAPI(apiState APIState, gqlRoute *GQLRouteState, e
if len(listeners) == 0 || len(relativeSectionNames) == 0 {
loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2633, logging.MINOR, "Failed to find a matching listener for gql route: %v. ",
gqlRoute.GQLRouteCombined.Name))
return nil, errors.New("failed to find matching listener name for the provided gql route")
return nil, nil, errors.New("failed to find matching listener name for the provided gql route")
}

updatedLabelsMap := make(map[string]struct{})
listenerName := listeners[0]
sectionName := relativeSectionNames[0]
if len(listeners) != 0 {
err := xds.UpdateAPICache(vHosts, labels, listenerName, sectionName, adapterInternalAPI)
updatedLabels, err := xds.UpdateAPICache(vHosts, labels, listenerName, sectionName, adapterInternalAPI)
if err != nil {
loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2633, logging.MAJOR, "Error updating the API : %s:%s in vhosts: %s, API_UUID: %v. %v",
adapterInternalAPI.GetTitle(), adapterInternalAPI.GetVersion(), vHosts, adapterInternalAPI.UUID, err))
return nil, nil, err
}
for newLabel := range updatedLabels {
updatedLabelsMap[newLabel] = struct{}{}
}
}

return &adapterInternalAPI, nil
return &adapterInternalAPI, updatedLabelsMap, nil
}

// getVhostForAPI returns the vHosts related to an API.
Expand Down
Loading

0 comments on commit 63f7968

Please sign in to comment.