diff --git a/sentryflow/collector/collectorHandler.go b/sentryflow/collector/collectorHandler.go index 85f885f..58a8ef3 100644 --- a/sentryflow/collector/collectorHandler.go +++ b/sentryflow/collector/collectorHandler.go @@ -3,98 +3,91 @@ package collector import ( - "errors" "fmt" - cfg "github.com/5GSEC/SentryFlow/config" - "google.golang.org/grpc" "log" "net" - "sync" + + "github.com/5gsec/SentryFlow/config" + "google.golang.org/grpc" ) -// Ch global reference for Collector Handler -var Ch *Handler +// == // + +// ColH global reference for Collector Handler +var ColH *ColHandler // init Function func init() { - Ch = NewCollectorHandler() + ColH = NewCollectorHandler() } -// Handler Structure -type Handler struct { - collectors []collectorInterface - - listener net.Listener +// ColHandler Structure +type ColHandler struct { + colService net.Listener grpcServer *grpc.Server - wg sync.WaitGroup + collectors []collectorInterface } // NewCollectorHandler Function -func NewCollectorHandler() *Handler { - ch := &Handler{ +func NewCollectorHandler() *ColHandler { + ch := &ColHandler{ collectors: make([]collectorInterface, 0), } return ch } -// InitGRPCServer Function -func (h *Handler) InitGRPCServer() error { - listenAddr := fmt.Sprintf("%s:%s", cfg.GlobalCfg.OtelGRPCListenAddr, cfg.GlobalCfg.OtelGRPCListenPort) +// == // - // Start listening - lis, err := net.Listen("tcp", listenAddr) +// StartCollector Function +func StartCollector() bool { + // Make a string with the given collector address and port + collectorService := fmt.Sprintf("%s:%s", config.GlobalConfig.CollectorAddr, config.GlobalConfig.CollectorPort) + + // Start listening gRPC port + colService, err := net.Listen("tcp", collectorService) if err != nil { - msg := fmt.Sprintf("unable to listen at %s: %v", listenAddr, err) - return errors.New(msg) + log.Fatalf("[Collector] Unable to listen at %s: %v", collectorService, err) + return false } - // Create gRPC Server, register services - server := grpc.NewServer() + ColH.colService = colService + log.Printf("[Collector] Listening Collector gRPC (%s)", collectorService) - h.listener = lis - h.grpcServer = server + // Create gRPC Service + gRPCServer := grpc.NewServer() + ColH.grpcServer = gRPCServer - // initialize collectors - err = h.initCollectors() - if err != nil { - log.Printf("[Collector] Unable to initialize collector: %v", err) - } + // initialize OpenTelemetry collector + ColH.collectors = append(ColH.collectors, newOpenTelemetryLogsServer()) + + // initialize Envoy collectors for AccessLogs and Metrics + ColH.collectors = append(ColH.collectors, newEnvoyAccessLogsServer()) + ColH.collectors = append(ColH.collectors, newEnvoyMetricsServer()) // register services - h.registerServices() + for _, col := range ColH.collectors { + col.registerService(ColH.grpcServer) + } - log.Printf("[Collector] Server listening at %s", listenAddr) - return nil -} + log.Printf("[Collector] Initialized Collector gRPC") -// initCollectors Function -func (h *Handler) initCollectors() error { - // @todo make configuration determine which collector to start or not - h.collectors = append(h.collectors, newOtelLogServer()) - h.collectors = append(h.collectors, newEnvoyMetricsServer()) - h.collectors = append(h.collectors, newEnvoyAccessLogsServer()) + // Serve gRPC Service + go ColH.grpcServer.Serve(ColH.colService) - return nil -} + log.Printf("[Collector] Serving Collector gRPC") -// registerServices Function -func (h *Handler) registerServices() { - for _, col := range h.collectors { - col.registerService(h.grpcServer) - log.Printf("[Collector] Successfully registered services") - } + return true } -// Serve Function -func (h *Handler) Serve() error { - log.Printf("[Collector] Starting gRPC server") - return h.grpcServer.Serve(h.listener) -} +// StopCollector Function +func StopCollector() bool { + ColH.grpcServer.GracefulStop() -// Stop Function -func (h *Handler) Stop() { - log.Printf("[Collector] Stopped gRPC server") - h.grpcServer.GracefulStop() + log.Printf("[Collector] Gracefully stopped Collector gRPC") + + return true } + +// == // diff --git a/sentryflow/collector/envoy.go b/sentryflow/collector/envoy.go index f5e7caf..2103d54 100644 --- a/sentryflow/collector/envoy.go +++ b/sentryflow/collector/envoy.go @@ -5,13 +5,41 @@ package collector import ( "io" "log" + "strconv" - "github.com/5GSEC/SentryFlow/core" - envoyAls "github.com/envoyproxy/go-control-plane/envoy/service/accesslog/v3" + "github.com/5gsec/SentryFlow/k8s" + "github.com/5gsec/SentryFlow/processor" + "github.com/5gsec/SentryFlow/protobuf" + "github.com/5gsec/SentryFlow/types" + + envoyAccLogsData "github.com/envoyproxy/go-control-plane/envoy/data/accesslog/v3" + envoyAccLogs "github.com/envoyproxy/go-control-plane/envoy/service/accesslog/v3" envoyMetrics "github.com/envoyproxy/go-control-plane/envoy/service/metrics/v3" + "google.golang.org/grpc" ) +// == // + +// EnvoyAccessLogsServer Structure +type EnvoyAccessLogsServer struct { + envoyAccLogs.UnimplementedAccessLogServiceServer + collectorInterface +} + +// newEnvoyAccessLogsServer Function +func newEnvoyAccessLogsServer() *EnvoyAccessLogsServer { + ret := &EnvoyAccessLogsServer{} + return ret +} + +// registerService Function +func (evyAccLogs *EnvoyAccessLogsServer) registerService(server *grpc.Server) { + envoyAccLogs.RegisterAccessLogServiceServer(server, evyAccLogs) +} + +// == // + // EnvoyMetricsServer Structure type EnvoyMetricsServer struct { envoyMetrics.UnimplementedMetricsServiceServer @@ -25,78 +53,175 @@ func newEnvoyMetricsServer() *EnvoyMetricsServer { } // registerService Function -func (ems *EnvoyMetricsServer) registerService(server *grpc.Server) { - envoyMetrics.RegisterMetricsServiceServer(server, ems) +func (evyMetrics *EnvoyMetricsServer) registerService(server *grpc.Server) { + envoyMetrics.RegisterMetricsServiceServer(server, evyMetrics) } -// StreamMetrics Function -func (ems *EnvoyMetricsServer) StreamMetrics(stream envoyMetrics.MetricsService_StreamMetricsServer) error { - event, err := stream.Recv() - if err == io.EOF { - return nil - } - if err != nil { - log.Printf("[Envoy] Something went on wrong when receiving event: %v", err) - return err +// == // + +// generateAPILogsFromEnvoy Function +func generateAPILogsFromEnvoy(entry *envoyAccLogsData.HTTPAccessLogEntry) *protobuf.APILog { + comm := entry.GetCommonProperties() + timeStamp := comm.GetStartTime().Seconds + + srcInform := entry.GetCommonProperties().GetDownstreamRemoteAddress().GetSocketAddress() + srcIP := srcInform.GetAddress() + srcPort := strconv.Itoa(int(srcInform.GetPortValue())) + src := k8s.LookupK8sResource(srcIP) + + dstInform := entry.GetCommonProperties().GetUpstreamRemoteAddress().GetSocketAddress() + dstIP := dstInform.GetAddress() + dstPort := strconv.Itoa(int(dstInform.GetPortValue())) + dst := k8s.LookupK8sResource(dstIP) + + request := entry.GetRequest() + response := entry.GetResponse() + + protocol := entry.GetProtocolVersion().String() + method := request.GetRequestMethod().String() + path := request.GetPath() + resCode := response.GetResponseCode().GetValue() + + envoyAPILog := &protobuf.APILog{ + Id: 0, // @todo zero for now + TimeStamp: strconv.FormatInt(timeStamp, 10), + + SrcNamespace: src.Namespace, + SrcName: src.Name, + SrcLabel: src.Labels, + SrcIP: srcIP, + SrcPort: srcPort, + SrcType: types.K8sResourceTypeToString(src.Type), + + DstNamespace: dst.Namespace, + DstName: dst.Name, + DstLabel: dst.Labels, + DstIP: dstIP, + DstPort: dstPort, + DstType: types.K8sResourceTypeToString(dst.Type), + + Protocol: protocol, + Method: method, + Path: path, + ResponseCode: int32(resCode), } - err = event.ValidateAll() - if err != nil { - log.Printf("[Envoy] Failed to validate stream: %v", err) + return envoyAPILog +} + +// StreamAccessLogs Function +func (evyAccLogs *EnvoyAccessLogsServer) StreamAccessLogs(stream envoyAccLogs.AccessLogService_StreamAccessLogsServer) error { + for { + event, err := stream.Recv() + if err == io.EOF { + return nil + } else if err != nil { + log.Printf("[EnvoyAPILogs] Something went on wrong when receiving event: %v", err) + return err + } + + if event.GetHttpLogs() != nil { + for _, entry := range event.GetHttpLogs().LogEntry { + envoyAPILog := generateAPILogsFromEnvoy(entry) + processor.InsertAPILog(envoyAPILog) + } + } } +} - // @todo parse this event entry into our format - identifier := event.GetIdentifier() - identifier.GetNode().GetMetadata() +// == // - if identifier != nil { - log.Printf("[Envoy] Received EnvoyMetric - ID: %s, %s", identifier.GetNode().GetId(), identifier.GetNode().GetCluster()) - metaData := identifier.GetNode().GetMetadata().AsMap() +// generateMetricsFromEnvoy Function +func generateMetricsFromEnvoy(event *envoyMetrics.StreamMetricsMessage, metaData map[string]interface{}) *protobuf.EnvoyMetrics { + envoyMetrics := &protobuf.EnvoyMetrics{ + TimeStamp: "", - envoyMetric := core.GenerateMetricFromEnvoy(event, metaData) + Namespace: metaData["NAMESPACE"].(string), + Name: metaData["NAME"].(string), + IPAddress: metaData["INSTANCE_IPS"].(string), + Labels: k8s.LookupK8sResource(metaData["INSTANCE_IPS"].(string)).Labels, - core.Lh.InsertLog(envoyMetric) + Metrics: make(map[string]*protobuf.MetricValue), } - return nil -} + envoyMetrics.Metrics["GAUGE"] = &protobuf.MetricValue{ + Value: make(map[string]string), + } -// EnvoyAccessLogsServer Structure -type EnvoyAccessLogsServer struct { - envoyAls.UnimplementedAccessLogServiceServer - collectorInterface -} + envoyMetrics.Metrics["COUNTER"] = &protobuf.MetricValue{ + Value: make(map[string]string), + } -// newEnvoyAccessLogsServer Function -func newEnvoyAccessLogsServer() *EnvoyAccessLogsServer { - ret := &EnvoyAccessLogsServer{} - return ret -} + envoyMetrics.Metrics["HISTOGRAM"] = &protobuf.MetricValue{ + Value: make(map[string]string), + } -// registerService Function -func (eas *EnvoyAccessLogsServer) registerService(server *grpc.Server) { - envoyAls.RegisterAccessLogServiceServer(server, eas) -} + envoyMetrics.Metrics["SUMMARY"] = &protobuf.MetricValue{ + Value: make(map[string]string), + } -// StreamAccessLogs Function -func (eas *EnvoyAccessLogsServer) StreamAccessLogs(stream envoyAls.AccessLogService_StreamAccessLogsServer) error { - for { - event, err := stream.Recv() - if err == io.EOF { - return nil - } + for _, metric := range event.GetEnvoyMetrics() { + metricType := metric.GetType().String() + metricName := metric.GetName() - if err != nil { - log.Printf("[Envoy] Something went on wrong when receiving event: %v", err) - return err + if envoyMetrics.Metrics[metricType].Value == nil { + continue } - // Check HTTP logs - if event.GetHttpLogs() != nil { - for _, entry := range event.GetHttpLogs().LogEntry { - envoyAccessLog := core.GenerateAccessLogsFromEnvoy(entry) - core.Lh.InsertLog(envoyAccessLog) + for _, metricDetail := range metric.GetMetric() { + var metricValue string + + if envoyMetrics.TimeStamp == "" { + envoyMetrics.TimeStamp = strconv.FormatInt(metricDetail.GetTimestampMs(), 10) + } + + if metricType == "GAUGE" { + metricValue = strconv.FormatFloat(metricDetail.GetGauge().GetValue(), 'f', -1, 64) } + + if metricType == "COUNTER" { + metricValue = strconv.FormatFloat(metricDetail.GetCounter().GetValue(), 'f', -1, 64) + } + + if metricType == "HISTOGRAM" { + metricValue = strconv.FormatUint(metricDetail.GetHistogram().GetSampleCount(), 10) + } + + if metricType == "SUMMARY" { + metricValue = strconv.FormatUint(metricDetail.GetHistogram().GetSampleCount(), 10) + } + + envoyMetrics.Metrics[metricType].Value[metricName] = metricValue } } + + return envoyMetrics } + +// StreamMetrics Function +func (evyMetrics *EnvoyMetricsServer) StreamMetrics(stream envoyMetrics.MetricsService_StreamMetricsServer) error { + event, err := stream.Recv() + if err == io.EOF { + return nil + } else if err != nil { + log.Printf("[EnvoyMetrics] Failed to receive an event: %v", err) + return err + } + + err = event.ValidateAll() + if err != nil { + log.Printf("[EnvoyMetrics] Failed to validate an event: %v", err) + } + + identifier := event.GetIdentifier() + if identifier != nil { + log.Printf("[EnvoyMetrics] Received EnvoyMetrics (%s, %s)", identifier.GetNode().GetId(), identifier.GetNode().GetCluster()) + metaData := identifier.GetNode().GetMetadata().AsMap() + envoyMetrics := generateMetricsFromEnvoy(event, metaData) + processor.InsertMetrics(envoyMetrics) + } + + return nil +} + +// == // diff --git a/sentryflow/collector/interface.go b/sentryflow/collector/interface.go index 154d83c..a610c4f 100644 --- a/sentryflow/collector/interface.go +++ b/sentryflow/collector/interface.go @@ -2,9 +2,15 @@ package collector -import "google.golang.org/grpc" +import ( + "google.golang.org/grpc" +) + +// == // // collectorInterface Interface type collectorInterface interface { registerService(server *grpc.Server) } + +// == // diff --git a/sentryflow/collector/opentelemetry.go b/sentryflow/collector/opentelemetry.go index a69439b..532e1ae 100644 --- a/sentryflow/collector/opentelemetry.go +++ b/sentryflow/collector/opentelemetry.go @@ -4,41 +4,140 @@ package collector import ( "context" - "github.com/5GSEC/SentryFlow/core" + "strconv" + "strings" + + "github.com/5gsec/SentryFlow/k8s" + "github.com/5gsec/SentryFlow/processor" + "github.com/5gsec/SentryFlow/protobuf" + "github.com/5gsec/SentryFlow/types" otelLogs "go.opentelemetry.io/proto/otlp/collector/logs/v1" "google.golang.org/grpc" ) -// OtelLogServer structure -type OtelLogServer struct { +// == // + +// OpenTelemetryLogsServer structure +type OpenTelemetryLogsServer struct { otelLogs.UnimplementedLogsServiceServer collectorInterface } -// newOtelLogServer Function -func newOtelLogServer() *OtelLogServer { - ret := &OtelLogServer{} +// newOpenTelemetryLogsServer Function +func newOpenTelemetryLogsServer() *OpenTelemetryLogsServer { + ret := &OpenTelemetryLogsServer{} return ret } // registerService Function -func (ols *OtelLogServer) registerService(server *grpc.Server) { - otelLogs.RegisterLogsServiceServer(server, ols) +func (otlLogs *OpenTelemetryLogsServer) registerService(server *grpc.Server) { + otelLogs.RegisterLogsServiceServer(server, otlLogs) } -// Export Function -func (ols *OtelLogServer) Export(_ context.Context, req *otelLogs.ExportLogsServiceRequest) (*otelLogs.ExportLogsServiceResponse, error) { - // This is for Log.Export in OpenTelemetry format - als := core.GenerateAccessLogsFromOtel(req.String()) +// == // + +// generateAPILogsFromOtel Function +func generateAPILogsFromOtel(logText string) []*protobuf.APILog { + apiLogs := make([]*protobuf.APILog, 0) + + // Preprocess redundant chars + logText = strings.ReplaceAll(logText, `\"`, "") + logText = strings.ReplaceAll(logText, `}`, "") + + // Split logs by log_records, this is single access log instance + parts := strings.Split(logText, "log_records") + if len(parts) == 0 { + return nil + } + + // Ignore the first entry (the metadata "resource_logs:{resource:{ scope_logs:{" part) + for _, accessLog := range parts[0:] { + var srcIP string + var srcPort string + var dstIP string + var dstPort string + + if len(accessLog) == 0 { + continue + } + + index := strings.Index(accessLog, "string_value:\"") + if index == -1 { + continue + } + + words := strings.Fields(accessLog[index+len("string_value:\""):]) + + timeStamp := words[0] + method := words[1] + path := words[2] + protocol := words[3] + resCode, _ := strconv.ParseInt(words[4], 10, 64) + + srcInform := words[21] - for _, al := range als { - core.Lh.InsertLog(al) + // Extract the left and right words based on the colon delimiter (ADDR:PORT) + colonIndex := strings.LastIndex(srcInform, ":") + if colonIndex > 0 && colonIndex < len(srcInform)-1 { + srcIP = strings.TrimSpace(srcInform[:colonIndex]) + srcPort = strings.TrimSpace(srcInform[colonIndex+1:]) + } + src := k8s.LookupK8sResource(srcIP) + + dstInform := words[20] + + // Extract the left and right words based on the colon delimiter (ADDR:PORT) + colonIndex = strings.LastIndex(dstInform, ":") + if colonIndex > 0 && colonIndex < len(dstInform)-1 { + dstIP = strings.TrimSpace(dstInform[:colonIndex]) + dstPort = strings.TrimSpace(dstInform[colonIndex+1:]) + } + dst := k8s.LookupK8sResource(dstIP) + + // Create APILog + apiLog := protobuf.APILog{ + Id: 0, // @todo zero for now + TimeStamp: timeStamp, + + SrcNamespace: src.Namespace, + SrcName: src.Name, + SrcLabel: src.Labels, + SrcIP: srcIP, + SrcPort: srcPort, + SrcType: types.K8sResourceTypeToString(src.Type), + + DstNamespace: dst.Namespace, + DstName: dst.Name, + DstLabel: dst.Labels, + DstIP: dstIP, + DstPort: dstPort, + DstType: types.K8sResourceTypeToString(dst.Type), + + Protocol: protocol, + Method: method, + Path: path, + ResponseCode: int32(resCode), + } + + apiLogs = append(apiLogs, &apiLog) + } + + return apiLogs +} + +// Export Function for Log.Export in OpenTelemetry format +func (otlLogs *OpenTelemetryLogsServer) Export(_ context.Context, req *otelLogs.ExportLogsServiceRequest) (*otelLogs.ExportLogsServiceResponse, error) { + apiLogs := generateAPILogsFromOtel(req.String()) + for _, apiLog := range apiLogs { + processor.InsertAPILog(apiLog) } - // For now, we will not consider partial success + // @todo not consider partial success ret := otelLogs.ExportLogsServiceResponse{ PartialSuccess: nil, } return &ret, nil } + +// == // diff --git a/sentryflow/config/config.go b/sentryflow/config/config.go index 12bb9dd..4e7af74 100644 --- a/sentryflow/config/config.go +++ b/sentryflow/config/config.go @@ -6,7 +6,6 @@ import ( "flag" "fmt" "log" - "os" "strings" "github.com/spf13/viper" @@ -14,30 +13,23 @@ import ( // SentryFlowConfig structure type SentryFlowConfig struct { - OtelGRPCListenAddr string // IP address to use for OTEL gRPC - OtelGRPCListenPort string // Port to use for OTEL gRPC + CollectorAddr string // Address for Collector gRPC + CollectorPort string // Port for Collector gRPC - CustomExportListenAddr string // IP address to use for custom exporter gRPC - CustomExportListenPort string // Port to use for custom exporter gRPC + ExporterAddr string // IP address to use for exporter gRPC + ExporterPort string // Port to use for exporter gRPC - PatchNamespace bool // Enable/Disable patching namespace for Istio injection - PatchRestartDeployments bool // Enable/Disable restarting deployments after patching + PatchingNamespaces bool // Enable/Disable patching namespaces with 'istio-injection' + RestartingPatchedDeployments bool // Enable/Disable restarting deployments after patching - AIEngineService string - AIEngineServicePort string - AIEngineBatchSize int + AggregationPeriod int // Period for aggregating metrics + CleanUpPeriod int // Period for cleaning up outdated metrics - MetricsDBFileName string // String value of MetricsDB file (sqlite3 db file) - MetricsDBAggregationTime int // Value of APILog Aggregation Time - MetricsDBClearTime int // Value of APIMetric DB Clear time - APIMetricsSendTime int // Value of APIMetric send time - - CollectorEnableOpenTelemetry bool // Enable/Disable OpenTelemetry Collector - Debug bool // Enable/Disable SentryFlow debug mode + Debug bool // Enable/Disable SentryFlow debug mode } -// GlobalCfg Global configuration for SentryFlow -var GlobalCfg SentryFlowConfig +// GlobalConfig Global configuration for SentryFlow +var GlobalConfig SentryFlowConfig // init Function func init() { @@ -46,39 +38,35 @@ func init() { // Config const const ( - OtelGRPCListenAddr string = "otelGRPCListenAddr" - OtelGRPCListenPort string = "otelGRPCListenPort" - CustomExportListenAddr string = "customExportListenAddr" - CustomExportListenPort string = "customExportListenPort" - PatchNamespace string = "patchNamespace" - PatchRestartDeployments string = "patchRestartDeployments" - AIEngineService string = "aiEngineService" - AIEngineServicePort string = "aiEngineServicePort" - AIEngineBatchSize string = "aiEngineBatchSize" - MetricsDBFileName string = "metricsDBFileName" - MetricsDBAggregationTime string = "metricsDBAggregationTime" - MetricsDBClearTime string = "metricsDBClearTime" - APIMetricsSendTime string = "apiMetricsSendTime" - CollectorEnableOpenTelemetry string = "collectorEnableOpenTelemetry" - Debug string = "debug" + CollectorAddr string = "collectorAddr" + CollectorPort string = "collectorPort" + + ExporterAddr string = "exporterAddr" + ExporterPort string = "exporterPort" + + PatchingNamespaces string = "patchingNamespaces" + RestartingPatchedDeployments string = "restartingPatchedDeployments" + + AggregationPeriod string = "aggregationPeriod" + CleanUpPeriod string = "cleanUpPeriod" + + Debug string = "debug" ) func readCmdLineParams() { - otelGRPCListenAddrStr := flag.String(OtelGRPCListenAddr, "0.0.0.0", "OTEL gRPC server listen address") - otelGRPCListenPortStr := flag.String(OtelGRPCListenPort, "4317", "OTEL gRPC server listen port") - customExportListenAddrStr := flag.String(CustomExportListenAddr, "0.0.0.0", "Custom export gRPC server listen address") - customExportListenPortStr := flag.String(CustomExportListenPort, "8080", "Custom export gRPC server listen port") - patchNamespaceB := flag.Bool(PatchNamespace, false, "Enable/Disable patching Istio injection to all namespaces") - patchRestartDeploymentsB := flag.Bool(PatchRestartDeployments, false, "Enable/Disable restarting deployments in all namespaces") - aiEngineServiceStr := flag.String(AIEngineService, "ai-engine.sentryflow.svc.cluster.local", "Service address for SentryFlow AI Engine") - aiEngineServicePortStr := flag.String(AIEngineServicePort, "5000", "Service Port for SentryFlow AI Engine") - aiEngineBatchSizeInt := flag.Int(AIEngineBatchSize, 5, "Batch size for SentryFlow AI Engine") - metricsDBFileNameStr := flag.String(MetricsDBFileName, "/etc/sentryflow/metrics.db", "File name for local metrics DB") - metricsDBAggregationTimeInt := flag.Int(MetricsDBAggregationTime, 10, "Term time between aggregations") - metricsDBClearTimeInt := flag.Int(MetricsDBClearTime, 600, "Metrics DB Clear Time") - APIMetricsSendTimeInt := flag.Int(APIMetricsSendTime, 10, "APIMetric send term") - collectorEnableOpenTelemetryB := flag.Bool(CollectorEnableOpenTelemetry, true, "Enable/Disable OpenTelemetry Collector") - configDebugB := flag.Bool(Debug, false, "Enable/Disable debugging mode using logs") + collectorAddrStr := flag.String(CollectorAddr, "0.0.0.0", "Address for Collector gRPC") + collectorPortStr := flag.String(CollectorPort, "4317", "Port for Collector gRPC") + + exporterAddrStr := flag.String(ExporterAddr, "0.0.0.0", "Address for Exporter gRPC") + exporterPortStr := flag.String(ExporterPort, "8080", "Port for Exporter gRPC") + + patchingNamespacesB := flag.Bool(PatchingNamespaces, false, "Enable patching 'istio-injection' to all namespaces") + restartingPatchedDeploymentsB := flag.Bool(RestartingPatchedDeployments, false, "Enable restarting the deployments in all patched namespaces") + + aggregationPeriodInt := flag.Int(AggregationPeriod, 1, "Period for aggregating metrics") + cleanUpPeriodInt := flag.Int(CleanUpPeriod, 5, "Period for cleanning up outdated metrics") + + configDebugB := flag.Bool(Debug, false, "Enable debugging mode") var flags []string flag.VisitAll(func(f *flag.Flag) { @@ -89,20 +77,18 @@ func readCmdLineParams() { flag.Parse() - viper.SetDefault(OtelGRPCListenAddr, *otelGRPCListenAddrStr) - viper.SetDefault(OtelGRPCListenPort, *otelGRPCListenPortStr) - viper.SetDefault(CustomExportListenAddr, *customExportListenAddrStr) - viper.SetDefault(CustomExportListenPort, *customExportListenPortStr) - viper.SetDefault(PatchNamespace, *patchNamespaceB) - viper.SetDefault(PatchRestartDeployments, *patchRestartDeploymentsB) - viper.SetDefault(AIEngineService, *aiEngineServiceStr) - viper.SetDefault(AIEngineServicePort, *aiEngineServicePortStr) - viper.SetDefault(AIEngineBatchSize, *aiEngineBatchSizeInt) - viper.SetDefault(MetricsDBFileName, *metricsDBFileNameStr) - viper.SetDefault(MetricsDBAggregationTime, *metricsDBAggregationTimeInt) - viper.SetDefault(MetricsDBClearTime, *metricsDBClearTimeInt) - viper.SetDefault(APIMetricsSendTime, *APIMetricsSendTimeInt) - viper.SetDefault(CollectorEnableOpenTelemetry, *collectorEnableOpenTelemetryB) + viper.SetDefault(CollectorAddr, *collectorAddrStr) + viper.SetDefault(CollectorPort, *collectorPortStr) + + viper.SetDefault(ExporterAddr, *exporterAddrStr) + viper.SetDefault(ExporterPort, *exporterPortStr) + + viper.SetDefault(PatchingNamespaces, *patchingNamespacesB) + viper.SetDefault(RestartingPatchedDeployments, *restartingPatchedDeploymentsB) + + viper.SetDefault(AggregationPeriod, *aggregationPeriodInt) + viper.SetDefault(CleanUpPeriod, *cleanUpPeriodInt) + viper.SetDefault(Debug, *configDebugB) } @@ -114,26 +100,21 @@ func LoadConfig() error { // Read environment variable, those are upper-cased viper.AutomaticEnv() - // todo: read configuration from config file - _ = os.Getenv("SENTRYFLOW_CFG") - - GlobalCfg.OtelGRPCListenAddr = viper.GetString(OtelGRPCListenAddr) - GlobalCfg.OtelGRPCListenPort = viper.GetString(OtelGRPCListenPort) - GlobalCfg.CustomExportListenAddr = viper.GetString(CustomExportListenAddr) - GlobalCfg.CustomExportListenPort = viper.GetString(CustomExportListenPort) - GlobalCfg.PatchNamespace = viper.GetBool(PatchNamespace) - GlobalCfg.PatchRestartDeployments = viper.GetBool(PatchRestartDeployments) - GlobalCfg.AIEngineService = viper.GetString(AIEngineService) - GlobalCfg.AIEngineServicePort = viper.GetString(AIEngineServicePort) - GlobalCfg.AIEngineBatchSize = viper.GetInt(AIEngineBatchSize) - GlobalCfg.MetricsDBFileName = viper.GetString(MetricsDBFileName) - GlobalCfg.MetricsDBAggregationTime = viper.GetInt(MetricsDBAggregationTime) - GlobalCfg.MetricsDBClearTime = viper.GetInt(MetricsDBClearTime) - GlobalCfg.APIMetricsSendTime = viper.GetInt(APIMetricsSendTime) - GlobalCfg.CollectorEnableOpenTelemetry = viper.GetBool(CollectorEnableOpenTelemetry) - GlobalCfg.Debug = viper.GetBool(Debug) - - log.Printf("Configuration [%+v]", GlobalCfg) + GlobalConfig.CollectorAddr = viper.GetString(CollectorAddr) + GlobalConfig.CollectorPort = viper.GetString(CollectorPort) + + GlobalConfig.ExporterAddr = viper.GetString(ExporterAddr) + GlobalConfig.ExporterPort = viper.GetString(ExporterPort) + + GlobalConfig.PatchingNamespaces = viper.GetBool(PatchingNamespaces) + GlobalConfig.RestartingPatchedDeployments = viper.GetBool(RestartingPatchedDeployments) + + GlobalConfig.AggregationPeriod = viper.GetInt(AggregationPeriod) + GlobalConfig.CleanUpPeriod = viper.GetInt(CleanUpPeriod) + + GlobalConfig.Debug = viper.GetBool(Debug) + + log.Printf("Configuration [%+v]", GlobalConfig) return nil } diff --git a/sentryflow/core/k8sHandler.go b/sentryflow/core/k8sHandler.go deleted file mode 100644 index 07f7931..0000000 --- a/sentryflow/core/k8sHandler.go +++ /dev/null @@ -1,544 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -package core - -import ( - "context" - "log" - "sync" - "time" - - "github.com/5GSEC/SentryFlow/config" - "github.com/5GSEC/SentryFlow/types" - "gopkg.in/yaml.v2" - corev1 "k8s.io/api/core/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/cache" -) - -// K8s global reference for Kubernetes Handler -var K8s *K8sHandler - -// init Function -func init() { - K8s = NewK8sHandler() -} - -// K8sHandler Structure -type K8sHandler struct { - config *rest.Config - clientSet *kubernetes.Clientset - - listWatchers map[string]*cache.ListWatch - informers map[string]cache.Controller - podMap map[string]*corev1.Pod // This map is NOT thread safe, meaning that race condition might occur - svcMap map[string]*corev1.Service // This map is NOT thread safe, meaning that race condition might occur -} - -// NewK8sHandler Function -func NewK8sHandler() *K8sHandler { - kh := &K8sHandler{ - listWatchers: make(map[string]*cache.ListWatch), - podMap: make(map[string]*corev1.Pod), - svcMap: make(map[string]*corev1.Service), - informers: make(map[string]cache.Controller), - } - - return kh -} - -// InitK8sClient Function -func (kh *K8sHandler) InitK8sClient() bool { - var err error - - // Initialize in cluster config - kh.config, err = rest.InClusterConfig() - if err != nil { - return false - } - - // Initialize Kubernetes clientSet - kh.clientSet, err = kubernetes.NewForConfig(kh.config) - if err != nil { - return false - } - - watchTargets := []string{"pods", "services"} - - // Look for existing resources in the cluster, create map - kh.initExistingResources() - - // Initialize watchers and informers for services and pods - // This will not run the informers yet - kh.initWatchers(watchTargets) - kh.initInformers() - - return true -} - -// initWatchers initializes watchers for pods and services in cluster -func (kh *K8sHandler) initWatchers(watchTargets []string) { - // Initialize watch for pods and services - for _, target := range watchTargets { - watcher := cache.NewListWatchFromClient( - kh.clientSet.CoreV1().RESTClient(), - target, - corev1.NamespaceAll, - fields.Everything(), - ) - kh.listWatchers[target] = watcher - } -} - -// initExistingResources will create a mapping table for existing services and pods into IPs -// This is required since informers are NOT going to see existing resources until they are updated, created or deleted -// Todo: Refactor this function, this is kind of messy -func (kh *K8sHandler) initExistingResources() { - // List existing Pods - podList, err := kh.clientSet.CoreV1().Pods(corev1.NamespaceAll).List(context.TODO(), v1.ListOptions{}) - if err != nil { - log.Print("Error listing Pods:", err.Error()) - } - - // Add existing Pods to the podMap - for _, pod := range podList.Items { - currentPod := pod - kh.podMap[pod.Status.PodIP] = ¤tPod - log.Printf("[K8s] Add existing pod %s: %s/%s", pod.Status.PodIP, pod.Namespace, pod.Name) - } - - // List existing Services - serviceList, err := kh.clientSet.CoreV1().Services(corev1.NamespaceAll).List(context.TODO(), v1.ListOptions{}) - if err != nil { - log.Print("Error listing Services:", err.Error()) - } - - // Add existing Services to the svcMap - for _, service := range serviceList.Items { - currentService := service // This will solve G601 for gosec - - // Check if the service has a LoadBalancer type - if service.Spec.Type == "LoadBalancer" { - for _, lbIngress := range service.Status.LoadBalancer.Ingress { - lbIP := lbIngress.IP - if lbIP != "" { - kh.svcMap[lbIP] = ¤tService - log.Printf("[K8s] Add existing service (LoadBalancer) %s: %s/%s", lbIP, service.Namespace, service.Name) - } - } - } else { - kh.svcMap[service.Spec.ClusterIP] = ¤tService - if len(service.Spec.ExternalIPs) != 0 { - for _, eIP := range service.Spec.ExternalIPs { - kh.svcMap[eIP] = ¤tService - log.Printf("[K8s] Add existing service %s: %s/%s", eIP, service.Namespace, service.Name) - } - } - } - } -} - -// initInformers initializes informers for services and pods in cluster -func (kh *K8sHandler) initInformers() { - // Create Pod controller informer - _, pc := cache.NewInformer( - kh.listWatchers["pods"], - &corev1.Pod{}, - time.Second*0, - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { // Add pod information - pod := obj.(*corev1.Pod) - kh.podMap[pod.Status.PodIP] = pod - }, - UpdateFunc: func(oldObj, newObj interface{}) { // Update pod information - newPod := newObj.(*corev1.Pod) - kh.podMap[newPod.Status.PodIP] = newPod - }, - DeleteFunc: func(obj interface{}) { // Remove deleted pod information - pod := obj.(*corev1.Pod) - delete(kh.podMap, pod.Status.PodIP) - }, - }, - ) - - kh.informers["pods"] = pc - - // Create Service controller informer - _, sc := cache.NewInformer( - kh.listWatchers["services"], - &corev1.Service{}, - time.Second*0, - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { // Add service information - service := obj.(*corev1.Service) - - if service.Spec.Type == "LoadBalancer" { - for _, lbIngress := range service.Status.LoadBalancer.Ingress { - lbIP := lbIngress.IP - if lbIP != "" { - kh.svcMap[lbIP] = service - } - } - } else { - kh.svcMap[service.Spec.ClusterIP] = service - if len(service.Spec.ExternalIPs) != 0 { - for _, eIP := range service.Spec.ExternalIPs { - kh.svcMap[eIP] = service - } - } - } - }, - UpdateFunc: func(oldObj, newObj interface{}) { // Update service information - newService := newObj.(*corev1.Service) - if newService.Spec.Type == "LoadBalancer" { - for _, lbIngress := range newService.Status.LoadBalancer.Ingress { - lbIP := lbIngress.IP - if lbIP != "" { - kh.svcMap[lbIP] = newService - } - } - } else { - kh.svcMap[newService.Spec.ClusterIP] = newService - if len(newService.Spec.ExternalIPs) != 0 { - for _, eIP := range newService.Spec.ExternalIPs { - kh.svcMap[eIP] = newService - } - } - } - }, - DeleteFunc: func(obj interface{}) { - service := obj.(*corev1.Service) - if service.Spec.Type == "LoadBalancer" { - for _, lbIngress := range service.Status.LoadBalancer.Ingress { - lbIP := lbIngress.IP - if lbIP != "" { - delete(kh.svcMap, lbIP) - } - } - } else { - delete(kh.svcMap, service.Spec.ClusterIP) // Remove deleted service information - if len(service.Spec.ExternalIPs) != 0 { - for _, eIP := range service.Spec.ExternalIPs { - delete(kh.svcMap, eIP) - } - } - } - }, - }, - ) - - kh.informers["services"] = sc -} - -// RunInformers starts running informers -func (kh *K8sHandler) RunInformers(stopCh chan struct{}, wg *sync.WaitGroup) { - wg.Add(1) - for name, informer := range kh.informers { - name := name - informer := informer - go func() { - log.Printf("[K8s] Started informers for %s", name) - informer.Run(stopCh) - - defer wg.Done() - }() - } - - log.Printf("[K8s] Started all informers") -} - -// lookupIPAddress Function -func (kh *K8sHandler) lookupIPAddress(ipAddr string) interface{} { - // Look for pod map first - pod, ok := kh.podMap[ipAddr] - if ok { - return pod - } - - // Look for service map - service, ok := kh.svcMap[ipAddr] - if ok { - return service - } - - return nil -} - -// LookupNetworkedResource Function -func LookupNetworkedResource(srcIP string) types.K8sNetworkedResource { - ret := types.K8sNetworkedResource{ - Name: "Unknown", - Namespace: "Unknown", - Labels: make(map[string]string), - Type: types.K8sResourceTypeUnknown, - } - - // Find Kubernetes resource from source IP (service or a pod) - raw := K8s.lookupIPAddress(srcIP) - - // Currently supports Service or Pod - switch raw.(type) { - case *corev1.Pod: - pod, ok := raw.(*corev1.Pod) - if ok { - ret.Name = pod.Name - ret.Namespace = pod.Namespace - ret.Labels = pod.Labels - ret.Type = types.K8sResourceTypePod - } - case *corev1.Service: - svc, ok := raw.(*corev1.Service) - if ok { - ret.Name = svc.Name - ret.Namespace = svc.Namespace - ret.Labels = svc.Labels - ret.Type = types.K8sResourceTypeService - } - default: - ret.Type = types.K8sResourceTypeUnknown - } - - return ret -} - -// PatchIstioConfigMap patches the Istio's configmap for meshConfig -// This will make istio know that there is an exporter with envoyOtelAls -func (kh *K8sHandler) PatchIstioConfigMap() error { - // Get the ConfigMap istio-system/istio - configMap, err := kh.clientSet.CoreV1(). - ConfigMaps("istio-system"). - Get(context.Background(), "istio", v1.GetOptions{}) - if err != nil { - // Handle error - log.Fatalf("[Patcher] Unable to retrieve configmap istio-system/istio :%v", err) - return err - } - - // Define a map to represent the structure of the mesh configuration - var meshConfig map[string]interface{} - - // Unmarshal the YAML string into the map - meshConfigStr := configMap.Data["mesh"] - err = yaml.Unmarshal([]byte(meshConfigStr), &meshConfig) - if err != nil { - // Handle error - log.Fatalf("[Patcher] Unable to unmarshall configmap istio-system/istio :%v", err) - return err - } - - _, eeaExist := meshConfig["enableEnvoyAccessLogService"] - if eeaExist { - log.Printf("Overwrite the contents of \"enableEnvoyAccessLogService\"") - } - meshConfig["enableEnvoyAccessLogService"] = true - - _, ealExist := meshConfig["defaultConfig"].(map[interface{}]interface{})["envoyAccessLogService"] - if ealExist { - log.Printf("Overwrite the contents of \"defaultConfig.envoyAccessLogService\"") - } - meshConfig["defaultConfig"].(map[interface{}]interface{})["envoyAccessLogService"] = map[string]string{ - "address": "sentryflow.sentryflow.svc.cluster.local:4317", - } - - _, emExist := meshConfig["defaultConfig"].(map[interface{}]interface{})["envoyMetricsService"] - if emExist { - log.Printf("Overwrite the contents of \"defaultConfig.envoyMetricsService\"") - } - meshConfig["defaultConfig"].(map[interface{}]interface{})["envoyMetricsService"] = map[string]string{ - "address": "sentryflow.sentryflow.svc.cluster.local:4317", - } - - // Work with defaultProviders.accessLogs - dp, exists := meshConfig["defaultProviders"].(map[interface{}]interface{})["accessLogs"] - if !exists { // Add defaultProviders.accessLogs if it does not exist - meshConfig["defaultProviders"].(map[interface{}]interface{})["accessLogs"] = []string{"sentryflow"} - } else { // Just add a new entry sentryflow if it exists - dpSlice := dp.([]interface{}) // @todo find better solution for this - duplicate := false - for _, entry := range dpSlice { - if entry == "sentryflow" { - // If "sentryflow" already exists, do nothing - log.Printf("[Patcher] istio-system/istio ConfigMap has " + - "sentryflow under defaultProviders.accessLogs, ignoring... ") - duplicate = true - break - } - } - - // If "sentryflow" does not exist, append it - if !duplicate { - dpSlice = append(dpSlice, "sentryflow") - meshConfig["defaultProviders"].(map[interface{}]interface{})["accessLogs"] = dpSlice - } - } - - // ExtensionProvider for our service - eps := map[interface{}]interface{}{ - "name": "sentryflow", - "envoyOtelAls": map[interface{}]interface{}{ - "service": "sentryflow.sentryflow.svc.cluster.local", - "port": config.GlobalCfg.OtelGRPCListenPort, - }, - } - - // Work with extensionProviders - ep, exists := meshConfig["extensionProviders"] - if !exists { - // Create extensionProviders as a slice containing only the eps map - meshConfig["extensionProviders"] = []map[interface{}]interface{}{eps} - } else { - // Check if eps already exists in extensionProviders - epSlice, ok := ep.([]interface{}) - if !ok { - // handle the case where ep is not []interface{} - log.Printf("[Patcher] istio-system/istio ConfigMap extensionProviders has unexpected type") - } - - duplicate := false - for _, entry := range epSlice { - entryMap, ok := entry.(map[interface{}]interface{}) - if !ok { - // handle the case where an entry is not map[interface{}]interface{} - log.Printf("[Patcher] istio-system/istio ConfigMap extensionProviders entry has unexpected type") - } - if entryMap["name"] == eps["name"] { - // If "sentryflow" already exists, do nothing - log.Printf("[Patcher] istio-system/istio ConfigMap has sentryflow under extensionProviders, ignoring... ") - duplicate = true - break - } - } - - // Append eps to the existing slice - if !duplicate { - meshConfig["extensionProviders"] = append(ep.([]map[interface{}]interface{}), eps) - } - } - - // Update the ConfigMap data with the modified meshConfig - updatedMeshConfig, err := yaml.Marshal(meshConfig) - if err != nil { - // Handle error - log.Fatalf("[Patcher] Unable to marshal updated meshConfig to YAML: %v", err) - return err - } - - // Convert the []byte to string - configMap.Data["mesh"] = string(updatedMeshConfig) - - // Preview changes, for debugging - if config.GlobalCfg.Debug { - log.Printf("[PATCH] Patching istio-system/istio ConfigMap as: \n%v", configMap) - } - - // Patch the ConfigMap back to the cluster - updatedConfigMap, err := kh.clientSet.CoreV1(). - ConfigMaps("istio-system"). - Update(context.Background(), configMap, v1.UpdateOptions{}) - if err != nil { - // Handle error - log.Fatalf("[Patcher] Unable to update configmap istio-system/istio :%v", err) - return err - } - - // Update successful - if config.GlobalCfg.Debug { - log.Printf("[Patcher] Updated istio-system/istio ConfigMap as: \n%v", updatedConfigMap) - } - return nil -} - -// PatchNamespaces patches namespaces for adding istio injection -func (kh *K8sHandler) PatchNamespaces() error { - // Get the list of namespaces - namespaces, err := kh.clientSet.CoreV1().Namespaces().List(context.Background(), v1.ListOptions{}) - if err != nil { - // Handle error - log.Fatalf("[Patcher] Unable to list namespaces: %v", err) - return err - } - - // Loop through each namespace and update it with the desired labels - // @todo make this skip adding labeles to namespaces which are defined in the config - for _, ns := range namespaces.Items { - currentNs := ns - - // We are not going to inject sidecars to sentryflow namespace - if currentNs.Name == "sentryflow" { - continue - } - - // Add istio-injection="enabled" for namespaces - currentNs.Labels["istio-injection"] = "enabled" - - // Update the namespace in the cluster - updatedNamespace, err := kh.clientSet.CoreV1().Namespaces().Update(context.TODO(), ¤tNs, v1.UpdateOptions{ - FieldManager: "patcher", - }) - if err != nil { - log.Printf("[Patcher] Unable to update namespace %s: %v", currentNs.Name, err) - return err - } - - log.Printf("[Patcher] Updated Namespace: %s\n", updatedNamespace.Name) - } - - return nil -} - -// PatchRestartDeployments restarts the deployments in namespaces which were applied with "istio-injection": "enabled" -func (kh *K8sHandler) PatchRestartDeployments() error { - // Get the list of all deployments in all namespaces - deployments, err := kh.clientSet.AppsV1().Deployments("").List(context.Background(), v1.ListOptions{}) - if err != nil { - // Handle error - log.Fatalf("[Patcher] Unable to list deployments: %v", err) - return err - } - - // Iterate over each deployment and restart it - for _, deployment := range deployments.Items { - // We are not going to inject sidecars to sentryflow namespace - if deployment.Namespace == "sentryflow" { - continue - } - - // Restart the deployment - err := kh.restartDeployment(deployment.Namespace, deployment.Name) - if err != nil { - // Handle error - log.Printf("[Patcher] Unable to restart deployment %s/%s: %v", deployment.Namespace, deployment.Name, err) - continue - } - - log.Printf("[Patcher] Deployment %s/%s restarted", deployment.Namespace, deployment.Name) - } - - return nil -} - -// restartDeployment performs a rolling restart for a deployment in the specified namespace -// @todo: fix this, this DOES NOT restart deployments -func (kh *K8sHandler) restartDeployment(namespace string, deploymentName string) error { - deploymentClient := kh.clientSet.AppsV1().Deployments(namespace) - - // Get the deployment to retrieve the current spec - deployment, err := deploymentClient.Get(context.Background(), deploymentName, v1.GetOptions{}) - if err != nil { - return err - } - - // Trigger a rolling restart by updating the deployment's labels or annotations - deployment.Spec.Template.ObjectMeta.Labels["restartedAt"] = v1.Now().String() - - // Update the deployment to trigger the rolling restart - _, err = deploymentClient.Update(context.TODO(), deployment, v1.UpdateOptions{}) - if err != nil { - return err - } - - return nil -} diff --git a/sentryflow/core/logHandler.go b/sentryflow/core/logHandler.go deleted file mode 100644 index 1528994..0000000 --- a/sentryflow/core/logHandler.go +++ /dev/null @@ -1,299 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -package core - -import ( - "log" - "strconv" - "strings" - "sync" - - "github.com/5GSEC/SentryFlow/exporter" - "github.com/5GSEC/SentryFlow/metrics" - "github.com/5GSEC/SentryFlow/protobuf" - "github.com/5GSEC/SentryFlow/types" - accesslogv3 "github.com/envoyproxy/go-control-plane/envoy/data/accesslog/v3" - metricv3 "github.com/envoyproxy/go-control-plane/envoy/service/metrics/v3" -) - -// Lh global reference for LogHandler -var Lh *LogHandler - -// init Function -func init() { - Lh = NewLogHandler() -} - -// LogHandler Structure -type LogHandler struct { - stopChan chan struct{} - logChan chan interface{} -} - -// aggregationLog Structure -type aggregationLog struct { - Logs []*protobuf.APILog - Labels map[string]string - Annotations map[string]string -} - -// NewLogHandler Structure -func NewLogHandler() *LogHandler { - lh := &LogHandler{ - stopChan: make(chan struct{}), - logChan: make(chan interface{}), - } - - return lh -} - -// StartLogProcessor Function -func StartLogProcessor(wg *sync.WaitGroup) { - go Lh.logProcessingRoutine(wg) -} - -// StopLogProcessor Function -func StopLogProcessor() { - Lh.stopChan <- struct{}{} -} - -// InsertLog Function -func (lh *LogHandler) InsertLog(data interface{}) { - lh.logChan <- data -} - -// logProcessingRoutine Function -func (lh *LogHandler) logProcessingRoutine(wg *sync.WaitGroup) { - wg.Add(1) - for { - select { - case l, ok := <-lh.logChan: - if !ok { - log.Printf("[Error] Unable to process log") - } - - // Check new log's type - switch l.(type) { - case *protobuf.APILog: - go processAccessLog(l.(*protobuf.APILog)) - case *protobuf.EnvoyMetric: - go processEnvoyMetric(l.(*protobuf.EnvoyMetric)) - } - - case <-lh.stopChan: - wg.Done() - return - } - } -} - -// processAccessLog Function -func processAccessLog(al *protobuf.APILog) { - // Send AccessLog to exporter first - exporter.InsertAccessLog(al) - - // Then send AccessLog to metrics - metrics.InsertAccessLog(al) -} - -// processEnvoyMetric Function -func processEnvoyMetric(em *protobuf.EnvoyMetric) { - exporter.InsertEnvoyMetric(em) -} - -// GenerateAccessLogsFromOtel Function -func GenerateAccessLogsFromOtel(logText string) []*protobuf.APILog { - // @todo this needs more optimization, this code is kind of messy - // Create an array of AccessLogs for returning gRPC comm - var index int - ret := make([]*protobuf.APILog, 0) - - // Preprocess redundant chars - logText = strings.ReplaceAll(logText, `\"`, "") - logText = strings.ReplaceAll(logText, `}`, "") - - // Split logs by log_records, this is single access log instance - parts := strings.Split(logText, "log_records") - if len(parts) == 0 { - return nil - } - - // Ignore the first entry, this was the metadata "resource_logs:{resource:{ scope_logs:{" part. - for _, al := range parts[0:] { - if len(al) == 0 { - continue - } - - index = strings.Index(al, "string_value:\"") - if index == -1 { - continue - } - - result := al[index+len("string_value:\""):] - words := strings.Fields(result) - - method := words[1] - path := words[2] - protocolName := words[3] - timeStamp := words[0] - resCode, _ := strconv.ParseInt(words[4], 10, 64) - - srcInform := words[21] - dstInform := words[20] - - var srcIP string - var dstIP string - var srcPort string - var dstPort string - var colonIndex int - - // Extract the left and right words based on the colon delimiter (ADDR:PORT) - colonIndex = strings.LastIndex(srcInform, ":") - if colonIndex > 0 && colonIndex < len(srcInform)-1 { - srcIP = strings.TrimSpace(srcInform[:colonIndex]) - srcPort = strings.TrimSpace(srcInform[colonIndex+1:]) - } - - colonIndex = strings.LastIndex(dstInform, ":") - if colonIndex > 0 && colonIndex < len(dstInform)-1 { - dstIP = strings.TrimSpace(dstInform[:colonIndex]) - dstPort = strings.TrimSpace(dstInform[colonIndex+1:]) - } - - // Lookup using K8s API - src := LookupNetworkedResource(srcIP) - dst := LookupNetworkedResource(dstIP) - - // Create AccessLog in our gRPC format - cur := protobuf.APILog{ - TimeStamp: timeStamp, - Id: 0, // do 0 for now, we are going to write it later - SrcNamespace: src.Namespace, - SrcName: src.Name, - SrcLabel: src.Labels, - SrcIP: srcIP, - SrcPort: srcPort, - SrcType: types.K8sResourceTypeToString(src.Type), - DstNamespace: dst.Namespace, - DstName: dst.Name, - DstLabel: dst.Labels, - DstIP: dstIP, - DstPort: dstPort, - DstType: types.K8sResourceTypeToString(dst.Type), - Protocol: protocolName, - Method: method, - Path: path, - ResponseCode: int32(resCode), - } - - ret = append(ret, &cur) - } - - return ret -} - -// GenerateAccessLogsFromEnvoy Function -func GenerateAccessLogsFromEnvoy(entry *accesslogv3.HTTPAccessLogEntry) *protobuf.APILog { - srcInform := entry.GetCommonProperties().GetDownstreamRemoteAddress().GetSocketAddress() - srcIP := srcInform.GetAddress() - srcPort := strconv.Itoa(int(srcInform.GetPortValue())) - src := LookupNetworkedResource(srcIP) - - dstInform := entry.GetCommonProperties().GetUpstreamRemoteAddress().GetSocketAddress() - dstIP := dstInform.GetAddress() - dstPort := strconv.Itoa(int(dstInform.GetPortValue())) - dst := LookupNetworkedResource(dstIP) - - req := entry.GetRequest() - res := entry.GetResponse() - comm := entry.GetCommonProperties() - proto := entry.GetProtocolVersion() - - timeStamp := comm.GetStartTime().Seconds - path := req.GetPath() - method := req.GetRequestMethod().String() - protocolName := proto.String() - resCode := res.GetResponseCode().GetValue() - - envoyAccessLog := &protobuf.APILog{ - TimeStamp: strconv.FormatInt(timeStamp, 10), - Id: 0, // do 0 for now, we are going to write it later - SrcNamespace: src.Namespace, - SrcName: src.Name, - SrcLabel: src.Labels, - SrcIP: srcIP, - SrcPort: srcPort, - SrcType: types.K8sResourceTypeToString(src.Type), - DstNamespace: dst.Namespace, - DstName: dst.Name, - DstLabel: dst.Labels, - DstIP: dstIP, - DstPort: dstPort, - DstType: types.K8sResourceTypeToString(dst.Type), - Protocol: protocolName, - Method: method, - Path: path, - ResponseCode: int32(resCode), - } - - return envoyAccessLog -} - -// GenerateMetricFromEnvoy Function -func GenerateMetricFromEnvoy(event *metricv3.StreamMetricsMessage, metaData map[string]interface{}) *protobuf.EnvoyMetric { - pod := LookupNetworkedResource(metaData["INSTANCE_IPS"].(string)) - envoyMetric := &protobuf.EnvoyMetric{ - PodIP: metaData["INSTANCE_IPS"].(string), - Name: metaData["NAME"].(string), - Namespace: metaData["NAMESPACE"].(string), - Labels: pod.Labels, - TimeStamp: "", - Metric: make(map[string]*protobuf.MetricValue), - } - - envoyMetric.Metric["GAUGE"] = &protobuf.MetricValue{ - Value: make(map[string]string), - } - envoyMetric.Metric["COUNTER"] = &protobuf.MetricValue{ - Value: make(map[string]string), - } - envoyMetric.Metric["HISTOGRAM"] = &protobuf.MetricValue{ - Value: make(map[string]string), - } - envoyMetric.Metric["SUMMARY"] = &protobuf.MetricValue{ - Value: make(map[string]string), - } - - for _, metric := range event.GetEnvoyMetrics() { - metricType := metric.GetType().String() - metricName := metric.GetName() - - if envoyMetric.Metric[metricType].Value == nil { - continue - } - - var metricValue string - - for _, metricDetail := range metric.GetMetric() { - if envoyMetric.TimeStamp == "" { - envoyMetric.TimeStamp = strconv.FormatInt(metricDetail.GetTimestampMs(), 10) - } - if metricType == "GAUGE" { - metricValue = strconv.FormatFloat(metricDetail.GetGauge().GetValue(), 'f', -1, 64) - } - if metricType == "COUNTER" { - metricValue = strconv.FormatFloat(metricDetail.GetCounter().GetValue(), 'f', -1, 64) - } - if metricType == "HISTOGRAM" { - metricValue = strconv.FormatUint(metricDetail.GetHistogram().GetSampleCount(), 10) - } - if metricType == "SUMMARY" { - metricValue = strconv.FormatUint(metricDetail.GetHistogram().GetSampleCount(), 10) - } - - envoyMetric.Metric[metricType].Value[metricName] = metricValue - } - } - - return envoyMetric -} diff --git a/sentryflow/core/sentryflow.go b/sentryflow/core/sentryflow.go index d759c06..e9189c8 100644 --- a/sentryflow/core/sentryflow.go +++ b/sentryflow/core/sentryflow.go @@ -4,13 +4,20 @@ package core import ( "log" + "os" + "os/signal" "sync" + "syscall" - cfg "github.com/5GSEC/SentryFlow/config" - "github.com/5GSEC/SentryFlow/exporter" - "github.com/5GSEC/SentryFlow/metrics" + "github.com/5gsec/SentryFlow/collector" + "github.com/5gsec/SentryFlow/config" + "github.com/5gsec/SentryFlow/exporter" + "github.com/5gsec/SentryFlow/k8s" + "github.com/5gsec/SentryFlow/processor" ) +// == // + // StopChan Channel var StopChan chan struct{} @@ -19,128 +26,147 @@ func init() { StopChan = make(chan struct{}) } -// SentryFlowDaemon Structure -type SentryFlowDaemon struct { - WgDaemon *sync.WaitGroup +// SentryFlowService Structure +type SentryFlowService struct { + waitGroup *sync.WaitGroup } -// NewSentryFlowDaemon Function -func NewSentryFlowDaemon() *SentryFlowDaemon { - dm := new(SentryFlowDaemon) - - dm.WgDaemon = new(sync.WaitGroup) - - return dm +// NewSentryFlow Function +func NewSentryFlow() *SentryFlowService { + sf := new(SentryFlowService) + sf.waitGroup = new(sync.WaitGroup) + return sf } -// DestroySentryFlowDaemon Function -func (dm *SentryFlowDaemon) DestroySentryFlowDaemon() { - //metrics.StartAIEngine() - log.Printf("[SentryFlow] Started AI Engine connection") -} +// DestroySentryFlow Function +func (sf *SentryFlowService) DestroySentryFlow() { + close(StopChan) -// watchK8s Function -func (dm *SentryFlowDaemon) watchK8s() { - K8s.RunInformers(StopChan, dm.WgDaemon) -} + // Remove SentryFlow collector config from Kubernetes + if k8s.UnpatchIstioConfigMap() { + log.Print("[SentryFlow] Unpatched Istio ConfigMap") + } -// logProcessor Function -func (dm *SentryFlowDaemon) logProcessor() { - StartLogProcessor(dm.WgDaemon) - log.Printf("[SentryFlow] Started log processor") -} + // Stop collector + if collector.StopCollector() { + log.Print("[SentryFlow] Stopped Collectors") + } -// metricAnalyzer Function -func (dm *SentryFlowDaemon) metricAnalyzer() { - metrics.StartMetricsAnalyzer(dm.WgDaemon) - log.Printf("[SentryFlow] Started metric analyzer") -} + // Stop Log Processor + if processor.StopLogProcessor() { + log.Print("[SentryFlow] Stopped Log Processors") + } -// exporterServer Function -func (dm *SentryFlowDaemon) exporterServer() { - // Initialize and start exporter server - err := exporter.Exp.InitExporterServer() - if err != nil { - log.Fatalf("[SentryFlow] Unable to initialize Exporter Server: %v", err) - return + // Stop API Aanalyzer + if processor.StopAPIAnalyzer() { + log.Print("[SentryFlow] Stopped API Analyzer") } - err = exporter.Exp.StartExporterServer(dm.WgDaemon) - if err != nil { - log.Fatalf("[SentryFlow] Unable to start Exporter Server: %v", err) + // Stop exporter + if exporter.StopExporter() { + log.Print("[SentryFlow] Stopped Exporters") } - log.Printf("[SentryFlow] Initialized exporter") -} -func (dm *SentryFlowDaemon) aiEngine() { + log.Print("[SentryFlow] Waiting for routine terminations") + sf.waitGroup.Wait() + log.Print("[SentryFlow] Terminated SentryFlow") } -// patchK8s Function -func (dm *SentryFlowDaemon) patchK8s() error { - err := K8s.PatchIstioConfigMap() - if err != nil { - return err - } +// == // - if cfg.GlobalCfg.PatchNamespace { - err = K8s.PatchNamespaces() - if err != nil { - return err - } - } +// GetOSSigChannel Function +func GetOSSigChannel() chan os.Signal { + c := make(chan os.Signal, 1) - if cfg.GlobalCfg.PatchRestartDeployments { - err = K8s.PatchRestartDeployments() - if err != nil { - return err - } - } + signal.Notify(c, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT, + os.Interrupt) - return nil + return c } +// == // + // SentryFlow Function func SentryFlow() { - // create a daemon - dm := NewSentryFlowDaemon() + sf := NewSentryFlow() + + log.Print("[SentryFlow] Initializing SentryFlow") + + // == // // Initialize Kubernetes client - if !K8s.InitK8sClient() { - log.Printf("[Error] Failed to initialize Kubernetes client") - dm.DestroySentryFlowDaemon() + if !k8s.InitK8sClient() { + sf.DestroySentryFlow() return } - log.Printf("[SentryFlow] Initialized Kubernetes client") + // Start Kubernetes informers + k8s.RunInformers(StopChan, sf.waitGroup) - dm.watchK8s() - log.Printf("[SentryFlow] Started to monitor Kubernetes resources") + // Patch Istio ConfigMap + if !k8s.PatchIstioConfigMap() { + sf.DestroySentryFlow() + return + } - if dm.patchK8s() != nil { - log.Printf("[SentryFlow] Failed to patch Kubernetes") + // Patch Namespaces + if config.GlobalConfig.PatchingNamespaces { + if !k8s.PatchNamespaces() { + sf.DestroySentryFlow() + return + } } - log.Printf("[SentryFlow] Patched Kubernetes and Istio configuration") - if !exporter.MDB.InitMetricsDBHandler() { - log.Printf("[Error] Failed to initialize Metrics DB") + // Patch Deployments + if config.GlobalConfig.RestartingPatchedDeployments { + if !k8s.RestartDeployments() { + sf.DestroySentryFlow() + return + } } - log.Printf("[SentryFlow] Successfuly initialized metrics DB") - // Start log processor - dm.logProcessor() + // == // - // Start metric analyzer - dm.metricAnalyzer() + // Start collector + if !collector.StartCollector() { + sf.DestroySentryFlow() + return + } - // Start exporter server - dm.exporterServer() + // Start log processor + if !processor.StartLogProcessor(sf.waitGroup) { + sf.DestroySentryFlow() + return + } - if !exporter.AH.InitAIHandler() { - log.Printf("[Error] Failed to initialize AI Engine") + // Start API analyzer + if !processor.StartAPIAnalyzer(sf.waitGroup) { + sf.DestroySentryFlow() + return + } + + // Start exporter + if !exporter.StartExporter(sf.waitGroup) { + sf.DestroySentryFlow() + return } - log.Printf("[SentryFlow] Successfuly initialized AI Engine") - log.Printf("[SentryFlow] Successfully started SentryFlow") - dm.WgDaemon.Wait() + log.Print("[SentryFlow] Initialization process is completed") + + // == // + + // listen for interrupt signals + sigChan := GetOSSigChannel() + <-sigChan + log.Print("Got a signal to terminate SentryFlow") + + // == // + + // Destroy SentryFlow + sf.DestroySentryFlow() } diff --git a/sentryflow/exporter/aiHandler.go b/sentryflow/exporter/aiHandler.go deleted file mode 100644 index 97fb773..0000000 --- a/sentryflow/exporter/aiHandler.go +++ /dev/null @@ -1,177 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -package exporter - -import ( - "context" - "fmt" - "io" - "log" - - cfg "github.com/5GSEC/SentryFlow/config" - "github.com/5GSEC/SentryFlow/protobuf" - "github.com/5GSEC/SentryFlow/types" - "google.golang.org/grpc" -) - -// AH Local reference for AI handler server -var AH *aiHandler - -// aiHandler Structure -type aiHandler struct { - aiHost string - aiPort string - - error chan error - stopChan chan struct{} - aggregatedLogs chan []*protobuf.APILog - apis chan []string - - aiStream *streamInform - - // @todo: add gRPC stream here for bidirectional connection -} - -// streamInform Structure -type streamInform struct { - aiStream protobuf.SentryFlowMetrics_GetAPIClassificationClient -} - -// init Function -func init() { - // Construct address and start listening - AH = NewAIHandler(cfg.AIEngineService, cfg.AIEngineServicePort) -} - -// NewAIHandler Function -func NewAIHandler(host string, port string) *aiHandler { - ah := &aiHandler{ - aiHost: host, - aiPort: port, - - stopChan: make(chan struct{}), - aggregatedLogs: make(chan []*protobuf.APILog), - apis: make(chan []string), - } - - return ah -} - -// initHandler Function -func (ah *aiHandler) InitAIHandler() bool { - addr := fmt.Sprintf("%s:%s", "10.10.0.116", cfg.GlobalCfg.AIEngineServicePort) - - // Set up a connection to the server. - conn, err := grpc.Dial(addr, grpc.WithInsecure()) - if err != nil { - log.Fatalf("could not connect: %v", err) - return false - } - - // Start serving gRPC server - log.Printf("[gRPC] Successfully connected to %s for APIMetric", addr) - - client := protobuf.NewSentryFlowMetricsClient(conn) - - aiStream, err := client.GetAPIClassification(context.Background()) - - AH.aiStream = &streamInform{ - aiStream: aiStream, - } - done := make(chan struct{}) - - go sendAPIRoutine() - go recvAPIRoutine(done) - - return true -} - -// InsertAPILog function -func InsertAPILog(APIs []string) { - AH.apis <- APIs -} - -// callAI Function -func (ah *aiHandler) callAI(api string) error { - // @todo: add gRPC send request - return nil -} - -// processBatch Function -func processBatch(batch []string, update bool) error { - for range batch { - - } - - return nil -} - -// performHealthCheck Function -func (ah *aiHandler) performHealthCheck() error { - return nil -} - -// disconnect Function -func (ah *aiHandler) disconnect() { - return -} - -// sendAPIRoutine Function -func sendAPIRoutine() { -routineLoop: - for { - select { - case aal, ok := <-AH.apis: - if !ok { - log.Printf("[Exporter] EnvoyMetric exporter channel closed") - break routineLoop - } - - curAPIRequest := &protobuf.APIClassificationRequest{ - Path: aal, - } - - // err := AH.aiStream.Send(curAPIRequest) - err := AH.aiStream.aiStream.Send(curAPIRequest) - if err != nil { - log.Printf("[Exporter] AI Engine APIs exporting failed %v:", err) - } - case <-AH.stopChan: - break routineLoop - } - } - - return -} - -// recvAPIRoutine Function -func recvAPIRoutine(done chan struct{}) error { - for { - select { - default: - event, err := AH.aiStream.aiStream.Recv() - if err == io.EOF { - return nil - } - - if err != nil { - log.Printf("[Envoy] Something went on wrong when receiving event: %v", err) - return err - } - - for key, value := range event.Fields { - APICount := &types.PerAPICount{ - API: key, - Count: value, - } - err := MDB.PerAPICountInsert(APICount) - if err != nil { - log.Printf("unable to insert Classified API") - return err - } - } - case <-done: - return nil - } - } -} diff --git a/sentryflow/exporter/dbHandler.go b/sentryflow/exporter/dbHandler.go deleted file mode 100644 index 103b627..0000000 --- a/sentryflow/exporter/dbHandler.go +++ /dev/null @@ -1,324 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -package exporter - -import ( - "database/sql" - "log" - "os" - "path/filepath" - "time" - - cfg "github.com/5GSEC/SentryFlow/config" - "github.com/5GSEC/SentryFlow/protobuf" - "github.com/5GSEC/SentryFlow/types" - "google.golang.org/protobuf/proto" - - "github.com/mattn/go-sqlite3" -) - -// MDB global reference for Sqlite3 Handler -var MDB *MetricsDBHandler - -// MetricsDBHandler Structure -type MetricsDBHandler struct { - db *sql.DB - dbFile string - dbClearTime int -} - -// AggregationData Structure -type AggregationData struct { - Labels string - Namespace string - AccessLogs []string -} - -// init Function -func init() { - MDB = NewMetricsDBHandler() -} - -// NewMetricsDBHandler Function -func NewMetricsDBHandler() *MetricsDBHandler { - ret := &MetricsDBHandler{ - dbFile: cfg.GlobalCfg.MetricsDBFileName, - dbClearTime: cfg.GlobalCfg.MetricsDBClearTime, - } - return ret -} - -// InitMetricsDBHandler Function -func (md *MetricsDBHandler) InitMetricsDBHandler() bool { - libVersion, libVersionNumber, sourceID := sqlite3.Version() - log.Printf("[DB] Using Sqlite Version is %v %v %v", libVersion, libVersionNumber, sourceID) - log.Printf("[DB] Using DB File as %s", md.dbFile) - targetDir := filepath.Dir(md.dbFile) - _, err := os.Stat(targetDir) - if err != nil { - log.Printf("[DB] Unable to find target directory %s, creating one...", targetDir) - err := os.Mkdir(targetDir, 0750) - if err != nil { - log.Printf("[Error] Unable to create directory for metrics DB %s: %v", targetDir, err) - return false - } - } - - md.db, err = sql.Open("sqlite3", md.dbFile) - if err != nil { - log.Printf("[Error] Unable to open metrics DB: %v", err) - return false - } - - err = md.initDBTables() - if err != nil { - log.Printf("[Error] Unable to initialize metrics DB tables: %v", err) - return false - } - - go aggregationTimeTickerRoutine() - go exportTimeTickerRoutine() - go DBClearRoutine() - - return true -} - -// StopMetricsDBHandler Function -func (md *MetricsDBHandler) StopMetricsDBHandler() { - _ = md.db.Close() -} - -// initDBTables Function -func (md *MetricsDBHandler) initDBTables() error { - _, err := md.db.Exec(` - CREATE TABLE IF NOT EXISTS aggregation_table ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - labels TEXT, - namespace TEXT, - accesslog BLOB - ); - - CREATE TABLE IF NOT EXISTS per_api_metrics ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - api TEXT, - count INTEGER - ); - `) - - return err -} - -// AccessLogInsert Function -func (md *MetricsDBHandler) AccessLogInsert(data types.DbAccessLogType) error { - alData, err := proto.Marshal(data.AccessLog) - if err != nil { - return err - } - - _, err = md.db.Exec("INSERT INTO aggregation_table (labels, namespace, accesslog) VALUES (?, ?, ?)", data.Labels, data.Namespace, alData) - if err != nil { - log.Printf("INSERT accesslog error: %v", err) - return err - } - - return err -} - -// GetLabelNamespacePairs Function -func (md *MetricsDBHandler) GetLabelNamespacePairs() ([]AggregationData, error) { - query := ` - SELECT labels, namespace - FROM aggregation_table - GROUP BY labels, namespace - ` - - rows, err := md.db.Query(query) - if err != nil { - return nil, err - } - defer rows.Close() - - var pairs []AggregationData - for rows.Next() { - var labels, namespace string - err := rows.Scan(&labels, &namespace) - if err != nil { - return nil, err - } - pair := AggregationData{ - Labels: labels, - Namespace: namespace, - } - - pairs = append(pairs, pair) - } - return pairs, nil -} - -// AggregatedAccessLogSelect Function -func (md *MetricsDBHandler) AggregatedAccessLogSelect() (map[string][]*protobuf.APILog, error) { - als := make(map[string][]*protobuf.APILog) - pairs, err := md.GetLabelNamespacePairs() - if err != nil { - return nil, err - } - - query := ` - SELECT accesslog - FROM aggregation_table - WHERE labels = ? AND namespace = ? - ` - for _, pair := range pairs { - curKey := pair.Labels + pair.Namespace - rows, err := md.db.Query(query, pair.Labels, pair.Namespace) - if err != nil { - return nil, err - } - defer rows.Close() - - var accessLogs []*protobuf.APILog - for rows.Next() { - var accessLog []byte - err := rows.Scan(&accessLog) - if err != nil { - return nil, err - } - - al := &protobuf.APILog{} - err = proto.Unmarshal(accessLog, al) - - accessLogs = append(accessLogs, al) - } - als[curKey] = accessLogs - } - - return als, err -} - -// PerAPICountInsert Function -func (md *MetricsDBHandler) PerAPICountInsert(data *types.PerAPICount) error { - var existAPI int - err := md.db.QueryRow("SELECT COUNT(*) FROM per_api_metrics WHERE api = ?", data.API).Scan(&existAPI) - if err != nil { - return err - } - - if existAPI == 0 { - _, err := md.db.Exec("INSERT INTO per_api_metrics (api, count) VALUES (?, ?)", data.API, data.Count) - if err != nil { - return err - } - } else { - err := md.PerAPICountUpdate(data) - if err != nil { - return err - } - } - - return err -} - -// PerAPICountSelect Function -func (md *MetricsDBHandler) PerAPICountSelect(api string) (types.PerAPICount, error) { - var tm types.PerAPICount - - err := md.db.QueryRow("SELECT api, count FROM per_api_metrics WHERE api = ?", api).Scan(&tm.API, &tm.Count) - if err != nil { - return tm, err - } - - return tm, err -} - -// PerAPICountDelete Function -func (md *MetricsDBHandler) PerAPICountDelete(api string) error { - _, err := md.db.Exec("DELETE FROM per_api_metrics WHERE api = ?", api) - if err != nil { - return err - } - - return nil -} - -// PerAPICountUpdate Function -func (md *MetricsDBHandler) PerAPICountUpdate(data *types.PerAPICount) error { - var existAPI int - err := md.db.QueryRow("SELECT COUNT(*) FROM per_api_metrics WHERE api = ?", data.API).Scan(&existAPI) - if err != nil { - return err - } - - if existAPI > 0 { - _, err = md.db.Exec("UPDATE per_api_metrics SET count = ? WHERE api = ?", data.Count, data.API) - if err != nil { - return err - } - } - - return nil -} - -// GetAllMetrics Function -func (md *MetricsDBHandler) GetAllMetrics() (map[string]uint64, error) { - metrics := make(map[string]uint64) - - rows, err := md.db.Query("SELECT api, count FROM per_api_metrics") - if err != nil { - return nil, err - } - defer rows.Close() - - for rows.Next() { - var metric types.PerAPICount - err := rows.Scan(&metric.API, &metric.Count) - if err != nil { - return nil, err - } - metrics[metric.API] = metric.Count - } - - if err := rows.Err(); err != nil { - return nil, err - } - - return metrics, nil -} - -// ClearAllTable Function -func (md *MetricsDBHandler) ClearAllTable() error { - _, err := md.db.Exec("DELETE FROM aggregation_table") - if err != nil { - log.Fatal(err) - return err - } - log.Println("Data in 'aggregation_table' deleted successfully.") - - _, err = md.db.Exec("DELETE FROM per_api_metrics") - if err != nil { - log.Fatal(err) - return err - } - log.Println("Data in 'per_api_metrics' deleted successfully.") - - return nil -} - -// DBClearRoutine Function -func DBClearRoutine() error { - ticker := time.NewTicker(time.Duration(MDB.dbClearTime) * time.Second) - - defer ticker.Stop() - - for { - select { - case <-ticker.C: - err := MDB.ClearAllTable() - if err != nil { - log.Printf("[Error] Unable to Clear DB tables: %v", err) - return err - } - } - } - - return nil -} diff --git a/sentryflow/exporter/exportAPILogs.go b/sentryflow/exporter/exportAPILogs.go new file mode 100644 index 0000000..97cda2e --- /dev/null +++ b/sentryflow/exporter/exportAPILogs.go @@ -0,0 +1,94 @@ +// SPDX-License-Identifier: Apache-2.0 + +package exporter + +import ( + "errors" + "fmt" + "log" + "sort" + "strings" + + "github.com/5gsec/SentryFlow/protobuf" +) + +// == // + +// apiLogStreamInform structure +type apiLogStreamInform struct { + Hostname string + IPAddress string + + stream protobuf.SentryFlow_GetAPILogServer + + error chan error +} + +// GetAPILog Function (for gRPC) +func (exs *ExpService) GetAPILog(info *protobuf.ClientInfo, stream protobuf.SentryFlow_GetAPILogServer) error { + log.Printf("[Exporter] Client %s(%s) connected (GetAPILog)", info.HostName, info.IPAddress) + + currExporter := &apiLogStreamInform{ + Hostname: info.HostName, + IPAddress: info.IPAddress, + stream: stream, + } + + ExpH.exporterLock.Lock() + ExpH.apiLogExporters = append(ExpH.apiLogExporters, currExporter) + ExpH.exporterLock.Unlock() + + return <-currExporter.error +} + +// SendAPILogs Function +func (exp *ExpHandler) SendAPILogs(apiLog *protobuf.APILog) error { + var err error + + failed := 0 + total := len(exp.apiLogExporters) + + for _, exporter := range exp.apiLogExporters { + currRetry := 0 + maxRetry := 3 + + for currRetry < maxRetry { + if err = exporter.stream.Send(apiLog); err != nil { + log.Printf("[Exporter] Unable to send a API log to %s(%s) retry=%d/%d: %v", exporter.Hostname, exporter.IPAddress, currRetry, maxRetry, err) + currRetry++ + } else { + break + } + } + + if err != nil { + failed++ + } + } + + if failed != 0 { + msg := fmt.Sprintf("[Exporter] Unable to send API logs properly %d/%d failed", failed, total) + return errors.New(msg) + } + + return nil +} + +// == // + +// InsertAPILog Function +func InsertAPILog(apiLog *protobuf.APILog) { + ExpH.exporterAPILogs <- apiLog + + // Make a string with labels + var labelString []string + for k, v := range apiLog.SrcLabel { + labelString = append(labelString, fmt.Sprintf("%s:%s", k, v)) + } + sort.Strings(labelString) + + // Update Stats per namespace and per labels + UpdateStats(apiLog.SrcNamespace, strings.Join(labelString, ","), apiLog.GetPath()) +} + +// // == // diff --git a/sentryflow/exporter/exportAPIMetrics.go b/sentryflow/exporter/exportAPIMetrics.go new file mode 100644 index 0000000..760e02d --- /dev/null +++ b/sentryflow/exporter/exportAPIMetrics.go @@ -0,0 +1,192 @@ +// SPDX-License-Identifier: Apache-2.0 + +package exporter + +import ( + "errors" + "fmt" + "log" + "time" + + "github.com/5gsec/SentryFlow/config" + "github.com/5gsec/SentryFlow/protobuf" +) + +// == // + +// Stats Structure +type Stats struct { + Count int + LastUpdate uint64 +} + +// StatsPerNamespace structure +type StatsPerNamespace struct { + APIs map[string]Stats +} + +// StatsPerLabel structure +type StatsPerLabel struct { + APIs map[string]Stats +} + +// == // + +// apiMetricStreamInform structure +type apiMetricStreamInform struct { + Hostname string + IPAddress string + + apiMetricsStream protobuf.SentryFlow_GetAPIMetricsServer + + error chan error +} + +// GetAPIMetrics Function (for gRPC) +func (exs *ExpService) GetAPIMetrics(info *protobuf.ClientInfo, stream protobuf.SentryFlow_GetAPIMetricsServer) error { + log.Printf("[Exporter] Client %s(%s) connected (GetAPIMetrics)", info.HostName, info.IPAddress) + + currExporter := &apiMetricStreamInform{ + Hostname: info.HostName, + IPAddress: info.IPAddress, + apiMetricsStream: stream, + } + + ExpH.exporterLock.Lock() + ExpH.apiMetricsExporters = append(ExpH.apiMetricsExporters, currExporter) + ExpH.exporterLock.Unlock() + + return <-currExporter.error +} + +// SendAPIMetrics Function +func (exp *ExpHandler) SendAPIMetrics(apiMetrics *protobuf.APIMetrics) error { + var err error + + failed := 0 + total := len(exp.apiMetricsExporters) + + for _, exporter := range exp.apiMetricsExporters { + currRetry := 0 + maxRetry := 3 + + for currRetry < maxRetry { + if err = exporter.apiMetricsStream.Send(apiMetrics); err != nil { + log.Printf("[Exporter] Unable to send API Metrics to %s(%s) retry=%d/%d: %v", exporter.Hostname, exporter.IPAddress, currRetry, maxRetry, err) + currRetry++ + } else { + break + } + } + + if err != nil { + failed++ + } + } + + if failed != 0 { + msg := fmt.Sprintf("unable to send API Metrics properly %d/%d failed", failed, total) + return errors.New(msg) + } + + return nil +} + +// == // + +// UpdateStats Function +func UpdateStats(namespace string, label string, api string) { + // == // + + ExpH.statsPerNamespaceLock.Lock() + + // Check if namespace exists + if _, ok := ExpH.statsPerNamespace[namespace]; !ok { + ExpH.statsPerNamespace[namespace] = StatsPerNamespace{ + APIs: make(map[string]Stats), + } + } + + statsPerNamespace := ExpH.statsPerNamespace[namespace] + + // Check if API exists + if _, ok := statsPerNamespace.APIs[api]; !ok { + init := Stats{ + Count: 1, + LastUpdate: uint64(time.Now().Unix()), + } + statsPerNamespace.APIs[api] = init + } else { + stats := statsPerNamespace.APIs[api] + + stats.Count++ + stats.LastUpdate = uint64(time.Now().Unix()) + + statsPerNamespace.APIs[api] = stats + } + + ExpH.statsPerNamespace[namespace] = statsPerNamespace + + ExpH.statsPerNamespaceLock.Unlock() + + // == // + + ExpH.statsPerLabelLock.Lock() + + // Check if namespace+label exists + if _, ok := ExpH.statsPerLabel[namespace+label]; !ok { + ExpH.statsPerLabel[namespace+label] = StatsPerLabel{ + APIs: make(map[string]Stats), + } + } + + statsPerLabel := ExpH.statsPerLabel[namespace+label] + + // Check if API exists + if _, ok := statsPerLabel.APIs[api]; !ok { + init := Stats{ + Count: 1, + LastUpdate: uint64(time.Now().Unix()), + } + statsPerLabel.APIs[api] = init + } else { + stats := statsPerLabel.APIs[api] + + stats.Count++ + stats.LastUpdate = uint64(time.Now().Unix()) + + statsPerLabel.APIs[api] = stats + } + + ExpH.statsPerLabel[namespace+label] = statsPerLabel + + ExpH.statsPerLabelLock.Unlock() + + // == // +} + +// AggregateAPIMetrics Function +func AggregateAPIMetrics() { + ticker := time.NewTicker(time.Duration(config.GlobalConfig.AggregationPeriod) * time.Second) + defer ticker.Stop() + + for { + select { + // + } + } +} + +// CleanUpOutdatedStats Function +func CleanUpOutdatedStats() { + ticker := time.NewTicker(time.Duration(config.GlobalConfig.CleanUpPeriod) * time.Second) + defer ticker.Stop() + + for { + select { + // + } + } +} + +// == // diff --git a/sentryflow/exporter/exportEnvoyMetrics.go b/sentryflow/exporter/exportEnvoyMetrics.go new file mode 100644 index 0000000..4a04226 --- /dev/null +++ b/sentryflow/exporter/exportEnvoyMetrics.go @@ -0,0 +1,82 @@ +// SPDX-License-Identifier: Apache-2.0 + +package exporter + +import ( + "errors" + "fmt" + "log" + + "github.com/5gsec/SentryFlow/protobuf" +) + +// == // + +// envoyMetricsStreamInform structure +type envoyMetricsStreamInform struct { + Hostname string + IPAddress string + + metricsStream protobuf.SentryFlow_GetEnvoyMetricsServer + + error chan error +} + +// GetEnvoyMetrics Function (for gRPC) +func (exs *ExpService) GetEnvoyMetrics(info *protobuf.ClientInfo, stream protobuf.SentryFlow_GetEnvoyMetricsServer) error { + log.Printf("[Exporter] Client %s(%s) connected (GetEnvoyMetrics)", info.HostName, info.IPAddress) + + currExporter := &envoyMetricsStreamInform{ + Hostname: info.HostName, + IPAddress: info.IPAddress, + metricsStream: stream, + } + + ExpH.exporterLock.Lock() + ExpH.envoyMetricsExporters = append(ExpH.envoyMetricsExporters, currExporter) + ExpH.exporterLock.Unlock() + + return <-currExporter.error +} + +// SendEnvoyMetrics Function +func (exp *ExpHandler) SendEnvoyMetrics(evyMetrics *protobuf.EnvoyMetrics) error { + var err error + + failed := 0 + total := len(exp.envoyMetricsExporters) + + for _, exporter := range exp.envoyMetricsExporters { + currRetry := 0 + maxRetry := 3 + + for currRetry < maxRetry { + if err = exporter.metricsStream.Send(evyMetrics); err != nil { + log.Printf("[Exporter] Unable to send Envoy Metrics to %s(%s) retry=%d/%d: %v", exporter.Hostname, exporter.IPAddress, currRetry, maxRetry, err) + currRetry++ + } else { + break + } + } + + if err != nil { + failed++ + } + } + + if failed != 0 { + msg := fmt.Sprintf("[Exporter] Unable to send Envoy Metrics properly %d/%d failed", failed, total) + return errors.New(msg) + } + + return nil +} + +// == // + +// InsertEnvoyMetrics Function +func InsertEnvoyMetrics(evyMetrics *protobuf.EnvoyMetrics) { + ExpH.exporterMetrics <- evyMetrics +} + +// == // diff --git a/sentryflow/exporter/exporterHandler.go b/sentryflow/exporter/exporterHandler.go index 598e76e..400433c 100644 --- a/sentryflow/exporter/exporterHandler.go +++ b/sentryflow/exporter/exporterHandler.go @@ -3,414 +3,225 @@ package exporter import ( - "errors" "fmt" "net" - "sort" - "strings" "sync" - "time" - cfg "github.com/5GSEC/SentryFlow/config" - "github.com/5GSEC/SentryFlow/protobuf" - "github.com/5GSEC/SentryFlow/types" + "github.com/5gsec/SentryFlow/config" + "github.com/5gsec/SentryFlow/protobuf" + + "log" - "github.com/emicklei/go-restful/v3/log" "google.golang.org/grpc" ) -// Exp global reference for Exporter Handler -var Exp *Handler +// == // + +// ExpH global reference for Exporter Handler +var ExpH *ExpHandler // init Function func init() { - Exp = NewExporterHandler() + ExpH = NewExporterHandler() } -// Handler structure -type Handler struct { - baseExecutionID uint64 - currentLogCount uint64 - agTime int - exTime int - stopChan chan struct{} - lock sync.Mutex - exporters []*Inform - apiMetricExporters []*apiMetricStreamInform - metricExporters []*metricStreamInform - exporterLock sync.Mutex - exporterLogs chan *protobuf.APILog - exporterAPIMetrics chan *protobuf.APIMetric - exporterMetrics chan *protobuf.EnvoyMetric - - listener net.Listener - gRPCServer *grpc.Server -} +// ExpHandler structure +type ExpHandler struct { + exporterService net.Listener + grpcServer *grpc.Server + grpcService *ExpService -// Inform structure -type Inform struct { - stream protobuf.SentryFlow_GetLogServer - error chan error - Hostname string - IPAddress string -} + apiLogExporters []*apiLogStreamInform + apiMetricsExporters []*apiMetricStreamInform + envoyMetricsExporters []*envoyMetricsStreamInform -// apiMetricStreamInform structure -type apiMetricStreamInform struct { - apiMetricStream protobuf.SentryFlow_GetAPIMetricsServer - error chan error - Hostname string - IPAddress string -} + exporterLock sync.Mutex -// metricStreamInform structure -type metricStreamInform struct { - metricStream protobuf.SentryFlow_GetEnvoyMetricsServer - error chan error - Hostname string - IPAddress string -} + exporterAPILogs chan *protobuf.APILog + exporterAPIMetrics chan *protobuf.APIMetrics + exporterMetrics chan *protobuf.EnvoyMetrics -// NewExporterHandler Function -func NewExporterHandler() *Handler { - exp := &Handler{ - baseExecutionID: uint64(time.Now().UnixMicro()), - currentLogCount: 0, - agTime: cfg.GlobalCfg.MetricsDBAggregationTime, - exTime: cfg.GlobalCfg.APIMetricsSendTime, - exporters: make([]*Inform, 0), - stopChan: make(chan struct{}), - lock: sync.Mutex{}, - exporterLock: sync.Mutex{}, - exporterLogs: make(chan *protobuf.APILog), - exporterAPIMetrics: make(chan *protobuf.APIMetric), - exporterMetrics: make(chan *protobuf.EnvoyMetric), - } + statsPerNamespace map[string]StatsPerNamespace + statsPerNamespaceLock sync.RWMutex - return exp -} + statsPerLabel map[string]StatsPerLabel + statsPerLabelLock sync.RWMutex -// InsertAccessLog Function -func InsertAccessLog(al *protobuf.APILog) { - // Avoid race condition for currentLogCount, otherwise we might have duplicate IDs - Exp.lock.Lock() - al.Id = Exp.baseExecutionID + Exp.currentLogCount - Exp.currentLogCount++ - Exp.lock.Unlock() + stopChan chan struct{} +} - go saveAccessLog(al) // go routine?? - Exp.exporterLogs <- al +// ExpService Structure +type ExpService struct { + protobuf.UnimplementedSentryFlowServer } -func saveAccessLog(al *protobuf.APILog) { - curLabels := al.SrcLabel +// == // - var labelString []string +// NewExporterHandler Function +func NewExporterHandler() *ExpHandler { + exp := &ExpHandler{ + grpcService: new(ExpService), - for key, value := range curLabels { - labelString = append(labelString, fmt.Sprintf("%s:%s", key, value)) - } + apiLogExporters: make([]*apiLogStreamInform, 0), + apiMetricsExporters: make([]*apiMetricStreamInform, 0), + envoyMetricsExporters: make([]*envoyMetricsStreamInform, 0), - sort.Strings(labelString) + exporterLock: sync.Mutex{}, - curData := types.DbAccessLogType{ - Labels: strings.Join(labelString, " "), - Namespace: al.SrcNamespace, - AccessLog: al, - } + exporterAPILogs: make(chan *protobuf.APILog), + exporterAPIMetrics: make(chan *protobuf.APIMetrics), + exporterMetrics: make(chan *protobuf.EnvoyMetrics), - err := MDB.AccessLogInsert(curData) - if err != nil { - log.Printf("unable to insert AccessLog") - return + statsPerNamespace: make(map[string]StatsPerNamespace), + statsPerLabel: make(map[string]StatsPerLabel), + + stopChan: make(chan struct{}), } -} -// InsertEnvoyMetric Function -func InsertEnvoyMetric(em *protobuf.EnvoyMetric) { - Exp.exporterMetrics <- em + return exp } -// InitExporterServer Function -func (exp *Handler) InitExporterServer() error { - listenAddr := fmt.Sprintf("%s:%s", cfg.GlobalCfg.CustomExportListenAddr, cfg.GlobalCfg.CustomExportListenPort) +// == // - // Start listening - lis, err := net.Listen("tcp", listenAddr) +// StartExporter Function +func StartExporter(wg *sync.WaitGroup) bool { + // Make a string with the given exporter address and port + exporterService := fmt.Sprintf("%s:%s", config.GlobalConfig.ExporterAddr, config.GlobalConfig.ExporterPort) + + // Start listening gRPC port + expService, err := net.Listen("tcp", exporterService) if err != nil { - msg := fmt.Sprintf("unable to listen at %s: %v", listenAddr, err) - return errors.New(msg) + log.Fatalf("[Exporter] Unable to listen at %s: %v", exporterService, err) + return false } + ExpH.exporterService = expService + log.Printf("[Exporter] Listening Exporter gRPC (%s)", exporterService) + // Create gRPC server - server := grpc.NewServer() - protobuf.RegisterSentryFlowServer(server, exs) + gRPCServer := grpc.NewServer() + ExpH.grpcServer = gRPCServer - exp.listener = lis - exp.gRPCServer = server + protobuf.RegisterSentryFlowServer(gRPCServer, ExpH.grpcService) - log.Printf("[Exporter] Exporter listening at %s", listenAddr) - return nil -} + log.Printf("[Exporter] Initialized Exporter gRPC") -// StartExporterServer Function -func (exp *Handler) StartExporterServer(wg *sync.WaitGroup) error { - log.Printf("[Exporter] Starting exporter server") - var err error - err = nil - - go exp.exportRoutine(wg) - - go func() { - wg.Add(1) - // Serve is blocking function - err = exp.gRPCServer.Serve(exp.listener) - if err != nil { - wg.Done() - return - } + // Serve gRPC Service + go ExpH.grpcServer.Serve(ExpH.exporterService) - wg.Done() - }() + log.Printf("[Exporter] Serving Exporter gRPC (%s)", exporterService) - return err -} + // Export APILogs + go ExpH.exportAPILogs(wg) -// exportRoutine Function -func (exp *Handler) exportRoutine(wg *sync.WaitGroup) { - wg.Add(1) - log.Printf("[Exporter] Starting export routine") + log.Printf("[Exporter] Exporting API Logs through gRPC") -routineLoop: - for { - select { - // @todo add more channels for this - case al, ok := <-exp.exporterLogs: - if !ok { - log.Printf("[Exporter] Log exporter channel closed") - break routineLoop - } + // Export APIMetrics + go ExpH.exportAPIMetrics(wg) - err := exp.sendLogs(al) - if err != nil { - log.Printf("[Exporter] Log exporting failed %v:", err) - } + log.Printf("[Exporter] Exporting API Metrics through gRPC") - case em, ok := <-exp.exporterMetrics: - if !ok { - log.Printf("[Exporter] EnvoyMetric exporter channel closed") - break routineLoop - } + // Export EnvoyMetrics + go ExpH.exportEnvoyMetrics(wg) - err := exp.sendMetrics(em) - if err != nil { - log.Printf("[Exporter] Metric exporting failed %v:", err) - } + log.Printf("[Exporter] Exporting Envoy Metrics through gRPC") - case am, ok := <-exp.exporterAPIMetrics: - if !ok { - log.Printf("[Exporter] APIMetric exporter channel closed") - break routineLoop - } - err := exp.sendAPIMetrics(am) - if err != nil { - log.Printf("[Exporter] APIMetric exporting failed %v:", err) - } + return true +} - case <-exp.stopChan: - break routineLoop - } - } +// StopExporter Function +func StopExporter() bool { + // One for exportAPILogs + ExpH.stopChan <- struct{}{} - defer wg.Done() - return -} + // One for exportAPIMetrics + ExpH.stopChan <- struct{}{} -// sendLogs Function -func (exp *Handler) sendLogs(l *protobuf.APILog) error { - exp.exporterLock.Lock() - defer exp.exporterLock.Unlock() - - // iterate and send logs - failed := 0 - total := len(exp.exporters) - for _, exporter := range exp.exporters { - curRetry := 0 - - // @todo: make max retry count per logs using config - // @todo: make max retry count per single exporter before removing the exporter using config - var err error - for curRetry < 3 { - err = exporter.stream.Send(l) - if err != nil { - log.Printf("[Exporter] Unable to send log to %s(%s) retry=%d/%d: %v", - exporter.Hostname, exporter.IPAddress, curRetry, 3, err) - curRetry++ - } else { - break - } - } + // One for exportEnvoyMetrics + ExpH.stopChan <- struct{}{} - // Count failed - if err != nil { - failed++ - } - } + // Stop gRPC server + ExpH.grpcServer.GracefulStop() - // notify failed count - if failed != 0 { - msg := fmt.Sprintf("unable to send logs properly %d/%d failed", failed, total) - return errors.New(msg) - } + log.Printf("[Exporter] Gracefully stopped Exporter gRPC") - return nil + return true } -// sendMetrics Function -func (exp *Handler) sendMetrics(l *protobuf.EnvoyMetric) error { - exp.exporterLock.Lock() - defer exp.exporterLock.Unlock() - - // iterate and send logs - failed := 0 - total := len(exp.metricExporters) - for _, exporter := range exp.metricExporters { - curRetry := 0 - - // @todo: make max retry count per logs using config - // @todo: make max retry count per single exporter before removing the exporter using config - var err error - for curRetry < 3 { - err = exporter.metricStream.Send(l) - if err != nil { - log.Printf("[Exporter] Unable to send metric to %s(%s) retry=%d/%d: %v", - exporter.Hostname, exporter.IPAddress, curRetry, 3, err) - curRetry++ - } else { - break - } - } +// == // - // Count failed - if err != nil { - failed++ - } - } - - // notify failed count - if failed != 0 { - msg := fmt.Sprintf("unable to send metrics properly %d/%d failed", failed, total) - return errors.New(msg) - } +// exportAPILogs Function +func (exp *ExpHandler) exportAPILogs(wg *sync.WaitGroup) { + wg.Add(1) - return nil -} +routineLoop: + for { + select { + case apiLog, ok := <-exp.exporterAPILogs: + if !ok { + log.Printf("[Exporter] Log exporter channel closed") + break routineLoop + } -// sendAPIMetrics Function -func (exp *Handler) sendAPIMetrics(l *protobuf.APIMetric) error { - exp.exporterLock.Lock() - defer exp.exporterLock.Unlock() - - // iterate and send logs - failed := 0 - total := len(exp.apiMetricExporters) - for _, exporter := range exp.apiMetricExporters { - curRetry := 0 - - // @todo: make max retry count per logs using config - // @todo: make max retry count per single exporter before removing the exporter using config - var err error - for curRetry < 3 { - err = exporter.apiMetricStream.Send(l) - if err != nil { - log.Printf("[Exporter] Unable to send metric to %s(%s) retry=%d/%d: %v", - exporter.Hostname, exporter.IPAddress, curRetry, 3, err) - curRetry++ - } else { - break + if err := exp.SendAPILogs(apiLog); err != nil { + log.Printf("[Exporter] APILog exporting failed %v:", err) } - } - // Count failed - if err != nil { - failed++ + case <-exp.stopChan: + break routineLoop } } - // notify failed count - if failed != 0 { - msg := fmt.Sprintf("unable to send metrics properly %d/%d failed", failed, total) - return errors.New(msg) - } - - return nil -} - -// APIMetricsExportRoutine function -func (exp *Handler) APIMetricsExportRoutine() { - + defer wg.Done() } -// aggregationTimeTickerRoutine Function -func aggregationTimeTickerRoutine() error { - aggregationTicker := time.NewTicker(time.Duration(Exp.agTime) * time.Second) - - defer aggregationTicker.Stop() +// exportAPIMetrics Function +func (exp *ExpHandler) exportAPIMetrics(wg *sync.WaitGroup) { + wg.Add(1) +routineLoop: for { select { - case <-aggregationTicker.C: - als, err := MDB.AggregatedAccessLogSelect() - if err != nil { - log.Printf("[Exporter] AccessLog Aggregation %v", err) - return err + case apiMetrics, ok := <-exp.exporterAPIMetrics: + if !ok { + log.Printf("[Exporter] APIMetric exporter channel closed") + break routineLoop } - - for _, val := range als { - // export part - curAPIs := []string{} - for _, APILog := range val { - curAPIs = append(curAPIs, APILog.Path) - } - InsertAPILog(curAPIs) + if err := exp.SendAPIMetrics(apiMetrics); err != nil { + log.Printf("[Exporter] APIMetric exporting failed %v:", err) } + + case <-exp.stopChan: + break routineLoop } } -} -// exportTimeTickerRoutine Function -func exportTimeTickerRoutine() error { - apiMetricTicker := time.NewTicker(time.Duration(Exp.exTime) * time.Second) + defer wg.Done() +} - defer apiMetricTicker.Stop() +// exportEnvoyMetrics Function +func (exp *ExpHandler) exportEnvoyMetrics(wg *sync.WaitGroup) { + wg.Add(1) +routineLoop: for { select { - case <-apiMetricTicker.C: - curAPIMetrics, err := MDB.GetAllMetrics() - - if err != nil { - log.Printf("[Exporter] APIMetric TimeTicker channel closed") - return err + case evyMetrics, ok := <-exp.exporterMetrics: + if !ok { + log.Printf("[Exporter] EnvoyMetric exporter channel closed") + break routineLoop } - if len(curAPIMetrics) > 0 { - curAPIMetric := &protobuf.APIMetric{ - PerAPICounts: curAPIMetrics, - } - Exp.exporterAPIMetrics <- curAPIMetric + if err := exp.SendEnvoyMetrics(evyMetrics); err != nil { + log.Printf("[Exporter] EnvoyMetric exporting failed %v:", err) } + + case <-exp.stopChan: + break routineLoop } } -} - -// StopExporterServer Function -func (exp *Handler) StopExporterServer() { - // Gracefully stop all client connections - exp.stopChan <- struct{}{} - // Gracefully stop gRPC Server - exp.gRPCServer.GracefulStop() - - log.Printf("[Exporter] Stopped exporter server") + defer wg.Done() } + +// == // diff --git a/sentryflow/exporter/exporterServer.go b/sentryflow/exporter/exporterServer.go deleted file mode 100644 index 71f0c40..0000000 --- a/sentryflow/exporter/exporterServer.go +++ /dev/null @@ -1,83 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -package exporter - -import ( - "log" - - "github.com/5GSEC/SentryFlow/protobuf" -) - -var exs *Server - -// init Function -func init() { - exs = NewExporterServer() -} - -// Server Structure -type Server struct { - protobuf.UnimplementedSentryFlowServer // @todo: make this fixed. -} - -// NewExporterServer Function -func NewExporterServer() *Server { - return new(Server) -} - -// GetLog Function -func (exs *Server) GetLog(info *protobuf.ClientInfo, stream protobuf.SentryFlow_GetLogServer) error { - log.Printf("[Exporter] Client %s(%s) connected (GetLog)", info.HostName, info.IPAddress) - - curExporter := &Inform{ - stream: stream, - Hostname: info.HostName, - IPAddress: info.IPAddress, - } - - // Append new exporter client for future use - Exp.exporterLock.Lock() - Exp.exporters = append(Exp.exporters, curExporter) - Exp.exporterLock.Unlock() - - // Keeping gRPC stream alive - // refer https://stackoverflow.com/questions/36921131/ - return <-curExporter.error -} - -// GetEnvoyMetrics Function -func (exs *Server) GetEnvoyMetrics(info *protobuf.ClientInfo, stream protobuf.SentryFlow_GetEnvoyMetricsServer) error { - log.Printf("[Exporter] Client %s(%s) connected (GetEnvoyMetrics)", info.HostName, info.IPAddress) - - curExporter := &metricStreamInform{ - metricStream: stream, - Hostname: info.HostName, - IPAddress: info.IPAddress, - } - - // Append new exporter client for future use - Exp.exporterLock.Lock() - Exp.metricExporters = append(Exp.metricExporters, curExporter) - Exp.exporterLock.Unlock() - - // Keeping gRPC stream alive - // refer https://stackoverflow.com/questions/36921131/ - return <-curExporter.error -} - -// GetAPIMetrics Function -func (exs *Server) GetAPIMetrics(info *protobuf.ClientInfo, stream protobuf.SentryFlow_GetAPIMetricsServer) error { - log.Printf("[Exporter] Client %s(%s) connected (GetAPIMetrics)", info.HostName, info.IPAddress) - - curExporter := &apiMetricStreamInform{ - apiMetricStream: stream, - Hostname: info.HostName, - IPAddress: info.IPAddress, - } - - Exp.exporterLock.Lock() - Exp.apiMetricExporters = append(Exp.apiMetricExporters, curExporter) - Exp.exporterLock.Unlock() - - return <-curExporter.error -} diff --git a/sentryflow/go.mod b/sentryflow/go.mod index 79e87b1..2b8f325 100644 --- a/sentryflow/go.mod +++ b/sentryflow/go.mod @@ -1,18 +1,15 @@ -module github.com/5GSEC/SentryFlow +module github.com/5gsec/SentryFlow go 1.21 -replace github.com/5GSEC/SentryFlow/protobuf => ../protobuf +replace github.com/5gsec/SentryFlow/protobuf => ../protobuf require ( - github.com/5GSEC/SentryFlow/protobuf v0.0.0-00010101000000-000000000000 - github.com/emicklei/go-restful/v3 v3.11.0 + github.com/5gsec/SentryFlow/protobuf v0.0.0-00010101000000-000000000000 github.com/envoyproxy/go-control-plane v0.12.0 - github.com/mattn/go-sqlite3 v1.14.22 github.com/spf13/viper v1.18.2 go.opentelemetry.io/proto/otlp v1.0.0 google.golang.org/grpc v1.63.2 - google.golang.org/protobuf v1.33.0 gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.29.0 k8s.io/apimachinery v0.29.0 @@ -22,6 +19,7 @@ require ( require ( github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/envoyproxy/protoc-gen-validate v1.0.4 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/go-logr/logr v1.3.0 // indirect @@ -66,6 +64,7 @@ require ( google.golang.org/appengine v1.6.8 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect + google.golang.org/protobuf v1.34.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/sentryflow/go.sum b/sentryflow/go.sum index d45125b..55a94cd 100644 --- a/sentryflow/go.sum +++ b/sentryflow/go.sum @@ -67,8 +67,6 @@ github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0V github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= -github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= -github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -192,8 +190,8 @@ google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM= google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= -google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.34.0 h1:Qo/qEd2RZPCf2nKuorzksSknv0d3ERwp1vFG38gSmH4= +google.golang.org/protobuf v1.34.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/sentryflow/k8s/k8sHandler.go b/sentryflow/k8s/k8sHandler.go new file mode 100644 index 0000000..f41fa44 --- /dev/null +++ b/sentryflow/k8s/k8sHandler.go @@ -0,0 +1,529 @@ +// SPDX-License-Identifier: Apache-2.0 + +package k8s + +import ( + "context" + "log" + "sync" + "time" + + "github.com/5gsec/SentryFlow/config" + "github.com/5gsec/SentryFlow/types" + + "gopkg.in/yaml.v2" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" +) + +// == // + +// K8sH global reference for Kubernetes Handler +var K8sH *KubernetesHandler + +// init Function +func init() { + K8sH = NewK8sHandler() +} + +// KubernetesHandler Structure +type KubernetesHandler struct { + config *rest.Config + clientSet *kubernetes.Clientset + + watchers map[string]*cache.ListWatch + informers map[string]cache.Controller + + podMap map[string]*corev1.Pod // NOT thread safe + serviceMap map[string]*corev1.Service // NOT thread safe +} + +// NewK8sHandler Function +func NewK8sHandler() *KubernetesHandler { + kh := &KubernetesHandler{ + watchers: make(map[string]*cache.ListWatch), + informers: make(map[string]cache.Controller), + + podMap: make(map[string]*corev1.Pod), + serviceMap: make(map[string]*corev1.Service), + } + + return kh +} + +// == // + +// InitK8sClient Function +func InitK8sClient() bool { + var err error + + // Initialize in cluster config + K8sH.config, err = rest.InClusterConfig() + if err != nil { + log.Fatal("[InitK8sClient] Failed to initialize Kubernetes client") + return false + } + + // Initialize Kubernetes clientSet + K8sH.clientSet, err = kubernetes.NewForConfig(K8sH.config) + if err != nil { + log.Fatal("[InitK8sClient] Failed to initialize Kubernetes client") + return false + } + + // Create a mapping table for existing pods and services to IPs + K8sH.initExistingResources() + + watchTargets := []string{"pods", "services"} + + // Initialize watchers for pods and services + for _, target := range watchTargets { + watcher := cache.NewListWatchFromClient( + K8sH.clientSet.CoreV1().RESTClient(), + target, + corev1.NamespaceAll, + fields.Everything(), + ) + K8sH.watchers[target] = watcher + } + + // Initialize informers + K8sH.initInformers() + + log.Printf("[InitK8sClient] Initialized Kubernetes client") + + return true +} + +// initExistingResources Function that creates a mapping table for existing pods and services to IPs +// This is required since informers are NOT going to see existing resources until they are updated, created or deleted +// @todo: Refactor this function, this is kind of messy +func (k8s *KubernetesHandler) initExistingResources() { + // List existing Pods + podList, err := k8s.clientSet.CoreV1().Pods(corev1.NamespaceAll).List(context.TODO(), v1.ListOptions{}) + if err != nil { + log.Print("[K8s] Error listing Pods:", err.Error()) + } + + // Add existing Pods to the podMap + for _, pod := range podList.Items { + currentPod := pod + k8s.podMap[pod.Status.PodIP] = ¤tPod + log.Printf("[K8s] Add existing pod %s: %s/%s", pod.Status.PodIP, pod.Namespace, pod.Name) + } + + // List existing Services + serviceList, err := k8s.clientSet.CoreV1().Services(corev1.NamespaceAll).List(context.TODO(), v1.ListOptions{}) + if err != nil { + log.Print("[K8s] Error listing Services:", err.Error()) + } + + // Add existing Services to the serviceMap + for _, service := range serviceList.Items { + currentService := service + + // Check if the service has a LoadBalancer type + if service.Spec.Type == "LoadBalancer" { + for _, lbIngress := range service.Status.LoadBalancer.Ingress { + lbIP := lbIngress.IP + if lbIP != "" { + k8s.serviceMap[lbIP] = ¤tService + log.Printf("[K8s] Add existing service (LoadBalancer) %s: %s/%s", lbIP, service.Namespace, service.Name) + } + } + } else { + k8s.serviceMap[service.Spec.ClusterIP] = ¤tService + if len(service.Spec.ExternalIPs) != 0 { + for _, eIP := range service.Spec.ExternalIPs { + k8s.serviceMap[eIP] = ¤tService + log.Printf("[K8s] Add existing service %s: %s/%s", eIP, service.Namespace, service.Name) + } + } + } + } +} + +// initInformers Function that initializes informers for services and pods in a cluster +func (k8s *KubernetesHandler) initInformers() { + // Create Pod controller informer + _, pc := cache.NewInformer( + k8s.watchers["pods"], + &corev1.Pod{}, + time.Second*0, + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { // Add pod + pod := obj.(*corev1.Pod) + k8s.podMap[pod.Status.PodIP] = pod + }, + UpdateFunc: func(oldObj, newObj interface{}) { // Update pod + newPod := newObj.(*corev1.Pod) + k8s.podMap[newPod.Status.PodIP] = newPod + }, + DeleteFunc: func(obj interface{}) { // Remove deleted pod + pod := obj.(*corev1.Pod) + delete(k8s.podMap, pod.Status.PodIP) + }, + }, + ) + k8s.informers["pods"] = pc + + // Create Service controller informer + _, sc := cache.NewInformer( + k8s.watchers["services"], + &corev1.Service{}, + time.Second*0, + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { // Add service + service := obj.(*corev1.Service) + + if service.Spec.Type == "LoadBalancer" { + for _, lbIngress := range service.Status.LoadBalancer.Ingress { + lbIP := lbIngress.IP + if lbIP != "" { + k8s.serviceMap[lbIP] = service + } + } + } else { + k8s.serviceMap[service.Spec.ClusterIP] = service + if len(service.Spec.ExternalIPs) != 0 { + for _, eIP := range service.Spec.ExternalIPs { + k8s.serviceMap[eIP] = service + } + } + } + }, + UpdateFunc: func(oldObj, newObj interface{}) { // Update service + newService := newObj.(*corev1.Service) + if newService.Spec.Type == "LoadBalancer" { + for _, lbIngress := range newService.Status.LoadBalancer.Ingress { + lbIP := lbIngress.IP + if lbIP != "" { + k8s.serviceMap[lbIP] = newService + } + } + } else { + k8s.serviceMap[newService.Spec.ClusterIP] = newService + if len(newService.Spec.ExternalIPs) != 0 { + for _, eIP := range newService.Spec.ExternalIPs { + k8s.serviceMap[eIP] = newService + } + } + } + }, + DeleteFunc: func(obj interface{}) { + service := obj.(*corev1.Service) + if service.Spec.Type == "LoadBalancer" { + for _, lbIngress := range service.Status.LoadBalancer.Ingress { + lbIP := lbIngress.IP + if lbIP != "" { + delete(k8s.serviceMap, lbIP) + } + } + } else { + delete(k8s.serviceMap, service.Spec.ClusterIP) // Remove deleted service + if len(service.Spec.ExternalIPs) != 0 { + for _, eIP := range service.Spec.ExternalIPs { + delete(k8s.serviceMap, eIP) + } + } + } + }, + }, + ) + k8s.informers["services"] = sc +} + +// == // + +// RunInformers Function that starts running informers +func RunInformers(stopChan chan struct{}, wg *sync.WaitGroup) { + wg.Add(1) + + for name, informer := range K8sH.informers { + name := name + informer := informer + go func() { + log.Printf("[RunInformers] Starting an informer for %s", name) + informer.Run(stopChan) + defer wg.Done() + }() + } + + log.Printf("[RunInformers] Started all Kubernetes informers") +} + +// == // + +// PatchIstioConfigMap Function that patches the Istio's configmap for meshConfig +// This will make istio know that there is an exporter with envoyOtelAls +func PatchIstioConfigMap() bool { + var meshConfig map[string]interface{} + + // Get the ConfigMap istio-system/istio + configMap, err := K8sH.clientSet.CoreV1().ConfigMaps("istio-system").Get(context.Background(), "istio", v1.GetOptions{}) + if err != nil { + log.Fatalf("[PatchIstioConfigMap] Unable to retrieve ConfigMap istio-system/istio :%v", err) + return false + } + + // Unmarshal the YAML string into meshConfig + if err = yaml.Unmarshal([]byte(configMap.Data["mesh"]), &meshConfig); err != nil { + log.Fatalf("[PatchIstioConfigMap] Unable to unmarshall ConfigMap istio-system/istio :%v", err) + return false + } + + if _, evyAccLogExist := meshConfig["enableEnvoyAccessLogService"]; evyAccLogExist { + log.Printf("[PatchIstioConfigMap] Overwrite the contents of \"enableEnvoyAccessLogService\"") + } + meshConfig["enableEnvoyAccessLogService"] = true + + if _, evyAccLogExist := meshConfig["defaultConfig"].(map[interface{}]interface{})["envoyAccessLogService"]; evyAccLogExist { + log.Printf("[PatchIstioConfigMap] Overwrite the contents of \"defaultConfig.envoyAccessLogService\"") + } + meshConfig["defaultConfig"].(map[interface{}]interface{})["envoyAccessLogService"] = map[string]string{ + "address": "sentryflow.sentryflow.svc.cluster.local:4317", + } + + if _, evyMetricsExist := meshConfig["defaultConfig"].(map[interface{}]interface{})["envoyMetricsService"]; evyMetricsExist { + log.Printf("[PatchIstioConfigMap] Overwrite the contents of \"defaultConfig.envoyMetricsService\"") + } + meshConfig["defaultConfig"].(map[interface{}]interface{})["envoyMetricsService"] = map[string]string{ + "address": "sentryflow.sentryflow.svc.cluster.local:4317", + } + + // Update defaultProviders.accessLogs + if defProviders, exists := meshConfig["defaultProviders"].(map[interface{}]interface{})["accessLogs"]; exists { + newDefProviders := defProviders.([]interface{}) + + exists = false + for _, entry := range newDefProviders { + if entry == "sentryflow" { // If "sentryflow" already exists + log.Printf("[PatchIstioConfigMap] istio-system/istio ConfigMap has SentryFlow under defaultProviders.accessLogs, ignoring...") + exists = true + break + } + } + + if !exists { // If "sentryflow" does not exist + newDefProviders = append(newDefProviders, "sentryflow") + meshConfig["defaultProviders"].(map[interface{}]interface{})["accessLogs"] = newDefProviders + } + } else { // If it does not exist + meshConfig["defaultProviders"].(map[interface{}]interface{})["accessLogs"] = []string{"sentryflow"} + } + + // ExtensionProvider for our service + extensionProvider := map[interface{}]interface{}{ + "name": "sentryflow", + "envoyOtelAls": map[interface{}]interface{}{ + "service": "sentryflow.sentryflow.svc.cluster.local", + "port": config.GlobalConfig.CollectorPort, + }, + } + + // Update extensionProviders + if extensionProviders, exists := meshConfig["extensionProviders"]; exists { + newExtensionProviders, ok := extensionProviders.([]interface{}) + if !ok { + log.Printf("[PatchIstioConfigMap] 'extensionProviders' in istio-system/istio ConfigMap has an unexpected type") + } + + exists = false + for _, entry := range newExtensionProviders { + if entryMap, ok := entry.(map[interface{}]interface{}); !ok { + log.Printf("[PatchIstioConfigMap] 'extensionProviders' in istio-system/istio ConfigMap has an unexpected type") + } else if entryMap["name"] == "sentryflow" { // If "sentryflow" already exists + log.Printf("[PatchIstioConfigMap] istio-system/istio ConfigMap has sentryflow under extensionProviders, ignoring... ") + exists = true + break + } + } + + if !exists { + meshConfig["extensionProviders"] = append(extensionProviders.([]map[interface{}]interface{}), extensionProvider) + } + } else { // If it does not exist + meshConfig["extensionProviders"] = []map[interface{}]interface{}{extensionProvider} + } + + // Update the ConfigMap data with the modified meshConfig + updatedMeshConfig, err := yaml.Marshal(meshConfig) + if err != nil { + log.Fatalf("[PatchIstioConfigMap] Unable to marshal updated meshConfig to YAML: %v", err) + return false + } + + // Convert the []byte to string + configMap.Data["mesh"] = string(updatedMeshConfig) + + // Preview changes, for debugging + if config.GlobalConfig.Debug { + log.Printf("[PatchIstioConfigMap] Patching istio-system/istio ConfigMap as: \n%v", configMap) + } + + // Patch the ConfigMap + if updatedConfigMap, err := K8sH.clientSet.CoreV1().ConfigMaps("istio-system").Update(context.Background(), configMap, v1.UpdateOptions{}); err != nil { + log.Fatalf("[PatchIstioConfigMap] Unable to update configmap istio-system/istio :%v", err) + } else { + log.Printf("[PatchIstioConfigMap] Updated istio-system/istio ConfigMap") + + if config.GlobalConfig.Debug { + log.Printf("%v", updatedConfigMap) + } + } + + log.Printf("[PatchIstioConfigMap] Patched Istio ConfigMap") + + return true +} + +// UnpatchIstioConfigMap Function +func UnpatchIstioConfigMap() bool { + // @todo: Remove SentryFlow collector from Kubernetes + return true +} + +// == // + +// PatchNamespaces Function that patches namespaces for adding 'istio-injection' +func PatchNamespaces() bool { + namespaces, err := K8sH.clientSet.CoreV1().Namespaces().List(context.Background(), v1.ListOptions{}) + if err != nil { + log.Fatalf("[PatchNamespaces] Unable to list namespaces: %v", err) + return false + } + + for _, namespace := range namespaces.Items { + // Skip the following namespaces + if namespace.Name == "sentryflow" { + continue + } + + namespace.Labels["istio-injection"] = "enabled" + + // Patch the namespace + if _, err := K8sH.clientSet.CoreV1().Namespaces().Update(context.TODO(), &namespace, v1.UpdateOptions{FieldManager: "patcher"}); err != nil { + log.Printf("[PatchNamespaces] Unable to update namespace %s: %v", namespace.Name, err) + return false + } + + log.Printf("[PatchNamespaces] Updated Namespace: %s\n", namespace.Name) + } + + log.Printf("[PatchNamespaces] Updated all namespaces") + + return true +} + +// restartDeployment Function that performs a rolling restart for a deployment in the specified namespace +// @todo: fix this, this DOES NOT restart deployments +func (k8s *KubernetesHandler) restartDeployment(namespace string, deploymentName string) error { + deploymentClient := k8s.clientSet.AppsV1().Deployments(namespace) + + // Get the deployment to retrieve the current spec + deployment, err := deploymentClient.Get(context.Background(), deploymentName, v1.GetOptions{}) + if err != nil { + return err + } + + // Trigger a rolling restart by updating the deployment's labels or annotations + deployment.Spec.Template.ObjectMeta.Labels["restartedAt"] = v1.Now().String() + + // Update the deployment to trigger the rolling restart + _, err = deploymentClient.Update(context.TODO(), deployment, v1.UpdateOptions{}) + if err != nil { + return err + } + + return nil +} + +// RestartDeployments Function that restarts the deployments in the namespaces with "istio-injection=enabled" +func RestartDeployments() bool { + deployments, err := K8sH.clientSet.AppsV1().Deployments("").List(context.Background(), v1.ListOptions{}) + if err != nil { + log.Fatalf("[PatchDeployments] Unable to list deployments: %v", err) + return false + } + + for _, deployment := range deployments.Items { + // Skip the following namespaces + if deployment.Namespace == "sentryflow" { + continue + } + + // Restart the deployment + if err := K8sH.restartDeployment(deployment.Namespace, deployment.Name); err != nil { + log.Fatalf("[PatchDeployments] Unable to restart deployment %s/%s: %v", deployment.Namespace, deployment.Name, err) + return false + } + + log.Printf("[PatchDeployments] Deployment %s/%s restarted", deployment.Namespace, deployment.Name) + } + + log.Print("[PatchDeployments] Restarted all patched deployments") + + return true +} + +// == // + +// lookupIPAddress Function +func lookupIPAddress(ipAddr string) interface{} { + // Look for pod map + pod, ok := K8sH.podMap[ipAddr] + if ok { + return pod + } + + // Look for service map + service, ok := K8sH.serviceMap[ipAddr] + if ok { + return service + } + + return nil +} + +// LookupK8sResource Function +func LookupK8sResource(srcIP string) types.K8sResource { + ret := types.K8sResource{ + Namespace: "Unknown", + Name: "Unknown", + Labels: make(map[string]string), + Type: types.K8sResourceTypeUnknown, + } + + // Find Kubernetes resource from source IP (service or a pod) + raw := lookupIPAddress(srcIP) + + // Currently supports Service or Pod + switch raw.(type) { + case *corev1.Pod: + pod, ok := raw.(*corev1.Pod) + if ok { + ret.Namespace = pod.Namespace + ret.Name = pod.Name + ret.Labels = pod.Labels + ret.Type = types.K8sResourceTypePod + } + case *corev1.Service: + svc, ok := raw.(*corev1.Service) + if ok { + ret.Namespace = svc.Namespace + ret.Name = svc.Name + ret.Labels = svc.Labels + ret.Type = types.K8sResourceTypeService + } + default: + ret.Type = types.K8sResourceTypeUnknown + } + + return ret +} + +// == // diff --git a/sentryflow/main.go b/sentryflow/main.go index 626777d..d96e538 100644 --- a/sentryflow/main.go +++ b/sentryflow/main.go @@ -3,25 +3,13 @@ package main import ( - "github.com/5GSEC/SentryFlow/collector" - "github.com/5GSEC/SentryFlow/core" - _ "google.golang.org/grpc/encoding/gzip" // If not set, encoding problem occurs https://stackoverflow.com/questions/74062727 - "log" + "github.com/5gsec/SentryFlow/core" ) -// main is the entrypoint of this program -func main() { - go func() { - core.SentryFlow() - }() - - err := collector.Ch.InitGRPCServer() - if err != nil { - log.Fatalf("[Error] Unable to start collector gRPC Server: %v", err) - } +// ========== // +// == Main == // +// ========== // - err = collector.Ch.Serve() - if err != nil { - log.Fatalf("[Error] Unable to serve gRPC Server: %v", err) - } +func main() { + core.SentryFlow() } diff --git a/sentryflow/metrics/api/apiAnalyzer.go b/sentryflow/metrics/api/apiAnalyzer.go deleted file mode 100644 index 78a2ff7..0000000 --- a/sentryflow/metrics/api/apiAnalyzer.go +++ /dev/null @@ -1,92 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -package api - -import ( - "sync" -) - -// aa Local reference for API analyzer -var aa *Analyzer - -// init function -func init() { - aa = NewAPIAnalyzer() -} - -// Analyzer Structure -type Analyzer struct { - perAPICount map[string]uint64 - perAPICountLock sync.Mutex // @todo perhaps combine those two? - - curBatchCount int - batchCountLock sync.Mutex - - stopChan chan struct{} - apiJob chan string -} - -// NewAPIAnalyzer Function -func NewAPIAnalyzer() *Analyzer { - ret := &Analyzer{ - perAPICount: make(map[string]uint64), - } - - return ret -} - -// StartAPIAnalyzer Function -func StartAPIAnalyzer(wg *sync.WaitGroup) { - go apiAnalyzerRoutine(wg) -} - -// StopAPIAnalyzer Function -func StopAPIAnalyzer() { - aa.stopChan <- struct{}{} -} - -// apiAnalyzerRoutine Function -func apiAnalyzerRoutine(wg *sync.WaitGroup) { - wg.Add(1) - for { - select { - case job, ok := <-aa.apiJob: - if !ok { - // @todo perhaps error message here? - continue - } - analyzeAPI(job) - - case <-aa.stopChan: - wg.Done() - break - } - } -} - -// analyzeAPI Function -func analyzeAPI(api string) { - // @todo implement this - classifyAPI(api) -} - -// GetPerAPICount Function -func GetPerAPICount() map[string]uint64 { - aa.perAPICountLock.Lock() - ret := aa.perAPICount - aa.perAPICountLock.Unlock() - - return ret -} - -// UpdatePerAPICount Function -func UpdatePerAPICount(nm map[string]uint64) { - aa.perAPICountLock.Lock() - aa.perAPICount = nm - aa.perAPICountLock.Unlock() -} - -// InsertAnalyzeJob Function -func InsertAnalyzeJob(api string) { - aa.apiJob <- api -} diff --git a/sentryflow/metrics/api/apiClassifier.go b/sentryflow/metrics/api/apiClassifier.go deleted file mode 100644 index e251dc1..0000000 --- a/sentryflow/metrics/api/apiClassifier.go +++ /dev/null @@ -1,44 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -package api - -type node struct { - path string - count int - child []*node -} - -type classifiedAPI struct { - destination string - method string - URIRoot *node -} - -// classifyAPI Function -func classifyAPI(api string) { -} - -// generateMetric Function -func generateMetric(cal classifiedAPI) { - -} - -// statisticOfAPIsPerDestination Function -func statisticOfAPIsPerDestination(cal classifiedAPI) { - -} - -// statisticOfAPIsPerMin Function -func statisticOfAPIsPerMin(cal classifiedAPI) { - -} - -// statisticOfErrorAPI Function -func statisticOfErrorAPI(cal classifiedAPI) { - -} - -// statisticOfAPILatency Function -func statisticOfAPILatency(cal classifiedAPI) { - -} diff --git a/sentryflow/metrics/metricHandler.go b/sentryflow/metrics/metricHandler.go deleted file mode 100644 index 2e78627..0000000 --- a/sentryflow/metrics/metricHandler.go +++ /dev/null @@ -1,45 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -package metrics - -import ( - "sync" - - "github.com/5GSEC/SentryFlow/metrics/api" - "github.com/5GSEC/SentryFlow/protobuf" -) - -// Mh Global reference for metric handler -var Mh *MetricHandler - -// init Function -func init() { - Mh = NewMetricHandler() -} - -// MetricHandler Structure -type MetricHandler struct { -} - -// NewMetricHandler Function -func NewMetricHandler() *MetricHandler { - mh := &MetricHandler{} - - return mh -} - -// StartMetricsAnalyzer Function -func StartMetricsAnalyzer(wg *sync.WaitGroup) { - api.StartAPIAnalyzer(wg) -} - -// StopMetricsAnalyzer Function -func StopMetricsAnalyzer() { - api.StopAPIAnalyzer() -} - -// InsertAccessLog Function -func InsertAccessLog(al *protobuf.APILog) { - // @todo: make this fixed, for now will just send path from AccessLog - api.InsertAnalyzeJob(al.Path) -} diff --git a/sentryflow/processor/apiAnalyzer.go b/sentryflow/processor/apiAnalyzer.go new file mode 100644 index 0000000..49b29a0 --- /dev/null +++ b/sentryflow/processor/apiAnalyzer.go @@ -0,0 +1,81 @@ +// SPDX-License-Identifier: Apache-2.0 + +package processor + +import ( + "sync" +) + +// == // + +// APIA Local reference for API analyzer +var APIA *Analyzer + +// init function +func init() { + APIA = NewAPIAnalyzer() +} + +// Analyzer Structure +type Analyzer struct { + apiLog chan string + + stopChan chan struct{} +} + +// NewAPIAnalyzer Function +func NewAPIAnalyzer() *Analyzer { + ret := &Analyzer{} + return ret +} + +// StartAPIAnalyzer Function +func StartAPIAnalyzer(wg *sync.WaitGroup) bool { + // keep analyzing given APIs + go analyzeAPIs(wg) + + return true +} + +// AnalyzeAPI Function +func AnalyzeAPI(api string) { + APIA.apiLog <- api +} + +// StopAPIAnalyzer Function +func StopAPIAnalyzer() bool { + APIA.stopChan <- struct{}{} + + return true +} + +// == // + +// analyzeAPIs Function +func analyzeAPIs(wg *sync.WaitGroup) { + wg.Add(1) + + for { + select { + case api, ok := <-APIA.apiLog: + if !ok { + continue + } + + ClassifyAPI(api) + + case <-APIA.stopChan: + wg.Done() + break + } + } +} + +// == // + +// ClassifyAPI Function +func ClassifyAPI(api string) { + // +} + +// == // diff --git a/sentryflow/processor/apiClassifier.go b/sentryflow/processor/apiClassifier.go new file mode 100644 index 0000000..43f2410 --- /dev/null +++ b/sentryflow/processor/apiClassifier.go @@ -0,0 +1,176 @@ +// SPDX-License-Identifier: Apache-2.0 + +package processor + +// import ( +// "context" +// "fmt" +// "io" +// "log" + +// "github.com/5gsec/SentryFlow/config" +// "github.com/5gsec/SentryFlow/protobuf" +// "github.com/5gsec/SentryFlow/types" +// "google.golang.org/grpc" +// ) + +// // AIH Local reference for AI handler server +// var AIH *AIHandler + +// // AIHandler Structure +// type AIHandler struct { +// AIEngineAddr string +// AIEnginePort string + +// error chan error +// stopChan chan struct{} + +// aggregatedLogs chan []*protobuf.APILog +// APIs chan []string + +// AIStream *streamInform +// } + +// // streamInform Structure +// type streamInform struct { +// AIStream protobuf.SentryFlowMetrics_GetAPIClassificationClient +// } + +// // init Function +// func init() { +// // Construct address and start listening +// ai = NewAIHandler(cfg.AIEngineAddr, cfg.AIEnginePort) +// } + +// // NewAIHandler Function +// func NewAIHandler(addr string, port string) *AIHandler { +// ah := &AIHandler{ +// AIEngineAddr: addr, +// AIEnginePort: port, + +// stopChan: make(chan struct{}), + +// aggregatedLogs: make(chan []*protobuf.APILog), +// APIs: make(chan []string), +// } +// return ah +// } + +// // initHandler Function +// func (ai *AIHandler) InitAIHandler() bool { +// AIEngineService := fmt.Sprintf("%s:%s", cfg.GlobalCfg.AIEngineAddr, cfg.GlobalCfg.AIEnginePort) + +// // Set up a connection to the server. +// conn, err := grpc.Dial(AIEngineService, grpc.WithInsecure()) +// if err != nil { +// log.Fatalf("[AI] Could not connect: %v", err) +// return false +// } + +// // Start serving gRPC server +// log.Printf("[AI] Successfully connected to %s for APIMetrics", AIEngineService) + +// client := protobuf.NewSentryFlowMetricsClient(conn) +// aiStream, err := client.GetAPIClassification(context.Background()) + +// ai.AIStream = &streamInform{ +// AIStream: aiStream, +// } + +// done := make(chan struct{}) + +// go sendAPIRoutine() +// go recvAPIRoutine(done) + +// return true +// } + +// // InsertAPILog function +// func InsertAPILog(APIs []string) { +// ai.APIs <- APIs +// } + +// // callAI Function +// func (ah *aiHandler) callAI(api string) error { +// // @todo: add gRPC send request +// return nil +// } + +// // processBatch Function +// func processBatch(batch []string, update bool) error { +// for range batch { + +// } + +// return nil +// } + +// // performHealthCheck Function +// func (ah *aiHandler) performHealthCheck() error { +// return nil +// } + +// // disconnect Function +// func (ah *aiHandler) disconnect() { +// return +// } + +// // sendAPIRoutine Function +// func sendAPIRoutine() { +// routineLoop: +// for { +// select { +// case aal, ok := <-AH.apis: +// if !ok { +// log.Printf("[Exporter] EnvoyMetric exporter channel closed") +// break routineLoop +// } + +// curAPIRequest := &protobuf.APIClassificationRequest{ +// Path: aal, +// } + +// // err := AH.aiStream.Send(curAPIRequest) +// err := AH.aiStream.aiStream.Send(curAPIRequest) +// if err != nil { +// log.Printf("[Exporter] AI Engine APIs exporting failed %v:", err) +// } +// case <-AH.stopChan: +// break routineLoop +// } +// } + +// return +// } + +// // recvAPIRoutine Function +// func recvAPIRoutine(done chan struct{}) error { +// for { +// select { +// default: +// event, err := AH.aiStream.aiStream.Recv() +// if err == io.EOF { +// return nil +// } + +// if err != nil { +// log.Printf("[Envoy] Something went on wrong when receiving event: %v", err) +// return err +// } + +// for key, value := range event.Fields { +// APICount := &types.PerAPICount{ +// API: key, +// Count: value, +// } +// err := MDB.PerAPICountInsert(APICount) +// if err != nil { +// log.Printf("unable to insert Classified API") +// return err +// } +// } +// case <-done: +// return nil +// } +// } +// } diff --git a/sentryflow/processor/logProcessor.go b/sentryflow/processor/logProcessor.go new file mode 100644 index 0000000..782ebb8 --- /dev/null +++ b/sentryflow/processor/logProcessor.go @@ -0,0 +1,140 @@ +// SPDX-License-Identifier: Apache-2.0 + +package processor + +import ( + "log" + "sync" + + "github.com/5gsec/SentryFlow/exporter" + "github.com/5gsec/SentryFlow/protobuf" +) + +// == // + +// LogH global reference for Log Handler +var LogH *LogHandler + +// init Function +func init() { + LogH = NewLogHandler() +} + +// LogHandler Structure +type LogHandler struct { + stopChan chan struct{} + + apiLogChan chan interface{} + metricsChan chan interface{} +} + +// aggregationLog Structure +type aggregationLog struct { + Labels map[string]string + Annotations map[string]string + + Logs []*protobuf.APILog +} + +// NewLogHandler Structure +func NewLogHandler() *LogHandler { + lh := &LogHandler{ + stopChan: make(chan struct{}), + + apiLogChan: make(chan interface{}), + metricsChan: make(chan interface{}), + } + + return lh +} + +// == // + +// StartLogProcessor Function +func StartLogProcessor(wg *sync.WaitGroup) bool { + // handle API logs + go ProcessAPILogs(wg) + + // handle metrics + go ProcessMetrics(wg) + + log.Print("[LogProcessor] Started Log Processor") + + return true +} + +// StopLogProcessor Function +func StopLogProcessor() bool { + // One for ProcessAPILogs + LogH.stopChan <- struct{}{} + + // One for ProcessMetrics + LogH.stopChan <- struct{}{} + + log.Print("[LogProcessor] Stopped Log Processor") + + return true +} + +// == // + +// InsertAPILog Function +func InsertAPILog(data interface{}) { + LogH.apiLogChan <- data +} + +// ProcessLogs Function +func ProcessAPILogs(wg *sync.WaitGroup) { + wg.Add(1) + + for { + select { + case logType, ok := <-LogH.apiLogChan: + if !ok { + log.Print("[LogProcessor] Unable to process an API log") + } + + switch logType.(type) { + case *protobuf.APILog: + go exporter.InsertAPILog(logType.(*protobuf.APILog)) + + // Send API for Further Analysis + go AnalyzeAPI(logType.(*protobuf.APILog).Path) + } + + case <-LogH.stopChan: + wg.Done() + return + } + } +} + +// InsertMetrics Function +func InsertMetrics(data interface{}) { + LogH.metricsChan <- data +} + +// ProcessMetrics Function +func ProcessMetrics(wg *sync.WaitGroup) { + wg.Add(1) + + for { + select { + case logType, ok := <-LogH.metricsChan: + if !ok { + log.Print("[LogProcessor] Unable to process metrics") + } + + switch logType.(type) { + case *protobuf.EnvoyMetrics: + go exporter.InsertEnvoyMetrics(logType.(*protobuf.EnvoyMetrics)) + } + + case <-LogH.stopChan: + wg.Done() + return + } + } +} + +// == // diff --git a/sentryflow/sentryflow b/sentryflow/sentryflow new file mode 100755 index 0000000..ff9dbeb Binary files /dev/null and b/sentryflow/sentryflow differ diff --git a/sentryflow/types/metrics.go b/sentryflow/types/metrics.go deleted file mode 100644 index 1a95584..0000000 --- a/sentryflow/types/metrics.go +++ /dev/null @@ -1,20 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -package types - -import ( - "github.com/5GSEC/SentryFlow/protobuf" -) - -// PerAPICount Structure -type PerAPICount struct { - API string - Count uint64 -} - -// DbAccessLogType Structure -type DbAccessLogType struct { - Namespace string - Labels string - AccessLog *protobuf.APILog -} diff --git a/sentryflow/types/k8sResources.go b/sentryflow/types/types.go similarity index 72% rename from sentryflow/types/k8sResources.go rename to sentryflow/types/types.go index 50f8fb3..4ce59ab 100644 --- a/sentryflow/types/k8sResources.go +++ b/sentryflow/types/types.go @@ -2,33 +2,35 @@ package types -// k8sResources const +// == // + +// K8sResourceTypes const ( K8sResourceTypeUnknown = 0 K8sResourceTypePod = 1 K8sResourceTypeService = 2 ) -// K8sNetworkedResource Structure -type K8sNetworkedResource struct { - Name string +// K8sResource Structure +type K8sResource struct { + Type uint8 Namespace string + Name string Labels map[string]string Containers []string - Type uint8 } // K8sResourceTypeToString Function -func K8sResourceTypeToString(t uint8) string { - switch t { +func K8sResourceTypeToString(resourceType uint8) string { + switch resourceType { case K8sResourceTypePod: return "Pod" case K8sResourceTypeService: return "Service" case K8sResourceTypeUnknown: - default: return "Unknown" } - return "Unknown" } + +// == //