Skip to content

Commit

Permalink
Merge pull request #364 from kubescape/feature/private
Browse files Browse the repository at this point in the history
Feature/private
  • Loading branch information
amitschendel authored Sep 18, 2024
2 parents 09407b5 + 273ae09 commit dd2c45a
Show file tree
Hide file tree
Showing 52 changed files with 408 additions and 387 deletions.
1 change: 1 addition & 0 deletions .github/workflows/component-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ jobs:
Test_07_RuleBindingApplyTest,
Test_08_ApplicationProfilePatching,
Test_10_MalwareDetectionTest,
Test_11_EndpointTest,
# Test_10_DemoTest
# Test_11_DuplicationTest
]
Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"strings"
"syscall"

"github.com/kubescape/node-agent/internal/validator"
"github.com/kubescape/node-agent/pkg/applicationprofilemanager"
applicationprofilemanagerv1 "github.com/kubescape/node-agent/pkg/applicationprofilemanager/v1"
"github.com/kubescape/node-agent/pkg/config"
Expand Down Expand Up @@ -44,6 +43,7 @@ import (
seccompmanagerv1 "github.com/kubescape/node-agent/pkg/seccompmanager/v1"
"github.com/kubescape/node-agent/pkg/storage/v1"
"github.com/kubescape/node-agent/pkg/utils"
"github.com/kubescape/node-agent/pkg/validator"
"github.com/kubescape/node-agent/pkg/watcher/dynamicwatcher"
"github.com/kubescape/node-agent/pkg/watcher/seccompprofilewatcher"

Expand Down Expand Up @@ -269,7 +269,7 @@ func main() {
}

// Create the container handler
mainHandler, err := containerwatcher.CreateIGContainerWatcher(cfg, applicationProfileManager, k8sClient, relevancyManager, networkManagerClient, dnsManagerClient, prometheusExporter, ruleManager, malwareManager, preRunningContainersIDs, &ruleBindingNotify, containerRuntime)
mainHandler, err := containerwatcher.CreateIGContainerWatcher(cfg, applicationProfileManager, k8sClient, relevancyManager, networkManagerClient, dnsManagerClient, prometheusExporter, ruleManager, malwareManager, preRunningContainersIDs, &ruleBindingNotify, containerRuntime, nil)
if err != nil {
logger.L().Ctx(ctx).Fatal("error creating the container watcher", helpers.Error(err))
}
Expand Down
27 changes: 27 additions & 0 deletions pkg/containerwatcher/container_watcher_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,37 @@ package containerwatcher

import (
"context"

containercollection "github.com/inspektor-gadget/inspektor-gadget/pkg/container-collection"
"github.com/inspektor-gadget/inspektor-gadget/pkg/socketenricher"
tracercollection "github.com/inspektor-gadget/inspektor-gadget/pkg/tracer-collection"
"github.com/kubescape/node-agent/pkg/utils"
)

type ContainerWatcher interface {
Ready() bool
Start(ctx context.Context) error
Stop()
GetTracerCollection() *tracercollection.TracerCollection
GetContainerCollection() *containercollection.ContainerCollection
GetSocketEnricher() *socketenricher.SocketEnricher
GetContainerSelector() *containercollection.ContainerSelector
RegisterCustomTracer(tracer CustomTracer) error
UnregisterCustomTracer(tracer CustomTracer) error
RegisterContainerReceiver(receiver ContainerReceiver)
UnregisterContainerReceiver(receiver ContainerReceiver)
}

type CustomTracer interface {
Start() error
Stop() error
Name() string
}

type EventReceiver interface {
ReportEvent(eventType utils.EventType, event utils.K8sEvent)
}

type ContainerReceiver interface {
ContainerCallback(notif containercollection.PubSubEvent)
}
48 changes: 48 additions & 0 deletions pkg/containerwatcher/container_watcher_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ package containerwatcher

import (
"context"

containercollection "github.com/inspektor-gadget/inspektor-gadget/pkg/container-collection"
"github.com/inspektor-gadget/inspektor-gadget/pkg/socketenricher"
tracercollection "github.com/inspektor-gadget/inspektor-gadget/pkg/tracer-collection"
)

type ContainerWatcherMock struct{}
Expand All @@ -16,4 +20,48 @@ func (c ContainerWatcherMock) Start(_ context.Context) error {

func (c ContainerWatcherMock) Stop() {}

func (c ContainerWatcherMock) RegisterCustomTracer(_ CustomTracer) error {
return nil
}

func (c ContainerWatcherMock) UnregisterCustomTracer(_ CustomTracer) error {
return nil
}

func (c ContainerWatcherMock) RegisterContainerReceiver(_ ContainerReceiver) {}

func (c ContainerWatcherMock) UnregisterContainerReceiver(_ ContainerReceiver) {}

func (c ContainerWatcherMock) GetTracerCollection() *tracercollection.TracerCollection {
return nil
}

func (c ContainerWatcherMock) GetContainerCollection() *containercollection.ContainerCollection {
return nil
}

func (c ContainerWatcherMock) GetSocketEnricher() *socketenricher.SocketEnricher {
return nil
}

func (c ContainerWatcherMock) GetContainerSelector() *containercollection.ContainerSelector {
return nil
}

var _ ContainerWatcher = (*ContainerWatcherMock)(nil)

type CustomTracerMock struct{}

func (c CustomTracerMock) Start() error {
return nil
}

func (c CustomTracerMock) Stop() error {
return nil
}

func (c CustomTracerMock) Name() string {
return ""
}

var _ CustomTracer = (*CustomTracerMock)(nil)
116 changes: 100 additions & 16 deletions pkg/containerwatcher/v1/container_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"

mapset "github.com/deckarep/golang-set/v2"
"github.com/goradd/maps"
containercollection "github.com/inspektor-gadget/inspektor-gadget/pkg/container-collection"
containerutilsTypes "github.com/inspektor-gadget/inspektor-gadget/pkg/container-utils/types"
tracerseccomp "github.com/inspektor-gadget/inspektor-gadget/pkg/gadgets/advise/seccomp/tracer"
Expand Down Expand Up @@ -108,6 +109,10 @@ type IGContainerWatcher struct {
httpTracer *tracerhttp.Tracer
kubeIPInstance operators.OperatorInstance
kubeNameInstance operators.OperatorInstance
// Third party tracers
thirdPartyTracers mapset.Set[containerwatcher.CustomTracer]
// Third party container receivers
thirdPartyContainerReceivers mapset.Set[containerwatcher.ContainerReceiver]

// Worker pools
capabilitiesWorkerPool *ants.PoolWithFunc
Expand Down Expand Up @@ -144,7 +149,7 @@ type IGContainerWatcher struct {

var _ containerwatcher.ContainerWatcher = (*IGContainerWatcher)(nil)

func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager applicationprofilemanager.ApplicationProfileManagerClient, k8sClient *k8sinterface.KubernetesApi, relevancyManager relevancymanager.RelevancyManagerClient, networkManagerClient networkmanager.NetworkManagerClient, dnsManagerClient dnsmanager.DNSManagerClient, metrics metricsmanager.MetricsManager, ruleManager rulemanager.RuleManagerClient, malwareManager malwaremanager.MalwareManagerClient, preRunningContainers mapset.Set[string], ruleBindingPodNotify *chan rulebinding.RuleBindingNotify, runtime *containerutilsTypes.RuntimeConfig) (*IGContainerWatcher, error) {
func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager applicationprofilemanager.ApplicationProfileManagerClient, k8sClient *k8sinterface.KubernetesApi, relevancyManager relevancymanager.RelevancyManagerClient, networkManagerClient networkmanager.NetworkManagerClient, dnsManagerClient dnsmanager.DNSManagerClient, metrics metricsmanager.MetricsManager, ruleManager rulemanager.RuleManagerClient, malwareManager malwaremanager.MalwareManagerClient, preRunningContainers mapset.Set[string], ruleBindingPodNotify *chan rulebinding.RuleBindingNotify, runtime *containerutilsTypes.RuntimeConfig, thirdPartyEventReceivers *maps.SafeMap[utils.EventType, mapset.Set[containerwatcher.EventReceiver]]) (*IGContainerWatcher, error) {
// Use container collection to get notified for new containers
containerCollection := &containercollection.ContainerCollection{}
// Create a tracer collection instance
Expand All @@ -162,7 +167,10 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli
metrics.ReportEvent(utils.CapabilitiesEventType)
k8sContainerID := utils.CreateK8sContainerID(event.K8s.Namespace, event.K8s.PodName, event.K8s.ContainerName)
applicationProfileManager.ReportCapability(k8sContainerID, event.CapName)
ruleManager.ReportCapability(event)
ruleManager.ReportEvent(utils.CapabilitiesEventType, &event)

// Report capabilities to event receivers
reportEventToThirdPartyTracers(utils.CapabilitiesEventType, &event, thirdPartyEventReceivers)
})
if err != nil {
return nil, fmt.Errorf("creating capabilities worker pool: %w", err)
Expand All @@ -189,8 +197,11 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli
metrics.ReportEvent(utils.ExecveEventType)
applicationProfileManager.ReportFileExec(k8sContainerID, path, event.Args)
relevancyManager.ReportFileExec(event.Runtime.ContainerID, k8sContainerID, path)
ruleManager.ReportFileExec(event)
malwareManager.ReportFileExec(k8sContainerID, event)
ruleManager.ReportEvent(utils.ExecveEventType, &event)
malwareManager.ReportEvent(utils.ExecveEventType, &event)

// Report exec events to event receivers
reportEventToThirdPartyTracers(utils.ExecveEventType, &event, thirdPartyEventReceivers)
})
if err != nil {
return nil, fmt.Errorf("creating exec worker pool: %w", err)
Expand All @@ -217,8 +228,11 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli
metrics.ReportEvent(utils.OpenEventType)
applicationProfileManager.ReportFileOpen(k8sContainerID, path, event.Flags)
relevancyManager.ReportFileOpen(event.Runtime.ContainerID, k8sContainerID, path)
ruleManager.ReportFileOpen(event)
malwareManager.ReportFileOpen(k8sContainerID, event)
ruleManager.ReportEvent(utils.OpenEventType, &event)
malwareManager.ReportEvent(utils.OpenEventType, &event)

// Report open events to event receivers
reportEventToThirdPartyTracers(utils.OpenEventType, &event, thirdPartyEventReceivers)
})
if err != nil {
return nil, fmt.Errorf("creating open worker pool: %w", err)
Expand All @@ -238,7 +252,10 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli
}
metrics.ReportEvent(utils.NetworkEventType)
networkManagerClient.ReportNetworkEvent(k8sContainerID, event)
ruleManager.ReportNetworkEvent(event)
ruleManager.ReportEvent(utils.NetworkEventType, &event)

