diff --git a/apis/fluentbit/v1alpha2/clusterinput_types.go b/apis/fluentbit/v1alpha2/clusterinput_types.go
index dbe3f0fe9..7c3b3864b 100644
--- a/apis/fluentbit/v1alpha2/clusterinput_types.go
+++ b/apis/fluentbit/v1alpha2/clusterinput_types.go
@@ -55,10 +55,12 @@ type InputSpec struct {
CustomPlugin *custom.CustomPlugin `json:"customPlugin,omitempty"`
// Forward defines forward input plugin configuration
Forward *input.Forward `json:"forward,omitempty"`
- // OpenTelemetry defines forward input plugin configuration
+ // OpenTelemetry defines the OpenTelemetry input plugin configuration
OpenTelemetry *input.OpenTelemetry `json:"openTelemetry,omitempty"`
- // HTTP defines forward input plugin configuration
+ // HTTP defines the HTTP input plugin configuration
HTTP *input.HTTP `json:"http,omitempty"`
+ // MQTT defines the MQTT input plugin configuration
+ MQTT *input.MQTT `json:"mqtt,omitempty"`
}
// +kubebuilder:object:root=true
diff --git a/apis/fluentbit/v1alpha2/plugins/input/mqtt.go b/apis/fluentbit/v1alpha2/plugins/input/mqtt.go
new file mode 100644
index 000000000..9602e105e
--- /dev/null
+++ b/apis/fluentbit/v1alpha2/plugins/input/mqtt.go
@@ -0,0 +1,38 @@
+package input
+
+import (
+ "fmt"
+
+ "github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins"
+ "github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins/params"
+)
+
+// +kubebuilder:object:generate:=true
+
+// The MQTT input plugin, allows to retrieve messages/data from MQTT control packets over a TCP connection.
+// The incoming data to receive must be a JSON map.
+// **For full documentation, refer to https://docs.fluentbit.io/manual/pipeline/inputs/mqtt**
+type MQTT struct {
+ // Listener network interface, default: 0.0.0.0
+ Listen string `json:"listen,omitempty"`
+ // TCP port where listening for connections, default: 1883
+ // +kubebuilder:validation:Minimum:=1
+ // +kubebuilder:validation:Maximum:=65535
+ Port *int32 `json:"port,omitempty"`
+}
+
+func (_ *MQTT) Name() string {
+ return "mqtt"
+}
+
+// implement Section() method
+func (m *MQTT) Params(_ plugins.SecretLoader) (*params.KVs, error) {
+ kvs := params.NewKVs()
+ if m.Listen != "" {
+ kvs.Insert("Listen", m.Listen)
+ }
+ if m.Port != nil {
+ kvs.Insert("Port", fmt.Sprint(*m.Port))
+ }
+ return kvs, nil
+}
diff --git a/apis/fluentbit/v1alpha2/plugins/input/zz_generated.deepcopy.go b/apis/fluentbit/v1alpha2/plugins/input/zz_generated.deepcopy.go
index 4d676498c..2ccb6983d 100644
--- a/apis/fluentbit/v1alpha2/plugins/input/zz_generated.deepcopy.go
+++ b/apis/fluentbit/v1alpha2/plugins/input/zz_generated.deepcopy.go
@@ -120,6 +120,26 @@ func (in *HTTP) DeepCopy() *HTTP {
return out
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *MQTT) DeepCopyInto(out *MQTT) {
+ *out = *in
+ if in.Port != nil {
+ in, out := &in.Port, &out.Port
+ *out = new(int32)
+ **out = **in
+ }
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MQTT.
+func (in *MQTT) DeepCopy() *MQTT {
+ if in == nil {
+ return nil
+ }
+ out := new(MQTT)
+ in.DeepCopyInto(out)
+ return out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *NodeExporterMetrics) DeepCopyInto(out *NodeExporterMetrics) {
*out = *in
diff --git a/apis/fluentbit/v1alpha2/zz_generated.deepcopy.go b/apis/fluentbit/v1alpha2/zz_generated.deepcopy.go
index 0f4033b6a..ed49f9665 100644
--- a/apis/fluentbit/v1alpha2/zz_generated.deepcopy.go
+++ b/apis/fluentbit/v1alpha2/zz_generated.deepcopy.go
@@ -1088,6 +1088,11 @@ func (in *InputSpec) DeepCopyInto(out *InputSpec) {
*out = new(input.HTTP)
(*in).DeepCopyInto(*out)
}
+ if in.MQTT != nil {
+ in, out := &in.MQTT, &out.MQTT
+ *out = new(input.MQTT)
+ (*in).DeepCopyInto(*out)
+ }
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InputSpec.
diff --git a/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_clusterinputs.yaml b/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_clusterinputs.yaml
index ed29b3526..36e29de9b 100644
--- a/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_clusterinputs.yaml
+++ b/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_clusterinputs.yaml
@@ -124,7 +124,7 @@ spec:
type: string
type: object
http:
- description: HTTP defines forward input plugin configuration
+ description: HTTP defines the HTTP input plugin configuration
properties:
bufferChunkSize:
description: This sets the chunk size for incoming incoming JSON
@@ -236,6 +236,20 @@ spec:
- debug
- trace
type: string
+ mqtt:
+ description: MQTT defines the MQTT input plugin configuration
+ properties:
+ listen:
+ description: 'Listener network interface, default: 0.0.0.0'
+ type: string
+ port:
+ description: 'TCP port where listening for connections, default:
+ 1883'
+ format: int32
+ maximum: 65535
+ minimum: 1
+ type: integer
+ type: object
nodeExporterMetrics:
description: NodeExporterMetrics defines Node Exporter Metrics Input
configuration.
@@ -261,7 +275,8 @@ spec:
type: string
type: object
openTelemetry:
- description: OpenTelemetry defines forward input plugin configuration
+ description: OpenTelemetry defines the OpenTelemetry input plugin
+ configuration
properties:
bufferChunkSize:
description: This sets the chunk size for incoming incoming JSON
diff --git a/config/crd/bases/fluentbit.fluent.io_clusterinputs.yaml b/config/crd/bases/fluentbit.fluent.io_clusterinputs.yaml
index ed29b3526..36e29de9b 100644
--- a/config/crd/bases/fluentbit.fluent.io_clusterinputs.yaml
+++ b/config/crd/bases/fluentbit.fluent.io_clusterinputs.yaml
@@ -124,7 +124,7 @@ spec:
type: string
type: object
http:
- description: HTTP defines forward input plugin configuration
+ description: HTTP defines the HTTP input plugin configuration
properties:
bufferChunkSize:
description: This sets the chunk size for incoming incoming JSON
@@ -236,6 +236,20 @@ spec:
- debug
- trace
type: string
+ mqtt:
+ description: MQTT defines the MQTT input plugin configuration
+ properties:
+ listen:
+ description: 'Listener network interface, default: 0.0.0.0'
+ type: string
+ port:
+ description: 'TCP port where listening for connections, default:
+ 1883'
+ format: int32
+ maximum: 65535
+ minimum: 1
+ type: integer
+ type: object
nodeExporterMetrics:
description: NodeExporterMetrics defines Node Exporter Metrics Input
configuration.
@@ -261,7 +275,8 @@ spec:
type: string
type: object
openTelemetry:
- description: OpenTelemetry defines forward input plugin configuration
+ description: OpenTelemetry defines the OpenTelemetry input plugin
+ configuration
properties:
bufferChunkSize:
description: This sets the chunk size for incoming incoming JSON
diff --git a/docs/fluentbit.md b/docs/fluentbit.md
index 4c6ace504..8d5d4f34a 100644
--- a/docs/fluentbit.md
+++ b/docs/fluentbit.md
@@ -419,8 +419,9 @@ InputSpec defines the desired state of ClusterInput
| fluentBitMetrics | FluentBitMetrics defines Fluent Bit Metrics Input configuration. | *[input.FluentbitMetrics](plugins/input/fluentbitmetrics.md) |
| customPlugin | CustomPlugin defines Custom Input configuration. | *custom.CustomPlugin |
| forward | Forward defines forward input plugin configuration | *[input.Forward](plugins/input/forward.md) |
-| openTelemetry | OpenTelemetry defines forward input plugin configuration | *[input.OpenTelemetry](plugins/input/opentelemetry.md) |
-| http | HTTP defines forward input plugin configuration | *[input.OpenTelemetry](plugins/input/opentelemetry.md) |
+| openTelemetry | OpenTelemetry defines the OpenTelemetry input plugin configuration | *[input.OpenTelemetry](plugins/input/opentelemetry.md) |
+| http | HTTP defines the HTTP input plugin configuration | *[input.HTTP](plugins/input/http.md) |
+| mqtt | MQTT defines the MQTT input plugin configuration | *[input.MQTT](plugins/input/mqtt.md) |
[Back to TOC](#table-of-contents)
# NamespacedFluentBitCfgSpec
diff --git a/docs/plugins/fluentbit/input/mqtt.md b/docs/plugins/fluentbit/input/mqtt.md
new file mode 100644
index 000000000..e8fffd36d
--- /dev/null
+++ b/docs/plugins/fluentbit/input/mqtt.md
@@ -0,0 +1,9 @@
+# MQTT
+
+The MQTT input plugin, allows to retrieve messages/data from MQTT control packets over a TCP connection.
The incoming data to receive must be a JSON map.
**For full documentation, refer to https://docs.fluentbit.io/manual/pipeline/inputs/mqtt**
+
+
+| Field | Description | Scheme |
+| ----- | ----------- | ------ |
+| listen | Listener network interface, default: 0.0.0.0 | string |
+| port | TCP port where listening for connections, default: 1883 | *int32 |
diff --git a/manifests/setup/fluent-operator-crd.yaml b/manifests/setup/fluent-operator-crd.yaml
index bde8e6ab9..12a0bd113 100644
--- a/manifests/setup/fluent-operator-crd.yaml
+++ b/manifests/setup/fluent-operator-crd.yaml
@@ -1873,7 +1873,7 @@ spec:
type: string
type: object
http:
- description: HTTP defines forward input plugin configuration
+ description: HTTP defines the HTTP input plugin configuration
properties:
bufferChunkSize:
description: This sets the chunk size for incoming incoming JSON
@@ -1985,6 +1985,20 @@ spec:
- debug
- trace
type: string
+ mqtt:
+ description: MQTT defines the MQTT input plugin configuration
+ properties:
+ listen:
+ description: 'Listener network interface, default: 0.0.0.0'
+ type: string
+ port:
+ description: 'TCP port where listening for connections, default:
+ 1883'
+ format: int32
+ maximum: 65535
+ minimum: 1
+ type: integer
+ type: object
nodeExporterMetrics:
description: NodeExporterMetrics defines Node Exporter Metrics Input
configuration.
@@ -2010,7 +2024,8 @@ spec:
type: string
type: object
openTelemetry:
- description: OpenTelemetry defines forward input plugin configuration
+ description: OpenTelemetry defines the OpenTelemetry input plugin
+ configuration
properties:
bufferChunkSize:
description: This sets the chunk size for incoming incoming JSON
diff --git a/manifests/setup/setup.yaml b/manifests/setup/setup.yaml
index 849f9b498..44e35aeb2 100644
--- a/manifests/setup/setup.yaml
+++ b/manifests/setup/setup.yaml
@@ -1873,7 +1873,7 @@ spec:
type: string
type: object
http:
- description: HTTP defines forward input plugin configuration
+ description: HTTP defines the HTTP input plugin configuration
properties:
bufferChunkSize:
description: This sets the chunk size for incoming incoming JSON
@@ -1985,6 +1985,20 @@ spec:
- debug
- trace
type: string
+ mqtt:
+ description: MQTT defines the MQTT input plugin configuration
+ properties:
+ listen:
+ description: 'Listener network interface, default: 0.0.0.0'
+ type: string
+ port:
+ description: 'TCP port where listening for connections, default:
+ 1883'
+ format: int32
+ maximum: 65535
+ minimum: 1
+ type: integer
+ type: object
nodeExporterMetrics:
description: NodeExporterMetrics defines Node Exporter Metrics Input
configuration.
@@ -2010,7 +2024,8 @@ spec:
type: string
type: object
openTelemetry:
- description: OpenTelemetry defines forward input plugin configuration
+ description: OpenTelemetry defines the OpenTelemetry input plugin
+ configuration
properties:
bufferChunkSize:
description: This sets the chunk size for incoming incoming JSON