diff --git a/data/psql.go b/data/psql.go index 40ec74a..47a4808 100644 --- a/data/psql.go +++ b/data/psql.go @@ -14,8 +14,27 @@ import ( ) type PreparedStatement struct { - insertMint *sql.Stmt - insertTransaction *sql.Stmt + insertMint *sql.Stmt + insertTransaction *sql.Stmt + insertBlock *sql.Stmt + insertDerivedAddress *sql.Stmt + selectDerivedAddress *sql.Stmt + selectDriver *sql.Stmt + insertDriver *sql.Stmt + selectFleet *sql.Stmt + insertFleet *sql.Stmt + selectFleetDriver *sql.Stmt + insertFleetDriver *sql.Stmt + insertPayment *sql.Stmt + insertSlipPayment *sql.Stmt + insertTransfer *sql.Stmt + insertAIPayment *sql.Stmt + insertOperationalPayment *sql.Stmt + insertRewardPayment *sql.Stmt + insertMapCreate *sql.Stmt + insertMapConsumption *sql.Stmt + insertBurn *sql.Stmt + insertCursor *sql.Stmt } var preparedStatement *PreparedStatement @@ -59,9 +78,105 @@ func NewPostgreSQL(psqlInfo *PsqlInfo, logger *zap.Logger) *Psql { panic(err) } + insertBlock, err := db.Prepare("INSERT INTO hivemapper.blocks (number, hash, timestamp) VALUES ($1, $2, $3) RETURNING id") + if err != nil { + panic(err) + } + insertDerivedAddress, err := db.Prepare("INSERT INTO hivemapper.derived_addresses (transaction_id, address, derivedAddress) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING") + if err != nil { + panic(err) + } + selectDerivedAddress, err := db.Prepare("SELECT address FROM hivemapper.derived_addresses WHERE derivedAddress = $1") + if err != nil { + panic(err) + } + selectDriver, err := db.Prepare("SELECT id FROM hivemapper.drivers WHERE address = $1") + if err != nil { + panic(err) + } + insertDriver, err := db.Prepare("INSERT INTO hivemapper.drivers (address, transaction_id) VALUES ($1, $2) RETURNING id") + if err != nil { + panic(err) + } + selectFleet, err := db.Prepare("SELECT id FROM hivemapper.fleets WHERE address = $1") + if err != nil { + panic(err) + } + insertFleet, err := db.Prepare("INSERT INTO hivemapper.fleets (address, transaction_id) VALUES ($1, $2) RETURNING id") + if err != nil { + panic(err) + } + selectFleetDriver, err := db.Prepare("SELECT id FROM hivemapper.fleet_drivers WHERE fleet_id = $1 and driver_id = $2") + if err != nil { + panic(err) + } + insertFleetDriver, err := db.Prepare("INSERT INTO hivemapper.fleet_drivers (transaction_id, fleet_id, driver_id) VALUES ($1, $2, $3) RETURNING id") + if err != nil { + panic(err) + } + insertPayment, err := db.Prepare("INSERT INTO hivemapper.payments (mint_id) VALUES ($1) RETURNING id") + if err != nil { + panic(err) + } + insertSlipPayment, err := db.Prepare("INSERT INTO hivemapper.split_payments (transaction_id, fleet_mint_id, driver_mint_id) VALUES ($1, $2, $3)") + if err != nil { + panic(err) + } + insertTransfer, err := db.Prepare("INSERT INTO hivemapper.transfers (transaction_id, from_address, to_address, amount) VALUES ($1, $2, $3, $4)") + if err != nil { + panic(err) + } + insertAIPayment, err := db.Prepare("INSERT INTO hivemapper.ai_payments (mint_id) VALUES ($1) RETURNING id") + if err != nil { + panic(err) + } + insertOperationalPayment, err := db.Prepare("INSERT INTO hivemapper.operational_payments (mint_id) VALUES ($1) RETURNING id") + if err != nil { + panic(err) + } + insertRewardPayment, err := db.Prepare("INSERT INTO hivemapper.reward_payments (mint_id) VALUES ($1) RETURNING id") + if err != nil { + panic(err) + } + insertMapCreate, err := db.Prepare("INSERT INTO hivemapper.map_create (burn_id) VALUES ($1) RETURNING id") + if err != nil { + panic(err) + } + insertMapConsumption, err := db.Prepare("INSERT INTO hivemapper.map_consumption_reward (mint_id) VALUES ($1) RETURNING id") + if err != nil { + panic(err) + } + insertBurn, err := db.Prepare("INSERT INTO hivemapper.burns (transaction_id, from_address, amount) VALUES ($1, $2, $3) RETURNING id") + if err != nil { + panic(err) + } + insertCursor, err := db.Prepare("INSERT INTO hivemapper.cursor (name, cursor) VALUES ($1, $2) ON CONFLICT (name) DO UPDATE SET cursor = $2") + if err != nil { + panic(err) + } + preparedStatement = &PreparedStatement{ - insertMint: insertMing, - insertTransaction: insertTransaction, + insertMint: insertMing, + insertTransaction: insertTransaction, + insertBlock: insertBlock, + insertDerivedAddress: insertDerivedAddress, + selectDerivedAddress: selectDerivedAddress, + selectDriver: selectDriver, + insertDriver: insertDriver, + selectFleet: selectFleet, + insertFleet: insertFleet, + selectFleetDriver: selectFleetDriver, + insertFleetDriver: insertFleetDriver, + insertPayment: insertPayment, + insertSlipPayment: insertSlipPayment, + insertTransfer: insertTransfer, + insertAIPayment: insertAIPayment, + insertOperationalPayment: insertOperationalPayment, + insertRewardPayment: insertRewardPayment, + insertMapCreate: insertMapCreate, + insertMapConsumption: insertMapConsumption, + insertBurn: insertBurn, + insertCursor: insertCursor, } return &Psql{ @@ -79,7 +194,7 @@ func (p *Psql) Init() error { return nil } func (p *Psql) HandleClock(clock *pbsubstreams.Clock) (dbBlockID int64, err error) { - row := p.tx.QueryRow("INSERT INTO hivemapper.blocks (number, hash, timestamp) VALUES ($1, $2, $3) RETURNING id", clock.Number, clock.Id, clock.Timestamp.AsTime()) + row := p.tx.Stmt(preparedStatement.insertBlock).QueryRow(clock.Number, clock.Id, clock.Timestamp.AsTime()) err = row.Err() if err != nil { return 0, fmt.Errorf("inserting clock: %w", err) @@ -90,20 +205,6 @@ func (p *Psql) HandleClock(clock *pbsubstreams.Clock) (dbBlockID int64, err erro } func (p *Psql) handleTransaction(dbBlockID int64, transactionHash string) (dbTransactionID int64, err error) { - - ////todo: create a transaction cache - //rows, err := p.tx.Query("SELECT id FROM hivemapper.transactions WHERE hash = $1", transactionHash) - //p.logger.Debug("handling transaction", zap.String("trx_hash", transactionHash)) - //if err != nil { - // return 0, fmt.Errorf("selecting transaction: %w", err) - //} - //defer rows.Close() - // - //if rows.Next() { - // err = rows.Scan(&dbTransactionID) - // return - //} - if id, found := p.TransactionIDs[transactionHash]; found { return id, nil } @@ -125,7 +226,7 @@ func (p *Psql) HandleInitializedAccount(dbBlockID int64, initializedAccounts []* if err != nil { return fmt.Errorf("handling transaction: %w", err) } - _, err = p.tx.Exec("INSERT INTO hivemapper.derived_addresses (transaction_id, address, derivedAddress) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING", dbTransactionID, initializedAccount.Owner, initializedAccount.Account) + _, err = p.tx.Stmt(preparedStatement.insertDerivedAddress).Exec(dbTransactionID, initializedAccount.Owner, initializedAccount.Account) if err != nil { return fmt.Errorf("trx_hash: %d inserting derived_addresses: %w", dbBlockID, err) } @@ -145,7 +246,7 @@ var NotFound = errors.New("not found") func (p *Psql) resolveAddress(derivedAddress string) (string, error) { resolvedAddress := "" - rows, err := p.tx.Query("SELECT address FROM hivemapper.derived_addresses WHERE derivedAddress = $1", derivedAddress) + rows, err := p.tx.Stmt(preparedStatement.selectDerivedAddress).Query(derivedAddress) if err != nil { return "", fmt.Errorf("selecting derived_addresses: %w", err) } @@ -160,7 +261,7 @@ func (p *Psql) resolveAddress(derivedAddress string) (string, error) { } func (p *Psql) handleDriver(dbTransactionID int64, driverAddress string) (dbDriverID int64, err error) { - rows, err := p.tx.Query("SELECT id FROM hivemapper.drivers WHERE address = $1", driverAddress) + rows, err := p.tx.Stmt(preparedStatement.selectDriver).Query(driverAddress) if err != nil { return 0, fmt.Errorf("selecting drivers %q : %w", driverAddress, err) } @@ -172,7 +273,7 @@ func (p *Psql) handleDriver(dbTransactionID int64, driverAddress string) (dbDriv } rows.Close() - row := p.tx.QueryRow("INSERT INTO hivemapper.drivers (address, transaction_id) VALUES ($1, $2) RETURNING id", driverAddress, dbTransactionID) + row := p.tx.Stmt(preparedStatement.insertDriver).QueryRow(driverAddress, dbTransactionID) err = row.Err() if err != nil { return 0, fmt.Errorf("inserting driver: %w", err) @@ -183,7 +284,7 @@ func (p *Psql) handleDriver(dbTransactionID int64, driverAddress string) (dbDriv } func (p *Psql) handleFleet(dbTransactionID int64, fleetAddress string) (dbDriverID int64, err error) { - rows, err := p.tx.Query("SELECT id FROM hivemapper.fleets WHERE address = $1", fleetAddress) + rows, err := p.tx.Stmt(preparedStatement.selectFleet).Query(fleetAddress) if err != nil { return 0, fmt.Errorf("selecting fleets %q: %w", fleetAddress, err) } @@ -195,7 +296,7 @@ func (p *Psql) handleFleet(dbTransactionID int64, fleetAddress string) (dbDriver rows.Close() - row := p.tx.QueryRow("INSERT INTO hivemapper.fleets (address, transaction_id) VALUES ($1, $2) RETURNING id", fleetAddress, dbTransactionID) + row := p.tx.Stmt(preparedStatement.insertFleet).QueryRow(fleetAddress, dbTransactionID) err = row.Err() if err != nil { return 0, fmt.Errorf("inserting driver: %w", err) @@ -206,7 +307,7 @@ func (p *Psql) handleFleet(dbTransactionID int64, fleetAddress string) (dbDriver } func (p *Psql) handleFleetDriver(dbTransactionID int64, dbFleetID int64, dbDriverID int64) (dbFleetDriverID int64, err error) { - rows, err := p.tx.Query("SELECT id FROM hivemapper.fleet_drivers WHERE fleet_id = $1 and driver_id = $2", dbFleetID, dbDriverID) + rows, err := p.tx.Stmt(preparedStatement.selectFleetDriver).Query(dbFleetID, dbDriverID) if err != nil { return 0, fmt.Errorf("selecting fleet_drivers: %w", err) } @@ -218,7 +319,7 @@ func (p *Psql) handleFleetDriver(dbTransactionID int64, dbFleetID int64, dbDrive } rows.Close() - row := p.tx.QueryRow("INSERT INTO hivemapper.fleet_drivers (transaction_id, fleet_id, driver_id) VALUES ($1, $2, $3) RETURNING id", dbTransactionID, dbFleetID, dbDriverID) + row := p.tx.Stmt(preparedStatement.insertFleetDriver).QueryRow(dbTransactionID, dbFleetID, dbDriverID) err = row.Err() if err != nil { return 0, fmt.Errorf("inserting driver: %w", err) @@ -241,7 +342,7 @@ func (p *Psql) HandleRegularDriverPayments(dbBlockID int64, payments []*pb.Regul return fmt.Errorf("inserting mint: %w", err) } - _, err = p.tx.Exec("INSERT INTO hivemapper.payments (mint_id) VALUES ($1) RETURNING id", mintDbID) + _, err = p.tx.Stmt(preparedStatement.insertPayment).Exec(mintDbID) if err != nil { return fmt.Errorf("inserting payment: %w", err) } @@ -262,7 +363,7 @@ func (p *Psql) HandleNoneSplitPayments(dbBlockID int64, payments []*pb.NoSplitPa } //todo: detect drive vs fleet from backend api - _, err = p.tx.Exec("INSERT INTO hivemapper.no_split_payments (mint_id) VALUES ($1) RETURNING id", mintDbID) + _, err = p.tx.Stmt(preparedStatement.insertPayment).Exec(mintDbID) if err != nil { return fmt.Errorf("inserting NoneSplitPayments with mint_id %d mint_to %q tx %q: %w", mintDbID, payment.Mint.To, payment.Mint.TrxHash, err) } @@ -312,7 +413,7 @@ func (p *Psql) HandleSplitPayments(dbBlockID int64, splitPayments []*pb.TokenSpl return fmt.Errorf("inserting driver mint: %w", err) } - _, err = p.tx.Exec("INSERT INTO hivemapper.split_payments (transaction_id, fleet_mint_id, driver_mint_id) VALUES ($1, $2, $3)", dbTransactionID, fleetMintID, driverMintID) + _, err = p.tx.Stmt(preparedStatement.insertSlipPayment).Exec(dbTransactionID, fleetMintID, driverMintID) if err != nil { return fmt.Errorf("inserting split payment: %w", err) } @@ -328,7 +429,7 @@ func (p *Psql) HandleTransfers(dbBlockID int64, transfers []*pb.Transfer) error return fmt.Errorf("inserting transaction: %w", err) } //{"error": "handle BlockScopedData message: rollback transaction: rolling back transaction: driver: bad connection: while handling err handle transfers: inserting transfer: pq: unexpected Parse response 'C'"} - _, err = p.tx.Exec("INSERT INTO hivemapper.transfers (transaction_id, from_address, to_address, amount) VALUES ($1, $2, $3, $4)", dbTransactionID, transfer.From, transfer.To, transfer.Amount) + _, err = p.tx.Stmt(preparedStatement.insertTransfer).Exec(dbTransactionID, transfer.From, transfer.To, transfer.Amount) if err != nil { fmt.Println("processing transfer: ", transfer.From, transfer.To, transfer.Amount, transfer.TrxHash) return fmt.Errorf("inserting transfer: %w", err) @@ -339,7 +440,6 @@ func (p *Psql) HandleTransfers(dbBlockID int64, transfers []*pb.Transfer) error func (p *Psql) insertMint(dbTransactionID int64, mint *pb.Mint) (dbMintID int64, err error) { row := p.tx.Stmt(preparedStatement.insertMint).QueryRow(dbTransactionID, mint.To, mint.Amount) - //row := p.tx.QueryRow("INSERT INTO hivemapper.mints (transaction_id, to_address, amount) VALUES ($1, $2, $3) RETURNING id", dbTransactionID, mint.To, mint.Amount) err = row.Err() if err != nil { return 0, fmt.Errorf("inserting mint: %w", err) @@ -375,7 +475,7 @@ func (p *Psql) HandleAITrainerPayments(dbBlockID int64, payments []*pb.AiTrainer return fmt.Errorf("inserting mint: %w", err) } - _, err = p.tx.Exec("INSERT INTO hivemapper.ai_payments (mint_id) VALUES ($1) RETURNING id", mintDbID) + _, err = p.tx.Stmt(preparedStatement.insertAIPayment).Exec(mintDbID) if err != nil { return fmt.Errorf("inserting payment: %w", err) } @@ -396,7 +496,7 @@ func (p *Psql) HandleOperationalPayments(dbBlockID int64, payments []*pb.Operati return fmt.Errorf("inserting mint: %w", err) } - _, err = p.tx.Exec("INSERT INTO hivemapper.operational_payments (mint_id) VALUES ($1) RETURNING id", mintDbID) + _, err = p.tx.Stmt(preparedStatement.insertOperationalPayment).Exec(mintDbID) if err != nil { return fmt.Errorf("inserting payment: %w", err) } @@ -417,7 +517,7 @@ func (p *Psql) HandleRewardPayments(dbBlockID int64, payments []*pb.RewardPaymen return fmt.Errorf("inserting mint: %w", err) } - _, err = p.tx.Exec("INSERT INTO hivemapper.reward_payments (mint_id) VALUES ($1) RETURNING id", mintDbID) + _, err = p.tx.Stmt(preparedStatement.insertRewardPayment).Exec(mintDbID) if err != nil { return fmt.Errorf("inserting reward_payments: %w", err) } @@ -437,7 +537,7 @@ func (p *Psql) HandleMapCreate(dbBlockID int64, payments []*pb.MapCreate) error return fmt.Errorf("inserting mint: %w", err) } - _, err = p.tx.Exec("INSERT INTO hivemapper.map_create (burn_id) VALUES ($1) RETURNING id", burnDbID) + _, err = p.tx.Stmt(preparedStatement.insertMapCreate).Exec(burnDbID) if err != nil { return fmt.Errorf("inserting reward_payments: %w", err) } @@ -457,7 +557,7 @@ func (p *Psql) HandleMapConsumptionReward(dbBlockID int64, payments []*pb.MapCon return fmt.Errorf("inserting mint: %w", err) } - _, err = p.tx.Exec("INSERT INTO hivemapper.map_consumption_reward (mint_id) VALUES ($1) RETURNING id", mintDbID) + _, err = p.tx.Stmt(preparedStatement.insertMapConsumption).Exec(mintDbID) if err != nil { return fmt.Errorf("inserting reward_payments: %w", err) } @@ -467,7 +567,7 @@ func (p *Psql) HandleMapConsumptionReward(dbBlockID int64, payments []*pb.MapCon } func (p *Psql) insertBurns(dbTransactionID int64, burn *pb.Burn) (dbMintID int64, err error) { - row := p.tx.QueryRow("INSERT INTO hivemapper.burns (transaction_id, from_address, amount) VALUES ($1, $2, $3) RETURNING id", dbTransactionID, burn.From, burn.Amount) + row := p.tx.Stmt(preparedStatement.insertBurn).QueryRow(dbTransactionID, burn.From, burn.Amount) err = row.Err() if err != nil { return 0, fmt.Errorf("inserting burn: %w", err) @@ -503,7 +603,7 @@ func (p *Psql) HandleAiPayments(dbBlockID int64, payments []*pb.AiTrainerPayment if err != nil { return fmt.Errorf("inserting mint: %w", err) } - _, err = p.tx.Exec("INSERT INTO hivemapper.ai_payments (transaction_id, mint_id) VALUES ($1, $2)", dbTransactionID, dbMintID) + _, err = p.tx.Stmt(preparedStatement.insertAIPayment).Exec(dbTransactionID, dbMintID) if err != nil { return fmt.Errorf("inserting ai payment: %w", err) } @@ -513,7 +613,7 @@ func (p *Psql) HandleAiPayments(dbBlockID int64, payments []*pb.AiTrainerPayment } func (p *Psql) StoreCursor(cursor *sink.Cursor) error { - _, err := p.tx.Exec("INSERT INTO hivemapper.cursor (name, cursor) VALUES ($1, $2) ON CONFLICT (name) DO UPDATE SET cursor = $2", "hivemapper", cursor.String()) + _, err := p.tx.Stmt(preparedStatement.insertCursor).Exec("hivemapper", cursor.String()) if err != nil { return fmt.Errorf("inserting cursor: %w", err) }