Skip to content

Commit

Permalink
update sentryflow
Browse files Browse the repository at this point in the history
Signed-off-by: Jaehyun Nam <[email protected]>
  • Loading branch information
nam-jaehyun committed May 6, 2024
1 parent 07c3013 commit c5eafe3
Show file tree
Hide file tree
Showing 28 changed files with 1,987 additions and 2,293 deletions.
109 changes: 51 additions & 58 deletions sentryflow/collector/collectorHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,98 +3,91 @@
package collector

import (
"errors"
"fmt"
cfg "github.com/5GSEC/SentryFlow/config"
"google.golang.org/grpc"
"log"
"net"
"sync"

"github.com/5gsec/SentryFlow/config"
"google.golang.org/grpc"
)

// Ch global reference for Collector Handler
var Ch *Handler
// == //

// ColH global reference for Collector Handler
var ColH *ColHandler

// init Function
func init() {
Ch = NewCollectorHandler()
ColH = NewCollectorHandler()
}

// Handler Structure
type Handler struct {
collectors []collectorInterface

listener net.Listener
// ColHandler Structure
type ColHandler struct {
colService net.Listener
grpcServer *grpc.Server

wg sync.WaitGroup
collectors []collectorInterface
}

// NewCollectorHandler Function
func NewCollectorHandler() *Handler {
ch := &Handler{
func NewCollectorHandler() *ColHandler {
ch := &ColHandler{
collectors: make([]collectorInterface, 0),
}

return ch
}

// InitGRPCServer Function
func (h *Handler) InitGRPCServer() error {
listenAddr := fmt.Sprintf("%s:%s", cfg.GlobalCfg.OtelGRPCListenAddr, cfg.GlobalCfg.OtelGRPCListenPort)
// == //

// Start listening
lis, err := net.Listen("tcp", listenAddr)
// StartCollector Function
func StartCollector() bool {
// Make a string with the given collector address and port
collectorService := fmt.Sprintf("%s:%s", config.GlobalConfig.CollectorAddr, config.GlobalConfig.CollectorPort)

// Start listening gRPC port
colService, err := net.Listen("tcp", collectorService)
if err != nil {
msg := fmt.Sprintf("unable to listen at %s: %v", listenAddr, err)
return errors.New(msg)
log.Fatalf("[Collector] Unable to listen at %s: %v", collectorService, err)
return false
}

// Create gRPC Server, register services
server := grpc.NewServer()
ColH.colService = colService
log.Printf("[Collector] Listening Collector gRPC (%s)", collectorService)

h.listener = lis
h.grpcServer = server
// Create gRPC Service
gRPCServer := grpc.NewServer()
ColH.grpcServer = gRPCServer

// initialize collectors
err = h.initCollectors()
if err != nil {
log.Printf("[Collector] Unable to initialize collector: %v", err)
}
// initialize OpenTelemetry collector
ColH.collectors = append(ColH.collectors, newOpenTelemetryLogsServer())

// initialize Envoy collectors for AccessLogs and Metrics
ColH.collectors = append(ColH.collectors, newEnvoyAccessLogsServer())
ColH.collectors = append(ColH.collectors, newEnvoyMetricsServer())

// register services
h.registerServices()
for _, col := range ColH.collectors {
col.registerService(ColH.grpcServer)
}

log.Printf("[Collector] Server listening at %s", listenAddr)
return nil
}
log.Printf("[Collector] Initialized Collector gRPC")

// initCollectors Function
func (h *Handler) initCollectors() error {
// @todo make configuration determine which collector to start or not
h.collectors = append(h.collectors, newOtelLogServer())
h.collectors = append(h.collectors, newEnvoyMetricsServer())
h.collectors = append(h.collectors, newEnvoyAccessLogsServer())
// Serve gRPC Service
go ColH.grpcServer.Serve(ColH.colService)

return nil
}
log.Printf("[Collector] Serving Collector gRPC")

// registerServices Function
func (h *Handler) registerServices() {
for _, col := range h.collectors {
col.registerService(h.grpcServer)
log.Printf("[Collector] Successfully registered services")
}
return true
}

// Serve Function
func (h *Handler) Serve() error {
log.Printf("[Collector] Starting gRPC server")
return h.grpcServer.Serve(h.listener)
}
// StopCollector Function
func StopCollector() bool {
ColH.grpcServer.GracefulStop()

// Stop Function
func (h *Handler) Stop() {
log.Printf("[Collector] Stopped gRPC server")
h.grpcServer.GracefulStop()
log.Printf("[Collector] Gracefully stopped Collector gRPC")

return true
}

// == //
Loading

0 comments on commit c5eafe3

Please sign in to comment.