Skip to content

Commit

Permalink
feat: update KafkaTopicConfig
Browse files Browse the repository at this point in the history
Allow for specifying the following fields:

- local_retention_bytes
- local_retention_ms
- remote_storage_enable
  • Loading branch information
jeff-held-aiven committed Feb 12, 2024
1 parent 017d5c5 commit 2587131
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 0 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## [MAJOR.MINOR.PATCH] - YYYY-MM-DD

- Add `KafkaTopic` field `config.local_retention_bytes`, type `integer`: local.retention.bytes value
- Add `KafkaTopic` field `config.local_retention_ms`, type `integer`: local.retention.ms value
- Add `KafkaTopic` field `config.remote_storage_enable`, type `boolean`: remote_storage_enable

## v0.17.0 - 2024-02-01

- Bump k8s deps to 1.26.13
Expand Down
9 changes: 9 additions & 0 deletions api/v1alpha1/kafkatopic_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ type KafkaTopicConfig struct {
// index.interval.bytes value
IndexIntervalBytes *int64 `json:"index_interval_bytes,omitempty"`

// local.retention.bytes value
LocalRetentionBytes *int64 `json:"local_retention_bytes,omitempty"`

// local.retention.ms value
LocalRetentionMs *int64 `json:"local_retention_ms,omitempty"`

// max.compaction.lag.ms value
MaxCompactionLagMs *int64 `json:"max_compaction_lag_ms,omitempty"`

Expand Down Expand Up @@ -121,6 +127,9 @@ type KafkaTopicConfig struct {
// preallocate value
Preallocate *bool `json:"preallocate,omitempty"`

// remote_storage_enable
RemoteStorageEnable *bool `json:"remote_storage_enable,omitempty"`

// retention.bytes value
RetentionBytes *int64 `json:"retention_bytes,omitempty"`

Expand Down
15 changes: 15 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions charts/aiven-operator-crds/templates/aiven.io_kafkatopics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ spec:
description: index.interval.bytes value
format: int64
type: integer
local_retention_bytes:
description: local.retention.bytes value
format: int64
type: integer
local_retention_ms:
description: local.retention.ms value
format: int64
type: integer
max_compaction_lag_ms:
description: max.compaction.lag.ms value
format: int64
Expand Down Expand Up @@ -127,6 +135,9 @@ spec:
preallocate:
description: preallocate value
type: boolean
remote_storage_enable:
description: remote_storage_enable
type: boolean
retention_bytes:
description: retention.bytes value
format: int64
Expand Down
11 changes: 11 additions & 0 deletions config/crd/bases/aiven.io_kafkatopics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ spec:
description: index.interval.bytes value
format: int64
type: integer
local_retention_bytes:
description: local.retention.bytes value
format: int64
type: integer
local_retention_ms:
description: local.retention.ms value
format: int64
type: integer
max_compaction_lag_ms:
description: max.compaction.lag.ms value
format: int64
Expand Down Expand Up @@ -127,6 +135,9 @@ spec:
preallocate:
description: preallocate value
type: boolean
remote_storage_enable:
description: remote_storage_enable
type: boolean
retention_bytes:
description: retention.bytes value
format: int64
Expand Down
3 changes: 3 additions & 0 deletions controllers/kafkatopic_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ func convertKafkaTopicConfig(topic *v1alpha1.KafkaTopic) aiven.KafkaTopicConfig
FlushMessages: topic.Spec.Config.FlushMessages,
FlushMs: topic.Spec.Config.FlushMs,
IndexIntervalBytes: topic.Spec.Config.IndexIntervalBytes,
LocalRetentionBytes: topic.Spec.Config.LocalRetentionBytes,
LocalRetentionMs: topic.Spec.Config.LocalRetentionMs,
MaxCompactionLagMs: topic.Spec.Config.MaxCompactionLagMs,
MaxMessageBytes: topic.Spec.Config.MaxMessageBytes,
MessageDownconversionEnable: topic.Spec.Config.MessageDownconversionEnable,
Expand All @@ -215,6 +217,7 @@ func convertKafkaTopicConfig(topic *v1alpha1.KafkaTopic) aiven.KafkaTopicConfig
MinCompactionLagMs: topic.Spec.Config.MinCompactionLagMs,
MinInsyncReplicas: topic.Spec.Config.MinInsyncReplicas,
Preallocate: topic.Spec.Config.Preallocate,
RemoteStorageEnable: topic.Spec.Config.RemoteStorageEnable,
RetentionBytes: topic.Spec.Config.RetentionBytes,
RetentionMs: topic.Spec.Config.RetentionMs,
SegmentBytes: topic.Spec.Config.SegmentBytes,
Expand Down
3 changes: 3 additions & 0 deletions docs/docs/api-reference/kafkatopic.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ Kafka topic configuration.
- [`flush_messages`](#spec.config.flush_messages-property){: name='spec.config.flush_messages-property'} (integer). flush.messages value.
- [`flush_ms`](#spec.config.flush_ms-property){: name='spec.config.flush_ms-property'} (integer). flush.ms value.
- [`index_interval_bytes`](#spec.config.index_interval_bytes-property){: name='spec.config.index_interval_bytes-property'} (integer). index.interval.bytes value.
- [`local_retention_bytes`](#spec.config.local_retention_bytes-property){: name='spec.config.local_retention_bytes-property'} (integer). local.retention.bytes value.
- [`local_retention_ms`](#spec.config.local_retention_ms-property){: name='spec.config.local_retention_ms-property'} (integer). local.retention.ms value.
- [`max_compaction_lag_ms`](#spec.config.max_compaction_lag_ms-property){: name='spec.config.max_compaction_lag_ms-property'} (integer). max.compaction.lag.ms value.
- [`max_message_bytes`](#spec.config.max_message_bytes-property){: name='spec.config.max_message_bytes-property'} (integer). max.message.bytes value.
- [`message_downconversion_enable`](#spec.config.message_downconversion_enable-property){: name='spec.config.message_downconversion_enable-property'} (boolean). message.downconversion.enable value.
Expand All @@ -93,6 +95,7 @@ Kafka topic configuration.
- [`min_compaction_lag_ms`](#spec.config.min_compaction_lag_ms-property){: name='spec.config.min_compaction_lag_ms-property'} (integer). min.compaction.lag.ms value.
- [`min_insync_replicas`](#spec.config.min_insync_replicas-property){: name='spec.config.min_insync_replicas-property'} (integer). min.insync.replicas value.
- [`preallocate`](#spec.config.preallocate-property){: name='spec.config.preallocate-property'} (boolean). preallocate value.
- [`remote_storage_enable`](#spec.config.remote_storage_enable-property){: name='spec.config.remote_storage_enable-property'} (boolean). remote_storage_enable.
- [`retention_bytes`](#spec.config.retention_bytes-property){: name='spec.config.retention_bytes-property'} (integer). retention.bytes value.
- [`retention_ms`](#spec.config.retention_ms-property){: name='spec.config.retention_ms-property'} (integer). retention.ms value.
- [`segment_bytes`](#spec.config.segment_bytes-property){: name='spec.config.segment_bytes-property'} (integer). segment.bytes value.
Expand Down
9 changes: 9 additions & 0 deletions tests/kafkatopic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ spec:
partitions: 1
config:
min_cleanable_dirty_ratio: 0.2
local_retention_bytes: 1024
local_retention_ms: 1000000
remote_storage_enable: true
---
Expand Down Expand Up @@ -120,6 +123,9 @@ func TestKafkaTopic(t *testing.T) {

// Validates MinCleanableDirtyRatio
require.Equal(t, anyPointer(0.2), fooTopic.Spec.Config.MinCleanableDirtyRatio)
require.Equal(t, anyPointer(int64(1024)), fooTopic.Spec.Config.LocalRetentionBytes)
require.Equal(t, anyPointer(int64(1000000)), fooTopic.Spec.Config.LocalRetentionMs)
require.Equal(t, anyPointer(true), fooTopic.Spec.Config.RemoteStorageEnable)

// KafkaTopic with name `bar_topic_name_with_underscores`
barAvn, err := avnClient.KafkaTopics.Get(ctx, testProject, ksName, barTopic.GetTopicName())
Expand All @@ -134,6 +140,9 @@ func TestKafkaTopic(t *testing.T) {

// Validates MinCleanableDirtyRatio (not set)
assert.Nil(t, barTopic.Spec.Config.MinCleanableDirtyRatio)
assert.Nil(t, barTopic.Spec.Config.LocalRetentionBytes)
assert.Nil(t, barTopic.Spec.Config.LocalRetentionMs)
assert.Nil(t, barTopic.Spec.Config.RemoteStorageEnable)

// We need to validate deletion,
// because we can get false positive here:
Expand Down

0 comments on commit 2587131

Please sign in to comment.