Skip to content

Commit

Permalink
Adding resource level rate limit support for sandbox routes
Browse files Browse the repository at this point in the history
  • Loading branch information
pubudu538 committed Oct 17, 2023
1 parent d32b39c commit 16c5a85
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 124 deletions.
2 changes: 2 additions & 0 deletions adapter/internal/oasparser/envoyconf/internal_dtos.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type routeCreateParams struct {
apiLevelRateLimitPolicy *model.RateLimitPolicy
apiProperties []dpv1alpha1.Property
environment string
envType string
}

// RatelimitCriteria criterias of rate limiting
Expand All @@ -52,4 +53,5 @@ type ratelimitCriteria struct {
organizationID string
basePathForRLService string
environment string
envType string
}
8 changes: 7 additions & 1 deletion adapter/internal/oasparser/envoyconf/routes_configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
logger "github.com/wso2/apk/adapter/internal/loggers"
"github.com/wso2/apk/adapter/internal/oasparser/constants"
"github.com/wso2/apk/adapter/internal/oasparser/model"
opConstants "github.com/wso2/apk/adapter/internal/operator/constants"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/wrapperspb"
Expand Down Expand Up @@ -112,6 +113,11 @@ func generateRouteAction(apiType string, routeConfig *model.EndpointConfig, rate

func generateRateLimitPolicy(ratelimitCriteria *ratelimitCriteria) []*routev3.RateLimit {

environmentValue := ratelimitCriteria.environment
if ratelimitCriteria.level != RateLimitPolicyAPILevel && ratelimitCriteria.envType == opConstants.Sandbox {
environmentValue += "_sandbox"
}

rateLimit := routev3.RateLimit{
Actions: []*routev3.RateLimit_Action{
{
Expand All @@ -126,7 +132,7 @@ func generateRateLimitPolicy(ratelimitCriteria *ratelimitCriteria) []*routev3.Ra
ActionSpecifier: &routev3.RateLimit_Action_GenericKey_{
GenericKey: &routev3.RateLimit_Action_GenericKey{
DescriptorKey: DescriptorKeyForEnvironment,
DescriptorValue: ratelimitCriteria.environment,
DescriptorValue: environmentValue,
},
},
},
Expand Down
2 changes: 2 additions & 0 deletions adapter/internal/oasparser/envoyconf/routes_with_clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,7 @@ func createRoutes(params *routeCreateParams) (routes []*routev3.Route, err error
organizationID: params.organizationID,
basePathForRLService: basePathForRLService,
environment: params.environment,
envType: params.envType,
}
}
var (
Expand Down Expand Up @@ -1529,6 +1530,7 @@ func genRouteCreateParams(swagger *model.AdapterInternalAPI, resource *model.Res
routeConfig: resource.GetEndpoints().Config,
createDefaultPath: createDefaultPath,
environment: swagger.GetEnvironment(),
envType: swagger.EnvType,
}
return params
}
Expand Down
14 changes: 7 additions & 7 deletions common-controller/internal/cache/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,26 @@ import (

// RatelimitDataStore is a cache for rate limit policies.
type RatelimitDataStore struct {
resolveRatelimitStore map[types.NamespacedName]*dpv1alpha1.ResolveRateLimitAPIPolicy
resolveRatelimitStore map[types.NamespacedName][]dpv1alpha1.ResolveRateLimitAPIPolicy
customRatelimitStore map[types.NamespacedName]*dpv1alpha1.CustomRateLimitPolicyDef
mu sync.Mutex
}

// CreateNewOperatorDataStore creates a new RatelimitDataStore.
func CreateNewOperatorDataStore() *RatelimitDataStore {
return &RatelimitDataStore{
resolveRatelimitStore: map[types.NamespacedName]*dpv1alpha1.ResolveRateLimitAPIPolicy{},
resolveRatelimitStore: map[types.NamespacedName][]dpv1alpha1.ResolveRateLimitAPIPolicy{},
customRatelimitStore: map[types.NamespacedName]*dpv1alpha1.CustomRateLimitPolicyDef{},
}
}

// AddorUpdateResolveRatelimitToStore adds a new ratelimit to the RatelimitDataStore.
func (ods *RatelimitDataStore) AddorUpdateResolveRatelimitToStore(rateLimit types.NamespacedName,
resolveRatelimit dpv1alpha1.ResolveRateLimitAPIPolicy) {
resolveRatelimitPolicyList []dpv1alpha1.ResolveRateLimitAPIPolicy) {
ods.mu.Lock()
defer ods.mu.Unlock()
logger.Debug("Adding/Updating ratelimit to cache")
ods.resolveRatelimitStore[rateLimit] = &resolveRatelimit
ods.resolveRatelimitStore[rateLimit] = resolveRatelimitPolicyList
}

// AddorUpdateCustomRatelimitToStore adds a new ratelimit to the RatelimitDataStore.
Expand All @@ -60,11 +60,11 @@ func (ods *RatelimitDataStore) AddorUpdateCustomRatelimitToStore(rateLimit types
}

// GetResolveRatelimitPolicy get cached ratelimit
func (ods *RatelimitDataStore) GetResolveRatelimitPolicy(rateLimit types.NamespacedName) (dpv1alpha1.ResolveRateLimitAPIPolicy, bool) {
var rateLimitPolicy dpv1alpha1.ResolveRateLimitAPIPolicy
func (ods *RatelimitDataStore) GetResolveRatelimitPolicy(rateLimit types.NamespacedName) ([]dpv1alpha1.ResolveRateLimitAPIPolicy, bool) {
var rateLimitPolicy []dpv1alpha1.ResolveRateLimitAPIPolicy
if cachedRatelimit, found := ods.resolveRatelimitStore[rateLimit]; found {
logger.Debug("Found cached ratelimit")
return *cachedRatelimit, true
return cachedRatelimit, true
}
return rateLimitPolicy, false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,12 @@ func (ratelimitReconsiler *RateLimitPolicyReconciler) Reconcile(ctx context.Cont

// Check k8s RatelimitPolicy Availbility
if err := ratelimitReconsiler.client.Get(ctx, ratelimitKey, &ratelimitPolicy); err != nil {
resolveRateLimitAPIPolicy, found := ratelimitReconsiler.ods.GetResolveRatelimitPolicy(req.NamespacedName)
resolveRateLimitAPIPolicyList, found := ratelimitReconsiler.ods.GetResolveRatelimitPolicy(req.NamespacedName)
// If availble in cache Delete cache and xds
if found && k8error.IsNotFound(err) {
ratelimitReconsiler.ods.DeleteResolveRatelimitPolicy(req.NamespacedName)
xds.DeleteAPILevelRateLimitPolicies(resolveRateLimitAPIPolicy)
if resolveRateLimitAPIPolicy.Resources != nil {
xds.DeleteResourceLevelRateLimitPolicies(resolveRateLimitAPIPolicy)
}
xds.DeleteAPILevelRateLimitPolicies(resolveRateLimitAPIPolicyList)
xds.DeleteResourceLevelRateLimitPolicies(resolveRateLimitAPIPolicyList)
xds.UpdateRateLimiterPolicies(conf.CommonController.Server.Label)
}
resolveCustomRateLimitPolicy, foundCustom := ratelimitReconsiler.ods.GetCachedCustomRatelimitPolicy(req.NamespacedName)
Expand All @@ -160,11 +158,11 @@ func (ratelimitReconsiler *RateLimitPolicyReconciler) Reconcile(ctx context.Cont
xds.UpdateRateLimitXDSCacheForCustomPolicies(customRateLimitPolicy)
} else {

if resolveRatelimit, err := ratelimitReconsiler.marshelRateLimit(ctx, ratelimitKey, ratelimitPolicy); err != nil {
if resolveRatelimitPolicyList, err := ratelimitReconsiler.marshelRateLimit(ctx, ratelimitKey, ratelimitPolicy); err != nil {
return ctrl.Result{}, err
} else if resolveRatelimit != nil {
ratelimitReconsiler.ods.AddorUpdateResolveRatelimitToStore(ratelimitKey, *resolveRatelimit)
xds.UpdateRateLimitXDSCache(*resolveRatelimit)
} else if len(resolveRatelimitPolicyList) > 0 {
ratelimitReconsiler.ods.AddorUpdateResolveRatelimitToStore(ratelimitKey, resolveRatelimitPolicyList)
xds.UpdateRateLimitXDSCache(resolveRatelimitPolicyList)
xds.UpdateRateLimiterPolicies(conf.CommonController.Server.Label)
}
}
Expand Down Expand Up @@ -239,21 +237,26 @@ func (ratelimitReconsiler *RateLimitPolicyReconciler) getRatelimitForHTTPRoute(c
}

func (ratelimitReconsiler *RateLimitPolicyReconciler) marshelRateLimit(ctx context.Context, ratelimitKey types.NamespacedName,
ratelimitPolicy dpv1alpha1.RateLimitPolicy) (*dpv1alpha1.ResolveRateLimitAPIPolicy, error) {
ratelimitPolicy dpv1alpha1.RateLimitPolicy) ([]dpv1alpha1.ResolveRateLimitAPIPolicy, error) {

policyList := []dpv1alpha1.ResolveRateLimitAPIPolicy{}
var api dpv1alpha1.API
var resolveResourceList []dpv1alpha1.ResolveResource
var resolveRatelimit dpv1alpha1.ResolveRateLimitAPIPolicy

if err := ratelimitReconsiler.client.Get(ctx, types.NamespacedName{
Namespace: ratelimitKey.Namespace,
Name: string(ratelimitPolicy.Spec.TargetRef.Name)},
&api); err != nil {
return nil, fmt.Errorf("error while getting API : %v, %s", string(ratelimitPolicy.Spec.TargetRef.Name), err.Error())
}

organization := api.Spec.Organization
basePath := api.Spec.BasePath
environment := utils.GetEnvironment(api.Spec.Environment)

// API Level Rate limit policy
if ratelimitPolicy.Spec.TargetRef.Kind == constants.KindAPI {
if err := ratelimitReconsiler.client.Get(ctx, types.NamespacedName{
Namespace: ratelimitKey.Namespace,
Name: string(ratelimitPolicy.Spec.TargetRef.Name)},
&api); err != nil {
return nil, fmt.Errorf("error while getting API : %v, %s", string(ratelimitPolicy.Spec.TargetRef.Name), err.Error())
}
var organization = api.Spec.Organization
var basePath = api.Spec.BasePath

var resolveRatelimit dpv1alpha1.ResolveRateLimitAPIPolicy
if ratelimitPolicy.Spec.Override != nil {
resolveRatelimit.API.RequestsPerUnit = ratelimitPolicy.Spec.Override.API.RequestsPerUnit
resolveRatelimit.API.Unit = ratelimitPolicy.Spec.Override.API.Unit
Expand All @@ -262,106 +265,95 @@ func (ratelimitReconsiler *RateLimitPolicyReconciler) marshelRateLimit(ctx conte
resolveRatelimit.API.Unit = ratelimitPolicy.Spec.Default.API.Unit
}

resolveRatelimit.Environment = utils.GetEnvironment(api.Spec.Environment)
resolveRatelimit.Environment = environment
resolveRatelimit.Organization = organization
resolveRatelimit.BasePath = basePath
resolveRatelimit.UUID = string(api.ObjectMeta.UID)
policyList = append(policyList, resolveRatelimit)
}

// Resource Level Rate limit policy
if ratelimitPolicy.Spec.TargetRef.Kind == constants.KindResource {
if err := ratelimitReconsiler.client.Get(ctx, types.NamespacedName{
Namespace: ratelimitKey.Namespace,
Name: string(ratelimitPolicy.Spec.TargetRef.Name)},
&api); err != nil {
return nil, fmt.Errorf("error while getting API : %v, %s", string(ratelimitPolicy.Spec.TargetRef.Name), err.Error())
}
var organization = api.Spec.Organization
var basePath = api.Spec.BasePath
var httpRoute gwapiv1b1.HTTPRoute

var resolveRatelimit dpv1alpha1.ResolveRateLimitAPIPolicy
resolveRatelimit.Organization = organization
resolveRatelimit.BasePath = basePath
resolveRatelimit.UUID = string(api.ObjectMeta.UID)
resolveRatelimit.Environment = environment

if len(api.Spec.Production) > 0 {
for _, ref := range api.Spec.Production[0].HTTPRouteRefs {
if ref != "" {
if err := ratelimitReconsiler.client.Get(ctx, types.NamespacedName{
Namespace: ratelimitKey.Namespace,
Name: ref},
&httpRoute); err != nil {
return nil, fmt.Errorf("error while getting HTTPRoute : %v for API : %v, %s", string(ref),
string(ratelimitPolicy.Spec.TargetRef.Name), err.Error())
}
for _, rule := range httpRoute.Spec.Rules {
for _, filter := range rule.Filters {
if filter.ExtensionRef != nil {
if filter.ExtensionRef.Kind == constants.KindRateLimitPolicy && string(filter.ExtensionRef.Name) == ratelimitPolicy.Name {
var resolveResource dpv1alpha1.ResolveResource
resolveResource.Path = *rule.Matches[0].Path.Value
if rule.Matches[0].Method != nil {
resolveResource.Method = string(*rule.Matches[0].Method)
} else {
resolveResource.Method = constants.All
}
resolveResource.PathMatchType = *rule.Matches[0].Path.Type
if ratelimitPolicy.Spec.Override != nil {
resolveResource.ResourceRatelimit.RequestsPerUnit = ratelimitPolicy.Spec.Override.API.RequestsPerUnit
resolveResource.ResourceRatelimit.Unit = ratelimitPolicy.Spec.Override.API.Unit
} else {
resolveResource.ResourceRatelimit.RequestsPerUnit = ratelimitPolicy.Spec.Default.API.RequestsPerUnit
resolveResource.ResourceRatelimit.Unit = ratelimitPolicy.Spec.Default.API.Unit
}
resolveResourceList = append(resolveResourceList, resolveResource)
}
}
}

}
}
resolveResourceList, err := ratelimitReconsiler.getResourceList(ctx, ratelimitKey, ratelimitPolicy, api.Spec.Production[0].HTTPRouteRefs)
if err != nil {
return nil, err
}
if len(resolveResourceList) > 0 {
resolveRatelimit.Resources = resolveResourceList
policyList = append(policyList, resolveRatelimit)
}
}

if len(api.Spec.Sandbox) > 0 {
for _, ref := range api.Spec.Sandbox[0].HTTPRouteRefs {
if ref != "" {
if err := ratelimitReconsiler.client.Get(ctx, types.NamespacedName{
Namespace: ratelimitKey.Namespace,
Name: ref},
&httpRoute); err != nil {
return nil, fmt.Errorf("error while getting HTTPRoute : %v for API : %v, %s", string(ref),
string(ratelimitPolicy.Spec.TargetRef.Name), err.Error())
}
for _, rule := range httpRoute.Spec.Rules {
for _, filter := range rule.Filters {
if filter.ExtensionRef != nil {
if filter.ExtensionRef.Kind == constants.KindRateLimitPolicy && string(filter.ExtensionRef.Name) == ratelimitPolicy.Name {
var resolveResource dpv1alpha1.ResolveResource
resolveResource.Path = *rule.Matches[0].Path.Value
if rule.Matches[0].Method != nil {
resolveResource.Method = string(*rule.Matches[0].Method)
} else {
resolveResource.Method = constants.All
}
resolveResource.PathMatchType = *rule.Matches[0].Path.Type
if ratelimitPolicy.Spec.Override != nil {
resolveResource.ResourceRatelimit.RequestsPerUnit = ratelimitPolicy.Spec.Override.API.RequestsPerUnit
resolveResource.ResourceRatelimit.Unit = ratelimitPolicy.Spec.Override.API.Unit
} else {
resolveResource.ResourceRatelimit.RequestsPerUnit = ratelimitPolicy.Spec.Default.API.RequestsPerUnit
resolveResource.ResourceRatelimit.Unit = ratelimitPolicy.Spec.Default.API.Unit
}
resolveResourceList = append(resolveResourceList, resolveResource)
}

resolveResourceList, err := ratelimitReconsiler.getResourceList(ctx, ratelimitKey, ratelimitPolicy, api.Spec.Sandbox[0].HTTPRouteRefs)
if err != nil {
return nil, err
}
if len(resolveResourceList) > 0 {
resolveRatelimit.Resources = resolveResourceList
resolveRatelimit.Environment += "_sandbox"
policyList = append(policyList, resolveRatelimit)
}
}
}

return policyList, nil
}

func (ratelimitReconsiler *RateLimitPolicyReconciler) getResourceList(ctx context.Context, ratelimitKey types.NamespacedName,
ratelimitPolicy dpv1alpha1.RateLimitPolicy, httpRefs []string) ([]dpv1alpha1.ResolveResource, error) {

var resolveResourceList []dpv1alpha1.ResolveResource
var httpRoute gwapiv1b1.HTTPRoute

for _, ref := range httpRefs {
if ref != "" {
if err := ratelimitReconsiler.client.Get(ctx, types.NamespacedName{
Namespace: ratelimitKey.Namespace,
Name: ref},
&httpRoute); err != nil {
return nil, fmt.Errorf("error while getting HTTPRoute : %v for API : %v, %s", string(ref),
string(ratelimitPolicy.Spec.TargetRef.Name), err.Error())
}
for _, rule := range httpRoute.Spec.Rules {
for _, filter := range rule.Filters {
if filter.ExtensionRef != nil {
if filter.ExtensionRef.Kind == constants.KindRateLimitPolicy && string(filter.ExtensionRef.Name) == ratelimitPolicy.Name {
var resolveResource dpv1alpha1.ResolveResource
resolveResource.Path = *rule.Matches[0].Path.Value
if rule.Matches[0].Method != nil {
resolveResource.Method = string(*rule.Matches[0].Method)
} else {
resolveResource.Method = constants.All
}
resolveResource.PathMatchType = *rule.Matches[0].Path.Type
if ratelimitPolicy.Spec.Override != nil {
resolveResource.ResourceRatelimit.RequestsPerUnit = ratelimitPolicy.Spec.Override.API.RequestsPerUnit
resolveResource.ResourceRatelimit.Unit = ratelimitPolicy.Spec.Override.API.Unit
} else {
resolveResource.ResourceRatelimit.RequestsPerUnit = ratelimitPolicy.Spec.Default.API.RequestsPerUnit
resolveResource.ResourceRatelimit.Unit = ratelimitPolicy.Spec.Default.API.Unit
}
resolveResourceList = append(resolveResourceList, resolveResource)
}

}
}

}
}
resolveRatelimit.Organization = organization
resolveRatelimit.BasePath = basePath
resolveRatelimit.UUID = string(api.ObjectMeta.UID)
resolveRatelimit.Environment = utils.GetEnvironment(api.Spec.Environment)
resolveRatelimit.Resources = resolveResourceList
}
return &resolveRatelimit, nil

return resolveResourceList, nil
}

func (ratelimitReconsiler *RateLimitPolicyReconciler) marshelCustomRateLimit(ctx context.Context, ratelimitKey types.NamespacedName,
Expand Down
Loading

0 comments on commit 16c5a85

Please sign in to comment.