Skip to content

Commit

Permalink
Merge pull request fluent#324 from jett-deng/develop
Browse files Browse the repository at this point in the history
Fluentd integrate OpenSearch plugin
  • Loading branch information
benjaminhuo authored Jun 13, 2022
2 parents 270caca + fe034d9 commit 2d2e832
Show file tree
Hide file tree
Showing 22 changed files with 1,160 additions and 7 deletions.
29 changes: 29 additions & 0 deletions apis/fluentd/v1alpha1/plugins/output/opensearch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package output

import "github.com/fluent/fluent-operator/apis/fluentd/v1alpha1/plugins"

// Opensearch defines the parameters for out_opensearch plugin
type Opensearch struct {
// The hostname of your Opensearch node (default: localhost).
Host *string `json:"host,omitempty"`
// The port number of your Opensearch node (default: 9200).
// +kubebuilder:validation:Minimum:=1
// +kubebuilder:validation:Maximum:=65535
Port *uint32 `json:"port,omitempty"`
// Hosts defines a list of hosts if you want to connect to more than one Openearch nodes
Hosts *string `json:"hosts,omitempty"`
// Specify https if your Opensearch endpoint supports SSL (default: http).
Scheme *string `json:"scheme,omitempty"`
// Path defines the REST API endpoint of Opensearch to post write requests (default: nil).
Path *string `json:"path,omitempty"`
// IndexName defines the placeholder syntax of Fluentd plugin API. See https://docs.fluentd.org/configuration/buffer-section.
IndexName *string `json:"indexName,omitempty"`
// If true, Fluentd uses the conventional index name format logstash-%Y.%m.%d (default: false). This option supersedes the index_name option.
LogstashFormat *bool `json:"logstashFormat,omitempty"`
// LogstashPrefix defines the logstash prefix index name to write events when logstash_format is true (default: logstash).
LogstashPrefix *string `json:"logstashPrefix,omitempty"`
// Optional, The login credentials to connect to Opensearch
User *plugins.Secret `json:"user,omitempty"`
// Optional, The login credentials to connect to Opensearch
Password *plugins.Secret `json:"password,omitempty"`
}
59 changes: 59 additions & 0 deletions apis/fluentd/v1alpha1/plugins/output/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type Output struct {
Http *Http `json:"http,omitempty"`
// out_es plugin
Elasticsearch *Elasticsearch `json:"elasticsearch,omitempty"`
// out_opensearch plugin
Opensearch *Opensearch `json:"opensearch,omitempty"`
// out_kafka plugin
Kafka *Kafka2 `json:"kafka,omitempty"`
// out_s3 plugin
Expand Down Expand Up @@ -120,6 +122,11 @@ func (o *Output) Params(loader plugins.SecretLoader) (*params.PluginStore, error
return o.elasticsearchPlugin(ps, loader)
}

if o.Opensearch != nil {
ps.InsertType(string(params.OpensearchOutputType))
return o.opensearchPlugin(ps, loader)
}

if o.S3 != nil {
ps.InsertType(string(params.S3OutputType))
return o.s3Plugin(ps, loader), nil
Expand Down Expand Up @@ -391,6 +398,58 @@ func (o *Output) elasticsearchPlugin(parent *params.PluginStore, loader plugins.
return parent, nil
}

func (o *Output) opensearchPlugin(parent *params.PluginStore, loader plugins.SecretLoader) (*params.PluginStore, error) {
if o.Opensearch.Host != nil {
parent.InsertPairs("host", fmt.Sprint(*o.Opensearch.Host))
}

if o.Opensearch.Port != nil {
parent.InsertPairs("port", fmt.Sprint(*o.Opensearch.Port))
}

if o.Opensearch.Hosts != nil {
parent.InsertPairs("hosts", fmt.Sprint(*o.Opensearch.Hosts))
}

if o.Opensearch.User != nil {
user, err := loader.LoadSecret(*o.Opensearch.User)
if err != nil {
return nil, err
}
parent.InsertPairs("user", user)
}

if o.Opensearch.Password != nil {
pwd, err := loader.LoadSecret(*o.Opensearch.Password)
if err != nil {
return nil, err
}
parent.InsertPairs("password", pwd)
}

if o.Opensearch.Scheme != nil {
parent.InsertPairs("scheme", fmt.Sprint(*o.Opensearch.Scheme))
}

if o.Opensearch.Path != nil {
parent.InsertPairs("path", fmt.Sprint(*o.Opensearch.Path))
}

if o.Opensearch.IndexName != nil {
parent.InsertPairs("index_name", fmt.Sprint(*o.Opensearch.IndexName))
}

if o.Opensearch.LogstashFormat != nil {
parent.InsertPairs("logstash_format", fmt.Sprint(*o.Opensearch.LogstashFormat))
}

if o.Opensearch.LogstashPrefix != nil {
parent.InsertPairs("logstash_prefix", fmt.Sprint(*o.Opensearch.LogstashPrefix))
}

return parent, nil
}

func (o *Output) kafka2Plugin(parent *params.PluginStore, loader plugins.SecretLoader) *params.PluginStore {
if o.Kafka.Brokers != nil {
parent.InsertPairs("brokers", fmt.Sprint(*o.Kafka.Brokers))
Expand Down
2 changes: 2 additions & 0 deletions apis/fluentd/v1alpha1/plugins/params/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
S3Plugin PluginName = "s3"
KafkaPlugin PluginName = "kafka2"
ElasticsearchPlugin PluginName = "elasticsearch"
OpensearchPlugin PluginName = "opensearch"
MatchPlugin PluginName = "match"
BufferPlugin PluginName = "buffer"

Expand Down Expand Up @@ -59,6 +60,7 @@ const (
StdOutputType OutputType = "stdout"
KafkaOutputType OutputType = "kafka2"
ElasticsearchOutputType OutputType = "elasticsearch"
OpensearchOutputType OutputType = "opensearch"
S3OutputType OutputType = "s3"
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<source>
@type forward
bind 0.0.0.0
port 24224
</source>
<match **>
@id main
@type label_router
<route>
@label @a2170d34e9940ec56d328100e375c43e
<match>
namespaces default,kube-system
</match>
</route>
</match>
<label @a2170d34e9940ec56d328100e375c43e>
<match **>
@id ClusterFluentdConfig-cluster-fluentd-config::cluster::clusteroutput::fluentd-output-opensearch-0
@type opensearch
host opensearch-logging-data.kubesphere-logging-system.svc
logstash_format true
logstash_prefix ks-logstash-log
port 9200
</match>
</label>
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<source>
@type forward
bind 0.0.0.0
port 24224
</source>
<match **>
@id main
@type label_router
<route>
@label @a2170d34e9940ec56d328100e375c43e
<match>
namespaces default,kube-system
</match>
</route>
<route>
@label @2d9e59757d3bfc66d93c3bc44b408922
<match>
namespaces fluent
</match>
</route>
</match>
<label @a2170d34e9940ec56d328100e375c43e>
<match **>
@id ClusterFluentdConfig-cluster-fluentd-config::cluster::clusteroutput::fluentd-output-opensearch-0
@type opensearch
host opensearch-logging-data.kubesphere-logging-system.svc
logstash_format true
logstash_prefix ks-logstash-log
port 9200
</match>
</label>
<label @2d9e59757d3bfc66d93c3bc44b408922>
<match **>
@id FluentdConfig-fluent-fluentd-config::cluster::clusteroutput::fluentd-output-opensearch-0
@type opensearch
host opensearch-logging-data.kubesphere-logging-system.svc
logstash_format true
logstash_prefix ks-logstash-log
port 9200
</match>
</label>
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<source>
@type forward
bind 0.0.0.0
port 24224
</source>
<match **>
@id main
@type label_router
<route>
@label @2d9e59757d3bfc66d93c3bc44b408922
<match>
namespaces fluent
</match>
</route>
</match>
<label @2d9e59757d3bfc66d93c3bc44b408922>
<match **>
@id FluentdConfig-fluent-fluentd-config::cluster::clusteroutput::fluentd-output-opensearch-0
@type opensearch
host opensearch-logging-data.kubesphere-logging-system.svc
logstash_format true
logstash_prefix ks-logstash-log
port 9200
</match>
</label>
84 changes: 84 additions & 0 deletions apis/fluentd/v1alpha1/tests/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,58 @@ func Test_ClusterCfgOutput2ES(t *testing.T) {
}
}

func Test_Cfg2OpenSearch(t *testing.T) {
g := NewGomegaWithT(t)
sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, nil)

psr := fluentdv1alpha1.NewGlobalPluginResources("main")
psr.CombineGlobalInputsPlugins(sl, Fluentd.Spec.GlobalInputs)

clusterOutputsForCluster := []fluentdv1alpha1.ClusterOutput{FluentdclusterOutput2OpenSearch}
cfgRouter, err := psr.BuildCfgRouter(&FluentdConfig1)
g.Expect(err).NotTo(HaveOccurred())
cfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, FluentdConfig1.GetCfgId(), []fluentdv1alpha1.ClusterFilter{}, clusterOutputsForCluster)
err = psr.WithCfgResources(*cfgRouter.Label, cfgResources)
g.Expect(err).NotTo(HaveOccurred())

// we should not see any permutations in serialized config
i := 0
for i < maxRuntimes {
config, errs := psr.RenderMainConfig(false)
g.Expect(errs).NotTo(HaveOccurred())
g.Expect(string(getExpectedCfg("./expected/fluentd-namespaced-cfg-output-opensearch.cfg"))).To(Equal(config))

i++
}
}

func Test_ClusterCfgOutput2OpenSearch(t *testing.T) {
g := NewGomegaWithT(t)
sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, nil)

psr := fluentdv1alpha1.NewGlobalPluginResources("main")
psr.CombineGlobalInputsPlugins(sl, Fluentd.Spec.GlobalInputs)

clustercfgRouter, err := psr.BuildCfgRouter(&FluentdClusterFluentdConfig1)
g.Expect(err).NotTo(HaveOccurred())
clusterFilters := []fluentdv1alpha1.ClusterFilter{}
clusterOutputs := []fluentdv1alpha1.ClusterOutput{FluentdclusterOutput2OpenSearch}
clustercfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, FluentdClusterFluentdConfig1.GetCfgId(), clusterFilters, clusterOutputs)
err = psr.WithCfgResources(*clustercfgRouter.Label, clustercfgResources)
g.Expect(err).NotTo(HaveOccurred())

// we should not see any permutations in serialized config
i := 0
for i < maxRuntimes {
config, errs := psr.RenderMainConfig(false)
//fmt.Println(config)
g.Expect(errs).NotTo(HaveOccurred())
g.Expect(string(getExpectedCfg("./expected/fluentd-cluster-cfg-output-opensearch.cfg"))).To(Equal(config))

i++
}
}

