diff --git a/README.md b/README.md index 546d39e..dd9a82f 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,8 @@ username associated with the process. - Listening port `port` - Network protocol `protocol` (e.g. tcp) - Configurable to also periodically provide snapshots of all listening processes +- Best effort tracking of UDP sessions with configurability and output + similar to the ones of TCP outbound connections. - Optional plugin system for enriching events in userland - Included `sourceipmap` plugin for mapping source address - Included `loginuidmap` plugin for adding loginuid info to process tree @@ -77,7 +79,7 @@ Pidtree-bcc implements a modular probe system which allows multiple eBPF program to be compiled and run in parallel. Probe loading is handled via the top-level keys in the configuration (see [`example_config.yml`](example_config.yml)). -Currently, this repository implements the `tcp_connect` and `net_listen` probes. +Currently, this repository implements the `tcp_connect`, `net_listen` and `udp_session` probes. It is possible to extend this system with external packages via the `--extra-probe-path` command line parameter. 
diff --git a/example_config.yml b/example_config.yml index d011142..6dcd8ee 100644 --- a/example_config.yml +++ b/example_config.yml @@ -1,22 +1,26 @@ --- +_net_filters: &net_filters + - subnet_name: 10 + network: 10.0.0.0 + network_mask: 255.0.0.0 + description: "all RFC 1918 10/8" + - subnet_name: 17216 + network: 172.16.0.0 + network_mask: 255.240.0.0 + description: "all RFC 1918 172.16/12" + - subnet_name: 169254 + network: 169.254.0.0 + network_mask: 255.255.0.0 + description: "all 169.254/16 loopback" + - subnet_name: 127 + network: 127.0.0.0 + network_mask: 255.0.0.0 + description: "all 127/8 loopback" + +udp_session: + filters: *net_filters tcp_connect: - filters: - - subnet_name: 10 - network: 10.0.0.0 - network_mask: 255.0.0.0 - description: "all RFC 1918 10/8" - - subnet_name: 17216 - network: 172.16.0.0 - network_mask: 255.240.0.0 - description: "all RFC 1918 172.16/12" - - subnet_name: 169254 - network: 169.254.0.0 - network_mask: 255.255.0.0 - description: "all 169.254/16 loopback" - - subnet_name: 127 - network: 127.0.0.0 - network_mask: 255.0.0.0 - description: "all 127/8 loopback" + filters: *net_filters plugins: sourceipmap: enabled: True diff --git a/itest/example_config.yml b/itest/example_config.yml index fda572d..9b5aa35 100644 --- a/itest/example_config.yml +++ b/itest/example_config.yml @@ -1,42 +1,46 @@ --- +_net_filters: &net_filters + - subnet_name: 0_0_0_0__2 + network: 0.0.0.0 + network_mask: 192.0.0.0 + description: "Non-loopback subnet section" + - subnet_name: 64_0_0_0__3 + network: 64.0.0.0 + network_mask: 224.0.0.0 + description: "Non-loopback subnet section" + - subnet_name: 96_0_0_0__4 + network: 96.0.0.0 + network_mask: 240.0.0.0 + description: "Non-loopback subnet section" + - subnet_name: 112_0_0_0__5 + network: 112.0.0.0 + network_mask: 248.0.0.0 + description: "Non-loopback subnet section" + - subnet_name: 120_0_0_0__6 + network: 120.0.0.0 + network_mask: 252.0.0.0 + description: "Non-loopback subnet section" + - 
subnet_name: 124_0_0_0__7 + network: 124.0.0.0 + network_mask: 254.0.0.0 + description: "Non-loopback subnet section" + - subnet_name: 126_0_0_0__8 + network: 126.0.0.0 + network_mask: 255.0.0.0 + description: "Non-loopback subnet section" + - subnet_name: 128_0_0_0__1 + network: 128.0.0.0 + network_mask: 128.0.0.0 + description: "Non-loopback subnet section" + - subnet_name: 127_0_0_0__16 + network: 127.0.0.0 + network_mask: 255.255.0.0 + description: "127.0/16 to get rid of the noise" + tcp_connect: - filters: - - subnet_name: 0_0_0_0__2 - network: 0.0.0.0 - network_mask: 192.0.0.0 - description: "Non-loopback subnet section" - - subnet_name: 64_0_0_0__3 - network: 64.0.0.0 - network_mask: 224.0.0.0 - description: "Non-loopback subnet section" - - subnet_name: 96_0_0_0__4 - network: 96.0.0.0 - network_mask: 240.0.0.0 - description: "Non-loopback subnet section" - - subnet_name: 112_0_0_0__5 - network: 112.0.0.0 - network_mask: 248.0.0.0 - description: "Non-loopback subnet section" - - subnet_name: 120_0_0_0__6 - network: 120.0.0.0 - network_mask: 252.0.0.0 - description: "Non-loopback subnet section" - - subnet_name: 124_0_0_0__7 - network: 124.0.0.0 - network_mask: 254.0.0.0 - description: "Non-loopback subnet section" - - subnet_name: 126_0_0_0__8 - network: 126.0.0.0 - network_mask: 255.0.0.0 - description: "Non-loopback subnet section" - - subnet_name: 128_0_0_0__1 - network: 128.0.0.0 - network_mask: 128.0.0.0 - description: "Non-loopback subnet section" - - subnet_name: 127_0_0_0__16 - network: 127.0.0.0 - network_mask: 255.255.0.0 - description: "127.0/16 to get rid of the noise" + filters: *net_filters net_listen: excludeports: - 31337 +udp_session: + filters: *net_filters diff --git a/itest/itest.sh b/itest/itest.sh index 7d30066..58d2a6b 100755 --- a/itest/itest.sh +++ b/itest/itest.sh @@ -16,6 +16,7 @@ TIMEOUT=$(( SPIN_UP_TIME + 5 )) TEST_CASES=( "tcp_connect:create_connect_event:nc -w 1 127.1.33.7 $TEST_CONNECT_PORT" "net_listen:create_listen_event:nc 
-w $TEST_LISTEN_TIMEOUT -lnp $TEST_LISTEN_PORT" + "udp_session:create_udp_event:nc -w 1 -u 127.1.33.7 $TEST_CONNECT_PORT" ) function is_port_used { @@ -42,6 +43,16 @@ function create_listen_event { nc -w $TEST_LISTEN_TIMEOUT -lnp $TEST_LISTEN_PORT } +function create_udp_event { + echo "Creating test UDP listener" + nc -u -w $TEST_LISTEN_TIMEOUT -l -p $TEST_CONNECT_PORT & > /dev/null + listener_pid=$! + sleep 1 + echo "Making test UDP connection" + echo "Hello World!" | nc -w 1 -u 127.1.33.7 $TEST_CONNECT_PORT + wait $listener_pid +} + function cleanup { echo "CLEANUP: Caught EXIT" set +eE diff --git a/pidtree_bcc/probes/__init__.py b/pidtree_bcc/probes/__init__.py index d6bae36..ca78318 100644 --- a/pidtree_bcc/probes/__init__.py +++ b/pidtree_bcc/probes/__init__.py @@ -9,7 +9,8 @@ from typing import Mapping from bcc import BPF -from jinja2 import Template +from jinja2 import Environment +from jinja2 import FileSystemLoader from pidtree_bcc.plugins import load_plugins from pidtree_bcc.utils import find_subclass @@ -64,7 +65,8 @@ class variable defining a list of config fields. 
template_config = {k: template_config[k] for k in self.TEMPLATE_VARS} else: template_config.pop('plugins', None) - self.expanded_bpf_text = Template(self.BPF_TEXT).render(**template_config) + jinja_env = Environment(loader=FileSystemLoader(os.path.dirname(module_src))) + self.expanded_bpf_text = jinja_env.from_string(self.BPF_TEXT).render(**template_config) def _process_events(self, cpu: Any, data: Any, size: Any, from_bpf: bool = True): """ BPF event callback @@ -76,6 +78,8 @@ def _process_events(self, cpu: Any, data: Any, size: Any, from_bpf: bool = True) """ event = self.bpf['events'].event(data) if from_bpf else data event = self.enrich_event(event) + if not event: + return event['timestamp'] = datetime.utcnow().isoformat() + 'Z' event['probe'] = self.probe_name for event_plugin in self.plugins: diff --git a/pidtree_bcc/probes/net_listen.j2 b/pidtree_bcc/probes/net_listen.j2 index 5f08579..e694bfb 100644 --- a/pidtree_bcc/probes/net_listen.j2 +++ b/pidtree_bcc/probes/net_listen.j2 @@ -1,3 +1,4 @@ +{%- import 'utils.j2' as utils -%} #include #include @@ -11,20 +12,7 @@ struct listen_bind_t { u8 protocol; }; -static u8 get_socket_protocol(struct sock *sk) -{ - // I'd love to be the one to have figured this out, I'm not - // https://github.com/iovisor/bcc/blob/v0.16.0/tools/tcpaccept.py#L115 - u8 protocol; - int gso_max_segs_offset = offsetof(struct sock, sk_gso_max_segs); - int sk_lingertime_offset = offsetof(struct sock, sk_lingertime); - if (sk_lingertime_offset - gso_max_segs_offset == 4) { - protocol = *(u8 *)((u64)&sk->sk_gso_max_segs - 3); - } else { - protocol = *(u8 *)((u64)&sk->sk_wmem_queued - 3); - } - return protocol; -} +{{ utils.get_proto_func() }} static void net_listen_event(struct pt_regs *ctx) { diff --git a/pidtree_bcc/probes/tcp_connect.j2 b/pidtree_bcc/probes/tcp_connect.j2 index 82390f7..11da6fa 100644 --- a/pidtree_bcc/probes/tcp_connect.j2 +++ b/pidtree_bcc/probes/tcp_connect.j2 @@ -1,12 +1,8 @@ +{%- import 'utils.j2' as utils -%} #include 
#include -// IPs and masks are given in integer notation with their dotted notation in the comment -{% for filter in filters %} -// {{ filter.get("description", filter["subnet_name"]) }} -#define subnet_{{ filter["subnet_name"] }} {{ ip_to_int(filter["network"]) }} // {{ filter["network"] }} -#define subnet_{{ filter["subnet_name"] }}_mask {{ ip_to_int(filter["network_mask"]) }} // {{ filter["network_mask"] }} -{% endfor %} +{{ utils.net_filter_masks(filters, ip_to_int) }} BPF_HASH(currsock, u32, struct sock *); BPF_PERF_OUTPUT(events); @@ -44,35 +40,7 @@ int kretprobe__tcp_v4_connect(struct pt_regs *ctx) u16 dport = 0; bpf_probe_read(&daddr, sizeof(daddr), &skp->__sk_common.skc_daddr); bpf_probe_read(&dport, sizeof(dport), &skp->__sk_common.skc_dport); - // - // For each filter, drop the packet iff - // - a filter's subnet matches AND - // - the port is not one of the filter's excepted ports AND - // - the port is one of the filter's included ports, if they exist - // - if (0 // for easier templating - {% for filter in filters -%} - || ( ( - subnet_{{ filter["subnet_name"] }} - & subnet_{{ filter["subnet_name"] }}_mask - ) == (daddr & subnet_{{ filter["subnet_name"] }}_mask) - && ( 1 == 1 // For easier templating - {% for port in filter.get('except_ports', []) -%} - && ntohs({{ port }}) != dport - {% endfor -%} - ) - && ( - {% if filter.get('include_ports') -%} - 0 - {% for port in filter.get('include_ports', []) -%} - || ntohs({{ port }}) == dport - {% endfor -%} - {% else -%} - 1 - {% endif -%} - ) - ) - {% endfor %} ) { + {{ utils.net_filter_if_excluded(filters) | indent(4) }} { currsock.delete(&pid); return 0; } diff --git a/pidtree_bcc/probes/udp_session.j2 b/pidtree_bcc/probes/udp_session.j2 new file mode 100644 index 0000000..e0585ae --- /dev/null +++ b/pidtree_bcc/probes/udp_session.j2 @@ -0,0 +1,90 @@ +{%- import 'utils.j2' as utils -%} +#include +#include + +#define SESSION_START 1 +#define SESSION_CONTINUE 2 +#define SESSION_END 3 + +{{ 
utils.net_filter_masks(filters, ip_to_int) }} + +struct udp_socket_tuple { + u32 pid; + u64 sock_pointer; +}; + +struct udp_session_event { + u8 type; + u32 pid; + u64 sock_pointer; + u32 daddr; + u16 dport; +}; + +BPF_PERF_OUTPUT(events); +BPF_HASH(tracing, struct udp_socket_tuple, u8); + +{{ utils.get_proto_func() }} + +// We probe only the entrypoint as looking at return codes doesn't have much value +// since UDP does not do any checks for successful communications. The only errors +// which may arise from this function would be due to the kernel running out of memory, +// and you have bigger problems than precisely tracing UDP connections at that point. +int kprobe__udp_sendmsg(struct pt_regs *ctx, struct sock *sk, struct msghdr *msg, size_t size) +{ + if(sk->__sk_common.skc_family != AF_INET) return 0; + + // Destination info will either be embedded in the socket if `connect` + // was called or specified in the message + struct sockaddr_in* sin = msg->msg_name; + u32 daddr = sin->sin_addr.s_addr ? sin->sin_addr.s_addr : sk->sk_daddr; + u16 dport = sin->sin_port ? sin->sin_port : sk->sk_dport; + + {{ utils.net_filter_if_excluded(filters) | indent(4) }} { + return 0; + } + + // Check if we are already tracing this session + u32 pid = bpf_get_current_pid_tgid(); + struct udp_socket_tuple sock_tuple = {}; + sock_tuple.pid = pid; + sock_tuple.sock_pointer = (u64) sk; + u8 trace_flag = tracing.lookup(&sock_tuple) != 0 ? 
SESSION_CONTINUE : SESSION_START; + + struct udp_session_event session = {}; + session.pid = pid; + session.type = trace_flag; + session.sock_pointer = sock_tuple.sock_pointer; + bpf_probe_read(&session.daddr, sizeof(u32), &daddr); + bpf_probe_read(&session.dport, sizeof(u16), &dport); + session.dport = ntohs(session.dport); + events.perf_submit(ctx, &session, sizeof(session)); + if(trace_flag == SESSION_START) { + // We don't care about the actual value in the map + // any u8 var would be fine + tracing.update(&sock_tuple, &trace_flag); + } + + return 0; +} + +// Again, we don't care about the `close` call being successful, we treat +// the invocation as the end of the session regardless +int kprobe__inet_release(struct pt_regs *ctx, struct socket *sock) { + u8 protocol = get_socket_protocol(sock->sk); + if(protocol != IPPROTO_UDP) return 0; + + u32 pid = bpf_get_current_pid_tgid(); + struct udp_socket_tuple sock_tuple = {}; + sock_tuple.pid = pid; + sock_tuple.sock_pointer = (u64) sock->sk; + if(tracing.lookup(&sock_tuple) != 0) { + struct udp_session_event session = {}; + session.pid = pid; + session.type = SESSION_END; + session.sock_pointer = sock_tuple.sock_pointer; + events.perf_submit(ctx, &session, sizeof(session)); + tracing.delete(&sock_tuple); + } + return 0; +} diff --git a/pidtree_bcc/probes/udp_session.py b/pidtree_bcc/probes/udp_session.py new file mode 100644 index 0000000..e2495db --- /dev/null +++ b/pidtree_bcc/probes/udp_session.py @@ -0,0 +1,106 @@ +import time +import traceback +from collections import namedtuple +from multiprocessing import SimpleQueue +from threading import Lock +from typing import Any +from typing import Union + +from pidtree_bcc.probes import BPFProbe +from pidtree_bcc.utils import crawl_process_tree +from pidtree_bcc.utils import int_to_ip +from pidtree_bcc.utils import ip_to_int +from pidtree_bcc.utils import never_crash + + +SessionEventWrapper = namedtuple('SessionEndEvent', ('type', 'pid', 'sock_pointer')) + + +class 
UDPSessionProbe(BPFProbe): + + CONFIG_DEFAULTS = { + 'ip_to_int': ip_to_int, + 'filters': [], + } + SESSION_MAX_DURATION_DEFAULT = 120 + SESSION_START = 1 + SESSION_CONTINUE = 2 + SESSION_END = 3 + + def __init__(self, output_queue: SimpleQueue, config: dict = {}): + super().__init__(output_queue, config) + self.session_tracking = {} + self.thread_lock = Lock() + self.SIDECARS.append(( + self._session_expiration_worker, + (config.get('session_max_duration', self.SESSION_MAX_DURATION_DEFAULT),), + )) + + def enrich_event(self, event: Any) -> Union[dict, None]: + """ Parses UDP session event and adds process tree data + + :param Any event: BPF event + :return: event dictionary with process tree at session end + """ + with self.thread_lock: + return self._enrich_event_impl(event) + + def _enrich_event_impl(self, event: Any) -> Union[dict, None]: + """ Actual `enrich_event` implementation, separated for cleaner thread locking code """ + now = time.monotonic() + sock_key = (event.pid, event.sock_pointer) + if event.type == self.SESSION_START: + try: + error = '' + proctree = crawl_process_tree(event.pid) + except Exception: + error = traceback.format_exc() + proctree = [] + self.session_tracking[sock_key] = { + 'pid': event.pid, + 'proctree': proctree, + 'destinations': {(event.daddr, event.dport): [now, 1]}, + 'error': error, + 'last_update': now, + } + elif event.type == self.SESSION_CONTINUE: + dest_key = (event.daddr, event.dport) + session_data = self.session_tracking[sock_key] + if dest_key not in session_data['destinations']: + session_data['destinations'][dest_key] = [now, 1] + else: + session_data['destinations'][dest_key][1] += 1 + session_data['last_update'] = now + else: + session_data = self.session_tracking.pop(sock_key) + session_data.pop('last_update') + session_data['destinations'] = [ + { + 'daddr': int_to_ip(addr_port[0]), + 'port': addr_port[1], + 'duration': now - begin_count[0], + 'msg_count': begin_count[1], + } + for addr_port, begin_count in 
session_data['destinations'].items() + ] + return session_data + + @never_crash + def _session_expiration_worker(self, session_max_duration: int): + """ Handler function for session expiration thread. + Removes from tracking sessions older than the specified max duration + + :param int session_max_duration: max session duration in seconds + """ + while True: + time.sleep(session_max_duration) + expired = [] + now = time.monotonic() + with self.thread_lock: + for sock_key, session_data in self.session_tracking.items(): + if now - session_data['last_update'] > session_max_duration: + session_data['error'] = 'session_max_duration_exceeded' + expired.append(sock_key) + for sock_key in expired: + end_event = SessionEventWrapper(self.SESSION_END, *sock_key) + self._process_events(None, end_event, None, False) diff --git a/pidtree_bcc/probes/utils.j2 b/pidtree_bcc/probes/utils.j2 new file mode 100644 index 0000000..3d503a3 --- /dev/null +++ b/pidtree_bcc/probes/utils.j2 @@ -0,0 +1,57 @@ +{% macro get_proto_func() -%} +static u8 get_socket_protocol(struct sock *sk) +{ + // I'd love to be the one to have figured this out, I'm not + // https://github.com/iovisor/bcc/blob/v0.16.0/tools/tcpaccept.py#L115 + u8 protocol; + int gso_max_segs_offset = offsetof(struct sock, sk_gso_max_segs); + int sk_lingertime_offset = offsetof(struct sock, sk_lingertime); + if (sk_lingertime_offset - gso_max_segs_offset == 4) { + protocol = *(u8 *)((u64)&sk->sk_gso_max_segs - 3); + } else { + protocol = *(u8 *)((u64)&sk->sk_wmem_queued - 3); + } + return protocol; +} +{%- endmacro %} + +{% macro net_filter_masks(filters, ip_to_int) -%} +// IPs and masks are given in integer notation with their dotted notation in the comment +{% for filter in filters %} +// {{ filter.get("description", filter["subnet_name"]) }} +#define subnet_{{ filter["subnet_name"] }} {{ ip_to_int(filter["network"]) }} // {{ filter["network"] }} +#define subnet_{{ filter["subnet_name"] }}_mask {{ 
ip_to_int(filter["network_mask"]) }} // {{ filter["network_mask"] }} +{% endfor %} +{%- endmacro %} + +{% macro net_filter_if_excluded(filters, daddr_var='daddr', dport_var='dport') -%} +// +// For each filter, drop the packet iff +// - a filter's subnet matches AND +// - the port is not one of the filter's excepted ports AND +// - the port is one of the filter's included ports, if they exist +// +if (0 // for easier templating +{% for filter in filters -%} + || ( + ( + subnet_{{ filter["subnet_name"] }} + & subnet_{{ filter["subnet_name"] }}_mask + ) == ({{ daddr_var }} & subnet_{{ filter["subnet_name"] }}_mask) + {%- if filter.get('except_ports') %} + && (1 // for easier templating + {% for port in filter['except_ports'] -%} + && ntohs({{ port }}) != {{ dport_var }} + {%- endfor %} + ) + {%- endif %} + {%- if filter.get('include_ports') %} + && (0 // for easier templating + {% for port in filter['include_ports'] -%} + || ntohs({{ port }}) == {{ dport_var }} + {%- endfor %} + ) + {%- endif %} + ) +{% endfor %}) +{%- endmacro %} diff --git a/tests/udp_session_probe_test.py b/tests/udp_session_probe_test.py new file mode 100644 index 0000000..3c7b9e2 --- /dev/null +++ b/tests/udp_session_probe_test.py @@ -0,0 +1,74 @@ +from unittest.mock import MagicMock +from unittest.mock import patch + +import pytest + +from pidtree_bcc.probes.udp_session import SessionEventWrapper +from pidtree_bcc.probes.udp_session import UDPSessionProbe + + +@patch('pidtree_bcc.probes.udp_session.crawl_process_tree') +@patch('pidtree_bcc.probes.udp_session.time') +def test_udp_session_enrich_event(mock_time, mock_crawl): + probe = UDPSessionProbe(None) + mock_time.monotonic.side_effect = range(3) + mock_crawl.return_value = [ + {'pid': 123, 'cmdline': 'some_program', 'username': 'foo'}, + {'pid': 50, 'cmdline': 'bash', 'username': 'foo'}, + {'pid': 1, 'cmdline': 'init', 'username': 'root'}, + ] + assert probe.enrich_event( + MagicMock(type=1, pid=123, sock_pointer=1, daddr=168430090, 
dport=1337), + ) is None + assert probe.enrich_event( + MagicMock(type=2, pid=123, sock_pointer=1, daddr=16777343, dport=1337), + ) is None + assert probe.enrich_event( + MagicMock(type=3, pid=123, sock_pointer=1), + ) == { + 'pid': 123, + 'proctree': [ + {'pid': 123, 'cmdline': 'some_program', 'username': 'foo'}, + {'pid': 50, 'cmdline': 'bash', 'username': 'foo'}, + {'pid': 1, 'cmdline': 'init', 'username': 'root'}, + ], + 'destinations': [ + { + 'daddr': '10.10.10.10', + 'port': 1337, + 'duration': 2, + 'msg_count': 1, + }, + { + 'daddr': '127.0.0.1', + 'port': 1337, + 'duration': 1, + 'msg_count': 1, + }, + ], + 'error': '', + } + mock_crawl.assert_called_once_with(123) + + +@patch('pidtree_bcc.probes.udp_session.time') +def test_udp_session_expiration_worker(mock_time): + mock_time.sleep.side_effect = [None, Exception('foobar')] # to stop inf loop + mock_time.monotonic.return_value = 200 + probe = UDPSessionProbe(None) + probe.session_tracking = { + (1, 1): {'last_update': 180}, + (2, 2): {'last_update': 0}, + (3, 3): {'last_update': 190}, + } + with patch.object(probe, '_process_events') as mock_process: + # never_crash uses functools.wraps so we can extract the wrapped method + undecorated_method = probe._session_expiration_worker.__wrapped__ + # assert we catch the inf loop stopping exception + with pytest.raises(Exception, match='foobar'): + # the undecorated method is not bound to the object, + # so we need to pass `probe` as `self` + undecorated_method(probe, 120) + mock_process.assert_called_once_with( + None, SessionEventWrapper(3, 2, 2), None, False, + )