Skip to content

Commit

Permalink
Query Frontend: add new field for dense native histogram format (#6199)
Browse files Browse the repository at this point in the history
  • Loading branch information
yeya24 authored Dec 4, 2024
1 parent 264e640 commit 320e475
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 146 deletions.
4 changes: 3 additions & 1 deletion pkg/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,9 @@ func NewQuerierHandler(
)

// JSON codec is already installed. Install Protobuf codec to give the option for using either.
api.InstallCodec(codec.ProtobufCodec{})
api.InstallCodec(codec.ProtobufCodec{CortexInternal: false})
// Protobuf codec for Cortex internal requests. This should be used by Cortex Ruler only for remote evaluation.
api.InstallCodec(codec.ProtobufCodec{CortexInternal: true})

router := mux.NewRouter()

Expand Down
69 changes: 46 additions & 23 deletions pkg/querier/codec/protobuf_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,20 @@ import (
"github.com/cortexproject/cortex/pkg/querier/tripperware"
)

type ProtobufCodec struct{}
type ProtobufCodec struct {
// cortexInternal enables encoding the whole native histogram data fields in response instead of keeping
// only few sparse information like the default JSON/Protobuf codec does.
// This will be used by Cortex Ruler to get native histograms data from Cortex Query Frontend because
// rule evaluation requires the full native histogram data.
CortexInternal bool
}

func (p ProtobufCodec) ContentType() v1.MIMEType {
return v1.MIMEType{Type: "application", SubType: "x-protobuf"}
if !p.CortexInternal {
return v1.MIMEType{Type: "application", SubType: "x-protobuf"}
}
// TODO: switch to use constants.
return v1.MIMEType{Type: "application", SubType: "x-cortex-query+proto"}
}

func (p ProtobufCodec) CanEncode(resp *v1.Response) bool {
Expand All @@ -29,15 +39,15 @@ func (p ProtobufCodec) CanEncode(resp *v1.Response) bool {

// ProtobufCodec implementation is derived from https://github.com/prometheus/prometheus/blob/main/web/api/v1/json_codec.go
func (p ProtobufCodec) Encode(resp *v1.Response) ([]byte, error) {
prometheusQueryResponse, err := createPrometheusQueryResponse(resp)
prometheusQueryResponse, err := createPrometheusQueryResponse(resp, p.CortexInternal)
if err != nil {
return []byte{}, err
}
b, err := proto.Marshal(prometheusQueryResponse)
return b, err
}

func createPrometheusQueryResponse(resp *v1.Response) (*tripperware.PrometheusResponse, error) {
func createPrometheusQueryResponse(resp *v1.Response, cortexInternal bool) (*tripperware.PrometheusResponse, error) {
var data = resp.Data.(*v1.QueryData)

var queryResult tripperware.PrometheusQueryResult
Expand All @@ -51,7 +61,10 @@ func createPrometheusQueryResponse(resp *v1.Response) (*tripperware.PrometheusRe
case model.ValVector.String():
queryResult.Result = &tripperware.PrometheusQueryResult_Vector{
Vector: &tripperware.Vector{
Samples: *getVectorSamples(data),
// cortexInternal tries to encode native histogram as dense format instead of sparse format.
// This is only used for vector response type since internal response is only available for Ruler
// client and Ruler only expects vector or scalar response type.
Samples: *getVectorSamples(data, cortexInternal),
},
}
default:
Expand Down Expand Up @@ -139,7 +152,7 @@ func getMatrixSampleStreams(data *v1.QueryData) *[]tripperware.SampleStream {
return &sampleStreams
}

func getVectorSamples(data *v1.QueryData) *[]tripperware.Sample {
func getVectorSamples(data *v1.QueryData, cortexInternal bool) *[]tripperware.Sample {
vectorSamplesLen := len(data.Result.(promql.Vector))
vectorSamples := make([]tripperware.Sample, vectorSamplesLen)

Expand All @@ -158,27 +171,37 @@ func getVectorSamples(data *v1.QueryData) *[]tripperware.Sample {
}
vectorSamples[i].Labels = labels

if sample.H != nil {
bucketsLen := len(sample.H.NegativeBuckets) + len(sample.H.PositiveBuckets)
if sample.H.ZeroCount > 0 {
bucketsLen = len(sample.H.NegativeBuckets) + len(sample.H.PositiveBuckets) + 1
}
buckets := make([]*tripperware.HistogramBucket, bucketsLen)
it := sample.H.AllBucketIterator()
getBuckets(buckets, it)
vectorSamples[i].Histogram = &tripperware.SampleHistogramPair{
TimestampMs: sample.T,
Histogram: tripperware.SampleHistogram{
Count: sample.H.Count,
Sum: sample.H.Sum,
Buckets: buckets,
},
}
} else {
// Float samples only.
if sample.H == nil {
vectorSamples[i].Sample = &cortexpb.Sample{
TimestampMs: sample.T,
Value: sample.F,
}
continue
}

// Cortex Internal request. Encode dense float native histograms.
if cortexInternal {
hp := cortexpb.FloatHistogramToHistogramProto(sample.T, sample.H)
vectorSamples[i].RawHistogram = &hp
continue
}

// Encode sparse native histograms.
bucketsLen := len(sample.H.NegativeBuckets) + len(sample.H.PositiveBuckets)
if sample.H.ZeroCount > 0 {
bucketsLen = len(sample.H.NegativeBuckets) + len(sample.H.PositiveBuckets) + 1
}
buckets := make([]*tripperware.HistogramBucket, bucketsLen)
it := sample.H.AllBucketIterator()
getBuckets(buckets, it)
vectorSamples[i].Histogram = &tripperware.SampleHistogramPair{
TimestampMs: sample.T,
Histogram: tripperware.SampleHistogram{
Count: sample.H.Count,
Sum: sample.H.Sum,
Buckets: buckets,
},
}
}
return &vectorSamples
Expand Down
112 changes: 69 additions & 43 deletions pkg/querier/codec/protobuf_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,29 @@ import (
)

func TestProtobufCodec_Encode(t *testing.T) {
testFloatHistogram := &histogram.FloatHistogram{
Schema: 2,
ZeroThreshold: 0.001,
ZeroCount: 12,
Count: 10,
Sum: 20,
PositiveSpans: []histogram.Span{
{Offset: 3, Length: 2},
{Offset: 1, Length: 3},
},
NegativeSpans: []histogram.Span{
{Offset: 2, Length: 2},
},
PositiveBuckets: []float64{1, 2, 2, 1, 1},
NegativeBuckets: []float64{2, 1},
}
testProtoHistogram := cortexpb.FloatHistogramToHistogramProto(1000, testFloatHistogram)

tests := []struct {
data interface{}
expected *tripperware.PrometheusResponse
name string
data *v1.QueryData
cortexInternal bool
expected *tripperware.PrometheusResponse
}{
{
data: &v1.QueryData{
Expand Down Expand Up @@ -207,23 +227,8 @@ func TestProtobufCodec_Encode(t *testing.T) {
ResultType: parser.ValueTypeMatrix,
Result: promql.Matrix{
promql.Series{
Histograms: []promql.HPoint{{H: &histogram.FloatHistogram{
Schema: 2,
ZeroThreshold: 0.001,
ZeroCount: 12,
Count: 10,
Sum: 20,
PositiveSpans: []histogram.Span{
{Offset: 3, Length: 2},
{Offset: 1, Length: 3},
},
NegativeSpans: []histogram.Span{
{Offset: 2, Length: 2},
},
PositiveBuckets: []float64{1, 2, 2, 1, 1},
NegativeBuckets: []float64{2, 1},
}, T: 1000}},
Metric: labels.FromStrings("__name__", "foo"),
Histograms: []promql.HPoint{{H: testFloatHistogram, T: 1000}},
Metric: labels.FromStrings("__name__", "foo"),
},
},
},
Expand Down Expand Up @@ -313,22 +318,7 @@ func TestProtobufCodec_Encode(t *testing.T) {
promql.Sample{
Metric: labels.FromStrings("__name__", "foo"),
T: 1000,
H: &histogram.FloatHistogram{
Schema: 2,
ZeroThreshold: 0.001,
ZeroCount: 12,
Count: 10,
Sum: 20,
PositiveSpans: []histogram.Span{
{Offset: 3, Length: 2},
{Offset: 1, Length: 3},
},
NegativeSpans: []histogram.Span{
{Offset: 2, Length: 2},
},
PositiveBuckets: []float64{1, 2, 2, 1, 1},
NegativeBuckets: []float64{2, 1},
},
H: testFloatHistogram,
},
},
},
Expand Down Expand Up @@ -409,17 +399,53 @@ func TestProtobufCodec_Encode(t *testing.T) {
},
},
},
{
name: "cortex internal with native histogram",
cortexInternal: true,
data: &v1.QueryData{
ResultType: parser.ValueTypeVector,
Result: promql.Vector{
promql.Sample{
Metric: labels.FromStrings("__name__", "foo"),
T: 1000,
H: testFloatHistogram,
},
},
},
expected: &tripperware.PrometheusResponse{
Status: tripperware.StatusSuccess,
Data: tripperware.PrometheusData{
ResultType: model.ValVector.String(),
Result: tripperware.PrometheusQueryResult{
Result: &tripperware.PrometheusQueryResult_Vector{
Vector: &tripperware.Vector{
Samples: []tripperware.Sample{
{
Labels: []cortexpb.LabelAdapter{
{Name: "__name__", Value: "foo"},
},
RawHistogram: &testProtoHistogram,
},
},
},
},
},
},
},
},
}

codec := ProtobufCodec{}
for _, test := range tests {
body, err := codec.Encode(&v1.Response{
Status: tripperware.StatusSuccess,
Data: test.data,
t.Run(test.name, func(t *testing.T) {
codec := ProtobufCodec{CortexInternal: test.cortexInternal}
body, err := codec.Encode(&v1.Response{
Status: tripperware.StatusSuccess,
Data: test.data,
})
require.NoError(t, err)
b, err := proto.Marshal(test.expected)
require.NoError(t, err)
require.Equal(t, string(b), string(body))
})
require.NoError(t, err)
b, err := proto.Marshal(test.expected)
require.NoError(t, err)
require.Equal(t, string(b), string(body))
}
}
2 changes: 1 addition & 1 deletion pkg/querier/tripperware/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func sliceSamples(samples []cortexpb.Sample, minTs int64) []cortexpb.Sample {
return samples[searchResult:]
}

// sliceHistogram assumes given histogram are sorted by timestamp in ascending order and
// sliceHistograms assumes given histogram are sorted by timestamp in ascending order and
// return a sub slice whose first element's is the smallest timestamp that is strictly
// bigger than the given minTs. Empty slice is returned if minTs is bigger than all the
// timestamps in histogram.
Expand Down
Loading

0 comments on commit 320e475

Please sign in to comment.