Skip to content

Commit

Permalink
improve handling of container tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
piax93 committed Nov 27, 2024
1 parent a1776b5 commit 5ee9704
Show file tree
Hide file tree
Showing 12 changed files with 307 additions and 71 deletions.
5 changes: 3 additions & 2 deletions example_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ _net_filters: &net_filters
# Some configuration fields supported by all probes:
# filters: list of network filters (see above for schema); they act on the destination
# address for tcp_connect and udp_session, local listening address for net_listen
# container_labels: list of label filters to only capture events generated from processes in containers
# container_poll_interval: how often to poll for running containers (in seconds)
# container_labels: list of label glob patterns to only capture events generated from processes in containers;
# each entry is in the format `label1=pattern1,label2=pattern2,...` where commas are treated like
# an AND, while different entries are OR'ed with each other
# excludeports: list of ports to be filtered out (cannot be used with includeports)
# includeports: list of ports for which events will be logged (filters out all the others) (cannot be used with excludeports)
# plugins: map of plugins to enable for the probe (check README for more details)
Expand Down
200 changes: 182 additions & 18 deletions pidtree_bcc/containers.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,40 @@
import enum
import fnmatch
import json
import logging
import os
import shlex
import subprocess
import time
from functools import lru_cache
from itertools import chain
from typing import Callable
from typing import Generator
from typing import Iterable
from typing import List
from typing import NamedTuple
from typing import Set
from typing import Tuple


class ContainerEventType(enum.Enum):
    """ Kinds of container lifecycle events tracked by this module """

    # container has started running
    start = 'start'
    # container has gone away
    stop = 'stop'


class ContainerEvent(NamedTuple):
    """ Lifecycle notification for a single container.
    An instance is falsy when it carries no container ID.
    """
    event_type: ContainerEventType
    container_id: str

    def __bool__(self) -> bool:
        # a two-field tuple would otherwise always be truthy; tie truthiness
        # to the presence of a container ID instead
        return True if self.container_id else False


class MountNSInfo(NamedTuple):
    """ Mount namespace details for a container, tagged with the event that produced them """
    # container hash ID
    container_id: str
    # human readable container name (may be empty)
    container_name: str
    # mount namespace ID
    ns_id: int
    # lifecycle event this record refers to; defaults to a container start
    event_type: ContainerEventType = ContainerEventType.start


@lru_cache(maxsize=1)
Expand Down Expand Up @@ -40,43 +70,177 @@ def list_containers(filter_labels: List[str] = None) -> List[str]:
return []


def extract_container_name(inspect_data: dict) -> str:
    """ Extracts name from container information, falling back to labels if needed.
    This is needed because the "Name" field is basically always empty for containerd.

    Fields which are present but set to `null` in the inspect output
    ("Name", "Config", "Labels") are treated the same as missing ones.

    :param dict inspect_data: container information
    :return: container name, empty string if none can be determined
    """
    name = (inspect_data.get('Name') or '').lstrip('/')
    if name:
        return name
    # `or {}` guards against explicit nulls, which `.get(key, {})` does not cover
    config = inspect_data.get('Config') or {}
    labels = config.get('Labels') or {}
    return labels.get('io.kubernetes.pod.name') or ''


@lru_cache(maxsize=2048)
def get_container_mntns_id(sha: str) -> int:
def inspect_container(sha: str) -> dict:
    """ Run the containerizer client `inspect` command for a container
    :param str sha: container hash ID
    :return: first element of the JSON document printed by the client
    """
    command = (detect_containerizer_client(), 'inspect', sha)
    raw_json = subprocess.check_output(command, encoding='utf8')
    entries = json.loads(raw_json)
    return entries[0]


@lru_cache(maxsize=20000)
def get_container_mntns_id(sha: str, second_try: bool = False) -> int:
    """ Get mount namespace ID for a container
    :param str sha: container hash ID
    :param bool second_try: set by the internal retry; disables further retries
    :return: mount namespace ID, -1 if it could not be determined
    """
    try:
        if second_try:
            # inspect results may be memoized: going through the cache here would
            # hand back the very same "Pid: 0" data that triggered the retry,
            # so call the undecorated function when one is available
            inspect = getattr(inspect_container, '__wrapped__', inspect_container)
        else:
            inspect = inspect_container
        inspect_data = inspect(sha)
        main_pid = inspect_data['State']['Pid']
        if main_pid == 0 and not second_try:
            # when detecting containers from the events stream, we may be
            # "too fast" and there is no process associated to the container yet
            time.sleep(0.5)
            return get_container_mntns_id(sha, second_try=True)
    except Exception as e:
        logging.error(f'Issue inspecting container {sha}: {e}')
        return -1
    try:
        # the inode of the namespace link is the mount namespace ID
        return os.stat(f'/proc/{main_pid}/ns/mnt').st_ino
    except Exception as e:
        logging.error(f'Issue reading mntns ID for {main_pid}: {e}')
        return -1


