Skip to content

Commit

Permalink
cache nonce and handle epoch length issue
Browse files Browse the repository at this point in the history
  • Loading branch information
ZzzzHui committed Nov 27, 2024
1 parent cdfccb4 commit 2dc9354
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 14 deletions.
18 changes: 11 additions & 7 deletions internal/espressoreader/espresso_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (e *EspressoReader) Run(ctx context.Context, ready chan<- struct{}) error {
var l1FinalizedTimestamp uint64
lastProcessedL1Block, l1FinalizedTimestamp = e.readL1(ctx, app, currentBlockHeight, lastProcessedL1Block)
//** read espresso **//
e.readEspresso(ctx, app.Application.ContractAddress, currentBlockHeight, lastProcessedL1Block, l1FinalizedTimestamp)
e.readEspresso(ctx, app, currentBlockHeight, lastProcessedL1Block, l1FinalizedTimestamp)

// update lastProcessedEspressoBlock in db
err = e.repository.UpdateLastProcessedEspressoBlock(ctx, currentBlockHeight, app.Application.ContractAddress)
Expand Down Expand Up @@ -154,7 +154,7 @@ func (e *EspressoReader) bootstrap(ctx context.Context, app evmreader.TypeExport
currentEspressoBlock := batchStartingBlock + uint64(index)
slog.Debug("found namespace contained in", "block", currentEspressoBlock)
l1FinalizedHeight, l1FinalizedTimestamp = e.readL1(ctx, app, currentEspressoBlock, l1FinalizedHeight)
e.readEspresso(ctx, app.Application.ContractAddress, currentEspressoBlock, l1FinalizedHeight, l1FinalizedTimestamp)
e.readEspresso(ctx, app, currentEspressoBlock, l1FinalizedHeight, l1FinalizedTimestamp)
}
}
}
Expand Down Expand Up @@ -183,7 +183,8 @@ func (e *EspressoReader) readL1(ctx context.Context, app evmreader.TypeExportApp
return l1FinalizedLatestHeight, l1FinalizedTimestamp
}

