From 201bc157db5a927587005aeba57e906fa506e1b5 Mon Sep 17 00:00:00 2001
From: Gustavo
Date: Mon, 10 Jun 2024 08:03:48 -0600
Subject: [PATCH] update readme

Signed-off-by: Gustavo
---
 README.md | 86 ++++++-------------------------------------------------
 1 file changed, 8 insertions(+), 78 deletions(-)

diff --git a/README.md b/README.md
index 7e62dfe..34520ae 100644
--- a/README.md
+++ b/README.md
@@ -1,86 +1,16 @@
-## Substreams Sink Rust
+# elric-rs
+This is a Rust implementation of [substreams-sink-rust](https://github.com/streamingfast/substreams-sink-rust), focused on inserting data into ClickHouse as fast as possible.

-This repository show cases a functional base Rust project that consumes a Substreams `.spkg` (local file only).
+The main difference between this and [substreams-sink-sql](https://github.com/streamingfast/substreams-sink-sql) is the ClickHouse driver used. Our version uses [clickhouse.rs](https://github.com/loyd/clickhouse.rs), which enables a key optimization: instead of batching, we stream the data directly to ClickHouse using async inserts, so memory usage is much lower and the network load is spread across the program's lifetime (a sketch is shown at the end of this README).

-To run:
+## Implementation details

-```bash
-SUBSTREAMS_API_TOKEN="" cargo run https://mainnet.eth.streamingfast.io:443
-```
+### Cursor Persistence

-### Details
+We persist the cursor with replace-on-duplicate semantics: we constantly insert the cursor and read back the most recent one to resume after a stop (see the sketch at the end of this README).

-The presented Rust project contains a `SubstreamsStream` wrapper that handles automatic reconnection in case of error. It is implemented as a Rust `TryStream` which enable consuming the retryable stream easily using standard Rust syntax:
+### Block Undo Signal

-```rust
-let stream = SubstreamsStream::new(...);
-
-loop {
-    match stream.next().await {
-        None => { /* Stream completed, reached end block */ },
-        Some(Ok(BlockResponse::New(data))) => { /* Got a BlockScopedData message */ },
-        Some(Err(err)) => { /* Fatal error or retry limit reached */ },
-    }
-}
-```
-
-The `main.rs` file accepts three argument the endpoint to reach (in the form `http(s)?://:`), the local file `.spkg` to use for the request and the output module's name to stream from.
-
-#### Incomplete Implementation
-
-##### Cursor Persistence
-
-For now `cursor` handling is not properly loaded/saved to database, something that would be required on a production system to ensure the stream is resumed at the right location and that a block is never miss.
-
-Should be implemented in [main.rs](./src/main.rs) in `persist_cursor` function and in `load_persisted_cursor`.
-
-> **Warning** If you don't implement cursor persistence, if your process restart, it will start back from specified `start_block` (currently hard-coded to `0`).
-
-##### Logging
-
-We use `println` statements within [main.rs](./src/main.rs) and [substreams_stream.rs](./src/substreams_stream.rs) to demo what is happening within the codebase. Those, specially in [substreams_stream.rs](./src/substreams_stream.rs), should be replaced to be logged to a logging system.
-
-Also, more places could be instrumented to log extra details.
-
-##### Block Undo Signal
-
-`BlockUndoSignal` must be treated as "delete every data that has been recorded after block height specified by block in BlockUndoSignal". In the example above, this means you must delete changes done by `Block #7b` and `Block #6b`. The exact details depends on your own logic.
-If for example all your added record contain a block number, a simple way is to do `delete all records where block_num > 5` which is the block num received in the `BlockUndoSignal` (this is true for append only records, so when only `INSERT` are allowed).
-
-This is left to be implemented by you how to deal with that.
-
-> **Warning** It's done using `unimplemented!` macro which will panic if an undo signal is received, so be warned for a production system. Undo signals on Ethereum Mainnet happen around 5-10 times a day, even less so you might miss the fact that they exist when testing.
-
-### Protobuf Generation
-
-Protobuf generation is done using [buf](https://buf.build/) which can be installed with:
-
-```bash
-brew install bufbuild/buf/buf
-```
-
-> **Note** See [other installation methods](https://buf.build/docs/installation/) if not on Mac/Linux (`brew` can be installed on Linux).
-
-#### From a pre-built `.spkg`
-
-If you have an `.spkg` that already contains your Protobuf definitions, you can use it directly. It contains Substreams system Protobuf definitions so you can generate everything in one shot:
-
-```bash
-buf generate --exclude-path="google" #format=bin
-```
-
-> **Note** An `.spkg` contains recursively all Proto definitions, some you may not desire. You can exclude generation of some element via `--exclude-path="google"` flag. You can specify many separated by a comma.
-
-#### Just Substreams
-
-You can generate against published [Substreams Buf Module](https://buf.build/streamingfast/substreams):
-
-```bash
-buf generate buf.build/streamingfast/substreams
-```
-
-This will include only Substreams system Protobufs to decode packages and perform RPC operations.
-
-#### Deprecated Notice
-
-You will see `WARN Plugin "buf.build/prost/crate" is deprecated` when generating the code, this is because `buf.build/community/neoeinstein-create:` is not yet available.

+We use the same strategy as [substreams-sink-sql](https://github.com/streamingfast/substreams-sink-sql): a configurable buffer keeps us a set number of blocks behind the chain head, and that buffer is configured to be "final", so no undo signals ever occur (as sketched below).
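+
+If the buffer is pushed all the way to finality, this corresponds to setting the `final_blocks_only` flag on the Substreams request. A rough sketch with illustrative names (not the exact code in this repository):
+
+```rust
+// `Request` and `Modules` are the prost-generated messages from
+// `sf.substreams.rpc.v2`; their Rust module path depends on your codegen.
+fn final_blocks_request(
+    start_block: u64,
+    persisted_cursor: Option<String>,
+    modules: Option<Modules>,
+    output_module: &str,
+) -> Request {
+    Request {
+        start_block_num: start_block as i64,
+        start_cursor: persisted_cursor.unwrap_or_default(),
+        stop_block_num: 0, // 0 = keep following the chain
+        // Only blocks that can no longer be forked out are delivered, so
+        // `BlockUndoSignal` never fires and no rollback logic is needed.
+        final_blocks_only: true,
+        modules,
+        output_module: output_module.to_string(),
+        production_mode: true,
+        ..Default::default()
+    }
+}
+```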
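+
+The cursor persistence described above can be sketched with a `ReplacingMergeTree` table; the table name, column names, and `sink` id below are illustrative, not the actual schema:
+
+```rust
+use clickhouse::{Client, Row};
+use serde::{Deserialize, Serialize};
+
+// Assumed DDL:
+//   CREATE TABLE cursors (id String, cursor String, block_num UInt64)
+//   ENGINE = ReplacingMergeTree(block_num) ORDER BY id;
+// Re-inserting the same `id` replaces the previous row once parts merge.
+#[derive(Row, Serialize, Deserialize)]
+struct CursorRow {
+    id: String,
+    cursor: String,
+    block_num: u64,
+}
+
+async fn persist_cursor(client: &Client, cursor: &str, block_num: u64) -> clickhouse::error::Result<()> {
+    let mut insert = client.insert("cursors")?;
+    insert.write(&CursorRow { id: "sink".into(), cursor: cursor.into(), block_num }).await?;
+    insert.end().await
+}
+
+async fn load_persisted_cursor(client: &Client) -> clickhouse::error::Result<Option<String>> {
+    // FINAL collapses not-yet-merged duplicate rows, so we always read the
+    // most recently inserted cursor.
+    let row = client
+        .query("SELECT ?fields FROM cursors FINAL WHERE id = 'sink'")
+        .fetch_optional::<CursorRow>()
+        .await?;
+    Ok(row.map(|r| r.cursor))
+}
+```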
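+
+Finally, the async-insert streaming mentioned in the introduction could look roughly like this with clickhouse.rs (the row type and `transfers` table are made up for the example):
+
+```rust
+use clickhouse::{Client, Row};
+use serde::Serialize;
+
+// Hypothetical row type; real rows come from the Substreams module output.
+#[derive(Row, Serialize)]
+struct Transfer {
+    block_num: u64,
+    from: String,
+    to: String,
+}
+
+async fn sink_block(client: &Client, rows: &[Transfer]) -> clickhouse::error::Result<()> {
+    // Server-side async inserts: ClickHouse buffers and batches rows itself,
+    // so the sink never accumulates large client-side batches in memory.
+    let client = client
+        .clone()
+        .with_option("async_insert", "1")
+        .with_option("wait_for_async_insert", "0");
+
+    // One small insert per block; rows are serialized onto the wire as they
+    // are written instead of piling up in a local buffer.
+    let mut insert = client.insert("transfers")?;
+    for row in rows {
+        insert.write(row).await?;
+    }
+    insert.end().await
+}
+```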