func Test_ClusterCfgOutput2Kafka(t *testing.T) {
g := NewGomegaWithT(t)
sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, nil)
Expand Down Expand Up @@ -126,6 +178,38 @@ func Test_MixedCfgs2ES(t *testing.T) {
}
}

func Test_MixedCfgs2OpenSearch(t *testing.T) {
g := NewGomegaWithT(t)
sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, nil)

psr := fluentdv1alpha1.NewGlobalPluginResources("main")
psr.CombineGlobalInputsPlugins(sl, Fluentd.Spec.GlobalInputs)

clustercfgRouter, err := psr.BuildCfgRouter(&FluentdClusterFluentdConfig1)
g.Expect(err).NotTo(HaveOccurred())
clusterOutputsForCluster := []fluentdv1alpha1.ClusterOutput{FluentdclusterOutput2OpenSearch}
clustercfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, FluentdClusterFluentdConfig1.GetCfgId(), []fluentdv1alpha1.ClusterFilter{}, clusterOutputsForCluster)
err = psr.WithCfgResources(*clustercfgRouter.Label, clustercfgResources)
g.Expect(err).NotTo(HaveOccurred())

cfgRouter, err := psr.BuildCfgRouter(&FluentdConfig1)
g.Expect(err).NotTo(HaveOccurred())
cfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, FluentdConfig1.GetCfgId(), []fluentdv1alpha1.ClusterFilter{}, clusterOutputsForCluster)
err = psr.WithCfgResources(*cfgRouter.Label, cfgResources)
g.Expect(err).NotTo(HaveOccurred())

