Skip to content

Commit

Permalink
🕵️
Browse files Browse the repository at this point in the history
  • Loading branch information
geigerj0 committed Apr 12, 2024
1 parent 59ecb3c commit ad16d6c
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 6 deletions.
2 changes: 1 addition & 1 deletion src/autoscaler/api/policyvalidator/policy_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (pv *PolicyValidator) validateScalingRuleThreshold(policy *models.ScalingPo
}

switch scalingRule.MetricType {
case "memoryused":
case "memoryused": // TODO: make use of models constants e.g. models.MetricNameMemoryUsed
if scalingRule.Threshold < 0 {
formatString := shouldBeGreaterThanOrEqual("memoryused", 1)
err := newPolicyValidationError(currentContext, formatString, errDetails)
Expand Down
9 changes: 7 additions & 2 deletions src/autoscaler/envelopeprocessor/envelope_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type EnvelopeProcessorCreator interface {
// EnvelopeProcessor turns loggregator v2 envelopes into autoscaler
// app-instance metrics.
type EnvelopeProcessor interface {
// GetGaugeMetrics derives metrics from gauge envelopes, stamped with currentTimeStamp.
GetGaugeMetrics(envelopes []*loggregator_v2.Envelope, currentTimeStamp int64) ([]models.AppInstanceMetric, error)
// GetTimerMetrics derives metrics from timer envelopes for the given appID.
GetTimerMetrics(envelopes []*loggregator_v2.Envelope, appID string, currentTimestamp int64) []models.AppInstanceMetric
// GetCollectionInterval reports the interval the processor was configured with.
GetCollectionInterval() time.Duration
}

var _ EnvelopeProcessor = &Processor{}
Expand Down Expand Up @@ -49,6 +50,10 @@ func (p Processor) GetTimerMetrics(envelopes []*loggregator_v2.Envelope, appID s
return GetHttpStartStopInstanceMetrics(envelopes, appID, currentTimestamp, p.collectionInterval)
}

// GetCollectionInterval reports the collection interval this processor
// was configured with.
func (p Processor) GetCollectionInterval() time.Duration {
interval := p.collectionInterval
return interval
}

// Log cache returns instance metrics such as cpu and memory in separate envelopes; this was not the case with
// loggregator. We compact these messages by matching source_id and timestamp to facilitate metrics calculations.
func (p Processor) CompactEnvelopes(envelopes []*loggregator_v2.Envelope) []*loggregator_v2.Envelope {
Expand Down Expand Up @@ -91,10 +96,10 @@ func GetHttpStartStopInstanceMetrics(envelopes []*loggregator_v2.Envelope, appID
var metrics []models.AppInstanceMetric

numRequestsPerAppIdx := calcNumReqs(envelopes)
sumReponseTimesPerAppIdx := calcSumResponseTimes(envelopes)
sumResponseTimesPerAppIdx := calcSumResponseTimes(envelopes)

throughputMetrics := getThroughputInstanceMetrics(envelopes, appID, numRequestsPerAppIdx, collectionInterval, currentTimestamp)
responseTimeMetric := getResponsetimeInstanceMetrics(envelopes, appID, numRequestsPerAppIdx, sumReponseTimesPerAppIdx, currentTimestamp)
responseTimeMetric := getResponsetimeInstanceMetrics(envelopes, appID, numRequestsPerAppIdx, sumResponseTimesPerAppIdx, currentTimestamp)

metrics = append(metrics, throughputMetrics...)
metrics = append(metrics, responseTimeMetric...)
Expand Down
8 changes: 8 additions & 0 deletions src/autoscaler/envelopeprocessor/envelopeprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,14 @@ var _ = Describe("Envelopeprocessor", func() {

})
})

// Verifies the getter echoes back the interval the processor under test
// was built with (processor and TestCollectInterval are defined elsewhere
// in this suite).
Describe("GetCollectionInterval", func() {
When("interval is set", func() {
It("returns interval", func() {
Expect(processor.GetCollectionInterval()).To(Equal(TestCollectInterval))
})
})
})
})

func generateHttpStartStopEnvelope(sourceID, instance string, start, stop, timestamp int64) *loggregator_v2.Envelope {
Expand Down
73 changes: 70 additions & 3 deletions src/autoscaler/eventgenerator/client/log_cache_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"crypto/tls"
"fmt"
"math"
"net/http"
"net/url"
"strconv"
"time"

"code.cloudfoundry.org/app-autoscaler/src/autoscaler/envelopeprocessor"
Expand All @@ -15,6 +17,7 @@ import (

"code.cloudfoundry.org/app-autoscaler/src/autoscaler/models"
logcache "code.cloudfoundry.org/go-log-cache/v2"
"code.cloudfoundry.org/go-log-cache/v2/rpc/logcache_v1"
rpc "code.cloudfoundry.org/go-log-cache/v2/rpc/logcache_v1"
"code.cloudfoundry.org/go-loggregator/v9/rpc/loggregator_v2"
"code.cloudfoundry.org/lager/v3"
Expand All @@ -36,6 +39,7 @@ type LogCacheClient struct {

// LogCacheClientReader is the subset of the go-log-cache client that
// LogCacheClient depends on: raw envelope reads plus PromQL instant queries.
// Defined here (at the consumer) so tests can substitute a fake.
type LogCacheClientReader interface {
Read(ctx context.Context, sourceID string, start time.Time, opts ...logcache.ReadOption) ([]*loggregator_v2.Envelope, error)
PromQL(ctx context.Context, query string, opts ...logcache.PromQLOption) (*logcache_v1.PromQL_InstantQueryResult, error)
}

type GRPCOptions interface {
Expand Down Expand Up @@ -90,9 +94,74 @@ func NewLogCacheClient(logger lager.Logger, getTime func() time.Time, envelopePr

func (c *LogCacheClient) GetMetrics(appId string, metricType string, startTime time.Time, endTime time.Time) ([]models.AppInstanceMetric, error) {
var metrics []models.AppInstanceMetric

var err error

if metricType == models.MetricNameThroughput {
collectionInterval := fmt.Sprintf("%.0f", c.envelopeProcessor.GetCollectionInterval().Seconds())
now := time.Now()

query := fmt.Sprintf("sum by (instance_id) (count_over_time(http{source_id='%s'}[%ss])) / %s)", appId, collectionInterval, collectionInterval)
result, err :=c.Client.PromQL(context.Background(), query, logcache.WithPromQLTime(now))
if err != nil {
return []models.AppInstanceMetric{}, fmt.Errorf("failed getting PromQL throughput result (appId: %s, collectionInterval: %s, query: %s, time: %s): %w", appId, collectionInterval, query, now.String(), err)
}

// safeguard: the query ensures that we get a vector but let's double-check
vector := result.GetVector()
if vector == nil {
return []models.AppInstanceMetric{}, fmt.Errorf("throughput result is not a vector")
}

// return empty metric if there are no samples
if len(vector.GetSamples()) <= 0 {
return []models.AppInstanceMetric{
{
AppId: appId,
InstanceIndex: 0,
Name: models.MetricNameThroughput,
Unit: models.UnitRPS,
Value: "0",
CollectedAt: now.UnixNano(),
Timestamp: now.UnixNano(),
},
}, nil
}

// convert promQL result into the autoscaler metric struct
var metrics []models.AppInstanceMetric
for _, s := range vector.GetSamples() {
// safeguard: metric label instance_id should be always there but let's double-check
instanceIdStr, ok := s.GetMetric()["instance_id"]
if !ok {
return []models.AppInstanceMetric{}, fmt.Errorf("sample does not contain instance_id: %w", err)
}

instanceIdUInt, err := strconv.ParseUint(instanceIdStr, 10, 32)
if err != nil {
return []models.AppInstanceMetric{}, fmt.Errorf("could not convert instance_id to uint32: %w", err)
}

p := s.GetPoint()
if p == nil {
return []models.AppInstanceMetric{}, fmt.Errorf("sample does not contain a point")
}

instanceId := uint32(instanceIdUInt)
valueWithoutDecimalsRoundedToCeiling := fmt.Sprintf("%.0f", math.Ceil(p.GetValue()))

metrics = append(metrics, models.AppInstanceMetric{
AppId: appId,
InstanceIndex: instanceId,
Name: models.MetricNameThroughput,
Unit: models.UnitRPS,
Value: valueWithoutDecimalsRoundedToCeiling,
CollectedAt: now.UnixNano(),
Timestamp: now.UnixNano(),
})
}
return metrics, nil
}

filters := logCacheFiltersFor(endTime, metricType)
c.logger.Debug("GetMetrics", lager.Data{"filters": valuesFrom(filters)})
envelopes, err := c.Client.Read(context.Background(), appId, startTime, filters...)
Expand Down Expand Up @@ -149,8 +218,6 @@ func (c *LogCacheClient) Configure() {
oauth2HTTPClient := c.goLogCache.NewOauth2HTTPClient(c.uaaCreds.URL, c.uaaCreds.ClientID, c.uaaCreds.ClientSecret, oauth2HTTPOpts)
opts = append(opts, c.goLogCache.WithHTTPClient(oauth2HTTPClient))
}

c.Client = c.goLogCache.NewClient(c.url, opts...)
}

func (c *LogCacheClient) GetUaaTlsConfig() *tls.Config {
Expand Down

0 comments on commit ad16d6c

Please sign in to comment.