🕵️ #2838

Closed
wants to merge 16 commits into from
45 changes: 36 additions & 9 deletions src/acceptance/app/dynamic_policy_test.go
@@ -4,6 +4,7 @@ import (
"acceptance"
. "acceptance/helpers"
"fmt"
"sync"
"time"

cfh "github.com/cloudfoundry/cf-test-helpers/v2/helpers"
@@ -19,6 +20,7 @@ var _ = Describe("AutoScaler dynamic policy", func() {
doneAcceptChan chan bool
ticker *time.Ticker
maxHeapLimitMb int
wg sync.WaitGroup
)

const minimalMemoryUsage = 28 // observed minimal memory usage by the test app
@@ -30,6 +32,8 @@ var _ = Describe("AutoScaler dynamic policy", func() {
Expect(err).NotTo(HaveOccurred())
StartApp(appName, cfg.CfPushTimeoutDuration())
instanceName = CreatePolicy(cfg, appName, appGUID, policy)

wg = sync.WaitGroup{}
})
BeforeEach(func() {
maxHeapLimitMb = cfg.NodeMemoryLimit - minimalMemoryUsage
@@ -163,7 +167,6 @@ var _ = Describe("AutoScaler dynamic policy", func() {
})

Context("when scaling by throughput", func() {

JustBeforeEach(func() {
doneChan = make(chan bool)
doneAcceptChan = make(chan bool)
@@ -177,12 +180,14 @@
Context("when throughput is greater than scaling out threshold", func() {

BeforeEach(func() {
policy = GenerateDynamicScaleOutPolicy(1, 2, "throughput", 1)
policy = GenerateDynamicScaleOutPolicy(1, 2, "throughput", 50)
initialInstanceCount = 1
})

JustBeforeEach(func() {
ticker = time.NewTicker(25 * time.Millisecond)
// simulate ongoing ~100 requests per second
ticker = time.NewTicker(10 * time.Millisecond)
appUri := cfh.AppUri(appName, "/responsetime/fast", cfg)
go func(chan bool) {
defer GinkgoRecover()
for {
@@ -192,7 +197,11 @@
doneAcceptChan <- true
return
case <-ticker.C:
cfh.CurlApp(cfg, appName, "/responsetime/fast", "-f")
wg.Add(1)
go func() {
defer wg.Done()
cfh.Curl(cfg, appUri)
}()
}
}
}(doneChan)
@@ -202,17 +211,25 @@
WaitForNInstancesRunning(appGUID, 2, 5*time.Minute)
})

AfterEach(func() {
// Ginkgo requires all goroutines to be finished before reporting the result.
// Wait for all curl-executing goroutines to return.
wg.Wait()
})

})

Context("when throughput is less than scaling in threshold", func() {

BeforeEach(func() {
policy = GenerateDynamicScaleInPolicy(1, 2, "throughput", 100)
policy = GenerateDynamicScaleInPolicy(1, 2, "throughput", 30)
initialInstanceCount = 2
})

JustBeforeEach(func() {
ticker = time.NewTicker(10 * time.Second)
// simulate ongoing ~20 requests per second
ticker = time.NewTicker(50 * time.Millisecond)
appUri := cfh.AppUri(appName, "/responsetime/fast", cfg)
go func(chan bool) {
defer GinkgoRecover()
for {
@@ -222,16 +239,26 @@
doneAcceptChan <- true
return
case <-ticker.C:
cfh.CurlApp(cfg, appName, "/responsetime/fast", "-f")
wg.Add(1)
go func() {
defer wg.Done()
cfh.Curl(cfg, appUri)
}()
}
}
}(doneChan)
})
It("should scale in", func() {

FIt("should scale in", func() {
WaitForNInstancesRunning(appGUID, 1, 5*time.Minute)
})
})

AfterEach(func() {
// Ginkgo requires all goroutines to be finished before reporting the result.
// Wait for all curl-executing goroutines to return.
wg.Wait()
})
})
})

// To check existing aggregated cpu metrics do: cf asm APP_NAME cpu
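
The change above replaces the blocking cfh.CurlApp call with one goroutine per request, tracked by a sync.WaitGroup so the AfterEach blocks can wait until every curl has returned before Ginkgo reports the spec. Below is a minimal, stand-alone sketch of that ticker-plus-WaitGroup pattern; doRequest is a hypothetical placeholder for cfh.Curl, and the channel names mirror the test.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// doRequest stands in for cfh.Curl; it simulates one slow HTTP request.
func doRequest(id int) {
	time.Sleep(100 * time.Millisecond)
	fmt.Println("request", id, "done")
}

func main() {
	var wg sync.WaitGroup
	doneChan := make(chan bool)       // tells the request loop to stop
	doneAcceptChan := make(chan bool) // the loop confirms it has stopped

	// Fire ~100 requests per second, each in its own goroutine.
	ticker := time.NewTicker(10 * time.Millisecond)
	go func() {
		defer ticker.Stop()
		for i := 0; ; i++ {
			select {
			case <-doneChan:
				doneAcceptChan <- true
				return
			case <-ticker.C:
				wg.Add(1)
				go func(id int) {
					defer wg.Done()
					doRequest(id)
				}(i)
			}
		}
	}()

	time.Sleep(300 * time.Millisecond) // let some load build up

	doneChan <- true // stop producing new requests
	<-doneAcceptChan // the producer goroutine has returned
	wg.Wait()        // block until every in-flight request has finished
	fmt.Println("all requests finished")
}
```

The final wg.Wait() plays the same role as the AfterEach blocks in the diff: Ginkgo must not report the result while curl goroutines are still in flight.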
13 changes: 6 additions & 7 deletions src/autoscaler/envelopeprocessor/envelope_processor.go
@@ -20,7 +20,7 @@ type EnvelopeProcessorCreator interface {

type EnvelopeProcessor interface {
GetGaugeMetrics(envelopes []*loggregator_v2.Envelope, currentTimeStamp int64) ([]models.AppInstanceMetric, error)
GetTimerMetrics(envelopes []*loggregator_v2.Envelope, appID string, currentTimestamp int64) []models.AppInstanceMetric
GetCollectionInterval() time.Duration
}

var _ EnvelopeProcessor = &Processor{}
@@ -43,10 +43,9 @@ func (p Processor) GetGaugeMetrics(envelopes []*loggregator_v2.Envelope, current
p.logger.Debug("Compacted envelopes", lager.Data{"compactedEnvelopes": compactedEnvelopes})
return GetGaugeInstanceMetrics(compactedEnvelopes, currentTimeStamp)
}
func (p Processor) GetTimerMetrics(envelopes []*loggregator_v2.Envelope, appID string, currentTimestamp int64) []models.AppInstanceMetric {
p.logger.Debug("GetTimerMetrics")
p.logger.Debug("Compacted envelopes", lager.Data{"Envelopes": envelopes})
return GetHttpStartStopInstanceMetrics(envelopes, appID, currentTimestamp, p.collectionInterval)

func (p Processor) GetCollectionInterval() time.Duration {
return p.collectionInterval
}

// Log cache returns instance metrics such as cpu and memory in separate envelopes, this was not the case with
@@ -91,10 +90,10 @@ func GetHttpStartStopInstanceMetrics(envelopes []*loggregator_v2.Envelope, appID
var metrics []models.AppInstanceMetric

numRequestsPerAppIdx := calcNumReqs(envelopes)
sumReponseTimesPerAppIdx := calcSumResponseTimes(envelopes)
sumResponseTimesPerAppIdx := calcSumResponseTimes(envelopes)

throughputMetrics := getThroughputInstanceMetrics(envelopes, appID, numRequestsPerAppIdx, collectionInterval, currentTimestamp)
responseTimeMetric := getResponsetimeInstanceMetrics(envelopes, appID, numRequestsPerAppIdx, sumReponseTimesPerAppIdx, currentTimestamp)
responseTimeMetric := getResponsetimeInstanceMetrics(envelopes, appID, numRequestsPerAppIdx, sumResponseTimesPerAppIdx, currentTimestamp)

metrics = append(metrics, throughputMetrics...)
metrics = append(metrics, responseTimeMetric...)
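
The interface change above drops GetTimerMetrics from EnvelopeProcessor and exposes GetCollectionInterval instead, so a consumer that counts HTTP timer envelopes itself can derive throughput from the raw count and the interval. The PR hunk does not show that consumer, so the following is only a sketch under that assumption; the local interface, fakeProcessor, and requestCount are stand-ins rather than real autoscaler types.

```go
package main

import (
	"fmt"
	"time"
)

// collectionIntervalProvider is a local stand-in for the part of
// envelopeprocessor.EnvelopeProcessor used here.
type collectionIntervalProvider interface {
	GetCollectionInterval() time.Duration
}

type fakeProcessor struct{ interval time.Duration }

func (f fakeProcessor) GetCollectionInterval() time.Duration { return f.interval }

// throughputPerSecond derives a throughput value from a raw request count
// plus the processor's collection interval.
func throughputPerSecond(p collectionIntervalProvider, requestCount int) float64 {
	interval := p.GetCollectionInterval()
	if interval <= 0 {
		return 0
	}
	return float64(requestCount) / interval.Seconds()
}

func main() {
	p := fakeProcessor{interval: 40 * time.Second}
	fmt.Printf("throughput: %.2f req/s\n", throughputPerSecond(p, 120)) // 3.00 req/s
}
```

This mirrors how GetHttpStartStopInstanceMetrics above turns numRequestsPerAppIdx and the collection interval into the throughput metric.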
86 changes: 4 additions & 82 deletions src/autoscaler/envelopeprocessor/envelopeprocessor_test.go
@@ -285,94 +285,16 @@
})
})

Describe("#GetTimerMetrics", func() {
BeforeEach(func() {
envelopes = append(envelopes, generateHttpStartStopEnvelope("test-app-id", "0", 10*1000*1000, 25*1000*1000, 1111))
envelopes = append(envelopes, generateHttpStartStopEnvelope("test-app-id", "1", 10*1000*1000, 30*1000*1000, 1111))
envelopes = append(envelopes, generateHttpStartStopEnvelope("test-app-id", "0", 20*1000*1000, 30*1000*1000, 1111))
envelopes = append(envelopes, generateHttpStartStopEnvelope("test-app-id", "1", 20*1000*1000, 50*1000*1000, 1111))
envelopes = append(envelopes, generateHttpStartStopEnvelope("test-app-id", "1", 20*1000*1000, 30*1000*1000, 1111))
})

It("sends throughput and responsetime metric to channel", func() {
timestamp := time.Now().UnixNano()
metrics := processor.GetTimerMetrics(envelopes, "test-app-id", timestamp)

Expect(metrics).To(ContainElement(models.AppInstanceMetric{
AppId: "test-app-id",
InstanceIndex: 0,
CollectedAt: timestamp,
Name: models.MetricNameThroughput,
Unit: models.UnitRPS,
Value: "2",
Timestamp: timestamp,
}))

Expect(metrics).To(ContainElement(models.AppInstanceMetric{
AppId: "test-app-id",
InstanceIndex: 0,
CollectedAt: timestamp,
Name: models.MetricNameResponseTime,
Unit: models.UnitMilliseconds,
Value: "13",
Timestamp: timestamp,
}))

Expect(metrics).To(ContainElement(models.AppInstanceMetric{
AppId: "test-app-id",
InstanceIndex: 1,
CollectedAt: timestamp,
Name: models.MetricNameThroughput,
Unit: models.UnitRPS,
Value: "3",
Timestamp: timestamp,
}))

Expect(metrics).To(ContainElement(models.AppInstanceMetric{
AppId: "test-app-id",
InstanceIndex: 1,
CollectedAt: timestamp,
Name: models.MetricNameResponseTime,
Unit: models.UnitMilliseconds,
Value: "20",
Timestamp: timestamp,
}))
})

Context("when no available envelopes for app", func() {
BeforeEach(func() {
envelopes = []*loggregator_v2.Envelope{}
Describe("GetCollectionInterval", func() {
When("interval is set", func() {
It("returns interval", func() {
Expect(processor.GetCollectionInterval()).To(Equal(TestCollectInterval))
})

It("sends send 0 throughput and responsetime metric", func() {
timestamp := time.Now().UnixNano()
metrics := processor.GetTimerMetrics(envelopes, "another-test-app-id", timestamp)
Expect(metrics).To(ContainElement(models.AppInstanceMetric{
AppId: "another-test-app-id",
InstanceIndex: 0,
CollectedAt: timestamp,
Name: models.MetricNameThroughput,
Unit: models.UnitRPS,
Value: "0",
Timestamp: timestamp,
}))
Expect(metrics).To(ContainElement(models.AppInstanceMetric{
AppId: "another-test-app-id",
InstanceIndex: 0,
CollectedAt: timestamp,
Name: models.MetricNameResponseTime,
Unit: models.UnitMilliseconds,
Value: "0",
Timestamp: timestamp,
}))

})

})
})
})

func generateHttpStartStopEnvelope(sourceID, instance string, start, stop, timestamp int64) *loggregator_v2.Envelope {

Check failure on line 297 in src/autoscaler/envelopeprocessor/envelopeprocessor_test.go (GitHub Actions / reviewdog): [golangci] func `generateHttpStartStopEnvelope` is unused (unused)
e := &loggregator_v2.Envelope{
SourceId: sourceID,
InstanceId: instance,
2 changes: 1 addition & 1 deletion src/autoscaler/eventgenerator/aggregator/metric_poller.go
@@ -65,7 +65,7 @@ func (m *MetricPoller) retrieveMetric(appMonitor *models.AppMonitor) error {
metrics, err := m.metricClient.GetMetrics(appId, metricType, startTime, endTime)
m.logger.Debug("received metrics from metricClient", lager.Data{"retrievedMetrics": metrics})
if err != nil {
return fmt.Errorf("retriveMetric Failed: %w", err)
return fmt.Errorf("retrieveMetric Failed: %w", err)
}
avgMetric := m.aggregate(appId, metricType, metrics)
m.logger.Debug("save-aggregated-appmetric", lager.Data{"appMetric": avgMetric})
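
The only change in this file is the spelling of the error prefix. Because the line wraps the underlying error with %w, callers can still match the original error after wrapping; a small sketch (errMetricClient is a hypothetical sentinel, not part of the autoscaler code):

```go
package main

import (
	"errors"
	"fmt"
)

var errMetricClient = errors.New("log-cache unavailable") // hypothetical sentinel

// retrieveMetric mirrors the wrapping style in metric_poller.go: %w keeps the
// original error in the chain.
func retrieveMetric() error {
	return fmt.Errorf("retrieveMetric Failed: %w", errMetricClient)
}

func main() {
	err := retrieveMetric()
	fmt.Println(err)                             // retrieveMetric Failed: log-cache unavailable
	fmt.Println(errors.Is(err, errMetricClient)) // true, thanks to %w
}
```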