Skip to content

Commit

Permalink
WIP: Sets scalingengine healthenpoint on server port
Browse files Browse the repository at this point in the history
  • Loading branch information
bonzofenix committed Jun 17, 2024
1 parent ebcef86 commit 2085fa2
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 30 deletions.
21 changes: 0 additions & 21 deletions src/autoscaler/scalingengine/cmd/scalingengine/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,15 @@ import (
"flag"
"fmt"
"os"
"time"

"code.cloudfoundry.org/app-autoscaler/src/autoscaler/cf"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/db/sqldb"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/healthendpoint"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/helpers"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/scalingengine"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/scalingengine/config"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/scalingengine/schedule"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/scalingengine/server"
"code.cloudfoundry.org/clock"
"code.cloudfoundry.org/lager/v3"
"github.com/prometheus/client_golang/prometheus"
"github.com/tedsuo/ifrit"
"github.com/tedsuo/ifrit/grouper"
"github.com/tedsuo/ifrit/sigmon"
Expand Down Expand Up @@ -79,33 +75,16 @@ func main() {
}
defer func() { _ = schedulerDB.Close() }()

httpStatusCollector := healthendpoint.NewHTTPStatusCollector("autoscaler", "scalingengine")
promRegistry := prometheus.NewRegistry()
healthendpoint.RegisterCollectors(promRegistry, []prometheus.Collector{
healthendpoint.NewDatabaseStatusCollector("autoscaler", "scalingengine", "policyDB", policyDb),
healthendpoint.NewDatabaseStatusCollector("autoscaler", "scalingengine", "scalingengineDB", scalingEngineDB),
healthendpoint.NewDatabaseStatusCollector("autoscaler", "scalingengine", "schedulerDB", schedulerDB),
httpStatusCollector,
}, true, logger.Session("scalingengine-prometheus"))

scalingEngine := scalingengine.NewScalingEngine(logger, cfClient, policyDb, scalingEngineDB, eClock, conf.DefaultCoolDownSecs, conf.LockSize)
synchronizer := schedule.NewActiveScheduleSychronizer(logger, schedulerDB, scalingEngineDB, scalingEngine)

httpServer, err := server.NewServer(logger.Session("http-server"), conf, scalingEngineDB, scalingEngine, synchronizer, httpStatusCollector)
if err != nil {
logger.Error("failed to create http server", err)
os.Exit(1)
}

healthServer, err := healthendpoint.NewServerWithBasicAuth(conf.Health, []healthendpoint.Checker{}, logger.Session("health-server"), promRegistry, time.Now)
if err != nil {
logger.Error("failed to create health server", err)
os.Exit(1)
}

members := grouper.Members{
{"http_server", httpServer},
{"health_server", healthServer},
}

monitor := ifrit.Invoke(sigmon.New(grouper.NewOrdered(os.Interrupt, members)))
Expand Down
1 change: 1 addition & 0 deletions src/autoscaler/scalingengine/schedule/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/scalingengine"
)

// TODO: fix the typo in the interface name, it should be ActiveScheduleSynchronizer
type ActiveScheduleSychronizer interface {
Sync()
}
Expand Down
50 changes: 48 additions & 2 deletions src/autoscaler/scalingengine/server/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package server