// we should not see any permutations in serialized config
i := 0
for i < maxRuntimes {
config, errs := psr.RenderMainConfig(false)
// fmt.Println(config)
g.Expect(errs).NotTo(HaveOccurred())
g.Expect(string(getExpectedCfg("./expected/fluentd-mixed-cfgs-output-opensearch.cfg"))).To(Equal(config))

i++
}
}

func Test_MixedCfgs2MultiTenant(t *testing.T) {
g := NewGomegaWithT(t)
sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, nil)
Expand Down
18 changes: 18 additions & 0 deletions apis/fluentd/v1alpha1/tests/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,23 @@ spec:
logstashPrefix: ks-logstash-log
`

FluentdclusterOutput2OpenSearch fluentdv1alpha1.ClusterOutput
FluentdclusterOutput2OpenSearchRaw = `
apiVersion: fluentd.fluent.io/v1alpha1
kind: ClusterOutput
metadata:
name: fluentd-output-opensearch
labels:
output.fluentd.fluent.io/enabled: "true"
spec:
outputs:
- opensearch:
host: opensearch-logging-data.kubesphere-logging-system.svc
port: 9200
logstashFormat: true
logstashPrefix: ks-logstash-log
`

FluentdClusterOutput2kafka fluentdv1alpha1.ClusterOutput
FluentdClusterOutput2kafkaRaw = `
apiVersion: fluentd.fluent.io/v1alpha1
Expand Down Expand Up @@ -359,6 +376,7 @@ func init() {
ParseIntoObject(FluentdClusterOutputLogOperatorRaw, &FluentdClusterOutputLogOperator)
ParseIntoObject(FluentdClusterOutputBufferRaw, &FluentdClusterOutputBuffer)
ParseIntoObject(FluentdclusterOutput2ESRaw, &FluentdclusterOutput2ES)
ParseIntoObject(FluentdclusterOutput2OpenSearchRaw, &FluentdclusterOutput2OpenSearch)
ParseIntoObject(FluentdClusterOutput2kafkaRaw, &FluentdClusterOutput2kafka)
ParseIntoObject(FluentdOutputUser1Raw, &FluentdOutputUser1)
},
Expand Down
Loading

0 comments on commit 2d2e832

Please sign in to comment.