Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate renterd subscriber to coreutils types #1098

Merged
merged 99 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
99 commits
Select commit Hold shift + click to select a range
a1ba198
chain: introduce subscriber
peterjan Mar 25, 2024
d2fbe63
stores: add TestApplyChainUpdate
peterjan Mar 25, 2024
fb39286
bus: fix interface
peterjan Mar 25, 2024
a8560e5
testing: fix TestWalletTransactions
peterjan Mar 25, 2024
9358313
subscriber: commit update on close
peterjan Mar 25, 2024
8e91afd
test: fix TestUploadDownloadSpending
peterjan Mar 25, 2024
4735104
stores: update interface
peterjan Mar 26, 2024
0e80949
chain: update subscriber
peterjan Mar 26, 2024
c6a1898
testing: remove TODOs
peterjan Mar 26, 2024
de5402e
subscriber: get rid of OnReorg
peterjan Mar 27, 2024
0b012bc
subscriber: re-add OnReorg
peterjan Mar 27, 2024
452c39e
subscriber: get rid of sync interval entirely
peterjan Mar 27, 2024
ad998b4
chain: handle state elements
peterjan Mar 28, 2024
ba97b55
chain: get rid of update in favour of exposing a transaction from the…
peterjan Apr 5, 2024
1acaabd
stores: remove chain test
peterjan Apr 5, 2024
13d5415
worker: undo change
peterjan Apr 5, 2024
2a03ff4
debug: add logging
peterjan Apr 5, 2024
e341c92
Tmp commit
peterjan Apr 16, 2024
4fe7400
Merge branch 'its-happening' of github.com:SiaFoundation/renterd into…
peterjan Apr 16, 2024
b1065c1
Merge branch 'pj/default-db-logger' into pj/subscription-api
peterjan Apr 16, 2024
2174d81
node: change start order
peterjan Apr 16, 2024
b04ef80
subscriber: add waitgroup
peterjan Apr 16, 2024
0f88914
testing: fix TestBusRecordedMetrics
peterjan Apr 16, 2024
eb055f2
subscriber: check for close inside retry loop
peterjan Apr 16, 2024
22e93ea
testing: avoid NDF in TestWalletRedistribute
peterjan Apr 16, 2024
df62a81
node: sleep after closing listener
peterjan Apr 16, 2024
4b20878
testing: fix TestBusRecordedMetrics
peterjan Apr 16, 2024
d0518d3
stores: add rollback
peterjan Apr 16, 2024
c133e85
testing: add debug logger
peterjan Apr 17, 2024
37b4a8e
testing: configure MySQL
peterjan Apr 17, 2024
4571ceb
subscriber: update concurrency
peterjan Apr 17, 2024
1a88963
go.mod: update coreutils dep
peterjan Apr 17, 2024
bf0cf3b
testing: test for contracts length
peterjan Apr 17, 2024
0923817
subscriber: fix index
peterjan Apr 17, 2024
b87aa78
testing: fix TestBusRecordedMetrics NDF
peterjan Apr 17, 2024
a523bed
testing: add logging
peterjan Apr 17, 2024
21fe4ba
debug: add logging
peterjan Apr 17, 2024
f249ee8
node: pass WithPeerDiscoveryInterval
peterjan Apr 17, 2024
74e248f
node: add syncer opts to bus config
peterjan Apr 17, 2024
1427824
testing: lower ap heartbeat
peterjan Apr 17, 2024
bdeeb92
testing: add debug logger
peterjan Apr 17, 2024
8e77a8d
testing: default to debug logger
peterjan Apr 17, 2024
3bdfc80
testing: update TestBlocklist
peterjan Apr 17, 2024
a962725
subscriber: update way we handle rollbacks
peterjan Apr 17, 2024
38ee174
testing: trigger autopilot on failure
peterjan Apr 17, 2024
9ce29bb
testing: wait for peers
peterjan Apr 17, 2024
2168a00
internal: use shorter tx retry intervals in testing
peterjan Apr 17, 2024
88e8bd2
testing: fix TestBusRecordedMetrics
peterjan Apr 17, 2024
1a24d21
testing: fix TestNewTestCluster NDF
peterjan Apr 17, 2024
9627b67
debug: add logging
peterjan Apr 17, 2024
8880d6b
subscriber: add logging
peterjan Apr 18, 2024
029f1f5
Merge branch 'its-happening' of github.com:SiaFoundation/renterd into…
peterjan Apr 18, 2024
733bc31
testing: fix TestBusRecordedMetrics
peterjan Apr 18, 2024
310fbda
go.mod: upgrade coreutils dependency
peterjan Apr 18, 2024
5b47b7e
subscriber: add logging
peterjan Apr 18, 2024
fbd4c60
testing: retry stored data check
peterjan Apr 18, 2024
4af3312
testing: add micro sleep in MineBlocks
peterjan Apr 18, 2024
3cc5a9d
testing: update TestEphemeralAccounts
peterjan Apr 18, 2024
15adf40
stores: only update if revision number is smaller, fix NDF
peterjan Apr 18, 2024
4305ba2
stores: update contract
peterjan Apr 18, 2024
9820426
stores: improve perf in UpdateStateElements
peterjan Apr 19, 2024
8d52431
Merge branch 'its-happening' into pj/subscription-api
peterjan Apr 30, 2024
bfd11a6
stores: update ChainStore interface
peterjan Apr 30, 2024
f0cceb3
stores: remove height and block_id from wallet outputs
peterjan Apr 30, 2024
44613b2
ci: revert test.yml
peterjan Apr 30, 2024
c8cd78a
chain: rename manager.go to chain.go
peterjan Apr 30, 2024
c70c754
chain: make applyChainUpdate mirror revertChainUpdate
peterjan Apr 30, 2024
ecc005c
stores: fix UpdateStateElements performance
peterjan Apr 30, 2024
25c3ff1
ci: run test in loop and add logging
peterjan May 2, 2024
2a2198a
ci: fix test.yml
peterjan May 2, 2024
30b2611
stores: add logging
peterjan May 2, 2024
5379e82
stores: add logging to chainUpdateTx
peterjan May 2, 2024
2b7f6ba
subscriber: add debug logging
peterjan May 3, 2024
04b14f2
deps: update coreutils
peterjan May 3, 2024
2cc097c
stores: remove contract subscriber pattren
peterjan May 3, 2024
706c9d7
chain: avoid the error handling in ForEach
peterjan May 3, 2024
e189460
testing: use default logger in TestUnconfirmedContractArchival
peterjan May 3, 2024
8b22b75
testing: remove all NoOp loggers
peterjan May 3, 2024
094c962
testing: add TestContractExists
peterjan May 3, 2024
d189483
chain: add duration
peterjan May 3, 2024
0020c7a
testing: add context to sync issues
peterjan May 3, 2024
fe1fb70
autopilot: expose ContractConfirmationDeadline and use 144 in testing
peterjan May 3, 2024
8fc37a0
contractor: refresh contract metadata
peterjan May 3, 2024
85bf378
all: cleanup PR
peterjan May 3, 2024
893501e
Merge branch 'its-happening' of github.com:SiaFoundation/renterd into…
peterjan May 3, 2024
fae31b0
ci: tmp disable stores test
peterjan May 3, 2024
5532b8e
chain: update subscriber
peterjan May 3, 2024
7485ceb
stores: add logging
peterjan May 3, 2024
42ca8b2
stors: add logging
peterjan May 3, 2024
fbb6b5a
bus: fix store contract update, add sync height
peterjan May 3, 2024
5366be3
lint: disable builtinShadow
peterjan May 3, 2024
00ffb04
ci: re-enable store tests
peterjan May 3, 2024
9671cc0
test: block after mining
peterjan May 6, 2024
7c6d1a0
test: upgrade host
peterjan May 7, 2024
98940a7
Merge branch 'its-happening' of github.com:SiaFoundation/renterd into…
peterjan May 7, 2024
783f480
all: implement CR remarks
peterjan May 7, 2024
accc3a1
testing: always mine and sync
peterjan May 10, 2024
620c9f7
chain: fix chain.db location and pass genesis block id
peterjan May 10, 2024
4560593
bus: use maxSyncTime to decide whether the bus is synced or not
peterjan May 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions chain/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type (
ChainStore interface {
ApplyChainUpdate(ctx context.Context, cu *Update) error
ChainIndex() (types.ChainIndex, error)
WalletStateElements(ctx context.Context) ([]types.StateElement, error)
}

ContractStore interface {
Expand All @@ -51,9 +52,10 @@ type (
syncSig chan struct{}
csUnsubscribeFn func()

mu sync.Mutex
closedChan chan struct{}
knownContracts map[types.FileContractID]api.ContractState
mu sync.Mutex
closedChan chan struct{}
knownContracts map[types.FileContractID]api.ContractState
knownStateElements map[types.Hash256]types.StateElement
}
)

Expand All @@ -73,14 +75,28 @@ func NewSubscriber(cm ChainManager, cs ChainStore, contracts ContractStore, wall

syncSig: make(chan struct{}, 1),

closedChan: make(chan struct{}),
closedChan: make(chan struct{}),
knownStateElements: make(map[types.Hash256]types.StateElement),
}

// make sure we don't hang
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

// subscribe ourselves to receive new contract ids
subscriber.knownContracts, subscriber.csUnsubscribeFn, err = contracts.AddContractStoreSubscriber(context.Background(), subscriber)
subscriber.knownContracts, subscriber.csUnsubscribeFn, err = contracts.AddContractStoreSubscriber(ctx, subscriber)
if err != nil {
return nil, err
}

// fetch all state elements from the database
elements, err := cs.WalletStateElements(ctx)
if err != nil {
return nil, err
}
for _, el := range elements {
subscriber.knownStateElements[types.Hash256(el.ID)] = el
}

return subscriber, nil
}
Expand Down Expand Up @@ -377,12 +393,12 @@ func (cs *Subscriber) sync(index types.ChainIndex) error {
return fmt.Errorf("failed to fetch updates: %w", err)
}

