Skip to content

Commit

Permalink
stores: fix subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan committed Feb 22, 2024
1 parent b6394c2 commit 72ccf8c
Show file tree
Hide file tree
Showing 12 changed files with 139 additions and 530 deletions.
41 changes: 27 additions & 14 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"go.sia.tech/core/consensus"
"go.sia.tech/core/gateway"
rhpv2 "go.sia.tech/core/rhp/v2"
rhpv3 "go.sia.tech/core/rhp/v3"
"go.sia.tech/core/types"
Expand Down Expand Up @@ -404,10 +405,18 @@ func (b *bus) consensusAcceptBlock(jc jape.Context) {

// TODO: should we extend the API with a way to accept multiple blocks at once?
// TODO: should we deprecate this route in favor of /addblocks

if jc.Check("failed to accept block", b.cm.AddBlocks([]types.Block{block})) != nil {
return
}

if block.V2 == nil {
b.s.BroadcastHeader(gateway.BlockHeader{
ParentID: block.ParentID,
Nonce: block.Nonce,
Timestamp: block.Timestamp,
MerkleRoot: block.MerkleRoot(),
})
}
}

func (b *bus) syncerAddrHandler(jc jape.Context) {
Expand All @@ -418,7 +427,6 @@ func (b *bus) syncerAddrHandler(jc jape.Context) {
func (b *bus) syncerPeersHandler(jc jape.Context) {
var peers []string
for _, p := range b.s.Peers() {
fmt.Println(p.String())
peers = append(peers, p.String())
}
jc.Encode(peers)
Expand Down Expand Up @@ -453,11 +461,17 @@ func (b *bus) txpoolTransactionsHandler(jc jape.Context) {

func (b *bus) txpoolBroadcastHandler(jc jape.Context) {
var txnSet []types.Transaction
if jc.Decode(&txnSet) == nil {
// TODO: should we handle 'known' return value
_, err := b.cm.AddPoolTransactions(txnSet)
jc.Check("couldn't broadcast transaction set", err)
if jc.Decode(&txnSet) != nil {
return
}

// TODO: should we handle 'known' return value
_, err := b.cm.AddPoolTransactions(txnSet)
if jc.Check("couldn't broadcast transaction set", err) != nil {
return
}

b.s.BroadcastTransactionSet(txnSet)
}

func (b *bus) bucketsHandlerGET(jc jape.Context) {
Expand Down Expand Up @@ -603,15 +617,13 @@ func (b *bus) walletFundHandler(jc jape.Context) {
}
txn := wfr.Transaction

// TODO: add fee
// if len(txn.MinerFees) == 0 {
// // if no fees are specified, we add some
// fee := b.cm.RecommendedFee().Mul64(b.cm.TipState().TransactionWeight(txn))
// txn.MinerFees = []types.Currency{fee}
// }
if len(txn.MinerFees) == 0 {
// if no fees are specified, we add some
fee := b.cm.RecommendedFee().Mul64(b.cm.TipState().TransactionWeight(txn))
txn.MinerFees = []types.Currency{fee}
}

amount := wfr.Amount // .Add(txn.MinerFees[0])
toSign, err := b.w.FundTransaction(&txn, amount, wfr.UseUnconfirmedTxns)
toSign, err := b.w.FundTransaction(&txn, wfr.Amount.Add(txn.MinerFees[0]), wfr.UseUnconfirmedTxns)
if jc.Check("couldn't fund transaction", err) != nil {
return
}
Expand Down Expand Up @@ -697,6 +709,7 @@ func (b *bus) walletPrepareFormHandler(jc jape.Context) {
if jc.Check("couldn't fund transaction", err) != nil {
return
}

b.w.SignTransaction(&txn, toSign, ExplicitCoveredFields(txn))

// TODO: UnconfirmedParents needs a ctx (be sure to release inputs on err)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/montanaflynn/stats v0.7.1
gitlab.com/NebulousLabs/encoding v0.0.0-20200604091946-456c3dc907fe
go.sia.tech/core v0.2.1
go.sia.tech/coreutils v0.0.2
go.sia.tech/coreutils v0.0.3
go.sia.tech/gofakes3 v0.0.0-20231109151325-e0d47c10dce2
go.sia.tech/hostd v1.0.2-beta.2.0.20240131203318-9d84aad6ef13
go.sia.tech/jape v0.11.2-0.20240124024603-93559895d640
Expand Down
16 changes: 16 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ github.com/aws/aws-sdk-go v1.50.1 h1:AwnLUM7TcH9vMZqA4TcDKmGfLmDW5VXwT5tPH6kXylo
github.com/aws/aws-sdk-go v1.50.1/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/br0xen/boltbrowser v0.0.0-20230531143731-fcc13603daaf h1:NyqdH+vWNYPwQIK9jNv7sdIVbRGclwIdFhQk3+qlNEs=
github.com/br0xen/boltbrowser v0.0.0-20230531143731-fcc13603daaf/go.mod h1:uhjRwoqgy4g6fCwo7OJHjCxDOmx/YSCz2rnAYb63ZhY=
github.com/br0xen/termbox-util v0.0.0-20170904143325-de1d4c83380e/go.mod h1:x9wJlgOj74OFTOBwXOuO8pBguW37EgYNx51Dbjkfzo4=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudflare/cloudflare-go v0.86.0 h1:jEKN5VHNYNYtfDL2lUFLTRo+nOVNPFxpXTstVx0rqHI=
Expand Down Expand Up @@ -130,6 +133,7 @@ github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZb
github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng=
github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-sqlite3 v1.14.18 h1:JL0eqdCOq6DJVNPSvArO/bIV9/P7fbGrV00LZHc+5aI=
github.com/mattn/go-sqlite3 v1.14.18/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
Expand All @@ -149,6 +153,7 @@ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjY
github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE=
github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nsf/termbox-go v1.1.1/go.mod h1:T0cTdVuOwf7pHQNtfhnEbzHbcNyCEcVU4YPpouCbVxo=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down Expand Up @@ -191,9 +196,14 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn
github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
Expand Down Expand Up @@ -239,12 +249,16 @@ gitlab.com/NebulousLabs/threadgroup v0.0.0-20200608151952-38921fbef213/go.mod h1
gitlab.com/NebulousLabs/writeaheadlog v0.0.0-20200618142844-c59a90f49130/go.mod h1:SxigdS5Q1ui+OMgGAXt1E/Fg3RB6PvKXMov2O3gvIzs=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
go.etcd.io/bbolt v1.3.7/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw=
go.etcd.io/bbolt v1.3.8 h1:xs88BrvEv273UsB79e0hcVrlUWmS0a8upikMFhSyAtA=
go.etcd.io/bbolt v1.3.8/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw=
go.etcd.io/gofail v0.1.0/go.mod h1:VZBCXYGZhHAinaBiiqYvuDynvahNsAyLFwB3kEHKz1M=
go.sia.tech/core v0.2.1 h1:CqmMd+T5rAhC+Py3NxfvGtvsj/GgwIqQHHVrdts/LqY=
go.sia.tech/core v0.2.1/go.mod h1:3EoY+rR78w1/uGoXXVqcYdwSjSJKuEMI5bL7WROA27Q=
go.sia.tech/coreutils v0.0.2 h1:vDqMDM6dW6b/R3sO1ycr8fAnJXUiAvrzxehEIq/AsKA=
go.sia.tech/coreutils v0.0.2/go.mod h1:UBFc77wXiE//eyilO5HLOncIEj7F69j0Nv2OkFujtP0=
go.sia.tech/coreutils v0.0.3 h1:ZxuzovRpQMvfy/pCOV4om1cPF6sE15GyJyK36kIrF1Y=
go.sia.tech/coreutils v0.0.3/go.mod h1:UBFc77wXiE//eyilO5HLOncIEj7F69j0Nv2OkFujtP0=
go.sia.tech/gofakes3 v0.0.0-20231109151325-e0d47c10dce2 h1:ulzfJNjxN5DjXHClkW2pTiDk+eJ+0NQhX87lFDZ03t0=
go.sia.tech/gofakes3 v0.0.0-20231109151325-e0d47c10dce2/go.mod h1:PlsiVCn6+wssrR7bsOIlZm0DahsVrDydrlbjY4F14sg=
go.sia.tech/hostd v1.0.2-beta.2.0.20240131203318-9d84aad6ef13 h1:JcyVUtJfzeMh+zJAW20BMVhBYekg+h0T8dMeF7GzAFs=
Expand Down Expand Up @@ -332,8 +346,10 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
Expand Down
2 changes: 1 addition & 1 deletion internal/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, logger *zap.Logger
UniqueID: gateway.GenerateUniqueID(),
NetAddress: syncerAddr,
}
s := syncer.New(l, cm, sqlStore, header)
s := syncer.New(l, cm, sqlStore, header, syncer.WithSyncInterval(100*time.Millisecond), syncer.WithLogger(logger.Named("syncer")))

b, err := bus.New(alertsMgr, wh, cm, s, w, sqlStore, sqlStore, sqlStore, sqlStore, sqlStore, sqlStore, logger)
if err != nil {
Expand Down
37 changes: 29 additions & 8 deletions internal/testing/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,10 @@ func newTestCluster(t *testing.T, opts testClusterOptions) *TestCluster {

// Fund the bus.
if funding {
cluster.MineBlocks(latestHardforkHeight + 144)
// TODO: latest hardforkheight is wrong here, and we should be able to
// mine 144 passed that height but if we don't mine more we get
// "invalid: siacoin input 25 has immature parent"
cluster.MineBlocks(latestHardforkHeight + 200)
tt.Retry(1000, 100*time.Millisecond, func() error {
resp, err := busClient.ConsensusState(ctx)
if err != nil {
Expand Down Expand Up @@ -610,7 +613,7 @@ func (c *TestCluster) MineToRenewWindow() {
if cs.BlockHeight >= renewWindowStart {
c.tt.Fatalf("already in renew window: bh: %v, currentPeriod: %v, periodLength: %v, renewWindow: %v", cs.BlockHeight, ap.CurrentPeriod, ap.Config.Contracts.Period, renewWindowStart)
}
c.MineBlocks(int(renewWindowStart - cs.BlockHeight))
c.MineBlocks(renewWindowStart - cs.BlockHeight)
c.Sync()
}

Expand Down Expand Up @@ -649,7 +652,7 @@ func (c *TestCluster) synced(hosts []*Host) (bool, error) {
}

// MineBlocks uses the bus' miner to mine n blocks.
func (c *TestCluster) MineBlocks(n int) {
func (c *TestCluster) MineBlocks(n uint64) {
c.tt.Helper()
wallet, err := c.Bus.Wallet(context.Background())
c.tt.OK(err)
Expand All @@ -658,10 +661,12 @@ func (c *TestCluster) MineBlocks(n int) {
if len(c.hosts) == 0 {
c.tt.OK(c.mineBlocks(wallet.Address, n))
c.Sync()
return
}

// Otherwise mine blocks in batches of 3 to avoid going out of sync with
// hosts by too many blocks.
for mined := 0; mined < n; {
for mined := uint64(0); mined < n; {
toMine := n - mined
if toMine > 10 {
toMine = 10
Expand Down Expand Up @@ -785,7 +790,15 @@ func (c *TestCluster) AddHost(h *Host) {
res, err := c.Bus.Wallet(context.Background())
c.tt.OK(err)

fundAmt := res.Confirmed.Div64(2).Div64(uint64(len(c.hosts))) // 50% of bus balance
// Fund 1MS
fundAmt := types.Siacoins(1e6)
for fundAmt.Cmp(res.Confirmed) > 0 {
fundAmt = fundAmt.Div64(2)
if fundAmt.Cmp(types.Siacoins(1)) < 0 {
c.tt.Fatal("not enough funds to fund host")
}
}

var scos []types.SiacoinOutput
for i := 0; i < 10; i++ {
scos = append(scos, types.SiacoinOutput{
Expand Down Expand Up @@ -941,8 +954,8 @@ func (c *TestCluster) waitForHostContracts(hosts map[types.PublicKey]struct{}) {
})
}

func (c *TestCluster) mineBlocks(addr types.Address, n int) error {
for i := 0; i < n; i++ {
func (c *TestCluster) mineBlocks(addr types.Address, n uint64) error {
for i := uint64(0); i < n; i++ {
if block, found := coreutils.MineBlock(c.cm, addr, time.Second); !found {
return errors.New("failed to find block")
} else if err := c.Bus.AcceptBlock(context.Background(), block); err != nil {
Expand All @@ -966,7 +979,15 @@ func convertToCore(siad encoding.SiaMarshaler, core types.DecoderFrom) {
func testNetwork() (*consensus.Network, types.Block) {
// use a modified version of Zen
n, genesis := chain.TestnetZen()
n.InitialTarget = types.BlockID{0xFF}

// we have to set the initial target to 128 to ensure blocks we mine match
// the PoW testnet in siad testnet consensu
n.InitialTarget = types.BlockID{0x80}

// we have to make minimum coinbase get hit after 10 blocks to ensure we
// match the siad test network settings, otherwise the blocksubsidy is
// considered invalid after 10 blocks
n.MinimumCoinbase = types.Siacoins(299990)
n.HardforkDevAddr.Height = 1
n.HardforkTax.Height = 1
n.HardforkStorageProof.Height = 1
Expand Down
4 changes: 2 additions & 2 deletions internal/testing/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestNewTestCluster(t *testing.T) {
// revision first.
cs, err := cluster.Bus.ConsensusState(context.Background())
tt.OK(err)
cluster.MineBlocks(int(contract.WindowStart - cs.BlockHeight - 4))
cluster.MineBlocks(contract.WindowStart - cs.BlockHeight - 4)
cluster.Sync()
if cs.LastBlockTime.IsZero() {
t.Fatal("last block time not set")
Expand Down Expand Up @@ -1341,7 +1341,7 @@ func TestContractArchival(t *testing.T) {
endHeight := contracts[0].WindowEnd
cs, err := cluster.Bus.ConsensusState(context.Background())
tt.OK(err)
cluster.MineBlocks(int(endHeight - cs.BlockHeight + 1))
cluster.MineBlocks(endHeight - cs.BlockHeight + 1)

// check that we have 0 contracts
tt.Retry(100, 100*time.Millisecond, func() error {
Expand Down
33 changes: 0 additions & 33 deletions stores/hostdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -906,39 +906,6 @@ func (ss *SQLStore) RecordPriceTables(ctx context.Context, priceTableUpdate []ho
})
}

func (ss *SQLStore) processConsensusChangeHostDB(cc modules.ConsensusChange) {
height := uint64(cc.InitialHeight())
for range cc.RevertedBlocks {
height--
}

var newAnnouncements []announcement
for _, sb := range cc.AppliedBlocks {
var b types.Block
convertToCore(sb, (*types.V1Block)(&b))

// Process announcements, but only if they are not too old.
if b.Timestamp.After(time.Now().Add(-ss.announcementMaxAge)) {
chain.ForEachHostAnnouncement(types.Block(b), func(hk types.PublicKey, ha chain.HostAnnouncement) {
if ha.NetAddress == "" {
return
}
newAnnouncements = append(newAnnouncements, announcement{
blockHeight: height,
blockID: b.ID(),
hk: hk,
timestamp: b.Timestamp,
HostAnnouncement: ha,
})
ss.unappliedHostKeys[hk] = struct{}{}
})
}
height++
}

ss.unappliedAnnouncements = append(ss.unappliedAnnouncements, newAnnouncements...)
}

// excludeBlocked can be used as a scope for a db transaction to exclude blocked
// hosts.
func (ss *SQLStore) excludeBlocked(db *gorm.DB) *gorm.DB {
Expand Down
Loading

0 comments on commit 72ccf8c

Please sign in to comment.