From 7d10a1540b7091d11b309e1e62983d4879cbb846 Mon Sep 17 00:00:00 2001 From: Alok Menghrajani Date: Wed, 17 Oct 2018 18:22:43 -0700 Subject: [PATCH] Filter transactions based on height as early as possible --- accounter/accounter.go | 67 ++++++++++++++---------- accounter/accounter_test.go | 1 + accounter/testdata/tpub_data.json | 3 ++ backend/backend.go | 6 +++ backend/btcd_backend.go | 29 ++++++----- backend/electrum/blockchain.go | 4 +- backend/electrum_backend.go | 86 ++++++++++++++++++++++--------- backend/electrum_backend_test.go | 17 +----- backend/fixture_backend.go | 23 ++++++--- backend/fixture_backend_test.go | 7 +-- backend/recorder_backend.go | 18 ++++--- blockfinder/blockfinder_test.go | 1 + main.go | 78 +++++++++------------------- reporter/reporter.go | 37 +------------ utils/utils.go | 2 +- 15 files changed, 190 insertions(+), 189 deletions(-) diff --git a/accounter/accounter.go b/accounter/accounter.go index 1dee634..3f856ff 100644 --- a/accounter/accounter.go +++ b/accounter/accounter.go @@ -2,7 +2,6 @@ package accounter import ( "encoding/hex" - "fmt" "log" "sync" "time" @@ -28,8 +27,10 @@ type Accounter struct { xpubs []string blockHeight uint32 // height at which we want to compute the balance - addresses map[string]address // map of address script => (Address, txHashes) - transactions map[string]transaction // map of txhash => transaction + addresses map[string]address // map of address script => (Address, txHashes) + txAddressesMu sync.Mutex + txAddresses map[string][]*deriver.Address // map of txhash => []Address + transactions map[string]transaction // map of txhash => transaction backend backend.Backend deriver *deriver.AddressDeriver @@ -71,20 +72,19 @@ type vout struct { } // New instantiates a new Accounter. -// TODO: find a better way to pass options to the NewCounter. Maybe thru a config or functional option params? func New(b backend.Backend, addressDeriver *deriver.AddressDeriver, lookahead uint32, blockHeight uint32) *Accounter { - a := &Accounter{ + return &Accounter{ blockHeight: blockHeight, backend: b, deriver: addressDeriver, lookahead: lookahead, lastAddresses: [2]uint32{lookahead, lookahead}, + addresses: make(map[string]address), + txAddresses: make(map[string][]*deriver.Address), + transactions: make(map[string]transaction), + addrResponses: b.AddrResponses(), + txResponses: b.TxResponses(), } - a.addresses = make(map[string]address) - a.transactions = make(map[string]transaction) - a.addrResponses = b.AddrResponses() - a.txResponses = b.TxResponses() - return a } func (a *Accounter) ComputeBalance() uint64 { @@ -114,31 +114,25 @@ func (a *Accounter) fetchTransactions() { func (a *Accounter) processTransactions() { for hash, tx := range a.transactions { // remove transactions which are too recent - if tx.height > int64(a.blockHeight) { - reporter.GetInstance().Logf("transaction %s has height %d > BLOCK HEIGHT (%d)", hash, tx.height, a.blockHeight) + if (tx.height > int64(a.blockHeight)) || (tx.height == 0) { + log.Printf("backend failed to filter tx %s (%d, %d)", hash, tx.height, a.blockHeight) delete(a.transactions, hash) } - // remove transactions which haven't been mined - if tx.height <= 0 { - reporter.GetInstance().Logf("transaction %s has not been mined, yet (height=%d)", hash, tx.height) - delete(a.transactions, hash) + if tx.height < 0 { + log.Panicf("tx %s has negative height %d", hash, tx.height) } } - reporter.GetInstance().SetTxAfterFilter(int32(len(a.transactions))) - reporter.GetInstance().Log("done filtering") // TODO: we could check that scheduled == fetched in the metrics we track in reporter. - // parse the transaction hex for hash, tx := range a.transactions { b, err := hex.DecodeString(tx.hex) if err != nil { - fmt.Printf("failed to unhex transaction %s: %s", hash, tx.hex) + log.Panicf("failed to unhex transaction %s: %s", hash, tx.hex) } parsedTx, err := btcutil.NewTxFromBytes(b) if err != nil { - fmt.Printf("failed to parse transaction %s: %s", hash, tx.hex) - continue + log.Panicf("failed to parse transaction %s: %s", hash, tx.hex) } for _, txin := range parsedTx.MsgTx().TxIn { tx.vin = append(tx.vin, vin{ @@ -234,7 +228,10 @@ func (a *Accounter) sendWork() { indexes[change]++ } } - // apparently no more work for us, so we can sleep a bit + // apparently no more work for now. + + // TODO: we should either merge sendWork/recvWork or use some kind of mutex to sleep exactly + // until there's more work that needs to be done. For now, a simple sleep works. time.Sleep(time.Millisecond * 100) } } @@ -251,6 +248,7 @@ func (a *Accounter) recvWork() { continue } reporter.GetInstance().IncAddressesFetched() + reporter.GetInstance().Logf("received address: %s", resp.Address) a.countMu.Lock() a.processedAddrCount++ @@ -263,6 +261,7 @@ func (a *Accounter) recvWork() { a.countMu.Lock() for _, txHash := range resp.TxHashes { + // TODO: mark this txHash as having been scheduled. So we don't fetch it multiple times. if _, exists := a.transactions[txHash]; !exists { a.backend.TxRequest(txHash) a.seenTxCount++ @@ -270,13 +269,15 @@ func (a *Accounter) recvWork() { } a.countMu.Unlock() + // we can only update the lastAddresses after we filter the transaction heights + a.txAddressesMu.Lock() + for _, txHash := range resp.TxHashes { + a.txAddresses[txHash] = append(a.txAddresses[txHash], resp.Address) + } + a.txAddressesMu.Unlock() + reporter.GetInstance().Logf("address %s has %d transactions", resp.Address, len(resp.TxHashes)) - if resp.HasTransactions() { - a.countMu.Lock() - a.lastAddresses[resp.Address.Change()] = Max(a.lastAddresses[resp.Address.Change()], resp.Address.Index()+a.lookahead) - a.countMu.Unlock() - } case resp, ok := <-txResponses: // channel is closed now, so ignore this case by blocking forever if !ok { @@ -285,6 +286,7 @@ func (a *Accounter) recvWork() { } reporter.GetInstance().IncTxFetched() + reporter.GetInstance().Logf("received tx: %s", resp.Hash) a.countMu.Lock() a.processedTxCount++ @@ -297,6 +299,15 @@ func (a *Accounter) recvWork() { vout: []vout{}, } a.transactions[resp.Hash] = tx + + a.txAddressesMu.Lock() + a.countMu.Lock() + for _, addr := range a.txAddresses[resp.Hash] { + a.lastAddresses[addr.Change()] = Max(a.lastAddresses[addr.Change()], addr.Index()+a.lookahead) + } + a.countMu.Unlock() + a.txAddressesMu.Unlock() + case <-time.Tick(1 * time.Second): if a.complete() { return diff --git a/accounter/accounter_test.go b/accounter/accounter_test.go index 6d67f88..46b9348 100644 --- a/accounter/accounter_test.go +++ b/accounter/accounter_test.go @@ -94,6 +94,7 @@ func TestComputeBalanceTestnet(t *testing.T) { deriver := deriver.NewAddressDeriver(Testnet, pubs, 1, "") b, err := backend.NewFixtureBackend("testdata/tpub_data.json") assert.NoError(t, err) + b.Start(1435169) a := New(b, deriver, 100, 1435169) assert.Equal(t, uint64(267893477), a.ComputeBalance()) diff --git a/accounter/testdata/tpub_data.json b/accounter/testdata/tpub_data.json index a375b29..2badadb 100644 --- a/accounter/testdata/tpub_data.json +++ b/accounter/testdata/tpub_data.json @@ -1,4 +1,7 @@ { + "metadata": { + "height": 1435169 + }, "addresses": [ { "address": "mfsNoNz57ANkYrCzHaLZDLoMGujBW8u3zv", diff --git a/backend/backend.go b/backend/backend.go index b98a943..0177385 100644 --- a/backend/backend.go +++ b/backend/backend.go @@ -28,8 +28,13 @@ import ( // forgo the Finish() method and have the Accounter read from the TxResponses channel until it has // all the data it needs. This would require the Accounter to maintain its own set of transactions. type Backend interface { + // Returns chain height. Possibly connects + disconnects from first node. ChainHeight() uint32 + // Gets backend ready to serve requests + Start(blockHeight uint32) error + + // Request-response channels AddrRequest(addr *deriver.Address) AddrResponses() <-chan *AddrResponse TxRequest(txHash string) @@ -37,6 +42,7 @@ type Backend interface { BlockRequest(height uint32) BlockResponses() <-chan *BlockResponse + // Call this to disconnect from nodes and cleanup Finish() } diff --git a/backend/btcd_backend.go b/backend/btcd_backend.go index 1b5a398..14cabc4 100644 --- a/backend/btcd_backend.go +++ b/backend/btcd_backend.go @@ -3,6 +3,7 @@ package backend import ( "fmt" "log" + "math" "sync" "github.com/btcsuite/btcd/btcjson" @@ -80,15 +81,10 @@ func NewBtcdBackend(host, port, user, pass string, network Network) (*BtcdBacken return nil, errors.Errorf("Unexpected genesis block %s != %s", genesis.String(), GenesisBlock(network)) } - height, err := client.GetBlockCount() - if err != nil { - return nil, errors.Wrap(err, "could not connect to the Btcd server") - } - - b := &BtcdBackend{ + return &BtcdBackend{ client: client, network: network, - chainHeight: uint32(height), + chainHeight: 0, addrRequests: make(chan *deriver.Address, addrRequestsChanSize), addrResponses: make(chan *AddrResponse, addrRequestsChanSize), txRequests: make(chan string, 2*maxTxsPerAddr), @@ -99,13 +95,26 @@ func NewBtcdBackend(host, port, user, pass string, network Network) (*BtcdBacken blockHeightLookup: make(map[string]int64), cachedTransactions: make(map[string]*TxResponse), doneCh: make(chan bool), + }, nil +} + +func (b *BtcdBackend) ChainHeight() uint32 { + height, err := b.client.GetBlockCount() + PanicOnError(err) + if height <= 0 || height > math.MaxUint32 { + log.Panicf("invalid height: %d", height) } + return uint32(height) +} + +func (b *BtcdBackend) Start(blockHeight uint32) error { + b.chainHeight = blockHeight // launch for i := 0; i < concurrency; i++ { go b.processRequests() } - return b, nil + return nil } // AddrRequest schedules a request to the backend to lookup information related @@ -152,10 +161,6 @@ func (b *BtcdBackend) Finish() { b.client.Disconnect() } -func (b *BtcdBackend) ChainHeight() uint32 { - return b.chainHeight -} - func (b *BtcdBackend) processRequests() { for { select { diff --git a/backend/electrum/blockchain.go b/backend/electrum/blockchain.go index cb2dae5..f9e943b 100644 --- a/backend/electrum/blockchain.go +++ b/backend/electrum/blockchain.go @@ -226,7 +226,9 @@ func (n *Node) BlockchainAddressGetHistory(address string) ([]*Transaction, erro // https://electrumx.readthedocs.io/en/latest/protocol-methods.html#blockchain-transaction-get func (n *Node) BlockchainTransactionGet(txid string) (string, error) { var hex string - err := n.request("blockchain.transaction.get", []interface{}{txid, false}, &hex) + // some servers don't handle the second parameter (even though they advertise version 1.2) + // so we leave it out. + err := n.request("blockchain.transaction.get", []interface{}{txid}, &hex) return hex, err } diff --git a/backend/electrum_backend.go b/backend/electrum_backend.go index de8be67..5482653 100644 --- a/backend/electrum_backend.go +++ b/backend/electrum_backend.go @@ -33,7 +33,8 @@ import ( // balance and transaction history information for a given address. // ElectrumBackend implements Backend interface. type ElectrumBackend struct { - chainHeight uint32 + blockHeight uint32 + addr, port string // peer management nodeMu sync.RWMutex // mutex to guard reads/writes to nodes map @@ -77,12 +78,14 @@ var ( // NewElectrumBackend returns a new ElectrumBackend structs or errors. // Initially connects to 1 node. A background job handles connecting to // additional peers. The background job fails if there are no peers left. -func NewElectrumBackend(addr, port string, network Network) (*ElectrumBackend, error) { - +func NewElectrumBackend(addr, port string, network Network) *ElectrumBackend { // TODO: should the channels have k * maxPeers buffers? Each node needs to enqueue a // potentially large number of transactions. If all nodes are doing that at the same time, // there's a deadlock risk? - eb := &ElectrumBackend{ + return &ElectrumBackend{ + blockHeight: 0, + addr: addr, + port: port, nodes: make(map[string]*electrum.Node), blacklistedNodes: make(map[string]struct{}), network: network, @@ -97,18 +100,55 @@ func NewElectrumBackend(addr, port string, network Network) (*ElectrumBackend, e transactions: make(map[string]int64), doneCh: make(chan bool), } +} + +// Connect to a node without registering it, fetch height and disconnect. +func (eb *ElectrumBackend) ChainHeight() uint32 { + log.Printf("connecting to %s:%s", eb.addr, eb.port) + node, err := electrum.NewNode(eb.addr, eb.port, eb.network) + if err != nil { + PanicOnError(err) + } + defer node.Disconnect() + + // Get the server's features + feature, err := node.ServerFeatures() + if err != nil { + PanicOnError(err) + } + // Check genesis block + if feature.Genesis != GenesisBlock(eb.network) { + log.Panicf("incorrect genesis block") + } + // TODO: check pruning. Currently, servers currently don't prune, so it's fine to skip for now. + + // Check version + err = checkVersion(feature.Protocol) + if err != nil { + PanicOnError(err) + } - // Connect to a node to fetch the height - height, err := eb.getHeight(addr, port, network) + // Negotiate version + err = node.ServerVersion("1.2") + if err != nil { + PanicOnError(err) + } + + header, err := node.BlockchainHeadersSubscribe() if err != nil { - return nil, err + PanicOnError(err) } - eb.chainHeight = height + + return header.Height +} + +func (eb *ElectrumBackend) Start(blockHeight uint32) error { + eb.blockHeight = blockHeight // Connect to a node and handle requests - if err := eb.addNode(addr, port, network); err != nil { - fmt.Printf("failed to connect to initial node: %+v", err) - return nil, err + if err := eb.addNode(eb.addr, eb.port, eb.network); err != nil { + log.Printf("failed to connect to initial node: %+v", err) + return err } // goroutine to continuously fetch additional peers @@ -124,7 +164,7 @@ func NewElectrumBackend(addr, port string, network Network) (*ElectrumBackend, e } }() - return eb, nil + return nil } // AddrRequest schedules a request to the backend to lookup information related @@ -173,14 +213,11 @@ func (eb *ElectrumBackend) Finish() { // program is going to terminate soon anyways. } -func (eb *ElectrumBackend) ChainHeight() uint32 { - return eb.chainHeight -} - // Connect to a node and add it to the map of nodes func (eb *ElectrumBackend) addNode(addr, port string, network Network) error { ident := electrum.NodeIdent(addr, port) + // note: this code contains a TOCTOU bug. We risk connecting to the same node multiple times. eb.nodeMu.RLock() _, existsGood := eb.nodes[ident] _, existsBad := eb.blacklistedNodes[ident] @@ -422,8 +459,9 @@ func (eb *ElectrumBackend) processAddrRequest(node *electrum.Node, addr *deriver txHashes := make([]string, 0, len(txs)) for _, tx := range txs { - txHashes = append(txHashes, tx.Hash) - // fetch additional data if needed + if tx.Height > 0 && tx.Height <= eb.blockHeight { + txHashes = append(txHashes, tx.Hash) + } } eb.cacheTxs(txs) @@ -443,7 +481,7 @@ func (eb *ElectrumBackend) cacheTxs(txs []*electrum.Transaction) { for _, tx := range txs { height, exists := eb.transactions[tx.Hash] if exists && (height != int64(tx.Height)) { - log.Panicf("inconsistent cache: %s %d != %d", tx.Hash, height, tx.Height) + log.Panicf("inconsistent transactions cache: %s %d != %d", tx.Hash, height, tx.Height) } eb.transactions[tx.Hash] = int64(tx.Height) } @@ -495,19 +533,19 @@ func (eb *ElectrumBackend) findPeers() { func (eb *ElectrumBackend) addPeer(peer electrum.Peer) { if strings.HasSuffix(peer.Host, ".onion") { - log.Printf("skipping %s because of .onion\n", peer.Host) + log.Printf("skipping %s because of .onion", peer.Host) return } err := checkVersion(peer.Version) if err != nil { - log.Printf("skipping %s because of protocol version %s\n", peer.Host, peer.Version) + log.Printf("skipping %s because of protocol version %s", peer.Host, peer.Version) return } for _, feature := range peer.Features { if strings.HasPrefix(feature, "t") { go func(addr, feature string, network Network) { if err := eb.addNode(addr, feature, network); err != nil { - log.Printf("error on addNode: %+v\n", err) + log.Printf("error on addNode: %+v", err) } }(peer.IP, feature, eb.network) return @@ -517,11 +555,11 @@ func (eb *ElectrumBackend) addPeer(peer electrum.Peer) { if strings.HasPrefix(feature, "s") { go func(addr, feature string, network Network) { if err := eb.addNode(addr, feature, network); err != nil { - log.Printf("error on addNode: %+v\n", err) + log.Printf("error on addNode: %+v", err) } }(peer.IP, feature, eb.network) return } } - log.Printf("skipping %s because of feature mismatch: %+v\n", peer, peer.Features) + log.Printf("skipping %s because of feature mismatch: %+v", peer, peer.Features) } diff --git a/backend/electrum_backend_test.go b/backend/electrum_backend_test.go index 8f97a00..1d99aa6 100644 --- a/backend/electrum_backend_test.go +++ b/backend/electrum_backend_test.go @@ -2,28 +2,13 @@ package backend import ( "github.com/square/beancounter/backend/electrum" - "github.com/square/beancounter/deriver" . "github.com/square/beancounter/utils" "github.com/stretchr/testify/assert" "testing" ) func TestTransactionCache(t *testing.T) { - // TODO: refactor ElectrumBackend to make it easier to test - - eb := &ElectrumBackend{ - nodes: make(map[string]*electrum.Node), - blacklistedNodes: make(map[string]struct{}), - network: Testnet, - addrRequests: make(chan *deriver.Address, 2*maxPeers), - addrResponses: make(chan *AddrResponse, 2*maxPeers), - txRequests: make(chan string, 2*maxPeers), - txResponses: make(chan *TxResponse, 2*maxPeers), - - peersRequests: make(chan struct{}), - transactions: make(map[string]int64), - doneCh: make(chan bool), - } + eb := NewElectrumBackend("foobar", "1234", Testnet) tx1 := electrum.Transaction{Hash: "aaaaaa", Height: 100} tx2 := electrum.Transaction{Hash: "bbbbbb", Height: 100} diff --git a/backend/fixture_backend.go b/backend/fixture_backend.go index b29145b..51c6e08 100644 --- a/backend/fixture_backend.go +++ b/backend/fixture_backend.go @@ -45,7 +45,7 @@ type FixtureBackend struct { // NewFixtureBackend returns a new FixtureBackend structs or errors. func NewFixtureBackend(filepath string) (*FixtureBackend, error) { - cb := &FixtureBackend{ + fb := &FixtureBackend{ addrRequests: make(chan *deriver.Address, 10), addrResponses: make(chan *AddrResponse, 10), txRequests: make(chan string, 1000), @@ -65,12 +65,23 @@ func NewFixtureBackend(filepath string) (*FixtureBackend, error) { } defer f.Close() - if err := cb.loadFromFile(f); err != nil { + if err := fb.loadFromFile(f); err != nil { return nil, pkgerr.Wrap(err, "cannot load data from a fixture file") } - go cb.processRequests() - return cb, nil + return fb, nil +} + +func (fb *FixtureBackend) ChainHeight() uint32 { + return fb.height +} + +func (fb *FixtureBackend) Start(blockHeight uint32) error { + if fb.height < blockHeight { + log.Panicf("recorded height %d < %d", fb.height, blockHeight) + } + go fb.processRequests() + return nil } // AddrRequest schedules a request to the backend to lookup information related @@ -116,10 +127,6 @@ func (fb *FixtureBackend) Finish() { close(fb.doneCh) } -func (fb *FixtureBackend) ChainHeight() uint32 { - return fb.height -} - func (fb *FixtureBackend) processRequests() { for { select { diff --git a/backend/fixture_backend_test.go b/backend/fixture_backend_test.go index 7234f27..4f07c93 100644 --- a/backend/fixture_backend_test.go +++ b/backend/fixture_backend_test.go @@ -54,11 +54,8 @@ func TestNoAddress(t *testing.T) { fetchResults(b, &addrs, &txs, 100*time.Millisecond) - // we don't know if address is legit or not, we just know if the address shows up - // in the blockchain. The default behavior is just to return the address with no transactions - assert.Len(t, addrs, 1) - assert.False(t, addrs[0].HasTransactions()) - assert.Equal(t, "BAD_ADDRESS", addrs[0].Address.String()) + // The address has no transactions, so it gets pruned. + assert.Len(t, addrs, 0) assert.Len(t, txs, 0) } diff --git a/backend/recorder_backend.go b/backend/recorder_backend.go index 9e78bcc..9f23164 100644 --- a/backend/recorder_backend.go +++ b/backend/recorder_backend.go @@ -3,6 +3,7 @@ package backend import ( "encoding/json" "fmt" + "github.com/square/beancounter/utils" "os" "sort" "sync" @@ -40,7 +41,7 @@ type RecorderBackend struct { // RecorderBackend passes requests to another backend and ten records // address and transaction responses to a file. The file can later be used by a // FixtureBackend to reply those responses. -func NewRecorderBackend(b Backend, filepath string) (*RecorderBackend, error) { +func NewRecorderBackend(b Backend, filepath string) *RecorderBackend { rb := &RecorderBackend{ backend: b, addrResponses: make(chan *AddrResponse, addrRequestsChanSize), @@ -52,9 +53,18 @@ func NewRecorderBackend(b Backend, filepath string) (*RecorderBackend, error) { doneCh: make(chan bool), outputFilepath: filepath, } + return rb +} + +func (rb *RecorderBackend) ChainHeight() uint32 { + return rb.backend.ChainHeight() +} +func (rb *RecorderBackend) Start(blockHeight uint32) error { + err := rb.backend.Start(blockHeight) + utils.PanicOnError(err) go rb.processRequests() - return rb, nil + return nil } // AddrRequest schedules a request to the backend to lookup information related @@ -101,10 +111,6 @@ func (rb *RecorderBackend) Finish() { } } -func (rb *RecorderBackend) ChainHeight() uint32 { - return rb.backend.ChainHeight() -} - func (rb *RecorderBackend) processRequests() { backendAddrResponses := rb.backend.AddrResponses() backendTxResponses := rb.backend.TxResponses() diff --git a/blockfinder/blockfinder_test.go b/blockfinder/blockfinder_test.go index e730b84..9bfb7b8 100644 --- a/blockfinder/blockfinder_test.go +++ b/blockfinder/blockfinder_test.go @@ -10,6 +10,7 @@ import ( func TestFindBLock(t *testing.T) { b, err := backend.NewFixtureBackend("../fixtures/blocks.json") assert.NoError(t, err) + b.Start(546110) bf := New(b) height, median, timestamp := bf.Search(time.Unix(1533153600, 0)) diff --git a/main.go b/main.go index 6057e62..25f321c 100644 --- a/main.go +++ b/main.go @@ -96,8 +96,7 @@ func doKeytree() { // Check that all the addresses have the same prefix for i := 1; i < *keytreeN; i++ { if xpubs[0][0:4] != xpubs[i][0:4] { - fmt.Printf("Prefixes must match: %s %s\n", xpubs[0], xpubs[i]) - return + log.Panicf("Prefixes must match: %s %s", xpubs[0], xpubs[i]) } } @@ -143,8 +142,7 @@ func doFindAddr() { // Check that all the addresses have the same prefix for i := 1; i < *findAddrN; i++ { if xpubs[0][0:4] != xpubs[i][0:4] { - fmt.Printf("Prefixes must match: %s %s\n", xpubs[0], xpubs[i]) - return + log.Panicf("Prefixes must match: %s %s", xpubs[0], xpubs[i]) } } network := XpubToNetwork(xpubs[0]) @@ -163,7 +161,7 @@ func doFindAddr() { } } } - fmt.Printf("not found\n") + log.Panic("not found") } func doFindBlock() { @@ -206,6 +204,7 @@ func doComputeBalance() { if *computeBalanceType == "single-address" { fmt.Printf("Enter single address:\n") singleAddress, _ = reader.ReadString('\n') + singleAddress = strings.TrimSpace(singleAddress) network = AddressToNetwork(singleAddress) } else { for i := 0; i < *computeBalanceN; i++ { @@ -228,15 +227,18 @@ func doComputeBalance() { backend, err := computeBalanceBuildBackend(network) PanicOnError(err) - // If blockHeight is 0, we default to current height - 6. + // If blockHeight is 0, we default to current height - 5. + chainHeight := backend.ChainHeight() if *computeBalanceBlockHeight == 0 { - *computeBalanceBlockHeight = backend.ChainHeight() - minConfirmations + *computeBalanceBlockHeight = chainHeight - minConfirmations + 1 } - if *computeBalanceBlockHeight > backend.ChainHeight()-minConfirmations { - log.Panicf("blockHeight %d is too high (> %d - %d)", *computeBalanceBlockHeight, backend.ChainHeight(), minConfirmations) + if *computeBalanceBlockHeight > chainHeight-minConfirmations+1 { + log.Panicf("blockHeight %d is too high (> %d - %d + 1)", *computeBalanceBlockHeight, backend.ChainHeight(), minConfirmations) } fmt.Printf("Going to compute balance at %d\n", *computeBalanceBlockHeight) + backend.Start(*computeBalanceBlockHeight) + tb := accounter.New(backend, deriver, *computeBalanceLookahead, *computeBalanceBlockHeight) balance := tb.ComputeBalance() @@ -246,102 +248,72 @@ func doComputeBalance() { // TODO: copy-pasta func findBlockBuildBackend(network Network) (backend.Backend, error) { - var b backend.Backend - var err error switch *findBlockBackend { case "electrum": addr, port := GetDefaultServer(network, Electrum, *findBlockAddr) - b, err = backend.NewElectrumBackend(addr, port, network) - if err != nil { - return nil, err - } + return backend.NewElectrumBackend(addr, port, network), nil case "btcd": addr, port := GetDefaultServer(network, Btcd, *findBlockAddr) - b, err = backend.NewBtcdBackend(addr, port, *findBlockRpcUser, *findBlockRpcPass, network) - if err != nil { - return nil, err - } + return backend.NewBtcdBackend(addr, port, *findBlockRpcUser, *findBlockRpcPass, network) case "electrum-recorder": if *findBlockFixtureFile == "" { panic("electrum-recorder backend requires output --fixture-file.") } addr, port := GetDefaultServer(network, Electrum, *findBlockAddr) - b, err = backend.NewElectrumBackend(addr, port, network) - if err != nil { - return nil, err - } - b, err = backend.NewRecorderBackend(b, *findBlockFixtureFile) + b := backend.NewElectrumBackend(addr, port, network) + return backend.NewRecorderBackend(b, *findBlockFixtureFile), nil case "btcd-recorder": if *findBlockFixtureFile == "" { panic("btcd-recorder backend requires output --fixture-file.") } addr, port := GetDefaultServer(network, Btcd, *findBlockAddr) - b, err = backend.NewBtcdBackend(addr, port, *findBlockRpcUser, *findBlockRpcPass, network) + b, err := backend.NewBtcdBackend(addr, port, *findBlockRpcUser, *findBlockRpcPass, network) if err != nil { return nil, err } - b, err = backend.NewRecorderBackend(b, *findBlockFixtureFile) + return backend.NewRecorderBackend(b, *findBlockFixtureFile), nil case "fixture": if *findBlockFixtureFile == "" { panic("fixture backend requires input --fixture-file.") } - b, err = backend.NewFixtureBackend(*findBlockFixtureFile) - if err != nil { - return nil, err - } + return backend.NewFixtureBackend(*findBlockFixtureFile) default: return nil, fmt.Errorf("unreachable") } - return b, err } // TODO: return *backend.Backend, error instead? func computeBalanceBuildBackend(network Network) (backend.Backend, error) { - var b backend.Backend - var err error switch *computeBalanceBackend { case "electrum": addr, port := GetDefaultServer(network, Electrum, *computeBalanceAddr) - b, err = backend.NewElectrumBackend(addr, port, network) - if err != nil { - return nil, err - } + return backend.NewElectrumBackend(addr, port, network), nil case "btcd": addr, port := GetDefaultServer(network, Btcd, *computeBalanceAddr) - b, err = backend.NewBtcdBackend(addr, port, *computeBalanceRpcUser, *computeBalanceRpcPass, network) - if err != nil { - return nil, err - } + return backend.NewBtcdBackend(addr, port, *computeBalanceRpcUser, *computeBalanceRpcPass, network) case "electrum-recorder": if *computeBalanceFixtureFile == "" { panic("electrum-recorder backend requires output --fixture-file.") } addr, port := GetDefaultServer(network, Electrum, *computeBalanceAddr) - b, err = backend.NewElectrumBackend(addr, port, network) - if err != nil { - return nil, err - } - b, err = backend.NewRecorderBackend(b, *computeBalanceFixtureFile) + b := backend.NewElectrumBackend(addr, port, network) + return backend.NewRecorderBackend(b, *computeBalanceFixtureFile), nil case "btcd-recorder": if *computeBalanceFixtureFile == "" { panic("btcd-recorder backend requires output --fixture-file.") } addr, port := GetDefaultServer(network, Btcd, *computeBalanceAddr) - b, err = backend.NewBtcdBackend(addr, port, *computeBalanceRpcUser, *computeBalanceRpcPass, network) + b, err := backend.NewBtcdBackend(addr, port, *computeBalanceRpcUser, *computeBalanceRpcPass, network) if err != nil { return nil, err } - b, err = backend.NewRecorderBackend(b, *computeBalanceFixtureFile) + return backend.NewRecorderBackend(b, *computeBalanceFixtureFile), nil case "fixture": if *computeBalanceFixtureFile == "" { panic("fixture backend requires input --fixture-file.") } - b, err = backend.NewFixtureBackend(*computeBalanceFixtureFile) - if err != nil { - return nil, err - } + return backend.NewFixtureBackend(*computeBalanceFixtureFile) default: return nil, fmt.Errorf("unreachable") } - return b, err } diff --git a/reporter/reporter.go b/reporter/reporter.go index b547f03..95fffc6 100644 --- a/reporter/reporter.go +++ b/reporter/reporter.go @@ -14,7 +14,6 @@ type Reporter struct { addressesFetched uint32 txScheduled uint32 txFetched uint32 - txAfterFilter int32 peers int32 } @@ -29,8 +28,8 @@ func GetInstance() *Reporter { } func (r *Reporter) Log(msg string) { - fmt.Printf("%d/%d %d/%d/%d %d: %s\n", r.GetAddressesScheduled(), r.GetAddressesFetched(), - r.GetTxScheduled(), r.GetTxFetched(), r.GetTxAfterFilter(), r.GetPeers(), msg) + fmt.Printf("%d/%d %d/%d %d: %s\n", r.GetAddressesScheduled(), r.GetAddressesFetched(), + r.GetTxScheduled(), r.GetTxFetched(), r.GetPeers(), msg) } func (r *Reporter) Logf(format string, args ...interface{}) { @@ -45,10 +44,6 @@ func (r *Reporter) GetAddressesFetched() uint32 { return atomic.LoadUint32(&r.addressesFetched) } -func (r *Reporter) SetAddressesFetched(n uint32) { - atomic.StoreUint32(&r.addressesFetched, n) -} - func (r *Reporter) IncAddressesScheduled() { atomic.AddUint32(&r.addressesScheduled, 1) } @@ -57,10 +52,6 @@ func (r *Reporter) GetAddressesScheduled() uint32 { return atomic.LoadUint32(&r.addressesScheduled) } -func (r *Reporter) SetddressesScheduled(n uint32) { - atomic.StoreUint32(&r.addressesScheduled, n) -} - func (r *Reporter) IncTxFetched() { atomic.AddUint32(&r.txFetched, 1) } @@ -69,10 +60,6 @@ func (r *Reporter) GetTxFetched() uint32 { return atomic.LoadUint32(&r.txFetched) } -func (r *Reporter) SetTxFetched(n uint32) { - atomic.StoreUint32(&r.txFetched, n) -} - func (r *Reporter) IncTxScheduled() { atomic.AddUint32(&r.txScheduled, 1) } @@ -81,26 +68,6 @@ func (r *Reporter) GetTxScheduled() uint32 { return atomic.LoadUint32(&r.txScheduled) } -func (r *Reporter) SetTxScheduled(n uint32) { - atomic.StoreUint32(&r.txScheduled, n) -} - -func (r *Reporter) IncTxAfterFilter() { - atomic.AddInt32(&r.txAfterFilter, 1) -} - -func (r *Reporter) GetTxAfterFilter() int32 { - return atomic.LoadInt32(&r.txAfterFilter) -} - -func (r *Reporter) SetTxAfterFilter(n int32) { - atomic.StoreInt32(&r.txAfterFilter, n) -} - -func (r *Reporter) IncPeers() { - atomic.AddInt32(&r.peers, 1) -} - func (r *Reporter) GetPeers() int32 { return atomic.LoadInt32(&r.peers) } diff --git a/utils/utils.go b/utils/utils.go index 5038e86..b1d7c7a 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -67,7 +67,7 @@ func AddressToNetwork(addr string) Network { case 'n': return Testnet // pubkey hash case '2': - return Testnet //script hash + return Testnet // script hash case '1': return Mainnet // pubkey hash case '3':