Skip to content

Commit

Permalink
refactor subscription data
Browse files Browse the repository at this point in the history
  • Loading branch information
tharindu1st committed Nov 28, 2023
1 parent fdcb398 commit df3ac53
Show file tree
Hide file tree
Showing 92 changed files with 1,075 additions and 12,537 deletions.
18 changes: 0 additions & 18 deletions adapter/api/proto/wso2/discovery/service/subscription/apids.proto

This file was deleted.

27 changes: 0 additions & 27 deletions adapter/api/proto/wso2/discovery/subscription/api.proto

This file was deleted.

17 changes: 0 additions & 17 deletions adapter/api/proto/wso2/discovery/subscription/api_list.proto

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ message ApplicationKeyMapping {
string keyType = 4;
string envID = 5;
int64 timestamp = 6;
string organization = 7;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ message ApplicationMapping {
string uuid = 1;
string applicationRef = 2;
string subscriptionRef = 3;
string organization = 4;
}
1 change: 0 additions & 1 deletion adapter/internal/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ func runManagementServer(conf *config.Config, server xdsv3.Server, enforcerServe
discoveryv3.RegisterAggregatedDiscoveryServiceServer(grpcServer, server)
configservice.RegisterConfigDiscoveryServiceServer(grpcServer, enforcerServer)
apiservice.RegisterApiDiscoveryServiceServer(grpcServer, enforcerServer)
subscriptionservice.RegisterApiListDiscoveryServiceServer(grpcServer, enforcerAPIDsSrv)
subscriptionservice.RegisterJWTIssuerDiscoveryServiceServer(grpcServer, enforcerJwtIssuerDsSrv)
// register health service
healthservice.RegisterHealthServer(grpcServer, &health.Server{})
Expand Down
112 changes: 0 additions & 112 deletions adapter/internal/discovery/xds/marshaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,8 @@
package xds

import (
"strconv"

"github.com/wso2/apk/adapter/config"
logger "github.com/wso2/apk/adapter/internal/loggers"
"github.com/wso2/apk/adapter/pkg/discovery/api/wso2/discovery/config/enforcer"
"github.com/wso2/apk/adapter/pkg/discovery/api/wso2/discovery/subscription"
"github.com/wso2/apk/adapter/pkg/eventhub/types"
)

var (
// APIListMap has the following mapping label -> apiUUID -> API (Metadata)
APIListMap map[string]map[string]*subscription.APIs
)

// EventType is a enum to distinguish Create, Update and Delete Events
Expand Down Expand Up @@ -174,105 +164,3 @@ func marshalAnalyticsPublishers(config config.Config) []*enforcer.AnalyticsPubli
}
return resolvedAnalyticsPublishers
}

// marshalAPIListMapToList converts the data into APIList proto type
func marshalAPIListMapToList(apiMap map[string]*subscription.APIs) *subscription.APIList {
apis := []*subscription.APIs{}
for _, api := range apiMap {
apis = append(apis, api)
}

return &subscription.APIList{
List: apis,
}
}

// MarshalAPIMetataAndReturnList updates the internal APIListMap and returns the XDS compatible APIList.
// apiList is the internal APIList object (For single API, this would contain a List with just one API)
// initialAPIUUIDListMap is assigned during startup when global adapter is associated. This would be empty otherwise.
// gatewayLabel is the environment.
func MarshalAPIMetataAndReturnList(apiList *types.APIList, initialAPIUUIDListMap map[string]int, gatewayLabel string) *subscription.APIList {

if APIListMap == nil {
APIListMap = make(map[string]map[string]*subscription.APIs)
}
// var resourceMapForLabel map[string]*subscription.APIs
if _, ok := APIListMap[gatewayLabel]; !ok {
APIListMap[gatewayLabel] = make(map[string]*subscription.APIs)
}
resourceMapForLabel := APIListMap[gatewayLabel]
for item := range apiList.List {
api := apiList.List[item]
// initialAPIUUIDListMap is not null if the adapter is running with global adapter enabled, and it is
// the first method invocation.
if initialAPIUUIDListMap != nil {
if _, ok := initialAPIUUIDListMap[api.UUID]; !ok {
continue
}
}
newAPI := marshalAPIMetadata(&api)
resourceMapForLabel[api.UUID] = newAPI
}
return marshalAPIListMapToList(resourceMapForLabel)
}

// DeleteAPIAndReturnList removes the API from internal maps and returns the marshalled API List.
// If the apiUUID is not found in the internal map under the provided environment, then it would return a
// nil value. Hence it is required to check if the return value is nil, prior to updating the XDS cache.
func DeleteAPIAndReturnList(apiUUID, organizationUUID string, gatewayLabel string) *subscription.APIList {
if _, ok := APIListMap[gatewayLabel]; !ok {
logger.LoggerXds.Debugf("No API Metadata is available under gateway Environment : %s", gatewayLabel)
return nil
}
delete(APIListMap[gatewayLabel], apiUUID)
return marshalAPIListMapToList(APIListMap[gatewayLabel])
}

// MarshalAPIForLifeCycleChangeEventAndReturnList updates the internal map's API instances lifecycle state only if
// stored API Instance's or input status event is a blocked event.
// If no change is applied, it would return nil. Hence the XDS cache should not be updated.
func MarshalAPIForLifeCycleChangeEventAndReturnList(apiUUID, status, gatewayLabel string) *subscription.APIList {
if _, ok := APIListMap[gatewayLabel]; !ok {
logger.LoggerXds.Debugf("No API Metadata is available under gateway Environment : %s", gatewayLabel)
return nil
}
if _, ok := APIListMap[gatewayLabel][apiUUID]; !ok {
logger.LoggerXds.Debugf("No API Metadata for API ID: %s is available under gateway Environment : %s",
apiUUID, gatewayLabel)
return nil
}
storedAPILCState := APIListMap[gatewayLabel][apiUUID].LcState

// Because the adapter only required to update the XDS if it is related to blocked state.
if !(storedAPILCState == blockedStatus || status == blockedStatus) {
return nil
}
APIListMap[gatewayLabel][apiUUID].LcState = status
return marshalAPIListMapToList(APIListMap[gatewayLabel])
}

