From ba258910a623467009efd140bce579d324610501 Mon Sep 17 00:00:00 2001 From: sugargoat Date: Fri, 7 Jun 2024 02:39:19 -0700 Subject: [PATCH] Introduce webhook callback in account sync (#998) * Introduce webhook callback in account sync Signed-off-by: sugargoat * Plumbing to introduce webhook thread Signed-off-by: sugargoat * Webhook reqwest functioning Signed-off-by: sugargoat * Clean up Signed-off-by: sugargoat * Updating to use URL + documentation Signed-off-by: sugargoat * Smaller cargo.lock footprint Signed-off-by: sugargoat * Fix linter - sort dependencies Signed-off-by: sugargoat * Plumbing to introduce webhook thread Signed-off-by: sugargoat * Clean up logging Signed-off-by: sugargoat * Rebased on main, some more logging fixups Signed-off-by: sugargoat * Webhook now receives post of list of accounts, and only when they are fully synced Signed-off-by: sugargoat * Cleaning up logging and fixing up documentation Signed-off-by: sugargoat * Move WebhookThread to its own file Signed-off-by: sugargoat * Set poll_interval to match config for full-service polling Signed-off-by: sugargoat * Refactor test_utils to be a little less invasive Signed-off-by: sugargoat * Clean up test documentation Signed-off-by: sugargoat * minor fixups and remove webhook restart notification --------- Signed-off-by: sugargoat Co-authored-by: Henry Holtzman --- Cargo.lock | 644 +++++++++++++++++- full-service/Cargo.toml | 1 + full-service/src/bin/main.rs | 21 +- full-service/src/config.rs | 35 + full-service/src/db/account.rs | 3 +- full-service/src/db/transaction_log.rs | 55 +- .../src/json_rpc/v1/api/test_utils.rs | 1 + .../src/json_rpc/v2/api/test_utils.rs | 33 +- full-service/src/json_rpc/v2/e2e_tests/mod.rs | 1 + .../src/json_rpc/v2/e2e_tests/webhook.rs | 146 ++++ full-service/src/service/account.rs | 8 +- full-service/src/service/address.rs | 8 +- full-service/src/service/balance.rs | 2 +- full-service/src/service/gift_code.rs | 4 +- full-service/src/service/mod.rs | 1 + .../src/service/models/tx_proposal.rs | 2 +- full-service/src/service/receipt.rs | 8 +- full-service/src/service/sync.rs | 63 +- full-service/src/service/transaction.rs | 14 +- .../src/service/transaction_builder.rs | 79 ++- full-service/src/service/transaction_log.rs | 2 +- full-service/src/service/txo.rs | 2 +- full-service/src/service/wallet_service.rs | 46 +- full-service/src/service/webhook.rs | 136 ++++ full-service/src/test_utils.rs | 8 +- 25 files changed, 1236 insertions(+), 87 deletions(-) create mode 100644 full-service/src/json_rpc/v2/e2e_tests/webhook.rs create mode 100644 full-service/src/service/webhook.rs diff --git a/Cargo.lock b/Cargo.lock index e28a34947..9768702bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -163,6 +163,58 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" +[[package]] +name = "ascii-canvas" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8824ecca2e851cec16968d54a01dd372ef8f95b244fb84b84e70128be347c3c6" +dependencies = [ + "term", +] + +[[package]] +name = "assert-json-diff" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12" +dependencies = [ + "serde", + "serde_json", +] + +[[package]] +name = "async-attributes" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3203e79f4dd9bdda415ed03cf14dae5a2bf775c683a00f94e9cd1faf0f596e5" +dependencies = [ + "quote", + "syn 1.0.109", +] + +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener 2.5.3", + "futures-core", +] + +[[package]] +name = "async-channel" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-compression" version = "0.4.6" @@ -176,6 +228,165 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-executor" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8828ec6e544c02b0d6691d21ed9f9218d0384a82542855073c2a3f58304aaf0" +dependencies = [ + "async-task", + "concurrent-queue", + "fastrand 2.0.1", + "futures-lite 2.3.0", + "slab", +] + +[[package]] +name = "async-global-executor" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c" +dependencies = [ + "async-channel 2.3.1", + "async-executor", + "async-io 2.3.3", + "async-lock 3.4.0", + "blocking", + "futures-lite 2.3.0", + "once_cell", +] + +[[package]] +name = "async-io" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" +dependencies = [ + "async-lock 2.8.0", + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-lite 1.13.0", + "log", + "parking", + "polling 2.8.0", + "rustix 0.37.27", + "slab", + "socket2 0.4.10", + "waker-fn", +] + +[[package]] +name = "async-io" +version = "2.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d6baa8f0178795da0e71bc42c9e5d13261aac7ee549853162e66a241ba17964" +dependencies = [ + "async-lock 3.4.0", + "cfg-if", + "concurrent-queue", + "futures-io", + "futures-lite 2.3.0", + "parking", + "polling 3.5.0", + "rustix 0.38.34", + "slab", + "tracing", + "windows-sys 0.52.0", +] + +[[package]] +name = "async-lock" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" +dependencies = [ + "event-listener 2.5.3", +] + +[[package]] +name = "async-lock" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" +dependencies = [ + "event-listener 5.3.1", + "event-listener-strategy", + "pin-project-lite", +] + +[[package]] +name = "async-object-pool" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aeb901c30ebc2fc4ab46395bbfbdba9542c16559d853645d75190c3056caf3bc" +dependencies = [ + "async-std", +] + +[[package]] +name = "async-process" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea6438ba0a08d81529c69b36700fa2f95837bfe3e776ab39cde9c14d9149da88" +dependencies = [ + "async-io 1.13.0", + "async-lock 2.8.0", + "async-signal", + "blocking", + "cfg-if", + "event-listener 3.1.0", + "futures-lite 1.13.0", + "rustix 0.38.34", + "windows-sys 0.48.0", +] + +[[package]] +name = "async-signal" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "329972aa325176e89114919f2a80fdae4f4c040f66a370b1a1159c6c0f94e7aa" +dependencies = [ + "async-io 2.3.3", + "async-lock 3.4.0", + "atomic-waker", + "cfg-if", + "futures-core", + "futures-io", + "rustix 0.38.34", + "signal-hook-registry", + "slab", + "windows-sys 0.52.0", +] + +[[package]] +name = "async-std" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" +dependencies = [ + "async-attributes", + "async-channel 1.9.0", + "async-global-executor", + "async-io 1.13.0", + "async-lock 2.8.0", + "async-process", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "futures-lite 1.13.0", + "gloo-timers", + "kv-log-macro", + "log", + "memchr", + "once_cell", + "pin-project-lite", + "pin-utils", + "slab", + "wasm-bindgen-futures", +] + [[package]] name = "async-stream" version = "0.3.5" @@ -198,6 +409,12 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "async-task" +version = "4.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" + [[package]] name = "async-trait" version = "0.1.77" @@ -233,6 +450,12 @@ dependencies = [ "critical-section", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "atty" version = "0.2.14" @@ -283,6 +506,17 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "basic-cookies" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67bd8fd42c16bdb08688243dc5f0cc117a3ca9efeeaba3a345a18a6159ad96f7" +dependencies = [ + "lalrpop", + "lalrpop-util", + "regex", +] + [[package]] name = "binascii" version = "0.1.4" @@ -377,6 +611,15 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "bit-set" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0700ddab506f33b20a03b13996eccd309a48e5ff77d0d95926aa0210fb4e95f1" +dependencies = [ + "bit-vec", +] + [[package]] name = "bit-vec" version = "0.6.3" @@ -434,6 +677,19 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blocking" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "703f41c54fc768e63e091340b424302bb1c29ef4aa0c7f10fe849dfb114d29ea" +dependencies = [ + "async-channel 2.3.1", + "async-task", + "futures-io", + "futures-lite 2.3.0", + "piper", +] + [[package]] name = "bluez-async" version = "0.7.2" @@ -808,6 +1064,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "const-oid" version = "0.9.6" @@ -1368,6 +1633,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "ena" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d248bdd43ce613d87415282f69b9bb99d947d290b10962dd6c56233312c2ad5" +dependencies = [ + "log", +] + [[package]] name = "encdec" version = "0.9.0" @@ -1460,6 +1734,44 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + +[[package]] +name = "event-listener" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d93877bcde0eb80ca09131a08d23f0a5c18a620b01db137dba666d18cd9b30c2" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener" +version = "5.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6032be9bd27023a771701cc49f9f053c751055f71efb2e0ae5c15809093675ba" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f214dc438f977e6d4e3500aaa277f5ad94ca83fbbd9b1a15713ce2344ccc5a1" +dependencies = [ + "event-listener 5.3.1", + "pin-project-lite", +] + [[package]] name = "failure" version = "0.1.8" @@ -1482,6 +1794,15 @@ dependencies = [ "synstructure", ] +[[package]] +name = "fastrand" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" +dependencies = [ + "instant", +] + [[package]] name = "fastrand" version = "2.0.1" @@ -1691,6 +2012,34 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand 1.9.0", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + +[[package]] +name = "futures-lite" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52527eb5074e35e9339c6b4e8d12600c7128b68fb25dcb9fa9dec18f7c25f3a5" +dependencies = [ + "fastrand 2.0.1", + "futures-core", + "futures-io", + "parking", + "pin-project-lite", +] + [[package]] name = "futures-macro" version = "0.3.30" @@ -1799,6 +2148,18 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "gloo-timers" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "group" version = "0.13.0" @@ -2091,6 +2452,34 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "httpmock" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08ec9586ee0910472dec1a1f0f8acf52f0fdde93aea74d70d4a3107b4be0fd5b" +dependencies = [ + "assert-json-diff", + "async-object-pool", + "async-std", + "async-trait", + "base64", + "basic-cookies", + "crossbeam-utils", + "form_urlencoded", + "futures-util", + "hyper", + "lazy_static", + "levenshtein", + "log", + "regex", + "serde", + "serde_json", + "serde_regex", + "similar", + "tokio", + "url", +] + [[package]] name = "humantime" version = "2.1.0" @@ -2114,7 +2503,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2", + "socket2 0.5.5", "tokio", "tower-service", "tracing", @@ -2220,12 +2609,32 @@ dependencies = [ "generic-array", ] +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", +] + [[package]] name = "integer-encoding" version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" +[[package]] +name = "io-lifetimes" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" +dependencies = [ + "hermit-abi 0.3.4", + "libc", + "windows-sys 0.48.0", +] + [[package]] name = "ipnet" version = "2.9.0" @@ -2239,7 +2648,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bad00257d07be169d870ab665980b06cdb366d792ad690bf2e76876dc503455" dependencies = [ "hermit-abi 0.3.4", - "rustix", + "rustix 0.38.34", "windows-sys 0.52.0", ] @@ -2252,6 +2661,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.12.0" @@ -2320,6 +2738,46 @@ dependencies = [ "cpufeatures", ] +[[package]] +name = "kv-log-macro" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +dependencies = [ + "log", +] + +[[package]] +name = "lalrpop" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55cb077ad656299f160924eb2912aa147d7339ea7d69e1b5517326fdcec3c1ca" +dependencies = [ + "ascii-canvas", + "bit-set", + "ena", + "itertools 0.11.0", + "lalrpop-util", + "petgraph", + "pico-args", + "regex", + "regex-syntax 0.8.2", + "string_cache", + "term", + "tiny-keccak", + "unicode-xid", + "walkdir", +] + +[[package]] +name = "lalrpop-util" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "507460a910eb7b32ee961886ff48539633b788a36b65692b95f225b844c82553" +dependencies = [ + "regex-automata 0.4.4", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -2431,6 +2889,12 @@ dependencies = [ "thiserror", ] +[[package]] +name = "levenshtein" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db13adb97ab515a3691f56e4dbab09283d0b86cb45abd991d8634a9d6f501760" + [[package]] name = "libc" version = "0.2.155" @@ -2490,6 +2954,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "linux-raw-sys" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" + [[package]] name = "linux-raw-sys" version = "0.4.13" @@ -2532,6 +3002,9 @@ name = "log" version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +dependencies = [ + "value-bag", +] [[package]] name = "loom" @@ -2892,7 +3365,7 @@ dependencies = [ "sentry", "serde", "sha3", - "siphasher", + "siphasher 1.0.0", "slog", "slog-async", "slog-atomic", @@ -3481,6 +3954,7 @@ dependencies = [ "ed25519-dalek", "grpcio", "hex", + "httpmock", "itertools 0.10.5", "ledger-mob", "libsqlite3-sys", @@ -4645,6 +5119,12 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" +[[package]] +name = "new_debug_unreachable" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086" + [[package]] name = "nom" version = "7.1.3" @@ -4928,6 +5408,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "parking" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" + [[package]] name = "parking_lot" version = "0.12.1" @@ -5024,6 +5510,21 @@ dependencies = [ "indexmap", ] +[[package]] +name = "phf_shared" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6796ad771acdc0123d2a88dc428b5e38ef24456743ddb1744ed628f9815c096" +dependencies = [ + "siphasher 0.3.11", +] + +[[package]] +name = "pico-args" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5be167a7af36ee22fe3115051bc51f6e6c7054c9348e28deb4f49bd6f705a315" + [[package]] name = "pin-project-lite" version = "0.2.13" @@ -5036,6 +5537,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "piper" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae1d5c74c9876f070d3e8fd503d748c7d974c3e48da8f41350fa5222ef9b4391" +dependencies = [ + "atomic-waker", + "fastrand 2.0.1", + "futures-io", +] + [[package]] name = "pkcs8" version = "0.10.2" @@ -5058,6 +5570,36 @@ version = "3.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "626dec3cac7cc0e1577a2ec3fc496277ec2baa084bebad95bb6fdbfae235f84c" +[[package]] +name = "polling" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" +dependencies = [ + "autocfg", + "bitflags 1.3.2", + "cfg-if", + "concurrent-queue", + "libc", + "log", + "pin-project-lite", + "windows-sys 0.48.0", +] + +[[package]] +name = "polling" +version = "3.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24f040dee2588b4963afb4e420540439d126f73fdacf4a9c486a96d840bac3c9" +dependencies = [ + "cfg-if", + "concurrent-queue", + "pin-project-lite", + "rustix 0.38.34", + "tracing", + "windows-sys 0.52.0", +] + [[package]] name = "polyval" version = "0.6.1" @@ -5082,6 +5624,12 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "precomputed-hash" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c" + [[package]] name = "predicates" version = "3.1.0" @@ -5792,6 +6340,20 @@ dependencies = [ "semver", ] +[[package]] +name = "rustix" +version = "0.37.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea8ca367a3a01fe35e6943c400addf443c0f57670e6ec51196f71a4b8762dd2" +dependencies = [ + "bitflags 1.3.2", + "errno", + "io-lifetimes", + "libc", + "linux-raw-sys 0.3.8", + "windows-sys 0.48.0", +] + [[package]] name = "rustix" version = "0.38.34" @@ -5801,7 +6363,7 @@ dependencies = [ "bitflags 2.4.2", "errno", "libc", - "linux-raw-sys", + "linux-raw-sys 0.4.13", "windows-sys 0.52.0", ] @@ -6164,6 +6726,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_regex" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8136f1a4ea815d7eac4101cfd0b16dc0cb5e1fe1b8609dfd728058656b7badf" +dependencies = [ + "regex", + "serde", +] + [[package]] name = "serde_spanned" version = "0.6.5" @@ -6283,6 +6855,12 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "similar" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa42c91313f1d05da9b26f267f931cf178d4aba455b4c4622dd7355eb80c6640" + [[package]] name = "simplelog" version = "0.12.1" @@ -6294,6 +6872,12 @@ dependencies = [ "time", ] +[[package]] +name = "siphasher" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" + [[package]] name = "siphasher" version = "1.0.0" @@ -6417,6 +7001,16 @@ version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" +[[package]] +name = "socket2" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "socket2" version = "0.5.5" @@ -6482,6 +7076,19 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "string_cache" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f91138e76242f575eb1d3b38b4f1362f10d3a43f47d182a5b359af488a02293b" +dependencies = [ + "new_debug_unreachable", + "once_cell", + "parking_lot", + "phf_shared", + "precomputed-hash", +] + [[package]] name = "strsim" version = "0.8.0" @@ -6700,8 +7307,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1" dependencies = [ "cfg-if", - "fastrand", - "rustix", + "fastrand 2.0.1", + "rustix 0.38.34", "windows-sys 0.52.0", ] @@ -6842,6 +7449,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tinyvec" version = "1.6.0" @@ -6871,7 +7487,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.5.5", "tokio-macros", "windows-sys 0.48.0", ] @@ -7228,6 +7844,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "value-bag" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a84c137d37ab0142f0f2ddfe332651fdbf252e7b7dbb4e67b6c1f1b2e925101" + [[package]] name = "vcpkg" version = "0.2.15" @@ -7268,6 +7890,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" +[[package]] +name = "waker-fn" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "317211a0dc0ceedd78fb2ca9a44aed3d7b9b26f81870d485c07122b4350673b7" + [[package]] name = "walkdir" version = "2.4.0" @@ -7384,7 +8012,7 @@ dependencies = [ "either", "home", "once_cell", - "rustix", + "rustix 0.38.34", ] [[package]] diff --git a/full-service/Cargo.toml b/full-service/Cargo.toml index 98ced4a08..b0d304c6e 100644 --- a/full-service/Cargo.toml +++ b/full-service/Cargo.toml @@ -114,6 +114,7 @@ mc-fog-report-validation = { path = "../mobilecoin/fog/report/validation", featu mc-fog-report-validation-test-utils = { path = "../mobilecoin/fog/report/validation/test-utils" } bs58 = "0.5.0" +httpmock = "0.7.0" tempdir = "0.3" tokio = "1.27" url = "2.3" diff --git a/full-service/src/bin/main.rs b/full-service/src/bin/main.rs index 11fbab34d..b27a0bd3c 100644 --- a/full-service/src/bin/main.rs +++ b/full-service/src/bin/main.rs @@ -14,7 +14,7 @@ use mc_consensus_scp::QuorumSet; use mc_fog_report_resolver::FogResolver; use mc_full_service::{ check_host, - config::{APIConfig, NetworkConfig}, + config::{APIConfig, NetworkConfig, WebhookConfig}, wallet::{consensus_backed_rocket, validator_backed_rocket, APIKeyState, WalletState}, ValidatorLedgerSyncThread, WalletDb, WalletService, }; @@ -117,6 +117,11 @@ fn rocket() -> Rocket { tx_sources, }; + let webhook_config = config.deposits_webhook_url.clone().map(|wu| WebhookConfig { + url: wu, + poll_interval: config.poll_interval.clone(), + }); + let rocket = if let Some(validator_uri) = config.validator.as_ref() { validator_backed_full_service( validator_uri, @@ -124,10 +129,18 @@ fn rocket() -> Rocket { network_config, wallet_db, rocket_config, + webhook_config, logger, ) } else { - consensus_backed_full_service(&config, network_config, wallet_db, rocket_config, logger) + consensus_backed_full_service( + &config, + network_config, + wallet_db, + rocket_config, + webhook_config, + logger, + ) }; let api_key = env::var("MC_API_KEY").unwrap_or_default(); @@ -139,6 +152,7 @@ fn consensus_backed_full_service( network_config: NetworkConfig, wallet_db: Option, rocket_config: rocket::Config, + webhook_config: Option, logger: Logger, ) -> Rocket { // Create enclave trusted identity. @@ -245,6 +259,7 @@ fn consensus_backed_full_service( config.get_fog_resolver_factory(logger.clone()), config.offline, config.t3_sync_config.clone(), + webhook_config, logger, ); @@ -260,6 +275,7 @@ fn validator_backed_full_service( network_config: NetworkConfig, wallet_db: Option, rocket_config: rocket::Config, + webhook_config: Option, logger: Logger, ) -> Rocket { if config.watcher_db.is_some() { @@ -342,6 +358,7 @@ fn validator_backed_full_service( }), false, config.t3_sync_config.clone(), + webhook_config, logger, ); diff --git a/full-service/src/config.rs b/full-service/src/config.rs index 38504a5eb..ff2ee5e84 100644 --- a/full-service/src/config.rs +++ b/full-service/src/config.rs @@ -20,6 +20,7 @@ use mc_util_uri::{ConnectionUri, ConsensusClientUri, FogUri}; use mc_validator_api::ValidatorUri; use clap::Parser; +use reqwest::Url; use serde::{Deserialize, Serialize}; use std::{ convert::TryFrom, @@ -89,6 +90,33 @@ pub struct APIConfig { /// T3 Server to connect to and the api key to use for authorization. #[clap(flatten)] pub t3_sync_config: T3Config, + + /// Webhook configuration to notify an external server listening for + /// deposit notifications. + /// + /// The format of the webhook is a POST request with the following query + /// parameters: + /// + /// POST /webhook -H "Content-Type: application/json" \ + /// -d '{"accounts": [A,B,C], "restart": false}' + /// + /// The first time full-service is caught up with the network ledger, + /// it will send a webhook with {"restart": true, "accounts": [A,]} + /// where "accounts" may be empty. + /// + /// Where the num_txos provided indicate how many txos were received + /// in the last scan period for any account in the wallet. + /// + /// The expected action to take in response to the webhook is to call + /// the `get_txos` API endpoint for the given accounts to retrieve more + /// details about the TXOs received. + /// + /// We expect a 200 response code to indicate that the webhook was + /// received, and we do not further inspect the response body. Even if + /// not a 200 response, we will continue to attempt to reach the webhook + /// on subsequent deposits. + #[clap(long, value_parser = Url::parse, env = "MC_DEPOSITS_WEBHOOK_URL")] + pub deposits_webhook_url: Option, } fn parse_quorum_set_from_json(src: &str) -> Result, String> { @@ -392,3 +420,10 @@ impl LedgerDbConfig { ledger_db } } + +/// The Webhook Setup object. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct WebhookConfig { + pub url: Url, + pub poll_interval: Duration, +} diff --git a/full-service/src/db/account.rs b/full-service/src/db/account.rs index 9c8d4aede..a94df46ea 100644 --- a/full-service/src/db/account.rs +++ b/full-service/src/db/account.rs @@ -29,9 +29,10 @@ use mc_core::slip10::Slip10KeyGenerator; use mc_crypto_digestible::{Digestible, MerlinTranscript}; use mc_crypto_keys::{RistrettoPrivate, RistrettoPublic}; use mc_transaction_core::{get_tx_out_shared_secret, TokenId}; +use serde_derive::Serialize; use std::fmt; -#[derive(Debug, Clone, Eq, PartialEq, Hash)] +#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize)] pub struct AccountID(pub String); impl From<&AccountKey> for AccountID { diff --git a/full-service/src/db/transaction_log.rs b/full-service/src/db/transaction_log.rs index 353e60bbd..07c471644 100644 --- a/full-service/src/db/transaction_log.rs +++ b/full-service/src/db/transaction_log.rs @@ -849,7 +849,11 @@ impl TransactionLogModel for TransactionLog { #[cfg(test)] mod tests { - use std::ops::DerefMut; + use std::{ + collections::HashMap, + ops::DerefMut, + sync::{Arc, Mutex}, + }; use mc_account_keys::{PublicAddress, CHANGE_SUBADDRESS_INDEX}; use mc_common::logger::{async_test_with_logger, Logger}; @@ -892,7 +896,12 @@ mod tests { let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); // Start sync thread - let _sync_thread = SyncThread::start(ledger_db.clone(), wallet_db.clone(), logger.clone()); + let _sync_thread = SyncThread::start( + ledger_db.clone(), + wallet_db.clone(), + Arc::new(Mutex::new(HashMap::::new())), + logger.clone(), + ); let account_key = random_account_with_seed_values( &wallet_db, @@ -1075,7 +1084,12 @@ mod tests { let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); // Start sync thread - let _sync_thread = SyncThread::start(ledger_db.clone(), wallet_db.clone(), logger.clone()); + let _sync_thread = SyncThread::start( + ledger_db.clone(), + wallet_db.clone(), + Arc::new(Mutex::new(HashMap::::new())), + logger.clone(), + ); let account_key = random_account_with_seed_values( &wallet_db, @@ -1166,7 +1180,12 @@ mod tests { let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); // Start sync thread - let _sync_thread = SyncThread::start(ledger_db.clone(), wallet_db.clone(), logger.clone()); + let _sync_thread = SyncThread::start( + ledger_db.clone(), + wallet_db.clone(), + Arc::new(Mutex::new(HashMap::::new())), + logger.clone(), + ); let account_key = random_account_with_seed_values( &wallet_db, @@ -1275,7 +1294,12 @@ mod tests { let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); // Start sync thread - let _sync_thread = SyncThread::start(ledger_db.clone(), wallet_db.clone(), logger.clone()); + let _sync_thread = SyncThread::start( + ledger_db.clone(), + wallet_db.clone(), + Arc::new(Mutex::new(HashMap::::new())), + logger.clone(), + ); let account_key = random_account_with_seed_values( &wallet_db, @@ -1350,7 +1374,12 @@ mod tests { let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); // Start sync thread - let _sync_thread = SyncThread::start(ledger_db.clone(), wallet_db.clone(), logger.clone()); + let _sync_thread = SyncThread::start( + ledger_db.clone(), + wallet_db.clone(), + Arc::new(Mutex::new(HashMap::::new())), + logger.clone(), + ); let account_key = random_account_with_seed_values( &wallet_db, @@ -1582,7 +1611,12 @@ mod tests { let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); // Start sync thread - let _sync_thread = SyncThread::start(ledger_db.clone(), wallet_db.clone(), logger.clone()); + let _sync_thread = SyncThread::start( + ledger_db.clone(), + wallet_db.clone(), + Arc::new(Mutex::new(HashMap::::new())), + logger.clone(), + ); let account_key = random_account_with_seed_values( &wallet_db, @@ -1813,7 +1847,12 @@ mod tests { let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); // Start sync thread - let _sync_thread = SyncThread::start(ledger_db.clone(), wallet_db.clone(), logger.clone()); + let _sync_thread = SyncThread::start( + ledger_db.clone(), + wallet_db.clone(), + Arc::new(Mutex::new(HashMap::::new())), + logger.clone(), + ); let account_key = random_account_with_seed_values( &wallet_db, diff --git a/full-service/src/json_rpc/v1/api/test_utils.rs b/full-service/src/json_rpc/v1/api/test_utils.rs index a2e4fd3c0..6461679db 100644 --- a/full-service/src/json_rpc/v1/api/test_utils.rs +++ b/full-service/src/json_rpc/v1/api/test_utils.rs @@ -122,6 +122,7 @@ pub fn create_test_setup( get_resolver_factory(rng).unwrap(), false, T3Config::default(), + None, logger, ); diff --git a/full-service/src/json_rpc/v2/api/test_utils.rs b/full-service/src/json_rpc/v2/api/test_utils.rs index f3c06c06f..8bb85ddc5 100644 --- a/full-service/src/json_rpc/v2/api/test_utils.rs +++ b/full-service/src/json_rpc/v2/api/test_utils.rs @@ -40,6 +40,7 @@ use rocket::{ use tempdir::TempDir; use url::Url; +use crate::config::WebhookConfig; use std::{ convert::TryFrom, sync::{ @@ -138,6 +139,7 @@ pub fn create_test_setup( mut rng: &mut StdRng, use_wallet_db: bool, use_watcher_db: bool, + webhook_config: Option, logger: Logger, ) -> ( rocket::Rocket, @@ -178,6 +180,7 @@ pub fn create_test_setup( get_resolver_factory(rng).unwrap(), false, T3Config::default(), + webhook_config, logger, ); @@ -201,7 +204,29 @@ pub fn setup( Arc>>>, ) { let (rocket_instance, ledger_db, db_test_context, network_state) = - create_test_setup(rng, true, false, logger); + create_test_setup(rng, true, false, None, logger); + + let rocket = rocket_instance.manage(APIKeyState("".to_string())); + ( + Client::untracked(rocket).expect("valid rocket instance"), + ledger_db, + db_test_context, + network_state, + ) +} + +pub fn setup_with_webhook( + rng: &mut StdRng, + webhook_config: WebhookConfig, + logger: Logger, +) -> ( + Client, + LedgerDB, + WalletDbTestContext, + Arc>>>, +) { + let (rocket_instance, ledger_db, db_test_context, network_state) = + create_test_setup(rng, true, false, Some(webhook_config), logger); let rocket = rocket_instance.manage(APIKeyState("".to_string())); ( @@ -222,7 +247,7 @@ pub fn setup_with_watcher( Arc>>>, ) { let (rocket_instance, ledger_db, db_test_context, network_state) = - create_test_setup(rng, true, true, logger); + create_test_setup(rng, true, true, None, logger); let rocket = rocket_instance.manage(APIKeyState("".to_string())); ( @@ -243,7 +268,7 @@ pub fn setup_no_wallet_db( Arc>>>, ) { let (rocket_instance, ledger_db, db_test_context, network_state) = - create_test_setup(rng, false, false, logger); + create_test_setup(rng, false, false, None, logger); let rocket = rocket_instance.manage(APIKeyState("".to_string())); ( @@ -265,7 +290,7 @@ pub fn setup_with_api_key( Arc>>>, ) { let (rocket_instance, ledger_db, db_test_context, network_state) = - create_test_setup(rng, true, false, logger); + create_test_setup(rng, true, false, None, logger); let rocket = rocket_instance.manage(APIKeyState(api_key)); diff --git a/full-service/src/json_rpc/v2/e2e_tests/mod.rs b/full-service/src/json_rpc/v2/e2e_tests/mod.rs index 215cf4f51..5c96ff2ab 100644 --- a/full-service/src/json_rpc/v2/e2e_tests/mod.rs +++ b/full-service/src/json_rpc/v2/e2e_tests/mod.rs @@ -1,3 +1,4 @@ mod account; mod other; mod transaction; +mod webhook; diff --git a/full-service/src/json_rpc/v2/e2e_tests/webhook.rs b/full-service/src/json_rpc/v2/e2e_tests/webhook.rs new file mode 100644 index 000000000..1bd0eafaa --- /dev/null +++ b/full-service/src/json_rpc/v2/e2e_tests/webhook.rs @@ -0,0 +1,146 @@ +// Copyright (c) &2020-2022 MobileCoin Inc. + +//! End-to-end tests for the Full Service Wallet API. + +#[cfg(test)] +mod e2e_webhook { + use crate::{ + config::WebhookConfig, + db::{ + account::{AccountID, AccountModel}, + models::Account, + }, + json_rpc::v2::api::test_utils::{dispatch, setup_with_webhook}, + test_utils::{add_block_to_ledger_db, MOB}, + util::b58::b58_decode_public_address, + }; + use httpmock::{Method::POST, MockServer}; + use mc_common::logger::{log, test_with_logger, Logger}; + use mc_ledger_db::Ledger; + use mc_rand::RngCore; + use mc_transaction_core::ring_signature::KeyImage; + use rand::{rngs::StdRng, SeedableRng}; + use reqwest::{ + blocking::Client, + header::{HeaderMap, HeaderValue, CONTENT_TYPE}, + Url, + }; + use serde_json::json; + use std::{ops::DerefMut, thread, time::Duration}; + + #[test_with_logger] + fn test_webhook(logger: Logger) { + let mut rng: StdRng = SeedableRng::from_seed([20u8; 32]); + + // The mock webhook server is listening for which accounts received txos + let server = MockServer::start(); + let mut sanity_webhook_mock = server.mock(|when, then| { + when.method(POST) + .path("/received_txos") + .body(json!( + { + "accounts": ["6c683070306016817d5fc963b5d3f794eb8c120428fb14c0ff33ed39ce9bd062"], + "restart": false + } + ).to_string()); + then.status(200) + .body(""); + }); + let webhook_url = Url::parse(&server.url("/received_txos")).unwrap(); + let webhook_config = WebhookConfig { + url: webhook_url.clone(), + poll_interval: Duration::from_millis(10), + }; + + let (client, mut ledger_db, db_ctx, _network_state) = + setup_with_webhook(&mut rng, webhook_config, logger.clone()); + + let reqwest_client = Client::builder().build().unwrap(); + let mut reqwest_json_headers = HeaderMap::new(); + reqwest_json_headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); + + // Sanity check: we can hit the webhook server with reqwest + let response = reqwest_client + .post(webhook_url) + .body(json!( + { + "accounts": ["6c683070306016817d5fc963b5d3f794eb8c120428fb14c0ff33ed39ce9bd062"], + "restart": false + } + ).to_string()) + .send() + .unwrap() + .error_for_status() + .unwrap(); + assert_eq!(response.status(), 200); + sanity_webhook_mock.assert(); // assert exact contents of the "when" above + sanity_webhook_mock.assert_hits(1); + sanity_webhook_mock.delete(); + + // Add an account and let the sync and webhook threads launch & run + let body = json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "create_account", + "params": { + "name": "Alice Main Account", + } + }); + let res = dispatch(&client, body, &logger); + let account_obj = res.get("result").unwrap().get("account").unwrap(); + let account_id = account_obj.get("id").unwrap().as_str().unwrap(); + let public_address = + b58_decode_public_address(account_obj.get("main_address").unwrap().as_str().unwrap()) + .unwrap(); + + // Set up the server for webhooks that will fire when the account is done + // syncing + let webhook_mock = + server.mock(|when, then| { + when.method(POST).path("/received_txos").body( + json!( + { + "accounts": [account_id] + } + ) + .to_string(), + ); + then.status(200) + .header("content-type", "application/json") + .body(json!({"received": true}).to_string()); // FIXME: we don't really care about the response body + }); + + // Add blocks for this account - the syncing thread will automatically be + // running as these blocks are added + for i in 0..10 { + add_block_to_ledger_db( + &mut ledger_db, + &vec![public_address.clone()], + 100 * MOB + (i as u64), + &[KeyImage::from(rng.next_u64())], + &mut rng, + ); + } + + // Wait for the account sync thread to finish syncing + let wallet_db = &db_ctx.get_db_instance(logger.clone()); + let mut pooled_conn = wallet_db.get_pooled_conn().unwrap(); + let conn = pooled_conn.deref_mut(); + loop { + let account = Account::get(&AccountID(account_id.to_string()), conn).unwrap(); + if account.next_block_index as u64 >= ledger_db.num_blocks().unwrap() { + // We have to give the account sync thread a chance to set + // the shared accounts_with_deposits, and then the webhook thread + // the chance to fire the webhook + thread::sleep(Duration::from_millis(100)); + break; + } + } + + assert_eq!(ledger_db.num_blocks().unwrap(), 22); + log::debug!(logger, "webhook was called {} times", webhook_mock.hits()); + // Should call the webhook at least once during syncing - depends on + // the race between the sync thread and the ledger adding blocks + assert!(webhook_mock.hits() >= 1); + } +} diff --git a/full-service/src/service/account.rs b/full-service/src/service/account.rs index c9279b873..50eee5350 100644 --- a/full-service/src/service/account.rs +++ b/full-service/src/service/account.rs @@ -880,7 +880,7 @@ mod tests { &mut rng, ); - let service = setup_wallet_service(ledger_db.clone(), logger); + let service = setup_wallet_service(ledger_db.clone(), None, logger); let wallet_db = &service.wallet_db.as_ref().unwrap(); assert_eq!(ledger_db.num_blocks().unwrap(), block_count as u64); @@ -982,7 +982,7 @@ mod tests { let initial_block_count = 12; let mut ledger_db = get_test_ledger(5, &[], initial_block_count, &mut rng); - let service = setup_wallet_service(ledger_db.clone(), logger.clone()); + let service = setup_wallet_service(ledger_db.clone(), None, logger.clone()); let wallet_db = service.wallet_db.as_ref().unwrap(); let account_a = service @@ -1140,7 +1140,7 @@ mod tests { let known_recipients: Vec = Vec::new(); let ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); - let service = setup_wallet_service(ledger_db, logger); + let service = setup_wallet_service(ledger_db, None, logger); let wallet_db = &service.wallet_db.as_ref().unwrap(); // Create an account. @@ -1265,7 +1265,7 @@ mod tests { let known_recipients: Vec = Vec::new(); let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); - let service = setup_wallet_service(ledger_db.clone(), logger.clone()); + let service = setup_wallet_service(ledger_db.clone(), None, logger.clone()); let wallet_db = &service.wallet_db.as_ref().unwrap(); let mut pooled_conn = wallet_db.get_pooled_conn().unwrap(); let conn = pooled_conn.deref_mut(); diff --git a/full-service/src/service/address.rs b/full-service/src/service/address.rs index b09721a78..4c7736f92 100644 --- a/full-service/src/service/address.rs +++ b/full-service/src/service/address.rs @@ -214,7 +214,7 @@ mod tests { let known_recipients: Vec = Vec::new(); let ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); - let service = setup_wallet_service(ledger_db, logger); + let service = setup_wallet_service(ledger_db, None, logger); let pooled_conn = &mut service.get_pooled_conn().unwrap(); let conn = pooled_conn.deref_mut(); @@ -241,7 +241,7 @@ mod tests { let known_recipients: Vec = Vec::new(); let ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); - let service = setup_wallet_service(ledger_db, logger); + let service = setup_wallet_service(ledger_db, None, logger); let pooled_conn = &mut service.get_pooled_conn().unwrap(); let conn = pooled_conn.deref_mut(); @@ -279,7 +279,7 @@ mod tests { let known_recipients: Vec = Vec::new(); let ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); - let service = setup_wallet_service(ledger_db, logger); + let service = setup_wallet_service(ledger_db, None, logger); let account_key = AccountKey::random(&mut rng); let public_address = account_key.subaddress(rng.next_u64()); @@ -297,7 +297,7 @@ mod tests { let known_recipients: Vec = Vec::new(); let ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); - let service = setup_wallet_service(ledger_db, logger); + let service = setup_wallet_service(ledger_db, None, logger); // Empty string should fail let public_address_b58 = ""; diff --git a/full-service/src/service/balance.rs b/full-service/src/service/balance.rs index ac8c860fc..d3b3cee88 100644 --- a/full-service/src/service/balance.rs +++ b/full-service/src/service/balance.rs @@ -483,7 +483,7 @@ mod tests { ]; let ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); - let service = setup_wallet_service(ledger_db.clone(), logger.clone()); + let service = setup_wallet_service(ledger_db.clone(), None, logger.clone()); let account = service .import_account_from_legacy_root_entropy( diff --git a/full-service/src/service/gift_code.rs b/full-service/src/service/gift_code.rs index 7a5dcf8b3..31457e438 100644 --- a/full-service/src/service/gift_code.rs +++ b/full-service/src/service/gift_code.rs @@ -882,7 +882,7 @@ mod tests { let known_recipients: Vec = Vec::new(); let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); - let service = setup_wallet_service(ledger_db.clone(), logger.clone()); + let service = setup_wallet_service(ledger_db.clone(), None, logger.clone()); // Create our main account for the wallet let alice = service @@ -1065,7 +1065,7 @@ mod tests { let known_recipients: Vec = Vec::new(); let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); - let service = setup_wallet_service(ledger_db.clone(), logger.clone()); + let service = setup_wallet_service(ledger_db.clone(), None, logger.clone()); // Create our main account for the wallet let alice = service diff --git a/full-service/src/service/mod.rs b/full-service/src/service/mod.rs index 14f92697e..60e1730e9 100644 --- a/full-service/src/service/mod.rs +++ b/full-service/src/service/mod.rs @@ -23,5 +23,6 @@ pub mod txo; pub mod watcher; mod wallet_service; +mod webhook; pub use wallet_service::WalletService; diff --git a/full-service/src/service/models/tx_proposal.rs b/full-service/src/service/models/tx_proposal.rs index d2c0fdea1..ca1483637 100644 --- a/full-service/src/service/models/tx_proposal.rs +++ b/full-service/src/service/models/tx_proposal.rs @@ -481,7 +481,7 @@ mod tests { let known_recipients: Vec = Vec::new(); let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); - let service = setup_wallet_service(ledger_db.clone(), logger.clone()); + let service = setup_wallet_service(ledger_db.clone(), None, logger.clone()); // Create our main account for the wallet let alice = service diff --git a/full-service/src/service/receipt.rs b/full-service/src/service/receipt.rs index 90524ab8f..58e0ea891 100644 --- a/full-service/src/service/receipt.rs +++ b/full-service/src/service/receipt.rs @@ -381,7 +381,7 @@ mod tests { let known_recipients: Vec = Vec::new(); let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); - let service = setup_wallet_service(ledger_db.clone(), logger.clone()); + let service = setup_wallet_service(ledger_db.clone(), None, logger.clone()); let alice = service .create_account( Some("Alice's Main Account".to_string()), @@ -516,7 +516,7 @@ mod tests { let known_recipients: Vec = Vec::new(); let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); - let service = setup_wallet_service(ledger_db.clone(), logger.clone()); + let service = setup_wallet_service(ledger_db.clone(), None, logger.clone()); let alice = service .create_account( Some("Alice's Main Account".to_string()), @@ -643,7 +643,7 @@ mod tests { let known_recipients: Vec = Vec::new(); let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); - let service = setup_wallet_service(ledger_db.clone(), logger.clone()); + let service = setup_wallet_service(ledger_db.clone(), None, logger.clone()); let alice = service .create_account( Some("Alice's Main Account".to_string()), @@ -791,7 +791,7 @@ mod tests { let known_recipients: Vec = Vec::new(); let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); - let service = setup_wallet_service(ledger_db.clone(), logger.clone()); + let service = setup_wallet_service(ledger_db.clone(), None, logger.clone()); let alice = service .create_account( Some("Alice's Main Account".to_string()), diff --git a/full-service/src/service/sync.rs b/full-service/src/service/sync.rs index 5c0c24289..e533b252e 100644 --- a/full-service/src/service/sync.rs +++ b/full-service/src/service/sync.rs @@ -17,7 +17,7 @@ use crate::{ use mc_account_keys::{AccountKey, ViewAccountKey}; use mc_common::{ logger::{log, Logger}, - HashMap, + HashMap as MCHashMap, }; use mc_crypto_keys::{RistrettoPrivate, RistrettoPublic}; use mc_ledger_db::{Ledger, LedgerDB}; @@ -31,13 +31,14 @@ use mc_transaction_core::{ use rayon::prelude::*; use std::{ + collections::HashMap, convert::TryFrom, sync::{ atomic::{AtomicBool, Ordering}, - Arc, + Arc, Mutex, }, thread, - time::Instant, + time::{Duration, Instant}, }; const BLOCKS_CHUNK_SIZE: u64 = 1_000; @@ -52,11 +53,17 @@ pub struct SyncThread { } impl SyncThread { - pub fn start(ledger_db: LedgerDB, wallet_db: WalletDb, logger: Logger) -> Self { + pub fn start( + ledger_db: LedgerDB, + wallet_db: WalletDb, + accounts_with_deposits: Arc>>, + logger: Logger, + ) -> Self { // Start the sync thread. let stop_requested = Arc::new(AtomicBool::new(false)); let thread_stop_requested = stop_requested.clone(); + let thread_accounts_with_deposits = accounts_with_deposits.clone(); let join_handle = Some( thread::Builder::new() @@ -73,14 +80,20 @@ impl SyncThread { log::debug!(logger, "SyncThread stop requested."); break; } - match sync_all_accounts(&ledger_db, conn, &logger) { + + match sync_all_accounts( + &ledger_db, + conn, + thread_accounts_with_deposits.clone(), + &logger, + ) { Ok(()) => (), Err(e) => log::error!(&logger, "Error during account sync:\n{:?}", e), } // This sleep is to allow other API calls that need access to the database a // chance to execute, because the sync process requires a write lock on the // database. - thread::sleep(std::time::Duration::from_millis(10)); + thread::sleep(Duration::from_millis(10)); } log::debug!(logger, "SyncThread stopped."); }) @@ -110,6 +123,7 @@ impl Drop for SyncThread { pub fn sync_all_accounts( ledger_db: &LedgerDB, conn: Conn, + accounts_with_deposits: Arc>>, logger: &Logger, ) -> Result<(), SyncError> { // Get the current number of blocks in ledger. @@ -117,7 +131,8 @@ pub fn sync_all_accounts( .num_blocks() .expect("failed getting number of blocks"); if num_blocks == 0 { - return Ok(()); + return Ok(()); // FIXME: we want it to fire in this case with empty + // accounts } // Go over our list of accounts and see which ones need to process more blocks. @@ -130,13 +145,29 @@ pub fn sync_all_accounts( // If the account is currently resyncing, we need to set it to false // here. if account.next_block_index as u64 > num_blocks - 1 { + // For any account that we've found deposits, set the "fully-synced" flag + // to true, which will enable the webhook to fire for it. The WebhookThread + // will then clear that entry from the HashMap. + let mut account_set = accounts_with_deposits.lock().unwrap(); + account_set + .entry(AccountID(account.id.clone())) + .and_modify(|v| *v = true); + if account.resyncing { account.update_resyncing(false, conn)?; } continue; } - sync_account_next_chunk(ledger_db, conn, &account.id, logger)?; + let found_txos = sync_account_next_chunk(ledger_db, conn, &account.id, logger)?; + if found_txos > 0 && !account.resyncing { + // Start tracking the accounts with deposits, but do not fire the webhook + // until they are fully synced. + accounts_with_deposits + .lock() + .unwrap() + .insert(AccountID(account.id), false); + } } Ok(()) @@ -147,7 +178,7 @@ pub fn sync_account_next_chunk( conn: Conn, account_id_hex: &str, logger: &Logger, -) -> Result<(), SyncError> { +) -> Result { exclusive_transaction(conn, |conn| { // Get the account data. If it is no longer available, the account has been // removed and we can simply return. @@ -186,11 +217,11 @@ pub fn sync_account_next_chunk( // If no blocks were found, exit. if end_block_index.is_none() { - return Ok(()); + return Ok(0); } let end_block_index = end_block_index.unwrap(); - if account.view_only { + let num_received_txos = if account.view_only { let view_account_key: ViewAccountKey = mc_util_serial::decode(&account.account_key)?; // Attempt to decode each transaction as received by this account. @@ -231,7 +262,7 @@ pub fn sync_account_next_chunk( } // Match key images to mark existing unspent transactions as spent. - let unspent_key_images: HashMap = + let unspent_key_images: MCHashMap = Txo::list_unspent_or_pending_key_images(account_id_hex, None, conn)?; let spent_txos: Vec<(u64, String)> = key_images .into_par_iter() @@ -276,6 +307,7 @@ pub fn sync_account_next_chunk( num_spent_txos, unspent_key_images.len() ); + num_received_txos } else { let account_key: AccountKey = mc_util_serial::decode(&account.account_key)?; @@ -320,7 +352,7 @@ pub fn sync_account_next_chunk( } // Match key images to mark existing unspent transactions as spent. - let unspent_key_images: HashMap = + let unspent_key_images: MCHashMap = Txo::list_unspent_or_pending_key_images(account_id_hex, None, conn)?; let spent_txos: Vec<(u64, String)> = key_images .into_par_iter() @@ -364,9 +396,10 @@ pub fn sync_account_next_chunk( num_spent_txos, unspent_key_images.len() ); + num_received_txos }; - Ok(()) + Ok(num_received_txos) }) } @@ -489,7 +522,7 @@ mod tests { &mut rng, ); - let service = setup_wallet_service(ledger_db.clone(), logger.clone()); + let service = setup_wallet_service(ledger_db.clone(), None, logger.clone()); let wallet_db = &service.wallet_db.as_ref().unwrap(); // Import the account diff --git a/full-service/src/service/transaction.rs b/full-service/src/service/transaction.rs index 9bd5fffdc..f850c0f40 100644 --- a/full-service/src/service/transaction.rs +++ b/full-service/src/service/transaction.rs @@ -753,7 +753,7 @@ mod tests { let known_recipients: Vec = Vec::new(); let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); - let service = setup_wallet_service(ledger_db.clone(), logger.clone()); + let service = setup_wallet_service(ledger_db.clone(), None, logger.clone()); // Create our main account for the wallet let alice = service @@ -925,7 +925,7 @@ mod tests { let known_recipients: Vec = Vec::new(); let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); - let service = setup_wallet_service(ledger_db.clone(), logger.clone()); + let service = setup_wallet_service(ledger_db.clone(), None, logger.clone()); // Create our main account for the wallet let alice = service @@ -1169,7 +1169,7 @@ mod tests { let known_recipients: Vec = Vec::new(); let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); - let service = setup_wallet_service(ledger_db.clone(), logger.clone()); + let service = setup_wallet_service(ledger_db.clone(), None, logger.clone()); // Create our main account for the wallet let alice = service @@ -1232,7 +1232,7 @@ mod tests { let known_recipients: Vec = Vec::new(); let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); - let service = setup_wallet_service(ledger_db.clone(), logger.clone()); + let service = setup_wallet_service(ledger_db.clone(), None, logger.clone()); // Create our main account for the wallet let alice = service @@ -1344,7 +1344,7 @@ mod tests { let known_recipients: Vec = Vec::new(); let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); - let service = setup_wallet_service(ledger_db.clone(), logger.clone()); + let service = setup_wallet_service(ledger_db.clone(), None, logger.clone()); // Create our main account for the wallet let alice = service @@ -1534,7 +1534,7 @@ mod tests { let known_recipients: Vec = Vec::new(); let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); - let service = setup_wallet_service(ledger_db.clone(), logger.clone()); + let service = setup_wallet_service(ledger_db.clone(), None, logger.clone()); // Create our main account for the wallet let alice = service @@ -1754,7 +1754,7 @@ mod tests { let known_recipients: Vec = Vec::new(); let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); - let service = setup_wallet_service(ledger_db.clone(), logger.clone()); + let service = setup_wallet_service(ledger_db.clone(), None, logger.clone()); let mut pooled_conn = service.get_pooled_conn().unwrap(); let conn = pooled_conn.deref_mut(); diff --git a/full-service/src/service/transaction_builder.rs b/full-service/src/service/transaction_builder.rs index cb85917ce..903e1578c 100644 --- a/full-service/src/service/transaction_builder.rs +++ b/full-service/src/service/transaction_builder.rs @@ -583,7 +583,7 @@ fn extract_fog_uri(addr: &PublicAddress) -> Result, WalletTransac #[cfg(test)] mod tests { - use std::ops::DerefMut; + use std::{collections::HashMap, ops::DerefMut, sync::Mutex}; use super::*; use crate::{ @@ -608,7 +608,12 @@ mod tests { let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); // Start sync thread - let _sync_thread = SyncThread::start(ledger_db.clone(), wallet_db.clone(), logger.clone()); + let _sync_thread = SyncThread::start( + ledger_db.clone(), + wallet_db.clone(), + Arc::new(Mutex::new(HashMap::::new())), + logger.clone(), + ); let account_key = random_account_with_seed_values( &wallet_db, @@ -666,7 +671,12 @@ mod tests { let mut ledger_db = get_test_ledger(5, &known_recipients, 25, &mut rng); // Start sync thread - let _sync_thread = SyncThread::start(ledger_db.clone(), wallet_db.clone(), logger.clone()); + let _sync_thread = SyncThread::start( + ledger_db.clone(), + wallet_db.clone(), + Arc::new(Mutex::new(HashMap::::new())), + logger.clone(), + ); // Give ourselves enough MOB that we have more than u64::MAX, 18_446_745 MOB // This is 55_000_000 * MOB @@ -774,7 +784,12 @@ mod tests { let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); // Start sync thread - let _sync_thread = SyncThread::start(ledger_db.clone(), wallet_db.clone(), logger.clone()); + let _sync_thread = SyncThread::start( + ledger_db.clone(), + wallet_db.clone(), + Arc::new(Mutex::new(HashMap::::new())), + logger.clone(), + ); let account_key = random_account_with_seed_values( &wallet_db, @@ -871,7 +886,12 @@ mod tests { let known_recipients: Vec = Vec::new(); let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); - let _sync_thread = SyncThread::start(ledger_db.clone(), wallet_db.clone(), logger.clone()); + let _sync_thread = SyncThread::start( + ledger_db.clone(), + wallet_db.clone(), + Arc::new(Mutex::new(HashMap::::new())), + logger.clone(), + ); let account_key = random_account_with_seed_values( &wallet_db, @@ -971,7 +991,12 @@ mod tests { let known_recipients: Vec = Vec::new(); let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); - let _sync_thread = SyncThread::start(ledger_db.clone(), wallet_db.clone(), logger.clone()); + let _sync_thread = SyncThread::start( + ledger_db.clone(), + wallet_db.clone(), + Arc::new(Mutex::new(HashMap::::new())), + logger.clone(), + ); // These are values close to u64::MAX, but easier to work with and test (quick // maths) @@ -1042,7 +1067,12 @@ mod tests { let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); // Start sync thread - let _sync_thread = SyncThread::start(ledger_db.clone(), wallet_db.clone(), logger.clone()); + let _sync_thread = SyncThread::start( + ledger_db.clone(), + wallet_db.clone(), + Arc::new(Mutex::new(HashMap::::new())), + logger.clone(), + ); let account_key = random_account_with_seed_values( &wallet_db, @@ -1120,7 +1150,12 @@ mod tests { let conn = pooled_conn.deref_mut(); // Start sync thread - let _sync_thread = SyncThread::start(ledger_db.clone(), wallet_db.clone(), logger.clone()); + let _sync_thread = SyncThread::start( + ledger_db.clone(), + wallet_db.clone(), + Arc::new(Mutex::new(HashMap::::new())), + logger.clone(), + ); let account_key = random_account_with_seed_values( &wallet_db, @@ -1209,7 +1244,12 @@ mod tests { let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); // Start sync thread - let _sync_thread = SyncThread::start(ledger_db.clone(), wallet_db.clone(), logger.clone()); + let _sync_thread = SyncThread::start( + ledger_db.clone(), + wallet_db.clone(), + Arc::new(Mutex::new(HashMap::::new())), + logger.clone(), + ); let account_key = random_account_with_seed_values( &wallet_db, @@ -1312,7 +1352,12 @@ mod tests { let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); // Start sync thread - let _sync_thread = SyncThread::start(ledger_db.clone(), wallet_db.clone(), logger.clone()); + let _sync_thread = SyncThread::start( + ledger_db.clone(), + wallet_db.clone(), + Arc::new(Mutex::new(HashMap::::new())), + logger.clone(), + ); let account_key = random_account_with_seed_values( &wallet_db, @@ -1368,7 +1413,12 @@ mod tests { let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); // Start sync thread - let _sync_thread = SyncThread::start(ledger_db.clone(), wallet_db.clone(), logger.clone()); + let _sync_thread = SyncThread::start( + ledger_db.clone(), + wallet_db.clone(), + Arc::new(Mutex::new(HashMap::::new())), + logger.clone(), + ); let account_key = random_account_with_seed_values( &wallet_db, @@ -1435,7 +1485,12 @@ mod tests { let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); // Start sync thread - let _sync_thread = SyncThread::start(ledger_db.clone(), wallet_db.clone(), logger.clone()); + let _sync_thread = SyncThread::start( + ledger_db.clone(), + wallet_db.clone(), + Arc::new(Mutex::new(HashMap::::new())), + logger.clone(), + ); let account_key = random_account_with_seed_values( &wallet_db, diff --git a/full-service/src/service/transaction_log.rs b/full-service/src/service/transaction_log.rs index 5f5c54a1f..f3c298d78 100644 --- a/full-service/src/service/transaction_log.rs +++ b/full-service/src/service/transaction_log.rs @@ -149,7 +149,7 @@ mod tests { let known_recipients: Vec = Vec::new(); let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); - let service = setup_wallet_service(ledger_db.clone(), logger.clone()); + let service = setup_wallet_service(ledger_db.clone(), None, logger.clone()); // Create our main account for the wallet let alice = service diff --git a/full-service/src/service/txo.rs b/full-service/src/service/txo.rs index abe2364f4..888d5bbd0 100644 --- a/full-service/src/service/txo.rs +++ b/full-service/src/service/txo.rs @@ -375,7 +375,7 @@ mod tests { let known_recipients: Vec = Vec::new(); let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng); - let service = setup_wallet_service(ledger_db.clone(), logger.clone()); + let service = setup_wallet_service(ledger_db.clone(), None, logger.clone()); let alice = service .create_account( Some("Alice's Main Account".to_string()), diff --git a/full-service/src/service/wallet_service.rs b/full-service/src/service/wallet_service.rs index f3648b027..5f8cc1685 100644 --- a/full-service/src/service/wallet_service.rs +++ b/full-service/src/service/wallet_service.rs @@ -3,11 +3,12 @@ //! The Wallet Service for interacting with the wallet. use crate::{ - config::NetworkConfig, - db::{WalletDb, WalletDbError}, + config::{NetworkConfig, WebhookConfig}, + db::{account::AccountID, WalletDb, WalletDbError}, service::{ sync::SyncThread, t3_sync::{T3Config, T3SyncThread}, + webhook::WebhookThread, }, }; use diesel::{ @@ -24,7 +25,10 @@ use mc_ledger_sync::PollingNetworkState; use mc_rand::rand_core::RngCore; use mc_util_uri::FogUri; use mc_watcher::watcher_db::WatcherDB; -use std::sync::{atomic::AtomicUsize, Arc, RwLock}; +use std::{ + collections::HashMap, + sync::{atomic::AtomicUsize, Arc, Mutex, RwLock}, +}; /// Service for interacting with the wallet /// @@ -64,6 +68,9 @@ pub struct WalletService< /// Background T3 sync thread. _t3_sync_thread: Option, + /// Webhook Thread + _webhook_thread: Option, + /// Monotonically increasing counter. This is used for node round-robin /// selection. pub submit_node_offset: Arc, @@ -91,17 +98,35 @@ impl< fog_resolver_factory: Arc Result + Send + Sync>, offline: bool, t3_sync_config: T3Config, + webhook_config: Option, logger: Logger, ) -> Self { - let sync_thread = if let Some(wallet_db) = wallet_db.clone() { + let (sync_thread, webhook_thread) = if let Some(wallet_db) = wallet_db.clone() { log::info!(logger, "Starting Wallet TXO Sync Task Thread"); - Some(SyncThread::start( - ledger_db.clone(), - wallet_db, - logger.clone(), - )) + + let accounts_with_deposits = Arc::new(Mutex::new(HashMap::::new())); + + ( + Some(SyncThread::start( + ledger_db.clone(), + wallet_db, + accounts_with_deposits.clone(), + logger.clone(), + )), + // As a companion to the account syncing, start the webhook syncing + // if configured + if let Some(wh_config) = webhook_config { + Some(WebhookThread::start( + wh_config, + accounts_with_deposits.clone(), + logger.clone(), + )) + } else { + None + }, + ) } else { - None + (None, None) }; let t3_sync_thread = if let (Some(wallet_db), Some(t3_uri), Some(t3_api_key)) = ( @@ -131,6 +156,7 @@ impl< fog_resolver_factory, _sync_thread: sync_thread, _t3_sync_thread: t3_sync_thread, + _webhook_thread: webhook_thread, submit_node_offset: Arc::new(AtomicUsize::new(rng.next_u64() as usize)), offline, logger, diff --git a/full-service/src/service/webhook.rs b/full-service/src/service/webhook.rs new file mode 100644 index 000000000..5993d24f6 --- /dev/null +++ b/full-service/src/service/webhook.rs @@ -0,0 +1,136 @@ +// Copyright (c) 2018-2024 MobileCoin Inc. + +//! Manages sending a webhook for synced accounts that have received deposits + +use crate::db::account::AccountID; +use mc_common::logger::{log, Logger}; + +use crate::config::WebhookConfig; +use reqwest::{ + blocking::Client, + header::{HeaderMap, HeaderValue, CONTENT_TYPE}, +}; +use serde_json::json; +use std::{ + collections::HashMap, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, + }, + thread, +}; + +pub struct WebhookThread { + /// The main sync thread handle. + join_handle: Option>, + + /// Stop trigger, used to signal the thread to terminate. + stop_requested: Arc, +} + +impl WebhookThread { + pub fn start( + webhook_config: WebhookConfig, + accounts_with_deposits: Arc>>, + logger: Logger, + ) -> Self { + // Start the webhook thread. + + let stop_requested = Arc::new(AtomicBool::new(false)); + let thread_stop_requested = stop_requested.clone(); + + // Question: Should we consider only spawning a thread when there + // have been received txos, and therefore something to send? + // Answer: For now, we are ok with having this thread running all the time, + // because it is lightweight enough. If we find that it is causing issues, + // we will revisit. The solution would be to make this async, or to only + // spawn the thread when there are txos to send. There may be an advantage to + // leaving the connection open. + + let join_handle = Some( + thread::Builder::new() + .name("webhook".to_string()) + .spawn(move || { + log::debug!(logger, "Webhook thread started."); + + let client = Client::builder() + .build() + .expect("Failed creating reqwest client"); + let mut json_headers = HeaderMap::new(); + json_headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); + + loop { + if thread_stop_requested.load(Ordering::SeqCst) { + log::debug!(logger, "WebhookThread stop requested."); + break; + } + // Gather the current accounts_to_send, then wipe the contents + let accounts_to_send: Vec<_> = accounts_with_deposits + .lock() + .unwrap() + .clone() + .iter() + .filter(|&(_k, &v)| v == true) + .map(|(k, _v)| k.clone()) + .collect(); + + // Delete the keys that we're alerting on + for key in accounts_to_send.iter() { + log::debug!(logger, "Account to send: {:?}", key); + accounts_with_deposits.lock().unwrap().remove(&key); + } + + if accounts_to_send.len() > 0 { + // Question: will this keep the connection open? Or will it + // close the connection after this request? + match client + .post(webhook_config.url.clone()) + .body( + json!( + { + "accounts": accounts_to_send, + } + ) + .to_string(), + ) + .send() + { + Ok(response) => match response.error_for_status() { + Ok(_) => (), + Err(e) => { + log::error!( + logger, + "Failed getting webhook response: {:?}", + e + ); + } + }, + Err(e) => { + log::error!(logger, "Failed sending webhook request: {:?}", e); + } + } + } + // for new blocks from consensus + thread::sleep(webhook_config.poll_interval); + } + }) + .expect("failed starting webhook thread"), + ); + Self { + join_handle, + stop_requested, + } + } + pub fn stop(&mut self) { + self.stop_requested.store(true, Ordering::SeqCst); + if let Some(join_handle) = self.join_handle.take() { + join_handle.join().expect("WebhookThread join failed"); + } + } +} + +impl Drop for WebhookThread { + fn drop(&mut self) { + self.stop(); + } +} diff --git a/full-service/src/test_utils.rs b/full-service/src/test_utils.rs index 9b15f7579..ac3a12b14 100644 --- a/full-service/src/test_utils.rs +++ b/full-service/src/test_utils.rs @@ -1,5 +1,6 @@ use crate::service::t3_sync::T3Config; // Copyright (c) 2020-2021 MobileCoin Inc. +use crate::config::WebhookConfig; #[cfg(test)] use crate::{ config::NetworkConfig, @@ -650,16 +651,17 @@ pub fn get_resolver_factory( pub fn setup_wallet_service( ledger_db: LedgerDB, + webhook_config: Option, logger: Logger, ) -> WalletService, MockFogPubkeyResolver> { - setup_wallet_service_impl(ledger_db, logger, false, false) + setup_wallet_service_impl(ledger_db, logger, false, false, webhook_config) } pub fn setup_wallet_service_offline( ledger_db: LedgerDB, logger: Logger, ) -> WalletService, MockFogPubkeyResolver> { - setup_wallet_service_impl(ledger_db, logger, true, false) + setup_wallet_service_impl(ledger_db, logger, true, false, None) } fn setup_wallet_service_impl( @@ -667,6 +669,7 @@ fn setup_wallet_service_impl( logger: Logger, offline: bool, no_wallet_db: bool, + webhook_config: Option, ) -> WalletService, MockFogPubkeyResolver> { let mut rng: StdRng = SeedableRng::from_seed([20u8; 32]); @@ -695,6 +698,7 @@ fn setup_wallet_service_impl( get_resolver_factory(&mut rng).unwrap(), offline, T3Config::default(), + webhook_config, logger, ) }