Skip to content

Commit

Permalink
[plugins][feat] support deferred edges between kubernetes and AWS (#1293
Browse files Browse the repository at this point in the history
)

* make naming consistent to other plugins

* further deserialisation

* add deferred edges k8s <-> aws

* kubernetes_nodes can be connected two ways

* add test

* lint

* clean up
  • Loading branch information
anjafr authored Nov 21, 2022
1 parent bedaf8e commit 5700c09
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 11 deletions.
33 changes: 31 additions & 2 deletions plugins/aws_k8s/resoto_plugin_aws_k8s/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,20 @@ def _getattr(obj: Any, attr: str) -> Any:
return functools.reduce(_getattr, [obj] + attr.split("."))


def link_k8s_node_to_aws_nodegroup(graph: Graph, resource: BaseResource) -> None:
def link_k8s_node_to_aws_nodegroup_or_ec2_instance(graph: Graph, resource: BaseResource) -> None:
if resource.kind == "kubernetes_node":
if (labels := rgetattr(resource, "labels", {})) and (nodegroup := labels.get("eks.amazonaws.com/nodegroup")):
graph.add_deferred_edge(
BySearchCriteria(f"is(aws_eks_nodegroup) and reported.id={nodegroup}"),
ByNodeId(resource.chksum),
)
if (pid := rgetattr(resource, "node_spec.provider_id", None)) and (pid.startswith("aws://")):
_, ec2_zone_and_instance_id = pid.split("aws://")
ec2_instance_id = ec2_zone_and_instance_id.split("/")[2]
graph.add_deferred_edge(
BySearchCriteria(f"is(aws_ec2_instance) and reported.id={ec2_instance_id}"),
ByNodeId(resource.chksum),
)


def link_k8s_cluster_to_eks_cluster(graph: Graph, resource: BaseResource) -> None:
Expand All @@ -31,6 +38,26 @@ def link_k8s_cluster_to_eks_cluster(graph: Graph, resource: BaseResource) -> Non
)


def link_service_to_elb(graph: Graph, resource: BaseResource) -> None:
if resource.kind == "kubernetes_service":
if (rgetattr(resource, "service_spec.type", None) == "LoadBalancer") and (
ingresses := rgetattr(resource, "service_status.load_balancer.ingress", None)
):
for ingress in ingresses:
if elb_hostname := rgetattr(ingress, "hostname", None):
graph.add_deferred_edge(
BySearchCriteria(f"is(aws_elb) and reported.id={elb_hostname}"), ByNodeId(resource.chksum)
)


def link_pv_to_ebs_volume(graph: Graph, resource: BaseResource) -> None:
if resource.kind == "kubernetes_persistent_volume":
if vol_id := rgetattr(resource, "persistent_volume_spec.aws_elastic_block_store.volume_id", None):
graph.add_deferred_edge(
BySearchCriteria(f"is(aws_ec2_volume) and reported.id={vol_id}"), ByNodeId(resource.chksum)
)


class AWSK8sCollectorPlugin(BasePostCollectPlugin):
name = "aws_k8s"
activate_with: Set[str] = {"aws", "k8s"}
Expand All @@ -39,7 +66,9 @@ def post_collect(self, graph: Graph) -> None:
log.info("plugin: collecting AWS to k8s edges")
for node in graph.nodes:
if isinstance(node, BaseResource):
link_k8s_node_to_aws_nodegroup(graph, node)
link_k8s_node_to_aws_nodegroup_or_ec2_instance(graph, node)
link_k8s_cluster_to_eks_cluster(graph, node)
link_pv_to_ebs_volume(graph, node)
link_service_to_elb(graph, node)
else:
log.warn(f"Node {node} is not a BaseResource")
3 changes: 2 additions & 1 deletion plugins/aws_k8s/test/test_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
from resotolib.baseresources import BaseResource
from resotolib.graph import Graph
from typing import ClassVar
from types import SimpleNamespace


