diff --git a/CHANGELOG.md b/CHANGELOG.md index 854b0893c..53926fa5f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## [MAJOR.MINOR.PATCH] - YYYY-MM-DD +- Add `KafkaTopic` field `config.local_retention_bytes`, type `integer`: local.retention.bytes value +- Add `KafkaTopic` field `config.local_retention_ms`, type `integer`: local.retention.ms value +- Add `KafkaTopic` field `config.remote_storage_enable`, type `boolean`: remote_storage_enable + ## v0.17.0 - 2024-02-01 - Bump k8s deps to 1.26.13 diff --git a/api/v1alpha1/kafkatopic_types.go b/api/v1alpha1/kafkatopic_types.go index 5663ab32c..21c233dee 100644 --- a/api/v1alpha1/kafkatopic_types.go +++ b/api/v1alpha1/kafkatopic_types.go @@ -91,6 +91,12 @@ type KafkaTopicConfig struct { // index.interval.bytes value IndexIntervalBytes *int64 `json:"index_interval_bytes,omitempty"` + // local.retention.bytes value + LocalRetentionBytes *int64 `json:"local_retention_bytes,omitempty"` + + // local.retention.ms value + LocalRetentionMs *int64 `json:"local_retention_ms,omitempty"` + // max.compaction.lag.ms value MaxCompactionLagMs *int64 `json:"max_compaction_lag_ms,omitempty"` @@ -121,6 +127,9 @@ type KafkaTopicConfig struct { // preallocate value Preallocate *bool `json:"preallocate,omitempty"` + // remote_storage_enable + RemoteStorageEnable *bool `json:"remote_storage_enable,omitempty"` + // retention.bytes value RetentionBytes *int64 `json:"retention_bytes,omitempty"` diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 45d0c8b99..094486607 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -1246,6 +1246,16 @@ func (in *KafkaTopicConfig) DeepCopyInto(out *KafkaTopicConfig) { *out = new(int64) **out = **in } + if in.LocalRetentionBytes != nil { + in, out := &in.LocalRetentionBytes, &out.LocalRetentionBytes + *out = new(int64) + **out = **in + } + if in.LocalRetentionMs != nil { + in, out := &in.LocalRetentionMs, &out.LocalRetentionMs + *out = new(int64) + **out = **in + } if in.MaxCompactionLagMs != nil { in, out := &in.MaxCompactionLagMs, &out.MaxCompactionLagMs *out = new(int64) @@ -1286,6 +1296,11 @@ func (in *KafkaTopicConfig) DeepCopyInto(out *KafkaTopicConfig) { *out = new(bool) **out = **in } + if in.RemoteStorageEnable != nil { + in, out := &in.RemoteStorageEnable, &out.RemoteStorageEnable + *out = new(bool) + **out = **in + } if in.RetentionBytes != nil { in, out := &in.RetentionBytes, &out.RetentionBytes *out = new(int64) diff --git a/charts/aiven-operator-crds/templates/aiven.io_kafkatopics.yaml b/charts/aiven-operator-crds/templates/aiven.io_kafkatopics.yaml index dbddcef81..b20b8d667 100644 --- a/charts/aiven-operator-crds/templates/aiven.io_kafkatopics.yaml +++ b/charts/aiven-operator-crds/templates/aiven.io_kafkatopics.yaml @@ -92,6 +92,14 @@ spec: description: index.interval.bytes value format: int64 type: integer + local_retention_bytes: + description: local.retention.bytes value + format: int64 + type: integer + local_retention_ms: + description: local.retention.ms value + format: int64 + type: integer max_compaction_lag_ms: description: max.compaction.lag.ms value format: int64 @@ -127,6 +135,9 @@ spec: preallocate: description: preallocate value type: boolean + remote_storage_enable: + description: remote_storage_enable + type: boolean retention_bytes: description: retention.bytes value format: int64 diff --git a/config/crd/bases/aiven.io_kafkatopics.yaml b/config/crd/bases/aiven.io_kafkatopics.yaml index dbddcef81..b20b8d667 100644 --- a/config/crd/bases/aiven.io_kafkatopics.yaml +++ b/config/crd/bases/aiven.io_kafkatopics.yaml @@ -92,6 +92,14 @@ spec: description: index.interval.bytes value format: int64 type: integer + local_retention_bytes: + description: local.retention.bytes value + format: int64 + type: integer + local_retention_ms: + description: local.retention.ms value + format: int64 + type: integer max_compaction_lag_ms: description: max.compaction.lag.ms value format: int64 @@ -127,6 +135,9 @@ spec: preallocate: description: preallocate value type: boolean + remote_storage_enable: + description: remote_storage_enable + type: boolean retention_bytes: description: retention.bytes value format: int64 diff --git a/controllers/kafkatopic_controller.go b/controllers/kafkatopic_controller.go index 2ef6bd55f..614b18f28 100644 --- a/controllers/kafkatopic_controller.go +++ b/controllers/kafkatopic_controller.go @@ -205,6 +205,8 @@ func convertKafkaTopicConfig(topic *v1alpha1.KafkaTopic) aiven.KafkaTopicConfig FlushMessages: topic.Spec.Config.FlushMessages, FlushMs: topic.Spec.Config.FlushMs, IndexIntervalBytes: topic.Spec.Config.IndexIntervalBytes, + LocalRetentionBytes: topic.Spec.Config.LocalRetentionBytes, + LocalRetentionMs: topic.Spec.Config.LocalRetentionMs, MaxCompactionLagMs: topic.Spec.Config.MaxCompactionLagMs, MaxMessageBytes: topic.Spec.Config.MaxMessageBytes, MessageDownconversionEnable: topic.Spec.Config.MessageDownconversionEnable, @@ -215,6 +217,7 @@ func convertKafkaTopicConfig(topic *v1alpha1.KafkaTopic) aiven.KafkaTopicConfig MinCompactionLagMs: topic.Spec.Config.MinCompactionLagMs, MinInsyncReplicas: topic.Spec.Config.MinInsyncReplicas, Preallocate: topic.Spec.Config.Preallocate, + RemoteStorageEnable: topic.Spec.Config.RemoteStorageEnable, RetentionBytes: topic.Spec.Config.RetentionBytes, RetentionMs: topic.Spec.Config.RetentionMs, SegmentBytes: topic.Spec.Config.SegmentBytes, diff --git a/docs/docs/api-reference/kafkatopic.md b/docs/docs/api-reference/kafkatopic.md index 55a26e41a..5d69186cc 100644 --- a/docs/docs/api-reference/kafkatopic.md +++ b/docs/docs/api-reference/kafkatopic.md @@ -83,6 +83,8 @@ Kafka topic configuration. - [`flush_messages`](#spec.config.flush_messages-property){: name='spec.config.flush_messages-property'} (integer). flush.messages value. - [`flush_ms`](#spec.config.flush_ms-property){: name='spec.config.flush_ms-property'} (integer). flush.ms value. - [`index_interval_bytes`](#spec.config.index_interval_bytes-property){: name='spec.config.index_interval_bytes-property'} (integer). index.interval.bytes value. +- [`local_retention_bytes`](#spec.config.local_retention_bytes-property){: name='spec.config.local_retention_bytes-property'} (integer). local.retention.bytes value. +- [`local_retention_ms`](#spec.config.local_retention_ms-property){: name='spec.config.local_retention_ms-property'} (integer). local.retention.ms value. - [`max_compaction_lag_ms`](#spec.config.max_compaction_lag_ms-property){: name='spec.config.max_compaction_lag_ms-property'} (integer). max.compaction.lag.ms value. - [`max_message_bytes`](#spec.config.max_message_bytes-property){: name='spec.config.max_message_bytes-property'} (integer). max.message.bytes value. - [`message_downconversion_enable`](#spec.config.message_downconversion_enable-property){: name='spec.config.message_downconversion_enable-property'} (boolean). message.downconversion.enable value. @@ -93,6 +95,7 @@ Kafka topic configuration. - [`min_compaction_lag_ms`](#spec.config.min_compaction_lag_ms-property){: name='spec.config.min_compaction_lag_ms-property'} (integer). min.compaction.lag.ms value. - [`min_insync_replicas`](#spec.config.min_insync_replicas-property){: name='spec.config.min_insync_replicas-property'} (integer). min.insync.replicas value. - [`preallocate`](#spec.config.preallocate-property){: name='spec.config.preallocate-property'} (boolean). preallocate value. +- [`remote_storage_enable`](#spec.config.remote_storage_enable-property){: name='spec.config.remote_storage_enable-property'} (boolean). remote_storage_enable. - [`retention_bytes`](#spec.config.retention_bytes-property){: name='spec.config.retention_bytes-property'} (integer). retention.bytes value. - [`retention_ms`](#spec.config.retention_ms-property){: name='spec.config.retention_ms-property'} (integer). retention.ms value. - [`segment_bytes`](#spec.config.segment_bytes-property){: name='spec.config.segment_bytes-property'} (integer). segment.bytes value. diff --git a/tests/kafkatopic_test.go b/tests/kafkatopic_test.go index b2ff1d016..3975cfd1b 100644 --- a/tests/kafkatopic_test.go +++ b/tests/kafkatopic_test.go @@ -44,6 +44,9 @@ spec: partitions: 1 config: min_cleanable_dirty_ratio: 0.2 + local_retention_bytes: 1024 + local_retention_ms: 1000000 + remote_storage_enable: true --- @@ -120,6 +123,9 @@ func TestKafkaTopic(t *testing.T) { // Validates MinCleanableDirtyRatio require.Equal(t, anyPointer(0.2), fooTopic.Spec.Config.MinCleanableDirtyRatio) + require.Equal(t, anyPointer(int64(1024)), fooTopic.Spec.Config.LocalRetentionBytes) + require.Equal(t, anyPointer(int64(1000000)), fooTopic.Spec.Config.LocalRetentionMs) + require.Equal(t, anyPointer(true), fooTopic.Spec.Config.RemoteStorageEnable) // KafkaTopic with name `bar_topic_name_with_underscores` barAvn, err := avnClient.KafkaTopics.Get(ctx, testProject, ksName, barTopic.GetTopicName()) @@ -134,6 +140,9 @@ func TestKafkaTopic(t *testing.T) { // Validates MinCleanableDirtyRatio (not set) assert.Nil(t, barTopic.Spec.Config.MinCleanableDirtyRatio) + assert.Nil(t, barTopic.Spec.Config.LocalRetentionBytes) + assert.Nil(t, barTopic.Spec.Config.LocalRetentionMs) + assert.Nil(t, barTopic.Spec.Config.RemoteStorageEnable) // We need to validate deletion, // because we can get false positive here: