Skip to content

Commit

Permalink
use coreutils/wallet event types
Browse files Browse the repository at this point in the history
  • Loading branch information
chris124567 committed Dec 7, 2024
1 parent 518fdb7 commit 8b7395c
Show file tree
Hide file tree
Showing 9 changed files with 256 additions and 380 deletions.
355 changes: 175 additions & 180 deletions explorer/events.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
)

require (
github.com/google/go-cmp v0.6.0 // indirect
github.com/julienschmidt/httprouter v1.3.0 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/ip2location/ip2location-go v8.3.0+incompatible h1:QwUE+FlSbo6bjOWZpv2Grb57vJhWYFNPyBj2KCvfWaM=
github.com/ip2location/ip2location-go v8.3.0+incompatible/go.mod h1:3JUY1TBjTx1GdA7oRT7Zeqfc0bg3lMMuU5lXmzdpuME=
github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U=
Expand Down
5 changes: 4 additions & 1 deletion internal/testutil/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"reflect"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"go.sia.tech/core/consensus"
"go.sia.tech/core/types"
"go.sia.tech/coreutils/chain"
"go.sia.tech/explored/explorer"
Expand All @@ -13,7 +16,7 @@ import (
func Equal[T any](t *testing.T, desc string, expect, got T) {
t.Helper()

if !reflect.DeepEqual(expect, got) {
if !cmp.Equal(expect, got, cmpopts.EquateEmpty(), cmpopts.IgnoreUnexported(consensus.Work{})) {
t.Fatalf("expected %v %s, got %v", expect, desc, got)
}
}
Expand Down
135 changes: 41 additions & 94 deletions persist/sqlite/addresses.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,74 +10,51 @@ import (
"go.sia.tech/explored/explorer"
)

func scanEvent(tx *txn, s scanner) (ev explorer.Event, eventID int64, err error) {
var eventType string

err = s.Scan(&eventID, decode(&ev.ID), &ev.MaturityHeight, decode(&ev.Timestamp), &ev.Index.Height, decode(&ev.Index.ID), &eventType)
if err != nil {
return
}
// AddressEvents returns the events of a single address.
func (s *Store) AddressEvents(address types.Address, offset, limit uint64) (events []explorer.Event, err error) {
err = s.transaction(func(tx *txn) error {
const query = `
WITH last_chain_index (height) AS (
SELECT MAX(height) FROM blocks
)
SELECT
ev.id,
ev.event_id,
ev.maturity_height,
ev.date_created,
b.height,
b.id,
CASE
WHEN last_chain_index.height < b.height THEN 0
ELSE last_chain_index.height - b.height
END AS confirmations,
ev.event_type,
ev.event_data
FROM events ev INDEXED BY events_maturity_height_id_idx -- force the index to prevent temp-btree sorts
INNER JOIN event_addresses ea ON (ev.id = ea.event_id)
INNER JOIN address_balance sa ON (ea.address_id = sa.id)
INNER JOIN blocks b ON (ev.block_id = b.id)
CROSS JOIN last_chain_index
WHERE sa.address = $1
ORDER BY ev.maturity_height DESC, ev.id DESC
LIMIT $2 OFFSET $3`

switch eventType {
case explorer.EventTypeTransaction:
var txnID int64
var eventTx explorer.EventTransaction
err = tx.QueryRow(`SELECT transaction_id, fee FROM transaction_events WHERE event_id = ?`, eventID).Scan(&txnID, decode(&eventTx.Fee))
if err != nil {
return explorer.Event{}, 0, fmt.Errorf("failed to fetch transaction ID: %w", err)
}
txns, err := getTransactions(tx, map[int64]transactionID{0: {dbID: txnID, id: types.TransactionID(ev.ID)}})
if err != nil || len(txns) == 0 {
return explorer.Event{}, 0, fmt.Errorf("failed to fetch transaction: %w", err)
}
eventTx.Transaction = txns[0]
eventTx.HostAnnouncements = eventTx.Transaction.HostAnnouncements
ev.Data = &eventTx
case explorer.EventTypeV2Transaction:
var txnID int64
err = tx.QueryRow(`SELECT transaction_id FROM v2_transaction_events WHERE event_id = ?`, eventID).Scan(&txnID)
if err != nil {
return explorer.Event{}, 0, fmt.Errorf("failed to fetch v2 transaction ID: %w", err)
}
txns, err := getV2Transactions(tx, []types.TransactionID{types.TransactionID(ev.ID)})
if err != nil || len(txns) == 0 {
return explorer.Event{}, 0, fmt.Errorf("failed to fetch v2 transaction: %w", err)
}
eventTx := explorer.EventV2Transaction(txns[0])
ev.Data = &eventTx
case explorer.EventTypeContractPayout:
var m explorer.EventContractPayout
err = tx.QueryRow(`SELECT sce.output_id, sce.leaf_index, sce.maturity_height, sce.address, sce.value, fce.contract_id, fce.leaf_index, fce.filesize, fce.file_merkle_root, fce.window_start, fce.window_end, fce.payout, fce.unlock_hash, fce.revision_number, ev.missed
FROM contract_payout_events ev
JOIN siacoin_elements sce ON ev.output_id = sce.id
JOIN file_contract_elements fce ON ev.contract_id = fce.id
WHERE ev.event_id = ?`, eventID).Scan(decode(&m.SiacoinOutput.ID), decode(&m.SiacoinOutput.StateElement.LeafIndex), &m.SiacoinOutput.MaturityHeight, decode(&m.SiacoinOutput.SiacoinOutput.Address), decode(&m.SiacoinOutput.SiacoinOutput.Value), decode(&m.FileContract.ID), decode(&m.FileContract.StateElement.LeafIndex), decode(&m.FileContract.FileContract.Filesize), decode(&m.FileContract.FileContract.FileMerkleRoot), decode(&m.FileContract.FileContract.WindowStart), decode(&m.FileContract.FileContract.WindowEnd), decode(&m.FileContract.FileContract.Payout), decode(&m.FileContract.FileContract.UnlockHash), decode(&m.FileContract.FileContract.RevisionNumber), &m.Missed)
ev.Data = &m
case explorer.EventTypeMinerPayout:
var m explorer.EventMinerPayout
err = tx.QueryRow(`SELECT sc.output_id, sc.leaf_index, sc.maturity_height, sc.address, sc.value
FROM siacoin_elements sc
INNER JOIN miner_payout_events ev ON ev.output_id = sc.id
WHERE ev.event_id = ?`, eventID).Scan(decode(&m.SiacoinOutput.ID), decode(&m.SiacoinOutput.StateElement.LeafIndex), decode(&m.SiacoinOutput.MaturityHeight), decode(&m.SiacoinOutput.SiacoinOutput.Address), decode(&m.SiacoinOutput.SiacoinOutput.Value))
rows, err := tx.Query(query, encode(address), limit, offset)
if err != nil {
return explorer.Event{}, 0, fmt.Errorf("failed to fetch miner payout event data: %w", err)
return err
}
ev.Data = &m
case explorer.EventTypeFoundationSubsidy:
var m explorer.EventFoundationSubsidy
err = tx.QueryRow(`SELECT sc.output_id, sc.leaf_index, sc.maturity_height, sc.address, sc.value
FROM siacoin_elements sc
INNER JOIN foundation_subsidy_events ev ON ev.output_id = sc.id
WHERE ev.event_id = ?`, eventID).Scan(decode(&m.SiacoinOutput.ID), decode(&m.SiacoinOutput.StateElement.LeafIndex), decode(&m.SiacoinOutput.MaturityHeight), decode(&m.SiacoinOutput.SiacoinOutput.Address), decode(&m.SiacoinOutput.SiacoinOutput.Value))
ev.Data = &m
default:
return explorer.Event{}, 0, fmt.Errorf("unknown event type: %s", eventType)
}

if err != nil {
return explorer.Event{}, 0, fmt.Errorf("failed to fetch transaction event data: %w", err)
}
defer rows.Close()

for rows.Next() {
event, _, err := scanEvent(rows)
if err != nil {
return fmt.Errorf("failed to scan event: %w", err)
}
event.Relevant = []types.Address{address}
events = append(events, event)
}
return rows.Err()
})
return
}

Expand Down Expand Up @@ -186,36 +163,6 @@ func (s *Store) HostsForScanning(maxLastScan, minLastAnnouncement time.Time, off
return
}

// AddressEvents returns the events of a single address.
func (s *Store) AddressEvents(address types.Address, offset, limit uint64) (events []explorer.Event, err error) {
err = s.transaction(func(tx *txn) error {
const query = `SELECT ev.id, ev.event_id, ev.maturity_height, ev.date_created, ev.height, ev.block_id, ev.event_type
FROM events ev
INNER JOIN event_addresses ea ON ev.id = ea.event_id
INNER JOIN address_balance sa ON ea.address_id = sa.id
WHERE sa.address = $1
ORDER BY ev.maturity_height DESC, ev.id DESC
LIMIT $2 OFFSET $3`

rows, err := tx.Query(query, encode(address), limit, offset)
if err != nil {
return err
}
defer rows.Close()

for rows.Next() {
event, _, err := scanEvent(tx, rows)
if err != nil {
return fmt.Errorf("failed to scan event: %w", err)
}

events = append(events, event)
}
return rows.Err()
})
return
}

func scanSiacoinOutput(s scanner) (sco explorer.SiacoinOutput, err error) {
var spentIndex types.ChainIndex
err = s.Scan(decode(&sco.ID), decode(&sco.StateElement.LeafIndex), &sco.Source, decodeNull(&spentIndex), &sco.MaturityHeight, decode(&sco.SiacoinOutput.Address), decode(&sco.SiacoinOutput.Value))
Expand Down
79 changes: 14 additions & 65 deletions persist/sqlite/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package sqlite
import (
"bytes"
"database/sql"
"encoding/json"
"errors"
"fmt"
"time"

"go.sia.tech/core/types"
"go.sia.tech/coreutils/chain"
"go.sia.tech/coreutils/wallet"
"go.sia.tech/explored/explorer"
)

Expand Down Expand Up @@ -620,18 +620,18 @@ func addSiafundElements(tx *txn, index types.ChainIndex, spentElements, newEleme
return sfDBIds, nil
}

func addEvents(tx *txn, scDBIds map[types.SiacoinOutputID]int64, fcDBIds map[explorer.DBFileContract]int64, txnDBIds map[types.TransactionID]txnDBId, v2TxnDBIds map[types.TransactionID]txnDBId, events []explorer.Event) error {
func addEvents(tx *txn, events []wallet.Event, bid types.BlockID) error {
if len(events) == 0 {
return nil
}

insertEventStmt, err := tx.Prepare(`INSERT INTO events (event_id, maturity_height, date_created, event_type, block_id, height) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (event_id) DO NOTHING RETURNING id`)
insertEventStmt, err := tx.Prepare(`INSERT INTO events (event_id, maturity_height, date_created, event_type, event_data, block_id) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (event_id) DO NOTHING RETURNING id`)
if err != nil {
return fmt.Errorf("failed to prepare event statement: %w", err)
}
defer insertEventStmt.Close()

addrStmt, err := tx.Prepare(`INSERT INTO address_balance (address, siacoin_balance, immature_siacoin_balance, siafund_balance) VALUES ($1, $2, $3, 0) ON CONFLICT (address) DO UPDATE SET address=EXCLUDED.address RETURNING id`)
addrStmt, err := tx.Prepare(`INSERT INTO address_balance (address, siacoin_balance, immature_siacoin_balance, siafund_balance) VALUES ($1, $2, $2, 0) ON CONFLICT (address) DO UPDATE SET address=EXCLUDED.address RETURNING id`)
if err != nil {
return fmt.Errorf("failed to prepare address statement: %w", err)
}
Expand All @@ -643,84 +643,33 @@ func addEvents(tx *txn, scDBIds map[types.SiacoinOutputID]int64, fcDBIds map[exp
}
defer relevantAddrStmt.Close()

transactionEventStmt, err := tx.Prepare(`INSERT INTO transaction_events (event_id, transaction_id, fee) VALUES (?, ?, ?)`)
if err != nil {
return fmt.Errorf("failed to prepare transaction event statement: %w", err)
}
defer transactionEventStmt.Close()

v2TransactionEventStmt, err := tx.Prepare(`INSERT INTO v2_transaction_events (event_id, transaction_id) VALUES (?, ?)`)
if err != nil {
return fmt.Errorf("failed to prepare v2 transaction event statement: %w", err)
}
defer v2TransactionEventStmt.Close()

minerPayoutEventStmt, err := tx.Prepare(`INSERT INTO miner_payout_events (event_id, output_id) VALUES (?, ?)`)
if err != nil {
return fmt.Errorf("failed to prepare miner payout event statement: %w", err)
}
defer minerPayoutEventStmt.Close()

contractPayoutEventStmt, err := tx.Prepare(`INSERT INTO contract_payout_events (event_id, output_id, contract_id, missed) VALUES (?, ?, ?, ?)`)
if err != nil {
return fmt.Errorf("failed to prepare contract payout event statement: %w", err)
}
defer contractPayoutEventStmt.Close()

foundationSubsidyEventStmt, err := tx.Prepare(`INSERT INTO foundation_subsidy_events (event_id, output_id) VALUES (?, ?)`)
if err != nil {
return fmt.Errorf("failed to prepare foundation subsidy event statement: %w", err)
}
defer foundationSubsidyEventStmt.Close()

var buf bytes.Buffer
enc := json.NewEncoder(&buf)
enc := types.NewEncoder(&buf)
for _, event := range events {
buf.Reset()
if err := enc.Encode(event.Data); err != nil {
return fmt.Errorf("failed to encode event: %w", err)
ev, ok := event.Data.(types.EncoderTo)
if !ok {
panic("event data does not implement types.EncoderTo") // developer error
}
ev.EncodeTo(enc)
enc.Flush()

var eventID int64
err = insertEventStmt.QueryRow(encode(event.ID), event.MaturityHeight, encode(event.Timestamp), event.Data.EventType(), encode(event.Index.ID), event.Index.Height).Scan(&eventID)
err = insertEventStmt.QueryRow(encode(event.ID), event.MaturityHeight, encode(event.Timestamp), event.Type, buf.Bytes(), encode(bid)).Scan(&eventID)
if errors.Is(err, sql.ErrNoRows) {
continue // skip if the event already exists
} else if err != nil {
return fmt.Errorf("failed to add event: %w", err)
}

switch v := event.Data.(type) {
case *explorer.EventTransaction:
dbID := txnDBIds[types.TransactionID(event.ID)].id
if _, err = transactionEventStmt.Exec(eventID, dbID, encode(v.Fee)); err != nil {
return fmt.Errorf("failed to insert transaction event: %w", err)
}
case *explorer.EventV2Transaction:
dbID := v2TxnDBIds[types.TransactionID(event.ID)].id
if _, err = v2TransactionEventStmt.Exec(eventID, dbID); err != nil {
return fmt.Errorf("failed to insert transaction event: %w", err)
}
case *explorer.EventMinerPayout:
_, err = minerPayoutEventStmt.Exec(eventID, scDBIds[types.SiacoinOutputID(event.ID)])
case *explorer.EventContractPayout:
_, err = contractPayoutEventStmt.Exec(eventID, scDBIds[v.SiacoinOutput.ID], fcDBIds[explorer.DBFileContract{ID: v.FileContract.ID, RevisionNumber: v.FileContract.FileContract.RevisionNumber}], v.Missed)
case *explorer.EventFoundationSubsidy:
_, err = foundationSubsidyEventStmt.Exec(eventID, scDBIds[types.SiacoinOutputID(event.ID)])
default:
return errors.New("unknown event type")
}
if err != nil {
return fmt.Errorf("failed to insert %s event: %w", event.Data.EventType(), err)
}

used := make(map[types.Address]bool)
for _, addr := range event.Addresses {
for _, addr := range event.Relevant {
if used[addr] {
continue
}

var addressID int64
err = addrStmt.QueryRow(encode(addr), encode(types.ZeroCurrency), encode(types.ZeroCurrency)).Scan(&addressID)
err = addrStmt.QueryRow(encode(addr), encode(types.ZeroCurrency)).Scan(&addressID)
if err != nil {
return fmt.Errorf("failed to get address: %w", err)
}
Expand Down Expand Up @@ -1062,7 +1011,7 @@ func (ut *updateTx) ApplyIndex(state explorer.UpdateState) error {
return fmt.Errorf("ApplyIndex: failed to update metrics: %w", err)
} else if err := addHostAnnouncements(ut.tx, state.Block.Timestamp, state.HostAnnouncements, state.V2HostAnnouncements); err != nil {
return fmt.Errorf("ApplyIndex: failed to add host announcements: %w", err)
} else if err := addEvents(ut.tx, scDBIds, fcDBIds, txnDBIds, v2TxnDBIds, state.Events); err != nil {
} else if err := addEvents(ut.tx /*scDBIds, fcDBIds, txnDBIds, v2TxnDBIds,*/, state.Events, state.Block.ID()); err != nil {
return fmt.Errorf("ApplyIndex: failed to add events: %w", err)
} else if err := updateFileContractIndices(ut.tx, false, state.Metrics.Index, state.FileContractElements); err != nil {
return fmt.Errorf("ApplyIndex: failed to update file contract element indices: %w", err)
Expand Down
10 changes: 7 additions & 3 deletions persist/sqlite/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package sqlite_test

import (
"errors"
"log"
"math"
"path/filepath"
"reflect"
"testing"
"time"

Expand All @@ -12,6 +14,7 @@ import (
"go.sia.tech/coreutils"
"go.sia.tech/coreutils/chain"
ctestutil "go.sia.tech/coreutils/testutil"
"go.sia.tech/coreutils/wallet"
"go.sia.tech/explored/explorer"
"go.sia.tech/explored/internal/testutil"
"go.sia.tech/explored/persist/sqlite"
Expand Down Expand Up @@ -1619,10 +1622,11 @@ func TestHostAnnouncement(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if v, ok := events[0].Data.(*explorer.EventTransaction); !ok {
t.Fatal("expected EventTransaction")
log.Printf("%v", reflect.TypeOf(events[0].Data))
if v, ok := events[0].Data.(wallet.EventV1Transaction); !ok {
t.Fatal("expected EventV1Transaction")
} else {
testutil.CheckTransaction(t, txn1, v.Transaction)
testutil.Equal(t, "transaction", txn1, v.Transaction)
}
}

Expand Down
Loading

0 comments on commit 8b7395c

Please sign in to comment.