Skip to content
This repository has been archived by the owner on Mar 2, 2023. It is now read-only.

Commit

Permalink
Filter transactions based on height as early as possible
Browse files Browse the repository at this point in the history
  • Loading branch information
alokmenghrajani committed Oct 30, 2018
1 parent ba5ec47 commit 0c5ad1e
Show file tree
Hide file tree
Showing 14 changed files with 188 additions and 184 deletions.
67 changes: 39 additions & 28 deletions accounter/accounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package accounter

import (
"encoding/hex"
"fmt"
"log"
"sync"
"time"
Expand All @@ -28,8 +27,10 @@ type Accounter struct {
xpubs []string
blockHeight uint32 // height at which we want to compute the balance

addresses map[string]address // map of address script => (Address, txHashes)
transactions map[string]transaction // map of txhash => transaction
addresses map[string]address // map of address script => (Address, txHashes)
txAddressesMu sync.Mutex
txAddresses map[string][]*deriver.Address // map of txhash => []Address
transactions map[string]transaction // map of txhash => transaction

backend backend.Backend
deriver *deriver.AddressDeriver
Expand Down Expand Up @@ -71,20 +72,19 @@ type vout struct {
}

// New instantiates a new Accounter.
// TODO: find a better way to pass options to the NewCounter. Maybe thru a config or functional option params?
func New(b backend.Backend, addressDeriver *deriver.AddressDeriver, lookahead uint32, blockHeight uint32) *Accounter {
a := &Accounter{
return &Accounter{
blockHeight: blockHeight,
backend: b,
deriver: addressDeriver,
lookahead: lookahead,
lastAddresses: [2]uint32{lookahead, lookahead},
addresses: make(map[string]address),
txAddresses: make(map[string][]*deriver.Address),
transactions: make(map[string]transaction),
addrResponses: b.AddrResponses(),
txResponses: b.TxResponses(),
}
a.addresses = make(map[string]address)
a.transactions = make(map[string]transaction)
a.addrResponses = b.AddrResponses()
a.txResponses = b.TxResponses()
return a
}

func (a *Accounter) ComputeBalance() uint64 {
Expand Down Expand Up @@ -114,31 +114,25 @@ func (a *Accounter) fetchTransactions() {
func (a *Accounter) processTransactions() {
for hash, tx := range a.transactions {
// remove transactions which are too recent
if tx.height > int64(a.blockHeight) {
reporter.GetInstance().Logf("transaction %s has height %d > BLOCK HEIGHT (%d)", hash, tx.height, a.blockHeight)
if (tx.height > int64(a.blockHeight)) || (tx.height == 0) {
log.Printf("backend failed to filter tx %s (%d, %d)", hash, tx.height, a.blockHeight)
delete(a.transactions, hash)
}
// remove transactions which haven't been mined
if tx.height <= 0 {
reporter.GetInstance().Logf("transaction %s has not been mined, yet (height=%d)", hash, tx.height)
delete(a.transactions, hash)
if tx.height < 0 {
log.Panicf("tx %s has negative height %d", hash, tx.height)
}
}
reporter.GetInstance().SetTxAfterFilter(int32(len(a.transactions)))
reporter.GetInstance().Log("done filtering")

// TODO: we could check that scheduled == fetched in the metrics we track in reporter.

// parse the transaction hex
for hash, tx := range a.transactions {
b, err := hex.DecodeString(tx.hex)
if err != nil {
fmt.Printf("failed to unhex transaction %s: %s", hash, tx.hex)
log.Panicf("failed to unhex transaction %s: %s", hash, tx.hex)
}
parsedTx, err := btcutil.NewTxFromBytes(b)
if err != nil {
fmt.Printf("failed to parse transaction %s: %s", hash, tx.hex)
continue
log.Panicf("failed to parse transaction %s: %s", hash, tx.hex)
}
for _, txin := range parsedTx.MsgTx().TxIn {
tx.vin = append(tx.vin, vin{
Expand Down Expand Up @@ -234,7 +228,10 @@ func (a *Accounter) sendWork() {
indexes[change]++
}
}
// apparently no more work for us, so we can sleep a bit
// apparently no more work for now.

// TODO: we should either merge sendWork/recvWork or use some kind of mutex to sleep exactly
// until there's more work that needs to be done. For now, a simple sleep works.
time.Sleep(time.Millisecond * 100)
}
}
Expand All @@ -251,6 +248,7 @@ func (a *Accounter) recvWork() {
continue
}
reporter.GetInstance().IncAddressesFetched()
reporter.GetInstance().Logf("received address: %s", resp.Address)

a.countMu.Lock()
a.processedAddrCount++
Expand All @@ -263,20 +261,23 @@ func (a *Accounter) recvWork() {

a.countMu.Lock()
for _, txHash := range resp.TxHashes {
// TODO: mark this txHash as having been scheduled. So we don't fetch it multiple times.
if _, exists := a.transactions[txHash]; !exists {
a.backend.TxRequest(txHash)
a.seenTxCount++
}
}
a.countMu.Unlock()

// we can only update the lastAddresses after we filter the transaction heights
a.txAddressesMu.Lock()
for _, txHash := range resp.TxHashes {
a.txAddresses[txHash] = append(a.txAddresses[txHash], resp.Address)
}
a.txAddressesMu.Unlock()

reporter.GetInstance().Logf("address %s has %d transactions", resp.Address, len(resp.TxHashes))

if resp.HasTransactions() {
a.countMu.Lock()
a.lastAddresses[resp.Address.Change()] = Max(a.lastAddresses[resp.Address.Change()], resp.Address.Index()+a.lookahead)
a.countMu.Unlock()
}
case resp, ok := <-txResponses:
// channel is closed now, so ignore this case by blocking forever
if !ok {
Expand All @@ -285,6 +286,7 @@ func (a *Accounter) recvWork() {
}

reporter.GetInstance().IncTxFetched()
reporter.GetInstance().Logf("received tx: %s", resp.Hash)

a.countMu.Lock()
a.processedTxCount++
Expand All @@ -297,6 +299,15 @@ func (a *Accounter) recvWork() {
vout: []vout{},
}
a.transactions[resp.Hash] = tx

a.txAddressesMu.Lock()
a.countMu.Lock()
for _, addr := range a.txAddresses[resp.Hash] {
a.lastAddresses[addr.Change()] = Max(a.lastAddresses[addr.Change()], addr.Index()+a.lookahead)
}
a.countMu.Unlock()
a.txAddressesMu.Unlock()

case <-time.Tick(1 * time.Second):
if a.complete() {
return
Expand Down
1 change: 1 addition & 0 deletions accounter/accounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func TestComputeBalanceTestnet(t *testing.T) {
deriver := deriver.NewAddressDeriver(Testnet, pubs, 1, "")
b, err := backend.NewFixtureBackend("testdata/tpub_data.json")
assert.NoError(t, err)
b.Start(1435169)
a := New(b, deriver, 100, 1435169)

assert.Equal(t, uint64(267893477), a.ComputeBalance())
Expand Down
3 changes: 3 additions & 0 deletions accounter/testdata/tpub_data.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
{
"metadata": {
"height": 1435169
},
"addresses": [
{
"address": "mfsNoNz57ANkYrCzHaLZDLoMGujBW8u3zv",
Expand Down
6 changes: 6 additions & 0 deletions backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,21 @@ import (
// forgo the Finish() method and have the Accounter read from the TxResponses channel until it has
// all the data it needs. This would require the Accounter to maintain its own set of transactions.
type Backend interface {
// Returns chain height. Possibly connects + disconnects from first node.
ChainHeight() uint32

// Gets backend ready to serve requests
Start(blockHeight uint32) error

// Request-response channels
AddrRequest(addr *deriver.Address)
AddrResponses() <-chan *AddrResponse
TxRequest(txHash string)
TxResponses() <-chan *TxResponse
BlockRequest(height uint32)
BlockResponses() <-chan *BlockResponse

// Call this to disconnect from nodes and cleanup
Finish()
}

Expand Down
29 changes: 17 additions & 12 deletions backend/btcd_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package backend
import (
"fmt"
"log"
"math"
"sync"

"github.com/btcsuite/btcd/btcjson"
Expand Down Expand Up @@ -80,15 +81,10 @@ func NewBtcdBackend(host, port, user, pass string, network Network) (*BtcdBacken
return nil, errors.Errorf("Unexpected genesis block %s != %s", genesis.String(), GenesisBlock(network))
}

height, err := client.GetBlockCount()
if err != nil {
return nil, errors.Wrap(err, "could not connect to the Btcd server")
}

b := &BtcdBackend{
return &BtcdBackend{
client: client,
network: network,
chainHeight: uint32(height),
chainHeight: 0,
addrRequests: make(chan *deriver.Address, addrRequestsChanSize),
addrResponses: make(chan *AddrResponse, addrRequestsChanSize),
txRequests: make(chan string, 2*maxTxsPerAddr),
Expand All @@ -99,13 +95,26 @@ func NewBtcdBackend(host, port, user, pass string, network Network) (*BtcdBacken
blockHeightLookup: make(map[string]int64),
cachedTransactions: make(map[string]*TxResponse),
doneCh: make(chan bool),
}, nil
}

func (b *BtcdBackend) ChainHeight() uint32 {
height, err := b.client.GetBlockCount()
PanicOnError(err)
if height <= 0 || height > math.MaxUint32 {
log.Panicf("invalid height: %d", height)
}
return uint32(height)
}

func (b *BtcdBackend) Start(blockHeight uint32) error {
b.chainHeight = blockHeight

// launch
for i := 0; i < concurrency; i++ {
go b.processRequests()
}
return b, nil
return nil
}

// AddrRequest schedules a request to the backend to lookup information related
Expand Down Expand Up @@ -152,10 +161,6 @@ func (b *BtcdBackend) Finish() {
b.client.Disconnect()
}

func (b *BtcdBackend) ChainHeight() uint32 {
return b.chainHeight
}

func (b *BtcdBackend) processRequests() {
for {
select {
Expand Down
4 changes: 3 additions & 1 deletion backend/electrum/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,9 @@ func (n *Node) BlockchainAddressGetHistory(address string) ([]*Transaction, erro
// https://electrumx.readthedocs.io/en/latest/protocol-methods.html#blockchain-transaction-get
func (n *Node) BlockchainTransactionGet(txid string) (string, error) {
var hex string
err := n.request("blockchain.transaction.get", []interface{}{txid, false}, &hex)
// some servers don't handle the second parameter (even though they advertise version 1.2)
// so we leave it out.
err := n.request("blockchain.transaction.get", []interface{}{txid}, &hex)
return hex, err
}

Expand Down
Loading

0 comments on commit 0c5ad1e

Please sign in to comment.