Skip to content

Commit

Permalink
Merge pull request fluent#1190 from fschlager/feat/elastic-data-stream
Browse files Browse the repository at this point in the history
Support elastic_data_stream
  • Loading branch information
benjaminhuo authored Jun 4, 2024
2 parents ec9c5e4 + b395c4b commit d1a2e27
Show file tree
Hide file tree
Showing 16 changed files with 1,967 additions and 61 deletions.
26 changes: 19 additions & 7 deletions apis/fluentd/v1alpha1/plugins/output/es.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package output
import "github.com/fluent/fluent-operator/v2/apis/fluentd/v1alpha1/plugins"

// Elasticsearch defines the parameters for out_es output plugin
type Elasticsearch struct {
type ElasticsearchCommon struct {
// The hostname of your Elasticsearch node (default: localhost).
Host *string `json:"host,omitempty"`
// The port number of your Elasticsearch node (default: 9200).
Expand All @@ -21,12 +21,6 @@ type Elasticsearch struct {
CloudId *plugins.Secret `json:"cloudId,omitempty"`
// Authenticate towards Elastic Cloud using cloudAuth.
CloudAuth *plugins.Secret `json:"cloudAuth,omitempty"`
// IndexName defines the placeholder syntax of Fluentd plugin API. See https://docs.fluentd.org/configuration/buffer-section.
IndexName *string `json:"indexName,omitempty"`
// If true, Fluentd uses the conventional index name format logstash-%Y.%m.%d (default: false). This option supersedes the index_name option.
LogstashFormat *bool `json:"logstashFormat,omitempty"`
// LogstashPrefix defines the logstash prefix index name to write events when logstash_format is true (default: logstash).
LogstashPrefix *string `json:"logstashPrefix,omitempty"`
// Optional, The login credentials to connect to Elasticsearch
User *plugins.Secret `json:"user,omitempty"`
// Optional, The login credentials to connect to Elasticsearch
Expand All @@ -42,3 +36,21 @@ type Elasticsearch struct {
// Optional, password for ClientKey file
ClientKeyPassword *plugins.Secret `json:"clientKeyPassword,omitempty"`
}

type Elasticsearch struct {
ElasticsearchCommon `json:",inline"`

// IndexName defines the placeholder syntax of Fluentd plugin API. See https://docs.fluentd.org/configuration/buffer-section.
IndexName *string `json:"indexName,omitempty"`
// If true, Fluentd uses the conventional index name format logstash-%Y.%m.%d (default: false). This option supersedes the index_name option.
LogstashFormat *bool `json:"logstashFormat,omitempty"`
// LogstashPrefix defines the logstash prefix index name to write events when logstash_format is true (default: logstash).
LogstashPrefix *string `json:"logstashPrefix,omitempty"`
}

type ElasticsearchDataStream struct {
ElasticsearchCommon `json:",inline"`

// You can specify Elasticsearch data stream name by this parameter. This parameter is mandatory for elasticsearch_data_stream
DataStreamName *string `json:"dataStreamName"`
}
81 changes: 56 additions & 25 deletions apis/fluentd/v1alpha1/plugins/output/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type Output struct {
Http *Http `json:"http,omitempty"`
// out_es plugin
Elasticsearch *Elasticsearch `json:"elasticsearch,omitempty"`
// out_es datastreams plugin
ElasticsearchDataStream *ElasticsearchDataStream `json:"elasticsearchDataStream,omitempty"`
// out_opensearch plugin
Opensearch *Opensearch `json:"opensearch,omitempty"`
// out_kafka plugin
Expand Down Expand Up @@ -135,6 +137,11 @@ func (o *Output) Params(loader plugins.SecretLoader) (*params.PluginStore, error
return o.elasticsearchPlugin(ps, loader)
}

if o.ElasticsearchDataStream != nil {
ps.InsertType(string(params.ElasticsearchDataStreamOutputType))
return o.elasticsearchDataStreamPlugin(ps, loader)
}

if o.Opensearch != nil {
ps.InsertType(string(params.OpensearchOutputType))
return o.opensearchPlugin(ps, loader)
Expand Down Expand Up @@ -383,65 +390,75 @@ func (o *Output) httpPlugin(parent *params.PluginStore, loader plugins.SecretLoa
return parent
}

func (o *Output) elasticsearchPlugin(parent *params.PluginStore, loader plugins.SecretLoader) (*params.PluginStore, error) {
if o.Elasticsearch.Host != nil {
parent.InsertPairs("host", fmt.Sprint(*o.Elasticsearch.Host))
func (o *Output) elasticsearchPluginCommon(common *ElasticsearchCommon, parent *params.PluginStore, loader plugins.SecretLoader) (*params.PluginStore, error) {
if common.Host != nil {
parent.InsertPairs("host", fmt.Sprint(*common.Host))
}

if o.Elasticsearch.Port != nil {
parent.InsertPairs("port", fmt.Sprint(*o.Elasticsearch.Port))
if common.Port != nil {
parent.InsertPairs("port", fmt.Sprint(*common.Port))
}

if o.Elasticsearch.Hosts != nil {
parent.InsertPairs("hosts", fmt.Sprint(*o.Elasticsearch.Hosts))
if common.Hosts != nil {
parent.InsertPairs("hosts", fmt.Sprint(*common.Hosts))
}

if o.Elasticsearch.User != nil {
user, err := loader.LoadSecret(*o.Elasticsearch.User)
if common.User != nil {
user, err := loader.LoadSecret(*common.User)
if err != nil {
return nil, err
}
parent.InsertPairs("user", user)
}

if o.Elasticsearch.Password != nil {
pwd, err := loader.LoadSecret(*o.Elasticsearch.Password)
if common.Password != nil {
pwd, err := loader.LoadSecret(*common.Password)
if err != nil {
return nil, err
}
parent.InsertPairs("password", pwd)
}

if o.Elasticsearch.SslVerify != nil {
parent.InsertPairs("ssl_verify", fmt.Sprint(*o.Elasticsearch.SslVerify))
if common.SslVerify != nil {
parent.InsertPairs("ssl_verify", fmt.Sprint(*common.SslVerify))
}

if o.Elasticsearch.CAFile != nil {
parent.InsertPairs("ca_file", fmt.Sprint(*o.Elasticsearch.CAFile))
if common.CAFile != nil {
parent.InsertPairs("ca_file", fmt.Sprint(*common.CAFile))
}

if o.Elasticsearch.ClientCert != nil {
parent.InsertPairs("client_cert", fmt.Sprint(*o.Elasticsearch.ClientCert))
if common.ClientCert != nil {
parent.InsertPairs("client_cert", fmt.Sprint(*common.ClientCert))
}

if o.Elasticsearch.ClientKey != nil {
parent.InsertPairs("client_key", fmt.Sprint(*o.Elasticsearch.ClientKey))
if common.ClientKey != nil {
parent.InsertPairs("client_key", fmt.Sprint(*common.ClientKey))
}

if o.Elasticsearch.ClientKeyPassword != nil {
pwd, err := loader.LoadSecret(*o.Elasticsearch.ClientKeyPassword)
if common.ClientKeyPassword != nil {
pwd, err := loader.LoadSecret(*common.ClientKeyPassword)
if err != nil {
return nil, err
}
parent.InsertPairs("client_key_pass", pwd)
}

if o.Elasticsearch.Scheme != nil {
parent.InsertPairs("scheme", fmt.Sprint(*o.Elasticsearch.Scheme))
if common.Scheme != nil {
parent.InsertPairs("scheme", fmt.Sprint(*common.Scheme))
}

if common.Path != nil {
parent.InsertPairs("path", fmt.Sprint(*common.Path))
}

if o.Elasticsearch.Path != nil {
parent.InsertPairs("path", fmt.Sprint(*o.Elasticsearch.Path))
return parent, nil
}

func (o *Output) elasticsearchPlugin(parent *params.PluginStore, loader plugins.SecretLoader) (*params.PluginStore, error) {

parent, err := o.elasticsearchPluginCommon(&o.Elasticsearch.ElasticsearchCommon, parent, loader)
if err != nil {
return nil, err
}

if o.Elasticsearch.IndexName != nil {
Expand All @@ -459,6 +476,20 @@ func (o *Output) elasticsearchPlugin(parent *params.PluginStore, loader plugins.
return parent, nil
}

func (o *Output) elasticsearchDataStreamPlugin(parent *params.PluginStore, loader plugins.SecretLoader) (*params.PluginStore, error) {

parent, err := o.elasticsearchPluginCommon(&o.ElasticsearchDataStream.ElasticsearchCommon, parent, loader)
if err != nil {
return nil, err
}

if o.ElasticsearchDataStream.DataStreamName != nil {
parent.InsertPairs("data_stream_name", fmt.Sprint(*o.ElasticsearchDataStream.DataStreamName))
}

return parent, nil
}

func (o *Output) opensearchPlugin(parent *params.PluginStore, loader plugins.SecretLoader) (*params.PluginStore, error) {
if o.Opensearch.Host != nil {
parent.InsertPairs("host", fmt.Sprint(*o.Opensearch.Host))
Expand Down
46 changes: 24 additions & 22 deletions apis/fluentd/v1alpha1/plugins/params/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,18 @@ const (
ParserPlugin PluginName = "parser"
StdoutPlugin PluginName = "stdout"

ReLabelPlugin PluginName = "relabel"
LabelPlugin PluginName = "label"
LabelRouterPlugin PluginName = "label_router"
S3Plugin PluginName = "s3"
KafkaPlugin PluginName = "kafka2"
ElasticsearchPlugin PluginName = "elasticsearch"
OpensearchPlugin PluginName = "opensearch"
MatchPlugin PluginName = "match"
BufferPlugin PluginName = "buffer"
CloudWatchPlugin PluginName = "cloudwatch_logs"
DatadogPlugin PluginName = "datadog"
ReLabelPlugin PluginName = "relabel"
LabelPlugin PluginName = "label"
LabelRouterPlugin PluginName = "label_router"
S3Plugin PluginName = "s3"
KafkaPlugin PluginName = "kafka2"
ElasticsearchPlugin PluginName = "elasticsearch"
ElasticsearchDataStreamPlugin PluginName = "elasticsearch_data_stream"
OpensearchPlugin PluginName = "opensearch"
MatchPlugin PluginName = "match"
BufferPlugin PluginName = "buffer"
CloudWatchPlugin PluginName = "cloudwatch_logs"
DatadogPlugin PluginName = "datadog"

BufferTag string = "tag"
LabelTag string = "tag"
Expand All @@ -60,17 +61,18 @@ const (
StdoutFilterType FilterType = "stdout"

// Enums the supported output types
ForwardOutputType OutputType = "forward"
HttpOutputType OutputType = "http"
StdOutputType OutputType = "stdout"
KafkaOutputType OutputType = "kafka2"
ElasticsearchOutputType OutputType = "elasticsearch"
OpensearchOutputType OutputType = "opensearch"
S3OutputType OutputType = "s3"
LokiOutputType OutputType = "loki"
CloudWatchOutputType OutputType = "cloudwatch_logs"
DatadogOutputType OutputType = "datadog"
CopyOutputType OutputType = "copy"
ForwardOutputType OutputType = "forward"
HttpOutputType OutputType = "http"
StdOutputType OutputType = "stdout"
KafkaOutputType OutputType = "kafka2"
ElasticsearchOutputType OutputType = "elasticsearch"
ElasticsearchDataStreamOutputType OutputType = "elasticsearch_data_stream"
OpensearchOutputType OutputType = "opensearch"
S3OutputType OutputType = "s3"
LokiOutputType OutputType = "loki"
CloudWatchOutputType OutputType = "cloudwatch_logs"
DatadogOutputType OutputType = "datadog"
CopyOutputType OutputType = "copy"
)

var (
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
<source>
@type forward
bind 0.0.0.0
port 24224
</source>
<match **>
@id main
@type label_router
<route>
@label @a2170d34e9940ec56d328100e375c43e
<match>
namespaces default,kube-system
</match>
</route>
</match>
<label @a2170d34e9940ec56d328100e375c43e>
<match **>
@id ClusterFluentdConfig-cluster-fluentd-config::cluster::clusteroutput::fluentd-output-es-copy-0
@type copy
copy_mode no_copy
<store>
@id ClusterFluentdConfig-cluster-fluentd-config::cluster::clusteroutput::fluentd-output-es-copy-1
@type elasticsearch_data_stream
data_stream_name es1-notag-1
host elasticsearch-logging-data.kubesphere-logging-system.svc
password s3cr3tP@ssword
port 9243
scheme https
ssl_verify false
user s3cr3tUsern4me
<buffer>
@type memory
flush_mode immediate
</buffer>
</store>
<store>
@id ClusterFluentdConfig-cluster-fluentd-config::cluster::clusteroutput::fluentd-output-es-copy-2
@type elasticsearch_data_stream
data_stream_name es1-notag-2
host elasticsearch-logging-data.kubesphere-logging-system.svc
password s3cr3tP@ssword
port 9243
scheme https
ssl_verify false
user s3cr3tUsern4me
<buffer>
@type memory
flush_mode immediate
</buffer>
</store>
</match>
</label>
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<source>
@type forward
bind 0.0.0.0
port 24224
</source>
<match **>
@id main
@type label_router
<route>
@label @a2170d34e9940ec56d328100e375c43e
<match>
namespaces default,kube-system
</match>
</route>
</match>
<label @a2170d34e9940ec56d328100e375c43e>
<match **>
@id ClusterFluentdConfig-cluster-fluentd-config::cluster::clusteroutput::fluentd-output-es-0
@type elasticsearch_data_stream
data_stream_name test-ds
host elasticsearch-logging-data.kubesphere-logging-system.svc
port 9200
</match>
</label>
54 changes: 53 additions & 1 deletion apis/fluentd/v1alpha1/tests/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,66 @@ func Test_ClusterCfgOutput2ES(t *testing.T) {
i := 0
for i < maxRuntimes {
config, errs := psr.RenderMainConfig(false)
fmt.Println(config)
g.Expect(errs).NotTo(HaveOccurred())
g.Expect(string(getExpectedCfg("./expected/fluentd-cluster-cfg-output-es.cfg"))).To(Equal(config))

i++
}
}

func Test_ClusterCfgOutput2ESDataStream(t *testing.T) {
g := NewGomegaWithT(t)
sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, logr.Logger{})

psr := fluentdv1alpha1.NewGlobalPluginResources("main")
psr.CombineGlobalInputsPlugins(sl, Fluentd.Spec.GlobalInputs)

clustercfgRouter, err := psr.BuildCfgRouter(&FluentdClusterFluentdConfig1)
g.Expect(err).NotTo(HaveOccurred())
clusterFilters := []fluentdv1alpha1.ClusterFilter{}
clusterOutputs := []fluentdv1alpha1.ClusterOutput{FluentdclusterOutput2ESDataStream}
clustercfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, FluentdClusterFluentdConfig1.GetCfgId(), []fluentdv1alpha1.ClusterInput{}, clusterFilters, clusterOutputs)
err = psr.WithCfgResources(*clustercfgRouter.Label, clustercfgResources)
g.Expect(err).NotTo(HaveOccurred())

// we should not see any permutations in serialized config
i := 0
for i < maxRuntimes {
config, errs := psr.RenderMainConfig(false)
g.Expect(errs).NotTo(HaveOccurred())
g.Expect(string(getExpectedCfg("./expected/fluentd-cluster-cfg-output-es-data-stream.cfg"))).To(Equal(config))

i++
}
}

func Test_ClusterCfgOutput2CopyESDataStream(t *testing.T) {
g := NewGomegaWithT(t)
sl := NewSecretLoader(logr.Logger{}, esCredentials)

psr := fluentdv1alpha1.NewGlobalPluginResources("main")
psr.CombineGlobalInputsPlugins(sl, Fluentd.Spec.GlobalInputs)

clustercfgRouter, err := psr.BuildCfgRouter(&FluentdClusterFluentdConfig1)
g.Expect(err).NotTo(HaveOccurred())
clusterFilters := []fluentdv1alpha1.ClusterFilter{}
clusterOutputs := []fluentdv1alpha1.ClusterOutput{FluentdclusterOutput2CopyESDataStream}
clustercfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, FluentdClusterFluentdConfig1.GetCfgId(), []fluentdv1alpha1.ClusterInput{}, clusterFilters, clusterOutputs)
err = psr.IdentifyCopyAndPatchOutput(clustercfgResources)
err = psr.WithCfgResources(*clustercfgRouter.Label, clustercfgResources)
g.Expect(err).NotTo(HaveOccurred())

// we should not see any permutations in serialized config
i := 0
for i < maxRuntimes {
config, errs := psr.RenderMainConfig(false)
g.Expect(errs).NotTo(HaveOccurred())
g.Expect(string(getExpectedCfg("./expected/fluentd-cluster-cfg-output-copy-es-data-stream.cfg"))).To(Equal(config))

i++
}
}

func Test_Cfg2OpenSearch(t *testing.T) {
g := NewGomegaWithT(t)
sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, logr.Logger{})
Expand Down
Loading

0 comments on commit d1a2e27

Please sign in to comment.