Skip to content

Commit

Permalink
Add support for FastCGI protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
grcevski committed Nov 28, 2024
1 parent 9efc255 commit 898ea5f
Show file tree
Hide file tree
Showing 54 changed files with 359 additions and 126 deletions.
1 change: 1 addition & 0 deletions bpf/http_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ typedef struct tp_info_pid {
tp_info_t tp;
u32 pid;
u8 valid;
u8 type;
} tp_info_pid_t;

// Here we keep the information that is sent on the ring buffer
Expand Down
59 changes: 51 additions & 8 deletions bpf/protocol_tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,32 @@ static __always_inline tcp_req_t *empty_tcp_req() {
return value;
}

// Starts a new root trace in `tp`: delegates to new_trace_id() (defined
// elsewhere) to populate the ids, then zeroes the parent span id so the span
// is recorded without a parent.
static __always_inline void init_new_trace(tp_info_t *tp) {
    new_trace_id(tp);
    // Size the clear by the field actually being cleared. The original used
    // sizeof(tp->span_id), which is only correct while both fields happen to
    // have the same length.
    __builtin_memset(tp->parent_id, 0, sizeof(tp->parent_id));
}

// Records the trace context `tp` for a TCP request seen on `conn`.
//
// A scratch tp_info_pid_t is obtained from tp_buf(); if none is available the
// function does nothing. Otherwise the scratch entry is filled from `tp`
// (with flags forced to 1 and valid set), stored for the connection via
// set_trace_info_for_connection(), and handed to server_or_client_trace()
// together with the request `type`.
static __always_inline void
set_tcp_trace_info(u8 type, connection_info_t *conn, tp_info_t *tp, u32 pid) {
    tp_info_pid_t *scratch = tp_buf();

    if (scratch) {
        scratch->type = type;
        // The pid is kept so stale server requests are not matched when a
        // client port gets reused.
        scratch->pid = pid;
        scratch->valid = 1;
        scratch->tp = *tp;
        scratch->tp.flags = 1;

        set_trace_info_for_connection(conn, scratch);
        bpf_dbg_printk("Set traceinfo for conn");
        dbg_print_http_connection_info(conn);

        server_or_client_trace(type, conn, scratch);
    }
}