def list_container_mnt_namespaces(filter_labels: List[str] = None) -> Set[int]:
def _parse_label_pattern_set(pattern_set: str) -> List[Tuple[str, str]]:
    """ Split one `<label_name>=<pattern>,...` entry into (label, pattern) pairs """
    terms = []
    for term in pattern_set.split(','):
        if not term:
            # tolerate empty terms, e.g. from a trailing comma
            continue
        label_key, separator, pattern = term.partition('=')
        # a bare `<label_name>` only requires the label to be present
        terms.append((label_key, pattern if separator else '*'))
    return terms


def filter_containers_with_label_patterns(
    container_ids: Iterable[str],
    patterns: Iterable[str],
) -> List[Tuple[str, str]]:
    """ Given a list of container IDs, find the ones with labels matching any of the patterns
    :param Iterable[str] container_ids: collection of container IDs
    :param Iterable[str] patterns: collection of label patterns, with entries in the format
                                   `<label_name>=<pattern>,...`; a term without `=` matches
                                   any value of that label
    :return: list of (container ID, container name) tuples for the matching containers
    """
    # entries which end up with no terms are dropped: an empty AND-group
    # would otherwise match every container
    unpacked_patterns = [
        terms
        for terms in (_parse_label_pattern_set(pattern_set) for pattern_set in patterns)
        if terms
    ]
    result = []
    for container_id in container_ids:
        try:
            container_info = inspect_container(container_id)
            labels = (container_info.get('Config') or {}).get('Labels') or {}
            if any(
                all(
                    label_key in labels
                    and fnmatch.fnmatch(labels[label_key], pattern)
                    for label_key, pattern in terms
                )
                for terms in unpacked_patterns
            ):
                result.append((container_id, extract_container_name(container_info)))
        except Exception as e:
            logging.error(f'Issue inspecting container {container_id}: {e}')
    return result


def list_container_mnt_namespaces(
    patterns: Iterable[str] = None,
    generator: Callable[[], List[str]] = list_containers,
) -> Set[MountNSInfo]:
    """ Get collection of mount namespace info for containers matching label filters
    :param Iterable[str] patterns: list of label patterns, `<label_name>=<pattern>,...`
    :param Callable[[], List[str]] generator: method to call to generate container ID list
    :return: set of mount namespace info
    """
    label_patterns = patterns or []
    matching = filter_containers_with_label_patterns(generator(), label_patterns)
    namespaces = set()
    for container_id, name in matching:
        if not container_id:
            continue
        info = MountNSInfo(container_id, name, get_container_mntns_id(container_id))
        # non-positive IDs mean the namespace could not be resolved
        if info.ns_id > 0:
            namespaces.add(info)
    return namespaces


def monitor_container_mnt_namespaces(patterns: Iterable[str] = None) -> Generator[MountNSInfo, None, None]:
    """ Listens to containerizer events and translates them into mount namespace info
    :param Iterable[str] patterns: list of label patterns, `<label_name>=<pattern>,...`
    :return: yields mount namespace info; stop events carry an empty name and `ns_id` 0
    """
    for container_event in monitor_container_events():
        if container_event.event_type != ContainerEventType.start:
            # label filtering is not applied to stop events
            yield MountNSInfo(container_event.container_id, '', 0, container_event.event_type)
            continue
        # reuse the listing logic, restricted to the container which just started
        yield from list_container_mnt_namespaces(patterns, lambda: [container_event.container_id])


def _tail_subprocess_json(cmd: str, shell: bool = False) -> Generator[dict, None, None]:
""" Run command and tail output line by line, parsing it as JSON
:param Iterable[str] cmd: command to run
:return: yield dicts, stops on errors
"""
try:
with subprocess.Popen(
args=cmd if shell else shlex.split(cmd),
stdout=subprocess.PIPE,
encoding='utf-8',
shell=shell,
) as proc:
for line in proc.stdout:
if not line.strip():
continue
yield json.loads(line)
except Exception as e:
logging.error(f'Error while running {cmd}: {e}')


