Skip to content

Commit

Permalink
fixup! feat(claimer): rewrite claimer in go
Browse files Browse the repository at this point in the history
  • Loading branch information
mpolitzer committed Nov 26, 2024
1 parent 87a3519 commit a5051c0
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 41 deletions.
54 changes: 35 additions & 19 deletions internal/claimer/claimer.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,32 +146,32 @@ func (s *Service) Stop(bool) []error {
}

func (s *Service) Tick() []error {
err := s.submitClaimsAndUpdateDatabase(s)
if err != nil {
return []error{err}
}
return nil
return s.submitClaimsAndUpdateDatabase(s)
}

func (s *Service) submitClaimsAndUpdateDatabase(se sideEffects) error {
func (s *Service) submitClaimsAndUpdateDatabase(se sideEffects) []error {
errs := []error{}
prevClaims, currClaims, err := se.selectClaimPairsPerApp()
if err != nil {
return err
errs = append(errs, err)
return errs
}

// check claims in flight
for key, txHash := range s.claimsInFlight {
ready, receipt, err := se.pollTransaction(txHash)
if err != nil {
return err
errs = append(errs, err)
return errs
}
if !ready {
continue
}
if claim, ok := currClaims[key]; ok {
err = se.updateEpochWithSubmittedClaim(&claim, receipt.TxHash)
if err != nil {
return err
errs = append(errs, err)
return errs
}
s.Logger.Info("claimer: Claim submitted",
"app", claim.AppContractAddress,
Expand All @@ -187,7 +187,7 @@ func (s *Service) submitClaimsAndUpdateDatabase(se sideEffects) error {
}

// check computed claims
for key, currClaimRow := range currClaims {
nextApp: for key, currClaimRow := range currClaims {
var ic *iconsensus.IConsensus = nil
var prevEvent *claimSubmissionEvent = nil
var currEvent *claimSubmissionEvent = nil
Expand All @@ -204,36 +204,46 @@ func (s *Service) submitClaimsAndUpdateDatabase(se sideEffects) error {
"currClaim", currClaimRow,
"err", err,
)
return err
delete(currClaims, key)
errs = append(errs, err)
goto nextApp
}

// if prevClaimRow exists, there must be a matching event
ic, prevEvent, currEvent, err =
se.findClaimSubmissionEventAndSucc(&prevClaimRow)
if err != nil {
return err
delete(currClaims, key)
errs = append(errs, err)
goto nextApp
}
if prevEvent == nil {
s.Logger.Error("claimer: missing event",
"claim", prevClaimRow,
"err", ErrMissingEvent,
)
return ErrMissingEvent
delete(currClaims, key)
errs = append(errs, ErrMissingEvent)
goto nextApp
}
if !claimMatchesEvent(&prevClaimRow, prevEvent) {
s.Logger.Error("claimer: event mismatch",
"claim", prevClaimRow,
"event", prevEvent,
"err", ErrEventMismatch,
)
return ErrEventMismatch
delete(currClaims, key)
errs = append(errs, ErrEventMismatch)
goto nextApp
}
} else {
// first claim
ic, currEvent, _, err =
se.findClaimSubmissionEventAndSucc(&currClaimRow)
if err != nil {
return err
delete(currClaims, key)
errs = append(errs, err)
goto nextApp
}
}

Expand All @@ -244,18 +254,24 @@ func (s *Service) submitClaimsAndUpdateDatabase(se sideEffects) error {
"event", currEvent,
"err", ErrEventMismatch,
)
return ErrEventMismatch
delete(currClaims, key)
errs = append(errs, ErrEventMismatch)
goto nextApp
}
txHash := currEvent.Raw.TxHash
err = se.updateEpochWithSubmittedClaim(&currClaimRow, txHash)
if err != nil {
return err
delete(currClaims, key)
errs = append(errs, err)
goto nextApp
}
delete(s.claimsInFlight, key)
} else if s.submissionEnabled {
txHash, err := se.submitClaimToBlockchain(ic, &currClaimRow)
if err != nil {
return err
delete(currClaims, key)
errs = append(errs, err)
goto nextApp
}
s.Logger.Info("claimer: Submitting claim to blockchain",
"app", currClaimRow.AppContractAddress,
Expand All @@ -265,7 +281,7 @@ func (s *Service) submitClaimsAndUpdateDatabase(se sideEffects) error {
s.claimsInFlight[currClaimRow.AppContractAddress] = txHash
}
}
return nil
return errs
}

func checkClaimConstraint(c *claimRow) error {
Expand Down
47 changes: 25 additions & 22 deletions internal/claimer/claimer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ func TestDoNothing(t *testing.T) {
m.On("selectClaimPairsPerApp").
Return(prevClaims, currClaims, nil)

err := m.submitClaimsAndUpdateDatabase(m)
assert.Nil(t, err)
errs := m.submitClaimsAndUpdateDatabase(m)
assert.Equal(t, len(errs), 0)
}

func TestSubmitFirstClaim(t *testing.T) {
Expand Down Expand Up @@ -133,8 +133,8 @@ func TestSubmitFirstClaim(t *testing.T) {
m.On("submitClaimToBlockchain", nil, &currClaim).
Return(claimTransactionHash, nil)

err := m.submitClaimsAndUpdateDatabase(m)
assert.Nil(t, err)
errs := m.submitClaimsAndUpdateDatabase(m)
assert.Equal(t, len(errs), 0)
assert.Equal(t, len(m.claimsInFlight), 1)
m.AssertNumberOfCalls(t, "findClaimSubmissionEventAndSucc", 1)
m.AssertNumberOfCalls(t, "pollTransaction", 0)
Expand Down Expand Up @@ -181,8 +181,8 @@ func TestSubmitClaimWithAntecessor(t *testing.T) {
m.On("submitClaimToBlockchain", nil, &currClaim).
Return(claimTransactionHash, nil)

err := m.submitClaimsAndUpdateDatabase(m)
assert.Nil(t, err)
errs := m.submitClaimsAndUpdateDatabase(m)
assert.Equal(t, len(errs), 0)
assert.Equal(t, len(m.claimsInFlight), 1)
m.AssertNumberOfCalls(t, "findClaimSubmissionEventAndSucc", 1)
m.AssertNumberOfCalls(t, "pollTransaction", 0)
Expand Down Expand Up @@ -218,8 +218,8 @@ func TestSkipSubmitFirstClaim(t *testing.T) {
m.On("submitClaimToBlockchain", nil, &currClaim).
Return(claimTransactionHash, nil)

err := m.submitClaimsAndUpdateDatabase(m)
assert.Nil(t, err)
errs := m.submitClaimsAndUpdateDatabase(m)
assert.Equal(t, len(errs), 0)
assert.Equal(t, len(m.claimsInFlight), 0)
m.AssertNumberOfCalls(t, "findClaimSubmissionEventAndSucc", 1)
m.AssertNumberOfCalls(t, "pollTransaction", 0)
Expand Down Expand Up @@ -267,8 +267,8 @@ func TestSkipSubmitClaimWithAntecessor(t *testing.T) {
m.On("submitClaimToBlockchain", nil, &currClaim).
Return(claimTransactionHash, nil)

err := m.submitClaimsAndUpdateDatabase(m)
assert.Nil(t, err)
errs := m.submitClaimsAndUpdateDatabase(m)
assert.Equal(t, len(errs), 0)
assert.Equal(t, len(m.claimsInFlight), 0)
m.AssertNumberOfCalls(t, "findClaimSubmissionEventAndSucc", 1)
m.AssertNumberOfCalls(t, "pollTransaction", 0)
Expand Down Expand Up @@ -305,8 +305,8 @@ func TestInFlightCompleted(t *testing.T) {
m.On("updateEpochWithSubmittedClaim", &currClaim, txHash).
Return(nil)

err := m.submitClaimsAndUpdateDatabase(m)
assert.Nil(t, err)
errs := m.submitClaimsAndUpdateDatabase(m)
assert.Equal(t, len(errs), 0)
assert.Equal(t, len(m.claimsInFlight), 0)
m.AssertNumberOfCalls(t, "findClaimSubmissionEventAndSucc", 0)
m.AssertNumberOfCalls(t, "pollTransaction", 1)
Expand Down Expand Up @@ -342,8 +342,8 @@ func TestUpdateFirstClaim(t *testing.T) {
m.On("updateEpochWithSubmittedClaim", &currClaim, currEvent.Raw.TxHash).
Return(nil)

err := m.submitClaimsAndUpdateDatabase(m)
assert.Nil(t, err)
errs := m.submitClaimsAndUpdateDatabase(m)
assert.Equal(t, len(errs), 0)
assert.Equal(t, len(m.claimsInFlight), 0)
m.AssertNumberOfCalls(t, "findClaimSubmissionEventAndSucc", 1)
m.AssertNumberOfCalls(t, "pollTransaction", 0)
Expand Down Expand Up @@ -391,8 +391,8 @@ func TestUpdateClaimWithAntecessor(t *testing.T) {
m.On("updateEpochWithSubmittedClaim", &currClaim, currEvent.Raw.TxHash).
Return(nil)

err := m.submitClaimsAndUpdateDatabase(m)
assert.Nil(t, err)
errs := m.submitClaimsAndUpdateDatabase(m)
assert.Equal(t, len(errs), 0)
assert.Equal(t, len(m.claimsInFlight), 0)
m.AssertNumberOfCalls(t, "findClaimSubmissionEventAndSucc", 1)
m.AssertNumberOfCalls(t, "pollTransaction", 0)
Expand Down Expand Up @@ -444,8 +444,9 @@ func TestSubmitClaimWithAntecessorMismatch(t *testing.T) {
m.On("submitClaimToBlockchain", nil, &currClaim).
Return(claimTransactionHash, nil)

err := m.submitClaimsAndUpdateDatabase(m)
assert.Equal(t, err, ErrEventMismatch)
errs := m.submitClaimsAndUpdateDatabase(m)
assert.Equal(t, len(errs), 1)
assert.Equal(t, errs[0], ErrEventMismatch)
}

// !claimMatchesEvent(currClaim, currEvent)
Expand Down Expand Up @@ -488,8 +489,9 @@ func TestSubmitClaimWithEventMismatch(t *testing.T) {
m.On("updateEpochWithSubmittedClaim", &currClaim, currEvent.Raw.TxHash).
Return(nil)

err := m.submitClaimsAndUpdateDatabase(m)
assert.Equal(t, err, ErrEventMismatch)
errs := m.submitClaimsAndUpdateDatabase(m)
assert.Equal(t, len(errs), 1)
assert.Equal(t, errs[0], ErrEventMismatch)
}

// !checkClaimsConstraint(prevClaim, currClaim)
Expand Down Expand Up @@ -531,7 +533,8 @@ func TestSubmitClaimWithAntecessorOutOfOrder(t *testing.T) {
m.On("submitClaimToBlockchain", nil, &currClaim).
Return(claimTransactionHash, nil)

err := m.submitClaimsAndUpdateDatabase(m)
assert.Equal(t, err, ErrClaimMismatch)
errs := m.submitClaimsAndUpdateDatabase(m)
assert.Equal(t, len(errs), 1)
assert.Equal(t, errs[0], ErrClaimMismatch)
}

0 comments on commit a5051c0

Please sign in to comment.