Skip to content

Commit

Permalink
Merge branch 'orisho/fix_sniffer_capture_any' of ssh://github.com/ott…
Browse files Browse the repository at this point in the history
…erize/intents-observer into orisho/fix_sniffer_capture_any
  • Loading branch information
orishoshan committed Sep 26, 2024
2 parents 046c2df + 72a46ae commit 6f8af07
Show file tree
Hide file tree
Showing 5 changed files with 329 additions and 45 deletions.
100 changes: 84 additions & 16 deletions src/mapper/pkg/kubefinder/kubefinder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@ import (
)

const (
podIPIndexField = "ip"
endpointIPPortIndexField = "ipPort"
serviceIPIndexField = "spec.ip"
externalIPIndexField = "spec.externalIPs"
portNumberIndexField = "service.spec.ports.nodePort"
nodeIPIndexField = "node.status.Addresses.ExternalIP"
IstioCanonicalNameLabelKey = "service.istio.io/canonical-name"
apiServerName = "kubernetes"
apiServerNamespace = "default"
podIPIndexField = "ip"
podIPIncludingHostNetworkIndexField = "ipAndHostNetwork"
endpointIPPortIndexField = "ipPort"
serviceIPIndexField = "spec.ip"
externalIPIndexField = "spec.externalIPs"
nodePortNumberIndexField = "service.spec.ports.nodePort"
nodeIPIndexField = "node.status.Addresses.ExternalIP"
IstioCanonicalNameLabelKey = "service.istio.io/canonical-name"
apiServerName = "kubernetes"
apiServerNamespace = "default"
)