def monitor_container_events() -> Generator[ContainerEvent, None, None]:
    """ Listens to containerizer events for containers being started or stopped.
    The underlying events command is respawned whenever its output stream ends.

    :return: yields container events (never terminates on its own)
    """
    client_cli = detect_containerizer_client()
    if client_cli == 'docker':
        use_shell = False
        event_filters = '--filter type=container --filter event=start --filter event=die'

        def event_extractor(event: dict) -> ContainerEvent:
            # anything which is not a "start" is treated as a stop ("die" given the filters above)
            is_start = event.get('status', '') == 'start'
            return ContainerEvent(
                ContainerEventType.start if is_start else ContainerEventType.stop,
                event.get('id', ''),
            )
    else:
        use_shell = True
        # grep block-buffers its output when writing to a pipe, which would hold
        # events back until the buffer fills up; force per-line flushing
        # NOTE(review): assumes a grep supporting `--line-buffered` (GNU/BSD)
        event_filters = "| grep --line-buffered -E '/tasks/(start|delete)'"

        def event_extractor(event: dict) -> ContainerEvent:
            is_start = event.get('Topic', '').endswith('start')
            return ContainerEvent(
                ContainerEventType.start if is_start else ContainerEventType.stop,
                event.get('ID', ''),
            )

    cmd = f"{client_cli} events --format '{{{{json .}}}}' {event_filters}"
    while True:
        for event in _tail_subprocess_json(cmd, use_shell):
            res = event_extractor(event)
            if res:
                yield res
        # the stream ended (client exited or failed to start): pause before
        # respawning so a persistent failure does not turn into a busy loop
        time.sleep(1)
