Skip to content

Commit

Permalink
support detecting interior deletes in streams
Browse files Browse the repository at this point in the history
Signed-off-by: R.I.Pienaar <[email protected]>
  • Loading branch information
ripienaar committed Aug 23, 2023
1 parent 1f8d13e commit 43ffbea
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 1 deletion.
106 changes: 106 additions & 0 deletions cli/stream_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,12 @@ Finding streams with certain subjects configured:
strSeal.Arg("stream", "The name of the Stream to seal").Required().StringVar(&c.stream)
strSeal.Flag("force", "Force sealing without prompting").Short('f').UnNegatableBoolVar(&c.force)

gapDetect := str.Command("gaps", "Detect gaps in the Stream content that would be reported as deleted messages").Action(c.detectGaps)
gapDetect.Arg("stream", "Stream to act on").StringVar(&c.stream)
gapDetect.Flag("force", "Act without prompting").Short('f').UnNegatableBoolVar(&c.force)
gapDetect.Flag("progress", "Enable progress bar").Default("true").BoolVar(&c.showProgress)
gapDetect.Flag("json", "Show detected gaps in JSON format").UnNegatableBoolVar(&c.json)

strCluster := str.Command("cluster", "Manages a clustered Stream").Alias("c")
strClusterDown := strCluster.Command("step-down", "Force a new leader election by standing down the current leader").Alias("stepdown").Alias("sd").Alias("elect").Alias("down").Alias("d").Action(c.leaderStandDown)
strClusterDown.Arg("stream", "Stream to act on").StringVar(&c.stream)
Expand All @@ -389,6 +395,106 @@ func init() {
registerCommand("stream", 16, configureStreamCommand)
}

func (c *streamCmd) detectGaps(_ *fisk.ParseContext) error {
c.connectAndAskStream()

stream, err := c.loadStream(c.stream)
if err != nil {
return err
}

info, err := stream.LatestInformation()
if err != nil {
return err
}

if info.State.NumDeleted == 0 {
if c.json {
fmt.Println("{}")
}

fmt.Printf("No deleted messages in %s\n", c.stream)
return nil
}

if !c.force {
fmt.Println("WARNING: Detecting gaps in a stream consumes the entire stream and can be resource intensive on the Server, Client and Network.")
fmt.Println()
ok, err := askConfirmation(fmt.Sprintf("Really detect gaps in stream %s with %s messages and %s bytes", c.stream, humanize.Comma(int64(info.State.Msgs)), humanize.IBytes(info.State.Bytes)), false)
fisk.FatalIfError(err, "could not obtain confirmation")

if !ok {
return nil
}
}

if c.json {
c.showProgress = false
}

var progress *uiprogress.Bar
var gaps [][2]uint64
var cnt int

progressCb := func(seq uint64, pending uint64) {
if !c.showProgress {
return
}
if progress == nil {
progress = uiprogress.AddBar(int(pending)).AppendCompleted().PrependFunc(func(b *uiprogress.Bar) string {
return fmt.Sprintf("%s / %s", f(b.Current()), f(b.Total))
})
uiprogress.Start()
}
cnt++

progress.Set(int(seq))

if pending == 0 {
progress.Set(progress.Total)
}
}

gapCb := func(start, end uint64) {
gaps = append(gaps, [2]uint64{start, end})
}

err = stream.DetectGaps(ctx, progressCb, gapCb)
if progress != nil {
time.Sleep(250 * time.Millisecond) // let it draw
uiprogress.Stop()
fmt.Println()
}
if err != nil {
return err
}

if c.json {
printJSON(gaps)
return nil
}

if len(gaps) == 0 {
fmt.Printf("No deleted messages in %s\n", c.stream)
return nil
}

var table *tbl
if len(gaps) == 1 {
table = newTableWriter(fmt.Sprintf("1 gap found in Stream %s", c.stream))
} else {
table = newTableWriter(fmt.Sprintf("%s gaps found in Stream %s", f(len(gaps)), c.stream))
}

table.AddHeaders("First Message", "Last Message")
for _, gap := range gaps {
table.AddRow(f(gap[0]), f(gap[1]))
}
fmt.Println(table.Render())

return nil
}

func (c *streamCmd) subjectsAction(_ *fisk.ParseContext) (err error) {
asked := c.connectAndAskStream()

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/klauspost/compress v1.16.7
github.com/mattn/go-isatty v0.0.19
github.com/nats-io/jsm.go v0.0.36-0.20230814153802-490edca64ad9
github.com/nats-io/jsm.go v0.0.36-0.20230823100149-ec0be1e1dfc1
github.com/nats-io/nats-server/v2 v2.9.22-0.20230812221748-b839c53abcec
github.com/nats-io/nats.go v1.28.1-0.20230809215439-7b119491f08f
github.com/nats-io/nuid v1.0.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jsm.go v0.0.36-0.20230814153802-490edca64ad9 h1:iovWg573DqMS60WskqkrsjMyhPoTSDKStjs8dA+FYP8=
github.com/nats-io/jsm.go v0.0.36-0.20230814153802-490edca64ad9/go.mod h1:MyB1zGGllqlJdUXUJBKPI93fajK7LknUVQebNEMmod4=
github.com/nats-io/jsm.go v0.0.36-0.20230823100149-ec0be1e1dfc1 h1:FyouF/B3HNh7BLkpkqL3iz0wS0OFpUsBSgM+M/DUwNI=
github.com/nats-io/jsm.go v0.0.36-0.20230823100149-ec0be1e1dfc1/go.mod h1:MyB1zGGllqlJdUXUJBKPI93fajK7LknUVQebNEMmod4=
github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4=
github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/nats-server/v2 v2.9.22-0.20230812221748-b839c53abcec h1:WlIeTCfqbb94PZXcpjLnikseyGQv2lLMNIew0eF1esw=
Expand Down

0 comments on commit 43ffbea

Please sign in to comment.