Skip to content

Commit

Permalink
[summerospp]add fluentbit mqtt plugin (#911)
Browse files Browse the repository at this point in the history
* [summerospp]add fluentbit mqtt plugin

Signed-off-by: “sjliu1” <“[email protected]”>

* [summerospp]add fluentbit mqtt plugin

Signed-off-by: “sjliu1” <“[email protected]”>

* [summerospp]add fluentbit mqtt plugin

Signed-off-by: “sjliu1” <“[email protected]”>

---------

Signed-off-by: “sjliu1” <“[email protected]”>
Co-authored-by: “sjliu1” <“[email protected]”>
  • Loading branch information
sjliu1 and “sjliu1” authored Sep 9, 2023
1 parent a669907 commit 8e3b536
Show file tree
Hide file tree
Showing 10 changed files with 147 additions and 12 deletions.
6 changes: 4 additions & 2 deletions apis/fluentbit/v1alpha2/clusterinput_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,12 @@ type InputSpec struct {
CustomPlugin *custom.CustomPlugin `json:"customPlugin,omitempty"`
// Forward defines forward input plugin configuration
Forward *input.Forward `json:"forward,omitempty"`
// OpenTelemetry defines forward input plugin configuration
// OpenTelemetry defines the OpenTelemetry input plugin configuration
OpenTelemetry *input.OpenTelemetry `json:"openTelemetry,omitempty"`
// HTTP defines forward input plugin configuration
// HTTP defines the HTTP input plugin configuration
HTTP *input.HTTP `json:"http,omitempty"`
// MQTT defines the MQTT input plugin configuration
MQTT *input.MQTT `json:"mqtt,omitempty"`
}

// +kubebuilder:object:root=true
Expand Down
38 changes: 38 additions & 0 deletions apis/fluentbit/v1alpha2/plugins/input/mqtt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package input

import (
"fmt"

"github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins"
"github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins/params"
)

// +kubebuilder:object:generate:=true

// The MQTT input plugin, allows to retrieve messages/data from MQTT control packets over a TCP connection. <br />
// The incoming data to receive must be a JSON map. <br />
// **For full documentation, refer to https://docs.fluentbit.io/manual/pipeline/inputs/mqtt**
type MQTT struct {
// Listener network interface, default: 0.0.0.0
Listen string `json:"listen,omitempty"`
// TCP port where listening for connections, default: 1883
// +kubebuilder:validation:Minimum:=1
// +kubebuilder:validation:Maximum:=65535
Port *int32 `json:"port,omitempty"`
}

func (_ *MQTT) Name() string {
return "mqtt"
}

// implement Section() method
func (m *MQTT) Params(_ plugins.SecretLoader) (*params.KVs, error) {
kvs := params.NewKVs()
if m.Listen != "" {
kvs.Insert("Listen", m.Listen)
}
if m.Port != nil {
kvs.Insert("Port", fmt.Sprint(*m.Port))
}
return kvs, nil
}
20 changes: 20 additions & 0 deletions apis/fluentbit/v1alpha2/plugins/input/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions apis/fluentbit/v1alpha2/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ spec:
type: string
type: object
http:
description: HTTP defines forward input plugin configuration
description: HTTP defines the HTTP input plugin configuration
properties:
bufferChunkSize:
description: This sets the chunk size for incoming incoming JSON
Expand Down Expand Up @@ -236,6 +236,20 @@ spec:
- debug
- trace
type: string
mqtt:
description: MQTT defines the MQTT input plugin configuration
properties:
listen:
description: 'Listener network interface, default: 0.0.0.0'
type: string
port:
description: 'TCP port where listening for connections, default:
1883'
format: int32
maximum: 65535
minimum: 1
type: integer
type: object
nodeExporterMetrics:
description: NodeExporterMetrics defines Node Exporter Metrics Input
configuration.
Expand All @@ -261,7 +275,8 @@ spec:
type: string
type: object
openTelemetry:
description: OpenTelemetry defines forward input plugin configuration
description: OpenTelemetry defines the OpenTelemetry input plugin
configuration
properties:
bufferChunkSize:
description: This sets the chunk size for incoming incoming JSON
Expand Down
19 changes: 17 additions & 2 deletions config/crd/bases/fluentbit.fluent.io_clusterinputs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ spec:
type: string
type: object
http:
description: HTTP defines forward input plugin configuration
description: HTTP defines the HTTP input plugin configuration
properties:
bufferChunkSize:
description: This sets the chunk size for incoming incoming JSON
Expand Down Expand Up @@ -236,6 +236,20 @@ spec:
- debug
- trace
type: string
mqtt:
description: MQTT defines the MQTT input plugin configuration
properties:
listen:
description: 'Listener network interface, default: 0.0.0.0'
type: string
port:
description: 'TCP port where listening for connections, default:
1883'
format: int32
maximum: 65535
minimum: 1
type: integer
type: object
nodeExporterMetrics:
description: NodeExporterMetrics defines Node Exporter Metrics Input
configuration.
Expand All @@ -261,7 +275,8 @@ spec:
type: string
type: object
openTelemetry:
description: OpenTelemetry defines forward input plugin configuration
description: OpenTelemetry defines the OpenTelemetry input plugin
configuration
properties:
bufferChunkSize:
description: This sets the chunk size for incoming incoming JSON
Expand Down
5 changes: 3 additions & 2 deletions docs/fluentbit.md
Original file line number Diff line number Diff line change
Expand Up @@ -419,8 +419,9 @@ InputSpec defines the desired state of ClusterInput
| fluentBitMetrics | FluentBitMetrics defines Fluent Bit Metrics Input configuration. | *[input.FluentbitMetrics](plugins/input/fluentbitmetrics.md) |
| customPlugin | CustomPlugin defines Custom Input configuration. | *custom.CustomPlugin |
| forward | Forward defines forward input plugin configuration | *[input.Forward](plugins/input/forward.md) |
| openTelemetry | OpenTelemetry defines forward input plugin configuration | *[input.OpenTelemetry](plugins/input/opentelemetry.md) |
| http | HTTP defines forward input plugin configuration | *[input.OpenTelemetry](plugins/input/opentelemetry.md) |
| openTelemetry | OpenTelemetry defines the OpenTelemetry input plugin configuration | *[input.OpenTelemetry](plugins/input/opentelemetry.md) |
| http | HTTP defines the HTTP input plugin configuration | *[input.HTTP](plugins/input/http.md) |
| mqtt | MQTT defines the MQTT input plugin configuration | *[input.MQTT](plugins/input/mqtt.md) |

[Back to TOC](#table-of-contents)
# NamespacedFluentBitCfgSpec
Expand Down
9 changes: 9 additions & 0 deletions docs/plugins/fluentbit/input/mqtt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# MQTT

The MQTT input plugin, allows to retrieve messages/data from MQTT control packets over a TCP connection. <br /> The incoming data to receive must be a JSON map. <br /> **For full documentation, refer to https://docs.fluentbit.io/manual/pipeline/inputs/mqtt**


| Field | Description | Scheme |
| ----- | ----------- | ------ |
| listen | Listener network interface, default: 0.0.0.0 | string |
| port | TCP port where listening for connections, default: 1883 | *int32 |
19 changes: 17 additions & 2 deletions manifests/setup/fluent-operator-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1873,7 +1873,7 @@ spec:
type: string
type: object
http:
description: HTTP defines forward input plugin configuration
description: HTTP defines the HTTP input plugin configuration
properties:
bufferChunkSize:
description: This sets the chunk size for incoming incoming JSON
Expand Down Expand Up @@ -1985,6 +1985,20 @@ spec:
- debug
- trace
type: string
mqtt:
description: MQTT defines the MQTT input plugin configuration
properties:
listen:
description: 'Listener network interface, default: 0.0.0.0'
type: string
port:
description: 'TCP port where listening for connections, default:
1883'
format: int32
maximum: 65535
minimum: 1
type: integer
type: object
nodeExporterMetrics:
description: NodeExporterMetrics defines Node Exporter Metrics Input
configuration.
Expand All @@ -2010,7 +2024,8 @@ spec:
type: string
type: object
openTelemetry:
description: OpenTelemetry defines forward input plugin configuration
description: OpenTelemetry defines the OpenTelemetry input plugin
configuration
properties:
bufferChunkSize:
description: This sets the chunk size for incoming incoming JSON
Expand Down
19 changes: 17 additions & 2 deletions manifests/setup/setup.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1873,7 +1873,7 @@ spec:
type: string
type: object
http:
description: HTTP defines forward input plugin configuration
description: HTTP defines the HTTP input plugin configuration
properties:
bufferChunkSize:
description: This sets the chunk size for incoming incoming JSON
Expand Down Expand Up @@ -1985,6 +1985,20 @@ spec:
- debug
- trace
type: string
mqtt:
description: MQTT defines the MQTT input plugin configuration
properties:
listen:
description: 'Listener network interface, default: 0.0.0.0'
type: string
port:
description: 'TCP port where listening for connections, default:
1883'
format: int32
maximum: 65535
minimum: 1
type: integer
type: object
nodeExporterMetrics:
description: NodeExporterMetrics defines Node Exporter Metrics Input
configuration.
Expand All @@ -2010,7 +2024,8 @@ spec:
type: string
type: object
openTelemetry:
description: OpenTelemetry defines forward input plugin configuration
description: OpenTelemetry defines the OpenTelemetry input plugin
configuration
properties:
bufferChunkSize:
description: This sets the chunk size for incoming incoming JSON
Expand Down

0 comments on commit 8e3b536

Please sign in to comment.