Skip to content

Commit

Permalink
Wip2
Browse files Browse the repository at this point in the history
  • Loading branch information
bonzofenix committed Nov 25, 2024
1 parent 6b05fa5 commit 87801c3
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 78 deletions.
7 changes: 7 additions & 0 deletions src/autoscaler/api/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,16 @@ func loadVcapConfig(conf *Config, vcapReader configutil.VCAPConfigurationReader)
if err := configureBindingDb(conf, vcapReader); err != nil {
return err
}

configureCfInstanceCert(conf)

return nil
}

func configureCfInstanceCert(conf *Config) {
conf.CfInstanceCert = os.Getenv("CF_INSTANCE_CERT")
}

func configurePolicyDb(conf *Config, vcapReader configutil.VCAPConfigurationReader) error {
currentPolicyDb, ok := conf.Db[db.PolicyDb]
if !ok {
Expand Down
18 changes: 15 additions & 3 deletions src/autoscaler/api/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config_test

import (
"fmt"
"os"
"time"

"code.cloudfoundry.org/app-autoscaler/src/autoscaler/fakes"
Expand Down Expand Up @@ -43,13 +44,24 @@ var _ = Describe("Config", func() {

When("vcap CF_INSTANCE_CERT is set", func() {
BeforeEach(func() {
mockVCAPConfigurationReader
os.Setenv("CF_INSTANCE_CERT", "cert")
})

It("sets env variable over config file", func() {
Expect(err).NotTo(HaveOccurred())
Expect(conf.CFInstanceCert).To(Equal("cert"))
}
Expect(conf.CfInstanceCert).To(Equal("cert"))
})
})

When("vcap CF_INSTANCE_CERT is not set", func() {
BeforeEach(func() {
os.Unsetenv("CF_INSTANCE_CERT")
})

It("sets env variable over config file", func() {
Expect(err).NotTo(HaveOccurred())
Expect(conf.CfInstanceCert).To(Equal(""))
})
})

When("vcap PORT is set to a number", func() {
Expand Down
160 changes: 85 additions & 75 deletions src/autoscaler/api/publicapiserver/public_api_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@ import (
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/cred_helper"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/db"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/helpers"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/helpers/handlers"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/models"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/routes"
"github.com/google/uuid"

"code.cloudfoundry.org/app-autoscaler/src/autoscaler/helpers/handlers"
"code.cloudfoundry.org/lager/v3"
"github.com/google/uuid"
)

type PublicApiHandler struct {
Expand Down Expand Up @@ -55,22 +54,26 @@ func NewPublicApiHandler(logger lager.Logger, conf *config.Config, policydb db.P
policydb: policydb,
bindingdb: bindingdb,
eventGeneratorClient: egClient,
policyValidator: policyvalidator.NewPolicyValidator(
conf.PolicySchemaPath,
conf.ScalingRules.CPU.LowerThreshold,
conf.ScalingRules.CPU.UpperThreshold,
conf.ScalingRules.CPUUtil.LowerThreshold,
conf.ScalingRules.CPUUtil.UpperThreshold,
conf.ScalingRules.DiskUtil.LowerThreshold,
conf.ScalingRules.DiskUtil.UpperThreshold,
conf.ScalingRules.Disk.LowerThreshold,
conf.ScalingRules.Disk.UpperThreshold,
),
schedulerUtil: schedulerclient.New(conf, logger),
credentials: credentials,
policyValidator: createPolicyValidator(conf),
schedulerUtil: schedulerclient.New(conf, logger),
credentials: credentials,
}
}

func createPolicyValidator(conf *config.Config) *policyvalidator.PolicyValidator {
return policyvalidator.NewPolicyValidator(
conf.PolicySchemaPath,
conf.ScalingRules.CPU.LowerThreshold,
conf.ScalingRules.CPU.UpperThreshold,
conf.ScalingRules.CPUUtil.LowerThreshold,
conf.ScalingRules.CPUUtil.UpperThreshold,
conf.ScalingRules.DiskUtil.LowerThreshold,
conf.ScalingRules.DiskUtil.UpperThreshold,
conf.ScalingRules.Disk.LowerThreshold,
conf.ScalingRules.Disk.UpperThreshold,
)
}

func writeErrorResponse(w http.ResponseWriter, statusCode int, message string) {
handlers.WriteJSONResponse(w, statusCode, models.ErrorResponse{
Code: http.StatusText(statusCode),
Expand All @@ -84,6 +87,7 @@ func (h *PublicApiHandler) GetScalingPolicy(w http.ResponseWriter, r *http.Reque
writeErrorResponse(w, http.StatusBadRequest, ErrorMessageAppidIsRequired)
return
}

logger := h.logger.Session("GetScalingPolicy", lager.Data{"appId": appId})
logger.Info("Get Scaling Policy")

Expand Down Expand Up @@ -129,15 +133,14 @@ func (h *PublicApiHandler) AttachScalingPolicy(w http.ResponseWriter, r *http.Re
}

policyGuid := uuid.NewString()
err = h.policydb.SaveAppPolicy(r.Context(), appId, policy, policyGuid)
if err != nil {
if err := h.policydb.SaveAppPolicy(r.Context(), appId, policy, policyGuid); err != nil {
logger.Error("Failed to save policy", err)
writeErrorResponse(w, http.StatusInternalServerError, "Error saving policy")
return
}

h.logger.Info("creating/updating schedules", lager.Data{"policy": policy})
err = h.schedulerUtil.CreateOrUpdateSchedule(r.Context(), appId, policy, policyGuid)
if err != nil {
if err := h.schedulerUtil.CreateOrUpdateSchedule(r.Context(), appId, policy, policyGuid); err != nil {
logger.Error("Failed to create/update schedule", err)
writeErrorResponse(w, http.StatusInternalServerError, err.Error())
return
Expand All @@ -151,7 +154,7 @@ func (h *PublicApiHandler) AttachScalingPolicy(w http.ResponseWriter, r *http.Re
}
_, err = w.Write(response)
if err != nil {
logger.Error("Failed to write body", err)
h.logger.Error("Failed to write body", err)
}
}

Expand All @@ -162,17 +165,18 @@ func (h *PublicApiHandler) DetachScalingPolicy(w http.ResponseWriter, r *http.Re
writeErrorResponse(w, http.StatusBadRequest, ErrorMessageAppidIsRequired)
return
}

logger := h.logger.Session("DetachScalingPolicy", lager.Data{"appId": appId})
logger.Info("Deleting policy json", lager.Data{"appId": appId})
err := h.policydb.DeletePolicy(r.Context(), appId)
if err != nil {

if err := h.policydb.DeletePolicy(r.Context(), appId); err != nil {
logger.Error("Failed to delete policy from database", err)
writeErrorResponse(w, http.StatusInternalServerError, "Error deleting policy")
return
}

logger.Info("Deleting schedules")
err = h.schedulerUtil.DeleteSchedule(r.Context(), appId)
if err != nil {
if err := h.schedulerUtil.DeleteSchedule(r.Context(), appId); err != nil {
logger.Error("Failed to delete schedule", err)
writeErrorResponse(w, http.StatusInternalServerError, "Error deleting schedules")
return
Expand All @@ -181,51 +185,62 @@ func (h *PublicApiHandler) DetachScalingPolicy(w http.ResponseWriter, r *http.Re
if h.bindingdb != nil && !reflect.ValueOf(h.bindingdb).IsNil() {
//TODO this is a copy of part of the attach ... this should use a common function.
// brokered offering: check if there's a default policy that could apply
serviceInstance, err := h.bindingdb.GetServiceInstanceByAppId(appId)
if err != nil {
logger.Error("Failed to find service instance for app", err)
writeErrorResponse(w, http.StatusInternalServerError, "Error retrieving service instance")
if err := h.handleDefaultPolicy(w, r, logger, appId); err != nil {
return
}
if serviceInstance.DefaultPolicy != "" {
policyStr := serviceInstance.DefaultPolicy
policyGuidStr := serviceInstance.DefaultPolicyGuid
logger.Info("saving default policy json for app", lager.Data{"policy": policyStr})
var policy *models.ScalingPolicy
err := json.Unmarshal([]byte(policyStr), &policy)
if err != nil {
h.logger.Error("default policy invalid", err, lager.Data{"appId": appId, "policy": policyStr})
writeErrorResponse(w, http.StatusInternalServerError, "Default policy not valid")
return
}

err = h.policydb.SaveAppPolicy(r.Context(), appId, policy, policyGuidStr)
if err != nil {
logger.Error("failed to save policy", err, lager.Data{"policy": policyStr})
writeErrorResponse(w, http.StatusInternalServerError, "Error attaching the default policy")
return
}

logger.Info("creating/updating schedules", lager.Data{"policy": policyStr})
err = h.schedulerUtil.CreateOrUpdateSchedule(r.Context(), appId, policy, policyGuidStr)
//while there is synchronization between policy and schedule, so creating schedule error does not break
//the whole creating binding process
if err != nil {
logger.Error("failed to create/update schedules", err, lager.Data{"policy": policyStr})
writeErrorResponse(w, http.StatusInternalServerError, fmt.Sprintf("Failed to update schedule:%s", err))
}
}
}
// find via the app id the binding -> service instance
// default policy? then apply that

w.WriteHeader(http.StatusOK)
_, err = w.Write([]byte("{}"))
_, err := w.Write([]byte("{}"))
if err != nil {
logger.Error(ActionWriteBody, err)
}
}

func (h *PublicApiHandler) handleDefaultPolicy(w http.ResponseWriter, r *http.Request, logger lager.Logger, appId string) error {
serviceInstance, err := h.bindingdb.GetServiceInstanceByAppId(appId)
if err != nil {
logger.Error("Failed to find service instance for app", err)
writeErrorResponse(w, http.StatusInternalServerError, "Error retrieving service instance")
return errors.New("error retrieving service instance")

}

if serviceInstance.DefaultPolicy != "" {
return h.saveDefaultPolicy(w, r, logger, appId, serviceInstance)
}

return nil
}

func (h *PublicApiHandler) saveDefaultPolicy(w http.ResponseWriter, r *http.Request, logger lager.Logger, appId string, serviceInstance *models.ServiceInstance) error {
policyStr := serviceInstance.DefaultPolicy
policyGuidStr := serviceInstance.DefaultPolicyGuid
logger.Info("saving default policy json for app", lager.Data{"policy": policyStr})

var policy *models.ScalingPolicy
if err := json.Unmarshal([]byte(policyStr), &policy); err != nil {
h.logger.Error("default policy invalid", err, lager.Data{"appId": appId, "policy": policyStr})
writeErrorResponse(w, http.StatusInternalServerError, "Default policy not valid")
return errors.New("default policy not valid")
}

if err := h.policydb.SaveAppPolicy(r.Context(), appId, policy, policyGuidStr); err != nil {
logger.Error("failed to save policy", err, lager.Data{"policy": policyStr})
writeErrorResponse(w, http.StatusInternalServerError, "Error attaching the default policy")
return errors.New("error attaching the default policy")
}

logger.Info("creating/updating schedules", lager.Data{"policy": policyStr})
if err := h.schedulerUtil.CreateOrUpdateSchedule(r.Context(), appId, policy, policyGuidStr); err != nil {
logger.Error("failed to create/update schedules", err, lager.Data{"policy": policyStr})
writeErrorResponse(w, http.StatusInternalServerError, fmt.Sprintf("Failed to update schedule:%s", err))
return errors.New("failed to update schedule")
}

return nil
}

func (h *PublicApiHandler) proxyRequest(logger lager.Logger, appId string, metricType string, w http.ResponseWriter, req *http.Request, parameters *url.Values, requestDescription string) {
reqUrl := req.URL
r := routes.NewRouter()
Expand All @@ -242,26 +257,13 @@ func (h *PublicApiHandler) proxyRequest(logger lager.Logger, appId string, metri
}

aUrl := h.conf.EventGenerator.EventGeneratorUrl + path.RequestURI() + "?" + parameters.Encode()

req, err = http.NewRequest("GET", aUrl, nil)

if h.conf.CfInstanceCert != "" {
certPEM := []byte(h.conf.CfInstanceCert)

// Calculate SHA-256 hash of the certificate
hash := sha256.Sum256(certPEM)

// URL encode the PEM certificate
encodedCert := url.QueryEscape(string(certPEM))

// Construct the XFCC header value
xfccHeader := fmt.Sprintf("Hash=%x;Cert=\"%s\"", hash, encodedCert)

req.Header.Set("X-Forwarded-Client-Cert", xfccHeader)
h.setXForwardedClientCertHeader(req)
}

resp, err := h.eventGeneratorClient.Do(req)

if err != nil {
logger.Error("Failed to retrieve "+requestDescription, err, lager.Data{"url": aUrl})
writeErrorResponse(w, http.StatusInternalServerError, "Error retrieving "+requestDescription)
Expand All @@ -281,6 +283,7 @@ func (h *PublicApiHandler) proxyRequest(logger lager.Logger, appId string, metri
writeErrorResponse(w, resp.StatusCode, string(responseData))
return
}

paginatedResponse, err := paginateResource(responseData, parameters, reqUrl)
if err != nil {
handlers.WriteJSONResponse(w, http.StatusInternalServerError, err.Error())
Expand All @@ -290,6 +293,14 @@ func (h *PublicApiHandler) proxyRequest(logger lager.Logger, appId string, metri
handlers.WriteJSONResponse(w, resp.StatusCode, paginatedResponse)
}

func (h *PublicApiHandler) setXForwardedClientCertHeader(req *http.Request) {
certPEM := []byte(h.conf.CfInstanceCert)
hash := sha256.Sum256(certPEM)
encodedCert := url.QueryEscape(string(certPEM))
xfccHeader := fmt.Sprintf("Hash=%x;Cert=\"%s\"", hash, encodedCert)
req.Header.Set("X-Forwarded-Client-Cert", xfccHeader)
}

func (h *PublicApiHandler) GetAggregatedMetricsHistories(w http.ResponseWriter, req *http.Request, vars map[string]string) {
appId := vars["appId"]
metricType := vars["metricType"]
Expand All @@ -309,7 +320,6 @@ func (h *PublicApiHandler) GetAggregatedMetricsHistories(w http.ResponseWriter,
}

h.proxyRequest(logger, appId, metricType, w, req, parameters, "metrics history from eventgenerator")
//proxyRequest(pathFn, h.eventGeneratorClient, w, req.URL, parameters, "metrics history from eventgenerator", logger)
}

func (h *PublicApiHandler) GetApiInfo(w http.ResponseWriter, _ *http.Request, _ map[string]string) {
Expand Down

0 comments on commit 87801c3

Please sign in to comment.