From 2a7570df2eb274bd277bef6a566d4e6234eacd07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-No=C3=ABl=20Moyne?= Date: Thu, 12 Jan 2023 17:30:59 -0800 Subject: [PATCH 1/8] Fix guard Remove redundant humanize Make a few things consistent and cleanup a couple redundancies from the merge Implement PR feedback Source details made more compact Make test check for both arguments being non-null to improve readability Adds lag/last seen/external/error for sources in KV info improve stream report Rebase merge from main --- cli/consumer_command.go | 5 - cli/kv_command.go | 40 +++++- cli/stream_command.go | 283 +++++++++++++++++++++++++++------------- 3 files changed, 229 insertions(+), 99 deletions(-) diff --git a/cli/consumer_command.go b/cli/consumer_command.go index 72534565..59df1a35 100644 --- a/cli/consumer_command.go +++ b/cli/consumer_command.go @@ -572,7 +572,6 @@ func (c *consumerCmd) showInfo(config api.ConsumerConfig, state api.ConsumerInfo cols.AddRowIfNotEmpty("Name", config.Name) cols.AddRowIf("Durable Name", config.Durable, config.Durable != "" && config.Durable != config.Name) cols.AddRowIfNotEmpty("Description", config.Description) - if config.DeliverSubject != "" { cols.AddRow("Delivery Subject", config.DeliverSubject) } else { @@ -583,7 +582,6 @@ func (c *consumerCmd) showInfo(config api.ConsumerConfig, state api.ConsumerInfo } else if len(config.FilterSubjects) > 0 { cols.AddRow("Filter Subjects", config.FilterSubjects) } - switch config.DeliverPolicy { case api.DeliverAll: cols.AddRow("Deliver Policy", "All") @@ -598,7 +596,6 @@ func (c *consumerCmd) showInfo(config api.ConsumerConfig, state api.ConsumerInfo case api.DeliverByStartSequence: cols.AddRowf("Deliver Policy", "From Sequence %d", config.OptStartSeq) } - cols.AddRowIf("Deliver Queue Group", config.DeliverGroup, config.DeliverGroup != "" && config.DeliverSubject != "") cols.AddRow("Ack Policy", config.AckPolicy.String()) cols.AddRowIf("Ack Wait", config.AckWait, config.AckPolicy != api.AckNone) @@ -656,7 +653,6 @@ func (c *consumerCmd) showInfo(config api.ConsumerConfig, state api.ConsumerInfo } else { cols.AddRowf("Last Delivered Message", "Consumer sequence: %s Stream sequence: %s Last delivery: %s ago", f(state.Delivered.Consumer), f(state.Delivered.Stream), f(sinceRefOrNow(state.TimeStamp, *state.Delivered.Last))) } - if config.AckPolicy != api.AckNone { if state.AckFloor.Last == nil { cols.AddRowf("Acknowledgment Floor", "Consumer sequence: %s Stream sequence: %s", f(state.AckFloor.Consumer), f(state.AckFloor.Stream)) @@ -670,7 +666,6 @@ func (c *consumerCmd) showInfo(config api.ConsumerConfig, state api.ConsumerInfo } cols.AddRow("Redelivered Messages", state.NumRedelivered) } - cols.AddRow("Unprocessed Messages", state.NumPending) if config.DeliverSubject == "" { if config.MaxWaiting > 0 { diff --git a/cli/kv_command.go b/cli/kv_command.go index 790857ec..a66e57e3 100644 --- a/cli/kv_command.go +++ b/cli/kv_command.go @@ -58,6 +58,7 @@ type kvCommand struct { repubHeadersOnly bool mirror string mirrorDomain string + sources []string } func configureKVCommand(app commandHost) { @@ -88,6 +89,7 @@ for an indefinite period or a per-bucket configured TTL. add.Flag("republish-headers", "Republish only message headers, no bodies").UnNegatableBoolVar(&c.repubHeadersOnly) add.Flag("mirror", "Creates a mirror of a different bucket").StringVar(&c.mirror) add.Flag("mirror-domain", "When mirroring find the bucket in a different domain").StringVar(&c.mirrorDomain) + add.Flag("source", "Source from a different bucket").PlaceHolder("BUCKET").StringsVar(&c.sources) add.PreAction(c.parseLimitStrings) @@ -464,6 +466,17 @@ func (c *kvCommand) addAction(_ *fisk.ParseContext) error { } } + if len(c.sources) != 0 { + var sources []*nats.StreamSource + + for _, source := range c.sources { + sources = append(sources, &nats.StreamSource{ + Name: source, + }) + } + cfg.Sources = sources + } + store, err := js.CreateKeyValue(cfg) if err != nil { return err @@ -744,16 +757,16 @@ func (c *kvCommand) showStatus(store nats.KeyValue) error { if nfo != nil { cols.AddRowIfNotEmpty("Description", nfo.Config.Description) - cols.AddRow("Bucket Size", humanize.IBytes(nfo.State.Bytes)) + cols.AddRow("Bucket Size", nfo.State.Bytes) if nfo.Config.MaxBytes == -1 { cols.AddRow("Maximum Bucket Size", "unlimited") } else { - cols.AddRow("Maximum Bucket Size", humanize.IBytes(uint64(nfo.Config.MaxBytes))) + cols.AddRow("Maximum Bucket Size", nfo.Config.MaxBytes) } if nfo.Config.MaxMsgSize == -1 { cols.AddRow("Maximum Value Size", "unlimited") } else { - cols.AddRow("Maximum Value Size", humanize.IBytes(uint64(nfo.Config.MaxMsgSize))) + cols.AddRow("Maximum Value Size", nfo.Config.MaxMsgSize) } if nfo.Config.MaxAge <= 0 { cols.AddRow("Maximum Age", "unlimited") @@ -774,7 +787,9 @@ func (c *kvCommand) showStatus(store nats.KeyValue) error { s := nfo.Mirror cols.AddSectionTitle("Mirror Information") cols.AddRow("Origin Bucket", strings.TrimPrefix(s.Name, "KV_")) - cols.AddRowIf("External API", s.External.APIPrefix, s.External != nil) + if s.External != nil { + cols.AddRow("External API", s.External.APIPrefix) + } if s.Active > 0 && s.Active < math.MaxInt64 { cols.AddRow("Last Seen", s.Active) } else { @@ -782,6 +797,23 @@ func (c *kvCommand) showStatus(store nats.KeyValue) error { } cols.AddRow("Lag", s.Lag) } + + if len(nfo.Sources) > 0 { + cols.AddSectionTitle("Sources Information") + for _, source := range nfo.Sources { + cols.AddRow("Source Bucket", strings.TrimPrefix(source.Name, "KV_")) + if source.External != nil { + cols.AddRow("External API", source.External.APIPrefix) + } + if source.Active > 0 && source.Active < math.MaxInt64 { + cols.AddRow("Last Seen", source.Active) + } else { + cols.AddRow("Last Seen", "never") + } + cols.AddRow("Lag", source.Lag) + } + } + if nfo.Cluster != nil { cols.AddSectionTitle("Cluster Information") renderNatsGoClusterInfo(cols, nfo) diff --git a/cli/stream_command.go b/cli/stream_command.go index 11cc8114..ee19be73 100644 --- a/cli/stream_command.go +++ b/cli/stream_command.go @@ -56,68 +56,76 @@ type streamCmd struct { showAll bool acceptDefaults bool - destination string - subjects []string - ack bool - storage string - maxMsgLimit int64 - maxMsgPerSubjectLimit int64 - maxBytesLimitString string - maxBytesLimit int64 - maxAgeLimit string - maxMsgSizeString string - maxMsgSize int64 - maxConsumers int - reportSortConsumers bool - reportSortMsgs bool - reportSortName bool - reportSortReverse bool - reportSortStorage bool - reportSort string - reportRaw bool - reportLimitCluster string - reportLeaderDistrib bool - discardPolicy string - validateOnly bool - backupDirectory string - showProgress bool - healthCheck bool - snapShotConsumers bool - dupeWindow string - replicas int64 - placementCluster string - placementTags []string - placementClusterSet bool - placementTagsSet bool - peerName string - sources []string - mirror string - interactive bool - purgeKeep uint64 - purgeSubject string - purgeSequence uint64 - description string - repubSource string - repubDest string - repubHeadersOnly bool - noRepub bool - allowRollup bool - allowRollupSet bool - denyDelete bool - denyDeleteSet bool - denyPurge bool - denyPurgeSet bool - allowDirect bool - allowDirectSet bool - allowMirrorDirect bool - allowMirrorDirectSet bool - discardPerSubj bool - discardPerSubjSet bool - showStateOnly bool - metadata map[string]string - metadataIsSet bool - compression string - firstSeq uint64 + destination string + subjects []string + ack bool + storage string + maxMsgLimit int64 + maxMsgPerSubjectLimit int64 + maxBytesLimitString string + maxBytesLimit int64 + maxAgeLimit string + maxMsgSizeString string + maxMsgSize int64 + maxConsumers int + reportSortConsumers bool + reportSortMsgs bool + reportSortName bool + reportSortReverse bool + reportSortStorage bool + reportSort string + reportRaw bool + reportLimitCluster string + reportLeaderDistrib bool + discardPolicy string + validateOnly bool + backupDirectory string + showProgress bool + healthCheck bool + snapShotConsumers bool + dupeWindow string + replicas int64 + placementCluster string + placementTags []string + placementClusterSet bool + placementTagsSet bool + peerName string + sources []string + mirror string + interactive bool + purgeKeep uint64 + purgeSubject string + purgeSequence uint64 + description string + subjectTransformSource string + subjectTransformSourceSet bool + subjectTransformDest string + subjectTransformDestSet bool + noSubjectTransform bool + repubSource string + repubSourceSet bool + repubDest string + repubDestSet bool + repubHeadersOnly bool + repubHeadersOnlySet bool + noRepub bool + allowRollup bool + allowRollupSet bool + denyDelete bool + denyDeleteSet bool + denyPurge bool + denyPurgeSet bool + allowDirect bool + allowDirectSet bool + allowMirrorDirect bool + allowMirrorDirectSet bool + discardPerSubj bool + discardPerSubjSet bool + showStateOnly bool + metadata map[string]string + metadataIsSet bool + compression string + firstSeq uint64 fServer string fCluster string @@ -200,12 +208,15 @@ func configureStreamCommand(app commandHost) { f.Flag("deny-purge", "Deny entire stream or subject purges via the API").IsSetByUser(&c.denyPurgeSet).BoolVar(&c.denyPurge) f.Flag("allow-direct", "Allows fast, direct, access to stream data via the direct get API").IsSetByUser(&c.allowDirectSet).Default("true").BoolVar(&c.allowDirect) f.Flag("allow-mirror-direct", "Allows fast, direct, access to stream data via the direct get API on mirrors").IsSetByUser(&c.allowMirrorDirectSet).BoolVar(&c.allowMirrorDirect) + f.Flag("transform-source", "Stream subject transform source").PlaceHolder("SOURCE").StringVar(&c.subjectTransformSource) + f.Flag("transform-destination", "Stream subject transform destination").PlaceHolder("DEST").StringVar(&c.subjectTransformDest) f.Flag("metadata", "Adds metadata to the stream").PlaceHolder("META").IsSetByUser(&c.metadataIsSet).StringMapVar(&c.metadata) f.Flag("republish-source", "Republish messages to --republish-destination").PlaceHolder("SOURCE").StringVar(&c.repubSource) f.Flag("republish-destination", "Republish destination for messages in --republish-source").PlaceHolder("DEST").StringVar(&c.repubDest) - f.Flag("republish-headers", "Republish only message headers, no bodies").UnNegatableBoolVar(&c.repubHeadersOnly) + f.Flag("republish-headers", "Republish only message headers, no bodies").BoolVar(&c.repubHeadersOnly) if edit { f.Flag("no-republish", "Removes current republish configuration").UnNegatableBoolVar(&c.noRepub) + f.Flag("no-transform", "Removes current subject transform configuration").UnNegatableBoolVar(&c.noSubjectTransform) } f.Flag("json", "Produce JSON output").Short('j').UnNegatableBoolVar(&c.json) @@ -1079,8 +1090,10 @@ func (c *streamCmd) reportAction(_ *fisk.ParseContext) error { snode = dg.Node(source.Name) } edge := dg.Edge(snode, node).Attr("color", "green") - if source.FilterSubject != "" { + if source.FilterSubject != "" && source.SubjectTransformDest == "" { edge.Label(source.FilterSubject) + } else if source.FilterSubject != "" && source.SubjectTransformDest != "" { + edge.Label(source.FilterSubject + " to " + source.SubjectTransformDest) } } } @@ -1144,7 +1157,7 @@ func (c *streamCmd) reportAction(_ *fisk.ParseContext) error { func (c *streamCmd) renderReplication(stats []streamStat) { table := newTableWriter("Replication Report") - table.AddHeaders("Stream", "Kind", "API Prefix", "Source Stream", "Active", "Lag", "Error") + table.AddHeaders("Stream", "Kind", "API Prefix", "Source Stream", "Filter", "Destination", "Active", "Lag", "Error") for _, s := range stats { if len(s.Sources) == 0 && s.Mirror == nil { @@ -1163,7 +1176,7 @@ func (c *streamCmd) renderReplication(stats []streamStat) { } if c.reportRaw { - table.AddRow(s.Name, "Mirror", eApiPrefix, s.Mirror.Name, s.Mirror.Active, s.Mirror.Lag, apierr) + table.AddRow(s.Name, "Mirror", eApiPrefix, s.Mirror.Name, "", "", s.Mirror.Active, s.Mirror.Lag, apierr) } else { table.AddRow(s.Name, "Mirror", eApiPrefix, s.Mirror.Name, f(s.Mirror.Active), f(s.Mirror.Lag), apierr) } @@ -1180,10 +1193,14 @@ func (c *streamCmd) renderReplication(stats []streamStat) { eApiPrefix = source.External.ApiPrefix } + if source.SubjectTransformDest != "" && source.FilterSubject == "" { + source.FilterSubject = ">" + } + if c.reportRaw { - table.AddRow(s.Name, "Source", eApiPrefix, source.Name, source.Active, source.Lag, apierr) + table.AddRow(s.Name, "Source", eApiPrefix, source.Name, source.FilterSubject, source.SubjectTransformDest, source.Active, source.Lag, apierr) } else { - table.AddRow(s.Name, "Source", eApiPrefix, source.Name, f(source.Active), f(source.Lag), apierr) + table.AddRow(s.Name, "Source", eApiPrefix, source.Name, source.FilterSubject, source.SubjectTransformDest, f(source.Active), f(source.Lag), apierr) } } @@ -1364,7 +1381,7 @@ func (c *streamCmd) copyAndEditStream(cfg api.StreamConfig, pc *fisk.ParseContex } if len(c.sources) > 0 || c.mirror != "" { - return cfg, fmt.Errorf("cannot edit mirrors or sources using the CLI, use --config instead") + return cfg, fmt.Errorf("cannot edit mirrors, or sources using the CLI, use --config instead") } if c.description != "" { @@ -1403,18 +1420,61 @@ func (c *streamCmd) copyAndEditStream(cfg api.StreamConfig, pc *fisk.ParseContex return cfg, fmt.Errorf("invalid compression algorithm") } - if c.repubDest != "" && c.repubSource != "" { - cfg.RePublish = &api.RePublish{ - Source: c.repubSource, - Destination: c.repubDest, - HeadersOnly: c.repubHeadersOnly, - } + var repubConfig api.RePublish + + if cfg.RePublish != nil { + repubConfig = *cfg.RePublish + } else { + repubConfig = api.RePublish{} + } + + if c.repubSource != "" { + repubConfig.Source = c.repubSource + } + if c.repubDest != "" { + repubConfig.Destination = c.repubDest + } + if c.repubHeadersOnly { + repubConfig.HeadersOnly = true + } else if repubConfig.HeadersOnly { + repubConfig.HeadersOnly = false + } + + if (repubConfig.Source == "" || repubConfig.Source == ">") && repubConfig.Destination == "" { + cfg.RePublish = nil + } else { + cfg.RePublish = &repubConfig } if c.noRepub { cfg.RePublish = nil } + var subjectTransformConfig api.SubjectTransformConfig + + if cfg.SubjectTransform != nil { + subjectTransformConfig = *cfg.SubjectTransform + } else { + subjectTransformConfig = api.SubjectTransformConfig{} + } + + if c.subjectTransformSource != "" { + subjectTransformConfig.Source = c.subjectTransformSource + } + if c.subjectTransformDest != "" { + subjectTransformConfig.Destination = c.subjectTransformDest + } + + if subjectTransformConfig.Source == "" && subjectTransformConfig.Destination == "" { + cfg.SubjectTransform = nil + } else { + cfg.SubjectTransform = &subjectTransformConfig + } + + if c.noSubjectTransform { + cfg.SubjectTransform = nil + } + return cfg, nil } @@ -1584,7 +1644,6 @@ func (c *streamCmd) cpAction(pc *fisk.ParseContext) error { func (c *streamCmd) showStreamConfig(cols *columns.Writer, cfg api.StreamConfig) { cols.AddRowIfNotEmpty("Description", cfg.Description) - cols.AddRowIf("Subjects", cfg.Subjects, len(cfg.Subjects) > 0) cols.AddRow("Replicas", cfg.Replicas) cols.AddRowIf("Sealed", true, cfg.Sealed) @@ -1597,6 +1656,16 @@ func (c *streamCmd) showStreamConfig(cols *columns.Writer, cfg api.StreamConfig) cols.AddRowIfNotEmpty("Placement Cluster", cfg.Placement.Cluster) cols.AddRowIf("Placement Tags", cfg.Placement.Tags, len(cfg.Placement.Tags) > 0) } + + cols.AddSectionTitle("Options") + + if cfg.SubjectTransform != nil && cfg.SubjectTransform.Destination != "" { + source := cfg.SubjectTransform.Source + if source == "" { + source = ">" + } + cols.AddRowf("Subject Transform", "%s to %s", source, cfg.SubjectTransform.Destination) + } if cfg.RePublish != nil { if cfg.RePublish.HeadersOnly { cols.AddRowf("Republishing Headers", "%s to %s", cfg.RePublish.Source, cfg.RePublish.Destination) @@ -1604,9 +1673,6 @@ func (c *streamCmd) showStreamConfig(cols *columns.Writer, cfg api.StreamConfig) cols.AddRowf("Republishing", "%s to %s", cfg.RePublish.Source, cfg.RePublish.Destination) } } - - cols.AddSectionTitle("Options") - cols.AddRow("Retention", cfg.Retention.String()) cols.AddRow("Acknowledgments", !cfg.NoAck) dnp := cfg.Discard.String() @@ -1636,7 +1702,7 @@ func (c *streamCmd) showStreamConfig(cols *columns.Writer, cfg api.StreamConfig) if cfg.MaxBytes == -1 { cols.AddRow("Maximum Bytes", "unlimited") } else { - cols.AddRow("Maximum Bytes", humanize.IBytes(uint64(cfg.MaxBytes))) + cols.AddRow("Maximum Bytes", cfg.MaxBytes) } if cfg.MaxAge <= 0 { cols.AddRow("Maximum Age", "unlimited") @@ -1646,7 +1712,7 @@ func (c *streamCmd) showStreamConfig(cols *columns.Writer, cfg api.StreamConfig) if cfg.MaxMsgSize == -1 { cols.AddRow("Maximum Message Size", "unlimited") } else { - cols.AddRow("Maximum Message Size", humanize.IBytes(uint64(cfg.MaxMsgSize))) + cols.AddRow("Maximum Message Size", cfg.MaxMsgSize) } if cfg.MaxConsumers == -1 { cols.AddRow("Maximum Consumers", "unlimited") @@ -1687,7 +1753,22 @@ func (c *streamCmd) renderSource(s *api.StreamSource) string { return "" } - parts := []string{s.Name} + var parts []string + + if s.FilterSubject == "" && s.SubjectTransformDest == "" { + parts = append(parts, s.Name) + } else { + var filter = ">" + if s.FilterSubject != "" { + filter = s.FilterSubject + } + if s.SubjectTransformDest != "" { + parts = append(parts, fmt.Sprintf("%s (%s to %s)", s.Name, filter, s.SubjectTransformDest)) + } else { + parts = append(parts, fmt.Sprintf("%s (%s)", s.Name, s.FilterSubject)) + } + } + if s.OptStartSeq > 0 { parts = append(parts, fmt.Sprintf("Start Seq: %s", f(s.OptStartSeq))) } @@ -1695,9 +1776,6 @@ func (c *streamCmd) renderSource(s *api.StreamSource) string { if s.OptStartTime != nil { parts = append(parts, fmt.Sprintf("Start Time: %v", s.OptStartTime)) } - if s.FilterSubject != "" { - parts = append(parts, fmt.Sprintf("Subject: %s", s.FilterSubject)) - } if s.External != nil { if s.External.ApiPrefix != "" { parts = append(parts, fmt.Sprintf("API Prefix: %s", s.External.ApiPrefix)) @@ -1775,6 +1853,15 @@ func (c *streamCmd) showStreamInfo(info *api.StreamInfo) { showSource := func(s *api.StreamSourceInfo) { cols.AddRow("Stream Name", s.Name) + if s.SubjectTransformDest != "" { + var filter = ">" + if s.FilterSubject != "" { + filter = s.FilterSubject + } + cols.AddRowf("Subject Transform", "%s to %s", filter, s.SubjectTransformDest) + } else { + cols.AddRowIfNotEmpty("Subject Filter", s.FilterSubject) + } cols.AddRow("Lag", s.Lag) if s.Active > 0 && s.Active < math.MaxInt64 { cols.AddRow("Last Seen", s.Active) @@ -1791,6 +1878,7 @@ func (c *streamCmd) showStreamInfo(info *api.StreamInfo) { cols.AddRow("Error", s.Error.Description) } } + if info.Mirror != nil { cols.AddSectionTitle("Mirror Information") showSource(info.Mirror) @@ -1816,13 +1904,11 @@ func (c *streamCmd) showStreamInfo(info *api.StreamInfo) { } else { cols.AddRowf("First Sequence", "%s @ %s UTC", f(info.State.FirstSeq), f(info.State.FirstTime)) } - if info.State.LastTime.Equal(time.Unix(0, 0)) || info.State.LastTime.IsZero() { cols.AddRow("Last Sequence", info.State.LastSeq) } else { cols.AddRowf("Last Sequence", "%s @ %s UTC", f(info.State.LastSeq), f(info.State.LastTime)) } - if len(info.State.Deleted) > 0 { // backwards compat with older servers cols.AddRow("Deleted Messages", len(info.State.Deleted)) } else if info.State.NumDeleted > 0 { @@ -1834,7 +1920,6 @@ func (c *streamCmd) showStreamInfo(info *api.StreamInfo) { if info.State.NumSubjects > 0 { cols.AddRow("Number of Subjects", info.State.NumSubjects) } - if len(info.Alternates) > 0 { lName := 0 lCluster := 0 @@ -1846,7 +1931,6 @@ func (c *streamCmd) showStreamInfo(info *api.StreamInfo) { lCluster = len(s.Cluster) } } - for i, s := range info.Alternates { msg := fmt.Sprintf("%s%s: Cluster: %s%s", strings.Repeat(" ", lName-len(s.Name)), s.Name, strings.Repeat(" ", lCluster-len(s.Cluster)), s.Cluster) if s.Domain != "" { @@ -2224,7 +2308,7 @@ func (c *streamCmd) prepareConfig(_ *fisk.ParseContext, requireSize bool) api.St } } - if c.repubSource != "" && c.repubDest != "" { + if c.repubSource != "" || c.repubDest != "" { cfg.RePublish = &api.RePublish{ Source: c.repubSource, Destination: c.repubDest, @@ -2232,6 +2316,16 @@ func (c *streamCmd) prepareConfig(_ *fisk.ParseContext, requireSize bool) api.St } } + if (c.subjectTransformSource != "" && c.subjectTransformDest == "") || (c.subjectTransformSource == "" && c.subjectTransformDest != "") { + fisk.Fatalf("must specify both --transform-source and --transform-destination") + } + if c.subjectTransformSource != "" && c.subjectTransformDest != "" { + cfg.SubjectTransform = &api.SubjectTransformConfig{ + Source: c.subjectTransformSource, + Destination: c.subjectTransformDest, + } + } + return cfg } @@ -2336,6 +2430,14 @@ func (c *streamCmd) askSource(name string, prefix string) *api.StreamSource { }, &cfg.FilterSubject) fisk.FatalIfError(err, "could not request filter") + if cfg.FilterSubject != "" { + err = askOne(&survey.Input{ + Message: fmt.Sprintf("%s Subject mapping transform", prefix), + Help: "Map matching subjects according to this transform destination", + }, &cfg.SubjectTransformDest) + fisk.FatalIfError(err, "could not request subject mapping destination transform") + } + ok, err = askConfirmation(fmt.Sprintf("Import %q from a different JetStream domain", name), false) fisk.FatalIfError(err, "Could not request source details") if ok { @@ -2420,6 +2522,7 @@ func (c *streamCmd) validateCfg(cfg *api.StreamConfig) (bool, []byte, []string, } func (c *streamCmd) addAction(pc *fisk.ParseContext) (err error) { + _, mgr, err := prepareHelper("", natsOpts()...) fisk.FatalIfError(err, "could not create Stream") From 9dd9a943a29fd5e60c25ac528b9fc76b8290443f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-No=C3=ABl=20Moyne?= Date: Tue, 8 Aug 2023 13:26:15 -0700 Subject: [PATCH 2/8] Cleanup unused --- cli/stream_command.go | 131 ++++++++++++++++++++---------------------- 1 file changed, 63 insertions(+), 68 deletions(-) diff --git a/cli/stream_command.go b/cli/stream_command.go index ee19be73..aa6b7fb5 100644 --- a/cli/stream_command.go +++ b/cli/stream_command.go @@ -56,74 +56,69 @@ type streamCmd struct { showAll bool acceptDefaults bool - destination string - subjects []string - ack bool - storage string - maxMsgLimit int64 - maxMsgPerSubjectLimit int64 - maxBytesLimitString string - maxBytesLimit int64 - maxAgeLimit string - maxMsgSizeString string - maxMsgSize int64 - maxConsumers int - reportSortConsumers bool - reportSortMsgs bool - reportSortName bool - reportSortReverse bool - reportSortStorage bool - reportSort string - reportRaw bool - reportLimitCluster string - reportLeaderDistrib bool - discardPolicy string - validateOnly bool - backupDirectory string - showProgress bool - healthCheck bool - snapShotConsumers bool - dupeWindow string - replicas int64 - placementCluster string - placementTags []string - placementClusterSet bool - placementTagsSet bool - peerName string - sources []string - mirror string - interactive bool - purgeKeep uint64 - purgeSubject string - purgeSequence uint64 - description string - subjectTransformSource string - subjectTransformSourceSet bool - subjectTransformDest string - subjectTransformDestSet bool - noSubjectTransform bool - repubSource string - repubSourceSet bool - repubDest string - repubDestSet bool - repubHeadersOnly bool - repubHeadersOnlySet bool - noRepub bool - allowRollup bool - allowRollupSet bool - denyDelete bool - denyDeleteSet bool - denyPurge bool - denyPurgeSet bool - allowDirect bool - allowDirectSet bool - allowMirrorDirect bool - allowMirrorDirectSet bool - discardPerSubj bool - discardPerSubjSet bool - showStateOnly bool - metadata map[string]string - metadataIsSet bool + destination string + subjects []string + ack bool + storage string + maxMsgLimit int64 + maxMsgPerSubjectLimit int64 + maxBytesLimitString string + maxBytesLimit int64 + maxAgeLimit string + maxMsgSizeString string + maxMsgSize int64 + maxConsumers int + reportSortConsumers bool + reportSortMsgs bool + reportSortName bool + reportSortReverse bool + reportSortStorage bool + reportSort string + reportRaw bool + reportLimitCluster string + reportLeaderDistrib bool + discardPolicy string + validateOnly bool + backupDirectory string + showProgress bool + healthCheck bool + snapShotConsumers bool + dupeWindow string + replicas int64 + placementCluster string + placementTags []string + placementClusterSet bool + placementTagsSet bool + peerName string + sources []string + mirror string + interactive bool + purgeKeep uint64 + purgeSubject string + purgeSequence uint64 + description string + subjectTransformSource string + subjectTransformDest string + noSubjectTransform bool + repubSource string + repubDest string + repubHeadersOnly bool + noRepub bool + allowRollup bool + allowRollupSet bool + denyDelete bool + denyDeleteSet bool + denyPurge bool + denyPurgeSet bool + allowDirect bool + allowDirectSet bool + allowMirrorDirect bool + allowMirrorDirectSet bool + discardPerSubj bool + discardPerSubjSet bool + showStateOnly bool + metadata map[string]string + metadataIsSet bool compression string firstSeq uint64 From e0237b07043f1c334e85adcf1c84a3c54c937c1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-No=C3=ABl=20Moyne?= Date: Thu, 10 Aug 2023 02:52:04 -0700 Subject: [PATCH 3/8] PR feedback Add empty lines around control statements to improve readability Update go.mod for nats.go to the 2.10.0 branch current latest commit --- cli/consumer_command.go | 8 ++++++ cli/kv_command.go | 17 +++++------- cli/stream_command.go | 57 ++++++++++++++++++++++++++--------------- go.mod | 26 +++++++++---------- go.sum | 50 +++++++++++++++++++----------------- 5 files changed, 89 insertions(+), 69 deletions(-) diff --git a/cli/consumer_command.go b/cli/consumer_command.go index 59df1a35..4cbb7464 100644 --- a/cli/consumer_command.go +++ b/cli/consumer_command.go @@ -572,16 +572,19 @@ func (c *consumerCmd) showInfo(config api.ConsumerConfig, state api.ConsumerInfo cols.AddRowIfNotEmpty("Name", config.Name) cols.AddRowIf("Durable Name", config.Durable, config.Durable != "" && config.Durable != config.Name) cols.AddRowIfNotEmpty("Description", config.Description) + if config.DeliverSubject != "" { cols.AddRow("Delivery Subject", config.DeliverSubject) } else { cols.AddRow("Pull Mode", true) } + if config.FilterSubject != "" { cols.AddRow("Filter Subject", config.FilterSubject) } else if len(config.FilterSubjects) > 0 { cols.AddRow("Filter Subjects", config.FilterSubjects) } + switch config.DeliverPolicy { case api.DeliverAll: cols.AddRow("Deliver Policy", "All") @@ -596,6 +599,7 @@ func (c *consumerCmd) showInfo(config api.ConsumerConfig, state api.ConsumerInfo case api.DeliverByStartSequence: cols.AddRowf("Deliver Policy", "From Sequence %d", config.OptStartSeq) } + cols.AddRowIf("Deliver Queue Group", config.DeliverGroup, config.DeliverGroup != "" && config.DeliverSubject != "") cols.AddRow("Ack Policy", config.AckPolicy.String()) cols.AddRowIf("Ack Wait", config.AckWait, config.AckPolicy != api.AckNone) @@ -653,6 +657,7 @@ func (c *consumerCmd) showInfo(config api.ConsumerConfig, state api.ConsumerInfo } else { cols.AddRowf("Last Delivered Message", "Consumer sequence: %s Stream sequence: %s Last delivery: %s ago", f(state.Delivered.Consumer), f(state.Delivered.Stream), f(sinceRefOrNow(state.TimeStamp, *state.Delivered.Last))) } + if config.AckPolicy != api.AckNone { if state.AckFloor.Last == nil { cols.AddRowf("Acknowledgment Floor", "Consumer sequence: %s Stream sequence: %s", f(state.AckFloor.Consumer), f(state.AckFloor.Stream)) @@ -666,7 +671,9 @@ func (c *consumerCmd) showInfo(config api.ConsumerConfig, state api.ConsumerInfo } cols.AddRow("Redelivered Messages", state.NumRedelivered) } + cols.AddRow("Unprocessed Messages", state.NumPending) + if config.DeliverSubject == "" { if config.MaxWaiting > 0 { cols.AddRowf("Waiting Pulls", "%s of maximum %s", f(state.NumWaiting), f(config.MaxWaiting)) @@ -693,6 +700,7 @@ func (c *consumerCmd) infoAction(_ *fisk.ParseContext) error { var err error consumer := c.selectedConsumer + if consumer == nil { consumer, err = c.mgr.LoadConsumer(c.stream, c.consumer) fisk.FatalIfError(err, "could not load Consumer %s > %s", c.stream, c.consumer) diff --git a/cli/kv_command.go b/cli/kv_command.go index a66e57e3..268180fc 100644 --- a/cli/kv_command.go +++ b/cli/kv_command.go @@ -466,15 +466,10 @@ func (c *kvCommand) addAction(_ *fisk.ParseContext) error { } } - if len(c.sources) != 0 { - var sources []*nats.StreamSource - - for _, source := range c.sources { - sources = append(sources, &nats.StreamSource{ - Name: source, - }) - } - cfg.Sources = sources + for _, source := range c.sources { + cfg.Sources = append(cfg.Sources, &nats.StreamSource{ + Name: source, + }) } store, err := js.CreateKeyValue(cfg) @@ -761,12 +756,12 @@ func (c *kvCommand) showStatus(store nats.KeyValue) error { if nfo.Config.MaxBytes == -1 { cols.AddRow("Maximum Bucket Size", "unlimited") } else { - cols.AddRow("Maximum Bucket Size", nfo.Config.MaxBytes) + cols.AddRow("Maximum Bucket Size", humanize.IBytes(uint64(nfo.Config.MaxBytes))) } if nfo.Config.MaxMsgSize == -1 { cols.AddRow("Maximum Value Size", "unlimited") } else { - cols.AddRow("Maximum Value Size", nfo.Config.MaxMsgSize) + cols.AddRow("Maximum Value Size", humanize.IBytes(uint64(nfo.Config.MaxMsgSize))) } if nfo.Config.MaxAge <= 0 { cols.AddRow("Maximum Age", "unlimited") diff --git a/cli/stream_command.go b/cli/stream_command.go index aa6b7fb5..3cec4d09 100644 --- a/cli/stream_command.go +++ b/cli/stream_command.go @@ -427,6 +427,7 @@ func (c *streamCmd) subjectsAction(_ *fisk.ParseContext) (err error) { cols := 1 countWidth := len(f(most)) table := newTableWriter(fmt.Sprintf("%d Subjects in stream %s", len(names), c.stream)) + switch { case longest+countWidth < 20: cols = 3 @@ -1055,6 +1056,7 @@ func (c *streamCmd) reportAction(_ *fisk.ParseContext) error { if len(info.State.Deleted) > 0 { deleted = len(info.State.Deleted) } + s := streamStat{ Name: info.Config.Name, Consumers: info.State.Consumers, @@ -1068,6 +1070,7 @@ func (c *streamCmd) reportAction(_ *fisk.ParseContext) error { Sources: info.Sources, Placement: info.Config.Placement, } + if info.State.Lost != nil { s.LostBytes = info.State.Lost.Bytes s.LostMsgs = len(info.State.Lost.Msgs) @@ -1085,6 +1088,7 @@ func (c *streamCmd) reportAction(_ *fisk.ParseContext) error { snode = dg.Node(source.Name) } edge := dg.Edge(snode, node).Attr("color", "green") + if source.FilterSubject != "" && source.SubjectTransformDest == "" { edge.Label(source.FilterSubject) } else if source.FilterSubject != "" && source.SubjectTransformDest != "" { @@ -1419,16 +1423,16 @@ func (c *streamCmd) copyAndEditStream(cfg api.StreamConfig, pc *fisk.ParseContex if cfg.RePublish != nil { repubConfig = *cfg.RePublish - } else { - repubConfig = api.RePublish{} } if c.repubSource != "" { repubConfig.Source = c.repubSource } + if c.repubDest != "" { repubConfig.Destination = c.repubDest } + if c.repubHeadersOnly { repubConfig.HeadersOnly = true } else if repubConfig.HeadersOnly { @@ -1449,8 +1453,6 @@ func (c *streamCmd) copyAndEditStream(cfg api.StreamConfig, pc *fisk.ParseContex if cfg.SubjectTransform != nil { subjectTransformConfig = *cfg.SubjectTransform - } else { - subjectTransformConfig = api.SubjectTransformConfig{} } if c.subjectTransformSource != "" { @@ -1460,12 +1462,6 @@ func (c *streamCmd) copyAndEditStream(cfg api.StreamConfig, pc *fisk.ParseContex subjectTransformConfig.Destination = c.subjectTransformDest } - if subjectTransformConfig.Source == "" && subjectTransformConfig.Destination == "" { - cfg.SubjectTransform = nil - } else { - cfg.SubjectTransform = &subjectTransformConfig - } - if c.noSubjectTransform { cfg.SubjectTransform = nil } @@ -1644,9 +1640,11 @@ func (c *streamCmd) showStreamConfig(cols *columns.Writer, cfg api.StreamConfig) cols.AddRowIf("Sealed", true, cfg.Sealed) cols.AddRow("Storage", cfg.Storage.String()) cols.AddRowIf("Compression", cfg.Compression, cfg.Compression != api.NoCompression) + if cfg.FirstSeq > 0 { cols.AddRow("First Sequence", cfg.FirstSeq) } + if cfg.Placement != nil { cols.AddRowIfNotEmpty("Placement Cluster", cfg.Placement.Cluster) cols.AddRowIf("Placement Tags", cfg.Placement.Tags, len(cfg.Placement.Tags) > 0) @@ -1661,6 +1659,7 @@ func (c *streamCmd) showStreamConfig(cols *columns.Writer, cfg api.StreamConfig) } cols.AddRowf("Subject Transform", "%s to %s", source, cfg.SubjectTransform.Destination) } + if cfg.RePublish != nil { if cfg.RePublish.HeadersOnly { cols.AddRowf("Republishing Headers", "%s to %s", cfg.RePublish.Source, cfg.RePublish.Destination) @@ -1671,6 +1670,7 @@ func (c *streamCmd) showStreamConfig(cols *columns.Writer, cfg api.StreamConfig) cols.AddRow("Retention", cfg.Retention.String()) cols.AddRow("Acknowledgments", !cfg.NoAck) dnp := cfg.Discard.String() + if cfg.DiscardNewPer { dnp = "New Per Subject" } @@ -1723,6 +1723,7 @@ func (c *streamCmd) showStreamConfig(cols *columns.Writer, cfg api.StreamConfig) if cfg.Mirror != nil || len(cfg.Sources) > 0 { cols.AddSectionTitle("Replication") } + cols.AddRowIfNotEmpty("Mirror", c.renderSource(cfg.Mirror)) if len(cfg.Sources) > 0 { @@ -1753,10 +1754,11 @@ func (c *streamCmd) renderSource(s *api.StreamSource) string { if s.FilterSubject == "" && s.SubjectTransformDest == "" { parts = append(parts, s.Name) } else { - var filter = ">" + filter := ">" if s.FilterSubject != "" { filter = s.FilterSubject } + if s.SubjectTransformDest != "" { parts = append(parts, fmt.Sprintf("%s (%s to %s)", s.Name, filter, s.SubjectTransformDest)) } else { @@ -1771,10 +1773,12 @@ func (c *streamCmd) renderSource(s *api.StreamSource) string { if s.OptStartTime != nil { parts = append(parts, fmt.Sprintf("Start Time: %v", s.OptStartTime)) } + if s.External != nil { if s.External.ApiPrefix != "" { parts = append(parts, fmt.Sprintf("API Prefix: %s", s.External.ApiPrefix)) } + if s.External.DeliverPrefix != "" { parts = append(parts, fmt.Sprintf("Delivery Prefix: %s", s.External.DeliverPrefix)) } @@ -1848,8 +1852,9 @@ func (c *streamCmd) showStreamInfo(info *api.StreamInfo) { showSource := func(s *api.StreamSourceInfo) { cols.AddRow("Stream Name", s.Name) + if s.SubjectTransformDest != "" { - var filter = ">" + filter := ">" if s.FilterSubject != "" { filter = s.FilterSubject } @@ -1857,18 +1862,22 @@ func (c *streamCmd) showStreamInfo(info *api.StreamInfo) { } else { cols.AddRowIfNotEmpty("Subject Filter", s.FilterSubject) } + cols.AddRow("Lag", s.Lag) + if s.Active > 0 && s.Active < math.MaxInt64 { cols.AddRow("Last Seen", s.Active) } else { cols.AddRow("Last Seen", "never") } + if s.External != nil { cols.AddRow("Ext. API Prefix", s.External.ApiPrefix) if s.External.DeliverPrefix != "" { cols.AddRow("Ext. Delivery Prefix", s.External.DeliverPrefix) } } + if s.Error != nil { cols.AddRow("Error", s.Error.Description) } @@ -1890,6 +1899,7 @@ func (c *streamCmd) showStreamInfo(info *api.StreamInfo) { cols.AddSectionTitle("State") cols.AddRow("Messages", info.State.Msgs) cols.AddRow("Bytes", humanize.IBytes(info.State.Bytes)) + if info.State.Lost != nil && len(info.State.Lost.Msgs) > 0 { cols.AddRowf("Lost Messages", "%s (%s)", f(len(info.State.Lost.Msgs)), humanize.IBytes(info.State.Lost.Bytes)) } @@ -1899,11 +1909,13 @@ func (c *streamCmd) showStreamInfo(info *api.StreamInfo) { } else { cols.AddRowf("First Sequence", "%s @ %s UTC", f(info.State.FirstSeq), f(info.State.FirstTime)) } + if info.State.LastTime.Equal(time.Unix(0, 0)) || info.State.LastTime.IsZero() { cols.AddRow("Last Sequence", info.State.LastSeq) } else { cols.AddRowf("Last Sequence", "%s @ %s UTC", f(info.State.LastSeq), f(info.State.LastTime)) } + if len(info.State.Deleted) > 0 { // backwards compat with older servers cols.AddRow("Deleted Messages", len(info.State.Deleted)) } else if info.State.NumDeleted > 0 { @@ -1915,6 +1927,7 @@ func (c *streamCmd) showStreamInfo(info *api.StreamInfo) { if info.State.NumSubjects > 0 { cols.AddRow("Number of Subjects", info.State.NumSubjects) } + if len(info.Alternates) > 0 { lName := 0 lCluster := 0 @@ -1926,6 +1939,7 @@ func (c *streamCmd) showStreamInfo(info *api.StreamInfo) { lCluster = len(s.Cluster) } } + for i, s := range info.Alternates { msg := fmt.Sprintf("%s%s: Cluster: %s%s", strings.Repeat(" ", lName-len(s.Name)), s.Name, strings.Repeat(" ", lCluster-len(s.Cluster)), s.Cluster) if s.Domain != "" { @@ -2165,6 +2179,7 @@ func (c *streamCmd) prepareConfig(_ *fisk.ParseContext, requireSize bool) api.St } var maxAge time.Duration + if c.maxBytesLimit == 0 { reqd := "" defltSize := "-1" @@ -2205,11 +2220,13 @@ func (c *streamCmd) prepareConfig(_ *fisk.ParseContext, requireSize bool) api.St } var dupeWindow time.Duration + if c.dupeWindow == "" && c.mirror == "" { defaultDW := (2 * time.Minute).String() if maxAge > 0 && maxAge < 2*time.Minute { defaultDW = maxAge.String() } + if c.acceptDefaults { c.dupeWindow = defaultDW } else { @@ -2303,18 +2320,15 @@ func (c *streamCmd) prepareConfig(_ *fisk.ParseContext, requireSize bool) api.St } } - if c.repubSource != "" || c.repubDest != "" { - cfg.RePublish = &api.RePublish{ - Source: c.repubSource, - Destination: c.repubDest, - HeadersOnly: c.repubHeadersOnly, - } + cfg.RePublish = &api.RePublish{ + Source: c.repubSource, + Destination: c.repubDest, + HeadersOnly: c.repubHeadersOnly, } if (c.subjectTransformSource != "" && c.subjectTransformDest == "") || (c.subjectTransformSource == "" && c.subjectTransformDest != "") { fisk.Fatalf("must specify both --transform-source and --transform-destination") - } - if c.subjectTransformSource != "" && c.subjectTransformDest != "" { + cfg.SubjectTransform = &api.SubjectTransformConfig{ Source: c.subjectTransformSource, Destination: c.subjectTransformDest, @@ -2477,6 +2491,7 @@ func (c *streamCmd) askSource(name string, prefix string) *api.StreamSource { func (c *streamCmd) parseStreamSource(source string) (*api.StreamSource, error) { ss := &api.StreamSource{} + if isJsonString(source) { err := json.Unmarshal([]byte(source), ss) if err != nil { @@ -2517,7 +2532,6 @@ func (c *streamCmd) validateCfg(cfg *api.StreamConfig) (bool, []byte, []string, } func (c *streamCmd) addAction(pc *fisk.ParseContext) (err error) { - _, mgr, err := prepareHelper("", natsOpts()...) fisk.FatalIfError(err, "could not create Stream") @@ -2738,6 +2752,7 @@ func (c *streamCmd) renderStreamsAsTable(streams []*jsm.Stream, missing []string } else { table = newTableWriter(fmt.Sprintf("Streams matching %s", c.filterSubject)) } + table.AddHeaders("Name", "Description", "Created", "Messages", "Size", "Last Message") for _, s := range streams { nfo, _ := s.LatestInformation() diff --git a/go.mod b/go.mod index 8cd21212..1b7d1186 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/HdrHistogram/hdrhistogram-go v1.1.2 github.com/choria-io/fisk v0.5.2 github.com/dustin/go-humanize v1.0.1 - github.com/emicklei/dot v1.6.0 + github.com/emicklei/dot v1.5.0 github.com/fatih/color v1.15.0 github.com/ghodss/yaml v1.0.0 github.com/google/go-cmp v0.5.9 @@ -18,39 +18,39 @@ require ( github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 github.com/klauspost/compress v1.16.7 github.com/mattn/go-isatty v0.0.19 - github.com/nats-io/jsm.go v0.0.36-0.20230727100143-0353b93c9d5a - github.com/nats-io/nats-server/v2 v2.9.22-0.20230810031310-0f7fa284cc00 - github.com/nats-io/nats.go v1.28.0 + github.com/nats-io/jsm.go v0.0.36-0.20230810092625-fdb1b7aec83a + github.com/nats-io/nats-server/v2 v2.9.21-0.20230727091008-b22cdf18fe7e + github.com/nats-io/nats.go v1.28.1-0.20230809215439-7b119491f08f github.com/nats-io/nuid v1.0.1 github.com/prometheus/client_golang v1.16.0 github.com/prometheus/common v0.44.0 - github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 + github.com/santhosh-tekuri/jsonschema/v5 v5.3.0 github.com/tylertreat/hdrhistogram-writer v0.0.0-20210816161836-2e440612a39f github.com/xlab/tablewriter v0.0.0-20160610135559-80b567a11ad5 - golang.org/x/crypto v0.12.0 - golang.org/x/term v0.11.0 + golang.org/x/crypto v0.11.0 + golang.org/x/term v0.10.0 gopkg.in/yaml.v3 v3.0.1 ) require ( - github.com/antonmedv/expr v1.12.7 // indirect + github.com/antonmedv/expr v1.12.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/gosuri/uilive v0.0.4 // indirect github.com/mattn/go-colorable v0.1.13 // indirect - github.com/mattn/go-runewidth v0.0.15 // indirect + github.com/mattn/go-runewidth v0.0.14 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect github.com/minio/highwayhash v1.0.2 // indirect github.com/nats-io/jwt/v2 v2.4.1 // indirect github.com/nats-io/nkeys v0.4.4 // indirect github.com/prometheus/client_model v0.4.0 // indirect - github.com/prometheus/procfs v0.11.1 // indirect + github.com/prometheus/procfs v0.11.0 // indirect github.com/rivo/uniseg v0.4.4 // indirect - golang.org/x/net v0.14.0 // indirect - golang.org/x/sys v0.11.0 // indirect - golang.org/x/text v0.12.0 // indirect + golang.org/x/net v0.12.0 // indirect + golang.org/x/sys v0.10.0 // indirect + golang.org/x/text v0.11.0 // indirect golang.org/x/time v0.3.0 // indirect google.golang.org/protobuf v1.31.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index 85c4be4c..24956b40 100644 --- a/go.sum +++ b/go.sum @@ -8,8 +8,8 @@ github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXY github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2 h1:+vx7roKuyA63nhn5WAunQHLTznkw5W8b1Xc0dNjp83s= github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2/go.mod h1:HBCaDeC1lPdgDeDbhX8XFpy1jqjK0IBG8W5K+xYqA0w= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= -github.com/antonmedv/expr v1.12.7 h1:jfV/l/+dHWAadLwAtESXNxXdfbK9bE4+FNMHYCMntwk= -github.com/antonmedv/expr v1.12.7/go.mod h1:FPC8iWArxls7axbVLsW+kpg1mz29A1b2M6jt+hZfDkU= +github.com/antonmedv/expr v1.12.5 h1:Fq4okale9swwL3OeLLs9WD9H6GbgBLJyN/NUHRv+n0E= +github.com/antonmedv/expr v1.12.5/go.mod h1:FPC8iWArxls7axbVLsW+kpg1mz29A1b2M6jt+hZfDkU= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= @@ -24,8 +24,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= -github.com/emicklei/dot v1.6.0 h1:vUzuoVE8ipzS7QkES4UfxdpCwdU2U97m2Pb2tQCoYRY= -github.com/emicklei/dot v1.6.0/go.mod h1:DeV7GvQtIw4h2u73RKBkkFdvVAz0D9fzeJrgPW6gy/s= +github.com/emicklei/dot v1.5.0 h1:tc9eKdCBTgoR68vJ6OcgMtI0SdrGDwLPPVaPA6XhX50= +github.com/emicklei/dot v1.5.0/go.mod h1:DeV7GvQtIw4h2u73RKBkkFdvVAz0D9fzeJrgPW6gy/s= github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs= github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= @@ -71,8 +71,8 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/ github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= -github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U= -github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU= +github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= @@ -82,12 +82,14 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jsm.go v0.0.36-0.20230727100143-0353b93c9d5a h1:AAB4LXHKHgH5+TM+c21GluPaM/89trxEPIzE8YVAL1w= github.com/nats-io/jsm.go v0.0.36-0.20230727100143-0353b93c9d5a/go.mod h1:gtvtVHnRzl9KoOlPdvmGQLqsKG9H3QkNa2D1vkkNq24= +github.com/nats-io/jsm.go v0.0.36-0.20230810092625-fdb1b7aec83a h1:NFY9/gAHlxF9tiaiQFxI4bTE4NEfwSSs98FMzaWFEy4= +github.com/nats-io/jsm.go v0.0.36-0.20230810092625-fdb1b7aec83a/go.mod h1:gtvtVHnRzl9KoOlPdvmGQLqsKG9H3QkNa2D1vkkNq24= github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4= github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= -github.com/nats-io/nats-server/v2 v2.9.22-0.20230810031310-0f7fa284cc00 h1:L+11xBhf6HDl5GZheTnanpwm0rnmpQLQ44z7n98243c= -github.com/nats-io/nats-server/v2 v2.9.22-0.20230810031310-0f7fa284cc00/go.mod h1:ozqMZc2vTHcNcblOiXMWIXkf8+0lDGAi5wQcG+O1mHU= -github.com/nats-io/nats.go v1.28.0 h1:Th4G6zdsz2d0OqXdfzKLClo6bOfoI/b1kInhRtFIy5c= -github.com/nats-io/nats.go v1.28.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc= +github.com/nats-io/nats-server/v2 v2.9.21-0.20230727091008-b22cdf18fe7e h1:7CezZb5EZKnDLJSnqpcz2RDDHXKwowCbOPw/Y6oZOEQ= +github.com/nats-io/nats-server/v2 v2.9.21-0.20230727091008-b22cdf18fe7e/go.mod h1:ozqMZc2vTHcNcblOiXMWIXkf8+0lDGAi5wQcG+O1mHU= +github.com/nats-io/nats.go v1.28.1-0.20230809215439-7b119491f08f h1:G5p5Ehrr7ma7Ryr4+qrD5he/VP+o+sdTNscbnlqwIC4= +github.com/nats-io/nats.go v1.28.1-0.20230809215439-7b119491f08f/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc= github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA= github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= @@ -102,14 +104,14 @@ github.com/prometheus/client_model v0.4.0 h1:5lQXD3cAg1OXBf4Wq03gTrXHeaV0TQvGfUo github.com/prometheus/client_model v0.4.0/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU= github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY= github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY= -github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI= -github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY= +github.com/prometheus/procfs v0.11.0 h1:5EAgkfkMl659uZPbe9AS2N68a7Cc1TJbPEuGzFuRbyk= +github.com/prometheus/procfs v0.11.0/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= -github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 h1:lZUw3E0/J3roVtGQ+SCrUrg3ON6NgVqpn3+iol9aGu4= -github.com/santhosh-tekuri/jsonschema/v5 v5.3.1/go.mod h1:uToXkOrWAZ6/Oc07xWQrPOhJotwFIyu2bBVN41fcDUY= +github.com/santhosh-tekuri/jsonschema/v5 v5.3.0 h1:uIkTLo0AGRc8l7h5l9r+GcYi9qfVPt6lD4/bhmzfiKo= +github.com/santhosh-tekuri/jsonschema/v5 v5.3.0/go.mod h1:FKdcjfQW6rpZSnxxUvEA5H/cDPdvJ/SZJQLWWXWGrZ0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -126,8 +128,8 @@ github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5t golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk= -golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= +golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA= +golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -144,8 +146,8 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14= -golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= +golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50= +golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -161,18 +163,18 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= -golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= +golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.11.0 h1:F9tnn/DA/Im8nCwm+fX+1/eBwi4qFjRT++MhtVC4ZX0= -golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU= +golang.org/x/term v0.10.0 h1:3R7pNqamzBraeqj/Tj8qt1aQ2HpmlC+Cx/qL/7hn4/c= +golang.org/x/term v0.10.0/go.mod h1:lpqdcUyK/oCiQxvxVrppt5ggO2KCZ5QblwqPnfZ6d5o= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= -golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4= +golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= From 4ba45ec94bebfb00b0ae19e46c8779371b5dafd6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-No=C3=ABl=20Moyne?= Date: Thu, 10 Aug 2023 17:40:26 -0700 Subject: [PATCH 4/8] Missing humanizes to IBytes Clean up republish flag Simplify and fix config checking for republish Add empty lines around control statements to improve readability Update go.mod for nats.go to be able to build --- cli/kv_command.go | 2 +- cli/stream_command.go | 82 +++++++++++++++++++++---------------------- go.mod | 7 ++-- go.sum | 12 ++++--- 4 files changed, 54 insertions(+), 49 deletions(-) diff --git a/cli/kv_command.go b/cli/kv_command.go index 268180fc..cacc1d0b 100644 --- a/cli/kv_command.go +++ b/cli/kv_command.go @@ -752,7 +752,7 @@ func (c *kvCommand) showStatus(store nats.KeyValue) error { if nfo != nil { cols.AddRowIfNotEmpty("Description", nfo.Config.Description) - cols.AddRow("Bucket Size", nfo.State.Bytes) + cols.AddRow("Bucket Size", humanize.IBytes(nfo.State.Bytes)) if nfo.Config.MaxBytes == -1 { cols.AddRow("Maximum Bucket Size", "unlimited") } else { diff --git a/cli/stream_command.go b/cli/stream_command.go index 3cec4d09..66ca4cd5 100644 --- a/cli/stream_command.go +++ b/cli/stream_command.go @@ -208,7 +208,7 @@ func configureStreamCommand(app commandHost) { f.Flag("metadata", "Adds metadata to the stream").PlaceHolder("META").IsSetByUser(&c.metadataIsSet).StringMapVar(&c.metadata) f.Flag("republish-source", "Republish messages to --republish-destination").PlaceHolder("SOURCE").StringVar(&c.repubSource) f.Flag("republish-destination", "Republish destination for messages in --republish-source").PlaceHolder("DEST").StringVar(&c.repubDest) - f.Flag("republish-headers", "Republish only message headers, no bodies").BoolVar(&c.repubHeadersOnly) + f.Flag("republish-headers", "Republish only message headers, no bodies").UnNegatableBoolVar(&c.repubHeadersOnly) if edit { f.Flag("no-republish", "Removes current republish configuration").UnNegatableBoolVar(&c.noRepub) f.Flag("no-transform", "Removes current subject transform configuration").UnNegatableBoolVar(&c.noSubjectTransform) @@ -1419,51 +1419,43 @@ func (c *streamCmd) copyAndEditStream(cfg api.StreamConfig, pc *fisk.ParseContex return cfg, fmt.Errorf("invalid compression algorithm") } - var repubConfig api.RePublish - - if cfg.RePublish != nil { - repubConfig = *cfg.RePublish - } - - if c.repubSource != "" { - repubConfig.Source = c.repubSource - } - - if c.repubDest != "" { - repubConfig.Destination = c.repubDest - } - - if c.repubHeadersOnly { - repubConfig.HeadersOnly = true - } else if repubConfig.HeadersOnly { - repubConfig.HeadersOnly = false - } - - if (repubConfig.Source == "" || repubConfig.Source == ">") && repubConfig.Destination == "" { + if c.noRepub { cfg.RePublish = nil } else { - cfg.RePublish = &repubConfig - } + var repubConfig api.RePublish - if c.noRepub { - cfg.RePublish = nil - } + if cfg.RePublish != nil { + repubConfig = *cfg.RePublish + } - var subjectTransformConfig api.SubjectTransformConfig + if c.repubSource != "" { + repubConfig.Source = c.repubSource + } - if cfg.SubjectTransform != nil { - subjectTransformConfig = *cfg.SubjectTransform - } + if c.repubDest != "" { + repubConfig.Destination = c.repubDest + } - if c.subjectTransformSource != "" { - subjectTransformConfig.Source = c.subjectTransformSource - } - if c.subjectTransformDest != "" { - subjectTransformConfig.Destination = c.subjectTransformDest + repubConfig.HeadersOnly = c.repubHeadersOnly + cfg.RePublish = &repubConfig } if c.noSubjectTransform { cfg.SubjectTransform = nil + } else { + var subjectTransformConfig api.SubjectTransformConfig + + if cfg.SubjectTransform != nil { + subjectTransformConfig = *cfg.SubjectTransform + } + + if c.subjectTransformSource != "" { + subjectTransformConfig.Source = c.subjectTransformSource + } + + if c.subjectTransformDest != "" { + subjectTransformConfig.Destination = c.subjectTransformDest + } } return cfg, nil @@ -1697,7 +1689,7 @@ func (c *streamCmd) showStreamConfig(cols *columns.Writer, cfg api.StreamConfig) if cfg.MaxBytes == -1 { cols.AddRow("Maximum Bytes", "unlimited") } else { - cols.AddRow("Maximum Bytes", cfg.MaxBytes) + cols.AddRow("Maximum Bytes", humanize.IBytes(uint64(cfg.MaxBytes))) } if cfg.MaxAge <= 0 { cols.AddRow("Maximum Age", "unlimited") @@ -2320,15 +2312,23 @@ func (c *streamCmd) prepareConfig(_ *fisk.ParseContext, requireSize bool) api.St } } - cfg.RePublish = &api.RePublish{ - Source: c.repubSource, - Destination: c.repubDest, - HeadersOnly: c.repubHeadersOnly, + if (c.repubSource != "" && c.repubDest == "") || (c.repubSource == "" && c.repubDest != "") || (c.repubHeadersOnly && (c.repubSource == "" || c.repubDest == "")) { + fisk.Fatalf("must specify both --republish-source and --republish-destination") + } + + if c.repubSource != "" && c.repubDest != "" { + cfg.RePublish = &api.RePublish{ + Source: c.repubSource, + Destination: c.repubDest, + HeadersOnly: c.repubHeadersOnly, + } } if (c.subjectTransformSource != "" && c.subjectTransformDest == "") || (c.subjectTransformSource == "" && c.subjectTransformDest != "") { fisk.Fatalf("must specify both --transform-source and --transform-destination") + } + if c.subjectTransformSource != "" && c.subjectTransformDest != "" { cfg.SubjectTransform = &api.SubjectTransformConfig{ Source: c.subjectTransformSource, Destination: c.subjectTransformDest, diff --git a/go.mod b/go.mod index 1b7d1186..cdb3a706 100644 --- a/go.mod +++ b/go.mod @@ -18,8 +18,8 @@ require ( github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 github.com/klauspost/compress v1.16.7 github.com/mattn/go-isatty v0.0.19 - github.com/nats-io/jsm.go v0.0.36-0.20230810092625-fdb1b7aec83a - github.com/nats-io/nats-server/v2 v2.9.21-0.20230727091008-b22cdf18fe7e + github.com/nats-io/jsm.go v0.0.36-0.20230727100143-0353b93c9d5a + github.com/nats-io/nats-server/v2 v2.9.22-0.20230810081221-37d3220dfba3 github.com/nats-io/nats.go v1.28.1-0.20230809215439-7b119491f08f github.com/nats-io/nuid v1.0.1 github.com/prometheus/client_golang v1.16.0 @@ -48,7 +48,8 @@ require ( github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/procfs v0.11.0 // indirect github.com/rivo/uniseg v0.4.4 // indirect - golang.org/x/net v0.12.0 // indirect + go.uber.org/automaxprocs v1.5.3 // indirect + golang.org/x/net v0.11.0 // indirect golang.org/x/sys v0.10.0 // indirect golang.org/x/text v0.11.0 // indirect golang.org/x/time v0.3.0 // indirect diff --git a/go.sum b/go.sum index 24956b40..65dc3534 100644 --- a/go.sum +++ b/go.sum @@ -82,12 +82,14 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jsm.go v0.0.36-0.20230727100143-0353b93c9d5a h1:AAB4LXHKHgH5+TM+c21GluPaM/89trxEPIzE8YVAL1w= github.com/nats-io/jsm.go v0.0.36-0.20230727100143-0353b93c9d5a/go.mod h1:gtvtVHnRzl9KoOlPdvmGQLqsKG9H3QkNa2D1vkkNq24= -github.com/nats-io/jsm.go v0.0.36-0.20230810092625-fdb1b7aec83a h1:NFY9/gAHlxF9tiaiQFxI4bTE4NEfwSSs98FMzaWFEy4= -github.com/nats-io/jsm.go v0.0.36-0.20230810092625-fdb1b7aec83a/go.mod h1:gtvtVHnRzl9KoOlPdvmGQLqsKG9H3QkNa2D1vkkNq24= github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4= github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= github.com/nats-io/nats-server/v2 v2.9.21-0.20230727091008-b22cdf18fe7e h1:7CezZb5EZKnDLJSnqpcz2RDDHXKwowCbOPw/Y6oZOEQ= github.com/nats-io/nats-server/v2 v2.9.21-0.20230727091008-b22cdf18fe7e/go.mod h1:ozqMZc2vTHcNcblOiXMWIXkf8+0lDGAi5wQcG+O1mHU= +github.com/nats-io/nats-server/v2 v2.9.21 h1:2TBTh0UDE74eNXQmV4HofsmRSCiVN0TH2Wgrp6BD6fk= +github.com/nats-io/nats-server/v2 v2.9.21/go.mod h1:ozqMZc2vTHcNcblOiXMWIXkf8+0lDGAi5wQcG+O1mHU= +github.com/nats-io/nats-server/v2 v2.9.22-0.20230810081221-37d3220dfba3 h1:41IJRms/ujdqlYOW+KW6YVZz1Max4RD63ZoYuMfKnrk= +github.com/nats-io/nats-server/v2 v2.9.22-0.20230810081221-37d3220dfba3/go.mod h1:ozqMZc2vTHcNcblOiXMWIXkf8+0lDGAi5wQcG+O1mHU= github.com/nats-io/nats.go v1.28.1-0.20230809215439-7b119491f08f h1:G5p5Ehrr7ma7Ryr4+qrD5he/VP+o+sdTNscbnlqwIC4= github.com/nats-io/nats.go v1.28.1-0.20230809215439-7b119491f08f/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc= github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA= @@ -125,6 +127,8 @@ github.com/tylertreat/hdrhistogram-writer v0.0.0-20210816161836-2e440612a39f/go. github.com/xlab/tablewriter v0.0.0-20160610135559-80b567a11ad5 h1:gmD7q6cCJfBbcuobWQe/KzLsd9Cd3amS1Mq5f3uU1qo= github.com/xlab/tablewriter v0.0.0-20160610135559-80b567a11ad5/go.mod h1:fVwOndYN3s5IaGlMucfgxwMhqwcaJtlGejBU6zX6Yxw= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8= +go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= @@ -146,8 +150,8 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50= -golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= +golang.org/x/net v0.11.0 h1:Gi2tvZIJyBtO9SDr1q9h5hEQCp/4L2RQ+ar0qjx2oNU= +golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= From 0f60ce9ede2ac6af32c32f7c9e269f17d0214f9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-No=C3=ABl=20Moyne?= Date: Thu, 10 Aug 2023 23:58:49 -0700 Subject: [PATCH 5/8] Fix stream copy Improved checking of republish and transform arguments being passed in create, copy and edit --- cli/stream_command.go | 43 ++++++++++++++++++------------------------- 1 file changed, 18 insertions(+), 25 deletions(-) diff --git a/cli/stream_command.go b/cli/stream_command.go index 66ca4cd5..200bb9d9 100644 --- a/cli/stream_command.go +++ b/cli/stream_command.go @@ -1280,6 +1280,16 @@ func (c *streamCmd) loadConfigFile(file string) (*api.StreamConfig, error) { return &cfg, nil } +func (c *streamCmd) checkRepubTransform() { + if (c.repubSource != "" && c.repubDest == "") || (c.repubSource == "" && c.repubDest != "") || (c.repubHeadersOnly && (c.repubSource == "" || c.repubDest == "")) { + fisk.Fatalf("must specify both --republish-source and --republish-destination") + } + + if (c.subjectTransformSource != "" && c.subjectTransformDest == "") || (c.subjectTransformSource == "" && c.subjectTransformDest != "") { + fisk.Fatalf("must specify both --transform-source and --transform-destination") + } +} + func (c *streamCmd) copyAndEditStream(cfg api.StreamConfig, pc *fisk.ParseContext) (api.StreamConfig, error) { var err error @@ -1296,6 +1306,8 @@ func (c *streamCmd) copyAndEditStream(cfg api.StreamConfig, pc *fisk.ParseContex return *cfg, nil } + c.checkRepubTransform() + cfg.NoAck = !c.ack if c.discardPolicy != "" { @@ -1419,25 +1431,12 @@ func (c *streamCmd) copyAndEditStream(cfg api.StreamConfig, pc *fisk.ParseContex return cfg, fmt.Errorf("invalid compression algorithm") } - if c.noRepub { - cfg.RePublish = nil - } else { - var repubConfig api.RePublish - - if cfg.RePublish != nil { - repubConfig = *cfg.RePublish - } - - if c.repubSource != "" { - repubConfig.Source = c.repubSource - } - - if c.repubDest != "" { - repubConfig.Destination = c.repubDest + if !c.noRepub && c.repubSource != "" && c.repubDest != "" { + cfg.RePublish = &api.RePublish{ + Source: c.repubSource, + Destination: c.repubDest, + HeadersOnly: c.repubHeadersOnly, } - - repubConfig.HeadersOnly = c.repubHeadersOnly - cfg.RePublish = &repubConfig } if c.noSubjectTransform { @@ -2312,9 +2311,7 @@ func (c *streamCmd) prepareConfig(_ *fisk.ParseContext, requireSize bool) api.St } } - if (c.repubSource != "" && c.repubDest == "") || (c.repubSource == "" && c.repubDest != "") || (c.repubHeadersOnly && (c.repubSource == "" || c.repubDest == "")) { - fisk.Fatalf("must specify both --republish-source and --republish-destination") - } + c.checkRepubTransform() if c.repubSource != "" && c.repubDest != "" { cfg.RePublish = &api.RePublish{ @@ -2324,10 +2321,6 @@ func (c *streamCmd) prepareConfig(_ *fisk.ParseContext, requireSize bool) api.St } } - if (c.subjectTransformSource != "" && c.subjectTransformDest == "") || (c.subjectTransformSource == "" && c.subjectTransformDest != "") { - fisk.Fatalf("must specify both --transform-source and --transform-destination") - } - if c.subjectTransformSource != "" && c.subjectTransformDest != "" { cfg.SubjectTransform = &api.SubjectTransformConfig{ Source: c.subjectTransformSource, From 9d4d9b5e755424194bcdb73f280ca6897483f130 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-No=C3=ABl=20Moyne?= Date: Fri, 11 Aug 2023 00:05:28 -0700 Subject: [PATCH 6/8] Mention --headers-only in the error message --- cli/stream_command.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/cli/stream_command.go b/cli/stream_command.go index 200bb9d9..85c7f32e 100644 --- a/cli/stream_command.go +++ b/cli/stream_command.go @@ -1282,11 +1282,22 @@ func (c *streamCmd) loadConfigFile(file string) (*api.StreamConfig, error) { func (c *streamCmd) checkRepubTransform() { if (c.repubSource != "" && c.repubDest == "") || (c.repubSource == "" && c.repubDest != "") || (c.repubHeadersOnly && (c.repubSource == "" || c.repubDest == "")) { - fisk.Fatalf("must specify both --republish-source and --republish-destination") + msg := "must specify both --republish-source and --republish-destination" + + if c.repubHeadersOnly { + msg = msg + " when using --headers-only" + } + fisk.Fatalf(msg) } if (c.subjectTransformSource != "" && c.subjectTransformDest == "") || (c.subjectTransformSource == "" && c.subjectTransformDest != "") { - fisk.Fatalf("must specify both --transform-source and --transform-destination") + msg := "must specify both --transform-source and --transform-destination" + + if c.repubHeadersOnly { + msg = msg + " when using --headers-only" + } + + fisk.Fatalf(msg) } } From e462d19b61c0350a24b91e253d912101504eaaba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-No=C3=ABl=20Moyne?= Date: Fri, 11 Aug 2023 00:05:52 -0700 Subject: [PATCH 7/8] Format --- cli/stream_command.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cli/stream_command.go b/cli/stream_command.go index 85c7f32e..63815d71 100644 --- a/cli/stream_command.go +++ b/cli/stream_command.go @@ -1287,6 +1287,7 @@ func (c *streamCmd) checkRepubTransform() { if c.repubHeadersOnly { msg = msg + " when using --headers-only" } + fisk.Fatalf(msg) } From 48b8e9d7daee41ad0de63e50e1157e1e30f579c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-No=C3=ABl=20Moyne?= Date: Fri, 11 Aug 2023 00:11:07 -0700 Subject: [PATCH 8/8] Humanize --- cli/stream_command.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/stream_command.go b/cli/stream_command.go index 63815d71..0662ff23 100644 --- a/cli/stream_command.go +++ b/cli/stream_command.go @@ -1710,7 +1710,7 @@ func (c *streamCmd) showStreamConfig(cols *columns.Writer, cfg api.StreamConfig) if cfg.MaxMsgSize == -1 { cols.AddRow("Maximum Message Size", "unlimited") } else { - cols.AddRow("Maximum Message Size", cfg.MaxMsgSize) + cols.AddRow("Maximum Message Size", humanize.IBytes(uint64(cfg.MaxMsgSize))) } if cfg.MaxConsumers == -1 { cols.AddRow("Maximum Consumers", "unlimited")