diff --git a/internal/confmap/provider/eecprovider/README.md b/internal/confmap/provider/eecprovider/README.md index f135fd79..49119a6a 100644 --- a/internal/confmap/provider/eecprovider/README.md +++ b/internal/confmap/provider/eecprovider/README.md @@ -26,3 +26,12 @@ dynatrace-otel-collector --config=eec://my.eec.host:31098#refresh-interval=5s&au | refresh-interval | 10s | A time duration that defines how frequently the provider should check the given URL for updates. | | timeout | 8s | A time duration that defines how long the provider will wait until cancelling an ongoing HTTP request. | | insecure | false | If set to "true", use HTTP for the connection to the server. If unset or set to "false", use HTTPS. | + +### Request headers + +The following request headers are sent to give information about the current state of the Collector: + +| Header | Description | +|-----|-------------| +| `X-Otelcol-Config-Refresh` | Indicates whether this request is refreshing the config. Will be "false" for the first request and "true" for all subsequent requests. | +| `X-Otelcol-Config-Changed` | Indicates whether the previous request resulted in a config change and therefore reloaded the Collector. Will be "true" if a Collector reload occurred, and "false" otherwise. | diff --git a/internal/confmap/provider/eecprovider/provider.go b/internal/confmap/provider/eecprovider/provider.go index 3ca99e5d..7c572231 100644 --- a/internal/confmap/provider/eecprovider/provider.go +++ b/internal/confmap/provider/eecprovider/provider.go @@ -33,8 +33,11 @@ type SchemeType string const ( EECScheme SchemeType = "eec" - AuthHeaderKey = "Authorization" + AuthHeaderKey = "Authorization" ApiTokenPrefixFormat = "Api-Token %s" + + ConfigRefreshHeaderKey = "X-Otelcol-Config-Refresh" + ConfigChangedHeaderKey = "X-Otelcol-Config-Changed" ) type provider struct { @@ -52,6 +55,9 @@ type provider struct { // Keeps track of ongoing watcher functions and cancels existing runs // if the same URL is requested again. watcherCancelMap map[string]func() + + // Caches new config to avoid duplicate requests after the config changes. + newConfig map[string][]byte } var _ confmap.Provider = (*provider)(nil) @@ -67,7 +73,12 @@ func NewFactory() confmap.ProviderFactory { func newProvider(_ confmap.ProviderSettings) confmap.Provider { shutdown := make(chan struct{}) - return &provider{shutdown: shutdown, watcherMapMux: &sync.Mutex{}, watcherCancelMap: map[string]func(){}} + return &provider{ + shutdown: shutdown, + watcherMapMux: &sync.Mutex{}, + watcherCancelMap: map[string]func(){}, + newConfig: map[string][]byte{}, + } } // Create the client based on the type of scheme that was selected. @@ -138,8 +149,9 @@ func (p *provider) Retrieve(ctx context.Context, uri string, watcherFunc confmap parsedUrl.Scheme = "https" } + url := parsedUrl.String() newContextBoundRequest := func(ctx context.Context) (*http.Request, error) { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, parsedUrl.String(), nil) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { return nil, err } @@ -149,16 +161,27 @@ func (p *provider) Retrieve(ctx context.Context, uri string, watcherFunc confmap return req, nil } - reqCtx, cancel := context.WithTimeoutCause(ctx, 3*time.Second, errors.New("request to EEC timed out")) - defer cancel() - req, err := newContextBoundRequest(reqCtx) - if err != nil { - return nil, err - } + var body []byte - body, err := p.getConfigBytes(client, req) - if err != nil { - return nil, err + // If we have the new config obtained by the watcher, return it directly instead of re-fetching it. + // Otherwise, we are making an initial request, so fetch the config. + if b, ok := p.newConfig[url]; ok { + delete(p.newConfig, url) + body = b + } else { + reqCtx, cancel := context.WithTimeoutCause(ctx, 3*time.Second, errors.New("request to EEC timed out")) + defer cancel() + req, err := newContextBoundRequest(reqCtx) + if err != nil { + return nil, err + } + req.Header.Add(ConfigRefreshHeaderKey, "false") + req.Header.Add(ConfigChangedHeaderKey, "false") + + body, err = p.getConfigBytes(client, req) + if err != nil { + return nil, err + } } // If the Collector has not provided a watcherFunc, or if @@ -171,7 +194,6 @@ func (p *provider) Retrieve(ctx context.Context, uri string, watcherFunc confmap // so we need to at a minimum protect against this case. watcherCtx, cancel := context.WithCancel(ctx) p.watcherMapMux.Lock() - url := req.URL.String() if _, ok := p.watcherCancelMap[url]; ok { p.watcherCancelMap[url]() } @@ -180,6 +202,8 @@ func (p *provider) Retrieve(ctx context.Context, uri string, watcherFunc confmap } p.watcherMapMux.Unlock() + configChanged := "true" + watcher := &watcher{ shutdown: p.shutdown, getConfigBytes: func(ctx context.Context) ([]byte, error) { @@ -187,11 +211,25 @@ func (p *provider) Retrieve(ctx context.Context, uri string, watcherFunc confmap if err != nil { return nil, err } + req.Header.Add(ConfigRefreshHeaderKey, "true") + + req.Header.Add(ConfigChangedHeaderKey, configChanged) + + // The watcher will only ever make a single request where the config + // changed changed during the previous request, since after the config + // changes, the watcher is recreated. + if configChanged == "true" { + configChanged = "false" + } + return p.getConfigBytes(client, req) }, refreshInterval: cfg.refreshInterval, - watcherFunc: watcherFunc, - configHash: sha256.Sum256(body), + watcherFunc: func(b []byte) { + p.newConfig[url] = b + watcherFunc(&confmap.ChangeEvent{}) + }, + configHash: sha256.Sum256(body), } go watcher.watchForChanges(watcherCtx) diff --git a/internal/confmap/provider/eecprovider/provider_test.go b/internal/confmap/provider/eecprovider/provider_test.go index 4c7aa87d..47d83d0c 100644 --- a/internal/confmap/provider/eecprovider/provider_test.go +++ b/internal/confmap/provider/eecprovider/provider_test.go @@ -402,6 +402,76 @@ func TestReloadIfConfigChanges(t *testing.T) { assert.NoError(t, ep.Shutdown(context.Background())) } +func TestCachesConfigOnReload(t *testing.T) { + count := &atomic.Uint32{} + failOnRequest := &atomic.Bool{} + updatedConfigFile := "./testdata/otel-config-updated.yaml" + answerWithCount := func(w http.ResponseWriter, _ *http.Request) { + configFile := "./testdata/otel-config.yaml" + if count.Load() > 0 { + configFile = updatedConfigFile + } + if failOnRequest.Load() { + assert.Fail(t, "Should only make a single watch request before shutting down") + } + f, err := os.ReadFile(configFile) + if err != nil { + w.WriteHeader(http.StatusNotFound) + _, innerErr := w.Write([]byte("Cannot find the config file")) + if innerErr != nil { + fmt.Println("Write failed: ", innerErr) + } + return + } + w.WriteHeader(http.StatusOK) + _, err = w.Write(f) + if err != nil { + fmt.Println("Write failed: ", err) + } + count.Add(1) + } + called := &atomic.Bool{} + watcherFunc := func(_ *confmap.ChangeEvent) { + if called.Load() { + require.FailNow(t, "Reloaded more than once") + } + called.Store(true) + } + ep := newEECProvider(confmaptest.NewNopProviderSettings()) + ts := httptest.NewServer(http.HandlerFunc(answerWithCount)) + defer ts.Close() + uri, err := url.Parse(makeInsecure(t, ts.URL)) + require.NoError(t, err) + params, err := url.ParseQuery(uri.Fragment) + require.NoError(t, err) + params.Set(RefreshInterval, "20ms") + uri.Fragment = params.Encode() + + _, err = ep.Retrieve(context.Background(), uri.String(), watcherFunc) + require.NoError(t, err) + + require.Eventually(t, func() bool { + return called.Load() + }, time.Second*3, time.Millisecond*5) + failOnRequest.Store(true) + + cfg, err := ep.Retrieve(context.Background(), uri.String(), watcherFunc) + require.NoError(t, err) + conf, err := cfg.AsConf() + require.NoError(t, err) + + f, err := os.ReadFile(updatedConfigFile) + require.NoError(t, err) + cfg2, err := NewRetrievedFromYAML(f) + require.NoError(t, err) + conf2, err := cfg2.AsConf() + require.NoError(t, err) + + require.Equal(t, conf.ToStringMap(), conf2.ToStringMap()) + + require.NoError(t, ep.Shutdown(context.Background())) +} + func TestContinuesRetryingOnRefreshError(t *testing.T) { count := &atomic.Uint32{} answerWithCount := func(w http.ResponseWriter, _ *http.Request) { @@ -480,7 +550,7 @@ func TestFragmentConfiguration(t *testing.T) { fmt.Println("Write failed: ", err) } - require.Equal(t, "Api-Token " + token, req.Header.Get("Authorization")) + require.Equal(t, "Api-Token "+token, req.Header.Get("Authorization")) wg.Done() } @@ -506,6 +576,121 @@ func TestFragmentConfiguration(t *testing.T) { assert.NoError(t, ep.Shutdown(context.Background())) } +func TestConfigRefreshHeader(t *testing.T) { + count := &atomic.Int64{} + + answerWithConfig := func(w http.ResponseWriter, req *http.Request) { + configFile := "./testdata/otel-config.yaml" + if count.Load()%2 == 1 { + configFile = "./testdata/otel-config-updated.yaml" + } + f, err := os.ReadFile(configFile) + if err != nil { + w.WriteHeader(http.StatusNotFound) + _, innerErr := w.Write([]byte("Cannot find the config file")) + if innerErr != nil { + fmt.Println("Write failed: ", innerErr) + } + return + } + w.WriteHeader(http.StatusOK) + _, err = w.Write(f) + if err != nil { + fmt.Println("Write failed: ", err) + } + + if count.Load() == 0 { + assert.Equal(t, "false", req.Header.Get(ConfigRefreshHeaderKey)) + } else { + assert.Equal(t, "true", req.Header.Get(ConfigRefreshHeaderKey)) + } + + count.Add(1) + } + watcherFunc := func(_ *confmap.ChangeEvent) {} + ep := newEECProvider(confmaptest.NewNopProviderSettings()) + ts := httptest.NewServer(http.HandlerFunc(answerWithConfig)) + defer ts.Close() + uri, err := url.Parse(makeInsecure(t, ts.URL)) + require.NoError(t, err) + params, err := url.ParseQuery(uri.Fragment) + require.NoError(t, err) + params.Set(RefreshInterval, "5ms") + uri.Fragment = params.Encode() + + for i := 0; i < 5; i++ { + _, err = ep.Retrieve(context.Background(), uri.String(), watcherFunc) + require.NoError(t, err) + + require.Eventually(t, func() bool { + return count.Load() > int64(i) + }, time.Second*3, time.Millisecond*20) + } + + require.NoError(t, ep.Shutdown(context.Background())) +} + +func TestConfigChangedHeader(t *testing.T) { + count := &atomic.Int64{} + + answerWithConfig := func(w http.ResponseWriter, req *http.Request) { + configFile := "./testdata/otel-config.yaml" + // Change the config every third request + if count.Load()%4 == 1 { + configFile = "./testdata/otel-config-updated.yaml" + } + f, err := os.ReadFile(configFile) + if err != nil { + w.WriteHeader(http.StatusNotFound) + _, innerErr := w.Write([]byte("Cannot find the config file")) + if innerErr != nil { + fmt.Println("Write failed: ", innerErr) + } + return + } + w.WriteHeader(http.StatusOK) + _, err = w.Write(f) + if err != nil { + fmt.Println("Write failed: ", err) + } + + // The header should only be set to true on the request after the config changed + if count.Load() == 1 || count.Load()%4 == 2 || count.Load()%4 == 3 { + assert.Equal(t, "true", req.Header.Get(ConfigChangedHeaderKey)) + } else { + assert.Equal(t, "false", req.Header.Get(ConfigChangedHeaderKey)) + } + + count.Add(1) + } + done := make(chan struct{}, 1) + watcherFunc := func(_ *confmap.ChangeEvent) { + done <- struct{}{} + } + ep := newEECProvider(confmaptest.NewNopProviderSettings()) + ts := httptest.NewServer(http.HandlerFunc(answerWithConfig)) + defer ts.Close() + uri, err := url.Parse(makeInsecure(t, ts.URL)) + require.NoError(t, err) + params, err := url.ParseQuery(uri.Fragment) + require.NoError(t, err) + params.Set(RefreshInterval, "20ms") + uri.Fragment = params.Encode() + + for i := 0; i < 5; i++ { + _, err = ep.Retrieve(context.Background(), uri.String(), watcherFunc) + require.NoError(t, err) + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("Timeout waiting for config reload") + } + } + + require.NoError(t, ep.Shutdown(context.Background())) +} + func makeInsecure(t *testing.T, URL string) string { parsedURL, err := url.Parse(URL) require.NoError(t, err) diff --git a/internal/confmap/provider/eecprovider/watcher.go b/internal/confmap/provider/eecprovider/watcher.go index 92c8d7e1..2da1e352 100644 --- a/internal/confmap/provider/eecprovider/watcher.go +++ b/internal/confmap/provider/eecprovider/watcher.go @@ -9,8 +9,6 @@ import ( "errors" "fmt" "time" - - "go.opentelemetry.io/collector/confmap" ) type watcher struct { @@ -19,7 +17,7 @@ type watcher struct { getConfigBytes func(context.Context) ([]byte, error) refreshInterval time.Duration - watcherFunc confmap.WatcherFunc + watcherFunc func([]byte) configHash [32]byte } @@ -46,7 +44,7 @@ func (w *watcher) watchForChanges(ctx context.Context) { // the Collector and stop watching. A new watcher // will be created once the provider's Retrieve // method is called again. - w.watcherFunc(&confmap.ChangeEvent{}) + w.watcherFunc(body) return } case <-w.shutdown: diff --git a/internal/confmap/provider/eecprovider/watcher_test.go b/internal/confmap/provider/eecprovider/watcher_test.go index e3b68f1d..fc9e23f2 100644 --- a/internal/confmap/provider/eecprovider/watcher_test.go +++ b/internal/confmap/provider/eecprovider/watcher_test.go @@ -14,7 +14,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/confmap" ) func TestWatchForChanges(t *testing.T) { @@ -29,7 +28,7 @@ func TestWatchForChanges(t *testing.T) { shutdown: shutdown, getConfigBytes: getConfigBytes, refreshInterval: time.Millisecond, - watcherFunc: func(ce *confmap.ChangeEvent) { + watcherFunc: func([]byte) { require.FailNow(t, "WatcherFunc should be called when config has not changed") }, configHash: sha256.Sum256(nil), @@ -56,7 +55,7 @@ func TestStopsOnRequestDone(t *testing.T) { w := watcher{ getConfigBytes: getConfigBytes, refreshInterval: time.Hour, - watcherFunc: func(ce *confmap.ChangeEvent) {}, + watcherFunc: func([]byte) {}, configHash: sha256.Sum256(nil), } @@ -71,6 +70,7 @@ func TestStopsOnRequestDone(t *testing.T) { func TestCallsWatcherFunc(t *testing.T) { wg := &sync.WaitGroup{} + body := []byte("hello") getConfigBytes := func(context.Context) ([]byte, error) { return []byte("hello"), nil } @@ -79,8 +79,8 @@ func TestCallsWatcherFunc(t *testing.T) { shutdown: shutdown, getConfigBytes: getConfigBytes, refreshInterval: time.Millisecond, - watcherFunc: func(ce *confmap.ChangeEvent) { - assert.NotNil(t, ce) + watcherFunc: func(b []byte) { + assert.Equal(t, body, b) wg.Done() }, configHash: sha256.Sum256(nil), @@ -110,7 +110,7 @@ func TestHandlesGetBodyError(t *testing.T) { shutdown: shutdown, getConfigBytes: getConfigBytes, refreshInterval: time.Millisecond * 5, - watcherFunc: func(ce *confmap.ChangeEvent) { + watcherFunc: func([]byte) { require.FailNow(t, "WatcherFunc should be called when config has not changed") }, configHash: sha256.Sum256(nil),