Skip to content
This repository has been archived by the owner on Mar 2, 2023. It is now read-only.

Commit

Permalink
Merge pull request #23 from square/alok/find-block
Browse files Browse the repository at this point in the history
Implement code to map a timestamp -> block number
  • Loading branch information
mbyczkowski authored Oct 18, 2018
2 parents 72a9479 + eba2b1f commit 09b056d
Show file tree
Hide file tree
Showing 11 changed files with 1,210 additions and 39 deletions.
12 changes: 10 additions & 2 deletions backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package backend

import (
"github.com/square/beancounter/deriver"
time "time"
)

// Backend is an interface which abstracts different types of backends.
Expand All @@ -27,12 +28,14 @@ import (
// forgo the Finish() method and have the Accounter read from the TxResponses channel until it has
// all the data it needs. This would require the Accounter to maintain its own set of transactions.
type Backend interface {
ChainHeight() uint32

AddrRequest(addr *deriver.Address)
AddrResponses() <-chan *AddrResponse
TxRequest(txHash string)
TxResponses() <-chan *TxResponse

ChainHeight() uint32
BlockRequest(height uint32)
BlockResponses() <-chan *BlockResponse

Finish()
}
Expand All @@ -51,6 +54,11 @@ type TxResponse struct {
Hex string
}

type BlockResponse struct {
Height uint32
Timestamp time.Time
}

// HasTransactions returns true if the Response contains any transactions
func (r *AddrResponse) HasTransactions() bool {
return len(r.TxHashes) > 0
Expand Down
69 changes: 60 additions & 9 deletions backend/btcd_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
// balance and transaction history information for a given address.
// BtcdBackend implements Backend interface.
type BtcdBackend struct {
chainHeight uint32

client *rpcclient.Client
network utils.Network
blockHeightMu sync.Mutex // mutex to guard read/writes to blockHeightLookup map
Expand All @@ -28,7 +30,10 @@ type BtcdBackend struct {
txRequests chan string
txResponses chan *TxResponse

chainHeight uint32
// channels used to communicate with the Blockfinder
blockRequests chan uint32
blockResponses chan *BlockResponse

// internal channels
transactionsMu sync.Mutex // mutex to guard read/writes to transactions map
cachedTransactions map[string]*TxResponse
Expand All @@ -42,6 +47,7 @@ const (
maxTxsPerAddr = 1000

addrRequestsChanSize = 100
blockRequestChanSize = 100

concurrency = 100
)
Expand Down Expand Up @@ -72,21 +78,23 @@ func NewBtcdBackend(hostPort, user, pass string, network utils.Network) (*BtcdBa
if genesis.String() != utils.GenesisBlock(network) {
return nil, errors.New(fmt.Sprintf("Unexpected genesis block %s != %s", genesis.String(), utils.GenesisBlock(network)))
}
fmt.Printf("%+v\n", genesis)

height, err := client.GetBlockCount()
if err != nil {
return nil, errors.Wrap(err, "could not connect to the Btcd server")
}

b := &BtcdBackend{
client: client,
network: network,
chainHeight: uint32(height),
addrRequests: make(chan *deriver.Address, addrRequestsChanSize),
addrResponses: make(chan *AddrResponse, addrRequestsChanSize),
txRequests: make(chan string, 2*maxTxsPerAddr),
txResponses: make(chan *TxResponse, 2*maxTxsPerAddr),
client: client,
network: network,
chainHeight: uint32(height),
addrRequests: make(chan *deriver.Address, addrRequestsChanSize),
addrResponses: make(chan *AddrResponse, addrRequestsChanSize),
txRequests: make(chan string, 2*maxTxsPerAddr),
txResponses: make(chan *TxResponse, 2*maxTxsPerAddr),
blockRequests: make(chan uint32, 2*blockRequestChanSize),
blockResponses: make(chan *BlockResponse, 2*blockRequestChanSize),

blockHeightLookup: make(map[string]int64),
cachedTransactions: make(map[string]*TxResponse),
doneCh: make(chan bool),
Expand Down Expand Up @@ -129,6 +137,14 @@ func (b *BtcdBackend) TxResponses() <-chan *TxResponse {
return b.txResponses
}

func (b *BtcdBackend) BlockRequest(height uint32) {
b.blockRequests <- height
}

func (b *BtcdBackend) BlockResponses() <-chan *BlockResponse {
return b.blockResponses
}

// Finish informs the backend to stop doing its work.
func (b *BtcdBackend) Finish() {
close(b.doneCh)
Expand All @@ -152,6 +168,11 @@ func (b *BtcdBackend) processRequests() {
if err != nil {
panic(fmt.Sprintf("processTxRequest failed: %+v", err))
}
case block := <-b.blockRequests:
err := b.processBlockRequest(block)
if err != nil {
panic(fmt.Sprintf("processBlockRequest failed: %+v", err))
}
case <-b.doneCh:
break
}
Expand Down Expand Up @@ -234,6 +255,36 @@ func (b *BtcdBackend) processTxRequest(txHash string) error {
return nil
}

func (b *BtcdBackend) processBlockRequest(height uint32) error {
hash, err := b.client.GetBlockHash(int64(height))
if err != nil {
if jerr, ok := err.(*btcjson.RPCError); ok {
switch jerr.Code {
case btcjson.ErrRPCInvalidAddressOrKey:
return errors.Wrap(err, fmt.Sprintf("blockchain doesn't have block %d", height))
}
}
return errors.Wrap(err, fmt.Sprintf("could not fetch block %d", height))
}

header, err := b.client.GetBlockHeader(hash)
if err != nil {
if jerr, ok := err.(*btcjson.RPCError); ok {
switch jerr.Code {
case btcjson.ErrRPCInvalidAddressOrKey:
return errors.Wrap(err, fmt.Sprintf("blockchain doesn't have block %d", height))
}
}
return errors.Wrap(err, fmt.Sprintf("could not fetch block %d", height))
}

b.blockResponses <- &BlockResponse{
Height: height,
Timestamp: header.Timestamp,
}
return nil
}

func (b *BtcdBackend) cacheTxs(txs []*btcjson.SearchRawTransactionsResult) {
for _, tx := range txs {
b.transactionsMu.Lock()
Expand Down
7 changes: 7 additions & 0 deletions backend/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package backend

import (
"github.com/square/beancounter/utils"
"time"
)

// index, address and transaction and helper structs used by recorder and fixture
Expand All @@ -11,6 +12,7 @@ type index struct {
Metadata metadata `json:"metadata"`
Addresses []address `json:"addresses"`
Transactions []transaction `json:"transactions"`
Blocks []block `json:"blocks"`
}

type metadata struct {
Expand Down Expand Up @@ -43,3 +45,8 @@ type byTransactionID []transaction
func (a byTransactionID) Len() int { return len(a) }
func (a byTransactionID) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byTransactionID) Less(i, j int) bool { return a[i].Hash < a[j].Hash }

type block struct {
Height uint32 `json:"height"`
Timestamp time.Time `json:"timestamp"`
}
13 changes: 13 additions & 0 deletions backend/electrum/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ type Header struct {
Hex string `json:"hex"`
}

type Block struct {
Count uint `json:"count"`
Hex string `json:"hex"`
Max uint `json:"max"`
}

func NewNode(addr, port string, network utils.Network) (*Node, error) {
n := &Node{}
var a string
Expand Down Expand Up @@ -261,6 +267,13 @@ func (n *Node) ServerPeersSubscribe() ([]Peer, error) {
return out, nil
}

// BlockchainBlockHeaders returns a block header (160 hex).
func (n *Node) BlockchainBlockHeaders(height uint32, count uint) (Block, error) {
var block Block
err := n.request("blockchain.block.headers", []interface{}{height, count}, &block)
return block, err
}

func (n *Node) request(method string, params []interface{}, result interface{}) error {
msg := RequestMessage{
Id: atomic.AddUint64(&n.nextId, 1),
Expand Down
64 changes: 61 additions & 3 deletions backend/electrum_backend.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package backend

import (
"bytes"
"encoding/hex"
"errors"
"fmt"
"github.com/btcsuite/btcd/wire"
"log"
"strconv"
"strings"
Expand All @@ -24,12 +27,14 @@ import (
// - has crossed the height we are interested in.
// - we then negotiate protocol v1.2
//
// A background goroutine continously connects to peers.
// A background goroutine continuously connects to peers.

// ElectrumBackend wraps Electrum node and its API to provide a simple
// balance and transaction history information for a given address.
// ElectrumBackend implements Backend interface.
type ElectrumBackend struct {
chainHeight uint32

// peer management
nodeMu sync.RWMutex // mutex to guard reads/writes to nodes map
nodes map[string]*electrum.Node
Expand All @@ -44,13 +49,15 @@ type ElectrumBackend struct {
txResponses chan *TxResponse
txRequests chan string

// channels used to communicate with the Blockfinder
blockRequests chan uint32
blockResponses chan *BlockResponse

// internal channels
peersRequests chan struct{}
transactionsMu sync.Mutex // mutex to guard read/writes to transactions map
transactions map[string]int64
doneCh chan bool

chainHeight uint32
}

const (
Expand Down Expand Up @@ -83,6 +90,8 @@ func NewElectrumBackend(addr, port string, network utils.Network) (*ElectrumBack
addrResponses: make(chan *AddrResponse, 2*maxPeers),
txRequests: make(chan string, 2*maxPeers),
txResponses: make(chan *TxResponse, 2*maxPeers),
blockRequests: make(chan uint32, 2*maxPeers),
blockResponses: make(chan *BlockResponse, 2*maxPeers),

peersRequests: make(chan struct{}),
transactions: make(map[string]int64),
Expand Down Expand Up @@ -148,6 +157,14 @@ func (eb *ElectrumBackend) TxResponses() <-chan *TxResponse {
return eb.txResponses
}

func (eb *ElectrumBackend) BlockRequest(height uint32) {
eb.blockRequests <- height
}

func (eb *ElectrumBackend) BlockResponses() <-chan *BlockResponse {
return eb.blockResponses
}

// Finish informs the backend to stop doing its work.
func (eb *ElectrumBackend) Finish() {
close(eb.doneCh)
Expand Down Expand Up @@ -292,6 +309,11 @@ func (eb *ElectrumBackend) processRequests(node *electrum.Node) {
if err != nil {
return
}
case block := <-eb.blockRequests:
err := eb.processBlockRequest(node, block)
if err != nil {
return
}
}
}
}
Expand Down Expand Up @@ -356,6 +378,42 @@ func (eb *ElectrumBackend) getTxHeight(txHash string) (int64, error) {
return height, nil
}

// note: we could be more efficient and batch things up.
func (eb *ElectrumBackend) processBlockRequest(node *electrum.Node, height uint32) error {
block, err := node.BlockchainBlockHeaders(height, 1)
if err != nil {
log.Printf("processBlockRequest failed with: %s, %+v", node.Ident, err)
eb.removeNode(node.Ident)

// requeue request
// TODO: we should have a retry counter and fail gracefully if an address fails too
// many times.
eb.blockRequests <- height
return err
}

// Decode hex to get Timestamp
b, err := hex.DecodeString(block.Hex)
if err != nil {
fmt.Printf("failed to unhex block %d: %s\n", height, block.Hex)
panic(err)
}

var blockHeader wire.BlockHeader
err = blockHeader.Deserialize(bytes.NewReader(b))
if err != nil {
fmt.Printf("failed to parse block %d: %s\n", height, block.Hex)
panic(err)
}

eb.blockResponses <- &BlockResponse{
Height: height,
Timestamp: blockHeader.Timestamp,
}

return nil
}

func (eb *ElectrumBackend) processAddrRequest(node *electrum.Node, addr *deriver.Address) error {
txs, err := node.BlockchainAddressGetHistory(addr.String())
if err != nil {
Expand Down
Loading

0 comments on commit 09b056d

Please sign in to comment.