Skip to content

Commit

Permalink
fix race condition between deleteResources and saveProfile
Browse files Browse the repository at this point in the history
Signed-off-by: Matthias Bertschy <[email protected]>
  • Loading branch information
matthyx committed Feb 5, 2024
1 parent bedee23 commit 964d507
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 20 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/kubescape/backend v0.0.16
github.com/kubescape/go-logger v0.0.22
github.com/kubescape/k8s-interface v0.0.159-0.20240128085543-a829d861c684
github.com/kubescape/storage v0.0.61
github.com/kubescape/storage v0.0.66
github.com/panjf2000/ants/v2 v2.9.0
github.com/spf13/viper v1.18.2
github.com/stretchr/testify v1.8.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -493,8 +493,8 @@ github.com/kubescape/go-logger v0.0.22 h1:gle7wH6emOiGv9ljdpVi82pWLQ3jGucrUucvil
github.com/kubescape/go-logger v0.0.22/go.mod h1:x3HBpZo3cMT/WIdy18BxvVVd5D0e/PWFVk/HiwBNu3g=
github.com/kubescape/k8s-interface v0.0.159-0.20240128085543-a829d861c684 h1:TaNa599lecZK5oUBKhWn9OgJj4rE1NwIsj698+a3Pl8=
github.com/kubescape/k8s-interface v0.0.159-0.20240128085543-a829d861c684/go.mod h1:5sz+5Cjvo98lTbTVDiDA4MmlXxeHSVMW/wR0V3hV4K8=
github.com/kubescape/storage v0.0.61 h1:T6NIZP+80ILKLVOV9KFU/0OuH4unoLGXo4mO0zwv6/Q=
github.com/kubescape/storage v0.0.61/go.mod h1:uMwudLhZCPgjf4JEbRSUZ20JmQJitLVrexZb7S1N4b0=
github.com/kubescape/storage v0.0.66 h1:mp3bh2bTh1ki+9VeofU+hoKYPwvquRGh7WYU1Oltkqo=
github.com/kubescape/storage v0.0.66/go.mod h1:U27QNwTwRrOoFRQL7Whz6WC5rxonytuwHg4G/wEvH/A=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhnIaL+V+BEER86oLrvS+kWobKpbJuye0=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE=
github.com/logrusorgru/aurora v0.0.0-20200102142835-e9ef32dff381 h1:bqDmpDG49ZRnB5PcgP0RXtQvnMSgIF14M7CBd2shtXs=
Expand Down
50 changes: 33 additions & 17 deletions pkg/applicationprofilemanager/v1/applicationprofile_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/kubescape/k8s-interface/names"
"github.com/kubescape/k8s-interface/workloadinterface"
"github.com/kubescape/storage/pkg/apis/softwarecomposition/v1beta1"
storageUtils "github.com/kubescape/storage/pkg/utils"
"go.opentelemetry.io/otel"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -37,6 +38,7 @@ type ApplicationProfileManager struct {
cfg config.Config
clusterName string
ctx context.Context
containerMutexes *storageUtils.Mutex[string] // key is k8sContainerID
trackedContainers mapset.Set[string] // key is k8sContainerID
savedCapabilities maps.SafeMap[string, mapset.Set[string]] // key is k8sContainerID
savedExecs maps.SafeMap[string, *maps.SafeMap[string, mapset.Set[string]]] // key is k8sContainerID
Expand All @@ -60,6 +62,7 @@ func CreateApplicationProfileManager(ctx context.Context, cfg config.Config, clu
ctx: ctx,
k8sClient: k8sClient,
storageClient: storageClient,
containerMutexes: storageUtils.NewMapMutex[string](),
trackedContainers: mapset.NewSet[string](),
}, nil
}
Expand Down Expand Up @@ -133,6 +136,10 @@ func (am *ApplicationProfileManager) ensureInstanceID(ctx context.Context, conta
}

