Skip to content

Commit

Permalink
Add informational headers to EEC provider requests
Browse files Browse the repository at this point in the history
  • Loading branch information
evan-bradley committed Aug 28, 2024
1 parent ca159ba commit ad491c6
Show file tree
Hide file tree
Showing 5 changed files with 256 additions and 26 deletions.
9 changes: 9 additions & 0 deletions internal/confmap/provider/eecprovider/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,12 @@ dynatrace-otel-collector --config=eec://my.eec.host:31098#refresh-interval=5s&au
| refresh-interval | 10s | A time duration that defines how frequently the provider should check the given URL for updates. |
| timeout | 8s | A time duration that defines how long the provider will wait until cancelling an ongoing HTTP request. |
| insecure | false | If set to "true", use HTTP for the connection to the server. If unset or set to "false", use HTTPS. |

### Request headers

The following request headers are sent to give information about the current state of the Collector:

| Header | Description |
|-----|-------------|
| `X-Otelcol-Config-Refresh` | Indicates whether this request is refreshing the config. Will be "false" for the first request and "true" for all subsequent requests. |
| `X-Otelcol-Config-Changed` | Indicates whether the previous request resulted in a config change and therefore reloaded the Collector. Will be "true" if a Collector reload occurred, and "false" otherwise. |
68 changes: 53 additions & 15 deletions internal/confmap/provider/eecprovider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,11 @@ type SchemeType string
const (
EECScheme SchemeType = "eec"

AuthHeaderKey = "Authorization"
AuthHeaderKey = "Authorization"
ApiTokenPrefixFormat = "Api-Token %s"

ConfigRefreshHeaderKey = "X-Otelcol-Config-Refresh"
ConfigChangedHeaderKey = "X-Otelcol-Config-Changed"
)

