From 77405a16fca0283ffa793836d5ad4c8f00098476 Mon Sep 17 00:00:00 2001 From: Tharsanan1 Date: Thu, 26 Sep 2024 09:27:04 +0530 Subject: [PATCH 1/2] Add airl DP to CP flow --- .../internal/controlplane/eventPublisher.go | 11 +++ .../operator/controllers/dp/api_controller.go | 91 +++++++++++++++---- 2 files changed, 82 insertions(+), 20 deletions(-) diff --git a/adapter/internal/controlplane/eventPublisher.go b/adapter/internal/controlplane/eventPublisher.go index f01f39c50..3cad85379 100644 --- a/adapter/internal/controlplane/eventPublisher.go +++ b/adapter/internal/controlplane/eventPublisher.go @@ -99,6 +99,17 @@ type API struct { APIKeyHeader string `json:"apiKeyHeader"` Operations []Operation `json:"operations"` APIHash string `json:"-"` + SandAIRL *AIRL `json:"sandAIRL"` + ProdAIRL *AIRL `json:"prodAIRL"` +} + +// AIRL holds AI ratelimit related data +type AIRL struct { + PromptTokenCount *uint32 `json:"promptTokenCount"` + CompletionTokenCount *uint32 `json:"CompletionTokenCount"` + TotalTokenCount *uint32 `json:"totalTokenCount"` + TimeUnit string `json:"timeUnit"` + RequestCount *uint32 `json:"requestCount"` } // Headers contains the request and response header modifier information diff --git a/adapter/internal/operator/controllers/dp/api_controller.go b/adapter/internal/operator/controllers/dp/api_controller.go index 267c505d5..db14c34ad 100644 --- a/adapter/internal/operator/controllers/dp/api_controller.go +++ b/adapter/internal/operator/controllers/dp/api_controller.go @@ -293,7 +293,7 @@ func (apiReconciler *APIReconciler) Reconcile(ctx context.Context, req ctrl.Requ if apiReconciler.apiPropagationEnabled && isAPIPropagatable(&apiState) { // Convert api state to api cp data loggers.LoggerAPKOperator.Info("Sending API deletion event to agent") - apiCpData := apiReconciler.convertAPIStateToAPICp(ctx, apiState, "") + apiCpData := apiReconciler.convertAPIStateToAPICp(ctx, apiState, "", nil, nil) apiCpData.Event = controlplane.EventTypeDelete controlplane.AddToEventQueue(apiCpData) } @@ -413,10 +413,10 @@ func (apiReconciler *APIReconciler) resolveAPIRefs(ctx context.Context, api dpv1 apiState.Authentications, namespace, err.Error()) } } - + var prodAirl *dpv1alpha3.AIRateLimitPolicy if len(prodRouteRefs) > 0 && apiState.APIDefinition.Spec.APIType == "REST" { apiState.ProdHTTPRoute = &synchronizer.HTTPRouteState{} - if apiState.ProdHTTPRoute, err = apiReconciler.resolveHTTPRouteRefs(ctx, apiState.ProdHTTPRoute, prodRouteRefs, + if apiState.ProdHTTPRoute, prodAirl, err = apiReconciler.resolveHTTPRouteRefs(ctx, apiState.ProdHTTPRoute, prodRouteRefs, namespace, apiState.InterceptorServiceMapping, api); err != nil { return nil, fmt.Errorf("error while resolving production httpRouteref %s in namespace :%s has not found. %s", prodRouteRefs, namespace, err.Error()) @@ -430,10 +430,10 @@ func (apiReconciler *APIReconciler) resolveAPIRefs(ctx context.Context, api dpv1 prodRouteRefs, namespace) } } - + var sandAirl *dpv1alpha3.AIRateLimitPolicy if len(sandRouteRefs) > 0 && apiState.APIDefinition.Spec.APIType == "REST" { apiState.SandHTTPRoute = &synchronizer.HTTPRouteState{} - if apiState.SandHTTPRoute, err = apiReconciler.resolveHTTPRouteRefs(ctx, apiState.SandHTTPRoute, sandRouteRefs, + if apiState.SandHTTPRoute, sandAirl, err = apiReconciler.resolveHTTPRouteRefs(ctx, apiState.SandHTTPRoute, sandRouteRefs, namespace, apiState.InterceptorServiceMapping, api); err != nil { return nil, fmt.Errorf("error while resolving sandbox httpRouteref %s in namespace :%s has not found. %s", sandRouteRefs, namespace, err.Error()) @@ -518,7 +518,7 @@ func (apiReconciler *APIReconciler) resolveAPIRefs(ctx context.Context, api dpv1 if push { loggers.LoggerAPKOperator.Infof("API hash changed sending the API to agent") // Publish the api data to CP - apiCpData := apiReconciler.convertAPIStateToAPICp(ctx, *apiState, apiHash) + apiCpData := apiReconciler.convertAPIStateToAPICp(ctx, *apiState, apiHash, prodAirl, sandAirl) apiCpData.Event = controlplane.EventTypeCreate controlplane.AddToEventQueue(apiCpData) } @@ -540,7 +540,7 @@ func (apiReconciler *APIReconciler) resolveAPIRefs(ctx context.Context, api dpv1 if push { loggers.LoggerAPKOperator.Infof("API hash changed sending the API to agent") // Publish the api data to CP - apiCpData := apiReconciler.convertAPIStateToAPICp(ctx, *apiState, apiHash) + apiCpData := apiReconciler.convertAPIStateToAPICp(ctx, *apiState, apiHash, prodAirl, sandAirl) apiCpData.Event = controlplane.EventTypeUpdate controlplane.AddToEventQueue(apiCpData) } @@ -586,19 +586,20 @@ func (apiReconciler *APIReconciler) resolveGQLRouteRefs(ctx context.Context, gql // - Authentications func (apiReconciler *APIReconciler) resolveHTTPRouteRefs(ctx context.Context, httpRouteState *synchronizer.HTTPRouteState, httpRouteRefs []string, namespace string, interceptorServiceMapping map[string]dpv1alpha1.InterceptorService, - api dpv1alpha3.API) (*synchronizer.HTTPRouteState, error) { + api dpv1alpha3.API) (*synchronizer.HTTPRouteState, *dpv1alpha3.AIRateLimitPolicy, error) { var err error httpRouteState.HTTPRouteCombined, httpRouteState.HTTPRoutePartitions, err = apiReconciler.concatHTTPRoutes(ctx, httpRouteRefs, namespace, api) if err != nil { - return nil, err + return nil, nil, err } - httpRouteState.BackendMapping, err = apiReconciler.getResolvedBackendsMapping(ctx, httpRouteState, interceptorServiceMapping, api) + var airl *dpv1alpha3.AIRateLimitPolicy + httpRouteState.BackendMapping, airl, err = apiReconciler.getResolvedBackendsMapping(ctx, httpRouteState, interceptorServiceMapping, api) if err != nil { - return nil, err + return nil, nil, err } httpRouteState.Scopes, err = apiReconciler.getScopesForHTTPRoute(ctx, httpRouteState.HTTPRouteCombined, api) - return httpRouteState, err + return httpRouteState, airl, err } func (apiReconciler *APIReconciler) resolveGRPCRouteRefs(ctx context.Context, grpcRouteRefs []string, @@ -983,9 +984,9 @@ func (apiReconciler *APIReconciler) resolveAuthentications(ctx context.Context, func (apiReconciler *APIReconciler) getResolvedBackendsMapping(ctx context.Context, httpRouteState *synchronizer.HTTPRouteState, interceptorServiceMapping map[string]dpv1alpha1.InterceptorService, - api dpv1alpha3.API) (map[string]*dpv1alpha2.ResolvedBackend, error) { + api dpv1alpha3.API) (map[string]*dpv1alpha2.ResolvedBackend, *dpv1alpha3.AIRateLimitPolicy, error) { backendMapping := make(map[string]*dpv1alpha2.ResolvedBackend) - + var airl *dpv1alpha3.AIRateLimitPolicy // Resolve backends in HTTPRoute httpRoute := httpRouteState.HTTPRouteCombined ruleIdxToAiRatelimitPolicyMapping := make(map[int]*dpv1alpha3.AIRateLimitPolicy) @@ -1004,7 +1005,11 @@ func (apiReconciler *APIReconciler) getResolvedBackendsMapping(ctx context.Conte } else { for _, aiRLPolicy := range aiRLPolicyList.Items { loggers.LoggerAPKOperator.Debugf("Adding mapping for ruleid: %d to aiRLPolicy: %s", id, utils.NamespacedName(&aiRLPolicy)) + if aiRLPolicy.Spec.Override == nil { + aiRLPolicy.Spec.Override = aiRLPolicy.Spec.Default + } ruleIdxToAiRatelimitPolicyMapping[id] = &aiRLPolicy + airl = &aiRLPolicy } } if _, exists := backendMapping[backendNamespacedName.String()]; !exists { @@ -1012,7 +1017,7 @@ func (apiReconciler *APIReconciler) getResolvedBackendsMapping(ctx context.Conte if resolvedBackend != nil { backendMapping[backendNamespacedName.String()] = resolvedBackend } else { - return nil, fmt.Errorf("unable to find backend %s", backendNamespacedName.String()) + return nil, nil, fmt.Errorf("unable to find backend %s", backendNamespacedName.String()) } } @@ -1031,18 +1036,18 @@ func (apiReconciler *APIReconciler) getResolvedBackendsMapping(ctx context.Conte if resolvedMirrorBackend != nil { backendMapping[mirrorBackendNamespacedName.String()] = resolvedMirrorBackend } else { - return nil, fmt.Errorf("unable to find backend %s", mirrorBackendNamespacedName.String()) + return nil, nil, fmt.Errorf("unable to find backend %s", mirrorBackendNamespacedName.String()) } } } else if string(*mirrorBackend.Kind) == constants.KindService { var err error service, err := utils.GetService(ctx, apiReconciler.client, utils.GetNamespace(mirrorBackend.Namespace, httpRoute.Namespace), string(mirrorBackend.Name)) if err != nil { - return nil, fmt.Errorf("unable to find service %s", mirrorBackendNamespacedName.String()) + return nil, nil, fmt.Errorf("unable to find service %s", mirrorBackendNamespacedName.String()) } backendMapping[mirrorBackendNamespacedName.String()], err = utils.GetResolvedBackendFromService(service, int(*mirrorBackend.Port)) if err != nil { - return nil, fmt.Errorf("error in getting service information %s", service) + return nil, nil, fmt.Errorf("error in getting service information %s", service) } } } @@ -1057,7 +1062,7 @@ func (apiReconciler *APIReconciler) getResolvedBackendsMapping(ctx context.Conte } loggers.LoggerAPKOperator.Debugf("Generated backendMapping: %v", backendMapping) - return backendMapping, nil + return backendMapping, airl, nil } // These proxy methods are designed as intermediaries for the getAPIsFor methods. @@ -2829,7 +2834,7 @@ func prepareOwnerReference(apiItems []dpv1alpha3.API) []metav1.OwnerReference { return ownerReferences } -func (apiReconciler *APIReconciler) convertAPIStateToAPICp(ctx context.Context, apiState synchronizer.APIState, apiHash string) controlplane.APICPEvent { +func (apiReconciler *APIReconciler) convertAPIStateToAPICp(ctx context.Context, apiState synchronizer.APIState, apiHash string, prodAIRL *dpv1alpha3.AIRateLimitPolicy, sandAIRL *dpv1alpha3.AIRateLimitPolicy) controlplane.APICPEvent { apiCPEvent := controlplane.APICPEvent{} spec := apiState.APIDefinition.Spec configMap := &corev1.ConfigMap{} @@ -2874,6 +2879,50 @@ func (apiReconciler *APIReconciler) convertAPIStateToAPICp(ctx context.Context, sandVhost := geSandVhost(&apiState) securityScheme, authHeader, apiKeyHeader := prepareSecuritySchemeForCP(&apiState) operations := prepareOperations(&apiState) + var sandAIRLToAgent controlplane.AIRL + var prodAIRLToAgent controlplane.AIRL + if prodAIRL != nil { + var promptTC, completionTC, totalTC, requestC *uint32 + var timeUnit string = "" + if prodAIRL.Spec.Override.TokenCount != nil { + promptTC = &prodAIRL.Spec.Override.TokenCount.RequestTokenCount + completionTC = &prodAIRL.Spec.Override.TokenCount.ResponseTokenCount + totalTC = &prodAIRL.Spec.Override.TokenCount.TotalTokenCount + timeUnit = prodAIRL.Spec.Override.TokenCount.Unit + } + if prodAIRL.Spec.Override.RequestCount != nil { + timeUnit = prodAIRL.Spec.Override.RequestCount.Unit + requestC = &prodAIRL.Spec.Override.RequestCount.RequestsPerUnit + } + prodAIRLToAgent = controlplane.AIRL{ + PromptTokenCount: promptTC, + CompletionTokenCount: completionTC, + TotalTokenCount: totalTC, + TimeUnit: timeUnit, + RequestCount: requestC, + } + } + if sandAIRL != nil { + var promptTC, completionTC, totalTC, requestC *uint32 + var timeUnit string = "" + if sandAIRL.Spec.Override.TokenCount != nil { + promptTC = &sandAIRL.Spec.Override.TokenCount.RequestTokenCount + completionTC = &sandAIRL.Spec.Override.TokenCount.ResponseTokenCount + totalTC = &sandAIRL.Spec.Override.TokenCount.TotalTokenCount + timeUnit = sandAIRL.Spec.Override.TokenCount.Unit + } + if sandAIRL.Spec.Override.RequestCount != nil { + timeUnit = sandAIRL.Spec.Override.RequestCount.Unit + requestC = &sandAIRL.Spec.Override.RequestCount.RequestsPerUnit + } + sandAIRLToAgent = controlplane.AIRL{ + PromptTokenCount: promptTC, + CompletionTokenCount: completionTC, + TotalTokenCount: totalTC, + TimeUnit: timeUnit, + RequestCount: requestC, + } + } api := controlplane.API{ APIName: spec.APIName, APIVersion: spec.APIVersion, @@ -2898,6 +2947,8 @@ func (apiReconciler *APIReconciler) convertAPIStateToAPICp(ctx context.Context, Operations: operations, APIHash: apiHash, APIKeyHeader: apiKeyHeader, + SandAIRL: &sandAIRLToAgent, + ProdAIRL: &prodAIRLToAgent, } apiCPEvent.API = api apiCPEvent.CRName = apiState.APIDefinition.ObjectMeta.Name From 935e5da7ecdba8437368465a2bb8f48a686a974f Mon Sep 17 00:00:00 2001 From: Tharsanan1 Date: Thu, 26 Sep 2024 09:45:42 +0530 Subject: [PATCH 2/2] Fix revive error --- adapter/internal/operator/controllers/dp/api_controller.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/adapter/internal/operator/controllers/dp/api_controller.go b/adapter/internal/operator/controllers/dp/api_controller.go index db14c34ad..eb3af1d67 100644 --- a/adapter/internal/operator/controllers/dp/api_controller.go +++ b/adapter/internal/operator/controllers/dp/api_controller.go @@ -2883,7 +2883,7 @@ func (apiReconciler *APIReconciler) convertAPIStateToAPICp(ctx context.Context, var prodAIRLToAgent controlplane.AIRL if prodAIRL != nil { var promptTC, completionTC, totalTC, requestC *uint32 - var timeUnit string = "" + var timeUnit string if prodAIRL.Spec.Override.TokenCount != nil { promptTC = &prodAIRL.Spec.Override.TokenCount.RequestTokenCount completionTC = &prodAIRL.Spec.Override.TokenCount.ResponseTokenCount @@ -2904,7 +2904,7 @@ func (apiReconciler *APIReconciler) convertAPIStateToAPICp(ctx context.Context, } if sandAIRL != nil { var promptTC, completionTC, totalTC, requestC *uint32 - var timeUnit string = "" + var timeUnit string if sandAIRL.Spec.Override.TokenCount != nil { promptTC = &sandAIRL.Spec.Override.TokenCount.RequestTokenCount completionTC = &sandAIRL.Spec.Override.TokenCount.ResponseTokenCount