func marshalAPIMetadata(api *types.API) *subscription.APIs {
return &subscription.APIs{
ApiId: strconv.Itoa(api.APIID),
Name: api.Name,
Provider: api.Provider,
Version: api.Version,
BasePath: api.BasePath,
Policy: api.Policy,
ApiType: api.APIType,
Uuid: api.UUID,
IsDefaultVersion: api.IsDefaultVersion,
LcState: api.APIStatus,
}
}

// CheckIfAPIMetadataIsAlreadyAvailable returns true only if the API Metadata for the given API UUID
// is already available
func CheckIfAPIMetadataIsAlreadyAvailable(apiUUID, label string) bool {
if _, labelAvailable := APIListMap[label]; labelAvailable {
if _, apiAvailale := APIListMap[label][apiUUID]; apiAvailale {
return true
}
}
return false
}
37 changes: 1 addition & 36 deletions adapter/internal/discovery/xds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import (
"github.com/wso2/apk/adapter/internal/oasparser/envoyconf"
"github.com/wso2/apk/adapter/internal/oasparser/model"
operatorconsts "github.com/wso2/apk/adapter/internal/operator/constants"
disc_api "github.com/wso2/apk/adapter/pkg/discovery/api/wso2/discovery/api"
"github.com/wso2/apk/adapter/pkg/discovery/api/wso2/discovery/subscription"
wso2_cache "github.com/wso2/apk/adapter/pkg/discovery/protocol/cache/v3"
wso2_resource "github.com/wso2/apk/adapter/pkg/discovery/protocol/resource/v3"
Expand Down Expand Up @@ -326,6 +325,7 @@ func updateXdsCacheOnAPIChange(oldLabels []string, newLabels []string) bool {
for _, oldLabel := range oldLabels {
if !stringutils.StringInSlice(oldLabel, newLabels) {
listeners, clusters, routes, endpoints, apis := GenerateEnvoyResoucesForGateway(oldLabel)
GenerateEnvoyResoucesForGateway(oldLabel)
UpdateEnforcerApis(oldLabel, apis, "")
UpdateXdsCacheWithLock(oldLabel, endpoints, clusters, routes, listeners)
logger.LoggerXds.Debugf("Xds Cache is updated for the already existing label : %v", oldLabel)
Expand Down Expand Up @@ -533,22 +533,6 @@ func UpdateEnforcerApis(label string, apis []types.Resource, version string) {
}
logger.LoggerXds.Infof("New API cache update for the label: " + label + " version: " + fmt.Sprint(version))

subAPIs := []*subscription.APIs{}
for _, api := range apis {
subAPI := subscription.APIs{}
subAPI.ApiId = api.(*disc_api.Api).GetId()
subAPI.Name = api.(*disc_api.Api).GetTitle()
subAPI.Version = api.(*disc_api.Api).GetVersion()
subAPI.BasePath = api.(*disc_api.Api).GetBasePath()
subAPI.Policy = api.(*disc_api.Api).GetTier()
subAPI.ApiType = api.(*disc_api.Api).GetApiType()
subAPI.Uuid = api.(*disc_api.Api).GetId()
subAPIs = append(subAPIs, &subAPI)
}
subAPIList := &subscription.APIList{
List: subAPIs,
}
UpdateEnforcerAPIList(label, subAPIList)
}

// UpdateEnforcerJWTIssuers sets new update to the enforcer's Applications
Expand All @@ -571,25 +555,6 @@ func UpdateEnforcerJWTIssuers(jwtIssuers *subscription.JWTIssuerList) {
logger.LoggerXds.Infof("New JWTIssuer cache update for the label: " + label + " version: " + fmt.Sprint(version))
}

// UpdateEnforcerAPIList sets new update to the enforcer's Apis
func UpdateEnforcerAPIList(label string, apis *subscription.APIList) {
logger.LoggerXds.Debugf("Updating Enforcer API Cache")
apiList := append(enforcerLabelMap[label].apiList, apis)

version, _ := crand.Int(crand.Reader, maxRandomBigInt())
snap, _ := wso2_cache.NewSnapshot(fmt.Sprint(version), map[wso2_resource.Type][]types.Resource{
wso2_resource.APIListType: apiList,
})
snap.Consistent()

errSetSnap := enforcerAPICache.SetSnapshot(context.Background(), label, snap)
if errSetSnap != nil {
logger.LoggerXds.ErrorC(logging.PrintError(logging.Error1414, logging.MAJOR, "Error while setting the snapshot : %v", errSetSnap.Error()))
}
enforcerLabelMap[label].apiList = apiList
logger.LoggerXds.Infof("New API List cache update for the label: " + label + " version: " + fmt.Sprint(version))
}

// UpdateXdsCacheWithLock uses mutex and lock to avoid different go routines updating XDS at the same time
func UpdateXdsCacheWithLock(label string, endpoints []types.Resource, clusters []types.Resource, routes []types.Resource,
listeners []types.Resource) bool {
Expand Down
Loading

0 comments on commit df3ac53

Please sign in to comment.