Skip to content

Commit

Permalink
Merge pull request #2490 from Tharsanan1/apk-agent-airl
Browse files Browse the repository at this point in the history
Add airl DP to CP flow
  • Loading branch information
CrowleyRajapakse authored Sep 26, 2024
2 parents 8880e10 + 935e5da commit ff49dfb
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 20 deletions.
11 changes: 11 additions & 0 deletions adapter/internal/controlplane/eventPublisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,17 @@ type API struct {
APIKeyHeader string `json:"apiKeyHeader"`
Operations []Operation `json:"operations"`
APIHash string `json:"-"`
SandAIRL *AIRL `json:"sandAIRL"`
ProdAIRL *AIRL `json:"prodAIRL"`
}

// AIRL holds AI ratelimit related data
type AIRL struct {
PromptTokenCount *uint32 `json:"promptTokenCount"`
CompletionTokenCount *uint32 `json:"CompletionTokenCount"`
TotalTokenCount *uint32 `json:"totalTokenCount"`
TimeUnit string `json:"timeUnit"`
RequestCount *uint32 `json:"requestCount"`
}

// Headers contains the request and response header modifier information
Expand Down
91 changes: 71 additions & 20 deletions adapter/internal/operator/controllers/dp/api_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func (apiReconciler *APIReconciler) Reconcile(ctx context.Context, req ctrl.Requ
if apiReconciler.apiPropagationEnabled && isAPIPropagatable(&apiState) {
// Convert api state to api cp data
loggers.LoggerAPKOperator.Info("Sending API deletion event to agent")
apiCpData := apiReconciler.convertAPIStateToAPICp(ctx, apiState, "")
apiCpData := apiReconciler.convertAPIStateToAPICp(ctx, apiState, "", nil, nil)
apiCpData.Event = controlplane.EventTypeDelete
controlplane.AddToEventQueue(apiCpData)
}
Expand Down Expand Up @@ -413,10 +413,10 @@ func (apiReconciler *APIReconciler) resolveAPIRefs(ctx context.Context, api dpv1
apiState.Authentications, namespace, err.Error())
}
}

