Skip to content

Commit

Permalink
sync: Add rescan progress.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoeGruffins committed Jun 5, 2024
1 parent 5e7f5c1 commit 1f2c9ca
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 43 deletions.
11 changes: 8 additions & 3 deletions asset/dcr/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"decred.org/dcrwallet/v3/p2p"
"decred.org/dcrwallet/v3/spv"
dcrwallet "decred.org/dcrwallet/v3/wallet"
"github.com/decred/dcrd/addrmgr/v2"
)

Expand Down Expand Up @@ -81,7 +82,11 @@ func (w *Wallet) IsSynced() bool {
return false
}

// RescanFromHeight rescans the wallet from the specified height.
func (w *Wallet) RescanFromHeight(ctx context.Context, startHeight int32) error {
return w.mainWallet.RescanFromHeight(ctx, w.syncer, startHeight)
// RescanProgressFromHeight rescans for relevant transactions in all blocks in
// the main chain starting at startHeight. Progress notifications and any
// errors are sent to the channel p. This function blocks until the rescan
// completes or ends in an error. p is closed before returning.
func (w *Wallet) RescanProgressFromHeight(ctx context.Context,
startHeight int32, p chan<- dcrwallet.RescanProgress) {
w.mainWallet.RescanProgressFromHeight(ctx, w.syncer, startHeight, p)
}
2 changes: 0 additions & 2 deletions asset/dcr/wallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ func (w *Wallet) CloseWallet() error {
}

w.log.Info("Wallet closed")
w.mainWallet = nil
w.db = nil
return nil
}

Expand Down
10 changes: 5 additions & 5 deletions cgo/addresses.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func newExternalAddress(cName *C.char) *C.char {
return errCResponseWithCode(ErrCodeNotSynced, "newExternalAddress requested on an unsynced wallet")
}

_, err := w.NewExternalAddress(ctx, udb.DefaultAccountNum)
_, err := w.NewExternalAddress(w.ctx, udb.DefaultAccountNum)
if err != nil {
return errCResponse("w.NewExternalAddress error: %v", err)
}
Expand All @@ -68,11 +68,11 @@ func signMessage(cName, cMessage, cAddress, cPassword *C.char) *C.char {
return errCResponse("unable to decode address: %v", err)
}