func (am *ApplicationProfileManager) deleteResources(watchedContainer *utils.WatchedContainerData) {
// make sure we don't run deleteResources and saveProfile at the same time
am.containerMutexes.TryLock(watchedContainer.K8sContainerID)
defer am.containerMutexes.Unlock(watchedContainer.K8sContainerID)
// delete resources
watchedContainer.UpdateDataTicker.Stop()
am.trackedContainers.Remove(watchedContainer.K8sContainerID)
am.savedCapabilities.Delete(watchedContainer.K8sContainerID)
Expand Down Expand Up @@ -171,6 +178,19 @@ func (am *ApplicationProfileManager) saveProfile(ctx context.Context, watchedCon
ctx, span := otel.Tracer("").Start(ctx, "ApplicationProfileManager.saveProfile")
defer span.End()

// make sure we don't run deleteResources and saveProfile at the same time
am.containerMutexes.TryLock(watchedContainer.K8sContainerID)
defer am.containerMutexes.Unlock(watchedContainer.K8sContainerID)

// verify the container hasn't already been deleted
if !am.trackedContainers.Contains(watchedContainer.K8sContainerID) {
logger.L().Ctx(ctx).Debug("ApplicationProfileManager - container isn't tracked, not saving profile",
helpers.Int("container index", watchedContainer.ContainerIndex),
helpers.String("container ID", watchedContainer.ContainerID),
helpers.String("k8s workload", watchedContainer.K8sContainerID))
return
}

if watchedContainer.InstanceID == nil {
logger.L().Ctx(ctx).Error("ApplicationProfileManager - instanceID is nil",
helpers.Int("container index", watchedContainer.ContainerIndex),
Expand Down Expand Up @@ -204,11 +224,7 @@ func (am *ApplicationProfileManager) saveProfile(ctx context.Context, watchedCon
helpers.String("k8s workload", watchedContainer.K8sContainerID))
}
// check if we have new activities to save
savedSyscalls, ok := am.savedSyscalls.Load(watchedContainer.K8sContainerID)
if !ok {
// fallback to empty set
savedSyscalls = mapset.NewSet[string]()
}
savedSyscalls := am.savedSyscalls.Get(watchedContainer.K8sContainerID)
toSaveSyscalls := mapset.NewSet[string](observedSyscalls...).Difference(savedSyscalls)
if !toSaveSyscalls.IsEmpty() {
newActivity := &v1beta1.ApplicationActivity{
Expand Down Expand Up @@ -247,10 +263,10 @@ func (am *ApplicationProfileManager) saveProfile(ctx context.Context, watchedCon
execs := make(map[string]mapset.Set[string])
opens := make(map[string]mapset.Set[string])
// get capabilities from IG
if v, ok := am.toSaveCapabilities.Load(watchedContainer.K8sContainerID); ok {
if toSaveCapabilities := am.toSaveCapabilities.Get(watchedContainer.K8sContainerID); toSaveCapabilities.Cardinality() > 0 {
// remove capabilities to save in a thread safe way using Pop
for {
capability, continuePop := v.Pop()
capability, continuePop := toSaveCapabilities.Pop()
if continuePop {
capabilities = append(capabilities, capability)
} else {
Expand All @@ -259,11 +275,7 @@ func (am *ApplicationProfileManager) saveProfile(ctx context.Context, watchedCon
}
}
// get pointer to execs map from IG
toSaveExecs, ok := am.toSaveExecs.Load(watchedContainer.K8sContainerID)
if !ok {
// fallback to empty map
toSaveExecs = new(maps.SafeMap[string, mapset.Set[string]])
}
toSaveExecs := am.toSaveExecs.Get(watchedContainer.K8sContainerID)
// point IG to a new exec map
am.toSaveExecs.Set(watchedContainer.K8sContainerID, new(maps.SafeMap[string, mapset.Set[string]]))
// prepare execs map
Expand All @@ -275,11 +287,7 @@ func (am *ApplicationProfileManager) saveProfile(ctx context.Context, watchedCon
return true
})
// get pointer to opens map from IG
toSaveOpens, ok := am.toSaveOpens.Load(watchedContainer.K8sContainerID)
if !ok {
// fallback to empty map
toSaveOpens = new(maps.SafeMap[string, mapset.Set[string]])
}
toSaveOpens := am.toSaveOpens.Get(watchedContainer.K8sContainerID)
// point IG to a new opens map
am.toSaveOpens.Set(watchedContainer.K8sContainerID, new(maps.SafeMap[string, mapset.Set[string]]))
// prepare opens map
Expand All @@ -291,6 +299,14 @@ func (am *ApplicationProfileManager) saveProfile(ctx context.Context, watchedCon
return true
})
// new profile activity
// the process tries to use JSON patching to avoid conflicts between updates on the same profile from different containers
// 0. create both a patch and a new profile
// 1. try to apply the patch
// 2a. the profile doesn't exist - create the new profile
// 2b. the patch was invalid - get existing object to fix the patch
// 3a. the profile is missing Containers or InitContainers - ADD one with the container profile at the right index
// 3b. the profile is missing the container profile - ADD the container profile at the right index
// 3c. default - patch the container ourselves and REPLACE it at the right index
if len(capabilities) > 0 || len(execs) > 0 || len(opens) > 0 {
// calculate patch
profileOperations := utils.CreatePatchOperations(capabilities, execs, opens, watchedContainer.ContainerType.String(), watchedContainer.ContainerIndex)
Expand Down

0 comments on commit 964d507

Please sign in to comment.