From bd34b1c0ae5cc23593c2497dbfec6d6715fcc3b3 Mon Sep 17 00:00:00 2001 From: Alan Moran Date: Thu, 14 Nov 2024 16:38:22 +0100 Subject: [PATCH] WIP --- Makefile | 3 + jobs/eventgenerator/spec | 12 + .../templates/eventgenerator.yml.erb | 6 + operations/use-cf-services.yml | 25 + packages/eventgenerator/spec | 1 + .../eventgenerator/eventgenerator_spec.rb | 17 + src/autoscaler/build-extension-file.sh | 5 + .../eventgenerator_suite_test.go | 32 +- .../cmd/eventgenerator/eventgenerator_test.go | 61 ++- .../eventgenerator/cmd/eventgenerator/main.go | 18 +- .../eventgenerator/config/config.go | 482 ++++++++++-------- .../eventgenerator/config/config_test.go | 8 + .../eventgenerator/server/server.go | 73 ++- .../eventgenerator/server/server_test.go | 124 +++-- src/autoscaler/routes/routes.go | 14 +- src/autoscaler/routes/routes_test.go | 10 +- .../scalingengine/scalingengine_suite_test.go | 20 +- .../cmd/scalingengine/scalingengine_test.go | 82 +-- .../scalingengine/server/server_test.go | 3 +- src/autoscaler/testhelpers/certs.go | 14 + 20 files changed, 604 insertions(+), 406 deletions(-) diff --git a/Makefile b/Makefile index eb874f95f5..0a29af1669 100644 --- a/Makefile +++ b/Makefile @@ -72,6 +72,9 @@ clean-targets: clean-vendor: @echo " - cleaning vendored go" @find . -depth -name "vendor" -type d -exec rm -rf {} \; +clean-fakes: + @echo " - cleaning fakes" + @find . -depth -name "fakes" -type d -exec rm -rf {} \; clean-autoscaler: @make --directory='./src/autoscaler' clean clean-scheduler: diff --git a/jobs/eventgenerator/spec b/jobs/eventgenerator/spec index 49c9b92066..67d2b7f404 100644 --- a/jobs/eventgenerator/spec +++ b/jobs/eventgenerator/spec @@ -124,6 +124,18 @@ properties: autoscaler.eventgenerator.server_key: description: "PEM-encoded server key" + autoscaler.cf_server.port: + description: "the listening port of cf xfcc endpoint" + default: 8080 + + autoscaler.cf_server.xfcc.valid_org_guid: + description: approve org guid for xfcc endpoint + default: '' + + autoscaler.cf_server.xfcc.valid_space_guid: + description: approve space guid for xfcc endpoint + default: '' + autoscaler.eventgenerator.aggregator.aggregator_execute_interval: description: "the time interval to aggregate metrics data" default: 40s diff --git a/jobs/eventgenerator/templates/eventgenerator.yml.erb b/jobs/eventgenerator/templates/eventgenerator.yml.erb index 46bcae0ef5..ccbfb4fd0b 100644 --- a/jobs/eventgenerator/templates/eventgenerator.yml.erb +++ b/jobs/eventgenerator/templates/eventgenerator.yml.erb @@ -65,6 +65,12 @@ server: node_addrs: <%= nodeAddrs %> node_index: <%= nodeIndex %> +cf_server: + port: <%= p("autoscaler.cf_server.port") %> + xfcc: + valid_org_guid: <%= p("autoscaler.cf_server.xfcc.valid_org_guid") %> + valid_space_guid: <%= p("autoscaler.cf_server.xfcc.valid_space_guid") %> + logging: level: <%= p("autoscaler.eventgenerator.logging.level") %> http_client_timeout: <%= p("autoscaler.eventgenerator.http_client_timeout") %> diff --git a/operations/use-cf-services.yml b/operations/use-cf-services.yml index 0b8abebaa0..4f38f12413 100644 --- a/operations/use-cf-services.yml +++ b/operations/use-cf-services.yml @@ -77,3 +77,28 @@ uris: - ((deployment_name))-cf-scalingengine.((system_domain)) +## EVENTGENERATOR - Enable cf Server to receive calls from api running on cf -- + +- type: replace + path: /instance_groups/name=eventgenerator/jobs/name=eventgenerator/properties/autoscaler/eventgenerator/cf_server?/xfcc?/valid_org_guid? + value: ((autoscaler_cf_server_xfcc_valid_org_guid)) + +- type: replace + path: /instance_groups/name=eventgenerator/jobs/name=eventgenerator/properties/autoscaler/eventgenerator/cf_server?/xfcc?/valid_space_guid? + value: ((autoscaler_cf_server_xfcc_valid_space_guid)) + + +- type: replace + path: /instance_groups/name=eventgenerator/jobs/name=eventgenerator/properties/autoscaler/eventgenerator/cf_server?/port? + value: &EventGeneratorCfPort 6205 + +- type: replace + path: /instance_groups/name=postgres/jobs/name=route_registrar/properties/route_registrar/routes/- + value: + name: ((deployment_name))-cf-eventgenerator + registration_interval: 20s + port: *EventGeneratorCfPort + tags: + component: autoscaler_cf_eventgenerator + uris: + - ((deployment_name))-cf-eventgenerator.((system_domain)) diff --git a/packages/eventgenerator/spec b/packages/eventgenerator/spec index d9cfe88dc7..5ffb845de4 100644 --- a/packages/eventgenerator/spec +++ b/packages/eventgenerator/spec @@ -24,6 +24,7 @@ files: - autoscaler/eventgenerator/server/* # gosub - autoscaler/healthendpoint/* # gosub - autoscaler/helpers/* # gosub +- autoscaler/helpers/auth/* # gosub - autoscaler/helpers/handlers/* # gosub - autoscaler/metricsforwarder/server/common/* # gosub - autoscaler/models/* # gosub diff --git a/spec/jobs/eventgenerator/eventgenerator_spec.rb b/spec/jobs/eventgenerator/eventgenerator_spec.rb index 3fe85e2ff1..bca6a6c614 100644 --- a/spec/jobs/eventgenerator/eventgenerator_spec.rb +++ b/spec/jobs/eventgenerator/eventgenerator_spec.rb @@ -79,6 +79,23 @@ end end + context "cf server" do + it "includes default port for cf server" do + expect(rendered_template["cf_server"]["port"]).to eq(8080) + end + + it "defaults xfcc valid org and space " do + properties["autoscaler"]["cf_server"] = {} + properties["autoscaler"]["cf_server"]["xfcc"] = { + "valid_org_guid" => "some-valid-org-guid", + "valid_space_guid" => "some-valid-space-guid" + } + + expect(rendered_template["cf_server"]["xfcc"]["valid_org_guid"]).to eq(properties["autoscaler"]["cf_server"]["xfcc"]["valid_org_guid"]) + expect(rendered_template["cf_server"]["xfcc"]["valid_space_guid"]).to eq(properties["autoscaler"]["cf_server"]["xfcc"]["valid_space_guid"]) + end + end + context "uses tls" do context "policy_db" do it "includes the ca, cert and key in url when configured" do diff --git a/src/autoscaler/build-extension-file.sh b/src/autoscaler/build-extension-file.sh index 96a16bd082..fbbf35755b 100755 --- a/src/autoscaler/build-extension-file.sh +++ b/src/autoscaler/build-extension-file.sh @@ -20,7 +20,10 @@ export POSTGRES_EXTERNAL_PORT="${PR_NUMBER:-5432}" export METRICSFORWARDER_HOST="${METRICSFORWARDER_HOST:-"${DEPLOYMENT_NAME}-metricsforwarder"}" export METRICSFORWARDER_MTLS_HOST="${METRICSFORWARDER_MTLS_HOST:-"${DEPLOYMENT_NAME}-metricsforwarder-mtls"}" + export SCALINGENGINE_HOST="${SCALINGENGINE_HOST:-"${DEPLOYMENT_NAME}-cf-scalingengine"}" +export EVENTGENERATOR_HOST="${EVENTGENERATOR_HOST:-"${DEPLOYMENT_NAME}-cf-eventgenerator"}" + export PUBLICAPISERVER_HOST="${PUBLICAPISERVER_HOST:-"${DEPLOYMENT_NAME}"}" export SERVICEBROKER_HOST="${SERVICEBROKER_HOST:-"${DEPLOYMENT_NAME}servicebroker"}" @@ -116,4 +119,6 @@ resources: metrics_forwarder_mtls_url: ${METRICSFORWARDER_MTLS_HOST}.\${default-domain} scaling_engine: scaling_engine_url: ${SCALINGENGINE_HOST}.\${default-domain} + event_generator: + event_generator_url: ${EVENTGENERATOR_HOST}.\${default-domain} EOF diff --git a/src/autoscaler/eventgenerator/cmd/eventgenerator/eventgenerator_suite_test.go b/src/autoscaler/eventgenerator/cmd/eventgenerator/eventgenerator_suite_test.go index 28bbd37062..7a43edbfd5 100644 --- a/src/autoscaler/eventgenerator/cmd/eventgenerator/eventgenerator_suite_test.go +++ b/src/autoscaler/eventgenerator/cmd/eventgenerator/eventgenerator_suite_test.go @@ -38,8 +38,6 @@ var ( regPath = regexp.MustCompile(`^/v1/apps/.*/scale$`) configFile *os.File conf config.Config - egPort int - healthport int httpClient *http.Client healthHttpClient *http.Client mockLogCache *testhelpers.MockLogCache @@ -240,22 +238,9 @@ func initHttpEndPoints() { func initConfig() { testCertDir := testhelpers.TestCertFolder() - egPort = 7000 + GinkgoParallelProcess() - healthport = 8000 + GinkgoParallelProcess() dbUrl := testhelpers.GetDbUrl() conf = config.Config{ - Logging: helpers.LoggingConfig{ - Level: "debug", - }, Server: config.ServerConfig{ - ServerConfig: helpers.ServerConfig{ - Port: egPort, - TLS: models.TLSCerts{ - KeyFile: filepath.Join(testCertDir, "eventgenerator.key"), - CertFile: filepath.Join(testCertDir, "eventgenerator.crt"), - CACertFile: filepath.Join(testCertDir, "autoscaler-ca.crt"), - }, - }, NodeAddrs: []string{"localhost"}, NodeIndex: 0, }, @@ -313,7 +298,7 @@ func initConfig() { HttpClientTimeout: 10 * time.Second, Health: helpers.HealthConfig{ ServerConfig: helpers.ServerConfig{ - Port: healthport, + Port: 8000 + GinkgoParallelProcess(), }, BasicAuth: models.BasicAuth{ Username: "healthcheckuser", @@ -321,6 +306,19 @@ func initConfig() { }, }, } + + conf.Health.ServerConfig.Port = 8000 + GinkgoParallelProcess() + + conf.CFServer.Port = 9000 + GinkgoParallelProcess() + conf.CFServer.XFCC.ValidOrgGuid = "org-guid" + conf.CFServer.XFCC.ValidSpaceGuid = "space-guid" + + // Configure the server to use the test certs + conf.Server.Port = 7000 + GinkgoParallelProcess() + conf.Server.TLS.KeyFile = filepath.Join(testCertDir, "eventgenerator.key") + conf.Server.TLS.CertFile = filepath.Join(testCertDir, "eventgenerator.crt") + conf.Server.TLS.CACertFile = filepath.Join(testCertDir, "autoscaler-ca.crt") + conf.Logging.Level = "debug" configFile = writeConfig(&conf) } @@ -361,7 +359,7 @@ func (eg *EventGeneratorRunner) Start() { Expect(err).NotTo(HaveOccurred()) if eg.startCheck != "" { - Eventually(egSession.Buffer, 2).Should(gbytes.Say(eg.startCheck)) + Eventually(egSession.Buffer, 6).Should(gbytes.Say(eg.startCheck)) } eg.Session = egSession diff --git a/src/autoscaler/eventgenerator/cmd/eventgenerator/eventgenerator_test.go b/src/autoscaler/eventgenerator/cmd/eventgenerator/eventgenerator_test.go index 5cd5847861..8aec012cb4 100644 --- a/src/autoscaler/eventgenerator/cmd/eventgenerator/eventgenerator_test.go +++ b/src/autoscaler/eventgenerator/cmd/eventgenerator/eventgenerator_test.go @@ -10,7 +10,7 @@ import ( "code.cloudfoundry.org/app-autoscaler/src/autoscaler/eventgenerator/config" "code.cloudfoundry.org/app-autoscaler/src/autoscaler/helpers" - "code.cloudfoundry.org/app-autoscaler/src/autoscaler/testhelpers" + . "code.cloudfoundry.org/app-autoscaler/src/autoscaler/testhelpers" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -25,8 +25,9 @@ var _ = Describe("Eventgenerator", func() { httpClientForEventGenerator *http.Client httpClientForHealth *http.Client - serverURL *url.URL - healthURL *url.URL + serverURL *url.URL + healthURL *url.URL + cfServerURL *url.URL err error ) @@ -34,25 +35,28 @@ var _ = Describe("Eventgenerator", func() { BeforeEach(func() { runner = NewEventGeneratorRunner() - httpClientForEventGenerator = testhelpers.NewEventGeneratorClient() + httpClientForEventGenerator = NewEventGeneratorClient() httpClientForHealth = &http.Client{} serverURL, err = url.Parse("https://127.0.0.1:" + strconv.Itoa(conf.Server.Port)) + Expect(err).ToNot(HaveOccurred()) + healthURL, err = url.Parse("http://127.0.0.1:" + strconv.Itoa(conf.Health.ServerConfig.Port)) + Expect(err).ToNot(HaveOccurred()) + cfServerURL, err = url.Parse("http://127.0.0.1:" + strconv.Itoa(conf.CFServer.Port)) Expect(err).ToNot(HaveOccurred()) }) + JustBeforeEach(func() { + runner.Start() + }) + AfterEach(func() { runner.KillWithFire() }) - Context("with a valid config file", func() { - BeforeEach(func() { - runner.Start() - }) - It("Starts successfully, retrives metrics and generates events", func() { Consistently(runner.Session).ShouldNot(Exit()) Eventually(func() bool { return mockLogCache.ReadRequestsCount() >= 1 }, 5*time.Second).Should(BeTrue()) @@ -64,7 +68,6 @@ var _ = Describe("Eventgenerator", func() { BeforeEach(func() { runner.startCheck = "" runner.configPath = "bogus" - runner.Start() }) It("fails with an error", func() { @@ -82,7 +85,6 @@ var _ = Describe("Eventgenerator", func() { // #nosec G306 err = os.WriteFile(runner.configPath, []byte("bogus"), os.ModePerm) Expect(err).NotTo(HaveOccurred()) - runner.Start() }) AfterEach(func() { @@ -116,7 +118,6 @@ var _ = Describe("Eventgenerator", func() { } configFile := writeConfig(conf) runner.configPath = configFile.Name() - runner.Start() }) AfterEach(func() { @@ -130,9 +131,6 @@ var _ = Describe("Eventgenerator", func() { }) When("an interrupt is sent", func() { - BeforeEach(func() { - runner.Start() - }) It("should stop", func() { runner.Session.Interrupt() @@ -144,7 +142,6 @@ var _ = Describe("Eventgenerator", func() { When("a request for aggregated metrics history comes", func() { BeforeEach(func() { serverURL.Path = "/v1/apps/an-app-id/aggregated_metric_histories/a-metric-type" - runner.Start() }) It("returns with a 200", func() { @@ -169,8 +166,6 @@ var _ = Describe("Eventgenerator", func() { basicAuthConfig.Health.BasicAuth.Password = "" runner.configPath = writeConfig(&basicAuthConfig).Name() - runner.Start() - }) When("a request to query health comes", func() { @@ -194,9 +189,6 @@ var _ = Describe("Eventgenerator", func() { }) When("Health server is ready to serve RESTful API with basic Auth", func() { - BeforeEach(func() { - runner.Start() - }) When("username and password are incorrect for basic authentication during health check", func() { It("should return 401", func() { @@ -226,9 +218,6 @@ var _ = Describe("Eventgenerator", func() { }) When("Health server is ready to serve RESTful API with basic Auth", func() { - BeforeEach(func() { - runner.Start() - }) When("username and password are incorrect for basic authentication during health check", func() { It("should return 401", func() { @@ -257,4 +246,28 @@ var _ = Describe("Eventgenerator", func() { }) }) }) + + When("running CF server", func() { + JustBeforeEach(func() { + //Eventually(runner.Session.Buffer, 3).Should(gbytes.Say("eventgenerator.started")) + }) + + When("running outside cf", func() { + It("/v1/liveness should return 200", func() { + cfServerURL.Path = "/v1/liveness" + + req, err := http.NewRequest(http.MethodGet, cfServerURL.String(), nil) + Expect(err).NotTo(HaveOccurred()) + + err = SetXFCCCertHeader(req, conf.CFServer.XFCC.ValidOrgGuid, conf.CFServer.XFCC.ValidSpaceGuid) + Expect(err).NotTo(HaveOccurred()) + + rsp, err := healthHttpClient.Do(req) + Expect(err).ToNot(HaveOccurred()) + + Expect(rsp.StatusCode).To(Equal(http.StatusOK)) + + }) + }) + }) }) diff --git a/src/autoscaler/eventgenerator/cmd/eventgenerator/main.go b/src/autoscaler/eventgenerator/cmd/eventgenerator/main.go index 856ea3ae6f..ed557b90be 100644 --- a/src/autoscaler/eventgenerator/cmd/eventgenerator/main.go +++ b/src/autoscaler/eventgenerator/cmd/eventgenerator/main.go @@ -11,6 +11,7 @@ import ( "code.cloudfoundry.org/app-autoscaler/src/autoscaler/eventgenerator/server" "code.cloudfoundry.org/app-autoscaler/src/autoscaler/healthendpoint" "code.cloudfoundry.org/app-autoscaler/src/autoscaler/helpers" + "code.cloudfoundry.org/app-autoscaler/src/autoscaler/helpers/auth" "code.cloudfoundry.org/app-autoscaler/src/autoscaler/models" "github.com/prometheus/client_golang/prometheus" circuit "github.com/rubyist/circuitbreaker" @@ -104,32 +105,39 @@ func main() { httpServer := server.NewServer(logger.Session("http_server"), conf, appMetricDB, policyDb, appManager.QueryAppMetrics, httpStatusCollector) - mtlsServer, err := httpServer.GetMtlsServer() + mtlsServer, err := httpServer.CreateMtlsServer() if err != nil { logger.Error("failed to create http server", err) os.Exit(1) } - healthServer, err := httpServer.GetHealthServer() + healthServer, err := httpServer.CreateHealthServer() if err != nil { logger.Error("failed to create health server", err) os.Exit(1) } + + xm := auth.NewXfccAuthMiddleware(logger, conf.CFServer.XFCC) + cfServer, err := httpServer.CreateCFServer(xm) + if err != nil { + logger.Error("failed to create cf server", err) + os.Exit(1) + } + members := grouper.Members{ {"eventGenerator", eventGenerator}, {"https_server", mtlsServer}, {"health_server", healthServer}, + {"cf_server", cfServer}, } - monitor := ifrit.Invoke(sigmon.New(grouper.NewOrdered(os.Interrupt, members))) + monitor := ifrit.Invoke(sigmon.New(grouper.NewOrdered(os.Interrupt, members))) logger.Info("started") - err = <-monitor.Wait() if err != nil { logger.Error("exited-with-failure", err) os.Exit(1) } - logger.Info("exited") } diff --git a/src/autoscaler/eventgenerator/config/config.go b/src/autoscaler/eventgenerator/config/config.go index a35b6691b5..89b52e887d 100644 --- a/src/autoscaler/eventgenerator/config/config.go +++ b/src/autoscaler/eventgenerator/config/config.go @@ -1,210 +1,272 @@ -package config - -import ( - "bytes" - "fmt" - "strings" - "time" - - "gopkg.in/yaml.v3" - - "code.cloudfoundry.org/app-autoscaler/src/autoscaler/db" - "code.cloudfoundry.org/app-autoscaler/src/autoscaler/helpers" - "code.cloudfoundry.org/app-autoscaler/src/autoscaler/models" -) - -const ( - DefaultLoggingLevel string = "info" - DefaultServerPort int = 8080 - DefaultHealthServerPort int = 8081 - DefaultPolicyPollerInterval = 40 * time.Second - DefaultAggregatorExecuteInterval = 40 * time.Second - DefaultSaveInterval = 5 * time.Second - DefaultMetricPollerCount int = 20 - DefaultAppMonitorChannelSize int = 200 - DefaultAppMetricChannelSize int = 200 - DefaultEvaluationExecuteInterval = 40 * time.Second - DefaultEvaluatorCount int = 20 - DefaultTriggerArrayChannelSize int = 200 - DefaultBackOffInitialInterval = 5 * time.Minute - DefaultBackOffMaxInterval = 2 * time.Hour - DefaultBreakerConsecutiveFailureCount int64 = 3 - DefaultHttpClientTimeout = 5 * time.Second - DefaultMetricCacheSizePerApp = 100 -) - -type ServerConfig struct { - helpers.ServerConfig `yaml:",inline"` - NodeAddrs []string `yaml:"node_addrs"` - NodeIndex int `yaml:"node_index"` -} -type DBConfig struct { - PolicyDB db.DatabaseConfig `yaml:"policy_db"` - AppMetricDB db.DatabaseConfig `yaml:"app_metrics_db"` -} - -type AggregatorConfig struct { - MetricPollerCount int `yaml:"metric_poller_count"` - AppMonitorChannelSize int `yaml:"app_monitor_channel_size"` - AppMetricChannelSize int `yaml:"app_metric_channel_size"` - AggregatorExecuteInterval time.Duration `yaml:"aggregator_execute_interval"` - PolicyPollerInterval time.Duration `yaml:"policy_poller_interval"` - SaveInterval time.Duration `yaml:"save_interval"` - MetricCacheSizePerApp int `yaml:"metric_cache_size_per_app"` -} - -type EvaluatorConfig struct { - EvaluatorCount int `yaml:"evaluator_count"` - TriggerArrayChannelSize int `yaml:"trigger_array_channel_size"` - EvaluationManagerInterval time.Duration `yaml:"evaluation_manager_execute_interval"` -} - -type ScalingEngineConfig struct { - ScalingEngineURL string `yaml:"scaling_engine_url"` - TLSClientCerts models.TLSCerts `yaml:"tls"` -} - -type MetricCollectorConfig struct { - MetricCollectorURL string `yaml:"metric_collector_url"` - TLSClientCerts models.TLSCerts `yaml:"tls"` - UAACreds models.UAACreds `yaml:"uaa"` -} - -type CircuitBreakerConfig struct { - BackOffInitialInterval time.Duration `yaml:"back_off_initial_interval"` - BackOffMaxInterval time.Duration `yaml:"back_off_max_interval"` - ConsecutiveFailureCount int64 `yaml:"consecutive_failure_count"` -} -type Config struct { - Logging helpers.LoggingConfig `yaml:"logging"` - Server ServerConfig `yaml:"server"` - Health helpers.HealthConfig `yaml:"health"` - DB DBConfig `yaml:"db"` - Aggregator AggregatorConfig `yaml:"aggregator"` - Evaluator EvaluatorConfig `yaml:"evaluator"` - ScalingEngine ScalingEngineConfig `yaml:"scalingEngine"` - MetricCollector MetricCollectorConfig `yaml:"metricCollector"` - DefaultStatWindowSecs int `yaml:"defaultStatWindowSecs"` - DefaultBreachDurationSecs int `yaml:"defaultBreachDurationSecs"` - CircuitBreaker CircuitBreakerConfig `yaml:"circuitBreaker"` - HttpClientTimeout time.Duration `yaml:"http_client_timeout"` -} - -func LoadConfig(config []byte) (*Config, error) { - conf := &Config{ - Logging: helpers.LoggingConfig{ - Level: DefaultLoggingLevel, - }, - Server: ServerConfig{ - ServerConfig: helpers.ServerConfig{ - Port: DefaultServerPort, - }, - }, - Health: helpers.HealthConfig{ - ServerConfig: helpers.ServerConfig{ - Port: DefaultHealthServerPort, - }, - }, - Aggregator: AggregatorConfig{ - AggregatorExecuteInterval: DefaultAggregatorExecuteInterval, - PolicyPollerInterval: DefaultPolicyPollerInterval, - SaveInterval: DefaultSaveInterval, - MetricPollerCount: DefaultMetricPollerCount, - AppMonitorChannelSize: DefaultAppMonitorChannelSize, - AppMetricChannelSize: DefaultAppMetricChannelSize, - MetricCacheSizePerApp: DefaultMetricCacheSizePerApp, - }, - Evaluator: EvaluatorConfig{ - EvaluationManagerInterval: DefaultEvaluationExecuteInterval, - EvaluatorCount: DefaultEvaluatorCount, - TriggerArrayChannelSize: DefaultTriggerArrayChannelSize, - }, - HttpClientTimeout: DefaultHttpClientTimeout, - } - dec := yaml.NewDecoder(bytes.NewBuffer(config)) - dec.KnownFields(true) - err := dec.Decode(conf) - - if err != nil { - return nil, err - } - - conf.Logging.Level = strings.ToLower(conf.Logging.Level) - if conf.CircuitBreaker.ConsecutiveFailureCount == 0 { - conf.CircuitBreaker.ConsecutiveFailureCount = DefaultBreakerConsecutiveFailureCount - } - if conf.CircuitBreaker.BackOffInitialInterval == 0 { - conf.CircuitBreaker.BackOffInitialInterval = DefaultBackOffInitialInterval - } - if conf.CircuitBreaker.BackOffMaxInterval == 0 { - conf.CircuitBreaker.BackOffMaxInterval = DefaultBackOffMaxInterval - } - return conf, nil -} - -func (c *Config) Validate() error { - if c.DB.PolicyDB.URL == "" { - return fmt.Errorf("Configuration error: db.policy_db.url is empty") - } - if c.DB.AppMetricDB.URL == "" { - return fmt.Errorf("Configuration error: db.app_metrics_db.url is empty") - } - if c.ScalingEngine.ScalingEngineURL == "" { - return fmt.Errorf("Configuration error: scalingEngine.scaling_engine_url is empty") - } - if c.MetricCollector.MetricCollectorURL == "" { - return fmt.Errorf("Configuration error: metricCollector.metric_collector_url is empty") - } - if c.Aggregator.AggregatorExecuteInterval <= time.Duration(0) { - return fmt.Errorf("Configuration error: aggregator.aggregator_execute_interval is less-equal than 0") - } - if c.Aggregator.PolicyPollerInterval <= time.Duration(0) { - return fmt.Errorf("Configuration error: aggregator.policy_poller_interval is less-equal than 0") - } - if c.Aggregator.SaveInterval <= time.Duration(0) { - return fmt.Errorf("Configuration error: aggregator.save_interval is less-equal than 0") - } - if c.Aggregator.MetricPollerCount <= 0 { - return fmt.Errorf("Configuration error: aggregator.metric_poller_count is less-equal than 0") - } - if c.Aggregator.AppMonitorChannelSize <= 0 { - return fmt.Errorf("Configuration error: aggregator.app_monitor_channel_size is less-equal than 0") - } - if c.Aggregator.AppMetricChannelSize <= 0 { - return fmt.Errorf("Configuration error: aggregator.app_metric_channel_size is less-equal than 0") - } - - if c.Aggregator.MetricCacheSizePerApp <= 0 { - return fmt.Errorf("Configuration error: aggregator.metric_cache_size_per_app is less-equal than 0") - } - - if c.Evaluator.EvaluationManagerInterval <= time.Duration(0) { - return fmt.Errorf("Configuration error: evaluator.evaluation_manager_execute_interval is less-equal than 0") - } - if c.Evaluator.EvaluatorCount <= 0 { - return fmt.Errorf("Configuration error: evaluator.evaluator_count is less-equal than 0") - } - if c.Evaluator.TriggerArrayChannelSize <= 0 { - return fmt.Errorf("Configuration error: evaluator.trigger_array_channel_size is less-equal than 0") - } - if c.DefaultBreachDurationSecs < 60 || c.DefaultBreachDurationSecs > 3600 { - return fmt.Errorf("Configuration error: defaultBreachDurationSecs should be between 60 and 3600") - } - if c.DefaultStatWindowSecs < 60 || c.DefaultStatWindowSecs > 3600 { - return fmt.Errorf("Configuration error: defaultStatWindowSecs should be between 60 and 3600") - } - - if (c.Server.NodeIndex >= len(c.Server.NodeAddrs)) || (c.Server.NodeIndex < 0) { - return fmt.Errorf("Configuration error: server.node_index out of range") - } - - if c.HttpClientTimeout <= time.Duration(0) { - return fmt.Errorf("Configuration error: http_client_timeout is less-equal than 0") - } - - if err := c.Health.Validate(); err != nil { - return err - } - - return nil -} + + package config + + import ( + "bytes" + "fmt" + "strings" + "time" + + "gopkg.in/yaml.v3" + + "code.cloudfoundry.org/app-autoscaler/src/autoscaler/db" + "code.cloudfoundry.org/app-autoscaler/src/autoscaler/helpers" + "code.cloudfoundry.org/app-autoscaler/src/autoscaler/models" + ) + + const ( + DefaultLoggingLevel = "info" + DefaultServerPort = 8080 + DefaultHealthServerPort = 8081 + DefaultPolicyPollerInterval = 40 * time.Second + DefaultAggregatorExecuteInterval = 40 * time.Second + DefaultSaveInterval = 5 * time.Second + DefaultMetricPollerCount = 20 + DefaultAppMonitorChannelSize = 200 + DefaultAppMetricChannelSize = 200 + DefaultEvaluationExecuteInterval = 40 * time.Second + DefaultEvaluatorCount = 20 + DefaultTriggerArrayChannelSize = 200 + DefaultBackOffInitialInterval = 5 * time.Minute + DefaultBackOffMaxInterval = 2 * time.Hour + DefaultBreakerConsecutiveFailureCount = 3 + DefaultHttpClientTimeout = 5 * time.Second + DefaultMetricCacheSizePerApp = 100 + ) + + var defaultCFServerConfig = helpers.ServerConfig{ + Port: 8082, + } + + type ServerConfig struct { + helpers.ServerConfig `yaml:",inline"` + NodeAddrs []string `yaml:"node_addrs"` + NodeIndex int `yaml:"node_index"` + } + + type DBConfig struct { + PolicyDB db.DatabaseConfig `yaml:"policy_db"` + AppMetricDB db.DatabaseConfig `yaml:"app_metrics_db"` + } + + type AggregatorConfig struct { + MetricPollerCount int `yaml:"metric_poller_count"` + AppMonitorChannelSize int `yaml:"app_monitor_channel_size"` + AppMetricChannelSize int `yaml:"app_metric_channel_size"` + AggregatorExecuteInterval time.Duration `yaml:"aggregator_execute_interval"` + PolicyPollerInterval time.Duration `yaml:"policy_poller_interval"` + SaveInterval time.Duration `yaml:"save_interval"` + MetricCacheSizePerApp int `yaml:"metric_cache_size_per_app"` + } + + type EvaluatorConfig struct { + EvaluatorCount int `yaml:"evaluator_count"` + TriggerArrayChannelSize int `yaml:"trigger_array_channel_size"` + EvaluationManagerInterval time.Duration `yaml:"evaluation_manager_execute_interval"` + } + + type ScalingEngineConfig struct { + ScalingEngineURL string `yaml:"scaling_engine_url"` + TLSClientCerts models.TLSCerts `yaml:"tls"` + } + + type MetricCollectorConfig struct { + MetricCollectorURL string `yaml:"metric_collector_url"` + TLSClientCerts models.TLSCerts `yaml:"tls"` + UAACreds models.UAACreds `yaml:"uaa"` + } + + type CircuitBreakerConfig struct { + BackOffInitialInterval time.Duration `yaml:"back_off_initial_interval"` + BackOffMaxInterval time.Duration `yaml:"back_off_max_interval"` + ConsecutiveFailureCount int64 `yaml:"consecutive_failure_count"` + } + + type Config struct { + Logging helpers.LoggingConfig `yaml:"logging"` + Server ServerConfig `yaml:"server"` + CFServer helpers.ServerConfig `yaml:"cf_server"` + Health helpers.HealthConfig `yaml:"health"` + DB DBConfig `yaml:"db"` + Aggregator AggregatorConfig `yaml:"aggregator"` + Evaluator EvaluatorConfig `yaml:"evaluator"` + ScalingEngine ScalingEngineConfig `yaml:"scalingEngine"` + MetricCollector MetricCollectorConfig `yaml:"metricCollector"` + DefaultStatWindowSecs int `yaml:"defaultStatWindowSecs"` + DefaultBreachDurationSecs int `yaml:"defaultBreachDurationSecs"` + CircuitBreaker CircuitBreakerConfig `yaml:"circuitBreaker"` + HttpClientTimeout time.Duration `yaml:"http_client_timeout"` + } + + func LoadConfig(config []byte) (*Config, error) { + conf := defaultConfig() + dec := yaml.NewDecoder(bytes.NewBuffer(config)) + dec.KnownFields(true) + if err := dec.Decode(conf); err != nil { + return nil, err + } + setDefaults(conf) + return conf, nil + } + + func defaultConfig() *Config { + return &Config{ + Logging: helpers.LoggingConfig{ + Level: DefaultLoggingLevel, + }, + Server: ServerConfig{ + ServerConfig: helpers.ServerConfig{ + Port: DefaultServerPort, + }, + }, + CFServer: defaultCFServerConfig, + Health: helpers.HealthConfig{ + ServerConfig: helpers.ServerConfig{ + Port: DefaultHealthServerPort, + }, + }, + Aggregator: AggregatorConfig{ + AggregatorExecuteInterval: DefaultAggregatorExecuteInterval, + PolicyPollerInterval: DefaultPolicyPollerInterval, + SaveInterval: DefaultSaveInterval, + MetricPollerCount: DefaultMetricPollerCount, + AppMonitorChannelSize: DefaultAppMonitorChannelSize, + AppMetricChannelSize: DefaultAppMetricChannelSize, + MetricCacheSizePerApp: DefaultMetricCacheSizePerApp, + }, + Evaluator: EvaluatorConfig{ + EvaluationManagerInterval: DefaultEvaluationExecuteInterval, + EvaluatorCount: DefaultEvaluatorCount, + TriggerArrayChannelSize: DefaultTriggerArrayChannelSize, + }, + HttpClientTimeout: DefaultHttpClientTimeout, + } + } + + func setDefaults(conf *Config) { + conf.Logging.Level = strings.ToLower(conf.Logging.Level) + if conf.CircuitBreaker.ConsecutiveFailureCount == 0 { + conf.CircuitBreaker.ConsecutiveFailureCount = DefaultBreakerConsecutiveFailureCount + } + if conf.CircuitBreaker.BackOffInitialInterval == 0 { + conf.CircuitBreaker.BackOffInitialInterval = DefaultBackOffInitialInterval + } + if conf.CircuitBreaker.BackOffMaxInterval == 0 { + conf.CircuitBreaker.BackOffMaxInterval = DefaultBackOffMaxInterval + } + } + + func (c *Config) Validate() error { + if err := c.validateDB(); err != nil { + return err + } + if err := c.validateScalingEngine(); err != nil { + return err + } + if err := c.validateMetricCollector(); err != nil { + return err + } + if err := c.validateAggregator(); err != nil { + return err + } + if err := c.validateEvaluator(); err != nil { + return err + } + if err := c.validateDefaults(); err != nil { + return err + } + if err := c.validateServer(); err != nil { + return err + } + if err := c.validateHealth(); err != nil { + return err + } + return nil + } + + func (c *Config) validateDB() error { + if c.DB.PolicyDB.URL == "" { + return fmt.Errorf("Configuration error: db.policy_db.url is empty") + } + if c.DB.AppMetricDB.URL == "" { + return fmt.Errorf("Configuration error: db.app_metrics_db.url is empty") + } + return nil + } + + func (c *Config) validateScalingEngine() error { + if c.ScalingEngine.ScalingEngineURL == "" { + return fmt.Errorf("Configuration error: scalingEngine.scaling_engine_url is empty") + } + return nil + } + + func (c *Config) validateMetricCollector() error { + if c.MetricCollector.MetricCollectorURL == "" { + return fmt.Errorf("Configuration error: metricCollector.metric_collector_url is empty") + } + return nil + } + + func (c *Config) validateAggregator() error { + if c.Aggregator.AggregatorExecuteInterval <= 0 { + return fmt.Errorf("Configuration error: aggregator.aggregator_execute_interval is less-equal than 0") + } + if c.Aggregator.PolicyPollerInterval <= 0 { + return fmt.Errorf("Configuration error: aggregator.policy_poller_interval is less-equal than 0") + } + if c.Aggregator.SaveInterval <= 0 { + return fmt.Errorf("Configuration error: aggregator.save_interval is less-equal than 0") + } + if c.Aggregator.MetricPollerCount <= 0 { + return fmt.Errorf("Configuration error: aggregator.metric_poller_count is less-equal than 0") + } + if c.Aggregator.AppMonitorChannelSize <= 0 { + return fmt.Errorf("Configuration error: aggregator.app_monitor_channel_size is less-equal than 0") + } + if c.Aggregator.AppMetricChannelSize <= 0 { + return fmt.Errorf("Configuration error: aggregator.app_metric_channel_size is less-equal than 0") + } + if c.Aggregator.MetricCacheSizePerApp <= 0 { + return fmt.Errorf("Configuration error: aggregator.metric_cache_size_per_app is less-equal than 0") + } + return nil + } + + func (c *Config) validateEvaluator() error { + if c.Evaluator.EvaluationManagerInterval <= 0 { + return fmt.Errorf("Configuration error: evaluator.evaluation_manager_execute_interval is less-equal than 0") + } + if c.Evaluator.EvaluatorCount <= 0 { + return fmt.Errorf("Configuration error: evaluator.evaluator_count is less-equal than 0") + } + if c.Evaluator.TriggerArrayChannelSize <= 0 { + return fmt.Errorf("Configuration error: evaluator.trigger_array_channel_size is less-equal than 0") + } + return nil + } + + func (c *Config) validateDefaults() error { + if c.DefaultBreachDurationSecs < 60 || c.DefaultBreachDurationSecs > 3600 { + return fmt.Errorf("Configuration error: defaultBreachDurationSecs should be between 60 and 3600") + } + if c.DefaultStatWindowSecs < 60 || c.DefaultStatWindowSecs > 3600 { + return fmt.Errorf("Configuration error: defaultStatWindowSecs should be between 60 and 3600") + } + if c.HttpClientTimeout <= 0 { + return fmt.Errorf("Configuration error: http_client_timeout is less-equal than 0") + } + return nil + } + + func (c *Config) validateServer() error { + if c.Server.NodeIndex < 0 || c.Server.NodeIndex >= len(c.Server.NodeAddrs) { + return fmt.Errorf("Configuration error: server.node_index out of range") + } + return nil + } + + func (c *Config) validateHealth() error { + return c.Health.Validate() + } + \ No newline at end of file diff --git a/src/autoscaler/eventgenerator/config/config_test.go b/src/autoscaler/eventgenerator/config/config_test.go index 192143cf4c..3c53b9799c 100644 --- a/src/autoscaler/eventgenerator/config/config_test.go +++ b/src/autoscaler/eventgenerator/config/config_test.go @@ -41,6 +41,8 @@ server: ca_file: /var/vcap/jobs/autoscaler/config/certs/ca.crt node_addrs: [address1, address2] node_index: 1 +cf_server: + port: 9082 health: server_config: port: 9999 @@ -105,6 +107,9 @@ circuitBreaker: NodeAddrs: []string{"address1", "address2"}, NodeIndex: 1, }, + CFServer: helpers.ServerConfig{ + Port: 9082, + }, Health: helpers.HealthConfig{ ServerConfig: helpers.ServerConfig{ Port: 9999, @@ -234,6 +239,9 @@ defaultBreachDurationSecs: 600 TLS: models.TLSCerts{}, }, }, + CFServer: helpers.ServerConfig{ + Port: 8082, + }, Health: helpers.HealthConfig{ ServerConfig: helpers.ServerConfig{ Port: 8081, diff --git a/src/autoscaler/eventgenerator/server/server.go b/src/autoscaler/eventgenerator/server/server.go index a6a4ccaf97..ff76684c8f 100644 --- a/src/autoscaler/eventgenerator/server/server.go +++ b/src/autoscaler/eventgenerator/server/server.go @@ -8,6 +8,7 @@ import ( "code.cloudfoundry.org/app-autoscaler/src/autoscaler/db" "code.cloudfoundry.org/app-autoscaler/src/autoscaler/eventgenerator/aggregator" "code.cloudfoundry.org/app-autoscaler/src/autoscaler/helpers" + "code.cloudfoundry.org/app-autoscaler/src/autoscaler/helpers/auth" "go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux" "code.cloudfoundry.org/app-autoscaler/src/autoscaler/eventgenerator/config" @@ -20,18 +21,31 @@ import ( "github.com/tedsuo/ifrit" ) +type EventgeneratorServer interface { + CreateMtlsServer() (ifrit.Runner, error) +} + type VarsFunc func(w http.ResponseWriter, r *http.Request, vars map[string]string) func (vh VarsFunc) ServeHTTP(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) vh(w, r, vars) } -func createEventGeneratorRouter(logger lager.Logger, queryAppMetric aggregator.QueryAppMetricsFunc, httpStatusCollector healthendpoint.HTTPStatusCollector, serverConfig config.ServerConfig) (*mux.Router, error) { - httpStatusCollectMiddleware := healthendpoint.NewHTTPStatusCollectMiddleware(httpStatusCollector) - eh := NewEventGenHandler(logger, queryAppMetric) - r := routes.EventGeneratorRoutes() + +func Liveness(w http.ResponseWriter, r *http.Request, vars map[string]string) { + w.WriteHeader(http.StatusOK) +} + +func (s *Server) createEventGeneratorRouter() (*mux.Router, error) { + httpStatusCollectMiddleware := healthendpoint.NewHTTPStatusCollectMiddleware(s.httpStatusCollector) + eh := NewEventGenHandler(s.logger, s.queryAppMetric) + autoscalerRouter := routes.NewRouter() + + r := autoscalerRouter.CreateEventGeneratorRoutes() r.Use(otelmux.Middleware("eventgenerator")) r.Use(httpStatusCollectMiddleware.Collect) + + r.Get(routes.LivenessRouteName).Handler(VarsFunc(Liveness)) r.Get(routes.GetAggregatedMetricHistoriesRouteName).Handler(VarsFunc(eh.GetAggregatedMetricHistories)) return r, nil } @@ -43,15 +57,8 @@ type Server struct { policyDb db.PolicyDB queryAppMetric aggregator.QueryAppMetricsFunc httpStatusCollector healthendpoint.HTTPStatusCollector -} -func (s *Server) GetMtlsServer() (ifrit.Runner, error) { - eventGeneratorRouter, err := createEventGeneratorRouter(s.logger, s.queryAppMetric, s.httpStatusCollector, s.conf.Server) - if err != nil { - return nil, fmt.Errorf("failed to create event generator router: %w", err) - } - - return helpers.NewHTTPServer(s.logger, serverConfigFrom(s.conf), eventGeneratorRouter) + healthRouter *mux.Router } func NewServer(logger lager.Logger, conf *config.Config, appMetricDB db.AppMetricDB, policyDb db.PolicyDB, queryAppMetric aggregator.QueryAppMetricsFunc, httpStatusCollector healthendpoint.HTTPStatusCollector) *Server { @@ -72,26 +79,52 @@ func serverConfigFrom(conf *config.Config) helpers.ServerConfig { } } -func (s *Server) GetHealthServer() (ifrit.Runner, error) { - healthRouter, err := createHealthRouter(s.appMetricDB, s.policyDb, s.logger, s.conf, s.httpStatusCollector) +func (s *Server) CreateHealthServer() (ifrit.Runner, error) { + err := s.createHealthRouter() if err != nil { return nil, fmt.Errorf("failed to create health router: %w", err) } - return helpers.NewHTTPServer(s.logger, s.conf.Health.ServerConfig, healthRouter) + return helpers.NewHTTPServer(s.logger, s.conf.Health.ServerConfig, s.healthRouter) } -func createHealthRouter(appMetricDB db.AppMetricDB, policyDb db.PolicyDB, logger lager.Logger, conf *config.Config, httpStatusCollector healthendpoint.HTTPStatusCollector) (*mux.Router, error) { +func (s *Server) createHealthRouter() error { checkers := []healthendpoint.Checker{} - gatherer := CreatePrometheusRegistry(appMetricDB, policyDb, httpStatusCollector, logger) - healthRouter, err := healthendpoint.NewHealthRouter(conf.Health, checkers, logger.Session("health-server"), gatherer, time.Now) + gatherer := createPrometheusRegistry(s.appMetricDB, s.policyDb, s.httpStatusCollector, s.logger) + healthRouter, err := healthendpoint.NewHealthRouter(s.conf.Health, checkers, s.logger.Session("health-server"), gatherer, time.Now) if err != nil { + return fmt.Errorf("failed to create health router: %w", err) + } + + s.healthRouter = healthRouter + return nil +} + +func (s *Server) CreateCFServer(am auth.XFCCAuthMiddleware) (ifrit.Runner, error) { + eventgenerator, err := s.createEventGeneratorRouter() + if err != nil { + return nil, fmt.Errorf("failed to create event generator router: %w", err) + } + + eventgenerator.Use(am.XFCCAuthenticationMiddleware) + if err := s.createHealthRouter(); err != nil { return nil, fmt.Errorf("failed to create health router: %w", err) } - return healthRouter, nil + eventgenerator.PathPrefix("/health").Handler(s.healthRouter) + + return helpers.NewHTTPServer(s.logger, s.conf.CFServer, eventgenerator) +} + +func (s *Server) CreateMtlsServer() (ifrit.Runner, error) { + eventGeneratorRouter, err := s.createEventGeneratorRouter() + if err != nil { + return nil, fmt.Errorf("failed to create event generator router: %w", err) + } + + return helpers.NewHTTPServer(s.logger, serverConfigFrom(s.conf), eventGeneratorRouter) } -func CreatePrometheusRegistry(appMetricDB db.AppMetricDB, policyDb db.PolicyDB, httpStatusCollector healthendpoint.HTTPStatusCollector, logger lager.Logger) *prometheus.Registry { +func createPrometheusRegistry(appMetricDB db.AppMetricDB, policyDb db.PolicyDB, httpStatusCollector healthendpoint.HTTPStatusCollector, logger lager.Logger) *prometheus.Registry { promRegistry := prometheus.NewRegistry() healthendpoint.RegisterCollectors(promRegistry, []prometheus.Collector{ healthendpoint.NewDatabaseStatusCollector("autoscaler", "eventgenerator", "appMetricDB", appMetricDB), diff --git a/src/autoscaler/eventgenerator/server/server_test.go b/src/autoscaler/eventgenerator/server/server_test.go index 391f45de7f..ddc3ba2198 100644 --- a/src/autoscaler/eventgenerator/server/server_test.go +++ b/src/autoscaler/eventgenerator/server/server_test.go @@ -4,11 +4,12 @@ import ( "net/http" "net/url" "strconv" + "strings" "code.cloudfoundry.org/app-autoscaler/src/autoscaler/db" "code.cloudfoundry.org/app-autoscaler/src/autoscaler/eventgenerator/aggregator" "code.cloudfoundry.org/app-autoscaler/src/autoscaler/eventgenerator/config" - "code.cloudfoundry.org/app-autoscaler/src/autoscaler/eventgenerator/server" + . "code.cloudfoundry.org/app-autoscaler/src/autoscaler/eventgenerator/server" "code.cloudfoundry.org/app-autoscaler/src/autoscaler/fakes" "code.cloudfoundry.org/app-autoscaler/src/autoscaler/helpers" "code.cloudfoundry.org/app-autoscaler/src/autoscaler/models" @@ -21,30 +22,35 @@ import ( var _ = Describe("Server", func() { var ( + serverUrl *url.URL + server *Server + serverProcess ifrit.Process + + conf *config.Config + rsp *http.Response err error - serverProcess ifrit.Process - serverUrl *url.URL policyDB *fakes.FakePolicyDB httpStatusCollector *fakes.FakeHTTPStatusCollector + xfccAuthMiddleware *fakes.FakeXFCCAuthMiddleware appMetricDB *fakes.FakeAppMetricDB - conf *config.Config queryAppMetrics aggregator.QueryAppMetricsFunc ) BeforeEach(func() { - port := 1111 + GinkgoParallelProcess() conf = &config.Config{ Server: config.ServerConfig{ ServerConfig: helpers.ServerConfig{ - Port: port, + Port: 1111 + GinkgoParallelProcess(), }, }, + CFServer: helpers.ServerConfig{ + Port: 3333 + GinkgoParallelProcess(), + }, } - serverUrl, err = url.Parse("http://127.0.0.1:" + strconv.Itoa(port)) - Expect(err).ToNot(HaveOccurred()) + xfccAuthMiddleware = &fakes.FakeXFCCAuthMiddleware{} queryAppMetrics = func(appID string, metricType string, start int64, end int64, orderType db.OrderType) ([]*models.AppMetric, error) { return nil, nil @@ -54,46 +60,103 @@ var _ = Describe("Server", func() { policyDB = &fakes.FakePolicyDB{} appMetricDB = &fakes.FakeAppMetricDB{} + server = NewServer(lager.NewLogger("test"), conf, appMetricDB, policyDB, queryAppMetrics, httpStatusCollector) }) AfterEach(func() { ginkgomon_v2.Interrupt(serverProcess) }) - JustBeforeEach(func() { - httpServer, err := server.NewServer(lager.NewLogger("test"), conf, appMetricDB, policyDB, queryAppMetrics, httpStatusCollector).GetMtlsServer() - Expect(err).NotTo(HaveOccurred()) - serverProcess = ginkgomon_v2.Invoke(httpServer) - }) + Describe("#CreateMTLSServer", func() { + Describe("request on /v1/apps/an-app-id/aggregated_metric_histories/a-metric-type", func() { + BeforeEach(func() { + httpServer, err := server.CreateMtlsServer() + Expect(err).NotTo(HaveOccurred()) - Describe("request on /v1/apps/an-app-id/aggregated_metric_histories/a-metric-type", func() { - BeforeEach(func() { - serverUrl.Path = "/v1/apps/an-app-id/aggregated_metric_histories/a-metric-type" - }) + serverUrl, err = url.Parse("http://127.0.0.1:" + strconv.Itoa(conf.Server.ServerConfig.Port)) + Expect(err).ToNot(HaveOccurred()) - JustBeforeEach(func() { - rsp, err = http.Get(serverUrl.String()) - }) + serverProcess = ginkgomon_v2.Invoke(httpServer) + + serverUrl.Path = "/v1/apps/an-app-id/aggregated_metric_histories/a-metric-type" + }) - It("should return 200", func() { - Expect(err).ToNot(HaveOccurred()) - Expect(rsp.StatusCode).To(Equal(http.StatusOK)) - rsp.Body.Close() - }) - When("using wrong method to retrieve aggregared metrics history", func() { JustBeforeEach(func() { - rsp, err = http.Post(serverUrl.String(), "garbage", nil) + rsp, err = http.Get(serverUrl.String()) }) - It("should return 405", func() { + It("should return 200", func() { Expect(err).ToNot(HaveOccurred()) - Expect(rsp.StatusCode).To(Equal(http.StatusMethodNotAllowed)) + Expect(rsp.StatusCode).To(Equal(http.StatusOK)) rsp.Body.Close() }) + + When("using wrong method to retrieve aggregared metrics history", func() { + JustBeforeEach(func() { + rsp, err = http.Post(serverUrl.String(), "garbage", nil) + }) + + It("should return 405", func() { + Expect(err).ToNot(HaveOccurred()) + Expect(rsp.StatusCode).To(Equal(http.StatusMethodNotAllowed)) + rsp.Body.Close() + }) + }) }) }) - When("requesting the wrong path", func() { + Describe("#CreateCFServer", func() { + BeforeEach(func() { + xfccAuthMiddleware.XFCCAuthenticationMiddlewareReturns(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if strings.Contains(r.RequestURI, "invalid-guid") { + w.WriteHeader(http.StatusUnauthorized) + } else { + w.WriteHeader(http.StatusOK) + } + })) + httpServer, err := server.CreateCFServer(xfccAuthMiddleware) + Expect(err).NotTo(HaveOccurred()) + serverProcess = ginkgomon_v2.Invoke(httpServer) + serverUrl, err = url.Parse("http://127.0.0.1:" + strconv.Itoa(conf.CFServer.Port)) + Expect(err).ToNot(HaveOccurred()) + }) + + Describe("GET /v1/apps/{GUID}/aggregated_metric_histories/a-metric-type", func() { + Describe("when XFCC authentication is ok", func() { + BeforeEach(func() { + serverUrl.Path = "/v1/apps/valid-guid/aggregated_metric_histories/a-metric-type" + }) + + JustBeforeEach(func() { + rsp, err = http.Get(serverUrl.String()) + }) + + It("should return 200", func() { + Expect(err).ToNot(HaveOccurred()) + Expect(rsp.StatusCode).To(Equal(http.StatusOK)) + rsp.Body.Close() + }) + }) + + Describe("when XFCC authentication fails", func() { + BeforeEach(func() { + serverUrl.Path = "/v1/apps/invalid-guid/aggregated_metric_histories/a-metric-type" + }) + + JustBeforeEach(func() { + rsp, err = http.Get(serverUrl.String()) + }) + + It("should return 401", func() { + Expect(err).ToNot(HaveOccurred()) + Expect(rsp.StatusCode).To(Equal(http.StatusUnauthorized)) + rsp.Body.Close() + }) + }) + }) + }) + + XWhen("requesting the wrong path", func() { BeforeEach(func() { serverUrl.Path = "/not-exist-path" }) @@ -109,4 +172,5 @@ var _ = Describe("Server", func() { }) }) + }) diff --git a/src/autoscaler/routes/routes.go b/src/autoscaler/routes/routes.go index 13a2f484a9..72b78b329a 100644 --- a/src/autoscaler/routes/routes.go +++ b/src/autoscaler/routes/routes.go @@ -10,7 +10,7 @@ const ( MetricHistoriesPath = "/v1/apps/{appid}/metric_histories/{metrictype}" GetMetricHistoriesRouteName = "GetMetricHistories" - AggregatedMetricHistoriesPath = "/v1/apps/{appid}/aggregated_metric_histories/{metrictype}" + AggregatedMetricHistoriesPath = "/v1/apps/{appId}/aggregated_metric_histories/{metrictype}" GetAggregatedMetricHistoriesRouteName = "GetAggregatedMetricHistories" ScalePath = "/v1/apps/{appid}/scale" @@ -73,10 +73,10 @@ func NewRouter() *Router { func (r *Router) RegisterRoutes() { r.registerMetricsCollectorRoutes() - r.registerEventGeneratorRoutes() r.registerMetricsForwarderRoutes() r.registerSchedulerRoutes() + r.CreateEventGeneratorRoutes() r.CreateScalingEngineRoutes() r.CreateApiPublicSubrouter() r.CreateApiSubrouter() @@ -99,10 +99,6 @@ func (r *Router) registerMetricsCollectorRoutes() { r.router.Path(MetricHistoriesPath).Methods(http.MethodGet).Name(GetMetricHistoriesRouteName) } -func (r *Router) registerEventGeneratorRoutes() { - r.router.Path(AggregatedMetricHistoriesPath).Methods(http.MethodGet).Name(GetAggregatedMetricHistoriesRouteName) -} - func (r *Router) registerMetricsForwarderRoutes() { r.router.Path(CustomMetricsPath).Methods(http.MethodPost).Name(PostCustomMetricsRouteName) } @@ -149,8 +145,10 @@ func MetricsCollectorRoutes() *mux.Router { return autoScalerRouteInstance.GetRouter() } -func EventGeneratorRoutes() *mux.Router { - return autoScalerRouteInstance.GetRouter() +func (r *Router) CreateEventGeneratorRoutes() *mux.Router { + r.router.Path(AggregatedMetricHistoriesPath).Methods(http.MethodGet).Name(GetAggregatedMetricHistoriesRouteName) + r.router.Path(LivenessPath).Methods(http.MethodGet).Name(LivenessRouteName) + return r.router } func ScalingEngineRoutes() *mux.Router { diff --git a/src/autoscaler/routes/routes_test.go b/src/autoscaler/routes/routes_test.go index b589a13011..50ab13090c 100644 --- a/src/autoscaler/routes/routes_test.go +++ b/src/autoscaler/routes/routes_test.go @@ -211,10 +211,14 @@ var _ = Describe("Routes", func() { }) Describe("EventGeneratorRoutes", func() { + JustBeforeEach(func() { + autoscalerRouter.CreateEventGeneratorRoutes() + }) + Context("GetAggregatedMetricHistoriesRouteName", func() { Context("when provide correct route variable", func() { It("should return the correct path", func() { - path, err := routes.EventGeneratorRoutes().Get(routes.GetAggregatedMetricHistoriesRouteName).URLPath("appid", testAppId, "metrictype", testMetricType) + path, err := router.Get(routes.GetAggregatedMetricHistoriesRouteName).URLPath("appId", testAppId, "metrictype", testMetricType) Expect(err).NotTo(HaveOccurred()) Expect(path.Path).To(Equal("/v1/apps/" + testAppId + "/aggregated_metric_histories/" + testMetricType)) }) @@ -222,7 +226,7 @@ var _ = Describe("Routes", func() { Context("when provide wrong route variable", func() { It("should return error", func() { - _, err := routes.EventGeneratorRoutes().Get(routes.GetAggregatedMetricHistoriesRouteName).URLPath("wrongVariable", testAppId) + _, err := router.Get(routes.GetAggregatedMetricHistoriesRouteName).URLPath("wrongVariable", testAppId) Expect(err).To(HaveOccurred()) }) @@ -230,7 +234,7 @@ var _ = Describe("Routes", func() { Context("when provide not enough route variable", func() { It("should return error", func() { - _, err := routes.EventGeneratorRoutes().Get(routes.GetAggregatedMetricHistoriesRouteName).URLPath("appid", testAppId) + _, err := router.Get(routes.GetAggregatedMetricHistoriesRouteName).URLPath("appid", testAppId) Expect(err).To(HaveOccurred()) }) diff --git a/src/autoscaler/scalingengine/cmd/scalingengine/scalingengine_suite_test.go b/src/autoscaler/scalingengine/cmd/scalingengine/scalingengine_suite_test.go index 20b1553203..a51d0d184b 100644 --- a/src/autoscaler/scalingengine/cmd/scalingengine/scalingengine_suite_test.go +++ b/src/autoscaler/scalingengine/cmd/scalingengine/scalingengine_suite_test.go @@ -17,6 +17,7 @@ import ( "github.com/jmoiron/sqlx" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/onsi/gomega/gbytes" "github.com/onsi/gomega/gexec" yaml "gopkg.in/yaml.v3" @@ -188,24 +189,29 @@ func NewScalingEngineRunner() *ScalingEngineRunner { } } -func (engine *ScalingEngineRunner) Start() { +func (se *ScalingEngineRunner) Start() { // #nosec G204 - engineSession, err := gexec.Start( + seSession, err := gexec.Start( exec.Command( enginePath, "-c", - engine.configPath, + se.configPath, ), gexec.NewPrefixedWriter("\x1b[32m[o]\x1b[32m[engine]\x1b[0m ", GinkgoWriter), gexec.NewPrefixedWriter("\x1b[91m[e]\x1b[32m[engine]\x1b[0m ", GinkgoWriter), ) Expect(err).NotTo(HaveOccurred()) - engine.Session = engineSession + + if se.startCheck != "" { + Eventually(seSession.Buffer, 6).Should(gbytes.Say(se.startCheck)) + } + + se.Session = seSession } -func (engine *ScalingEngineRunner) Interrupt() { - if engine.Session != nil { - engine.Session.Interrupt().Wait(5 * time.Second) +func (se *ScalingEngineRunner) Interrupt() { + if se.Session != nil { + se.Session.Interrupt().Wait(5 * time.Second) } } diff --git a/src/autoscaler/scalingengine/cmd/scalingengine/scalingengine_test.go b/src/autoscaler/scalingengine/cmd/scalingengine/scalingengine_test.go index ef08d8b933..0f8f264fdc 100644 --- a/src/autoscaler/scalingengine/cmd/scalingengine/scalingengine_test.go +++ b/src/autoscaler/scalingengine/cmd/scalingengine/scalingengine_test.go @@ -3,7 +3,6 @@ package main_test import ( "io" "strconv" - "time" "code.cloudfoundry.org/app-autoscaler/src/autoscaler/cf" "code.cloudfoundry.org/app-autoscaler/src/autoscaler/models" @@ -16,9 +15,7 @@ import ( "github.com/onsi/gomega/gbytes" "bytes" - "encoding/base64" "encoding/json" - "encoding/pem" "fmt" "net/http" "net/url" @@ -56,52 +53,6 @@ var _ = Describe("Main", func() { runner.KillWithFire() }) - Describe("with a correct config", func() { - When("starting 1 scaling engine instance", func() { - It("scaling engine should start", func() { - Eventually(runner.Session.Buffer, 2*time.Second).Should(gbytes.Say(runner.startCheck)) - Consistently(runner.Session).ShouldNot(Exit()) - }) - - It("http server starts directly", func() { - Eventually(runner.Session.Buffer, 2*time.Second).Should(gbytes.Say("scalingengine.http-server.new-http-server")) - }) - }) - - When("starting multiple scaling engine instances", func() { - var ( - secondRunner *ScalingEngineRunner - ) - - JustBeforeEach(func() { - secondRunner = NewScalingEngineRunner() - secondConf := conf - - secondConf.Server.Port += 500 - secondConf.Health.ServerConfig.Port += 500 - secondConf.CFServer.Port += 500 - secondRunner.configPath = writeConfig(&secondConf).Name() - secondRunner.Start() - }) - - AfterEach(func() { - secondRunner.KillWithFire() - }) - - It("2 http server instances start", func() { - Eventually(runner.Session.Buffer, 2*time.Second).Should(gbytes.Say("scalingengine.http-server.new-http-server")) - Eventually(secondRunner.Session.Buffer, 2*time.Second).Should(gbytes.Say("scalingengine.http-server.new-http-server")) - Eventually(runner.Session.Buffer, 2*time.Second).Should(gbytes.Say("scalingengine.started")) - Eventually(secondRunner.Session.Buffer, 2*time.Second).Should(gbytes.Say("scalingengine.started")) - - Consistently(runner.Session).ShouldNot(Exit()) - Consistently(secondRunner.Session).ShouldNot(Exit()) - }) - - }) - - }) - Describe("With incorrect config", func() { Context("with a missing config file", func() { @@ -164,11 +115,6 @@ var _ = Describe("Main", func() { }) Describe("when http server is ready to serve RESTful API", func() { - - JustBeforeEach(func() { - Eventually(runner.Session.Buffer, 2).Should(gbytes.Say("scalingengine.started")) - }) - When("a request to trigger scaling comes", func() { It("returns with a 200", func() { body, err := json.Marshal(models.Trigger{Adjustment: "+1"}) @@ -228,10 +174,6 @@ var _ = Describe("Main", func() { runner.configPath = writeConfig(&basicAuthConfig).Name() }) - JustBeforeEach(func() { - Eventually(runner.Session.Buffer, 2).Should(gbytes.Say("scalingengine.started")) - }) - When("a request to query health comes", func() { It("returns with a 200", func() { rsp, err := httpClient.Get(healthURL.String()) @@ -256,10 +198,6 @@ var _ = Describe("Main", func() { healthURL.Path = "/health" }) - JustBeforeEach(func() { - Eventually(runner.Session.Buffer, 2).Should(gbytes.Say("scalingengine.started")) - }) - When("username and password are incorrect for basic authentication during health check", func() { It("should return 401", func() { req, err := http.NewRequest(http.MethodGet, healthURL.String(), nil) @@ -293,10 +231,6 @@ var _ = Describe("Main", func() { healthURL.Path = "/health" }) - JustBeforeEach(func() { - Eventually(runner.Session.Buffer, 2).Should(gbytes.Say("scalingengine.started")) - }) - When("username and password are incorrect for basic authentication during health check", func() { It("should return 401", func() { @@ -326,10 +260,6 @@ var _ = Describe("Main", func() { }) }) When("running CF server", func() { - - JustBeforeEach(func() { - Eventually(runner.Session.Buffer, 2).Should(gbytes.Say("scalingengine.started")) - }) When("running outside cf", func() { It("/v1/liveness should return 200", func() { cfServerURL.Path = "/v1/liveness" @@ -337,22 +267,14 @@ var _ = Describe("Main", func() { req, err := http.NewRequest(http.MethodGet, cfServerURL.String(), nil) Expect(err).NotTo(HaveOccurred()) - setXFCCCertHeader(req, conf.CFServer.XFCC.ValidOrgGuid, conf.CFServer.XFCC.ValidSpaceGuid) + err = SetXFCCCertHeader(req, conf.CFServer.XFCC.ValidOrgGuid, conf.CFServer.XFCC.ValidSpaceGuid) + Expect(err).NotTo(HaveOccurred()) rsp, err := healthHttpClient.Do(req) Expect(err).ToNot(HaveOccurred()) Expect(rsp.StatusCode).To(Equal(http.StatusOK)) - }) }) }) }) - -func setXFCCCertHeader(req *http.Request, orgGuid, spaceGuid string) { - xfccClientCert, err := GenerateClientCert(orgGuid, spaceGuid) - block, _ := pem.Decode(xfccClientCert) - Expect(err).NotTo(HaveOccurred()) - Expect(block).ShouldNot(BeNil()) - req.Header.Add("X-Forwarded-Client-Cert", base64.StdEncoding.EncodeToString(block.Bytes)) -} diff --git a/src/autoscaler/scalingengine/server/server_test.go b/src/autoscaler/scalingengine/server/server_test.go index 0353af6e09..70cf0fb82a 100644 --- a/src/autoscaler/scalingengine/server/server_test.go +++ b/src/autoscaler/scalingengine/server/server_test.go @@ -1,7 +1,6 @@ package server_test import ( - "fmt" "strconv" "strings" @@ -71,7 +70,6 @@ var _ = Describe("Server", func() { }) JustBeforeEach(func() { - fmt.Println("serverUrl: ", serverUrl.String()) req, err = http.NewRequest(method, serverUrl.String(), bodyReader) Expect(err).NotTo(HaveOccurred()) rsp, err = http.DefaultClient.Do(req) @@ -279,6 +277,7 @@ var _ = Describe("Server", func() { httpServer, err := server.CreateCFServer(xfccAuthMiddleware) Expect(err).NotTo(HaveOccurred()) serverProcess = ginkgomon_v2.Invoke(httpServer) + serverUrl, err = url.Parse("http://127.0.0.1:" + strconv.Itoa(conf.CFServer.Port)) Expect(err).ToNot(HaveOccurred()) }) diff --git a/src/autoscaler/testhelpers/certs.go b/src/autoscaler/testhelpers/certs.go index ef61c5d375..a1c93ac40c 100644 --- a/src/autoscaler/testhelpers/certs.go +++ b/src/autoscaler/testhelpers/certs.go @@ -5,9 +5,11 @@ import ( "crypto/rsa" "crypto/x509" "crypto/x509/pkix" + "encoding/base64" "encoding/pem" "fmt" "math/big" + "net/http" ) // generateClientCert generates a client certificate with the specified spaceGUID and orgGUID @@ -44,3 +46,15 @@ func GenerateClientCert(orgGUID, spaceGUID string) ([]byte, error) { return certPEM, nil } + +func SetXFCCCertHeader(req *http.Request, orgGuid, spaceGuid string) error { + xfccClientCert, err := GenerateClientCert(orgGuid, spaceGuid) + if err != nil { + return err + } + + block, _ := pem.Decode(xfccClientCert) + + req.Header.Add("X-Forwarded-Client-Cert", base64.StdEncoding.EncodeToString(block.Bytes)) + return nil +}