var prodAirl *dpv1alpha3.AIRateLimitPolicy
if len(prodRouteRefs) > 0 && apiState.APIDefinition.Spec.APIType == "REST" {
apiState.ProdHTTPRoute = &synchronizer.HTTPRouteState{}
if apiState.ProdHTTPRoute, err = apiReconciler.resolveHTTPRouteRefs(ctx, apiState.ProdHTTPRoute, prodRouteRefs,
if apiState.ProdHTTPRoute, prodAirl, err = apiReconciler.resolveHTTPRouteRefs(ctx, apiState.ProdHTTPRoute, prodRouteRefs,
namespace, apiState.InterceptorServiceMapping, api); err != nil {
return nil, fmt.Errorf("error while resolving production httpRouteref %s in namespace :%s has not found. %s",
prodRouteRefs, namespace, err.Error())
Expand All @@ -430,10 +430,10 @@ func (apiReconciler *APIReconciler) resolveAPIRefs(ctx context.Context, api dpv1
prodRouteRefs, namespace)
}
}

var sandAirl *dpv1alpha3.AIRateLimitPolicy
if len(sandRouteRefs) > 0 && apiState.APIDefinition.Spec.APIType == "REST" {
apiState.SandHTTPRoute = &synchronizer.HTTPRouteState{}
if apiState.SandHTTPRoute, err = apiReconciler.resolveHTTPRouteRefs(ctx, apiState.SandHTTPRoute, sandRouteRefs,
if apiState.SandHTTPRoute, sandAirl, err = apiReconciler.resolveHTTPRouteRefs(ctx, apiState.SandHTTPRoute, sandRouteRefs,
namespace, apiState.InterceptorServiceMapping, api); err != nil {
return nil, fmt.Errorf("error while resolving sandbox httpRouteref %s in namespace :%s has not found. %s",
sandRouteRefs, namespace, err.Error())
Expand Down Expand Up @@ -518,7 +518,7 @@ func (apiReconciler *APIReconciler) resolveAPIRefs(ctx context.Context, api dpv1
if push {
loggers.LoggerAPKOperator.Infof("API hash changed sending the API to agent")
// Publish the api data to CP
apiCpData := apiReconciler.convertAPIStateToAPICp(ctx, *apiState, apiHash)
apiCpData := apiReconciler.convertAPIStateToAPICp(ctx, *apiState, apiHash, prodAirl, sandAirl)
apiCpData.Event = controlplane.EventTypeCreate
controlplane.AddToEventQueue(apiCpData)
}
Expand All @@ -540,7 +540,7 @@ func (apiReconciler *APIReconciler) resolveAPIRefs(ctx context.Context, api dpv1
if push {
loggers.LoggerAPKOperator.Infof("API hash changed sending the API to agent")
// Publish the api data to CP
apiCpData := apiReconciler.convertAPIStateToAPICp(ctx, *apiState, apiHash)
apiCpData := apiReconciler.convertAPIStateToAPICp(ctx, *apiState, apiHash, prodAirl, sandAirl)
apiCpData.Event = controlplane.EventTypeUpdate
controlplane.AddToEventQueue(apiCpData)
}
Expand Down Expand Up @@ -586,19 +586,20 @@ func (apiReconciler *APIReconciler) resolveGQLRouteRefs(ctx context.Context, gql
// - Authentications
func (apiReconciler *APIReconciler) resolveHTTPRouteRefs(ctx context.Context, httpRouteState *synchronizer.HTTPRouteState,
httpRouteRefs []string, namespace string, interceptorServiceMapping map[string]dpv1alpha1.InterceptorService,
api dpv1alpha3.API) (*synchronizer.HTTPRouteState, error) {
api dpv1alpha3.API) (*synchronizer.HTTPRouteState, *dpv1alpha3.AIRateLimitPolicy, error) {
var err error
httpRouteState.HTTPRouteCombined, httpRouteState.HTTPRoutePartitions, err = apiReconciler.concatHTTPRoutes(ctx, httpRouteRefs, namespace, api)
if err != nil {
return nil, err
return nil, nil, err
}
httpRouteState.BackendMapping, err = apiReconciler.getResolvedBackendsMapping(ctx, httpRouteState, interceptorServiceMapping, api)
var airl *dpv1alpha3.AIRateLimitPolicy
httpRouteState.BackendMapping, airl, err = apiReconciler.getResolvedBackendsMapping(ctx, httpRouteState, interceptorServiceMapping, api)
if err != nil {
return nil, err
return nil, nil, err
}
httpRouteState.Scopes, err = apiReconciler.getScopesForHTTPRoute(ctx, httpRouteState.HTTPRouteCombined, api)

return httpRouteState, err
return httpRouteState, airl, err
}

func (apiReconciler *APIReconciler) resolveGRPCRouteRefs(ctx context.Context, grpcRouteRefs []string,
Expand Down Expand Up @@ -983,9 +984,9 @@ func (apiReconciler *APIReconciler) resolveAuthentications(ctx context.Context,

func (apiReconciler *APIReconciler) getResolvedBackendsMapping(ctx context.Context,
httpRouteState *synchronizer.HTTPRouteState, interceptorServiceMapping map[string]dpv1alpha1.InterceptorService,
api dpv1alpha3.API) (map[string]*dpv1alpha2.ResolvedBackend, error) {
api dpv1alpha3.API) (map[string]*dpv1alpha2.ResolvedBackend, *dpv1alpha3.AIRateLimitPolicy, error) {
backendMapping := make(map[string]*dpv1alpha2.ResolvedBackend)

var airl *dpv1alpha3.AIRateLimitPolicy
// Resolve backends in HTTPRoute
httpRoute := httpRouteState.HTTPRouteCombined
ruleIdxToAiRatelimitPolicyMapping := make(map[int]*dpv1alpha3.AIRateLimitPolicy)
Expand All @@ -1004,15 +1005,19 @@ func (apiReconciler *APIReconciler) getResolvedBackendsMapping(ctx context.Conte
} else {
for _, aiRLPolicy := range aiRLPolicyList.Items {
loggers.LoggerAPKOperator.Debugf("Adding mapping for ruleid: %d to aiRLPolicy: %s", id, utils.NamespacedName(&aiRLPolicy))
if aiRLPolicy.Spec.Override == nil {
aiRLPolicy.Spec.Override = aiRLPolicy.Spec.Default
}
ruleIdxToAiRatelimitPolicyMapping[id] = &aiRLPolicy
airl = &aiRLPolicy
}
}
if _, exists := backendMapping[backendNamespacedName.String()]; !exists {
resolvedBackend := utils.GetResolvedBackend(ctx, apiReconciler.client, backendNamespacedName, &api)
if resolvedBackend != nil {
backendMapping[backendNamespacedName.String()] = resolvedBackend
} else {
return nil, fmt.Errorf("unable to find backend %s", backendNamespacedName.String())
return nil, nil, fmt.Errorf("unable to find backend %s", backendNamespacedName.String())
}
}

Expand All @@ -1031,18 +1036,18 @@ func (apiReconciler *APIReconciler) getResolvedBackendsMapping(ctx context.Conte
if resolvedMirrorBackend != nil {
backendMapping[mirrorBackendNamespacedName.String()] = resolvedMirrorBackend
} else {
return nil, fmt.Errorf("unable to find backend %s", mirrorBackendNamespacedName.String())
return nil, nil, fmt.Errorf("unable to find backend %s", mirrorBackendNamespacedName.String())
}
}
} else if string(*mirrorBackend.Kind) == constants.KindService {
var err error
service, err := utils.GetService(ctx, apiReconciler.client, utils.GetNamespace(mirrorBackend.Namespace, httpRoute.Namespace), string(mirrorBackend.Name))
if err != nil {
return nil, fmt.Errorf("unable to find service %s", mirrorBackendNamespacedName.String())
return nil, nil, fmt.Errorf("unable to find service %s", mirrorBackendNamespacedName.String())
}
backendMapping[mirrorBackendNamespacedName.String()], err = utils.GetResolvedBackendFromService(service, int(*mirrorBackend.Port))
if err != nil {
return nil, fmt.Errorf("error in getting service information %s", service)
return nil, nil, fmt.Errorf("error in getting service information %s", service)
}
}
}
Expand All @@ -1057,7 +1062,7 @@ func (apiReconciler *APIReconciler) getResolvedBackendsMapping(ctx context.Conte
}

loggers.LoggerAPKOperator.Debugf("Generated backendMapping: %v", backendMapping)
return backendMapping, nil
return backendMapping, airl, nil
}

// These proxy methods are designed as intermediaries for the getAPIsFor<CR objects> methods.
Expand Down Expand Up @@ -2829,7 +2834,7 @@ func prepareOwnerReference(apiItems []dpv1alpha3.API) []metav1.OwnerReference {
return ownerReferences
}

func (apiReconciler *APIReconciler) convertAPIStateToAPICp(ctx context.Context, apiState synchronizer.APIState, apiHash string) controlplane.APICPEvent {
func (apiReconciler *APIReconciler) convertAPIStateToAPICp(ctx context.Context, apiState synchronizer.APIState, apiHash string, prodAIRL *dpv1alpha3.AIRateLimitPolicy, sandAIRL *dpv1alpha3.AIRateLimitPolicy) controlplane.APICPEvent {
apiCPEvent := controlplane.APICPEvent{}
spec := apiState.APIDefinition.Spec
configMap := &corev1.ConfigMap{}
Expand Down Expand Up @@ -2874,6 +2879,50 @@ func (apiReconciler *APIReconciler) convertAPIStateToAPICp(ctx context.Context,
sandVhost := geSandVhost(&apiState)
securityScheme, authHeader, apiKeyHeader := prepareSecuritySchemeForCP(&apiState)
operations := prepareOperations(&apiState)
var sandAIRLToAgent controlplane.AIRL
var prodAIRLToAgent controlplane.AIRL
if prodAIRL != nil {
var promptTC, completionTC, totalTC, requestC *uint32
var timeUnit string
if prodAIRL.Spec.Override.TokenCount != nil {
promptTC = &prodAIRL.Spec.Override.TokenCount.RequestTokenCount
completionTC = &prodAIRL.Spec.Override.TokenCount.ResponseTokenCount
totalTC = &prodAIRL.Spec.Override.TokenCount.TotalTokenCount
timeUnit = prodAIRL.Spec.Override.TokenCount.Unit
}
if prodAIRL.Spec.Override.RequestCount != nil {
timeUnit = prodAIRL.Spec.Override.RequestCount.Unit
requestC = &prodAIRL.Spec.Override.RequestCount.RequestsPerUnit
}
prodAIRLToAgent = controlplane.AIRL{
PromptTokenCount: promptTC,
CompletionTokenCount: completionTC,
TotalTokenCount: totalTC,
TimeUnit: timeUnit,
RequestCount: requestC,
}
}
if sandAIRL != nil {
var promptTC, completionTC, totalTC, requestC *uint32
var timeUnit string
if sandAIRL.Spec.Override.TokenCount != nil {
promptTC = &sandAIRL.Spec.Override.TokenCount.RequestTokenCount
completionTC = &sandAIRL.Spec.Override.TokenCount.ResponseTokenCount
totalTC = &sandAIRL.Spec.Override.TokenCount.TotalTokenCount
timeUnit = sandAIRL.Spec.Override.TokenCount.Unit
}
if sandAIRL.Spec.Override.RequestCount != nil {
timeUnit = sandAIRL.Spec.Override.RequestCount.Unit
requestC = &sandAIRL.Spec.Override.RequestCount.RequestsPerUnit
}
sandAIRLToAgent = controlplane.AIRL{
PromptTokenCount: promptTC,
CompletionTokenCount: completionTC,
TotalTokenCount: totalTC,
TimeUnit: timeUnit,
RequestCount: requestC,
}
}
api := controlplane.API{
APIName: spec.APIName,
APIVersion: spec.APIVersion,
Expand All @@ -2898,6 +2947,8 @@ func (apiReconciler *APIReconciler) convertAPIStateToAPICp(ctx context.Context,
Operations: operations,
APIHash: apiHash,
APIKeyHeader: apiKeyHeader,
SandAIRL: &sandAIRLToAgent,
ProdAIRL: &prodAIRLToAgent,
}
apiCPEvent.API = api
apiCPEvent.CRName = apiState.APIDefinition.ObjectMeta.Name
Expand Down

0 comments on commit ff49dfb

Please sign in to comment.