diff --git a/adapter/go.mod b/adapter/go.mod index cc70f65ac..1f923cb79 100644 --- a/adapter/go.mod +++ b/adapter/go.mod @@ -2,6 +2,8 @@ module github.com/wso2/apk/adapter go 1.19 +replace github.com/wso2/apk/common-go-libs => ../common-go-libs + require ( github.com/envoyproxy/go-control-plane v0.11.2-0.20230802074621-eea0b3bd0f81 github.com/fsnotify/fsnotify v1.6.0 diff --git a/adapter/internal/discovery/xds/server.go b/adapter/internal/discovery/xds/server.go index e12fb0010..098f2fdfd 100644 --- a/adapter/internal/discovery/xds/server.go +++ b/adapter/internal/discovery/xds/server.go @@ -110,7 +110,7 @@ var ( listenerToRouteArrayMap map[string][]*routev3.Route // Listener -> Routes map // Common Enforcer Label as map key - // This doesn't have a usage yet. It will be used to handle multiple enforcer labels in future. + // TODO(amali) This doesn't have a usage yet. It will be used to handle multiple enforcer labels in future. enforcerLabelMap map[string]*EnforcerInternalAPI // Enforcer Label -> EnforcerInternalAPI struct map // KeyManagerList to store data @@ -260,28 +260,26 @@ func deleteAPI(apiIdentifier string, environments []string, organizationID strin existingLabels := orgAPIMap[organizationID][apiIdentifier].envoyLabels toBeDelEnvs, toBeKeptEnvs := getEnvironmentsToBeDeleted(existingLabels, environments) - var isAllowedToDelete bool - updatedLabelsMap := make(map[string]struct{}) for _, val := range toBeDelEnvs { - updatedLabelsMap[val] = struct{}{} - if stringutils.StringInSlice(val, existingLabels) { - isAllowedToDelete = true - } - } - if isAllowedToDelete { - // do not delete from all environments, hence do not clear routes, clusters, endpoints, enforcerAPIs - orgAPIMap[organizationID][apiIdentifier].envoyLabels = toBeKeptEnvs - if len(toBeKeptEnvs) != 0 { - UpdateXdsCacheOnAPIChange(updatedLabelsMap) + isAllowedToDelete := stringutils.StringInSlice(val, existingLabels) + if isAllowedToDelete { + // do not delete from all environments, hence do not clear routes, clusters, endpoints, enforcerAPIs + orgAPIMap[organizationID][apiIdentifier].envoyLabels = toBeKeptEnvs + updateXdsCacheOnAPIChange(toBeDelEnvs, []string{}) + if len(toBeKeptEnvs) != 0 { + return nil + } + logger.LoggerXds.Infof("API identifier: %v does not have any gateways. Hence deleting the API from label : %s. API_UUID: %v", + apiIdentifier, val, apiUUID) + cleanMapResources(apiIdentifier, organizationID, toBeDelEnvs) return nil } } //clean maps of routes, clusters, endpoints, enforcerAPIs - if len(environments) == 0 || isAllowedToDelete { + if len(environments) == 0 { cleanMapResources(apiIdentifier, organizationID, toBeDelEnvs) } - UpdateXdsCacheOnAPIChange(updatedLabelsMap) return nil } @@ -290,6 +288,11 @@ func cleanMapResources(apiIdentifier string, organizationID string, toBeDelEnvs delete(orgAPIMap[organizationID], apiIdentifier) } + //updateXdsCacheOnAPIAdd is called after cleaning maps of routes, clusters, endpoints, enforcerAPIs. + //Therefore resources that belongs to the deleting API do not exist. Caches updated only with + //resources that belongs to the remaining APIs + updateXdsCacheOnAPIChange(toBeDelEnvs, []string{}) + deleteBasepathForVHost(organizationID, apiIdentifier) //TODO: (SuKSW) clean any remaining in label wise maps, if this is the last API of that label logger.LoggerXds.Infof("Deleted API %v of organization %v", apiIdentifier, organizationID) @@ -307,17 +310,17 @@ func deleteBasepathForVHost(organizationID, apiIdentifier string) { } } -// UpdateXdsCacheOnAPIChange when this method is called, openAPIEnvoy map is updated. +// when this method is called, openAPIEnvoy map is updated. // Old labels refers to the previously assigned labels // New labels refers to the the updated labels -func UpdateXdsCacheOnAPIChange(labels map[string]struct{}) bool { +func updateXdsCacheOnAPIChange(oldLabels []string, newLabels []string) bool { revisionStatus := false // TODO: (VirajSalaka) check possible optimizations, Since the number of labels are low by design it should not be an issue - for newLabel := range labels { + for _, newLabel := range newLabels { listeners, clusters, routes, endpoints, apis := GenerateEnvoyResoucesForGateway(newLabel) UpdateEnforcerApis(newLabel, apis, "") success := UpdateXdsCacheWithLock(newLabel, endpoints, clusters, routes, listeners) - logger.LoggerXds.Debugf("Xds Cache is updated for the label : %v", newLabel) + logger.LoggerXds.Debugf("Xds Cache is updated for the newly added label : %v", newLabel) if success { // if even one label was updated with latest revision, we take the revision as deployed. // (other labels also will get updated successfully) @@ -325,6 +328,15 @@ func UpdateXdsCacheOnAPIChange(labels map[string]struct{}) bool { continue } } + for _, oldLabel := range oldLabels { + if !stringutils.StringInSlice(oldLabel, newLabels) { + listeners, clusters, routes, endpoints, apis := GenerateEnvoyResoucesForGateway(oldLabel) + GenerateEnvoyResoucesForGateway(oldLabel) + UpdateEnforcerApis(oldLabel, apis, "") + UpdateXdsCacheWithLock(oldLabel, endpoints, clusters, routes, listeners) + logger.LoggerXds.Debugf("Xds Cache is updated for the already existing label : %v", oldLabel) + } + } return revisionStatus } @@ -672,8 +684,7 @@ func RemoveAPIFromOrgAPIMap(uuid string, orgID string) { } // UpdateAPICache updates the xDS cache related to the API Lifecycle event. -func UpdateAPICache(vHosts []string, newLabels []string, listener string, sectionName string, - adapterInternalAPI model.AdapterInternalAPI) (map[string]struct{}, error) { +func UpdateAPICache(vHosts []string, newLabels []string, listener string, sectionName string, adapterInternalAPI model.AdapterInternalAPI) error { mutexForInternalMapUpdate.Lock() defer mutexForInternalMapUpdate.Unlock() @@ -688,45 +699,37 @@ func UpdateAPICache(vHosts []string, newLabels []string, listener string, sectio orgIDAPIvHostsMap[adapterInternalAPI.GetOrganizationID()] = vHostsMap } - updatedLabelsMap := make(map[string]struct{}, 0) - // Remove internal mappigs for old vHosts for _, oldvhost := range oldvHosts { apiIdentifier := GenerateIdentifierForAPIWithUUID(oldvhost, adapterInternalAPI.UUID) + var oldLabels []string if orgMap, orgExists := orgAPIMap[adapterInternalAPI.GetOrganizationID()]; orgExists { if _, apiExists := orgMap[apiIdentifier]; apiExists { - for _, oldLabel := range orgMap[apiIdentifier].envoyLabels { - updatedLabelsMap[oldLabel] = struct{}{} - } + oldLabels = orgMap[apiIdentifier].envoyLabels delete(orgAPIMap[adapterInternalAPI.GetOrganizationID()], apiIdentifier) } } + updateXdsCacheOnAPIChange(oldLabels, newLabels) } - // Create internal mappings for new vHosts + // Create internal mappigs for new vHosts for _, vHost := range vHosts { logger.LoggerAPKOperator.Debugf("Creating internal mapping for vhost: %s", vHost) apiUUID := adapterInternalAPI.UUID apiIdentifier := GenerateIdentifierForAPIWithUUID(vHost, apiUUID) + var oldLabels []string var orgExists bool - - // get changing label set if _, orgExists = orgAPIMap[adapterInternalAPI.GetOrganizationID()]; orgExists { if _, apiExists := orgAPIMap[adapterInternalAPI.GetOrganizationID()][apiIdentifier]; apiExists { - for _, oldLabel := range orgAPIMap[adapterInternalAPI.GetOrganizationID()][apiIdentifier].envoyLabels { - updatedLabelsMap[oldLabel] = struct{}{} - } + oldLabels = orgAPIMap[adapterInternalAPI.GetOrganizationID()][apiIdentifier].envoyLabels } } - for _, newLabel := range newLabels { - updatedLabelsMap[newLabel] = struct{}{} - } routes, clusters, endpoints, err := oasParser.GetRoutesClustersEndpoints(adapterInternalAPI, nil, vHost, adapterInternalAPI.GetOrganizationID()) if err != nil { - return nil, fmt.Errorf("error while deploying API. Name: %s Version: %s, OrgID: %s, API_UUID: %v, Error: %s", + return fmt.Errorf("error while deploying API. Name: %s Version: %s, OrgID: %s, API_UUID: %v, Error: %s", adapterInternalAPI.GetTitle(), adapterInternalAPI.GetVersion(), adapterInternalAPI.GetOrganizationID(), apiUUID, err.Error()) } @@ -751,8 +754,11 @@ func UpdateAPICache(vHosts []string, newLabels []string, listener string, sectio } else { listenerToRouteArrayMap[listener] = routes } + + revisionStatus := updateXdsCacheOnAPIChange(oldLabels, newLabels) + logger.LoggerXds.Infof("Deployed Revision: %v:%v, API_UUID: %v", apiIdentifier, revisionStatus, apiUUID) } - return updatedLabelsMap, nil + return nil } // UpdateGatewayCache updates the xDS cache related to the Gateway Lifecycle event. diff --git a/adapter/internal/oasparser/envoyconf/routes_with_clusters_test.go b/adapter/internal/oasparser/envoyconf/routes_with_clusters_test.go index b64eec0f0..6464f24ad 100644 --- a/adapter/internal/oasparser/envoyconf/routes_with_clusters_test.go +++ b/adapter/internal/oasparser/envoyconf/routes_with_clusters_test.go @@ -23,7 +23,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/wso2/apk/adapter/internal/dataholder" - "github.com/wso2/apk/adapter/internal/discovery/xds" envoy "github.com/wso2/apk/adapter/internal/oasparser/envoyconf" "github.com/wso2/apk/adapter/internal/operator/constants" "github.com/wso2/apk/adapter/internal/operator/synchronizer" @@ -119,7 +118,7 @@ func TestCreateRoutesWithClustersWithExactAndRegularExpressionRules(t *testing.T } dataholder.UpdateGateway(gateway) - xds.SanitizeGateway("default-gateway", true) + httpRouteState.HTTPRouteCombined = &httpRoute backendMapping := make(map[string]*v1alpha1.ResolvedBackend) @@ -131,8 +130,7 @@ func TestCreateRoutesWithClustersWithExactAndRegularExpressionRules(t *testing.T apiState.ProdHTTPRoute = &httpRouteState - adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production) - assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.") + adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production) assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object") routes, clusters, _, _ := envoy.CreateRoutesWithClusters(*adapterInternalAPI, nil, "prod.gw.wso2.com", "carbon.super") assert.Equal(t, 3, len(clusters), "Number of production clusters created is incorrect.") @@ -183,9 +181,8 @@ func TestGenerateAdapterInternalAPIForDefaultCase(t *testing.T) { apiState := generateSampleAPI("test-api-1", "1.0.0", "/test-api/1.0.0") httpRouteState := synchronizer.HTTPRouteState{} httpRouteState = *apiState.ProdHTTPRoute - xds.SanitizeGateway("default-gateway", true) - adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production) - assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.") + + adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production) assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object") assert.Equal(t, "Default", adapterInternalAPI.GetEnvironment(), "Environment is incorrect.") } @@ -196,10 +193,8 @@ func TestGenerateAdapterInternalAPIForSpecificEnvironment(t *testing.T) { httpRouteState := synchronizer.HTTPRouteState{} httpRouteState = *apiState.ProdHTTPRoute apiState.APIDefinition.Spec.Environment = "dev" - xds.SanitizeGateway("default-gateway", true) - adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production) - assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.") + adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production) assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object") assert.Equal(t, "dev", adapterInternalAPI.GetEnvironment(), "Environment is incorrect.") } @@ -346,10 +341,8 @@ func TestCreateRoutesWithClustersWithMultiplePathPrefixRules(t *testing.T) { httpRouteState.BackendMapping = backendMapping apiState.ProdHTTPRoute = &httpRouteState - xds.SanitizeGateway("default-gateway", true) - adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production) - assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.") + adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production) assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object") routes, clusters, _, _ := envoy.CreateRoutesWithClusters(*adapterInternalAPI, nil, "prod.gw.wso2.com", "carbon.super") assert.Equal(t, 3, len(clusters), "Number of production clusters created is incorrect.") @@ -480,10 +473,8 @@ func TestCreateRoutesWithClustersWithBackendTLSConfigs(t *testing.T) { httpRouteState.BackendMapping = backendMapping apiState.ProdHTTPRoute = &httpRouteState - xds.SanitizeGateway("default-gateway", true) - adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production) - assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.") + adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production) assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object") _, clusters, _, _ := envoy.CreateRoutesWithClusters(*adapterInternalAPI, nil, "prod.gw.wso2.com", "carbon.super") assert.Equal(t, 2, len(clusters), "Number of production clusters created is incorrect.") @@ -808,10 +799,8 @@ func TestCreateRoutesWithClustersDifferentBackendRefs(t *testing.T) { httpRouteState.BackendMapping = backendMapping apiState.ProdHTTPRoute = &httpRouteState - xds.SanitizeGateway("default-gateway", true) - adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production) - assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.") + adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production) assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object") _, clusters, _, _ := envoy.CreateRoutesWithClusters(*adapterInternalAPI, nil, "prod.gw.wso2.com", "carbon.super") assert.Equal(t, 3, len(clusters), "Number of production clusters created is incorrect.") @@ -894,10 +883,8 @@ func TestCreateRoutesWithClustersSameBackendRefs(t *testing.T) { httpRouteState.BackendMapping = backendMapping apiState.ProdHTTPRoute = &httpRouteState - xds.SanitizeGateway("default-gateway", true) - adapterInternalAPI, labels, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production) - assert.Equal(t, map[string]struct{}{"default-gateway": struct{}{}}, labels, "Labels are incorrect.") + adapterInternalAPI, err := synchronizer.GenerateAdapterInternalAPI(apiState, &httpRouteState, constants.Production) assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object") _, clusters, _, _ := envoy.CreateRoutesWithClusters(*adapterInternalAPI, nil, "prod.gw.wso2.com", "carbon.super") assert.Equal(t, 2, len(clusters), "Number of production clusters created is incorrect.") diff --git a/adapter/internal/operator/controllers/dp/api_controller.go b/adapter/internal/operator/controllers/dp/api_controller.go index 0cf15a89d..f67a16399 100644 --- a/adapter/internal/operator/controllers/dp/api_controller.go +++ b/adapter/internal/operator/controllers/dp/api_controller.go @@ -244,7 +244,7 @@ func (apiReconciler *APIReconciler) Reconcile(ctx context.Context, req ctrl.Requ apiReconciler.ods.DeleteCachedAPI(req.NamespacedName) loggers.LoggerAPKOperator.Infof("Delete event received for API : %s with API UUID : %v, hence deleted from API cache", req.NamespacedName.String(), string(apiCR.ObjectMeta.UID)) - *apiReconciler.ch <- synchronizer.APIEvent{EventType: constants.Delete, Events: []synchronizer.APIState{apiState}} + *apiReconciler.ch <- synchronizer.APIEvent{EventType: constants.Delete, Event: apiState} return ctrl.Result{}, nil } loggers.LoggerAPKOperator.Warnf("Api CR related to the reconcile request with key: %s returned error. Assuming API with API UUID : %v is already deleted, hence ignoring the error : %v", @@ -272,20 +272,14 @@ func (apiReconciler *APIReconciler) applyStartupAPIs() { loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2605, logging.CRITICAL, "Unable to list APIs: %v", err)) return } - combinedapiEvent := &synchronizer.APIEvent{ - EventType: constants.Create, - Events: make([]synchronizer.APIState, 0), - } for _, api := range apisList { if apiState, err := apiReconciler.resolveAPIRefs(ctx, api); err != nil { loggers.LoggerAPKOperator.Warnf("Error retrieving ref CRs for API : %s in namespace : %s with API UUID : %v, %v", api.Name, api.Namespace, string(api.ObjectMeta.UID), err) } else if apiState != nil { - combinedapiEvent.Events = append(combinedapiEvent.Events, apiState.Events...) + *apiReconciler.ch <- *apiState } } - // Send all the API events to the channel - *apiReconciler.ch <- *combinedapiEvent xds.SetReady() loggers.LoggerAPKOperator.Info("Initial APIs were deployed successfully") } @@ -411,13 +405,13 @@ func (apiReconciler *APIReconciler) resolveAPIRefs(ctx context.Context, api dpv1 if !api.Status.DeploymentStatus.Accepted { apiReconciler.ods.AddAPIState(apiRef, apiState) - return &synchronizer.APIEvent{EventType: constants.Create, Events: []synchronizer.APIState{*apiState}, UpdatedEvents: []string{}}, nil + return &synchronizer.APIEvent{EventType: constants.Create, Event: *apiState, UpdatedEvents: []string{}}, nil } else if cachedAPI, events, updated := apiReconciler.ods.UpdateAPIState(apiRef, apiState); updated { apiReconciler.removeOldOwnerRefs(ctx, cachedAPI) loggers.LoggerAPI.Infof("API CR %s with API UUID : %v is updated on %v", apiRef.String(), string(api.ObjectMeta.UID), events) - return &synchronizer.APIEvent{EventType: constants.Update, Events: []synchronizer.APIState{cachedAPI}, UpdatedEvents: events}, nil + return &synchronizer.APIEvent{EventType: constants.Update, Event: cachedAPI, UpdatedEvents: events}, nil } return nil, nil diff --git a/adapter/internal/operator/synchronizer/gql_api.go b/adapter/internal/operator/synchronizer/gql_api.go index bae3652f2..5a2b387b1 100644 --- a/adapter/internal/operator/synchronizer/gql_api.go +++ b/adapter/internal/operator/synchronizer/gql_api.go @@ -27,14 +27,43 @@ import ( "github.com/wso2/apk/adapter/internal/discovery/xds/common" "github.com/wso2/apk/adapter/internal/loggers" "github.com/wso2/apk/adapter/internal/oasparser/model" + "github.com/wso2/apk/adapter/internal/operator/constants" "github.com/wso2/apk/adapter/pkg/logging" "github.com/wso2/apk/common-go-libs/apis/dp/v1alpha2" "k8s.io/apimachinery/pkg/types" gwapiv1b1 "sigs.k8s.io/gateway-api/apis/v1beta1" ) +// deployGQLAPIInGateway deploys the related API in CREATE and UPDATE events. +func deployGQLAPIInGateway(apiState APIState) error { + var err error + if len(apiState.OldOrganizationID) != 0 { + xds.RemoveAPIFromOrgAPIMap(string((*apiState.APIDefinition).ObjectMeta.UID), apiState.OldOrganizationID) + } + if apiState.ProdGQLRoute == nil { + var adapterInternalAPI model.AdapterInternalAPI + adapterInternalAPI.SetInfoAPICR(*apiState.APIDefinition) + xds.RemoveAPICacheForEnv(adapterInternalAPI, constants.Production) + } + if apiState.SandGQLRoute == nil { + var adapterInternalAPI model.AdapterInternalAPI + adapterInternalAPI.SetInfoAPICR(*apiState.APIDefinition) + xds.RemoveAPICacheForEnv(adapterInternalAPI, constants.Sandbox) + } + if apiState.ProdGQLRoute != nil { + _, err = generateGQLAdapterInternalAPI(apiState, apiState.ProdGQLRoute, constants.Production) + } + if err != nil { + return err + } + if apiState.SandGQLRoute != nil { + _, err = generateGQLAdapterInternalAPI(apiState, apiState.SandGQLRoute, constants.Sandbox) + } + return err +} + // generateGQLAdapterInternalAPI this will populate a AdapterInternalAPI representation for an GQLRoute -func generateGQLAdapterInternalAPI(apiState APIState, gqlRoute *GQLRouteState, envType string) (*model.AdapterInternalAPI, map[string]struct{}, error) { +func generateGQLAdapterInternalAPI(apiState APIState, gqlRoute *GQLRouteState, envType string) (*model.AdapterInternalAPI, error) { var adapterInternalAPI model.AdapterInternalAPI adapterInternalAPI.SetIsDefaultVersion(apiState.APIDefinition.Spec.IsDefaultVersion) adapterInternalAPI.SetInfoAPICR(*apiState.APIDefinition) @@ -63,12 +92,9 @@ func generateGQLAdapterInternalAPI(apiState APIState, gqlRoute *GQLRouteState, e ResourceRateLimitPolicies: apiState.ResourceRateLimitPolicies, } if err := adapterInternalAPI.SetInfoGQLRouteCR(gqlRoute.GQLRouteCombined, resourceParams); err != nil { - loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2631, logging.MAJOR, "Error setting GQLRoute CR info to adapterInternalAPI. %v", err)) - return nil, nil, err - } - if err := adapterInternalAPI.Validate(); err != nil { - loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2632, logging.MAJOR, "Error validating adapterInternalAPI intermediate representation. %v", err)) - return nil, nil, err + loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2631, logging.MAJOR, + "Error setting GQLRoute CR info to adapterInternalAPI. %v", err)) + return nil, err } vHosts := getVhostsForGQLAPI(gqlRoute.GQLRouteCombined) labels := getLabelsForGQLAPI(gqlRoute.GQLRouteCombined) @@ -77,25 +103,19 @@ func generateGQLAdapterInternalAPI(apiState APIState, gqlRoute *GQLRouteState, e if len(listeners) == 0 || len(relativeSectionNames) == 0 { loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2633, logging.MINOR, "Failed to find a matching listener for gql route: %v. ", gqlRoute.GQLRouteCombined.Name)) - return nil, nil, errors.New("failed to find matching listener name for the provided gql route") + return nil, errors.New("failed to find matching listener name for the provided gql route") } - - updatedLabelsMap := make(map[string]struct{}) listenerName := listeners[0] sectionName := relativeSectionNames[0] if len(listeners) != 0 { - updatedLabels, err := xds.UpdateAPICache(vHosts, labels, listenerName, sectionName, adapterInternalAPI) + err := xds.UpdateAPICache(vHosts, labels, listenerName, sectionName, adapterInternalAPI) if err != nil { loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2633, logging.MAJOR, "Error updating the API : %s:%s in vhosts: %s, API_UUID: %v. %v", adapterInternalAPI.GetTitle(), adapterInternalAPI.GetVersion(), vHosts, adapterInternalAPI.UUID, err)) - return nil, nil, err - } - for newLabel := range updatedLabels { - updatedLabelsMap[newLabel] = struct{}{} } } - return &adapterInternalAPI, updatedLabelsMap, nil + return &adapterInternalAPI, nil } // getVhostForAPI returns the vHosts related to an API. diff --git a/adapter/internal/operator/synchronizer/rest_api.go b/adapter/internal/operator/synchronizer/rest_api.go index bb968145c..2450f8c92 100644 --- a/adapter/internal/operator/synchronizer/rest_api.go +++ b/adapter/internal/operator/synchronizer/rest_api.go @@ -27,11 +27,40 @@ import ( "github.com/wso2/apk/adapter/internal/discovery/xds/common" "github.com/wso2/apk/adapter/internal/loggers" "github.com/wso2/apk/adapter/internal/oasparser/model" + "github.com/wso2/apk/adapter/internal/operator/constants" "github.com/wso2/apk/adapter/pkg/logging" "k8s.io/apimachinery/pkg/types" gwapiv1b1 "sigs.k8s.io/gateway-api/apis/v1beta1" ) +// deployRestAPIInGateway deploys the related API in CREATE and UPDATE events. +func deployRestAPIInGateway(apiState APIState) error { + var err error + if len(apiState.OldOrganizationID) != 0 { + xds.RemoveAPIFromOrgAPIMap(string((*apiState.APIDefinition).ObjectMeta.UID), apiState.OldOrganizationID) + } + if apiState.ProdHTTPRoute == nil { + var adapterInternalAPI model.AdapterInternalAPI + adapterInternalAPI.SetInfoAPICR(*apiState.APIDefinition) + xds.RemoveAPICacheForEnv(adapterInternalAPI, constants.Production) + } + if apiState.SandHTTPRoute == nil { + var adapterInternalAPI model.AdapterInternalAPI + adapterInternalAPI.SetInfoAPICR(*apiState.APIDefinition) + xds.RemoveAPICacheForEnv(adapterInternalAPI, constants.Sandbox) + } + if apiState.ProdHTTPRoute != nil { + _, err = GenerateAdapterInternalAPI(apiState, apiState.ProdHTTPRoute, constants.Production) + } + if err != nil { + return err + } + if apiState.SandHTTPRoute != nil { + _, err = GenerateAdapterInternalAPI(apiState, apiState.SandHTTPRoute, constants.Sandbox) + } + return err +} + // undeployAPIInGateway undeploys the related API in CREATE and UPDATE events. func undeployRestAPIInGateway(apiState APIState) error { var err error @@ -51,7 +80,7 @@ func undeployRestAPIInGateway(apiState APIState) error { } // GenerateAdapterInternalAPI this will populate a AdapterInternalAPI representation for an HTTPRoute -func GenerateAdapterInternalAPI(apiState APIState, httpRoute *HTTPRouteState, envType string) (*model.AdapterInternalAPI, map[string]struct{}, error) { +func GenerateAdapterInternalAPI(apiState APIState, httpRoute *HTTPRouteState, envType string) (*model.AdapterInternalAPI, error) { var adapterInternalAPI model.AdapterInternalAPI adapterInternalAPI.SetIsDefaultVersion(apiState.APIDefinition.Spec.IsDefaultVersion) adapterInternalAPI.SetInfoAPICR(*apiState.APIDefinition) @@ -88,11 +117,11 @@ func GenerateAdapterInternalAPI(apiState APIState, httpRoute *HTTPRouteState, en } if err := adapterInternalAPI.SetInfoHTTPRouteCR(httpRoute.HTTPRouteCombined, resourceParams); err != nil { loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2631, logging.MAJOR, "Error setting HttpRoute CR info to adapterInternalAPI. %v", err)) - return nil, nil, err + return nil, err } if err := adapterInternalAPI.Validate(); err != nil { loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2632, logging.MAJOR, "Error validating adapterInternalAPI intermediate representation. %v", err)) - return nil, nil, err + return nil, err } vHosts := getVhostsForAPI(httpRoute.HTTPRouteCombined) labels := getLabelsForAPI(httpRoute.HTTPRouteCombined) @@ -101,25 +130,19 @@ func GenerateAdapterInternalAPI(apiState APIState, httpRoute *HTTPRouteState, en if len(listeners) == 0 || len(relativeSectionNames) == 0 { loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2633, logging.MINOR, "Failed to find a matching listener for http route: %v. ", httpRoute.HTTPRouteCombined.Name)) - return nil, nil, errors.New("failed to find matching listener name for the provided http route") + return nil, errors.New("failed to find matching listener name for the provided http route") } - - updatedLabelsMap := make(map[string]struct{}) listenerName := listeners[0] sectionName := relativeSectionNames[0] if len(listeners) != 0 { - updatedLabels, err := xds.UpdateAPICache(vHosts, labels, listenerName, sectionName, adapterInternalAPI) + err := xds.UpdateAPICache(vHosts, labels, listenerName, sectionName, adapterInternalAPI) if err != nil { loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2633, logging.MAJOR, "Error updating the API : %s:%s in vhosts: %s, API_UUID: %v. %v", adapterInternalAPI.GetTitle(), adapterInternalAPI.GetVersion(), vHosts, adapterInternalAPI.UUID, err)) - return nil, nil, err - } - for newLabel := range updatedLabels { - updatedLabelsMap[newLabel] = struct{}{} } } - return &adapterInternalAPI, updatedLabelsMap, nil + return &adapterInternalAPI, nil } // getVhostForAPI returns the vHosts related to an API. diff --git a/adapter/internal/operator/synchronizer/synchronizer.go b/adapter/internal/operator/synchronizer/synchronizer.go index b2e333173..e876c6804 100644 --- a/adapter/internal/operator/synchronizer/synchronizer.go +++ b/adapter/internal/operator/synchronizer/synchronizer.go @@ -28,10 +28,9 @@ import ( "k8s.io/apimachinery/pkg/types" "github.com/wso2/apk/adapter/config" - "github.com/wso2/apk/adapter/internal/discovery/xds" "github.com/wso2/apk/adapter/internal/loggers" - "github.com/wso2/apk/adapter/internal/oasparser/model" "github.com/wso2/apk/adapter/internal/operator/constants" + "github.com/wso2/apk/adapter/internal/operator/utils" "github.com/wso2/apk/adapter/pkg/logging" "github.com/wso2/apk/adapter/pkg/utils/tlsutils" ) @@ -41,7 +40,7 @@ import ( // go routine. type APIEvent struct { EventType string - Events []APIState + Event APIState UpdatedEvents []string } @@ -58,38 +57,36 @@ var ( ) func init() { - if config.ReadConfigs().PartitionServer.Enabled { - paritionCh = make(chan APIEvent, 10) - } + paritionCh = make(chan APIEvent, 10) } // HandleAPILifeCycleEvents handles the API events generated from OperatorDataStore func HandleAPILifeCycleEvents(ch *chan APIEvent, successChannel *chan SuccessEvent) { loggers.LoggerAPKOperator.Info("Operator synchronizer listening for API lifecycle events...") for event := range *ch { + if event.Event.APIDefinition == nil { + loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2628, logging.CRITICAL, "API Event is nil")) + } + loggers.LoggerAPKOperator.Infof("%s event received for %v", event.EventType, event.Event.APIDefinition.Name) var err error switch event.EventType { case constants.Delete: - loggers.LoggerAPKOperator.Infof("Delete event received for %v", event.Events[0].APIDefinition.Name) - err = undeployAPIInGateway(event.Events[0]) + err = undeployAPIInGateway(event.Event) case constants.Create: - loggers.LoggerAPKOperator.Infof("Create event received for %v", event.Events[0].APIDefinition.Name) - deployMultipleAPIsInGateway(event.Events) + err = deployAPIInGateway(event.Event) case constants.Update: - loggers.LoggerAPKOperator.Infof("Update event received for %v", event.Events[0].APIDefinition.Name) - err = deployAPIInGateway(event.Events[0]) + err = deployAPIInGateway(event.Event) } if err != nil { - loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2629, logging.CRITICAL, "API deployment failed for %s event : %v", event.EventType, err)) - } else if event.EventType != constants.Create { - // TODO(amali) commented out because there was no usage for this - // if event.EventType != constants.Delete && event.EventType != constants.Create { - // *successChannel <- SuccessEvent{ - // APINamespacedName: utils.NamespacedName(event.Events[0].APIDefinition), - // State: event.EventType, - // Events: event.UpdatedEvents, - // } - // } + loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2629, logging.MAJOR, "API deployment failed for %s event : %v", event.EventType, err)) + } else { + if event.EventType != constants.Delete { + *successChannel <- SuccessEvent{ + APINamespacedName: utils.NamespacedName(event.Event.APIDefinition), + State: event.EventType, + Events: event.UpdatedEvents, + } + } if config.ReadConfigs().PartitionServer.Enabled { paritionCh <- event } @@ -97,185 +94,24 @@ func HandleAPILifeCycleEvents(ch *chan APIEvent, successChannel *chan SuccessEve } } -func undeployAPIInGateway(apiState APIState) error { +// deployAPIInGateway deploys the related API in CREATE and UPDATE events. +func deployAPIInGateway(apiState APIState) error { if apiState.APIDefinition.Spec.APIType == "REST" { - return undeployRestAPIInGateway(apiState) + return deployRestAPIInGateway(apiState) } if apiState.APIDefinition.Spec.APIType == "GraphQL" { - return undeployGQLAPIInGateway(apiState) + return deployGQLAPIInGateway(apiState) } return nil } -// deployMultipleAPIsInGateway deploys the related API in CREATE and UPDATE events. -func deployMultipleAPIsInGateway(apiStates []APIState) { - updatedLabelsMap := make(map[string]struct{}) - for _, apiState := range apiStates { - if len(apiState.OldOrganizationID) != 0 { - xds.RemoveAPIFromOrgAPIMap(string((*apiState.APIDefinition).ObjectMeta.UID), apiState.OldOrganizationID) - } - if apiState.APIDefinition.Spec.APIType == "REST" { - if apiState.ProdHTTPRoute == nil { - var adapterInternalAPI model.AdapterInternalAPI - adapterInternalAPI.SetInfoAPICR(*apiState.APIDefinition) - xds.RemoveAPICacheForEnv(adapterInternalAPI, constants.Production) - } - if apiState.SandHTTPRoute == nil { - var adapterInternalAPI model.AdapterInternalAPI - adapterInternalAPI.SetInfoAPICR(*apiState.APIDefinition) - xds.RemoveAPICacheForEnv(adapterInternalAPI, constants.Sandbox) - } - - if apiState.ProdHTTPRoute != nil { - _, updatedLabels, err := GenerateAdapterInternalAPI(apiState, apiState.ProdHTTPRoute, constants.Production) - if err != nil { - loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2665, logging.CRITICAL, - "Error deploying prod httpRoute of API : %v in Organization %v from environments %v. Error: %v", - string(apiState.APIDefinition.Spec.APIName), apiState.APIDefinition.Spec.Organization, - getLabelsForAPI(apiState.ProdHTTPRoute.HTTPRouteCombined), err)) - continue - } - for label := range updatedLabels { - updatedLabelsMap[label] = struct{}{} - } - } - - if apiState.SandHTTPRoute != nil { - _, updatedLabels, err := GenerateAdapterInternalAPI(apiState, apiState.SandHTTPRoute, constants.Sandbox) - if err != nil { - loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2666, logging.CRITICAL, - "Error deploying sand httpRoute of API : %v in Organization %v from environments %v. Error: %v", - string(apiState.APIDefinition.Spec.APIName), apiState.APIDefinition.Spec.Organization, - getLabelsForAPI(apiState.ProdHTTPRoute.HTTPRouteCombined), err)) - continue - } - for label := range updatedLabels { - updatedLabelsMap[label] = struct{}{} - } - } - } - - if apiState.APIDefinition.Spec.APIType == "GraphQL" { - if apiState.ProdGQLRoute == nil { - var adapterInternalAPI model.AdapterInternalAPI - adapterInternalAPI.SetInfoAPICR(*apiState.APIDefinition) - xds.RemoveAPICacheForEnv(adapterInternalAPI, constants.Production) - } - if apiState.SandGQLRoute == nil { - var adapterInternalAPI model.AdapterInternalAPI - adapterInternalAPI.SetInfoAPICR(*apiState.APIDefinition) - xds.RemoveAPICacheForEnv(adapterInternalAPI, constants.Sandbox) - } - if apiState.ProdGQLRoute != nil { - _, updatedLabels, err := generateGQLAdapterInternalAPI(apiState, apiState.ProdGQLRoute, constants.Production) - if err != nil { - loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2665, logging.CRITICAL, - "Error deploying prod gqlRoute of API : %v in Organization %v from environments %v. Error: %v", - string(apiState.APIDefinition.Spec.APIName), apiState.APIDefinition.Spec.Organization, - getLabelsForGQLAPI(apiState.ProdGQLRoute.GQLRouteCombined), err)) - continue - } - for label := range updatedLabels { - updatedLabelsMap[label] = struct{}{} - } - } - if apiState.SandGQLRoute != nil { - _, updatedLabels, err := generateGQLAdapterInternalAPI(apiState, apiState.SandGQLRoute, constants.Sandbox) - if err != nil { - loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2665, logging.CRITICAL, - "Error deploying sand gqlRoute of API : %v in Organization %v from environments %v. Error: %v", - string(apiState.APIDefinition.Spec.APIName), apiState.APIDefinition.Spec.Organization, - getLabelsForGQLAPI(apiState.SandGQLRoute.GQLRouteCombined), err)) - continue - } - for label := range updatedLabels { - updatedLabelsMap[label] = struct{}{} - } - } - } - if config.ReadConfigs().PartitionServer.Enabled { - apiEvent := APIEvent{ - EventType: constants.Create, - Events: []APIState{apiState}, - UpdatedEvents: []string{}, - } - paritionCh <- apiEvent - } - } - //TODO(amali) only update status if this is successful - xds.UpdateXdsCacheOnAPIChange(updatedLabelsMap) -} - -// deployAPIInGateway deploys the related API in CREATE and UPDATE events. -func deployAPIInGateway(apiState APIState) error { - updatedLabelsMap := make(map[string]struct{}) - if len(apiState.OldOrganizationID) != 0 { - xds.RemoveAPIFromOrgAPIMap(string((*apiState.APIDefinition).ObjectMeta.UID), apiState.OldOrganizationID) - } - +func undeployAPIInGateway(apiState APIState) error { if apiState.APIDefinition.Spec.APIType == "REST" { - if apiState.ProdHTTPRoute == nil { - var adapterInternalAPI model.AdapterInternalAPI - adapterInternalAPI.SetInfoAPICR(*apiState.APIDefinition) - xds.RemoveAPICacheForEnv(adapterInternalAPI, constants.Production) - } - if apiState.SandHTTPRoute == nil { - var adapterInternalAPI model.AdapterInternalAPI - adapterInternalAPI.SetInfoAPICR(*apiState.APIDefinition) - xds.RemoveAPICacheForEnv(adapterInternalAPI, constants.Sandbox) - } - if apiState.ProdHTTPRoute != nil { - _, updatedLabels, err := GenerateAdapterInternalAPI(apiState, apiState.ProdHTTPRoute, constants.Production) - if err != nil { - return err - } - for label := range updatedLabels { - updatedLabelsMap[label] = struct{}{} - } - } - - if apiState.SandHTTPRoute != nil { - _, updatedLabels, err := GenerateAdapterInternalAPI(apiState, apiState.SandHTTPRoute, constants.Sandbox) - if err != nil { - return err - } - for label := range updatedLabels { - updatedLabelsMap[label] = struct{}{} - } - } + return undeployRestAPIInGateway(apiState) } if apiState.APIDefinition.Spec.APIType == "GraphQL" { - if apiState.ProdGQLRoute == nil { - var adapterInternalAPI model.AdapterInternalAPI - adapterInternalAPI.SetInfoAPICR(*apiState.APIDefinition) - xds.RemoveAPICacheForEnv(adapterInternalAPI, constants.Production) - } - if apiState.SandGQLRoute == nil { - var adapterInternalAPI model.AdapterInternalAPI - adapterInternalAPI.SetInfoAPICR(*apiState.APIDefinition) - xds.RemoveAPICacheForEnv(adapterInternalAPI, constants.Sandbox) - } - if apiState.ProdGQLRoute != nil { - _, updatedLabels, err := generateGQLAdapterInternalAPI(apiState, apiState.ProdGQLRoute, constants.Production) - if err != nil { - return err - } - for label := range updatedLabels { - updatedLabelsMap[label] = struct{}{} - } - } - if apiState.SandGQLRoute != nil { - _, updatedLabels, err := generateGQLAdapterInternalAPI(apiState, apiState.SandGQLRoute, constants.Sandbox) - if err != nil { - return err - } - for label := range updatedLabels { - updatedLabelsMap[label] = struct{}{} - } - } + return undeployGQLAPIInGateway(apiState) } - - xds.UpdateXdsCacheOnAPIChange(updatedLabelsMap) return nil } @@ -299,54 +135,51 @@ func init() { func SendEventToPartitionServer() { conf := config.ReadConfigs() for apiEvent := range paritionCh { - for _, event := range apiEvent.Events { - if !event.APIDefinition.Spec.SystemAPI { - apiDefinition := event.APIDefinition - loggers.LoggerAPKOperator.Infof("Sending API to APK management server: %v, API_UUID: %v", apiDefinition.Spec.APIName, string(apiDefinition.ObjectMeta.UID)) - api := event - eventType := apiEvent.EventType - basePath := api.APIDefinition.Spec.BasePath - organization := api.APIDefinition.Spec.Organization - version := api.APIDefinition.Spec.APIVersion - apiName := api.APIDefinition.Spec.APIName - apiUUID := string(api.APIDefinition.Name) - var hostNames []string - httpRoute := api.ProdHTTPRoute - if httpRoute == nil { - httpRoute = api.SandHTTPRoute - } - for _, hostName := range httpRoute.HTTPRouteCombined.Spec.Hostnames { - hostNames = append(hostNames, string(hostName)) - } - data := PartitionEvent{ - EventType: eventType, - BasePath: basePath, - Organization: organization, - APIVersion: version, - APIName: apiName, - APIUUID: apiUUID, - Vhosts: hostNames, - Partition: conf.PartitionServer.PartitionName, - } - payload, err := json.Marshal(data) - if err != nil { - loggers.LoggerAPKOperator.Errorf("Error creating Event: %v, API_UUID: %v", err, apiUUID) - } - req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s%s%s", conf.PartitionServer.Host, conf.PartitionServer.ServiceBasePath, "/api-deployment"), bytes.NewBuffer(payload)) - if err != nil { - loggers.LoggerAPKOperator.Errorf("Error creating api definition request: %v, API_UUID: %v", err, apiUUID) - } - req.Header.Set("Content-Type", "application/json; charset=UTF-8") - resp, err := partitionClient.Do(req) - if err != nil { - loggers.LoggerAPKOperator.Errorf("Error sending API Event: %v, API_UUID: %v", err, apiUUID) - } - if resp.StatusCode == http.StatusAccepted { - loggers.LoggerAPKOperator.Info("API Event Accepted", resp.Status) - } + if !apiEvent.Event.APIDefinition.Spec.SystemAPI { + apiDefinition := apiEvent.Event.APIDefinition + loggers.LoggerAPKOperator.Infof("Sending API to APK management server: %v, API_UUID: %v", apiDefinition.Spec.APIName, string(apiDefinition.ObjectMeta.UID)) + api := apiEvent.Event + eventType := apiEvent.EventType + basePath := api.APIDefinition.Spec.BasePath + organization := api.APIDefinition.Spec.Organization + version := api.APIDefinition.Spec.APIVersion + apiName := api.APIDefinition.Spec.APIName + apiUUID := string(api.APIDefinition.Name) + var hostNames []string + httpRoute := api.ProdHTTPRoute + if httpRoute == nil { + httpRoute = api.SandHTTPRoute + } + for _, hostName := range httpRoute.HTTPRouteCombined.Spec.Hostnames { + hostNames = append(hostNames, string(hostName)) + } + data := PartitionEvent{ + EventType: eventType, + BasePath: basePath, + Organization: organization, + APIVersion: version, + APIName: apiName, + APIUUID: apiUUID, + Vhosts: hostNames, + Partition: conf.PartitionServer.PartitionName, + } + payload, err := json.Marshal(data) + if err != nil { + loggers.LoggerAPKOperator.Errorf("Error creating Event: %v, API_UUID: %v", err, apiUUID) + } + req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s%s%s", conf.PartitionServer.Host, conf.PartitionServer.ServiceBasePath, "/api-deployment"), bytes.NewBuffer(payload)) + if err != nil { + loggers.LoggerAPKOperator.Errorf("Error creating api definition request: %v, API_UUID: %v", err, apiUUID) + } + req.Header.Set("Content-Type", "application/json; charset=UTF-8") + resp, err := partitionClient.Do(req) + if err != nil { + loggers.LoggerAPKOperator.Errorf("Error sending API Event: %v, API_UUID: %v", err, apiUUID) + } + if resp.StatusCode == http.StatusAccepted { + loggers.LoggerAPKOperator.Info("API Event Accepted", resp.Status) } } - } }