if err := w.MainWallet().Unlock(ctx, []byte(goString(cPassword)), nil); err != nil {
if err := w.MainWallet().Unlock(w.ctx, []byte(goString(cPassword)), nil); err != nil {
return errCResponse("cannot unlock wallet: %v", err)
}

sig, err := w.MainWallet().SignMessage(ctx, goString(cMessage), addr)
sig, err := w.MainWallet().SignMessage(w.ctx, goString(cMessage), addr)
if err != nil {
return errCResponse("unable to sign message: %v", err)
}
Expand All @@ -89,7 +89,7 @@ func addresses(cName *C.char) *C.char {
return errCResponse("wallet with name %q is not loaded", goString(cName))
}

addrs, err := w.AddressesByAccount(ctx, defaultAccount)
addrs, err := w.AddressesByAccount(w.ctx, defaultAccount)
if err != nil {
return errCResponse("w.AddressesByAccount error: %v", err)
}
Expand Down Expand Up @@ -118,7 +118,7 @@ func defaultPubkey(cName *C.char) *C.char {
return errCResponse("wallet with name %q is not loaded", goString(cName))
}

pubkey, err := w.AccountPubkey(ctx, defaultAccount)
pubkey, err := w.AccountPubkey(w.ctx, defaultAccount)
if err != nil {
return errCResponse("unable to get default pubkey: %v", err)
}
Expand Down
1 change: 0 additions & 1 deletion cgo/cgo.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ func shutdown() *C.char {
logMtx.Lock()
log.Debug("libwallet cgo shutdown")
logBackend.Close()
log = nil
logMtx.Unlock()

initialized = false
Expand Down
65 changes: 53 additions & 12 deletions cgo/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"

"decred.org/dcrwallet/v3/spv"
dcrwallet "decred.org/dcrwallet/v3/wallet"
)

//export syncWallet
Expand Down Expand Up @@ -42,6 +43,10 @@ func syncWallet(cName, cPeers *C.char) *C.char {
},
FetchMissingCFiltersStarted: func() {
w.syncStatusMtx.Lock()
if w.rescanning {
w.syncStatusMtx.Unlock()
return
}
w.syncStatusCode = SSCFetchingCFilters
w.syncStatusMtx.Unlock()
w.log.Info("Fetching missing cfilters started.")
Expand All @@ -57,6 +62,10 @@ func syncWallet(cName, cPeers *C.char) *C.char {
},
FetchHeadersStarted: func() {
w.syncStatusMtx.Lock()
if w.rescanning {
w.syncStatusMtx.Unlock()
return
}
w.syncStatusCode = SSCFetchingHeaders
w.syncStatusMtx.Unlock()
w.log.Info("Fetching headers started.")
Expand All @@ -72,6 +81,10 @@ func syncWallet(cName, cPeers *C.char) *C.char {
},
DiscoverAddressesStarted: func() {
w.syncStatusMtx.Lock()
if w.rescanning {
w.syncStatusMtx.Unlock()
return
}
w.syncStatusCode = SSCDiscoveringAddrs
w.syncStatusMtx.Unlock()
w.log.Info("Discover addresses started.")
Expand All @@ -81,6 +94,10 @@ func syncWallet(cName, cPeers *C.char) *C.char {
},
RescanStarted: func() {
w.syncStatusMtx.Lock()
if w.rescanning {
w.syncStatusMtx.Unlock()
return
}
w.syncStatusCode = SSCRescanning
w.syncStatusMtx.Unlock()
w.log.Info("Rescan started.")
Expand All @@ -95,7 +112,7 @@ func syncWallet(cName, cPeers *C.char) *C.char {
w.log.Info("Rescan finished.")
},
}
if err := w.StartSync(ctx, ntfns, peers...); err != nil {
if err := w.StartSync(w.ctx, ntfns, peers...); err != nil {
return errCResponse(err.Error())
}
return successCResponse("sync started")
Expand All @@ -120,7 +137,7 @@ func syncWalletStatus(cName *C.char) *C.char {
if !is {
return errCResponse("backend is not an spv syncer")
}
targetHeight := spvSyncer.EstimateMainChainTip(ctx)
targetHeight := spvSyncer.EstimateMainChainTip(w.ctx)

// Sometimes it appears we miss a notification during start up. This is
// a bandaid to put us as synced in that case.
Expand Down Expand Up @@ -156,32 +173,56 @@ func syncWalletStatus(cName *C.char) *C.char {

//export rescanFromHeight
func rescanFromHeight(cName, cHeight *C.char) *C.char {
walletsMtx.Lock()
defer walletsMtx.Unlock()
height, err := strconv.ParseUint(goString(cHeight), 10, 32)
if err != nil {
return errCResponse("height is not an uint32: %v", err)
}
name := goString(cName)
w, exists := wallets[name]
w, exists := loadedWallet(cName)
if !exists {
return errCResponse("wallet with name %q does not exist", name)
}
height, err := strconv.ParseUint(goString(cHeight), 10, 32)
if err != nil {
return errCResponse("height is not an uint32: %v", err)
if !w.IsSynced() {
return errCResponseWithCode(ErrCodeNotSynced, "rescanFromHeight requested on an unsynced wallet")
}
// We don't seem to get any feedback from wallet when doing rescans here.
// Just set status to rescanning and then to complete when done.
w.syncStatusMtx.Lock()
if w.rescanning {
w.syncStatusMtx.Unlock()
return errCResponse("wallet %q already rescanning", name)
}
w.syncStatusCode = SSCRescanning
w.rescanning = true
w.rescanHeight = int(height)
w.syncStatusMtx.Unlock()
w.Add(1)
go func() {
defer func() {
w.syncStatusMtx.Lock()
w.syncStatusCode = SSCComplete
w.rescanning = false
w.syncStatusMtx.Unlock()
w.Done()
}()
prog := make(chan dcrwallet.RescanProgress)
go func() {
w.RescanProgressFromHeight(w.ctx, int32(height), prog)
}()
if err := w.RescanFromHeight(ctx, int32(height)); err != nil {
log.Errorf("rescan wallet %q error: %v", name, err)
for {
select {
case p, open := <-prog:
if !open {
return
}
if p.Err != nil {
log.Errorf("rescan wallet %q error: %v", name, err)
return
}
w.syncStatusMtx.Lock()
w.rescanHeight = int(p.ScannedThrough)
w.syncStatusMtx.Unlock()
case <-w.ctx.Done():
return
}
}
}()
return successCResponse("rescan from height %d for wallet %q started", height, name)
Expand Down
16 changes: 8 additions & 8 deletions cgo/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ func createSignedTransaction(cName, cCreateSignedTxJSONReq *C.char) *C.char {
ignoreInputs[i] = o
}

if err := w.MainWallet().Unlock(ctx, []byte(req.Password), nil); err != nil {
if err := w.MainWallet().Unlock(w.ctx, []byte(req.Password), nil); err != nil {
return errCResponse("cannot unlock wallet: %v", err)
}
defer w.MainWallet().Lock()

txBytes, txhash, fee, err := w.CreateSignedTransaction(ctx, outputs, inputs, ignoreInputs, uint64(req.FeeRate))
txBytes, txhash, fee, err := w.CreateSignedTransaction(w.ctx, outputs, inputs, ignoreInputs, uint64(req.FeeRate))
if err != nil {
return errCResponse("unable to sign send transaction: %v", err)
}
Expand All @@ -82,7 +82,7 @@ func sendRawTransaction(cName, cTxHex *C.char) *C.char {
if !exists {
return errCResponse("wallet with name %q does not exist", goString(cName))
}
txHash, err := w.SendRawTransaction(ctx, goString(cTxHex))
txHash, err := w.SendRawTransaction(w.ctx, goString(cTxHex))
if err != nil {
return errCResponse("unable to sign send transaction: %v", err)
}
Expand All @@ -95,7 +95,7 @@ func listUnspents(cName *C.char) *C.char {
if !exists {
return errCResponse("wallet with name %q does not exist", goString(cName))
}
res, err := w.MainWallet().ListUnspent(ctx, 1, math.MaxInt32, nil, defaultAccount)
res, err := w.MainWallet().ListUnspent(w.ctx, 1, math.MaxInt32, nil, defaultAccount)
if err != nil {
return errCResponse("unable to get unspents: %v", err)
}
Expand All @@ -107,7 +107,7 @@ func listUnspents(cName *C.char) *C.char {
return errCResponse("unable to decode address: %v", err)
}

ka, err := w.MainWallet().KnownAddress(ctx, addr)
ka, err := w.MainWallet().KnownAddress(w.ctx, addr)
if err != nil {
return errCResponse("unspent address is not known: %v", err)
}
Expand Down Expand Up @@ -139,7 +139,7 @@ func estimateFee(cName, cNBlocks *C.char) *C.char {
if err != nil {
return errCResponse("number of blocks is not a uint64: %v", err)
}
txFee, err := w.FetchFeeFromOracle(ctx, nBlocks)
txFee, err := w.FetchFeeFromOracle(w.ctx, nBlocks)
if err != nil {
return errCResponse("unable to get fee from oracle: %v", err)
}
Expand All @@ -160,7 +160,7 @@ func listTransactions(cName, cFrom, cCount *C.char) *C.char {
if err != nil {
return errCResponse("count is not an int: %v", err)
}
res, err := w.MainWallet().ListTransactions(ctx, int(from), int(count))
res, err := w.MainWallet().ListTransactions(w.ctx, int(from), int(count))
if err != nil {
return errCResponse("unable to get transactions: %v", err)
}
Expand Down Expand Up @@ -190,7 +190,7 @@ func bestBlock(cName *C.char) *C.char {
if !exists {
return errCResponse("wallet with name %q does not exist", goString(cName))
}
blockHash, blockHeight := w.MainWallet().MainChainTip(ctx)
blockHash, blockHeight := w.MainWallet().MainChainTip(w.ctx)
res := &BestBlockRes{
Hash: blockHash.String(),
Height: int(blockHeight),
Expand Down
Loading

0 comments on commit 1f2c9ca

Please sign in to comment.