// Report network events to event receivers
reportEventToThirdPartyTracers(utils.NetworkEventType, &event, thirdPartyEventReceivers)
})
if err != nil {
return nil, fmt.Errorf("creating network worker pool: %w", err)
Expand All @@ -263,7 +280,10 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli

metrics.ReportEvent(utils.DnsEventType)
dnsManagerClient.ReportDNSEvent(event)
ruleManager.ReportDNSEvent(event)
ruleManager.ReportEvent(utils.DnsEventType, &event)

// Report DNS events to event receivers
reportEventToThirdPartyTracers(utils.DnsEventType, &event, thirdPartyEventReceivers)
})
if err != nil {
return nil, fmt.Errorf("creating dns worker pool: %w", err)
Expand All @@ -275,7 +295,10 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli
return
}
metrics.ReportEvent(utils.RandomXEventType)
ruleManager.ReportRandomxEvent(event)
ruleManager.ReportEvent(utils.RandomXEventType, &event)

// Report randomx events to event receivers
reportEventToThirdPartyTracers(utils.RandomXEventType, &event, thirdPartyEventReceivers)
})
if err != nil {
return nil, fmt.Errorf("creating randomx worker pool: %w", err)
Expand All @@ -287,7 +310,10 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli
return
}
metrics.ReportEvent(utils.SymlinkEventType)
ruleManager.ReportSymlinkEvent(event)
ruleManager.ReportEvent(utils.SymlinkEventType, &event)

