Skip to content

Commit

Permalink
geyser: use runtime instaed of unconstrained
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid committed May 5, 2024
1 parent 4629f17 commit 6c21de5
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 29 deletions.
28 changes: 18 additions & 10 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,24 @@ jobs:
- name: Build release tarball
run: ./ci/create-tarball.sh

- name: Deleteing directories to avoid upload conflict
run: |
rm -rf \
target/release/client.d \
target/release/config-check.d \
target/release/grpc-google-pubsub.d \
target/release/grpc-kafka.d \
target/release/grpc-scylladb.d
- name: Remove debug information
run: |
strip \
target/release/client \
target/release/config-check \
target/release/grpc-google-pubsub \
target/release/grpc-kafka \
target/release/grpc-scylladb
- name: Rename binaries
run: |
mv target/release/client target/release/client-${{ matrix.os }}
Expand All @@ -92,15 +110,6 @@ jobs:
mv ${{ env.GEYSER_PLUGIN_NAME }}-release-x86_64-unknown-linux-gnu.tar.bz2 ${{ env.GEYSER_PLUGIN_NAME }}-release22-x86_64-unknown-linux-gnu.tar.bz2
mv ${{ env.GEYSER_PLUGIN_NAME }}-release-x86_64-unknown-linux-gnu.yml ${{ env.GEYSER_PLUGIN_NAME }}-release22-x86_64-unknown-linux-gnu.yml
- name: Deleteing directories to avoid upload conflict
run: |
rm -rf \
target/release/client.d \
target/release/config-check.d \
target/release/grpc-google-pubsub.d \
target/release/grpc-kafka.d \
target/release/grpc-scylladb.d
- name: Release
if: startsWith(github.ref, 'refs/tags/')
uses: softprops/action-gh-release@v2
Expand All @@ -120,7 +129,6 @@ jobs:
target/release/grpc-scylladb*
- uses: actions/upload-artifact@v4
if: matrix.os == 'ubuntu-22.04'
with:
name: yellowstone-grpc-${{ github.sha }}-${{ matrix.os }}
path: |
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ The minor version will be incremented upon a breaking change and the patch versi

### Features

- geyser: use runtime instaed of unconstrained ([#331](https://github.com/rpcpool/yellowstone-grpc/pull/331))

### Breaking

## 2024-04-30
Expand Down
22 changes: 16 additions & 6 deletions yellowstone-grpc-geyser/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ use {
},
tokio::{
fs,
runtime::Builder,
sync::{broadcast, mpsc, Mutex, Notify, RwLock, Semaphore},
task::spawn_blocking,
time::{sleep, Duration, Instant},
},
tokio_stream::wrappers::ReceiverStream,
Expand Down Expand Up @@ -787,12 +789,20 @@ impl GrpcService {

// Run geyser message loop
let (messages_tx, messages_rx) = mpsc::unbounded_channel();
tokio::spawn(tokio::task::unconstrained(Self::geyser_loop(
messages_rx,
blocks_meta_tx,
broadcast_tx,
block_fail_action,
)));
spawn_blocking(move || {
Builder::new_multi_thread()
.thread_name_fn(crate::get_thread_name)
.worker_threads(4)
.enable_all()
.build()
.expect("Failed to create a new runtime for geyser loop")
.block_on(Self::geyser_loop(
messages_rx,
blocks_meta_tx,
broadcast_tx,
block_fail_action,
));
});

// Run Server
let shutdown = Arc::new(Notify::new());
Expand Down
8 changes: 8 additions & 0 deletions yellowstone-grpc-geyser/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,11 @@ pub mod grpc;
pub mod plugin;
pub mod prom;
pub mod version;

pub fn get_thread_name() -> String {
use std::sync::atomic::{AtomicUsize, Ordering};

static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::Relaxed);
format!("solGeyserGrpc{id:02}")
}
15 changes: 2 additions & 13 deletions yellowstone-grpc-geyser/src/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,7 @@ use {
ReplicaEntryInfoVersions, ReplicaTransactionInfoVersions, Result as PluginResult,
SlotStatus,
},
std::{
concat, env,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
},
std::{concat, env, sync::Arc, time::Duration},
tokio::{
runtime::{Builder, Runtime},
sync::{mpsc, Notify},
Expand Down Expand Up @@ -68,11 +61,7 @@ impl GeyserPlugin for Plugin {

// Create inner
let runtime = Builder::new_multi_thread()
.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::Relaxed);
format!("solGeyserGrpc{id:02}")
})
.thread_name_fn(crate::get_thread_name)
.enable_all()
.build()
.map_err(|error| GeyserPluginError::Custom(Box::new(error)))?;
Expand Down

0 comments on commit 6c21de5

Please sign in to comment.