This is a set of Kafka Connect transformations.
See the Kafka documentation for more details about configuring transformations.
The ExtractTimestamp transformation replaces the original record's timestamp with a value taken from a field of the record.
The transformation:
- expects the record value to be either a STRUCT or a MAP;
- expects it to have a specified field;
- expects the value of the field to be either INT64 or org.apache.kafka.connect.data.Timestamp and not be null.
It exists in two variants:
- io.aiven.kafka.connect.transforms.ExtractTimestamp$Value - works on values;
- io.aiven.kafka.connect.transforms.ExtractTimestamp$Key - works on keys.
The transformation defines the following configurations:
- field.name - The name of the field which should be used as the new timestamp. Cannot be null or empty.
Here's an example of this transformation configuration:
transforms=ExtractTimestampFromValueField
transforms.ExtractTimestampFromValueField.type=io.aiven.kafka.connect.transforms.ExtractTimestamp$Value
transforms.ExtractTimestampFromValueField.field.name=inner_field_name
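The key variant is configured the same way. The following is a minimal sketch, assuming a hypothetical alias ExtractTimestampFromKeyField and a hypothetical key field named event_time; neither name comes from the project itself:

```properties
# Alias and field name are illustrative assumptions.
transforms=ExtractTimestampFromKeyField
transforms.ExtractTimestampFromKeyField.type=io.aiven.kafka.connect.transforms.ExtractTimestamp$Key
# Per the requirements above, the field must hold an INT64 or Timestamp value and must not be null.
transforms.ExtractTimestampFromKeyField.field.name=event_time
```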
The ExtractTopic transformation extracts a string value from the record and uses it as the topic name.
The transformation can use either the whole key or value (in this case, it must have INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, or STRING type) or a field in them (in this case, it must have STRUCT type and the field's value must be INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, or STRING).
It exists in two variants:
- io.aiven.kafka.connect.transforms.ExtractTopic$Key - works on keys;
- io.aiven.kafka.connect.transforms.ExtractTopic$Value - works on values.
The transformation defines the following configurations:
- field.name - The name of the field which should be used as the topic name. If null or empty, the entire key or value is used (and assumed to be a string). Defaults to null.
- skip.missing.or.null - Whether a record should be silently passed through without transformation when the source of the new topic name is null or missing. Defaults to false.
Here is an example of this transformation configuration:
transforms=ExtractTopicFromValueField
transforms.ExtractTopicFromValueField.type=io.aiven.kafka.connect.transforms.ExtractTopic$Value
transforms.ExtractTopicFromValueField.field.name=inner_field_name
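To route by the whole key rather than by a field, field.name can be left unset. The following is a sketch under that assumption, with a hypothetical alias ExtractTopicFromKey; it also enables skip.missing.or.null so that records without a usable key are passed through unchanged:

```properties
# Alias is an illustrative assumption.
transforms=ExtractTopicFromKey
transforms.ExtractTopicFromKey.type=io.aiven.kafka.connect.transforms.ExtractTopic$Key
# field.name is not set (it defaults to null), so the entire key is used as the topic name.
# Records whose key is null or missing are passed through without transformation.
transforms.ExtractTopicFromKey.skip.missing.or.null=true
```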
The Hash transformation replaces a string value with its hash.
The transformation can hash either the whole key or value (in this case, it must have STRING type) or a field in them (in this case, it must have STRUCT type and the field's value must be STRING).
It exists in two variants:
- io.aiven.kafka.connect.transforms.Hash$Key - works on keys;
- io.aiven.kafka.connect.transforms.Hash$Value - works on values.
The transformation defines the following configurations:
- field.name - The name of the field whose value should be hashed. If null or empty, the entire key or value is used (and assumed to be a string). Defaults to null.
- function - The name of the hash function to use. The supported values are: md5, sha1, and sha256.
- skip.missing.or.null - Whether a record should be silently passed through without transformation when the value to be hashed is null or missing. Defaults to false.
Here is an example of this transformation configuration:
transforms=HashEmail
transforms.HashEmail.type=io.aiven.kafka.connect.transforms.Hash$Value
transforms.HashEmail.field.name=email
transforms.HashEmail.function=sha1
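The whole key can be hashed in the same way by omitting field.name. The following is a sketch with a hypothetical alias HashKey, using sha256 and skipping records that have no key to hash:

```properties
# Alias is an illustrative assumption.
transforms=HashKey
transforms.HashKey.type=io.aiven.kafka.connect.transforms.Hash$Key
# field.name is not set, so the entire key (which must be a STRING) is hashed.
transforms.HashKey.function=sha256
# Records whose key is null or missing are passed through without transformation.
transforms.HashKey.skip.missing.or.null=true
```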
The TombstoneHandler transformation manages tombstone records, i.e. records whose entire value is null.
The transformation defines the following configurations:
- behavior - The action the transformation must perform when it encounters a tombstone record. The supported values are:
  - drop_silent - silently drop tombstone records;
  - drop_warn - drop tombstone records and log at WARN level;
  - fail - fail with DataException.
Here is an example of this transformation configuration:
transforms=TombstoneHandler
transforms.TombstoneHandler.type=io.aiven.kafka.connect.transforms.TombstoneHandler
transforms.TombstoneHandler.behavior=drop_silent
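Kafka Connect accepts a comma-separated list of aliases in transforms and applies them in the order listed, so the transformations described here can be chained. The following is a sketch, assuming hypothetical aliases DropTombstones and HashEmail and a value field named email; it drops tombstones with a warning first, so the later hashing step does not receive records with a null value:

```properties
# Aliases and the field name are illustrative assumptions.
# Transformations run in the order listed here.
transforms=DropTombstones,HashEmail
transforms.DropTombstones.type=io.aiven.kafka.connect.transforms.TombstoneHandler
# Drop tombstone records and log each drop at WARN level.
transforms.DropTombstones.behavior=drop_warn
transforms.HashEmail.type=io.aiven.kafka.connect.transforms.Hash$Value
transforms.HashEmail.field.name=email
transforms.HashEmail.function=sha256
```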
This project is licensed under the Apache License, Version 2.0.