diff --git a/apis/fluentbit/v1alpha2/clusterfluentbitconfig_types.go b/apis/fluentbit/v1alpha2/clusterfluentbitconfig_types.go index 2b7c64721..73b607bfc 100644 --- a/apis/fluentbit/v1alpha2/clusterfluentbitconfig_types.go +++ b/apis/fluentbit/v1alpha2/clusterfluentbitconfig_types.go @@ -103,6 +103,10 @@ type Service struct { ParsersFile string `json:"parsersFile,omitempty"` // Configure a global environment for the storage layer in Service. It is recommended to configure the volume and volumeMount separately for this storage. The hostPath type should be used for that Volume in Fluentbit daemon set. Storage *Storage `json:"storage,omitempty"` + // Per-namespace re-emitter configuration + EmitterName string `json:"emitterName,omitempty"` + EmitterMemBufLimit string `json:"emitterMemBufLimit,omitempty"` + EmitterStorageType string `json:"emitterStorageType,omitempty"` } // +kubebuilder:object:root=true diff --git a/apis/fluentbit/v1alpha2/fluentbitconfig_types.go b/apis/fluentbit/v1alpha2/fluentbitconfig_types.go index 29a0d0144..29b13b9d8 100644 --- a/apis/fluentbit/v1alpha2/fluentbitconfig_types.go +++ b/apis/fluentbit/v1alpha2/fluentbitconfig_types.go @@ -31,6 +31,8 @@ type NamespacedFluentBitCfgSpec struct { OutputSelector metav1.LabelSelector `json:"outputSelector,omitempty"` // Select parser plugins ParserSelector metav1.LabelSelector `json:"parserSelector,omitempty"` + // Service defines the global behaviour of the Fluent Bit engine. + Service *Service `json:"service,omitempty"` // Select cluster level parser config ClusterParserSelector metav1.LabelSelector `json:"clusterParserSelector,omitempty"` } diff --git a/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_fluentbitconfigs.yaml b/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_fluentbitconfigs.yaml index 73a7e7ede..40468ec61 100644 --- a/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_fluentbitconfigs.yaml +++ b/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_fluentbitconfigs.yaml @@ -217,6 +217,30 @@ spec: type: object type: object x-kubernetes-map-type: atomic + service: + description: Service defines the namespaced behaviour of the Fluent Bit + engine. + properties: + emitterName: + description: When the filter emits a record under the new + Tag, there is an internal emitter plugin that takes care + of the job. Since this emitter expose metrics as any other + component of the pipeline, you can use this property to + configure an optional name for it. + type: string + emitterStorageType: + description: Specify the emitter buffering mechanism to use. It can be + memory or filesystem + enum: + - filesystem + - memory + type: string + emitterMemBufLimit: + description: Set a limit of memory that Emitter plugin can use when + appending data to the Engine. If the limit is reach, it will + be paused; when the data is flushed it resumes. + type: string + type: object type: object type: object served: true diff --git a/controllers/fluentbitconfig_controller.go b/controllers/fluentbitconfig_controller.go index 7ae78cb04..c27da6c2f 100644 --- a/controllers/fluentbitconfig_controller.go +++ b/controllers/fluentbitconfig_controller.go @@ -310,7 +310,17 @@ func (r *FluentBitConfigReconciler) generateRewriteTagConfig(cfg fluentbitv1alph buf.WriteString(fmt.Sprintf(" Name rewrite_tag\n")) buf.WriteString(fmt.Sprintf(" Match %s\n", tag)) buf.WriteString(fmt.Sprintf(" Rule $kubernetes['namespace_name'] ^(%s)$ %x.$TAG false\n", cfg.Namespace, md5.Sum([]byte(cfg.Namespace)))) - buf.WriteString(fmt.Sprintf(" Emitter_Name re_emitted_%x\n", md5.Sum([]byte(cfg.Namespace)))) + if cfg.Spec.Service.EmitterName != "" { + buf.WriteString(fmt.Sprintf(" Emitter_Name %s\n", cfg.Spec.Service.EmitterName)) + } else { + buf.WriteString(fmt.Sprintf(" Emitter_Name re_emitted_%x\n", md5.Sum([]byte(cfg.Namespace)))) + } + if cfg.Spec.Service.EmitterStorageType != "" { + buf.WriteString(fmt.Sprintf(" Emitter_Storage.type %s\n", cfg.Spec.Service.EmitterStorageType)) + } + if cfg.Spec.Service.EmitterMemBufLimit != "" { + buf.WriteString(fmt.Sprintf(" Emitter_Mem_Buf_Limit %s\n", cfg.Spec.Service.EmitterMemBufLimit)) + } return buf.String() }