diff --git a/cli/stream_command.go b/cli/stream_command.go index 263a846f..7d6ca06f 100644 --- a/cli/stream_command.go +++ b/cli/stream_command.go @@ -375,6 +375,12 @@ Finding streams with certain subjects configured: strSeal.Arg("stream", "The name of the Stream to seal").Required().StringVar(&c.stream) strSeal.Flag("force", "Force sealing without prompting").Short('f').UnNegatableBoolVar(&c.force) + gapDetect := str.Command("gaps", "Detect gaps in the Stream content that would be reported as deleted messages").Action(c.detectGaps) + gapDetect.Arg("stream", "Stream to act on").StringVar(&c.stream) + gapDetect.Flag("force", "Act without prompting").Short('f').UnNegatableBoolVar(&c.force) + gapDetect.Flag("progress", "Enable progress bar").Default("true").BoolVar(&c.showProgress) + gapDetect.Flag("json", "Show detected gaps in JSON format").UnNegatableBoolVar(&c.json) + strCluster := str.Command("cluster", "Manages a clustered Stream").Alias("c") strClusterDown := strCluster.Command("step-down", "Force a new leader election by standing down the current leader").Alias("stepdown").Alias("sd").Alias("elect").Alias("down").Alias("d").Action(c.leaderStandDown) strClusterDown.Arg("stream", "Stream to act on").StringVar(&c.stream) @@ -389,6 +395,106 @@ func init() { registerCommand("stream", 16, configureStreamCommand) } +func (c *streamCmd) detectGaps(_ *fisk.ParseContext) error { + c.connectAndAskStream() + + stream, err := c.loadStream(c.stream) + if err != nil { + return err + } + + info, err := stream.LatestInformation() + if err != nil { + return err + } + + if info.State.NumDeleted == 0 { + if c.json { + fmt.Println("{}") + } + + fmt.Printf("No deleted messages in %s\n", c.stream) + return nil + } + + if !c.force { + fmt.Println("WARNING: Detecting gaps in a stream consumes the entire stream and can be resource intensive on the Server, Client and Network.") + fmt.Println() + ok, err := askConfirmation(fmt.Sprintf("Really detect gaps in stream %s with %s messages and %s bytes", c.stream, humanize.Comma(int64(info.State.Msgs)), humanize.IBytes(info.State.Bytes)), false) + fisk.FatalIfError(err, "could not obtain confirmation") + + if !ok { + return nil + } + } + + if c.json { + c.showProgress = false + } + + var progress *uiprogress.Bar + var gaps [][2]uint64 + var cnt int + + progressCb := func(seq uint64, pending uint64) { + if !c.showProgress { + return + } + if progress == nil { + progress = uiprogress.AddBar(int(pending)).AppendCompleted().PrependFunc(func(b *uiprogress.Bar) string { + return fmt.Sprintf("%s / %s", f(b.Current()), f(b.Total)) + }) + uiprogress.Start() + } + cnt++ + + progress.Set(int(seq)) + + if pending == 0 { + progress.Set(progress.Total) + } + } + + gapCb := func(start, end uint64) { + gaps = append(gaps, [2]uint64{start, end}) + } + + err = stream.DetectGaps(ctx, progressCb, gapCb) + if progress != nil { + time.Sleep(250 * time.Millisecond) // let it draw + uiprogress.Stop() + fmt.Println() + } + if err != nil { + return err + } + + if c.json { + printJSON(gaps) + return nil + } + + if len(gaps) == 0 { + fmt.Printf("No deleted messages in %s\n", c.stream) + return nil + } + + var table *tbl + if len(gaps) == 1 { + table = newTableWriter(fmt.Sprintf("1 gap found in Stream %s", c.stream)) + } else { + table = newTableWriter(fmt.Sprintf("%s gaps found in Stream %s", f(len(gaps)), c.stream)) + } + + table.AddHeaders("First Message", "Last Message") + for _, gap := range gaps { + table.AddRow(f(gap[0]), f(gap[1])) + } + fmt.Println(table.Render()) + + return nil +} + func (c *streamCmd) subjectsAction(_ *fisk.ParseContext) (err error) { asked := c.connectAndAskStream() diff --git a/go.mod b/go.mod index 67a54e0d..5818db03 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 github.com/klauspost/compress v1.16.7 github.com/mattn/go-isatty v0.0.19 - github.com/nats-io/jsm.go v0.0.36-0.20230814153802-490edca64ad9 + github.com/nats-io/jsm.go v0.0.36-0.20230823100149-ec0be1e1dfc1 github.com/nats-io/nats-server/v2 v2.9.22-0.20230812221748-b839c53abcec github.com/nats-io/nats.go v1.28.1-0.20230809215439-7b119491f08f github.com/nats-io/nuid v1.0.1 diff --git a/go.sum b/go.sum index c2297b9e..266073f9 100644 --- a/go.sum +++ b/go.sum @@ -82,6 +82,8 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jsm.go v0.0.36-0.20230814153802-490edca64ad9 h1:iovWg573DqMS60WskqkrsjMyhPoTSDKStjs8dA+FYP8= github.com/nats-io/jsm.go v0.0.36-0.20230814153802-490edca64ad9/go.mod h1:MyB1zGGllqlJdUXUJBKPI93fajK7LknUVQebNEMmod4= +github.com/nats-io/jsm.go v0.0.36-0.20230823100149-ec0be1e1dfc1 h1:FyouF/B3HNh7BLkpkqL3iz0wS0OFpUsBSgM+M/DUwNI= +github.com/nats-io/jsm.go v0.0.36-0.20230823100149-ec0be1e1dfc1/go.mod h1:MyB1zGGllqlJdUXUJBKPI93fajK7LknUVQebNEMmod4= github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4= github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= github.com/nats-io/nats-server/v2 v2.9.22-0.20230812221748-b839c53abcec h1:WlIeTCfqbb94PZXcpjLnikseyGQv2lLMNIew0eF1esw=