From ad870d8b5908b2079d19542136a4cbb9a1770258 Mon Sep 17 00:00:00 2001 From: Ratan Kaliani Date: Fri, 13 Dec 2024 18:29:18 -0800 Subject: [PATCH] chore: error handling (#280) * feat: better proposer error handling * feat: fix proposer witnessgen timeout * add --- proposer/op/go.sum | 6 + proposer/op/proposer/db/db.go | 47 +----- proposer/op/proposer/db/ent/migrate/schema.go | 5 + proposer/op/proposer/driver.go | 17 +- proposer/op/proposer/prove.go | 29 +++- proposer/op/proposer/range.go | 15 -- proposer/succinct/bin/server.rs | 148 ++++++++++++------ 7 files changed, 155 insertions(+), 112 deletions(-) diff --git a/proposer/op/go.sum b/proposer/op/go.sum index 8323eca8..0aa3c6b5 100644 --- a/proposer/op/go.sum +++ b/proposer/op/go.sum @@ -328,6 +328,10 @@ github.com/slack-go/slack v0.14.0 h1:6c0UTfbRnvRssZUsZ2qe0Iu07VAMPjRqOa6oX8ewF4k github.com/slack-go/slack v0.14.0/go.mod h1:hlGi5oXA+Gt+yWTPP0plCdRKmjsDxecdHxYQdlMQKOw= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I= +github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/status-im/keycard-go v0.2.0 h1:QDLFswOQu1r5jsycloeQh3bVU8n/NatHHaZobtDnDzA= github.com/status-im/keycard-go v0.2.0/go.mod h1:wlp8ZLbsmrF6g6WjugPAx+IzoLrkdf9+mHxBEeo3Hbg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -443,6 +447,8 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24= +golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/proposer/op/proposer/db/db.go b/proposer/op/proposer/db/db.go index 2289a398..1e3c22f1 100644 --- a/proposer/op/proposer/db/db.go +++ b/proposer/op/proposer/db/db.go @@ -38,7 +38,7 @@ func InitDB(dbPath string, useCachedDb bool) (*ProofDB, error) { } // Use the TL;DR SQLite settings from https://kerkour.com/sqlite-for-servers. - connectionUrl := fmt.Sprintf("file:%s?_fk=1&journal_mode=WAL&synchronous=normal&cache_size=100000000&busy_timeout=15000&_txlock=immediate", dbPath) + connectionUrl := fmt.Sprintf("file:%s?_fk=1&journal_mode=WAL&synchronous=normal&cache_size=100000000&busy_timeout=30000&_txlock=immediate", dbPath) writeDrv, err := sql.Open("sqlite3", connectionUrl) if err != nil { @@ -48,7 +48,7 @@ func InitDB(dbPath string, useCachedDb bool) (*ProofDB, error) { // The write lock only allows one connection to the DB at a time. 
writeDb.SetMaxOpenConns(1) - writeDb.SetConnMaxLifetime(time.Hour) + writeDb.SetConnMaxLifetime(10 * time.Minute) readDrv, err := sql.Open("sqlite3", connectionUrl) if err != nil { @@ -56,7 +56,7 @@ func InitDB(dbPath string, useCachedDb bool) (*ProofDB, error) { } readDb := readDrv.DB() readDb.SetMaxOpenConns(max(4, runtime.NumCPU()/4)) - readDb.SetConnMaxLifetime(time.Hour) + readDb.SetConnMaxLifetime(10 * time.Minute) readClient := ent.NewClient(ent.Driver(readDrv)) writeClient := ent.NewClient(ent.Driver(writeDrv)) @@ -256,47 +256,6 @@ func (db *ProofDB) GetLatestEndBlock() (uint64, error) { return uint64(maxEnd.EndBlock), nil } -// When restarting the L2OutputSubmitter, some proofs may have been left in a "requested" state without a prover request ID on the server. Until we -// implement a mechanism for querying the status of the witness generation, we need to time out these proofs after a period of time so they can be requested. -func (db *ProofDB) GetWitnessGenerationTimeoutProofsOnServer() ([]*ent.ProofRequest, error) { - currentTime := time.Now().Unix() - twentyMinutesAgo := currentTime - 20*60 - - proofs, err := db.readClient.ProofRequest.Query(). - Where( - proofrequest.StatusEQ(proofrequest.StatusWITNESSGEN), - proofrequest.ProverRequestIDIsNil(), - proofrequest.LastUpdatedTimeLT(uint64(twentyMinutesAgo)), - ). - All(context.Background()) - - if err != nil { - return nil, fmt.Errorf("failed to query witness generation timeout proofs: %w", err) - } - - return proofs, nil -} - -// If a proof failed to be sent to the prover network, it's status will be set to FAILED, but the prover request ID will be empty. -// This function returns all such proofs. -func (db *ProofDB) GetProofsFailedOnServer() ([]*ent.ProofRequest, error) { - proofs, err := db.readClient.ProofRequest.Query(). - Where( - proofrequest.StatusEQ(proofrequest.StatusFAILED), - proofrequest.ProverRequestIDEQ(""), - ). - All(context.Background()) - - if err != nil { - if ent.IsNotFound(err) { - return nil, nil - } - return nil, fmt.Errorf("failed to query failed proof: %w", err) - } - - return proofs, nil -} - // GetAllProofsWithStatus returns all proofs with the given status. func (db *ProofDB) GetAllProofsWithStatus(status proofrequest.Status) ([]*ent.ProofRequest, error) { proofs, err := db.readClient.ProofRequest.Query(). diff --git a/proposer/op/proposer/db/ent/migrate/schema.go b/proposer/op/proposer/db/ent/migrate/schema.go index 20062a6f..f4d05b92 100644 --- a/proposer/op/proposer/db/ent/migrate/schema.go +++ b/proposer/op/proposer/db/ent/migrate/schema.go @@ -3,6 +3,7 @@ package migrate import ( + "entgo.io/ent/dialect/entsql" "entgo.io/ent/dialect/sql/schema" "entgo.io/ent/schema/field" ) @@ -36,4 +37,8 @@ var ( ) func init() { + ProofRequestsTable.Annotation = &entsql.Annotation{ + Table: "proof_requests", + Options: "STRICT", + } } diff --git a/proposer/op/proposer/driver.go b/proposer/op/proposer/driver.go index 6d5c05cf..9e3e9c6a 100644 --- a/proposer/op/proposer/driver.go +++ b/proposer/op/proposer/driver.go @@ -639,13 +639,22 @@ func (l *L2OutputSubmitter) loopL2OO(ctx context.Context) { continue } - // 2) Check the statuses of all requested proofs. + // 2) Check the statuses of PROVING requests. // If it's successfully returned, we validate that we have it on disk and set status = "COMPLETE". // If it fails or times out, we set status = "FAILED" (and, if it's a span proof, split the request in half to try again). 
- l.Log.Info("Stage 2: Processing Pending Proofs...") - err = l.ProcessPendingProofs() + l.Log.Info("Stage 2: Processing PROVING requests...") + err = l.ProcessProvingRequests() if err != nil { - l.Log.Error("failed to update requested proofs", "err", err) + l.Log.Error("failed to update PROVING requests", "err", err) + continue + } + + // 3) Check the statuses of WITNESSGEN requests. + // If the witness generation request has been in the WITNESSGEN state for longer than the timeout, set status to FAILED and retry. + l.Log.Info("Stage 3: Processing WITNESSGEN requests...") + err = l.ProcessWitnessgenRequests() + if err != nil { + l.Log.Error("failed to update WITNESSGEN requests", "err", err) continue } diff --git a/proposer/op/proposer/prove.go b/proposer/op/proposer/prove.go index 8b5b600f..5ff44d92 100644 --- a/proposer/op/proposer/prove.go +++ b/proposer/op/proposer/prove.go @@ -17,14 +17,14 @@ import ( ) const PROOF_STATUS_TIMEOUT = 30 * time.Second -const WITNESS_GEN_TIMEOUT = 20 * time.Minute +const WITNESSGEN_TIMEOUT = 20 * time.Minute // This limit is set to prevent overloading the witness generation server. Until Kona improves their native I/O API (https://github.com/anton-rs/kona/issues/553) // the maximum number of concurrent witness generation requests is roughly num_cpu / 2. Set it to 5 for now to be safe. const MAX_CONCURRENT_WITNESS_GEN = 5 -// Process all of the pending proofs. -func (l *L2OutputSubmitter) ProcessPendingProofs() error { +// Process all of requests in PROVING state. +func (l *L2OutputSubmitter) ProcessProvingRequests() error { // Get all proof requests that are currently in the PROVING state. reqs, err := l.db.GetAllProofsWithStatus(proofrequest.StatusPROVING) if err != nil { @@ -65,6 +65,25 @@ func (l *L2OutputSubmitter) ProcessPendingProofs() error { return nil } +// Process all of requests in WITNESSGEN state. +func (l *L2OutputSubmitter) ProcessWitnessgenRequests() error { + // Get all proof requests that are currently in the WITNESSGEN state. + reqs, err := l.db.GetAllProofsWithStatus(proofrequest.StatusWITNESSGEN) + if err != nil { + return err + } + for _, req := range reqs { + // If the request has been in the WITNESSGEN state for longer than the timeout, set status to FAILED. + // This is a catch-all in case the witness generation state update failed. + if req.LastUpdatedTime+uint64(WITNESSGEN_TIMEOUT.Seconds()) < uint64(time.Now().Unix()) { + // Retry the request if it timed out. + l.RetryRequest(req, ProofStatusResponse{}) + } + } + + return nil +} + // Retry a proof request. Sets the status of a proof to FAILED and retries the proof based on the optional proof status response. // If an error response is received: // - Range Proof: Split in two if the block range is > 1. Retry the same request if range is 1 block. 
@@ -297,13 +316,13 @@ func (l *L2OutputSubmitter) makeProofRequest(proofType proofrequest.Type, jsonBo } req.Header.Set("Content-Type", "application/json") - client := &http.Client{Timeout: WITNESS_GEN_TIMEOUT} + client := &http.Client{Timeout: WITNESSGEN_TIMEOUT} resp, err := client.Do(req) if err != nil { if netErr, ok := err.(net.Error); ok && netErr.Timeout() { l.Log.Error("Witness generation request timed out", "err", err) l.Metr.RecordWitnessGenFailure("Timeout") - return nil, fmt.Errorf("request timed out after %s: %w", WITNESS_GEN_TIMEOUT, err) + return nil, fmt.Errorf("request timed out after %s: %w", WITNESSGEN_TIMEOUT, err) } return nil, fmt.Errorf("failed to send request: %w", err) } diff --git a/proposer/op/proposer/range.go b/proposer/op/proposer/range.go index 7c687b52..dde52d0e 100644 --- a/proposer/op/proposer/range.go +++ b/proposer/op/proposer/range.go @@ -187,21 +187,6 @@ func (l *L2OutputSubmitter) GetRangeProofBoundaries(ctx context.Context) error { spans := l.SplitRangeBasic(newL2StartBlock, newL2EndBlock) - // // Check if the safeDB is activated on the L2 node. If it is, we use the safeHead based range - // // splitting algorithm. Otherwise, we use the simple range splitting algorithm. - // safeDBActivated, err := l.isSafeDBActivated(ctx, rollupClient) - // if err != nil { - // l.Log.Warn("safeDB is not activated. Using simple range splitting algorithm.", "err", err) - // } - // if safeDBActivated { - // safeHeadSpans, err := l.SplitRangeBasedOnSafeHeads(ctx, newL2StartBlock, newL2EndBlock) - // if err == nil { - // spans = safeHeadSpans - // } else { - // l.Log.Warn("failed to split range based on safe heads, using basic range splitting", "err", err) - // } - // } - // Add each span to the DB. If there are no spans, we will not create any proofs. for _, span := range spans { err := l.db.NewEntry(proofrequest.TypeSPAN, span.Start, span.End) diff --git a/proposer/succinct/bin/server.rs b/proposer/succinct/bin/server.rs index c4d4d75f..10f41bac 100644 --- a/proposer/succinct/bin/server.rs +++ b/proposer/succinct/bin/server.rs @@ -7,7 +7,7 @@ use axum::{ routing::{get, post}, Json, Router, }; -use log::info; +use log::{error, info}; use op_succinct_client_utils::{ boot::{hash_rollup_config, BootInfoStruct}, types::u32_to_u8, @@ -121,7 +121,13 @@ async fn request_span_proof( Json(payload): Json, ) -> Result<(StatusCode, Json), AppError> { info!("Received span proof request: {:?}", payload); - let fetcher = OPSuccinctDataFetcher::new_with_rollup_config(RunContext::Docker).await?; + let fetcher = match OPSuccinctDataFetcher::new_with_rollup_config(RunContext::Docker).await { + Ok(f) => f, + Err(e) => { + error!("Failed to create data fetcher: {}", e); + return Err(AppError(e)); + } + }; let host_cli = match fetcher .get_host_cli_args( @@ -134,7 +140,7 @@ async fn request_span_proof( { Ok(cli) => cli, Err(e) => { - log::error!("Failed to get host CLI args: {}", e); + error!("Failed to get host CLI args: {}", e); return Err(AppError(anyhow::anyhow!( "Failed to get host CLI args: {}", e @@ -147,7 +153,7 @@ async fn request_span_proof( // host, and return an ID that the client can poll on to check if the proof was submitted. 
let mut witnessgen_executor = WitnessGenExecutor::new(WITNESSGEN_TIMEOUT, RunContext::Docker); if let Err(e) = witnessgen_executor.spawn_witnessgen(&host_cli).await { - log::error!("Failed to spawn witness generation: {}", e); + error!("Failed to spawn witness generation: {}", e); return Err(AppError(anyhow::anyhow!( "Failed to spawn witness generation: {}", e @@ -155,7 +161,7 @@ async fn request_span_proof( } // Log any errors from running the witness generation process. if let Err(e) = witnessgen_executor.flush().await { - log::error!("Failed to generate witness: {}", e); + error!("Failed to generate witness: {}", e); return Err(AppError(anyhow::anyhow!( "Failed to generate witness: {}", e @@ -165,7 +171,7 @@ async fn request_span_proof( let sp1_stdin = match get_proof_stdin(&host_cli) { Ok(stdin) => stdin, Err(e) => { - log::error!("Failed to get proof stdin: {}", e); + error!("Failed to get proof stdin: {}", e); return Err(AppError(anyhow::anyhow!( "Failed to get proof stdin: {}", e @@ -176,7 +182,7 @@ async fn request_span_proof( let private_key = match env::var("SP1_PRIVATE_KEY") { Ok(private_key) => private_key, Err(e) => { - log::error!("Failed to get SP1 private key: {}", e); + error!("Failed to get SP1 private key: {}", e); return Err(AppError(anyhow::anyhow!( "Failed to get SP1 private key: {}", e @@ -186,7 +192,7 @@ async fn request_span_proof( let rpc_url = match env::var("PROVER_NETWORK_RPC") { Ok(rpc_url) => rpc_url, Err(e) => { - log::error!("Failed to get PROVER_NETWORK_RPC: {}", e); + error!("Failed to get PROVER_NETWORK_RPC: {}", e); return Err(AppError(anyhow::anyhow!( "Failed to get PROVER_NETWORK_RPC: {}", e @@ -205,7 +211,7 @@ async fn request_span_proof( { Ok(vk_hash) => vk_hash, Err(e) => { - log::error!("Failed to register program: {}", e); + error!("Failed to register program: {}", e); return Err(AppError(anyhow::anyhow!( "Failed to register program: {}", e @@ -224,7 +230,7 @@ async fn request_span_proof( { Ok(proof_id) => proof_id, Err(e) => { - log::error!("Failed to request proof: {}", e); + error!("Failed to request proof: {}", e); return Err(AppError(anyhow::anyhow!("Failed to request proof: {}", e))); } }; @@ -263,14 +269,18 @@ async fn request_agg_proof( )?; let l1_head: [u8; 32] = l1_head_bytes.try_into().unwrap(); - let fetcher = OPSuccinctDataFetcher::new_with_rollup_config(RunContext::Docker).await?; - let res = fetcher + let fetcher = match OPSuccinctDataFetcher::new_with_rollup_config(RunContext::Docker).await { + Ok(f) => f, + Err(e) => return Err(AppError(anyhow::anyhow!("Failed to create fetcher: {}", e))), + }; + + let headers = match fetcher .get_header_preimages(&boot_infos, l1_head.into()) - .await; - let headers = match res { - Ok(headers) => headers, + .await + { + Ok(h) => h, Err(e) => { - log::error!("Failed to get header preimages: {}", e); + error!("Failed to get header preimages: {}", e); return Err(AppError(anyhow::anyhow!( "Failed to get header preimages: {}", e @@ -286,9 +296,9 @@ async fn request_agg_proof( let stdin = match get_agg_proof_stdin(proofs, boot_infos, headers, &state.range_vk, l1_head.into()) { - Ok(stdin) => stdin, + Ok(s) => s, Err(e) => { - log::error!("Failed to get agg proof stdin: {}", e); + error!("Failed to get agg proof stdin: {}", e); return Err(AppError(anyhow::anyhow!( "Failed to get agg proof stdin: {}", e @@ -296,18 +306,17 @@ async fn request_agg_proof( } }; - let res = prover.register_program(&state.agg_vk, AGG_ELF).await; - let vk_hash = match res { + let vk_hash = match 
prover.register_program(&state.agg_vk, AGG_ELF).await { Ok(vk_hash) => vk_hash, Err(e) => { - log::error!("Failed to register program: {}", e); + error!("Failed to register program: {}", e); return Err(AppError(anyhow::anyhow!( "Failed to register program: {}", e ))); } }; - let res = prover + let proof_id = match prover .request_proof( &vk_hash, &stdin, @@ -315,13 +324,11 @@ async fn request_agg_proof( 1_000_000_000_000, None, ) - .await; - - // Check if error, otherwise get proof ID. - let proof_id = match res { - Ok(proof_id) => proof_id, + .await + { + Ok(id) => id, Err(e) => { - log::error!("Failed to request proof: {}", e); + error!("Failed to request proof: {}", e); return Err(AppError(anyhow::anyhow!("Failed to request proof: {}", e))); } }; @@ -335,33 +342,54 @@ async fn request_mock_span_proof( Json(payload): Json, ) -> Result<(StatusCode, Json), AppError> { info!("Received mock span proof request: {:?}", payload); - let fetcher = OPSuccinctDataFetcher::new_with_rollup_config(RunContext::Docker).await?; + let fetcher = match OPSuccinctDataFetcher::new_with_rollup_config(RunContext::Docker).await { + Ok(f) => f, + Err(e) => { + error!("Failed to create data fetcher: {}", e); + return Err(AppError(e)); + } + }; - let host_cli = fetcher + let host_cli = match fetcher .get_host_cli_args( payload.start, payload.end, ProgramType::Multi, CacheMode::DeleteCache, ) - .await?; + .await + { + Ok(cli) => cli, + Err(e) => { + error!("Failed to get host CLI args: {}", e); + return Err(AppError(e)); + } + }; // Start the server and native client with a timeout. // Note: Ideally, the server should call out to a separate process that executes the native // host, and return an ID that the client can poll on to check if the proof was submitted. let mut witnessgen_executor = WitnessGenExecutor::new(WITNESSGEN_TIMEOUT, RunContext::Docker); - witnessgen_executor.spawn_witnessgen(&host_cli).await?; + if let Err(e) = witnessgen_executor.spawn_witnessgen(&host_cli).await { + error!("Failed to spawn witness generator: {}", e); + return Err(AppError(e)); + } // Log any errors from running the witness generation process. 
- let res = witnessgen_executor.flush().await; - if let Err(e) = res { - log::error!("Failed to generate witness: {}", e); + if let Err(e) = witnessgen_executor.flush().await { + error!("Failed to generate witness: {}", e); return Err(AppError(anyhow::anyhow!( "Failed to generate witness: {}", e ))); } - let sp1_stdin = get_proof_stdin(&host_cli)?; + let sp1_stdin = match get_proof_stdin(&host_cli) { + Ok(stdin) => stdin, + Err(e) => { + error!("Failed to get proof stdin: {}", e); + return Err(AppError(e)); + } + }; let prover = ProverClient::mock(); let proof = prover @@ -404,30 +432,62 @@ async fn request_mock_agg_proof( .map(|proof| proof.proof.clone()) .collect(); - let l1_head_bytes = hex::decode( + let l1_head_bytes = match hex::decode( payload .head .strip_prefix("0x") .expect("Invalid L1 head, no 0x prefix."), - )?; + ) { + Ok(bytes) => bytes, + Err(e) => { + error!("Failed to decode L1 head: {}", e); + return Err(AppError(anyhow::anyhow!("Failed to decode L1 head: {}", e))); + } + }; let l1_head: [u8; 32] = l1_head_bytes.try_into().unwrap(); - let fetcher = OPSuccinctDataFetcher::new_with_rollup_config(RunContext::Docker).await?; - let headers = fetcher + let fetcher = match OPSuccinctDataFetcher::new_with_rollup_config(RunContext::Docker).await { + Ok(f) => f, + Err(e) => { + error!("Failed to create data fetcher: {}", e); + return Err(AppError(e)); + } + }; + let headers = match fetcher .get_header_preimages(&boot_infos, l1_head.into()) - .await?; + .await + { + Ok(h) => h, + Err(e) => { + error!("Failed to get header preimages: {}", e); + return Err(AppError(e)); + } + }; let prover = ProverClient::mock(); let stdin = - get_agg_proof_stdin(proofs, boot_infos, headers, &state.range_vk, l1_head.into()).unwrap(); + match get_agg_proof_stdin(proofs, boot_infos, headers, &state.range_vk, l1_head.into()) { + Ok(s) => s, + Err(e) => { + error!("Failed to get aggregation proof stdin: {}", e); + return Err(AppError(e)); + } + }; // Simulate the mock proof. proof.bytes() returns an empty byte array for mock proofs. - let proof = prover + let proof = match prover .prove(&state.agg_pk, stdin) .set_skip_deferred_proof_verification(true) .groth16() - .run()?; + .run() + { + Ok(p) => p, + Err(e) => { + error!("Failed to generate proof: {}", e); + return Err(AppError(e)); + } + }; Ok(( StatusCode::OK,
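
The db.go hunk near the top of this patch changes the SQLite tuning (busy_timeout raised from 15s to 30s, connection lifetime lowered from one hour to ten minutes, one write connection, a small read pool) without commentary, so a minimal sketch of the resulting pool configuration follows. It assumes the mattn/go-sqlite3 driver and a placeholder proofs.db path, and it mirrors the DSN from db.go rather than reproducing the proposer's actual InitDB.

package main

import (
	"database/sql"
	"fmt"
	"log"
	"runtime"
	"time"

	_ "github.com/mattn/go-sqlite3" // assumed driver registration
)

func main() {
	dbPath := "proofs.db" // placeholder path
	// Same DSN options as db.go, with busy_timeout at 30s.
	dsn := fmt.Sprintf("file:%s?_fk=1&journal_mode=WAL&synchronous=normal&cache_size=100000000&busy_timeout=30000&_txlock=immediate", dbPath)

	writeDB, err := sql.Open("sqlite3", dsn)
	if err != nil {
		log.Fatalf("open write db: %v", err)
	}
	// SQLite serializes writes, so a single write connection avoids
	// SQLITE_BUSY contention between concurrent writers.
	writeDB.SetMaxOpenConns(1)
	writeDB.SetConnMaxLifetime(10 * time.Minute)

	readDB, err := sql.Open("sqlite3", dsn)
	if err != nil {
		log.Fatalf("open read db: %v", err)
	}
	// Built-in max requires Go 1.21+.
	readDB.SetMaxOpenConns(max(4, runtime.NumCPU()/4))
	readDB.SetConnMaxLifetime(10 * time.Minute)

	log.Println("sqlite read/write pools configured")
}
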