Skip to content

Commit

Permalink
Merge branch 'main' of github.com:kubescape/node-agent into feature/t…
Browse files Browse the repository at this point in the history
…hirdparty_enricher
  • Loading branch information
afek854 committed Nov 4, 2024
2 parents c0b7545 + 2b2bc24 commit a27eff8
Show file tree
Hide file tree
Showing 23 changed files with 3,193 additions and 439 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/component-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,15 @@ jobs:
Test_01_BasicAlertTest,
Test_02_AllAlertsFromMaliciousApp,
Test_03_BasicLoadActivities,
Test_04_MemoryLeak,
# Test_04_MemoryLeak,
Test_05_MemoryLeak_10K_Alerts,
Test_06_KillProcessInTheMiddle,
Test_07_RuleBindingApplyTest,
Test_08_ApplicationProfilePatching,
Test_10_MalwareDetectionTest,
Test_11_EndpointTest,
# Test_10_DemoTest
# Test_11_DuplicationTest
Test_12_MergingProfilesTest,
Test_13_MergingNetworkNeighborhoodTest,
]
steps:
- name: Checkout code
Expand Down
3 changes: 2 additions & 1 deletion clamav/init.sh
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ else
if [ -S "/tmp/clamd.sock" ]; then
unlink "/tmp/clamd.sock"
fi
clamd --foreground &
# Run clamd in the foreground but redirecting output to stdout and stderr to /dev/null
clamd --foreground > /dev/null 2>&1 &
while [ ! -S "/run/clamav/clamd.sock" ] && [ ! -S "/tmp/clamd.sock" ]; do
if [ "${_timeout:=0}" -gt "${CLAMD_STARTUP_TIMEOUT:=1800}" ]; then
echo
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -262,3 +262,5 @@ require (
replace github.com/inspektor-gadget/inspektor-gadget => /home/afek/Projects/Armo/poc/inspektor-gadget

replace github.com/vishvananda/netns => github.com/inspektor-gadget/netns v0.0.5-0.20230524185006-155d84c555d6

replace github.com/goradd/maps => github.com/matthyx/maps v0.0.0-20241029072232-2f5d83d608a7
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,6 @@ github.com/gookit/color v1.5.4/go.mod h1:pZJOeOS8DM43rXbp4AZo1n9zCU2qjpcRko0b6/Q
github.com/gopacket/gopacket v1.2.0 h1:eXbzFad7f73P1n2EJHQlsKuvIMJjVXK5tXoSca78I3A=
github.com/gopacket/gopacket v1.2.0/go.mod h1:BrAKEy5EOGQ76LSqh7DMAr7z0NNPdczWm2GxCG7+I8M=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/goradd/maps v0.1.5 h1:Ut7BPJgNy5BYbleI3LswVJJquiM8X5uN0ZuZBHSdRUI=
github.com/goradd/maps v0.1.5/go.mod h1:E5X1CHMgfVm1qFTHgXpgVLVylO5wtlhZdB93dRGjnc0=
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
Expand Down Expand Up @@ -566,6 +564,8 @@ github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3v
github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/matthyx/maps v0.0.0-20241029072232-2f5d83d608a7 h1:LAAFb3ra/vxiZcDY1zrbS29oqnB+N9MknuQZC1ju2+A=
github.com/matthyx/maps v0.0.0-20241029072232-2f5d83d608a7/go.mod h1:E5X1CHMgfVm1qFTHgXpgVLVylO5wtlhZdB93dRGjnc0=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
Expand Down
16 changes: 10 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"github.com/kubescape/node-agent/pkg/objectcache/k8scache"
"github.com/kubescape/node-agent/pkg/objectcache/networkneighborhoodcache"
objectcachev1 "github.com/kubescape/node-agent/pkg/objectcache/v1"
"github.com/kubescape/node-agent/pkg/processmanager"
processmanagerv1 "github.com/kubescape/node-agent/pkg/processmanager/v1"
"github.com/kubescape/node-agent/pkg/relevancymanager"
relevancymanagerv1 "github.com/kubescape/node-agent/pkg/relevancymanager/v1"
rulebinding "github.com/kubescape/node-agent/pkg/rulebindingmanager"
Expand Down Expand Up @@ -193,26 +195,27 @@ func main() {
var networkManagerClient networkmanager.NetworkManagerClient
var dnsManagerClient dnsmanager.DNSManagerClient
var dnsResolver dnsmanager.DNSResolver
if cfg.EnableNetworkTracing {
if cfg.EnableNetworkTracing || cfg.EnableRuntimeDetection {
dnsManager := dnsmanager.CreateDNSManager()
dnsManagerClient = dnsManager
// NOTE: dnsResolver is set for threat detection.
dnsResolver = dnsManager
networkManagerClient = networkmanagerv2.CreateNetworkManager(ctx, cfg, clusterData.ClusterName, k8sClient, storageClient, dnsManager, preRunningContainersIDs, k8sObjectCache)
} else {
if cfg.EnableRuntimeDetection {
logger.L().Ctx(ctx).Fatal("Network tracing is disabled, but runtime detection is enabled. Network tracing is required for runtime detection.")
}
dnsManagerClient = dnsmanager.CreateDNSManagerMock()
dnsResolver = dnsmanager.CreateDNSManagerMock()
networkManagerClient = networkmanager.CreateNetworkManagerMock()
}

var ruleManager rulemanager.RuleManagerClient
var processManager processmanager.ProcessManagerClient
var objCache objectcache.ObjectCache
var ruleBindingNotify chan rulebinding.RuleBindingNotify

if cfg.EnableRuntimeDetection {
// create the process manager
processManager = processmanagerv1.CreateProcessManager(ctx)

// create ruleBinding cache
ruleBindingCache := rulebindingcachev1.NewCache(nodeName, k8sClient)
dWatcher.AddAdaptor(ruleBindingCache)
Expand All @@ -235,7 +238,7 @@ func main() {
exporter := exporters.InitExporters(cfg.Exporters, clusterData.ClusterName, nodeName)

// create runtimeDetection managers
ruleManager, err = rulemanagerv1.CreateRuleManager(ctx, cfg, k8sClient, ruleBindingCache, objCache, exporter, prometheusExporter, nodeName, clusterData.ClusterName)
ruleManager, err = rulemanagerv1.CreateRuleManager(ctx, cfg, k8sClient, ruleBindingCache, objCache, exporter, prometheusExporter, nodeName, clusterData.ClusterName, processManager, nil)
if err != nil {
logger.L().Ctx(ctx).Fatal("error creating RuleManager", helpers.Error(err))
}
Expand All @@ -244,6 +247,7 @@ func main() {
ruleManager = rulemanager.CreateRuleManagerMock()
objCache = objectcache.NewObjectCacheMock()
ruleBindingNotify = make(chan rulebinding.RuleBindingNotify, 1)
processManager = processmanager.CreateProcessManagerMock()
}

// Create the node profile manager
Expand All @@ -269,7 +273,7 @@ func main() {
}

// Create the container handler
mainHandler, err := containerwatcher.CreateIGContainerWatcher(cfg, applicationProfileManager, k8sClient, relevancyManager, networkManagerClient, dnsManagerClient, prometheusExporter, ruleManager, malwareManager, preRunningContainersIDs, &ruleBindingNotify, containerRuntime, nil, nil)
mainHandler, err := containerwatcher.CreateIGContainerWatcher(cfg, applicationProfileManager, k8sClient, relevancyManager, networkManagerClient, dnsManagerClient, prometheusExporter, ruleManager, malwareManager, preRunningContainersIDs, &ruleBindingNotify, containerRuntime, nil, nil, processManager)
if err != nil {
logger.L().Ctx(ctx).Fatal("error creating the container watcher", helpers.Error(err))
}
Expand Down
41 changes: 40 additions & 1 deletion pkg/applicationprofilemanager/v1/applicationprofile_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"errors"
"fmt"
"regexp"
"runtime"
"strings"
"time"

"github.com/armosec/utils-k8s-go/wlid"
Expand All @@ -26,6 +28,7 @@ import (
"github.com/kubescape/node-agent/pkg/storage"
"github.com/kubescape/node-agent/pkg/utils"
"github.com/kubescape/storage/pkg/apis/softwarecomposition/v1beta1"
"github.com/kubescape/storage/pkg/registry/file/dynamicpathdetector"
storageUtils "github.com/kubescape/storage/pkg/utils"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
Expand All @@ -34,6 +37,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const OpenDynamicThreshold = 50

var procRegex = regexp.MustCompile(`^/proc/\d+`)

type ApplicationProfileManager struct {
cfg config.Config
clusterName string
Expand Down Expand Up @@ -538,7 +545,34 @@ func (am *ApplicationProfileManager) saveProfile(ctx context.Context, watchedCon
return true
})
// record saved opens
toSaveOpens.Range(utils.SetInMap(am.savedOpens.Get(watchedContainer.K8sContainerID)))
savedOpens := am.savedOpens.Get(watchedContainer.K8sContainerID)
toSaveOpens.Range(utils.SetInMap(savedOpens))
// use a dynamic path detector to compress opens
analyzer := dynamicpathdetector.NewPathAnalyzer(OpenDynamicThreshold)
keys := savedOpens.Keys()
// first pass to learn the opens
for _, path := range keys {
_, _ = dynamicpathdetector.AnalyzeOpen(path, analyzer)
}
// second pass to compress the opens
for _, path := range keys {
result, err := dynamicpathdetector.AnalyzeOpen(path, analyzer)
if err != nil {
continue
}
if result != path {
// path becomes compressed
// we avoid a lock by using Pop to remove path and retrieve its flags
pathFlags := savedOpens.Pop(path)
if savedOpens.Has(result) {
// merge flags
savedOpens.Get(result).Append(pathFlags.ToSlice()...)
} else {
// create new entry
savedOpens.Set(result, pathFlags)
}
}
}
logger.L().Debug("ApplicationProfileManager - saved application profile",
helpers.Int("capabilities", len(capabilities)),
helpers.Int("endpoints", toSaveEndpoints.Len()),
Expand Down Expand Up @@ -675,6 +709,11 @@ func (am *ApplicationProfileManager) ReportFileOpen(k8sContainerID, path string,
if err := am.waitForContainer(k8sContainerID); err != nil {
return
}
// deduplicate /proc/1234/* into /proc/.../* (quite a common case)
// we perform it here instead of waiting for compression
if strings.HasPrefix(path, "/proc/") {
path = procRegex.ReplaceAllString(path, "/proc/"+dynamicpathdetector.DynamicIdentifier)
}
// check if we already have this open
savedOpens := am.savedOpens.Get(k8sContainerID)
if savedOpens.Has(path) && savedOpens.Get(path).Contains(flags...) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"net/http"
"net/url"
"sort"
"strings"
"testing"
"time"

mapset "github.com/deckarep/golang-set/v2"
"github.com/goradd/maps"
containercollection "github.com/inspektor-gadget/inspektor-gadget/pkg/container-collection"
"github.com/inspektor-gadget/inspektor-gadget/pkg/types"
"github.com/kubescape/node-agent/pkg/config"
Expand All @@ -19,6 +21,7 @@ import (
"github.com/kubescape/node-agent/pkg/seccompmanager"
"github.com/kubescape/node-agent/pkg/storage"
"github.com/kubescape/storage/pkg/apis/softwarecomposition/v1beta1"
"github.com/kubescape/storage/pkg/registry/file/dynamicpathdetector"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -274,3 +277,21 @@ func sortHTTPEndpoints(endpoints []v1beta1.HTTPEndpoint) {
return string(endpoints[i].Headers) < string(endpoints[j].Headers)
})
}

func BenchmarkReportFileOpen(b *testing.B) {
savedOpens := maps.SafeMap[string, mapset.Set[string]]{}
savedOpens.Set("/proc/"+dynamicpathdetector.DynamicIdentifier+"/foo/bar", mapset.NewSet("O_LARGEFILE", "O_RDONLY"))
paths := []string{"/proc/12345/foo/bar", "/bin/ls", "/etc/passwd"}
flags := []string{"O_CLOEXEC", "O_RDONLY"}
for i := 0; i < b.N; i++ {
for _, path := range paths {
if strings.HasPrefix(path, "/proc/") {
path = procRegex.ReplaceAllString(path, "/proc/"+dynamicpathdetector.DynamicIdentifier)
}
if savedOpens.Has(path) && savedOpens.Get(path).Contains(flags...) {
continue
}
}
}
b.ReportAllocs()
}
15 changes: 13 additions & 2 deletions pkg/containerwatcher/v1/container_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import (
tracersshtype "github.com/kubescape/node-agent/pkg/ebpf/gadgets/ssh/types"
tracersymlink "github.com/kubescape/node-agent/pkg/ebpf/gadgets/symlink/tracer"
tracersymlinktype "github.com/kubescape/node-agent/pkg/ebpf/gadgets/symlink/types"
"github.com/kubescape/node-agent/pkg/processmanager"

"github.com/kubescape/node-agent/pkg/malwaremanager"
"github.com/kubescape/node-agent/pkg/metricsmanager"
"github.com/kubescape/node-agent/pkg/networkmanager"
Expand Down Expand Up @@ -153,11 +155,13 @@ type IGContainerWatcher struct {
ruleBindingPodNotify *chan rulebinding.RuleBindingNotify
// container runtime
runtime *containerutilsTypes.RuntimeConfig
// process manager
processManager processmanager.ProcessManagerClient
}

var _ containerwatcher.ContainerWatcher = (*IGContainerWatcher)(nil)

func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager applicationprofilemanager.ApplicationProfileManagerClient, k8sClient *k8sinterface.KubernetesApi, relevancyManager relevancymanager.RelevancyManagerClient, networkManagerClient networkmanager.NetworkManagerClient, dnsManagerClient dnsmanager.DNSManagerClient, metrics metricsmanager.MetricsManager, ruleManager rulemanager.RuleManagerClient, malwareManager malwaremanager.MalwareManagerClient, preRunningContainers mapset.Set[string], ruleBindingPodNotify *chan rulebinding.RuleBindingNotify, runtime *containerutilsTypes.RuntimeConfig, thirdPartyEventReceivers *maps.SafeMap[utils.EventType, mapset.Set[containerwatcher.EventReceiver]], thirdPartyEnricher containerwatcher.ThirdPartyEnricher) (*IGContainerWatcher, error) {
func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager applicationprofilemanager.ApplicationProfileManagerClient, k8sClient *k8sinterface.KubernetesApi, relevancyManager relevancymanager.RelevancyManagerClient, networkManagerClient networkmanager.NetworkManagerClient, dnsManagerClient dnsmanager.DNSManagerClient, metrics metricsmanager.MetricsManager, ruleManager rulemanager.RuleManagerClient, malwareManager malwaremanager.MalwareManagerClient, preRunningContainers mapset.Set[string], ruleBindingPodNotify *chan rulebinding.RuleBindingNotify, runtime *containerutilsTypes.RuntimeConfig, thirdPartyEventReceivers *maps.SafeMap[utils.EventType, mapset.Set[containerwatcher.EventReceiver]], thirdPartyEnricher containerwatcher.ThirdPartyEnricher, processManager processmanager.ProcessManagerClient) (*IGContainerWatcher, error) {
// Use container collection to get notified for new containers
containerCollection := &containercollection.ContainerCollection{}
// Create a tracer collection instance
Expand Down Expand Up @@ -207,6 +211,7 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli
ruleManager.ReportEvent(utils.ExecveEventType, &event)
malwareManager.ReportEvent(utils.ExecveEventType, &event)
metrics.ReportEvent(utils.ExecveEventType)
processManager.ReportEvent(utils.ExecveEventType, &event)
applicationProfileManager.ReportFileExec(k8sContainerID, path, event.Args)
relevancyManager.ReportFileExec(event.Runtime.ContainerID, k8sContainerID, path)

Expand Down Expand Up @@ -453,6 +458,7 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli
thirdPartyTracers: mapset.NewSet[containerwatcher.CustomTracer](),
thirdPartyContainerReceivers: mapset.NewSet[containerwatcher.ContainerReceiver](),
thirdPartyEnricher: thirdPartyEnricher,
processManager: processManager,
}, nil
}

Expand Down Expand Up @@ -498,11 +504,16 @@ func (ch *IGContainerWatcher) UnregisterContainerReceiver(receiver containerwatc

func (ch *IGContainerWatcher) Start(ctx context.Context) error {
if !ch.running {

if err := ch.startContainerCollection(ctx); err != nil {
return fmt.Errorf("setting up container collection: %w", err)
}

// We want to populate the initial processes before starting the tracers but after retrieving the shims.
if err := ch.processManager.PopulateInitialProcesses(); err != nil {
ch.stopContainerCollection()
return fmt.Errorf("populating initial processes: %w", err)
}

if err := ch.startTracers(); err != nil {
ch.stopContainerCollection()
return fmt.Errorf("starting app behavior tracing: %w", err)
Expand Down
15 changes: 7 additions & 8 deletions pkg/containerwatcher/v1/container_watcher_private.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,14 @@ func (ch *IGContainerWatcher) containerCallback(notif containercollection.PubSub

k8sContainerID := utils.CreateK8sContainerID(notif.Container.K8s.Namespace, notif.Container.K8s.PodName, notif.Container.K8s.ContainerName)

if !ch.preRunningContainersIDs.Contains(notif.Container.Runtime.ContainerID) {
// container is not in preRunningContainersIDs, it is a new container
ch.timeBasedContainers.Add(notif.Container.Runtime.ContainerID)
}

switch notif.Type {
case containercollection.EventTypeAddContainer:
logger.L().Info("start monitor on container", helpers.String("container ID", notif.Container.Runtime.ContainerID), helpers.String("k8s workload", k8sContainerID))

if ch.running {
ch.timeBasedContainers.Add(notif.Container.Runtime.ContainerID)
} else {
ch.preRunningContainersIDs.Add(notif.Container.Runtime.ContainerID)
}
// Check if Pod has a label of max sniffing time
sniffingTime := utils.AddJitter(ch.cfg.MaxSniffingTime, ch.cfg.MaxJitterPercentage)
if podLabelMaxSniffingTime, ok := notif.Container.K8s.PodLabels[MaxSniffingTimeLabel]; ok {
Expand Down Expand Up @@ -87,6 +86,7 @@ func (ch *IGContainerWatcher) startContainerCollection(ctx context.Context) erro
ch.networkManager.ContainerCallback,
ch.malwareManager.ContainerCallback,
ch.ruleManager.ContainerCallback,
ch.processManager.ContainerCallback,
}

for receiver := range ch.thirdPartyContainerReceivers.Iter() {
Expand Down Expand Up @@ -130,7 +130,7 @@ func (ch *IGContainerWatcher) startContainerCollection(ctx context.Context) erro
return nil
}

func (ch *IGContainerWatcher) startRunningContainers() error {
func (ch *IGContainerWatcher) startRunningContainers() {
k8sClient, err := containercollection.NewK8sClient(ch.nodeName)
if err != nil {
logger.L().Fatal("creating IG Kubernetes client", helpers.Error(err))
Expand All @@ -139,7 +139,6 @@ func (ch *IGContainerWatcher) startRunningContainers() error {
for n := range *ch.ruleBindingPodNotify {
ch.addRunningContainers(k8sClient, &n)
}
return nil
}

func (ch *IGContainerWatcher) addRunningContainers(k8sClient IGK8sClient, notf *rulebindingmanager.RuleBindingNotify) {
Expand Down
Loading

0 comments on commit a27eff8

Please sign in to comment.