From 63f7968a0a33ded0565d75c1d3af832297ab651e Mon Sep 17 00:00:00 2001 From: AmaliMatharaarachchi Date: Mon, 8 Jan 2024 13:38:48 +0530 Subject: [PATCH] send mulitiple apis in xds update --- adapter/go.mod | 2 - adapter/internal/discovery/xds/server.go | 51 ++- .../envoyconf/routes_with_clusters_test.go | 31 +- .../operator/controllers/dp/api_controller.go | 14 +- .../internal/operator/synchronizer/gql_api.go | 52 +-- .../operator/synchronizer/rest_api.go | 47 +-- .../operator/synchronizer/synchronizer.go | 304 ++++++++++++++---- 7 files changed, 335 insertions(+), 166 deletions(-) diff --git a/adapter/go.mod b/adapter/go.mod index c70c2d85c..3ef2c7d98 100644 --- a/adapter/go.mod +++ b/adapter/go.mod @@ -2,8 +2,6 @@ module github.com/wso2/apk/adapter go 1.19 -replace github.com/wso2/apk/common-go-libs => ../common-go-libs - require ( github.com/envoyproxy/go-control-plane v0.11.2-0.20230802074621-eea0b3bd0f81 github.com/fsnotify/fsnotify v1.6.0 diff --git a/adapter/internal/discovery/xds/server.go b/adapter/internal/discovery/xds/server.go index 098f2fdfd..16b75390d 100644 --- a/adapter/internal/discovery/xds/server.go +++ b/adapter/internal/discovery/xds/server.go @@ -110,7 +110,7 @@ var ( listenerToRouteArrayMap map[string][]*routev3.Route // Listener -> Routes map // Common Enforcer Label as map key - // TODO(amali) This doesn't have a usage yet. It will be used to handle multiple enforcer labels in future. + // This doesn't have a usage yet. It will be used to handle multiple enforcer labels in future. enforcerLabelMap map[string]*EnforcerInternalAPI // Enforcer Label -> EnforcerInternalAPI struct map // KeyManagerList to store data @@ -331,7 +331,6 @@ func updateXdsCacheOnAPIChange(oldLabels []string, newLabels []string) bool { for _, oldLabel := range oldLabels { if !stringutils.StringInSlice(oldLabel, newLabels) { listeners, clusters, routes, endpoints, apis := GenerateEnvoyResoucesForGateway(oldLabel) - GenerateEnvoyResoucesForGateway(oldLabel) UpdateEnforcerApis(oldLabel, apis, "") UpdateXdsCacheWithLock(oldLabel, endpoints, clusters, routes, listeners) logger.LoggerXds.Debugf("Xds Cache is updated for the already existing label : %v", oldLabel) @@ -340,6 +339,27 @@ func updateXdsCacheOnAPIChange(oldLabels []string, newLabels []string) bool { return revisionStatus } +// UpdateXdsCacheOnAPIChange1 when this method is called, openAPIEnvoy map is updated. +// Old labels refers to the previously assigned labels +// New labels refers to the the updated labels +func UpdateXdsCacheOnAPIChange1(labels map[string]struct{}) bool { + revisionStatus := false + // TODO: (VirajSalaka) check possible optimizations, Since the number of labels are low by design it should not be an issue + for newLabel := range labels { + listeners, clusters, routes, endpoints, apis := GenerateEnvoyResoucesForGateway(newLabel) + UpdateEnforcerApis(newLabel, apis, "") + success := UpdateXdsCacheWithLock(newLabel, endpoints, clusters, routes, listeners) + logger.LoggerXds.Debugf("Xds Cache is updated for the label : %v", newLabel) + if success { + // if even one label was updated with latest revision, we take the revision as deployed. + // (other labels also will get updated successfully) + revisionStatus = success + continue + } + } + return revisionStatus +} + // SetReady Method to set the status after the last api is fected and updated in router. func SetReady() { logger.LoggerXds.Infof("Finished deploying startup APIs. Deploying the readiness endpoint...") @@ -684,7 +704,8 @@ func RemoveAPIFromOrgAPIMap(uuid string, orgID string) { } // UpdateAPICache updates the xDS cache related to the API Lifecycle event. -func UpdateAPICache(vHosts []string, newLabels []string, listener string, sectionName string, adapterInternalAPI model.AdapterInternalAPI) error { +func UpdateAPICache(vHosts []string, newLabels []string, listener string, sectionName string, + adapterInternalAPI model.AdapterInternalAPI) (map[string]struct{}, error) { mutexForInternalMapUpdate.Lock() defer mutexForInternalMapUpdate.Unlock() @@ -699,37 +720,46 @@ func UpdateAPICache(vHosts []string, newLabels []string, listener string, sectio orgIDAPIvHostsMap[adapterInternalAPI.GetOrganizationID()] = vHostsMap } + updatedLabelsMap := make(map[string]struct{}, 0) + // Remove internal mappigs for old vHosts for _, oldvhost := range oldvHosts { apiIdentifier := GenerateIdentifierForAPIWithUUID(oldvhost, adapterInternalAPI.UUID) - var oldLabels []string if orgMap, orgExists := orgAPIMap[adapterInternalAPI.GetOrganizationID()]; orgExists { if _, apiExists := orgMap[apiIdentifier]; apiExists { - oldLabels = orgMap[apiIdentifier].envoyLabels + for _, oldLabel := range orgMap[apiIdentifier].envoyLabels { + updatedLabelsMap[oldLabel] = struct{}{} + } delete(orgAPIMap[adapterInternalAPI.GetOrganizationID()], apiIdentifier) } } - updateXdsCacheOnAPIChange(oldLabels, newLabels) } - // Create internal mappigs for new vHosts + // Create internal mappings for new vHosts for _, vHost := range vHosts { logger.LoggerAPKOperator.Debugf("Creating internal mapping for vhost: %s", vHost) apiUUID := adapterInternalAPI.UUID apiIdentifier := GenerateIdentifierForAPIWithUUID(vHost, apiUUID) var oldLabels []string var orgExists bool + + // get changing label set if _, orgExists = orgAPIMap[adapterInternalAPI.GetOrganizationID()]; orgExists { if _, apiExists := orgAPIMap[adapterInternalAPI.GetOrganizationID()][apiIdentifier]; apiExists { - oldLabels = orgAPIMap[adapterInternalAPI.GetOrganizationID()][apiIdentifier].envoyLabels + for _, oldLabel := range orgAPIMap[adapterInternalAPI.GetOrganizationID()][apiIdentifier].envoyLabels { + updatedLabelsMap[oldLabel] = struct{}{} + } } } + for _, newLabel := range newLabels { + updatedLabelsMap[newLabel] = struct{}{} + } routes, clusters, endpoints, err := oasParser.GetRoutesClustersEndpoints(adapterInternalAPI, nil, vHost, adapterInternalAPI.GetOrganizationID()) if err != nil { - return fmt.Errorf("error while deploying API. Name: %s Version: %s, OrgID: %s, API_UUID: %v, Error: %s", + return nil, fmt.Errorf("error while deploying API. Name: %s Version: %s, OrgID: %s, API_UUID: %v, Error: %s", adapterInternalAPI.GetTitle(), adapterInternalAPI.GetVersion(), adapterInternalAPI.GetOrganizationID(), apiUUID, err.Error()) } @@ -754,11 +784,10 @@ func UpdateAPICache(vHosts []string, newLabels []string, listener string, sectio } else { listenerToRouteArrayMap[listener] = routes } - revisionStatus := updateXdsCacheOnAPIChange(oldLabels, newLabels) logger.LoggerXds.Infof("Deployed Revision: %v:%v, API_UUID: %v", apiIdentifier, revisionStatus, apiUUID) } - return nil + return updatedLabelsMap, nil } // UpdateGatewayCache updates the xDS cache related to the Gateway Lifecycle event. diff --git a/adapter/internal/oasparser/envoyconf/routes_with_clusters_test.go b/adapter/internal/oasparser/envoyconf/routes_with_clusters_test.go index 6464f24ad..b64eec0f0 100644 --- a/adapter/internal/oasparser/envoyconf/routes_with_clusters_test.go +++ b/adapter/internal/oasparser/envoyconf/routes_with_clusters_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/wso2/apk/adapter/internal/dataholder" + "github.com/wso2/apk/adapter/internal/discovery/xds" envoy "github.com/wso2/apk/adapter/internal/oasparser/envoyconf" "github.com/wso2/apk/adapter/internal/operator/constants" "github.com/wso2/apk/adapter/internal/operator/synchronizer" @@ -118,7 +119,7 @@ func TestCreateRoutesWithClustersWithExactAndRegularExpressionRules(t *testing.T } dataholder.UpdateGateway(gateway) - + xds.SanitizeGateway("default-gateway", true) httpRouteState.HTTPRouteCombined = &httpRoute backendMapping := make(map[string]*v1alpha1.ResolvedBackend) @@ -130,7 +131,8 @@ func TestCreateRoutesWithClustersWithExactAndRegularExpressionRules(t *testing.T apiState.ProdHTTPRoute = &httpRouteState - adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production) + adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production) + assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.") assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object") routes, clusters, _, _ := envoy.CreateRoutesWithClusters(*adapterInternalAPI, nil, "prod.gw.wso2.com", "carbon.super") assert.Equal(t, 3, len(clusters), "Number of production clusters created is incorrect.") @@ -181,8 +183,9 @@ func TestGenerateAdapterInternalAPIForDefaultCase(t *testing.T) { apiState := generateSampleAPI("test-api-1", "1.0.0", "/test-api/1.0.0") httpRouteState := synchronizer.HTTPRouteState{} httpRouteState = *apiState.ProdHTTPRoute - - adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production) + xds.SanitizeGateway("default-gateway", true) + adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production) + assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.") assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object") assert.Equal(t, "Default", adapterInternalAPI.GetEnvironment(), "Environment is incorrect.") } @@ -193,8 +196,10 @@ func TestGenerateAdapterInternalAPIForSpecificEnvironment(t *testing.T) { httpRouteState := synchronizer.HTTPRouteState{} httpRouteState = *apiState.ProdHTTPRoute apiState.APIDefinition.Spec.Environment = "dev" + xds.SanitizeGateway("default-gateway", true) - adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production) + adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production) + assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.") assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object") assert.Equal(t, "dev", adapterInternalAPI.GetEnvironment(), "Environment is incorrect.") } @@ -341,8 +346,10 @@ func TestCreateRoutesWithClustersWithMultiplePathPrefixRules(t *testing.T) { httpRouteState.BackendMapping = backendMapping apiState.ProdHTTPRoute = &httpRouteState + xds.SanitizeGateway("default-gateway", true) - adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production) + adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production) + assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.") assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object") routes, clusters, _, _ := envoy.CreateRoutesWithClusters(*adapterInternalAPI, nil, "prod.gw.wso2.com", "carbon.super") assert.Equal(t, 3, len(clusters), "Number of production clusters created is incorrect.") @@ -473,8 +480,10 @@ func TestCreateRoutesWithClustersWithBackendTLSConfigs(t *testing.T) { httpRouteState.BackendMapping = backendMapping apiState.ProdHTTPRoute = &httpRouteState + xds.SanitizeGateway("default-gateway", true) - adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production) + adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production) + assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.") assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object") _, clusters, _, _ := envoy.CreateRoutesWithClusters(*adapterInternalAPI, nil, "prod.gw.wso2.com", "carbon.super") assert.Equal(t, 2, len(clusters), "Number of production clusters created is incorrect.") @@ -799,8 +808,10 @@ func TestCreateRoutesWithClustersDifferentBackendRefs(t *testing.T) { httpRouteState.BackendMapping = backendMapping apiState.ProdHTTPRoute = &httpRouteState + xds.SanitizeGateway("default-gateway", true) - adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production) + adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production) + assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.") assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object") _, clusters, _, _ := envoy.CreateRoutesWithClusters(*adapterInternalAPI, nil, "prod.gw.wso2.com", "carbon.super") assert.Equal(t, 3, len(clusters), "Number of production clusters created is incorrect.") @@ -883,8 +894,10 @@ func TestCreateRoutesWithClustersSameBackendRefs(t *testing.T) { httpRouteState.BackendMapping = backendMapping apiState.ProdHTTPRoute = &httpRouteState + xds.SanitizeGateway("default-gateway", true) - adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production) + adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production) + assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.") assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object") _, clusters, _, _ := envoy.CreateRoutesWithClusters(*adapterInternalAPI, nil, "prod.gw.wso2.com", "carbon.super") assert.Equal(t, 2, len(clusters), "Number of production clusters created is incorrect.") diff --git a/adapter/internal/operator/controllers/dp/api_controller.go b/adapter/internal/operator/controllers/dp/api_controller.go index 372dbb442..a973e16d1 100644 --- a/adapter/internal/operator/controllers/dp/api_controller.go +++ b/adapter/internal/operator/controllers/dp/api_controller.go @@ -242,7 +242,7 @@ func (apiReconciler *APIReconciler) Reconcile(ctx context.Context, req ctrl.Requ apiReconciler.ods.DeleteCachedAPI(req.NamespacedName) loggers.LoggerAPKOperator.Infof("Delete event received for API : %s with API UUID : %v, hence deleted from API cache", req.NamespacedName.String(), string(apiCR.ObjectMeta.UID)) - *apiReconciler.ch <- synchronizer.APIEvent{EventType: constants.Delete, Event: apiState} + *apiReconciler.ch <- synchronizer.APIEvent{EventType: constants.Delete, Events: []synchronizer.APIState{apiState}} return ctrl.Result{}, nil } loggers.LoggerAPKOperator.Warnf("Api CR related to the reconcile request with key: %s returned error. Assuming API with API UUID : %v is already deleted, hence ignoring the error : %v", @@ -270,14 +270,20 @@ func (apiReconciler *APIReconciler) applyStartupAPIs() { loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2605, logging.CRITICAL, "Unable to list APIs: %v", err)) return } + combinedapiEvent := &synchronizer.APIEvent{ + EventType: constants.Create, + Events: make([]synchronizer.APIState, 0), + } for _, api := range apisList { if apiState, err := apiReconciler.resolveAPIRefs(ctx, api); err != nil { loggers.LoggerAPKOperator.Warnf("Error retrieving ref CRs for API : %s in namespace : %s with API UUID : %v, %v", api.Name, api.Namespace, string(api.ObjectMeta.UID), err) } else if apiState != nil { - *apiReconciler.ch <- *apiState + combinedapiEvent.Events = append(combinedapiEvent.Events, apiState.Events...) } } + // Send all the API events to the channel + *apiReconciler.ch <- *combinedapiEvent xds.SetReady() loggers.LoggerAPKOperator.Info("Initial APIs were deployed successfully") } @@ -394,13 +400,13 @@ func (apiReconciler *APIReconciler) resolveAPIRefs(ctx context.Context, api dpv1 if !api.Status.DeploymentStatus.Accepted { apiReconciler.ods.AddAPIState(apiRef, apiState) - return &synchronizer.APIEvent{EventType: constants.Create, Event: *apiState, UpdatedEvents: []string{}}, nil + return &synchronizer.APIEvent{EventType: constants.Create, Events: []synchronizer.APIState{*apiState}, UpdatedEvents: []string{}}, nil } else if cachedAPI, events, updated := apiReconciler.ods.UpdateAPIState(apiRef, apiState); updated { apiReconciler.removeOldOwnerRefs(ctx, cachedAPI) loggers.LoggerAPI.Infof("API CR %s with API UUID : %v is updated on %v", apiRef.String(), string(api.ObjectMeta.UID), events) - return &synchronizer.APIEvent{EventType: constants.Update, Event: cachedAPI, UpdatedEvents: events}, nil + return &synchronizer.APIEvent{EventType: constants.Update, Events: []synchronizer.APIState{cachedAPI}, UpdatedEvents: events}, nil } return nil, nil diff --git a/adapter/internal/operator/synchronizer/gql_api.go b/adapter/internal/operator/synchronizer/gql_api.go index 5a2b387b1..bae3652f2 100644 --- a/adapter/internal/operator/synchronizer/gql_api.go +++ b/adapter/internal/operator/synchronizer/gql_api.go @@ -27,43 +27,14 @@ import ( "github.com/wso2/apk/adapter/internal/discovery/xds/common" "github.com/wso2/apk/adapter/internal/loggers" "github.com/wso2/apk/adapter/internal/oasparser/model" - "github.com/wso2/apk/adapter/internal/operator/constants" "github.com/wso2/apk/adapter/pkg/logging" "github.com/wso2/apk/common-go-libs/apis/dp/v1alpha2" "k8s.io/apimachinery/pkg/types" gwapiv1b1 "sigs.k8s.io/gateway-api/apis/v1beta1" ) -// deployGQLAPIInGateway deploys the related API in CREATE and UPDATE events. -func deployGQLAPIInGateway(apiState APIState) error { - var err error - if len(apiState.OldOrganizationID) != 0 { - xds.RemoveAPIFromOrgAPIMap(string((*apiState.APIDefinition).ObjectMeta.UID), apiState.OldOrganizationID) - } - if apiState.ProdGQLRoute == nil { - var adapterInternalAPI model.AdapterInternalAPI - adapterInternalAPI.SetInfoAPICR(*apiState.APIDefinition) - xds.RemoveAPICacheForEnv(adapterInternalAPI, constants.Production) - } - if apiState.SandGQLRoute == nil { - var adapterInternalAPI model.AdapterInternalAPI - adapterInternalAPI.SetInfoAPICR(*apiState.APIDefinition) - xds.RemoveAPICacheForEnv(adapterInternalAPI, constants.Sandbox) - } - if apiState.ProdGQLRoute != nil { - _, err = generateGQLAdapterInternalAPI(apiState, apiState.ProdGQLRoute, constants.Production) - } - if err != nil { - return err - } - if apiState.SandGQLRoute != nil { - _, err = generateGQLAdapterInternalAPI(apiState, apiState.SandGQLRoute, constants.Sandbox) - } - return err -} - // generateGQLAdapterInternalAPI this will populate a AdapterInternalAPI representation for an GQLRoute -func generateGQLAdapterInternalAPI(apiState APIState, gqlRoute *GQLRouteState, envType string) (*model.AdapterInternalAPI, error) { +func generateGQLAdapterInternalAPI(apiState APIState, gqlRoute *GQLRouteState, envType string) (*model.AdapterInternalAPI, map[string]struct{}, error) { var adapterInternalAPI model.AdapterInternalAPI adapterInternalAPI.SetIsDefaultVersion(apiState.APIDefinition.Spec.IsDefaultVersion) adapterInternalAPI.SetInfoAPICR(*apiState.APIDefinition) @@ -92,9 +63,12 @@ func generateGQLAdapterInternalAPI(apiState APIState, gqlRoute *GQLRouteState, e ResourceRateLimitPolicies: apiState.ResourceRateLimitPolicies, } if err := adapterInternalAPI.SetInfoGQLRouteCR(gqlRoute.GQLRouteCombined, resourceParams); err != nil { - loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2631, logging.MAJOR, - "Error setting GQLRoute CR info to adapterInternalAPI. %v", err)) - return nil, err + loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2631, logging.MAJOR, "Error setting GQLRoute CR info to adapterInternalAPI. %v", err)) + return nil, nil, err + } + if err := adapterInternalAPI.Validate(); err != nil { + loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2632, logging.MAJOR, "Error validating adapterInternalAPI intermediate representation. %v", err)) + return nil, nil, err } vHosts := getVhostsForGQLAPI(gqlRoute.GQLRouteCombined) labels := getLabelsForGQLAPI(gqlRoute.GQLRouteCombined) @@ -103,19 +77,25 @@ func generateGQLAdapterInternalAPI(apiState APIState, gqlRoute *GQLRouteState, e if len(listeners) == 0 || len(relativeSectionNames) == 0 { loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2633, logging.MINOR, "Failed to find a matching listener for gql route: %v. ", gqlRoute.GQLRouteCombined.Name)) - return nil, errors.New("failed to find matching listener name for the provided gql route") + return nil, nil, errors.New("failed to find matching listener name for the provided gql route") } + + updatedLabelsMap := make(map[string]struct{}) listenerName := listeners[0] sectionName := relativeSectionNames[0] if len(listeners) != 0 { - err := xds.UpdateAPICache(vHosts, labels, listenerName, sectionName, adapterInternalAPI) + updatedLabels, err := xds.UpdateAPICache(vHosts, labels, listenerName, sectionName, adapterInternalAPI) if err != nil { loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2633, logging.MAJOR, "Error updating the API : %s:%s in vhosts: %s, API_UUID: %v. %v", adapterInternalAPI.GetTitle(), adapterInternalAPI.GetVersion(), vHosts, adapterInternalAPI.UUID, err)) + return nil, nil, err + } + for newLabel := range updatedLabels { + updatedLabelsMap[newLabel] = struct{}{} } } - return &adapterInternalAPI, nil + return &adapterInternalAPI, updatedLabelsMap, nil } // getVhostForAPI returns the vHosts related to an API. diff --git a/adapter/internal/operator/synchronizer/rest_api.go b/adapter/internal/operator/synchronizer/rest_api.go index a3137f858..652e6ac7a 100644 --- a/adapter/internal/operator/synchronizer/rest_api.go +++ b/adapter/internal/operator/synchronizer/rest_api.go @@ -27,40 +27,11 @@ import ( "github.com/wso2/apk/adapter/internal/discovery/xds/common" "github.com/wso2/apk/adapter/internal/loggers" "github.com/wso2/apk/adapter/internal/oasparser/model" - "github.com/wso2/apk/adapter/internal/operator/constants" "github.com/wso2/apk/adapter/pkg/logging" "k8s.io/apimachinery/pkg/types" gwapiv1b1 "sigs.k8s.io/gateway-api/apis/v1beta1" ) -// deployRestAPIInGateway deploys the related API in CREATE and UPDATE events. -func deployRestAPIInGateway(apiState APIState) error { - var err error - if len(apiState.OldOrganizationID) != 0 { - xds.RemoveAPIFromOrgAPIMap(string((*apiState.APIDefinition).ObjectMeta.UID), apiState.OldOrganizationID) - } - if apiState.ProdHTTPRoute == nil { - var adapterInternalAPI model.AdapterInternalAPI - adapterInternalAPI.SetInfoAPICR(*apiState.APIDefinition) - xds.RemoveAPICacheForEnv(adapterInternalAPI, constants.Production) - } - if apiState.SandHTTPRoute == nil { - var adapterInternalAPI model.AdapterInternalAPI - adapterInternalAPI.SetInfoAPICR(*apiState.APIDefinition) - xds.RemoveAPICacheForEnv(adapterInternalAPI, constants.Sandbox) - } - if apiState.ProdHTTPRoute != nil { - _, err = GenerateAdapterInternalAPI(apiState, apiState.ProdHTTPRoute, constants.Production) - } - if err != nil { - return err - } - if apiState.SandHTTPRoute != nil { - _, err = GenerateAdapterInternalAPI(apiState, apiState.SandHTTPRoute, constants.Sandbox) - } - return err -} - // undeployAPIInGateway undeploys the related API in CREATE and UPDATE events. func undeployRestAPIInGateway(apiState APIState) error { var err error @@ -80,7 +51,7 @@ func undeployRestAPIInGateway(apiState APIState) error { } // GenerateAdapterInternalAPI this will populate a AdapterInternalAPI representation for an HTTPRoute -func GenerateAdapterInternalAPI(apiState APIState, httpRoute *HTTPRouteState, envType string) (*model.AdapterInternalAPI, error) { +func GenerateAdapterInternalAPI(apiState APIState, httpRoute *HTTPRouteState, envType string) (*model.AdapterInternalAPI, map[string]struct{}, error) { var adapterInternalAPI model.AdapterInternalAPI adapterInternalAPI.SetIsDefaultVersion(apiState.APIDefinition.Spec.IsDefaultVersion) adapterInternalAPI.SetInfoAPICR(*apiState.APIDefinition) @@ -110,11 +81,11 @@ func GenerateAdapterInternalAPI(apiState APIState, httpRoute *HTTPRouteState, en } if err := adapterInternalAPI.SetInfoHTTPRouteCR(httpRoute.HTTPRouteCombined, resourceParams); err != nil { loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2631, logging.MAJOR, "Error setting HttpRoute CR info to adapterInternalAPI. %v", err)) - return nil, err + return nil, nil, err } if err := adapterInternalAPI.Validate(); err != nil { loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2632, logging.MAJOR, "Error validating adapterInternalAPI intermediate representation. %v", err)) - return nil, err + return nil, nil, err } vHosts := getVhostsForAPI(httpRoute.HTTPRouteCombined) labels := getLabelsForAPI(httpRoute.HTTPRouteCombined) @@ -123,19 +94,25 @@ func GenerateAdapterInternalAPI(apiState APIState, httpRoute *HTTPRouteState, en if len(listeners) == 0 || len(relativeSectionNames) == 0 { loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2633, logging.MINOR, "Failed to find a matching listener for http route: %v. ", httpRoute.HTTPRouteCombined.Name)) - return nil, errors.New("failed to find matching listener name for the provided http route") + return nil, nil, errors.New("failed to find matching listener name for the provided http route") } + + updatedLabelsMap := make(map[string]struct{}) listenerName := listeners[0] sectionName := relativeSectionNames[0] if len(listeners) != 0 { - err := xds.UpdateAPICache(vHosts, labels, listenerName, sectionName, adapterInternalAPI) + updatedLabels, err := xds.UpdateAPICache(vHosts, labels, listenerName, sectionName, adapterInternalAPI) if err != nil { loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2633, logging.MAJOR, "Error updating the API : %s:%s in vhosts: %s, API_UUID: %v. %v", adapterInternalAPI.GetTitle(), adapterInternalAPI.GetVersion(), vHosts, adapterInternalAPI.UUID, err)) + return nil, nil, err + } + for newLabel := range updatedLabels { + updatedLabelsMap[newLabel] = struct{}{} } } - return &adapterInternalAPI, nil + return &adapterInternalAPI, updatedLabelsMap, nil } // getVhostForAPI returns the vHosts related to an API. diff --git a/adapter/internal/operator/synchronizer/synchronizer.go b/adapter/internal/operator/synchronizer/synchronizer.go index e876c6804..6eb6d5192 100644 --- a/adapter/internal/operator/synchronizer/synchronizer.go +++ b/adapter/internal/operator/synchronizer/synchronizer.go @@ -28,9 +28,10 @@ import ( "k8s.io/apimachinery/pkg/types" "github.com/wso2/apk/adapter/config" + "github.com/wso2/apk/adapter/internal/discovery/xds" "github.com/wso2/apk/adapter/internal/loggers" + "github.com/wso2/apk/adapter/internal/oasparser/model" "github.com/wso2/apk/adapter/internal/operator/constants" - "github.com/wso2/apk/adapter/internal/operator/utils" "github.com/wso2/apk/adapter/pkg/logging" "github.com/wso2/apk/adapter/pkg/utils/tlsutils" ) @@ -40,7 +41,7 @@ import ( // go routine. type APIEvent struct { EventType string - Event APIState + Events []APIState UpdatedEvents []string } @@ -57,36 +58,38 @@ var ( ) func init() { - paritionCh = make(chan APIEvent, 10) + if config.ReadConfigs().PartitionServer.Enabled { + paritionCh = make(chan APIEvent, 10) + } } // HandleAPILifeCycleEvents handles the API events generated from OperatorDataStore func HandleAPILifeCycleEvents(ch *chan APIEvent, successChannel *chan SuccessEvent) { loggers.LoggerAPKOperator.Info("Operator synchronizer listening for API lifecycle events...") for event := range *ch { - if event.Event.APIDefinition == nil { - loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2628, logging.CRITICAL, "API Event is nil")) - } - loggers.LoggerAPKOperator.Infof("%s event received for %v", event.EventType, event.Event.APIDefinition.Name) var err error switch event.EventType { case constants.Delete: - err = undeployAPIInGateway(event.Event) + loggers.LoggerAPKOperator.Infof("Delete event received for %v", event.Events[0].APIDefinition.Name) + err = undeployAPIInGateway(event.Events[0]) case constants.Create: - err = deployAPIInGateway(event.Event) + loggers.LoggerAPKOperator.Infof("Create event received for %v", event.Events[0].APIDefinition.Name) + deployMultipleAPIsInGateway(event.Events) case constants.Update: - err = deployAPIInGateway(event.Event) + loggers.LoggerAPKOperator.Infof("Update event received for %v", event.Events[0].APIDefinition.Name) + err = deployAPIInGateway(event.Events[0]) } if err != nil { - loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2629, logging.MAJOR, "API deployment failed for %s event : %v", event.EventType, err)) - } else { - if event.EventType != constants.Delete { - *successChannel <- SuccessEvent{ - APINamespacedName: utils.NamespacedName(event.Event.APIDefinition), - State: event.EventType, - Events: event.UpdatedEvents, - } - } + loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2629, logging.CRITICAL, "API deployment failed for %s event : %v", event.EventType, err)) + } else if event.EventType != constants.Create { + // TODO(amali) commented out because there was no usage for this + // if event.EventType != constants.Delete && event.EventType != constants.Create { + // *successChannel <- SuccessEvent{ + // APINamespacedName: utils.NamespacedName(event.Events[0].APIDefinition), + // State: event.EventType, + // Events: event.UpdatedEvents, + // } + // } if config.ReadConfigs().PartitionServer.Enabled { paritionCh <- event } @@ -94,24 +97,184 @@ func HandleAPILifeCycleEvents(ch *chan APIEvent, successChannel *chan SuccessEve } } -// deployAPIInGateway deploys the related API in CREATE and UPDATE events. -func deployAPIInGateway(apiState APIState) error { +func undeployAPIInGateway(apiState APIState) error { if apiState.APIDefinition.Spec.APIType == "REST" { - return deployRestAPIInGateway(apiState) + return undeployRestAPIInGateway(apiState) } if apiState.APIDefinition.Spec.APIType == "GraphQL" { - return deployGQLAPIInGateway(apiState) + return undeployGQLAPIInGateway(apiState) } return nil } -func undeployAPIInGateway(apiState APIState) error { +// deployMultipleAPIsInGateway deploys the related API in CREATE and UPDATE events. +func deployMultipleAPIsInGateway(apiStates []APIState) { + updatedLabelsMap := make(map[string]struct{}) + for _, apiState := range apiStates { + if len(apiState.OldOrganizationID) != 0 { + xds.RemoveAPIFromOrgAPIMap(string((*apiState.APIDefinition).ObjectMeta.UID), apiState.OldOrganizationID) + } + if apiState.APIDefinition.Spec.APIType == "REST" { + if apiState.ProdHTTPRoute == nil { + var adapterInternalAPI model.AdapterInternalAPI + adapterInternalAPI.SetInfoAPICR(*apiState.APIDefinition) + xds.RemoveAPICacheForEnv(adapterInternalAPI, constants.Production) + } + if apiState.SandHTTPRoute == nil { + var adapterInternalAPI model.AdapterInternalAPI + adapterInternalAPI.SetInfoAPICR(*apiState.APIDefinition) + xds.RemoveAPICacheForEnv(adapterInternalAPI, constants.Sandbox) + } + + if apiState.ProdHTTPRoute != nil { + _, updatedLabels, err := GenerateAdapterInternalAPI(apiState, apiState.ProdHTTPRoute, constants.Production) + if err != nil { + loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2665, logging.CRITICAL, + "Error deploying prod httpRoute of API : %v in Organization %v from environments %v. Error: %v", + string(apiState.APIDefinition.Spec.APIName), apiState.APIDefinition.Spec.Organization, + getLabelsForAPI(apiState.ProdHTTPRoute.HTTPRouteCombined), err)) + continue + } + for label := range updatedLabels { + updatedLabelsMap[label] = struct{}{} + } + } + + if apiState.SandHTTPRoute != nil { + _, updatedLabels, err := GenerateAdapterInternalAPI(apiState, apiState.SandHTTPRoute, constants.Sandbox) + if err != nil { + loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2666, logging.CRITICAL, + "Error deploying sand httpRoute of API : %v in Organization %v from environments %v. Error: %v", + string(apiState.APIDefinition.Spec.APIName), apiState.APIDefinition.Spec.Organization, + getLabelsForAPI(apiState.ProdHTTPRoute.HTTPRouteCombined), err)) + continue + } + for label := range updatedLabels { + updatedLabelsMap[label] = struct{}{} + } + } + } + + if apiState.APIDefinition.Spec.APIType == "GraphQL" { + if apiState.ProdGQLRoute == nil { + var adapterInternalAPI model.AdapterInternalAPI + adapterInternalAPI.SetInfoAPICR(*apiState.APIDefinition) + xds.RemoveAPICacheForEnv(adapterInternalAPI, constants.Production) + } + if apiState.SandGQLRoute == nil { + var adapterInternalAPI model.AdapterInternalAPI + adapterInternalAPI.SetInfoAPICR(*apiState.APIDefinition) + xds.RemoveAPICacheForEnv(adapterInternalAPI, constants.Sandbox) + } + if apiState.ProdGQLRoute != nil { + _, updatedLabels, err := generateGQLAdapterInternalAPI(apiState, apiState.ProdGQLRoute, constants.Production) + if err != nil { + loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2665, logging.CRITICAL, + "Error deploying prod gqlRoute of API : %v in Organization %v from environments %v. Error: %v", + string(apiState.APIDefinition.Spec.APIName), apiState.APIDefinition.Spec.Organization, + getLabelsForGQLAPI(apiState.ProdGQLRoute.GQLRouteCombined), err)) + continue + } + for label := range updatedLabels { + updatedLabelsMap[label] = struct{}{} + } + } + if apiState.SandGQLRoute != nil { + _, updatedLabels, err := generateGQLAdapterInternalAPI(apiState, apiState.SandGQLRoute, constants.Sandbox) + if err != nil { + loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2665, logging.CRITICAL, + "Error deploying sand gqlRoute of API : %v in Organization %v from environments %v. Error: %v", + string(apiState.APIDefinition.Spec.APIName), apiState.APIDefinition.Spec.Organization, + getLabelsForGQLAPI(apiState.SandGQLRoute.GQLRouteCombined), err)) + continue + } + for label := range updatedLabels { + updatedLabelsMap[label] = struct{}{} + } + } + } + if config.ReadConfigs().PartitionServer.Enabled { + apiEvent := APIEvent{ + EventType: constants.Create, + Events: []APIState{apiState}, + UpdatedEvents: []string{}, + } + paritionCh <- apiEvent + } + } + xds.UpdateXdsCacheOnAPIChange1(updatedLabelsMap) +} + +// deployAPIInGateway deploys the related API in CREATE and UPDATE events. +func deployAPIInGateway(apiState APIState) error { + updatedLabelsMap := make(map[string]struct{}) + if len(apiState.OldOrganizationID) != 0 { + xds.RemoveAPIFromOrgAPIMap(string((*apiState.APIDefinition).ObjectMeta.UID), apiState.OldOrganizationID) + } + if apiState.APIDefinition.Spec.APIType == "REST" { - return undeployRestAPIInGateway(apiState) + if apiState.ProdHTTPRoute == nil { + var adapterInternalAPI model.AdapterInternalAPI + adapterInternalAPI.SetInfoAPICR(*apiState.APIDefinition) + xds.RemoveAPICacheForEnv(adapterInternalAPI, constants.Production) + } + if apiState.SandHTTPRoute == nil { + var adapterInternalAPI model.AdapterInternalAPI + adapterInternalAPI.SetInfoAPICR(*apiState.APIDefinition) + xds.RemoveAPICacheForEnv(adapterInternalAPI, constants.Sandbox) + } + if apiState.ProdHTTPRoute != nil { + _, updatedLabels, err := GenerateAdapterInternalAPI(apiState, apiState.ProdHTTPRoute, constants.Production) + if err != nil { + return err + } + for label := range updatedLabels { + updatedLabelsMap[label] = struct{}{} + } + } + + if apiState.SandHTTPRoute != nil { + _, updatedLabels, err := GenerateAdapterInternalAPI(apiState, apiState.SandHTTPRoute, constants.Sandbox) + if err != nil { + return err + } + for label := range updatedLabels { + updatedLabelsMap[label] = struct{}{} + } + } } if apiState.APIDefinition.Spec.APIType == "GraphQL" { - return undeployGQLAPIInGateway(apiState) + if apiState.ProdGQLRoute == nil { + var adapterInternalAPI model.AdapterInternalAPI + adapterInternalAPI.SetInfoAPICR(*apiState.APIDefinition) + xds.RemoveAPICacheForEnv(adapterInternalAPI, constants.Production) + } + if apiState.SandGQLRoute == nil { + var adapterInternalAPI model.AdapterInternalAPI + adapterInternalAPI.SetInfoAPICR(*apiState.APIDefinition) + xds.RemoveAPICacheForEnv(adapterInternalAPI, constants.Sandbox) + } + if apiState.ProdGQLRoute != nil { + _, updatedLabels, err := generateGQLAdapterInternalAPI(apiState, apiState.ProdGQLRoute, constants.Production) + if err != nil { + return err + } + for label := range updatedLabels { + updatedLabelsMap[label] = struct{}{} + } + } + if apiState.SandGQLRoute != nil { + _, updatedLabels, err := generateGQLAdapterInternalAPI(apiState, apiState.SandGQLRoute, constants.Sandbox) + if err != nil { + return err + } + for label := range updatedLabels { + updatedLabelsMap[label] = struct{}{} + } + } } + + xds.UpdateXdsCacheOnAPIChange1(updatedLabelsMap) return nil } @@ -135,51 +298,54 @@ func init() { func SendEventToPartitionServer() { conf := config.ReadConfigs() for apiEvent := range paritionCh { - if !apiEvent.Event.APIDefinition.Spec.SystemAPI { - apiDefinition := apiEvent.Event.APIDefinition - loggers.LoggerAPKOperator.Infof("Sending API to APK management server: %v, API_UUID: %v", apiDefinition.Spec.APIName, string(apiDefinition.ObjectMeta.UID)) - api := apiEvent.Event - eventType := apiEvent.EventType - basePath := api.APIDefinition.Spec.BasePath - organization := api.APIDefinition.Spec.Organization - version := api.APIDefinition.Spec.APIVersion - apiName := api.APIDefinition.Spec.APIName - apiUUID := string(api.APIDefinition.Name) - var hostNames []string - httpRoute := api.ProdHTTPRoute - if httpRoute == nil { - httpRoute = api.SandHTTPRoute - } - for _, hostName := range httpRoute.HTTPRouteCombined.Spec.Hostnames { - hostNames = append(hostNames, string(hostName)) - } - data := PartitionEvent{ - EventType: eventType, - BasePath: basePath, - Organization: organization, - APIVersion: version, - APIName: apiName, - APIUUID: apiUUID, - Vhosts: hostNames, - Partition: conf.PartitionServer.PartitionName, - } - payload, err := json.Marshal(data) - if err != nil { - loggers.LoggerAPKOperator.Errorf("Error creating Event: %v, API_UUID: %v", err, apiUUID) - } - req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s%s%s", conf.PartitionServer.Host, conf.PartitionServer.ServiceBasePath, "/api-deployment"), bytes.NewBuffer(payload)) - if err != nil { - loggers.LoggerAPKOperator.Errorf("Error creating api definition request: %v, API_UUID: %v", err, apiUUID) - } - req.Header.Set("Content-Type", "application/json; charset=UTF-8") - resp, err := partitionClient.Do(req) - if err != nil { - loggers.LoggerAPKOperator.Errorf("Error sending API Event: %v, API_UUID: %v", err, apiUUID) - } - if resp.StatusCode == http.StatusAccepted { - loggers.LoggerAPKOperator.Info("API Event Accepted", resp.Status) + for _, event := range apiEvent.Events { + if !event.APIDefinition.Spec.SystemAPI { + apiDefinition := event.APIDefinition + loggers.LoggerAPKOperator.Infof("Sending API to APK management server: %v, API_UUID: %v", apiDefinition.Spec.APIName, string(apiDefinition.ObjectMeta.UID)) + api := event + eventType := apiEvent.EventType + basePath := api.APIDefinition.Spec.BasePath + organization := api.APIDefinition.Spec.Organization + version := api.APIDefinition.Spec.APIVersion + apiName := api.APIDefinition.Spec.APIName + apiUUID := string(api.APIDefinition.Name) + var hostNames []string + httpRoute := api.ProdHTTPRoute + if httpRoute == nil { + httpRoute = api.SandHTTPRoute + } + for _, hostName := range httpRoute.HTTPRouteCombined.Spec.Hostnames { + hostNames = append(hostNames, string(hostName)) + } + data := PartitionEvent{ + EventType: eventType, + BasePath: basePath, + Organization: organization, + APIVersion: version, + APIName: apiName, + APIUUID: apiUUID, + Vhosts: hostNames, + Partition: conf.PartitionServer.PartitionName, + } + payload, err := json.Marshal(data) + if err != nil { + loggers.LoggerAPKOperator.Errorf("Error creating Event: %v, API_UUID: %v", err, apiUUID) + } + req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s%s%s", conf.PartitionServer.Host, conf.PartitionServer.ServiceBasePath, "/api-deployment"), bytes.NewBuffer(payload)) + if err != nil { + loggers.LoggerAPKOperator.Errorf("Error creating api definition request: %v, API_UUID: %v", err, apiUUID) + } + req.Header.Set("Content-Type", "application/json; charset=UTF-8") + resp, err := partitionClient.Do(req) + if err != nil { + loggers.LoggerAPKOperator.Errorf("Error sending API Event: %v, API_UUID: %v", err, apiUUID) + } + if resp.StatusCode == http.StatusAccepted { + loggers.LoggerAPKOperator.Info("API Event Accepted", resp.Status) + } } } + } }