Skip to content

Commit

Permalink
Patch only changes
Browse files Browse the repository at this point in the history
Signed-off-by: David Wertenteil <[email protected]>
  • Loading branch information
David Wertenteil committed Jan 21, 2024
1 parent c065c20 commit 6e345e3
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 39 deletions.
3 changes: 0 additions & 3 deletions pkg/dnsmanager/dns_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (

"github.com/goradd/maps"
tracerdnstype "github.com/inspektor-gadget/inspektor-gadget/pkg/gadgets/trace/dns/types"
"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
)

// DNSManager is used to manage DNS events and save IP resolutions. It exposes an API to resolve IP address to domain name.
Expand All @@ -28,7 +26,6 @@ func (dm *DNSManager) ProcessDNSEvent(dnsEvent tracerdnstype.Event) {
dm.addressToDomainMap.Set(address, dnsEvent.DNSName)
}
} else {
logger.L().Debug("DNS event has no addresses, using net.LookupIP instead", helpers.String("dnsName", dnsEvent.DNSName))
addresses, err := net.LookupIP(dnsEvent.DNSName)
if err != nil {
return
Expand Down
103 changes: 69 additions & 34 deletions pkg/networkmanager/network_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (am *NetworkManager) handleContainerStarted(ctx context.Context, container
networkNeighbors.Spec.LabelSelector = *selector
am.status = networkNeighbors.GetAnnotations()[helpersv1.StatusMetadataKey]
if err = am.storageClient.PatchNetworkNeighborsMatchLabels(networkNeighbors.GetName(), networkNeighbors.GetNamespace(), networkNeighbors); err != nil {
logger.L().Warning("NetworkManager - failed to update network neighbor", helpers.String("reason", err.Error()), helpers.String("container ID", container.Runtime.ContainerID), helpers.String("k8s workload", k8sContainerID))
logger.L().Warning("NetworkManager - failed to update network neighbor labels", helpers.String("reason", err.Error()), helpers.String("container ID", container.Runtime.ContainerID), helpers.String("k8s workload", k8sContainerID))
}
logger.L().Debug("NetworkManager - updated network neighbor labels", helpers.String("container ID", container.Runtime.ContainerID), helpers.String("k8s workload", k8sContainerID))
}
Expand Down Expand Up @@ -301,45 +301,62 @@ func (am *NetworkManager) handleNetworkEvents(ctx context.Context, container *co
return
}

name := generateNetworkNeighborsNameFromWlid(parentWlid)
namespace := wlid.GetNamespaceFromWlid(parentWlid)
networkNeighborsExists := false
networkNeighbors, err := am.storageClient.GetNetworkNeighbors(namespace, name)
if err == nil {
networkNeighborsExists = true
}

networkEvents := am.containerAndPodToEventsMap.Get(container.Runtime.ContainerID + container.K8s.PodName)
// no events to handle
if networkEvents == nil {
// no events to handle
if am.status == helpersv1.Initializing {
// if we are in initializing state, we need to update the CRD to ready
if err := am.storageClient.PatchNetworkNeighborsIngressAndEgress(generateNetworkNeighborsNameFromWlid(parentWlid), wlid.GetNamespaceFromWlid(parentWlid), &v1beta1.NetworkNeighbors{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
helpersv1.StatusMetadataKey: helpersv1.Ready,
if networkNeighborsExists {
// update status to ready
if networkNeighbors.GetAnnotations()[helpersv1.StatusMetadataKey] == helpersv1.Initializing {
// if we are in initializing state, we need to update the CRD to ready
if err := am.storageClient.PatchNetworkNeighborsIngressAndEgress(name, namespace, &v1beta1.NetworkNeighbors{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
helpersv1.StatusMetadataKey: helpersv1.Ready,
},
},
},
}); err != nil {
logger.L().Warning("NetworkManager - failed to patch network neighbor status", helpers.String("reason", err.Error()), helpers.String("container ID", container.Runtime.ContainerID), helpers.String("k8s workload", watchedContainer.K8sContainerID))
} else {
logger.L().Debug("NetworkManager - status updated to ready", helpers.String("container ID", container.Runtime.ContainerID), helpers.String("k8s workload", watchedContainer.K8sContainerID))
}); err != nil {
logger.L().Warning("NetworkManager - failed to patch network neighbor status", helpers.String("reason", err.Error()), helpers.String("container ID", container.Runtime.ContainerID), helpers.String("k8s workload", watchedContainer.K8sContainerID))
} else {
logger.L().Debug("NetworkManager - status updated to ready", helpers.String("container ID", container.Runtime.ContainerID), helpers.String("k8s workload", watchedContainer.K8sContainerID))
}
}
}
return
}
// TODO: dns enrichment

// update CRD based on events
networkNeighborsSpec := am.generateNetworkNeighborsEntries(container.K8s.Namespace, networkEvents)
// send PATCH command using entries generated from events
if err := am.storageClient.PatchNetworkNeighborsIngressAndEgress(generateNetworkNeighborsNameFromWlid(parentWlid), wlid.GetNamespaceFromWlid(parentWlid), &v1beta1.NetworkNeighbors{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
helpersv1.StatusMetadataKey: helpersv1.Ready,
},
},
Spec: networkNeighborsSpec,
}); err != nil {
// check if error is because crd wasn't created
if !strings.Contains(err.Error(), "not found") {
logger.L().Warning("NetworkManager - failed to update network neighbor", helpers.String("reason", err.Error()), helpers.String("container ID", container.Runtime.ContainerID), helpers.String("k8s workload", watchedContainer.K8sContainerID))
return
}
networkNeighborsSpec := am.generateNetworkNeighborsEntries(container.K8s.Namespace, networkEvents, networkNeighbors.Spec)

// if the error is not found, we need to create it
if networkNeighborsExists {
// patch only if there are changes
if len(networkNeighbors.Spec.Egress) > 0 || len(networkNeighbors.Spec.Ingress) > 0 {
// send PATCH command using entries generated from events
nn := &v1beta1.NetworkNeighbors{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
helpersv1.StatusMetadataKey: helpersv1.Ready,
},
},
Spec: networkNeighborsSpec,
}
if err := am.storageClient.PatchNetworkNeighborsIngressAndEgress(name, namespace, nn); err != nil {
logger.L().Warning("NetworkManager - failed to patch network neighbor", helpers.String("reason", err.Error()), helpers.String("container ID", container.Runtime.ContainerID), helpers.String("k8s workload", watchedContainer.K8sContainerID))
return
} else {
logger.L().Debug("NetworkManager - patched network neighbor", helpers.String("container ID", container.Runtime.ContainerID), helpers.String("k8s workload", watchedContainer.K8sContainerID))
}
}
} else {
// if not found, we need to create it
// this can happen if the storage wasn't available when the container started
parentWL, err := am.getParentWorkloadFromContainer(container)
if err != nil {
Expand All @@ -365,20 +382,34 @@ func (am *NetworkManager) handleNetworkEvents(ctx context.Context, container *co
logger.L().Warning("NetworkManager - failed to create network neighbor", helpers.String("reason", err.Error()), helpers.String("container ID", container.Runtime.ContainerID), helpers.String("parent wlid", parentWlid))
return
}
logger.L().Debug("NetworkManager - created network neighbor", helpers.String("container ID", container.Runtime.ContainerID), helpers.String("k8s workload", watchedContainer.K8sContainerID))
}
logger.L().Debug("NetworkManager - updated network neighbor", helpers.String("container ID", container.Runtime.ContainerID), helpers.String("k8s workload", watchedContainer.K8sContainerID))

// remove events from map
am.containerAndPodToEventsMap.Delete(container.Runtime.ContainerID + container.K8s.PodName)
}

