Skip to content

Commit

Permalink
fluentbit/output/elasticsearch: Add writeOperation option (#1080)
Browse files Browse the repository at this point in the history
* fluentbit/output/elasticsearch: Add writeOperation option

Signed-off-by: Ky-Anh Huynh <[email protected]>

* fluentbit/output/elasticsearch: Add writeOperation option (CRDs update)

Signed-off-by: Ky-Anh Huynh <[email protected]>

* fluentbit/output/elasticsearch: Add writeOperation option (CRDs update in config/)

Signed-off-by: Ky-Anh Huynh <[email protected]>

---------

Signed-off-by: Ky-Anh Huynh <[email protected]>
  • Loading branch information
icy authored Mar 6, 2024
1 parent 979f25a commit e141288
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 4 deletions.
64 changes: 62 additions & 2 deletions apis/fluentbit/v1alpha2/clusterfluentbitconfig_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ var expected = `[Service]
Rename rk1 r1v
Rename rk2 r2v
Rename rk3 r3v
[Output]
Name es
Match *
Alias output_elasticsearch_alias
Host https://example2.com
Port 9200
Index my_index
Type my_type
Write_Operation upsert
[Output]
Name http
Match logs.foo.bar
Expand Down Expand Up @@ -126,6 +135,13 @@ var expectedK8s = `[Service]
Host foo.bar
Port 9200
Index foo-index
[Output]
Name es
Match acbd18db4cc2f85cedef654fccc4a4d8.kube.*
Host foo.bar
Port 9200
Index foo-index
Write_Operation update
`

var labels = map[string]string{
Expand Down Expand Up @@ -323,6 +339,28 @@ func Test_FluentBitConfig_RenderMainConfig(t *testing.T) {
},
}

elasticSearchOutput := ClusterOutput{
TypeMeta: metav1.TypeMeta{
APIVersion: "fluentbit.fluent.io/v1alpha2",
Kind: "ClusterOutput",
},
ObjectMeta: metav1.ObjectMeta{
Name: "elasticsearch_output_0",
Labels: labels,
},
Spec: OutputSpec{
Alias: "output_elasticsearch_alias",
Match: "*",
Elasticsearch: &output.Elasticsearch{
Host: "https://example2.com",
Port: ptrInt32(int32(9200)),
Index: "my_index",
Type: "my_type",
WriteOperation: "upsert",
},
},
}

kafkaOutput := ClusterOutput{
TypeMeta: metav1.TypeMeta{
APIVersion: "fluentbit.fluent.io/v1alpha2",
Expand All @@ -340,7 +378,7 @@ func Test_FluentBitConfig_RenderMainConfig(t *testing.T) {
}

outputs := ClusterOutputList{
Items: []ClusterOutput{syslogOut, httpOutput, openSearchOutput, kafkaOutput},
Items: []ClusterOutput{syslogOut, httpOutput, openSearchOutput, elasticSearchOutput, kafkaOutput},
}

var nsFilterList []FilterList
Expand Down Expand Up @@ -437,9 +475,31 @@ func TestRenderMainConfigK8s(t *testing.T) {
},
},
}

outputObjEs := &Output{
TypeMeta: metav1.TypeMeta{
APIVersion: "fluentbit.fluent.io/v1alpha2",
Kind: "Output",
},
ObjectMeta: metav1.ObjectMeta{
Name: "output1",
Namespace: "foo",
Labels: labels,
},
Spec: OutputSpec{
Match: "kube.*",
Elasticsearch: &output.Elasticsearch{
Host: "foo.bar",
Port: ptrInt32(9200),
Index: "foo-index",
WriteOperation: "update",
},
},
}

nsOutputList := []OutputList{
{
Items: []Output{*outputObj},
Items: []Output{*outputObj, *outputObjEs},
},
}
var rewriteTagCfg []string
Expand Down
5 changes: 5 additions & 0 deletions apis/fluentbit/v1alpha2/plugins/output/elasticsearch_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ type Elasticsearch struct {
GenerateID *bool `json:"generateID,omitempty"`
// If set, _id will be the value of the key from incoming record and Generate_ID option is ignored.
IdKey string `json:"idKey,omitempty"`
// Operation to use to write in bulk requests.
WriteOperation string `json:"writeOperation,omitempty"`
// When enabled, replace field name dots with underscore, required by Elasticsearch 2.0-2.3.
ReplaceDots *bool `json:"replaceDots,omitempty"`
// When enabled print the elasticsearch API calls to stdout (for diag only)
Expand Down Expand Up @@ -197,6 +199,9 @@ func (es *Elasticsearch) Params(sl plugins.SecretLoader) (*params.KVs, error) {
if es.IdKey != "" {
kvs.Insert("ID_KEY", es.IdKey)
}
if es.WriteOperation != "" {
kvs.Insert("Write_Operation", es.WriteOperation)
}
if es.ReplaceDots != nil {
kvs.Insert("Replace_Dots", fmt.Sprint(*es.ReplaceDots))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,9 @@ spec:
description: If set, _id will be the value of the key from incoming
record and Generate_ID option is ignored.
type: string
writeOperation:
description: Operation to use to write in bulk requests.
type: string
includeTagKey:
description: When enabled, it append the Tag name to the record.
type: boolean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,9 @@ spec:
description: If set, _id will be the value of the key from incoming
record and Generate_ID option is ignored.
type: string
writeOperation:
description: Operation to use to write in bulk requests.
type: string
includeTagKey:
description: When enabled, it append the Tag name to the record.
type: boolean
Expand Down
5 changes: 3 additions & 2 deletions charts/fluent-operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ fluentbit:
# logstashFormat: true
# replaceDots: false
# enableTLS: false
# writeOperation: upsert
# tls:
# verify: On
# debug: 1
Expand Down Expand Up @@ -256,7 +257,7 @@ fluentbit:
# port: 2020
# addLabels:
# app: "fluentbit"

# Loki fluentbit ClusterOutput, to be encapsulated in fluentbit config
# See https://github.com/fluent/fluent-operator/blob/master/docs/plugins/fluentbit/output/loki.md
# See https://docs.fluentbit.io/manual/pipeline/outputs/loki
Expand Down Expand Up @@ -291,7 +292,7 @@ fluentbit:
#
#labels: [] # String list of <name>=<value>
#labelKeys: [] # String list of <key>
#removeKeys: [] # String list of <key>
#removeKeys: [] # String list of <key>
#labelMapPath: '' # String, path to file, ex /here/it/is
#dropSingleKey: off
#lineFormat: '' # String
Expand Down
3 changes: 3 additions & 0 deletions config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,9 @@ spec:
description: If set, _id will be the value of the key from incoming
record and Generate_ID option is ignored.
type: string
writeOperation:
description: Operation to use to write in bulk requests.
type: string
includeTagKey:
description: When enabled, it append the Tag name to the record.
type: boolean
Expand Down
3 changes: 3 additions & 0 deletions config/crd/bases/fluentbit.fluent.io_outputs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,9 @@ spec:
description: If set, _id will be the value of the key from incoming
record and Generate_ID option is ignored.
type: string
writeOperation:
description: Operation to use to write in bulk requests.
type: string
includeTagKey:
description: When enabled, it append the Tag name to the record.
type: boolean
Expand Down
1 change: 1 addition & 0 deletions docs/plugins/fluentbit/output/elasticsearch.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Elasticsearch is the es output plugin, allows to ingest your records into an Ela
| tagKey | When Include_Tag_Key is enabled, this property defines the key name for the tag. | string |
| generateID | When enabled, generate _id for outgoing records. This prevents duplicate records when retrying ES. | *bool |
| idKey | If set, _id will be the value of the key from incoming record and Generate_ID option is ignored. | string |
| writeOperation | Operation to use to write in bulk requests. | string |
| replaceDots | When enabled, replace field name dots with underscore, required by Elasticsearch 2.0-2.3. | *bool |
| traceOutput | When enabled print the elasticsearch API calls to stdout (for diag only) | *bool |
| traceError | When enabled print the elasticsearch API calls to stdout when elasticsearch returns an error | *bool |
Expand Down

0 comments on commit e141288

Please sign in to comment.