diff --git a/apis/fluentbit/v1alpha2/clusterfluentbitconfig_types_test.go b/apis/fluentbit/v1alpha2/clusterfluentbitconfig_types_test.go index ea4b7fcb3..9a2fe9c93 100644 --- a/apis/fluentbit/v1alpha2/clusterfluentbitconfig_types_test.go +++ b/apis/fluentbit/v1alpha2/clusterfluentbitconfig_types_test.go @@ -51,6 +51,15 @@ var expected = `[Service] Rename rk1 r1v Rename rk2 r2v Rename rk3 r3v +[Output] + Name es + Match * + Alias output_elasticsearch_alias + Host https://example2.com + Port 9200 + Index my_index + Type my_type + Write_Operation upsert [Output] Name http Match logs.foo.bar @@ -126,6 +135,13 @@ var expectedK8s = `[Service] Host foo.bar Port 9200 Index foo-index +[Output] + Name es + Match acbd18db4cc2f85cedef654fccc4a4d8.kube.* + Host foo.bar + Port 9200 + Index foo-index + Write_Operation update ` var labels = map[string]string{ @@ -323,6 +339,28 @@ func Test_FluentBitConfig_RenderMainConfig(t *testing.T) { }, } + elasticSearchOutput := ClusterOutput{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "fluentbit.fluent.io/v1alpha2", + Kind: "ClusterOutput", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "elasticsearch_output_0", + Labels: labels, + }, + Spec: OutputSpec{ + Alias: "output_elasticsearch_alias", + Match: "*", + Elasticsearch: &output.Elasticsearch{ + Host: "https://example2.com", + Port: ptrInt32(int32(9200)), + Index: "my_index", + Type: "my_type", + WriteOperation: "upsert", + }, + }, + } + kafkaOutput := ClusterOutput{ TypeMeta: metav1.TypeMeta{ APIVersion: "fluentbit.fluent.io/v1alpha2", @@ -340,7 +378,7 @@ func Test_FluentBitConfig_RenderMainConfig(t *testing.T) { } outputs := ClusterOutputList{ - Items: []ClusterOutput{syslogOut, httpOutput, openSearchOutput, kafkaOutput}, + Items: []ClusterOutput{syslogOut, httpOutput, openSearchOutput, elasticSearchOutput, kafkaOutput}, } var nsFilterList []FilterList @@ -437,9 +475,31 @@ func TestRenderMainConfigK8s(t *testing.T) { }, }, } + + outputObjEs := &Output{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "fluentbit.fluent.io/v1alpha2", + Kind: "Output", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "output1", + Namespace: "foo", + Labels: labels, + }, + Spec: OutputSpec{ + Match: "kube.*", + Elasticsearch: &output.Elasticsearch{ + Host: "foo.bar", + Port: ptrInt32(9200), + Index: "foo-index", + WriteOperation: "update", + }, + }, + } + nsOutputList := []OutputList{ { - Items: []Output{*outputObj}, + Items: []Output{*outputObj, *outputObjEs}, }, } var rewriteTagCfg []string diff --git a/apis/fluentbit/v1alpha2/plugins/output/elasticsearch_types.go b/apis/fluentbit/v1alpha2/plugins/output/elasticsearch_types.go index 091976479..527e11a34 100644 --- a/apis/fluentbit/v1alpha2/plugins/output/elasticsearch_types.go +++ b/apis/fluentbit/v1alpha2/plugins/output/elasticsearch_types.go @@ -85,6 +85,8 @@ type Elasticsearch struct { GenerateID *bool `json:"generateID,omitempty"` // If set, _id will be the value of the key from incoming record and Generate_ID option is ignored. IdKey string `json:"idKey,omitempty"` + // Operation to use to write in bulk requests. + WriteOperation string `json:"writeOperation,omitempty"` // When enabled, replace field name dots with underscore, required by Elasticsearch 2.0-2.3. ReplaceDots *bool `json:"replaceDots,omitempty"` // When enabled print the elasticsearch API calls to stdout (for diag only) @@ -197,6 +199,9 @@ func (es *Elasticsearch) Params(sl plugins.SecretLoader) (*params.KVs, error) { if es.IdKey != "" { kvs.Insert("ID_KEY", es.IdKey) } + if es.WriteOperation != "" { + kvs.Insert("Write_Operation", es.WriteOperation) + } if es.ReplaceDots != nil { kvs.Insert("Replace_Dots", fmt.Sprint(*es.ReplaceDots)) } diff --git a/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_clusteroutputs.yaml b/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_clusteroutputs.yaml index 210383456..d63eae2ca 100644 --- a/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_clusteroutputs.yaml +++ b/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_clusteroutputs.yaml @@ -513,6 +513,9 @@ spec: description: If set, _id will be the value of the key from incoming record and Generate_ID option is ignored. type: string + writeOperation: + description: Operation to use to write in bulk requests. + type: string includeTagKey: description: When enabled, it append the Tag name to the record. type: boolean diff --git a/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_outputs.yaml b/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_outputs.yaml index c141d1d0b..2b4fc8dc3 100644 --- a/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_outputs.yaml +++ b/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_outputs.yaml @@ -513,6 +513,9 @@ spec: description: If set, _id will be the value of the key from incoming record and Generate_ID option is ignored. type: string + writeOperation: + description: Operation to use to write in bulk requests. + type: string includeTagKey: description: When enabled, it append the Tag name to the record. type: boolean diff --git a/charts/fluent-operator/values.yaml b/charts/fluent-operator/values.yaml index 3119d5db8..a9e0c4042 100644 --- a/charts/fluent-operator/values.yaml +++ b/charts/fluent-operator/values.yaml @@ -228,6 +228,7 @@ fluentbit: # logstashFormat: true # replaceDots: false # enableTLS: false + # writeOperation: upsert # tls: # verify: On # debug: 1 @@ -256,7 +257,7 @@ fluentbit: # port: 2020 # addLabels: # app: "fluentbit" - + # Loki fluentbit ClusterOutput, to be encapsulated in fluentbit config # See https://github.com/fluent/fluent-operator/blob/master/docs/plugins/fluentbit/output/loki.md # See https://docs.fluentbit.io/manual/pipeline/outputs/loki @@ -291,7 +292,7 @@ fluentbit: # #labels: [] # String list of = #labelKeys: [] # String list of - #removeKeys: [] # String list of + #removeKeys: [] # String list of #labelMapPath: '' # String, path to file, ex /here/it/is #dropSingleKey: off #lineFormat: '' # String diff --git a/config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml b/config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml index 210383456..d63eae2ca 100644 --- a/config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml +++ b/config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml @@ -513,6 +513,9 @@ spec: description: If set, _id will be the value of the key from incoming record and Generate_ID option is ignored. type: string + writeOperation: + description: Operation to use to write in bulk requests. + type: string includeTagKey: description: When enabled, it append the Tag name to the record. type: boolean diff --git a/config/crd/bases/fluentbit.fluent.io_outputs.yaml b/config/crd/bases/fluentbit.fluent.io_outputs.yaml index c141d1d0b..2b4fc8dc3 100644 --- a/config/crd/bases/fluentbit.fluent.io_outputs.yaml +++ b/config/crd/bases/fluentbit.fluent.io_outputs.yaml @@ -513,6 +513,9 @@ spec: description: If set, _id will be the value of the key from incoming record and Generate_ID option is ignored. type: string + writeOperation: + description: Operation to use to write in bulk requests. + type: string includeTagKey: description: When enabled, it append the Tag name to the record. type: boolean diff --git a/docs/plugins/fluentbit/output/elasticsearch.md b/docs/plugins/fluentbit/output/elasticsearch.md index c5d6e38a9..f607d93a9 100644 --- a/docs/plugins/fluentbit/output/elasticsearch.md +++ b/docs/plugins/fluentbit/output/elasticsearch.md @@ -32,6 +32,7 @@ Elasticsearch is the es output plugin, allows to ingest your records into an Ela | tagKey | When Include_Tag_Key is enabled, this property defines the key name for the tag. | string | | generateID | When enabled, generate _id for outgoing records. This prevents duplicate records when retrying ES. | *bool | | idKey | If set, _id will be the value of the key from incoming record and Generate_ID option is ignored. | string | +| writeOperation | Operation to use to write in bulk requests. | string | | replaceDots | When enabled, replace field name dots with underscore, required by Elasticsearch 2.0-2.3. | *bool | | traceOutput | When enabled print the elasticsearch API calls to stdout (for diag only) | *bool | | traceError | When enabled print the elasticsearch API calls to stdout when elasticsearch returns an error | *bool |