Skip to content

Commit

Permalink
Merge pull request #1151 from ploubser/issue_1145
Browse files Browse the repository at this point in the history
(#1145) Support interactive edit for consumers
  • Loading branch information
ripienaar authored Sep 12, 2024
2 parents ff01a9b + 7a4e814 commit 0a41d0b
Showing 1 changed file with 145 additions and 76 deletions.
221 changes: 145 additions & 76 deletions cli/consumer_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"math"
"math/rand"
"os"
"os/exec"
"os/signal"
"regexp"
"sort"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/nats-io/natscli/columns"
iu "github.com/nats-io/natscli/internal/util"
terminal "golang.org/x/term"
"gopkg.in/yaml.v3"

"github.com/AlecAivazis/survey/v2"
"github.com/choria-io/fisk"
Expand Down Expand Up @@ -116,6 +118,7 @@ type consumerCmd struct {
fInvert bool
fExpression string
fLeader string
interactive bool
}

func configureConsumerCommand(app commandHost) {
Expand Down Expand Up @@ -190,6 +193,7 @@ func configureConsumerCommand(app commandHost) {
edit.Arg("consumer", "Consumer name").StringVar(&c.consumer)
edit.Flag("config", "JSON file to read configuration from").ExistingFileVar(&c.inputFile)
edit.Flag("force", "Force removal without prompting").Short('f').UnNegatableBoolVar(&c.force)
edit.Flag("interactive", "Edit the configuring using your editor").Short('i').BoolVar(&c.interactive)
edit.Flag("dry-run", "Only shows differences, do not edit the stream").UnNegatableBoolVar(&c.dryRun)
addCreateFlags(edit, true)

Expand Down Expand Up @@ -556,108 +560,173 @@ func (c *consumerCmd) leaderStandDownAction(_ *fisk.ParseContext) error {
return nil
}

func (c *consumerCmd) editAction(pc *fisk.ParseContext) error {
c.connectAndSetup(true, true)
var err error
func (c *consumerCmd) interactiveEdit(cfg api.ConsumerConfig) (*api.ConsumerConfig, error) {
editor := os.Getenv("EDITOR")
if editor == "" {
return &api.ConsumerConfig{}, fmt.Errorf("set EDITOR environment variable to your chosen editor")
}

if c.selectedConsumer == nil {
c.selectedConsumer, err = c.mgr.LoadConsumer(c.stream, c.consumer)
fisk.FatalIfError(err, "could not load Consumer")
cj, err := decoratedYamlMarshal(cfg)
if err != nil {
return &api.ConsumerConfig{}, fmt.Errorf("could not create temporary file: %s", err)
}

if !c.selectedConsumer.IsDurable() {
return fmt.Errorf("only durable consumers can be edited")
tfile, err := os.CreateTemp("", "*.yaml")
if err != nil {
return &api.ConsumerConfig{}, fmt.Errorf("could not create temporary file: %s", err)
}
defer os.Remove(tfile.Name())

// lazy deep copy
t := c.selectedConsumer.Configuration()
tj, err := json.Marshal(t)
_, err = fmt.Fprint(tfile, string(cj))
if err != nil {
return err
return &api.ConsumerConfig{}, fmt.Errorf("could not create temporary file: %s", err)
}
var ncfg *api.ConsumerConfig

tfile.Close()

cmd := exec.Command(editor, tfile.Name())
cmd.Stdin = os.Stdin
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr

err = cmd.Run()
if err != nil {
return &api.ConsumerConfig{}, fmt.Errorf("could not create temporary file: %s", err)
}

nb, err := os.ReadFile(tfile.Name())
if err != nil {
return &api.ConsumerConfig{}, err
}

ncfg := api.ConsumerConfig{}
err = yaml.Unmarshal(nb, &ncfg)
if err != nil {
return &api.ConsumerConfig{}, err
}

// some yaml quirks
if len(ncfg.BackOff) == 0 {
ncfg.BackOff = nil
}

return &ncfg, nil
}

func (c *consumerCmd) copyAndEditConsumer(cfg api.ConsumerConfig) (*api.ConsumerConfig, error) {
var err error

if c.inputFile != "" {
ncfg, err = c.loadConfigFile(c.inputFile)
if err != nil {
return err
}
} else {
err = json.Unmarshal(tj, &ncfg)
if err != nil {
return err
}
return c.loadConfigFile(c.inputFile)
}

if c.description != "" {
ncfg.Description = c.description
}
if c.description != "" {
cfg.Description = c.description
}

if c.inactiveThreshold != 0 {
ncfg.InactiveThreshold = c.inactiveThreshold
}
if c.inactiveThreshold != 0 {
cfg.InactiveThreshold = c.inactiveThreshold
}

if c.maxDeliver != 0 {
ncfg.MaxDeliver = c.maxDeliver
}
if c.maxDeliver != 0 {
cfg.MaxDeliver = c.maxDeliver
}

if c.maxAckPending != -1 {
ncfg.MaxAckPending = c.maxAckPending
}
if c.maxAckPending != -1 {
cfg.MaxAckPending = c.maxAckPending
}

if c.ackWait != -1*time.Second {
ncfg.AckWait = c.ackWait
}
if c.ackWait != -1*time.Second {
cfg.AckWait = c.ackWait
}

if c.maxWaiting != 0 {
ncfg.MaxWaiting = c.maxWaiting
}
if c.maxWaiting != 0 {
cfg.MaxWaiting = c.maxWaiting
}

if c.samplePct != -1 {
ncfg.SampleFrequency = c.sampleFreqFromInt(c.samplePct)
}
if c.samplePct != -1 {
cfg.SampleFrequency = c.sampleFreqFromInt(c.samplePct)
}

if c.maxPullBatch > 0 {
ncfg.MaxRequestBatch = c.maxPullBatch
}
if c.maxPullBatch > 0 {
cfg.MaxRequestBatch = c.maxPullBatch
}

if c.maxPullExpire > 0 {
ncfg.MaxRequestExpires = c.maxPullExpire
}
if c.maxPullExpire > 0 {
cfg.MaxRequestExpires = c.maxPullExpire
}

if c.maxPullBytes > 0 {
ncfg.MaxRequestMaxBytes = c.maxPullBytes
}
if c.maxPullBytes > 0 {
cfg.MaxRequestMaxBytes = c.maxPullBytes
}

if c.backoffMode != "" {
ncfg.BackOff, err = c.backoffPolicy()
if err != nil {
return fmt.Errorf("could not determine backoff policy: %v", err)
}
if c.backoffMode != "" {
cfg.BackOff, err = c.backoffPolicy()
if err != nil {
return &api.ConsumerConfig{}, fmt.Errorf("could not determine backoff policy: %v", err)
}
}

if c.delivery != "" {
ncfg.DeliverSubject = c.delivery
}
if c.delivery != "" {
cfg.DeliverSubject = c.delivery
}

if c.hdrsOnlySet {
ncfg.HeadersOnly = c.hdrsOnly
}
if c.hdrsOnlySet {
cfg.HeadersOnly = c.hdrsOnly
}

if len(c.filterSubjects) == 1 {
ncfg.FilterSubject = c.filterSubjects[0]
ncfg.FilterSubjects = nil
} else if len(c.filterSubjects) > 1 {
ncfg.FilterSubjects = c.filterSubjects
ncfg.FilterSubject = ""
}
if len(c.filterSubjects) == 1 {
cfg.FilterSubject = c.filterSubjects[0]
cfg.FilterSubjects = nil
} else if len(c.filterSubjects) > 1 {
cfg.FilterSubjects = c.filterSubjects
cfg.FilterSubject = ""
}

if c.replicas > 0 {
ncfg.Replicas = c.replicas
}
if c.replicas > 0 {
cfg.Replicas = c.replicas
}

if c.metadataIsSet {
ncfg.Metadata = c.metadata
}
if c.metadataIsSet {
cfg.Metadata = c.metadata
}

return &cfg, nil
}
func (c *consumerCmd) editAction(pc *fisk.ParseContext) error {
c.connectAndSetup(true, true)
var err error

if c.selectedConsumer == nil {
c.selectedConsumer, err = c.mgr.LoadConsumer(c.stream, c.consumer)
fisk.FatalIfError(err, "could not load Consumer")
}

if !c.selectedConsumer.IsDurable() {
return fmt.Errorf("only durable consumers can be edited")
}

// lazy deep copy
t := c.selectedConsumer.Configuration()
t.Metadata = iu.RemoveReservedMetadata(t.Metadata)

tj, err := json.Marshal(t)
if err != nil {
return err
}

var ncfg *api.ConsumerConfig
err = json.Unmarshal(tj, &ncfg)
if err != nil {
return err
}

if c.interactive {
ncfg, err = c.interactiveEdit(t)
fisk.FatalIfError(err, "could not create new configuration for Consumer %s", c.selectedConsumer.Name())
} else {
ncfg, err = c.copyAndEditConsumer(t)
fisk.FatalIfError(err, "could not create new configuration for Consumer %s", c.selectedConsumer.Name())
}

if len(ncfg.BackOff) > 0 && ncfg.AckWait != t.AckWait {
Expand Down

0 comments on commit 0a41d0b

Please sign in to comment.