static __always_inline void handle_unknown_tcp_connection(pid_connection_info_t *pid_conn,
void *u_buf,
int bytes_len,
Expand All @@ -54,15 +80,30 @@ static __always_inline void handle_unknown_tcp_connection(pid_connection_info_t
task_pid(&req->pid);
bpf_probe_read(req->buf, K_TCP_MAX_LEN, u_buf);

tp_info_pid_t *server_tp = find_parent_trace(pid_conn);
req->tp.ts = bpf_ktime_get_ns();

if (server_tp && server_tp->valid && valid_trace(server_tp->tp.trace_id)) {
bpf_dbg_printk("Found existing server tp for client call");
__builtin_memcpy(
req->tp.trace_id, server_tp->tp.trace_id, sizeof(req->tp.trace_id));
__builtin_memcpy(
req->tp.parent_id, server_tp->tp.span_id, sizeof(req->tp.parent_id));
urand_bytes(req->tp.span_id, SPAN_ID_SIZE_BYTES);
bpf_dbg_printk("TCP request start, direction = %d, ssl = %d", direction, ssl);

if (direction == TCP_SEND) { // Client
u8 found = find_trace_for_client_request(pid_conn, &req->tp);
bpf_dbg_printk("Looking up client trace info, found %d", found);
if (found) {
urand_bytes(req->tp.span_id, SPAN_ID_SIZE_BYTES);
} else {
init_new_trace(&req->tp);
}

set_tcp_trace_info(EVENT_HTTP_CLIENT, &pid_conn->conn, &req->tp, pid_conn->pid);
} else { // Server
u8 found = find_trace_for_server_request(
&pid_conn->conn, &req->tp, pid_conn->pid, EVENT_HTTP_REQUEST);
bpf_dbg_printk("Looking up server trace info, found %d", found);
if (found) {
urand_bytes(req->tp.span_id, SPAN_ID_SIZE_BYTES);
} else {
init_new_trace(&req->tp);
}
set_tcp_trace_info(EVENT_HTTP_REQUEST, &pid_conn->conn, &req->tp, pid_conn->pid);
}

bpf_map_update_elem(&ongoing_tcp_req, pid_conn, req, BPF_ANY);
Expand Down Expand Up @@ -90,6 +131,8 @@ static __always_inline void handle_unknown_tcp_connection(pid_connection_info_t
bpf_clamp_umax(off, (K_TCP_MAX_LEN / 2));
bpf_probe_read(existing->buf + off, (K_TCP_MAX_LEN / 2), u_buf);
existing->len += bytes_len;
} else {
existing->len += bytes_len;
}
}

Expand Down
98 changes: 58 additions & 40 deletions bpf/trace_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,9 @@ static __always_inline u8 valid_trace(const unsigned char *trace_id) {
return *((u64 *)trace_id) != 0 || *((u64 *)(trace_id + 8)) != 0;
}

static __always_inline void server_or_client_trace(http_connection_metadata_t *meta,
connection_info_t *conn,
tp_info_pid_t *tp_p) {
if (!meta) {
return;
}
if (meta->type == EVENT_HTTP_REQUEST) {
static __always_inline void
server_or_client_trace(u8 type, connection_info_t *conn, tp_info_pid_t *tp_p) {
if (type == EVENT_HTTP_REQUEST) {
trace_key_t t_key = {0};
task_tid(&t_key.p_key);
t_key.extra_id = extra_runtime_id();
Expand Down Expand Up @@ -226,6 +222,52 @@ static __always_inline void server_or_client_trace(http_connection_metadata_t *m
}
}

// Looks for an existing trace context that an incoming (server) request on
// `conn` should continue, and copies it into `tp`.
//
// Lookup order:
//   1. incoming_trace_map (TCP info set up by TC ingress); a hit is consumed,
//      i.e. the entry is deleted after it is copied.
//   2. The trace info stored for the connection, accepted only when black-box
//      context propagation is enabled and correlated_requests() agrees.
//
// On a hit, tp->trace_id is taken from the found trace and tp->parent_id from
// its span id. Returns 1 when a trace was found, 0 otherwise (`tp` untouched).
static __always_inline u8 find_trace_for_server_request(connection_info_t *conn,
                                                        tp_info_t *tp,
                                                        u32 current_pid,
                                                        u8 current_type) {
    tp_info_pid_t *incoming = bpf_map_lookup_elem(&incoming_trace_map, conn);
    if (incoming) {
        bpf_dbg_printk("Found incoming (TCP) tp for server request");
        __builtin_memcpy(tp->trace_id, incoming->tp.trace_id, sizeof(tp->trace_id));
        __builtin_memcpy(tp->parent_id, incoming->tp.span_id, sizeof(tp->parent_id));
        bpf_map_delete_elem(&incoming_trace_map, conn);
        return 1;
    }

    bpf_dbg_printk("Looking up tracemap for");
    dbg_print_http_connection_info(conn);

    tp_info_pid_t *prior = trace_info_for_connection(conn);

    bpf_dbg_printk("existing_tp %llx", prior);

    // correlated_requests() is only consulted when black-box propagation is on.
    if (disable_black_box_cp || !correlated_requests(tp, current_pid, current_type, prior)) {
        return 0;
    }

    bpf_dbg_printk("Found existing correlated tp for server request");
    __builtin_memcpy(tp->trace_id, prior->tp.trace_id, sizeof(tp->trace_id));
    __builtin_memcpy(tp->parent_id, prior->tp.span_id, sizeof(tp->parent_id));
    return 1;
}

// Looks up the parent (server) trace for an outgoing client request on
// `p_conn` via find_parent_trace(). When a parent exists, is marked valid and
// carries a non-zero trace id, its trace id is copied into tp->trace_id and
// its span id into tp->parent_id.
//
// Returns 1 when a parent trace was applied to `tp`, 0 otherwise.
static __always_inline u8 find_trace_for_client_request(pid_connection_info_t *p_conn,
                                                        tp_info_t *tp) {
    tp_info_pid_t *parent = find_parent_trace(p_conn);

    if (!parent || !parent->valid || !valid_trace(parent->tp.trace_id)) {
        return 0;
    }

    bpf_dbg_printk("Found existing server tp for client call");
    __builtin_memcpy(tp->trace_id, parent->tp.trace_id, sizeof(tp->trace_id));
    __builtin_memcpy(tp->parent_id, parent->tp.span_id, sizeof(tp->parent_id));

    return 1;
}

static __always_inline void get_or_create_trace_info(http_connection_metadata_t *meta,
u32 pid,
connection_info_t *conn,
Expand All @@ -248,45 +290,17 @@ static __always_inline void get_or_create_trace_info(http_connection_metadata_t
u8 found_tp = 0;

if (meta) {
tp_p->type = meta->type;
if (meta->type == EVENT_HTTP_CLIENT) {
pid_connection_info_t p_conn = {.pid = pid};
__builtin_memcpy(&p_conn.conn, conn, sizeof(connection_info_t));
tp_info_pid_t *server_tp = find_parent_trace(&p_conn);

if (server_tp && server_tp->valid && valid_trace(server_tp->tp.trace_id)) {
found_tp = 1;
bpf_dbg_printk("Found existing server tp for client call");
__builtin_memcpy(
tp_p->tp.trace_id, server_tp->tp.trace_id, sizeof(tp_p->tp.trace_id));
__builtin_memcpy(
tp_p->tp.parent_id, server_tp->tp.span_id, sizeof(tp_p->tp.parent_id));
}
found_tp = find_trace_for_client_request(&p_conn, &tp_p->tp);
} else {
//bpf_dbg_printk("Looking up existing trace for connection");
//dbg_print_http_connection_info(conn);

// For server requests, we first look for TCP info (setup by TC ingress) and then we fall back to black-box info.
tp_info_pid_t *existing_tp = bpf_map_lookup_elem(&incoming_trace_map, conn);
if (existing_tp) {
found_tp = 1;
bpf_dbg_printk("Found incoming (TCP) tp for server request");
__builtin_memcpy(
tp_p->tp.trace_id, existing_tp->tp.trace_id, sizeof(tp_p->tp.trace_id));
__builtin_memcpy(
tp_p->tp.parent_id, existing_tp->tp.span_id, sizeof(tp_p->tp.parent_id));
bpf_map_delete_elem(&incoming_trace_map, conn);
} else {
existing_tp = trace_info_for_connection(conn);

if (!disable_black_box_cp && correlated_requests(tp_p, existing_tp)) {
found_tp = 1;
bpf_dbg_printk("Found existing correlated tp for server request");
__builtin_memcpy(
tp_p->tp.trace_id, existing_tp->tp.trace_id, sizeof(tp_p->tp.trace_id));
__builtin_memcpy(
tp_p->tp.parent_id, existing_tp->tp.span_id, sizeof(tp_p->tp.parent_id));
}
}
found_tp = find_trace_for_server_request(conn, &tp_p->tp, tp_p->pid, tp_p->type);
}
}

Expand All @@ -307,7 +321,9 @@ static __always_inline void get_or_create_trace_info(http_connection_metadata_t
// for customers to enable it. Off by default.
if (!capture_header_buffer) {
set_trace_info_for_connection(conn, tp_p);
server_or_client_trace(meta, conn, tp_p);
if (meta) {
server_or_client_trace(meta->type, conn, tp_p);
}
return;
}

Expand Down Expand Up @@ -341,7 +357,9 @@ static __always_inline void get_or_create_trace_info(http_connection_metadata_t
#endif

set_trace_info_for_connection(conn, tp_p);
server_or_client_trace(meta, conn, tp_p);
if (meta) {
server_or_client_trace(meta->type, conn, tp_p);
}
}

#endif
13 changes: 9 additions & 4 deletions bpf/tracing.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,21 @@ static __always_inline u64 current_immediate_epoch(u64 ts) {
return temp * NANOSECONDS_PER_IMM_EPOCH;
}

static __always_inline u8 correlated_requests(tp_info_pid_t *tp, tp_info_pid_t *existing_tp) {
static __always_inline u8 correlated_requests(tp_info_t *tp,
u32 current_pid,
u8 current_type,
tp_info_pid_t *existing_tp) {
if (!existing_tp) {
return 0;
}

// We check for correlated requests which are in order, but from different PIDs
// Same PID means that we had client port reuse, which might falsely match prior
// Same PID means that we had client port reuse (*unless one was client and the other
// was a server request, i.e. the type check), which might falsely match prior
// transaction if it happened during the same epoch.
if ((tp->tp.ts >= existing_tp->tp.ts) && (tp->pid != existing_tp->pid)) {
return current_epoch(tp->tp.ts) == current_epoch(existing_tp->tp.ts);
if ((tp->ts >= existing_tp->tp.ts) &&
((current_pid != existing_tp->pid) || (current_type != existing_tp->type))) {
return current_epoch(tp->ts) == current_epoch(existing_tp->tp.ts);
}

return 0;
Expand Down
133 changes: 133 additions & 0 deletions pkg/internal/ebpf/common/fast_cgi_detect_transform.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package ebpfcommon

import (
"bytes"
"unsafe"

trace2 "go.opentelemetry.io/otel/trace"

"github.com/grafana/beyla/pkg/internal/request"
)

// Constants used to recognise and decode FastCGI traffic.
const (
	// Number of leading request bytes skipped before the name-value table.
	// NOTE(review): presumably the FCGI_BEGIN_REQUEST record (8-byte header +
	// 8-byte body) plus the 8-byte FCGI_PARAMS record header - confirm.
	fastCGIRequestHeaderLen = 24

	// CGI parameter names looked up in the decoded table.
	requestMethodKey = "REQUEST_METHOD"
	requestURIKey    = "REQUEST_URI"
	scriptNameKey    = "SCRIPT_NAME"

	// Record type that marks a response as an error (FCGI_STDERR).
	responseError = 7
)

// parseCGITable decodes a sequence of FastCGI name-value pairs from b and
// returns them as a map. Each pair is encoded as
// <name length><value length><name bytes><value bytes>.
//
// The input is typically a truncated capture, so decoding is best effort:
//   - parsing stops at the first entry that does not fit in b, instead of
//     continuing on misaligned bytes;
//   - a name whose value is cut off is recorded with an empty value, unless
//     an earlier complete entry already provided that name;
//   - entries with an empty name are skipped;
//   - when a name repeats, the last complete entry wins.
//
// The returned map is never nil.
func parseCGITable(b []byte) map[string]string {
	res := map[string]string{}

	for len(b) > 0 {
		keyLen, rest, ok := readCGILength(b)
		if !ok {
			break
		}

		valLen, rest, ok := readCGILength(rest)
		if !ok || len(rest) < keyLen {
			break // the length prefix or the name itself is cut off
		}

		key := string(rest[:keyLen])
		rest = rest[keyLen:]

		if len(rest) < valLen {
			// Truncated value: note that the name was present, but do not
			// clobber a complete value seen earlier.
			if _, seen := res[key]; key != "" && !seen {
				res[key] = ""
			}
			break
		}

		if key != "" {
			res[key] = string(rest[:valLen])
		}

		b = rest[valLen:]
	}

	return res
}

// readCGILength decodes one FastCGI name/value length from the start of b.
// Lengths below 128 take a single byte; larger lengths take four bytes, with
// the high bit of the first byte set and the remaining 31 bits holding the
// big-endian value. It returns the length, the bytes after the prefix, and
// false when b is too short to hold the prefix.
func readCGILength(b []byte) (int, []byte, bool) {
	if len(b) == 0 {
		return 0, b, false
	}

	if b[0]&0x80 == 0 {
		return int(b[0]), b[1:], true
	}

	if len(b) < 4 {
		return 0, b, false
	}

	n := int(b[0]&0x7f)<<24 | int(b[1])<<16 | int(b[2])<<8 | int(b[3])

	return n, b[4:], true
}

// maybeFastCGI is a cheap pre-filter: it reports whether b is longer than the
// FastCGI request header and contains the REQUEST_METHOD parameter name
// anywhere in its bytes. A true result is only a hint; detectFastCGI performs
// the actual decoding.
func maybeFastCGI(b []byte) bool {
	return len(b) > fastCGIRequestHeaderLen && bytes.Contains(b, []byte(requestMethodKey))
}

// detectFastCGI tries to interpret b as a FastCGI request and rb as its
// response.
//
// It skips the first fastCGIRequestHeaderLen bytes of b, decodes the
// name-value table that follows and returns the request method, the request
// URI and an HTTP-like status code. The URI comes from REQUEST_URI, falling
// back to SCRIPT_NAME when REQUEST_URI is missing or empty. The status is 200
// unless the second byte of rb (the record type in a FastCGI record header)
// is FCGI_STDERR, in which case it is 500.
//
// When b is too short or carries no REQUEST_METHOD parameter, it returns
// "", "", -1.
func detectFastCGI(b, rb []byte) (string, string, int) {
	// Guard the slice below: a buffer no longer than the header cannot hold
	// any parameters, and slicing a shorter one would panic.
	if len(b) <= fastCGIRequestHeaderLen {
		return "", "", -1
	}

	b = b[fastCGIRequestHeaderLen:]

	// Cheap substring check before paying for the table decode.
	if !bytes.Contains(b, []byte(requestMethodKey)) {
		return "", "", -1
	}

	kv := parseCGITable(b)

	method, ok := kv[requestMethodKey]
	if !ok {
		return "", "", -1
	}

	uri := kv[requestURIKey]
	if uri == "" {
		uri = kv[scriptNameKey]
	}

	// Translate the outcome into HTTP terms: 200 OK, 500 ERR.
	status := 200
	if len(rb) >= 2 && rb[1] == responseError {
		status = 500
	}

	return method, uri, status
}

// TCPToFastCGIToSpan builds a request.Span for a FastCGI exchange captured as
// a generic TCP request.
//
// op becomes the span method, uri the path and status the span status.
// Timing, content length, trace context and process information are copied
// from trace. Peer and host names are resolved only when the connection has
// at least one non-zero port. A Direction of 0 yields a server-side HTTP
// span; any other value yields an HTTP client span.
func TCPToFastCGIToSpan(trace *TCPRequestInfo, op, uri string, status int) request.Span {
	var (
		peer, hostname string
		hostPort       int
	)

	conn := &trace.ConnInfo
	if conn.S_port != 0 || conn.D_port != 0 {
		peer, hostname = (*BPFConnInfo)(unsafe.Pointer(conn)).reqHostInfo()
		hostPort = int(conn.D_port)
	}

	reqType := request.EventTypeHTTP
	if trace.Direction != 0 {
		reqType = request.EventTypeHTTPClient
	}

	pid := request.PidInfo{
		HostPID:   trace.Pid.HostPid,
		UserPID:   trace.Pid.UserPid,
		Namespace: trace.Pid.Ns,
	}

	start := int64(trace.StartMonotimeNs)

	return request.Span{
		Type:          reqType,
		Method:        op,
		Path:          uri,
		Peer:          peer,
		PeerPort:      int(conn.S_port),
		Host:          hostname,
		HostPort:      hostPort,
		ContentLength: int64(trace.Len),
		RequestStart:  start,
		Start:         start,
		End:           int64(trace.EndMonotimeNs),
		Status:        status,
		TraceID:       trace2.TraceID(trace.Tp.TraceId),
		SpanID:        trace2.SpanID(trace.Tp.SpanId),
		ParentSpanID:  trace2.SpanID(trace.Tp.ParentId),
		Flags:         trace.Tp.Flags,
		Pid:           pid,
	}
}
Loading

0 comments on commit 898ea5f

Please sign in to comment.