diff --git a/commands/databases.go b/commands/databases.go index 5ebb8529e..d9e31b8dd 100644 --- a/commands/databases.go +++ b/commands/databases.go @@ -2443,6 +2443,7 @@ This command functions as a PATCH request, meaning that only the specified field displayerType(&displayers.PostgreSQLConfiguration{}), displayerType(&displayers.RedisConfiguration{}), displayerType(&displayers.MongoDBConfiguration{}), + displayerType(&displayers.KafkaConfiguration{}), ) AddStringFlag( getDatabaseCfgCommand, @@ -2501,9 +2502,10 @@ func RunDatabaseConfigurationGet(c *CmdConfig) error { "pg": nil, "redis": nil, "mongodb": nil, + "kafka": nil, } if _, ok := allowedEngines[engine]; !ok { - return fmt.Errorf("(%s) command: engine must be one of: 'pg', 'mysql', 'redis', 'mongodb'", c.NS) + return fmt.Errorf("(%s) command: engine must be one of: 'pg', 'mysql', 'redis', 'mongodb', 'kafka'", c.NS) } dbId := args[0] @@ -2547,6 +2549,16 @@ func RunDatabaseConfigurationGet(c *CmdConfig) error { MongoDBConfig: *config, } return c.Display(&displayer) + } else if engine == "kafka" { + config, err := c.Databases().GetKafkaConfiguration(dbId) + if err != nil { + return err + } + + displayer := displayers.KafkaConfiguration{ + KafkaConfig: *config, + } + return c.Display(&displayer) } return nil @@ -2571,9 +2583,10 @@ func RunDatabaseConfigurationUpdate(c *CmdConfig) error { "pg": nil, "redis": nil, "mongodb": nil, + "kafka": nil, } if _, ok := allowedEngines[engine]; !ok { - return fmt.Errorf("(%s) command: engine must be one of: 'pg', 'mysql', 'redis', 'mongodb'", c.NS) + return fmt.Errorf("(%s) command: engine must be one of: 'pg', 'mysql', 'redis', 'mongodb', 'kafka'", c.NS) } configJson, err := c.Doit.GetString(c.NS, doctl.ArgDatabaseConfigJson) @@ -2602,6 +2615,11 @@ func RunDatabaseConfigurationUpdate(c *CmdConfig) error { if err != nil { return err } + } else if engine == "kafka" { + err := c.Databases().UpdateKafkaConfiguration(dbId, configJson) + if err != nil { + return err + } } return nil diff --git a/commands/databases_test.go b/commands/databases_test.go index 410fc08a7..05594019e 100644 --- a/commands/databases_test.go +++ b/commands/databases_test.go @@ -215,6 +215,10 @@ var ( MongoDBConfig: &godo.MongoDBConfig{}, } + testKafkaConfiguration = do.KafkaConfig{ + KafkaConfig: &godo.KafkaConfig{}, + } + topicReplicationFactor = uint32(3) testKafkaTopic = do.DatabaseTopic{ DatabaseTopic: &godo.DatabaseTopic{ @@ -1666,6 +1670,16 @@ func TestDatabaseConfigurationGet(t *testing.T) { assert.NoError(t, err) }) + withTestClient(t, func(config *CmdConfig, tm *tcMocks) { + tm.databases.EXPECT().GetKafkaConfiguration(testDBCluster.ID).Return(&testKafkaConfiguration, nil) + config.Args = append(config.Args, testDBCluster.ID) + config.Doit.Set(config.NS, doctl.ArgDatabaseEngine, "kafka") + + err := RunDatabaseConfigurationGet(config) + + assert.NoError(t, err) + }) + withTestClient(t, func(config *CmdConfig, tm *tcMocks) { err := RunDatabaseConfigurationGet(config) @@ -1730,6 +1744,16 @@ func TestDatabaseConfigurationUpdate(t *testing.T) { assert.NoError(t, err) }) + withTestClient(t, func(config *CmdConfig, tm *tcMocks) { + tm.databases.EXPECT().UpdateKafkaConfiguration(testDBCluster.ID, "").Return(nil) + config.Args = append(config.Args, testDBCluster.ID) + config.Doit.Set(config.NS, doctl.ArgDatabaseEngine, "kafka") + + err := RunDatabaseConfigurationUpdate(config) + + assert.NoError(t, err) + }) + withTestClient(t, func(config *CmdConfig, tm *tcMocks) { err := RunDatabaseConfigurationUpdate(config) diff --git a/commands/displayers/database.go b/commands/displayers/database.go index 639fe3959..033c8cb69 100644 --- a/commands/displayers/database.go +++ b/commands/displayers/database.go @@ -1739,6 +1739,139 @@ func (dc *MongoDBConfiguration) KV() []map[string]any { return o } +type KafkaConfiguration struct { + KafkaConfig do.KafkaConfig +} + +var _ Displayable = &KafkaConfiguration{} + +func (dc *KafkaConfiguration) JSON(out io.Writer) error { + return writeJSON(dc.KafkaConfig, out) +} + +func (dc *KafkaConfiguration) Cols() []string { + return []string{ + "key", + "value", + } +} + +func (dc *KafkaConfiguration) ColMap() map[string]string { + return map[string]string{ + "key": "key", + "value": "value", + } +} + +func (dc *KafkaConfiguration) KV() []map[string]any { + c := dc.KafkaConfig + o := []map[string]any{} + if c.GroupInitialRebalanceDelayMs != nil { + o = append(o, map[string]any{ + "key": "GroupInitialRebalanceDelayMs", + "value": *c.GroupInitialRebalanceDelayMs, + }) + } + if c.GroupMinSessionTimeoutMs != nil { + o = append(o, map[string]any{ + "key": "GroupMinSessionTimeoutMs", + "value": *c.GroupMinSessionTimeoutMs, + }) + } + if c.GroupMaxSessionTimeoutMs != nil { + o = append(o, map[string]any{ + "key": "GroupMaxSessionTimeoutMs", + "value": *c.GroupMaxSessionTimeoutMs, + }) + } + if c.MessageMaxBytes != nil { + o = append(o, map[string]any{ + "key": "MessageMaxBytes", + "value": *c.MessageMaxBytes, + }) + } + if c.LogCleanerDeleteRetentionMs != nil { + o = append(o, map[string]any{ + "key": "LogCleanerDeleteRetentionMs", + "value": *c.LogCleanerDeleteRetentionMs, + }) + } + if c.LogCleanerMinCompactionLagMs != nil { + o = append(o, map[string]any{ + "key": "LogCleanerMinCompactionLagMs", + "value": *c.LogCleanerMinCompactionLagMs, + }) + } + if c.LogFlushIntervalMs != nil { + o = append(o, map[string]any{ + "key": "LogFlushIntervalMs", + "value": *c.LogFlushIntervalMs, + }) + } + if c.LogIndexIntervalBytes != nil { + o = append(o, map[string]any{ + "key": "LogIndexIntervalBytes", + "value": *c.LogIndexIntervalBytes, + }) + } + if c.LogMessageDownconversionEnable != nil { + o = append(o, map[string]any{ + "key": "LogMessageDownconversionEnable", + "value": *c.LogMessageDownconversionEnable, + }) + } + if c.LogMessageTimestampDifferenceMaxMs != nil { + o = append(o, map[string]any{ + "key": "LogMessageTimestampDifferenceMaxMs", + "value": *c.LogMessageTimestampDifferenceMaxMs, + }) + } + if c.LogPreallocate != nil { + o = append(o, map[string]any{ + "key": "LogPreallocate", + "value": *c.LogPreallocate, + }) + } + if c.LogRetentionBytes != nil { + o = append(o, map[string]any{ + "key": "LogRetentionBytes", + "value": c.LogRetentionBytes.String(), + }) + } + if c.LogRetentionHours != nil { + o = append(o, map[string]any{ + "key": "LogRetentionHours", + "value": *c.LogRetentionHours, + }) + } + if c.LogRetentionMs != nil { + o = append(o, map[string]any{ + "key": "LogRetentionMs", + "value": c.LogRetentionMs.String(), + }) + } + if c.LogRollJitterMs != nil { + o = append(o, map[string]any{ + "key": "LogRollJitterMs", + "value": *c.LogRollJitterMs, + }) + } + if c.LogSegmentDeleteDelayMs != nil { + o = append(o, map[string]any{ + "key": "LogSegmentDeleteDelayMs", + "value": *c.LogSegmentDeleteDelayMs, + }) + } + if c.AutoCreateTopicsEnable != nil { + o = append(o, map[string]any{ + "key": "AutoCreateTopicsEnable", + "value": *c.AutoCreateTopicsEnable, + }) + } + + return o +} + type DatabaseEvents struct { DatabaseEvents do.DatabaseEvents } diff --git a/do/databases.go b/do/databases.go index db6892d54..8dc385031 100644 --- a/do/databases.go +++ b/do/databases.go @@ -125,6 +125,11 @@ type MongoDBConfig struct { *godo.MongoDBConfig } +// KafkaConfig is a wrapper for godo.KafkaConfig +type KafkaConfig struct { + *godo.KafkaConfig +} + // DatabaseTopics is a slice of DatabaseTopic type DatabaseTopics []DatabaseTopic @@ -206,11 +211,13 @@ type DatabasesService interface { GetPostgreSQLConfiguration(databaseID string) (*PostgreSQLConfig, error) GetRedisConfiguration(databaseID string) (*RedisConfig, error) GetMongoDBConfiguration(databaseID string) (*MongoDBConfig, error) + GetKafkaConfiguration(databaseID string) (*KafkaConfig, error) UpdateMySQLConfiguration(databaseID string, confString string) error UpdatePostgreSQLConfiguration(databaseID string, confString string) error UpdateRedisConfiguration(databaseID string, confString string) error UpdateMongoDBConfiguration(databaseID string, confString string) error + UpdateKafkaConfiguration(databaseID string, confString string) error ListTopics(string) (DatabaseTopics, error) GetTopic(string, string) (*DatabaseTopic, error) @@ -713,6 +720,17 @@ func (ds *databasesService) GetMongoDBConfiguration(databaseID string) (*MongoDB }, nil } +func (ds *databasesService) GetKafkaConfiguration(databaseID string) (*KafkaConfig, error) { + cfg, _, err := ds.client.Databases.GetKafkaConfig(context.TODO(), databaseID) + if err != nil { + return nil, err + } + + return &KafkaConfig{ + KafkaConfig: cfg, + }, nil +} + func (ds *databasesService) UpdateMySQLConfiguration(databaseID string, confString string) error { var conf godo.MySQLConfig err := json.Unmarshal([]byte(confString), &conf) @@ -773,6 +791,21 @@ func (ds *databasesService) UpdateMongoDBConfiguration(databaseID string, confSt return nil } +func (ds *databasesService) UpdateKafkaConfiguration(databaseID string, confString string) error { + var conf godo.KafkaConfig + err := json.Unmarshal([]byte(confString), &conf) + if err != nil { + return err + } + + _, err = ds.client.Databases.UpdateKafkaConfig(context.TODO(), databaseID, &conf) + if err != nil { + return err + } + + return nil +} + func (ds *databasesService) ListTopics(databaseID string) (DatabaseTopics, error) { f := func(opt *godo.ListOptions) ([]any, *godo.Response, error) { list, resp, err := ds.client.Databases.ListTopics(context.TODO(), databaseID, opt) diff --git a/do/mocks/DatabasesService.go b/do/mocks/DatabasesService.go index 8de2fcff4..3b6bc1d11 100644 --- a/do/mocks/DatabasesService.go +++ b/do/mocks/DatabasesService.go @@ -303,6 +303,21 @@ func (mr *MockDatabasesServiceMockRecorder) GetFirewallRules(arg0 any) *gomock.C return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetFirewallRules", reflect.TypeOf((*MockDatabasesService)(nil).GetFirewallRules), arg0) } +// GetKafkaConfiguration mocks base method. +func (m *MockDatabasesService) GetKafkaConfiguration(databaseID string) (*do.KafkaConfig, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetKafkaConfiguration", databaseID) + ret0, _ := ret[0].(*do.KafkaConfig) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetKafkaConfiguration indicates an expected call of GetKafkaConfiguration. +func (mr *MockDatabasesServiceMockRecorder) GetKafkaConfiguration(databaseID any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetKafkaConfiguration", reflect.TypeOf((*MockDatabasesService)(nil).GetKafkaConfiguration), databaseID) +} + // GetMaintenance mocks base method. func (m *MockDatabasesService) GetMaintenance(arg0 string) (*do.DatabaseMaintenanceWindow, error) { m.ctrl.T.Helper() @@ -722,6 +737,20 @@ func (mr *MockDatabasesServiceMockRecorder) UpdateFirewallRules(databaseID, req return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateFirewallRules", reflect.TypeOf((*MockDatabasesService)(nil).UpdateFirewallRules), databaseID, req) } +// UpdateKafkaConfiguration mocks base method. +func (m *MockDatabasesService) UpdateKafkaConfiguration(databaseID, confString string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateKafkaConfiguration", databaseID, confString) + ret0, _ := ret[0].(error) + return ret0 +} + +// UpdateKafkaConfiguration indicates an expected call of UpdateKafkaConfiguration. +func (mr *MockDatabasesServiceMockRecorder) UpdateKafkaConfiguration(databaseID, confString any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateKafkaConfiguration", reflect.TypeOf((*MockDatabasesService)(nil).UpdateKafkaConfiguration), databaseID, confString) +} + // UpdateMaintenance mocks base method. func (m *MockDatabasesService) UpdateMaintenance(arg0 string, arg1 *godo.DatabaseUpdateMaintenanceRequest) error { m.ctrl.T.Helper() diff --git a/go.sum b/go.sum index 1b4b48721..c33695285 100644 --- a/go.sum +++ b/go.sum @@ -91,10 +91,6 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/digitalocean/godo v1.125.0 h1:wGPBQRX9Wjo0qCF0o8d25mT3A84Iw8rfHnZOPyvHcMQ= -github.com/digitalocean/godo v1.125.0/go.mod h1:PU8JB6I1XYkQIdHFop8lLAY9ojp6M0XcU0TWaQSxbrc= -github.com/digitalocean/godo v1.125.1-0.20240925184037-40ea734536f0 h1:hEi5W+TPrYUjq1PLt1lJmhrt+ezpzUrAvwYr9f1Xo4U= -github.com/digitalocean/godo v1.125.1-0.20240925184037-40ea734536f0/go.mod h1:PU8JB6I1XYkQIdHFop8lLAY9ojp6M0XcU0TWaQSxbrc= github.com/digitalocean/godo v1.126.0 h1:+Znh7VMQj/E8ArbjWnc7OKGjWfzC+I8OCSRp7r1MdD8= github.com/digitalocean/godo v1.126.0/go.mod h1:PU8JB6I1XYkQIdHFop8lLAY9ojp6M0XcU0TWaQSxbrc= github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= diff --git a/integration/database_config_get_test.go b/integration/database_config_get_test.go index a4f1f88ea..033e12fd2 100644 --- a/integration/database_config_get_test.go +++ b/integration/database_config_get_test.go @@ -72,6 +72,18 @@ var _ = suite("database/config/get", func(t *testing.T, when spec.G, it spec.S) } w.Write([]byte(databaseConfigMongoDBGetResponse)) + case "/v2/databases/kafka-database-id/config": + auth := req.Header.Get("Authorization") + if auth != "Bearer some-magic-token" { + w.WriteHeader(http.StatusTeapot) + } + + if req.Method != http.MethodGet { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + w.Write([]byte(databaseConfigKafkaGetResponse)) default: dump, err := httputil.DumpRequest(req, true) if err != nil { @@ -278,5 +290,27 @@ RedisACLChannelsDefault allchannels "transaction_lifetime_limit_seconds": 60, "verbosity": 1 } + }` + + databaseConfigKafkaGetResponse = `{ + "config": { + "group_initial_rebalance_delay_ms": 3000, + "group_min_session_timeout_ms": 6000, + "group_max_session_timeout_ms": 1800000, + "message_max_bytes": 1048588, + "log_cleaner_delete_retention_ms": 86400000, + "log_cleaner_min_compaction_lag_ms": 0, + "log_flush_interval_ms": 9223372036854776000, + "log_index_interval_bytes": 4096, + "log_message_downconversion_enable": true, + "log_message_timestamp_difference_max_ms": 9223372036854776000, + "log_preallocate": false, + "log_retention_bytes": -1, + "log_retention_hours": 168, + "log_retention_ms": 604800000, + "log_roll_jitter_ms": 0, + "log_segment_delete_delay_ms": 60000, + "auto_create_topics_enable": true + } }` ) diff --git a/integration/database_config_update_test.go b/integration/database_config_update_test.go index ed5c660c6..1ffd608ea 100644 --- a/integration/database_config_update_test.go +++ b/integration/database_config_update_test.go @@ -104,6 +104,26 @@ var _ = suite("database/config/get", func(t *testing.T, when spec.G, it spec.S) } expect.Equal(expected, strings.TrimSpace(string(b))) + w.WriteHeader(http.StatusOK) + case "/v2/databases/kafka-database-id/config": + auth := req.Header.Get("Authorization") + if auth != "Bearer some-magic-token" { + w.WriteHeader(http.StatusTeapot) + } + + if req.Method != http.MethodPatch { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + expected := `{"config":{"group_initial_rebalance_delay_ms":3000}}` + b, err := io.ReadAll(req.Body) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + expect.Equal(expected, strings.TrimSpace(string(b))) + w.WriteHeader(http.StatusOK) default: dump, err := httputil.DumpRequest(req, true) @@ -191,4 +211,23 @@ var _ = suite("database/config/get", func(t *testing.T, when spec.G, it spec.S) expect.Empty(strings.TrimSpace(string(output))) }) }) + + when("all required flags are passed", func() { + it("updates the kafka database config", func() { + cmd := exec.Command(builtBinaryPath, + "-t", "some-magic-token", + "-u", server.URL, + "database", + "configuration", + "update", + "--engine", "kafka", + "kafka-database-id", + "--config-json", `{"group_initial_rebalance_delay_ms":3000}`, + ) + + output, err := cmd.CombinedOutput() + expect.NoError(err, fmt.Sprintf("received error output: %s", output)) + expect.Empty(strings.TrimSpace(string(output))) + }) + }) })