// Report symlink events to event receivers
reportEventToThirdPartyTracers(utils.SymlinkEventType, &event, thirdPartyEventReceivers)
})
if err != nil {
return nil, fmt.Errorf("creating symlink worker pool: %w", err)
Expand All @@ -299,7 +325,10 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli
return
}
metrics.ReportEvent(utils.HardlinkEventType)
ruleManager.ReportHardlinkEvent(event)
ruleManager.ReportEvent(utils.HardlinkEventType, &event)

// Report hardlink events to event receivers
reportEventToThirdPartyTracers(utils.HardlinkEventType, &event, thirdPartyEventReceivers)
})
if err != nil {
return nil, fmt.Errorf("creating hardlink worker pool: %w", err)
Expand All @@ -311,7 +340,10 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli
return
}
metrics.ReportEvent(utils.SSHEventType)
ruleManager.ReportSSHEvent(event)
ruleManager.ReportEvent(utils.SSHEventType, &event)

// Report ssh events to event receivers
reportEventToThirdPartyTracers(utils.SSHEventType, &event, thirdPartyEventReceivers)
})
if err != nil {
return nil, fmt.Errorf("creating ssh worker pool: %w", err)
Expand All @@ -334,6 +366,8 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli

metrics.ReportEvent(utils.HTTPEventType)
applicationProfileManager.ReportHTTPEvent(k8sContainerID, &event)

reportEventToThirdPartyTracers(utils.HTTPEventType, &event, thirdPartyEventReceivers)
})

