diff --git a/adapter/internal/discovery/xds/marshaller.go b/adapter/internal/discovery/xds/marshaller.go index c195d8ba39..e60192fd87 100644 --- a/adapter/internal/discovery/xds/marshaller.go +++ b/adapter/internal/discovery/xds/marshaller.go @@ -279,100 +279,6 @@ func MarshalKeyManager(keyManager *types.KeyManager) *keymgt.KeyManagerConfig { return nil } -// // MarshalMultipleApplications is used to update the applicationList during the startup where -// // multiple applications are pulled at once. And then it returns the ApplicationList. -// func MarshalMultipleApplications(appList *types.ApplicationList) *subscription.ApplicationList { -// resourceMap := make(map[string]*subscription.Application) -// for item := range appList.List { -// application := appList.List[item] -// applicationSub := marshalApplication(&application) -// resourceMap[application.UUID] = applicationSub -// } -// ApplicationMap = resourceMap -// return marshalApplicationMapToList(ApplicationMap) -// } - -// // MarshalApplicationEventAndReturnList handles the Application Event corresponding to the event received -// // from message broker. And then it returns the ApplicationList. -// func MarshalApplicationEventAndReturnList(application *types.Application, -// eventType EventType) *subscription.ApplicationList { -// if eventType == DeleteEvent { -// delete(ApplicationMap, application.UUID) -// logger.LoggerXds.Infof("Application %s is deleted.", application.UUID) -// } else { -// applicationSub := marshalApplication(application) -// ApplicationMap[application.UUID] = applicationSub -// if eventType == CreateEvent { -// logger.LoggerXds.Infof("Application %s is added.", application.UUID) -// } else { -// logger.LoggerXds.Infof("Application %s is updated.", application.UUID) -// } -// } -// return marshalApplicationMapToList(ApplicationMap) -// } - -// // MarshalMultipleApplicationKeyMappings is used to update the application key mappings during the startup where -// // multiple key mappings are pulled at once. And then it returns the ApplicationKeyMappingList. -// func MarshalMultipleApplicationKeyMappings(keymappingList *types.ApplicationKeyMappingList) *subscription.ApplicationKeyMappingList { -// resourceMap := make(map[string]*subscription.ApplicationKeyMapping) -// for item := range keymappingList.List { -// keyMapping := keymappingList.List[item] -// applicationKeyMappingReference := GetApplicationKeyMappingReference(&keyMapping) -// keyMappingSub := marshalKeyMapping(&keyMapping) -// resourceMap[applicationKeyMappingReference] = keyMappingSub -// } -// ApplicationKeyMappingMap = resourceMap -// return marshalKeyMappingMapToList(ApplicationKeyMappingMap) -// } - -// // MarshalApplicationKeyMappingEventAndReturnList handles the Application Key Mapping Event corresponding to the event received -// // from message broker. And then it returns the ApplicationKeyMappingList. -// func MarshalApplicationKeyMappingEventAndReturnList(keyMapping *types.ApplicationKeyMapping, -// eventType EventType) *subscription.ApplicationKeyMappingList { -// applicationKeyMappingReference := GetApplicationKeyMappingReference(keyMapping) -// if eventType == DeleteEvent { -// delete(ApplicationKeyMappingMap, applicationKeyMappingReference) -// logger.LoggerXds.Infof("Application Key Mapping for the applicationKeyMappingReference %s is removed.", -// applicationKeyMappingReference) -// } else { -// keyMappingSub := marshalKeyMapping(keyMapping) -// ApplicationKeyMappingMap[applicationKeyMappingReference] = keyMappingSub -// logger.LoggerXds.Infof("Application Key Mapping for the applicationKeyMappingReference %s is added.", -// applicationKeyMappingReference) -// } -// return marshalKeyMappingMapToList(ApplicationKeyMappingMap) -// } - -// // MarshalMultipleSubscriptions is used to update the subscriptions during the startup where -// // multiple subscriptions are pulled at once. And then it returns the SubscriptionList. -// func MarshalMultipleSubscriptions(subscriptionsList *types.SubscriptionList) *subscription.SubscriptionList { -// resourceMap := make(map[int32]*subscription.Subscription) -// for item := range subscriptionsList.List { -// sb := subscriptionsList.List[item] -// resourceMap[sb.SubscriptionID] = marshalSubscription(&sb) -// } -// SubscriptionMap = resourceMap -// return marshalSubscriptionMapToList(SubscriptionMap) -// } - -// // MarshalSubscriptionEventAndReturnList handles the Subscription Event corresponding to the event received -// // from message broker. And then it returns the SubscriptionList. -// func MarshalSubscriptionEventAndReturnList(sub *types.Subscription, eventType EventType) *subscription.SubscriptionList { -// if eventType == DeleteEvent { -// delete(SubscriptionMap, sub.SubscriptionID) -// logger.LoggerXds.Infof("Subscription for %s:%s is deleted.", sub.APIUUID, sub.ApplicationUUID) -// } else { -// subscriptionSub := marshalSubscription(sub) -// SubscriptionMap[sub.SubscriptionID] = subscriptionSub -// if eventType == UpdateEvent { -// logger.LoggerXds.Infof("Subscription for %s:%s is updated.", sub.APIUUID, sub.ApplicationUUID) -// } else { -// logger.LoggerXds.Infof("Subscription for %s:%s is added.", x.APIUUID, sub.ApplicationUUID) -// } -// } -// return marshalSubscriptionMapToList(SubscriptionMap) -// } - // MarshalMultipleApplicationPolicies is used to update the applicationPolicies during the startup where // multiple application policies are pulled at once. And then it returns the ApplicationPolicyList. func MarshalMultipleApplicationPolicies(policies *types.ApplicationPolicyList) *subscription.ApplicationPolicyList { @@ -499,40 +405,6 @@ func MarshalAPIForLifeCycleChangeEventAndReturnList(apiUUID, status, gatewayLabe return marshalAPIListMapToList(APIListMap[gatewayLabel]) } -// func marshalSubscription(subscriptionInternal *types.Subscription) *subscription.Subscription { -// api := &subscription.API{ -// Name: subscriptionInternal.Api.Name, -// Versions: subscriptionInternal.Api.Versions, -// } -// sub := &subscription.Subscription{ -// Uuid: subscriptionInternal.SubscriptionUUID, -// SubStatus: subscriptionInternal.SubscriptionStatus, -// Organization: subscriptionInternal.Organization, -// } -// sub.Api = api -// return sub -// } - -// func marshalApplication(appInternal *types.Application) *subscription.Application { -// app := &subscription.Application{ -// Uuid: appInternal.UUID, -// Name: appInternal.Name, -// Owner: appInternal.Owner, -// Attributes: appInternal.Attributes, -// } -// return app -// } - -// func marshalKeyMapping(keyMappingInternal *types.ApplicationKeyMapping) *subscription.ApplicationKeyMapping { -// return &subscription.ApplicationKeyMapping{ -// ApplicationUUID: keyMappingInternal.ApplicationUUID, -// ApplicationIdentifier: keyMappingInternal.ApplicationIdentifier, -// KeyType: keyMappingInternal.KeyType, -// EnvID: keyMappingInternal.EnvId, -// Timestamp: keyMappingInternal.TimeStamp, -// } -// } - func marshalAPIMetadata(api *types.API) *subscription.APIs { return &subscription.APIs{ ApiId: strconv.Itoa(api.APIID), @@ -573,12 +445,6 @@ func marshalSubscriptionPolicy(policy *types.SubscriptionPolicy) *subscription.S } } -// // GetApplicationKeyMappingReference returns unique reference for each key Mapping event. -// // It is the combination of consumerKey -// func GetApplicationKeyMappingReference(keyMapping *types.ApplicationKeyMapping) string { -// return keyMapping.ApplicationIdentifier -// } - // CheckIfAPIMetadataIsAlreadyAvailable returns true only if the API Metadata for the given API UUID // is already available func CheckIfAPIMetadataIsAlreadyAvailable(apiUUID, label string) bool { diff --git a/adapter/internal/discovery/xds/server.go b/adapter/internal/discovery/xds/server.go index ed456f3e80..faa0e09beb 100644 --- a/adapter/internal/discovery/xds/server.go +++ b/adapter/internal/discovery/xds/server.go @@ -79,12 +79,9 @@ type EnvoyGatewayConfig struct { type EnforcerInternalAPI struct { configs []types.Resource keyManagers []types.Resource - // subscriptions []types.Resource - // applications []types.Resource apiList []types.Resource applicationPolicies []types.Resource subscriptionPolicies []types.Resource - // applicationKeyMappings []types.Resource revokedTokens []types.Resource jwtIssuers []types.Resource } @@ -97,12 +94,9 @@ var ( cache envoy_cachev3.SnapshotCache enforcerCache wso2_cache.SnapshotCache enforcerJwtIssuerCache wso2_cache.SnapshotCache - // enforcerSubscriptionCache wso2_cache.SnapshotCache - // enforcerApplicationCache wso2_cache.SnapshotCache enforcerAPICache wso2_cache.SnapshotCache enforcerApplicationPolicyCache wso2_cache.SnapshotCache enforcerSubscriptionPolicyCache wso2_cache.SnapshotCache - // enforcerApplicationKeyMappingCache wso2_cache.SnapshotCache enforcerKeyManagerCache wso2_cache.SnapshotCache enforcerRevokedTokensCache wso2_cache.SnapshotCache enforcerThrottleDataCache wso2_cache.SnapshotCache @@ -563,48 +557,6 @@ func UpdateEnforcerApis(label string, apis []types.Resource, version string) { UpdateEnforcerAPIList(label, subAPIList) } -// // UpdateEnforcerSubscriptions sets new update to the enforcer's Subscriptions -// func UpdateEnforcerSubscriptions(subscriptions *subscription.SubscriptionList) { -// //TODO: (Dinusha) check this hardcoded value -// logger.LoggerXds.Debug("Updating Enforcer Subscription Cache") -// label := commonEnforcerLabel -// subscriptionList := append(enforcerLabelMap[label].subscriptions, subscriptions) - -// // TODO: (VirajSalaka) Decide if a map is required to keep version (just to avoid having the same version) -// version, _ := crand.Int(crand.Reader, maxRandomBigInt()) -// snap, _ := wso2_cache.NewSnapshot(fmt.Sprint(version), map[wso2_resource.Type][]types.Resource{ -// wso2_resource.SubscriptionListType: subscriptionList, -// }) -// snap.Consistent() - -// errSetSnap := enforcerSubscriptionCache.SetSnapshot(context.Background(), label, snap) -// if errSetSnap != nil { -// logger.LoggerXds.ErrorC(logging.PrintError(logging.Error1414, logging.MAJOR, "Error while setting the snapshot : %v", errSetSnap.Error())) -// } -// enforcerLabelMap[label].subscriptions = subscriptionList -// logger.LoggerXds.Infof("New Subscription cache update for the label: " + label + " version: " + fmt.Sprint(version)) -// } - -// // UpdateEnforcerApplications sets new update to the enforcer's Applications -// func UpdateEnforcerApplications(applications *subscription.ApplicationList) { -// logger.LoggerXds.Debug("Updating Enforcer Application Cache") -// label := commonEnforcerLabel -// applicationList := append(enforcerLabelMap[label].applications, applications) - -// version, _ := crand.Int(crand.Reader, maxRandomBigInt()) -// snap, _ := wso2_cache.NewSnapshot(fmt.Sprint(version), map[wso2_resource.Type][]types.Resource{ -// wso2_resource.ApplicationListType: applicationList, -// }) -// snap.Consistent() - -// errSetSnap := enforcerApplicationCache.SetSnapshot(context.Background(), label, snap) -// if errSetSnap != nil { -// logger.LoggerXds.ErrorC(logging.PrintError(logging.Error1414, logging.MAJOR, "Error while setting the snapshot : %v", errSetSnap.Error())) -// } -// enforcerLabelMap[label].applications = applicationList -// logger.LoggerXds.Infof("New Application cache update for the label: " + label + " version: " + fmt.Sprint(version)) -// } - // UpdateEnforcerJWTIssuers sets new update to the enforcer's Applications func UpdateEnforcerJWTIssuers(jwtIssuers *subscription.JWTIssuerList) { logger.LoggerXds.Debug("Updating Enforcer JWT Issuer Cache") @@ -684,26 +636,6 @@ func UpdateEnforcerSubscriptionPolicies(subscriptionPolicies *subscription.Subsc logger.LoggerXds.Infof("New Subscription Policy cache update for the label: " + label + " version: " + fmt.Sprint(version)) } -// // UpdateEnforcerApplicationKeyMappings sets new update to the enforcer's Application Key Mappings -// func UpdateEnforcerApplicationKeyMappings(applicationKeyMappings *subscription.ApplicationKeyMappingList) { -// logger.LoggerXds.Debug("Updating Application Key Mapping Cache") -// label := commonEnforcerLabel -// applicationKeyMappingList := append(enforcerLabelMap[label].applicationKeyMappings, applicationKeyMappings) - -// version, _ := crand.Int(crand.Reader, maxRandomBigInt()) -// snap, _ := wso2_cache.NewSnapshot(fmt.Sprint(version), map[wso2_resource.Type][]types.Resource{ -// wso2_resource.ApplicationKeyMappingListType: applicationKeyMappingList, -// }) -// snap.Consistent() - -// errSetSnap := enforcerApplicationKeyMappingCache.SetSnapshot(context.Background(), label, snap) -// if errSetSnap != nil { -// logger.LoggerXds.ErrorC(logging.PrintError(logging.Error1414, logging.MAJOR, "Error while setting the snapshot : %v", errSetSnap.Error())) -// } -// enforcerLabelMap[label].applicationKeyMappings = applicationKeyMappingList -// logger.LoggerXds.Infof("New Application Key Mapping cache update for the label: " + label + " version: " + fmt.Sprint(version)) -// } - // UpdateXdsCacheWithLock uses mutex and lock to avoid different go routines updating XDS at the same time func UpdateXdsCacheWithLock(label string, endpoints []types.Resource, clusters []types.Resource, routes []types.Resource, listeners []types.Resource) bool { diff --git a/adapter/internal/management-server/xds/application_event_listener.go b/adapter/internal/management-server/xds/application_event_listener.go deleted file mode 100644 index f7e37c5208..0000000000 --- a/adapter/internal/management-server/xds/application_event_listener.go +++ /dev/null @@ -1,83 +0,0 @@ -package xds - -// import ( -// "context" -// "errors" - -// "github.com/wso2/apk/adapter/internal/loggers" -// logging "github.com/wso2/apk/adapter/internal/logging" -// cpv1alpha1 "github.com/wso2/apk/adapter/internal/operator/apis/cp/v1alpha1" - -// apierrors "k8s.io/apimachinery/pkg/api/errors" -// "k8s.io/apimachinery/pkg/types" -// ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache" -// "sigs.k8s.io/controller-runtime/pkg/client" -// ) - -// // HandleApplicationEventsFromMgtServer handles the Application events -// func HandleApplicationEventsFromMgtServer(c client.Client, cReader client.Reader) { -// for applicationEvent := range applicationChannel { -// switch applicationEvent.Type { -// case ApplicationCreate: -// if found, _, err := checkApplicationExists(applicationEvent.Application, c, cReader); err == nil && !found { -// if err := c.Create(context.Background(), *&applicationEvent.Application); err != nil { -// loggers.LoggerXds.ErrorC(logging.PrintError(logging.Error1707, logging.CRITICAL, "Error creating application: %v", err.Error())) -// } else { -// loggers.LoggerXds.Info("Application created: " + applicationEvent.Application.Name) -// } -// } -// break -// case ApplicationUpdate: -// if found, application, err := checkApplicationExists(applicationEvent.Application, c, cReader); err == nil && found { -// application.Spec = applicationEvent.Application.Spec -// err := c.Update(context.Background(), application) -// if err != nil { -// loggers.LoggerXds.ErrorC(logging.PrintError(logging.Error1709, logging.CRITICAL, "Error updating application: %v", err.Error())) -// } else { -// loggers.LoggerXds.Info("Application updated: " + applicationEvent.Application.Name) -// } -// } -// break -// case ApplicationDelete: -// err := c.Delete(context.Background(), *&applicationEvent.Application) -// if err != nil { -// loggers.LoggerXds.ErrorC(logging.PrintError(logging.Error1710, logging.CRITICAL, "Error deleting application: %v", err.Error())) -// } else { -// loggers.LoggerXds.Info("Application deleted: " + applicationEvent.Application.Name) -// } -// break -// default: -// loggers.LoggerXds.Info("Unknown Application Event Type") -// } -// } -// } - -// func checkApplicationExists(application *cpv1alpha1.Application, c client.Client, cReader client.Reader) (bool, *cpv1alpha1.Application, error) { -// var retrivedApplication = new(cpv1alpha1.Application) -// // Try reading from cache -// if err := c.Get(context.Background(), types.NamespacedName{ -// Name: application.Name, -// Namespace: application.Namespace}, retrivedApplication); err != nil { - -// target := &ctrlcache.ErrCacheNotStarted{} -// if errors.As(err, &target) { -// // Try reading from api server directly -// if err := cReader.Get(context.Background(), types.NamespacedName{ -// Name: application.Name, -// Namespace: application.Namespace}, retrivedApplication); err != nil { - -// if !apierrors.IsNotFound(err) { -// loggers.LoggerXds.ErrorC(logging.PrintError(logging.Error1711, logging.CRITICAL, "Error retrieving application: %v", err.Error())) -// return false, nil, err -// } -// return false, nil, nil -// } -// } else if !apierrors.IsNotFound(err) { -// loggers.LoggerXds.ErrorC(logging.PrintError(logging.Error1711, logging.CRITICAL, "Error retrieving application: %v", err.Error())) -// return false, nil, err -// } else { -// return false, nil, nil -// } -// } -// return true, retrivedApplication, nil -// } diff --git a/adapter/internal/management-server/xds/client.go b/adapter/internal/management-server/xds/client.go deleted file mode 100644 index c54ca1b249..0000000000 --- a/adapter/internal/management-server/xds/client.go +++ /dev/null @@ -1,435 +0,0 @@ -/* - * Copyright (c) 2021, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package xds - -// import ( -// "context" -// "fmt" -// "io" -// "reflect" - -// core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" -// discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" -// "github.com/golang/protobuf/ptypes" -// "github.com/wso2/apk/adapter/config" -// "github.com/wso2/apk/adapter/internal/loggers" -// logging "github.com/wso2/apk/adapter/internal/logging" -// "github.com/wso2/apk/adapter/internal/management-server/utils" -// cpv1alpha1 "github.com/wso2/apk/adapter/internal/operator/apis/cp/v1alpha1" - -// stub "github.com/wso2/apk/adapter/pkg/discovery/api/wso2/discovery/service/subscription" -// sub_model "github.com/wso2/apk/adapter/pkg/discovery/api/wso2/discovery/subscription" - -// operatorutils "github.com/wso2/apk/adapter/internal/operator/utils" -// "github.com/wso2/apk/adapter/pkg/utils/stringutils" -// "google.golang.org/genproto/googleapis/rpc/status" -// "google.golang.org/grpc" -// "google.golang.org/grpc/codes" -// grpcStatus "google.golang.org/grpc/status" -// metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -// ) - -// var ( -// // Last Acknowledged Response from the apkmgt server -// lastAckedResponse *discovery.DiscoveryResponse -// // Last Received Response from the apkmgt server -// // Last Received Response is always is equal to the lastAckedResponse according to current implementation as there is no -// // validation performed on successfully received response. -// lastReceivedResponse *discovery.DiscoveryResponse -// // XDS stream for streaming Aplications from APK Mgt client -// xdsStream stub.ApplicationDiscoveryService_StreamApplicationsClient -// // applicationMap contains the application cache -// applicationMap map[string]cpv1alpha1.Application -// // applicationChannel is used to notifiy the application updates -// applicationChannel chan ApplicationEvent -// // subscriptionMap contains the application cache -// subscriptionMap map[string]cpv1alpha1.Subscription -// // subscriptionChannel is used to notifiy the subscription updates -// subscriptionChannel chan SubscriptionEvent -// // XDS stream for streaming Subscriptions from client -// xdsSubStream stub.SubscriptionDiscoveryService_StreamSubscriptionsClient -// // Last Acknowledged Response from the apkmgt server -// lastAckedResponseSub *discovery.DiscoveryResponse -// // Last Received Response from the apkmgt server -// // Last Received Response is always is equal to the lastAckedResponse according to current implementation as there is no -// // validation performed on successfully received response. -// lastReceivedResponseSub *discovery.DiscoveryResponse -// ) - -// // EventType is the type of the event -// type EventType int - -// const ( -// // ApplicationCreate is application create event type -// ApplicationCreate = 0 -// // ApplicationUpdate is application update event type -// ApplicationUpdate = 1 -// // ApplicationDelete is application delete event type -// ApplicationDelete = 2 -// ) - -// const ( -// // SubscriptionCreate is subscription create event type -// SubscriptionCreate = 0 -// // SubscriptionUpdate is subscription update event type -// SubscriptionUpdate = 1 -// // SubscriptionDelete is subscription delete event type -// SubscriptionDelete = 2 -// ) - -// // ApplicationEvent is the application event data holder -// type ApplicationEvent struct { -// Type EventType -// Application *cpv1alpha1.Application -// } - -// // SubscriptionEvent is the subsctiption event data holder -// type SubscriptionEvent struct { -// Type EventType -// Subscription *cpv1alpha1.Subscription -// } - -// const ( -// // The type url for requesting Application Entries from apkmgt server. -// applicationTypeURL string = "type.googleapis.com/wso2.discovery.subscription.Application" -// // The type url for requesting Subscription Entries from apkmgt server. -// subscriptionTypeURL string = "type.googleapis.com/wso2.discovery.subscription.Subscription" -// ) - -// func init() { -// lastAckedResponse = &discovery.DiscoveryResponse{} -// lastAckedResponseSub = &discovery.DiscoveryResponse{} -// applicationChannel = make(chan ApplicationEvent, 1000) -// applicationMap = make(map[string]cpv1alpha1.Application) -// subscriptionChannel = make(chan SubscriptionEvent, 1000) -// subscriptionMap = make(map[string]cpv1alpha1.Subscription) -// } - -// func initConnection(xdsURL string) error { -// // TODO: (AmaliMatharaarachchi) Bring in connection level configurations -// transportCredentials, err := utils.GenerateTLSCredentials() -// conn, err := grpc.Dial(xdsURL, grpc.WithTransportCredentials(transportCredentials), grpc.WithBlock()) -// if err != nil { -// // TODO: (AmaliMatharaarachchi) retries -// loggers.LoggerXds.ErrorC(logging.PrintError(logging.Error1700, logging.BLOCKER, "Error while connecting to the APK Management Server. %v", err.Error())) -// return err -// } - -// client := stub.NewApplicationDiscoveryServiceClient(conn) -// clientSub := stub.NewSubscriptionDiscoveryServiceClient(conn) -// streamContext := context.Background() -// xdsStream, err = client.StreamApplications(streamContext) -// xdsSubStream, err = clientSub.StreamSubscriptions(streamContext) - -// if err != nil { -// // TODO: (AmaliMatharaarachchi) handle error. -// loggers.LoggerXds.ErrorC(logging.PrintError(logging.Error1701, logging.BLOCKER, "Error while starting APK Management application stream. %v", err.Error())) -// return err -// } -// loggers.LoggerXds.Infof("Connection to the APK Management Server: %s is successful.", xdsURL) -// return nil -// } - -// func watchApplications() { -// for { -// discoveryResponse, err := xdsStream.Recv() -// if err == io.EOF { -// loggers.LoggerXds.ErrorC(logging.PrintError(logging.Error1702, logging.CRITICAL, "EOF is received from the APK Management Server application stream. %v", err.Error())) -// return -// } -// if err != nil { -// loggers.LoggerXds.ErrorC(logging.PrintError(logging.Error1703, logging.CRITICAL, "Failed to receive the discovery response from the APK Management Server application stream. %v", err.Error())) -// errStatus, _ := grpcStatus.FromError(err) -// if errStatus.Code() == codes.Unavailable { -// loggers.LoggerXds.ErrorC(logging.PrintError(logging.Error1704, logging.MINOR, "The APK Management Server application stream connection stopped: %v", err.Error())) -// return -// } -// nack(err.Error()) -// } else { -// lastReceivedResponse = discoveryResponse -// loggers.LoggerXds.Debugf("Discovery response is received : %s", discoveryResponse.VersionInfo) -// addApplicationsToChannel(discoveryResponse) -// ack() -// } -// } -// } - -// func watchSubscriptions() { -// for { -// discoveryResponse, err := xdsSubStream.Recv() -// if err == io.EOF { -// loggers.LoggerXds.ErrorC(logging.PrintError(logging.Error1717, logging.CRITICAL, "EOF is received from the APK Management Server subscription stream. %v", err.Error())) -// return -// } -// if err != nil { -// loggers.LoggerXds.ErrorC(logging.PrintError(logging.Error1718, logging.CRITICAL, "Failed to receive the discovery response from the APK Management Server subscription stream. %v", err.Error())) -// errStatus, _ := grpcStatus.FromError(err) -// if errStatus.Code() == codes.Unavailable { -// loggers.LoggerXds.ErrorC(logging.PrintError(logging.Error1719, logging.MINOR, "The APK Management Server subscription stream connection stopped: %v", err.Error())) -// return -// } -// nackSub(err.Error()) -// } else { -// lastReceivedResponseSub = discoveryResponse -// loggers.LoggerXds.Debugf("Discovery response is received : %s", discoveryResponse.VersionInfo) -// addSubscriptionsToChannel(discoveryResponse) -// ackSub() -// } -// } -// } - -// func ack() { -// lastAckedResponse = lastReceivedResponse -// discoveryRequest := &discovery.DiscoveryRequest{ -// Node: getAdapterNode(), -// VersionInfo: lastAckedResponse.VersionInfo, -// TypeUrl: applicationTypeURL, -// ResponseNonce: lastReceivedResponse.Nonce, -// } -// xdsStream.Send(discoveryRequest) -// } - -// func ackSub() { -// lastAckedResponseSub = lastReceivedResponseSub -// discoveryRequest := &discovery.DiscoveryRequest{ -// Node: getAdapterNode(), -// VersionInfo: lastAckedResponseSub.VersionInfo, -// TypeUrl: subscriptionTypeURL, -// ResponseNonce: lastReceivedResponseSub.Nonce, -// } -// xdsSubStream.Send(discoveryRequest) -// } - -// func nack(errorMessage string) { -// if lastAckedResponse == nil { -// return -// } -// discoveryRequest := &discovery.DiscoveryRequest{ -// Node: getAdapterNode(), -// VersionInfo: lastAckedResponse.VersionInfo, -// TypeUrl: applicationTypeURL, -// ResponseNonce: lastReceivedResponse.Nonce, -// ErrorDetail: &status.Status{ -// Message: errorMessage, -// }, -// } -// xdsStream.Send(discoveryRequest) -// } - -// func nackSub(errorMessage string) { -// if lastAckedResponseSub == nil { -// return -// } -// discoveryRequest := &discovery.DiscoveryRequest{ -// Node: getAdapterNode(), -// VersionInfo: lastAckedResponseSub.VersionInfo, -// TypeUrl: subscriptionTypeURL, -// ResponseNonce: lastReceivedResponseSub.Nonce, -// ErrorDetail: &status.Status{ -// Message: errorMessage, -// }, -// } -// xdsSubStream.Send(discoveryRequest) -// } - -// func getAdapterNode() *core.Node { -// config := config.ReadConfigs() -// return &core.Node{ -// Id: config.ManagementServer.NodeLabel, -// } -// } - -// // InitApkMgtXDSClient initializes the connection to the apkmgt server. -// func InitApkMgtXDSClient() { -// loggers.LoggerXds.Info("Starting the XDS Client connection to APK Management server.") -// config := config.ReadConfigs() -// err := initConnection(fmt.Sprintf("%s:%d", config.ManagementServer.Host, config.ManagementServer.XDSPort)) -// if err == nil { -// go watchApplications() -// discoveryRequest := &discovery.DiscoveryRequest{ -// Node: getAdapterNode(), -// VersionInfo: "", -// TypeUrl: applicationTypeURL, -// } -// xdsStream.Send(discoveryRequest) -// go watchSubscriptions() -// discoveryRequestSub := &discovery.DiscoveryRequest{ -// Node: getAdapterNode(), -// VersionInfo: "", -// TypeUrl: subscriptionTypeURL, -// } -// xdsSubStream.Send(discoveryRequestSub) -// } else { -// loggers.LoggerXds.ErrorC(logging.PrintError(logging.Error1705, logging.BLOCKER, "Error while starting the APK Management Server: %v", err.Error())) -// } -// } - -// func addApplicationsToChannel(resp *discovery.DiscoveryResponse) { -// var newApplicationUUIDs []string - -// for _, res := range resp.Resources { -// application := &sub_model.Application{} -// err := ptypes.UnmarshalAny(res, application) - -// if err != nil { -// loggers.LoggerXds.ErrorC(logging.PrintError(logging.Error1706, logging.MINOR, "Error while unmarshalling APK Management Server Application discovery response: %v", err.Error())) -// continue -// } - -// applicationUUID := application.Uuid -// newApplicationUUIDs = append(newApplicationUUIDs, applicationUUID) - -// applicationResource := &cpv1alpha1.Application{ -// ObjectMeta: metav1.ObjectMeta{ -// Namespace: operatorutils.GetOperatorPodNamespace(), -// Name: application.Uuid, -// }, -// Spec: cpv1alpha1.ApplicationSpec{ -// Name: application.Name, -// Owner: application.Owner, -// Attributes: application.Attributes, -// Policy: application.Policy, -// Organization: application.Organization, -// }, -// } - -// var consumerKeys []cpv1alpha1.Key -// for _, consumerKey := range application.Keys { -// consumerKeys = append(consumerKeys, cpv1alpha1.Key{Key: consumerKey.Key, KeyManager: consumerKey.KeyManager}) -// } -// applicationResource.Spec.Keys = consumerKeys - -// // Todo:(Sampath) Need to handle adding the subscriptions coming from management server seperately -// // var subscriptions []cpv1alpha1.Subscription -// // for _, subscription := range application.Subscriptions { -// // subscriptions = append(subscriptions, cpv1alpha1.Subscription{ -// // UUID: subscription.Name, -// // SubscriptionStatus: subscription.Spec.SubscriptionStatus, -// // PolicyID: subscription.PolicyId, -// // APIRef: subscription.ApiUuid, -// // }) -// // } -// // applicationResource.Spec.Subscriptions = subscriptions - -// var event ApplicationEvent - -// if currentApplication, found := applicationMap[applicationUUID]; found { -// if reflect.DeepEqual(currentApplication.Spec, applicationResource.Spec) { -// continue -// } -// // Application update event -// event = ApplicationEvent{ -// Type: ApplicationUpdate, -// Application: applicationResource, -// } -// applicationMap[applicationUUID] = *applicationResource -// } else { -// // Application create event -// event = ApplicationEvent{ -// Type: ApplicationCreate, -// Application: applicationResource, -// } -// applicationMap[applicationUUID] = *applicationResource -// } - -// applicationChannel <- event - -// } -// // Send delete events for removed applications -// for item := range applicationMap { -// application := applicationMap[item] -// if !stringutils.StringInSlice(application.Name, newApplicationUUIDs) { -// // Application delete event -// event := ApplicationEvent{ -// Type: ApplicationDelete, -// Application: &application, -// } -// applicationChannel <- event -// delete(applicationMap, application.Name) -// } -// } -// } - -// func addSubscriptionsToChannel(resp *discovery.DiscoveryResponse) { -// var newSubscriptionUUIDs []string - -// for _, res := range resp.Resources { -// subscription := &sub_model.Subscription{} -// err := ptypes.UnmarshalAny(res, subscription) - -// if err != nil { -// loggers.LoggerXds.ErrorC(logging.PrintError(logging.Error1720, logging.MINOR, "Error while unmarshalling APK Management Server Subscription discovery response: %v", err.Error())) -// continue -// } - -// subscriptionUUID := subscription.Uuid -// newSubscriptionUUIDs = append(newSubscriptionUUIDs, subscriptionUUID) - -// subscriptionResource := &cpv1alpha1.Subscription{ -// ObjectMeta: metav1.ObjectMeta{ -// Namespace: operatorutils.GetOperatorPodNamespace(), -// Name: subscription.Uuid, -// }, -// Spec: cpv1alpha1.SubscriptionSpec{ -// APIRef: subscription.ApiRef, -// ApplicationRef: subscription.ApplicationRef, -// PolicyID: subscription.PolicyId, -// SubscriptionStatus: subscription.SubStatus, -// Subscriber: subscription.Subscriber, -// Organization: subscription.Organization, -// }, -// } - -// var event SubscriptionEvent - -// if currentSubscription, found := subscriptionMap[subscriptionUUID]; found { -// if reflect.DeepEqual(currentSubscription.Spec, subscriptionResource.Spec) { -// continue -// } -// // Subscription update event -// event = SubscriptionEvent{ -// Type: SubscriptionUpdate, -// Subscription: subscriptionResource, -// } -// subscriptionMap[subscriptionUUID] = *subscriptionResource -// } else { -// // Subscription create event -// event = SubscriptionEvent{ -// Type: SubscriptionCreate, -// Subscription: subscriptionResource, -// } -// subscriptionMap[subscriptionUUID] = *subscriptionResource -// } - -// subscriptionChannel <- event - -// } -// // Send delete events for removed subscriptions -// for item := range subscriptionMap { -// subscription := subscriptionMap[item] -// if !stringutils.StringInSlice(subscription.Name, newSubscriptionUUIDs) { -// // Subscription delete event -// event := SubscriptionEvent{ -// Type: SubscriptionDelete, -// Subscription: &subscription, -// } -// subscriptionChannel <- event -// delete(subscriptionMap, subscription.Name) -// } -// } -// } diff --git a/adapter/internal/management-server/xds/subscription_event_listener.go b/adapter/internal/management-server/xds/subscription_event_listener.go deleted file mode 100644 index 2708d6f23e..0000000000 --- a/adapter/internal/management-server/xds/subscription_event_listener.go +++ /dev/null @@ -1,83 +0,0 @@ -package xds - -// import ( -// "context" -// "errors" - -// "github.com/wso2/apk/adapter/internal/loggers" -// logging "github.com/wso2/apk/adapter/internal/logging" -// cpv1alpha1 "github.com/wso2/apk/adapter/internal/operator/apis/cp/v1alpha1" - -// apierrors "k8s.io/apimachinery/pkg/api/errors" -// "k8s.io/apimachinery/pkg/types" -// ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache" -// "sigs.k8s.io/controller-runtime/pkg/client" -// ) - -// HandleSubscriptionEventsFromMgtServer handles the Subscription events -// func HandleSubscriptionEventsFromMgtServer(c client.Client, cReader client.Reader) { -// for subscriptionEvent := range subscriptionChannel { -// switch subscriptionEvent.Type { -// case SubscriptionCreate: -// if found, _, err := checkSubscriptionExists(subscriptionEvent.Subscription, c, cReader); err == nil && !found { -// if err := c.Create(context.Background(), *&subscriptionEvent.Subscription); err != nil { -// loggers.LoggerXds.ErrorC(logging.PrintError(logging.Error1721, logging.CRITICAL, "Error creating subscription: %v", err.Error())) -// } else { -// loggers.LoggerXds.Info("Subscription created: " + subscriptionEvent.Subscription.Name) -// } -// } -// break -// case SubscriptionUpdate: -// if found, subscription, err := checkSubscriptionExists(subscriptionEvent.Subscription, c, cReader); err == nil && found { -// subscription.Spec = subscriptionEvent.Subscription.Spec -// err := c.Update(context.Background(), subscription) -// if err != nil { -// loggers.LoggerXds.ErrorC(logging.PrintError(logging.Error1722, logging.CRITICAL, "Error updating subscription: %v", err.Error())) -// } else { -// loggers.LoggerXds.Info("Subscription updated: " + subscriptionEvent.Subscription.Name) -// } -// } -// break -// case SubscriptionDelete: -// err := c.Delete(context.Background(), *&subscriptionEvent.Subscription) -// if err != nil { -// loggers.LoggerXds.ErrorC(logging.PrintError(logging.Error1723, logging.CRITICAL, "Error deleting subscription: %v", err.Error())) -// } else { -// loggers.LoggerXds.Info("Subscription deleted: " + subscriptionEvent.Subscription.Name) -// } -// break -// default: -// loggers.LoggerXds.Info("Unknown Subscription Event Type") -// } -// } -// } - -// func checkSubscriptionExists(subscription *cpv1alpha1.Subscription, c client.Client, cReader client.Reader) (bool, *cpv1alpha1.Subscription, error) { -// var retrivedSubscription = new(cpv1alpha1.Subscription) -// // Try reading from cache -// if err := c.Get(context.Background(), types.NamespacedName{ -// Name: subscription.Name, -// Namespace: subscription.Namespace}, retrivedSubscription); err != nil { - -// target := &ctrlcache.ErrCacheNotStarted{} -// if errors.As(err, &target) { -// // Try reading from api server directly -// if err := cReader.Get(context.Background(), types.NamespacedName{ -// Name: subscription.Name, -// Namespace: subscription.Namespace}, retrivedSubscription); err != nil { - -// if !apierrors.IsNotFound(err) { -// loggers.LoggerXds.ErrorC(logging.PrintError(logging.Error1724, logging.CRITICAL, "Error retrieving subscription: %v", err.Error())) -// return false, nil, err -// } -// return false, nil, nil -// } -// } else if !apierrors.IsNotFound(err) { -// loggers.LoggerXds.ErrorC(logging.PrintError(logging.Error1724, logging.CRITICAL, "Error retrieving subscription: %v", err.Error())) -// return false, nil, err -// } else { -// return false, nil, nil -// } -// } -// return true, retrivedSubscription, nil -// } diff --git a/adapter/internal/operator/operator.go b/adapter/internal/operator/operator.go index ffee4e07f9..12460c4d4a 100644 --- a/adapter/internal/operator/operator.go +++ b/adapter/internal/operator/operator.go @@ -139,11 +139,6 @@ func InitOperator() { go synchronizer.HandleAPILifeCycleEvents(&ch, &successChannel) go synchronizer.HandleGatewayLifeCycleEvents(&gatewaych) - // if config.ReadConfigs().ManagementServer.Enabled { - // go xds.InitApkMgtXDSClient() - // go xds.HandleApplicationEventsFromMgtServer(mgr.GetClient(), mgr.GetAPIReader()) - // go xds.HandleSubscriptionEventsFromMgtServer(mgr.GetClient(), mgr.GetAPIReader()) - // } if config.ReadConfigs().PartitionServer.Enabled { go synchronizer.SendEventToPartitionServer() } diff --git a/adapter/pkg/eventhub/types/types.go b/adapter/pkg/eventhub/types/types.go index adb5fc3985..dc7c03b620 100644 --- a/adapter/pkg/eventhub/types/types.go +++ b/adapter/pkg/eventhub/types/types.go @@ -17,55 +17,6 @@ package types -// // Subscription for struct subscription -// type Subscription struct { -// SubscriptionID int32 `json:"subscriptionId"` -// SubscriptionUUID string `json:"subscriptionUUID"` -// SubscriptionStatus string `json:"subscriptionStatus"` -// Organization string `json:"organization"` -// Api APIRef `json:"api"` -// TimeStamp int64 `json:"timeStamp,omitempty"` -// } - -// type APIRef struct { -// Name string `json:"name"` -// Versions []string `json:"versions"` -// } - -// // SubscriptionList for struct list of applications -// type SubscriptionList struct { -// List []Subscription `json:"list"` -// } - -// // Application for struct application -// type Application struct { -// UUID string `json:"uuid"` -// ID int32 `json:"id" json:"applicationId"` -// Name string `json:"name" json:"applicationName"` -// Owner string `json:"owner"` -// Attributes map[string]string `json:"attributes"` -// } - -// // ApplicationList for struct list of application -// type ApplicationList struct { -// List []Application `json:"list"` -// } - -// // ApplicationKeyMapping for struct applicationKeyMapping -// type ApplicationKeyMapping struct { -// ApplicationUUID string `json:"applicationUUID"` -// SecurityScheme string `json:"securityScheme"` -// ApplicationIdentifier string `json:"appId"` -// EnvId string `json:"envId"` -// KeyType string `json:"keyType"` -// TimeStamp int64 `json:"timeStamp,omitempty"` -// } - -// // ApplicationKeyMappingList for struct list of applicationKeyMapping -// type ApplicationKeyMappingList struct { -// List []ApplicationKeyMapping `json:"list"` -// } - // API for struct Api type API struct { APIID int `json:"apiId"` diff --git a/common-controller/internal/xds/server.go b/common-controller/internal/xds/server.go index 856214139b..9bd0ac5878 100644 --- a/common-controller/internal/xds/server.go +++ b/common-controller/internal/xds/server.go @@ -33,15 +33,12 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/types" envoy_cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3" - // envoy_resource "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "github.com/wso2/apk/adapter/pkg/discovery/api/wso2/discovery/subscription" wso2_cache "github.com/wso2/apk/adapter/pkg/discovery/protocol/cache/v3" wso2_resource "github.com/wso2/apk/adapter/pkg/discovery/protocol/resource/v3" eventhubTypes "github.com/wso2/apk/adapter/pkg/eventhub/types" "github.com/wso2/apk/adapter/pkg/logging" - // "github.com/wso2/apk/common-controller/internal/config" "github.com/wso2/apk/common-controller/internal/loggers" - // "github.com/wso2/apk/common-controller/internal/oasparser/model" dpv1alpha1 "github.com/wso2/apk/common-controller/internal/operator/apis/dp/v1alpha1" ) @@ -249,30 +246,6 @@ func GetEnforcerApplicationMappingCache() wso2_cache.SnapshotCache { return enforcerApplicationMappingCache } -// // UpdateEnforcerConfig Sets new update to the enforcer's configuration -// func UpdateEnforcerConfig(configFile *config.Config) { -// // TODO: (Praminda) handle labels -// label := commonEnforcerLabel -// config := config.ReadConfigs() -// // configs := []types.Resource{MarshalConfig(configFile)} -// version, _ := crand.Int(crand.Reader, maxRandomBigInt()) -// snap, errNewSnap := wso2_cache.NewSnapshot(fmt.Sprint(version), map[wso2_resource.Type][]types.Resource{ -// wso2_resource.ConfigType: configs, -// }) -// if errNewSnap != nil { -// loggers.LoggerXds.ErrorC(logging.PrintError(logging.Error1413, logging.MAJOR, "Error creating new snapshot : %v", errNewSnap.Error())) -// } -// snap.Consistent() - -// errSetSnap := enforcerCache.SetSnapshot(context.Background(), label, snap) -// if errSetSnap != nil { -// loggers.LoggerXds.ErrorC(logging.PrintError(logging.Error1414, logging.MAJOR, "Error while setting the snapshot : %v", errSetSnap.Error())) -// } - -// enforcerLabelMap[label].configs = configs -// loggers.LoggerXds.Infof("New Config cache update for the label: " + label + " version: " + fmt.Sprint(version)) -// } - // UpdateEnforcerApplications sets new update to the enforcer's Applications func UpdateEnforcerApplications(applications *subscription.ApplicationList) { loggers.LoggerXds.Debug("Updating Enforcer Application Cache")