From 6a02865d3d0011487e0e30eafca351fbf40f781f Mon Sep 17 00:00:00 2001 From: arnaudberger Date: Thu, 1 Feb 2024 15:29:45 -0500 Subject: [PATCH] refeactoring undo signal handling --- data/psql.go | 16 ++++++++-------- data/sinker.go | 8 ++++---- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/data/psql.go b/data/psql.go index 4698561..4c096db 100644 --- a/data/psql.go +++ b/data/psql.go @@ -89,14 +89,6 @@ func (p *Psql) handleTransaction(dbBlockID int64, transactionHash string) (dbTra return } -func (p *Psql) HandleBlockUndo(blockId string) error { - _, err := p.tx.Exec("DELETE CASCADE FROM solana_tokens.blocks WHERE hash = $1", blockId) - if err != nil { - return fmt.Errorf("deleting block: %w", err) - } - return nil -} - func (p *Psql) HandleInitializedAccount(dbBlockID int64, initializedAccounts []*pb.InitializedAccount) (err error) { for _, initializedAccount := range initializedAccounts { dbTransactionID, err := p.handleTransaction(dbBlockID, initializedAccount.TrxHash) @@ -111,6 +103,14 @@ func (p *Psql) HandleInitializedAccount(dbBlockID int64, initializedAccounts []* return nil } +func (p *Psql) HandleBlocksUndo(lastValidBlockNum uint64) error { + _, err := p.tx.Exec("DELETE CASCADE FROM solana_tokens.blocks WHERE num > $1", lastValidBlockNum) + if err != nil { + return fmt.Errorf("deleting block from %s: %w", lastValidBlockNum, err) + } + return nil +} + var NotFound = errors.New("Not found") func (p *Psql) resolveAddress(derivedAddress string) (string, error) { diff --git a/data/sinker.go b/data/sinker.go index fcff01d..d55f792 100644 --- a/data/sinker.go +++ b/data/sinker.go @@ -149,7 +149,7 @@ func (s *Sinker) HandleBlockScopedData(ctx context.Context, data *pbsubstreamsrp } func (s *Sinker) HandleBlockUndoSignal(ctx context.Context, undoSignal *pbsubstreamsrpc.BlockUndoSignal, cursor *sink.Cursor) (err error) { - blockId := cursor.Block().ID() + lastValidBlockNum := undoSignal.LastValidBlock.Number s.logger.Info("Handling undo block signal", zap.Stringer("block", cursor.Block()), zap.Stringer("cursor", cursor)) @@ -157,7 +157,7 @@ func (s *Sinker) HandleBlockUndoSignal(ctx context.Context, undoSignal *pbsubstr if err != nil { if s.db.tx != nil { e := s.db.RollbackTransaction() - err = fmt.Errorf("undo block: %s rollback transaction: %w: while handling err %w", blockId, e, err) + err = fmt.Errorf("undo blocks: %s rollback transaction: %w: while handling err %w", lastValidBlockNum, e, err) } return @@ -172,9 +172,9 @@ func (s *Sinker) HandleBlockUndoSignal(ctx context.Context, undoSignal *pbsubstr return fmt.Errorf("begin transaction: %w", err) } - err = s.db.HandleBlockUndo(blockId) + err = s.db.HandleBlocksUndo(lastValidBlockNum) if err != nil { - return fmt.Errorf("handle block %s undo: %w", blockId, err) + return fmt.Errorf("handle blocks undo from %d : %w", lastValidBlockNum, err) } err = s.db.StoreCursor(cursor)