class KubernetesNode(BaseResource):
kind: ClassVar[str] = "kubernetes_node"
labels = {"eks.amazonaws.com/nodegroup": "test-nodegroup"}
node_spec = SimpleNamespace(provider_id="aws:///eu-central-1a/123")

def delete(self, graph: Graph) -> bool:
return False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def _getattr(obj: Any, attr: str) -> Any:
return functools.reduce(_getattr, [obj] + attr.split("."))


def link_do_droplet_to_node(graph: Graph, resource: BaseResource) -> None:
def link_node_to_do_droplet(graph: Graph, resource: BaseResource) -> None:
if resource.kind == "kubernetes_node":
if (pid := rgetattr(resource, "node_spec.provider_id", None)) and (pid.startswith("digitalocean://")):
_, droplet_id = pid.split("digitalocean://")
Expand All @@ -24,7 +24,7 @@ def link_do_droplet_to_node(graph: Graph, resource: BaseResource) -> None:
)


def link_do_lb_to_service(graph: Graph, resource: BaseResource) -> None:
def link_service_to_do_lb(graph: Graph, resource: BaseResource) -> None:
if resource.kind == "kubernetes_service":
if lb_id := resource.tags.get("kubernetes.digitalocean.com/load-balancer-id"):
graph.add_deferred_edge(
Expand All @@ -33,7 +33,7 @@ def link_do_lb_to_service(graph: Graph, resource: BaseResource) -> None:
)


def link_do_volume_to_pv(graph: Graph, resource: BaseResource) -> None:
def link_pv_to_do_volume(graph: Graph, resource: BaseResource) -> None:
if resource.kind == "kubernetes_persistent_volume":
if (
(csi := rgetattr(resource, "persistent_volume_spec.csi", None))
Expand All @@ -53,6 +53,6 @@ def post_collect(self, graph: Graph) -> None:
log.info("plugin: collecting DigitalOcean to k8s edges")
for node in graph.nodes:
node = cast(BaseResource, node)
link_do_droplet_to_node(graph, node)
link_do_lb_to_service(graph, node)
link_do_volume_to_pv(graph, node)
link_node_to_do_droplet(graph, node)
link_service_to_do_lb(graph, node)
link_pv_to_do_volume(graph, node)
16 changes: 14 additions & 2 deletions plugins/k8s/resoto_plugin_k8s/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -1264,12 +1264,24 @@ class KubernetesPersistentVolumeStatus:
reason: Optional[str] = field(default=None)


@define(eq=False, slots=False)
class KubernetesPersistentVolumeSpecAwsElasticBlockStore:
kind: ClassVar[str] = "kubernetes_persistent_volume_spec_aws_elastic_block_store"
mapping: ClassVar[Dict[str, Bender]] = {
"volume_id": S("volumeID"),
"fs_type": S("fsType"),
}
volume_id: Optional[str] = field(default=None)
fs_type: Optional[str] = field(default=None)


@define
class KubernetesPersistentVolumeSpec:
kind: ClassVar[str] = "kubernetes_persistent_volume_spec"
mapping: ClassVar[Dict[str, Bender]] = {
"access_modes": S("accessModes", default=[]),
"aws_elastic_block_store": S("awsElasticBlockStore"),
"aws_elastic_block_store": S("awsElasticBlockStore")
>> Bend(KubernetesPersistentVolumeSpecAwsElasticBlockStore.mapping),
"azure_disk": S("azureDisk"),
"azure_file": S("azureFile"),
"capacity": S("capacity"),
Expand Down Expand Up @@ -1300,7 +1312,7 @@ class KubernetesPersistentVolumeSpec:
"vsphere_volume": S("vsphereVolume"),
}
access_modes: List[str] = field(factory=list)
aws_elastic_block_store: Optional[str] = field(default=None)
aws_elastic_block_store: Optional[KubernetesPersistentVolumeSpecAwsElasticBlockStore] = field(default=None)
azure_disk: Optional[str] = field(default=None)
azure_file: Optional[str] = field(default=None)
capacity: Optional[str] = field(default=None)
Expand Down

0 comments on commit 5700c09

Please sign in to comment.