Skip to content

Commit

Permalink
Merge pull request #754 from bhandras/sweepbatcher-empty-batch-fix
Browse files Browse the repository at this point in the history
sweepbatcher: do not fail on restoring empty batches
  • Loading branch information
bhandras authored May 24, 2024
2 parents 38f0e3a + 14de8f1 commit 563e7be
Show file tree
Hide file tree
Showing 8 changed files with 208 additions and 47 deletions.
9 changes: 9 additions & 0 deletions loopdb/sqlc/batch.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions loopdb/sqlc/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions loopdb/sqlc/queries/batch.sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ INSERT INTO sweep_batches (
$6
) RETURNING id;

-- name: DropBatch :exec
DELETE FROM sweep_batches WHERE id = $1;

-- name: UpdateBatch :exec
UPDATE sweep_batches SET
confirmed = $2,
Expand Down
22 changes: 22 additions & 0 deletions sweepbatcher/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sweepbatcher
import (
"context"
"database/sql"
"fmt"

"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg"
Expand Down Expand Up @@ -46,6 +47,9 @@ type BaseDB interface {
InsertBatch(ctx context.Context, arg sqlc.InsertBatchParams) (
int32, error)

// DropBatch drops a batch from the database.
DropBatch(ctx context.Context, id int32) error

// UpdateBatch updates a batch in the database.
UpdateBatch(ctx context.Context, arg sqlc.UpdateBatchParams) error

Expand Down Expand Up @@ -108,6 +112,24 @@ func (s *SQLStore) InsertSweepBatch(ctx context.Context, batch *dbBatch) (int32,
return s.baseDb.InsertBatch(ctx, batchToInsertArgs(*batch))
}

// DropBatch drops a batch from the database. Note that we only use this call
// for batches that have no sweeps and so we'd not be able to resume.
func (s *SQLStore) DropBatch(ctx context.Context, id int32) error {
readOpts := loopdb.NewSqlReadOpts()
return s.baseDb.ExecTx(ctx, readOpts, func(tx *sqlc.Queries) error {
dbSweeps, err := tx.GetBatchSweeps(ctx, id)
if err != nil {
return err
}

if len(dbSweeps) != 0 {
return fmt.Errorf("cannot drop a non-empty batch")
}

return tx.DropBatch(ctx, id)
})
}

// UpdateSweepBatch updates a batch in the database.
func (s *SQLStore) UpdateSweepBatch(ctx context.Context, batch *dbBatch) error {
return s.baseDb.UpdateBatch(ctx, batchToUpdateArgs(*batch))
Expand Down
6 changes: 6 additions & 0 deletions sweepbatcher/store_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ func (s *StoreMock) InsertSweepBatch(ctx context.Context,
return id, nil
}

// DropBatch drops a batch from the database.
func (s *StoreMock) DropBatch(ctx context.Context, id int32) error {
delete(s.batches, id)
return nil
}

// UpdateSweepBatch updates a batch in the database.
func (s *StoreMock) UpdateSweepBatch(ctx context.Context,
batch *dbBatch) error {
Expand Down
27 changes: 22 additions & 5 deletions sweepbatcher/sweep_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
"math"
"sync"
Expand Down Expand Up @@ -48,7 +49,7 @@ const (
)

var (
ErrBatchShuttingDown = fmt.Errorf("batch shutting down")
ErrBatchShuttingDown = errors.New("batch shutting down")
)

// sweep stores any data related to sweeping a specific outpoint.
Expand Down Expand Up @@ -196,7 +197,11 @@ type batch struct {
// main event loop.
callLeave chan struct{}

// quit signals that the batch must stop.
// stopped signals that the batch has stopped.
stopped chan struct{}

// quit is owned by the parent batcher and signals that the batch must
// stop.
quit chan struct{}

// wallet is the wallet client used to create and publish the batch
Expand Down Expand Up @@ -260,6 +265,7 @@ type batchKit struct {
purger Purger
store BatcherStore
log btclog.Logger
quit chan struct{}
}

// scheduleNextCall schedules the next call to the batch handler's main event
Expand All @@ -269,6 +275,9 @@ func (b *batch) scheduleNextCall() (func(), error) {
case b.callEnter <- struct{}{}:

case <-b.quit:
return func() {}, ErrBatcherShuttingDown

case <-b.stopped:
return func() {}, ErrBatchShuttingDown
}

Expand All @@ -292,7 +301,8 @@ func NewBatch(cfg batchConfig, bk batchKit) *batch {
errChan: make(chan error, 1),
callEnter: make(chan struct{}),
callLeave: make(chan struct{}),
quit: make(chan struct{}),
stopped: make(chan struct{}),
quit: bk.quit,
batchTxid: bk.batchTxid,
wallet: bk.wallet,
chainNotifier: bk.chainNotifier,
Expand All @@ -319,7 +329,8 @@ func NewBatchFromDB(cfg batchConfig, bk batchKit) *batch {
errChan: make(chan error, 1),
callEnter: make(chan struct{}),
callLeave: make(chan struct{}),
quit: make(chan struct{}),
stopped: make(chan struct{}),
quit: bk.quit,
batchTxid: bk.batchTxid,
batchPkScript: bk.batchPkScript,
rbfCache: bk.rbfCache,
Expand Down Expand Up @@ -446,7 +457,7 @@ func (b *batch) Run(ctx context.Context) error {
runCtx, cancel := context.WithCancel(ctx)
defer func() {
cancel()
close(b.quit)
close(b.stopped)
b.wg.Wait()
}()

Expand Down Expand Up @@ -539,6 +550,12 @@ func (b *batch) publish(ctx context.Context) error {
coopSuccess bool
)

if len(b.sweeps) == 0 {
b.log.Debugf("skipping publish: no sweeps in the batch")

return nil
}

// Run the RBF rate update.
err = b.updateRbfRate(ctx)
if err != nil {
Expand Down
73 changes: 47 additions & 26 deletions sweepbatcher/sweep_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sweepbatcher

import (
"context"
"errors"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -46,36 +47,35 @@ const (
type BatcherStore interface {
// FetchUnconfirmedSweepBatches fetches all the batches from the
// database that are not in a confirmed state.
FetchUnconfirmedSweepBatches(ctx context.Context) ([]*dbBatch,
error)
FetchUnconfirmedSweepBatches(ctx context.Context) ([]*dbBatch, error)

// InsertSweepBatch inserts a batch into the database, returning the id
// of the inserted batch.
InsertSweepBatch(ctx context.Context,
batch *dbBatch) (int32, error)
InsertSweepBatch(ctx context.Context, batch *dbBatch) (int32, error)

// DropBatch drops a batch from the database. This should only be used
// when a batch is empty.
DropBatch(ctx context.Context, id int32) error

// UpdateSweepBatch updates a batch in the database.
UpdateSweepBatch(ctx context.Context,
batch *dbBatch) error
UpdateSweepBatch(ctx context.Context, batch *dbBatch) error

// ConfirmBatch confirms a batch by setting its state to confirmed.
ConfirmBatch(ctx context.Context, id int32) error

// FetchBatchSweeps fetches all the sweeps that belong to a batch.
FetchBatchSweeps(ctx context.Context,
id int32) ([]*dbSweep, error)
FetchBatchSweeps(ctx context.Context, id int32) ([]*dbSweep, error)

// UpsertSweep inserts a sweep into the database, or updates an existing
// sweep if it already exists.
UpsertSweep(ctx context.Context, sweep *dbSweep) error

// GetSweepStatus returns the completed status of the sweep.
GetSweepStatus(ctx context.Context, swapHash lntypes.Hash) (
bool, error)
GetSweepStatus(ctx context.Context, swapHash lntypes.Hash) (bool, error)

// GetParentBatch returns the parent batch of a (completed) sweep.
GetParentBatch(ctx context.Context, swapHash lntypes.Hash) (
*dbBatch, error)
GetParentBatch(ctx context.Context, swapHash lntypes.Hash) (*dbBatch,
error)

// TotalSweptAmount returns the total amount swept by a (confirmed)
// batch.
Expand Down Expand Up @@ -135,7 +135,7 @@ type SpendNotifier struct {
}

var (
ErrBatcherShuttingDown = fmt.Errorf("batcher shutting down")
ErrBatcherShuttingDown = errors.New("batcher shutting down")
)

// Batcher is a system that is responsible for accepting sweep requests and
Expand All @@ -153,6 +153,10 @@ type Batcher struct {
// quit signals that the batch must stop.
quit chan struct{}

// initDone is a channel that is closed when the batcher has been
// initialized.
initDone chan struct{}

// wallet is the wallet kit client that is used by batches.
wallet lndclient.WalletKitClient

Expand Down Expand Up @@ -200,6 +204,7 @@ func NewBatcher(wallet lndclient.WalletKitClient,
sweepReqs: make(chan SweepRequest),
errChan: make(chan error, 1),
quit: make(chan struct{}),
initDone: make(chan struct{}),
wallet: wallet,
chainNotifier: chainNotifier,
signerClient: signerClient,
Expand All @@ -216,6 +221,7 @@ func (b *Batcher) Run(ctx context.Context) error {
runCtx, cancel := context.WithCancel(ctx)
defer func() {
cancel()
close(b.quit)

for _, batch := range b.batches {
batch.Wait()
Expand All @@ -238,6 +244,9 @@ func (b *Batcher) Run(ctx context.Context) error {
}
}

// Signal that the batcher has been initialized.
close(b.initDone)

for {
select {
case sweepReq := <-b.sweepReqs:
Expand Down Expand Up @@ -306,7 +315,7 @@ func (b *Batcher) handleSweep(ctx context.Context, sweep *sweep,

if batch.sweepExists(sweep.swapHash) {
accepted, err := batch.addSweep(ctx, sweep)
if err != nil {
if err != nil && !errors.Is(err, ErrBatchShuttingDown) {
return err
}

Expand All @@ -321,7 +330,7 @@ func (b *Batcher) handleSweep(ctx context.Context, sweep *sweep,
// If one of the batches accepts the sweep, we provide it to that batch.
for _, batch := range b.batches {
accepted, err := batch.addSweep(ctx, sweep)
if err != nil && err != ErrBatchShuttingDown {
if err != nil && !errors.Is(err, ErrBatchShuttingDown) {
return err
}

Expand Down Expand Up @@ -379,6 +388,7 @@ func (b *Batcher) spinUpBatch(ctx context.Context) (*batch, error) {
verifySchnorrSig: b.VerifySchnorrSig,
purger: b.AddSweep,
store: b.store,
quit: b.quit,
}

batch := NewBatch(cfg, batchKit)
Expand Down Expand Up @@ -407,23 +417,23 @@ func (b *Batcher) spinUpBatch(ctx context.Context) (*batch, error) {
// spinUpBatchDB spins up a batch that already existed in storage, then
// returns it.
func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error {
cfg := batchConfig{
maxTimeoutDistance: batch.cfg.maxTimeoutDistance,
batchConfTarget: defaultBatchConfTarget,
}

rbfCache := rbfCache{
LastHeight: batch.rbfCache.LastHeight,
FeeRate: batch.rbfCache.FeeRate,
}

dbSweeps, err := b.store.FetchBatchSweeps(ctx, batch.id)
if err != nil {
return err
}

if len(dbSweeps) == 0 {
return fmt.Errorf("batch %d has no sweeps", batch.id)
log.Infof("skipping restored batch %d as it has no sweeps",
batch.id)

// It is safe to drop this empty batch as it has no sweeps.
err := b.store.DropBatch(ctx, batch.id)
if err != nil {
log.Warnf("unable to drop empty batch %d: %v",
batch.id, err)
}

return nil
}

primarySweep := dbSweeps[0]
Expand All @@ -439,6 +449,11 @@ func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error {
sweeps[sweep.swapHash] = *sweep
}

rbfCache := rbfCache{
LastHeight: batch.rbfCache.LastHeight,
FeeRate: batch.rbfCache.FeeRate,
}

batchKit := batchKit{
id: batch.id,
batchTxid: batch.batchTxid,
Expand All @@ -456,6 +471,12 @@ func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error {
purger: b.AddSweep,
store: b.store,
log: batchPrefixLogger(fmt.Sprintf("%d", batch.id)),
quit: b.quit,
}

cfg := batchConfig{
maxTimeoutDistance: batch.cfg.maxTimeoutDistance,
batchConfTarget: defaultBatchConfTarget,
}

newBatch := NewBatchFromDB(cfg, batchKit)
Expand Down
Loading

0 comments on commit 563e7be

Please sign in to comment.