type KubeFinder struct {
Expand Down Expand Up @@ -70,6 +71,23 @@ func (k *KubeFinder) initIndexes(ctx context.Context) error {
return errors.Wrap(err)
}

err = k.mgr.GetCache().IndexField(ctx, &corev1.Pod{}, podIPIncludingHostNetworkIndexField, func(object client.Object) []string {
res := make([]string, 0)
pod := object.(*corev1.Pod)

if pod.DeletionTimestamp != nil {
return res
}

for _, ip := range pod.Status.PodIPs {
res = append(res, ip.IP)
}
return res
})
if err != nil {
return errors.Wrap(err)
}

err = k.mgr.GetCache().IndexField(ctx, &corev1.Service{}, serviceIPIndexField, func(object client.Object) []string {
res := make([]string, 0)
svc := object.(*corev1.Service)
Expand Down Expand Up @@ -99,13 +117,15 @@ func (k *KubeFinder) initIndexes(ctx context.Context) error {
return errors.Wrap(err)
}

err = k.mgr.GetCache().IndexField(ctx, &corev1.Service{}, portNumberIndexField, func(object client.Object) []string {
err = k.mgr.GetCache().IndexField(ctx, &corev1.Service{}, nodePortNumberIndexField, func(object client.Object) []string {
// node ports are unique per service - so it can be used for indexing services
ports := sets.New[string]()
svc := object.(*corev1.Service)
if svc.DeletionTimestamp != nil {
return nil
}
if svc.Spec.Type != corev1.ServiceTypeNodePort {
// Only node port and load balancer typed services use node ports
if svc.Spec.Type != corev1.ServiceTypeNodePort && svc.Spec.Type != corev1.ServiceTypeLoadBalancer {
return nil
}

Expand Down Expand Up @@ -252,22 +272,22 @@ func (k *KubeFinder) ResolveIPToControlPlane(ctx context.Context, ip string) (*c
}

func (k *KubeFinder) ResolveIPToExternalAccessService(ctx context.Context, ip string, port int) (*corev1.Service, bool, error) {
nodePortService, ok, err := k.resolveNodePortService(ctx, ip, port)
nodePortService, ok, err := k.resolveServiceByNodeIPAndPort(ctx, ip, port)
if err != nil {
return nil, false, errors.Wrap(err)
}
if ok {
return nodePortService, true, nil
}

loadBalancerService, ok, err := k.resolveLoadBalancerService(ctx, ip, port)
loadBalancerService, ok, err := k.resolveLoadBalancerServiceByExternalIP(ctx, ip, port)
if err != nil {
return nil, false, errors.Wrap(err)
}
return loadBalancerService, ok, nil
}

func (k *KubeFinder) resolveLoadBalancerService(ctx context.Context, ip string, port int) (*corev1.Service, bool, error) {
func (k *KubeFinder) resolveLoadBalancerServiceByExternalIP(ctx context.Context, ip string, port int) (*corev1.Service, bool, error) {
var services corev1.ServiceList
err := k.client.List(ctx, &services, client.MatchingFields{externalIPIndexField: ip})
if err != nil {
Expand All @@ -288,7 +308,7 @@ func (k *KubeFinder) resolveLoadBalancerService(ctx context.Context, ip string,
return &service, true, nil
}

func (k *KubeFinder) resolveNodePortService(ctx context.Context, ip string, port int) (*corev1.Service, bool, error) {
func (k *KubeFinder) resolveServiceByNodeIPAndPort(ctx context.Context, ip string, port int) (*corev1.Service, bool, error) {
var nodes corev1.NodeList
err := k.client.List(ctx, &nodes, client.MatchingFields{nodeIPIndexField: ip})
if err != nil {
Expand All @@ -304,7 +324,7 @@ func (k *KubeFinder) resolveNodePortService(ctx context.Context, ip string, port

portString := fmt.Sprintf("%d", port)
var services corev1.ServiceList
err = k.client.List(ctx, &services, client.MatchingFields{portNumberIndexField: portString})
err = k.client.List(ctx, &services, client.MatchingFields{nodePortNumberIndexField: portString})
if err != nil {
return nil, false, errors.Wrap(err)
}
Expand Down Expand Up @@ -442,3 +462,51 @@ func (k *KubeFinder) ResolveOtterizeIdentityForService(ctx context.Context, svc
dstSvcIdentity.KubernetesService = lo.ToPtr(svc.Name)
return dstSvcIdentity, true, nil
}

func (k *KubeFinder) IsSrcIpClusterInternal(ctx context.Context, ip string) (bool, error) {
// Known issue: this function is currently missing support for services/endpoints, node.PodCIDR, and pods that were deleted.

isNode, err := k.IsNodeIP(ctx, ip)
if err != nil {
return false, errors.Wrap(err)
}
if isNode {
return true, nil
}

isPod, err := k.IsPodIp(ctx, ip)
if err != nil {
return false, errors.Wrap(err)
}
if isPod {
return true, nil
}

_, isControlPlane, err := k.ResolveIPToControlPlane(ctx, ip)
if err != nil {
return false, errors.Wrap(err)
}
if isControlPlane {
return true, nil
}

return false, nil
}

func (k *KubeFinder) IsPodIp(ctx context.Context, ip string) (bool, error) {
var pods corev1.PodList
err := k.client.List(ctx, &pods, client.MatchingFields{podIPIncludingHostNetworkIndexField: ip})
if err != nil {
return false, errors.Wrap(err)
}
return len(pods.Items) > 0, nil
}

func (k *KubeFinder) IsNodeIP(ctx context.Context, ip string) (bool, error) {
var nodes corev1.NodeList
err := k.client.List(ctx, &nodes, client.MatchingFields{nodeIPIndexField: ip})
if err != nil {
return false, errors.Wrap(err)
}
return len(nodes.Items) > 0, nil
}
13 changes: 10 additions & 3 deletions src/mapper/pkg/resolvers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
corev1 "k8s.io/api/core/v1"
)

func (r *Resolver) discoverInternalSrcIdentity(ctx context.Context, src model.RecordedDestinationsForSrc) (model.OtterizeServiceIdentity, error) {
func (r *Resolver) discoverInternalSrcIdentity(ctx context.Context, src *model.RecordedDestinationsForSrc) (model.OtterizeServiceIdentity, error) {
svc, ok, err := r.kubeFinder.ResolveIPToControlPlane(ctx, src.SrcIP)
if err != nil {
return model.OtterizeServiceIdentity{}, errors.Errorf("could not resolve %s to service: %w", src.SrcIP, err)
Expand All @@ -34,6 +34,15 @@ func (r *Resolver) discoverInternalSrcIdentity(ctx context.Context, src model.Re
return model.OtterizeServiceIdentity{}, errors.Errorf("found pod %s (by ip %s) doesn't match captured hostname %s, ignoring", srcPod.Name, src.SrcIP, src.SrcHostname)
}

// This function requires "src" to be a pointer.
// If at some point this function will be called with a non-pointer "src"
// It may cause a bug because the function will not be able to modify the "src" object of the caller.
r.filterTargetsAccordingToPodCreationTime(src, srcPod)

return r.resolveInClusterIdentity(ctx, srcPod)
}

func (r *Resolver) filterTargetsAccordingToPodCreationTime(src *model.RecordedDestinationsForSrc, srcPod *corev1.Pod) {
filteredDestinations := make([]model.Destination, 0)
for _, dest := range src.Destinations {
if srcPod.CreationTimestamp.After(dest.LastSeen) {
Expand All @@ -43,8 +52,6 @@ func (r *Resolver) discoverInternalSrcIdentity(ctx context.Context, src model.Re
filteredDestinations = append(filteredDestinations, dest)
}
src.Destinations = filteredDestinations

return r.resolveInClusterIdentity(ctx, srcPod)
}

func (r *Resolver) resolveInClusterIdentity(ctx context.Context, pod *corev1.Pod) (model.OtterizeServiceIdentity, error) {
Expand Down
Loading

0 comments on commit 6f8af07

Please sign in to comment.