func (e *EspressoReader) readEspresso(ctx context.Context, app common.Address, currentBlockHeight uint64, l1FinalizedLatestHeight uint64, l1FinalizedTimestamp uint64) {
func (e *EspressoReader) readEspresso(ctx context.Context, appEvmType evmreader.TypeExportApplication, currentBlockHeight uint64, l1FinalizedLatestHeight uint64, l1FinalizedTimestamp uint64) {
app := appEvmType.Application.ContractAddress
transactions, err := e.client.FetchTransactionsInBlock(ctx, currentBlockHeight, e.namespace)
if err != nil {
slog.Error("failed fetching espresso tx", "error", err)
Expand Down Expand Up @@ -261,8 +262,12 @@ func (e *EspressoReader) readEspresso(ctx context.Context, app common.Address, c
// get epoch length and last open epoch
epochLength := e.evmReader.GetEpochLengthCache(appAddress)
if epochLength == 0 {
slog.Error("could not obtain epoch length")
continue
err = e.evmReader.AddAppEpochLengthIntoCache(appEvmType)
epochLength = e.evmReader.GetEpochLengthCache(appAddress)
if err != nil || epochLength == 0 {
slog.Error("could not obtain epoch length")
continue
}
}
currentEpoch, err := e.repository.GetEpoch(ctx,
epochLength, appAddress)
Expand Down Expand Up @@ -362,8 +367,7 @@ func (e *EspressoReader) getL1FinalizedHeight(ctx context.Context, espressoBlock
l1FinalizedNumber := gjson.Get(espressoHeader, "fields.l1_finalized.number").Uint()
l1FinalizedTimestampStr := gjson.Get(espressoHeader, "fields.l1_finalized.timestamp").Str
if len(l1FinalizedTimestampStr) < 2 {
slog.Error("error fetching espresso header l1_finalized.timestamp", "at height", espressoBlockHeight, "header", espressoHeader)
slog.Error("retry fetching")
slog.Debug("Espresso header not ready. Retry fetching", "height", espressoBlockHeight)
var delay time.Duration = 3000
time.Sleep(delay * time.Millisecond)
continue
Expand Down
47 changes: 43 additions & 4 deletions internal/espressoreader/service/espressoreader_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (
"github.com/ethereum/go-ethereum/ethclient"
)

// app address => sender address => nonce
var nonceCache map[common.Address]map[common.Address]uint64

// Service to manage InputReader lifecycle
type EspressoReaderService struct {
blockchainHttpEndpoint string
Expand Down Expand Up @@ -124,6 +127,8 @@ func (s *EspressoReaderService) setupEvmReader(ctx context.Context, database *re
}

func (s *EspressoReaderService) setupNonceHttpServer() {
nonceCache = make(map[common.Address]map[common.Address]uint64)

http.HandleFunc("/nonce", s.requestNonce)
http.HandleFunc("/submit", s.submit)

Expand Down Expand Up @@ -162,8 +167,17 @@ func (s *EspressoReaderService) requestNonce(w http.ResponseWriter, r *http.Requ
senderAddress := common.HexToAddress(nonceRequest.MsgSender)
applicationAddress := common.HexToAddress(nonceRequest.AppContract)

ctx := r.Context()
nonce := s.process(ctx, senderAddress, applicationAddress)
var nonce uint64
if nonceCache[applicationAddress] == nil {
nonceCache[applicationAddress] = make(map[common.Address]uint64)
}
if nonceCache[applicationAddress][senderAddress] == 0 {
ctx := r.Context()
nonce = s.queryNonceFromDb(ctx, senderAddress, applicationAddress)
nonceCache[applicationAddress][senderAddress] = nonce
} else {
nonce = nonceCache[applicationAddress][senderAddress]
}

slog.Debug("got nonce request", "senderAddress", senderAddress, "applicationAddress", applicationAddress)

Expand All @@ -181,7 +195,7 @@ func (s *EspressoReaderService) requestNonce(w http.ResponseWriter, r *http.Requ
}
}

func (s *EspressoReaderService) process(
func (s *EspressoReaderService) queryNonceFromDb(
ctx context.Context,
senderAddress common.Address,
applicationAddress common.Address) uint64 {
Expand Down Expand Up @@ -222,7 +236,11 @@ func (s *EspressoReaderService) submit(w http.ResponseWriter, r *http.Request) {
return
}

_, _, sigHash, err := espressoreader.ExtractSigAndData(string(tx.Payload))
msgSender, typedData, sigHash, err := espressoreader.ExtractSigAndData(string(tx.Payload))
if err != nil {
slog.Error("transaction not correctly formatted", "error", err)
return
}
submitResponse := SubmitResponse{Id: sigHash}

err = json.NewEncoder(w).Encode(submitResponse)
Expand All @@ -231,5 +249,26 @@ func (s *EspressoReaderService) submit(w http.ResponseWriter, r *http.Request) {
"service", "espresso submit endpoint",
"err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

// update nonce cache
appAddressStr := typedData.Message["app"].(string)
appAddress := common.HexToAddress(appAddressStr)
if nonceCache[appAddress] == nil {
slog.Error("Should query nonce before submit")
return
}
nonceInRequest := uint64(typedData.Message["nonce"].(float64))
if nonceCache[appAddress][msgSender] == 0 {
ctx := r.Context()
nonceInDb := s.queryNonceFromDb(ctx, msgSender, appAddress)
if nonceInRequest != nonceInDb {
slog.Error("Nonce in request is incorrect")
return
}
nonceCache[appAddress][msgSender] = nonceInDb + 1
} else {
nonceCache[appAddress][msgSender]++
}
}
6 changes: 3 additions & 3 deletions internal/evmreader/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (r *EvmReader) ReadAndStoreInputs(
for _, app := range apps {

// Get App EpochLength
err := r.addAppEpochLengthIntoCache(app)
err := r.AddAppEpochLengthIntoCache(app)
if err != nil {
slog.Error("evmreader: Error adding epoch length into cache",
"app", app.ContractAddress,
Expand Down Expand Up @@ -251,9 +251,9 @@ func (r *EvmReader) ReadAndStoreInputs(
return nil
}

// addAppEpochLengthIntoCache checks the epoch length cache and read epoch length from IConsensus
// AddAppEpochLengthIntoCache checks the epoch length cache and read epoch length from IConsensus
// contract and add it to the cache if needed
func (r *EvmReader) addAppEpochLengthIntoCache(app application) error {
func (r *EvmReader) AddAppEpochLengthIntoCache(app application) error {

epochLength, ok := r.epochLengthCache[app.ContractAddress]
if !ok {
Expand Down

0 comments on commit 2dc9354

Please sign in to comment.