Skip to content

Commit

Permalink
Sets scalingengine healthenpoint on server port
Browse files Browse the repository at this point in the history
  • Loading branch information
bonzofenix committed Jun 18, 2024
1 parent cbaa084 commit bd90ac7
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 50 deletions.
21 changes: 1 addition & 20 deletions src/autoscaler/scalingengine/cmd/scalingengine/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,16 @@ import (
"flag"
"fmt"
"os"
"time"

"code.cloudfoundry.org/app-autoscaler/src/autoscaler/cf"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/db/sqldb"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/healthendpoint"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/helpers"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/scalingengine"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/scalingengine/config"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/scalingengine/schedule"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/scalingengine/server"
"code.cloudfoundry.org/clock"
"code.cloudfoundry.org/lager/v3"
"github.com/prometheus/client_golang/prometheus"
"github.com/tedsuo/ifrit"
"github.com/tedsuo/ifrit/grouper"
"github.com/tedsuo/ifrit/sigmon"
Expand Down Expand Up @@ -79,33 +76,17 @@ func main() {
}
defer func() { _ = schedulerDB.Close() }()

httpStatusCollector := healthendpoint.NewHTTPStatusCollector("autoscaler", "scalingengine")
promRegistry := prometheus.NewRegistry()
healthendpoint.RegisterCollectors(promRegistry, []prometheus.Collector{
healthendpoint.NewDatabaseStatusCollector("autoscaler", "scalingengine", "policyDB", policyDb),
healthendpoint.NewDatabaseStatusCollector("autoscaler", "scalingengine", "scalingengineDB", scalingEngineDB),
healthendpoint.NewDatabaseStatusCollector("autoscaler", "scalingengine", "schedulerDB", schedulerDB),
httpStatusCollector,
}, true, logger.Session("scalingengine-prometheus"))

scalingEngine := scalingengine.NewScalingEngine(logger, cfClient, policyDb, scalingEngineDB, eClock, conf.DefaultCoolDownSecs, conf.LockSize)
synchronizer := schedule.NewActiveScheduleSychronizer(logger, schedulerDB, scalingEngineDB, scalingEngine)

httpServer, err := server.NewServer(logger.Session("http-server"), conf, scalingEngineDB, scalingEngine, synchronizer, httpStatusCollector)
httpServer, err := server.NewServer(logger.Session("http-server"), conf, policyDb, scalingEngineDB, schedulerDB, scalingEngine, synchronizer)
if err != nil {
logger.Error("failed to create http server", err)
os.Exit(1)
}

healthServer, err := healthendpoint.NewServerWithBasicAuth(conf.Health, []healthendpoint.Checker{}, logger.Session("health-server"), promRegistry, time.Now)
if err != nil {
logger.Error("failed to create health server", err)
os.Exit(1)
}

members := grouper.Members{
{"http_server", httpServer},
{"health_server", healthServer},
}

monitor := ifrit.Invoke(sigmon.New(grouper.NewOrdered(os.Interrupt, members)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,7 @@ var _ = SynchronizedBeforeSuite(
_, err = testDB.Exec(testDB.Rebind("INSERT INTO policy_json(app_id, policy_json, guid) values(?, ?, ?)"), appId, policy, "1234")
FailOnError("insert failed", err)

httpClient = NewEventGeneratorClient()
healthHttpClient = &http.Client{}
httpClient = NewScalingEngineClient()
})

func verifyCertExistence(testCertDir string) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ import (
var _ = Describe("Main", func() {

var (
runner *ScalingEngineRunner
runner *ScalingEngineRunner
serverURL string
)

BeforeEach(func() {
runner = NewScalingEngineRunner()
serverURL = fmt.Sprintf("https://127.0.0.1:%d", conf.Server.Port)
})

JustBeforeEach(func() {
Expand All @@ -49,10 +51,6 @@ var _ = Describe("Main", func() {
It("http server starts directly", func() {
Eventually(runner.Session.Buffer, 2*time.Second).Should(gbytes.Say("scalingengine.http-server.new-http-server"))
})

It("health server starts directly", func() {
Eventually(runner.Session.Buffer, 2*time.Second).Should(gbytes.Say("scalingengine.health-server.new-http-server"))
})
})

Context("when starting multiple scaling engine instances", func() {
Expand Down Expand Up @@ -160,7 +158,7 @@ var _ = Describe("Main", func() {
body, err := json.Marshal(models.Trigger{Adjustment: "+1"})
Expect(err).NotTo(HaveOccurred())

rsp, err := httpClient.Post(fmt.Sprintf("https://127.0.0.1:%d/v1/apps/%s/scale", port, appId),
rsp, err := httpClient.Post(fmt.Sprintf("%s/v1/apps/%s/scale", serverURL, appId),
"application/json", bytes.NewReader(body))
Expect(err).NotTo(HaveOccurred())
Expect(rsp.StatusCode).To(Equal(http.StatusOK))
Expand All @@ -170,7 +168,7 @@ var _ = Describe("Main", func() {

Context("when a request to retrieve scaling history comes", func() {
It("returns with a 200", func() {
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("https://127.0.0.1:%d/v1/apps/%s/scaling_histories", port, appId), nil)
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/v1/apps/%s/scaling_histories", serverURL, appId), nil)
Expect(err).NotTo(HaveOccurred())
req.Header.Set("Authorization", "Bearer none")
rsp, err := httpClient.Do(req)
Expand All @@ -182,7 +180,7 @@ var _ = Describe("Main", func() {

It("handles the start and end of a schedule", func() {
By("start of a schedule")
url := fmt.Sprintf("https://127.0.0.1:%d/v1/apps/%s/active_schedules/111111", port, appId)
url := fmt.Sprintf("%s/v1/apps/%s/active_schedules/111111", serverURL, appId)
bodyReader := bytes.NewReader([]byte(`{"instance_min_count":1, "instance_max_count":5, "initial_min_instance_count":3}`))

req, err := http.NewRequest(http.MethodPut, url, bodyReader)
Expand All @@ -205,7 +203,6 @@ var _ = Describe("Main", func() {
})

Describe("when Health server is ready to serve RESTful API", func() {

BeforeEach(func() {
basicAuthConfig := conf
basicAuthConfig.Health.HealthCheckUsername = ""
Expand All @@ -219,7 +216,7 @@ var _ = Describe("Main", func() {

Context("when a request to query health comes", func() {
It("returns with a 200", func() {
rsp, err := healthHttpClient.Get(fmt.Sprintf("http://127.0.0.1:%d", healthport))
rsp, err := httpClient.Get(fmt.Sprintf("%s", serverURL))
Expect(err).NotTo(HaveOccurred())
Expect(rsp.StatusCode).To(Equal(http.StatusOK))
raw, _ := io.ReadAll(rsp.Body)
Expand All @@ -243,13 +240,12 @@ var _ = Describe("Main", func() {

Context("when username and password are incorrect for basic authentication during health check", func() {
It("should return 401", func() {

req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://127.0.0.1:%d/health", healthport), nil)
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/health", serverURL), nil)
Expect(err).NotTo(HaveOccurred())

req.SetBasicAuth("wrongusername", "wrongpassword")

rsp, err := healthHttpClient.Do(req)
rsp, err := httpClient.Do(req)
Expect(err).ToNot(HaveOccurred())
Expect(rsp.StatusCode).To(Equal(http.StatusUnauthorized))
})
Expand All @@ -258,12 +254,12 @@ var _ = Describe("Main", func() {
Context("when username and password are correct for basic authentication during health check", func() {
It("should return 200", func() {

req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://127.0.0.1:%d/health", healthport), nil)
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/health", serverURL), nil)
Expect(err).NotTo(HaveOccurred())

req.SetBasicAuth(conf.Health.HealthCheckUsername, conf.Health.HealthCheckPassword)

rsp, err := healthHttpClient.Do(req)
rsp, err := httpClient.Do(req)
Expect(err).ToNot(HaveOccurred())
Expect(rsp.StatusCode).To(Equal(http.StatusOK))
})
Expand All @@ -278,12 +274,12 @@ var _ = Describe("Main", func() {
Context("when username and password are incorrect for basic authentication during health check", func() {
It("should return 401", func() {

req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://127.0.0.1:%d/health", healthport), nil)
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/health", serverURL), nil)
Expect(err).NotTo(HaveOccurred())

req.SetBasicAuth("wrongusername", "wrongpassword")

rsp, err := healthHttpClient.Do(req)
rsp, err := httpClient.Do(req)
Expect(err).ToNot(HaveOccurred())
Expect(rsp.StatusCode).To(Equal(http.StatusUnauthorized))
})
Expand All @@ -292,12 +288,12 @@ var _ = Describe("Main", func() {
Context("when username and password are correct for basic authentication during health check", func() {
It("should return 200", func() {

req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://127.0.0.1:%d/health", healthport), nil)
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/health", serverURL), nil)
Expect(err).NotTo(HaveOccurred())

req.SetBasicAuth(conf.Health.HealthCheckUsername, conf.Health.HealthCheckPassword)

rsp, err := healthHttpClient.Do(req)
rsp, err := httpClient.Do(req)
Expect(err).ToNot(HaveOccurred())
Expect(rsp.StatusCode).To(Equal(http.StatusOK))
})
Expand Down
1 change: 1 addition & 0 deletions src/autoscaler/scalingengine/schedule/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/scalingengine"
)

// TODO: fix the typo in the interface name, it should be ActiveScheduleSynchronizer
type ActiveScheduleSychronizer interface {
Sync()
}
Expand Down
52 changes: 50 additions & 2 deletions src/autoscaler/scalingengine/server/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package server

import (
"time"

"code.cloudfoundry.org/app-autoscaler/src/autoscaler/db"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/healthendpoint"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/helpers"
Expand All @@ -12,6 +14,7 @@ import (

"code.cloudfoundry.org/lager/v3"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
"github.com/tedsuo/ifrit"
"go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux"

Expand All @@ -26,9 +29,39 @@ func (vh VarsFunc) ServeHTTP(w http.ResponseWriter, r *http.Request) {
vh(w, r, vars)
}

func NewServer(logger lager.Logger, conf *config.Config, scalingEngineDB db.ScalingEngineDB, scalingEngine scalingengine.ScalingEngine, synchronizer schedule.ActiveScheduleSychronizer, httpStatusCollector healthendpoint.HTTPStatusCollector) (ifrit.Runner, error) {
func createPrometheusRegistry(policyDB db.PolicyDB, scalingEngineDB db.ScalingEngineDB, schedulerDB db.SchedulerDB, httpStatusCollector healthendpoint.HTTPStatusCollector, logger lager.Logger) *prometheus.Registry {
promRegistry := prometheus.NewRegistry()
//validate that db are not nil

if policyDB == nil || scalingEngineDB == nil || schedulerDB == nil {
logger.Error("failed-to-create-prometheus-registry", fmt.Errorf("db is nil: have policyDB: %t, have scalingEngineDB: %t, have schedulerDB: %t", policyDB != nil, scalingEngineDB != nil, schedulerDB != nil))
return promRegistry
}

healthendpoint.RegisterCollectors(promRegistry, []prometheus.Collector{
healthendpoint.NewDatabaseStatusCollector("autoscaler", "scalingengine", "policyDB", policyDB),
healthendpoint.NewDatabaseStatusCollector("autoscaler", "scalingengine", "scalingengineDB", scalingEngineDB),
healthendpoint.NewDatabaseStatusCollector("autoscaler", "scalingengine", "schedulerDB", schedulerDB),
httpStatusCollector,
}, true, logger.Session("scalingengine-prometheus"))
return promRegistry
}

func createHealthRouter(logger lager.Logger, conf *config.Config, policyDB db.PolicyDB, scalingEngineDB db.ScalingEngineDB, schedulerDB db.SchedulerDB, httpStatusCollector healthendpoint.HTTPStatusCollector) (*mux.Router, error) {
checkers := []healthendpoint.Checker{}
gatherer := createPrometheusRegistry(policyDB, scalingEngineDB, schedulerDB, httpStatusCollector, logger)
healthRouter, err := healthendpoint.NewHealthRouter(conf.Health, checkers, logger.Session("health-server"), gatherer, time.Now)
if err != nil {
return nil, fmt.Errorf("failed to create health router: %w", err)
}
return healthRouter, nil
}

func NewServer(logger lager.Logger, conf *config.Config, policyDB db.PolicyDB, scalingEngineDB db.ScalingEngineDB, schedulerDB db.SchedulerDB, scalingEngine scalingengine.ScalingEngine, synchronizer schedule.ActiveScheduleSychronizer) (ifrit.Runner, error) {
handler := NewScalingHandler(logger, scalingEngineDB, scalingEngine)
syncHandler := NewSyncHandler(logger, synchronizer)
httpStatusCollector := healthendpoint.NewHTTPStatusCollector("autoscaler", "scalingengine")

httpStatusCollectMiddleware := healthendpoint.NewHTTPStatusCollectMiddleware(httpStatusCollector)
r := routes.ScalingEngineRoutes()
r.Use(otelmux.Middleware("scalingengine"))
Expand All @@ -48,7 +81,22 @@ func NewServer(logger lager.Logger, conf *config.Config, scalingEngineDB db.Scal

r.Get(routes.SyncActiveSchedulesRouteName).Handler(VarsFunc(syncHandler.Sync))

return helpers.NewHTTPServer(logger, conf.Server, r)
healthRouter, err := createHealthRouter(logger, conf, policyDB, scalingEngineDB, schedulerDB, httpStatusCollector)
if err != nil {
return nil, fmt.Errorf("failed to create health router: %w", err)
}

mainRouter := setupMainRouter(r, healthRouter)

return helpers.NewHTTPServer(logger, conf.Server, mainRouter)
}

func setupMainRouter(r *mux.Router, healthRouter *mux.Router) *mux.Router {
mainRouter := mux.NewRouter()
mainRouter.PathPrefix("/v1").Handler(r)
mainRouter.PathPrefix("/health").Handler(healthRouter)
mainRouter.PathPrefix("/").Handler(healthRouter)
return mainRouter
}

func newScalingHistoryHandler(logger lager.Logger, scalingEngineDB db.ScalingEngineDB) (http.Handler, error) {
Expand Down
14 changes: 7 additions & 7 deletions src/autoscaler/scalingengine/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ import (
)

var (
server ifrit.Process
serverUrl string
scalingEngineDB *fakes.FakeScalingEngineDB
sychronizer *fakes.FakeActiveScheduleSychronizer
httpStatusCollector *fakes.FakeHTTPStatusCollector
server ifrit.Process
serverUrl string
scalingEngineDB *fakes.FakeScalingEngineDB
sychronizer *fakes.FakeActiveScheduleSychronizer
)

var _ = SynchronizedBeforeSuite(func() []byte {
Expand All @@ -39,10 +38,11 @@ var _ = SynchronizedBeforeSuite(func() []byte {
}
scalingEngineDB = &fakes.FakeScalingEngineDB{}
scalingEngine := &fakes.FakeScalingEngine{}
policyDb := &fakes.FakePolicyDB{}
schedulerDB := &fakes.FakeSchedulerDB{}
sychronizer = &fakes.FakeActiveScheduleSychronizer{}
httpStatusCollector = &fakes.FakeHTTPStatusCollector{}

httpServer, err := NewServer(lager.NewLogger("test"), conf, scalingEngineDB, scalingEngine, sychronizer, httpStatusCollector)
httpServer, err := NewServer(lager.NewLogger("test"), conf, policyDb, scalingEngineDB, schedulerDB, scalingEngine, sychronizer)
Expect(err).NotTo(HaveOccurred())
server = ginkgomon_v2.Invoke(httpServer)
serverUrl = fmt.Sprintf("http://127.0.0.1:%d", conf.Server.Port)
Expand Down
4 changes: 4 additions & 0 deletions src/autoscaler/testhelpers/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ func NewSchedulerClient() *http.Client {
return CreateClientFor("scheduler")
}

func NewScalingEngineClient() *http.Client {
return CreateClientFor("scalingengine")
}

func CreateClientFor(name string) *http.Client {
certFolder := TestCertFolder()
return CreateClient(filepath.Join(certFolder, name+".crt"),
Expand Down

0 comments on commit bd90ac7

Please sign in to comment.