Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(eventgenerator): use PromQL API for metric types throughput and responsetime #2890

Merged
merged 16 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 65 additions & 14 deletions src/acceptance/app/dynamic_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"acceptance"
. "acceptance/helpers"
"fmt"
"sync"
"time"

cfh "github.com/cloudfoundry/cf-test-helpers/v2/helpers"
Expand All @@ -19,6 +20,7 @@ var _ = Describe("AutoScaler dynamic policy", func() {
doneAcceptChan chan bool
ticker *time.Ticker
maxHeapLimitMb int
wg sync.WaitGroup
)

const minimalMemoryUsage = 28 // observed minimal memory usage by the test app
Expand All @@ -30,6 +32,8 @@ var _ = Describe("AutoScaler dynamic policy", func() {
Expect(err).NotTo(HaveOccurred())
StartApp(appName, cfg.CfPushTimeoutDuration())
instanceName = CreatePolicy(cfg, appName, appGUID, policy)

wg = sync.WaitGroup{}
})
BeforeEach(func() {
maxHeapLimitMb = cfg.NodeMemoryLimit - minimalMemoryUsage
Expand Down Expand Up @@ -105,12 +109,14 @@ var _ = Describe("AutoScaler dynamic policy", func() {
Context("when responsetime is greater than scaling out threshold", func() {

BeforeEach(func() {
policy = GenerateDynamicScaleOutPolicy(1, 2, "responsetime", 2000)
policy = GenerateDynamicScaleOutPolicy(1, 2, "responsetime", 800)
initialInstanceCount = 1
})

JustBeforeEach(func() {
ticker = time.NewTicker(10 * time.Second)
// simulate ongoing ~20 slow requests per second
ticker = time.NewTicker(50 * time.Millisecond)
appUri := cfh.AppUri(appName, "/responsetime/slow/1000", cfg)
go func(chan bool) {
defer GinkgoRecover()
for {
Expand All @@ -120,7 +126,11 @@ var _ = Describe("AutoScaler dynamic policy", func() {
doneAcceptChan <- true
return
case <-ticker.C:
cfh.CurlApp(cfg, appName, "/responsetime/slow/3000", "-f")
wg.Add(1)
go func() {
defer wg.Done()
cfh.Curl(cfg, appUri)
}()
}
}
}(doneChan)
Expand All @@ -129,17 +139,25 @@ var _ = Describe("AutoScaler dynamic policy", func() {
It("should scale out", Label(acceptance.LabelSmokeTests), func() {
WaitForNInstancesRunning(appGUID, 2, 5*time.Minute)
})

AfterEach(func() {
// ginkgo requires all go-routines to be finished before reporting the result.
// let's wait for all curl executing go-routines to return.
wg.Wait()
})
})

Context("when responsetime is less than scaling in threshold", func() {

BeforeEach(func() {
policy = GenerateDynamicScaleInPolicy(1, 2, "responsetime", 1000)
policy = GenerateDynamicScaleInPolicyBetween("responsetime", 800, 1200)
initialInstanceCount = 2
})

JustBeforeEach(func() {
ticker = time.NewTicker(2 * time.Second)
// simulate ongoing ~20 fast requests per second
ticker = time.NewTicker(50 * time.Millisecond)
appUri := cfh.AppUri(appName, "/responsetime/slow/1000", cfg)
go func(chan bool) {
defer GinkgoRecover()
for {
Expand All @@ -149,7 +167,11 @@ var _ = Describe("AutoScaler dynamic policy", func() {
doneAcceptChan <- true
return
case <-ticker.C:
cfh.CurlApp(cfg, appName, "/responsetime/fast", "-f")
wg.Add(1)
go func() {
defer wg.Done()
cfh.Curl(cfg, appUri)
}()
}
}
}(doneChan)
Expand All @@ -158,12 +180,17 @@ var _ = Describe("AutoScaler dynamic policy", func() {
It("should scale in", func() {
WaitForNInstancesRunning(appGUID, 1, 5*time.Minute)
})

AfterEach(func() {
// ginkgo requires all go-routines to be finished before reporting the result.
// let's wait for all curl executing go-routines to return.
wg.Wait()
})
})

})

Context("when scaling by throughput", func() {

JustBeforeEach(func() {
doneChan = make(chan bool)
doneAcceptChan = make(chan bool)
Expand All @@ -177,12 +204,14 @@ var _ = Describe("AutoScaler dynamic policy", func() {
Context("when throughput is greater than scaling out threshold", func() {

BeforeEach(func() {
policy = GenerateDynamicScaleOutPolicy(1, 2, "throughput", 1)
policy = GenerateDynamicScaleOutPolicy(1, 2, "throughput", 15)
geigerj0 marked this conversation as resolved.
Show resolved Hide resolved
initialInstanceCount = 1
})

JustBeforeEach(func() {
ticker = time.NewTicker(25 * time.Millisecond)
// simulate ongoing ~20 requests per second
ticker = time.NewTicker(50 * time.Millisecond)
appUri := cfh.AppUri(appName, "/responsetime/fast", cfg)
go func(chan bool) {
defer GinkgoRecover()
for {
Expand All @@ -192,7 +221,11 @@ var _ = Describe("AutoScaler dynamic policy", func() {
doneAcceptChan <- true
return
case <-ticker.C:
cfh.CurlApp(cfg, appName, "/responsetime/fast", "-f")
wg.Add(1)
go func() {
defer wg.Done()
cfh.Curl(cfg, appUri)
}()
}
}
}(doneChan)
Expand All @@ -202,17 +235,25 @@ var _ = Describe("AutoScaler dynamic policy", func() {
WaitForNInstancesRunning(appGUID, 2, 5*time.Minute)
})

AfterEach(func() {
// ginkgo requires all go-routines to be finished before reporting the result.
// let's wait for all curl executing go-routines to return.
wg.Wait()
})

})

Context("when throughput is less than scaling in threshold", func() {

BeforeEach(func() {
policy = GenerateDynamicScaleInPolicy(1, 2, "throughput", 100)
policy = GenerateDynamicScaleInPolicyBetween("throughput", 15, 25)
initialInstanceCount = 2
})

JustBeforeEach(func() {
ticker = time.NewTicker(10 * time.Second)
// simulate ongoing ~20 requests per second
ticker = time.NewTicker(50 * time.Millisecond)
appUri := cfh.AppUri(appName, "/responsetime/fast", cfg)
go func(chan bool) {
defer GinkgoRecover()
for {
Expand All @@ -222,16 +263,26 @@ var _ = Describe("AutoScaler dynamic policy", func() {
doneAcceptChan <- true
return
case <-ticker.C:
cfh.CurlApp(cfg, appName, "/responsetime/fast", "-f")
wg.Add(1)
go func() {
defer wg.Done()
cfh.Curl(cfg, appUri)
}()
}
}
}(doneChan)
})

It("should scale in", func() {
WaitForNInstancesRunning(appGUID, 1, 5*time.Minute)
})
})

AfterEach(func() {
// ginkgo requires all go-routines to be finished before reporting the result.
// let's wait for all curl executing go-routines to return.
wg.Wait()
})
})
})

// To check existing aggregated cpu metrics do: cf asm APP_NAME cpu
Expand Down
46 changes: 46 additions & 0 deletions src/acceptance/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,52 @@ func GenerateDynamicScaleOutAndInPolicy(instanceMin, instanceMax int, metricName
return string(marshaled)
}

// GenerateDynamicScaleInPolicyBetween creates a scaling policy, that scales down from 2 instances to 1, if the metric value lies within a given range.
// Example how the scaling rules must be defined to achieve a "scale down if value is in range" behaviour:
//
// val < 10 ➡ +1 ➡ don't do anything if below 10 because there are already 2 instances
// val > 30 ➡ +1 ➡ don't do anything if above 30 because there are already 2 instances
// val <= 30 ➡ -1 ➡ scale down if less than 30
func GenerateDynamicScaleInPolicyBetween(metricName string, scaleInLowerThreshold int64, scaleInUpperThreshold int64) string {
noDownscalingWhenBelowLower := ScalingRule{
MetricType: metricName,
BreachDurationSeconds: TestBreachDurationSeconds,
Threshold: scaleInLowerThreshold,
Operator: "<",
CoolDownSeconds: TestCoolDownSeconds,
Adjustment: "+1",
}

noDownscalingWhenAboveUpper := ScalingRule{
MetricType: metricName,
BreachDurationSeconds: TestBreachDurationSeconds,
Threshold: scaleInUpperThreshold,
Operator: ">",
CoolDownSeconds: TestCoolDownSeconds,
Adjustment: "+1",
}

downscalingWhenBelowUpper := ScalingRule{
MetricType: metricName,
BreachDurationSeconds: TestBreachDurationSeconds,
Threshold: scaleInUpperThreshold,
Operator: "<",
CoolDownSeconds: TestCoolDownSeconds,
Adjustment: "-1",
}

policy := ScalingPolicy{
InstanceMin: 1,
InstanceMax: 2,
ScalingRules: []*ScalingRule{&noDownscalingWhenBelowLower, &noDownscalingWhenAboveUpper, &downscalingWhenBelowUpper},
}

marshaled, err := MarshalWithoutHTMLEscape(policy)
Expect(err).NotTo(HaveOccurred())

return string(marshaled)
}
geigerj0 marked this conversation as resolved.
Show resolved Hide resolved

func GenerateSpecificDateSchedulePolicy(startDateTime, endDateTime time.Time, scheduledInstanceMin, scheduledInstanceMax, scheduledInstanceInit int) string {
scalingInRule := ScalingRule{
MetricType: "cpu",
Expand Down
20 changes: 6 additions & 14 deletions src/autoscaler/envelopeprocessor/envelope_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,22 @@ import (
)

type EnvelopeProcessorCreator interface {
NewProcessor(logger lager.Logger, collectionInterval time.Duration) Processor
NewProcessor(logger lager.Logger) Processor
}

type EnvelopeProcessor interface {
GetGaugeMetrics(envelopes []*loggregator_v2.Envelope, currentTimeStamp int64) ([]models.AppInstanceMetric, error)
GetTimerMetrics(envelopes []*loggregator_v2.Envelope, appID string, currentTimestamp int64) []models.AppInstanceMetric
}

var _ EnvelopeProcessor = &Processor{}

type Processor struct {
logger lager.Logger
collectionInterval time.Duration
Copy link
Contributor Author

@geigerj0 geigerj0 Apr 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed collectionInterval because it was completely unused in the Processor

logger lager.Logger
}

func NewProcessor(logger lager.Logger, collectionInterval time.Duration) Processor {
func NewProcessor(logger lager.Logger) Processor {
return Processor{
logger: logger.Session("EnvelopeProcessor"),
collectionInterval: collectionInterval,
logger: logger.Session("EnvelopeProcessor"),
}
}

Expand All @@ -43,11 +40,6 @@ func (p Processor) GetGaugeMetrics(envelopes []*loggregator_v2.Envelope, current
p.logger.Debug("Compacted envelopes", lager.Data{"compactedEnvelopes": compactedEnvelopes})
return GetGaugeInstanceMetrics(compactedEnvelopes, currentTimeStamp)
}
func (p Processor) GetTimerMetrics(envelopes []*loggregator_v2.Envelope, appID string, currentTimestamp int64) []models.AppInstanceMetric {
p.logger.Debug("GetTimerMetrics")
p.logger.Debug("Compacted envelopes", lager.Data{"Envelopes": envelopes})
return GetHttpStartStopInstanceMetrics(envelopes, appID, currentTimestamp, p.collectionInterval)
}
Comment on lines -46 to -50
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We no longer need to process the timer metrics ourself since log-cache does it now for us after switching to a PromQL API call 👼


// Log cache returns instance metrics such as cpu and memory in serparate envelopes, this was not the case with
// loggregator. We compact this message by matching source_id and timestamp to facilitate metrics calulations.
Expand Down Expand Up @@ -91,10 +83,10 @@ func GetHttpStartStopInstanceMetrics(envelopes []*loggregator_v2.Envelope, appID
var metrics []models.AppInstanceMetric

numRequestsPerAppIdx := calcNumReqs(envelopes)
sumReponseTimesPerAppIdx := calcSumResponseTimes(envelopes)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unrelated: typo fix

sumResponseTimesPerAppIdx := calcSumResponseTimes(envelopes)

throughputMetrics := getThroughputInstanceMetrics(envelopes, appID, numRequestsPerAppIdx, collectionInterval, currentTimestamp)
responseTimeMetric := getResponsetimeInstanceMetrics(envelopes, appID, numRequestsPerAppIdx, sumReponseTimesPerAppIdx, currentTimestamp)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unrelated: typo fix

responseTimeMetric := getResponsetimeInstanceMetrics(envelopes, appID, numRequestsPerAppIdx, sumResponseTimesPerAppIdx, currentTimestamp)

metrics = append(metrics, throughputMetrics...)
metrics = append(metrics, responseTimeMetric...)
Expand Down
Loading
Loading