15 changes: 10 additions & 5 deletions pidtree_bcc/filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,20 @@ def load_port_filters_into_map(
ebpf_map[ctypes.c_int(0)] = ctypes.c_uint8(mode.value)


def load_intset_into_map(intset: Set[int], ebpf_map: Any, do_diff: bool = False, delete: bool = False):
    """ Loads set of int values into eBPF map
    :param Set[int] intset: input values
    :param Any ebpf_map: array in which filters are loaded
    :param bool do_diff: diff input with existing values, removing excess entries
    :param bool delete: remove values rather than adding them
    """
    if delete:
        to_delete = intset
    else:
        current_state = {k.value for k, _ in ebpf_map.items()} if do_diff else set()
        to_delete = current_state - intset
        for val in intset:
            ebpf_map[ctypes.c_int(val)] = ctypes.c_uint8(1)
    for val in to_delete:
        try:
            del ebpf_map[ctypes.c_int(val)]
        except KeyError:
            # entry already gone (e.g. removed by an earlier diff-based reload);
            # the end state is the requested one, so nothing else to do
            pass
56 changes: 43 additions & 13 deletions pidtree_bcc/probes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
from jinja2 import FileSystemLoader

from pidtree_bcc.config import enumerate_probe_configs
from pidtree_bcc.containers import ContainerEventType
from pidtree_bcc.containers import list_container_mnt_namespaces
from pidtree_bcc.containers import monitor_container_mnt_namespaces
from pidtree_bcc.filtering import load_filters_into_map
from pidtree_bcc.filtering import load_intset_into_map
from pidtree_bcc.filtering import load_port_filters_into_map
Expand Down Expand Up @@ -56,7 +58,7 @@ class BPFProbe:
NET_FILTER_MAP_SIZE_SCALING = 512
PORT_FILTER_MAP_NAME = 'port_filter_map'
MNTNS_FILTER_MAP_NAME = 'mntns_filter_map'
DEFAULT_CONTAINER_POLL_INTERVAL = 30 # seconds
CONTAINER_BASELINE_INTERVAL = 30 * 60 # seconds

def __init__(
self,
Expand Down Expand Up @@ -103,10 +105,9 @@ class variable defining a list of config fields.
self.SIDECARS.append((self._poll_config_changes, (config_change_queue,)))
self.container_labels_filter = template_config.get('container_labels')
if self.container_labels_filter:
self.SIDECARS.append((
self._monitor_running_containers,
(template_config.get('container_poll_interval', self.DEFAULT_CONTAINER_POLL_INTERVAL),),
))
self.container_name_mapping = {}
self.container_idns_mapping = {}
self.SIDECARS.append((self._monitor_running_containers, tuple()))

def build_probe_config(self, probe_config: dict, hotswap_only: bool = False) -> dict:
""" Load probe configuration values
Expand Down Expand Up @@ -212,15 +213,38 @@ def _poll_config_changes(self, config_queue: SimpleQueue):
self.reload_filters()

@never_crash
def _monitor_running_containers(self):
    """ Follows container start/stop events, filtering by label, to keep the mntns
    filtering map updated. A full baseline is taken at startup, and again when an
    event arrives after the baseline interval has elapsed.
    """
    baseline_taken_at = time.time()
    self._running_containers_baseline()
    for ns_event in monitor_container_mnt_namespaces(self.container_labels_filter):
        current_time = time.time()
        if current_time - baseline_taken_at > self.CONTAINER_BASELINE_INTERVAL:
            # the baseline re-lists containers, so the triggering event is not processed on its own
            self._running_containers_baseline()
            baseline_taken_at = current_time
        elif ns_event.event_type == ContainerEventType.stop:
            stopped_ns_id = self.container_idns_mapping.pop(ns_event.container_id, None)
            # only containers which were being tracked need cleaning up
            if stopped_ns_id:
                load_intset_into_map(
                    {stopped_ns_id},
                    self.bpf[self.MNTNS_FILTER_MAP_NAME],
                    delete=True,
                )
                self.container_name_mapping.pop(stopped_ns_id, None)
        else:
            load_intset_into_map({ns_event.ns_id}, self.bpf[self.MNTNS_FILTER_MAP_NAME])
            self.container_name_mapping[ns_event.ns_id] = ns_event.container_name
            self.container_idns_mapping[ns_event.container_id] = ns_event.ns_id

def _running_containers_baseline(self):
    """ Create baseline for monitored containers.
    Both the eBPF filtering map and the in-memory lookup tables are reset to
    reflect exactly the containers currently matching the label filters.
    """
    ns_infos = list_container_mnt_namespaces(self.container_labels_filter)
    load_intset_into_map(
        {mntns_info.ns_id for mntns_info in ns_infos},
        self.bpf[self.MNTNS_FILTER_MAP_NAME],
        do_diff=True,
    )
    # rebuild rather than update: the diff above drops stale namespaces from the
    # eBPF map, and the lookup tables must not keep pointing at entries which
    # are gone (e.g. containers whose stop event was missed)
    self.container_name_mapping = {mntns_info.ns_id: mntns_info.container_name for mntns_info in ns_infos}
    self.container_idns_mapping = {mntns_info.container_id: mntns_info.ns_id for mntns_info in ns_infos}

def reload_filters(self, is_init: bool = False):
""" Load filters
Expand Down Expand Up @@ -252,6 +276,12 @@ def start_polling(self):
while True:
poll_func()

def enrich_container_name(self, event: Any, formatted_event: dict) -> dict:
    """ Adds the name of the container which generated the event to the event dict.
    The dict is modified in place, and only when container label filtering is enabled.

    :param Any event: raw event, exposing the `mntns_id` it originated from
    :param dict formatted_event: event dictionary to update
    :return: the same dictionary passed as input
    """
    if not self.container_labels_filter:
        return formatted_event
    # unknown namespaces resolve to an empty name
    container_name = self.container_name_mapping.get(event.mntns_id, '')
    formatted_event['container_name'] = container_name
    return formatted_event

def enrich_event(self, event: Any) -> dict:
""" Transform raw BPF event data into dictionary,
possibly adding more interesting data to it.
Expand Down
6 changes: 6 additions & 0 deletions pidtree_bcc/probes/net_listen.j2
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ struct listen_bind_t {
u32 laddr;
u16 port;
u8 protocol;
{%- if container_labels %}
u64 mntns_id;
{% endif -%}
};

{{ utils.net_filter_trie_init(NET_FILTER_MAP_NAME, PORT_FILTER_MAP_NAME, size=NET_FILTER_MAP_SIZE, max_ports=NET_FILTER_MAX_PORT_RANGES) }}
Expand Down Expand Up @@ -54,6 +57,9 @@ static void net_listen_event(struct pt_regs *ctx)
listen.port = port;
listen.laddr = laddr;
listen.protocol = get_socket_protocol(sk);
{% if container_labels -%}
listen.mntns_id = get_mntns_id();
{% endif -%}
events.perf_submit(ctx, &listen, sizeof(listen));
currsock.delete(&pid);
}
Expand Down
Loading

0 comments on commit 5ee9704

Please sign in to comment.