Skip to content

Commit

Permalink
Merge branch 'master' into apiclient-apiserver-socket
Browse files Browse the repository at this point in the history
  • Loading branch information
mmetc committed Mar 8, 2024
2 parents 228542b + 6c5e8af commit 007a64f
Show file tree
Hide file tree
Showing 53 changed files with 821 additions and 162 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ cscli: goversion ## Build cscli
crowdsec: goversion ## Build crowdsec
@$(MAKE) -C $(CROWDSEC_FOLDER) build $(MAKE_FLAGS)

.PHONY: generate
generate: ## Generate code for the database and APIs
$(GO) generate ./pkg/database/ent
$(GO) generate ./pkg/models

.PHONY: testclean
testclean: bats-clean ## Remove test artifacts
Expand Down
2 changes: 1 addition & 1 deletion cmd/crowdsec/api.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package main

import (
"errors"
"fmt"
"runtime"
"time"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"

"github.com/crowdsecurity/go-cs-lib/trace"
Expand Down
33 changes: 19 additions & 14 deletions cmd/crowdsec/crowdsec.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,39 +23,42 @@ import (
"github.com/crowdsecurity/crowdsec/pkg/types"
)

func initCrowdsec(cConfig *csconfig.Config, hub *cwhub.Hub) (*parser.Parsers, error) {
// initCrowdsec prepares the log processor service
func initCrowdsec(cConfig *csconfig.Config, hub *cwhub.Hub) (*parser.Parsers, []acquisition.DataSource, error) {
var err error

if err = alertcontext.LoadConsoleContext(cConfig, hub); err != nil {
return nil, fmt.Errorf("while loading context: %w", err)
return nil, nil, fmt.Errorf("while loading context: %w", err)
}

// Start loading configs
csParsers := parser.NewParsers(hub)
if csParsers, err = parser.LoadParsers(cConfig, csParsers); err != nil {
return nil, fmt.Errorf("while loading parsers: %w", err)
return nil, nil, fmt.Errorf("while loading parsers: %w", err)
}

if err := LoadBuckets(cConfig, hub); err != nil {
return nil, fmt.Errorf("while loading scenarios: %w", err)
return nil, nil, fmt.Errorf("while loading scenarios: %w", err)
}

if err := appsec.LoadAppsecRules(hub); err != nil {
return nil, fmt.Errorf("while loading appsec rules: %w", err)
return nil, nil, fmt.Errorf("while loading appsec rules: %w", err)
}

if err := LoadAcquisition(cConfig); err != nil {
return nil, fmt.Errorf("while loading acquisition config: %w", err)
datasources, err := LoadAcquisition(cConfig)
if err != nil {
return nil, nil, fmt.Errorf("while loading acquisition config: %w", err)
}

return csParsers, nil
return csParsers, datasources, nil
}

func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.Hub) error {
// runCrowdsec starts the log processor service
func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.Hub, datasources []acquisition.DataSource) error {
inputEventChan = make(chan types.Event)
inputLineChan = make(chan types.Event)

//start go-routines for parsing, buckets pour and outputs.
// start go-routines for parsing, buckets pour and outputs.
parserWg := &sync.WaitGroup{}

parsersTomb.Go(func() error {
Expand All @@ -65,7 +68,8 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H
parsersTomb.Go(func() error {
defer trace.CatchPanic("crowdsec/runParse")

if err := runParse(inputLineChan, inputEventChan, *parsers.Ctx, parsers.Nodes); err != nil { //this error will never happen as parser.Parse is not able to return errors
if err := runParse(inputLineChan, inputEventChan, *parsers.Ctx, parsers.Nodes); err != nil {
// this error will never happen as parser.Parse is not able to return errors
log.Fatalf("starting parse error : %s", err)
return err
}
Expand Down Expand Up @@ -161,7 +165,8 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H
return nil
}

func serveCrowdsec(parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub.Hub, agentReady chan bool) {
// serveCrowdsec wraps the log processor service
func serveCrowdsec(parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub.Hub, datasources []acquisition.DataSource, agentReady chan bool) {
crowdsecTomb.Go(func() error {
defer trace.CatchPanic("crowdsec/serveCrowdsec")

Expand All @@ -171,7 +176,7 @@ func serveCrowdsec(parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub
log.Debugf("running agent after %s ms", time.Since(crowdsecT0))
agentReady <- true

if err := runCrowdsec(cConfig, parsers, hub); err != nil {
if err := runCrowdsec(cConfig, parsers, hub, datasources); err != nil {
log.Fatalf("unable to start crowdsec routines: %s", err)
}
}()
Expand Down Expand Up @@ -275,7 +280,7 @@ func waitOnTomb() {
case <-acquisTomb.Dead():
/*if it's acquisition dying it means that we were in "cat" mode.
while shutting down, we need to give time for all buckets to process in flight data*/
log.Warning("Acquisition is finished, shutting down")
log.Info("Acquisition is finished, shutting down")
/*
While it might make sense to want to shut-down parser/buckets/etc. as soon as acquisition is finished,
we might have some pending buckets: buckets that overflowed, but whose LeakRoutine are still alive because they
Expand Down
28 changes: 28 additions & 0 deletions cmd/crowdsec/fatalhook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package main

import (
"io"

log "github.com/sirupsen/logrus"
)

// FatalHook is used to log fatal messages to stderr when the rest goes to a file
type FatalHook struct {
Writer io.Writer
LogLevels []log.Level
}

func (hook *FatalHook) Fire(entry *log.Entry) error {
line, err := entry.String()
if err != nil {
return err
}

_, err = hook.Writer.Write([]byte(line))

return err
}

func (hook *FatalHook) Levels() []log.Level {
return hook.LogLevels
}
43 changes: 0 additions & 43 deletions cmd/crowdsec/hook.go

This file was deleted.

44 changes: 29 additions & 15 deletions cmd/crowdsec/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"errors"
"flag"
"fmt"
_ "net/http/pprof"
Expand All @@ -10,7 +11,6 @@ import (
"strings"
"time"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"gopkg.in/tomb.v2"

Expand Down Expand Up @@ -72,7 +72,11 @@ type Flags struct {
DisableCAPI bool
Transform string
OrderEvent bool
CpuProfile string
CPUProfile string
}

func (f *Flags) haveTimeMachine() bool {
return f.OneShotDSN != ""
}

type labelsMap map[string]string
Expand All @@ -95,7 +99,7 @@ func LoadBuckets(cConfig *csconfig.Config, hub *cwhub.Hub) error {
holders, outputEventChan, err = leakybucket.LoadBuckets(cConfig.Crowdsec, hub, files, &bucketsTomb, buckets, flags.OrderEvent)

if err != nil {
return fmt.Errorf("scenario loading failed: %v", err)
return fmt.Errorf("scenario loading failed: %w", err)
}

if cConfig.Prometheus != nil && cConfig.Prometheus.Enabled {
Expand All @@ -107,7 +111,7 @@ func LoadBuckets(cConfig *csconfig.Config, hub *cwhub.Hub) error {
return nil
}

func LoadAcquisition(cConfig *csconfig.Config) error {
func LoadAcquisition(cConfig *csconfig.Config) ([]acquisition.DataSource, error) {
var err error

if flags.SingleFileType != "" && flags.OneShotDSN != "" {
Expand All @@ -116,20 +120,20 @@ func LoadAcquisition(cConfig *csconfig.Config) error {

dataSources, err = acquisition.LoadAcquisitionFromDSN(flags.OneShotDSN, flags.Labels, flags.Transform)
if err != nil {
return errors.Wrapf(err, "failed to configure datasource for %s", flags.OneShotDSN)
return nil, fmt.Errorf("failed to configure datasource for %s: %w", flags.OneShotDSN, err)
}
} else {
dataSources, err = acquisition.LoadAcquisitionFromFile(cConfig.Crowdsec)
if err != nil {
return err
return nil, err
}
}

if len(dataSources) == 0 {
return fmt.Errorf("no datasource enabled")
return nil, errors.New("no datasource enabled")
}

return nil
return dataSources, nil
}

var (
Expand Down Expand Up @@ -181,7 +185,7 @@ func (f *Flags) Parse() {
}

flag.StringVar(&dumpFolder, "dump-data", "", "dump parsers/buckets raw outputs")
flag.StringVar(&f.CpuProfile, "cpu-profile", "", "write cpu profile to file")
flag.StringVar(&f.CPUProfile, "cpu-profile", "", "write cpu profile to file")
flag.Parse()
}

Expand Down Expand Up @@ -249,7 +253,12 @@ func LoadConfig(configFile string, disableAgent bool, disableAPI bool, quiet boo
return nil, err
}

primalHook.Enabled = (cConfig.Common.LogMedia != "stdout")
if cConfig.Common.LogMedia != "stdout" {
log.AddHook(&FatalHook{
Writer: os.Stderr,
LogLevels: []log.Level{log.FatalLevel, log.PanicLevel},
})
}

if err := csconfig.LoadFeatureFlagsFile(configFile, log.StandardLogger()); err != nil {
return nil, err
Expand All @@ -272,7 +281,7 @@ func LoadConfig(configFile string, disableAgent bool, disableAPI bool, quiet boo
}

if cConfig.DisableAPI && cConfig.DisableAgent {
return nil, errors.New("You must run at least the API Server or crowdsec")
return nil, errors.New("you must run at least the API Server or crowdsec")
}

if flags.OneShotDSN != "" && flags.SingleFileType == "" {
Expand Down Expand Up @@ -323,7 +332,9 @@ func LoadConfig(configFile string, disableAgent bool, disableAPI bool, quiet boo
var crowdsecT0 time.Time

func main() {
log.AddHook(primalHook)
// The initial log level is INFO, even if the user provided an -error or -warning flag
// because we need feature flags before parsing cli flags
log.SetFormatter(&log.TextFormatter{TimestampFormat: time.RFC3339, FullTimestamp: true})

if err := fflag.RegisterAllFeatures(); err != nil {
log.Fatalf("failed to register features: %s", err)
Expand Down Expand Up @@ -355,16 +366,19 @@ func main() {
os.Exit(0)
}

if flags.CpuProfile != "" {
f, err := os.Create(flags.CpuProfile)
if flags.CPUProfile != "" {
f, err := os.Create(flags.CPUProfile)
if err != nil {
log.Fatalf("could not create CPU profile: %s", err)
}
log.Infof("CPU profile will be written to %s", flags.CpuProfile)

log.Infof("CPU profile will be written to %s", flags.CPUProfile)

if err := pprof.StartCPUProfile(f); err != nil {
f.Close()
log.Fatalf("could not start CPU profile: %s", err)
}

defer f.Close()
defer pprof.StopCPUProfile()
}
Expand Down
11 changes: 7 additions & 4 deletions cmd/crowdsec/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,12 @@ func computeDynamicMetrics(next http.Handler, dbClient *database.Client) http.Ha
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// catch panics here because they are not handled by servePrometheus
defer trace.CatchPanic("crowdsec/computeDynamicMetrics")
//update cache metrics (stash)
// update cache metrics (stash)
cache.UpdateCacheMetrics()
//update cache metrics (regexp)
// update cache metrics (regexp)
exprhelpers.UpdateRegexpCacheMetrics()

//decision metrics are only relevant for LAPI
// decision metrics are only relevant for LAPI
if dbClient == nil {
next.ServeHTTP(w, r)
return
Expand Down Expand Up @@ -196,6 +196,9 @@ func servePrometheus(config *csconfig.PrometheusCfg, dbClient *database.Client,
log.Debugf("serving metrics after %s ms", time.Since(crowdsecT0))

if err := http.ListenAndServe(fmt.Sprintf("%s:%d", config.ListenAddr, config.ListenPort), nil); err != nil {
log.Warningf("prometheus: %s", err)
// in time machine, we most likely have the LAPI using the port
if !flags.haveTimeMachine() {
log.Warningf("prometheus: %s", err)
}
}
}
8 changes: 4 additions & 4 deletions cmd/crowdsec/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func dedupAlerts(alerts []types.RuntimeAlert) ([]*models.Alert, error) {
}

for k, src := range alert.Sources {
refsrc := *alert.Alert //copy
refsrc := *alert.Alert // copy

log.Tracef("source[%s]", k)

Expand Down Expand Up @@ -81,7 +81,7 @@ LOOP:
cacheMutex.Unlock()
if err := PushAlerts(cachecopy, client); err != nil {
log.Errorf("while pushing to api : %s", err)
//just push back the events to the queue
// just push back the events to the queue
cacheMutex.Lock()
cache = append(cache, cachecopy...)
cacheMutex.Unlock()
Expand Down Expand Up @@ -110,8 +110,8 @@ LOOP:
return fmt.Errorf("postoverflow failed: %w", err)
}
log.Printf("%s", *event.Overflow.Alert.Message)
//if the Alert is nil, it's to signal bucket is ready for GC, don't track this
//dump after postoveflow processing to avoid missing whitelist info
// if the Alert is nil, it's to signal bucket is ready for GC, don't track this
// dump after postoveflow processing to avoid missing whitelist info
if dumpStates && event.Overflow.Alert != nil {
if bucketOverflows == nil {
bucketOverflows = make([]types.Event, 0)
Expand Down
2 changes: 1 addition & 1 deletion cmd/crowdsec/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
)

func runParse(input chan types.Event, output chan types.Event, parserCTX parser.UnixParserCtx, nodes []parser.Node) error {

LOOP:
for {
select {
Expand Down Expand Up @@ -56,5 +55,6 @@ LOOP:
output <- parsed
}
}

return nil
}
Loading

0 comments on commit 007a64f

Please sign in to comment.