diff --git a/Cargo.lock b/Cargo.lock index e85a4e03..7f2f8be9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -342,6 +342,12 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" +[[package]] +name = "ascii" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eab1c04a571841102f5345a8fc0f6bb3d31c315dec879b5c6e42e40ce7ffa34e" + [[package]] name = "ascii-canvas" version = "3.0.0" @@ -378,47 +384,15 @@ dependencies = [ "futures-core", ] -[[package]] -name = "async-graphql" -version = "4.0.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9ed522678d412d77effe47b3c82314ac36952a35e6e852093dd48287c421f80" -dependencies = [ - "async-graphql-derive 4.0.16", - "async-graphql-parser 4.0.16", - "async-graphql-value 4.0.16", - "async-stream", - "async-trait", - "base64 0.13.1", - "bytes", - "fast_chemail", - "fnv", - "futures-util", - "http", - "indexmap 1.9.3", - "mime", - "multer", - "num-traits", - "once_cell", - "pin-project-lite", - "regex", - "serde", - "serde_json", - "serde_urlencoded", - "static_assertions", - "tempfile", - "thiserror", -] - [[package]] name = "async-graphql" version = "6.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "298a5d587d6e6fdb271bf56af2dc325a80eb291fd0fc979146584b9a05494a8c" dependencies = [ - "async-graphql-derive 6.0.11", - "async-graphql-parser 6.0.11", - "async-graphql-value 6.0.11", + "async-graphql-derive", + "async-graphql-parser", + "async-graphql-value", "async-stream", "async-trait", "base64 0.13.1", @@ -445,37 +419,22 @@ dependencies = [ [[package]] name = "async-graphql-axum" -version = "4.0.16" +version = "6.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c91ac174c05670edffb720bc376b9d4c274c3d127ac08ed3d38144c9415502cd" +checksum = "01a1c20a2059bffbc95130715b23435a05168c518fba9709c81fa2a38eed990c" dependencies = [ - "async-graphql 4.0.16", + "async-graphql", "async-trait", - "axum 0.5.17", + "axum", "bytes", "futures-util", - "http-body", "serde_json", + "tokio", + "tokio-stream", "tokio-util", "tower-service", ] -[[package]] -name = "async-graphql-derive" -version = "4.0.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c121a894495d7d3fc3d4e15e0a9843e422e4d1d9e3c514d8062a1c94b35b005d" -dependencies = [ - "Inflector", - "async-graphql-parser 4.0.16", - "darling 0.14.4", - "proc-macro-crate 1.3.1", - "proc-macro2", - "quote", - "syn 1.0.109", - "thiserror", -] - [[package]] name = "async-graphql-derive" version = "6.0.11" @@ -483,8 +442,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7f329c7eb9b646a72f70c9c4b516c70867d356ec46cb00dcac8ad343fd006b0" dependencies = [ "Inflector", - "async-graphql-parser 6.0.11", - "darling 0.20.3", + "async-graphql-parser", + "darling", "proc-macro-crate 1.3.1", "proc-macro2", "quote", @@ -493,42 +452,18 @@ dependencies = [ "thiserror", ] -[[package]] -name = "async-graphql-parser" -version = "4.0.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b6c386f398145c6180206c1869c2279f5a3d45db5be4e0266148c6ac5c6ad68" -dependencies = [ - "async-graphql-value 4.0.16", - "pest", - "serde", - "serde_json", -] - [[package]] name = "async-graphql-parser" version = "6.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6139181845757fd6a73fbb8839f3d036d7150b798db0e9bb3c6e83cdd65bd53b" dependencies = [ - "async-graphql-value 6.0.11", + "async-graphql-value", "pest", "serde", "serde_json", ] -[[package]] -name = "async-graphql-value" -version = "4.0.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a941b499fead4a3fb5392cabf42446566d18c86313f69f2deab69560394d65f" -dependencies = [ - "bytes", - "indexmap 1.9.3", - "serde", - "serde_json", -] - [[package]] name = "async-graphql-value" version = "6.0.11" @@ -594,6 +529,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "atomic" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d818003e740b63afc82337e3160717f4f63078720a810b7b903e70a5d1d2994" +dependencies = [ + "bytemuck", +] + [[package]] name = "atty" version = "0.2.14" @@ -683,41 +627,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "axum" -version = "0.5.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acee9fd5073ab6b045a275b3e709c163dd36c90685219cb21804a147b58dba43" -dependencies = [ - "async-trait", - "axum-core 0.2.9", - "base64 0.13.1", - "bitflags 1.3.2", - "bytes", - "futures-util", - "headers", - "http", - "http-body", - "hyper", - "itoa", - "matchit 0.5.0", - "memchr", - "mime", - "percent-encoding", - "pin-project-lite", - "serde", - "serde_json", - "serde_urlencoded", - "sha-1 0.10.1", - "sync_wrapper", - "tokio", - "tokio-tungstenite 0.17.2", - "tower", - "tower-http 0.3.5", - "tower-layer", - "tower-service", -] - [[package]] name = "axum" version = "0.6.20" @@ -725,7 +634,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", - "axum-core 0.3.4", + "axum-core", + "base64 0.21.4", "bitflags 1.3.2", "bytes", "futures-util", @@ -734,7 +644,7 @@ dependencies = [ "http-body", "hyper", "itoa", - "matchit 0.7.3", + "matchit", "memchr", "mime", "percent-encoding", @@ -744,29 +654,15 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", + "sha1", "sync_wrapper", "tokio", + "tokio-tungstenite", "tower", "tower-layer", "tower-service", ] -[[package]] -name = "axum-core" -version = "0.2.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37e5939e02c56fecd5c017c37df4238c0a839fa76b7f97acdd7efb804fd181cc" -dependencies = [ - "async-trait", - "bytes", - "futures-util", - "http", - "http-body", - "mime", - "tower-layer", - "tower-service", -] - [[package]] name = "axum-core" version = "0.3.4" @@ -1100,6 +996,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "bytemuck" +version = "1.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "374d28ec25809ee0e23827c2ab573d729e293f281dfe393500e7ad618baa61c6" + [[package]] name = "byteorder" version = "1.5.0" @@ -1317,6 +1219,19 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "combine" +version = "3.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da3da6baa321ec19e1cc41d31bf599f00c783d0517095cdaf0332e3fe8d20680" +dependencies = [ + "ascii", + "byteorder", + "either", + "memchr", + "unreachable", +] + [[package]] name = "concurrent-queue" version = "2.3.0" @@ -1527,38 +1442,14 @@ dependencies = [ "cipher", ] -[[package]] -name = "darling" -version = "0.14.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b750cb3417fd1b327431a470f388520309479ab0bf5e323505daf0290cd3850" -dependencies = [ - "darling_core 0.14.4", - "darling_macro 0.14.4", -] - [[package]] name = "darling" version = "0.20.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0209d94da627ab5605dcccf08bb18afa5009cfbef48d8a8b7d7bdbc79be25c5e" dependencies = [ - "darling_core 0.20.3", - "darling_macro 0.20.3", -] - -[[package]] -name = "darling_core" -version = "0.14.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "109c1ca6e6b7f82cc233a97004ea8ed7ca123a9af07a8230878fcfda9b158bf0" -dependencies = [ - "fnv", - "ident_case", - "proc-macro2", - "quote", - "strsim", - "syn 1.0.109", + "darling_core", + "darling_macro", ] [[package]] @@ -1575,24 +1466,13 @@ dependencies = [ "syn 2.0.38", ] -[[package]] -name = "darling_macro" -version = "0.14.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4aab4dbc9f7611d8b55048a3a16d2d010c2c8334e46304b40ac1cc14bf3b48e" -dependencies = [ - "darling_core 0.14.4", - "quote", - "syn 1.0.109", -] - [[package]] name = "darling_macro" version = "0.20.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" dependencies = [ - "darling_core 0.20.3", + "darling_core", "quote", "syn 2.0.38", ] @@ -2162,7 +2042,7 @@ dependencies = [ "serde_json", "thiserror", "tokio", - "tokio-tungstenite 0.20.1", + "tokio-tungstenite", "tracing", "tracing-futures", "url", @@ -2220,7 +2100,7 @@ dependencies = [ "tokio", "tracing", "walkdir", - "yansi", + "yansi 0.5.1", ] [[package]] @@ -2302,7 +2182,7 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d15473d7f83b54a44826907af16ae5727eaacaf6e53b51474016d3efd9aa35d5" dependencies = [ - "darling 0.20.3", + "darling", "proc-macro2", "quote", "syn 2.0.38", @@ -2319,12 +2199,32 @@ dependencies = [ "subtle", ] +[[package]] +name = "figment" +version = "0.10.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "649f3e5d826594057e9a519626304d8da859ea8a0b18ce99500c586b8d45faee" +dependencies = [ + "atomic", + "pear", + "serde", + "toml 0.8.8", + "uncased", + "version_check", +] + [[package]] name = "finl_unicode" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fcfdc7a0362c9f4444381a9e697c79d435fe65b52a37466fc2c1184cee9edc6" +[[package]] +name = "firestorm" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c5f6c2c942da57e2aaaa84b8a521489486f14e75e7fa91dab70aba913975f98" + [[package]] name = "fixed-hash" version = "0.8.0" @@ -2665,10 +2565,20 @@ dependencies = [ "smallvec", ] +[[package]] +name = "graphql" +version = "0.3.0" +source = "git+https://github.com/edgeandnode/toolshed?branch=main#af60592fcd8ecf67cb043cc9f2ce7ceb1829370b" +dependencies = [ + "firestorm", + "graphql-parser", + "serde", +] + [[package]] name = "graphql-http" version = "0.1.1" -source = "git+https://github.com/edgeandnode/toolshed?branch=main#f113025319cde85b03706f80d6be5711dcb31678" +source = "git+https://github.com/edgeandnode/toolshed?branch=main#af60592fcd8ecf67cb043cc9f2ce7ceb1829370b" dependencies = [ "anyhow", "async-trait", @@ -2678,6 +2588,16 @@ dependencies = [ "thiserror", ] +[[package]] +name = "graphql-parser" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2ebc8013b4426d5b81a4364c419a95ed0b404af2b82e2457de52d9348f0e474" +dependencies = [ + "combine", + "thiserror", +] + [[package]] name = "group" version = "0.13.0" @@ -3082,7 +3002,7 @@ dependencies = [ "arc-swap", "async-trait", "autometrics 0.6.0", - "axum 0.6.20", + "axum", "build-info", "env_logger", "ethers", @@ -3177,6 +3097,12 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "64e9829a50b42bb782c1df523f78d332fe371b10c661e78b7a3c34b0198e9fac" +[[package]] +name = "inlinable_string" +version = "0.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8fae54786f62fb2918dcfae3d568594e50eb9b5c25bf04371af6fe7516452fb" + [[package]] name = "inout" version = "0.1.3" @@ -3626,12 +3552,6 @@ dependencies = [ "regex-automata 0.1.10", ] -[[package]] -name = "matchit" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb" - [[package]] name = "matchit" version = "0.7.3" @@ -4290,6 +4210,29 @@ dependencies = [ "hmac", ] +[[package]] +name = "pear" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61a386cd715229d399604b50d1361683fe687066f42d56f54be995bc6868f71c" +dependencies = [ + "inlinable_string", + "pear_codegen", + "yansi 1.0.0-rc.1", +] + +[[package]] +name = "pear_codegen" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da9f0f13dac8069c139e8300a6510e3f4143ecf5259c60b116a9b271b4ca0d54" +dependencies = [ + "proc-macro2", + "proc-macro2-diagnostics", + "quote", + "syn 2.0.38", +] + [[package]] name = "pem" version = "1.1.1" @@ -4529,7 +4472,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af7cee1a6c8a5b9208b3cb1061f10c0cb689087b3d8ce85fb9d2dd7a29b6ba66" dependencies = [ "diff", - "yansi", + "yansi 0.5.1", ] [[package]] @@ -4608,6 +4551,19 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "proc-macro2-diagnostics" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.38", + "version_check", + "yansi 1.0.0-rc.1", +] + [[package]] name = "prometheus" version = "0.13.3" @@ -5569,7 +5525,7 @@ version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "93634eb5f75a2323b16de4748022ac4297f9e76b6dced2be287a099f41b5e788" dependencies = [ - "darling 0.20.3", + "darling", "proc-macro2", "quote", "syn 2.0.38", @@ -5595,10 +5551,10 @@ dependencies = [ "alloy-primitives", "alloy-sol-types", "anyhow", - "async-graphql 4.0.16", + "async-graphql", "async-graphql-axum", "autometrics 0.3.3", - "axum 0.5.17", + "axum", "build-info", "build-info-build", "cargo-husky", @@ -5610,6 +5566,8 @@ dependencies = [ "ethers-core", "eventuals", "faux", + "figment", + "graphql", "graphql-http", "hex", "hex-literal", @@ -5622,6 +5580,7 @@ dependencies = [ "reqwest", "serde", "serde_json", + "serde_spanned", "sha3", "sqlx", "tap_core", @@ -5629,9 +5588,8 @@ dependencies = [ "thegraph", "thiserror", "tokio", - "toml 0.8.8", "tower", - "tower-http 0.4.4", + "tower-http", "tracing", "tracing-subscriber", "wiremock", @@ -5650,17 +5608,6 @@ dependencies = [ "opaque-debug", ] -[[package]] -name = "sha-1" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5058ada175748e33390e40e872bd0fe59a19f265d0158daa551c5a88a76009c" -dependencies = [ - "cfg-if", - "cpufeatures", - "digest 0.10.7", -] - [[package]] name = "sha1" version = "0.10.6" @@ -5808,7 +5755,7 @@ dependencies = [ "httparse", "log", "rand 0.8.5", - "sha-1 0.9.8", + "sha-1", ] [[package]] @@ -6260,7 +6207,7 @@ dependencies = [ "alloy-primitives", "alloy-sol-types", "anyhow", - "axum 0.6.20", + "axum", "clap", "ethereum-types", "ethers-core", @@ -6351,11 +6298,11 @@ dependencies = [ [[package]] name = "thegraph" version = "0.1.1" -source = "git+https://github.com/edgeandnode/toolshed?branch=main#f113025319cde85b03706f80d6be5711dcb31678" +source = "git+https://github.com/edgeandnode/toolshed?branch=main#af60592fcd8ecf67cb043cc9f2ce7ceb1829370b" dependencies = [ "alloy-primitives", "alloy-sol-types", - "async-graphql 6.0.11", + "async-graphql", "bs58", "ethers-core", "serde", @@ -6508,18 +6455,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-tungstenite" -version = "0.17.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f714dd15bead90401d77e04243611caec13726c2408afd5b31901dfcdcb3b181" -dependencies = [ - "futures-util", - "log", - "tokio", - "tungstenite 0.17.3", -] - [[package]] name = "tokio-tungstenite" version = "0.20.1" @@ -6531,7 +6466,7 @@ dependencies = [ "rustls", "tokio", "tokio-rustls", - "tungstenite 0.20.1", + "tungstenite", "webpki-roots", ] @@ -6621,25 +6556,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "tower-http" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f873044bf02dd1e8239e9c1293ea39dad76dc594ec16185d0a1bf31d8dc8d858" -dependencies = [ - "bitflags 1.3.2", - "bytes", - "futures-core", - "futures-util", - "http", - "http-body", - "http-range-header", - "pin-project-lite", - "tower", - "tower-layer", - "tower-service", -] - [[package]] name = "tower-http" version = "0.4.4" @@ -6677,7 +6593,7 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db81d9313372d714152194f3f2b66badda23a783fb6a97462e35f632814f4cff" dependencies = [ - "axum 0.6.20", + "axum", "forwarded-header-value", "futures", "futures-core", @@ -6782,25 +6698,6 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" -[[package]] -name = "tungstenite" -version = "0.17.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e27992fd6a8c29ee7eef28fc78349aa244134e10ad447ce3b9f0ac0ed0fa4ce0" -dependencies = [ - "base64 0.13.1", - "byteorder", - "bytes", - "http", - "httparse", - "log", - "rand 0.8.5", - "sha-1 0.10.1", - "thiserror", - "url", - "utf-8", -] - [[package]] name = "tungstenite" version = "0.20.1" @@ -6851,6 +6748,15 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eaea85b334db583fe3274d12b4cd1880032beab409c0d774be044d4480ab9a94" +[[package]] +name = "uncased" +version = "0.9.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b9bc53168a4be7402ab86c3aad243a84dd7381d09be0eddc81280c1da95ca68" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-bidi" version = "0.3.13" @@ -6890,6 +6796,15 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" +[[package]] +name = "unreachable" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "382810877fe448991dfc7f0dd6e3ae5d58088fd0ea5e35189655f84e6814fa56" +dependencies = [ + "void", +] + [[package]] name = "unsafe-libyaml" version = "0.2.9" @@ -6969,6 +6884,12 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "void" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" + [[package]] name = "wait-timeout" version = "0.2.0" @@ -7290,6 +7211,12 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec" +[[package]] +name = "yansi" +version = "1.0.0-rc.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1367295b8f788d371ce2dbc842c7b709c73ee1364d30351dd300ec2203b12377" + [[package]] name = "zeroize" version = "1.6.0" diff --git a/service/Cargo.toml b/service/Cargo.toml index 6e16b787..61e548d3 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -10,8 +10,8 @@ license = "Apache-2.0" [dependencies] indexer-common = { path = "../common" } confy = "0.5.1" -ethers-core = "2.0.10" -ethers = "2.0.10" +ethers-core = "2.0.11" +ethers = "2.0.11" eventuals = "0.6.7" dotenvy = "0.15" log = "0.4.17" @@ -22,7 +22,7 @@ tracing = "0.1.34" thiserror = "1.0.49" serde = { version = "1.0", features = ["rc", "derive"] } serde_json = "1" -axum = "0.5" +axum = "0.6.20" hyper = "0.14.27" tower = { version = "0.4", features = ["util", "timeout", "limit"] } tower-http = { version = "0.4.0", features = [ @@ -30,10 +30,9 @@ tower-http = { version = "0.4.0", features = [ "trace", "cors", ] } -toml = "0.8.8" once_cell = "1.17" -async-graphql = "4.0.16" -async-graphql-axum = "4.0.16" +async-graphql = "6.0.11" +async-graphql-axum = "6.0.11" sha3 = "0.10.6" tracing-subscriber = { version = "0.3", features = [ "env-filter", @@ -59,8 +58,16 @@ alloy-primitives = { version = "0.5.2", features = ["serde"] } alloy-sol-types = "0.5.2" lazy_static = "1.4.0" thegraph = { git = "https://github.com/edgeandnode/toolshed", branch = "main" } -graphql-http = { git = "https://github.com/edgeandnode/toolshed", branch = "main" } +graphql = { git = "https://github.com/edgeandnode/toolshed", branch = "main" } +graphql-http = { git = "https://github.com/edgeandnode/toolshed", branch = "main", features = [ + "http-reqwest", +] } build-info = "0.0.34" +figment = { version = "0.10", features = ["toml", "env"] } + +# FIXME: Needed due to a serde_spanned version conflict between +# `ethers` and `figment`. +serde_spanned = "=0.6.4" [dev-dependencies] faux = "0.1.10" diff --git a/service/src/cli.rs b/service/src/cli.rs new file mode 100644 index 00000000..e576e744 --- /dev/null +++ b/service/src/cli.rs @@ -0,0 +1,9 @@ +use std::path::PathBuf; + +use clap::Parser; + +#[derive(Parser)] +pub struct Cli { + #[arg(long, value_name = "FILE")] + pub config: PathBuf, +} diff --git a/service/src/common/address.rs b/service/src/common/address.rs deleted file mode 100644 index 9172f33d..00000000 --- a/service/src/common/address.rs +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use ethers::signers::{ - coins_bip39::English, LocalWallet, MnemonicBuilder, Signer, Wallet, WalletError, -}; -use ethers_core::k256::ecdsa::SigningKey; - -/// Build Wallet from Private key or Mnemonic -pub fn build_wallet(value: &str) -> Result, WalletError> { - value - .parse::() - .or(MnemonicBuilder::::default().phrase(value).build()) -} - -/// Get wallet public address to String -pub fn wallet_address(wallet: &Wallet) -> String { - format!("{:?}", wallet.address()) -} diff --git a/service/src/common/database.rs b/service/src/common/database.rs deleted file mode 100644 index 5ba4707e..00000000 --- a/service/src/common/database.rs +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use sqlx::{postgres::PgPoolOptions, PgPool}; - -use std::time::Duration; -use tracing::debug; - -use crate::config; - -pub async fn connect(config: &config::Postgres) -> PgPool { - let url = format!( - "postgresql://{}:{}@{}:{}/{}", - config.postgres_username, - config.postgres_password, - config.postgres_host, - config.postgres_port, - config.postgres_database - ); - - debug!( - postgres_host = tracing::field::debug(&config.postgres_host), - postgres_port = tracing::field::debug(&config.postgres_port), - postgres_database = tracing::field::debug(&config.postgres_database), - "Connecting to database" - ); - - PgPoolOptions::new() - .max_connections(50) - .acquire_timeout(Duration::from_secs(3)) - .connect(&url) - .await - .expect("Could not connect to DATABASE_URL") -} diff --git a/service/src/common/mod.rs b/service/src/common/mod.rs deleted file mode 100644 index a7c9b560..00000000 --- a/service/src/common/mod.rs +++ /dev/null @@ -1,6 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -pub mod address; -pub mod database; -pub mod indexer_management; diff --git a/service/src/config.rs b/service/src/config.rs index 017ad4a2..e4260d46 100644 --- a/service/src/config.rs +++ b/service/src/config.rs @@ -1,267 +1,245 @@ // Copyright 2023-, GraphOps and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use clap::{command, Args, Parser, ValueEnum}; +use std::path::PathBuf; use alloy_primitives::Address; +use figment::{ + providers::{Format, Toml}, + Figment, +}; +use indexer_common::indexer_service::http::IndexerServiceConfig; use serde::{Deserialize, Serialize}; use thegraph::types::DeploymentId; -use crate::util::init_tracing; - -#[derive(Clone, Debug, Parser, Serialize, Deserialize, Default)] -#[clap( - name = "indexer-service", - about = "Indexer service on top of graph node", - author = "hopeyen" -)] -#[command(author, version, about, long_about = None, arg_required_else_help = true)] -pub struct Cli { - #[command(flatten)] - pub ethereum: Ethereum, - #[command(flatten)] - pub receipts: Receipts, - #[command(flatten)] - pub indexer_infrastructure: IndexerInfrastructure, - #[command(flatten)] - pub postgres: Postgres, - #[command(flatten)] - pub network_subgraph: NetworkSubgraph, - #[command(flatten)] - pub escrow_subgraph: EscrowSubgraph, - - #[arg( - short, - value_name = "config", - env = "CONFIG", - help = "Indexer service configuration file (YAML format)" - )] - config: Option, +#[derive(Clone, Debug, Deserialize)] +pub struct Config { + // pub ethereum: Ethereum, + // pub receipts: Receipts, + // pub indexer_infrastructure: IndexerInfrastructure, + // pub postgres: Postgres, + // pub network_subgraph: NetworkSubgraph, + // pub escrow_subgraph: EscrowSubgraph, + pub common: IndexerServiceConfig, } -#[derive(Clone, Debug, Args, Serialize, Deserialize, Default)] -#[group(required = true, multiple = true)] +#[derive(Clone, Debug, Serialize, Deserialize, Default)] pub struct Ethereum { - #[clap( - long, - value_name = "ethereum-node-provider", - env = "ETH_NODE", - help = "Ethereum node or provider URL" - )] + // #[clap( + // long, + // value_name = "ethereum-node-provider", + // env = "ETH_NODE", + // help = "Ethereum node or provider URL" + // )] pub ethereum: String, - #[clap( - long, - value_name = "ethereum-polling-interval", - env = "ETHEREUM_POLLING_INTERVAL", - default_value_t = 4000, - help = "Polling interval for the Ethereum provider (ms)" - )] + // #[clap( + // long, + // value_name = "ethereum-polling-interval", + // env = "ETHEREUM_POLLING_INTERVAL", + // default_value_t = 4000, + // help = "Polling interval for the Ethereum provider (ms)" + // )] pub ethereum_polling_interval: usize, - #[clap( - long, - value_name = "mnemonic", - env = "MNEMONIC", - help = "Mnemonic for the operator wallet" - )] + // #[clap( + // long, + // value_name = "mnemonic", + // env = "MNEMONIC", + // help = "Mnemonic for the operator wallet" + // )] pub mnemonic: String, - #[clap( - long, - value_name = "indexer-address", - env = "INDEXER_ADDRESS", - help = "Ethereum address of the indexer" - )] + // #[clap( + // long, + // value_name = "indexer-address", + // env = "INDEXER_ADDRESS", + // help = "Ethereum address of the indexer" + // )] pub indexer_address: Address, } -#[derive(Clone, Debug, Args, Serialize, Deserialize, Default)] -#[group(required = true, multiple = true)] +#[derive(Clone, Debug, Serialize, Deserialize, Default)] pub struct Receipts { - #[clap( - long, - value_name = "receipts-verifier-chain-id", - env = "RECEIPTS_VERIFIER_CHAIN_ID", - help = "Scalar TAP verifier chain ID" - )] + // #[clap( + // long, + // value_name = "receipts-verifier-chain-id", + // env = "RECEIPTS_VERIFIER_CHAIN_ID", + // help = "Scalar TAP verifier chain ID" + // )] pub receipts_verifier_chain_id: u64, - #[clap( - long, - value_name = "receipts-verifier-address", - env = "RECEIPTS_VERIFIER_ADDRESS", - help = "Scalar TAP verifier contract address" - )] + // #[clap( + // long, + // value_name = "receipts-verifier-address", + // env = "RECEIPTS_VERIFIER_ADDRESS", + // help = "Scalar TAP verifier contract address" + // )] pub receipts_verifier_address: Address, } -#[derive(Clone, Debug, Args, Serialize, Deserialize, Default)] -#[group(required = true, multiple = true)] +#[derive(Clone, Debug, Serialize, Deserialize, Default)] pub struct IndexerInfrastructure { - #[clap( - long, - value_name = "port", - env = "PORT", - default_value_t = 7600, - help = "Port to serve queries at" - )] + // #[clap( + // long, + // value_name = "port", + // env = "PORT", + // default_value_t = 7600, + // help = "Port to serve queries at" + // )] pub port: u32, - #[clap( - long, - value_name = "metrics-port", - env = "METRICS_PORT", - default_value_t = 7300, - help = "Port to serve Prometheus metrics at" - )] + // #[clap( + // long, + // value_name = "metrics-port", + // env = "METRICS_PORT", + // default_value_t = 7300, + // help = "Port to serve Prometheus metrics at" + // )] pub metrics_port: u16, - #[clap( - long, - value_name = "graph-node-query-endpoint", - env = "GRAPH_NODE_QUERY_ENDPOINT", - default_value_t = String::from("http://0.0.0.0:8000"), - help = "Graph node GraphQL HTTP service endpoint" - )] + // #[clap( + // long, + // value_name = "graph-node-query-endpoint", + // env = "GRAPH_NODE_QUERY_ENDPOINT", + // default_value_t = String::from("http://0.0.0.0:8000"), + // help = "Graph node GraphQL HTTP service endpoint" + // )] pub graph_node_query_endpoint: String, - #[clap( - long, - value_name = "graph-node-status-endpoint", - env = "GRAPH_NODE_STATUS_ENDPOINT", - default_value_t = String::from("http://0.0.0.0:8030"), - help = "Graph node endpoint for the index node server" - )] + // #[clap( + // long, + // value_name = "graph-node-status-endpoint", + // env = "GRAPH_NODE_STATUS_ENDPOINT", + // default_value_t = String::from("http://0.0.0.0:8030"), + // help = "Graph node endpoint for the index node server" + // )] pub graph_node_status_endpoint: String, - #[clap( - long, - value_name = "log-level", - env = "LOG_LEVEL", - value_enum, - help = "Log level in RUST_LOG format" - )] + // #[clap( + // long, + // value_name = "log-level", + // env = "LOG_LEVEL", + // value_enum, + // help = "Log level in RUST_LOG format" + // )] pub log_level: Option, - #[clap( - long, - value_name = "gcloud-profiling", - env = "GCLOUD_PROFILING", - default_value_t = false, - help = "Whether to enable Google Cloud profiling" - )] + // #[clap( + // long, + // value_name = "gcloud-profiling", + // env = "GCLOUD_PROFILING", + // default_value_t = false, + // help = "Whether to enable Google Cloud profiling" + // )] pub gcloud_profiling: bool, - #[clap( - long, - value_name = "free-query-auth-token", - env = "FREE_QUERY_AUTH_TOKEN", - help = "Auth token that clients can use to query for free" - )] + // #[clap( + // long, + // value_name = "free-query-auth-token", + // env = "FREE_QUERY_AUTH_TOKEN", + // help = "Auth token that clients can use to query for free" + // )] pub free_query_auth_token: Option, } -#[derive(Clone, Debug, Args, Serialize, Deserialize, Default)] -#[group(required = true, multiple = true)] +#[derive(Clone, Debug, Serialize, Deserialize, Default)] pub struct Postgres { - #[clap( - long, - value_name = "postgres-host", - env = "POSTGRES_HOST", - default_value_t = String::from("http://0.0.0.0/"), - help = "Postgres host" - )] + // #[clap( + // long, + // value_name = "postgres-host", + // env = "POSTGRES_HOST", + // default_value_t = String::from("http://0.0.0.0/"), + // help = "Postgres host" + // )] pub postgres_host: String, - #[clap( - long, - value_name = "postgres-port", - env = "POSTGRES_PORT", - default_value_t = 5432, - help = "Postgres port" - )] + // #[clap( + // long, + // value_name = "postgres-port", + // env = "POSTGRES_PORT", + // default_value_t = 5432, + // help = "Postgres port" + // )] pub postgres_port: usize, - #[clap( - long, - value_name = "postgres-database", - env = "POSTGRES_DATABASE", - help = "Postgres database name" - )] + // #[clap( + // long, + // value_name = "postgres-database", + // env = "POSTGRES_DATABASE", + // help = "Postgres database name" + // )] pub postgres_database: String, - #[clap( - long, - value_name = "postgres-username", - env = "POSTGRES_USERNAME", - default_value_t = String::from("postgres"), - help = "Postgres username" - )] + // #[clap( + // long, + // value_name = "postgres-username", + // env = "POSTGRES_USERNAME", + // default_value_t = String::from("postgres"), + // help = "Postgres username" + // )] pub postgres_username: String, - #[clap( - long, - value_name = "postgres-password", - env = "POSTGRES_PASSWORD", - default_value_t = String::from(""), - help = "Postgres password" - )] + // #[clap( + // long, + // value_name = "postgres-password", + // env = "POSTGRES_PASSWORD", + // default_value_t = String::from(""), + // help = "Postgres password" + // )] pub postgres_password: String, } -#[derive(Clone, Debug, Args, Serialize, Deserialize, Default)] -#[group(required = true, multiple = true)] +#[derive(Clone, Debug, Serialize, Deserialize, Default)] pub struct NetworkSubgraph { - #[clap( - long, - value_name = "network-subgraph-deployment", - env = "NETWORK_SUBGRAPH_DEPLOYMENT", - help = "Network subgraph deployment" - )] + // #[clap( + // long, + // value_name = "network-subgraph-deployment", + // env = "NETWORK_SUBGRAPH_DEPLOYMENT", + // help = "Network subgraph deployment" + // )] pub network_subgraph_deployment: Option, - #[clap( - long, - value_name = "network-subgraph-endpoint", - env = "NETWORK_SUBGRAPH_ENDPOINT", - default_value_t = String::from("https://api.thegraph.com/subgraphs/name/graphprotocol/graph-network-goerli"), - help = "Endpoint to query the network subgraph from" - )] + // #[clap( + // long, + // value_name = "network-subgraph-endpoint", + // env = "NETWORK_SUBGRAPH_ENDPOINT", + // default_value_t = String::from("https://api.thegraph.com/subgraphs/name/graphprotocol/graph-network-goerli"), + // help = "Endpoint to query the network subgraph from" + // )] pub network_subgraph_endpoint: String, - #[clap( - long, - value_name = "network-subgraph-auth-token", - env = "NETWORK_SUBGRAPH_AUTH_TOKEN", - help = "Bearer token to require for /network queries" - )] + // #[clap( + // long, + // value_name = "network-subgraph-auth-token", + // env = "NETWORK_SUBGRAPH_AUTH_TOKEN", + // help = "Bearer token to require for /network queries" + // )] pub network_subgraph_auth_token: Option, - #[clap( - long, - value_name = "serve-network-subgraph", - env = "SERVE_NETWORK_SUBGRAPH", - default_value_t = false, - help = "Whether to serve the network subgraph at /network" - )] + // #[clap( + // long, + // value_name = "serve-network-subgraph", + // env = "SERVE_NETWORK_SUBGRAPH", + // default_value_t = false, + // help = "Whether to serve the network subgraph at /network" + // )] pub serve_network_subgraph: bool, - #[clap( - long, - value_name = "allocation-syncing-interval", - env = "ALLOCATION_SYNCING_INTERVAL", - default_value_t = 120_000, - help = "Interval (in ms) for syncing indexer allocations from the network" - )] + // #[clap( + // long, + // value_name = "allocation-syncing-interval", + // env = "ALLOCATION_SYNCING_INTERVAL", + // default_value_t = 120_000, + // help = "Interval (in ms) for syncing indexer allocations from the network" + // )] pub allocation_syncing_interval: u64, - #[clap( - long, - value_name = "client-signer-address", - env = "CLIENT_SIGNER_ADDRESS", - help = "Address that signs query fee receipts from a known client" - )] + // #[clap( + // long, + // value_name = "client-signer-address", + // env = "CLIENT_SIGNER_ADDRESS", + // help = "Address that signs query fee receipts from a known client" + // )] pub client_signer_address: Option, } -#[derive(Clone, Debug, Args, Serialize, Deserialize, Default)] -#[group(required = true, multiple = true)] +#[derive(Clone, Debug, Serialize, Deserialize, Default)] pub struct EscrowSubgraph { - #[clap( - long, - value_name = "escrow-subgraph-deployment", - env = "ESCROW_SUBGRAPH_DEPLOYMENT", - help = "Escrow subgraph deployment" - )] + // #[clap( + // long, + // value_name = "escrow-subgraph-deployment", + // env = "ESCROW_SUBGRAPH_DEPLOYMENT", + // help = "Escrow subgraph deployment" + // )] pub escrow_subgraph_deployment: Option, - #[clap( - long, - value_name = "escrow-subgraph-endpoint", - env = "ESCROW_SUBGRAPH_ENDPOINT", - help = "Endpoint to query the network subgraph from" - )] + // #[clap( + // long, + // value_name = "escrow-subgraph-endpoint", + // env = "ESCROW_SUBGRAPH_ENDPOINT", + // help = "Endpoint to query the network subgraph from" + // )] pub escrow_subgraph_endpoint: String, // #[clap( // long, @@ -288,39 +266,8 @@ pub struct EscrowSubgraph { pub escrow_syncing_interval: u64, } -impl Cli { - /// Parse config arguments - /// If environmental variable for config is set to a valid config file path, then parse from config - /// Otherwise parse from command line arguments - pub fn args() -> Self { - let cli = if let Ok(file_path) = std::env::var("config") { - confy::load_path::(file_path.clone()) - .unwrap_or_else(|_| panic!("Parse config file at {}", file_path.clone())) - } else { - Cli::parse() - // Potentially store it for the user - // let _ = confy::store_path("./args.toml", cli.clone()); - }; - - // Enables tracing under RUST_LOG variable - if let Some(log_setting) = &cli.indexer_infrastructure.log_level { - std::env::set_var("RUST_LOG", log_setting); - }; - // add a LogFormat to config - init_tracing("pretty".to_string()).expect("Could not set up global default subscriber for logger, check environmental variable `RUST_LOG` or the CLI input `log-level`"); - cli +impl Config { + pub fn load(filename: &PathBuf) -> Result { + Figment::new().merge(Toml::file(filename)).extract() } } - -#[derive( - Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Serialize, Deserialize, Default, -)] -pub enum LogLevel { - Trace, - #[default] - Debug, - Info, - Warn, - Error, - Fatal, -} diff --git a/service/src/common/indexer_management/mod.rs b/service/src/database.rs similarity index 97% rename from service/src/common/indexer_management/mod.rs rename to service/src/database.rs index f32002b4..72858f6a 100644 --- a/service/src/common/indexer_management/mod.rs +++ b/service/src/database.rs @@ -1,12 +1,25 @@ // Copyright 2023-, GraphOps and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 +use std::time::Duration; use std::{collections::HashSet, str::FromStr}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use sqlx::PgPool; +use sqlx::{postgres::PgPoolOptions, PgPool}; use thegraph::types::{DeploymentId, DeploymentIdError}; +use tracing::debug; + +pub async fn connect(url: &str) -> PgPool { + debug!("Connecting to database"); + + PgPoolOptions::new() + .max_connections(50) + .acquire_timeout(Duration::from_secs(3)) + .connect(url) + .await + .expect("Should be able to connect to the database") +} /// Internal cost model representation as stored in the database. /// diff --git a/service/src/graph_node.rs b/service/src/graph_node.rs deleted file mode 100644 index b1bafb2d..00000000 --- a/service/src/graph_node.rs +++ /dev/null @@ -1,176 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use anyhow::anyhow; -use reqwest::{header, Client, Url}; -use std::sync::Arc; -use thegraph::types::DeploymentId; - -use crate::query_processor::{QueryError, UnattestedQueryResult}; - -/// Graph node query wrapper. -/// -/// This is Arc internally, so it can be cloned and shared between threads. -#[derive(Debug, Clone)] -pub struct GraphNodeInstance { - client: Client, // it is Arc - subgraphs_base_url: Arc, -} - -impl GraphNodeInstance { - pub fn new(endpoint: &str) -> GraphNodeInstance { - let subgraphs_base_url = Url::parse(endpoint) - .and_then(|u| u.join("/subgraphs/id/")) - .expect("Could not parse graph node endpoint"); - let client = reqwest::Client::builder() - .user_agent("indexer-service") - .build() - .expect("Could not build a client to graph node query endpoint"); - GraphNodeInstance { - client, - subgraphs_base_url: Arc::new(subgraphs_base_url), - } - } - - pub async fn subgraph_query_raw( - &self, - subgraph_id: &DeploymentId, - data: String, - ) -> Result { - let request = self - .client - .post( - self.subgraphs_base_url - .join(&subgraph_id.to_string()) - .map_err(|e| { - QueryError::Other(anyhow!( - "Could not build subgraph query URL: {}", - e.to_string() - )) - })?, - ) - .body(data) - .header(header::CONTENT_TYPE, "application/json"); - - let response = request.send().await?; - let attestable = response - .headers() - .get("graph-attestable") - .map_or(false, |v| v == "true"); - - Ok(UnattestedQueryResult { - graphql_response: response.text().await?, - attestable, - }) - } -} - -#[cfg(test)] -mod test { - use std::str::FromStr; - - use lazy_static::lazy_static; - use serde_json::json; - use wiremock::matchers::{method, path}; - use wiremock::{Mock, MockServer, ResponseTemplate}; - - use super::*; - - lazy_static! { - static ref NETWORK_SUBGRAPH_ID: DeploymentId = - DeploymentId::from_str("QmV614UpBCpuusv5MsismmPYu4KqLtdeNMKpiNrX56kw6u").unwrap(); - } - - async fn mock_graph_node_server() -> MockServer { - let mock_server = MockServer::start().await; - let mock = Mock::given(method("POST")) - .and(path( - "/subgraphs/id/".to_string() + &NETWORK_SUBGRAPH_ID.to_string(), - )) - .respond_with(ResponseTemplate::new(200).set_body_raw( - r#" - { - "data": { - "graphNetwork": { - "currentEpoch": 960 - } - } - } - "#, - "application/json", - )); - mock_server.register(mock).await; - - mock_server - } - - async fn local_graph_node() -> GraphNodeInstance { - let graph_node_endpoint = std::env::var("GRAPH_NODE_ENDPOINT") - .expect("GRAPH_NODE_ENDPOINT env variable is not set"); - - GraphNodeInstance::new(&graph_node_endpoint) - } - - /// Also tests against the network subgraph, but using the `subgraph_query_raw` method - #[tokio::test] - #[ignore] // Run only if explicitly specified - async fn test_subgraph_query_local() { - let network_subgraph_id = DeploymentId::from_str( - &std::env::var("NETWORK_SUBGRAPH_ID") - .expect("NETWORK_SUBGRAPH_ID env variable is not set"), - ) - .unwrap(); - - let graph_node = local_graph_node().await; - - let query = r#" - query { - graphNetwork(id: 1) { - currentEpoch - } - } - "#; - - let query_json = json!({ - "query": query, - "variables": {} - }); - - let response = graph_node - .subgraph_query_raw(&network_subgraph_id, query_json.to_string()) - .await - .unwrap(); - - // Check that the response is valid JSON - let _json: serde_json::Value = serde_json::from_str(&response.graphql_response).unwrap(); - } - - /// Also tests against the network subgraph, but using the `subgraph_query_raw` method - #[tokio::test] - async fn test_subgraph_query() { - let mock_server = mock_graph_node_server().await; - - let graph_node = GraphNodeInstance::new(&mock_server.uri()); - - let query = r#" - query { - graphNetwork(id: 1) { - currentEpoch - } - } - "#; - - let query_json = json!({ - "query": query, - "variables": {} - }); - - let response = graph_node - .subgraph_query_raw(&NETWORK_SUBGRAPH_ID, query_json.to_string()) - .await - .unwrap(); - - // Check that the response is valid JSON - let _json: serde_json::Value = serde_json::from_str(&response.graphql_response).unwrap(); - } -} diff --git a/service/src/main.rs b/service/src/main.rs index 064ea507..570dec96 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -1,198 +1,182 @@ // Copyright 2023-, GraphOps and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use alloy_sol_types::eip712_domain; -use axum::Server; -use dotenvy::dotenv; -use ethereum_types::U256; -use std::{net::SocketAddr, str::FromStr, time::Duration}; -use tracing::info; - -use indexer_common::{ - indexer_service::http::IndexerServiceRelease, - prelude::{ - attestation_signers, dispute_manager, escrow_accounts, indexer_allocations, - DeploymentDetails, SubgraphClient, TapManager, - }, +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Error; +use axum::{ + async_trait, + response::{IntoResponse, Response}, + routing::post, + Json, Router, }; +use clap::Parser; +use indexer_common::indexer_service::http::{ + IndexerService, IndexerServiceImpl, IndexerServiceOptions, IndexerServiceRelease, IsAttestable, +}; +use reqwest::StatusCode; +use serde::Serialize; +use serde_json::{json, Value}; +use sqlx::PgPool; +use thegraph::types::DeploymentId; +use thiserror::Error; +use tracing::error; + +mod cli; +mod config; +pub mod database; +mod routes; + +use cli::Cli; +use config::Config; + +#[derive(Debug, Error)] +pub enum SubgraphServiceError { + #[error("Invalid status query: {0}")] + InvalidStatusQuery(Error), + #[error("Unsupported status query fields: {0:?}")] + UnsupportedStatusQueryFields(Vec), + #[error("Internal server error: {0}")] + StatusQueryError(Error), +} -use util::shutdown_signal; +impl From<&SubgraphServiceError> for StatusCode { + fn from(err: &SubgraphServiceError) -> Self { + use SubgraphServiceError::*; + match err { + InvalidStatusQuery(_) => StatusCode::BAD_REQUEST, + UnsupportedStatusQueryFields(_) => StatusCode::BAD_REQUEST, + StatusQueryError(_) => StatusCode::INTERNAL_SERVER_ERROR, + } + } +} -use crate::{ - common::database, config::Cli, metrics::handle_serve_metrics, query_processor::QueryProcessor, - server::create_server, util::public_key, -}; +// Tell axum how to convert `SubgraphServiceError` into a response. +impl IntoResponse for SubgraphServiceError { + fn into_response(self) -> Response { + (StatusCode::from(&self), self.to_string()).into_response() + } +} -use server::ServerOptions; +#[derive(Serialize)] +#[serde(transparent)] +struct SubgraphResponse { + inner: Value, + #[serde(skip)] + attestable: bool, +} -mod common; -mod config; -mod graph_node; -mod metrics; -mod query_processor; -mod server; -mod util; - -#[cfg(test)] -mod test_vectors; - -/// Create Indexer service App -/// -/// Initialization for server and Query processor -/// -/// Validate that graph-node instance is running for Query processor -/// Validate that server is running with a health check -/// -/// Parse Requests received -/// -/// Route the requests as a FreeQuery -/// -/// Return response from Query Processor +impl SubgraphResponse { + fn new(inner: Value, attestable: bool) -> Self { + Self { inner, attestable } + } +} + +impl IntoResponse for SubgraphResponse { + fn into_response(self) -> Response { + Json(self.inner).into_response() + } +} + +impl IsAttestable for SubgraphResponse { + fn is_attestable(&self) -> bool { + self.attestable + } +} + +pub struct SubgraphServiceState { + pub config: Config, + pub database: PgPool, + pub cost_schema: routes::cost::CostSchema, + pub graph_node_client: reqwest::Client, + pub graph_node_status_url: String, +} + +struct SubgraphService { + config: Config, +} + +impl SubgraphService { + fn new(config: Config) -> Self { + Self { config } + } +} + +#[async_trait] +impl IndexerServiceImpl for SubgraphService { + type Error = SubgraphServiceError; + type Request = serde_json::Value; + type Response = SubgraphResponse; + type State = SubgraphServiceState; + + async fn process_request( + &self, + _manifest_id: DeploymentId, + request: Self::Request, + ) -> Result<(Self::Request, Self::Response), Self::Error> { + Ok((request, SubgraphResponse::new(json!("hello"), false))) + } +} + +/// Run the subgraph indexer service #[tokio::main] -async fn main() -> Result<(), std::io::Error> { - dotenv().ok(); +async fn main() -> Result<(), Error> { + tracing_subscriber::fmt::init(); + + // Parse command line and environment arguments + let cli = Cli::parse(); + + // Load the json-rpc service configuration, which is a combination of the + // general configuration options for any indexer service and specific + // options added for JSON-RPC + let config = match Config::load(&cli.config) { + Ok(config) => config, + Err(e) => { + error!( + "Invalid configuration file `{}`: {}", + cli.config.display(), + e + ); + std::process::exit(1); + } + }; // Parse basic configurations - let config = Cli::args(); build_info::build_info!(fn build_info); let release = IndexerServiceRelease::from(build_info()); - // Initialize graph-node client - let graph_node = graph_node::GraphNodeInstance::new( - &config.indexer_infrastructure.graph_node_query_endpoint, - ); - - let http_client = reqwest::Client::builder() - .tcp_nodelay(true) - .timeout(Duration::from_secs(30)) - .build() - .expect("Failed to init HTTP client"); - - // Make an instance of network subgraph at either - // graph_node_query_endpoint/subgraphs/id/network_subgraph_deployment - // or network_subgraph_endpoint - // - // We're leaking the network subgraph here to obtain a reference with - // a static lifetime, which avoids having to pass around and clone `Arc` - // objects everywhere. Since the network subgraph is read-only, this is - // no problem. - let network_subgraph = Box::leak(Box::new(SubgraphClient::new( - http_client.clone(), - config - .network_subgraph - .network_subgraph_deployment - .map(|deployment| { - DeploymentDetails::for_graph_node( - &config.indexer_infrastructure.graph_node_status_endpoint, - &config.indexer_infrastructure.graph_node_query_endpoint, - deployment, - ) - }) - .transpose() - .expect("Failed to parse graph node query endpoint and network subgraph deployment"), - DeploymentDetails::for_query_url(&config.network_subgraph.network_subgraph_endpoint) - .expect("Failed to parse network subgraph endpoint"), - ))); - - let indexer_allocations = indexer_allocations( - network_subgraph, - config.ethereum.indexer_address, - 1, - Duration::from_millis(config.network_subgraph.allocation_syncing_interval), - ); - - // TODO: Chain ID should be a config - let graph_network_id = 1; - - let dispute_manager = - dispute_manager(network_subgraph, graph_network_id, Duration::from_secs(60)); - - let attestation_signers = attestation_signers( - indexer_allocations.clone(), - config.ethereum.mnemonic.clone(), - U256::from(graph_network_id), - dispute_manager, - ); - - // Establish Database connection necessary for serving indexer management - // requests with defined schema - // Note: Typically, you'd call `sqlx::migrate!();` here to sync the models - // which defaults to files in "./migrations" to sync the database; - // however, this can cause conflicts with the migrations run by indexer - // agent. Hence we leave syncing and migrating entirely to the agent and - // assume the models are up to date in the service. - let indexer_management_db = database::connect(&config.postgres).await; - - let escrow_subgraph = Box::leak(Box::new(SubgraphClient::new( - http_client, - config - .escrow_subgraph - .escrow_subgraph_deployment - .map(|deployment| { - DeploymentDetails::for_graph_node( - &config.indexer_infrastructure.graph_node_status_endpoint, - &config.indexer_infrastructure.graph_node_query_endpoint, - deployment, - ) - }) - .transpose() - .expect("Failed to parse graph node query endpoint and escrow subgraph deployment"), - DeploymentDetails::for_query_url(&config.escrow_subgraph.escrow_subgraph_endpoint) - .expect("Failed to parse escrow subgraph endpoint"), - ))); - - let escrow_accounts = escrow_accounts( - escrow_subgraph, - config.ethereum.indexer_address, - Duration::from_millis(config.escrow_subgraph.escrow_syncing_interval), - ); - - let tap_manager = TapManager::new( - indexer_management_db.clone(), - indexer_allocations, - escrow_accounts, - eip712_domain! { - name: "Scalar TAP", - version: "1", - chain_id: config.receipts.receipts_verifier_chain_id, - verifying_contract: config.receipts.receipts_verifier_address, - }, - ); - // Proper initiation of server, query processor - // server health check, graph-node instance connection check - let query_processor = - QueryProcessor::new(graph_node.clone(), attestation_signers.clone(), tap_manager); - - // Start indexer service basic metrics - tokio::spawn(handle_serve_metrics( - String::from("0.0.0.0"), - config.indexer_infrastructure.metrics_port, - )); - - let service_options = ServerOptions::new( - Some(config.indexer_infrastructure.port), + // Some of the subgrpah service configuration goes into the so-called + // "state", which will be passed to any request handler, middleware etc. + // that is involved in serving requests + let state = Arc::new(SubgraphServiceState { + config: config.clone(), + database: database::connect(&config.common.database.postgres_url).await, + cost_schema: routes::cost::build_schema().await, + graph_node_client: reqwest::ClientBuilder::new() + .tcp_nodelay(true) + .timeout(Duration::from_secs(30)) + .build() + .expect("Failed to init HTTP client for Graph Node"), + graph_node_status_url: config + .common + .graph_node + .as_ref() + .expect("Config must have `common.graph_node.status_url` set") + .status_url + .clone(), + }); + + IndexerService::run(IndexerServiceOptions { release, - query_processor, - config.indexer_infrastructure.free_query_auth_token, - config.indexer_infrastructure.graph_node_status_endpoint, - indexer_management_db, - public_key(&config.ethereum.mnemonic).expect("Failed to initiate with operator wallet"), - network_subgraph, - config.network_subgraph.network_subgraph_auth_token, - config.network_subgraph.serve_network_subgraph, - ); - - info!("Initialized server options"); - let app = create_server(service_options).await; - - let addr = SocketAddr::from_str(&format!("0.0.0.0:{}", config.indexer_infrastructure.port)) - .expect("Start server port"); - info!("Initialized server app at {}", addr); - Server::bind(&addr) - .serve(app.into_make_service()) - .with_graceful_shutdown(shutdown_signal()) - .await - .unwrap(); - - Ok(()) + config: config.common.clone(), + url_namespace: "subgraphs", + metrics_prefix: "subgraph", + service_impl: SubgraphService::new(config), + extra_routes: Router::new() + .route("/cost", post(routes::cost::cost)) + .route("/status", post(routes::status)) + .with_state(state), + }) + .await } diff --git a/service/src/metrics/mod.rs b/service/src/metrics/mod.rs deleted file mode 100644 index dfc8d562..00000000 --- a/service/src/metrics/mod.rs +++ /dev/null @@ -1,79 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use autometrics::{encode_global_metrics, global_metrics_exporter}; -use axum::http::StatusCode; -use axum::routing::get; -use axum::Router; -use lazy_static::lazy_static; -use prometheus::{register_histogram_vec, register_int_counter_vec, HistogramVec, IntCounterVec}; -use std::{net::SocketAddr, str::FromStr}; -use tracing::info; - -// Record Queries related metrics -lazy_static! { - pub static ref QUERIES: IntCounterVec = register_int_counter_vec!( - "indexer_service_queries_total", - "Incoming queries", - &["deployment"], - ) - .expect("Failed to create queries counters"); - pub static ref SUCCESSFUL_QUERIES: IntCounterVec = register_int_counter_vec!( - "indexer_service_queries_ok", - "Successfully executed queries", - &["deployment"], - ) - .expect("Failed to create successfulQueries counters"); - pub static ref FAILED_QUERIES: IntCounterVec = register_int_counter_vec!( - "indexer_service_queries_failed", - "Queries that failed to execute", - &["deployment"], - ) - .expect("Failed to create failedQueries counters"); - pub static ref QUERIES_WITH_INVALID_RECEIPT_HEADER: IntCounterVec = register_int_counter_vec!( - "indexer_service_queries_with_invalid_receipt_header", - "Queries that failed executing because they came with an invalid receipt header", - &["deployment"], - ) - .expect("Failed to create queriesWithInvalidReceiptHeader counters"); - pub static ref QUERIES_WITHOUT_RECEIPT: IntCounterVec = register_int_counter_vec!( - "indexer_service_queries_without_receipt", - "Queries that failed executing because they came without a receipt", - &["deployment"], - ) - .expect("Failed to create queriesWithoutReceipt counters"); - pub static ref QUERY_DURATION: HistogramVec = register_histogram_vec!( - "indexer_service_query_duration", - "Duration of processing a query from start to end", - &["deployment"], - ) - .unwrap(); -} - -/// This handler serializes the metrics into a string for Prometheus to scrape -pub async fn get_metrics() -> (StatusCode, String) { - match encode_global_metrics() { - Ok(metrics) => (StatusCode::OK, metrics), - Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{err:?}")), - } -} - -/// Metrics server router -pub async fn handle_serve_metrics(host: String, port: u16) { - // Set up the exporter to collect metrics - let _exporter = global_metrics_exporter(); - - let app = Router::new().route("/metrics", get(get_metrics)); - let addr = - SocketAddr::from_str(&format!("{}:{}", host, port)).expect("Start Prometheus metrics"); - let server = axum::Server::bind(&addr); - info!( - address = addr.to_string(), - "Prometheus Metrics port exposed" - ); - - server - .serve(app.into_make_service()) - .await - .expect("Error starting Prometheus metrics port"); -} diff --git a/service/src/query_processor.rs b/service/src/query_processor.rs deleted file mode 100644 index e494ff52..00000000 --- a/service/src/query_processor.rs +++ /dev/null @@ -1,256 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use std::collections::HashMap; - -use alloy_primitives::Address; -use eventuals::Eventual; -use indexer_common::tap_manager::TapManager; -use log::error; -use serde::{Deserialize, Serialize}; -use tap_core::tap_manager::SignedReceipt; -use thegraph::types::{attestation::Attestation, DeploymentId}; - -use indexer_common::indexer_errors::{IndexerError, IndexerErrorCause, IndexerErrorCode}; -use indexer_common::prelude::AttestationSigner; - -use crate::graph_node::GraphNodeInstance; - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct QueryResult { - #[serde(rename = "graphQLResponse")] - pub graphql_response: String, - pub attestation: Option, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct UnattestedQueryResult { - #[serde(rename = "graphQLResponse")] - pub graphql_response: String, - pub attestable: bool, -} - -#[derive(Debug, Serialize, Deserialize, PartialEq)] -pub struct Response { - pub result: T, - pub status: i64, -} - -/// Free query do not need signature, receipt, signers -/// Also ignore metrics for now -/// Later add along with PaidQuery -#[derive(Debug)] -pub struct FreeQuery { - pub subgraph_deployment_id: DeploymentId, - pub query: String, -} - -/// Paid query needs subgraph_deployment_id, query, receipt -pub struct PaidQuery { - pub subgraph_deployment_id: DeploymentId, - pub query: String, - pub receipt: String, -} - -#[derive(Debug, thiserror::Error)] -pub enum QueryError { - #[error(transparent)] - Transport(#[from] reqwest::Error), - #[error("The subgraph is in a failed state")] - IndexingError, - #[error("Bad or invalid entity data found in the subgraph: {}", .0.to_string())] - BadData(anyhow::Error), - #[error("Unknown error: {0}")] - Other(anyhow::Error), -} - -#[derive(Clone)] -pub struct QueryProcessor { - graph_node: GraphNodeInstance, - attestation_signers: Eventual>, - tap_manager: TapManager, -} - -impl QueryProcessor { - pub fn new( - graph_node: GraphNodeInstance, - attestation_signers: Eventual>, - tap_manager: TapManager, - ) -> QueryProcessor { - QueryProcessor { - graph_node, - attestation_signers, - tap_manager, - } - } - - pub async fn execute_free_query( - &self, - query: FreeQuery, - ) -> Result, QueryError> { - let response = self - .graph_node - .subgraph_query_raw(&query.subgraph_deployment_id, query.query) - .await?; - - Ok(Response { - result: response, - status: 200, - }) - } - - pub async fn execute_paid_query( - &self, - query: PaidQuery, - ) -> Result, QueryError> { - let PaidQuery { - subgraph_deployment_id, - query, - receipt, - } = query; - - let parsed_receipt: SignedReceipt = match serde_json::from_str(&receipt) - .map_err(|e| QueryError::Other(anyhow::Error::from(e))) - { - Ok(r) => r, - Err(e) => { - IndexerError::new( - IndexerErrorCode::IE031, - Some(IndexerErrorCause::new( - "Failed to parse receipt for a paid query", - )), - ); - - return Err(e); - } - }; - - let allocation_id = parsed_receipt.message.allocation_id; - - self.tap_manager - .verify_and_store_receipt(parsed_receipt) - .await - .map_err(|e| { - IndexerError::new( - IndexerErrorCode::IE053, - Some(IndexerErrorCause::new( - "Failed to verify and store a parsed receipt", - )), - ); - - QueryError::Other(e) - })?; - - let signers = self - .attestation_signers - .value_immediate() - .ok_or_else(|| QueryError::Other(anyhow::anyhow!("System is not ready yet")))?; - let signer = signers.get(&allocation_id).ok_or_else(|| { - let err_msg = format!("No signer found for allocation id {}", allocation_id); - IndexerError::new( - IndexerErrorCode::IE022, - Some(IndexerErrorCause::new(err_msg.clone())), - ); - - QueryError::Other(anyhow::anyhow!(err_msg)) - })?; - - let response = self - .graph_node - .subgraph_query_raw(&subgraph_deployment_id, query.clone()) - .await?; - - let attestation = response - .attestable - .then(|| Self::create_attestation(signer, &query, &response)); - - Ok(Response { - result: QueryResult { - graphql_response: response.graphql_response, - attestation, - }, - status: 200, - }) - } - - fn create_attestation( - signer: &AttestationSigner, - query: &str, - response: &UnattestedQueryResult, - ) -> Attestation { - signer.create_attestation(query, &response.graphql_response) - } -} - -#[cfg(test)] -mod tests { - use std::str::FromStr; - - use alloy_primitives::Address; - use ethers_core::types::U256; - use indexer_common::prelude::{ - Allocation, AllocationStatus, AttestationSigner, SubgraphDeployment, - }; - use lazy_static::lazy_static; - - use super::*; - - const INDEXER_OPERATOR_MNEMONIC: &str = "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about"; - const INDEXER_ADDRESS: &str = "0x1234567890123456789012345678901234567890"; - - lazy_static! { - static ref DEPLOYMENT_ID: DeploymentId = DeploymentId( - "0xc064c354bc21dd958b1d41b67b8ef161b75d2246b425f68ed4c74964ae705cbd" - .parse() - .unwrap(), - ); - } - - #[test] - fn paid_query_attestation() { - let subgraph_deployment = SubgraphDeployment { - id: *DEPLOYMENT_ID, - denied_at: None, - }; - - let indexer = Address::from_str(INDEXER_ADDRESS).unwrap(); - let allocation = &Allocation { - id: Address::from_str("0x4CAF2827961262ADEF3D0Ad15C341e40c21389a4").unwrap(), - status: AllocationStatus::Null, - subgraph_deployment, - indexer, - allocated_tokens: U256::from(100), - created_at_epoch: 940, - created_at_block_hash: String::from(""), - closed_at_epoch: None, - closed_at_epoch_start_block_hash: None, - previous_epoch_start_block_hash: None, - poi: None, - query_fee_rebates: None, - query_fees_collected: None, - }; - - let attestation_signer = AttestationSigner::new( - INDEXER_OPERATOR_MNEMONIC, - allocation, - U256::from(1), - Address::from_str("0xdeadbeefcafebabedeadbeefcafebabedeadbeef").unwrap(), - ) - .unwrap(); - - let request = "test input"; - let response = "test output"; - let attestation = QueryProcessor::create_attestation( - &attestation_signer, - request, - &UnattestedQueryResult { - graphql_response: response.to_owned(), - attestable: true, - }, - ); - - attestation_signer - .verify(&attestation, request, response, &allocation.id) - .unwrap(); - } -} diff --git a/service/src/server/routes/cost.rs b/service/src/routes/cost.rs similarity index 64% rename from service/src/server/routes/cost.rs rename to service/src/routes/cost.rs index c1c30c89..c005dd2f 100644 --- a/service/src/server/routes/cost.rs +++ b/service/src/routes/cost.rs @@ -1,19 +1,15 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - use std::str::FromStr; +use std::sync::Arc; use async_graphql::{Context, EmptyMutation, EmptySubscription, Object, Schema, SimpleObject}; use async_graphql_axum::{GraphQLRequest, GraphQLResponse}; -use axum::extract::Extension; +use axum::extract::State; use serde::{Deserialize, Serialize}; use serde_json::Value; use thegraph::types::DeploymentId; -use crate::{ - common::indexer_management::{self, CostModel}, - server::ServerOptions, -}; +use crate::database::{self, CostModel}; +use crate::SubgraphServiceState; #[derive(Clone, Debug, Serialize, Deserialize, SimpleObject)] pub struct GraphQlCostModel { @@ -32,13 +28,11 @@ impl From for GraphQlCostModel { } } -pub type CostSchema = Schema; - #[derive(Default)] -pub struct QueryRoot; +pub struct Query; #[Object] -impl QueryRoot { +impl Query { async fn cost_models( &self, ctx: &Context<'_>, @@ -48,8 +42,8 @@ impl QueryRoot { .into_iter() .map(|s| DeploymentId::from_str(&s)) .collect::, _>>()?; - let pool = &ctx.data_unchecked::().indexer_management_db; - let cost_models = indexer_management::cost_models(pool, &deployment_ids).await?; + let pool = &ctx.data_unchecked::>().database; + let cost_models = database::cost_models(pool, &deployment_ids).await?; Ok(cost_models.into_iter().map(|m| m.into()).collect()) } @@ -59,20 +53,26 @@ impl QueryRoot { deployment: String, ) -> Result, anyhow::Error> { let deployment_id = DeploymentId::from_str(&deployment)?; - let pool = &ctx.data_unchecked::().indexer_management_db; - indexer_management::cost_model(pool, &deployment_id) + let pool = &ctx.data_unchecked::>().database; + database::cost_model(pool, &deployment_id) .await .map(|model_opt| model_opt.map(GraphQlCostModel::from)) } } -pub(crate) async fn graphql_handler( +pub type CostSchema = Schema; + +pub async fn build_schema() -> CostSchema { + Schema::build(Query, EmptyMutation, EmptySubscription).finish() +} + +pub async fn cost( + State(state): State>, req: GraphQLRequest, - Extension(schema): Extension, - Extension(server_options): Extension, ) -> GraphQLResponse { - schema - .execute(req.into_inner().data(server_options)) + state + .cost_schema + .execute(req.into_inner().data(state.clone())) .await .into() } diff --git a/service/src/routes/mod.rs b/service/src/routes/mod.rs new file mode 100644 index 00000000..4953d4c0 --- /dev/null +++ b/service/src/routes/mod.rs @@ -0,0 +1,4 @@ +pub mod cost; +mod status; + +pub use status::status; diff --git a/service/src/routes/status.rs b/service/src/routes/status.rs new file mode 100644 index 00000000..41b91ad2 --- /dev/null +++ b/service/src/routes/status.rs @@ -0,0 +1,114 @@ +// Copyright 2023-, GraphOps and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use std::collections::HashSet; +use std::sync::Arc; + +use async_graphql_axum::GraphQLRequest; +use axum::{extract::State, response::IntoResponse, Json}; +use graphql::graphql_parser::query as q; +use graphql_http::{ + http::request::{IntoRequestParameters, RequestParameters}, + http_client::{ReqwestExt, ResponseError}, +}; +use serde_json::{json, Map, Value}; + +use crate::{SubgraphServiceError, SubgraphServiceState}; + +lazy_static::lazy_static! { + static ref SUPPORTED_ROOT_FIELDS: HashSet<&'static str> = + vec![ + "indexingStatuses", + "chains", + "latestBlock", + "earliestBlock", + "publicProofsOfIndexing", + "entityChangesInBlock", + "blockData", + "cachedEthereumCalls", + "subgraphFeatures", + "apiVersions", + ].into_iter().collect(); +} + +struct WrappedGraphQLRequest(async_graphql::Request); + +impl IntoRequestParameters for WrappedGraphQLRequest { + fn into_request_parameters(self) -> RequestParameters { + RequestParameters { + query: self.0.query.into(), + operation_name: self.0.operation_name, + variables: Map::from_iter(self.0.variables.iter().map(|(name, value)| { + ( + name.as_str().to_string(), + value.clone().into_json().unwrap(), + ) + })), + extensions: Map::from_iter(self.0.extensions.into_iter().map(|(name, value)| { + ( + name.as_str().to_string(), + value.clone().into_json().unwrap(), + ) + })), + } + } +} + +// Custom middleware function to process the request before reaching the main handler +pub async fn status( + State(state): State>, + request: GraphQLRequest, +) -> Result { + let request = request.into_inner(); + let query: q::Document = q::parse_query(request.query.as_str()) + .map_err(|e| SubgraphServiceError::InvalidStatusQuery(e.into()))?; + + let root_fields = query + .definitions + .iter() + // This gives us all root selection sets + .filter_map(|def| match def { + q::Definition::Operation(op) => match op { + q::OperationDefinition::Query(query) => Some(&query.selection_set), + q::OperationDefinition::SelectionSet(selection_set) => Some(selection_set), + _ => None, + }, + q::Definition::Fragment(fragment) => Some(&fragment.selection_set), + }) + // This gives us all field names of root selection sets (and potentially non-root fragments) + .flat_map(|selection_set| { + selection_set + .items + .iter() + .filter_map(|item| match item { + q::Selection::Field(field) => Some(&field.name), + _ => None, + }) + .collect::>() + }); + + let unsupported_root_fields: Vec<_> = root_fields + .filter(|field| !SUPPORTED_ROOT_FIELDS.contains(field.as_str())) + .map(ToString::to_string) + .collect(); + + if !unsupported_root_fields.is_empty() { + return Err(SubgraphServiceError::UnsupportedStatusQueryFields( + unsupported_root_fields, + )); + } + + let result = state + .graph_node_client + .post(&state.graph_node_status_url) + .send_graphql::(WrappedGraphQLRequest(request)) + .await + .map_err(|e| SubgraphServiceError::StatusQueryError(e.into()))?; + + result.map(Json).or_else(|e| match e { + ResponseError::Failure { errors } => Ok(Json(json!({ + "errors": errors, + }))), + ResponseError::Empty => todo!(), + }) +} diff --git a/service/src/server/mod.rs b/service/src/server/mod.rs deleted file mode 100644 index 329f2cc3..00000000 --- a/service/src/server/mod.rs +++ /dev/null @@ -1,134 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -pub(crate) use axum::{ - error_handling::HandleErrorLayer, - handler::Handler, - http::{Method, StatusCode}, - routing::get, -}; - -use axum::{routing::post, Extension, Router}; -use sqlx::PgPool; -use std::time::Duration; -use tower::{BoxError, ServiceBuilder}; -use tower_http::{ - add_extension::AddExtensionLayer, - cors::CorsLayer, - trace::{self, TraceLayer}, -}; -use tracing::Level; - -use indexer_common::{indexer_service::http::IndexerServiceRelease, prelude::SubgraphClient}; - -use crate::{ - query_processor::QueryProcessor, - server::routes::{network_ratelimiter, slow_ratelimiter}, -}; - -pub mod routes; - -#[derive(Clone)] -pub struct ServerOptions { - pub port: Option, - pub release: IndexerServiceRelease, - pub query_processor: QueryProcessor, - pub free_query_auth_token: Option, - pub graph_node_status_endpoint: String, - pub indexer_management_db: PgPool, - pub operator_public_key: String, - pub network_subgraph: &'static SubgraphClient, - pub network_subgraph_auth_token: Option, - pub serve_network_subgraph: bool, -} - -impl ServerOptions { - #[allow(clippy::too_many_arguments)] - pub fn new( - port: Option, - release: IndexerServiceRelease, - query_processor: QueryProcessor, - free_query_auth_token: Option, - graph_node_status_endpoint: String, - indexer_management_db: PgPool, - operator_public_key: String, - network_subgraph: &'static SubgraphClient, - network_subgraph_auth_token: Option, - serve_network_subgraph: bool, - ) -> Self { - let free_query_auth_token = free_query_auth_token.map(|token| format!("Bearer {}", token)); - - ServerOptions { - port, - release, - query_processor, - free_query_auth_token, - graph_node_status_endpoint, - indexer_management_db, - operator_public_key, - network_subgraph, - network_subgraph_auth_token, - serve_network_subgraph, - } - } -} - -pub async fn create_server(options: ServerOptions) -> Router { - Router::new() - .route("/", get(routes::basic::index)) - .route("/health", get(routes::basic::health)) - .route("/version", get(routes::basic::version)) - .route( - "/status", - post(routes::status::status_queries) - .layer(AddExtensionLayer::new(network_ratelimiter())), - ) - .route( - "/subgraphs/health/:deployment", - get(routes::deployment::deployment_health - .layer(AddExtensionLayer::new(slow_ratelimiter()))), - ) - .route( - "/cost", - post(routes::cost::graphql_handler) - .get(routes::cost::graphql_handler) - .layer(AddExtensionLayer::new(slow_ratelimiter())), - ) - .nest( - "/operator", - routes::basic::create_operator_server(options.clone()) - .layer(AddExtensionLayer::new(slow_ratelimiter())), - ) - .route( - "/network", - post(routes::network::network_queries) - .layer(AddExtensionLayer::new(network_ratelimiter())), - ) - .route( - "/subgraphs/id/:id", - post(routes::subgraphs::subgraph_queries), - ) - .layer(Extension(options.clone())) - .layer(CorsLayer::new().allow_methods([Method::GET, Method::POST])) - .layer( - // Handle error for timeout, ratelimit, or a general internal server error - ServiceBuilder::new() - .layer(HandleErrorLayer::new(|error: BoxError| async move { - if error.is::() { - Ok(StatusCode::REQUEST_TIMEOUT) - } else { - Err(( - StatusCode::INTERNAL_SERVER_ERROR, - format!("Unhandled internal error: {}", error), - )) - } - })) - .layer( - TraceLayer::new_for_http() - .make_span_with(trace::DefaultMakeSpan::new().level(Level::DEBUG)) - .on_response(trace::DefaultOnResponse::new().level(Level::DEBUG)), - ) - .timeout(Duration::from_secs(10)) - .into_inner(), - ) -} diff --git a/service/src/server/routes/basic.rs b/service/src/server/routes/basic.rs deleted file mode 100644 index 1c0f2f8a..00000000 --- a/service/src/server/routes/basic.rs +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use axum::{extract::Extension, routing::get, Router}; -use axum::{http::StatusCode, response::IntoResponse, Json}; -use serde::Serialize; -use serde_json::json; - -use crate::server::ServerOptions; - -#[derive(Serialize)] -struct Health { - healthy: bool, -} - -/// Endpoint for server health -pub async fn health() -> impl IntoResponse { - let health = Health { healthy: true }; - (StatusCode::OK, Json(health)) -} - -/// Index endpoint for status checks -pub async fn index() -> impl IntoResponse { - let responder = "Ready to roll!".to_string(); - responder.into_response() -} - -/// Endpoint for package version -pub async fn version(server: axum::extract::Extension) -> impl IntoResponse { - let version = server.release.clone(); - (StatusCode::OK, Json(version)) -} - -// Define a handler function for the `/info` route -async fn operator_info(Extension(options): Extension) -> Json { - let public_key = &options.operator_public_key; - Json(json!({ "publicKey": public_key })) -} - -// Create a function to build the operator server router -pub fn create_operator_server(_options: ServerOptions) -> Router { - Router::new().route("/info", get(operator_info)) -} diff --git a/service/src/server/routes/deployment.rs b/service/src/server/routes/deployment.rs deleted file mode 100644 index 85519089..00000000 --- a/service/src/server/routes/deployment.rs +++ /dev/null @@ -1,148 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use axum::{http::StatusCode, response::IntoResponse, Extension, Json}; - -use serde::{Deserialize, Serialize}; -use serde_json::json; - -use crate::server::{routes::internal_server_error_response, ServerOptions}; -use indexer_common::indexer_errors::{IndexerError, IndexerErrorCause}; - -/// Parse an incoming query request and route queries with authenticated -/// free query token to graph node -/// Later add receipt manager functions for paid queries -pub async fn deployment_health( - Extension(server): Extension, - deployment: axum::extract::Path, -) -> impl IntoResponse { - // Create the GraphQL query - let query = status_query(deployment.to_string()); - - // Send the GraphQL request - let response = reqwest::Client::new() - .post(server.graph_node_status_endpoint) - .header("Content-Type", "application/json") - .json(&query) - .send() - .await; - - match response { - Ok(response) => { - if response.status().is_success() { - // Deserialize the JSON response - //TODO: match with error - let data: serde_json::Value = if let Ok(data) = response.json().await { - data - } else { - return internal_server_error_response("Invalid json response"); - }; - - // Process the response and return the appropriate HTTP status - let status = if let Some(status) = - data["data"]["indexingStatuses"].get(0).and_then(|s| { - let parse = serde_json::from_value::(s.clone()); - parse.ok() - }) { - status - } else { - return internal_server_error_response("Missing indexing status"); - }; - - // Build health response based on the returned status - if status.health == SubgraphHealth::failed { - return internal_server_error_response("Subgraph deployment has failed"); - } - - if let Ok((latest, head)) = block_numbers(status) { - if latest > head - 5 { - (StatusCode::OK, Json("Subgraph deployment is up to date")).into_response() - } else { - internal_server_error_response("Subgraph deployment is lagging behind") - } - } else { - internal_server_error_response( - "Invalid indexing status (missing block numbers)", - ) - } - } else { - internal_server_error_response("Unknown error") - } - } - Err(e) => internal_server_error_response(&e.to_string()), - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -struct IndexingStatus { - health: SubgraphHealth, - chains: Vec, -} - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -#[allow(non_camel_case_types)] // Need exact field names to match with GQL response -enum SubgraphHealth { - healthy, - unhealthy, - failed, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -struct ChainStatus { - network: String, - latest_block: Block, - chain_head_block: Block, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -struct Block { - number: String, - hash: String, -} - -fn status_query(deployment: String) -> serde_json::Value { - json!({ - "query": r#"query indexingStatus($subgraphs: [String!]!) { - indexingStatuses(subgraphs: $subgraphs) { - subgraph - health - chains { - network - ... on EthereumIndexingStatus { - latestBlock { number hash } - chainHeadBlock { number hash } - } - } - } - }"#, - "variables": { - "subgraphs": [deployment], - }, - }) -} - -fn block_numbers(status: IndexingStatus) -> Result<(u64, u64), IndexerError> { - let latest_block_number = status - .chains - .get(0) - .map(|chain| chain.latest_block.number.clone()) - .map(|number| number.parse::()); - - let head_block_number = status - .chains - .get(0) - .map(|chain| chain.chain_head_block.number.clone()) - .map(|number| number.parse::()); - - if let (Some(Ok(latest)), Some(Ok(head))) = (latest_block_number, head_block_number) { - Ok((latest, head)) - } else { - Err(IndexerError::new( - indexer_common::indexer_errors::IndexerErrorCode::IE018, - Some(IndexerErrorCause::new( - "Ill formatted block numbers from indexing status", - )), - )) - } -} diff --git a/service/src/server/routes/mod.rs b/service/src/server/routes/mod.rs deleted file mode 100644 index f0aae988..00000000 --- a/service/src/server/routes/mod.rs +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use axum::{ - http::StatusCode, - response::{IntoResponse, Response}, - Json, -}; -use hyper::http::HeaderName; -use indexer_common::indexer_errors::{IndexerError, IndexerErrorCause}; -use tower::limit::RateLimitLayer; - -pub mod basic; -pub mod cost; -pub mod deployment; -pub mod network; -pub mod status; -pub mod subgraphs; - -/// Helper function to convert response body to query string -pub async fn response_body_to_query_string( - body: hyper::body::Body, -) -> Result { - let query_bytes = hyper::body::to_bytes(body).await.map_err(|e| { - IndexerError::new( - indexer_common::indexer_errors::IndexerErrorCode::IE075, - Some(IndexerErrorCause::new(e)), - ) - })?; - let query_string = String::from_utf8(query_bytes.to_vec()).map_err(|e| { - IndexerError::new( - indexer_common::indexer_errors::IndexerErrorCode::IE075, - Some(IndexerErrorCause::new(e)), - ) - })?; - Ok(query_string) -} - -/// Create response for a bad request -pub fn bad_request_response(error_body: &str) -> Response { - ( - StatusCode::BAD_REQUEST, - axum::response::AppendHeaders([(HeaderName::from_static("graph-attestable"), "false")]), - Json(error_body.to_string()), - ) - .into_response() -} - -/// Create response for an internal server error -pub fn internal_server_error_response(error_body: &str) -> Response { - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(error_body.to_string()), - ) - .into_response() -} - -/// Limit status requests to 9000/30min (5/s) -pub fn slow_ratelimiter() -> RateLimitLayer { - RateLimitLayer::new(9000, std::time::Duration::from_millis(30 * 60 * 1000)) -} - -/// Limit network requests to 90000/30min (50/s) -pub fn network_ratelimiter() -> RateLimitLayer { - RateLimitLayer::new(90000, std::time::Duration::from_millis(30 * 60 * 1000)) -} diff --git a/service/src/server/routes/network.rs b/service/src/server/routes/network.rs deleted file mode 100644 index 852a12ff..00000000 --- a/service/src/server/routes/network.rs +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use axum::{ - extract::Extension, - http::{self, Request}, - response::IntoResponse, - Json, -}; -use serde_json::{json, Value}; - -use crate::server::ServerOptions; - -use super::bad_request_response; - -pub async fn network_queries( - Extension(server): Extension, - req: Request, - axum::extract::Json(body): axum::extract::Json, -) -> impl IntoResponse { - // Extract free query auth token - let auth_token = req - .headers() - .get(http::header::AUTHORIZATION) - .and_then(|t| t.to_str().ok()); - - // Serve only if enabled by indexer and request auth token matches - if !(server.serve_network_subgraph - && auth_token.is_some() - && server.network_subgraph_auth_token.is_some() - && auth_token.unwrap() == server.network_subgraph_auth_token.as_deref().unwrap()) - { - return bad_request_response("Not enabled or authorized query"); - } - - match server.network_subgraph.query::(&body).await { - Ok(result) => Json(json!({ - "data": result.data, - "errors": result.errors - .into_iter() - .map(|e| json!({ "message": e.message })) - .collect::>(), - })) - .into_response(), - Err(e) => bad_request_response(&e.to_string()), - } -} diff --git a/service/src/server/routes/status.rs b/service/src/server/routes/status.rs deleted file mode 100644 index 630411eb..00000000 --- a/service/src/server/routes/status.rs +++ /dev/null @@ -1,90 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use std::collections::HashSet; - -use axum::{ - http::{Request, StatusCode}, - response::IntoResponse, - Extension, Json, -}; - -use hyper::body::Bytes; - -use reqwest::{header, Client}; - -use crate::server::ServerOptions; -use indexer_common::{graphql::filter_supported_fields, indexer_errors::*}; - -use super::bad_request_response; - -lazy_static::lazy_static! { - static ref SUPPORTED_ROOT_FIELDS: HashSet<&'static str> = - vec![ - "indexingStatuses", - "chains", - "latestBlock", - "earliestBlock", - "publicProofsOfIndexing", - "entityChangesInBlock", - "blockData", - "cachedEthereumCalls", - "subgraphFeatures", - "apiVersions", - ].into_iter().collect(); -} - -// Custom middleware function to process the request before reaching the main handler -pub async fn status_queries( - Extension(server): Extension, - req: Request, -) -> impl IntoResponse { - let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap(); - // Read the requested query string - let query_string = match String::from_utf8(body_bytes.to_vec()) { - Ok(s) => s, - Err(e) => return bad_request_response(&e.to_string()), - }; - - // filter supported root fields - let query_string = match filter_supported_fields(&query_string, &SUPPORTED_ROOT_FIELDS) { - Ok(query) => query, - Err(unsupported_fields) => { - return ( - StatusCode::BAD_REQUEST, - format!("Cannot query field: {:#?}", unsupported_fields), - ) - .into_response(); - } - }; - - // Pass the modified operation to the actual endpoint - let request = Client::new() - .post(&server.graph_node_status_endpoint) - .body(Bytes::from(query_string)) - .header(header::CONTENT_TYPE, "application/json"); - - match request.send().await { - Ok(r) => match r.json::>().await { - Ok(r) => (StatusCode::OK, Json(r)).into_response(), - Err(e) => { - IndexerError::new( - IndexerErrorCode::IE018, - Some(IndexerErrorCause::new( - "Failed to parse the indexing status API response", - )), - ); - bad_request_response(&e.to_string()) - } - }, - Err(e) => { - IndexerError::new( - IndexerErrorCode::IE018, - Some(IndexerErrorCause::new( - "Failed to query indexing status API from the graph node status endpoint", - )), - ); - bad_request_response(&e.to_string()) - } - } -} diff --git a/service/src/server/routes/subgraphs.rs b/service/src/server/routes/subgraphs.rs deleted file mode 100644 index 2591e9aa..00000000 --- a/service/src/server/routes/subgraphs.rs +++ /dev/null @@ -1,150 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use axum::{ - extract::Extension, - http::{self, Request, StatusCode}, - response::IntoResponse, - Json, -}; -use std::str::FromStr; -use thegraph::types::DeploymentId; -use tracing::trace; - -use crate::{ - metrics, - query_processor::FreeQuery, - server::{ - routes::{bad_request_response, response_body_to_query_string}, - ServerOptions, - }, -}; -use indexer_common::indexer_errors::*; - -/// Parse an incoming query request and route queries with authenticated -/// free query token to graph node -/// Later add receipt manager functions for paid queries -pub async fn subgraph_queries( - Extension(server): Extension, - id: axum::extract::Path, - req: Request, -) -> impl IntoResponse { - let (parts, body) = req.into_parts(); - - // Initialize id into a subgraph deployment ID - let subgraph_deployment_id = match DeploymentId::from_str(id.as_str()) { - Ok(id) => id, - Err(e) => return bad_request_response(&e.to_string()), - }; - let deployment_label = subgraph_deployment_id.to_string(); - - let query_duration_timer = metrics::QUERY_DURATION - .with_label_values(&[&deployment_label]) - .start_timer(); - metrics::QUERIES - .with_label_values(&[&deployment_label]) - .inc(); - // Extract scalar receipt from header and free query auth token for paid or free query - let receipt = if let Some(receipt) = parts.headers.get("scalar-receipt") { - match receipt.to_str() { - Ok(r) => Some(r), - Err(_) => { - query_duration_timer.observe_duration(); - metrics::QUERIES_WITH_INVALID_RECEIPT_HEADER - .with_label_values(&[&deployment_label]) - .inc(); - let err_msg = "Bad scalar receipt for subgraph query"; - IndexerError::new( - IndexerErrorCode::IE029, - Some(IndexerErrorCause::new(err_msg)), - ); - return bad_request_response(err_msg); - } - } - } else { - None - }; - trace!( - "receipt attached by the query, can pass it to TAP: {:?}", - receipt - ); - - // Extract free query auth token - let auth_token = parts - .headers - .get(http::header::AUTHORIZATION) - .and_then(|t| t.to_str().ok()); - // determine if the query is paid or authenticated to be free - let free = auth_token.is_some() - && server.free_query_auth_token.is_some() - && auth_token.unwrap() == server.free_query_auth_token.as_deref().unwrap(); - - let query_string = match response_body_to_query_string(body).await { - Ok(q) => q, - Err(e) => { - query_duration_timer.observe_duration(); - return bad_request_response(&e.to_string()); - } - }; - - if free { - let free_query = FreeQuery { - subgraph_deployment_id, - query: query_string, - }; - - match server.query_processor.execute_free_query(free_query).await { - Ok(res) if res.status == 200 => { - query_duration_timer.observe_duration(); - (StatusCode::OK, Json(res.result)).into_response() - } - _ => { - IndexerError::new( - IndexerErrorCode::IE033, - Some(IndexerErrorCause::new( - "Failed to execute a free subgraph query to graph node", - )), - ); - bad_request_response("Failed to execute free query") - } - } - } else if let Some(receipt) = receipt { - let paid_query = crate::query_processor::PaidQuery { - subgraph_deployment_id, - query: query_string, - receipt: receipt.to_string(), - }; - - match server.query_processor.execute_paid_query(paid_query).await { - Ok(res) => { - query_duration_timer.observe_duration(); - metrics::SUCCESSFUL_QUERIES - .with_label_values(&[&deployment_label]) - .inc(); - (StatusCode::OK, Json(res.result)).into_response() - } - Err(e) => { - metrics::FAILED_QUERIES - .with_label_values(&[&deployment_label]) - .inc(); - let err_msg = format!( - "Failed to execute a paid subgraph query to graph node: {}", - e - ); - IndexerError::new(IndexerErrorCode::IE032, Some(IndexerErrorCause::new(e))); - return bad_request_response(&err_msg); - } - } - } else { - let error_body = "Query request header missing scalar-receipts or incorrect auth token"; - metrics::QUERIES_WITHOUT_RECEIPT - .with_label_values(&[&deployment_label]) - .inc(); - IndexerError::new( - IndexerErrorCode::IE030, - Some(IndexerErrorCause::new(error_body)), - ); - query_duration_timer.observe_duration(); - bad_request_response(error_body) - } -} diff --git a/service/src/test_vectors.rs b/service/src/test_vectors.rs deleted file mode 100644 index b21c78e7..00000000 --- a/service/src/test_vectors.rs +++ /dev/null @@ -1,12 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use std::str::FromStr; - -use alloy_primitives::Address; -use lazy_static::lazy_static; - -lazy_static! { - pub static ref INDEXER_ADDRESS: Address = - Address::from_str("0x1234567890123456789012345678901234567890").unwrap(); -} diff --git a/service/src/util.rs b/service/src/util.rs deleted file mode 100644 index 0c64abc6..00000000 --- a/service/src/util.rs +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use ethers::signers::WalletError; -use tokio::signal; -use tracing::{ - info, - subscriber::{set_global_default, SetGlobalDefaultError}, -}; -use tracing_subscriber::{EnvFilter, FmtSubscriber}; - -use crate::common::address::{build_wallet, wallet_address}; - -/// Validate that private key as an Eth wallet -pub fn public_key(value: &str) -> Result { - // The wallet can be stored instead of the original private key - let wallet = build_wallet(value)?; - let addr = wallet_address(&wallet); - info!(address = addr, "Resolved Graphcast id"); - Ok(addr) -} - -/// Sets up tracing, allows log level to be set from the environment variables -pub fn init_tracing(format: String) -> Result<(), SetGlobalDefaultError> { - let filter = EnvFilter::from_default_env(); - - let subscriber_builder: tracing_subscriber::fmt::SubscriberBuilder< - tracing_subscriber::fmt::format::DefaultFields, - tracing_subscriber::fmt::format::Format, - EnvFilter, - > = FmtSubscriber::builder().with_env_filter(filter); - - match format.as_str() { - "json" => set_global_default(subscriber_builder.json().finish()), - "full" => set_global_default(subscriber_builder.finish()), - "compact" => set_global_default(subscriber_builder.compact().finish()), - _ => set_global_default(subscriber_builder.with_ansi(true).pretty().finish()), - } -} - -pub async fn shutdown_signal() { - let ctrl_c = async { - signal::ctrl_c() - .await - .expect("failed to install Ctrl+C handler"); - }; - - let terminate = async { - signal::unix::signal(signal::unix::SignalKind::terminate()) - .expect("failed to install signal handler") - .recv() - .await; - }; - - tokio::select! { - _ = ctrl_c => {}, - _ = terminate => {}, - } - - info!("signal received, starting graceful shutdown"); -}