From 02e9b61221aa64e5feadce9a00cd8b6a2994d2cb Mon Sep 17 00:00:00 2001 From: Heiko Reese Date: Thu, 3 Aug 2023 15:10:21 +0200 Subject: [PATCH] Lumberjack: add parallel flow processing (#77) * lumberjack: added support for visual decimal divider for options batchsize and queuesize * lumberjack: implemented multiple backend goroutines (count config parameter) --------- Co-authored-by: Heiko Reese --- CONFIGURATION.md | 13 ++ segments/output/lumberjack/lumberjack.go | 157 +++++++++++++---- 2 files changed, 103 insertions(+), 67 deletions(-) diff --git a/CONFIGURATION.md b/CONFIGURATION.md index 7b937b1..5f43d84 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -914,6 +914,19 @@ no encryption), `tls://` (TLS encryption) or `tlsnoverify://` (TLS encryption wi certificate verification). The schema is followed by the hostname or IP address, a colon `:`, and a port number. IPv6 addresses must be surrounded by square brackets. +A goroutine is spawned for every lumberjack server. Each goroutine only uses one CPU core to +process and send flows. This may not be enough when the ingress flow rate is high and/or a high compression +level is used. The number of goroutines per backend can be set explicitly with the `?count=x` URL +parameter. For example: + +```yaml +config: + server: tls://host1:5043/?count=4, tls://host2:5043/?compression=9&count=16 +``` + +will use four parallel goroutines for `host1` and sixteen parallel goroutines for `host2`. Use `&count=…` instead of +`?count=…` when `count` is not the first parameter (standard URI convention). + Transport compression is disabled by default. Use `compression` to set the compression level for all hosts. Compression levels can vary between 0 (no compression) and 9 (maximum compression). To set per-host transport compression adding `?compression=` to the server URI. 
diff --git a/segments/output/lumberjack/lumberjack.go b/segments/output/lumberjack/lumberjack.go index 50a8f45..068fb56 100644 --- a/segments/output/lumberjack/lumberjack.go +++ b/segments/output/lumberjack/lumberjack.go @@ -6,6 +6,7 @@ import ( "github.com/bwNetFlow/flowpipeline/segments" "log" "net/url" + "runtime" "strconv" "strings" "sync" @@ -25,6 +26,7 @@ type ServerOptions struct { UseTLS bool VerifyCertificate bool CompressionLevel int + Parallism int } type Lumberjack struct { @@ -113,10 +115,29 @@ func (segment *Lumberjack) New(config map[string]string) segments.Segment { } } + // parse count url argument + var numRoutines = 1 + numRoutinesString := urlQueryParams.Get("count") + if numRoutinesString == "" { + numRoutines = 1 + } else { + numRoutines, err = strconv.Atoi(numRoutinesString) + switch { + case err != nil: + log.Fatalf("[error] Lumberjack: Failed to parse count %s for host %s: %s", numRoutinesString, serverURL.Host, err) + case numRoutines < 1: + log.Printf("[warning] Lumberjack: count is smaller than 1, setting to 1") + numRoutines = 1 + case numRoutines > runtime.NumCPU(): + log.Printf("[warning] Lumberjack: count is larger than runtime.NumCPU (%d). 
This will most likely hurt performance.", runtime.NumCPU()) + } + } + segment.Servers[serverURL.Host] = ServerOptions{ UseTLS: useTLS, VerifyCertificate: verifyTLS, CompressionLevel: compressionLevel, + Parallism: numRoutines, } } } @@ -124,7 +145,7 @@ func (segment *Lumberjack) New(config map[string]string) segments.Segment { // parse batchSize option segment.BatchSize = defaultBatchSize if config["batchsize"] != "" { - segment.BatchSize, err = strconv.Atoi(config["batchsize"]) + segment.BatchSize, err = strconv.Atoi(strings.ReplaceAll(config["batchsize"], "_", "")) if err != nil { log.Fatalf("[error] Lumberjack: Failed to parse batchsize config option: %s", err) } @@ -183,7 +204,7 @@ func (segment *Lumberjack) New(config map[string]string) segments.Segment { // create buffered channel if config["queuesize"] != "" { - buflen, err = strconv.Atoi(config["queuesize"]) + buflen, err = strconv.Atoi(strings.ReplaceAll(config["queuesize"], "_", "")) if err != nil { log.Fatalf("[error] Lumberjack: Failed to parse queuesize config option: %s", err) } @@ -224,80 +245,82 @@ func (segment *Lumberjack) Run(wg *sync.WaitGroup) { for server, options := range segment.Servers { writerWG.Add(1) options := options - go func(server string) { - // connect to lumberjack server - client := NewResilientClient(server, options, segment.ReconnectWait) - defer client.Close() - log.Printf("[info] Lumberjack: Connected to %s (TLS: %v, VerifyTLS: %v, Compression: %d)", server, options.UseTLS, options.VerifyCertificate, options.CompressionLevel) - - flowInterface := make([]interface{}, segment.BatchSize) - idx := 0 - - // see https://stackoverflow.com/questions/66037676/go-reset-a-timer-newtimer-within-select-loop for timer mechanics - timer := time.NewTimer(segment.BatchTimeout) - timer.Stop() - defer timer.Stop() - var timerSet bool - - for { - select { - case flow, isOpen := <-segment.LumberjackOut: - // exit on closed channel - if !isOpen { - // send local buffer - count, err := 
client.SendNoRetry(flowInterface[:idx]) - if err != nil { - log.Printf("[error] Lumberjack: Failed to send final flow batch upon exit to %s: %s", server, err) - } else { - segment.BatchDebugPrintf("[debug] Lumberjack: %s Sent final batch (%d)", server, count) + for i := 0; i < options.Parallism; i++ { + go func(server string, numServer int) { + // connect to lumberjack server + client := NewResilientClient(server, options, segment.ReconnectWait) + defer client.Close() + log.Printf("[info] Lumberjack: Connected to %s (TLS: %v, VerifyTLS: %v, Compression: %d, number %d/%d)", server, options.UseTLS, options.VerifyCertificate, options.CompressionLevel, numServer+1, options.Parallism) + + flowInterface := make([]interface{}, segment.BatchSize) + idx := 0 + + // see https://stackoverflow.com/questions/66037676/go-reset-a-timer-newtimer-within-select-loop for timer mechanics + timer := time.NewTimer(segment.BatchTimeout) + timer.Stop() + defer timer.Stop() + var timerSet bool + + for { + select { + case flow, isOpen := <-segment.LumberjackOut: + // exit on closed channel + if !isOpen { + // send local buffer + count, err := client.SendNoRetry(flowInterface[:idx]) + if err != nil { + log.Printf("[error] Lumberjack: Failed to send final flow batch upon exit to %s: %s", server, err) + } else { + segment.BatchDebugPrintf("[debug] Lumberjack: %s Sent final batch (%d)", server, count) + } + wg.Done() + return } - wg.Done() - return - } - // append flow to batch - flowInterface[idx] = flow - idx++ - - // send batch if full - if idx == segment.BatchSize { - // We got an event, and timer was already set. - // We need to stop the timer and drain the channel if needed, - // so that we can safely reset it later. - if timerSet { - if !timer.Stop() { - <-timer.C + // append flow to batch + flowInterface[idx] = flow + idx++ + + // send batch if full + if idx == segment.BatchSize { + // We got an event, and timer was already set. 
+ // We need to stop the timer and drain the channel if needed, + // so that we can safely reset it later. + if timerSet { + if !timer.Stop() { + <-timer.C + } + timerSet = false } - timerSet = false - } - client.Send(flowInterface) - segment.BatchDebugPrintf("[debug] Lumberjack: %s Sent full batch (%d)", server, segment.BatchSize) + client.Send(flowInterface) + segment.BatchDebugPrintf("[debug] Lumberjack: %s Sent full batch (%d)", server, segment.BatchSize) - // reset idx - idx = 0 + // reset idx + idx = 0 - // If timer was not set, or it was stopped before, it's safe to reset it. - if !timerSet { - timerSet = true - timer.Reset(segment.BatchTimeout) + // If timer was not set, or it was stopped before, it's safe to reset it. + if !timerSet { + timerSet = true + timer.Reset(segment.BatchTimeout) + } + } + case <-timer.C: + // timer expired, send batch + if idx > 0 { + segment.BatchDebugPrintf("[debug] Lumberjack: %s Sending incomplete batch (%d/%d)", server, idx, segment.BatchSize) + client.Send(flowInterface[:idx]) + idx = 0 + } else { + segment.BatchDebugPrintf("[debug] Lumberjack: %s Timer expired with empty batch", server) } - } - case <-timer.C: - // timer expired, send batch - if idx > 0 { - segment.BatchDebugPrintf("[debug] Lumberjack: %s Sending incomplete batch (%d/%d)", server, idx, segment.BatchSize) - client.Send(flowInterface[:idx]) - idx = 0 - } else { - segment.BatchDebugPrintf("[debug] Lumberjack: %s Timer expired with empty batch", server) - } - timer.Reset(segment.BatchTimeout) - timerSet = true + timer.Reset(segment.BatchTimeout) + timerSet = true + } } - } - }(server) + }(server, i) + } } // forward flows to lumberjack servers and to the next segment