This repository has been archived by the owner on Nov 19, 2024. It is now read-only.

Commit 201bc15: update readme
Signed-off-by: Gustavo <[email protected]>
gusinacio committed Jun 10, 2024, 1 parent 5124c02
Showing 1 changed file with 8 additions and 78 deletions.
# elric-rs

This is a Rust implementation of [substreams-sink-rust](https://github.com/streamingfast/substreams-sink-rust), focused on inserting data into Clickhouse as fast as possible.

This repository showcases a functional base Rust project that consumes a Substreams `.spkg` (local file only).
The main difference between this and [substreams-sink-sql](https://github.com/streamingfast/substreams-sink-sql) is the Clickhouse driver used. Our version uses [clickhouse.rs](https://github.com/loyd/clickhouse.rs), which contains some optimizations: instead of batching, we stream the data directly to Clickhouse using async inserts, so memory usage is much lower and the network load is spread throughout the program's lifetime.
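
As a rough sketch of what that insert path looks like with [clickhouse.rs](https://github.com/loyd/clickhouse.rs) (the table name, row type, and option values below are illustrative assumptions, not this project's actual schema):

```rust
use clickhouse::{Client, Row};
use serde::Serialize;

// Hypothetical row type; the real schema depends on your Substreams module output.
#[derive(Row, Serialize)]
struct TransferRow {
    block_num: u64,
    tx_hash: String,
    value: String,
}

async fn stream_rows(rows: Vec<TransferRow>) -> Result<(), clickhouse::error::Error> {
    // Async inserts let the server buffer small writes, so the client never
    // has to accumulate large batches in memory.
    let client = Client::default()
        .with_url("http://localhost:8123")
        .with_option("async_insert", "1")
        .with_option("wait_for_async_insert", "0");

    let mut insert = client.insert("transfers")?;
    for row in rows {
        insert.write(&row).await?; // serialized and streamed as rows arrive
    }
    insert.end().await // completes the INSERT statement
}
```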

To run:

```bash
SUBSTREAMS_API_TOKEN="<StreamingFast API Token>" cargo run https://mainnet.eth.streamingfast.io:443 <path/to/substreams.spkg> <module_name>
```

## Implementation details

### Cursor Persistence

We persist the cursor with replace-on-duplicate semantics: the cursor is inserted continuously, and on restart the latest stored value is used to resume the stream.
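
One way to get those semantics in ClickHouse is a `ReplacingMergeTree` table, sketched below (the `cursors` table and its columns are hypothetical):

```rust
use clickhouse::{Client, Row};
use serde::Serialize;

// Hypothetical cursor table: rows sharing the same `id` collapse to the one
// with the highest `block_num` when ReplacingMergeTree merges its parts.
async fn setup_cursor_table(client: &Client) -> Result<(), clickhouse::error::Error> {
    client
        .query(
            "CREATE TABLE IF NOT EXISTS cursors (
                 id String,
                 cursor String,
                 block_num UInt64
             )
             ENGINE = ReplacingMergeTree(block_num)
             ORDER BY id",
        )
        .execute()
        .await
}

#[derive(Row, Serialize)]
struct CursorRow {
    id: String,
    cursor: String,
    block_num: u64,
}

// Called after each processed block range; inserting is cheap, and only the
// latest row per `id` survives deduplication.
async fn persist_cursor(client: &Client, cursor: &str, block_num: u64) -> Result<(), clickhouse::error::Error> {
    let mut insert = client.insert("cursors")?;
    insert
        .write(&CursorRow { id: "sink".into(), cursor: cursor.into(), block_num })
        .await?;
    insert.end().await
}
```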

### Stream Details

The presented Rust project contains a `SubstreamsStream` wrapper that handles automatic reconnection in case of error. It is implemented as a Rust `TryStream`, which makes the retryable stream easy to consume using standard Rust syntax:

```rust
let stream = SubstreamsStream::new(...);

loop {
    match stream.next().await {
        None => { /* Stream completed, reached end block */ },
        Some(Ok(BlockResponse::New(data))) => { /* Got a BlockScopedData message */ },
        Some(Err(err)) => { /* Fatal error or retry limit reached */ },
    }
}
```

The `main.rs` file accepts three arguments: the endpoint to reach (in the form `http(s)?://<url>:<port>`), the path to the local `.spkg` file to use for the request, and the name of the output module to stream from.

#### Incomplete Implementation

##### Cursor Persistence

For now the `cursor` is not properly loaded from/saved to the database, something that would be required on a production system to ensure the stream is resumed at the right location and that a block is never missed.

This should be implemented in [main.rs](./src/main.rs), in the `persist_cursor` and `load_persisted_cursor` functions.

> **Warning** If you don't implement cursor persistence and your process restarts, it will start again from the specified `start_block` (currently hard-coded to `0`).
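
Continuing the hypothetical `cursors` table sketched earlier, loading can read through `FINAL`, which forces `ReplacingMergeTree` deduplication at query time:

```rust
use clickhouse::Client;

// Hypothetical reader for the `cursors` table sketched above. FINAL collapses
// duplicate rows at read time, so we get the most recently inserted cursor
// even before background merges have run.
async fn load_persisted_cursor(client: &Client) -> Result<Option<String>, clickhouse::error::Error> {
    client
        .query("SELECT cursor FROM cursors FINAL WHERE id = 'sink'")
        .fetch_optional::<String>()
        .await
}
```
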
##### Logging

We use `println` statements within [main.rs](./src/main.rs) and [substreams_stream.rs](./src/substreams_stream.rs) to demonstrate what is happening in the codebase. Those statements, especially in [substreams_stream.rs](./src/substreams_stream.rs), should be replaced with calls to a proper logging system.

Also, more places could be instrumented to log extra details.
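
A minimal sketch of that swap using the `tracing` ecosystem (the choice of crate is an assumption, not something this project prescribes):

```rust
use tracing::{info, warn};

fn main() {
    // Install a subscriber that writes structured events to stdout.
    tracing_subscriber::fmt::init();

    // Structured fields replace ad-hoc println! formatting.
    info!(endpoint = "https://mainnet.eth.streamingfast.io:443", "starting stream");
    warn!(attempt = 2, "connection dropped, retrying");
}
```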

##### Block Undo Signal

`BlockUndoSignal` must be treated as "delete all data that has been recorded after the block height specified in the `BlockUndoSignal`". If, for example, a fork undoes `Block #7b` and `Block #6b` back to block `#5`, you must delete the changes made by those two blocks. The exact details depend on your own logic. If, for example, all your records contain a block number, a simple approach is to `delete all records where block_num > 5`, where `5` is the block number received in the `BlockUndoSignal` (this holds for append-only records, i.e. when only `INSERT`s are performed).

How to deal with this is left for you to implement.

> **Warning** The current code uses the `unimplemented!` macro, which will panic if an undo signal is received, so be warned before running this in production. Undo signals on Ethereum Mainnet happen only around 5-10 times a day, or even less, so you might miss the fact that they exist while testing.
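
For the append-only case above, a sketch of the handler could issue a ClickHouse mutation (the `transfers` table is hypothetical; mutations only affect parts that existed when the statement was issued, so rows inserted afterwards are safe):

```rust
use clickhouse::Client;

// Roll back all rows recorded after the last valid block from the undo signal.
// ALTER TABLE ... DELETE runs asynchronously as a mutation in ClickHouse.
async fn handle_undo(client: &Client, last_valid_block: u64) -> Result<(), clickhouse::error::Error> {
    client
        .query("ALTER TABLE transfers DELETE WHERE block_num > ?")
        .bind(last_valid_block)
        .execute()
        .await
}
```
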
### Protobuf Generation

Protobuf generation is done using [buf](https://buf.build/) which can be installed with:

```bash
brew install bufbuild/buf/buf
```

> **Note** See [other installation methods](https://buf.build/docs/installation/) if not on Mac/Linux (`brew` can also be installed on Linux).

#### From a pre-built `.spkg`

If you have an `.spkg` that already contains your Protobuf definitions, you can use it directly. The `.spkg` also embeds the Substreams system Protobuf definitions, so you can generate everything in one shot:

```bash
buf generate --exclude-path="google" <path/to/substream.spkg>#format=bin
```

> **Note** An `.spkg` recursively contains all Proto definitions, including some you may not want. You can exclude elements from generation via the `--exclude-path="google"` flag; multiple paths can be specified, separated by commas.

#### Just Substreams

You can also generate against the published [Substreams Buf Module](https://buf.build/streamingfast/substreams):

```bash
buf generate buf.build/streamingfast/substreams
```

This will include only Substreams system Protobufs to decode packages and perform RPC operations.

#### Deprecated Notice

You will see `WARN Plugin "buf.build/prost/crate" is deprecated` when generating the code. This is because `buf.build/community/neoeinstein-create:<version>` is not yet available.
### Block Undo Signal

We use the same strategy as [substreams-sink-sql](https://github.com/streamingfast/substreams-sink-sql): a configurable buffer keeps the sink at chain head minus the buffer, and that range is requested as "final", so no undo blocks occur.
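
On the wire this corresponds to the `final_blocks_only` flag of the Substreams `Request` message (`sf.substreams.rpc.v2`); the module path and buffer arithmetic below are illustrative:

```rust
// Path of the prost-generated module is an assumption for this sketch.
use crate::pb::sf::substreams::rpc::v2::Request;

// Ask only for irreversible blocks, stopping `buffer` blocks behind head so
// no BlockUndoSignal can ever reach the sink.
fn build_request(head_block: u64, buffer: u64, cursor: String, module: &str) -> Request {
    Request {
        start_cursor: cursor,
        stop_block_num: head_block.saturating_sub(buffer),
        final_blocks_only: true,
        output_module: module.to_string(),
        ..Default::default()
    }
}
```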
