Skip to content

Commit

Permalink
Implementing handle undo signal
Browse files Browse the repository at this point in the history
  • Loading branch information
ArnaudBger committed Feb 1, 2024
1 parent d52fd48 commit 66de505
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 2 deletions.
8 changes: 8 additions & 0 deletions data/psql.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ func (p *Psql) handleTransaction(dbBlockID int64, transactionHash string) (dbTra
return
}

func (p *Psql) HandleBlockUndo(blockId string) error {
_, err := p.tx.Exec("DELETE CASCADE FROM solana_tokens.blocks WHERE hash = $1", blockId)
if err != nil {
return fmt.Errorf("deleting block: %w", err)
}
return nil
}

func (p *Psql) HandleInitializedAccount(dbBlockID int64, initializedAccounts []*pb.InitializedAccount) (err error) {
for _, initializedAccount := range initializedAccounts {
dbTransactionID, err := p.handleTransaction(dbBlockID, initializedAccount.TrxHash)
Expand Down
36 changes: 34 additions & 2 deletions data/sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,38 @@ func (s *Sinker) HandleBlockScopedData(ctx context.Context, data *pbsubstreamsrp
return nil
}

func (s *Sinker) HandleBlockUndoSignal(ctx context.Context, undoSignal *pbsubstreamsrpc.BlockUndoSignal, cursor *sink.Cursor) error {
panic("should not be called on solana")
func (s *Sinker) HandleBlockUndoSignal(ctx context.Context, undoSignal *pbsubstreamsrpc.BlockUndoSignal, cursor *sink.Cursor) (err error) {
blockId := cursor.Block().ID()

s.logger.Info("Handling undo block signal", zap.Stringer("block", cursor.Block()), zap.Stringer("cursor", cursor))

defer func() {
if err != nil {
if s.db.tx != nil {
e := s.db.RollbackTransaction()
err = fmt.Errorf("Undo block: %d rollback transaction: %w: while handling err %w", blockId, e, err)

Check failure on line 160 in data/sinker.go

View workflow job for this annotation

GitHub Actions / Test (1.20.x, ubuntu-latest)

fmt.Errorf format %d has arg blockId of wrong type string
}

return
}
if s.db.tx != nil {
err = s.db.CommitTransaction()
}
}()

err = s.db.BeginTransaction()
if err != nil {
return fmt.Errorf("begin transaction: %w", err)
}

err = s.db.HandleBlockUndo(blockId)
if err != nil {
return fmt.Errorf("handle block %d undo: %w", blockId, err)

Check failure on line 177 in data/sinker.go

View workflow job for this annotation

GitHub Actions / Test (1.20.x, ubuntu-latest)

fmt.Errorf format %d has arg blockId of wrong type string
}

err = s.db.StoreCursor(cursor)
if err != nil {
return fmt.Errorf("store cursor: %w", err)
}
return nil
}

0 comments on commit 66de505

Please sign in to comment.