type provider struct {
Expand All @@ -52,6 +55,9 @@ type provider struct {
// Keeps track of ongoing watcher functions and cancels existing runs
// if the same URL is requested again.
watcherCancelMap map[string]func()

// Caches new config to avoid duplicate requests after the config changes.
newConfig map[string][]byte
}

var _ confmap.Provider = (*provider)(nil)
Expand All @@ -67,7 +73,12 @@ func NewFactory() confmap.ProviderFactory {

func newProvider(_ confmap.ProviderSettings) confmap.Provider {
shutdown := make(chan struct{})
return &provider{shutdown: shutdown, watcherMapMux: &sync.Mutex{}, watcherCancelMap: map[string]func(){}}
return &provider{
shutdown: shutdown,
watcherMapMux: &sync.Mutex{},
watcherCancelMap: map[string]func(){},
newConfig: map[string][]byte{},
}
}

// Create the client based on the type of scheme that was selected.
Expand Down Expand Up @@ -138,8 +149,9 @@ func (p *provider) Retrieve(ctx context.Context, uri string, watcherFunc confmap
parsedUrl.Scheme = "https"
}

url := parsedUrl.String()
newContextBoundRequest := func(ctx context.Context) (*http.Request, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, parsedUrl.String(), nil)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
}
Expand All @@ -149,16 +161,27 @@ func (p *provider) Retrieve(ctx context.Context, uri string, watcherFunc confmap
return req, nil
}

reqCtx, cancel := context.WithTimeoutCause(ctx, 3*time.Second, errors.New("request to EEC timed out"))
defer cancel()
req, err := newContextBoundRequest(reqCtx)
if err != nil {
return nil, err
}
var body []byte

body, err := p.getConfigBytes(client, req)
if err != nil {
return nil, err
// If we have the new config obtained by the watcher, return it directly instead of re-fetching it.
// Otherwise, we are making an initial request, so fetch the config.
if b, ok := p.newConfig[url]; ok {
delete(p.newConfig, url)
body = b
} else {
reqCtx, cancel := context.WithTimeoutCause(ctx, 3*time.Second, errors.New("request to EEC timed out"))
defer cancel()
req, err := newContextBoundRequest(reqCtx)
if err != nil {
return nil, err
}
req.Header.Add(ConfigRefreshHeaderKey, "false")
req.Header.Add(ConfigChangedHeaderKey, "false")

body, err = p.getConfigBytes(client, req)
if err != nil {
return nil, err
}
}

// If the Collector has not provided a watcherFunc, or if
Expand All @@ -171,7 +194,6 @@ func (p *provider) Retrieve(ctx context.Context, uri string, watcherFunc confmap
// so we need to at a minimum protect against this case.
watcherCtx, cancel := context.WithCancel(ctx)
p.watcherMapMux.Lock()
url := req.URL.String()
if _, ok := p.watcherCancelMap[url]; ok {
p.watcherCancelMap[url]()
}
Expand All @@ -180,18 +202,34 @@ func (p *provider) Retrieve(ctx context.Context, uri string, watcherFunc confmap
}
p.watcherMapMux.Unlock()

configChanged := "true"

watcher := &watcher{
shutdown: p.shutdown,
getConfigBytes: func(ctx context.Context) ([]byte, error) {
req, err := newContextBoundRequest(ctx)
if err != nil {
return nil, err
}
req.Header.Add(ConfigRefreshHeaderKey, "true")

req.Header.Add(ConfigChangedHeaderKey, configChanged)

// The watcher will only ever make a single request where the config
// changed changed during the previous request, since after the config
// changes, the watcher is recreated.
if configChanged == "true" {
configChanged = "false"
}

return p.getConfigBytes(client, req)
},
refreshInterval: cfg.refreshInterval,
watcherFunc: watcherFunc,
configHash: sha256.Sum256(body),
watcherFunc: func(b []byte) {
p.newConfig[url] = b
watcherFunc(&confmap.ChangeEvent{})
},
configHash: sha256.Sum256(body),
}

go watcher.watchForChanges(watcherCtx)
Expand Down
187 changes: 186 additions & 1 deletion internal/confmap/provider/eecprovider/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,76 @@ func TestReloadIfConfigChanges(t *testing.T) {
assert.NoError(t, ep.Shutdown(context.Background()))
}

func TestCachesConfigOnReload(t *testing.T) {
count := &atomic.Uint32{}
failOnRequest := &atomic.Bool{}
updatedConfigFile := "./testdata/otel-config-updated.yaml"
answerWithCount := func(w http.ResponseWriter, _ *http.Request) {
configFile := "./testdata/otel-config.yaml"
if count.Load() > 0 {
configFile = updatedConfigFile
}
if failOnRequest.Load() {
assert.Fail(t, "Should only make a single watch request before shutting down")
}
f, err := os.ReadFile(configFile)
if err != nil {
w.WriteHeader(http.StatusNotFound)
_, innerErr := w.Write([]byte("Cannot find the config file"))
if innerErr != nil {
fmt.Println("Write failed: ", innerErr)
}
return
}
w.WriteHeader(http.StatusOK)
_, err = w.Write(f)
if err != nil {
fmt.Println("Write failed: ", err)
}
count.Add(1)
}
called := &atomic.Bool{}
watcherFunc := func(_ *confmap.ChangeEvent) {
if called.Load() {
require.FailNow(t, "Reloaded more than once")
}
called.Store(true)
}
ep := newEECProvider(confmaptest.NewNopProviderSettings())
ts := httptest.NewServer(http.HandlerFunc(answerWithCount))
defer ts.Close()
uri, err := url.Parse(makeInsecure(t, ts.URL))
require.NoError(t, err)
params, err := url.ParseQuery(uri.Fragment)
require.NoError(t, err)
params.Set(RefreshInterval, "20ms")
uri.Fragment = params.Encode()

_, err = ep.Retrieve(context.Background(), uri.String(), watcherFunc)
require.NoError(t, err)

require.Eventually(t, func() bool {
return called.Load()
}, time.Second*3, time.Millisecond*5)
failOnRequest.Store(true)

cfg, err := ep.Retrieve(context.Background(), uri.String(), watcherFunc)
require.NoError(t, err)
conf, err := cfg.AsConf()
require.NoError(t, err)

f, err := os.ReadFile(updatedConfigFile)
require.NoError(t, err)
cfg2, err := NewRetrievedFromYAML(f)
require.NoError(t, err)
conf2, err := cfg2.AsConf()
require.NoError(t, err)

require.Equal(t, conf.ToStringMap(), conf2.ToStringMap())

require.NoError(t, ep.Shutdown(context.Background()))
}

func TestContinuesRetryingOnRefreshError(t *testing.T) {
count := &atomic.Uint32{}
answerWithCount := func(w http.ResponseWriter, _ *http.Request) {
Expand Down Expand Up @@ -480,7 +550,7 @@ func TestFragmentConfiguration(t *testing.T) {
fmt.Println("Write failed: ", err)
}

require.Equal(t, "Api-Token " + token, req.Header.Get("Authorization"))
require.Equal(t, "Api-Token "+token, req.Header.Get("Authorization"))

wg.Done()
}
Expand All @@ -506,6 +576,121 @@ func TestFragmentConfiguration(t *testing.T) {
assert.NoError(t, ep.Shutdown(context.Background()))
}

func TestConfigRefreshHeader(t *testing.T) {
count := &atomic.Int64{}

answerWithConfig := func(w http.ResponseWriter, req *http.Request) {
configFile := "./testdata/otel-config.yaml"
if count.Load()%2 == 1 {
configFile = "./testdata/otel-config-updated.yaml"
}
f, err := os.ReadFile(configFile)
if err != nil {
w.WriteHeader(http.StatusNotFound)
_, innerErr := w.Write([]byte("Cannot find the config file"))
if innerErr != nil {
fmt.Println("Write failed: ", innerErr)
}
return
}
w.WriteHeader(http.StatusOK)
_, err = w.Write(f)
if err != nil {
fmt.Println("Write failed: ", err)
}

if count.Load() == 0 {
assert.Equal(t, "false", req.Header.Get(ConfigRefreshHeaderKey))
} else {
assert.Equal(t, "true", req.Header.Get(ConfigRefreshHeaderKey))
}

count.Add(1)
}
watcherFunc := func(_ *confmap.ChangeEvent) {}
ep := newEECProvider(confmaptest.NewNopProviderSettings())
ts := httptest.NewServer(http.HandlerFunc(answerWithConfig))
defer ts.Close()
uri, err := url.Parse(makeInsecure(t, ts.URL))
require.NoError(t, err)
params, err := url.ParseQuery(uri.Fragment)
require.NoError(t, err)
params.Set(RefreshInterval, "5ms")
uri.Fragment = params.Encode()

for i := 0; i < 5; i++ {
_, err = ep.Retrieve(context.Background(), uri.String(), watcherFunc)
require.NoError(t, err)

require.Eventually(t, func() bool {
return count.Load() > int64(i)
}, time.Second*3, time.Millisecond*20)
}

require.NoError(t, ep.Shutdown(context.Background()))
}

func TestConfigChangedHeader(t *testing.T) {
count := &atomic.Int64{}

answerWithConfig := func(w http.ResponseWriter, req *http.Request) {
configFile := "./testdata/otel-config.yaml"
// Change the config every third request
if count.Load()%4 == 1 {
configFile = "./testdata/otel-config-updated.yaml"
}
f, err := os.ReadFile(configFile)
if err != nil {
w.WriteHeader(http.StatusNotFound)
_, innerErr := w.Write([]byte("Cannot find the config file"))
if innerErr != nil {
fmt.Println("Write failed: ", innerErr)
}
return
}
w.WriteHeader(http.StatusOK)
_, err = w.Write(f)
if err != nil {
fmt.Println("Write failed: ", err)
}

// The header should only be set to true on the request after the config changed
if count.Load() == 1 || count.Load()%4 == 2 || count.Load()%4 == 3 {
assert.Equal(t, "true", req.Header.Get(ConfigChangedHeaderKey))
} else {
assert.Equal(t, "false", req.Header.Get(ConfigChangedHeaderKey))
}

count.Add(1)
}
done := make(chan struct{}, 1)
watcherFunc := func(_ *confmap.ChangeEvent) {
done <- struct{}{}
}
ep := newEECProvider(confmaptest.NewNopProviderSettings())
ts := httptest.NewServer(http.HandlerFunc(answerWithConfig))
defer ts.Close()
uri, err := url.Parse(makeInsecure(t, ts.URL))
require.NoError(t, err)
params, err := url.ParseQuery(uri.Fragment)
require.NoError(t, err)
params.Set(RefreshInterval, "20ms")
uri.Fragment = params.Encode()

for i := 0; i < 5; i++ {
_, err = ep.Retrieve(context.Background(), uri.String(), watcherFunc)
require.NoError(t, err)

select {
case <-done:
case <-time.After(2 * time.Second):
t.Fatal("Timeout waiting for config reload")
}
}

require.NoError(t, ep.Shutdown(context.Background()))
}

func makeInsecure(t *testing.T, URL string) string {
parsedURL, err := url.Parse(URL)
require.NoError(t, err)
Expand Down
6 changes: 2 additions & 4 deletions internal/confmap/provider/eecprovider/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"errors"
"fmt"
"time"

"go.opentelemetry.io/collector/confmap"
)

type watcher struct {
Expand All @@ -19,7 +17,7 @@ type watcher struct {
getConfigBytes func(context.Context) ([]byte, error)

refreshInterval time.Duration
watcherFunc confmap.WatcherFunc
watcherFunc func([]byte)
configHash [32]byte
}

Expand All @@ -46,7 +44,7 @@ func (w *watcher) watchForChanges(ctx context.Context) {
// the Collector and stop watching. A new watcher
// will be created once the provider's Retrieve
// method is called again.
w.watcherFunc(&confmap.ChangeEvent{})
w.watcherFunc(body)
return
}
case <-w.shutdown:
Expand Down
Loading

0 comments on commit ad491c6

Please sign in to comment.