func (am *NetworkManager) generateNetworkNeighborsEntries(namespace string, networkEvents mapset.Set[NetworkEvent]) v1beta1.NetworkNeighborsSpec {
func (am *NetworkManager) generateNetworkNeighborsEntries(namespace string, networkEvents mapset.Set[NetworkEvent], currSpec v1beta1.NetworkNeighborsSpec) v1beta1.NetworkNeighborsSpec {
var networkNeighborsSpec v1beta1.NetworkNeighborsSpec

// auxiliary maps to avoid duplicates
ingressIdentifiersMap := make(map[string]v1beta1.NetworkNeighbor)
egressIdentifiersMap := make(map[string]v1beta1.NetworkNeighbor)

// auxiliary maps to avoid duplicates
currIngressIdentifiersMap := make(map[string]bool)
currEgressIdentifiersMap := make(map[string]bool)

for i := range currSpec.Egress {
if identifier, err := generateNeighborsIdentifier(currSpec.Egress[i]); err == nil {
currEgressIdentifiersMap[identifier] = true
}
}
for i := range currSpec.Ingress {
if identifier, err := generateNeighborsIdentifier(currSpec.Ingress[i]); err == nil {
currIngressIdentifiersMap[identifier] = true
}
}

networkEventsIterator := networkEvents.Iterator()
if networkEventsIterator == nil {
return networkNeighborsSpec
Expand Down Expand Up @@ -439,7 +470,7 @@ func (am *NetworkManager) generateNetworkNeighborsEntries(namespace string, netw
}
}

saveNeighborEntry(networkEvent, neighborEntry, egressIdentifiersMap, ingressIdentifiersMap)
saveNeighborEntry(networkEvent, neighborEntry, egressIdentifiersMap, ingressIdentifiersMap, currEgressIdentifiersMap, currIngressIdentifiersMap)

}

Expand All @@ -457,7 +488,7 @@ func (am *NetworkManager) generateNetworkNeighborsEntries(namespace string, netw
}

// saveNeighborEntry encapsulates the logic of generating identifiers and adding the neighborEntry to the map
func saveNeighborEntry(networkEvent NetworkEvent, neighborEntry v1beta1.NetworkNeighbor, egressIdentifiersMap map[string]v1beta1.NetworkNeighbor, ingressIdentifiersMap map[string]v1beta1.NetworkNeighbor) {
func saveNeighborEntry(networkEvent NetworkEvent, neighborEntry v1beta1.NetworkNeighbor, egressIdentifiersMap, ingressIdentifiersMap map[string]v1beta1.NetworkNeighbor, currEgressIdentifiersMap, currIngressIdentifiersMap map[string]bool) {

portIdentifier := generatePortIdentifierFromEvent(networkEvent)

Expand All @@ -480,6 +511,10 @@ func saveNeighborEntry(networkEvent NetworkEvent, neighborEntry v1beta1.NetworkN
logger.L().Debug("failed to hash identifier", helpers.String("identifier", identifier), helpers.String("error", err.Error()))
identifier = uuid.New().String()
}
if ok := currEgressIdentifiersMap[identifier]; ok {
// if identifier already exists, we don't need to add it again
return
}

if networkEvent.PktType == outgoingPktType {
addToMap(egressIdentifiersMap, identifier, portIdentifier, neighborEntry)
Expand Down
5 changes: 3 additions & 2 deletions pkg/networkmanager/network_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,7 @@ func TestGenerateNetworkNeighborsEntries(t *testing.T) {
networkEventsSet.Add(ne)
}
t.Run(fmt.Sprintf("Input: %+v", tc.networkEvents), func(t *testing.T) {
result := am.generateNetworkNeighborsEntries(tc.namespace, networkEventsSet)
result := am.generateNetworkNeighborsEntries(tc.namespace, networkEventsSet, v1beta1.NetworkNeighborsSpec{})

assert.Equal(t, len(result.Ingress), len(tc.expectedSpec.Ingress), "Ingress IP address is not equal in test %s", tc.name)
found := 0
Expand Down Expand Up @@ -1172,7 +1172,8 @@ func TestSaveNeighborEntry(t *testing.T) {

for _, tc := range tests {
t.Run(fmt.Sprintf("Input: %+v", tc.networkEvent), func(t *testing.T) {
saveNeighborEntry(tc.networkEvent, tc.neighborEntry, tc.egressIdentifiersMap, tc.ingressIdentifiersMap)
curr := make(map[string]bool)
saveNeighborEntry(tc.networkEvent, tc.neighborEntry, tc.egressIdentifiersMap, tc.ingressIdentifiersMap, curr, curr)
assert.Equal(t, tc.expectedEgressMap, tc.egressIdentifiersMap)
assert.Equal(t, tc.expectedIngressMap, tc.ingressIdentifiersMap)
})
Expand Down

0 comments on commit 6e345e3

Please sign in to comment.