fix(eventgenerator): use PromQL API for metric types throughput and `responsetime` (#2890)
geigerj0 authored Apr 26, 2024
1 parent 3e10da2 commit b8a5d64
Showing 11 changed files with 477 additions and 219 deletions.
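
The PromQL queries referenced in the commit title are built in files not shown in this excerpt. As a rough, hypothetical sketch of their shape (assuming log-cache exposes the app's `http` timer metrics and reports durations in nanoseconds), they might look like:

	# throughput: requests per second per instance over a 60s collection interval
	sum by (instance_id) (count_over_time(http{source_id="<app-guid>"}[60s])) / 60

	# responsetime: mean request duration per instance, converted to milliseconds
	avg by (instance_id) (avg_over_time(http{source_id="<app-guid>"}[60s])) / (1000 * 1000)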
79 changes: 65 additions & 14 deletions src/acceptance/app/dynamic_policy_test.go
@@ -4,6 +4,7 @@ import (
	"acceptance"
	. "acceptance/helpers"
	"fmt"
+	"sync"
"time"

cfh "github.com/cloudfoundry/cf-test-helpers/v2/helpers"
@@ -19,6 +20,7 @@ var _ = Describe("AutoScaler dynamic policy", func() {
doneAcceptChan chan bool
ticker *time.Ticker
maxHeapLimitMb int
+		wg sync.WaitGroup
)

const minimalMemoryUsage = 28 // observed minimal memory usage by the test app
@@ -30,6 +32,8 @@ var _ = Describe("AutoScaler dynamic policy", func() {
Expect(err).NotTo(HaveOccurred())
StartApp(appName, cfg.CfPushTimeoutDuration())
instanceName = CreatePolicy(cfg, appName, appGUID, policy)

+		wg = sync.WaitGroup{}
})
BeforeEach(func() {
maxHeapLimitMb = cfg.NodeMemoryLimit - minimalMemoryUsage
@@ -105,12 +109,14 @@ var _ = Describe("AutoScaler dynamic policy", func() {
Context("when responsetime is greater than scaling out threshold", func() {

BeforeEach(func() {
-			policy = GenerateDynamicScaleOutPolicy(1, 2, "responsetime", 2000)
+			policy = GenerateDynamicScaleOutPolicy(1, 2, "responsetime", 800)
initialInstanceCount = 1
})

JustBeforeEach(func() {
-			ticker = time.NewTicker(10 * time.Second)
+			// simulate ongoing ~20 slow requests per second
+			ticker = time.NewTicker(50 * time.Millisecond)
+			appUri := cfh.AppUri(appName, "/responsetime/slow/1000", cfg)
go func(chan bool) {
defer GinkgoRecover()
for {
@@ -120,7 +126,11 @@
doneAcceptChan <- true
return
case <-ticker.C:
-					cfh.CurlApp(cfg, appName, "/responsetime/slow/3000", "-f")
+					wg.Add(1)
+					go func() {
+						defer wg.Done()
+						cfh.Curl(cfg, appUri)
+					}()
}
}
}(doneChan)
@@ -129,17 +139,25 @@
It("should scale out", Label(acceptance.LabelSmokeTests), func() {
WaitForNInstancesRunning(appGUID, 2, 5*time.Minute)
})

+			AfterEach(func() {
+				// Ginkgo requires all goroutines to be finished before reporting the result,
+				// so wait for all curl-executing goroutines to return.
+				wg.Wait()
+			})
})

Context("when responsetime is less than scaling in threshold", func() {

BeforeEach(func() {
-			policy = GenerateDynamicScaleInPolicy(1, 2, "responsetime", 1000)
+			policy = GenerateDynamicScaleInPolicyBetween("responsetime", 800, 1200)
initialInstanceCount = 2
})

JustBeforeEach(func() {
-			ticker = time.NewTicker(2 * time.Second)
+			// simulate ongoing ~20 slow requests per second, each taking ~1000ms
+			ticker = time.NewTicker(50 * time.Millisecond)
+			appUri := cfh.AppUri(appName, "/responsetime/slow/1000", cfg)
go func(chan bool) {
defer GinkgoRecover()
for {
@@ -149,7 +167,11 @@
doneAcceptChan <- true
return
case <-ticker.C:
-					cfh.CurlApp(cfg, appName, "/responsetime/fast", "-f")
+					wg.Add(1)
+					go func() {
+						defer wg.Done()
+						cfh.Curl(cfg, appUri)
+					}()
}
}
}(doneChan)
@@ -158,12 +180,17 @@
It("should scale in", func() {
WaitForNInstancesRunning(appGUID, 1, 5*time.Minute)
})

+			AfterEach(func() {
+				// Ginkgo requires all goroutines to be finished before reporting the result,
+				// so wait for all curl-executing goroutines to return.
+				wg.Wait()
+			})
})

})

Context("when scaling by throughput", func() {

JustBeforeEach(func() {
doneChan = make(chan bool)
doneAcceptChan = make(chan bool)
@@ -177,12 +204,14 @@
Context("when throughput is greater than scaling out threshold", func() {

BeforeEach(func() {
-			policy = GenerateDynamicScaleOutPolicy(1, 2, "throughput", 1)
+			policy = GenerateDynamicScaleOutPolicy(1, 2, "throughput", 15)
initialInstanceCount = 1
})

JustBeforeEach(func() {
-			ticker = time.NewTicker(25 * time.Millisecond)
+			// simulate ongoing ~20 requests per second
+			ticker = time.NewTicker(50 * time.Millisecond)
+			appUri := cfh.AppUri(appName, "/responsetime/fast", cfg)
go func(chan bool) {
defer GinkgoRecover()
for {
@@ -192,7 +221,11 @@
doneAcceptChan <- true
return
case <-ticker.C:
-					cfh.CurlApp(cfg, appName, "/responsetime/fast", "-f")
+					wg.Add(1)
+					go func() {
+						defer wg.Done()
+						cfh.Curl(cfg, appUri)
+					}()
}
}
}(doneChan)
@@ -202,17 +235,25 @@
WaitForNInstancesRunning(appGUID, 2, 5*time.Minute)
})

+			AfterEach(func() {
+				// Ginkgo requires all goroutines to be finished before reporting the result,
+				// so wait for all curl-executing goroutines to return.
+				wg.Wait()
+			})

})

Context("when throughput is less than scaling in threshold", func() {

BeforeEach(func() {
-			policy = GenerateDynamicScaleInPolicy(1, 2, "throughput", 100)
+			policy = GenerateDynamicScaleInPolicyBetween("throughput", 15, 25)
initialInstanceCount = 2
})

JustBeforeEach(func() {
-			ticker = time.NewTicker(10 * time.Second)
+			// simulate ongoing ~20 requests per second
+			ticker = time.NewTicker(50 * time.Millisecond)
+			appUri := cfh.AppUri(appName, "/responsetime/fast", cfg)
go func(chan bool) {
defer GinkgoRecover()
for {
@@ -222,16 +263,26 @@
doneAcceptChan <- true
return
case <-ticker.C:
-					cfh.CurlApp(cfg, appName, "/responsetime/fast", "-f")
+					wg.Add(1)
+					go func() {
+						defer wg.Done()
+						cfh.Curl(cfg, appUri)
+					}()
}
}
}(doneChan)
})

It("should scale in", func() {
WaitForNInstancesRunning(appGUID, 1, 5*time.Minute)
})
})

+		AfterEach(func() {
+			// Ginkgo requires all goroutines to be finished before reporting the result,
+			// so wait for all curl-executing goroutines to return.
+			wg.Wait()
+		})
})
})

// To check existing aggregated cpu metrics do: cf asm APP_NAME cpu
46 changes: 46 additions & 0 deletions src/acceptance/helpers/helpers.go
@@ -276,6 +276,52 @@ func GenerateDynamicScaleOutAndInPolicy(instanceMin, instanceMax int, metricName
return string(marshaled)
}

+// GenerateDynamicScaleInPolicyBetween creates a scaling policy that scales down from 2 instances to 1 if the metric value lies within a given range.
+// Example of how the scaling rules must be defined to achieve a "scale down if value is in range" behaviour:
+//
+//	val < 10 ➡ +1 ➡ no effect below 10: the app already runs at instanceMax (2 instances)
+//	val > 30 ➡ +1 ➡ no effect above 30: the app already runs at instanceMax (2 instances)
+//	val < 30 ➡ -1 ➡ scale down when the value is below 30 (and, thanks to the first rule, at least 10)
+func GenerateDynamicScaleInPolicyBetween(metricName string, scaleInLowerThreshold int64, scaleInUpperThreshold int64) string {
+	noDownscalingWhenBelowLower := ScalingRule{
+		MetricType:            metricName,
+		BreachDurationSeconds: TestBreachDurationSeconds,
+		Threshold:             scaleInLowerThreshold,
+		Operator:              "<",
+		CoolDownSeconds:       TestCoolDownSeconds,
+		Adjustment:            "+1",
+	}
+
+	noDownscalingWhenAboveUpper := ScalingRule{
+		MetricType:            metricName,
+		BreachDurationSeconds: TestBreachDurationSeconds,
+		Threshold:             scaleInUpperThreshold,
+		Operator:              ">",
+		CoolDownSeconds:       TestCoolDownSeconds,
+		Adjustment:            "+1",
+	}
+
+	downscalingWhenBelowUpper := ScalingRule{
+		MetricType:            metricName,
+		BreachDurationSeconds: TestBreachDurationSeconds,
+		Threshold:             scaleInUpperThreshold,
+		Operator:              "<",
+		CoolDownSeconds:       TestCoolDownSeconds,
+		Adjustment:            "-1",
+	}
+
+	policy := ScalingPolicy{
+		InstanceMin:  1,
+		InstanceMax:  2,
+		ScalingRules: []*ScalingRule{&noDownscalingWhenBelowLower, &noDownscalingWhenAboveUpper, &downscalingWhenBelowUpper},
+	}
+
+	marshaled, err := MarshalWithoutHTMLEscape(policy)
+	Expect(err).NotTo(HaveOccurred())
+
+	return string(marshaled)
+}
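
A minimal usage sketch (mirroring the responsetime scale-in test above): with thresholds 800 and 1200 and an app answering in ~1000ms, both "+1" rules are capped by InstanceMax = 2, so only the "< upper ➡ -1" rule takes effect and the app scales in from 2 instances to 1.

	policy := GenerateDynamicScaleInPolicyBetween("responsetime", 800, 1200)
	instanceName := CreatePolicy(cfg, appName, appGUID, policy)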

func GenerateSpecificDateSchedulePolicy(startDateTime, endDateTime time.Time, scheduledInstanceMin, scheduledInstanceMax, scheduledInstanceInit int) string {
scalingInRule := ScalingRule{
MetricType: "cpu",
20 changes: 6 additions & 14 deletions src/autoscaler/envelopeprocessor/envelope_processor.go
@@ -15,25 +15,22 @@ import (
)

type EnvelopeProcessorCreator interface {
-	NewProcessor(logger lager.Logger, collectionInterval time.Duration) Processor
+	NewProcessor(logger lager.Logger) Processor
}

type EnvelopeProcessor interface {
GetGaugeMetrics(envelopes []*loggregator_v2.Envelope, currentTimeStamp int64) ([]models.AppInstanceMetric, error)
-	GetTimerMetrics(envelopes []*loggregator_v2.Envelope, appID string, currentTimestamp int64) []models.AppInstanceMetric
}

var _ EnvelopeProcessor = &Processor{}

type Processor struct {
-	logger             lager.Logger
-	collectionInterval time.Duration
+	logger lager.Logger
}

-func NewProcessor(logger lager.Logger, collectionInterval time.Duration) Processor {
+func NewProcessor(logger lager.Logger) Processor {
return Processor{
-		logger:             logger.Session("EnvelopeProcessor"),
-		collectionInterval: collectionInterval,
+		logger: logger.Session("EnvelopeProcessor"),
}
}

@@ -43,11 +40,6 @@ func (p Processor) GetGaugeMetrics(envelopes []*loggregator_v2.Envelope, currentTimeStamp int64) ([]models.AppInstanceMetric, error) {
p.logger.Debug("Compacted envelopes", lager.Data{"compactedEnvelopes": compactedEnvelopes})
return GetGaugeInstanceMetrics(compactedEnvelopes, currentTimeStamp)
}
-func (p Processor) GetTimerMetrics(envelopes []*loggregator_v2.Envelope, appID string, currentTimestamp int64) []models.AppInstanceMetric {
-	p.logger.Debug("GetTimerMetrics")
-	p.logger.Debug("Compacted envelopes", lager.Data{"Envelopes": envelopes})
-	return GetHttpStartStopInstanceMetrics(envelopes, appID, currentTimestamp, p.collectionInterval)
-}

// Log cache returns instance metrics such as cpu and memory in separate envelopes; this was not the case with
// loggregator. We compact these envelopes by matching source_id and timestamp to facilitate metric calculations.
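
A hypothetical sketch of the compaction this comment describes (the function itself is folded out of this diff; envelope accessors follow the generated loggregator_v2 API):

	// compactGaugeEnvelopes merges gauge envelopes that share source_id,
	// instance_id and timestamp, so that cpu, memory and disk values end up
	// in a single envelope per instance and timestamp.
	func compactGaugeEnvelopes(envelopes []*loggregator_v2.Envelope) []*loggregator_v2.Envelope {
		type key struct {
			sourceID, instanceID string
			timestamp            int64
		}
		merged := make(map[key]*loggregator_v2.Envelope)
		order := make([]key, 0, len(envelopes))
		for _, e := range envelopes {
			gauge := e.GetGauge()
			if gauge == nil {
				continue
			}
			k := key{e.GetSourceId(), e.GetInstanceId(), e.GetTimestamp()}
			if existing, ok := merged[k]; ok {
				// same instance and timestamp: merge the gauge metric maps
				for name, value := range gauge.GetMetrics() {
					existing.GetGauge().GetMetrics()[name] = value
				}
			} else {
				merged[k] = e
				order = append(order, k)
			}
		}
		// preserve first-seen order of the compacted envelopes
		result := make([]*loggregator_v2.Envelope, 0, len(order))
		for _, k := range order {
			result = append(result, merged[k])
		}
		return result
	}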
@@ -91,10 +83,10 @@ func GetHttpStartStopInstanceMetrics(envelopes []*loggregator_v2.Envelope, appID
var metrics []models.AppInstanceMetric

numRequestsPerAppIdx := calcNumReqs(envelopes)
-	sumReponseTimesPerAppIdx := calcSumResponseTimes(envelopes)
+	sumResponseTimesPerAppIdx := calcSumResponseTimes(envelopes)

throughputMetrics := getThroughputInstanceMetrics(envelopes, appID, numRequestsPerAppIdx, collectionInterval, currentTimestamp)
-	responseTimeMetric := getResponsetimeInstanceMetrics(envelopes, appID, numRequestsPerAppIdx, sumReponseTimesPerAppIdx, currentTimestamp)
+	responseTimeMetric := getResponsetimeInstanceMetrics(envelopes, appID, numRequestsPerAppIdx, sumResponseTimesPerAppIdx, currentTimestamp)

metrics = append(metrics, throughputMetrics...)
metrics = append(metrics, responseTimeMetric...)
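
With the GetTimerMetrics method removed from Processor, a caller that still needs timer-based metrics invokes the package-level function directly and passes the collection interval per call (hypothetical usage):

	metrics := envelopeprocessor.GetHttpStartStopInstanceMetrics(
		envelopes, appID, time.Now().UnixNano(), 60*time.Second,
	)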