Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
poopoothegorilla committed Dec 21, 2023
1 parent bffbdfa commit b7b8f3d
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 103 deletions.
109 changes: 85 additions & 24 deletions common/txmgr/address_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,7 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindLa
defer as.RUnlock()

var maxSeq SEQ
if as.inprogress != nil && as.inprogress.Sequence != nil {
if (*as.inprogress.Sequence).Int64() > maxSeq.Int64() {
maxSeq = *as.inprogress.Sequence
}
}
for _, tx := range as.unconfirmed {
for _, tx := range as.allTransactions {
if tx.Sequence == nil {
continue
}
Expand Down Expand Up @@ -271,9 +266,6 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) applyT
fn func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]),
txIDs ...int64,
) {
as.Lock()
defer as.Unlock()

// if txIDs is not empty then only apply the filter to those transactions
if len(txIDs) > 0 {
for _, txID := range txIDs {
Expand Down Expand Up @@ -495,7 +487,9 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) AddTxT
return nil
}

func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnstartedToInProgress(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnstartedToInProgress(
tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
) error {
as.Lock()
defer as.Unlock()

Expand All @@ -505,7 +499,6 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUn

if tx != nil {
// if tx is not nil then remove the tx from the unstarted queue
// TODO(jtw): what should be the unique idenitifier for each transaction? ID is being set by the postgres DB
tx = as.unstarted.RemoveTxByID(tx.ID)
} else {
// if tx is nil then pop the next unstarted transaction
Expand All @@ -520,6 +513,24 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUn
return nil
}

func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveConfirmedMissingReceiptToUnconfirmed(
txID int64,
) error {
as.Lock()
defer as.Unlock()

tx, ok := as.confirmedMissingReceipt[txID]
if !ok || tx == nil {
return fmt.Errorf("move_confirmed_missing_receipt_to_unconfirmed: no confirmed_missing_receipt transaction with ID %d: %w", txID, ErrTxnNotFound)
}

tx.State = TxUnconfirmed
as.unconfirmed[tx.ID] = tx
delete(as.confirmedMissingReceipt, tx.ID)

return nil
}

func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveInProgressToUnconfirmed(
txAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
) error {
Expand All @@ -530,25 +541,66 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveIn
if tx == nil {
return fmt.Errorf("move_in_progress_to_unconfirmed: no transaction in progress")
}

txAttempt.TxID = tx.ID
txAttempt.State = txmgrtypes.TxAttemptBroadcast
tx.State = TxUnconfirmed
tx.TxAttempts = []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{txAttempt}

var found bool
for i := 0; i < len(tx.TxAttempts); i++ {
if tx.TxAttempts[i].ID == txAttempt.ID {
tx.TxAttempts[i] = txAttempt
found = true
break
as.unconfirmed[tx.ID] = tx
as.inprogress = nil

return nil
}

func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnconfirmedToConfirmed(
receipt txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH],
) error {
as.Lock()
defer as.Unlock()

for _, tx := range as.unconfirmed {
if tx.TxAttempts == nil {
continue
}
for i := 0; i < len(tx.TxAttempts); i++ {
txAttempt := tx.TxAttempts[i]
if receipt.GetTxHash() == txAttempt.Hash {
// TODO(jtw): not sure how to set blocknumber, transactionindex, and receipt on conflict
txAttempt.Receipts = []txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH]{receipt}
txAttempt.State = txmgrtypes.TxAttemptBroadcast
if txAttempt.BroadcastBeforeBlockNum == nil {
blockNum := receipt.GetBlockNumber().Int64()
txAttempt.BroadcastBeforeBlockNum = &blockNum
}

tx.State = TxConfirmed
return nil
}
}
}
if !found {
// NOTE(jtw): this would mean that the TxAttempt did not exist for the Tx
// NOTE(jtw): should this log a warning?
// NOTE(jtw): can this happen?
tx.TxAttempts = append(tx.TxAttempts, txAttempt)

return fmt.Errorf("move_unconfirmed_to_confirmed: no unconfirmed transaction with receipt %v: %w", receipt, ErrTxnNotFound)
}

func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnstartedToFatalError(
etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
txError null.String,
) error {
as.Lock()
defer as.Unlock()

tx := as.unstarted.RemoveTxByID(etx.ID)
if tx == nil {
return fmt.Errorf("move_unstarted_to_fatal_error: no unstarted transaction with ID %d", etx.ID)
}

as.unconfirmed[tx.ID] = tx
as.inprogress = nil
tx.State = TxFatalError
tx.Sequence = nil
tx.TxAttempts = nil
tx.InitialBroadcastAt = nil
tx.Error = txError
as.fatalErrored[tx.ID] = tx

return nil
}
Expand All @@ -573,6 +625,15 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveIn
return nil
}

func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnconfirmedToConfirmedMissingReceipt() error {
// TODO
return nil
}
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveInProgressToConfirmedMissingReceipt() error {
// TODO
return nil
}

func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) abandon() {
as.Lock()
defer as.Unlock()
Expand Down
Loading

0 comments on commit b7b8f3d

Please sign in to comment.