Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactore subscription validation #1861

Merged
merged 3 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

27 changes: 0 additions & 27 deletions adapter/api/proto/wso2/discovery/subscription/api.proto

This file was deleted.

17 changes: 0 additions & 17 deletions adapter/api/proto/wso2/discovery/subscription/api_list.proto

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ message ApplicationKeyMapping {
string keyType = 4;
string envID = 5;
int64 timestamp = 6;
string organization = 7;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ message ApplicationMapping {
string uuid = 1;
string applicationRef = 2;
string subscriptionRef = 3;
string organization = 4;
}
1 change: 0 additions & 1 deletion adapter/internal/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ func runManagementServer(conf *config.Config, server xdsv3.Server, enforcerServe
discoveryv3.RegisterAggregatedDiscoveryServiceServer(grpcServer, server)
configservice.RegisterConfigDiscoveryServiceServer(grpcServer, enforcerServer)
apiservice.RegisterApiDiscoveryServiceServer(grpcServer, enforcerServer)
subscriptionservice.RegisterApiListDiscoveryServiceServer(grpcServer, enforcerAPIDsSrv)
subscriptionservice.RegisterJWTIssuerDiscoveryServiceServer(grpcServer, enforcerJwtIssuerDsSrv)
// register health service
healthservice.RegisterHealthServer(grpcServer, &health.Server{})
Expand Down
112 changes: 0 additions & 112 deletions adapter/internal/discovery/xds/marshaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,8 @@
package xds

import (
"strconv"

"github.com/wso2/apk/adapter/config"
logger "github.com/wso2/apk/adapter/internal/loggers"
"github.com/wso2/apk/adapter/pkg/discovery/api/wso2/discovery/config/enforcer"
"github.com/wso2/apk/adapter/pkg/discovery/api/wso2/discovery/subscription"
"github.com/wso2/apk/adapter/pkg/eventhub/types"
)

var (
// APIListMap has the following mapping label -> apiUUID -> API (Metadata)
APIListMap map[string]map[string]*subscription.APIs
)

// EventType is a enum to distinguish Create, Update and Delete Events
Expand Down Expand Up @@ -174,105 +164,3 @@ func marshalAnalyticsPublishers(config config.Config) []*enforcer.AnalyticsPubli
}
return resolvedAnalyticsPublishers
}

// marshalAPIListMapToList converts the data into APIList proto type
func marshalAPIListMapToList(apiMap map[string]*subscription.APIs) *subscription.APIList {
apis := []*subscription.APIs{}
for _, api := range apiMap {
apis = append(apis, api)
}

return &subscription.APIList{
List: apis,
}
}

// MarshalAPIMetataAndReturnList updates the internal APIListMap and returns the XDS compatible APIList.
// apiList is the internal APIList object (For single API, this would contain a List with just one API)
// initialAPIUUIDListMap is assigned during startup when global adapter is associated. This would be empty otherwise.
// gatewayLabel is the environment.
func MarshalAPIMetataAndReturnList(apiList *types.APIList, initialAPIUUIDListMap map[string]int, gatewayLabel string) *subscription.APIList {

if APIListMap == nil {
APIListMap = make(map[string]map[string]*subscription.APIs)
}
// var resourceMapForLabel map[string]*subscription.APIs
if _, ok := APIListMap[gatewayLabel]; !ok {
APIListMap[gatewayLabel] = make(map[string]*subscription.APIs)
}
resourceMapForLabel := APIListMap[gatewayLabel]
for item := range apiList.List {
api := apiList.List[item]
// initialAPIUUIDListMap is not null if the adapter is running with global adapter enabled, and it is
// the first method invocation.
if initialAPIUUIDListMap != nil {
if _, ok := initialAPIUUIDListMap[api.UUID]; !ok {
continue
}
}
newAPI := marshalAPIMetadata(&api)
resourceMapForLabel[api.UUID] = newAPI
}
return marshalAPIListMapToList(resourceMapForLabel)
}

// DeleteAPIAndReturnList removes the API from internal maps and returns the marshalled API List.
// If the apiUUID is not found in the internal map under the provided environment, then it would return a
// nil value. Hence it is required to check if the return value is nil, prior to updating the XDS cache.
func DeleteAPIAndReturnList(apiUUID, organizationUUID string, gatewayLabel string) *subscription.APIList {
if _, ok := APIListMap[gatewayLabel]; !ok {
logger.LoggerXds.Debugf("No API Metadata is available under gateway Environment : %s", gatewayLabel)
return nil
}
delete(APIListMap[gatewayLabel], apiUUID)
return marshalAPIListMapToList(APIListMap[gatewayLabel])
}

// MarshalAPIForLifeCycleChangeEventAndReturnList updates the internal map's API instances lifecycle state only if
// stored API Instance's or input status event is a blocked event.
// If no change is applied, it would return nil. Hence the XDS cache should not be updated.
func MarshalAPIForLifeCycleChangeEventAndReturnList(apiUUID, status, gatewayLabel string) *subscription.APIList {
if _, ok := APIListMap[gatewayLabel]; !ok {
logger.LoggerXds.Debugf("No API Metadata is available under gateway Environment : %s", gatewayLabel)
return nil
}
if _, ok := APIListMap[gatewayLabel][apiUUID]; !ok {
logger.LoggerXds.Debugf("No API Metadata for API ID: %s is available under gateway Environment : %s",
apiUUID, gatewayLabel)
return nil
}
storedAPILCState := APIListMap[gatewayLabel][apiUUID].LcState

// Because the adapter only required to update the XDS if it is related to blocked state.
if !(storedAPILCState == blockedStatus || status == blockedStatus) {
return nil
}
APIListMap[gatewayLabel][apiUUID].LcState = status
return marshalAPIListMapToList(APIListMap[gatewayLabel])
}

