Skip to content

Commit

Permalink
Merge pull request #367 from kubescape/seq
Browse files Browse the repository at this point in the history
add random delay when fetching full AP and NN
  • Loading branch information
matthyx authored Sep 16, 2024
2 parents 140be31 + 9ba12c2 commit 710ec05
Show file tree
Hide file tree
Showing 10 changed files with 82 additions and 42 deletions.
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,10 @@ func main() {
ruleBindingNotify = make(chan rulebinding.RuleBindingNotify, 100)
ruleBindingCache.AddNotifier(&ruleBindingNotify)

apc := applicationprofilecache.NewApplicationProfileCache(nodeName, storageClient.StorageClient)
apc := applicationprofilecache.NewApplicationProfileCache(nodeName, storageClient.StorageClient, cfg.MaxDelaySeconds)
dWatcher.AddAdaptor(apc)

nnc := networkneighborhoodcache.NewNetworkNeighborhoodCache(nodeName, storageClient.StorageClient)
nnc := networkneighborhoodcache.NewNetworkNeighborhoodCache(nodeName, storageClient.StorageClient, cfg.MaxDelaySeconds)
dWatcher.AddAdaptor(nnc)

dc := dnscache.NewDnsCache(dnsResolver)
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Config struct {
InitialDelay time.Duration `mapstructure:"initialDelay"`
MaxSniffingTime time.Duration `mapstructure:"maxSniffingTimePerContainer"`
UpdateDataPeriod time.Duration `mapstructure:"updateDataPeriod"`
MaxDelaySeconds int `mapstructure:"maxDelaySeconds"`
MaxJitterPercentage int `mapstructure:"maxJitterPercentage"`
EnableFullPathTracing bool `mapstructure:"fullPathTracingEnabled"`
EnableApplicationProfile bool `mapstructure:"applicationProfileServiceEnabled"`
Expand All @@ -41,6 +42,7 @@ func LoadConfig(path string) (Config, error) {
viper.SetDefault("fullPathTracingEnabled", true)
viper.SetDefault("initialDelay", 2*time.Minute)
viper.SetDefault("nodeProfileInterval", 10*time.Minute)
viper.SetDefault("maxDelaySeconds", 30)
viper.SetDefault("maxJitterPercentage", 5)

viper.AutomaticEnv()
Expand Down
1 change: 1 addition & 0 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func TestLoadConfig(t *testing.T) {
MaxSniffingTime: 6 * time.Hour,
UpdateDataPeriod: 1 * time.Minute,
NodeProfileInterval: 1 * time.Minute,
MaxDelaySeconds: 30,
MaxJitterPercentage: 5,
EnablePrometheusExporter: true,
EnableRuntimeDetection: true,
Expand Down
32 changes: 21 additions & 11 deletions pkg/objectcache/applicationprofilecache/applicationprofilecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package applicationprofilecache
import (
"context"
"fmt"
"time"

mapset "github.com/deckarep/golang-set/v2"
"github.com/goradd/maps"
Expand All @@ -12,6 +13,7 @@ import (
helpersv1 "github.com/kubescape/k8s-interface/instanceidhandler/v1/helpers"
"github.com/kubescape/k8s-interface/workloadinterface"
"github.com/kubescape/node-agent/pkg/objectcache"
"github.com/kubescape/node-agent/pkg/utils"
"github.com/kubescape/node-agent/pkg/watcher"
"github.com/kubescape/storage/pkg/apis/softwarecomposition/v1beta1"
versioned "github.com/kubescape/storage/pkg/generated/clientset/versioned/typed/softwarecomposition/v1beta1"
Expand Down Expand Up @@ -52,11 +54,13 @@ type ApplicationProfileCacheImpl struct {
storageClient versioned.SpdxV1beta1Interface
allProfiles mapset.Set[string] // cache all the application profiles that are ready. this will enable removing from cache AP without pods that are running on the same node
nodeName string
maxDelaySeconds int // maximum delay in seconds before getting the full object from the storage
}

func NewApplicationProfileCache(nodeName string, storageClient versioned.SpdxV1beta1Interface) *ApplicationProfileCacheImpl {
func NewApplicationProfileCache(nodeName string, storageClient versioned.SpdxV1beta1Interface, maxDelaySeconds int) *ApplicationProfileCacheImpl {
return &ApplicationProfileCacheImpl{
nodeName: nodeName,
maxDelaySeconds: maxDelaySeconds,
storageClient: storageClient,
containerToSlug: maps.SafeMap[string, string]{},
slugToContainers: maps.SafeMap[string, mapset.Set[string]]{},
Expand Down Expand Up @@ -233,18 +237,24 @@ func (ap *ApplicationProfileCacheImpl) addApplicationProfile(_ context.Context,
if ap.slugToContainers.Has(apName) {
// get the full application profile from the storage
// the watch only returns the metadata
fullAP, err := ap.getApplicationProfile(appProfile.GetNamespace(), appProfile.GetName())
if err != nil {
logger.L().Error("failed to get full application profile", helpers.Error(err))
return
}
ap.slugToAppProfile.Set(apName, fullAP)
for _, i := range ap.slugToContainers.Get(apName).ToSlice() {
ap.containerToSlug.Set(i, apName)
}
// avoid thundering herd problem by adding a random delay
time.AfterFunc(utils.RandomDuration(ap.maxDelaySeconds, time.Second), func() {
ap.addFullApplicationProfile(appProfile, apName)
})
}
}

logger.L().Debug("added pod to application profile cache", helpers.String("name", apName))
// addFullApplicationProfile fetches the complete application profile from the
// storage (the watch only delivers metadata) and caches it, then points every
// container registered under the slug at it.
// It is invoked via time.AfterFunc with a random delay (see addApplicationProfile),
// so the cache state may have changed since it was scheduled.
func (ap *ApplicationProfileCacheImpl) addFullApplicationProfile(appProfile *v1beta1.ApplicationProfile, apName string) {
	fullAP, err := ap.getApplicationProfile(appProfile.GetNamespace(), appProfile.GetName())
	if err != nil {
		logger.L().Error("failed to get full application profile", helpers.Error(err))
		return
	}
	// the slug may have been removed while we were waiting for the random
	// delay to elapse - re-check to avoid calling ToSlice on a nil set
	containers := ap.slugToContainers.Get(apName)
	if containers == nil {
		logger.L().Debug("application profile no longer tracked, skipping", helpers.String("name", apName))
		return
	}
	ap.slugToAppProfile.Set(apName, fullAP)
	for _, i := range containers.ToSlice() {
		ap.containerToSlug.Set(i, apName)
	}
	logger.L().Debug("added application profile to cache", helpers.String("name", apName))
}

func (ap *ApplicationProfileCacheImpl) deleteApplicationProfile(obj runtime.Object) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"slices"
"testing"
"time"

mapset "github.com/deckarep/golang-set/v2"
"github.com/kubescape/node-agent/mocks"
Expand Down Expand Up @@ -80,7 +81,7 @@ func Test_AddHandlers(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
tt.obj.(metav1.Object).SetNamespace("default")
storageClient := fake.NewSimpleClientset().SpdxV1beta1()
ap := NewApplicationProfileCache("", storageClient)
ap := NewApplicationProfileCache("", storageClient, 0)
ap.slugToContainers.Set(tt.slug, mapset.NewSet[string]())

tt.f(ap, context.Background(), tt.obj)
Expand Down Expand Up @@ -179,7 +180,7 @@ func Test_addApplicationProfile(t *testing.T) {

storageClient := fake.NewSimpleClientset(runtimeObjs...).SpdxV1beta1()

ap := NewApplicationProfileCache("", storageClient)
ap := NewApplicationProfileCache("", storageClient, 0)

for i := range tt.preCreatedPods {
ap.addPod(tt.preCreatedPods[i])
Expand All @@ -189,6 +190,7 @@ func Test_addApplicationProfile(t *testing.T) {
}

ap.addApplicationProfile(context.Background(), tt.obj)
time.Sleep(1 * time.Second) // add is async

// test if the application profile is added to the cache
apName := objectcache.MetaUniqueName(tt.obj.(metav1.Object))
Expand Down Expand Up @@ -253,7 +255,7 @@ func Test_deleteApplicationProfile(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ap := NewApplicationProfileCache("", nil)
ap := NewApplicationProfileCache("", nil, 0)

ap.allProfiles.Append(tt.slugs...)
for _, i := range tt.slugs {
Expand Down Expand Up @@ -316,7 +318,7 @@ func Test_deletePod(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ap := NewApplicationProfileCache("", nil)
ap := NewApplicationProfileCache("", nil, 0)
for _, i := range tt.otherSlugs {
ap.slugToContainers.Set(i, mapset.NewSet[string]())
ap.slugToAppProfile.Set(i, &v1beta1.ApplicationProfile{})
Expand Down Expand Up @@ -424,7 +426,7 @@ func Test_GetApplicationProfile(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ap := NewApplicationProfileCache("", fake.NewSimpleClientset().SpdxV1beta1())
ap := NewApplicationProfileCache("", fake.NewSimpleClientset().SpdxV1beta1(), 0)

for _, c := range tt.pods {
ap.containerToSlug.Set(c.containerID, c.slug)
Expand Down Expand Up @@ -503,7 +505,7 @@ func Test_addApplicationProfile_existing(t *testing.T) {

storageClient := fake.NewSimpleClientset(runtimeObjs...).SpdxV1beta1()

ap := NewApplicationProfileCache("", storageClient)
ap := NewApplicationProfileCache("", storageClient, 0)

// add pods
for i := range tt.pods {
Expand All @@ -512,6 +514,7 @@ func Test_addApplicationProfile_existing(t *testing.T) {
}

ap.addApplicationProfile(context.Background(), tt.obj1)
time.Sleep(1 * time.Second) // add is async
ap.addApplicationProfile(context.Background(), tt.obj2)

// test if the application profile is added to the cache
Expand Down Expand Up @@ -580,7 +583,7 @@ func Test_getApplicationProfile(t *testing.T) {
}

func Test_WatchResources(t *testing.T) {
ap := NewApplicationProfileCache("test-node", nil)
ap := NewApplicationProfileCache("test-node", nil, 0)

expectedPodWatchResource := watcher.NewWatchResource(schema.GroupVersionResource{
Group: "",
Expand Down Expand Up @@ -701,9 +704,10 @@ func Test_addPod(t *testing.T) {

storageClient := fake.NewSimpleClientset(runtimeObjs...).SpdxV1beta1()

ap := NewApplicationProfileCache("", storageClient)
ap := NewApplicationProfileCache("", storageClient, 0)

ap.addApplicationProfile(context.Background(), tt.preCreatedAP)
time.Sleep(1 * time.Second) // add is async

tt.obj.(metav1.Object).SetNamespace(namespace)
ap.addPod(tt.obj)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package networkneighborhoodcache
import (
"context"
"fmt"
"time"

mapset "github.com/deckarep/golang-set/v2"
"github.com/goradd/maps"
Expand All @@ -12,6 +13,7 @@ import (
helpersv1 "github.com/kubescape/k8s-interface/instanceidhandler/v1/helpers"
"github.com/kubescape/k8s-interface/workloadinterface"
"github.com/kubescape/node-agent/pkg/objectcache"
"github.com/kubescape/node-agent/pkg/utils"
"github.com/kubescape/node-agent/pkg/watcher"
"github.com/kubescape/storage/pkg/apis/softwarecomposition/v1beta1"
versioned "github.com/kubescape/storage/pkg/generated/clientset/versioned/typed/softwarecomposition/v1beta1"
Expand Down Expand Up @@ -52,11 +54,13 @@ type NetworkNeighborhoodCacheImpl struct {
storageClient versioned.SpdxV1beta1Interface
allNetworkNeighborhoods mapset.Set[string] // cache all the NN that are ready. this will enable removing from cache NN without pods that are running on the same node
nodeName string
maxDelaySeconds int // maximum delay in seconds before getting the full object from the storage
}

func NewNetworkNeighborhoodCache(nodeName string, storageClient versioned.SpdxV1beta1Interface) *NetworkNeighborhoodCacheImpl {
func NewNetworkNeighborhoodCache(nodeName string, storageClient versioned.SpdxV1beta1Interface, maxDelaySeconds int) *NetworkNeighborhoodCacheImpl {
return &NetworkNeighborhoodCacheImpl{
nodeName: nodeName,
maxDelaySeconds: maxDelaySeconds,
storageClient: storageClient,
containerToSlug: maps.SafeMap[string, string]{},
slugToContainers: maps.SafeMap[string, mapset.Set[string]]{},
Expand Down Expand Up @@ -233,18 +237,24 @@ func (nn *NetworkNeighborhoodCacheImpl) addNetworkNeighborhood(_ context.Context
if nn.slugToContainers.Has(nnName) {
// get the full network neighborhood from the storage
// the watch only returns the metadata
fullNN, err := nn.getNetworkNeighborhood(netNeighborhood.GetNamespace(), netNeighborhood.GetName())
if err != nil {
logger.L().Error("failed to get full network neighborhood", helpers.Error(err))
return
}
nn.slugToNetworkNeighborhood.Set(nnName, fullNN)
for _, i := range nn.slugToContainers.Get(nnName).ToSlice() {
nn.containerToSlug.Set(i, nnName)
}
// avoid thundering herd problem by adding a random delay
time.AfterFunc(utils.RandomDuration(nn.maxDelaySeconds, time.Second), func() {
nn.addFullNetworkNeighborhood(netNeighborhood, nnName)
})
}
}

logger.L().Debug("added pod to network neighborhood cache", helpers.String("name", nnName))
// addFullNetworkNeighborhood fetches the complete network neighborhood from the
// storage (the watch only delivers metadata) and caches it, then points every
// container registered under the slug at it.
// It is invoked via time.AfterFunc with a random delay (see addNetworkNeighborhood),
// so the cache state may have changed since it was scheduled.
func (nn *NetworkNeighborhoodCacheImpl) addFullNetworkNeighborhood(netNeighborhood *v1beta1.NetworkNeighborhood, nnName string) {
	fullNN, err := nn.getNetworkNeighborhood(netNeighborhood.GetNamespace(), netNeighborhood.GetName())
	if err != nil {
		logger.L().Error("failed to get full network neighborhood", helpers.Error(err))
		return
	}
	// the slug may have been removed while we were waiting for the random
	// delay to elapse - re-check to avoid calling ToSlice on a nil set
	containers := nn.slugToContainers.Get(nnName)
	if containers == nil {
		logger.L().Debug("network neighborhood no longer tracked, skipping", helpers.String("name", nnName))
		return
	}
	nn.slugToNetworkNeighborhood.Set(nnName, fullNN)
	for _, i := range containers.ToSlice() {
		nn.containerToSlug.Set(i, nnName)
	}
	logger.L().Debug("added network neighborhood to cache", helpers.String("name", nnName))
}

func (nn *NetworkNeighborhoodCacheImpl) deleteNetworkNeighborhood(obj runtime.Object) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"slices"
"testing"
"time"

mapset "github.com/deckarep/golang-set/v2"
"github.com/kubescape/node-agent/mocks"
Expand Down Expand Up @@ -80,7 +81,7 @@ func Test_AddHandlers(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
tt.obj.(metav1.Object).SetNamespace("default")
storageClient := fake.NewSimpleClientset().SpdxV1beta1()
nn := NewNetworkNeighborhoodCache("", storageClient)
nn := NewNetworkNeighborhoodCache("", storageClient, 0)
nn.slugToContainers.Set(tt.slug, mapset.NewSet[string]())

tt.f(nn, context.Background(), tt.obj)
Expand Down Expand Up @@ -179,7 +180,7 @@ func Test_addNetworkNeighborhood(t *testing.T) {

storageClient := fake.NewSimpleClientset(runtimeObjs...).SpdxV1beta1()

nn := NewNetworkNeighborhoodCache("", storageClient)
nn := NewNetworkNeighborhoodCache("", storageClient, 0)

for i := range tt.preCreatedPods {
nn.addPod(tt.preCreatedPods[i])
Expand All @@ -189,6 +190,7 @@ func Test_addNetworkNeighborhood(t *testing.T) {
}

nn.addNetworkNeighborhood(context.Background(), tt.obj)
time.Sleep(1 * time.Second) // add is async

// test if the network neighborhood is added to the cache
apName := objectcache.MetaUniqueName(tt.obj.(metav1.Object))
Expand Down Expand Up @@ -253,7 +255,7 @@ func Test_deleteNetworkNeighborhood(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
nn := NewNetworkNeighborhoodCache("", nil)
nn := NewNetworkNeighborhoodCache("", nil, 0)

nn.allNetworkNeighborhoods.Append(tt.slugs...)
for _, i := range tt.slugs {
Expand Down Expand Up @@ -316,7 +318,7 @@ func Test_deletePod(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
nn := NewNetworkNeighborhoodCache("", nil)
nn := NewNetworkNeighborhoodCache("", nil, 0)
for _, i := range tt.otherSlugs {
nn.slugToContainers.Set(i, mapset.NewSet[string]())
nn.slugToNetworkNeighborhood.Set(i, &v1beta1.NetworkNeighborhood{})
Expand Down Expand Up @@ -424,7 +426,7 @@ func Test_GetNetworkNeighborhood(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
nn := NewNetworkNeighborhoodCache("", fake.NewSimpleClientset().SpdxV1beta1())
nn := NewNetworkNeighborhoodCache("", fake.NewSimpleClientset().SpdxV1beta1(), 0)

for _, c := range tt.pods {
nn.containerToSlug.Set(c.containerID, c.slug)
Expand Down Expand Up @@ -503,7 +505,7 @@ func Test_addNetworkNeighborhood_existing(t *testing.T) {

storageClient := fake.NewSimpleClientset(runtimeObjs...).SpdxV1beta1()

nn := NewNetworkNeighborhoodCache("", storageClient)
nn := NewNetworkNeighborhoodCache("", storageClient, 0)

// add pods
for i := range tt.pods {
Expand All @@ -512,6 +514,7 @@ func Test_addNetworkNeighborhood_existing(t *testing.T) {
}

nn.addNetworkNeighborhood(context.Background(), tt.obj1)
time.Sleep(1 * time.Second) // add is async
nn.addNetworkNeighborhood(context.Background(), tt.obj2)

// test if the network neighborhood is added to the cache
Expand Down Expand Up @@ -580,7 +583,7 @@ func Test_getNetworkNeighborhood(t *testing.T) {
}

func Test_WatchResources(t *testing.T) {
nn := NewNetworkNeighborhoodCache("test-node", nil)
nn := NewNetworkNeighborhoodCache("test-node", nil, 0)

expectedPodWatchResource := watcher.NewWatchResource(schema.GroupVersionResource{
Group: "",
Expand Down Expand Up @@ -701,9 +704,10 @@ func Test_addPod(t *testing.T) {

storageClient := fake.NewSimpleClientset(runtimeObjs...).SpdxV1beta1()

nn := NewNetworkNeighborhoodCache("", storageClient)
nn := NewNetworkNeighborhoodCache("", storageClient, 0)

nn.addNetworkNeighborhood(context.Background(), tt.preCreatedNN)
time.Sleep(1 * time.Second) // add is async

tt.obj.(metav1.Object).SetNamespace(namespace)
nn.addPod(tt.obj)
Expand Down
7 changes: 7 additions & 0 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,13 @@ func AddJitter(duration time.Duration, maxJitterPercentage int) time.Duration {
return duration * time.Duration(jitter)
}

// RandomDuration returns a duration between 1/2 max and max
func RandomDuration(max int, duration time.Duration) time.Duration {
// we don't initialize the seed, so we will get the same sequence of random numbers every time
mini := max / 2
return time.Duration(rand.Intn(1+max-mini)+mini) * duration
}

func Atoi(s string) int {
i, err := strconv.Atoi(s)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions tests/chart/templates/node-agent/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ data:
"runtimeDetectionEnabled": {{ eq .Values.capabilities.runtimeDetection "enable" }},
"networkServiceEnabled": {{ eq .Values.capabilities.networkPolicyService "enable" }},
"malwareDetectionEnabled": {{ eq .Values.capabilities.malwareDetection "enable" }},
"InitialDelay": "{{ .Values.nodeAgent.config.learningPeriod }}",
"initialDelay": "{{ .Values.nodeAgent.config.learningPeriod }}",
"updateDataPeriod": "{{ .Values.nodeAgent.config.updatePeriod }}",
"maxDelaySeconds": "{{ .Values.nodeAgent.config.maxDelaySeconds }}",
"maxSniffingTimePerContainer": "{{ .Values.nodeAgent.config.maxLearningPeriod }}",
"exporters": {
"httpExporterConfig": {{- .Values.nodeAgent.config.httpExporterConfig | toJson }},
Expand All @@ -37,4 +38,4 @@ data:
{{ .Files.Get "clamav/clamd.conf" | indent 4 }}
freshclam.conf: |-
{{ .Files.Get "clamav/freshclam.conf" | indent 4 }}
{{- end }}
{{- end }}
Loading

0 comments on commit 710ec05

Please sign in to comment.