🕵️ #2838

Closed
wants to merge 16 commits
6 changes: 3 additions & 3 deletions src/acceptance/app/dynamic_policy_test.go
@@ -177,12 +177,12 @@ var _ = Describe("AutoScaler dynamic policy", func() {
Context("when throughput is greater than scaling out threshold", func() {

BeforeEach(func() {
policy = GenerateDynamicScaleOutPolicy(1, 2, "throughput", 1)
policy = GenerateDynamicScaleOutPolicy(1, 2, "throughput", 79)
initialInstanceCount = 1
})

JustBeforeEach(func() {
ticker = time.NewTicker(25 * time.Millisecond)
ticker = time.NewTicker(10 * time.Millisecond)
go func(chan bool) {
defer GinkgoRecover()
for {
@@ -198,7 +198,7 @@ var _ = Describe("AutoScaler dynamic policy", func() {
}(doneChan)
})

It("should scale out", func() {
FIt("should scale out", func() {
WaitForNInstancesRunning(appGUID, 2, 5*time.Minute)
})

2 changes: 1 addition & 1 deletion src/autoscaler/api/policyvalidator/policy_validator.go
@@ -188,7 +188,7 @@ func (pv *PolicyValidator) validateScalingRuleThreshold(policy *models.ScalingPo
}

switch scalingRule.MetricType {
case "memoryused":
case "memoryused": // TODO: make use of models constants e.g. models.MetricNameMemoryUsed
if scalingRule.Threshold < 0 {
formatString := shouldBeGreaterThanOrEqual("memoryused", 1)
err := newPolicyValidationError(currentContext, formatString, errDetails)
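An aside on the TODO added above: a minimal sketch of what switching from the "memoryused" string literal to a named constant could look like. The constant name follows the models.MetricNameMemoryUsed mentioned in the comment; its value, the helper function, and the >= 1 bound (taken from the error message in the surrounding code) are assumptions for illustration, not the project's implementation.

```go
// Sketch only: threshold validation keyed on a named metric constant
// instead of a string literal. MetricNameMemoryUsed stands in for
// models.MetricNameMemoryUsed (assumed value).
package main

import "fmt"

const MetricNameMemoryUsed = "memoryused"

func validateThreshold(metricType string, threshold int64) error {
	switch metricType {
	case MetricNameMemoryUsed:
		if threshold < 1 {
			return fmt.Errorf("%s threshold should be greater than or equal to 1", metricType)
		}
	}
	return nil
}

func main() {
	fmt.Println(validateThreshold(MetricNameMemoryUsed, 0)) // prints the validation error
}
```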
9 changes: 7 additions & 2 deletions src/autoscaler/envelopeprocessor/envelope_processor.go
@@ -21,6 +21,7 @@ type EnvelopeProcessorCreator interface {
type EnvelopeProcessor interface {
GetGaugeMetrics(envelopes []*loggregator_v2.Envelope, currentTimeStamp int64) ([]models.AppInstanceMetric, error)
GetTimerMetrics(envelopes []*loggregator_v2.Envelope, appID string, currentTimestamp int64) []models.AppInstanceMetric
GetCollectionInterval() time.Duration
}

var _ EnvelopeProcessor = &Processor{}
@@ -49,6 +50,10 @@ func (p Processor) GetTimerMetrics(envelopes []*loggregator_v2.Envelope, appID s
return GetHttpStartStopInstanceMetrics(envelopes, appID, currentTimestamp, p.collectionInterval)
}

func (p Processor) GetCollectionInterval() time.Duration {
return p.collectionInterval
}

// Log Cache returns instance metrics such as CPU and memory in separate envelopes; this was not the case with
// Loggregator. We compact these envelopes by matching source_id and timestamp to facilitate metrics calculations.
func (p Processor) CompactEnvelopes(envelopes []*loggregator_v2.Envelope) []*loggregator_v2.Envelope {
@@ -91,10 +96,10 @@ func GetHttpStartStopInstanceMetrics(envelopes []*loggregator_v2.Envelope, appID
var metrics []models.AppInstanceMetric

numRequestsPerAppIdx := calcNumReqs(envelopes)
sumReponseTimesPerAppIdx := calcSumResponseTimes(envelopes)
sumResponseTimesPerAppIdx := calcSumResponseTimes(envelopes)

throughputMetrics := getThroughputInstanceMetrics(envelopes, appID, numRequestsPerAppIdx, collectionInterval, currentTimestamp)
responseTimeMetric := getResponsetimeInstanceMetrics(envelopes, appID, numRequestsPerAppIdx, sumReponseTimesPerAppIdx, currentTimestamp)
responseTimeMetric := getResponsetimeInstanceMetrics(envelopes, appID, numRequestsPerAppIdx, sumResponseTimesPerAppIdx, currentTimestamp)

metrics = append(metrics, throughputMetrics...)
metrics = append(metrics, responseTimeMetric...)
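For context on the calculation above: throughput per instance is the number of HTTP start/stop events counted over the collection window, divided by the window length. A minimal, self-contained sketch; the helper name and types are illustrative and not the processor's internals.

```go
// Illustrative sketch only: throughput as requests per second, i.e. the
// request count per instance divided by the collection interval.
package main

import (
	"fmt"
	"time"
)

func throughputPerInstance(requests map[uint32]int64, interval time.Duration) map[uint32]float64 {
	result := make(map[uint32]float64, len(requests))
	for instance, count := range requests {
		result[instance] = float64(count) / interval.Seconds()
	}
	return result
}

func main() {
	requests := map[uint32]int64{0: 120, 1: 80} // requests seen in one window
	fmt.Println(throughputPerInstance(requests, 40*time.Second))
	// map[0:3 1:2] -> 3 and 2 requests per second
}
```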
8 changes: 8 additions & 0 deletions src/autoscaler/envelopeprocessor/envelopeprocessor_test.go
@@ -370,6 +370,14 @@ var _ = Describe("Envelopeprocessor", func() {

})
})

Describe("GetCollectionInterval", func() {
When("interval is set", func() {
It("returns interval", func() {
Expect(processor.GetCollectionInterval()).To(Equal(TestCollectInterval))
})
})
})
})

func generateHttpStartStopEnvelope(sourceID, instance string, start, stop, timestamp int64) *loggregator_v2.Envelope {
73 changes: 70 additions & 3 deletions src/autoscaler/eventgenerator/client/log_cache_client.go
@@ -4,8 +4,10 @@
"context"
"crypto/tls"
"fmt"
"math"
"net/http"
"net/url"
"strconv"
"time"

"code.cloudfoundry.org/app-autoscaler/src/autoscaler/envelopeprocessor"
@@ -15,6 +17,7 @@

"code.cloudfoundry.org/app-autoscaler/src/autoscaler/models"
logcache "code.cloudfoundry.org/go-log-cache/v2"
"code.cloudfoundry.org/go-log-cache/v2/rpc/logcache_v1"
rpc "code.cloudfoundry.org/go-log-cache/v2/rpc/logcache_v1"
"code.cloudfoundry.org/go-loggregator/v9/rpc/loggregator_v2"
"code.cloudfoundry.org/lager/v3"
@@ -36,6 +39,7 @@

type LogCacheClientReader interface {
Read(ctx context.Context, sourceID string, start time.Time, opts ...logcache.ReadOption) ([]*loggregator_v2.Envelope, error)
PromQL(ctx context.Context, query string, opts ...logcache.PromQLOption) (*logcache_v1.PromQL_InstantQueryResult, error)
}

type GRPCOptions interface {
@@ -90,9 +94,74 @@

func (c *LogCacheClient) GetMetrics(appId string, metricType string, startTime time.Time, endTime time.Time) ([]models.AppInstanceMetric, error) {
var metrics []models.AppInstanceMetric

var err error

if metricType == models.MetricNameThroughput {
collectionInterval := fmt.Sprintf("%.0f", c.envelopeProcessor.GetCollectionInterval().Seconds())
now := time.Now()

query := fmt.Sprintf("sum by (instance_id) (count_over_time(http{source_id='%s'}[%ss])) / %s", appId, collectionInterval, collectionInterval)
result, err := c.Client.PromQL(context.Background(), query, logcache.WithPromQLTime(now))
if err != nil {
return []models.AppInstanceMetric{}, fmt.Errorf("failed getting PromQL throughput result (appId: %s, collectionInterval: %s, query: %s, time: %s): %w", appId, collectionInterval, query, now.String(), err)
}

// safeguard: the query ensures that we get a vector but let's double-check
vector := result.GetVector()
if vector == nil {
return []models.AppInstanceMetric{}, fmt.Errorf("throughput result is not a vector")
}

// return a single zero-valued metric if there are no samples
if len(vector.GetSamples()) <= 0 {
return []models.AppInstanceMetric{
{
AppId: appId,
InstanceIndex: 0,
Name: models.MetricNameThroughput,
Unit: models.UnitRPS,
Value: "0",
CollectedAt: now.UnixNano(),
Timestamp: now.UnixNano(),
},
}, nil
}

// convert the PromQL result into autoscaler metric structs
var metrics []models.AppInstanceMetric
for _, s := range vector.GetSamples() {
// safeguard: the metric label instance_id should always be there, but let's double-check
instanceIdStr, ok := s.GetMetric()["instance_id"]
if !ok {
return []models.AppInstanceMetric{}, fmt.Errorf("sample does not contain instance_id: %w", err)
}

instanceIdUInt, err := strconv.ParseUint(instanceIdStr, 10, 32)
if err != nil {
return []models.AppInstanceMetric{}, fmt.Errorf("could not convert instance_id to uint32: %w", err)
}

p := s.GetPoint()
if p == nil {
return []models.AppInstanceMetric{}, fmt.Errorf("sample does not contain a point")
}

instanceId := uint32(instanceIdUInt)
valueWithoutDecimalsRoundedToCeiling := fmt.Sprintf("%.0f", math.Ceil(p.GetValue()))

metrics = append(metrics, models.AppInstanceMetric{
AppId: appId,
InstanceIndex: instanceId,
Name: models.MetricNameThroughput,
Unit: models.UnitRPS,
Value: valueWithoutDecimalsRoundedToCeiling,
CollectedAt: now.UnixNano(),
Timestamp: now.UnixNano(),
})
}
return metrics, nil
}

filters := logCacheFiltersFor(endTime, metricType)
c.logger.Debug("GetMetrics", lager.Data{"filters": valuesFrom(filters)})
envelopes, err := c.Client.Read(context.Background(), appId, startTime, filters...)
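To make the PromQL query built in GetMetrics above concrete: a standalone sketch, not part of the diff, showing the interpolated query for a hypothetical app GUID and an assumed 40-second collection interval. Each sample returned by Log Cache then carries an instance_id label, which the loop above parses to uint32 and pairs with the point value rounded up via math.Ceil.

```go
// Illustrative only: prints the interpolated PromQL throughput query for an
// assumed app GUID and a 40s collection interval. Not part of the client code.
package main

import "fmt"

func main() {
	appId := "f6f4ea3f-0000-0000-0000-000000000000" // hypothetical GUID
	collectionInterval := "40"                      // seconds, from GetCollectionInterval()
	query := fmt.Sprintf("sum by (instance_id) (count_over_time(http{source_id='%s'}[%ss])) / %s",
		appId, collectionInterval, collectionInterval)
	fmt.Println(query)
	// sum by (instance_id) (count_over_time(http{source_id='f6f4ea3f-...'}[40s])) / 40
}
```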
@@ -143,14 +212,12 @@
var opts []logcache.ClientOption

if c.uaaCreds.IsEmpty() {
opts = append(opts, c.goLogCache.WithViaGRPC(c.grpc.WithTransportCredentials(credentials.NewTLS(c.TLSConfig))))

Check failure on line 215 in src/autoscaler/eventgenerator/client/log_cache_client.go (GitHub Actions / reviewdog): [golangci] ineffectual assignment to opts (ineffassign)
} else {
oauth2HTTPOpts := c.goLogCache.WithOauth2HTTPClient(c.getUaaHttpClient())
oauth2HTTPClient := c.goLogCache.NewOauth2HTTPClient(c.uaaCreds.URL, c.uaaCreds.ClientID, c.uaaCreds.ClientSecret, oauth2HTTPOpts)
opts = append(opts, c.goLogCache.WithHTTPClient(oauth2HTTPClient))
}

c.Client = c.goLogCache.NewClient(c.url, opts...)
}

func (c *LogCacheClient) GetUaaTlsConfig() *tls.Config {