Skip to content

Commit

Permalink
sync: Add rescan progress.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoeGruffins committed May 16, 2024
1 parent 5e7f5c1 commit a28d2cf
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 15 deletions.
11 changes: 8 additions & 3 deletions asset/dcr/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"decred.org/dcrwallet/v3/p2p"
"decred.org/dcrwallet/v3/spv"
dcrwallet "decred.org/dcrwallet/v3/wallet"
"github.com/decred/dcrd/addrmgr/v2"
)

Expand Down Expand Up @@ -81,7 +82,11 @@ func (w *Wallet) IsSynced() bool {
return false
}

// RescanFromHeight rescans the wallet from the specified height.
func (w *Wallet) RescanFromHeight(ctx context.Context, startHeight int32) error {
return w.mainWallet.RescanFromHeight(ctx, w.syncer, startHeight)
// RescanProgressFromHeight rescans for relevant transactions in all blocks in
// the main chain starting at startHeight. Progress notifications and any
// errors are sent to the channel p. This function blocks until the rescan
// completes or ends in an error. p is closed before returning.
func (w *Wallet) RescanProgressFromHeight(ctx context.Context,
startHeight int32, p chan<- dcrwallet.RescanProgress) {
w.mainWallet.RescanProgressFromHeight(ctx, w.syncer, startHeight, p)
}
2 changes: 0 additions & 2 deletions asset/dcr/wallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ func (w *Wallet) CloseWallet() error {
}

w.log.Info("Wallet closed")
w.mainWallet = nil
w.db = nil
return nil
}

Expand Down
51 changes: 47 additions & 4 deletions cgo/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"

"decred.org/dcrwallet/v3/spv"
dcrwallet "decred.org/dcrwallet/v3/wallet"
)

//export syncWallet
Expand Down Expand Up @@ -42,6 +43,10 @@ func syncWallet(cName, cPeers *C.char) *C.char {
},
FetchMissingCFiltersStarted: func() {
w.syncStatusMtx.Lock()
if w.rescanning {
w.syncStatusMtx.Unlock()
return
}
w.syncStatusCode = SSCFetchingCFilters
w.syncStatusMtx.Unlock()
w.log.Info("Fetching missing cfilters started.")
Expand All @@ -57,6 +62,10 @@ func syncWallet(cName, cPeers *C.char) *C.char {
},
FetchHeadersStarted: func() {
w.syncStatusMtx.Lock()
if w.rescanning {
w.syncStatusMtx.Unlock()
return
}
w.syncStatusCode = SSCFetchingHeaders
w.syncStatusMtx.Unlock()
w.log.Info("Fetching headers started.")
Expand All @@ -72,6 +81,10 @@ func syncWallet(cName, cPeers *C.char) *C.char {
},
DiscoverAddressesStarted: func() {
w.syncStatusMtx.Lock()
if w.rescanning {
w.syncStatusMtx.Unlock()
return
}
w.syncStatusCode = SSCDiscoveringAddrs
w.syncStatusMtx.Unlock()
w.log.Info("Discover addresses started.")
Expand All @@ -81,6 +94,10 @@ func syncWallet(cName, cPeers *C.char) *C.char {
},
RescanStarted: func() {
w.syncStatusMtx.Lock()
if w.rescanning {
w.syncStatusMtx.Unlock()
return
}
w.syncStatusCode = SSCRescanning
w.syncStatusMtx.Unlock()
w.log.Info("Rescan started.")
Expand Down Expand Up @@ -163,25 +180,51 @@ func rescanFromHeight(cName, cHeight *C.char) *C.char {
if !exists {
return errCResponse("wallet with name %q does not exist", name)
}
w.syncStatusMtx.RLock()
if w.rescanning {
w.syncStatusMtx.RUnlock()
return errCResponse("wallet %q already rescanning", name)
}
w.syncStatusMtx.RUnlock()
height, err := strconv.ParseUint(goString(cHeight), 10, 32)
if err != nil {
return errCResponse("height is not an uint32: %v", err)
}
// We don't seem to get any feedback from wallet when doing rescans here.
// Just set status to rescanning and then to complete when done.
w.syncStatusMtx.Lock()
w.syncStatusCode = SSCRescanning
w.rescanning = true
w.syncStatusMtx.Unlock()
w.Add(1)
go func() {
defer func() {
w.syncStatusMtx.Lock()
w.syncStatusCode = SSCComplete
w.rescanning = false
w.syncStatusMtx.Unlock()
w.Done()
}()
prog := make(chan dcrwallet.RescanProgress)
go func() {
w.RescanProgressFromHeight(ctx, int32(height), prog)
}()
if err := w.RescanFromHeight(ctx, int32(height)); err != nil {
log.Errorf("rescan wallet %q error: %v", name, err)
for {
select {
case p, open := <-prog:
if !open {
return
}
if p.Err != nil {
log.Errorf("rescan wallet %q error: %v", name, err)
return
}
w.syncStatusMtx.Lock()
w.rescanHeight = int(p.ScannedThrough)
w.syncStatusMtx.Unlock()
case <-ctx.Done():
return
case <-w.shutdown:
return
}
}
}()
return successCResponse("rescan from height %d for wallet %q started", height, name)
Expand Down
20 changes: 14 additions & 6 deletions cgo/walletloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ type wallet struct {
*dcr.Wallet
log slog.Logger

sync.WaitGroup
shutdown chan struct{}

syncStatusMtx sync.RWMutex
syncStatusCode SyncStatusCode
targetHeight, cfiltersHeight, headersHeight, rescanHeight, numPeers int
Expand Down Expand Up @@ -73,8 +76,9 @@ func createWallet(cName, cDataDir, cNet, cPass, cMnemonic *C.char) *C.char {
}

wallets[name] = &wallet{
Wallet: w,
log: logger,
Wallet: w,
log: logger,
shutdown: make(chan struct{}),
}
return successCResponse("wallet created")
}
Expand Down Expand Up @@ -114,8 +118,9 @@ func createWatchOnlyWallet(cName, cDataDir, cNet, cPub *C.char) *C.char {
}

wallets[name] = &wallet{
Wallet: w,
log: logger,
Wallet: w,
log: logger,
shutdown: make(chan struct{}),
}
return successCResponse("wallet created")
}
Expand Down Expand Up @@ -156,8 +161,9 @@ func loadWallet(cName, cDataDir, cNet *C.char) *C.char {
}

wallets[name] = &wallet{
Wallet: w,
log: logger,
Wallet: w,
log: logger,
shutdown: make(chan struct{}),
}
return successCResponse(fmt.Sprintf("wallet %q loaded", name))
}
Expand Down Expand Up @@ -220,6 +226,8 @@ func closeWallet(cName *C.char) *C.char {
if err := w.CloseWallet(); err != nil {
return errCResponse("close wallet %q error: %v", name, err.Error())
}
close(w.shutdown)
w.Wait()
delete(wallets, name)
return successCResponse("wallet %q shutdown", name)
}
Expand Down

0 comments on commit a28d2cf

Please sign in to comment.