Skip to content

Commit

Permalink
PR feedback
Browse files Browse the repository at this point in the history
Add empty lines around control statements to improve readability
Update go.mod for nats.go to the 2.10.0 branch current latest commit
  • Loading branch information
jnmoyne committed Aug 10, 2023
1 parent 9dd9a94 commit e0237b0
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 69 deletions.
8 changes: 8 additions & 0 deletions cli/consumer_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,16 +572,19 @@ func (c *consumerCmd) showInfo(config api.ConsumerConfig, state api.ConsumerInfo
cols.AddRowIfNotEmpty("Name", config.Name)
cols.AddRowIf("Durable Name", config.Durable, config.Durable != "" && config.Durable != config.Name)
cols.AddRowIfNotEmpty("Description", config.Description)

if config.DeliverSubject != "" {
cols.AddRow("Delivery Subject", config.DeliverSubject)
} else {
cols.AddRow("Pull Mode", true)
}

if config.FilterSubject != "" {
cols.AddRow("Filter Subject", config.FilterSubject)
} else if len(config.FilterSubjects) > 0 {
cols.AddRow("Filter Subjects", config.FilterSubjects)
}

switch config.DeliverPolicy {
case api.DeliverAll:
cols.AddRow("Deliver Policy", "All")
Expand All @@ -596,6 +599,7 @@ func (c *consumerCmd) showInfo(config api.ConsumerConfig, state api.ConsumerInfo
case api.DeliverByStartSequence:
cols.AddRowf("Deliver Policy", "From Sequence %d", config.OptStartSeq)
}

cols.AddRowIf("Deliver Queue Group", config.DeliverGroup, config.DeliverGroup != "" && config.DeliverSubject != "")
cols.AddRow("Ack Policy", config.AckPolicy.String())
cols.AddRowIf("Ack Wait", config.AckWait, config.AckPolicy != api.AckNone)
Expand Down Expand Up @@ -653,6 +657,7 @@ func (c *consumerCmd) showInfo(config api.ConsumerConfig, state api.ConsumerInfo
} else {
cols.AddRowf("Last Delivered Message", "Consumer sequence: %s Stream sequence: %s Last delivery: %s ago", f(state.Delivered.Consumer), f(state.Delivered.Stream), f(sinceRefOrNow(state.TimeStamp, *state.Delivered.Last)))
}

if config.AckPolicy != api.AckNone {
if state.AckFloor.Last == nil {
cols.AddRowf("Acknowledgment Floor", "Consumer sequence: %s Stream sequence: %s", f(state.AckFloor.Consumer), f(state.AckFloor.Stream))
Expand All @@ -666,7 +671,9 @@ func (c *consumerCmd) showInfo(config api.ConsumerConfig, state api.ConsumerInfo
}
cols.AddRow("Redelivered Messages", state.NumRedelivered)
}

cols.AddRow("Unprocessed Messages", state.NumPending)

if config.DeliverSubject == "" {
if config.MaxWaiting > 0 {
cols.AddRowf("Waiting Pulls", "%s of maximum %s", f(state.NumWaiting), f(config.MaxWaiting))
Expand All @@ -693,6 +700,7 @@ func (c *consumerCmd) infoAction(_ *fisk.ParseContext) error {

var err error
consumer := c.selectedConsumer

if consumer == nil {
consumer, err = c.mgr.LoadConsumer(c.stream, c.consumer)
fisk.FatalIfError(err, "could not load Consumer %s > %s", c.stream, c.consumer)
Expand Down
17 changes: 6 additions & 11 deletions cli/kv_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,15 +466,10 @@ func (c *kvCommand) addAction(_ *fisk.ParseContext) error {
}
}

if len(c.sources) != 0 {
var sources []*nats.StreamSource

for _, source := range c.sources {
sources = append(sources, &nats.StreamSource{
Name: source,
})
}
cfg.Sources = sources
for _, source := range c.sources {
cfg.Sources = append(cfg.Sources, &nats.StreamSource{
Name: source,
})
}

store, err := js.CreateKeyValue(cfg)
Expand Down Expand Up @@ -761,12 +756,12 @@ func (c *kvCommand) showStatus(store nats.KeyValue) error {
if nfo.Config.MaxBytes == -1 {
cols.AddRow("Maximum Bucket Size", "unlimited")
} else {
cols.AddRow("Maximum Bucket Size", nfo.Config.MaxBytes)
cols.AddRow("Maximum Bucket Size", humanize.IBytes(uint64(nfo.Config.MaxBytes)))
}
if nfo.Config.MaxMsgSize == -1 {
cols.AddRow("Maximum Value Size", "unlimited")
} else {
cols.AddRow("Maximum Value Size", nfo.Config.MaxMsgSize)
cols.AddRow("Maximum Value Size", humanize.IBytes(uint64(nfo.Config.MaxMsgSize)))
}
if nfo.Config.MaxAge <= 0 {
cols.AddRow("Maximum Age", "unlimited")
Expand Down
57 changes: 36 additions & 21 deletions cli/stream_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ func (c *streamCmd) subjectsAction(_ *fisk.ParseContext) (err error) {
cols := 1
countWidth := len(f(most))
table := newTableWriter(fmt.Sprintf("%d Subjects in stream %s", len(names), c.stream))

switch {
case longest+countWidth < 20:
cols = 3
Expand Down Expand Up @@ -1055,6 +1056,7 @@ func (c *streamCmd) reportAction(_ *fisk.ParseContext) error {
if len(info.State.Deleted) > 0 {
deleted = len(info.State.Deleted)
}

s := streamStat{
Name: info.Config.Name,
Consumers: info.State.Consumers,
Expand All @@ -1068,6 +1070,7 @@ func (c *streamCmd) reportAction(_ *fisk.ParseContext) error {
Sources: info.Sources,
Placement: info.Config.Placement,
}

if info.State.Lost != nil {
s.LostBytes = info.State.Lost.Bytes
s.LostMsgs = len(info.State.Lost.Msgs)
Expand All @@ -1085,6 +1088,7 @@ func (c *streamCmd) reportAction(_ *fisk.ParseContext) error {
snode = dg.Node(source.Name)
}
edge := dg.Edge(snode, node).Attr("color", "green")

if source.FilterSubject != "" && source.SubjectTransformDest == "" {
edge.Label(source.FilterSubject)
} else if source.FilterSubject != "" && source.SubjectTransformDest != "" {
Expand Down Expand Up @@ -1419,16 +1423,16 @@ func (c *streamCmd) copyAndEditStream(cfg api.StreamConfig, pc *fisk.ParseContex

if cfg.RePublish != nil {
repubConfig = *cfg.RePublish
} else {
repubConfig = api.RePublish{}
}

if c.repubSource != "" {
repubConfig.Source = c.repubSource
}

if c.repubDest != "" {
repubConfig.Destination = c.repubDest
}

if c.repubHeadersOnly {
repubConfig.HeadersOnly = true
} else if repubConfig.HeadersOnly {
Expand All @@ -1449,8 +1453,6 @@ func (c *streamCmd) copyAndEditStream(cfg api.StreamConfig, pc *fisk.ParseContex

if cfg.SubjectTransform != nil {
subjectTransformConfig = *cfg.SubjectTransform
} else {
subjectTransformConfig = api.SubjectTransformConfig{}
}

if c.subjectTransformSource != "" {
Expand All @@ -1460,12 +1462,6 @@ func (c *streamCmd) copyAndEditStream(cfg api.StreamConfig, pc *fisk.ParseContex
subjectTransformConfig.Destination = c.subjectTransformDest
}

if subjectTransformConfig.Source == "" && subjectTransformConfig.Destination == "" {
cfg.SubjectTransform = nil
} else {
cfg.SubjectTransform = &subjectTransformConfig
}

if c.noSubjectTransform {
cfg.SubjectTransform = nil
}
Expand Down Expand Up @@ -1644,9 +1640,11 @@ func (c *streamCmd) showStreamConfig(cols *columns.Writer, cfg api.StreamConfig)
cols.AddRowIf("Sealed", true, cfg.Sealed)
cols.AddRow("Storage", cfg.Storage.String())
cols.AddRowIf("Compression", cfg.Compression, cfg.Compression != api.NoCompression)

if cfg.FirstSeq > 0 {
cols.AddRow("First Sequence", cfg.FirstSeq)
}

if cfg.Placement != nil {
cols.AddRowIfNotEmpty("Placement Cluster", cfg.Placement.Cluster)
cols.AddRowIf("Placement Tags", cfg.Placement.Tags, len(cfg.Placement.Tags) > 0)
Expand All @@ -1661,6 +1659,7 @@ func (c *streamCmd) showStreamConfig(cols *columns.Writer, cfg api.StreamConfig)
}
cols.AddRowf("Subject Transform", "%s to %s", source, cfg.SubjectTransform.Destination)
}

if cfg.RePublish != nil {
if cfg.RePublish.HeadersOnly {
cols.AddRowf("Republishing Headers", "%s to %s", cfg.RePublish.Source, cfg.RePublish.Destination)
Expand All @@ -1671,6 +1670,7 @@ func (c *streamCmd) showStreamConfig(cols *columns.Writer, cfg api.StreamConfig)
cols.AddRow("Retention", cfg.Retention.String())
cols.AddRow("Acknowledgments", !cfg.NoAck)
dnp := cfg.Discard.String()

if cfg.DiscardNewPer {
dnp = "New Per Subject"
}
Expand Down Expand Up @@ -1723,6 +1723,7 @@ func (c *streamCmd) showStreamConfig(cols *columns.Writer, cfg api.StreamConfig)
if cfg.Mirror != nil || len(cfg.Sources) > 0 {
cols.AddSectionTitle("Replication")
}

cols.AddRowIfNotEmpty("Mirror", c.renderSource(cfg.Mirror))

if len(cfg.Sources) > 0 {
Expand Down Expand Up @@ -1753,10 +1754,11 @@ func (c *streamCmd) renderSource(s *api.StreamSource) string {
if s.FilterSubject == "" && s.SubjectTransformDest == "" {
parts = append(parts, s.Name)
} else {
var filter = ">"
filter := ">"
if s.FilterSubject != "" {
filter = s.FilterSubject
}

if s.SubjectTransformDest != "" {
parts = append(parts, fmt.Sprintf("%s (%s to %s)", s.Name, filter, s.SubjectTransformDest))
} else {
Expand All @@ -1771,10 +1773,12 @@ func (c *streamCmd) renderSource(s *api.StreamSource) string {
if s.OptStartTime != nil {
parts = append(parts, fmt.Sprintf("Start Time: %v", s.OptStartTime))
}

if s.External != nil {
if s.External.ApiPrefix != "" {
parts = append(parts, fmt.Sprintf("API Prefix: %s", s.External.ApiPrefix))
}

if s.External.DeliverPrefix != "" {
parts = append(parts, fmt.Sprintf("Delivery Prefix: %s", s.External.DeliverPrefix))
}
Expand Down Expand Up @@ -1848,27 +1852,32 @@ func (c *streamCmd) showStreamInfo(info *api.StreamInfo) {

showSource := func(s *api.StreamSourceInfo) {
cols.AddRow("Stream Name", s.Name)

if s.SubjectTransformDest != "" {
var filter = ">"
filter := ">"
if s.FilterSubject != "" {
filter = s.FilterSubject
}
cols.AddRowf("Subject Transform", "%s to %s", filter, s.SubjectTransformDest)
} else {
cols.AddRowIfNotEmpty("Subject Filter", s.FilterSubject)
}

cols.AddRow("Lag", s.Lag)

if s.Active > 0 && s.Active < math.MaxInt64 {
cols.AddRow("Last Seen", s.Active)
} else {
cols.AddRow("Last Seen", "never")
}

if s.External != nil {
cols.AddRow("Ext. API Prefix", s.External.ApiPrefix)
if s.External.DeliverPrefix != "" {
cols.AddRow("Ext. Delivery Prefix", s.External.DeliverPrefix)
}
}

if s.Error != nil {
cols.AddRow("Error", s.Error.Description)
}
Expand All @@ -1890,6 +1899,7 @@ func (c *streamCmd) showStreamInfo(info *api.StreamInfo) {
cols.AddSectionTitle("State")
cols.AddRow("Messages", info.State.Msgs)
cols.AddRow("Bytes", humanize.IBytes(info.State.Bytes))

if info.State.Lost != nil && len(info.State.Lost.Msgs) > 0 {
cols.AddRowf("Lost Messages", "%s (%s)", f(len(info.State.Lost.Msgs)), humanize.IBytes(info.State.Lost.Bytes))
}
Expand All @@ -1899,11 +1909,13 @@ func (c *streamCmd) showStreamInfo(info *api.StreamInfo) {
} else {
cols.AddRowf("First Sequence", "%s @ %s UTC", f(info.State.FirstSeq), f(info.State.FirstTime))
}

if info.State.LastTime.Equal(time.Unix(0, 0)) || info.State.LastTime.IsZero() {
cols.AddRow("Last Sequence", info.State.LastSeq)
} else {
cols.AddRowf("Last Sequence", "%s @ %s UTC", f(info.State.LastSeq), f(info.State.LastTime))
}

if len(info.State.Deleted) > 0 { // backwards compat with older servers
cols.AddRow("Deleted Messages", len(info.State.Deleted))
} else if info.State.NumDeleted > 0 {
Expand All @@ -1915,6 +1927,7 @@ func (c *streamCmd) showStreamInfo(info *api.StreamInfo) {
if info.State.NumSubjects > 0 {
cols.AddRow("Number of Subjects", info.State.NumSubjects)
}

if len(info.Alternates) > 0 {
lName := 0
lCluster := 0
Expand All @@ -1926,6 +1939,7 @@ func (c *streamCmd) showStreamInfo(info *api.StreamInfo) {
lCluster = len(s.Cluster)
}
}

for i, s := range info.Alternates {
msg := fmt.Sprintf("%s%s: Cluster: %s%s", strings.Repeat(" ", lName-len(s.Name)), s.Name, strings.Repeat(" ", lCluster-len(s.Cluster)), s.Cluster)
if s.Domain != "" {
Expand Down Expand Up @@ -2165,6 +2179,7 @@ func (c *streamCmd) prepareConfig(_ *fisk.ParseContext, requireSize bool) api.St
}

var maxAge time.Duration

if c.maxBytesLimit == 0 {
reqd := ""
defltSize := "-1"
Expand Down Expand Up @@ -2205,11 +2220,13 @@ func (c *streamCmd) prepareConfig(_ *fisk.ParseContext, requireSize bool) api.St
}

var dupeWindow time.Duration

if c.dupeWindow == "" && c.mirror == "" {
defaultDW := (2 * time.Minute).String()
if maxAge > 0 && maxAge < 2*time.Minute {
defaultDW = maxAge.String()
}

if c.acceptDefaults {
c.dupeWindow = defaultDW
} else {
Expand Down Expand Up @@ -2303,18 +2320,15 @@ func (c *streamCmd) prepareConfig(_ *fisk.ParseContext, requireSize bool) api.St
}
}

if c.repubSource != "" || c.repubDest != "" {
cfg.RePublish = &api.RePublish{
Source: c.repubSource,
Destination: c.repubDest,
HeadersOnly: c.repubHeadersOnly,
}
cfg.RePublish = &api.RePublish{
Source: c.repubSource,
Destination: c.repubDest,
HeadersOnly: c.repubHeadersOnly,
}

if (c.subjectTransformSource != "" && c.subjectTransformDest == "") || (c.subjectTransformSource == "" && c.subjectTransformDest != "") {
fisk.Fatalf("must specify both --transform-source and --transform-destination")
}
if c.subjectTransformSource != "" && c.subjectTransformDest != "" {

cfg.SubjectTransform = &api.SubjectTransformConfig{
Source: c.subjectTransformSource,
Destination: c.subjectTransformDest,
Expand Down Expand Up @@ -2477,6 +2491,7 @@ func (c *streamCmd) askSource(name string, prefix string) *api.StreamSource {

func (c *streamCmd) parseStreamSource(source string) (*api.StreamSource, error) {
ss := &api.StreamSource{}

if isJsonString(source) {
err := json.Unmarshal([]byte(source), ss)
if err != nil {
Expand Down Expand Up @@ -2517,7 +2532,6 @@ func (c *streamCmd) validateCfg(cfg *api.StreamConfig) (bool, []byte, []string,
}

func (c *streamCmd) addAction(pc *fisk.ParseContext) (err error) {

_, mgr, err := prepareHelper("", natsOpts()...)
fisk.FatalIfError(err, "could not create Stream")

Expand Down Expand Up @@ -2738,6 +2752,7 @@ func (c *streamCmd) renderStreamsAsTable(streams []*jsm.Stream, missing []string
} else {
table = newTableWriter(fmt.Sprintf("Streams matching %s", c.filterSubject))
}

table.AddHeaders("Name", "Description", "Created", "Messages", "Size", "Last Message")
for _, s := range streams {
nfo, _ := s.LatestInformation()
Expand Down
Loading

0 comments on commit e0237b0

Please sign in to comment.