Skip to content

Commit

Permalink
transactions cache
Browse files Browse the repository at this point in the history
  • Loading branch information
billettc committed Sep 4, 2024
1 parent 4301d72 commit 48f4b67
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 13 deletions.
33 changes: 20 additions & 13 deletions data/psql.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ import (
)

type Psql struct {
db *sql.DB
tx *sql.Tx
logger *zap.Logger
db *sql.DB
tx *sql.Tx
logger *zap.Logger
TransactionIDs map[string]int64
}

type PsqlInfo struct {
Expand Down Expand Up @@ -66,17 +67,22 @@ func (p *Psql) HandleClock(clock *pbsubstreams.Clock) (dbBlockID int64, err erro
}

func (p *Psql) handleTransaction(dbBlockID int64, transactionHash string) (dbTransactionID int64, err error) {
//todo: create a transaction cache
rows, err := p.tx.Query("SELECT id FROM hivemapper.transactions WHERE hash = $1", transactionHash)
p.logger.Debug("handling transaction", zap.String("trx_hash", transactionHash))
if err != nil {
return 0, fmt.Errorf("selecting transaction: %w", err)
}
defer rows.Close()

if rows.Next() {
err = rows.Scan(&dbTransactionID)
return
////todo: create a transaction cache
//rows, err := p.tx.Query("SELECT id FROM hivemapper.transactions WHERE hash = $1", transactionHash)
//p.logger.Debug("handling transaction", zap.String("trx_hash", transactionHash))
//if err != nil {
// return 0, fmt.Errorf("selecting transaction: %w", err)
//}
//defer rows.Close()
//
//if rows.Next() {
// err = rows.Scan(&dbTransactionID)
// return
//}

if id, found := p.TransactionIDs[transactionHash]; found {
return id, nil
}

row := p.tx.QueryRow("INSERT INTO hivemapper.transactions (hash, block_id) VALUES ($1, $2) RETURNING id", transactionHash, dbBlockID)
Expand All @@ -85,6 +91,7 @@ func (p *Psql) handleTransaction(dbBlockID int64, transactionHash string) (dbTra
return 0, fmt.Errorf("inserting transaction: %w", err)
}

p.TransactionIDs[transactionHash] = dbTransactionID
err = row.Scan(&dbTransactionID)
return
}
Expand Down
1 change: 1 addition & 0 deletions data/sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func (s *Sinker) HandleBlockScopedData(ctx context.Context, data *pbsubstreamsrp
s.blockSecCount++
s.lastClock = data.Clock
hasTransaction := false
s.db.TransactionIDs = map[string]int64{}

defer func() {
s.Sinker.AverageBlockTimeProcessing().Add(time.Since(startTime).Milliseconds())
Expand Down

0 comments on commit 48f4b67

Please sign in to comment.