Skip to content

Commit

Permalink
feat: add kafka resources operations (#465)
Browse files Browse the repository at this point in the history
* add kafka cluster resource operations

* add topic cmds

* add bats tests

* adjust test timings

* make docs

* remove flaky test fucntion

---------

Co-authored-by: Gabriela Limberea <[email protected]>
  • Loading branch information
digna-ionos and glimberea authored Dec 23, 2024
1 parent f052dbd commit 98ca45a
Show file tree
Hide file tree
Showing 74 changed files with 13,486 additions and 1 deletion.
15 changes: 14 additions & 1 deletion commands/cloudapi-v6/completer/ids.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func AttachedCdromsIds(datacenterId, serverId string) []string {
return attachedCdromsIds
}

func DataCentersIds() []string {
func DataCentersIds(filters ...func(datacenter ionoscloud.Datacenter) bool) []string {
datacenterSvc := resources.NewDataCenterService(client.Must(), context.Background())
datacenters, _, err := datacenterSvc.List(resources.ListQueryParams{})
if err != nil {
Expand All @@ -65,6 +65,19 @@ func DataCentersIds() []string {
if item.Id == nil {
continue
}

skip := false
for _, filter := range filters {
if !filter(item) {
skip = true
break
}
}

if skip {
continue
}

completion = *item.Id
if props, ok := item.GetPropertiesOk(); ok {
if name, ok := props.GetNameOk(); ok {
Expand Down
34 changes: 34 additions & 0 deletions commands/kafka/cluster/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package cluster

import (
"github.com/ionos-cloud/ionosctl/v6/internal/constants"
"github.com/ionos-cloud/ionosctl/v6/internal/core"
"github.com/ionos-cloud/ionosctl/v6/internal/printer/tabheaders"
"github.com/spf13/cobra"
)

var (
allCols = []string{"Id", "Name", "Version", "Size", "DatacenterId", "LanId", "BrokerAddresses", "State"}
defaultCols = []string{"Id", "Name", "Version", "Size", "DatacenterId", "LanId", "BrokerAddresses", "State"}
)

func Command() *core.Command {
cmd := &core.Command{
Command: &cobra.Command{
Use: "cluster",
Short: "The sub-commands of 'ionosctl kafka cluster' allow you to manage kafka clusters",
Aliases: []string{"cl"},
TraverseChildren: true,
},
}
cmd.Command.PersistentFlags().StringSlice(constants.ArgCols, nil, tabheaders.ColsMessage(allCols))
_ = cmd.Command.RegisterFlagCompletionFunc(constants.ArgCols, func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
return allCols, cobra.ShellCompDirectiveNoFileComp
})

cmd.AddCommand(List())
cmd.AddCommand(FindByID())
cmd.AddCommand(Delete())
cmd.AddCommand(Create())
return cmd
}
139 changes: 139 additions & 0 deletions commands/kafka/cluster/create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package cluster

import (
"context"
"fmt"

cloudapiv6completer "github.com/ionos-cloud/ionosctl/v6/commands/cloudapi-v6/completer"
"github.com/ionos-cloud/ionosctl/v6/internal/client"
"github.com/ionos-cloud/ionosctl/v6/internal/constants"
"github.com/ionos-cloud/ionosctl/v6/internal/core"
"github.com/ionos-cloud/ionosctl/v6/internal/printer/json2table/jsonpaths"
"github.com/ionos-cloud/ionosctl/v6/internal/printer/jsontabwriter"
"github.com/ionos-cloud/ionosctl/v6/internal/printer/tabheaders"
"github.com/ionos-cloud/ionosctl/v6/pkg/pointer"
kafka "github.com/ionos-cloud/sdk-go-kafka"
ionoscloud "github.com/ionos-cloud/sdk-go/v6"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)

func Create() *core.Command {
cmd := core.NewCommand(
context.Background(), nil, core.CommandBuilder{
Namespace: "kafka",
Resource: "cluster",
Verb: "create",
Aliases: []string{"c", "post"},
ShortDesc: "Create a kafka cluster. Wiki: https://docs.ionos.com/cloud/data-analytics/kafka/api-howtos/create-kafka",
Example: "ionosctl kafka cl create --name my-cluster --version 3.7.0 --size S --location de/txl --datacenter-id DATACENTER_ID --lan-id LAN_ID --broker-addresses 127.0.0.1/24,127.0.0.2/24,127.0.0.3/24",
PreCmdRun: func(c *core.PreCommandConfig) error {
if err := core.CheckRequiredFlags(
c.Command, c.NS,
constants.FlagName, constants.FlagVersion, constants.FlagSize, constants.FlagLocation,
constants.FlagDatacenterId, constants.FlagLanId, constants.FlagKafkaBrokerAddresses,
); err != nil {
return err
}

return nil
},
CmdRun: func(c *core.CommandConfig) error {
input := &kafka.Cluster{}
if err := setPropertiesFromFlags(c, input); err != nil {
return err
}

res, _, err := client.Must().Kafka.ClustersApi.ClustersPost(context.Background()).
ClusterCreate(
kafka.ClusterCreate{
Properties: input,
},
).Execute()
if err != nil {
return err
}

return printCluster(c, res)
},
InitClient: true,
},
)

cmd.Command.SilenceUsage = true
cmd.Command.Flags().SortFlags = false
return addClusterCreateFlags(cmd)
}

func addClusterCreateFlags(cmd *core.Command) *core.Command {
cmd.AddStringFlag(constants.FlagName, "", "", "The name of the kafka cluster", core.RequiredFlagOption())
cmd.AddStringFlag(constants.FlagVersion, "", "", "The version of the kafka cluster", core.RequiredFlagOption())
cmd.AddSetFlag(
constants.FlagSize, "", "", []string{"XS", "S", "M", "L", "XL"}, "The size of the kafka cluster",
core.RequiredFlagOption(),
)

cmd.AddStringFlag(
constants.FlagDatacenterId, "", "", "The ID of the datacenter", core.RequiredFlagOption(),
)
_ = cmd.Command.RegisterFlagCompletionFunc(
constants.FlagDatacenterId,
func(c *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
return cloudapiv6completer.DataCentersIds(
func(datacenter ionoscloud.Datacenter) bool {
location, _ := cmd.Command.Flags().GetString(constants.FlagLocation)
return *datacenter.Properties.Location == location
},
), cobra.ShellCompDirectiveNoFileComp
},
)

cmd.AddStringFlag(constants.FlagLanId, "", "", "The ID of the LAN", core.RequiredFlagOption())
_ = cmd.Command.RegisterFlagCompletionFunc(
constants.FlagLanId,
func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
return cloudapiv6completer.LansIds(cmd.Flag(constants.FlagDatacenterId).Value.String()), cobra.ShellCompDirectiveNoFileComp
},
)

cmd.AddStringSliceFlag(
constants.FlagKafkaBrokerAddresses, "", []string{}, "The list of broker addresses", core.RequiredFlagOption(),
)
return cmd
}

func setPropertiesFromFlags(c *core.CommandConfig, p *kafka.Cluster) error {
p.Name = pointer.From(viper.GetString(core.GetFlagName(c.NS, constants.FlagName)))
p.Version = pointer.From(viper.GetString(core.GetFlagName(c.NS, constants.FlagVersion)))
p.Size = pointer.From(viper.GetString(core.GetFlagName(c.NS, constants.FlagSize)))

p.Connections = &[]kafka.KafkaClusterConnection{
{
DatacenterId: pointer.From(viper.GetString(core.GetFlagName(c.NS, constants.FlagDatacenterId))),
LanId: pointer.From(viper.GetString(core.GetFlagName(c.NS, constants.FlagLanId))),
BrokerAddresses: pointer.From(
viper.GetStringSlice(
core.GetFlagName(
c.NS, constants.FlagKafkaBrokerAddresses,
),
),
),
},
}

return nil
}

func printCluster(c *core.CommandConfig, d kafka.ClusterRead) error {
cols, _ := c.Command.Command.Flags().GetStringSlice(constants.ArgCols)
out, err := jsontabwriter.GenerateOutput(
"", jsonpaths.KafkaCluster, d,
tabheaders.GetHeadersAllDefault(defaultCols, cols),
)
if err != nil {
return err
}

fmt.Fprintf(c.Command.Command.OutOrStdout(), out)
return nil
}
103 changes: 103 additions & 0 deletions commands/kafka/cluster/delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package cluster

import (
"context"
"fmt"

"github.com/ionos-cloud/ionosctl/v6/commands/kafka/completer"
"github.com/ionos-cloud/ionosctl/v6/internal/client"
"github.com/ionos-cloud/ionosctl/v6/internal/constants"
"github.com/ionos-cloud/ionosctl/v6/pkg/confirm"
"github.com/ionos-cloud/ionosctl/v6/pkg/functional"
kafka "github.com/ionos-cloud/sdk-go-kafka"
"github.com/spf13/viper"

"github.com/ionos-cloud/ionosctl/v6/internal/core"
)

func Delete() *core.Command {
cmd := core.NewCommand(
context.Background(), nil, core.CommandBuilder{
Namespace: "kafka",
Resource: "cluster",
Verb: "delete",
Aliases: []string{"del", "d"},
ShortDesc: "Delete a cluster",
Example: `ionosctl kafka cl delete --cluster-id ID`,
PreCmdRun: func(c *core.PreCommandConfig) error {
if err := core.CheckRequiredFlagsSets(
c.Command, c.NS,
[]string{constants.FlagClusterId, constants.FlagLocation},
[]string{constants.ArgAll, constants.FlagLocation},
); err != nil {
return err
}

return nil
},
CmdRun: func(c *core.CommandConfig) error {
if all := viper.GetBool(core.GetFlagName(c.NS, constants.ArgAll)); all {
return deleteAll(c)
}

return deleteSingle(c, viper.GetString(core.GetFlagName(c.NS, constants.FlagClusterId)))
},
InitClient: true,
},
)

cmd.AddStringFlag(
constants.FlagClusterId, constants.FlagIdShort, "", "The ID of the cluster you want to retrieve",
core.RequiredFlagOption(),
core.WithCompletion(
func() []string {
return completer.ClustersProperty(
func(k kafka.ClusterRead) string {
return *k.Id
},
)
}, constants.KafkaApiRegionalURL,
),
)

cmd.AddBoolFlag(
constants.ArgAll, constants.ArgAllShort, false, "Delete all records if set", core.RequiredFlagOption(),
)

cmd.Command.SilenceUsage = true
cmd.Command.Flags().SortFlags = false

return cmd
}

func deleteAll(c *core.CommandConfig) error {
records, err := completer.Clusters()
if err != nil {
return fmt.Errorf("failed getting all clusters: %w", err)
}

return functional.ApplyAndAggregateErrors(
*records.GetItems(), func(d kafka.ClusterRead) error {
return deleteSingle(c, *d.Id)
},
)
}

func deleteSingle(c *core.CommandConfig, id string) error {
d, _, err := client.Must().Kafka.ClustersApi.ClustersFindById(context.Background(), id).Execute()
if err != nil {
return fmt.Errorf("cluster not found: %w", err)
}

yes := confirm.FAsk(
c.Command.Command.InOrStdin(),
fmt.Sprintf("Are you sure you want to delete cluster %s with name %s", *d.Id, *d.Properties.Name),
viper.GetBool(constants.ArgForce),
)
if !yes {
return fmt.Errorf("user cancelled deletion")
}

_, err = client.Must().Kafka.ClustersApi.ClustersDelete(context.Background(), *d.Id).Execute()
return err
}
77 changes: 77 additions & 0 deletions commands/kafka/cluster/get.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package cluster

import (
"context"
"fmt"

"github.com/ionos-cloud/ionosctl/v6/commands/kafka/completer"
"github.com/ionos-cloud/ionosctl/v6/internal/client"
"github.com/ionos-cloud/ionosctl/v6/internal/constants"
"github.com/ionos-cloud/ionosctl/v6/internal/core"
"github.com/ionos-cloud/ionosctl/v6/internal/printer/json2table/jsonpaths"
"github.com/ionos-cloud/ionosctl/v6/internal/printer/jsontabwriter"
"github.com/ionos-cloud/ionosctl/v6/internal/printer/tabheaders"
kafka "github.com/ionos-cloud/sdk-go-kafka"
"github.com/spf13/viper"
)

func FindByID() *core.Command {
cmd := core.NewCommand(
context.Background(), nil, core.CommandBuilder{
Namespace: "kafka",
Resource: "cluster",
Verb: "get",
Aliases: []string{"g"},
ShortDesc: "Retrieve a cluster",
Example: "ionosctl kafka cl get --cluster-id ID",
PreCmdRun: func(c *core.PreCommandConfig) error {
if err := core.CheckRequiredFlags(
c.Command, c.NS, constants.FlagClusterId, constants.FlagLocation,
); err != nil {
return err
}

return nil
},
CmdRun: func(c *core.CommandConfig) error {
clusterID := viper.GetString(core.GetFlagName(c.NS, constants.FlagClusterId))
r, _, err := client.Must().Kafka.ClustersApi.ClustersFindById(
context.Background(),
clusterID,
).Execute()
if err != nil {
return err
}

cols, _ := c.Command.Command.Flags().GetStringSlice(constants.ArgCols)
out, err := jsontabwriter.GenerateOutput(
"", jsonpaths.KafkaCluster, r,
tabheaders.GetHeadersAllDefault(defaultCols, cols),
)
if err != nil {
return err
}

fmt.Fprintf(c.Command.Command.OutOrStdout(), out)
return nil
},
InitClient: true,
},
)
cmd.AddStringFlag(
constants.FlagClusterId, constants.FlagIdShort, "", "The ID of the cluster you want to retrieve",
core.RequiredFlagOption(), core.WithCompletion(
func() []string {
return completer.ClustersProperty(
func(k kafka.ClusterRead) string {
return *k.Id
},
)
}, constants.KafkaApiRegionalURL,
),
)

cmd.Command.SilenceUsage = true
cmd.Command.Flags().SortFlags = false
return cmd
}
Loading

0 comments on commit 98ca45a

Please sign in to comment.