diff --git a/Cargo.lock b/Cargo.lock index 0a9d6eb..c264eb9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4,13 +4,19 @@ version = 3 [[package]] name = "addr2line" -version = "0.24.2" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1" +checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb" dependencies = [ "gimli", ] +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "adler2" version = "2.0.0" @@ -89,6 +95,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", + "getrandom 0.2.15", "once_cell", "version_check", "zerocopy", @@ -105,9 +112,9 @@ dependencies = [ [[package]] name = "allocator-api2" -version = "0.2.20" +version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45862d1c77f2228b9e10bc609d5bc203d86ebc9b87ad8d5d5167a6c9abf739d9" +checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" [[package]] name = "android-tzdata" @@ -507,7 +514,7 @@ dependencies = [ "nom", "num-traits", "rusticata-macros", - "thiserror 1.0.69", + "thiserror 1.0.68", "time 0.3.36", ] @@ -585,6 +592,26 @@ dependencies = [ "pin-project-lite 0.2.15", ] +[[package]] +name = "async-compatibility-layer" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32dd1dfd4a05a197583e51036d9615f04a4d851089dc119ee965d440d0bcaa39" +dependencies = [ + "async-lock 3.4.0", + "async-std", + "async-trait", + "color-eyre", + "console-subscriber 0.2.0", + "flume", + "futures", + "tokio", + "tokio-stream", + "tracing", + "tracing-error", + "tracing-subscriber 0.3.18", +] + [[package]] name = "async-dup" version = "1.2.4" @@ -674,7 +701,7 @@ dependencies = [ "futures-lite 2.5.0", "parking", "polling 3.7.4", - "rustix 0.38.40", + "rustix 0.38.39", "slab", "tracing", "windows-sys 0.59.0", @@ -708,7 +735,7 @@ checksum = "9e9e7a929bd34c68a82d58a4de7f86fffdaf97fb2af850162a7bb19dd7269b33" dependencies = [ "async-std", "native-tls", - "thiserror 1.0.69", + "thiserror 1.0.68", "url", ] @@ -727,7 +754,7 @@ dependencies = [ "cfg-if", "event-listener 5.3.1", "futures-lite 2.5.0", - "rustix 0.38.40", + "rustix 0.38.39", "tracing", ] @@ -743,7 +770,7 @@ dependencies = [ "cfg-if", "futures-core", "futures-io", - "rustix 0.38.40", + "rustix 0.38.39", "signal-hook-registry", "slab", "windows-sys 0.59.0", @@ -984,17 +1011,17 @@ dependencies = [ [[package]] name = "backtrace" -version = "0.3.74" +version = "0.3.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a" +checksum = "26b05800d2e817c8b3b4b54abd461726265fa9789ae34330622f2db9ee696f9d" dependencies = [ "addr2line", + "cc", "cfg-if", "libc", - "miniz_oxide", + "miniz_oxide 0.7.4", "object", "rustc-demangle", - "windows-targets 0.52.6", ] [[package]] @@ -1174,9 +1201,9 @@ dependencies = [ [[package]] name = "bytemuck" -version = "1.19.0" +version = "1.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8334215b81e418a0a7bdb8ef0849474f40bb10c8b71f1c4ed315cff49f32494d" +checksum = "8b37c88a63ffd85d15b406896cc343916d7cf57838a847b3a6f2ca5d39a5695a" dependencies = [ 
"bytemuck_derive", ] @@ -1236,9 +1263,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.1" +version = "1.1.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd9de9f2205d5ef3fd67e685b0df337994ddd4495e2a28d185500d0e1edfea47" +checksum = "40545c26d092346d8a8dab71ee48e7685a7a9cba76e634790c215b41a4a7b4cf" dependencies = [ "shlex", ] @@ -1250,7 +1277,7 @@ source = "git+https://github.com/EspressoSystems/Push-CDN?tag=0.4.7#5406fde54e61 dependencies = [ "cdn-proto", "clap", - "console-subscriber", + "console-subscriber 0.3.0", "dashmap", "derivative", "jf-signature 0.1.0", @@ -1320,7 +1347,7 @@ dependencies = [ "rustls 0.23.16", "rustls-pki-types", "sqlx", - "thiserror 1.0.69", + "thiserror 1.0.68", "tokio", "tokio-rustls", "tracing", @@ -1400,9 +1427,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.7.3" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afb84c814227b90d6895e01398aee0d8033c00e7466aca416fb6a8e0eb19d8a7" +checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97" [[package]] name = "coarsetime" @@ -1415,6 +1442,33 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "color-eyre" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55146f5e46f237f7423d74111267d4597b59b0dad0ffaf7303bce9945d843ad5" +dependencies = [ + "backtrace", + "color-spantrace", + "eyre", + "indenter", + "once_cell", + "owo-colors", + "tracing-error", +] + +[[package]] +name = "color-spantrace" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd6be1b2a7e382e2b98b43b2adcca6bb0e465af0bdd38123873ae61eb17a72c2" +dependencies = [ + "once_cell", + "owo-colors", + "tracing-core", + "tracing-error", +] + [[package]] name = "colorchoice" version = "1.0.3" @@ -1481,6 +1535,19 @@ dependencies = [ "yaml-rust2", ] +[[package]] +name = "console-api" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd326812b3fd01da5bb1af7d340d0d555fd3d4b641e7f1dfcf5962a902952787" +dependencies = [ + "futures-core", + "prost", + "prost-types", + "tonic 0.10.2", + "tracing-core", +] + [[package]] name = "console-api" version = "0.7.0" @@ -1490,8 +1557,32 @@ dependencies = [ "futures-core", "prost", "prost-types", - "tonic", + "tonic 0.11.0", + "tracing-core", +] + +[[package]] +name = "console-subscriber" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7481d4c57092cd1c19dd541b92bdce883de840df30aa5d03fd48a3935c01842e" +dependencies = [ + "console-api 0.6.0", + "crossbeam-channel", + "crossbeam-utils", + "futures-task", + "hdrhistogram", + "humantime", + "prost-types", + "serde", + "serde_json", + "thread_local", + "tokio", + "tokio-stream", + "tonic 0.10.2", + "tracing", "tracing-core", + "tracing-subscriber 0.3.18", ] [[package]] @@ -1500,7 +1591,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "31c4cc54bae66f7d9188996404abdf7fdfa23034ef8e43478c8810828abad758" dependencies = [ - "console-api", + "console-api 0.7.0", "crossbeam-channel", "crossbeam-utils", "futures-task", @@ -1513,7 +1604,7 @@ dependencies = [ "thread_local", "tokio", "tokio-stream", - "tonic", + "tonic 0.11.0", "tracing", "tracing-core", "tracing-subscriber 0.3.18", @@ -1616,9 +1707,9 @@ dependencies = [ [[package]] name = "cpufeatures" -version = "0.2.15" +version = "0.2.14" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ca741a962e1b0bff6d724a1a0958b686406e853bb14061f218562e1896f95e6" +checksum = "608697df725056feaccfa42cffdaeeec3fccc4ffc38358ecd19b243e716a78e0" dependencies = [ "libc", ] @@ -1741,9 +1832,9 @@ dependencies = [ [[package]] name = "csv" -version = "1.3.1" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acdc4883a9c96732e4733212c01447ebd805833b7275a73ca3ee080fd77afdaf" +checksum = "ac574ff4d437a7b5ad237ef331c17ccca63c46479e5b5453eb8e10bb99a759fe" dependencies = [ "csv-core", "itoa", @@ -2286,6 +2377,16 @@ dependencies = [ "pin-project-lite 0.2.15", ] +[[package]] +name = "eyre" +version = "0.6.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cd915d99f24784cdc19fd37ef22b97e3ff0ae756c7e492e9fbfe897d61e2aec" +dependencies = [ + "indenter", + "once_cell", +] + [[package]] name = "fastrand" version = "1.9.0" @@ -2320,12 +2421,12 @@ dependencies = [ [[package]] name = "flate2" -version = "1.0.35" +version = "1.0.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c936bfdafb507ebbf50b8074c54fa31c5be9a1e7e5f467dd659697041407d07c" +checksum = "a1b589b4dc103969ad3cf85c950899926ec64300a1a46d76c03a6072957036f0" dependencies = [ "crc32fast", - "miniz_oxide", + "miniz_oxide 0.8.0", ] [[package]] @@ -2336,6 +2437,7 @@ checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" dependencies = [ "futures-core", "futures-sink", + "nanorand", "spin 0.9.8", ] @@ -2595,9 +2697,9 @@ dependencies = [ [[package]] name = "gimli" -version = "0.31.1" +version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" +checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" [[package]] name = "glob" @@ -2803,7 +2905,7 @@ dependencies = [ "once_cell", "rand 0.8.5", "socket2 0.5.7", - "thiserror 1.0.69", + "thiserror 1.0.68", "tinyvec", "tokio", "tracing", @@ -2826,7 +2928,7 @@ dependencies = [ "rand 0.8.5", "resolv-conf", "smallvec", - "thiserror 1.0.69", + "thiserror 1.0.68", "tokio", "tracing", ] @@ -2912,8 +3014,8 @@ dependencies = [ [[package]] name = "hotshot" -version = "0.5.81" -source = "git+https://github.com/EspressoSystems/HotShot.git?tag=0.5.81#4ed1df20f793b947d5c9f90a1cb10e4177f8521d" +version = "0.5.79" +source = "git+https://github.com/EspressoSystems/HotShot.git?rev=f62b557#f62b5574b1d96d34803c3de0bc6c7e1a3e2533fe" dependencies = [ "anyhow", "async-broadcast", @@ -2947,7 +3049,7 @@ dependencies = [ "serde", "sha2 0.10.8", "surf-disco", - "thiserror 1.0.69", + "thiserror 2.0.3", "time 0.3.36", "tokio", "tracing", @@ -2960,7 +3062,7 @@ dependencies = [ [[package]] name = "hotshot-builder-api" version = "0.1.7" -source = "git+https://github.com/EspressoSystems/HotShot.git?tag=0.5.81#4ed1df20f793b947d5c9f90a1cb10e4177f8521d" +source = "git+https://github.com/EspressoSystems/HotShot.git?rev=f62b557#f62b5574b1d96d34803c3de0bc6c7e1a3e2533fe" dependencies = [ "async-trait", "clap", @@ -2970,7 +3072,7 @@ dependencies = [ "hotshot-types", "serde", "tagged-base64", - "thiserror 1.0.69", + "thiserror 2.0.3", "tide-disco", "toml", "vbs", @@ -3017,10 +3119,12 @@ dependencies = [ [[package]] name = "hotshot-events-service" version = "0.1.49" -source = "git+https://github.com/EspressoSystems/hotshot-events-service.git?tag=0.1.54#6c831301f1ff7fe7d2b5a6da4f6ff849cfe74b4a" +source = 
"git+https://github.com/EspressoSystems/hotshot-events-service.git?branch=sishan/tx_status_api#4ea38d4b068e416ec5e950881463c7ee095d947a" dependencies = [ "async-broadcast", + "async-compatibility-layer", "async-lock 2.8.0", + "async-std", "async-trait", "clap", "derivative", @@ -3033,17 +3137,15 @@ dependencies = [ "snafu", "tagged-base64", "tide-disco", - "tokio", "toml", "tracing", - "tracing-subscriber 0.3.18", "vbs", ] [[package]] name = "hotshot-example-types" -version = "0.5.81" -source = "git+https://github.com/EspressoSystems/HotShot.git?tag=0.5.81#4ed1df20f793b947d5c9f90a1cb10e4177f8521d" +version = "0.5.79" +source = "git+https://github.com/EspressoSystems/HotShot.git?rev=f62b557#f62b5574b1d96d34803c3de0bc6c7e1a3e2533fe" dependencies = [ "anyhow", "async-broadcast", @@ -3065,7 +3167,7 @@ dependencies = [ "serde", "sha2 0.10.8", "sha3", - "thiserror 1.0.69", + "thiserror 2.0.3", "time 0.3.36", "tokio", "tracing", @@ -3075,8 +3177,8 @@ dependencies = [ [[package]] name = "hotshot-fakeapi" -version = "0.5.81" -source = "git+https://github.com/EspressoSystems/HotShot.git?tag=0.5.81#4ed1df20f793b947d5c9f90a1cb10e4177f8521d" +version = "0.5.79" +source = "git+https://github.com/EspressoSystems/HotShot.git?rev=f62b557#f62b5574b1d96d34803c3de0bc6c7e1a3e2533fe" dependencies = [ "anyhow", "async-lock 3.4.0", @@ -3095,8 +3197,8 @@ dependencies = [ [[package]] name = "hotshot-macros" -version = "0.5.81" -source = "git+https://github.com/EspressoSystems/HotShot.git?tag=0.5.81#4ed1df20f793b947d5c9f90a1cb10e4177f8521d" +version = "0.5.79" +source = "git+https://github.com/EspressoSystems/HotShot.git?rev=f62b557#f62b5574b1d96d34803c3de0bc6c7e1a3e2533fe" dependencies = [ "derive_builder", "proc-macro2", @@ -3106,8 +3208,8 @@ dependencies = [ [[package]] name = "hotshot-orchestrator" -version = "0.5.81" -source = "git+https://github.com/EspressoSystems/HotShot.git?tag=0.5.81#4ed1df20f793b947d5c9f90a1cb10e4177f8521d" +version = "0.5.79" +source = "git+https://github.com/EspressoSystems/HotShot.git?rev=f62b557#f62b5574b1d96d34803c3de0bc6c7e1a3e2533fe" dependencies = [ "anyhow", "async-lock 3.4.0", @@ -3122,7 +3224,7 @@ dependencies = [ "serde", "serde_json", "surf-disco", - "thiserror 1.0.69", + "thiserror 2.0.3", "tide-disco", "tokio", "toml", @@ -3133,8 +3235,8 @@ dependencies = [ [[package]] name = "hotshot-task" -version = "0.5.81" -source = "git+https://github.com/EspressoSystems/HotShot.git?tag=0.5.81#4ed1df20f793b947d5c9f90a1cb10e4177f8521d" +version = "0.5.79" +source = "git+https://github.com/EspressoSystems/HotShot.git?rev=f62b557#f62b5574b1d96d34803c3de0bc6c7e1a3e2533fe" dependencies = [ "anyhow", "async-broadcast", @@ -3147,8 +3249,8 @@ dependencies = [ [[package]] name = "hotshot-task-impls" -version = "0.5.81" -source = "git+https://github.com/EspressoSystems/HotShot.git?tag=0.5.81#4ed1df20f793b947d5c9f90a1cb10e4177f8521d" +version = "0.5.79" +source = "git+https://github.com/EspressoSystems/HotShot.git?rev=f62b557#f62b5574b1d96d34803c3de0bc6c7e1a3e2533fe" dependencies = [ "anyhow", "async-broadcast", @@ -3172,7 +3274,7 @@ dependencies = [ "sha2 0.10.8", "surf-disco", "tagged-base64", - "thiserror 1.0.69", + "thiserror 2.0.3", "time 0.3.36", "tokio", "tracing", @@ -3184,8 +3286,8 @@ dependencies = [ [[package]] name = "hotshot-testing" -version = "0.5.81" -source = "git+https://github.com/EspressoSystems/HotShot.git?tag=0.5.81#4ed1df20f793b947d5c9f90a1cb10e4177f8521d" +version = "0.5.79" +source = 
"git+https://github.com/EspressoSystems/HotShot.git?rev=f62b557#f62b5574b1d96d34803c3de0bc6c7e1a3e2533fe" dependencies = [ "anyhow", "async-broadcast", @@ -3217,7 +3319,7 @@ dependencies = [ "sha2 0.10.8", "sha3", "tagged-base64", - "thiserror 1.0.69", + "thiserror 2.0.3", "tide-disco", "tokio", "tracing", @@ -3229,7 +3331,7 @@ dependencies = [ [[package]] name = "hotshot-types" version = "0.1.11" -source = "git+https://github.com/EspressoSystems/HotShot.git?tag=0.5.81#4ed1df20f793b947d5c9f90a1cb10e4177f8521d" +source = "git+https://github.com/EspressoSystems/HotShot.git?rev=f62b557#f62b5574b1d96d34803c3de0bc6c7e1a3e2533fe" dependencies = [ "anyhow", "ark-bn254", @@ -3272,7 +3374,7 @@ dependencies = [ "sha2 0.10.8", "surf-disco", "tagged-base64", - "thiserror 1.0.69", + "thiserror 2.0.3", "time 0.3.36", "tokio", "toml", @@ -3754,6 +3856,12 @@ dependencies = [ "quote", ] +[[package]] +name = "indenter" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce23b50ad8242c51a442f3ff322d56b02f08852c77e4c0b4d3fd684abc89c683" + [[package]] name = "indexmap" version = "1.9.3" @@ -4200,7 +4308,7 @@ dependencies = [ "multiaddr", "pin-project", "rw-stream-sink", - "thiserror 1.0.69", + "thiserror 1.0.68", ] [[package]] @@ -4270,7 +4378,7 @@ dependencies = [ "rw-stream-sink", "serde", "smallvec", - "thiserror 1.0.69", + "thiserror 1.0.68", "tracing", "unsigned-varint 0.8.0", "void", @@ -4343,7 +4451,7 @@ dependencies = [ "quick-protobuf", "quick-protobuf-codec 0.3.1", "smallvec", - "thiserror 1.0.69", + "thiserror 1.0.68", "tracing", "void", ] @@ -4364,7 +4472,7 @@ dependencies = [ "rand 0.8.5", "serde", "sha2 0.10.8", - "thiserror 1.0.69", + "thiserror 1.0.68", "tracing", "zeroize", ] @@ -4393,7 +4501,7 @@ dependencies = [ "serde", "sha2 0.10.8", "smallvec", - "thiserror 1.0.69", + "thiserror 1.0.68", "tracing", "uint", "void", @@ -4440,8 +4548,8 @@ dependencies = [ [[package]] name = "libp2p-networking" -version = "0.5.81" -source = "git+https://github.com/EspressoSystems/HotShot.git?tag=0.5.81#4ed1df20f793b947d5c9f90a1cb10e4177f8521d" +version = "0.5.79" +source = "git+https://github.com/EspressoSystems/HotShot.git?rev=f62b557#f62b5574b1d96d34803c3de0bc6c7e1a3e2533fe" dependencies = [ "anyhow", "async-lock 3.4.0", @@ -4465,7 +4573,7 @@ dependencies = [ "serde", "serde_bytes", "serde_json", - "thiserror 1.0.69", + "thiserror 2.0.3", "tokio", "tokio-stream", "tracing", @@ -4492,7 +4600,7 @@ dependencies = [ "ring 0.17.8", "rustls 0.23.16", "socket2 0.5.7", - "thiserror 1.0.69", + "thiserror 1.0.68", "tokio", "tracing", ] @@ -4586,7 +4694,7 @@ dependencies = [ "ring 0.17.8", "rustls 0.23.16", "rustls-webpki 0.101.7", - "thiserror 1.0.69", + "thiserror 1.0.68", "x509-parser", "yasna", ] @@ -4708,7 +4816,7 @@ checksum = "3669cf5561f8d27e8fc84cc15e58350e70f557d4d65f70e3154e54cd2f8e1782" dependencies = [ "libc", "neli", - "thiserror 1.0.69", + "thiserror 1.0.68", "windows-sys 0.59.0", ] @@ -4825,6 +4933,7 @@ dependencies = [ "nonempty-collections", "once_cell", "portpicker", + "quick_cache", "rand 0.8.5", "serde", "sha2 0.10.8", @@ -4966,6 +5075,15 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" +[[package]] +name = "miniz_oxide" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8a240ddb74feaf34a79a7add65a741f3167852fba007066dcac1ca548d89c08" +dependencies = [ + "adler", +] + [[package]] name = 
"miniz_oxide" version = "0.8.0" @@ -5048,6 +5166,15 @@ dependencies = [ "unsigned-varint 0.7.2", ] +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom 0.2.15", +] + [[package]] name = "native-tls" version = "0.2.12" @@ -5125,7 +5252,7 @@ dependencies = [ "anyhow", "byteorder", "paste", - "thiserror 1.0.69", + "thiserror 1.0.68", ] [[package]] @@ -5139,7 +5266,7 @@ dependencies = [ "log", "netlink-packet-core", "netlink-sys", - "thiserror 1.0.69", + "thiserror 1.0.68", "tokio", ] @@ -5289,9 +5416,9 @@ dependencies = [ [[package]] name = "object" -version = "0.36.5" +version = "0.32.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aedf0a2d09c573ed1d8d85b30c119153926a2b36dce0ab28322c09a117a4683e" +checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441" dependencies = [ "memchr", ] @@ -5383,6 +5510,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "owo-colors" +version = "3.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f" + [[package]] name = "parking" version = "2.2.1" @@ -5456,7 +5589,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "879952a81a83930934cbf1786752d6dedc3b1f29e8f8fb2ad1d0a36f377cf442" dependencies = [ "memchr", - "thiserror 1.0.69", + "thiserror 1.0.68", "ucd-trie", ] @@ -5602,7 +5735,7 @@ dependencies = [ "concurrent-queue", "hermit-abi 0.4.0", "pin-project-lite 0.2.15", - "rustix 0.38.40", + "rustix 0.38.39", "tracing", "windows-sys 0.59.0", ] @@ -5712,7 +5845,7 @@ dependencies = [ "memchr", "parking_lot", "protobuf", - "thiserror 1.0.69", + "thiserror 1.0.68", ] [[package]] @@ -5820,7 +5953,7 @@ dependencies = [ "asynchronous-codec 0.6.2", "bytes", "quick-protobuf", - "thiserror 1.0.69", + "thiserror 1.0.68", "unsigned-varint 0.7.2", ] @@ -5833,15 +5966,27 @@ dependencies = [ "asynchronous-codec 0.7.0", "bytes", "quick-protobuf", - "thiserror 1.0.69", + "thiserror 1.0.68", "unsigned-varint 0.8.0", ] +[[package]] +name = "quick_cache" +version = "0.6.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d7c94f8935a9df96bb6380e8592c70edf497a643f94bd23b2f76b399385dbf4" +dependencies = [ + "ahash 0.8.11", + "equivalent", + "hashbrown 0.14.5", + "parking_lot", +] + [[package]] name = "quinn" -version = "0.11.6" +version = "0.11.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62e96808277ec6f97351a2380e6c25114bc9e67037775464979f3037c92d05ef" +checksum = "8c7c5fdde3cdae7203427dc4f0a68fe0ed09833edc525a03456b153b79828684" dependencies = [ "bytes", "futures-io", @@ -5851,29 +5996,26 @@ dependencies = [ "rustc-hash", "rustls 0.23.16", "socket2 0.5.7", - "thiserror 2.0.3", + "thiserror 1.0.68", "tokio", "tracing", ] [[package]] name = "quinn-proto" -version = "0.11.9" +version = "0.11.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2fe5ef3495d7d2e377ff17b1a8ce2ee2ec2a18cde8b6ad6619d65d0701c135d" +checksum = "fadfaed2cd7f389d0161bb73eeb07b7b78f8691047a6f3e73caaeae55310a4a6" dependencies = [ "bytes", - "getrandom 0.2.15", "rand 0.8.5", "ring 0.17.8", "rustc-hash", "rustls 0.23.16", - "rustls-pki-types", 
"slab", - "thiserror 2.0.3", + "thiserror 1.0.68", "tinyvec", "tracing", - "web-time", ] [[package]] @@ -6062,7 +6204,7 @@ checksum = "ba009ff324d1fc1b900bd1fdb31564febe58a8ccc8a6fdbb93b543d33b13ca43" dependencies = [ "getrandom 0.2.15", "libredox", - "thiserror 1.0.69", + "thiserror 1.0.68", ] [[package]] @@ -6073,7 +6215,7 @@ checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.4.9", + "regex-automata 0.4.8", "regex-syntax 0.8.5", ] @@ -6088,9 +6230,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.9" +version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" +checksum = "368758f23274712b504848e9d5a6f010445cc8b87a7cdb4d7cbee666c1288da3" dependencies = [ "aho-corasick", "memchr", @@ -6289,7 +6431,7 @@ dependencies = [ "netlink-packet-route", "netlink-proto", "nix", - "thiserror 1.0.69", + "thiserror 1.0.68", "tokio", ] @@ -6364,9 +6506,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.40" +version = "0.38.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99e4ea3e1cdc4b559b8e5650f9c8e5998e3e5c1343b4eaf034565f32318d63c0" +checksum = "375116bee2be9ed569afe2154ea6a99dfdffd257f533f187498c2a8f5feaf4ee" dependencies = [ "bitflags 2.6.0", "errno", @@ -6417,9 +6559,6 @@ name = "rustls-pki-types" version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "16f1201b3c9a7ee8039bcadc17b7e605e2945b27eee7631788c1bd2b0643674b" -dependencies = [ - "web-time", -] [[package]] name = "rustls-webpki" @@ -6597,9 +6736,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.132" +version = "1.0.133" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d726bfaff4b320266d395898905d0eba0345aae23b54aee3a737e260fd46db03" +checksum = "c7fceb2473b9166b2294ef05efcb65a3db80803f0b03ef86a5fc88a2b85ee377" dependencies = [ "itoa", "memchr", @@ -6615,7 +6754,7 @@ checksum = "c7715380eec75f029a4ef7de39a9200e0a63823176b759d055b613f5a87df6a6" dependencies = [ "percent-encoding", "serde", - "thiserror 1.0.69", + "thiserror 1.0.68", ] [[package]] @@ -6972,7 +7111,7 @@ dependencies = [ "sha2 0.10.8", "smallvec", "sqlformat", - "thiserror 1.0.69", + "thiserror 1.0.68", "time 0.3.36", "tokio", "tokio-stream", @@ -7056,7 +7195,7 @@ dependencies = [ "smallvec", "sqlx-core", "stringprep", - "thiserror 1.0.69", + "thiserror 1.0.68", "time 0.3.36", "tracing", "whoami", @@ -7095,7 +7234,7 @@ dependencies = [ "smallvec", "sqlx-core", "stringprep", - "thiserror 1.0.69", + "thiserror 1.0.68", "time 0.3.36", "tracing", "whoami", @@ -7477,17 +7616,17 @@ dependencies = [ "cfg-if", "fastrand 2.2.0", "once_cell", - "rustix 0.38.40", + "rustix 0.38.39", "windows-sys 0.59.0", ] [[package]] name = "thiserror" -version = "1.0.69" +version = "1.0.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +checksum = "02dd99dc800bbb97186339685293e1cc5d9df1f8fae2d0aecd9ff1c77efea892" dependencies = [ - "thiserror-impl 1.0.69", + "thiserror-impl 1.0.68", ] [[package]] @@ -7501,9 +7640,9 @@ dependencies = [ [[package]] name = "thiserror-impl" -version = "1.0.69" +version = "1.0.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +checksum = 
"a7c61ec9a6f64d2793d8a45faba21efbe3ced62a886d44c36a009b2b519b4c7e" dependencies = [ "proc-macro2", "quote", @@ -7749,6 +7888,7 @@ dependencies = [ "mio", "parking_lot", "pin-project-lite 0.2.15", + "signal-hook-registry", "socket2 0.5.7", "tokio-macros", "tracing", @@ -7866,6 +8006,33 @@ dependencies = [ "winnow", ] +[[package]] +name = "tonic" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.21.7", + "bytes", + "h2 0.3.26", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.31", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tonic" version = "0.11.0" @@ -7970,6 +8137,16 @@ dependencies = [ "tracing-subscriber 0.3.18", ] +[[package]] +name = "tracing-error" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d686ec1c0f384b1277f097b2f279a2ecc11afe8c133c1aabf036a27cb4cd206e" +dependencies = [ + "tracing", + "tracing-subscriber 0.3.18", +] + [[package]] name = "tracing-futures" version = "0.2.5" @@ -8100,7 +8277,7 @@ dependencies = [ "native-tls", "rand 0.8.5", "sha-1", - "thiserror 1.0.69", + "thiserror 1.0.68", "url", "utf-8", ] @@ -8242,9 +8419,9 @@ dependencies = [ [[package]] name = "url" -version = "2.5.3" +version = "2.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d157f1b96d14500ffdc1f10ba712e780825526c03d9a49b4d0324b0d9113ada" +checksum = "32f8b686cadd1473f4bd0117a5d28d36b1ade384ea9b5069a1c40aefed7fda60" dependencies = [ "form_urlencoded", "idna 1.0.3", @@ -8278,8 +8455,8 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "utils" -version = "0.5.81" -source = "git+https://github.com/EspressoSystems/HotShot.git?tag=0.5.81#4ed1df20f793b947d5c9f90a1cb10e4177f8521d" +version = "0.5.79" +source = "git+https://github.com/EspressoSystems/HotShot.git?rev=f62b557#f62b5574b1d96d34803c3de0bc6c7e1a3e2533fe" dependencies = [ "tracing", ] @@ -8855,7 +9032,7 @@ dependencies = [ "oid-registry", "ring 0.17.8", "rusticata-macros", - "thiserror 1.0.69", + "thiserror 1.0.68", "time 0.3.36", ] diff --git a/Cargo.toml b/Cargo.toml index 94f3d06..1a0a619 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,14 +3,14 @@ resolver = "2" members = ["crates/*"] [workspace.dependencies] -hotshot = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.81" } -hotshot-builder-api = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.81" } -hotshot-events-service = { git = "https://github.com/EspressoSystems/hotshot-events-service.git", tag = "0.1.54" } -hotshot-macros = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.81" } -hotshot-task-impls = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.81" } -hotshot-testing = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.81" } -hotshot-types = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.81" } -hotshot-example-types = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.81" } +hotshot = { git = "https://github.com/EspressoSystems/HotShot.git", rev = "f62b557" } +hotshot-builder-api = { git = "https://github.com/EspressoSystems/HotShot.git", rev = "f62b557" } +hotshot-events-service = { git = 
"https://github.com/EspressoSystems/hotshot-events-service.git", branch = "sishan/tx_status_api" } +hotshot-macros = { git = "https://github.com/EspressoSystems/HotShot.git", rev = "f62b557" } +hotshot-task-impls = { git = "https://github.com/EspressoSystems/HotShot.git", rev = "f62b557" } +hotshot-testing = { git = "https://github.com/EspressoSystems/HotShot.git", rev = "f62b557" } +hotshot-types = { git = "https://github.com/EspressoSystems/HotShot.git", rev = "f62b557" } +hotshot-example-types = { git = "https://github.com/EspressoSystems/HotShot.git", rev = "f62b557" } anyhow = "1" async-broadcast = "0.7" @@ -31,6 +31,7 @@ nonempty-collections = "0.2" once_cell = "1.20" num_cpus = "1.16" portpicker = "0.1.1" +quick_cache = "0.6" rand = "0.8" serde = "1.0" serde_json = "1.0" @@ -45,7 +46,7 @@ toml = "0.8" tracing = "0.1" tracing-test = "0.1" typenum = "1.17" -url = "2.3" +url = "2.5" vbs = "0.1" vec1 = "1.12" tracing-subscriber = "0.3" diff --git a/crates/legacy/src/service.rs b/crates/legacy/src/service.rs index b82accd..2fde5b0 100644 --- a/crates/legacy/src/service.rs +++ b/crates/legacy/src/service.rs @@ -1,8 +1,7 @@ use hotshot::types::Event; -use hotshot_builder_api::v0_1::builder::{define_api, submit_api, Error as BuilderApiError}; use hotshot_builder_api::v0_1::{ block_info::{AvailableBlockData, AvailableBlockHeaderInput, AvailableBlockInfo}, - builder::BuildError, + builder::{define_api, submit_api, BuildError, Error as BuilderApiError, TransactionStatus}, data_source::{AcceptsTxnSubmits, BuilderDataSource}, }; use hotshot_types::traits::block_contents::{precompute_vid_commitment, Transaction}; @@ -95,6 +94,8 @@ pub struct BuilderConfig { pub txn_garbage_collect_duration: Duration, /// Channel capacity for incoming transactions for a single builder state. pub txn_channel_capacity: usize, + /// Capacity of cache storing information for transaction status API + pub tx_status_cache_capacity: usize, /// Base fee; the sequencing fee for a block is calculated as block size × base fee pub base_fee: u64, } @@ -113,20 +114,30 @@ impl BuilderConfig { maximize_txn_capture_timeout: TEST_MAXIMIZE_TX_CAPTURE_TIMEOUT, txn_garbage_collect_duration: TEST_INCLUDED_TX_GC_PERIOD, txn_channel_capacity: TEST_CHANNEL_BUFFER_SIZE, + tx_status_cache_capacity: TEST_TX_STATUS_CACHE_CAPACITY, base_fee: TEST_BASE_FEE, } } } pub struct GlobalState { + /// Underlying coordinator, responsible for builder state lifecycle pub(crate) coordinator: Arc>, + /// Keys that this builder will use to sign responses pub(crate) builder_keys: BuilderKeys, - pub(crate) max_api_waiting_time: Duration, + /// Stores blocks built by this builder pub(crate) block_store: RwLock>, + /// Limits on block size. See [`BlockSizeLimits`] documentation for more details. 
     pub(crate) block_size_limits: BlockSizeLimits,
-    pub(crate) maximize_txn_capture_timeout: Duration,
+    /// Number of DA nodes used in VID computation
     pub(crate) num_nodes: AtomicUsize,
+    /// Instance state, used to construct new blocks
     pub(crate) instance_state: Types::InstanceState,
+    /// See [`BuilderConfig::max_api_waiting_time`]
+    pub(crate) max_api_waiting_time: Duration,
+    /// See [`BuilderConfig::maximize_txn_capture_timeout`]
+    pub(crate) maximize_txn_capture_timeout: Duration,
+    /// See [`BuilderConfig::base_fee`]
     pub(crate) base_fee: u64,
 }
@@ -147,6 +158,7 @@ where
             coordinator: Arc::new(BuilderStateCoordinator::new(
                 config.txn_channel_capacity,
                 config.txn_garbage_collect_duration,
+                config.tx_status_cache_capacity,
             )),
             block_store: RwLock::new(BlockStore::new()),
             block_size_limits: BlockSizeLimits::new(
@@ -249,7 +261,14 @@ where
         let max_tx_len = self.block_size_limits.max_block_size();
         if len > max_tx_len {
             tracing::warn!(%tx.commit, %len, %max_tx_len, "Transaction too big");
-            return Err(Error::TxTooBig { len, max_tx_len });
+            let error = Error::TxTooBig { len, max_tx_len };
+            self.coordinator.update_txn_status(
+                &tx.commit,
+                TransactionStatus::Rejected {
+                    reason: error.to_string(),
+                },
+            );
+            return Err(error);
         }
         self.coordinator.handle_transaction(tx).await
     }
@@ -775,6 +794,13 @@ where
             .try_collect()
             .await
     }
+
+    async fn txn_status(
+        &self,
+        txn_hash: Commitment<<Types as NodeType>::Transaction>,
+    ) -> Result<TransactionStatus, BuildError> {
+        Ok(self.coordinator.tx_status(&txn_hash))
+    }
 }
 
 #[async_trait]
diff --git a/crates/legacy/src/testing/block_size.rs b/crates/legacy/src/testing/block_size.rs
index 96ad139..15df76f 100644
--- a/crates/legacy/src/testing/block_size.rs
+++ b/crates/legacy/src/testing/block_size.rs
@@ -1,4 +1,6 @@
 use async_broadcast::broadcast;
+use committable::Committable;
+use hotshot_builder_api::v0_1::builder::TransactionStatus;
 use hotshot_example_types::block_types::TestTransaction;
 use hotshot_example_types::state_types::TestInstanceState;
 use hotshot_types::data::ViewNumber;
@@ -137,6 +139,7 @@ async fn huge_transactions() {
     let almost_too_big = TestTransaction::new(vec![0u8; PROTOCOL_MAX_BLOCK_SIZE as usize]);
     let too_big = TestTransaction::new(vec![0u8; PROTOCOL_MAX_BLOCK_SIZE as usize + 1]);
+    let too_big_commitment = too_big.commit();
 
     test_service
         .submit_transactions_private(vec![almost_too_big.clone(); N_BIG_TRANSACTIONS])
@@ -148,6 +151,15 @@
         .await
         .unwrap_err();
 
+    // Should also update the tx status
+    assert!(matches!(
+        test_service
+            .proxy_global_state
+            .coordinator
+            .tx_status(&too_big_commitment),
+        TransactionStatus::Rejected { .. }
+    ));
+
     // Builder shouldn't exceed the maximum block size, so transactions
     // should be included one-by-one
     assert_eq!(
diff --git a/crates/marketplace/src/builder_state.rs b/crates/marketplace/src/builder_state.rs
new file mode 100644
index 0000000..4b85221
--- /dev/null
+++ b/crates/marketplace/src/builder_state.rs
@@ -0,0 +1,1438 @@
+use hotshot_builder_api::v0_2::builder::TransactionStatus;
+use hotshot_types::{
+    data::{Leaf, QuorumProposal},
+    message::Proposal,
+    traits::{
+        block_contents::{BlockHeader, BlockPayload},
+        node_implementation::{ConsensusTime, NodeType},
+        EncodeBytes,
+    },
+    utils::BuilderCommitment,
+};
+use marketplace_builder_shared::block::{BlockId, BuilderStateId, ParentBlockReferences};
+use marketplace_builder_shared::utils::RotatingSet;
+
+use committable::Commitment;
+use tokio::{sync::mpsc::UnboundedSender, task::spawn, time::sleep};
+
+use crate::{
+    service::{BroadcastReceivers, GlobalState, ReceivedTransaction},
+    utils::LegacyCommit as _,
+};
+use async_broadcast::broadcast;
+use async_broadcast::Receiver as BroadcastReceiver;
+use async_broadcast::Sender as BroadcastSender;
+use async_lock::RwLock;
+use core::panic;
+use futures::StreamExt;
+
+use std::cmp::PartialEq;
+use std::collections::{HashMap, HashSet};
+use std::fmt::Debug;
+use std::sync::Arc;
+use std::time::Instant;
+use std::{collections::hash_map::Entry, time::Duration};
+
+pub type TxTimeStamp = u128;
+
+/// Enum to hold the different sources of the transaction
+#[derive(Clone, Debug, PartialEq)]
+pub enum TransactionSource {
+    External, // txn from an external source, i.e., the private mempool
+    HotShot,  // txn from the HotShot network, i.e., the public mempool
+}
+
+/// Decide Message to be put on the decide channel
+#[derive(Clone, Debug)]
+pub struct DecideMessage<Types: NodeType> {
+    pub latest_decide_view_number: Types::View,
+}
+/// DA Proposal Message to be put on the da proposal channel
+#[derive(Debug, Clone, PartialEq)]
+pub struct DaProposalMessage<Types: NodeType> {
+    pub view_number: Types::View,
+    pub txn_commitments: Vec<Commitment<Types::Transaction>>,
+    pub sender: <Types as NodeType>::SignatureKey,
+    pub builder_commitment: BuilderCommitment,
+}
+
+/// Quorum proposal message to be put on the quorum proposal channel
+#[derive(Clone, Debug, PartialEq)]
+pub struct QuorumProposalMessage<Types: NodeType> {
+    pub proposal: Arc<Proposal<Types, QuorumProposal<Types>>>,
+    pub sender: Types::SignatureKey,
+}
+/// Request Message to be put on the request channel
+#[derive(Clone, Debug)]
+pub struct RequestMessage<Types: NodeType> {
+    pub requested_view_number: Types::View,
+    pub response_channel: UnboundedSender<ResponseMessage<Types>>,
+}
+pub enum TriggerStatus {
+    Start,
+    Exit,
+}
+
+/// Information about a built block, returned from [`BuilderState::build_block`]
+#[derive(Debug)]
+pub struct BuildBlockInfo<Types: NodeType> {
+    pub id: BlockId<Types>,
+    pub block_size: u64,
+    pub offered_fee: u64,
+    pub block_payload: Types::BlockPayload,
+    pub metadata: <<Types as NodeType>::BlockPayload as BlockPayload<Types>>::Metadata,
+}
+
+/// Response Message to be put on the response channel
+#[derive(Debug, Clone)]
+pub struct ResponseMessage<Types: NodeType> {
+    pub builder_hash: BuilderCommitment,
+    pub transactions: Vec<Types::Transaction>,
+    pub block_size: u64,
+    pub offered_fee: u64,
+}
+#[derive(Debug, Clone)]
+/// Enum to hold the status resulting from the decide event
+pub enum Status {
+    ShouldExit,
+    ShouldContinue,
+}
+
+/// Builder State to hold the state of the builder
+#[derive(Debug)]
+pub struct BuilderState<Types: NodeType> {
+    /// txns that have been included in recent blocks that have
+    /// been built. This is used to try and guarantee that a transaction
+    /// isn't duplicated.
+    /// Keeps a history of the last 3 proposals.
+    pub included_txns: RotatingSet<Commitment<Types::Transaction>>,
+
+    /// txn commits currently in the `tx_queue`. This is used as a quick
+    /// check for whether a transaction is already in the `tx_queue` or
+    /// not.
+    ///
+    /// This should be kept up-to-date with the `tx_queue` as it acts as an
+    /// accessory to the `tx_queue`.
+    pub txn_commits_in_queue: HashSet<Commitment<Types::Transaction>>,
+
+    /// filtered queue of available transactions, taken from `tx_receiver`
+    pub tx_queue: Vec<Arc<ReceivedTransaction<Types>>>,
+
+    /// `da_proposal_payload_commit` to (`da_proposal`, `node_count`)
+    #[allow(clippy::type_complexity)]
+    pub da_proposal_payload_commit_to_da_proposal:
+        HashMap<(BuilderCommitment, Types::View), Arc<DaProposalMessage<Types>>>,
+
+    /// `quorum_proposal_payload_commit` to `quorum_proposal`
+    #[allow(clippy::type_complexity)]
+    pub quorum_proposal_payload_commit_to_quorum_proposal:
+        HashMap<(BuilderCommitment, Types::View), Arc<Proposal<Types, QuorumProposal<Types>>>>,
+
+    /// Spawned-from references to the parent block.
+    pub parent_block_references: ParentBlockReferences<Types>,
+
+    // Channel receivers for the HotShot events; tx_receiver could also receive external transactions
+    /// decide receiver
+    pub decide_receiver: BroadcastReceiver<MessageType<Types>>,
+
+    /// da proposal receiver
+    pub da_proposal_receiver: BroadcastReceiver<MessageType<Types>>,
+
+    /// quorum proposal receiver
+    pub quorum_proposal_receiver: BroadcastReceiver<MessageType<Types>>,
+
+    /// channel receiver for the block requests
+    pub req_receiver: BroadcastReceiver<MessageType<Types>>,
+
+    /// incoming stream of transactions
+    pub tx_receiver: BroadcastReceiver<Arc<ReceivedTransaction<Types>>>,
+
+    /// global state handle, defined in `service.rs`
+    pub global_state: Arc<RwLock<GlobalState<Types>>>,
+
+    /// locally spawned builder commitments
+    pub builder_commitments: HashSet<(BuilderStateId<Types>, BuilderCommitment)>,
+
+    /// timeout for maximizing the txns in the block
+    pub maximize_txn_capture_timeout: Duration,
+
+    /// constant fee that the builder will offer per byte of data sequenced
+    pub base_fee: u64,
+
+    /// validated state that is required for a proposal to be considered valid. Needed for the
+    /// purposes of building a valid block payload within the sequencer.
+    pub validated_state: Arc<Types::ValidatedState>,
+
+    /// instance state to enforce `max_block_size`
+    pub instance_state: Arc<Types::InstanceState>,
+}
+
+/// [`best_builder_states_to_extend`] is a utility function that is used
+/// to determine which [`BuilderState`]s are the best fit to extend from.
+///
+/// This function is designed to inspect the current state of the global state
+/// in order to determine which [`BuilderState`]s are the best fit to extend
+/// from. We only want to use information from [`GlobalState`] as otherwise
+/// we would have some insider knowledge unique to our specific [`BuilderState`]
+/// rather than knowledge that is available to all [`BuilderState`]s. In fact,
+/// in order to ensure this, this function lives outside of the [`BuilderState`]
+/// itself.
+///
+/// In an ideal circumstance the best [`BuilderState`] to extend from is going to
+/// be the one that is immediately preceding the [`QuorumProposal`] that we are
+/// attempting to extend from. However, if all we know is the view number of
+/// the [`QuorumProposal`] that we are attempting to extend from, then we may end
+/// up in a scenario where we have multiple [`BuilderState`]s that are all equally
+/// valid to extend from. When this happens, we have the potential for a data
+/// race.
+///
+/// The primary cause of this has to do with the interface of the
+/// [`ProxyGlobalState`](crate::service::ProxyGlobalState)'s API.
+/// In general, we want to be able to retrieve a
+/// [`BuilderState`] via the [`BuilderStateId`].
+/// The [`BuilderStateId`] only
+/// references a [`ViewNumber`](hotshot_types::data::ViewNumber) and a
+/// [`VidCommitment`](hotshot_types::vid::VidCommitment). While this information
+/// is available in the [`QuorumProposal`], it only helps us to rule out
+/// [`BuilderState`]s that already exist. It does **NOT** help us to pick a
+/// [`BuilderState`] that is the best fit to extend from.
+///
+/// This is where the `justify_qc` comes into consideration. The `justify_qc`
+/// contains the previous [`ViewNumber`](hotshot_types::data::ViewNumber) that is being
+/// extended from, and in addition it also contains the previous [`Commitment<Leaf<Types>>`]
+/// that is being built on top of. Since our [`BuilderState`]s store identifying
+/// information that contains this same `leaf_commit`, we can compare these
+/// directly to ensure that we are extending from the correct [`BuilderState`].
+///
+/// This function determines the best [`BuilderState`] in the following steps:
+///
+/// 1. If we have a [`BuilderState`] that is already spawned for the current
+///    [`QuorumProposal`], then we should return no states, as one already
+///    exists. This will prevent us from attempting to spawn duplicate
+///    [`BuilderState`]s.
+/// 2. Attempt to find all [`BuilderState`]s that are recorded within
+///    [`GlobalState`] that have matching view number and leaf commitments. There
+///    *should* only be one of these. But all would be valid extension points.
+/// 3. If we can't find any [`BuilderState`]s that match the view number
+///    and leaf commitment, then we should return the states for the maximum
+///    stored view number that is smaller than the current [`QuorumProposal`]'s.
+/// 4. If there is only one [`BuilderState`] stored in the [`GlobalState`], then
+///    we should return that [`BuilderState`] as the best fit.
+/// 5. If none of the other criteria match, we return an empty result as it is
+///    unclear what to do in this case.
+///
+/// > Note: Any time this function returns more than a single entry in its
+/// > [HashSet] result, there is a potential for a race condition. This is
+/// > because there are multiple [BuilderState]s that are equally valid to
+/// > extend from. This race could be avoided by just picking one of the
+/// > entries in the resulting [HashSet], but this is not done here in order
+/// > to allow us to highlight the possibility of the race.
+async fn best_builder_states_to_extend<Types: NodeType>(
+    quorum_proposal: Arc<Proposal<Types, QuorumProposal<Types>>>,
+    global_state: Arc<RwLock<GlobalState<Types>>>,
+) -> HashSet<BuilderStateId<Types>> {
+    let current_view_number = quorum_proposal.data.view_number;
+    let current_commitment = quorum_proposal.data.block_header.payload_commitment();
+    let current_builder_state_id = BuilderStateId::<Types> {
+        parent_commitment: current_commitment,
+        parent_view: current_view_number,
+    };
+
+    let global_state_read_lock = global_state.read_arc().await;
+
+    // The first step is to check if we already have a spawned [BuilderState].
+    // If we do, then we should indicate that there is no best fit, as we
+    // don't want to spawn another [BuilderState].
+    if global_state_read_lock
+        .spawned_builder_states
+        .contains_key(&current_builder_state_id)
+    {
+        // We already have a spawned [BuilderState] for this proposal.
+        // So we should just ignore it.
+        return HashSet::new();
+    }
+
+    // Next we want to see if there is an immediate match for a [BuilderState]
+    // that we can extend from. This is the ideal situation, as it
+    // implies that we are extending from the correct [BuilderState].
+    // We do this by checking the `justify_qc` stored within the
+    // [QuorumProposal], and checking it against the currently spawned
+    // [BuilderState]s.
+    let justify_qc = &quorum_proposal.data.justify_qc;
+    let existing_states: HashSet<_> = global_state_read_lock
+        .spawned_builder_states
+        .iter()
+        .filter(|(builder_state_id, (leaf_commit, _))| match leaf_commit {
+            None => false,
+            Some(leaf_commit) => {
+                *leaf_commit == justify_qc.data.leaf_commit
+                    && builder_state_id.parent_view == justify_qc.view_number
+            }
+        })
+        .map(|(builder_state_id, _)| builder_state_id.clone())
+        .collect();
+
+    // If we found any matching [BuilderState]s, then we should return them
+    // as the best fit.
+    if !existing_states.is_empty() {
+        return existing_states;
+    }
+
+    // At this point, we don't have any "ideal" matches or scenarios. So we
+    // need to look for a suitable fall-back. The best fallback condition to
+    // start with is any [BuilderState] that has the maximum spawned view
+    // number whose value is smaller than the current [QuorumProposal].
+    let maximum_stored_view_number_smaller_than_quorum_proposal = global_state_read_lock
+        .spawned_builder_states
+        .keys()
+        .map(|builder_state_id| *builder_state_id.parent_view)
+        .filter(|view_number| view_number < &current_view_number)
+        .max();
+
+    // If we have a maximum view number that meets our criteria, then we should
+    // return all [BuilderStateId]s that match this view number.
+    // This can lead to multiple [BuilderStateId]s being returned.
+    if let Some(maximum_stored_view_number_smaller_than_quorum_proposal) =
+        maximum_stored_view_number_smaller_than_quorum_proposal
+    {
+        // Collect every [BuilderStateId] whose view number matches the
+        // maximum stored view number smaller than the quorum proposal's
+        // view number; each of these is an equally good fit.
+        let mut result = HashSet::new();
+        for builder_state_id in
+            global_state_read_lock
+                .spawned_builder_states
+                .keys()
+                .filter(|builder_state_id| {
+                    builder_state_id.parent_view.u64()
+                        == maximum_stored_view_number_smaller_than_quorum_proposal
+                })
+        {
+            result.insert(builder_state_id.clone());
+        }
+        return result;
+    }
+
+    // This is our last-ditch effort to continue making progress. If there is
+    // only one [BuilderState] active, then we should return that as the best
+    // fit, as it will be the only way we can continue making progress with
+    // the builder.
+    if global_state_read_lock.spawned_builder_states.len() == 1 {
+        let mut result = HashSet::new();
+        for builder_state_id in global_state_read_lock.spawned_builder_states.keys() {
+            result.insert(builder_state_id.clone());
+        }
+        return result;
+    }
+
+    // Reaching this point implies that the only active [BuilderState]s are for
+    // views larger than the current one. This is weird: it implies that some
+    // sort of time travel has occurred view-wise. It is unclear what to do in
+    // this situation.
+
+    HashSet::new()
+}
+
+impl<Types: NodeType> BuilderState<Types> {
+    /// Utility method that attempts to determine whether we are among
+    /// the best [`BuilderState`]s to extend from.
+    async fn am_i_the_best_builder_state_to_extend(
+        &self,
+        quorum_proposal: Arc<Proposal<Types, QuorumProposal<Types>>>,
+    ) -> bool {
+        let best_builder_states_to_extend =
+            best_builder_states_to_extend(quorum_proposal.clone(), self.global_state.clone()).await;
+
+        tracing::debug!(
+            "{}@{} thinks these are the best builder states to extend from: {:?} for proposal {}@{}",
+            self.parent_block_references.vid_commitment,
+            self.parent_block_references.view_number.u64(),
+            best_builder_states_to_extend
+                .iter()
+                .map(|builder_state_id| format!(
+                    "{}@{}",
+                    builder_state_id.parent_commitment,
+                    builder_state_id.parent_view.u64()
+                ))
+                .collect::<Vec<String>>(),
+            quorum_proposal.data.block_header.payload_commitment(),
+            quorum_proposal.data.view_number.u64(),
+        );
+
+        // We are a best fit if we are contained within the returned set of
+        // best [BuilderState]s to extend from.
+        best_builder_states_to_extend.contains(&BuilderStateId {
+            parent_commitment: self.parent_block_references.vid_commitment,
+            parent_view: self.parent_block_references.view_number,
+        })
+    }
+
+    /// This method is used to handle incoming DA proposal messages
+    /// from an incoming HotShot [Event](hotshot_types::event::Event). A DA Proposal is
+    /// a proposal that is meant to be voted on by consensus nodes in order to
+    /// determine which transactions should be included for this view.
+    ///
+    /// A DA Proposal in conjunction with a Quorum Proposal is an indicator
+    /// that a new Block / Leaf is being proposed for the HotShot network. So
+    /// we need to be able to propose new Bundles on top of these proposals.
+    ///
+    /// In order to do so we must first wait until we have both a DA Proposal
+    /// and a Quorum Proposal. If we do not, then we can just record the
+    /// proposal we have and wait for the other to arrive.
+    ///
+    /// If we do have a matching Quorum Proposal, then we can proceed to make
+    /// a decision about extending the current [BuilderState] via
+    /// [BuilderState::spawn_clone_that_extends_self].
+    ///
+    /// > Note: In the case of `process_da_proposal`, if we don't have a corresponding
+    /// > Quorum Proposal, then we will have to wait for `process_quorum_proposal`
+    /// > to be called with the matching Quorum Proposal. Until that point we
+    /// > exit knowing we have fulfilled the DA proposal portion.
+    #[tracing::instrument(skip_all, name = "process da proposal",
+        fields(builder_parent_block_references = %self.parent_block_references))]
+    async fn process_da_proposal(&mut self, da_msg: Arc<DaProposalMessage<Types>>) {
+        tracing::debug!(
+            "Builder Received DA message for view {:?}",
+            da_msg.view_number
+        );
+
+        // We do not have the option to ignore DA proposals if we want to be
+        // able to handle failed view reorgs.
+
+        // If the respective builder state exists to handle the request
+        tracing::debug!(
+            "Extracted builder commitment from the da proposal: {:?}",
+            da_msg.builder_commitment
+        );
+
+        let Entry::Vacant(e) = self
+            .da_proposal_payload_commit_to_da_proposal
+            .entry((da_msg.builder_commitment.clone(), da_msg.view_number))
+        else {
+            tracing::debug!("Payload commitment already exists in the da_proposal_payload_commit_to_da_proposal hashmap, so ignoring it");
+            return;
+        };
+
+        // If we have matching DA and quorum proposals, we can skip storing
+        // this one, remove the other from storage, and call build_block with
+        // both, to save a little space.
+        let Entry::Occupied(quorum_proposal) = self
+            .quorum_proposal_payload_commit_to_quorum_proposal
+            .entry((da_msg.builder_commitment.clone(), da_msg.view_number))
+        else {
+            e.insert(da_msg);
+            return;
+        };
+
+        let quorum_proposal = quorum_proposal.remove();
+
+        if quorum_proposal.data.view_number != da_msg.view_number {
+            tracing::debug!("Not spawning a clone despite matching DA and QC payload commitments, as they correspond to different view numbers");
+            return;
+        }
+
+        self.spawn_clone_that_extends_self(da_msg, quorum_proposal.clone())
+            .await;
+    }
+
+    /// This method is used to handle incoming Quorum Proposal messages
+    /// from an incoming HotShot [Event](hotshot_types::event::Event). A Quorum
+    /// Proposal is a proposal that indicates the next potential Block of the
+    /// chain is being proposed for the HotShot network. This proposal is
+    /// voted on by the consensus nodes in order to determine whether this
+    /// will be the next Block of the chain or not.
+    ///
+    /// A Quorum Proposal in conjunction with a DA Proposal is an indicator
+    /// that a new Block / Leaf is being proposed for the HotShot network. So
+    /// we need to be able to propose new Bundles on top of these proposals.
+    ///
+    /// In order to do so we must first wait until we have both a DA Proposal
+    /// and a Quorum Proposal. If we do not, then we can just record the
+    /// proposal we have and wait for the other to arrive.
+    ///
+    /// If we do have a matching DA Proposal, then we can proceed to make
+    /// a decision about extending the current [BuilderState] via
+    /// [BuilderState::spawn_clone_that_extends_self].
+    ///
+    /// > Note: In the case of `process_quorum_proposal`, if we don't have a
+    /// > corresponding DA Proposal, then we will have to wait for
+    /// > `process_da_proposal` to be called with the matching DA Proposal.
+    /// > Until that point we exit knowing we have fulfilled the Quorum proposal
+    /// > portion.
+    //#[tracing::instrument(skip_all, name = "Process Quorum Proposal")]
+    #[tracing::instrument(skip_all, name = "process quorum proposal",
+        fields(builder_parent_block_references = %self.parent_block_references))]
+    async fn process_quorum_proposal(&mut self, quorum_msg: QuorumProposalMessage<Types>) {
+        tracing::debug!(
+            "Builder Received Quorum proposal message for view {:?}",
+            quorum_msg.proposal.data.view_number
+        );
+
+        // Two cases to handle:
+        // Case 1: Bootstrapping phase
+        // Case 2: No intended builder state exists
+        let quorum_proposal = &quorum_msg.proposal;
+        let view_number = quorum_proposal.data.view_number;
+        let payload_builder_commitment = quorum_proposal.data.block_header.builder_commitment();
+
+        tracing::debug!(
+            "Extracted payload builder commitment from the quorum proposal: {:?}",
+            payload_builder_commitment
+        );
+
+        // First check whether the payload builder commitment exists in the
+        // quorum_proposal_payload_commit_to_quorum_proposal hashmap: if yes,
+        // ignore it; otherwise validate it and insert it later.
+        let Entry::Vacant(e) = self
+            .quorum_proposal_payload_commit_to_quorum_proposal
+            .entry((payload_builder_commitment.clone(), view_number))
+        else {
+            tracing::debug!("Payload commitment already exists in the quorum_proposal_payload_commit_to_quorum_proposal hashmap, so ignoring it");
+            return;
+        };
+
+        // If we have matching DA and quorum proposals, we can skip storing
+        // this one, remove the other from storage, and call build_block
+        // with both, to save a little space.
+        let Entry::Occupied(da_proposal) = self
+            .da_proposal_payload_commit_to_da_proposal
+            .entry((payload_builder_commitment.clone(), view_number))
+        else {
+            e.insert(quorum_proposal.clone());
+            return;
+        };
+
+        // Removing the occupied entry also drops it from the
+        // da_proposal_payload_commit_to_da_proposal hashmap.
+        let da_proposal_info = da_proposal.remove();
+
+        // Also make sure we clone for the same view number
+        // (in case the payload commitments are the same).
+        if da_proposal_info.view_number != view_number {
+            tracing::debug!("Not spawning a clone despite matching DA and QC payload commitments, as they correspond to different view numbers");
+            return;
+        }
+
+        self.spawn_clone_that_extends_self(da_proposal_info, quorum_proposal.clone())
+            .await;
+    }
+
+    /// A helper function that is used by both [`process_da_proposal`](Self::process_da_proposal)
+    /// and [`process_quorum_proposal`](Self::process_quorum_proposal) to spawn a new [`BuilderState`]
+    /// that extends from the current [`BuilderState`].
+    ///
+    /// This helper function also performs additional checks in order to ensure
+    /// that the [`BuilderState`] that is being spawned is the best fit for the
+    /// [`QuorumProposal`] that is being extended from.
+    async fn spawn_clone_that_extends_self(
+        &mut self,
+        da_proposal_info: Arc<DaProposalMessage<Types>>,
+        quorum_proposal: Arc<Proposal<Types, QuorumProposal<Types>>>,
+    ) {
+        if !self
+            .am_i_the_best_builder_state_to_extend(quorum_proposal.clone())
+            .await
+        {
+            tracing::debug!(
+                "{} is not the best fit for forking, {}@{}, so ignoring the Quorum proposal, and leaving it to another BuilderState",
+                self.parent_block_references,
+                quorum_proposal.data.block_header.payload_commitment(),
+                quorum_proposal.data.view_number.u64(),
+            );
+            return;
+        }
+
+        let (req_sender, req_receiver) = broadcast(self.req_receiver.capacity());
+
+        tracing::debug!(
+            "extending BuilderState with a clone from {} with new proposal {}@{}",
+            self.parent_block_references,
+            quorum_proposal.data.block_header.payload_commitment(),
+            quorum_proposal.data.view_number.u64()
+        );
+
+        // We literally fork ourselves
+        self.clone_with_receiver(req_receiver)
+            .spawn_clone(da_proposal_info, quorum_proposal.clone(), req_sender)
+            .await;
+    }
+
+    /// processing the decide event
+    #[tracing::instrument(skip_all, name = "process decide event",
+        fields(builder_parent_block_references = %self.parent_block_references))]
+    async fn process_decide_event(&mut self, decide_msg: DecideMessage<Types>) -> Option<Status> {
+        // Exit all builder states whose parent_block_references.view_number is
+        // less than the latest_decide_view_number. The only exception is that
+        // we want to keep the highest-view-number builder state active, to
+        // ensure that we have a builder state to handle the incoming DA and
+        // Quorum proposals.
+        let decide_view_number = decide_msg.latest_decide_view_number;
+
+        let retained_view_cutoff = self
+            .global_state
+            .write_arc()
+            .await
+            .remove_handles(decide_view_number);
+        if self.parent_block_references.view_number < retained_view_cutoff {
+            tracing::info!(
+                "Decide@{:?}; Task@{:?} exiting; views < {:?} being reclaimed",
+                decide_view_number.u64(),
+                self.parent_block_references.view_number.u64(),
+                retained_view_cutoff.u64(),
+            );
+            return Some(Status::ShouldExit);
+        }
+
+        tracing::info!(
+            "Decide@{:?}; Task@{:?} not exiting; views >= {:?} being retained",
+            decide_view_number.u64(),
+            self.parent_block_references.view_number.u64(),
+            retained_view_cutoff.u64(),
+        );
+
+    /// spawn a clone of the builder state
+    #[tracing::instrument(skip_all, name = "spawn_clone",
+        fields(builder_parent_block_references = %self.parent_block_references))]
+    async fn spawn_clone(
+        mut self,
+        da_proposal_info: Arc<DaProposalMessage<Types>>,
+        quorum_proposal: Arc<Proposal<Types, QuorumProposal<Types>>>,
+        req_sender: BroadcastSender<MessageType<Types>>,
+    ) {
+        let leaf = Leaf::from_quorum_proposal(&quorum_proposal.data);
+
+        // We replace our parent_block_references with information from the
+        // quorum proposal. This identifies the block that this specific
+        // instance of [BuilderState] is attempting to build on top of.
+        self.parent_block_references = ParentBlockReferences {
+            view_number: quorum_proposal.data.view_number,
+            vid_commitment: quorum_proposal.data.block_header.payload_commitment(),
+            leaf_commit: leaf.legacy_commit(),
+            builder_commitment: quorum_proposal.data.block_header.builder_commitment(),
+        };
+
+        let builder_state_id = BuilderStateId {
+            parent_commitment: self.parent_block_references.vid_commitment,
+            parent_view: self.parent_block_references.view_number,
+        };
+
+        {
+            // Ensure that we don't already have one of these BuilderStates
+            // running.
+            let global_state_read_lock = self.global_state.read_arc().await;
+            if global_state_read_lock
+                .spawned_builder_states
+                .contains_key(&builder_state_id)
+            {
+                tracing::warn!(
+                    "Aborting spawn_clone, builder state already exists in spawned_builder_states: {:?}",
+                    builder_state_id
+                );
+                return;
+            }
+        }
+
+        for tx in da_proposal_info.txn_commitments.iter() {
+            self.txn_commits_in_queue.remove(tx);
+        }
+
+        // We add the included transactions to the included_txns set, so we can
+        // also filter them out should they be included in a future transaction
+        // submission.
+        self.included_txns
+            .extend(da_proposal_info.txn_commitments.iter().cloned());
+
+        // Keep only the transactions in tx_queue whose commitments still
+        // exist in the txn_commits_in_queue set.
+        self.tx_queue
+            .retain(|tx| self.txn_commits_in_queue.contains(&tx.commit));
+
+        // Register this new child within the global_state's
+        // spawned_builder_states, so that it can be looked up via its
+        // [BuilderStateId] in the future.
+        self.global_state.write_arc().await.register_builder_state(
+            builder_state_id,
+            self.parent_block_references.clone(),
+            req_sender,
+        );
+
+        self.event_loop();
+    }
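The queue maintenance in `spawn_clone` boils down to set subtraction followed by `Vec::retain`. A small self-contained sketch of that step under simplified types (`u64` values stand in for transaction commitments; `prune` is an invented helper, not part of the builder):

```rust
use std::collections::HashSet;

fn prune(queue: &mut Vec<(u64, &'static str)>, pending: &mut HashSet<u64>, included: &[u64]) {
    // Drop the included commitments from the pending set...
    for commit in included {
        pending.remove(commit);
    }
    // ...then shrink the queue to the transactions that are still pending.
    queue.retain(|(commit, _)| pending.contains(commit));
}

fn main() {
    let mut pending: HashSet<u64> = HashSet::from([1, 2, 3]);
    let mut queue = vec![(1, "a"), (2, "b"), (3, "c")];
    prune(&mut queue, &mut pending, &[2]); // transaction 2 was just included in a block
    assert_eq!(queue, vec![(1, "a"), (3, "c")]);
    println!("remaining queue: {queue:?}");
}
```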
+
+    /// A method that will return a [BuildBlockInfo] if it is able to build a
+    /// block; if it encounters an error while building, it returns None.
+    ///
+    /// This first starts by collecting transactions to include in the block. It
+    /// will wait until it has at least one transaction to include in the block,
+    /// or until the configured `maximize_txn_capture_timeout` duration elapses,
+    /// at which point it will attempt to build a block with the transactions it
+    /// has collected.
+    ///
+    /// Upon successfully building a block, a commitment for the [BuilderStateId]
+    /// and Block payload pair is stored, and a [BuildBlockInfo] is created
+    /// and returned.
+    #[tracing::instrument(skip_all, name = "build block",
+        fields(builder_parent_block_references = %self.parent_block_references))]
+    async fn build_block(
+        &mut self,
+        state_id: BuilderStateId<Types>,
+    ) -> Option<BuildBlockInfo<Types>> {
+        // collect all the transactions from the near future
+        let timeout_after = Instant::now() + self.maximize_txn_capture_timeout;
+        let sleep_interval = self.maximize_txn_capture_timeout / 10;
+        while Instant::now() <= timeout_after {
+            self.collect_txns(timeout_after).await;
+
+            if !self.tx_queue.is_empty() // we have transactions
+                || Instant::now() + sleep_interval > timeout_after
+            // we don't have time for another iteration
+            {
+                break;
+            }
+
+            sleep(sleep_interval).await
+        }
+
+        let Ok((payload, metadata)) =
+            <Types::BlockPayload as BlockPayload<Types>>::from_transactions(
+                self.tx_queue.iter().map(|tx| tx.tx.clone()),
+                &self.validated_state,
+                &self.instance_state,
+            )
+            .await
+        else {
+            tracing::warn!("Failed to build block payload");
+            return None;
+        };
+
+        let builder_hash = payload.builder_commitment(&metadata);
+        // count the number of txns
+        let txn_count = payload.num_transactions(&metadata);
+
+        // insert the recently built block into the builder commitments
+        self.builder_commitments
+            .insert((state_id, builder_hash.clone()));
+
+        let encoded_txns: Vec<u8> = payload.encode().to_vec();
+        let block_size: u64 = encoded_txns.len() as u64;
+        let offered_fee: u64 = self.base_fee * block_size;
+
+        tracing::info!(
+            "Builder view num {:?}, building block with {:?} txns, with builder hash {:?}",
+            self.parent_block_references.view_number,
+            txn_count,
+            builder_hash
+        );
+
+        Some(BuildBlockInfo {
+            id: BlockId {
+                view: self.parent_block_references.view_number,
+                hash: builder_hash,
+            },
+            block_size,
+            offered_fee,
+            block_payload: payload,
+            metadata,
+        })
+    }
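The capture loop above trades a little latency for fuller blocks: it polls in `timeout / 10` slices and stops early once it has something to include or another sleep would overshoot the deadline. A synchronous sketch of the same control flow (the builder itself uses an async sleep; `poll` is a stand-in for `collect_txns`):

```rust
use std::time::{Duration, Instant};

fn collect_until(timeout: Duration, queue: &mut Vec<u64>, mut poll: impl FnMut(&mut Vec<u64>)) {
    let deadline = Instant::now() + timeout;
    let slice = timeout / 10; // the builder sleeps in tenths of the timeout
    while Instant::now() <= deadline {
        poll(queue); // drain whatever is available right now
        if !queue.is_empty() || Instant::now() + slice > deadline {
            break; // we have work, or no time left for another slice
        }
        std::thread::sleep(slice); // async sleep in the real builder
    }
}

fn main() {
    let mut queue = Vec::new();
    let mut polls = 0;
    collect_until(Duration::from_millis(100), &mut queue, |q| {
        polls += 1;
        if polls == 3 {
            q.push(7); // a transaction shows up on the third poll
        }
    });
    println!("collected {queue:?} after {polls} polls");
}
```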
+
+    /// A method that is used to handle incoming
+    /// [`RequestMessage`]s. These [`RequestMessage`]s are looking for a bundle
+    /// of transactions to be included in the next block. Instead of returning
+    /// a value, this method's response will be provided to the [`UnboundedSender`] that
+    /// is included in the [`RequestMessage`].
+    ///
+    /// At this point this particular [`BuilderState`] has already been deemed
+    /// the [`BuilderState`] that should handle this request, and it is up
+    /// to this [`BuilderState`] to provide the response, if it is able to do
+    /// so.
+    ///
+    /// The response will be a [`ResponseMessage`] that contains the transactions
+    /// the `Builder` wants to include in the next block, in addition to the
+    /// expected block size, the offered fee, and the Builder's commitment to
+    /// the block data being returned.
+    async fn process_block_request(&mut self, req: RequestMessage<Types>) {
+        let requested_view_number = req.requested_view_number;
+        // If a spawned clone is active then it will handle the request;
+        // otherwise the builder state with the highest view number will.
+        if requested_view_number != self.parent_block_references.view_number {
+            tracing::debug!(
+                "Builder {:?} Requested view number does not match the built_from_view, so ignoring it",
+                self.parent_block_references.view_number
+            );
+            return;
+        }
+
+        tracing::info!(
+            "Request handled by builder with view {}@{:?} for (view_num: {:?})",
+            self.parent_block_references.vid_commitment,
+            self.parent_block_references.view_number,
+            requested_view_number
+        );
+
+        let response = self
+            .build_block(BuilderStateId {
+                parent_commitment: self.parent_block_references.vid_commitment,
+                parent_view: requested_view_number,
+            })
+            .await;
+
+        let Some(response) = response else {
+            tracing::debug!("No response to send");
+            return;
+        };
+
+        // form the response message
+        let response_msg = ResponseMessage {
+            builder_hash: response.id.hash.clone(),
+            block_size: response.block_size,
+            offered_fee: response.offered_fee,
+            transactions: response
+                .block_payload
+                .transactions(&response.metadata)
+                .collect(),
+        };
+
+        let builder_hash = response.id.hash.clone();
+        self.global_state.write_arc().await.update_global_state(
+            BuilderStateId {
+                parent_commitment: self.parent_block_references.vid_commitment,
+                parent_view: requested_view_number,
+            },
+            response,
+            response_msg.clone(),
+        );
+
+        // ... and finally, send the response
+        if let Err(e) = req.response_channel.send(response_msg) {
+            tracing::warn!(
+                "Builder {:?} failed to send response to {:?} with builder hash {:?}, Err: {:?}",
+                self.parent_block_references.view_number,
+                req,
+                builder_hash,
+                e
+            );
+            return;
+        }
+
+        tracing::info!(
+            "Builder {:?} sent response to the request {:?} with builder hash {:?}",
+            self.parent_block_references.view_number,
+            req,
+            builder_hash
+        );
+    }
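Note the inversion in `process_block_request`: the reply travels over the channel carried inside the request, not through a return value. A hedged sketch of that shape using a futures unbounded channel (the `Request` type and the `bundle@view` payload are invented for illustration):

```rust
use futures::channel::mpsc;

/// The request carries its own reply channel; nothing is returned.
struct Request {
    view: u64,
    response_channel: mpsc::UnboundedSender<String>,
}

fn handle(current_view: u64, req: Request) {
    if req.view != current_view {
        return; // not built for this view; another builder state answers
    }
    // Build the bundle and push it back through the embedded channel.
    if let Err(e) = req.response_channel.unbounded_send(format!("bundle@{}", req.view)) {
        eprintln!("failed to send response: {e:?}");
    }
}

fn main() {
    let (tx, mut rx) = mpsc::unbounded();
    handle(7, Request { view: 7, response_channel: tx });
    println!("{:?}", rx.try_next()); // Ok(Some("bundle@7"))
}
```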
+
+    // MARK: event loop processing for [BuilderState]
+
+    /// Helper function used to handle incoming [`MessageType`]s,
+    /// specifically [`RequestMessage`]s, that are received by the
+    /// [`BuilderState::req_receiver`] channel.
+    ///
+    /// This method is used to process block requests.
+    async fn event_loop_helper_handle_request(&mut self, req: Option<MessageType<Types>>) {
+        let Some(req) = req else {
+            tracing::warn!("No more request messages to consume");
+            return;
+        };
+
+        let MessageType::RequestMessage(req) = req else {
+            tracing::warn!("Unexpected message on requests channel: {:?}", req);
+            return;
+        };
+
+        tracing::debug!(
+            "Received request msg in builder {:?}: {:?}",
+            self.parent_block_references.view_number,
+            req
+        );
+
+        self.process_block_request(req).await;
+    }
+
+    /// Helper function used to handle incoming [`MessageType`]s,
+    /// specifically [`DaProposalMessage`]s, that are received by the
+    /// [`BuilderState::da_proposal_receiver`] channel.
+    async fn event_loop_helper_handle_da_proposal(&mut self, da: Option<MessageType<Types>>) {
+        let Some(da) = da else {
+            tracing::warn!("No more da proposal messages to consume");
+            return;
+        };
+
+        let MessageType::DaProposalMessage(rda_msg) = da else {
+            tracing::warn!("Unexpected message on da proposals channel: {:?}", da);
+            return;
+        };
+
+        tracing::debug!(
+            "Received da proposal msg in builder {:?}:\n {:?}",
+            self.parent_block_references,
+            rda_msg.view_number
+        );
+
+        self.process_da_proposal(rda_msg).await;
+    }
+
+    /// Helper function used to handle incoming [`MessageType`]s,
+    /// specifically [`QuorumProposalMessage`]s, that are received by the
+    /// [`BuilderState::quorum_proposal_receiver`] channel.
+    async fn event_loop_helper_handle_quorum_proposal(
+        &mut self,
+        quorum: Option<MessageType<Types>>,
+    ) {
+        let Some(quorum) = quorum else {
+            tracing::warn!("No more quorum proposal messages to consume");
+            return;
+        };
+
+        let MessageType::QuorumProposalMessage(quorum_proposal_message) = quorum else {
+            tracing::warn!(
+                "Unexpected message on quorum proposals channel: {:?}",
+                quorum
+            );
+            return;
+        };
+
+        tracing::debug!(
+            "Received quorum proposal msg in builder {:?} for view {:?}",
+            self.parent_block_references,
+            quorum_proposal_message.proposal.data.view_number
+        );
+
+        self.process_quorum_proposal(quorum_proposal_message).await;
+    }
+
+    /// Helper function used to handle incoming [`MessageType`]s,
+    /// specifically [`DecideMessage`]s, that are received by the
+    /// [`BuilderState::decide_receiver`] channel.
+    ///
+    /// This method can trigger the exit of the [`BuilderState::event_loop`]
+    /// async task via the returned [`std::ops::ControlFlow`] value: if the
+    /// returned value is [`std::ops::ControlFlow::Break`], then the
+    /// [`BuilderState::event_loop`] async task should exit.
+    async fn event_loop_helper_handle_decide(
+        &mut self,
+        decide: Option<MessageType<Types>>,
+    ) -> std::ops::ControlFlow<()> {
+        let Some(decide) = decide else {
+            tracing::warn!("No more decide messages to consume");
+            return std::ops::ControlFlow::Continue(());
+        };
+
+        let MessageType::DecideMessage(rdecide_msg) = decide else {
+            tracing::warn!("Unexpected message on decide channel: {:?}", decide);
+            return std::ops::ControlFlow::Continue(());
+        };
+
+        let latest_decide_view_num = rdecide_msg.latest_decide_view_number;
+        tracing::debug!(
+            "Received decide msg view {:?} in builder {:?}",
+            &latest_decide_view_num,
+            self.parent_block_references
+        );
+        let decide_status = self.process_decide_event(rdecide_msg).await;
+
+        let Some(decide_status) = decide_status else {
+            tracing::warn!(
+                "decide_status was None; Continuing builder {:?}",
+                self.parent_block_references
+            );
+            return std::ops::ControlFlow::Continue(());
+        };
+
+        match decide_status {
+            Status::ShouldContinue => {
+                tracing::debug!("Continuing builder {:?}", self.parent_block_references);
+                std::ops::ControlFlow::Continue(())
+            }
+            _ => {
+                tracing::info!(
+                    "Exiting builder {:?} with decide view {:?}",
+                    self.parent_block_references,
+                    &latest_decide_view_num
+                );
+                std::ops::ControlFlow::Break(())
+            }
+        }
+    }
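Returning `std::ops::ControlFlow` lets the decide helper signal termination without owning the loop: the helper states the decision, the loop acts on it. A compact sketch of the same convention (names invented for illustration):

```rust
use std::ops::ControlFlow;

/// Continue keeps the event loop alive; Break ends it (like Status::ShouldExit).
fn handle_decide(decided_view: u64, my_view: u64) -> ControlFlow<()> {
    if my_view < decided_view {
        ControlFlow::Break(()) // this builder state is now obsolete
    } else {
        ControlFlow::Continue(())
    }
}

fn main() {
    for decided_view in [1, 2, 9] {
        if let ControlFlow::Break(_) = handle_decide(decided_view, 5) {
            println!("event loop exits at decide {decided_view}");
            break;
        }
        println!("event loop continues past decide {decided_view}");
    }
}
```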
+
+    /// Spawns an async task that handles messages arriving on the
+    /// [BuilderState]'s various channels.
+    ///
+    /// This async task will continue to run until it receives a message that
+    /// indicates that it should exit. This exit message is sent via the
+    /// [DecideMessage] channel.
+    ///
+    /// The main body of the loop listens to four channels at once, and when
+    /// a message is received it processes the message with the appropriate
+    /// handler.
+    ///
+    /// > Note: There is potential for improvement in typing here, as each of
+    /// > these receivers returns the exact same type despite being separate
+    /// > channels. These channels may want to convey separate types so that
+    /// > the contained message can pertain to its specific channel
+    /// > accordingly.
+    #[tracing::instrument(skip_all, name = "event loop",
+        fields(builder_parent_block_references = %self.parent_block_references))]
+    pub fn event_loop(mut self) {
+        let _builder_handle = spawn(async move {
+            loop {
+                tracing::debug!(
+                    "Builder {:?} event loop",
+                    self.parent_block_references.view_number
+                );
+
+                futures::select! {
+                    req = self.req_receiver.next() => self.event_loop_helper_handle_request(req).await,
+                    da = self.da_proposal_receiver.next() => self.event_loop_helper_handle_da_proposal(da).await,
+                    quorum = self.quorum_proposal_receiver.next() => self.event_loop_helper_handle_quorum_proposal(quorum).await,
+                    decide = self.decide_receiver.next() => if let std::ops::ControlFlow::Break(_) = self.event_loop_helper_handle_decide(decide).await { return; },
+                };
+            }
+        });
+    }
+}
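The loop above is a standard `futures::select!` fan-in: each arm races the next item from one channel and dispatches it. A runnable miniature with two channels (placeholder message types; the real loop has four arms over fused broadcast receivers):

```rust
use futures::{channel::mpsc, select, StreamExt};

async fn event_loop(mut reqs: mpsc::Receiver<u32>, mut decides: mpsc::Receiver<u32>) {
    loop {
        select! {
            req = reqs.next() => match req {
                Some(r) => println!("handling request {r}"),
                None => println!("request channel closed"),
            },
            decide = decides.next() => match decide {
                Some(d) if d >= 5 => { println!("decide {d}: exiting"); return; }
                Some(d) => println!("decide {d}: continuing"),
                None => return,
            },
        }
    }
}

fn main() {
    let (mut req_tx, reqs) = mpsc::channel(8);
    let (mut dec_tx, decides) = mpsc::channel(8);
    req_tx.try_send(1).unwrap();
    dec_tx.try_send(7).unwrap(); // past the cutoff: the loop will exit
    futures::executor::block_on(event_loop(reqs, decides));
}
```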
+
+/// Unifies the possible messages that can be received by the builder
+#[derive(Debug, Clone)]
+pub enum MessageType<Types: NodeType> {
+    DecideMessage(DecideMessage<Types>),
+    DaProposalMessage(Arc<DaProposalMessage<Types>>),
+    QuorumProposalMessage(QuorumProposalMessage<Types>),
+    RequestMessage(RequestMessage<Types>),
+}
+
+#[allow(clippy::too_many_arguments)]
+impl<Types: NodeType> BuilderState<Types> {
+    pub fn new(
+        parent_block_references: ParentBlockReferences<Types>,
+        receivers: &BroadcastReceivers<Types>,
+        req_receiver: BroadcastReceiver<MessageType<Types>>,
+        tx_queue: Vec<Arc<ReceivedTransaction<Types>>>,
+        global_state: Arc<RwLock<GlobalState<Types>>>,
+        maximize_txn_capture_timeout: Duration,
+        base_fee: u64,
+        instance_state: Arc<Types::InstanceState>,
+        txn_garbage_collect_duration: Duration,
+        validated_state: Arc<Types::ValidatedState>,
+    ) -> Self {
+        let txn_commits_in_queue: HashSet<_> = tx_queue.iter().map(|tx| tx.commit).collect();
+        BuilderState {
+            txn_commits_in_queue,
+            parent_block_references,
+            req_receiver,
+            tx_queue,
+            global_state,
+            maximize_txn_capture_timeout,
+            base_fee,
+            instance_state,
+            validated_state,
+            included_txns: RotatingSet::new(txn_garbage_collect_duration),
+            da_proposal_payload_commit_to_da_proposal: HashMap::new(),
+            quorum_proposal_payload_commit_to_quorum_proposal: HashMap::new(),
+            builder_commitments: HashSet::new(),
+            decide_receiver: receivers.decide.activate_cloned(),
+            da_proposal_receiver: receivers.da_proposal.activate_cloned(),
+            quorum_proposal_receiver: receivers.quorum_proposal.activate_cloned(),
+            tx_receiver: receivers.transactions.activate_cloned(),
+        }
+    }
+
+    pub fn clone_with_receiver(&self, req_receiver: BroadcastReceiver<MessageType<Types>>) -> Self {
+        let mut included_txns = self.included_txns.clone();
+        included_txns.rotate();
+
+        BuilderState {
+            included_txns,
+            txn_commits_in_queue: self.txn_commits_in_queue.clone(),
+            parent_block_references: self.parent_block_references.clone(),
+            decide_receiver: self.decide_receiver.clone(),
+            da_proposal_receiver: self.da_proposal_receiver.clone(),
+            quorum_proposal_receiver: self.quorum_proposal_receiver.clone(),
+            req_receiver,
+            da_proposal_payload_commit_to_da_proposal: HashMap::new(),
+            quorum_proposal_payload_commit_to_quorum_proposal: HashMap::new(),
+            tx_receiver: self.tx_receiver.clone(),
+            tx_queue: self.tx_queue.clone(),
+            global_state: self.global_state.clone(),
+            builder_commitments: self.builder_commitments.clone(),
+            maximize_txn_capture_timeout: self.maximize_txn_capture_timeout,
+            base_fee: self.base_fee,
+            instance_state: self.instance_state.clone(),
+            validated_state: self.validated_state.clone(),
+        }
+    }
+
+    // collect outstanding transactions
+    async fn collect_txns(&mut self, timeout_after: Instant) {
+        while Instant::now() <= timeout_after {
+            match self.tx_receiver.try_recv() {
+                Ok(tx) => {
+                    if self.included_txns.contains(&tx.commit) {
+                        // We've included this transaction in one of our
+                        // recent blocks, and we do not wish to include it
+                        // again.
+                        continue;
+                    }
+
+                    if self.txn_commits_in_queue.contains(&tx.commit) {
+                        // We already have this transaction in our current
+                        // queue, so we do not want to include it again.
+                        continue;
+                    }
+
+                    self.txn_commits_in_queue.insert(tx.commit);
+                    self.tx_queue.push(tx);
+                }
+
+                Err(async_broadcast::TryRecvError::Empty)
+                | Err(async_broadcast::TryRecvError::Closed) => {
+                    // The transaction receiver is empty, or it has been
+                    // closed. If it is closed, that's a big problem and we
+                    // should probably indicate it as such.
+                    break;
+                }
+
+                Err(async_broadcast::TryRecvError::Overflowed(lost)) => {
+                    tracing::warn!("Missed {lost} transactions due to backlog");
+                    continue;
+                }
+            }
+        }
+    }
+}
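`collect_txns` is a non-blocking drain: call `try_recv` until the channel reports empty or closed, skipping duplicates and surviving `Overflowed` lag errors. A sketch of the same drain against `async_broadcast`, with `u64` values standing in for transaction commitments and `seen` standing in for the included/in-queue sets:

```rust
use std::collections::HashSet;

fn drain(rx: &mut async_broadcast::Receiver<u64>, seen: &mut HashSet<u64>, queue: &mut Vec<u64>) {
    loop {
        match rx.try_recv() {
            Ok(commit) => {
                // Skip duplicates: only first sightings enter the queue.
                if seen.insert(commit) {
                    queue.push(commit);
                }
            }
            Err(async_broadcast::TryRecvError::Overflowed(lost)) => {
                eprintln!("missed {lost} transactions due to backlog"); // keep draining
            }
            Err(_) => break, // Empty or Closed: nothing more to take right now
        }
    }
}

fn main() {
    let (mut tx, mut rx) = async_broadcast::broadcast(4);
    tx.set_overflow(true); // oldest items are evicted instead of blocking
    for i in 0..6 {
        tx.try_broadcast(i).unwrap();
    }
    let (mut seen, mut queue) = (HashSet::new(), Vec::new());
    drain(&mut rx, &mut seen, &mut queue);
    println!("queued: {queue:?}"); // only the newest items survive the overflow
}
```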
+
+#[cfg(test)]
+mod test {
+    use std::collections::HashMap;
+    use std::sync::Arc;
+
+    use async_broadcast::broadcast;
+    use committable::RawCommitmentBuilder;
+    use hotshot_example_types::block_types::TestTransaction;
+    use hotshot_example_types::node_types::TestTypes;
+    use hotshot_types::data::ViewNumber;
+    use hotshot_types::data::{Leaf, QuorumProposal};
+    use hotshot_types::traits::node_implementation::{ConsensusTime, NodeType};
+    use hotshot_types::utils::BuilderCommitment;
+    use tracing_subscriber::EnvFilter;
+
+    use super::DaProposalMessage;
+    use super::MessageType;
+    use super::ParentBlockReferences;
+    use crate::testing::{calc_proposal_msg, create_builder_state};
+
+    /// This tests the function `process_da_proposal`.
+    /// It checks that da_proposal_payload_commit_to_da_proposal changes
+    /// appropriately when receiving a da proposal message.
+    /// This test also checks whether the corresponding BuilderStateId is in
+    /// global_state.
+    #[tokio::test]
+    async fn test_process_da_proposal() {
+        // Setup logging
+        let _ = tracing_subscriber::fmt()
+            .with_env_filter(EnvFilter::from_default_env())
+            .try_init();
+
+        // Number of views to simulate
+        const NUM_ROUNDS: usize = 5;
+        // Capacity of broadcast channels
+        const CHANNEL_CAPACITY: usize = NUM_ROUNDS * 5;
+        // Number of nodes on the DA committee
+        const NUM_STORAGE_NODES: usize = 4;
+
+        // create builder_state without entering the event loop
+        let (_senders, global_state, mut builder_state) =
+            create_builder_state(CHANNEL_CAPACITY, NUM_STORAGE_NODES).await;
+
+        // generate transactions for the test
+        let transactions = vec![TestTransaction::new(vec![1, 2, 3]); 3];
+        let (_quorum_proposal, _quorum_proposal_msg, da_proposal_msg, builder_state_id) =
+            calc_proposal_msg(NUM_STORAGE_NODES, 0, None, transactions.clone()).await;
+
+        // sub-test one
+        // call process_da_proposal without a matching quorum proposal message
+        // da_proposal_payload_commit_to_da_proposal should insert the message
+        let mut correct_da_proposal_payload_commit_to_da_proposal: HashMap<
+            (BuilderCommitment, <TestTypes as NodeType>::View),
+            Arc<DaProposalMessage<TestTypes>>,
+        > = HashMap::new();
+
+        builder_state
+            .process_da_proposal(da_proposal_msg.clone())
+            .await;
+        correct_da_proposal_payload_commit_to_da_proposal.insert(
+            (
+                da_proposal_msg.builder_commitment.clone(),
+                da_proposal_msg.view_number,
+            ),
+            da_proposal_msg,
+        );
+        assert_eq!(
+            builder_state
+                .da_proposal_payload_commit_to_da_proposal
+                .clone(),
+            correct_da_proposal_payload_commit_to_da_proposal.clone(),
+        );
+        // check that global_state didn't change
+        if global_state
+            .read_arc()
+            .await
+            .spawned_builder_states
+            .contains_key(&builder_state_id)
+        {
+            panic!("global_state shouldn't have the corresponding builder_state_id without a matching quorum proposal.");
+        }
+
+        // sub-test two
+        // call process_da_proposal with the same msg again
+        // we should skip the process and everything should be the same
+        let transactions_1 = transactions.clone();
+        let (_quorum_proposal_1, _quorum_proposal_msg_1, da_proposal_msg_1, builder_state_id_1) =
+            calc_proposal_msg(NUM_STORAGE_NODES, 0, None, transactions_1).await;
+        builder_state
+            .process_da_proposal(da_proposal_msg_1.clone())
+            .await;
+        assert_eq!(
+            builder_state
+                .da_proposal_payload_commit_to_da_proposal
+                .clone(),
+            correct_da_proposal_payload_commit_to_da_proposal.clone(),
+        );
+        // check that global_state didn't change
+        if global_state
+            .read_arc()
+            .await
+            .spawned_builder_states
+            .contains_key(&builder_state_id_1)
+        {
+            panic!("global_state shouldn't have the corresponding builder_state_id without a matching quorum proposal.");
+        }
+
+        // sub-test three
+        // add the matching quorum proposal message with different txns,
+        // call process_da_proposal with this matching da proposal message
+        // and quorum proposal message (we should spawn_clone here),
+        // and check whether global_state has the correct BuilderStateId
+        let transactions_2 = vec![TestTransaction::new(vec![1, 2, 3, 4]); 2];
+        let (_quorum_proposal_2, quorum_proposal_msg_2, da_proposal_msg_2, builder_state_id_2) =
+            calc_proposal_msg(NUM_STORAGE_NODES, 0, None, transactions_2).await;
+
+        // process the quorum proposal first, so that when we later call
+        // process_da_proposal we can pair the proposals and spawn a clone
+        // directly, skipping storage
+        builder_state
+            .process_quorum_proposal(quorum_proposal_msg_2.clone())
+            .await;
+
+        // process the da proposal message and do the check
+        builder_state
+            .process_da_proposal(da_proposal_msg_2.clone())
+            .await;
+        assert_eq!(
+            builder_state
+                .da_proposal_payload_commit_to_da_proposal
+                .clone(),
+            correct_da_proposal_payload_commit_to_da_proposal.clone(),
+        );
+        // check that global_state has this new builder_state_id
+        if global_state
+            .read_arc()
+            .await
+            .spawned_builder_states
+            .contains_key(&builder_state_id_2)
+        {
+            tracing::debug!("global_state updated successfully");
+        } else {
+            panic!("global_state should have the corresponding builder_state_id now that we have a matching quorum proposal.");
+        }
+    }
+
+    /// This tests the function `process_quorum_proposal`.
+    /// It checks that quorum_proposal_payload_commit_to_quorum_proposal
+    /// changes appropriately when receiving a quorum proposal message.
+    /// This test also checks whether the corresponding BuilderStateId is in
+    /// global_state.
+    #[tokio::test]
+    async fn test_process_quorum_proposal() {
+        // Setup logging
+        let _ = tracing_subscriber::fmt()
+            .with_env_filter(EnvFilter::from_default_env())
+            .try_init();
+
+        tracing::info!("Testing the function `process_quorum_proposal` in `builder_state.rs`");
+
+        // Number of views to simulate
+        const NUM_ROUNDS: usize = 5;
+        // Capacity of broadcast channels
+        const CHANNEL_CAPACITY: usize = NUM_ROUNDS * 5;
+        // Number of nodes on the DA committee
+        const NUM_STORAGE_NODES: usize = 4;
+
+        // create builder_state without entering the event loop
+        let (_senders, global_state, mut builder_state) =
+            create_builder_state(CHANNEL_CAPACITY, NUM_STORAGE_NODES).await;
+
+        // generate transactions for the test
+        let transactions = vec![TestTransaction::new(vec![1, 2, 3]); 3];
+        let (_quorum_proposal, quorum_proposal_msg, _da_proposal_msg, builder_state_id) =
+            calc_proposal_msg(NUM_STORAGE_NODES, 0, None, transactions.clone()).await;
+
+        // sub-test one
+        // call process_quorum_proposal without a matching da proposal message
+        // quorum_proposal_payload_commit_to_quorum_proposal should insert the message
+        let mut correct_quorum_proposal_payload_commit_to_quorum_proposal = HashMap::new();
+        builder_state
+            .process_quorum_proposal(quorum_proposal_msg.clone())
+            .await;
+        correct_quorum_proposal_payload_commit_to_quorum_proposal.insert(
+            (
+                quorum_proposal_msg
+                    .proposal
+                    .data
+                    .block_header
+                    .builder_commitment
+                    .clone(),
+                quorum_proposal_msg.proposal.data.view_number,
+            ),
+            quorum_proposal_msg.proposal,
+        );
+        assert_eq!(
+            builder_state
+                .quorum_proposal_payload_commit_to_quorum_proposal
+                .clone(),
+            correct_quorum_proposal_payload_commit_to_quorum_proposal.clone()
+        );
+        // check that global_state didn't change
+        if global_state
+            .read_arc()
+            .await
+            .spawned_builder_states
+            .contains_key(&builder_state_id)
+        {
+            panic!("global_state shouldn't have the corresponding builder_state_id without a matching da proposal.");
+        }
+
+        // sub-test two
+        // add the matching da proposal message with different txns,
+        // call process_quorum_proposal with this matching quorum proposal
+        // message and da proposal message (we should spawn_clone here),
+        // and check whether global_state has the correct BuilderStateId
+        let transactions_2 = vec![TestTransaction::new(vec![2, 3, 4]); 2];
+        let (_quorum_proposal_2, quorum_proposal_msg_2, da_proposal_msg_2, builder_state_id_2) =
+            calc_proposal_msg(NUM_STORAGE_NODES, 0, None, transactions_2).await;
+
+        // process the da proposal message first, so that when we later call
+        // process_quorum_proposal we can pair the proposals and spawn a clone
+        // directly, skipping storage
+        builder_state
+            .process_da_proposal(da_proposal_msg_2.clone())
+            .await;
+
+        // process the quorum proposal, and do the check
+        builder_state
+            .process_quorum_proposal(quorum_proposal_msg_2.clone())
+            .await;
+        assert_eq!(
+            builder_state
+                .quorum_proposal_payload_commit_to_quorum_proposal
+                .clone(),
+            correct_quorum_proposal_payload_commit_to_quorum_proposal.clone()
+        );
+        // check that global_state has this new builder_state_id
+        if global_state
+            .read_arc()
+            .await
+            .spawned_builder_states
+            .contains_key(&builder_state_id_2)
+        {
+            tracing::debug!("global_state updated successfully");
+        } else {
+            panic!("global_state should have the corresponding builder_state_id now that we have a matching da proposal.");
+        }
+    }
+
+    /// This tests the function `process_decide_event`.
+    /// It checks that we exit the correct builder states when a decide event
+    /// comes in.
+    /// This test also checks whether the corresponding BuilderStateId is
+    /// removed from global_state.
+    #[tokio::test]
+    async fn test_process_decide_event() {
+        // Setup logging
+        let _ = tracing_subscriber::fmt()
+            .with_env_filter(EnvFilter::from_default_env())
+            .try_init();
+
+        tracing::info!("Testing the builder core with multiple messages from the channels");
+
+        // Number of views to simulate
+        const NUM_ROUNDS: usize = 5;
+        // Number of transactions to submit per round
+        const NUM_TXNS_PER_ROUND: usize = 4;
+        // Capacity of broadcast channels
+        const CHANNEL_CAPACITY: usize = NUM_ROUNDS * 5;
+        // Number of nodes on the DA committee
+        const NUM_STORAGE_NODES: usize = 4;
+
+        // create builder_state without entering the event loop
+        let (_senders, global_state, mut builder_state) =
+            create_builder_state(CHANNEL_CAPACITY, NUM_STORAGE_NODES).await;
+
+        // Transactions to send
+        let all_transactions = (0..NUM_ROUNDS)
+            .map(|round| {
+                (0..NUM_TXNS_PER_ROUND)
+                    .map(|tx_num| TestTransaction::new(vec![round as u8, tx_num as u8]))
+                    .collect::<Vec<_>>()
+            })
+            .collect::<Vec<_>>();
+        let mut prev_quorum_proposal: Option<QuorumProposal<TestTypes>> = None;
+        // register some builder states for the later decide event
+        #[allow(clippy::needless_range_loop)]
+        for round in 0..NUM_ROUNDS {
+            let transactions = all_transactions[round].clone();
+            let (quorum_proposal, _quorum_proposal_msg, _da_proposal_msg, builder_state_id) =
+                calc_proposal_msg(NUM_STORAGE_NODES, round, prev_quorum_proposal, transactions)
+                    .await;
+            prev_quorum_proposal = Some(quorum_proposal.clone());
+            let (req_sender, _req_receiver) = broadcast(CHANNEL_CAPACITY);
+            let leaf: Leaf<TestTypes> = Leaf::from_quorum_proposal(&quorum_proposal);
+            let leaf_commit = RawCommitmentBuilder::new("leaf commitment")
+                .u64_field("view number", leaf.view_number().u64())
+                .u64_field("block number", leaf.height())
+                .field("parent Leaf commitment", leaf.parent_commitment())
+                .var_size_field(
+                    "block payload commitment",
+                    leaf.payload_commitment().as_ref(),
+                )
+                .finalize();
+            global_state.write_arc().await.register_builder_state(
+                builder_state_id,
+                ParentBlockReferences {
+                    view_number: quorum_proposal.view_number,
+                    vid_commitment: quorum_proposal.block_header.payload_commitment,
+                    leaf_commit,
+                    builder_commitment: quorum_proposal.block_header.builder_commitment,
+                },
+                req_sender,
+            );
+        }
+
+        // send out a decide event in a middle round
+        let latest_decide_view_number = ViewNumber::new(3);
+
+        let decide_message = MessageType::DecideMessage(crate::builder_state::DecideMessage {
+            latest_decide_view_number,
+        });
+        if let MessageType::DecideMessage(practice_decide_msg) = decide_message.clone() {
+            builder_state
+                .process_decide_event(practice_decide_msg.clone())
+                .await;
+        } else {
+            panic!("Not a decide_message in the correct format");
+        }
+        // check that spawned_builder_states contains the correct
+        // builder_state_ids, and that builder states older than the decided
+        // view have already exited
+        let current_spawned_builder_states =
+            global_state.read_arc().await.spawned_builder_states.clone();
+        current_spawned_builder_states
+            .iter()
+            .for_each(|(builder_state_id, _)| {
+                assert!(builder_state_id.parent_view >= latest_decide_view_number)
+            });
+    }
+}
diff --git a/crates/marketplace/src/service.rs b/crates/marketplace/src/service.rs
index 79db387..33bc1cd 100644
--- a/crates/marketplace/src/service.rs
+++ b/crates/marketplace/src/service.rs
@@ -16,9 +16,12 @@ use futures::{
     TryStreamExt,
 };
 use hotshot::types::Event;
-use hotshot_builder_api::v0_3::{
-    builder::{define_api, submit_api, BuildError, Error as BuilderApiError},
-    data_source::{AcceptsTxnSubmits, BuilderDataSource},
+use hotshot_builder_api::{
+    v0_2::builder::TransactionStatus,
+    v0_3::{
+        builder::{define_api, submit_api, BuildError, Error as BuilderApiError},
+        data_source::{AcceptsTxnSubmits, BuilderDataSource},
+    },
 };
 use hotshot_types::bundle::Bundle;
 use hotshot_types::traits::block_contents::{BuilderFee, Transaction};
@@ -60,6 +63,8 @@ pub struct BuilderConfig {
     pub txn_garbage_collect_duration: Duration,
     /// Channel capacity for incoming transactions for a single builder state.
     pub txn_channel_capacity: usize,
+    /// Capacity of the cache storing information for the transaction status API
+    pub tx_status_cache_capacity: usize,
     /// Base fee; the sequencing fee for a bundle is calculated as bundle size × base fee
     pub base_fee: u64,
 }
@@ -99,6 +104,7 @@ impl BuilderConfig {
             txn_garbage_collect_duration: TEST_INCLUDED_TX_GC_PERIOD,
             txn_channel_capacity: TEST_CHANNEL_BUFFER_SIZE,
             base_fee: TEST_BASE_FEE,
+            tx_status_cache_capacity: TEST_TX_STATUS_CACHE_CAPACITY,
         }
     }
 }
@@ -116,6 +122,7 @@ where
         let coordinator = BuilderStateCoordinator::new(
             config.txn_channel_capacity,
             config.txn_garbage_collect_duration,
+            config.tx_status_cache_capacity,
         );
         Arc::new(Self {
            hooks: Arc::new(hooks),
@@ -409,6 +416,13 @@ where
             .try_collect()
             .await
     }
+
+    async fn txn_status(
+        &self,
+        txn_hash: Commitment<<Types as NodeType>::Transaction>,
+    ) -> Result<TransactionStatus, BuildError> {
+        Ok(self.coordinator.tx_status(&txn_hash))
+    }
 }
 
 #[async_trait]
diff --git a/crates/shared/Cargo.toml b/crates/shared/Cargo.toml
index 15ef0e0..b28a6c2 100644
--- a/crates/shared/Cargo.toml
+++ b/crates/shared/Cargo.toml
@@ -25,6 +25,7 @@ hotshot-types = { workspace = true }
 jf-vid = { workspace = true }
 nonempty-collections = { workspace = true }
 once_cell = { workspace = true }
+quick_cache = { workspace = true }
 rand = { workspace = true }
 serde = { workspace = true }
 sha2 = { workspace = true }
diff --git a/crates/shared/src/coordinator/mod.rs b/crates/shared/src/coordinator/mod.rs
index e38a7cb..243e00f 100644
--- a/crates/shared/src/coordinator/mod.rs
+++ b/crates/shared/src/coordinator/mod.rs
@@ -7,7 +7,10 @@ use std::{
 
 use async_broadcast::Sender;
 use async_lock::{Mutex, RwLock};
+use committable::Commitment;
 use either::Either;
+use hotshot::traits::BlockPayload;
+use hotshot_builder_api::v0_1::builder::TransactionStatus;
 use hotshot_types::{
     data::{DaProposal, QuorumProposal},
     event::LeafInfo,
@@ -16,6 +19,7 @@ use hotshot_types::{
         node_implementation::{ConsensusTime, NodeType},
     },
 };
+use quick_cache::sync::Cache;
 use tiered_view_map::TieredViewMap;
 use tracing::{error, info, warn};
 
@@ -80,6 +84,7 @@ where
     Types: NodeType,
 {
     builder_states: RwLock<BuilderStateMap<Types>>,
+    tx_status: quick_cache::sync::Cache<Commitment<Types::Transaction>, TransactionStatus>,
     transaction_sender: Sender<Arc<ReceivedTransaction<Types>>>,
     proposals: Mutex<ProposalMap<Types>>,
 }
@@ -93,7 +98,12 @@ where
     /// `txn_garbage_collect_duration` specifies the duration for which the coordinator retains the hashes of transactions
     /// that have been marked as included by its [`BuilderState`]s. Once this duration has elapsed, new [`BuilderState`]s
     /// can include duplicates of older transactions should such be received again.
-    pub fn new(txn_channel_capacity: usize, txn_garbage_collect_duration: Duration) -> Self {
+    /// `tx_status_cache_capacity` controls the capacity of the transaction status cache.
+    pub fn new(
+        txn_channel_capacity: usize,
+        txn_garbage_collect_duration: Duration,
+        tx_status_cache_capacity: usize,
+    ) -> Self {
         let (txn_sender, txn_receiver) = async_broadcast::broadcast(txn_channel_capacity);
         let bootstrap_state = BuilderState::new(
             ParentBlockReferences::bootstrap(),
@@ -108,6 +118,7 @@ where
             transaction_sender: txn_sender,
             builder_states: RwLock::new(builder_states),
             proposals: Mutex::new(ProposalMap::new()),
+            tx_status: Cache::new(tx_status_cache_capacity),
         }
     }
 
@@ -120,6 +131,22 @@ where
         leaf_chain: Arc<Vec<LeafInfo<Types>>>,
     ) -> BuilderStateMap<Types> {
         let latest_decide_view_num = leaf_chain[0].leaf.view_number();
+
+        for leaf_info in leaf_chain.iter() {
+            if let Some(payload) = leaf_info.leaf.block_payload() {
+                for commitment in
+                    payload.transaction_commitments(leaf_info.leaf.block_header().metadata())
+                {
+                    self.update_txn_status(
+                        &commitment,
+                        TransactionStatus::Sequenced {
+                            leaf: leaf_info.leaf.block_header().block_number(),
+                        },
+                    );
+                }
+            }
+        }
+
         let pruned = {
             let mut builder_states_write_guard = self.builder_states.write().await;
             let highest_active_view_num = builder_states_write_guard
@@ -155,20 +182,32 @@ where
         &self,
         transaction: ReceivedTransaction<Types>,
     ) -> Result<(), Error<Types>> {
-        match self.transaction_sender.try_broadcast(Arc::new(transaction)) {
-            Ok(None) => Ok(()),
-            Ok(Some(evicted_txn)) => {
-                warn!(
-                    ?evicted_txn.commit,
-                    "Overflow mode enabled, transaction evicted",
-                );
-                Ok(())
-            }
+        let commit = transaction.commit;
+
+        let maybe_evicted = match self.transaction_sender.try_broadcast(Arc::new(transaction)) {
+            Ok(maybe_evicted) => maybe_evicted,
             Err(err) => {
                 warn!(?err, "Failed to broadcast txn");
-                Err(Error::TxnSender(err))
+                self.update_txn_status(
+                    &commit,
+                    TransactionStatus::Rejected {
+                        reason: "Failed to broadcast transaction".to_owned(),
+                    },
+                );
+                return Err(Error::TxnSender(err));
             }
-        }
+        };
+
+        self.update_txn_status(&commit, TransactionStatus::Pending);
+
+        if let Some(evicted) = maybe_evicted {
+            warn!(
+                ?evicted.commit,
+                "Overflow mode enabled, transaction evicted",
+            );
+        }
+
+        Ok(())
     }
 
     /// This function should be called whenever new DA Proposal is recieved from HotShot.
@@ -444,6 +483,37 @@ where
         warn!("View time-travel");
         Vec::new()
     }
+
+    /// Update the status of a transaction.
+    pub fn update_txn_status(
+        &self,
+        txn_hash: &Commitment<<Types as NodeType>::Transaction>,
+        new_status: TransactionStatus,
+    ) {
+        if let Some(old_status) = self.tx_status.get(txn_hash) {
+            match old_status {
+                TransactionStatus::Rejected { .. } | TransactionStatus::Sequenced { .. } => {
+                    tracing::debug!(
+                        ?old_status,
+                        ?new_status,
+                        "Not changing status of rejected/sequenced transaction",
+                    );
+                    return;
+                }
+                _ => {
+                    tracing::debug!(?old_status, ?new_status, "Changing status of transaction");
+                }
+            }
+        }
+        self.tx_status.insert(*txn_hash, new_status);
+    }
+
+    /// Get the transaction status for the given hash.
+    pub fn tx_status(&self, txn_hash: &Commitment<Types::Transaction>) -> TransactionStatus {
+        self.tx_status
+            .get(txn_hash)
+            .unwrap_or(TransactionStatus::Unknown)
+    }
 }
 
 #[cfg_attr(coverage_nightly, coverage(off))]
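`update_txn_status` treats `Rejected` and `Sequenced` as terminal, so a late `Pending` can never regress a transaction that was already decided. A sketch of that guard against `quick_cache` (simplified status enum and `u64` hashes; the real keys are transaction commitments):

```rust
use quick_cache::sync::Cache;

#[derive(Clone, Debug, PartialEq)]
enum Status {
    Pending,
    Rejected,
    Sequenced,
}

fn update(cache: &Cache<u64, Status>, hash: u64, new_status: Status) {
    if let Some(old) = cache.get(&hash) {
        if matches!(old, Status::Rejected | Status::Sequenced) {
            return; // terminal states never regress
        }
    }
    cache.insert(hash, new_status);
}

fn main() {
    let cache = Cache::new(1024); // bounded: old statuses are evicted under pressure
    update(&cache, 1, Status::Pending);
    update(&cache, 1, Status::Sequenced);
    update(&cache, 1, Status::Pending); // ignored: transaction already sequenced
    assert_eq!(cache.get(&1), Some(Status::Sequenced));
}
```

Using a bounded cache rather than a map is a deliberate trade-off: statuses for very old transactions may be evicted and revert to `Unknown`, but memory stays capped by `tx_status_cache_capacity`.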
@@ -451,13 +521,17 @@ mod tests {
     use std::time::Instant;
 
+    use committable::Committable;
     use hotshot_example_types::node_types::TestTypes;
+    use hotshot_types::data::ViewNumber;
     use tracing_test::traced_test;
 
     use crate::{
         block::TransactionSource,
         testing::{
-            constants::{TEST_CHANNEL_BUFFER_SIZE, TEST_INCLUDED_TX_GC_PERIOD},
+            constants::{
+                TEST_CHANNEL_BUFFER_SIZE, TEST_INCLUDED_TX_GC_PERIOD, TEST_TX_STATUS_CACHE_CAPACITY,
+            },
             mock,
         },
     };
@@ -469,8 +543,11 @@ mod tests {
     #[tokio::test]
     #[traced_test]
     async fn test_coordinator_new() {
-        let coordinator =
-            BuilderStateCoordinator::new(TEST_CHANNEL_BUFFER_SIZE, TEST_INCLUDED_TX_GC_PERIOD);
+        let coordinator = BuilderStateCoordinator::new(
+            TEST_CHANNEL_BUFFER_SIZE,
+            TEST_INCLUDED_TX_GC_PERIOD,
+            TEST_TX_STATUS_CACHE_CAPACITY,
+        );
 
         assert_eq!(
             coordinator.builder_states.read().await.len(),
@@ -518,8 +595,11 @@ mod tests {
     #[tokio::test]
     #[traced_test]
     async fn test_handle_proposal_matching_types_creates_builder_state() {
-        let coordinator =
-            BuilderStateCoordinator::new(TEST_CHANNEL_BUFFER_SIZE, TEST_INCLUDED_TX_GC_PERIOD);
+        let coordinator = BuilderStateCoordinator::new(
+            TEST_CHANNEL_BUFFER_SIZE,
+            TEST_INCLUDED_TX_GC_PERIOD,
+            TEST_TX_STATUS_CACHE_CAPACITY,
+        );
 
         let (da_proposal, quorum_proposal) = mock::proposals(7).await;
 
@@ -541,8 +621,11 @@ mod tests {
     #[tokio::test]
     #[traced_test]
     async fn test_handle_proposal_duplicate_proposal_ignored() {
-        let coordinator =
-            BuilderStateCoordinator::new(TEST_CHANNEL_BUFFER_SIZE, TEST_INCLUDED_TX_GC_PERIOD);
+        let coordinator = BuilderStateCoordinator::new(
+            TEST_CHANNEL_BUFFER_SIZE,
+            TEST_INCLUDED_TX_GC_PERIOD,
+            TEST_TX_STATUS_CACHE_CAPACITY,
+        );
 
         let (proposal, _) = mock::proposals(7).await;
 
@@ -562,8 +645,11 @@ mod tests {
     #[tokio::test]
     #[traced_test]
     async fn test_handle_proposal_stores_new_proposal_when_no_match() {
-        let coordinator =
-            BuilderStateCoordinator::new(TEST_CHANNEL_BUFFER_SIZE, TEST_INCLUDED_TX_GC_PERIOD);
+        let coordinator = BuilderStateCoordinator::new(
+            TEST_CHANNEL_BUFFER_SIZE,
+            TEST_INCLUDED_TX_GC_PERIOD,
+            TEST_TX_STATUS_CACHE_CAPACITY,
+        );
 
         let (proposal, _) = mock::proposals(1).await;
         let proposal_id = ProposalId::from_da_proposal(&proposal);
@@ -587,8 +673,11 @@ mod tests {
     #[tokio::test]
     #[traced_test]
     async fn test_handle_proposal_same_view_different_proposals() {
-        let coordinator =
-            BuilderStateCoordinator::new(TEST_CHANNEL_BUFFER_SIZE, TEST_INCLUDED_TX_GC_PERIOD);
+        let coordinator = BuilderStateCoordinator::new(
+            TEST_CHANNEL_BUFFER_SIZE,
+            TEST_INCLUDED_TX_GC_PERIOD,
+            TEST_TX_STATUS_CACHE_CAPACITY,
+        );
 
         let view_number = 9; // arbitrary
         let (da_proposal_1, quorum_proposal_1) = mock::proposals(view_number).await;
@@ -622,8 +711,11 @@ mod tests {
     #[tokio::test]
     #[traced_test]
     async fn test_decide_reaps_old_proposals() {
-        let coordinator =
-            BuilderStateCoordinator::new(TEST_CHANNEL_BUFFER_SIZE, TEST_INCLUDED_TX_GC_PERIOD);
+        let coordinator = BuilderStateCoordinator::new(
+            TEST_CHANNEL_BUFFER_SIZE,
+            TEST_INCLUDED_TX_GC_PERIOD,
+            TEST_TX_STATUS_CACHE_CAPACITY,
+        );
 
         for view in 0..100 {
             let (da_proposal, quorum_proposal) = mock::proposals(view).await;
@@ -661,11 +753,87 @@ mod tests {
         );
     }
 
+    #[tokio::test]
+    #[traced_test]
+    async fn test_transaction_status() {
+        let coordinator = BuilderStateCoordinator::new(
+            TEST_CHANNEL_BUFFER_SIZE,
+            TEST_INCLUDED_TX_GC_PERIOD,
+            TEST_TX_STATUS_CACHE_CAPACITY,
+        );
+
+        let enqueued_transactions = (0..TEST_CHANNEL_BUFFER_SIZE)
+            .map(|_| mock::transaction())
+            .collect::<Vec<_>>();
+
+        // Coordinator should update transaction status when included
+        for tx in enqueued_transactions.iter() {
+            assert_eq!(
+                coordinator.tx_status(&tx.commit()),
+                TransactionStatus::Unknown
+            );
+            coordinator
+                .handle_transaction(ReceivedTransaction::new(
+                    tx.clone(),
+                    TransactionSource::Public,
+                ))
+                .await
+                .unwrap();
+            assert_eq!(
+                coordinator.tx_status(&tx.commit()),
+                TransactionStatus::Pending
+            );
+        }
+
+        // This transaction won't be included; we're over capacity
+        let rejected_transaction = mock::transaction();
+        coordinator
+            .handle_transaction(ReceivedTransaction::new(
+                rejected_transaction.clone(),
+                TransactionSource::Public,
+            ))
+            .await
+            .unwrap_err();
+        assert!(matches!(
+            coordinator.tx_status(&rejected_transaction.commit()),
+            TransactionStatus::Rejected { .. }
+        ));
+
+        // Transaction that was never submitted to the builder but is going to be
+        // included anyway, simulating it being included by a different builder
+        let external_transaction = mock::transaction();
+
+        let decided_transactions = enqueued_transactions
+            .iter()
+            .chain(std::iter::once(&external_transaction))
+            .cloned()
+            .collect::<Vec<_>>();
+
+        // Simulate all transactions being decided
+        let leaf_chain = mock::decide_leaf_chain_with_transactions(
+            *ViewNumber::genesis(),
+            decided_transactions.clone(),
+        )
+        .await;
+        coordinator.handle_decide(leaf_chain).await;
+
+        // All decided transactions should change status
+        for tx in decided_transactions {
+            assert!(matches!(
+                coordinator.tx_status(&tx.commit()),
+                TransactionStatus::Sequenced { .. }
+            ));
+        }
+    }
+
     #[tokio::test]
     #[traced_test]
     async fn test_transaction_overflow() {
-        let coordinator =
-            BuilderStateCoordinator::new(TEST_CHANNEL_BUFFER_SIZE, TEST_INCLUDED_TX_GC_PERIOD);
+        let coordinator = BuilderStateCoordinator::new(
+            TEST_CHANNEL_BUFFER_SIZE,
+            TEST_INCLUDED_TX_GC_PERIOD,
+            TEST_TX_STATUS_CACHE_CAPACITY,
+        );
 
         // Coordinator should handle transactions while there's space in the buffer
         for _ in 0..TEST_CHANNEL_BUFFER_SIZE {
diff --git a/crates/shared/src/testing/constants.rs b/crates/shared/src/testing/constants.rs
index 31f0b93..62e57c0 100644
--- a/crates/shared/src/testing/constants.rs
+++ b/crates/shared/src/testing/constants.rs
@@ -23,6 +23,11 @@ pub const TEST_NUM_CONSENSUS_RETRIES: usize = 4;
 /// specifically bounded in tests, so it is set to an arbitrary value.
 pub const TEST_CHANNEL_BUFFER_SIZE: usize = 81920;
 
+/// Governs the target space used by the mapping from txn to its status.
+/// This is expressed as a target number of transactions.
+/// This is an arbitrary default value for testing.
+pub const TEST_TX_STATUS_CACHE_CAPACITY: usize = 1_000_000 * 10;
+
 /// Governs the included transaction GC period used in tests.
 /// This is an arbitrary default value for testing.
 pub const TEST_INCLUDED_TX_GC_PERIOD: Duration = Duration::from_secs(1);
@@ -38,3 +43,8 @@ pub const TEST_MAXIMIZE_TX_CAPTURE_TIMEOUT: Duration = Duration::from_millis(100);
/// This is an arbitrary default value for testing. pub const TEST_BASE_FEE: u64 = 1; + +/// Governs the target space used by the mapping from txn to its status. +/// This is expressed as a target number of transactions. +/// This is an arbitrary default value for testing. +pub const TEST_MAX_TX_NUM: usize = 1_000_000 * 10; diff --git a/crates/shared/src/testing/mock.rs b/crates/shared/src/testing/mock.rs index 0161825..20f614c 100644 --- a/crates/shared/src/testing/mock.rs +++ b/crates/shared/src/testing/mock.rs @@ -40,8 +40,21 @@ pub fn transaction() -> TestTransaction { } pub async fn decide_leaf_chain(decided_view: u64) -> Arc>> { - let (_, quorum_proposal) = proposals(decided_view).await; - let leaf = Leaf::from_quorum_proposal(&quorum_proposal); + decide_leaf_chain_with_transactions(decided_view, vec![transaction()]).await +} + +pub async fn decide_leaf_chain_with_transactions( + decided_view: u64, + transactions: Vec, +) -> Arc>> { + let (da_proposal, quorum_proposal) = + proposals_with_transactions(decided_view, transactions).await; + let mut leaf = Leaf::from_quorum_proposal(&quorum_proposal); + let payload = >::from_bytes( + &da_proposal.encoded_transactions, + &da_proposal.metadata, + ); + leaf.fill_block_payload_unchecked(payload); Arc::new(vec![LeafInfo { leaf, state: Default::default(), @@ -52,20 +65,28 @@ pub async fn decide_leaf_chain(decided_view: u64) -> Arc /// Create mock pair of DA and Quorum proposals pub async fn proposals(view: u64) -> (DaProposal, QuorumProposal) { + let transaction = transaction(); + proposals_with_transactions(view, vec![transaction]).await +} + +/// Create mock pair of DA and Quorum proposals with given transactions +pub async fn proposals_with_transactions( + view: u64, + transactions: Vec, +) -> (DaProposal, QuorumProposal) { let view_number = ::View::new(view); let upgrade_lock = UpgradeLock::::new(); let validated_state = TestValidatedState::default(); let instance_state = TestInstanceState::default(); - let transaction = transaction(); let (payload, metadata) = >::from_transactions( - vec![transaction.clone()], + transactions.clone(), &validated_state, &instance_state, ) .await .unwrap(); - let encoded_transactions = TestTransaction::encode(&[transaction]); + let encoded_transactions = TestTransaction::encode(&transactions); let header = TestBlockHeader::new( &Leaf::::genesis(&Default::default(), &Default::default()).await, diff --git a/flake.lock b/flake.lock index ea56974..3dad7c3 100644 --- a/flake.lock +++ b/flake.lock @@ -37,11 +37,11 @@ "systems": "systems" }, "locked": { - "lastModified": 1726560853, - "narHash": "sha256-X6rJYSESBVr3hBoH0WbKE5KvhPU5bloyZ2L4K60/fPQ=", + "lastModified": 1731533236, + "narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=", "owner": "numtide", "repo": "flake-utils", - "rev": "c1dfcf08411b08f6b8615f7d8971a2bfa81d5e8a", + "rev": "11707dc2f618dd54ca8739b309ec4fc024de578b", "type": "github" }, "original": { @@ -60,11 +60,11 @@ "nixpkgs-stable": "nixpkgs-stable" }, "locked": { - "lastModified": 1730302582, - "narHash": "sha256-W1MIJpADXQCgosJZT8qBYLRuZls2KSiKdpnTVdKBuvU=", + "lastModified": 1732021966, + "narHash": "sha256-mnTbjpdqF0luOkou8ZFi2asa1N3AA2CchR/RqCNmsGE=", "owner": "cachix", "repo": "git-hooks.nix", - "rev": "af8a16fe5c264f5e9e18bcee2859b40a656876cf", + "rev": "3308484d1a443fc5bc92012435d79e80458fe43c", "type": "github" }, "original": { @@ -96,11 +96,11 @@ }, "nixpkgs": { "locked": { - "lastModified": 1730200266, - "narHash": "sha256-l253w0XMT8nWHGXuXqyiIC/bMvh1VRszGXgdpQlfhvU=", + 
"lastModified": 1732014248, + "narHash": "sha256-y/MEyuJ5oBWrWAic/14LaIr/u5E0wRVzyYsouYY3W6w=", "owner": "nixos", "repo": "nixpkgs", - "rev": "807e9154dcb16384b1b765ebe9cd2bba2ac287fd", + "rev": "23e89b7da85c3640bbc2173fe04f4bd114342367", "type": "github" }, "original": { @@ -112,11 +112,11 @@ }, "nixpkgs-stable": { "locked": { - "lastModified": 1720386169, - "narHash": "sha256-NGKVY4PjzwAa4upkGtAMz1npHGoRzWotlSnVlqI40mo=", + "lastModified": 1730741070, + "narHash": "sha256-edm8WG19kWozJ/GqyYx2VjW99EdhjKwbY3ZwdlPAAlo=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "194846768975b7ad2c4988bdb82572c00222c0d7", + "rev": "d063c1dd113c91ab27959ba540c0d9753409edf3", "type": "github" }, "original": { @@ -156,11 +156,11 @@ "nixpkgs": "nixpkgs_2" }, "locked": { - "lastModified": 1730514457, - "narHash": "sha256-cjFX208s9pyaOfMvF9xI6WyafyXINqdhMF7b1bMQpLI=", + "lastModified": 1732328983, + "narHash": "sha256-RHt12f/slrzDpSL7SSkydh8wUE4Nr4r23HlpWywed9E=", "owner": "oxalica", "repo": "rust-overlay", - "rev": "1ff38ca26eb31858e4dfe7fe738b6b3ce5d74922", + "rev": "ed8aa5b64f7d36d9338eb1d0a3bb60cf52069a72", "type": "github" }, "original": {