if err != nil {
Expand Down Expand Up @@ -386,13 +420,55 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli
httpWorkerChan: make(chan *tracerhttptype.Event, 500000),

// cache
ruleBindingPodNotify: ruleBindingPodNotify,
timeBasedContainers: mapset.NewSet[string](),
ruleManagedPods: mapset.NewSet[string](),
runtime: runtime,
ruleBindingPodNotify: ruleBindingPodNotify,
timeBasedContainers: mapset.NewSet[string](),
ruleManagedPods: mapset.NewSet[string](),
runtime: runtime,
thirdPartyTracers: mapset.NewSet[containerwatcher.CustomTracer](),
thirdPartyContainerReceivers: mapset.NewSet[containerwatcher.ContainerReceiver](),
}, nil
}

func (ch *IGContainerWatcher) GetContainerCollection() *containercollection.ContainerCollection {
return ch.containerCollection
}

func (ch *IGContainerWatcher) GetTracerCollection() *tracercollection.TracerCollection {
return ch.tracerCollection
}

func (ch *IGContainerWatcher) GetSocketEnricher() *socketenricher.SocketEnricher {
return ch.socketEnricher
}

func (ch *IGContainerWatcher) GetContainerSelector() *containercollection.ContainerSelector {
return &ch.containerSelector
}

func (ch *IGContainerWatcher) RegisterCustomTracer(tracer containerwatcher.CustomTracer) error {
for t := range ch.thirdPartyTracers.Iter() {
if t.Name() == tracer.Name() {
return fmt.Errorf("tracer with name %s already registered", tracer.Name())
}
}

ch.thirdPartyTracers.Add(tracer)
return nil
}

func (ch *IGContainerWatcher) UnregisterCustomTracer(tracer containerwatcher.CustomTracer) error {
ch.thirdPartyTracers.Remove(tracer)
return nil
}

func (ch *IGContainerWatcher) RegisterContainerReceiver(receiver containerwatcher.ContainerReceiver) {
ch.thirdPartyContainerReceivers.Add(receiver)
}

func (ch *IGContainerWatcher) UnregisterContainerReceiver(receiver containerwatcher.ContainerReceiver) {
ch.thirdPartyContainerReceivers.Remove(receiver)
}

func (ch *IGContainerWatcher) Start(ctx context.Context) error {
if !ch.running {

Expand Down Expand Up @@ -425,3 +501,11 @@ func (ch *IGContainerWatcher) Stop() {
func (ch *IGContainerWatcher) Ready() bool {
return ch.running
}

func reportEventToThirdPartyTracers(eventType utils.EventType, event utils.K8sEvent, thirdPartyEventReceivers *maps.SafeMap[utils.EventType, mapset.Set[containerwatcher.EventReceiver]]) {
if thirdPartyEventReceivers != nil && thirdPartyEventReceivers.Has(eventType) {
for receiver := range thirdPartyEventReceivers.Get(eventType).Iter() {
receiver.ReportEvent(eventType, event)
}
}
}
20 changes: 20 additions & 0 deletions pkg/containerwatcher/v1/container_watcher_private.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ func (ch *IGContainerWatcher) startContainerCollection(ctx context.Context) erro
ch.ruleManager.ContainerCallback,
}

for receiver := range ch.thirdPartyContainerReceivers.Iter() {
containerEventFuncs = append(containerEventFuncs, receiver.ContainerCallback)
}

// Define the different options for the container collection instance
opts := []containercollection.ContainerCollectionOption{
// Get Notifications from the container collection
Expand Down Expand Up @@ -267,6 +271,14 @@ func (ch *IGContainerWatcher) startTracers() error {
logger.L().Error("error starting ssh tracing", helpers.Error(err))
return err
}

// Start third party tracers
for tracer := range ch.thirdPartyTracers.Iter() {
if err := tracer.Start(); err != nil {
logger.L().Error("error starting custom tracer", helpers.String("tracer", tracer.Name()), helpers.Error(err))
return err
}
}
}

if ch.cfg.EnableHttpDetection {
Expand Down Expand Up @@ -346,6 +358,14 @@ func (ch *IGContainerWatcher) stopTracers() error {
logger.L().Error("error stopping ssh tracing", helpers.Error(err))
errs = errors.Join(errs, err)
}

// Stop third party tracers
for tracer := range ch.thirdPartyTracers.Iter() {
if err := tracer.Stop(); err != nil {
logger.L().Error("error stopping custom tracer", helpers.String("tracer", tracer.Name()), helpers.Error(err))
errs = errors.Join(errs, err)
}
}
}

if ch.cfg.EnableHttpDetection {
Expand Down
Loading

0 comments on commit dd2c45a

Please sign in to comment.