From 0b6d2b630db2a532f81c6fce4e21ee04a16454b6 Mon Sep 17 00:00:00 2001 From: kane8n Date: Mon, 18 Nov 2024 22:47:16 +0900 Subject: [PATCH 1/6] add splunk provider Signed-off-by: kane8n --- artifacts/flagger/crd.yaml | 1 + charts/flagger/crds/crd.yaml | 1 + kustomize/base/flagger/crd.yaml | 1 + pkg/metrics/providers/factory.go | 2 + pkg/metrics/providers/splunk.go | 173 +++++++++++++++++++++++++++ pkg/metrics/providers/splunk_test.go | 160 +++++++++++++++++++++++++ 6 files changed, 338 insertions(+) create mode 100644 pkg/metrics/providers/splunk.go create mode 100644 pkg/metrics/providers/splunk_test.go diff --git a/artifacts/flagger/crd.yaml b/artifacts/flagger/crd.yaml index 99940df06..ceb5822e6 100644 --- a/artifacts/flagger/crd.yaml +++ b/artifacts/flagger/crd.yaml @@ -1299,6 +1299,7 @@ spec: - graphite - dynatrace - keptn + - splunk address: description: API address of this provider type: string diff --git a/charts/flagger/crds/crd.yaml b/charts/flagger/crds/crd.yaml index 99940df06..ceb5822e6 100644 --- a/charts/flagger/crds/crd.yaml +++ b/charts/flagger/crds/crd.yaml @@ -1299,6 +1299,7 @@ spec: - graphite - dynatrace - keptn + - splunk address: description: API address of this provider type: string diff --git a/kustomize/base/flagger/crd.yaml b/kustomize/base/flagger/crd.yaml index 99940df06..ceb5822e6 100644 --- a/kustomize/base/flagger/crd.yaml +++ b/kustomize/base/flagger/crd.yaml @@ -1299,6 +1299,7 @@ spec: - graphite - dynatrace - keptn + - splunk address: description: API address of this provider type: string diff --git a/pkg/metrics/providers/factory.go b/pkg/metrics/providers/factory.go index 2370d7e76..e49e44c56 100644 --- a/pkg/metrics/providers/factory.go +++ b/pkg/metrics/providers/factory.go @@ -43,6 +43,8 @@ func (factory Factory) Provider(metricInterval string, provider flaggerv1.Metric return NewDynatraceProvider(metricInterval, provider, credentials) case "keptn": return NewKeptnProvider(config) + case "splunk": + return NewSplunkProvider(metricInterval, provider, credentials) default: return NewPrometheusProvider(provider, credentials) } diff --git a/pkg/metrics/providers/splunk.go b/pkg/metrics/providers/splunk.go new file mode 100644 index 000000000..c535fa7b6 --- /dev/null +++ b/pkg/metrics/providers/splunk.go @@ -0,0 +1,173 @@ +/* +Copyright 2020 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package providers + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strconv" + "time" + + flaggerv1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1" +) + +// https://docs.datadoghq.com/api/ +const ( + signalFxMTSQueryPath = "/v1/timeserieswindow" + signalFxValidationPath = "/v2/metric?limit=1" + + signalFxTokenSecretKey = "sf_token_key" + + signalFxTokenHeaderKey = "X-SF-Token" + + signalFxFromDeltaMultiplierOnMetricInterval = 10 +) + +// SplunkProvider executes signalfx queries +type SplunkProvider struct { + metricsQueryEndpoint string + apiValidationEndpoint string + + timeout time.Duration + token string + fromDelta int64 +} + +type splunkResponse struct { + Data map[string][][]float64 `json:"data"` +} + +// NewSplunkProvider takes a canary spec, a provider spec and the credentials map, and +// returns a Splunk client ready to execute queries against the API +func NewSplunkProvider(metricInterval string, + provider flaggerv1.MetricTemplateProvider, + credentials map[string][]byte) (*SplunkProvider, error) { + + address := provider.Address + if address == "" { + return nil, fmt.Errorf("splunk endpoint is not set") + } + + sp := SplunkProvider{ + timeout: 5 * time.Second, + metricsQueryEndpoint: address + signalFxMTSQueryPath, + apiValidationEndpoint: address + signalFxValidationPath, + } + + if b, ok := credentials[signalFxTokenSecretKey]; ok { + sp.token = string(b) + } else { + return nil, fmt.Errorf("splunk credentials does not contain sf_token_key") + } + + md, err := time.ParseDuration(metricInterval) + if err != nil { + return nil, fmt.Errorf("error parsing metric interval: %w", err) + } + + sp.fromDelta = int64(signalFxFromDeltaMultiplierOnMetricInterval * md.Seconds()) + return &sp, nil +} + +// RunQuery executes the query and converts the first result to float64 +func (p *SplunkProvider) RunQuery(query string) (float64, error) { + + req, err := http.NewRequest("GET", p.metricsQueryEndpoint, nil) + if err != nil { + return 0, fmt.Errorf("error http.NewRequest: %w", err) + } + + req.Header.Set(signalFxTokenHeaderKey, p.token) + now := time.Now().Unix() + q := req.URL.Query() + q.Add("query", query) + q.Add("startMS", strconv.FormatInt(now-p.fromDelta, 10)) + q.Add("endMS", strconv.FormatInt(now, 10)) + req.URL.RawQuery = q.Encode() + + ctx, cancel := context.WithTimeout(req.Context(), p.timeout) + defer cancel() + r, err := http.DefaultClient.Do(req.WithContext(ctx)) + if err != nil { + return 0, fmt.Errorf("request failed: %w", err) + } + + defer r.Body.Close() + b, err := io.ReadAll(r.Body) + if err != nil { + return 0, fmt.Errorf("error reading body: %w", err) + } + + if r.StatusCode != http.StatusOK { + return 0, fmt.Errorf("error response: %s: %w", string(b), err) + } + + var res splunkResponse + if err := json.Unmarshal(b, &res); err != nil { + return 0, fmt.Errorf("error unmarshaling result: %w, '%s'", err, string(b)) + } + + if len(res.Data) < 1 { + return 0, fmt.Errorf("invalid response: %s: %w", string(b), ErrNoValuesFound) + } + + if len(res.Data) > 1 { + return 0, fmt.Errorf("invalid response: %s: %w", string(b), ErrMultipleValuesReturned) + } + + for _, v := range res.Data { + vs := v[len(v)-1] + if len(vs) < 1 { + return 0, fmt.Errorf("invalid response: %s: %w", string(b), ErrNoValuesFound) + } + return vs[1], nil + } + return 0, fmt.Errorf("invalid response: %s: %w", string(b), ErrNoValuesFound) +} + +// IsOnline calls the provider endpoint and returns an error if the API is unreachable +func (p *SplunkProvider) IsOnline() (bool, error) { + req, err := http.NewRequest("GET", p.apiValidationEndpoint, nil) + if err != nil { + return false, fmt.Errorf("error http.NewRequest: %w", err) + } + + req.Header.Add(signalFxTokenHeaderKey, p.token) + + ctx, cancel := context.WithTimeout(req.Context(), p.timeout) + defer cancel() + r, err := http.DefaultClient.Do(req.WithContext(ctx)) + if err != nil { + return false, fmt.Errorf("request failed: %w", err) + } + + defer r.Body.Close() + + b, err := io.ReadAll(r.Body) + if err != nil { + return false, fmt.Errorf("error reading body: %w", err) + } + + if r.StatusCode != http.StatusOK { + return false, fmt.Errorf("error response: %s", string(b)) + } + + return true, nil +} diff --git a/pkg/metrics/providers/splunk_test.go b/pkg/metrics/providers/splunk_test.go new file mode 100644 index 000000000..cb77669ca --- /dev/null +++ b/pkg/metrics/providers/splunk_test.go @@ -0,0 +1,160 @@ +/* +Copyright 2020 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package providers + +import ( + "errors" + "fmt" + "net/http" + "net/http/httptest" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + flaggerv1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1" +) + +func TestNewSplunkProvider(t *testing.T) { + token := "token" + cs := map[string][]byte{ + signalFxTokenSecretKey: []byte(token), + } + + mi := "100s" + md, err := time.ParseDuration(mi) + require.NoError(t, err) + + sp, err := NewSplunkProvider("100s", flaggerv1.MetricTemplateProvider{Address: "https://api.us1.signalfx.com"}, cs) + require.NoError(t, err) + assert.Equal(t, "https://api.us1.signalfx.com/v1/timeserieswindow", sp.metricsQueryEndpoint) + assert.Equal(t, "https://api.us1.signalfx.com/v2/metric?limit=1", sp.apiValidationEndpoint) + assert.Equal(t, int64(md.Seconds()*signalFxFromDeltaMultiplierOnMetricInterval), sp.fromDelta) + assert.Equal(t, token, sp.token) +} + +func TestSplunkProvider_RunQuery(t *testing.T) { + token := "token" + t.Run("ok", func(t *testing.T) { + expected := 1.11111 + eq := `sf_metric:service.request.count AND http_status_code:*` + now := time.Now().Unix() + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + aq := r.URL.Query().Get("query") + assert.Equal(t, eq, aq) + assert.Equal(t, token, r.Header.Get(signalFxTokenHeaderKey)) + + from, err := strconv.ParseInt(r.URL.Query().Get("startMS"), 10, 64) + if assert.NoError(t, err) { + assert.Less(t, from, now) + } + + to, err := strconv.ParseInt(r.URL.Query().Get("endMS"), 10, 64) + if assert.NoError(t, err) { + assert.GreaterOrEqual(t, to, now) + } + + json := fmt.Sprintf(`{"data":{"AAAAAAAAAAA":[[1731643210000,%f]]},"errors":[]}`, expected) + w.Write([]byte(json)) + })) + defer ts.Close() + + sp, err := NewSplunkProvider("1m", + flaggerv1.MetricTemplateProvider{Address: ts.URL}, + map[string][]byte{ + signalFxTokenSecretKey: []byte(token), + }, + ) + require.NoError(t, err) + + f, err := sp.RunQuery(eq) + require.NoError(t, err) + assert.Equal(t, expected, f) + }) + + t.Run("no values", func(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + json := fmt.Sprintf(`{"data": {}, "errors": []}`) + w.Write([]byte(json)) + })) + defer ts.Close() + + sp, err := NewSplunkProvider("1m", + flaggerv1.MetricTemplateProvider{Address: ts.URL}, + map[string][]byte{ + signalFxTokenSecretKey: []byte(token), + }, + ) + require.NoError(t, err) + _, err = sp.RunQuery("") + require.True(t, errors.Is(err, ErrNoValuesFound)) + }) + + t.Run("multiple values", func(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + json := fmt.Sprintf(`{"data":{"AAAAAAAAAAA":[[1731643210000,6]],"AAAAAAAAAAE":[[1731643210000,6]]},"errors":[]}`) + w.Write([]byte(json)) + })) + defer ts.Close() + + sp, err := NewSplunkProvider("1m", + flaggerv1.MetricTemplateProvider{Address: ts.URL}, + map[string][]byte{ + signalFxTokenSecretKey: []byte(token), + }, + ) + require.NoError(t, err) + _, err = sp.RunQuery("") + require.True(t, errors.Is(err, ErrMultipleValuesReturned)) + }) +} + +func TestSplunkProvider_IsOnline(t *testing.T) { + for _, c := range []struct { + code int + errExpected bool + }{ + {code: http.StatusOK, errExpected: false}, + {code: http.StatusUnauthorized, errExpected: true}, + } { + t.Run(fmt.Sprintf("%d", c.code), func(t *testing.T) { + token := "token" + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, token, r.Header.Get(signalFxTokenHeaderKey)) + w.WriteHeader(c.code) + })) + defer ts.Close() + + sp, err := NewSplunkProvider("1m", + flaggerv1.MetricTemplateProvider{Address: ts.URL}, + map[string][]byte{ + signalFxTokenSecretKey: []byte(token), + }, + ) + require.NoError(t, err) + + _, err = sp.IsOnline() + if c.errExpected { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} From 476a870bf395ab5ae9287cae81149a573774a137 Mon Sep 17 00:00:00 2001 From: kane8n Date: Fri, 22 Nov 2024 14:01:00 +0900 Subject: [PATCH 2/6] fix time unit. sec -> millisec Signed-off-by: kane8n --- pkg/metrics/providers/splunk.go | 4 ++-- pkg/metrics/providers/splunk_test.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/metrics/providers/splunk.go b/pkg/metrics/providers/splunk.go index c535fa7b6..df458ff9a 100644 --- a/pkg/metrics/providers/splunk.go +++ b/pkg/metrics/providers/splunk.go @@ -82,7 +82,7 @@ func NewSplunkProvider(metricInterval string, return nil, fmt.Errorf("error parsing metric interval: %w", err) } - sp.fromDelta = int64(signalFxFromDeltaMultiplierOnMetricInterval * md.Seconds()) + sp.fromDelta = int64(signalFxFromDeltaMultiplierOnMetricInterval * md.Milliseconds()) return &sp, nil } @@ -95,7 +95,7 @@ func (p *SplunkProvider) RunQuery(query string) (float64, error) { } req.Header.Set(signalFxTokenHeaderKey, p.token) - now := time.Now().Unix() + now := time.Now().UnixMilli() q := req.URL.Query() q.Add("query", query) q.Add("startMS", strconv.FormatInt(now-p.fromDelta, 10)) diff --git a/pkg/metrics/providers/splunk_test.go b/pkg/metrics/providers/splunk_test.go index cb77669ca..f154322c1 100644 --- a/pkg/metrics/providers/splunk_test.go +++ b/pkg/metrics/providers/splunk_test.go @@ -45,7 +45,7 @@ func TestNewSplunkProvider(t *testing.T) { require.NoError(t, err) assert.Equal(t, "https://api.us1.signalfx.com/v1/timeserieswindow", sp.metricsQueryEndpoint) assert.Equal(t, "https://api.us1.signalfx.com/v2/metric?limit=1", sp.apiValidationEndpoint) - assert.Equal(t, int64(md.Seconds()*signalFxFromDeltaMultiplierOnMetricInterval), sp.fromDelta) + assert.Equal(t, int64(md.Milliseconds()*signalFxFromDeltaMultiplierOnMetricInterval), sp.fromDelta) assert.Equal(t, token, sp.token) } @@ -54,7 +54,7 @@ func TestSplunkProvider_RunQuery(t *testing.T) { t.Run("ok", func(t *testing.T) { expected := 1.11111 eq := `sf_metric:service.request.count AND http_status_code:*` - now := time.Now().Unix() + now := time.Now().UnixMilli() ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { aq := r.URL.Query().Get("query") assert.Equal(t, eq, aq) From 0b73e245df5d097173f8702497dd2be4b236f15c Mon Sep 17 00:00:00 2001 From: kane8n Date: Tue, 26 Nov 2024 21:07:43 +0900 Subject: [PATCH 3/6] Change to use signalflow api Signed-off-by: kane8n --- go.mod | 3 + go.sum | 6 ++ pkg/metrics/providers/splunk.go | 89 +++++++++++------------ pkg/metrics/providers/splunk_test.go | 103 ++++++++++++++++----------- 4 files changed, 111 insertions(+), 90 deletions(-) diff --git a/go.mod b/go.mod index 575c985ae..4042e2728 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,8 @@ require ( github.com/hashicorp/go-retryablehttp v0.7.7 github.com/influxdata/influxdb-client-go/v2 v2.13.0 github.com/prometheus/client_golang v1.19.1 + github.com/signalfx/signalflow-client-go v0.1.0 + github.com/signalfx/signalfx-go v1.34.0 github.com/stretchr/testify v1.9.0 go.uber.org/zap v1.27.0 golang.org/x/sync v0.7.0 @@ -50,6 +52,7 @@ require ( github.com/google/gofuzz v1.2.0 // indirect github.com/google/s2a-go v0.1.7 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect + github.com/gorilla/websocket v1.5.1 // indirect github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/imdario/mergo v0.3.15 // indirect diff --git a/go.sum b/go.sum index 655532281..abae88514 100644 --- a/go.sum +++ b/go.sum @@ -95,6 +95,8 @@ github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfF github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0= github.com/googleapis/gax-go/v2 v2.12.4 h1:9gWcmF85Wvq4ryPFvGFaOgPIs1AQX0d0bcbGw4Z96qg= github.com/googleapis/gax-go/v2 v2.12.4/go.mod h1:KYEYLorsnIGDi/rPC8b5TdlB9kbKoFubselGIoBMCwI= +github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= +github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 h1:2VTzZjLZBgl62/EtslCrtky5vbi9dd7HrQPQIx6wqiw= github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= @@ -163,6 +165,10 @@ github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/signalfx/signalflow-client-go v0.1.0 h1:aqyt+st3/y8x8JtuwYRL9pOkOTJb+KeCoRWi0SuY5vw= +github.com/signalfx/signalflow-client-go v0.1.0/go.mod h1:mY4DTAZuLHyMNGBjSrNdCg5kUU0hSkYjukAnjsVbsQs= +github.com/signalfx/signalfx-go v1.34.0 h1:OQ6tyMY4efWB57EPIQqrpWrAfcSdyfa+bLtmAe7GLfE= +github.com/signalfx/signalfx-go v1.34.0/go.mod h1:IpGZLPvCKNFyspAXoS480jB02mocTpo0KYd8jbl6/T8= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0= diff --git a/pkg/metrics/providers/splunk.go b/pkg/metrics/providers/splunk.go index df458ff9a..4afbf46b6 100644 --- a/pkg/metrics/providers/splunk.go +++ b/pkg/metrics/providers/splunk.go @@ -17,20 +17,24 @@ limitations under the License. package providers import ( + "cmp" "context" - "encoding/json" "fmt" "io" "net/http" - "strconv" + "slices" + "strings" "time" + "github.com/signalfx/signalflow-client-go/signalflow" + "github.com/signalfx/signalflow-client-go/signalflow/messages" + flaggerv1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1" ) // https://docs.datadoghq.com/api/ const ( - signalFxMTSQueryPath = "/v1/timeserieswindow" + signalFxMTSQueryPath = "/v2/signalflow/execute" signalFxValidationPath = "/v2/metric?limit=1" signalFxTokenSecretKey = "sf_token_key" @@ -51,7 +55,6 @@ type SplunkProvider struct { } type splunkResponse struct { - Data map[string][][]float64 `json:"data"` } // NewSplunkProvider takes a canary spec, a provider spec and the credentials map, and @@ -67,8 +70,8 @@ func NewSplunkProvider(metricInterval string, sp := SplunkProvider{ timeout: 5 * time.Second, - metricsQueryEndpoint: address + signalFxMTSQueryPath, - apiValidationEndpoint: address + signalFxValidationPath, + metricsQueryEndpoint: strings.Replace(strings.Replace(address+signalFxMTSQueryPath, "http", "ws", 1), "api", "stream", 1), + apiValidationEndpoint: strings.Replace(strings.Replace(address+signalFxValidationPath, "ws", "http", 1), "stream", "api", 1), } if b, ok := credentials[signalFxTokenSecretKey]; ok { @@ -88,58 +91,48 @@ func NewSplunkProvider(metricInterval string, // RunQuery executes the query and converts the first result to float64 func (p *SplunkProvider) RunQuery(query string) (float64, error) { - - req, err := http.NewRequest("GET", p.metricsQueryEndpoint, nil) + c, err := signalflow.NewClient(signalflow.StreamURL(p.metricsQueryEndpoint), signalflow.AccessToken(p.token)) if err != nil { - return 0, fmt.Errorf("error http.NewRequest: %w", err) + return 0, fmt.Errorf("error creating signalflow client: %w", err) } - req.Header.Set(signalFxTokenHeaderKey, p.token) - now := time.Now().UnixMilli() - q := req.URL.Query() - q.Add("query", query) - q.Add("startMS", strconv.FormatInt(now-p.fromDelta, 10)) - q.Add("endMS", strconv.FormatInt(now, 10)) - req.URL.RawQuery = q.Encode() - - ctx, cancel := context.WithTimeout(req.Context(), p.timeout) + ctx, cancel := context.WithTimeout(context.Background(), p.timeout) defer cancel() - r, err := http.DefaultClient.Do(req.WithContext(ctx)) - if err != nil { - return 0, fmt.Errorf("request failed: %w", err) - } - defer r.Body.Close() - b, err := io.ReadAll(r.Body) + now := time.Now().UnixMilli() + comp, err := c.Execute(ctx, &signalflow.ExecuteRequest{ + Program: query, + Start: time.Unix(0, (now-p.fromDelta)*time.Millisecond.Nanoseconds()), + Stop: time.Unix(0, now*time.Millisecond.Nanoseconds()), + Immediate: true, + }) if err != nil { - return 0, fmt.Errorf("error reading body: %w", err) + return 0, fmt.Errorf("error executing query: %w", err) } - if r.StatusCode != http.StatusOK { - return 0, fmt.Errorf("error response: %s: %w", string(b), err) - } - - var res splunkResponse - if err := json.Unmarshal(b, &res); err != nil { - return 0, fmt.Errorf("error unmarshaling result: %w, '%s'", err, string(b)) - } - - if len(res.Data) < 1 { - return 0, fmt.Errorf("invalid response: %s: %w", string(b), ErrNoValuesFound) - } - - if len(res.Data) > 1 { - return 0, fmt.Errorf("invalid response: %s: %w", string(b), ErrMultipleValuesReturned) - } - - for _, v := range res.Data { - vs := v[len(v)-1] - if len(vs) < 1 { - return 0, fmt.Errorf("invalid response: %s: %w", string(b), ErrNoValuesFound) + select { + case dataMsg := <-comp.Data(): + payloads := slices.DeleteFunc(dataMsg.Payloads, func(msg messages.DataPayload) bool { + return msg.Value() == nil + }) + if len(payloads) < 1 { + return 0, fmt.Errorf("invalid response: %w", ErrNoValuesFound) + } + _payloads := slices.Clone(payloads) + slices.SortFunc(_payloads, func(i, j messages.DataPayload) int { + return cmp.Compare(i.TSID, j.TSID) + }) + if len(slices.CompactFunc(_payloads, func(i, j messages.DataPayload) bool { return i.TSID == j.TSID })) > 1 { + return 0, fmt.Errorf("invalid response: %w", ErrMultipleValuesReturned) + } + return payloads[len(payloads)-1].Value().(float64), nil + case <-time.After(p.timeout): + err := comp.Stop(ctx) + if err != nil { + return 0, fmt.Errorf("error stopping query: %w", err) } - return vs[1], nil + return 0, fmt.Errorf("timeout waiting for query result") } - return 0, fmt.Errorf("invalid response: %s: %w", string(b), ErrNoValuesFound) } // IsOnline calls the provider endpoint and returns an error if the API is unreachable diff --git a/pkg/metrics/providers/splunk_test.go b/pkg/metrics/providers/splunk_test.go index f154322c1..b42459262 100644 --- a/pkg/metrics/providers/splunk_test.go +++ b/pkg/metrics/providers/splunk_test.go @@ -19,12 +19,16 @@ package providers import ( "errors" "fmt" + "math/rand" "net/http" "net/http/httptest" - "strconv" + "net/url" "testing" "time" + "github.com/signalfx/signalflow-client-go/signalflow" + "github.com/signalfx/signalflow-client-go/signalflow/messages" + "github.com/signalfx/signalfx-go/idtool" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -43,84 +47,99 @@ func TestNewSplunkProvider(t *testing.T) { sp, err := NewSplunkProvider("100s", flaggerv1.MetricTemplateProvider{Address: "https://api.us1.signalfx.com"}, cs) require.NoError(t, err) - assert.Equal(t, "https://api.us1.signalfx.com/v1/timeserieswindow", sp.metricsQueryEndpoint) + assert.Equal(t, "wss://stream.us1.signalfx.com/v2/signalflow/execute", sp.metricsQueryEndpoint) assert.Equal(t, "https://api.us1.signalfx.com/v2/metric?limit=1", sp.apiValidationEndpoint) assert.Equal(t, int64(md.Milliseconds()*signalFxFromDeltaMultiplierOnMetricInterval), sp.fromDelta) assert.Equal(t, token, sp.token) } func TestSplunkProvider_RunQuery(t *testing.T) { - token := "token" t.Run("ok", func(t *testing.T) { - expected := 1.11111 - eq := `sf_metric:service.request.count AND http_status_code:*` - now := time.Now().UnixMilli() - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - aq := r.URL.Query().Get("query") - assert.Equal(t, eq, aq) - assert.Equal(t, token, r.Header.Get(signalFxTokenHeaderKey)) - - from, err := strconv.ParseInt(r.URL.Query().Get("startMS"), 10, 64) - if assert.NoError(t, err) { - assert.Less(t, from, now) - } + fakeBackend := signalflow.NewRunningFakeBackend() + defer fakeBackend.Stop() - to, err := strconv.ParseInt(r.URL.Query().Get("endMS"), 10, 64) - if assert.NoError(t, err) { - assert.GreaterOrEqual(t, to, now) - } + tsids := []idtool.ID{idtool.ID(rand.Int63())} + var expected float64 = float64(len(tsids)) - json := fmt.Sprintf(`{"data":{"AAAAAAAAAAA":[[1731643210000,%f]]},"errors":[]}`, expected) - w.Write([]byte(json)) - })) - defer ts.Close() + for i, tsid := range tsids { + fakeBackend.AddTSIDMetadata(tsid, &messages.MetadataProperties{ + Metric: "service.request.count", + }) + fakeBackend.SetTSIDFloatData(tsid, float64(i+1)) + } + + pg := `data('service.request.count', filter=filter('service.name', 'myservice')).sum().publish()` + fakeBackend.AddProgramTSIDs(pg, tsids) + + parsedUrl, err := url.Parse(fakeBackend.URL()) + require.NoError(t, err) sp, err := NewSplunkProvider("1m", - flaggerv1.MetricTemplateProvider{Address: ts.URL}, + flaggerv1.MetricTemplateProvider{Address: fmt.Sprintf("http://%s", parsedUrl.Host)}, map[string][]byte{ - signalFxTokenSecretKey: []byte(token), + signalFxTokenSecretKey: []byte(fakeBackend.AccessToken), }, ) require.NoError(t, err) - f, err := sp.RunQuery(eq) + f, err := sp.RunQuery(pg) require.NoError(t, err) assert.Equal(t, expected, f) }) t.Run("no values", func(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - json := fmt.Sprintf(`{"data": {}, "errors": []}`) - w.Write([]byte(json)) - })) - defer ts.Close() + fakeBackend := signalflow.NewRunningFakeBackend() + defer fakeBackend.Stop() + + tsids := []idtool.ID{idtool.ID(rand.Int63()), idtool.ID(rand.Int63()), idtool.ID(rand.Int63())} + for _, tsid := range tsids { + fakeBackend.AddTSIDMetadata(tsid, &messages.MetadataProperties{ + Metric: "service.request.count", + }) + } + + pg := `data('service.request.count', filter=filter('service.name', 'myservice')).sum().publish()` + fakeBackend.AddProgramTSIDs(pg, tsids) + + parsedUrl, err := url.Parse(fakeBackend.URL()) + require.NoError(t, err) sp, err := NewSplunkProvider("1m", - flaggerv1.MetricTemplateProvider{Address: ts.URL}, + flaggerv1.MetricTemplateProvider{Address: fmt.Sprintf("http://%s", parsedUrl.Host)}, map[string][]byte{ - signalFxTokenSecretKey: []byte(token), + signalFxTokenSecretKey: []byte(fakeBackend.AccessToken), }, ) require.NoError(t, err) - _, err = sp.RunQuery("") + _, err = sp.RunQuery(pg) require.True(t, errors.Is(err, ErrNoValuesFound)) }) t.Run("multiple values", func(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - json := fmt.Sprintf(`{"data":{"AAAAAAAAAAA":[[1731643210000,6]],"AAAAAAAAAAE":[[1731643210000,6]]},"errors":[]}`) - w.Write([]byte(json)) - })) - defer ts.Close() + fakeBackend := signalflow.NewRunningFakeBackend() + defer fakeBackend.Stop() + + tsids := []idtool.ID{idtool.ID(rand.Int63()), idtool.ID(rand.Int63()), idtool.ID(rand.Int63())} + for i, tsid := range tsids { + fakeBackend.AddTSIDMetadata(tsid, &messages.MetadataProperties{ + Metric: "service.request.count", + }) + fakeBackend.SetTSIDFloatData(tsid, float64(i+1)) + } + pg := `data('service.request.count', filter=filter('service.name', 'myservice')).sum().publish(); data('service.request.count', filter=filter('service.name', 'myservice2')).sum().publish()` + fakeBackend.AddProgramTSIDs(pg, tsids) + + parsedUrl, err := url.Parse(fakeBackend.URL()) + require.NoError(t, err) sp, err := NewSplunkProvider("1m", - flaggerv1.MetricTemplateProvider{Address: ts.URL}, + flaggerv1.MetricTemplateProvider{Address: fmt.Sprintf("http://%s", parsedUrl.Host)}, map[string][]byte{ - signalFxTokenSecretKey: []byte(token), + signalFxTokenSecretKey: []byte(fakeBackend.AccessToken), }, ) require.NoError(t, err) - _, err = sp.RunQuery("") + _, err = sp.RunQuery(pg) require.True(t, errors.Is(err, ErrMultipleValuesReturned)) }) } From f83e8dac3377b656f1e072d7ba20a68f4dee94d1 Mon Sep 17 00:00:00 2001 From: kane8n Date: Thu, 28 Nov 2024 19:33:21 +0900 Subject: [PATCH 4/6] bug fix Signed-off-by: kane8n --- pkg/metrics/providers/splunk.go | 69 +++++++++++++++++----------- pkg/metrics/providers/splunk_test.go | 17 +++++-- 2 files changed, 56 insertions(+), 30 deletions(-) diff --git a/pkg/metrics/providers/splunk.go b/pkg/metrics/providers/splunk.go index 4afbf46b6..44540ad1b 100644 --- a/pkg/metrics/providers/splunk.go +++ b/pkg/metrics/providers/splunk.go @@ -34,8 +34,8 @@ import ( // https://docs.datadoghq.com/api/ const ( - signalFxMTSQueryPath = "/v2/signalflow/execute" - signalFxValidationPath = "/v2/metric?limit=1" + signalFxSignalFlowApiPath = "/v2/signalflow" + signalFxValidationPath = "/v2/metric?limit=1" signalFxTokenSecretKey = "sf_token_key" @@ -70,7 +70,7 @@ func NewSplunkProvider(metricInterval string, sp := SplunkProvider{ timeout: 5 * time.Second, - metricsQueryEndpoint: strings.Replace(strings.Replace(address+signalFxMTSQueryPath, "http", "ws", 1), "api", "stream", 1), + metricsQueryEndpoint: strings.Replace(strings.Replace(address+signalFxSignalFlowApiPath, "http", "ws", 1), "api", "stream", 1), apiValidationEndpoint: strings.Replace(strings.Replace(address+signalFxValidationPath, "ws", "http", 1), "stream", "api", 1), } @@ -102,37 +102,54 @@ func (p *SplunkProvider) RunQuery(query string) (float64, error) { now := time.Now().UnixMilli() comp, err := c.Execute(ctx, &signalflow.ExecuteRequest{ Program: query, - Start: time.Unix(0, (now-p.fromDelta)*time.Millisecond.Nanoseconds()), - Stop: time.Unix(0, now*time.Millisecond.Nanoseconds()), + Start: time.UnixMilli(now - p.fromDelta), + Stop: time.UnixMilli(now), Immediate: true, }) if err != nil { return 0, fmt.Errorf("error executing query: %w", err) } - select { - case dataMsg := <-comp.Data(): - payloads := slices.DeleteFunc(dataMsg.Payloads, func(msg messages.DataPayload) bool { - return msg.Value() == nil - }) - if len(payloads) < 1 { - return 0, fmt.Errorf("invalid response: %w", ErrNoValuesFound) - } - _payloads := slices.Clone(payloads) - slices.SortFunc(_payloads, func(i, j messages.DataPayload) int { - return cmp.Compare(i.TSID, j.TSID) - }) - if len(slices.CompactFunc(_payloads, func(i, j messages.DataPayload) bool { return i.TSID == j.TSID })) > 1 { - return 0, fmt.Errorf("invalid response: %w", ErrMultipleValuesReturned) - } - return payloads[len(payloads)-1].Value().(float64), nil - case <-time.After(p.timeout): - err := comp.Stop(ctx) - if err != nil { - return 0, fmt.Errorf("error stopping query: %w", err) + payloads := p.receivePaylods(comp) + + if comp.Err() != nil { + return 0, fmt.Errorf("error executing query: %w", comp.Err()) + } + payloads = slices.DeleteFunc(payloads, func(msg messages.DataPayload) bool { + return msg.Value() == nil + }) + if len(payloads) < 1 { + return 0, fmt.Errorf("invalid response: %w", ErrNoValuesFound) + } + _payloads := slices.Clone(payloads) + slices.SortFunc(_payloads, func(i, j messages.DataPayload) int { + return cmp.Compare(i.TSID, j.TSID) + }) + if len(slices.CompactFunc(_payloads, func(i, j messages.DataPayload) bool { return i.TSID == j.TSID })) > 1 { + return 0, fmt.Errorf("invalid response: %w", ErrMultipleValuesReturned) + } + payload := payloads[len(payloads)-1] + switch payload.Type { + case messages.ValTypeLong: + return float64(payload.Int64()), nil + case messages.ValTypeDouble: + return payload.Float64(), nil + case messages.ValTypeInt: + return float64(payload.Int32()), nil + default: + return 0, fmt.Errorf("invalid response: UnsupportedValueType") + } +} + +func (p *SplunkProvider) receivePaylods(comp *signalflow.Computation) []messages.DataPayload { + payloads := []messages.DataPayload{} + for dataMsg := range comp.Data() { + if dataMsg == nil { + continue } - return 0, fmt.Errorf("timeout waiting for query result") + payloads = append(payloads, dataMsg.Payloads...) } + return payloads } // IsOnline calls the provider endpoint and returns an error if the API is unreachable diff --git a/pkg/metrics/providers/splunk_test.go b/pkg/metrics/providers/splunk_test.go index b42459262..bd0302718 100644 --- a/pkg/metrics/providers/splunk_test.go +++ b/pkg/metrics/providers/splunk_test.go @@ -47,7 +47,7 @@ func TestNewSplunkProvider(t *testing.T) { sp, err := NewSplunkProvider("100s", flaggerv1.MetricTemplateProvider{Address: "https://api.us1.signalfx.com"}, cs) require.NoError(t, err) - assert.Equal(t, "wss://stream.us1.signalfx.com/v2/signalflow/execute", sp.metricsQueryEndpoint) + assert.Equal(t, "wss://stream.us1.signalfx.com/v2/signalflow", sp.metricsQueryEndpoint) assert.Equal(t, "https://api.us1.signalfx.com/v2/metric?limit=1", sp.apiValidationEndpoint) assert.Equal(t, int64(md.Milliseconds()*signalFxFromDeltaMultiplierOnMetricInterval), sp.fromDelta) assert.Equal(t, token, sp.token) @@ -56,7 +56,10 @@ func TestNewSplunkProvider(t *testing.T) { func TestSplunkProvider_RunQuery(t *testing.T) { t.Run("ok", func(t *testing.T) { fakeBackend := signalflow.NewRunningFakeBackend() - defer fakeBackend.Stop() + go func() { + <-time.After(3 * time.Second) + fakeBackend.Stop() + }() tsids := []idtool.ID{idtool.ID(rand.Int63())} var expected float64 = float64(len(tsids)) @@ -89,7 +92,10 @@ func TestSplunkProvider_RunQuery(t *testing.T) { t.Run("no values", func(t *testing.T) { fakeBackend := signalflow.NewRunningFakeBackend() - defer fakeBackend.Stop() + go func() { + <-time.After(3 * time.Second) + fakeBackend.Stop() + }() tsids := []idtool.ID{idtool.ID(rand.Int63()), idtool.ID(rand.Int63()), idtool.ID(rand.Int63())} for _, tsid := range tsids { @@ -117,7 +123,10 @@ func TestSplunkProvider_RunQuery(t *testing.T) { t.Run("multiple values", func(t *testing.T) { fakeBackend := signalflow.NewRunningFakeBackend() - defer fakeBackend.Stop() + go func() { + <-time.After(3 * time.Second) + fakeBackend.Stop() + }() tsids := []idtool.ID{idtool.ID(rand.Int63()), idtool.ID(rand.Int63()), idtool.ID(rand.Int63())} for i, tsid := range tsids { From a4bb1cb1f69047f0ef4884bb776e880bd1141df5 Mon Sep 17 00:00:00 2001 From: kane8n Date: Thu, 28 Nov 2024 21:28:56 +0900 Subject: [PATCH 5/6] update metrics.md Signed-off-by: kane8n --- docs/gitbook/usage/metrics.md | 51 +++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/docs/gitbook/usage/metrics.md b/docs/gitbook/usage/metrics.md index c51fc61c2..19f3ec401 100644 --- a/docs/gitbook/usage/metrics.md +++ b/docs/gitbook/usage/metrics.md @@ -730,3 +730,54 @@ Only relevant if the `type` is set to `analysis`. For the type `analysis`, the value returned by the provider is either `0` (if the analysis failed), or `1` (analysis passed). + +## Splunk + +You can create custom metric checks using the Splunk provider. + +Create a secret that contains your authentication token that can be found in the Splunk o11y UI. + +```yaml +apiVersion: v1 +kind: Secret +metadata: + name: splunk + namespace: istio-system +data: + sf_token_key: your-access-token +``` + +Splunk template example: + +```yaml +apiVersion: flagger.app/v1beta1 +kind: MetricTemplate +metadata: + name: success-rate + namespace: istio-system +spec: + provider: + type: splunk + address: https://api..signalfx.com + secretRef: + name: splunk + query: | + total = data('traces.count', filter=filter('sf_service', 'my-service-primary', 'my-service')).sum().publish(enable=False) + success = data('traces.count', filter=filter('sf_service', 'my-service-primary', 'my-service') and filter('sf_error', 'false')).sum().publish(enable=False) + ((success/total) * 100).publish() +``` +The query format documentation can be found [here](https://dev.splunk.com/observability/docs/signalflow). + +Reference the template in the canary analysis: + +```yaml + analysis: + metrics: + - name: "success rate" + templateRef: + name: success-rate + namespace: istio-system + thresholdRange: + max: 99 + interval: 1m +``` From 3e510072af2d5227d2afbd32b88140dee4d73c08 Mon Sep 17 00:00:00 2001 From: kane8n Date: Sat, 30 Nov 2024 23:12:29 +0900 Subject: [PATCH 6/6] add some comments Signed-off-by: kane8n --- pkg/metrics/providers/splunk.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/pkg/metrics/providers/splunk.go b/pkg/metrics/providers/splunk.go index 44540ad1b..3097644c6 100644 --- a/pkg/metrics/providers/splunk.go +++ b/pkg/metrics/providers/splunk.go @@ -32,7 +32,7 @@ import ( flaggerv1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1" ) -// https://docs.datadoghq.com/api/ +// https://dev.splunk.com/observability/reference const ( signalFxSignalFlowApiPath = "/v2/signalflow" signalFxValidationPath = "/v2/metric?limit=1" @@ -69,8 +69,15 @@ func NewSplunkProvider(metricInterval string, } sp := SplunkProvider{ - timeout: 5 * time.Second, - metricsQueryEndpoint: strings.Replace(strings.Replace(address+signalFxSignalFlowApiPath, "http", "ws", 1), "api", "stream", 1), + timeout: 5 * time.Second, + // Convert the configured address to match the protocol of the respective API + // ex. + // https://api..signalfx.com -> wss://stream..signalfx.com + // wss://stream..signalfx.com -> wss://stream..signalfx.com + metricsQueryEndpoint: strings.Replace(strings.Replace(address+signalFxSignalFlowApiPath, "http", "ws", 1), "api", "stream", 1), + // ex. + // https://api..signalfx.com -> https://api..signalfx.com + // wss://stream..signalfx.com -> https://api..signalfx.com apiValidationEndpoint: strings.Replace(strings.Replace(address+signalFxValidationPath, "ws", "http", 1), "stream", "api", 1), } @@ -115,12 +122,16 @@ func (p *SplunkProvider) RunQuery(query string) (float64, error) { if comp.Err() != nil { return 0, fmt.Errorf("error executing query: %w", comp.Err()) } + payloads = slices.DeleteFunc(payloads, func(msg messages.DataPayload) bool { return msg.Value() == nil }) if len(payloads) < 1 { return 0, fmt.Errorf("invalid response: %w", ErrNoValuesFound) } + + // Error when a SignalFlow query returns two or more results. + // Since a different TSID is set for each metrics to be retrieved, eliminate duplicate TSIDs and determine if two or more TSIDs exist. _payloads := slices.Clone(payloads) slices.SortFunc(_payloads, func(i, j messages.DataPayload) int { return cmp.Compare(i.TSID, j.TSID) @@ -128,6 +139,7 @@ func (p *SplunkProvider) RunQuery(query string) (float64, error) { if len(slices.CompactFunc(_payloads, func(i, j messages.DataPayload) bool { return i.TSID == j.TSID })) > 1 { return 0, fmt.Errorf("invalid response: %w", ErrMultipleValuesReturned) } + payload := payloads[len(payloads)-1] switch payload.Type { case messages.ValTypeLong: