Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Splunk as a metrics provider #1733

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions artifacts/flagger/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1304,6 +1304,7 @@ spec:
- graphite
- dynatrace
- keptn
- splunk
address:
description: API address of this provider
type: string
Expand Down
1 change: 1 addition & 0 deletions charts/flagger/crds/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1304,6 +1304,7 @@ spec:
- graphite
- dynatrace
- keptn
- splunk
address:
description: API address of this provider
type: string
Expand Down
51 changes: 51 additions & 0 deletions docs/gitbook/usage/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -730,3 +730,54 @@ Only relevant if the `type` is set to `analysis`.

For the type `analysis`, the value returned by the provider is either `0`
(if the analysis failed), or `1` (analysis passed).

## Splunk

You can create custom metric checks using the Splunk provider.

Create a secret that contains your authentication token that can be found in the Splunk o11y UI.

```yaml
apiVersion: v1
kind: Secret
metadata:
name: splunk
namespace: istio-system
data:
sf_token_key: your-access-token
```

Splunk template example:

```yaml
apiVersion: flagger.app/v1beta1
kind: MetricTemplate
metadata:
name: success-rate
namespace: istio-system
spec:
provider:
type: splunk
address: https://api.<REALM>.signalfx.com
secretRef:
name: splunk
query: |
total = data('traces.count', filter=filter('sf_service', 'my-service-primary', 'my-service')).sum().publish(enable=False)
success = data('traces.count', filter=filter('sf_service', 'my-service-primary', 'my-service') and filter('sf_error', 'false')).sum().publish(enable=False)
((success/total) * 100).publish()
```
The query format documentation can be found [here](https://dev.splunk.com/observability/docs/signalflow).

Reference the template in the canary analysis:

```yaml
analysis:
metrics:
- name: "success rate"
templateRef:
name: success-rate
namespace: istio-system
thresholdRange:
max: 99
interval: 1m
```
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ require (
github.com/hashicorp/go-retryablehttp v0.7.7
github.com/influxdata/influxdb-client-go/v2 v2.13.0
github.com/prometheus/client_golang v1.20.5
github.com/signalfx/signalflow-client-go v0.1.0
github.com/signalfx/signalfx-go v1.44.0
github.com/stretchr/testify v1.9.0
go.uber.org/zap v1.27.0
golang.org/x/sync v0.9.0
Expand Down Expand Up @@ -50,6 +52,7 @@ require (
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/s2a-go v0.1.8 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/imdario/mergo v0.3.15 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ github.com/googleapis/enterprise-certificate-proxy v0.3.4 h1:XYIDZApgAnrN1c855gT
github.com/googleapis/enterprise-certificate-proxy v0.3.4/go.mod h1:YKe7cfqYXjKGpGvmSg28/fFvhNzinZQm8DGnaburhGA=
github.com/googleapis/gax-go/v2 v2.14.0 h1:f+jMrjBPl+DL9nI4IQzLUxMq7XrAqFYB7hBPqMNIe8o=
github.com/googleapis/gax-go/v2 v2.14.0/go.mod h1:lhBCnjdLrWRaPvLWhmc8IS24m9mr07qSYnHncrgo+zk=
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 h1:2VTzZjLZBgl62/EtslCrtky5vbi9dd7HrQPQIx6wqiw=
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
Expand Down Expand Up @@ -172,6 +174,10 @@ github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0leargg
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/signalfx/signalflow-client-go v0.1.0 h1:aqyt+st3/y8x8JtuwYRL9pOkOTJb+KeCoRWi0SuY5vw=
github.com/signalfx/signalflow-client-go v0.1.0/go.mod h1:mY4DTAZuLHyMNGBjSrNdCg5kUU0hSkYjukAnjsVbsQs=
github.com/signalfx/signalfx-go v1.44.0 h1:BkLtohTJkq3mr1Yl1OzCWK+e2DZRqZ0M0zD9Gs+c41Q=
github.com/signalfx/signalfx-go v1.44.0/go.mod h1:I30umyhRTu8mPpEtMzEbG0z9wOYjkUKTp9U0gFxFsmk=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0=
Expand Down
1 change: 1 addition & 0 deletions kustomize/base/flagger/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1304,6 +1304,7 @@ spec:
- graphite
- dynatrace
- keptn
- splunk
address:
description: API address of this provider
type: string
Expand Down
2 changes: 2 additions & 0 deletions pkg/metrics/providers/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ func (factory Factory) Provider(metricInterval string, provider flaggerv1.Metric
return NewDynatraceProvider(metricInterval, provider, credentials)
case "keptn":
return NewKeptnProvider(config)
case "splunk":
return NewSplunkProvider(metricInterval, provider, credentials)
default:
return NewPrometheusProvider(provider, credentials)
}
Expand Down
195 changes: 195 additions & 0 deletions pkg/metrics/providers/splunk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
Copyright 2020 The Flux authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package providers

import (
"cmp"
"context"
"fmt"
"io"
"net/http"
"slices"
"strings"
"time"

"github.com/signalfx/signalflow-client-go/signalflow"
"github.com/signalfx/signalflow-client-go/signalflow/messages"

flaggerv1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1"
)

// https://dev.splunk.com/observability/reference
const (
signalFxSignalFlowApiPath = "/v2/signalflow"
signalFxValidationPath = "/v2/metric?limit=1"

signalFxTokenSecretKey = "sf_token_key"

signalFxTokenHeaderKey = "X-SF-Token"

signalFxFromDeltaMultiplierOnMetricInterval = 10
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you specify the purpose of this?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I referred to the datadog implementation.
In some cases, the splunk api could only acquire empty data if the specified period was short, so to ensure that data is acquired, a period 10 times longer than the set interval is used.

)

// SplunkProvider executes signalfx queries
type SplunkProvider struct {
metricsQueryEndpoint string
apiValidationEndpoint string

timeout time.Duration
token string
fromDelta int64
}

type splunkResponse struct {
}

// NewSplunkProvider takes a canary spec, a provider spec and the credentials map, and
// returns a Splunk client ready to execute queries against the API
func NewSplunkProvider(metricInterval string,
provider flaggerv1.MetricTemplateProvider,
credentials map[string][]byte) (*SplunkProvider, error) {

address := provider.Address
if address == "" {
return nil, fmt.Errorf("splunk endpoint is not set")
}

sp := SplunkProvider{
timeout: 5 * time.Second,
// Convert the configured address to match the protocol of the respective API
// ex.
// https://api.<REALM>.signalfx.com -> wss://stream.<REALM>.signalfx.com
// wss://stream.<REALM>.signalfx.com -> wss://stream.<REALM>.signalfx.com
metricsQueryEndpoint: strings.Replace(strings.Replace(address+signalFxSignalFlowApiPath, "http", "ws", 1), "api", "stream", 1),
// ex.
// https://api.<REALM>.signalfx.com -> https://api.<REALM>.signalfx.com
// wss://stream.<REALM>.signalfx.com -> https://api.<REALM>.signalfx.com
apiValidationEndpoint: strings.Replace(strings.Replace(address+signalFxValidationPath, "ws", "http", 1), "stream", "api", 1),
}

if b, ok := credentials[signalFxTokenSecretKey]; ok {
sp.token = string(b)
} else {
return nil, fmt.Errorf("splunk credentials does not contain sf_token_key")
}

md, err := time.ParseDuration(metricInterval)
if err != nil {
return nil, fmt.Errorf("error parsing metric interval: %w", err)
}

sp.fromDelta = int64(signalFxFromDeltaMultiplierOnMetricInterval * md.Milliseconds())
return &sp, nil
}

// RunQuery executes the query and converts the first result to float64
func (p *SplunkProvider) RunQuery(query string) (float64, error) {
c, err := signalflow.NewClient(signalflow.StreamURL(p.metricsQueryEndpoint), signalflow.AccessToken(p.token))
if err != nil {
return 0, fmt.Errorf("error creating signalflow client: %w", err)
}

ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
defer cancel()

now := time.Now().UnixMilli()
comp, err := c.Execute(ctx, &signalflow.ExecuteRequest{
Program: query,
Start: time.UnixMilli(now - p.fromDelta),
Stop: time.UnixMilli(now),
Immediate: true,
})
if err != nil {
return 0, fmt.Errorf("error executing query: %w", err)
}

payloads := p.receivePaylods(comp)

if comp.Err() != nil {
return 0, fmt.Errorf("error executing query: %w", comp.Err())
}

payloads = slices.DeleteFunc(payloads, func(msg messages.DataPayload) bool {
return msg.Value() == nil
})
if len(payloads) < 1 {
return 0, fmt.Errorf("invalid response: %w", ErrNoValuesFound)
}

// Error when a SignalFlow query returns two or more results.
// Since a different TSID is set for each metrics to be retrieved, eliminate duplicate TSIDs and determine if two or more TSIDs exist.
_payloads := slices.Clone(payloads)
slices.SortFunc(_payloads, func(i, j messages.DataPayload) int {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some comments explaining this logic would be nice

return cmp.Compare(i.TSID, j.TSID)
})
if len(slices.CompactFunc(_payloads, func(i, j messages.DataPayload) bool { return i.TSID == j.TSID })) > 1 {
return 0, fmt.Errorf("invalid response: %w", ErrMultipleValuesReturned)
}

payload := payloads[len(payloads)-1]
switch payload.Type {
case messages.ValTypeLong:
return float64(payload.Int64()), nil
case messages.ValTypeDouble:
return payload.Float64(), nil
case messages.ValTypeInt:
return float64(payload.Int32()), nil
default:
return 0, fmt.Errorf("invalid response: UnsupportedValueType")
}
}

func (p *SplunkProvider) receivePaylods(comp *signalflow.Computation) []messages.DataPayload {
payloads := []messages.DataPayload{}
for dataMsg := range comp.Data() {
if dataMsg == nil {
continue
}
payloads = append(payloads, dataMsg.Payloads...)
}
return payloads
}

// IsOnline calls the provider endpoint and returns an error if the API is unreachable
func (p *SplunkProvider) IsOnline() (bool, error) {
req, err := http.NewRequest("GET", p.apiValidationEndpoint, nil)
if err != nil {
return false, fmt.Errorf("error http.NewRequest: %w", err)
}

req.Header.Add(signalFxTokenHeaderKey, p.token)

ctx, cancel := context.WithTimeout(req.Context(), p.timeout)
defer cancel()
r, err := http.DefaultClient.Do(req.WithContext(ctx))
if err != nil {
return false, fmt.Errorf("request failed: %w", err)
}

defer r.Body.Close()

b, err := io.ReadAll(r.Body)
if err != nil {
return false, fmt.Errorf("error reading body: %w", err)
}

if r.StatusCode != http.StatusOK {
return false, fmt.Errorf("error response: %s", string(b))
}

return true, nil
}
Loading