From cf4f4ae918ac23c0c737526f144a81bf14cefafc Mon Sep 17 00:00:00 2001 From: smithclay Date: Mon, 2 Oct 2023 17:05:52 -0700 Subject: [PATCH] use callbacks, add tests --- src/go.mod | 1 + src/go.sum | 1 + src/mapper/cmd/main.go | 18 ++++-- src/mapper/pkg/clouduploader/cloud_upload.go | 21 ++----- .../pkg/clouduploader/cloud_uploader_test.go | 23 +++---- src/mapper/pkg/intentsstore/holder.go | 35 ++++++++++- src/mapper/pkg/metricexporter/edge_metric.go | 7 +++ .../pkg/metricexporter/metric_exporter.go | 34 +++++++++++ .../metric_exporter_config.go} | 2 +- .../metricexporter/metric_exporter_test.go | 60 +++++++++++++++++++ .../pkg/metricexporter/mock_edge_metric.go | 47 +++++++++++++++ .../otel_edge_metric.go} | 44 +++----------- .../pkg/otelexporter/otel_exporter_test.go | 53 ---------------- 13 files changed, 220 insertions(+), 126 deletions(-) create mode 100644 src/mapper/pkg/metricexporter/edge_metric.go create mode 100644 src/mapper/pkg/metricexporter/metric_exporter.go rename src/mapper/pkg/{otelexporter/otel_config.go => metricexporter/metric_exporter_config.go} (93%) create mode 100644 src/mapper/pkg/metricexporter/metric_exporter_test.go create mode 100644 src/mapper/pkg/metricexporter/mock_edge_metric.go rename src/mapper/pkg/{otelexporter/otel_exporter.go => metricexporter/otel_edge_metric.go} (64%) delete mode 100644 src/mapper/pkg/otelexporter/otel_exporter_test.go diff --git a/src/go.mod b/src/go.mod index 7f4e41df..a61834b9 100644 --- a/src/go.mod +++ b/src/go.mod @@ -31,6 +31,7 @@ require ( go.opentelemetry.io/otel/metric v1.18.0 go.opentelemetry.io/otel/sdk v1.18.0 go.opentelemetry.io/otel/sdk/metric v0.41.0 + go.uber.org/mock v0.2.0 golang.org/x/exp v0.0.0-20230124195608-d38c7dcee874 golang.org/x/sync v0.3.0 gotest.tools/v3 v3.0.3 diff --git a/src/go.sum b/src/go.sum index eeee9fff..04094e3d 100644 --- a/src/go.sum +++ b/src/go.sum @@ -389,6 +389,7 @@ go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v8 go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= go.uber.org/mock v0.2.0 h1:TaP3xedm7JaAgScZO7tlvlKrqT0p7I6OsdGB5YNSMDU= +go.uber.org/mock v0.2.0/go.mod h1:J0y0rp9L3xiff1+ZBfKxlC1fz2+aO16tw0tsDOixfuM= go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/src/mapper/cmd/main.go b/src/mapper/cmd/main.go index 491e9cbe..f311890d 100644 --- a/src/mapper/cmd/main.go +++ b/src/mapper/cmd/main.go @@ -20,7 +20,7 @@ import ( "github.com/otterize/network-mapper/src/mapper/pkg/config" "github.com/otterize/network-mapper/src/mapper/pkg/intentsstore" "github.com/otterize/network-mapper/src/mapper/pkg/kubefinder" - "github.com/otterize/network-mapper/src/mapper/pkg/otelexporter" + "github.com/otterize/network-mapper/src/mapper/pkg/metricexporter" "github.com/otterize/network-mapper/src/mapper/pkg/resolvers" sharedconfig "github.com/otterize/network-mapper/src/shared/config" "github.com/otterize/network-mapper/src/shared/kubeutils" @@ -119,21 +119,27 @@ func main() { if err != nil { logrus.WithError(err).Fatal("Failed to initialize cloud client") } + + cloudUploaderConfig := clouduploader.ConfigFromViper() if cloudEnabled { - cloudUploaderConfig := clouduploader.ConfigFromViper() cloudUploader := clouduploader.NewCloudUploader(intentsHolder, cloudUploaderConfig, cloudClient) - go cloudUploader.PeriodicIntentsUpload(cloudClientCtx) + intentsHolder.RegisterGetCallback(cloudUploader.GetIntentCallback) go cloudUploader.PeriodicStatusReport(cloudClientCtx) } otelCtx, otelCancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer otelCancel() - otelExporterConfig := otelexporter.ConfigFromViper() - otelExporter, err := otelexporter.NewOtelExporter(otelCtx, intentsHolder, otelExporterConfig) + otelExporterConfig := metricexporter.ConfigFromViper() + otelExporter, err := metricexporter.NewMetricExporter(otelCtx, otelExporterConfig) + intentsHolder.RegisterGetCallback(otelExporter.GetIntentCallback) if err != nil { logrus.WithError(err).Fatal("Failed to initialize otel exporter") } - go otelExporter.PeriodicIntentsExport(otelCtx) + + // start intent discover and notify callbacks of new intents + ihCtx, ihCancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer ihCancel() + go intentsHolder.PeriodicIntentsUpload(ihCtx, cloudUploaderConfig.UploadInterval) telemetrysender.SetGlobalVersion(version.Version()) telemetrysender.SendNetworkMapper(telemetriesgql.EventTypeStarted, 1) diff --git a/src/mapper/pkg/clouduploader/cloud_upload.go b/src/mapper/pkg/clouduploader/cloud_upload.go index 1e99a3f0..22bea1f6 100644 --- a/src/mapper/pkg/clouduploader/cloud_upload.go +++ b/src/mapper/pkg/clouduploader/cloud_upload.go @@ -2,13 +2,14 @@ package clouduploader import ( "context" + "time" + "github.com/cenkalti/backoff/v4" "github.com/otterize/network-mapper/src/mapper/pkg/cloudclient" "github.com/otterize/network-mapper/src/mapper/pkg/graph/model" "github.com/otterize/network-mapper/src/mapper/pkg/intentsstore" "github.com/samber/lo" "github.com/sirupsen/logrus" - "time" ) type CloudUploader struct { @@ -25,10 +26,10 @@ func NewCloudUploader(intentsHolder *intentsstore.IntentsHolder, config Config, } } -func (c *CloudUploader) uploadDiscoveredIntents(ctx context.Context) { +func (c *CloudUploader) GetIntentCallback(ctx context.Context, intents []intentsstore.TimestampedIntent) { logrus.Info("Search for intents") - discoveredIntents := lo.Map(c.intentsHolder.GetNewIntentsSinceLastGet(), func(intent intentsstore.TimestampedIntent, _ int) *cloudclient.DiscoveredIntentInput { + discoveredIntents := lo.Map(intents, func(intent intentsstore.TimestampedIntent, _ int) *cloudclient.DiscoveredIntentInput { return &cloudclient.DiscoveredIntentInput{ DiscoveredAt: lo.ToPtr(intent.Timestamp), Intent: &cloudclient.IntentInput{ @@ -90,20 +91,6 @@ func (c *CloudUploader) reportStatus(ctx context.Context) { } } -func (c *CloudUploader) PeriodicIntentsUpload(ctx context.Context) { - logrus.Info("Starting periodic intents upload") - - for { - select { - case <-time.After(c.config.UploadInterval): - c.uploadDiscoveredIntents(ctx) - - case <-ctx.Done(): - return - } - } -} - func (c *CloudUploader) PeriodicStatusReport(ctx context.Context) { logrus.Info("Starting status reporting") c.reportStatus(ctx) diff --git a/src/mapper/pkg/clouduploader/cloud_uploader_test.go b/src/mapper/pkg/clouduploader/cloud_uploader_test.go index 2ee0d345..2f39cc1e 100644 --- a/src/mapper/pkg/clouduploader/cloud_uploader_test.go +++ b/src/mapper/pkg/clouduploader/cloud_uploader_test.go @@ -3,6 +3,9 @@ package clouduploader import ( "context" "errors" + "testing" + "time" + "github.com/golang/mock/gomock" "github.com/otterize/network-mapper/src/mapper/pkg/cloudclient" cloudclientmocks "github.com/otterize/network-mapper/src/mapper/pkg/cloudclient/mocks" @@ -11,8 +14,6 @@ import ( "github.com/otterize/network-mapper/src/mapper/pkg/intentsstore" "github.com/samber/lo" "github.com/stretchr/testify/suite" - "testing" - "time" ) var ( @@ -75,7 +76,7 @@ func (s *CloudUploaderTestSuite) TestUploadIntents() { s.clientMock.EXPECT().ReportDiscoveredIntents(gomock.Any(), GetMatcher(intents1)).Return(nil).Times(1) - s.cloudUploader.uploadDiscoveredIntents(context.Background()) + s.cloudUploader.GetIntentCallback(context.Background(), s.intentsHolder.GetNewIntentsSinceLastGet()) s.addIntent("client2", s.testNamespace, "server1", s.testNamespace) @@ -85,7 +86,7 @@ func (s *CloudUploaderTestSuite) TestUploadIntents() { s.clientMock.EXPECT().ReportDiscoveredIntents(gomock.Any(), GetMatcher(intents2)).Return(nil).Times(1) - s.cloudUploader.uploadDiscoveredIntents(context.Background()) + s.cloudUploader.GetIntentCallback(context.Background(), s.intentsHolder.GetNewIntentsSinceLastGet()) } func (s *CloudUploaderTestSuite) TestUploadIntentsInBatches() { @@ -102,13 +103,13 @@ func (s *CloudUploaderTestSuite) TestUploadIntentsInBatches() { s.clientMock.EXPECT().ReportDiscoveredIntents(gomock.Any(), GetMatcher([]cloudclient.IntentInput{intents1[0]})).Return(nil).Times(1) s.clientMock.EXPECT().ReportDiscoveredIntents(gomock.Any(), GetMatcher([]cloudclient.IntentInput{intents1[1]})).Return(nil).Times(1) - s.cloudUploader.uploadDiscoveredIntents(context.Background()) + s.cloudUploader.GetIntentCallback(context.Background(), s.intentsHolder.GetNewIntentsSinceLastGet()) } func (s *CloudUploaderTestSuite) TestDontUploadWithoutIntents() { s.clientMock.EXPECT().ReportDiscoveredIntents(gomock.Any(), gomock.Any()).Times(0) - s.cloudUploader.uploadDiscoveredIntents(context.Background()) + s.cloudUploader.GetIntentCallback(context.Background(), s.intentsHolder.GetNewIntentsSinceLastGet()) } func (s *CloudUploaderTestSuite) TestUploadSameIntentOnce() { @@ -120,11 +121,11 @@ func (s *CloudUploaderTestSuite) TestUploadSameIntentOnce() { s.clientMock.EXPECT().ReportDiscoveredIntents(gomock.Any(), GetMatcher(intents)).Return(nil).Times(1) - s.cloudUploader.uploadDiscoveredIntents(context.Background()) + s.cloudUploader.GetIntentCallback(context.Background(), s.intentsHolder.GetNewIntentsSinceLastGet()) s.clientMock.EXPECT().ReportDiscoveredIntents(gomock.Any(), GetMatcher(intents)).Return(nil).Times(1) s.addIntent("client", s.testNamespace, "server", s.testNamespace) - s.cloudUploader.uploadDiscoveredIntents(context.Background()) + s.cloudUploader.GetIntentCallback(context.Background(), s.intentsHolder.GetNewIntentsSinceLastGet()) } func (s *CloudUploaderTestSuite) TestRetryOnFailed() { @@ -138,7 +139,7 @@ func (s *CloudUploaderTestSuite) TestRetryOnFailed() { s.clientMock.EXPECT().ReportDiscoveredIntents(gomock.Any(), GetMatcher(intents)).Return(nil).Times(1) - s.cloudUploader.uploadDiscoveredIntents(context.Background()) + s.cloudUploader.GetIntentCallback(context.Background(), s.intentsHolder.GetNewIntentsSinceLastGet()) } func (s *CloudUploaderTestSuite) TestDontUploadWhenNothingNew() { @@ -150,8 +151,8 @@ func (s *CloudUploaderTestSuite) TestDontUploadWhenNothingNew() { s.clientMock.EXPECT().ReportDiscoveredIntents(gomock.Any(), GetMatcher(intents)).Return(nil).Times(1) - s.cloudUploader.uploadDiscoveredIntents(context.Background()) - s.cloudUploader.uploadDiscoveredIntents(context.Background()) + s.cloudUploader.GetIntentCallback(context.Background(), s.intentsHolder.GetNewIntentsSinceLastGet()) + s.cloudUploader.GetIntentCallback(context.Background(), s.intentsHolder.GetNewIntentsSinceLastGet()) } func (s *CloudUploaderTestSuite) TestReportMapperComponent() { diff --git a/src/mapper/pkg/intentsstore/holder.go b/src/mapper/pkg/intentsstore/holder.go index c600f7cf..54a990c1 100644 --- a/src/mapper/pkg/intentsstore/holder.go +++ b/src/mapper/pkg/intentsstore/holder.go @@ -1,7 +1,12 @@ package intentsstore import ( + "context" "encoding/json" + "strings" + "sync" + "time" + "github.com/amit7itz/goset" "github.com/otterize/network-mapper/src/mapper/pkg/config" "github.com/otterize/network-mapper/src/mapper/pkg/graph/model" @@ -9,9 +14,6 @@ import ( "github.com/sirupsen/logrus" "golang.org/x/exp/slices" "k8s.io/apimachinery/pkg/types" - "strings" - "sync" - "time" ) type IntentsStoreKey struct { @@ -31,6 +33,7 @@ type IntentsHolder struct { accumulatingStore IntentsStore sinceLastGetStore IntentsStore lock sync.Mutex + callbacks []func(context.Context, []TimestampedIntent) } func NewIntentsHolder() *IntentsHolder { @@ -38,6 +41,7 @@ func NewIntentsHolder() *IntentsHolder { accumulatingStore: make(IntentsStore), sinceLastGetStore: make(IntentsStore), lock: sync.Mutex{}, + callbacks: make([]func(context.Context, []TimestampedIntent), 0), } } @@ -153,6 +157,31 @@ func (i *IntentsHolder) addIntentToStore(store IntentsStore, newTimestamp time.T store[key] = existingIntent } +func (i *IntentsHolder) PeriodicIntentsUpload(ctx context.Context, d time.Duration) { + logrus.Info("Starting periodic intents upload") + + for { + select { + case <-time.After(d): + if len(i.callbacks) == 0 { + return + } + + intents := i.GetNewIntentsSinceLastGet() + for _, callback := range i.callbacks { + callback(ctx, intents) + } + + case <-ctx.Done(): + return + } + } +} + +func (i *IntentsHolder) RegisterGetCallback(callback func(context.Context, []TimestampedIntent)) { + i.callbacks = append(i.callbacks, callback) +} + func (i *IntentsHolder) AddIntent(newTimestamp time.Time, intent model.Intent) { if config.ExcludedNamespaces().Contains(intent.Client.Namespace) || config.ExcludedNamespaces().Contains(intent.Server.Namespace) { return diff --git a/src/mapper/pkg/metricexporter/edge_metric.go b/src/mapper/pkg/metricexporter/edge_metric.go new file mode 100644 index 00000000..b3d5c30e --- /dev/null +++ b/src/mapper/pkg/metricexporter/edge_metric.go @@ -0,0 +1,7 @@ +package metricexporter + +import "context" + +type EdgeMetric interface { + Record(ctx context.Context, from string, to string) +} diff --git a/src/mapper/pkg/metricexporter/metric_exporter.go b/src/mapper/pkg/metricexporter/metric_exporter.go new file mode 100644 index 00000000..87532892 --- /dev/null +++ b/src/mapper/pkg/metricexporter/metric_exporter.go @@ -0,0 +1,34 @@ +package metricexporter + +import ( + "context" + + "github.com/otterize/network-mapper/src/mapper/pkg/intentsstore" + "github.com/sirupsen/logrus" +) + +type MetricExporter struct { + config Config + edgeMetric EdgeMetric +} + +func NewMetricExporter(ctx context.Context, config Config) (*MetricExporter, error) { + em, err := NewOtelEdgeMetric(ctx, config.ExportInterval) + if err != nil { + return nil, err + } + + return &MetricExporter{ + config: config, + edgeMetric: em, + }, nil +} + +func (o *MetricExporter) GetIntentCallback(ctx context.Context, intents []intentsstore.TimestampedIntent) { + for _, intent := range intents { + clientName := intent.Intent.Client.Name + serverName := intent.Intent.Server.Name + logrus.Debugf("recording metric counter: %s -> %s", clientName, serverName) + o.edgeMetric.Record(ctx, clientName, serverName) + } +} diff --git a/src/mapper/pkg/otelexporter/otel_config.go b/src/mapper/pkg/metricexporter/metric_exporter_config.go similarity index 93% rename from src/mapper/pkg/otelexporter/otel_config.go rename to src/mapper/pkg/metricexporter/metric_exporter_config.go index 9b908f66..686363cd 100644 --- a/src/mapper/pkg/otelexporter/otel_config.go +++ b/src/mapper/pkg/metricexporter/metric_exporter_config.go @@ -1,4 +1,4 @@ -package otelexporter +package metricexporter import ( "time" diff --git a/src/mapper/pkg/metricexporter/metric_exporter_test.go b/src/mapper/pkg/metricexporter/metric_exporter_test.go new file mode 100644 index 00000000..cde1883f --- /dev/null +++ b/src/mapper/pkg/metricexporter/metric_exporter_test.go @@ -0,0 +1,60 @@ +package metricexporter + +import ( + "context" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/otterize/network-mapper/src/mapper/pkg/graph/model" + "github.com/otterize/network-mapper/src/mapper/pkg/intentsstore" + "github.com/stretchr/testify/suite" +) + +type MetricExporterTestSuite struct { + suite.Suite + testNamespace string + intentsHolder *intentsstore.IntentsHolder + edgeMock *MockEdgeMetric + metricExporter *MetricExporter +} + +var ( + testTimestamp = time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC) +) + +func (o *MetricExporterTestSuite) SetupTest() { + o.testNamespace = "default" + o.intentsHolder = intentsstore.NewIntentsHolder() +} + +func (o *MetricExporterTestSuite) BeforeTest(s, testName string) { + controller := gomock.NewController(o.T()) + o.edgeMock = NewMockEdgeMetric(controller) + + metricExporter, _ := NewMetricExporter(context.Background(), Config{ExportInterval: 1 * time.Second}) + metricExporter.edgeMetric = o.edgeMock + o.metricExporter = metricExporter +} + +func (o *MetricExporterTestSuite) addIntent(source string, srcNamespace string, destination string, dstNamespace string) { + o.intentsHolder.AddIntent( + testTimestamp, + model.Intent{ + Client: &model.OtterizeServiceIdentity{Name: source, Namespace: srcNamespace}, + Server: &model.OtterizeServiceIdentity{Name: destination, Namespace: dstNamespace}, + }, + ) +} + +func (o *MetricExporterTestSuite) TestExportIntents() { + o.addIntent("client1", o.testNamespace, "server1", o.testNamespace) + o.addIntent("client1", o.testNamespace, "server2", "external-namespace") + o.edgeMock.EXPECT().Record(context.Background(), "client1", "server1").Times(1) + o.edgeMock.EXPECT().Record(context.Background(), "client1", "server2").Times(1) + o.metricExporter.GetIntentCallback(context.Background(), o.intentsHolder.GetNewIntentsSinceLastGet()) +} + +func TestRunSuite(t *testing.T) { + suite.Run(t, new(MetricExporterTestSuite)) +} diff --git a/src/mapper/pkg/metricexporter/mock_edge_metric.go b/src/mapper/pkg/metricexporter/mock_edge_metric.go new file mode 100644 index 00000000..76a9e581 --- /dev/null +++ b/src/mapper/pkg/metricexporter/mock_edge_metric.go @@ -0,0 +1,47 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: network-mapper/src/mapper/pkg/otelexporter/edge_metric.go + +// Package mock_otelexporter is a generated GoMock package. +package metricexporter + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockEdgeMetric is a mock of EdgeMetric interface. +type MockEdgeMetric struct { + ctrl *gomock.Controller + recorder *MockEdgeMetricMockRecorder +} + +// MockEdgeMetricMockRecorder is the mock recorder for MockEdgeMetric. +type MockEdgeMetricMockRecorder struct { + mock *MockEdgeMetric +} + +// NewMockEdgeMetric creates a new mock instance. +func NewMockEdgeMetric(ctrl *gomock.Controller) *MockEdgeMetric { + mock := &MockEdgeMetric{ctrl: ctrl} + mock.recorder = &MockEdgeMetricMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockEdgeMetric) EXPECT() *MockEdgeMetricMockRecorder { + return m.recorder +} + +// Record mocks base method. +func (m *MockEdgeMetric) Record(ctx context.Context, from, to string) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Record", ctx, from, to) +} + +// Record indicates an expected call of Record. +func (mr *MockEdgeMetricMockRecorder) Record(ctx, from, to interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Record", reflect.TypeOf((*MockEdgeMetric)(nil).Record), ctx, from, to) +} \ No newline at end of file diff --git a/src/mapper/pkg/otelexporter/otel_exporter.go b/src/mapper/pkg/metricexporter/otel_edge_metric.go similarity index 64% rename from src/mapper/pkg/otelexporter/otel_exporter.go rename to src/mapper/pkg/metricexporter/otel_edge_metric.go index 39618329..4540d51e 100644 --- a/src/mapper/pkg/otelexporter/otel_exporter.go +++ b/src/mapper/pkg/metricexporter/otel_edge_metric.go @@ -1,13 +1,10 @@ -package otelexporter +package metricexporter import ( "context" "time" - "github.com/otterize/network-mapper/src/mapper/pkg/intentsstore" sharedconfig "github.com/otterize/network-mapper/src/shared/config" - "github.com/otterize/network-mapper/src/shared/version" - "github.com/sirupsen/logrus" "github.com/spf13/viper" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" @@ -18,18 +15,15 @@ import ( semconv "go.opentelemetry.io/otel/semconv/v1.21.0" ) -type OtelExporter struct { - config Config +type OtelEdgeMetric struct { meterProvider *sdk.MeterProvider counter metric.Int64Counter - intentsHolder *intentsstore.IntentsHolder } func newResource() (*resource.Resource, error) { return resource.Merge(resource.Default(), resource.NewWithAttributes(semconv.SchemaURL, semconv.OTelLibraryName("otterize/network-mapper"), - semconv.OTelLibraryVersion(version.Version()), )) } @@ -71,13 +65,17 @@ func newMeterProvider(ctx context.Context, res *resource.Resource, exportInterva return meterProvider, nil } -func NewOtelExporter(ctx context.Context, ih *intentsstore.IntentsHolder, config Config) (*OtelExporter, error) { +func (o *OtelEdgeMetric) Record(ctx context.Context, from string, to string) { + o.counter.Add(ctx, 1, metric.WithAttributes(attribute.String(ClientAttributeName, from), attribute.String(ServerAttributeName, to))) +} + +func NewOtelEdgeMetric(ctx context.Context, d time.Duration) (*OtelEdgeMetric, error) { res, err := newResource() if err != nil { return nil, err } - meterProvider, err := newMeterProvider(ctx, res, config.ExportInterval) + meterProvider, err := newMeterProvider(ctx, res, d) if err != nil { return nil, err } @@ -91,32 +89,8 @@ func NewOtelExporter(ctx context.Context, ih *intentsstore.IntentsHolder, config return nil, err } - return &OtelExporter{ - intentsHolder: ih, - config: config, + return &OtelEdgeMetric{ counter: edgeCounter, meterProvider: meterProvider, }, nil } - -func (o *OtelExporter) countDiscoveredIntents(ctx context.Context) { - for _, intent := range o.intentsHolder.GetNewIntentsSinceLastGet() { - clientName := intent.Intent.Client.Name - serverName := intent.Intent.Server.Name - logrus.Debugf("incremeting otel metric counter: %s -> %s", clientName, serverName) - o.counter.Add(ctx, 1, metric.WithAttributes(attribute.String(ClientAttributeName, clientName), attribute.String(ServerAttributeName, serverName))) - } -} - -func (o *OtelExporter) PeriodicIntentsExport(ctx context.Context) { - for { - select { - case <-time.After(o.config.ExportInterval): - o.countDiscoveredIntents(ctx) - - case <-ctx.Done(): - o.meterProvider.Shutdown(ctx) - return - } - } -} diff --git a/src/mapper/pkg/otelexporter/otel_exporter_test.go b/src/mapper/pkg/otelexporter/otel_exporter_test.go deleted file mode 100644 index 269ac412..00000000 --- a/src/mapper/pkg/otelexporter/otel_exporter_test.go +++ /dev/null @@ -1,53 +0,0 @@ -package otelexporter - -import ( - "context" - "testing" - "time" - - "github.com/otterize/network-mapper/src/mapper/pkg/graph/model" - "github.com/otterize/network-mapper/src/mapper/pkg/intentsstore" - "github.com/stretchr/testify/suite" -) - -var ( - testTimestamp = time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC) -) - -type OTelExporterTestSuite struct { - suite.Suite - testNamespace string - intentsHolder *intentsstore.IntentsHolder - otelExporter *OtelExporter -} - -func (o *OTelExporterTestSuite) SetupTest() { - o.testNamespace = "default" - o.intentsHolder = intentsstore.NewIntentsHolder() -} - -func (o *OTelExporterTestSuite) BeforeTest(_, testName string) { - otelExporter, _ := NewOtelExporter(context.Background(), o.intentsHolder, Config{ExportInterval: 1 * time.Second}) - o.otelExporter = otelExporter -} - -func (o *OTelExporterTestSuite) addIntent(source string, srcNamespace string, destination string, dstNamespace string) { - o.intentsHolder.AddIntent( - testTimestamp, - model.Intent{ - Client: &model.OtterizeServiceIdentity{Name: source, Namespace: srcNamespace}, - Server: &model.OtterizeServiceIdentity{Name: destination, Namespace: dstNamespace}, - }, - ) -} - -func (o *OTelExporterTestSuite) TestExportIntents() { - o.addIntent("client1", o.testNamespace, "server1", o.testNamespace) - o.addIntent("client1", o.testNamespace, "server2", "external-namespace") - - o.otelExporter.countDiscoveredIntents(context.Background()) -} - -func TestRunSuite(t *testing.T) { - suite.Run(t, new(OTelExporterTestSuite)) -}