Skip to content
This repository has been archived by the owner on Mar 2, 2023. It is now read-only.

Fix lookahead bug #42

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 39 additions & 28 deletions accounter/accounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package accounter

import (
"encoding/hex"
"fmt"
"log"
"sync"
"time"
Expand All @@ -28,8 +27,10 @@ type Accounter struct {
xpubs []string
blockHeight uint32 // height at which we want to compute the balance

addresses map[string]address // map of address script => (Address, txHashes)
transactions map[string]transaction // map of txhash => transaction
addresses map[string]address // map of address script => (Address, txHashes)
txAddressesMu sync.Mutex
txAddresses map[string][]*deriver.Address // map of txhash => []Address
transactions map[string]transaction // map of txhash => transaction

backend backend.Backend
deriver *deriver.AddressDeriver
Expand Down Expand Up @@ -71,20 +72,19 @@ type vout struct {
}

// New instantiates a new Accounter.
// TODO: find a better way to pass options to the NewCounter. Maybe thru a config or functional option params?
func New(b backend.Backend, addressDeriver *deriver.AddressDeriver, lookahead uint32, blockHeight uint32) *Accounter {
a := &Accounter{
return &Accounter{
blockHeight: blockHeight,
backend: b,
deriver: addressDeriver,
lookahead: lookahead,
lastAddresses: [2]uint32{lookahead, lookahead},
addresses: make(map[string]address),
txAddresses: make(map[string][]*deriver.Address),
transactions: make(map[string]transaction),
addrResponses: b.AddrResponses(),
txResponses: b.TxResponses(),
}
a.addresses = make(map[string]address)
a.transactions = make(map[string]transaction)
a.addrResponses = b.AddrResponses()
a.txResponses = b.TxResponses()
return a
}

func (a *Accounter) ComputeBalance() uint64 {
Expand Down Expand Up @@ -114,31 +114,25 @@ func (a *Accounter) fetchTransactions() {
func (a *Accounter) processTransactions() {
for hash, tx := range a.transactions {
// remove transactions which are too recent
if tx.height > int64(a.blockHeight) {
reporter.GetInstance().Logf("transaction %s has height %d > BLOCK HEIGHT (%d)", hash, tx.height, a.blockHeight)
if (tx.height > int64(a.blockHeight)) || (tx.height == 0) {
log.Printf("backend failed to filter tx %s (%d, %d)", hash, tx.height, a.blockHeight)
delete(a.transactions, hash)
}
// remove transactions which haven't been mined
if tx.height <= 0 {
reporter.GetInstance().Logf("transaction %s has not been mined, yet (height=%d)", hash, tx.height)
delete(a.transactions, hash)
if tx.height < 0 {
log.Panicf("tx %s has negative height %d", hash, tx.height)
}
}
reporter.GetInstance().SetTxAfterFilter(int32(len(a.transactions)))
reporter.GetInstance().Log("done filtering")

// TODO: we could check that scheduled == fetched in the metrics we track in reporter.

// parse the transaction hex
for hash, tx := range a.transactions {
b, err := hex.DecodeString(tx.hex)
if err != nil {
fmt.Printf("failed to unhex transaction %s: %s", hash, tx.hex)
log.Panicf("failed to unhex transaction %s: %s", hash, tx.hex)
}
parsedTx, err := btcutil.NewTxFromBytes(b)
if err != nil {
fmt.Printf("failed to parse transaction %s: %s", hash, tx.hex)
continue
log.Panicf("failed to parse transaction %s: %s", hash, tx.hex)
}
for _, txin := range parsedTx.MsgTx().TxIn {
tx.vin = append(tx.vin, vin{
Expand Down Expand Up @@ -234,7 +228,10 @@ func (a *Accounter) sendWork() {
indexes[change]++
}
}
// apparently no more work for us, so we can sleep a bit
// apparently no more work for now.

// TODO: we should either merge sendWork/recvWork or use some kind of mutex to sleep exactly
// until there's more work that needs to be done. For now, a simple sleep works.
time.Sleep(time.Millisecond * 100)
}
}
Expand All @@ -251,6 +248,7 @@ func (a *Accounter) recvWork() {
continue
}
reporter.GetInstance().IncAddressesFetched()
reporter.GetInstance().Logf("received address: %s", resp.Address)

a.countMu.Lock()
a.processedAddrCount++
Expand All @@ -263,20 +261,23 @@ func (a *Accounter) recvWork() {

a.countMu.Lock()
for _, txHash := range resp.TxHashes {
// TODO: mark this txHash as having been scheduled. So we don't fetch it multiple times.
if _, exists := a.transactions[txHash]; !exists {
a.backend.TxRequest(txHash)
a.seenTxCount++
}
}
a.countMu.Unlock()

// we can only update the lastAddresses after we filter the transaction heights
a.txAddressesMu.Lock()
for _, txHash := range resp.TxHashes {
a.txAddresses[txHash] = append(a.txAddresses[txHash], resp.Address)
}
a.txAddressesMu.Unlock()

reporter.GetInstance().Logf("address %s has %d transactions", resp.Address, len(resp.TxHashes))

if resp.HasTransactions() {
a.countMu.Lock()
a.lastAddresses[resp.Address.Change()] = Max(a.lastAddresses[resp.Address.Change()], resp.Address.Index()+a.lookahead)
a.countMu.Unlock()
}
case resp, ok := <-txResponses:
// channel is closed now, so ignore this case by blocking forever
if !ok {
Expand All @@ -285,6 +286,7 @@ func (a *Accounter) recvWork() {
}

reporter.GetInstance().IncTxFetched()
reporter.GetInstance().Logf("received tx: %s", resp.Hash)

a.countMu.Lock()
a.processedTxCount++
Expand All @@ -297,6 +299,15 @@ func (a *Accounter) recvWork() {
vout: []vout{},
}
a.transactions[resp.Hash] = tx

a.txAddressesMu.Lock()
a.countMu.Lock()
for _, addr := range a.txAddresses[resp.Hash] {
a.lastAddresses[addr.Change()] = Max(a.lastAddresses[addr.Change()], addr.Index()+a.lookahead)
}
a.countMu.Unlock()
a.txAddressesMu.Unlock()

case <-time.Tick(1 * time.Second):
if a.complete() {
return
Expand Down
1 change: 1 addition & 0 deletions accounter/accounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func TestComputeBalanceTestnet(t *testing.T) {
deriver := deriver.NewAddressDeriver(Testnet, pubs, 1, "")
b, err := backend.NewFixtureBackend("testdata/tpub_data.json")
assert.NoError(t, err)
b.Start(1435169)
a := New(b, deriver, 100, 1435169)

assert.Equal(t, uint64(267893477), a.ComputeBalance())
Expand Down
3 changes: 3 additions & 0 deletions accounter/testdata/tpub_data.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
{
"metadata": {
"height": 1435169
},
"addresses": [
{
"address": "mfsNoNz57ANkYrCzHaLZDLoMGujBW8u3zv",
Expand Down
6 changes: 6 additions & 0 deletions backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,21 @@ import (
// forgo the Finish() method and have the Accounter read from the TxResponses channel until it has
// all the data it needs. This would require the Accounter to maintain its own set of transactions.
type Backend interface {
// Returns chain height. Possibly connects + disconnects from first node.
ChainHeight() uint32

// Gets backend ready to serve requests
Start(blockHeight uint32) error

// Request-response channels
AddrRequest(addr *deriver.Address)
AddrResponses() <-chan *AddrResponse
TxRequest(txHash string)
TxResponses() <-chan *TxResponse
BlockRequest(height uint32)
BlockResponses() <-chan *BlockResponse

// Call this to disconnect from nodes and cleanup
Finish()
}

Expand Down
29 changes: 17 additions & 12 deletions backend/btcd_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package backend
import (
"fmt"
"log"
"math"
"sync"

"github.com/btcsuite/btcd/btcjson"
Expand Down Expand Up @@ -80,15 +81,10 @@ func NewBtcdBackend(host, port, user, pass string, network Network) (*BtcdBacken
return nil, errors.Errorf("Unexpected genesis block %s != %s", genesis.String(), GenesisBlock(network))
}

height, err := client.GetBlockCount()
if err != nil {
return nil, errors.Wrap(err, "could not connect to the Btcd server")
}

b := &BtcdBackend{
return &BtcdBackend{
client: client,
network: network,
chainHeight: uint32(height),
chainHeight: 0,
addrRequests: make(chan *deriver.Address, addrRequestsChanSize),
addrResponses: make(chan *AddrResponse, addrRequestsChanSize),
txRequests: make(chan string, 2*maxTxsPerAddr),
Expand All @@ -99,13 +95,26 @@ func NewBtcdBackend(host, port, user, pass string, network Network) (*BtcdBacken
blockHeightLookup: make(map[string]int64),
cachedTransactions: make(map[string]*TxResponse),
doneCh: make(chan bool),
}, nil
}

func (b *BtcdBackend) ChainHeight() uint32 {
height, err := b.client.GetBlockCount()
PanicOnError(err)
if height <= 0 || height > math.MaxUint32 {
log.Panicf("invalid height: %d", height)
}
return uint32(height)
}

func (b *BtcdBackend) Start(blockHeight uint32) error {
b.chainHeight = blockHeight

// launch
for i := 0; i < concurrency; i++ {
go b.processRequests()
}
return b, nil
return nil
}

// AddrRequest schedules a request to the backend to lookup information related
Expand Down Expand Up @@ -152,10 +161,6 @@ func (b *BtcdBackend) Finish() {
b.client.Disconnect()
}

func (b *BtcdBackend) ChainHeight() uint32 {
return b.chainHeight
}

func (b *BtcdBackend) processRequests() {
for {
select {
Expand Down
4 changes: 3 additions & 1 deletion backend/electrum/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,9 @@ func (n *Node) BlockchainAddressGetHistory(address string) ([]*Transaction, erro
// https://electrumx.readthedocs.io/en/latest/protocol-methods.html#blockchain-transaction-get
func (n *Node) BlockchainTransactionGet(txid string) (string, error) {
var hex string
err := n.request("blockchain.transaction.get", []interface{}{txid, false}, &hex)
// some servers don't handle the second parameter (even though they advertise version 1.2)
// so we leave it out.
err := n.request("blockchain.transaction.get", []interface{}{txid}, &hex)
return hex, err
}

Expand Down
Loading