Skip to content

Commit

Permalink
Additional params for input & output APIs + ability to set retry_limi…
Browse files Browse the repository at this point in the history
…t for default loki output (#1442)

* threads support added for various inputs & outputs + loki retries

* set loki output to no_limits by default
  • Loading branch information
chrono2002 authored Dec 26, 2024
1 parent a12acde commit 369c7e5
Show file tree
Hide file tree
Showing 14 changed files with 253 additions and 49 deletions.
5 changes: 5 additions & 0 deletions apis/fluentbit/v1alpha2/plugins/input/tail_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ type Tail struct {
PauseOnChunksOverlimit string `json:"pauseOnChunksOverlimit,omitempty"`
//Skips empty lines in the log file from any further processing or output.
SkipEmptyLines *bool `json:"skipEmptyLines,omitempty"`
// Threaded mechanism allows input plugin to run in a separate thread which helps to desaturate the main pipeline.
Threaded *string `json:"threaded,omitempty"`
}

func (_ *Tail) Name() string {
Expand Down Expand Up @@ -202,5 +204,8 @@ func (t *Tail) Params(_ plugins.SecretLoader) (*params.KVs, error) {
if t.SkipEmptyLines != nil {
kvs.Insert("Skip_Empty_Lines", fmt.Sprint(*t.SkipEmptyLines))
}
if t.Threaded != nil {
kvs.Insert("Threaded", *t.Threaded)
}
return kvs, nil
}
11 changes: 11 additions & 0 deletions apis/fluentbit/v1alpha2/plugins/output/kafka_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ type Kafka struct {
//The default value is 10 times, the interval between each retry is 1 second.
//Setting the queue_full_retries value to 0 set's an unlimited number of retries.
QueueFullRetries *int64 `json:"queueFullRetries,omitempty"`
// Limit the maximum number of Chunks in the filesystem for the current output logical destination.
TotalLimitSize string `json:"totalLimitSize,omitempty"`
// Enables dedicated thread(s) for this output. Default value is set since version 1.8.13. For previous versions is 0.
Workers *int32 `json:"workers,omitempty"`
}

func (*Kafka) Name() string {
Expand Down Expand Up @@ -88,5 +92,12 @@ func (k *Kafka) Params(_ plugins.SecretLoader) (*params.KVs, error) {
return fmt.Sprintf("rdkafka.%s", k), v
})

if k.TotalLimitSize != "" {
kvs.Insert("storage.total_limit_size", k.TotalLimitSize)
}
if k.Workers != nil {
kvs.Insert("workers", fmt.Sprint(*k.Workers))
}

return kvs, nil
}
10 changes: 10 additions & 0 deletions apis/fluentbit/v1alpha2/plugins/output/loki_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ type Loki struct {
*plugins.TLS `json:"tls,omitempty"`
// Include fluentbit networking options for this output-plugin
*plugins.Networking `json:"networking,omitempty"`
// Limit the maximum number of Chunks in the filesystem for the current output logical destination.
TotalLimitSize string `json:"totalLimitSize,omitempty"`
// Enables dedicated thread(s) for this output. Default value is set since version 1.8.13. For previous versions is 0.
Workers *int32 `json:"workers,omitempty"`
}

// implement Section() method
Expand Down Expand Up @@ -144,5 +148,11 @@ func (l *Loki) Params(sl plugins.SecretLoader) (*params.KVs, error) {
}
kvs.Merge(net)
}
if l.TotalLimitSize != "" {
kvs.Insert("storage.total_limit_size", l.TotalLimitSize)
}
if l.Workers != nil {
kvs.Insert("workers", fmt.Sprint(*l.Workers))
}
return kvs, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ type OpenSearch struct {
// When enabled, mapping types is removed and Type option is ignored. Types are deprecated in APIs in v7.0. This options is for v7.0 or later.
SuppressTypeName *bool `json:"suppressTypeName,omitempty"`
// Enables dedicated thread(s) for this output. Default value is set since version 1.8.13. For previous versions is 0.
Workers *int32 `json:"Workers,omitempty"`
Workers *int32 `json:"workers,omitempty"`
*plugins.TLS `json:"tls,omitempty"`
// Include fluentbit networking options for this output-plugin
*plugins.Networking `json:"networking,omitempty"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,10 @@ spec:
tagRegex:
description: Set a regex to exctract fields from the file
type: string
threaded:
description: Threaded mechanism allows input plugin to run in
a separate thread which helps to desaturate the main pipeline.
type: string
type: object
tcp:
description: TCP defines the TCP input plugin configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2070,6 +2070,16 @@ spec:
If only one topic is set, that one will be used for all records.
Instead if multiple topics exists, the one set in the record by Topic_Key will be used.
type: string
totalLimitSize:
description: Limit the maximum number of Chunks in the filesystem
for the current output logical destination.
type: string
workers:
description: Enables dedicated thread(s) for this output. Default
value is set since version 1.8.13. For previous versions is
0.
format: int32
type: integer
type: object
kinesis:
description: Kinesis defines Kinesis Output configuration.
Expand Down Expand Up @@ -2474,10 +2484,20 @@ spec:
description: Hostname to be used for TLS SNI extension
type: string
type: object
totalLimitSize:
description: Limit the maximum number of Chunks in the filesystem
for the current output logical destination.
type: string
uri:
description: Specify a custom HTTP URI. It must start with forward
slash.
type: string
workers:
description: Enables dedicated thread(s) for this output. Default
value is set since version 1.8.13. For previous versions is
0.
format: int32
type: integer
required:
- host
type: object
Expand All @@ -2497,12 +2517,6 @@ spec:
opensearch:
description: OpenSearch defines OpenSearch Output configuration.
properties:
Workers:
description: Enables dedicated thread(s) for this output. Default
value is set since version 1.8.13. For previous versions is
0.
format: int32
type: integer
awsAuth:
description: Enable AWS Sigv4 Authentication for Amazon OpenSearch
Service.
Expand Down Expand Up @@ -2835,6 +2849,12 @@ spec:
type:
description: Type name
type: string
workers:
description: Enables dedicated thread(s) for this output. Default
value is set since version 1.8.13. For previous versions is
0.
format: int32
type: integer
writeOperation:
description: Operation to use to write in bulk requests.
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2070,6 +2070,16 @@ spec:
If only one topic is set, that one will be used for all records.
Instead if multiple topics exists, the one set in the record by Topic_Key will be used.
type: string
totalLimitSize:
description: Limit the maximum number of Chunks in the filesystem
for the current output logical destination.
type: string
workers:
description: Enables dedicated thread(s) for this output. Default
value is set since version 1.8.13. For previous versions is
0.
format: int32
type: integer
type: object
kinesis:
description: Kinesis defines Kinesis Output configuration.
Expand Down Expand Up @@ -2474,10 +2484,20 @@ spec:
description: Hostname to be used for TLS SNI extension
type: string
type: object
totalLimitSize:
description: Limit the maximum number of Chunks in the filesystem
for the current output logical destination.
type: string
uri:
description: Specify a custom HTTP URI. It must start with forward
slash.
type: string
workers:
description: Enables dedicated thread(s) for this output. Default
value is set since version 1.8.13. For previous versions is
0.
format: int32
type: integer
required:
- host
type: object
Expand All @@ -2497,12 +2517,6 @@ spec:
opensearch:
description: OpenSearch defines OpenSearch Output configuration.
properties:
Workers:
description: Enables dedicated thread(s) for this output. Default
value is set since version 1.8.13. For previous versions is
0.
format: int32
type: integer
awsAuth:
description: Enable AWS Sigv4 Authentication for Amazon OpenSearch
Service.
Expand Down Expand Up @@ -2835,6 +2849,12 @@ spec:
type:
description: Type name
type: string
workers:
description: Enables dedicated thread(s) for this output. Default
value is set since version 1.8.13. For previous versions is
0.
format: int32
type: integer
writeOperation:
description: Operation to use to write in bulk requests.
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ metadata:
fluentbit.fluent.io/component: logging
spec:
matchRegex: (?:kube|service)\.(.*)
retry_limit: "{{ .retryLimit }}"
loki:
{{ if .host }}host: {{ .host | quote }}{{ end }}
{{ if .port }}port: {{ .port }}{{ end }}
Expand Down
1 change: 1 addition & 0 deletions charts/fluent-operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ fluentbit:
loki:
# Switch for generation of fluentbit loki ClusterOutput (and loki basic auth http user and pass secrets if required)
enable: false # Bool
retryLimit: "no_limits"
host: 127.0.0.1 # String
port: 3100 # Int
# Either, give http{User,Password},tenantID string values specifying them directly
Expand Down
4 changes: 4 additions & 0 deletions config/crd/bases/fluentbit.fluent.io_clusterinputs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,10 @@ spec:
tagRegex:
description: Set a regex to exctract fields from the file
type: string
threaded:
description: Threaded mechanism allows input plugin to run in
a separate thread which helps to desaturate the main pipeline.
type: string
type: object
tcp:
description: TCP defines the TCP input plugin configuration
Expand Down
32 changes: 26 additions & 6 deletions config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2070,6 +2070,16 @@ spec:
If only one topic is set, that one will be used for all records.
Instead if multiple topics exists, the one set in the record by Topic_Key will be used.
type: string
totalLimitSize:
description: Limit the maximum number of Chunks in the filesystem
for the current output logical destination.
type: string
workers:
description: Enables dedicated thread(s) for this output. Default
value is set since version 1.8.13. For previous versions is
0.
format: int32
type: integer
type: object
kinesis:
description: Kinesis defines Kinesis Output configuration.
Expand Down Expand Up @@ -2474,10 +2484,20 @@ spec:
description: Hostname to be used for TLS SNI extension
type: string
type: object
totalLimitSize:
description: Limit the maximum number of Chunks in the filesystem
for the current output logical destination.
type: string
uri:
description: Specify a custom HTTP URI. It must start with forward
slash.
type: string
workers:
description: Enables dedicated thread(s) for this output. Default
value is set since version 1.8.13. For previous versions is
0.
format: int32
type: integer
required:
- host
type: object
Expand All @@ -2497,12 +2517,6 @@ spec:
opensearch:
description: OpenSearch defines OpenSearch Output configuration.
properties:
Workers:
description: Enables dedicated thread(s) for this output. Default
value is set since version 1.8.13. For previous versions is
0.
format: int32
type: integer
awsAuth:
description: Enable AWS Sigv4 Authentication for Amazon OpenSearch
Service.
Expand Down Expand Up @@ -2835,6 +2849,12 @@ spec:
type:
description: Type name
type: string
workers:
description: Enables dedicated thread(s) for this output. Default
value is set since version 1.8.13. For previous versions is
0.
format: int32
type: integer
writeOperation:
description: Operation to use to write in bulk requests.
type: string
Expand Down
32 changes: 26 additions & 6 deletions config/crd/bases/fluentbit.fluent.io_outputs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2070,6 +2070,16 @@ spec:
If only one topic is set, that one will be used for all records.
Instead if multiple topics exists, the one set in the record by Topic_Key will be used.
type: string
totalLimitSize:
description: Limit the maximum number of Chunks in the filesystem
for the current output logical destination.
type: string
workers:
description: Enables dedicated thread(s) for this output. Default
value is set since version 1.8.13. For previous versions is
0.
format: int32
type: integer
type: object
kinesis:
description: Kinesis defines Kinesis Output configuration.
Expand Down Expand Up @@ -2474,10 +2484,20 @@ spec:
description: Hostname to be used for TLS SNI extension
type: string
type: object
totalLimitSize:
description: Limit the maximum number of Chunks in the filesystem
for the current output logical destination.
type: string
uri:
description: Specify a custom HTTP URI. It must start with forward
slash.
type: string
workers:
description: Enables dedicated thread(s) for this output. Default
value is set since version 1.8.13. For previous versions is
0.
format: int32
type: integer
required:
- host
type: object
Expand All @@ -2497,12 +2517,6 @@ spec:
opensearch:
description: OpenSearch defines OpenSearch Output configuration.
properties:
Workers:
description: Enables dedicated thread(s) for this output. Default
value is set since version 1.8.13. For previous versions is
0.
format: int32
type: integer
awsAuth:
description: Enable AWS Sigv4 Authentication for Amazon OpenSearch
Service.
Expand Down Expand Up @@ -2835,6 +2849,12 @@ spec:
type:
description: Type name
type: string
workers:
description: Enables dedicated thread(s) for this output. Default
value is set since version 1.8.13. For previous versions is
0.
format: int32
type: integer
writeOperation:
description: Operation to use to write in bulk requests.
type: string
Expand Down
Loading

0 comments on commit 369c7e5

Please sign in to comment.