Skip to content

Commit

Permalink
use callbacks, add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
smithclay committed Oct 3, 2023
1 parent fe6c234 commit 42da5b3
Show file tree
Hide file tree
Showing 13 changed files with 220 additions and 126 deletions.
1 change: 1 addition & 0 deletions src/go.mod

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 12 additions & 6 deletions src/mapper/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/otterize/network-mapper/src/mapper/pkg/config"
"github.com/otterize/network-mapper/src/mapper/pkg/intentsstore"
"github.com/otterize/network-mapper/src/mapper/pkg/kubefinder"
"github.com/otterize/network-mapper/src/mapper/pkg/otelexporter"
"github.com/otterize/network-mapper/src/mapper/pkg/metricexporter"
"github.com/otterize/network-mapper/src/mapper/pkg/resolvers"
sharedconfig "github.com/otterize/network-mapper/src/shared/config"
"github.com/otterize/network-mapper/src/shared/kubeutils"
Expand Down Expand Up @@ -119,21 +119,27 @@ func main() {
if err != nil {
logrus.WithError(err).Fatal("Failed to initialize cloud client")
}

cloudUploaderConfig := clouduploader.ConfigFromViper()
if cloudEnabled {
cloudUploaderConfig := clouduploader.ConfigFromViper()
cloudUploader := clouduploader.NewCloudUploader(intentsHolder, cloudUploaderConfig, cloudClient)
go cloudUploader.PeriodicIntentsUpload(cloudClientCtx)
intentsHolder.RegisterGetCallback(cloudUploader.GetIntentCallback)
go cloudUploader.PeriodicStatusReport(cloudClientCtx)
}

otelCtx, otelCancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer otelCancel()
otelExporterConfig := otelexporter.ConfigFromViper()
otelExporter, err := otelexporter.NewOtelExporter(otelCtx, intentsHolder, otelExporterConfig)
otelExporterConfig := metricexporter.ConfigFromViper()
otelExporter, err := metricexporter.NewMetricExporter(otelCtx, otelExporterConfig)
intentsHolder.RegisterGetCallback(otelExporter.GetIntentCallback)
if err != nil {
logrus.WithError(err).Fatal("Failed to initialize otel exporter")
}
go otelExporter.PeriodicIntentsExport(otelCtx)

// start intent discover and notify callbacks of new intents
ihCtx, ihCancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer ihCancel()
go intentsHolder.PeriodicIntentsUpload(ihCtx, cloudUploaderConfig.UploadInterval)

telemetrysender.SetGlobalVersion(version.Version())
telemetrysender.SendNetworkMapper(telemetriesgql.EventTypeStarted, 1)
Expand Down
21 changes: 4 additions & 17 deletions src/mapper/pkg/clouduploader/cloud_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package clouduploader

import (
"context"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/otterize/network-mapper/src/mapper/pkg/cloudclient"
"github.com/otterize/network-mapper/src/mapper/pkg/graph/model"
"github.com/otterize/network-mapper/src/mapper/pkg/intentsstore"
"github.com/samber/lo"
"github.com/sirupsen/logrus"
"time"
)

type CloudUploader struct {
Expand All @@ -25,10 +26,10 @@ func NewCloudUploader(intentsHolder *intentsstore.IntentsHolder, config Config,
}
}

func (c *CloudUploader) uploadDiscoveredIntents(ctx context.Context) {
func (c *CloudUploader) GetIntentCallback(ctx context.Context, intents []intentsstore.TimestampedIntent) {
logrus.Info("Search for intents")

discoveredIntents := lo.Map(c.intentsHolder.GetNewIntentsSinceLastGet(), func(intent intentsstore.TimestampedIntent, _ int) *cloudclient.DiscoveredIntentInput {
discoveredIntents := lo.Map(intents, func(intent intentsstore.TimestampedIntent, _ int) *cloudclient.DiscoveredIntentInput {
return &cloudclient.DiscoveredIntentInput{
DiscoveredAt: lo.ToPtr(intent.Timestamp),
Intent: &cloudclient.IntentInput{
Expand Down Expand Up @@ -90,20 +91,6 @@ func (c *CloudUploader) reportStatus(ctx context.Context) {
}
}

func (c *CloudUploader) PeriodicIntentsUpload(ctx context.Context) {
logrus.Info("Starting periodic intents upload")

for {
select {
case <-time.After(c.config.UploadInterval):
c.uploadDiscoveredIntents(ctx)

case <-ctx.Done():
return
}
}
}

func (c *CloudUploader) PeriodicStatusReport(ctx context.Context) {
logrus.Info("Starting status reporting")
c.reportStatus(ctx)
Expand Down
23 changes: 12 additions & 11 deletions src/mapper/pkg/clouduploader/cloud_uploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package clouduploader
import (
"context"
"errors"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/otterize/network-mapper/src/mapper/pkg/cloudclient"
cloudclientmocks "github.com/otterize/network-mapper/src/mapper/pkg/cloudclient/mocks"
Expand All @@ -11,8 +14,6 @@ import (
"github.com/otterize/network-mapper/src/mapper/pkg/intentsstore"
"github.com/samber/lo"
"github.com/stretchr/testify/suite"
"testing"
"time"
)

var (
Expand Down Expand Up @@ -75,7 +76,7 @@ func (s *CloudUploaderTestSuite) TestUploadIntents() {

s.clientMock.EXPECT().ReportDiscoveredIntents(gomock.Any(), GetMatcher(intents1)).Return(nil).Times(1)

s.cloudUploader.uploadDiscoveredIntents(context.Background())
s.cloudUploader.GetIntentCallback(context.Background(), s.intentsHolder.GetNewIntentsSinceLastGet())

s.addIntent("client2", s.testNamespace, "server1", s.testNamespace)

Expand All @@ -85,7 +86,7 @@ func (s *CloudUploaderTestSuite) TestUploadIntents() {

s.clientMock.EXPECT().ReportDiscoveredIntents(gomock.Any(), GetMatcher(intents2)).Return(nil).Times(1)

s.cloudUploader.uploadDiscoveredIntents(context.Background())
s.cloudUploader.GetIntentCallback(context.Background(), s.intentsHolder.GetNewIntentsSinceLastGet())
}

func (s *CloudUploaderTestSuite) TestUploadIntentsInBatches() {
Expand All @@ -105,14 +106,14 @@ func (s *CloudUploaderTestSuite) TestUploadIntentsInBatches() {
secondReport,
)

s.cloudUploader.uploadDiscoveredIntents(context.Background())
s.cloudUploader.GetIntentCallback(context.Background(), s.intentsHolder.GetNewIntentsSinceLastGet())

}

func (s *CloudUploaderTestSuite) TestDontUploadWithoutIntents() {
s.clientMock.EXPECT().ReportDiscoveredIntents(gomock.Any(), gomock.Any()).Times(0)

s.cloudUploader.uploadDiscoveredIntents(context.Background())
s.cloudUploader.GetIntentCallback(context.Background(), s.intentsHolder.GetNewIntentsSinceLastGet())
}

func (s *CloudUploaderTestSuite) TestUploadSameIntentOnce() {
Expand All @@ -124,11 +125,11 @@ func (s *CloudUploaderTestSuite) TestUploadSameIntentOnce() {

s.clientMock.EXPECT().ReportDiscoveredIntents(gomock.Any(), GetMatcher(intents)).Return(nil).Times(1)

s.cloudUploader.uploadDiscoveredIntents(context.Background())
s.cloudUploader.GetIntentCallback(context.Background(), s.intentsHolder.GetNewIntentsSinceLastGet())

s.clientMock.EXPECT().ReportDiscoveredIntents(gomock.Any(), GetMatcher(intents)).Return(nil).Times(1)
s.addIntent("client", s.testNamespace, "server", s.testNamespace)
s.cloudUploader.uploadDiscoveredIntents(context.Background())
s.cloudUploader.GetIntentCallback(context.Background(), s.intentsHolder.GetNewIntentsSinceLastGet())
}

func (s *CloudUploaderTestSuite) TestRetryOnFailed() {
Expand All @@ -142,7 +143,7 @@ func (s *CloudUploaderTestSuite) TestRetryOnFailed() {

s.clientMock.EXPECT().ReportDiscoveredIntents(gomock.Any(), GetMatcher(intents)).Return(nil).Times(1)

s.cloudUploader.uploadDiscoveredIntents(context.Background())
s.cloudUploader.GetIntentCallback(context.Background(), s.intentsHolder.GetNewIntentsSinceLastGet())
}

func (s *CloudUploaderTestSuite) TestDontUploadWhenNothingNew() {
Expand All @@ -154,8 +155,8 @@ func (s *CloudUploaderTestSuite) TestDontUploadWhenNothingNew() {

s.clientMock.EXPECT().ReportDiscoveredIntents(gomock.Any(), GetMatcher(intents)).Return(nil).Times(1)

s.cloudUploader.uploadDiscoveredIntents(context.Background())
s.cloudUploader.uploadDiscoveredIntents(context.Background())
s.cloudUploader.GetIntentCallback(context.Background(), s.intentsHolder.GetNewIntentsSinceLastGet())
s.cloudUploader.GetIntentCallback(context.Background(), s.intentsHolder.GetNewIntentsSinceLastGet())
}

func (s *CloudUploaderTestSuite) TestReportMapperComponent() {
Expand Down
35 changes: 32 additions & 3 deletions src/mapper/pkg/intentsstore/holder.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
package intentsstore

import (
"context"
"encoding/json"
"strings"
"sync"
"time"

"github.com/amit7itz/goset"
"github.com/otterize/network-mapper/src/mapper/pkg/config"
"github.com/otterize/network-mapper/src/mapper/pkg/graph/model"
"github.com/samber/lo"
"github.com/sirupsen/logrus"
"golang.org/x/exp/slices"
"k8s.io/apimachinery/pkg/types"
"strings"
"sync"
"time"
)

type IntentsStoreKey struct {
Expand All @@ -31,13 +33,15 @@ type IntentsHolder struct {
accumulatingStore IntentsStore
sinceLastGetStore IntentsStore
lock sync.Mutex
callbacks []func(context.Context, []TimestampedIntent)
}

func NewIntentsHolder() *IntentsHolder {
return &IntentsHolder{
accumulatingStore: make(IntentsStore),
sinceLastGetStore: make(IntentsStore),
lock: sync.Mutex{},
callbacks: make([]func(context.Context, []TimestampedIntent), 0),
}
}

Expand Down Expand Up @@ -153,6 +157,31 @@ func (i *IntentsHolder) addIntentToStore(store IntentsStore, newTimestamp time.T
store[key] = existingIntent
}

func (i *IntentsHolder) PeriodicIntentsUpload(ctx context.Context, d time.Duration) {
logrus.Info("Starting periodic intents upload")

for {
select {
case <-time.After(d):
if len(i.callbacks) == 0 {
return
}

intents := i.GetNewIntentsSinceLastGet()
for _, callback := range i.callbacks {
callback(ctx, intents)
}

case <-ctx.Done():
return
}
}
}

func (i *IntentsHolder) RegisterGetCallback(callback func(context.Context, []TimestampedIntent)) {
i.callbacks = append(i.callbacks, callback)
}

func (i *IntentsHolder) AddIntent(newTimestamp time.Time, intent model.Intent) {
if config.ExcludedNamespaces().Contains(intent.Client.Namespace) || config.ExcludedNamespaces().Contains(intent.Server.Namespace) {
return
Expand Down
7 changes: 7 additions & 0 deletions src/mapper/pkg/metricexporter/edge_metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package metricexporter

import "context"

type EdgeMetric interface {
Record(ctx context.Context, from string, to string)
}
34 changes: 34 additions & 0 deletions src/mapper/pkg/metricexporter/metric_exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package metricexporter

import (
"context"

"github.com/otterize/network-mapper/src/mapper/pkg/intentsstore"
"github.com/sirupsen/logrus"
)

type MetricExporter struct {
config Config
edgeMetric EdgeMetric
}

func NewMetricExporter(ctx context.Context, config Config) (*MetricExporter, error) {
em, err := NewOtelEdgeMetric(ctx, config.ExportInterval)
if err != nil {
return nil, err
}

return &MetricExporter{
config: config,
edgeMetric: em,
}, nil
}

func (o *MetricExporter) GetIntentCallback(ctx context.Context, intents []intentsstore.TimestampedIntent) {
for _, intent := range intents {
clientName := intent.Intent.Client.Name
serverName := intent.Intent.Server.Name
logrus.Debugf("recording metric counter: %s -> %s", clientName, serverName)
o.edgeMetric.Record(ctx, clientName, serverName)
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package otelexporter
package metricexporter

import (
"time"
Expand Down
60 changes: 60 additions & 0 deletions src/mapper/pkg/metricexporter/metric_exporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package metricexporter

import (
"context"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/otterize/network-mapper/src/mapper/pkg/graph/model"
"github.com/otterize/network-mapper/src/mapper/pkg/intentsstore"
"github.com/stretchr/testify/suite"
)

type MetricExporterTestSuite struct {
suite.Suite
testNamespace string
intentsHolder *intentsstore.IntentsHolder
edgeMock *MockEdgeMetric
metricExporter *MetricExporter
}

var (
testTimestamp = time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
)

func (o *MetricExporterTestSuite) SetupTest() {
o.testNamespace = "default"
o.intentsHolder = intentsstore.NewIntentsHolder()
}

func (o *MetricExporterTestSuite) BeforeTest(s, testName string) {
controller := gomock.NewController(o.T())
o.edgeMock = NewMockEdgeMetric(controller)

metricExporter, _ := NewMetricExporter(context.Background(), Config{ExportInterval: 1 * time.Second})
metricExporter.edgeMetric = o.edgeMock
o.metricExporter = metricExporter
}

func (o *MetricExporterTestSuite) addIntent(source string, srcNamespace string, destination string, dstNamespace string) {
o.intentsHolder.AddIntent(
testTimestamp,
model.Intent{
Client: &model.OtterizeServiceIdentity{Name: source, Namespace: srcNamespace},
Server: &model.OtterizeServiceIdentity{Name: destination, Namespace: dstNamespace},
},
)
}

func (o *MetricExporterTestSuite) TestExportIntents() {
o.addIntent("client1", o.testNamespace, "server1", o.testNamespace)
o.addIntent("client1", o.testNamespace, "server2", "external-namespace")
o.edgeMock.EXPECT().Record(context.Background(), "client1", "server1").Times(1)
o.edgeMock.EXPECT().Record(context.Background(), "client1", "server2").Times(1)
o.metricExporter.GetIntentCallback(context.Background(), o.intentsHolder.GetNewIntentsSinceLastGet())
}

func TestRunSuite(t *testing.T) {
suite.Run(t, new(MetricExporterTestSuite))
}
Loading

0 comments on commit 42da5b3

Please sign in to comment.