Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(eventgenerator): Integrate cf api with event generator #3357

Open
wants to merge 45 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
1c59a4e
Adds cf_Server config for scalingengine
bonzofenix Nov 6, 2024
8d469c7
Adds xfcc cf endpoint support to scaling engine
bonzofenix Nov 6, 2024
ef1fe51
Remove debug println
bonzofenix Nov 14, 2024
9003d42
Update jobs/scalingengine/spec
bonzofenix Nov 21, 2024
d16d128
Update jobs/scalingengine/spec
bonzofenix Nov 21, 2024
f7f2c93
Remove pending test
bonzofenix Nov 21, 2024
179a30e
Removes subrouter in api server
bonzofenix Nov 21, 2024
6653b8a
WIP
bonzofenix Nov 14, 2024
04325d3
Fix issue with routes
bonzofenix Nov 21, 2024
26cebf7
Remove EventgeneratorServer interface from server.go
bonzofenix Nov 28, 2024
ca66e6b
Adds xfcc cf endpoint support to scaling engine
bonzofenix Nov 6, 2024
0a67607
WIP
bonzofenix Nov 21, 2024
3550446
Wip2
bonzofenix Nov 23, 2024
fa53bf8
Refactor routes
bonzofenix Nov 29, 2024
002e013
Refactor X-Forwarded-Client-Cert header handling and update tests
bonzofenix Nov 29, 2024
59e25f4
Fix test
bonzofenix Nov 29, 2024
a5853f5
wip fix test
bonzofenix Dec 2, 2024
5760efa
Merge branch 'main' into integrate-cf-api-with-eventgenerator
bonzofenix Dec 4, 2024
dc4ade3
Enable generate-fakes target for integration tests in Makefile
bonzofenix Dec 4, 2024
04e8526
Fix wrong merge conflict on public api handler
bonzofenix Dec 4, 2024
85fa4f4
Fix regular expression to parse organization unit
bonzofenix Dec 4, 2024
cf1723e
Merge branch 'main' into integrate-cf-api-with-eventgenerator
bonzofenix Dec 4, 2024
2847418
Fix lint
bonzofenix Dec 4, 2024
a9168ce
Fix integration tests
bonzofenix Dec 4, 2024
c4a936c
Remove logging
bonzofenix Dec 4, 2024
b68ad10
Remove unused reflect import and delete redundant nil check in Detach…
bonzofenix Dec 12, 2024
362ee2a
Add GinkgoHelper call to ApiRunner's Start method in api_suite_test.
bonzofenix Dec 13, 2024
39d1f3e
Refactor CF instance certificate handling to support TLS
bonzofenix Dec 13, 2024
9180950
Fix lints
bonzofenix Dec 13, 2024
abb02a3
Remove redundant comments about policy and schedule synchronization i…
bonzofenix Dec 15, 2024
d6e41a1
Remove unused certificate pool setup code from config.go in autoscale…
bonzofenix Dec 15, 2024
ed74dbc
Remove CF_INSTANCE_CERT handling from VCAPConfiguration
bonzofenix Dec 15, 2024
71830fd
Remove redundant code and TODO comment in resetDefaultPolicy function
bonzofenix Dec 15, 2024
2bc780c
Remove auth helpers from golang API server package spec
bonzofenix Dec 15, 2024
67b868d
Reads CF_INSTANCE_CERT and KEY from filepath
bonzofenix Dec 16, 2024
bf9e05e
Adds initial implementation of TLSReloadTransport
bonzofenix Dec 18, 2024
f5b31e8
Reduce load of parsing the certificate on every request to check cert…
bonzofenix Dec 19, 2024
42081d6
Refactor
bonzofenix Dec 19, 2024
2b64c7f
Refactor TLSReloadTransport to use non-pointer time.Time for cert exp…
bonzofenix Dec 19, 2024
67c6261
Simplify test
bonzofenix Dec 19, 2024
8a9ee47
Merge branch 'main' into integrate-cf-api-with-eventgenerator
bonzofenix Dec 19, 2024
f16e181
🤖🦾🛠️ go mod tidy & make package-specs
bonzofenix Dec 19, 2024
bd3a125
Fix broken tests when tls config not defined in httpclinet
bonzofenix Dec 19, 2024
5afa278
Fix api/cmd test to support CF_INSTANCE_CERT
bonzofenix Dec 19, 2024
3ab4631
fix linters
bonzofenix Dec 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/golangapiserver/spec
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ files:
- autoscaler/db/sqldb/* # gosub
- autoscaler/healthendpoint/* # gosub
- autoscaler/helpers/* # gosub
- autoscaler/helpers/auth/* # gosub
- autoscaler/helpers/handlers/* # gosub
- autoscaler/metricsforwarder/server/common/* # gosub
- autoscaler/models/* # gosub
Expand Down
10 changes: 10 additions & 0 deletions src/autoscaler/api/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ type Config struct {
PlanCheck *PlanCheckConfig `yaml:"plan_check"`
CatalogPath string `yaml:"catalog_path"`
CatalogSchemaPath string `yaml:"catalog_schema_path"`
CfInstanceCert string `yaml:"cf_instance_cert"`
DashboardRedirectURI string `yaml:"dashboard_redirect_uri"`
PolicySchemaPath string `yaml:"policy_schema_path"`
Scheduler SchedulerConfig `yaml:"scheduler"`
Expand Down Expand Up @@ -209,9 +210,18 @@ func loadVcapConfig(conf *Config, vcapReader configutil.VCAPConfigurationReader)
if err := configureBindingDb(conf, vcapReader); err != nil {
return err
}

configureCfInstanceCert(conf, vcapReader)

return nil
}

func configureCfInstanceCert(conf *Config, vcapReader configutil.VCAPConfigurationReader) {
if cert, err := vcapReader.GetCfInstanceCert(); err == nil {
conf.CfInstanceCert = cert
}
}

func configurePolicyDb(conf *Config, vcapReader configutil.VCAPConfigurationReader) error {
currentPolicyDb, ok := conf.Db[db.PolicyDb]
if !ok {
Expand Down
22 changes: 22 additions & 0 deletions src/autoscaler/api/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,28 @@ var _ = Describe("Config", func() {
conf, err = LoadConfig("", mockVCAPConfigurationReader)
})

When("vcap CF_INSTANCE_CERT is set", func() {
BeforeEach(func() {
mockVCAPConfigurationReader.GetCfInstanceCertReturns("cert", nil)
})

It("sets env variable over config file", func() {
Expect(err).NotTo(HaveOccurred())
Expect(conf.CfInstanceCert).To(Equal("cert"))
})
})

When("vcap CF_INSTANCE_CERT is not set", func() {
BeforeEach(func() {
mockVCAPConfigurationReader.GetCfInstanceCertReturns("", fmt.Errorf("failed to get required credential from service"))
})

It("sets env variable over config file", func() {
Expect(err).NotTo(HaveOccurred())
Expect(conf.CfInstanceCert).To(Equal(""))
})
})

When("vcap PORT is set to a number", func() {
BeforeEach(func() {
mockVCAPConfigurationReader.GetPortReturns(3333)
Expand Down
2 changes: 1 addition & 1 deletion src/autoscaler/api/publicapiserver/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (mw *Middleware) Oauth(next http.Handler) http.Handler {
if err != nil {
mw.logger.Error("failed to check if user is admin", err, nil)
handlers.WriteJSONResponse(w, http.StatusInternalServerError, models.ErrorResponse{
Code: "Internal-Server-Error",
Code: http.StatusText(http.StatusInternalServerError),
Message: "Failed to check if user is admin"})
return
}
Expand Down
2 changes: 1 addition & 1 deletion src/autoscaler/api/publicapiserver/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ var _ = Describe("Middleware", func() {
})
It("should fail with 500", func() {
CheckResponse(resp, http.StatusInternalServerError, models.ErrorResponse{
Code: "Internal-Server-Error",
Code: http.StatusText(http.StatusInternalServerError),
Message: "Failed to check if user is admin",
})
})
Expand Down
190 changes: 108 additions & 82 deletions src/autoscaler/api/publicapiserver/public_api_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import (
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/cred_helper"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/db"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/helpers"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/helpers/auth"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/helpers/handlers"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/models"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/routes"
"github.com/google/uuid"

"code.cloudfoundry.org/app-autoscaler/src/autoscaler/helpers/handlers"
"code.cloudfoundry.org/lager/v3"
"github.com/google/uuid"
)

type PublicApiHandler struct {
Expand Down Expand Up @@ -56,22 +56,26 @@ func NewPublicApiHandler(logger lager.Logger, conf *config.Config, policydb db.P
policydb: policydb,
bindingdb: bindingdb,
eventGeneratorClient: egClient,
policyValidator: policyvalidator.NewPolicyValidator(
conf.PolicySchemaPath,
conf.ScalingRules.CPU.LowerThreshold,
conf.ScalingRules.CPU.UpperThreshold,
conf.ScalingRules.CPUUtil.LowerThreshold,
conf.ScalingRules.CPUUtil.UpperThreshold,
conf.ScalingRules.DiskUtil.LowerThreshold,
conf.ScalingRules.DiskUtil.UpperThreshold,
conf.ScalingRules.Disk.LowerThreshold,
conf.ScalingRules.Disk.UpperThreshold,
),
schedulerUtil: schedulerclient.New(conf, logger),
credentials: credentials,
policyValidator: createPolicyValidator(conf),
schedulerUtil: schedulerclient.New(conf, logger),
credentials: credentials,
}
}

func createPolicyValidator(conf *config.Config) *policyvalidator.PolicyValidator {
return policyvalidator.NewPolicyValidator(
conf.PolicySchemaPath,
conf.ScalingRules.CPU.LowerThreshold,
conf.ScalingRules.CPU.UpperThreshold,
conf.ScalingRules.CPUUtil.LowerThreshold,
conf.ScalingRules.CPUUtil.UpperThreshold,
conf.ScalingRules.DiskUtil.LowerThreshold,
conf.ScalingRules.DiskUtil.UpperThreshold,
conf.ScalingRules.Disk.LowerThreshold,
conf.ScalingRules.Disk.UpperThreshold,
)
}

func writeErrorResponse(w http.ResponseWriter, statusCode int, message string) {
handlers.WriteJSONResponse(w, statusCode, models.ErrorResponse{
Code: http.StatusText(statusCode),
Expand All @@ -85,6 +89,7 @@ func (h *PublicApiHandler) GetScalingPolicy(w http.ResponseWriter, r *http.Reque
writeErrorResponse(w, http.StatusBadRequest, ErrorMessageAppidIsRequired)
return
}

logger := h.logger.Session("GetScalingPolicy", lager.Data{"appId": appId})
logger.Info("Get Scaling Policy")

Expand Down Expand Up @@ -148,16 +153,17 @@ func (h *PublicApiHandler) AttachScalingPolicy(w http.ResponseWriter, r *http.Re
}

policyGuid := uuid.NewString()
err = h.policydb.SaveAppPolicy(r.Context(), appId, policy, policyGuid)
if err != nil {
if err := h.policydb.SaveAppPolicy(r.Context(), appId, policy, policyGuid); err != nil {
logger.Error("Failed to save policy", err)
writeErrorResponse(w, http.StatusInternalServerError, "Error saving policy")
return
}

h.logger.Info("creating/updating schedules", lager.Data{"policy": policy})
err = h.schedulerUtil.CreateOrUpdateSchedule(r.Context(), appId, policy, policyGuid)
if err != nil {

//while there is synchronization between policy and schedule, so creating schedule error does not break
//the whole creating binding process
if err := h.schedulerUtil.CreateOrUpdateSchedule(r.Context(), appId, policy, policyGuid); err != nil {
logger.Error("Failed to create/update schedule", err)
writeErrorResponse(w, http.StatusInternalServerError, err.Error())
return
bonzofenix marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -186,7 +192,7 @@ func (h *PublicApiHandler) AttachScalingPolicy(w http.ResponseWriter, r *http.Re
}
_, err = w.Write(responseJson)
if err != nil {
logger.Error("Failed to write body", err)
h.logger.Error("Failed to write body", err)
}
}

Expand All @@ -197,62 +203,29 @@ func (h *PublicApiHandler) DetachScalingPolicy(w http.ResponseWriter, r *http.Re
writeErrorResponse(w, http.StatusBadRequest, ErrorMessageAppidIsRequired)
return
}

logger := h.logger.Session("DetachScalingPolicy", lager.Data{"appId": appId})
logger.Info("Deleting policy json", lager.Data{"appId": appId})
err := h.policydb.DeletePolicy(r.Context(), appId)
if err != nil {

if err := h.policydb.DeletePolicy(r.Context(), appId); err != nil {
logger.Error("Failed to delete policy from database", err)
writeErrorResponse(w, http.StatusInternalServerError, "Error deleting policy")
return
}

logger.Info("Deleting schedules")
err = h.schedulerUtil.DeleteSchedule(r.Context(), appId)
if err != nil {
if err := h.schedulerUtil.DeleteSchedule(r.Context(), appId); err != nil {
logger.Error("Failed to delete schedule", err)
writeErrorResponse(w, http.StatusInternalServerError, "Error deleting schedules")
return
}

if h.bindingdb != nil && !reflect.ValueOf(h.bindingdb).IsNil() {
bonzofenix marked this conversation as resolved.
Show resolved Hide resolved
//TODO this is a copy of part of the attach ... this should use a common function.
// brokered offering: check if there's a default policy that could apply
serviceInstance, err := h.bindingdb.GetServiceInstanceByAppId(appId)
if err != nil {
logger.Error("Failed to find service instance for app", err)
writeErrorResponse(w, http.StatusInternalServerError, "Error retrieving service instance")
if err := h.handleDefaultPolicy(w, r, logger, appId); err != nil {
return
}
if serviceInstance != nil && serviceInstance.DefaultPolicy != "" {
policyStr := serviceInstance.DefaultPolicy
policyGuidStr := serviceInstance.DefaultPolicyGuid
logger.Info("saving default policy json for app", lager.Data{"policy": policyStr})
var policy *models.ScalingPolicy
err := json.Unmarshal([]byte(policyStr), &policy)
if err != nil {
h.logger.Error("default policy invalid", err, lager.Data{"appId": appId, "policy": policyStr})
writeErrorResponse(w, http.StatusInternalServerError, "Default policy not valid")
return
}

err = h.policydb.SaveAppPolicy(r.Context(), appId, policy, policyGuidStr)
if err != nil {
logger.Error("failed to save policy", err, lager.Data{"policy": policyStr})
writeErrorResponse(w, http.StatusInternalServerError, "Error attaching the default policy")
return
}

logger.Info("creating/updating schedules", lager.Data{"policy": policyStr})
err = h.schedulerUtil.CreateOrUpdateSchedule(r.Context(), appId, policy, policyGuidStr)
//while there is synchronization between policy and schedule, so creating schedule error does not break
//the whole creating binding process
if err != nil {
logger.Error("failed to create/update schedules", err, lager.Data{"policy": policyStr})
writeErrorResponse(w, http.StatusInternalServerError, fmt.Sprintf("Failed to update schedule:%s", err))
}
}
}
err = h.bindingdb.SetOrUpdateCustomMetricStrategy(r.Context(), appId, "", "delete")
if err != nil {
if err := h.bindingdb.SetOrUpdateCustomMetricStrategy(r.Context(), appId, "", "delete"); err != nil {
actionName := "failed to delete custom metric submission strategy in the database"
logger.Error(actionName, err)
writeErrorResponse(w, http.StatusInternalServerError, actionName)
Expand All @@ -261,17 +234,84 @@ func (h *PublicApiHandler) DetachScalingPolicy(w http.ResponseWriter, r *http.Re

// find via the app id the binding -> service instance
// default policy? then apply that

w.WriteHeader(http.StatusOK)
_, err = w.Write([]byte("{}"))
_, err := w.Write([]byte("{}"))
if err != nil {
logger.Error(ActionWriteBody, err)
}
}

func proxyRequest(pathFn func() string, call func(url string) (*http.Response, error), w http.ResponseWriter, reqUrl *url.URL, parameters *url.Values, requestDescription string, logger lager.Logger) {
aUrl := pathFn()
resp, err := call(aUrl)
// TODO this is a copy of part of the attach ... this should use a common function.
bonzofenix marked this conversation as resolved.
Show resolved Hide resolved
// brokered offering: check if there's a default policy that could apply
func (h *PublicApiHandler) handleDefaultPolicy(w http.ResponseWriter, r *http.Request, logger lager.Logger, appId string) error {
serviceInstance, err := h.bindingdb.GetServiceInstanceByAppId(appId)
if err != nil {
logger.Error("Failed to find service instance for app", err)
writeErrorResponse(w, http.StatusInternalServerError, "Error retrieving service instance")
return errors.New("error retrieving service instance")
}

if serviceInstance != nil && serviceInstance.DefaultPolicy != "" {
return h.saveDefaultPolicy(w, r, logger, appId, serviceInstance)
}

return nil
}

func (h *PublicApiHandler) saveDefaultPolicy(w http.ResponseWriter, r *http.Request, logger lager.Logger, appId string, serviceInstance *models.ServiceInstance) error {
policyStr := serviceInstance.DefaultPolicy
policyGuidStr := serviceInstance.DefaultPolicyGuid
logger.Info("saving default policy json for app", lager.Data{"policy": policyStr})

var policy *models.ScalingPolicy
if err := json.Unmarshal([]byte(policyStr), &policy); err != nil {
h.logger.Error("default policy invalid", err, lager.Data{"appId": appId, "policy": policyStr})
writeErrorResponse(w, http.StatusInternalServerError, "Default policy not valid")
return errors.New("default policy not valid")
}

if err := h.policydb.SaveAppPolicy(r.Context(), appId, policy, policyGuidStr); err != nil {
logger.Error("failed to save policy", err, lager.Data{"policy": policyStr})
writeErrorResponse(w, http.StatusInternalServerError, "Error attaching the default policy")
return errors.New("error attaching the default policy")
}

//while there is synchronization between policy and schedule, so creating schedule error does not break
//the whole creating binding process
logger.Info("creating/updating schedules", lager.Data{"policy": policyStr})
if err := h.schedulerUtil.CreateOrUpdateSchedule(r.Context(), appId, policy, policyGuidStr); err != nil {
logger.Error("failed to create/update schedules", err, lager.Data{"policy": policyStr})
writeErrorResponse(w, http.StatusInternalServerError, fmt.Sprintf("Failed to update schedule:%s", err))
return errors.New("failed to update schedule")
}
bonzofenix marked this conversation as resolved.
Show resolved Hide resolved

return nil
}

func (h *PublicApiHandler) proxyRequest(logger lager.Logger, appId string, metricType string, w http.ResponseWriter, req *http.Request, parameters *url.Values, requestDescription string) {
reqUrl := req.URL
r := routes.NewRouter()
router := r.CreateEventGeneratorRoutes()
if router == nil {
panic("Failed to create event generator routes")
}

route := router.Get(routes.GetAggregatedMetricHistoriesRouteName)
path, err := route.URLPath("appid", appId, "metrictype", metricType)
if err != nil {
logger.Error("Failed to create path", err)
panic(err)
}

aUrl := h.conf.EventGenerator.EventGeneratorUrl + path.RequestURI() + "?" + parameters.Encode()
req, _ = http.NewRequest("GET", aUrl, nil)

if h.conf.CfInstanceCert != "" {
cert := auth.NewCert(h.conf.CfInstanceCert)
req.Header.Set("X-Forwarded-Client-Cert", cert.GetXFCCHeader())
}
bonzofenix marked this conversation as resolved.
Show resolved Hide resolved

resp, err := h.eventGeneratorClient.Do(req)
if err != nil {
logger.Error("Failed to retrieve "+requestDescription, err, lager.Data{"url": aUrl})
writeErrorResponse(w, http.StatusInternalServerError, "Error retrieving "+requestDescription)
Expand All @@ -291,6 +331,7 @@ func proxyRequest(pathFn func() string, call func(url string) (*http.Response, e
writeErrorResponse(w, resp.StatusCode, string(responseData))
return
}

paginatedResponse, err := paginateResource(responseData, parameters, reqUrl)
if err != nil {
handlers.WriteJSONResponse(w, http.StatusInternalServerError, err.Error())
Expand Down Expand Up @@ -318,22 +359,7 @@ func (h *PublicApiHandler) GetAggregatedMetricsHistories(w http.ResponseWriter,
return
}

pathFn := func() string {
r := routes.NewRouter()
router := r.CreateEventGeneratorRoutes()
if router == nil {
panic("Failed to create event generator routes")
}

route := router.Get(routes.GetAggregatedMetricHistoriesRouteName)
path, err := route.URLPath("appid", appId, "metrictype", metricType)
if err != nil {
logger.Error("Failed to create path", err)
panic(err)
}
return h.conf.EventGenerator.EventGeneratorUrl + path.RequestURI() + "?" + parameters.Encode()
}
proxyRequest(pathFn, h.eventGeneratorClient.Get, w, req.URL, parameters, "metrics history from eventgenerator", logger)
h.proxyRequest(logger, appId, metricType, w, req, parameters, "metrics history from eventgenerator")
}

func (h *PublicApiHandler) GetApiInfo(w http.ResponseWriter, _ *http.Request, _ map[string]string) {
Expand Down
Loading
Loading