filter kafka topics, redis queues and database tables based on patterns #1392

Open
esara opened this issue Nov 22, 2024 · 0 comments
esara (Contributor) commented Nov 22, 2024

Beyla currently supports filtering on HTTP paths via the routes decorator: https://grafana.com/docs/beyla/latest/configure/options/#routes-decorator
It would be nice to have the same kind of option to filter out certain topics, queues, and tables that we don't want to instrument, for example:

    beyla-config.yml: |
      kafka:
        ignored_topics:
          - __cluster_metadata
          - __consumer_offsets
          - __transaction_state
      redis:
        ignored_statements:
          - ping
      sql:
        ignored_tables:
          - pg_statements

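Since the request is to filter based on patterns, the ignore lists probably shouldn't be limited to exact names. Below is a minimal sketch of how the entries could be matched as shell-style globs; the matchesAny helper and the use of path.Match are illustrative assumptions, not existing Beyla code.

    package ebpfcommon

    import "path"

    // matchesAny reports whether name matches any of the configured patterns,
    // treating each entry as a shell-style glob, so that "__consumer_*"
    // would also cover "__consumer_offsets".
    func matchesAny(name string, patterns []string) bool {
            for _, p := range patterns {
                    if ok, err := path.Match(p, name); err == nil && ok {
                            return true
                    }
            }
            return false
    }
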
Something like this on the Beyla side:

$ git diff
diff --git a/pkg/beyla/config.go b/pkg/beyla/config.go
index cb55fba0..03e32871 100644
--- a/pkg/beyla/config.go
+++ b/pkg/beyla/config.go
@@ -16,6 +16,7 @@ import (
        "github.com/grafana/beyla/pkg/export/instrumentations"
        "github.com/grafana/beyla/pkg/export/otel"
        "github.com/grafana/beyla/pkg/export/prom"
+       common "github.com/grafana/beyla/pkg/internal/ebpf/common"
        "github.com/grafana/beyla/pkg/internal/filter"
        "github.com/grafana/beyla/pkg/internal/imetrics"
        "github.com/grafana/beyla/pkg/internal/infraolly/process"
@@ -116,6 +117,9 @@ var DefaultConfig = Config{
                },
        },
        Routes:       &transform.RoutesConfig{Unmatch: transform.UnmatchHeuristic},
+       Kafka:        &common.KafkaConfig{},
+       Redis:        &common.RedisConfig{},
+       SQL:          &common.SqlConfig{},
        NetworkFlows: defaultNetworkConfig,
        Processes: process.CollectConfig{
                RunMode:  process.RunModePrivileged,
@@ -141,6 +145,9 @@ type Config struct {
        Attributes Attributes `yaml:"attributes"`
        // Routes is an optional node. If not set, data will be directly forwarded to exporters.
        Routes       *transform.RoutesConfig       `yaml:"routes"`
+       Kafka        *common.KafkaConfig           `yaml:"kafka"`
+       Redis        *common.RedisConfig           `yaml:"redis"`
+       SQL          *common.SqlConfig             `yaml:"sql"`
        NameResolver *transform.NameResolverConfig `yaml:"name_resolver"`
        Metrics      otel.MetricsConfig            `yaml:"otel_metrics_export"`
        Traces       otel.TracesConfig             `yaml:"otel_traces_export"`
diff --git a/pkg/internal/ebpf/common/common.go b/pkg/internal/ebpf/common/common.go
index 3060a24c..8b406555 100644
--- a/pkg/internal/ebpf/common/common.go
+++ b/pkg/internal/ebpf/common/common.go
@@ -67,6 +67,32 @@ type MisclassifiedEvent struct {
        TCPInfo   *TCPRequestInfo
 }

+// KafkaConfig allows grouping Kafka topics for a given pattern.
+type KafkaConfig struct {
+       Topics             []string `yaml:"topics"`
+       IgnoreTopics       []string `yaml:"ignored_topics"`
+       Destinations       []string `yaml:"destinations"`
+       IgnoreDestinations []string `yaml:"ignored_destinations"`
+}
+
+// RedisConfig allows grouping Redis statements for a given pattern.
+type RedisConfig struct {
+       Operations       []string `yaml:"operations"`
+       IgnoreOperations []string `yaml:"ignored_operations"`
+       Statements       []string `yaml:"statements"`
+       IgnoreStatements []string `yaml:"ignored_statements"`
+}
+
+// SqlConfig allows grouping SQL statements for a given pattern.
+type SqlConfig struct {
+       Operations       []string `yaml:"operations"`
+       IgnoreOperations []string `yaml:"ignored_operations"`
+       Statements       []string `yaml:"statements"`
+       IgnoreStatements []string `yaml:"ignored_statements"`
+       Tables           []string `yaml:"tables"`
+       IgnoreTables     []string `yaml:"ignored_tables"`
+}
+
 var MisclassifiedEvents = make(chan MisclassifiedEvent)

 func ptlog() *slog.Logger { return slog.With("component", "ebpf.ProcessTracer") }
diff --git a/pkg/internal/ebpf/common/tcp_detect_transform.go b/pkg/internal/ebpf/common/tcp_detect_transform.go
index 7f6ddc5a..41b42618 100644
--- a/pkg/internal/ebpf/common/tcp_detect_transform.go
+++ b/pkg/internal/ebpf/common/tcp_detect_transform.go
@@ -3,7 +3,6 @@ package ebpfcommon
 import (
        "bytes"
        "encoding/binary"
-
        "github.com/cilium/ebpf/ringbuf"

        "github.com/grafana/beyla/pkg/internal/request"
@@ -38,7 +37,11 @@ func ReadTCPRequestIntoSpan(record *ringbuf.Record, filter ServiceFilter) (reque
        op, table, sql := detectSQLBytes(b)
        switch {
        case validSQL(op, table):
-               return TCPToSQLToSpan(&event, op, table, sql), false, nil
+
+               // Filter SQL events based on operation, table and sql statement
+               if op != "" && table != "" && sql != "" {
+                       return TCPToSQLToSpan(&event, op, table, sql), false, nil
+               }
        case isRedis(b) && isRedis(event.Rbuf[:rl]):
                op, text, ok := parseRedisRequest(string(b))

@@ -57,7 +60,10 @@ func ReadTCPRequestIntoSpan(record *ringbuf.Record, filter ServiceFilter) (reque
                                status = redisStatus(event.Rbuf[:rl])
                        }

-                       return TCPToRedisToSpan(&event, op, text, status), false, nil
+                       // Filter Redis events based on operation and statement text
+                       if op != "" && text != "" {
+                               return TCPToRedisToSpan(&event, op, text, status), false, nil
+                       }
                }
        default:
                // Kafka and gRPC can look very similar in terms of bytes. We can mistake one for another.
@@ -67,7 +73,10 @@ func ReadTCPRequestIntoSpan(record *ringbuf.Record, filter ServiceFilter) (reque
                } else {
                        k, err := ProcessPossibleKafkaEvent(&event, b, event.Rbuf[:rl])
                        if err == nil {
-                               return TCPToKafkaToSpan(&event, k), false, nil
+                               // Filter kafka events based on destination and topic
+                               if k != nil && k.ClientID != "" && k.Topic != "" {
+                                       return TCPToKafkaToSpan(&event, k), false, nil
+                               }
                        }
                }
        }
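
One gap in the diff above: the new checks only drop events whose operation, table, topic or statement text is empty, they don't yet consult the proposed kafka/redis/sql config sections. A rough illustration of how the Kafka branch could use the KafkaConfig fields from the diff; how the config gets threaded into ReadTCPRequestIntoSpan, and the matchesAny glob helper from the earlier sketch, are assumptions:

    // Sketch only, same package as the matchesAny sketch above: decide whether
    // a Kafka span should be emitted for a topic, given the KafkaConfig
    // proposed in the diff. A nil config keeps everything.
    func kafkaTopicAllowed(cfg *KafkaConfig, topic string) bool {
            if cfg == nil {
                    return true
            }
            // If an explicit allow-list is configured, drop anything outside it.
            if len(cfg.Topics) > 0 && !matchesAny(topic, cfg.Topics) {
                    return false
            }
            // Otherwise drop topics on the ignore list, e.g. "__consumer_offsets".
            return !matchesAny(topic, cfg.IgnoreTopics)
    }

The Kafka case would then return the span only when k != nil && kafkaTopicAllowed(cfg, k.Topic), and the Redis and SQL branches could follow the same shape for statements and tables.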