func marshalAPIMetadata(api *types.API) *subscription.APIs {
return &subscription.APIs{
ApiId: strconv.Itoa(api.APIID),
Name: api.Name,
Provider: api.Provider,
Version: api.Version,
BasePath: api.BasePath,
Policy: api.Policy,
ApiType: api.APIType,
Uuid: api.UUID,
IsDefaultVersion: api.IsDefaultVersion,
LcState: api.APIStatus,
}
}

// CheckIfAPIMetadataIsAlreadyAvailable returns true only if the API Metadata for the given API UUID
// is already available
func CheckIfAPIMetadataIsAlreadyAvailable(apiUUID, label string) bool {
if _, labelAvailable := APIListMap[label]; labelAvailable {
if _, apiAvailale := APIListMap[label][apiUUID]; apiAvailale {
return true
}
}
return false
}
37 changes: 1 addition & 36 deletions adapter/internal/discovery/xds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import (
"github.com/wso2/apk/adapter/internal/oasparser/envoyconf"
"github.com/wso2/apk/adapter/internal/oasparser/model"
operatorconsts "github.com/wso2/apk/adapter/internal/operator/constants"
disc_api "github.com/wso2/apk/adapter/pkg/discovery/api/wso2/discovery/api"
"github.com/wso2/apk/adapter/pkg/discovery/api/wso2/discovery/subscription"
wso2_cache "github.com/wso2/apk/adapter/pkg/discovery/protocol/cache/v3"
wso2_resource "github.com/wso2/apk/adapter/pkg/discovery/protocol/resource/v3"
Expand Down Expand Up @@ -326,6 +325,7 @@ func updateXdsCacheOnAPIChange(oldLabels []string, newLabels []string) bool {
for _, oldLabel := range oldLabels {
if !stringutils.StringInSlice(oldLabel, newLabels) {
listeners, clusters, routes, endpoints, apis := GenerateEnvoyResoucesForGateway(oldLabel)
GenerateEnvoyResoucesForGateway(oldLabel)
UpdateEnforcerApis(oldLabel, apis, "")
UpdateXdsCacheWithLock(oldLabel, endpoints, clusters, routes, listeners)
logger.LoggerXds.Debugf("Xds Cache is updated for the already existing label : %v", oldLabel)
Expand Down Expand Up @@ -533,22 +533,6 @@ func UpdateEnforcerApis(label string, apis []types.Resource, version string) {
}
logger.LoggerXds.Infof("New API cache update for the label: " + label + " version: " + fmt.Sprint(version))

subAPIs := []*subscription.APIs{}
for _, api := range apis {
subAPI := subscription.APIs{}
subAPI.ApiId = api.(*disc_api.Api).GetId()
subAPI.Name = api.(*disc_api.Api).GetTitle()
subAPI.Version = api.(*disc_api.Api).GetVersion()
subAPI.BasePath = api.(*disc_api.Api).GetBasePath()
subAPI.Policy = api.(*disc_api.Api).GetTier()
subAPI.ApiType = api.(*disc_api.Api).GetApiType()
subAPI.Uuid = api.(*disc_api.Api).GetId()
subAPIs = append(subAPIs, &subAPI)
}
subAPIList := &subscription.APIList{
List: subAPIs,
}
UpdateEnforcerAPIList(label, subAPIList)
}

// UpdateEnforcerJWTIssuers sets new update to the enforcer's Applications
Expand All @@ -571,25 +555,6 @@ func UpdateEnforcerJWTIssuers(jwtIssuers *subscription.JWTIssuerList) {
logger.LoggerXds.Infof("New JWTIssuer cache update for the label: " + label + " version: " + fmt.Sprint(version))
}

// UpdateEnforcerAPIList sets new update to the enforcer's Apis
func UpdateEnforcerAPIList(label string, apis *subscription.APIList) {
logger.LoggerXds.Debugf("Updating Enforcer API Cache")
apiList := append(enforcerLabelMap[label].apiList, apis)

version, _ := crand.Int(crand.Reader, maxRandomBigInt())
snap, _ := wso2_cache.NewSnapshot(fmt.Sprint(version), map[wso2_resource.Type][]types.Resource{
wso2_resource.APIListType: apiList,
})
snap.Consistent()

errSetSnap := enforcerAPICache.SetSnapshot(context.Background(), label, snap)
if errSetSnap != nil {
logger.LoggerXds.ErrorC(logging.PrintError(logging.Error1414, logging.MAJOR, "Error while setting the snapshot : %v", errSetSnap.Error()))
}
enforcerLabelMap[label].apiList = apiList
logger.LoggerXds.Infof("New API List cache update for the label: " + label + " version: " + fmt.Sprint(version))
}

// UpdateXdsCacheWithLock uses mutex and lock to avoid different go routines updating XDS at the same time
func UpdateXdsCacheWithLock(label string, endpoints []types.Resource, clusters []types.Resource, routes []types.Resource,
listeners []types.Resource) bool {
Expand Down
Loading