// aggregate updates into one chain update
cu := NewChainUpdate()

// lock the subscriber while we apply the updates
cs.mu.Lock()

// aggregate updates into one chain update
cu := NewChainUpdate(cs.knownStateElements)
peterjan marked this conversation as resolved.
Show resolved Hide resolved

// revert chain updates
for _, cru := range crus {
cs.revertContractUpdate(cu, cru)
Expand Down
19 changes: 14 additions & 5 deletions chain/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ import (

type (
Update struct {
Index types.ChainIndex

Index types.ChainIndex
StateElements map[types.Hash256]types.StateElement
ContractUpdates map[types.FileContractID]*ContractUpdate
HostUpdates map[types.PublicKey]HostUpdate
WalletOutputUpdates map[types.Hash256]WalletOutputUpdate
WalletEventUpdates []WalletEventUpdate
RevertIndices []types.ChainIndex
}

ContractUpdate struct {
Expand Down Expand Up @@ -48,8 +49,9 @@ type (
)

// NewChainUpdate returns a new ChainUpdate.
func NewChainUpdate() *Update {
func NewChainUpdate(elements map[types.Hash256]types.StateElement) *Update {
return &Update{
StateElements: elements,
ContractUpdates: make(map[types.FileContractID]*ContractUpdate),
HostUpdates: make(map[types.PublicKey]HostUpdate),
WalletOutputUpdates: make(map[types.Hash256]WalletOutputUpdate),
Expand Down Expand Up @@ -105,12 +107,17 @@ func (cu *Update) RemoveSiacoinElements(ids []types.SiacoinOutputID) error {
// WalletStateElements returns all state elements in the database. It is used
// to update the proofs of all state elements affected by the update.
func (cu *Update) WalletStateElements() (elements []types.StateElement, _ error) {
peterjan marked this conversation as resolved.
Show resolved Hide resolved
// copy the output updates to the state
for id, el := range cu.WalletOutputUpdates {
elements = append(elements, types.StateElement{
cu.StateElements[id] = types.StateElement{
ID: id,
LeafIndex: el.Element.LeafIndex,
MerkleProof: el.Element.MerkleProof,
})
}
}
// loop over the state
for _, se := range cu.StateElements {
elements = append(elements, se)
}
return
}
Expand Down Expand Up @@ -146,5 +153,7 @@ func (cu *Update) RevertIndex(index types.ChainIndex) error {
}
}

// add the index to the list of reverted indices
cu.RevertIndices = append(cu.RevertIndices, index)
return nil
peterjan marked this conversation as resolved.
Show resolved Hide resolved
}
41 changes: 41 additions & 0 deletions stores/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ func (s *SQLStore) ApplyChainUpdate(ctx context.Context, cu *chain.Update) error
}
}

// process reverts
for _, reverted := range cu.RevertIndices {
if err := revertIndex(tx, reverted); err != nil {
return fmt.Errorf("%w; failed to revert index %d", err, reverted.Height)
}
}

// update chain index
if err := updateChainIndex(tx, cu.Index); err != nil {
return fmt.Errorf("%w; failed to update chain index", err)
Expand All @@ -57,6 +64,33 @@ func (s *SQLStore) ApplyChainUpdate(ctx context.Context, cu *chain.Update) error
})
}

// WalletStateElements implements the ChainStore interface and returns all state
// elements in the database.
func (s *SQLStore) WalletStateElements(ctx context.Context) ([]types.StateElement, error) {
type row struct {
ID hash256
LeafIndex uint64
MerkleProof merkleProof
}
var rows []row
if err := s.db.WithContext(ctx).
Model(&dbWalletOutput{}).
Select("output_id AS id", "leaf_index", "merkle_proof").
Find(&rows).
Error; err != nil {
return nil, err
}
elements := make([]types.StateElement, 0, len(rows))
for _, r := range rows {
elements = append(elements, types.StateElement{
ID: types.Hash256(r.ID),
LeafIndex: r.LeafIndex,
MerkleProof: r.MerkleProof.proof,
})
}
return elements, nil
}

func updateHosts(tx *gorm.DB, ann map[types.PublicKey]chain.HostUpdate) error {
if len(ann) == 0 {
return nil
Expand Down Expand Up @@ -207,3 +241,10 @@ func updateWalletEvent(tx *gorm.DB, weu chain.WalletEventUpdate) error {
Delete(&dbWalletEvent{}).
Error
}

func revertIndex(tx *gorm.DB, index types.ChainIndex) error {
if err := tx.Model(&dbWalletEvent{}).Where("height = ? AND block_id = ?", index.Height, hash256(index.ID)).Delete(&dbWalletEvent{}).Error; err != nil {
return err
}
return tx.Model(&dbWalletOutput{}).Where("height = ? AND block_id = ?", index.Height, hash256(index.ID)).Delete(&dbWalletOutput{}).Error
}
6 changes: 3 additions & 3 deletions stores/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestApplyChainUpdate(t *testing.T) {
// prepare chain update
hk := types.PublicKey{1}
want := types.ChainIndex{Height: 1, ID: types.BlockID{1}}
cu := chain.NewChainUpdate()
cu := chain.NewChainUpdate(nil)
cu.HostUpdates[hk] = chain.HostUpdate{
Announcement: chain.HostAnnouncement{NetAddress: "foo.com:1000"},
BlockHeight: want.Height,
Expand Down Expand Up @@ -55,7 +55,7 @@ func TestApplyChainUpdate(t *testing.T) {
}

// update the contract
cu = chain.NewChainUpdate()
cu = chain.NewChainUpdate(nil)
cu.Index = types.ChainIndex{Height: 2, ID: types.BlockID{2}}
cu.ContractUpdates[types.FileContractID{1}] = &chain.ContractUpdate{State: api.ContractStateActive}
err = db.ApplyChainUpdate(context.Background(), cu)
Expand All @@ -72,7 +72,7 @@ func TestApplyChainUpdate(t *testing.T) {
}

// add wallet output & event
cu = chain.NewChainUpdate()
cu = chain.NewChainUpdate(nil)
cu.Index = types.ChainIndex{Height: 3, ID: types.BlockID{3}}
cu.WalletOutputUpdates[types.Hash256{1}] = chain.WalletOutputUpdate{Addition: true, Element: wallet.SiacoinElement{
SiacoinElement: types.SiacoinElement{
Expand Down
Loading