import (
"time"

"code.cloudfoundry.org/app-autoscaler/src/autoscaler/db"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/healthendpoint"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/helpers"
Expand All @@ -12,6 +14,7 @@ import (

"code.cloudfoundry.org/lager/v3"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
"github.com/tedsuo/ifrit"
"go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux"

Expand All @@ -26,9 +29,38 @@ func (vh VarsFunc) ServeHTTP(w http.ResponseWriter, r *http.Request) {
vh(w, r, vars)
}

func NewServer(logger lager.Logger, conf *config.Config, scalingEngineDB db.ScalingEngineDB, scalingEngine scalingengine.ScalingEngine, synchronizer schedule.ActiveScheduleSychronizer, httpStatusCollector healthendpoint.HTTPStatusCollector) (ifrit.Runner, error) {
func createPrometheusRegistry(policyDb db.PolicyDB, scalingEngineDB db.ScalingEngineDB, schedulerDB db.SchedulerDB, httpStatusCollector healthendpoint.HTTPStatusCollector, logger lager.Logger) *prometheus.Registry {
promRegistry := prometheus.NewRegistry()
//validate that db are not nil
if policyDb == nil || scalingEngineDB == nil || schedulerDB == nil {
logger.Error("failed-to-create-prometheus-registry", fmt.Errorf("db is nil"))
return promRegistry
}

healthendpoint.RegisterCollectors(promRegistry, []prometheus.Collector{
healthendpoint.NewDatabaseStatusCollector("autoscaler", "scalingengine", "policyDB", policyDb),
healthendpoint.NewDatabaseStatusCollector("autoscaler", "scalingengine", "scalingengineDB", scalingEngineDB),
healthendpoint.NewDatabaseStatusCollector("autoscaler", "scalingengine", "schedulerDB", schedulerDB),
httpStatusCollector,
}, true, logger.Session("scalingengine-prometheus"))
return promRegistry
}

func createHealthRouter(logger lager.Logger, conf *config.Config, policyDb db.PolicyDB, scalingEngineDB db.ScalingEngineDB, schedulerDB db.SchedulerDB, httpStatusCollector healthendpoint.HTTPStatusCollector) (*mux.Router, error) {
checkers := []healthendpoint.Checker{}
gatherer := createPrometheusRegistry(nil, scalingEngineDB, nil, nil, logger)
healthRouter, err := healthendpoint.NewHealthRouter(conf.Health, checkers, logger.Session("health-server"), gatherer, time.Now)
if err != nil {
return nil, fmt.Errorf("failed to create health router: %w", err)
}
return healthRouter, nil
}

func NewServer(logger lager.Logger, conf *config.Config, policyDb db.PolicyDB, scalingEngineDB db.ScalingEngineDB, schedulerDB db.SchedulerDB, scalingEngine scalingengine.ScalingEngine, synchronizer schedule.ActiveScheduleSychronizer) (ifrit.Runner, error) {
handler := NewScalingHandler(logger, scalingEngineDB, scalingEngine)
syncHandler := NewSyncHandler(logger, synchronizer)
httpStatusCollector := healthendpoint.NewHTTPStatusCollector("autoscaler", "scalingengine")

httpStatusCollectMiddleware := healthendpoint.NewHTTPStatusCollectMiddleware(httpStatusCollector)
r := routes.ScalingEngineRoutes()
r.Use(otelmux.Middleware("scalingengine"))
Expand All @@ -48,7 +80,21 @@ func NewServer(logger lager.Logger, conf *config.Config, scalingEngineDB db.Scal

r.Get(routes.SyncActiveSchedulesRouteName).Handler(VarsFunc(syncHandler.Sync))

return helpers.NewHTTPServer(logger, conf.Server, r)
healthRouter, err := createHealthRouter(logger, conf, policyDb, scalingEngineDB, schedulerDB, httpStatusCollector)
if err != nil {
return nil, fmt.Errorf("failed to create health router: %w", err)
}

mainRouter := setupMainRouter(r, healthRouter)

return helpers.NewHTTPServer(logger, conf.Server, mainRouter)
}

func setupMainRouter(r *mux.Router, healthRouter *mux.Router) *mux.Router {
mainRouter := mux.NewRouter()
mainRouter.PathPrefix("/").Handler(r)
mainRouter.PathPrefix("/health").Handler(healthRouter)
return mainRouter
}

func newScalingHistoryHandler(logger lager.Logger, scalingEngineDB db.ScalingEngineDB) (http.Handler, error) {
Expand Down
14 changes: 7 additions & 7 deletions src/autoscaler/scalingengine/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ import (
)

var (
server ifrit.Process
serverUrl string
scalingEngineDB *fakes.FakeScalingEngineDB
sychronizer *fakes.FakeActiveScheduleSychronizer
httpStatusCollector *fakes.FakeHTTPStatusCollector
server ifrit.Process
serverUrl string
scalingEngineDB *fakes.FakeScalingEngineDB
sychronizer *fakes.FakeActiveScheduleSychronizer
)

var _ = SynchronizedBeforeSuite(func() []byte {
Expand All @@ -39,10 +38,11 @@ var _ = SynchronizedBeforeSuite(func() []byte {
}
scalingEngineDB = &fakes.FakeScalingEngineDB{}
scalingEngine := &fakes.FakeScalingEngine{}
policyDb := &fakes.FakePolicyDB{}
schedulerDB := &fakes.FakeSchedulerDB{}
sychronizer = &fakes.FakeActiveScheduleSychronizer{}
httpStatusCollector = &fakes.FakeHTTPStatusCollector{}

httpServer, err := NewServer(lager.NewLogger("test"), conf, scalingEngineDB, scalingEngine, sychronizer, httpStatusCollector)
httpServer, err := NewServer(lager.NewLogger("test"), conf, policyDb, scalingEngineDB, schedulerDB, scalingEngine, sychronizer)
Expect(err).NotTo(HaveOccurred())
server = ginkgomon_v2.Invoke(httpServer)
serverUrl = fmt.Sprintf("http://127.0.0.1:%d", conf.Server.Port)
Expand Down

0 comments on commit 2085fa2

Please sign in to comment.