Skip to content

Commit

Permalink
Reorganize inscription submission to batch requests together
Browse files Browse the repository at this point in the history
The inscription submission is currently linear and as a result cannot churn through
enough inscriptions at once.

This change attempts to batch the inscriptions, as many as it can up to the threshold,
currently 10, and then submits them in parallel using join_all.
  • Loading branch information
Ayiga committed Nov 14, 2024
1 parent acb93d8 commit 803fcb2
Showing 1 changed file with 102 additions and 58 deletions.
160 changes: 102 additions & 58 deletions inscriptions/src/service/client_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use alloy::{
sol_types::SolStruct,
};
use async_std::{
prelude::FutureExt,
sync::{RwLock, RwLockWriteGuard},
task::JoinHandle,
};
Expand All @@ -24,7 +25,7 @@ use governor::{
state::{InMemoryState, NotKeyed},
RateLimiter,
};
use std::{collections::HashMap, sync::Arc};
use std::{collections::HashMap, sync::Arc, time::Duration};
use surf_disco::Client;
use url::Url;

Expand Down Expand Up @@ -531,6 +532,8 @@ pub struct ProcessPutInscriptionToChainTask {
pub task_handle: Option<JoinHandle<()>>,
}

const INSCRIPTION_BATCH_SUBMISSION_SIZE: usize = 10;

impl ProcessPutInscriptionToChainTask {
pub fn new<S, Persistence>(
rate_limiter: RateLimiter<NotKeyed, InMemoryState, clock::DefaultClock>,
Expand Down Expand Up @@ -558,9 +561,31 @@ impl ProcessPutInscriptionToChainTask {
}
}

fn created_signed_inscription(
inscription: &EspressoInscription,
signer: &PrivateKeySigner,
) -> InscriptionAndSignatureFromService {
let signature_result =
signer.sign_hash_sync(&inscription.eip712_signing_hash(&ESPRESSO_EIP712_DOMAIN));
let signature = match signature_result {
Ok(signature) => signature,
Err(err) => {
tracing::error!("error signing inscription: {}", err);
panic!("Error signing inscription: {}", err);
}
};

// We create the new inscription and signature
InscriptionAndSignatureFromService {
inscription: inscription.clone(),
signature: HexSignature(signature),
}
}

async fn process_record_inscription<S, Persistence>(
rate_limiter: RateLimiter<NotKeyed, InMemoryState, clock::DefaultClock>,
data_state: Arc<RwLock<DataState<Persistence>>>,
// We're turning off this logging for now.
_data_state: Arc<RwLock<DataState<Persistence>>>,
inscription_namespace_id: NamespaceId,
signer: PrivateKeySigner,
mut stream: S,
Expand All @@ -570,79 +595,98 @@ impl ProcessPutInscriptionToChainTask {
Persistence: InscriptionPersistence,
{
let client = Client::<hotshot_query_service::Error, Version01>::new(base_url);
let mut should_exit = false;

loop {
if should_exit {
tracing::error!(
"inscription detail stream closed. shutting down client handling stream.",
);
return;
}

// Wait for the Rate Limiter
tracing::debug!("waiting for rate limiter");
rate_limiter.until_ready().await;
tracing::debug!("ready to submit next inscription");

let Some(inscription) = stream.next().await else {
tracing::error!(
"inscription detail stream closed. shutting down client handling stream.",
);
panic!("Inscription detail stream closed, unable to process new requests from clients.");
};
tracing::debug!("have inscription to submit to mempool");

// Sign the message using the server key
// We're going to try and submit this in batches of 10 transactions
// at a time.

let signature_result =
signer.sign_hash_sync(&inscription.eip712_signing_hash(&ESPRESSO_EIP712_DOMAIN));
let signature = match signature_result {
Ok(signature) => signature,
Err(err) => {
tracing::error!("error signing inscription: {}", err);
panic!("Error signing inscription: {}", err);
}
};
let mut pending_inscriptions = Vec::new();

// We create the new inscription and signature
let inscription_and_signature = InscriptionAndSignatureFromService {
inscription,
signature: HexSignature(signature),
};
// Let's wait for the first inscription to come in, then once it
// does, we'll attempt to eagerly grab as many as we can up to the
// INSCRIPTION_BATCH_SUBMISSION_SIZE

// Next we encode the object.
let serialize_result = bincode::serialize(&inscription_and_signature);

let serialized = match serialize_result {
Ok(serialized) => {
{
let read_lock = data_state.read_arc().await;
if let Err(err) = read_lock
.persistence()
.record_submit_put_inscription(&inscription_and_signature.inscription)
.await
{
tracing::error!("error recording inscription and signature: {:?}", err);
}
}
// We have the serialized object, we can now send it to the
// next stage.
serialized
}
Err(err) => {
tracing::error!("error serializing inscription and signature: {}", err);
panic!("Error serializing inscription and signature: {}", err);
let inscription = match stream.next().await {
Some(inscription) => inscription,
None => {
should_exit = true;
continue;
}
};

let transaction = Transaction::new(inscription_namespace_id, serialized);
pending_inscriptions.push(Self::created_signed_inscription(&inscription, &signer));

let send_result = client
.post::<()>("submit")
.body_binary(&transaction)
.unwrap()
.send()
.await;
for _ in 1..INSCRIPTION_BATCH_SUBMISSION_SIZE {
let inscription = match stream.next().timeout(Duration::from_millis(20)).await {
Err(_) => {
// This is a timeout, we're going to break out of the
//loop
break;
}
Ok(None) => {
// This is the end of the stream, we're going to break
// out of the loop.
should_exit = true;
break;
}
Ok(Some(inscription)) => inscription,
};

if let Err(err) = send_result {
tracing::error!("error sending inscription to service: {}", err);
panic!("Error sending inscription to service: {}", err);
tracing::debug!("have inscription to submit to mempool");
pending_inscriptions.push(Self::created_signed_inscription(&inscription, &signer));
}

tracing::debug!("successfully submitted inscription to mempool");
let pending_inscription_count = pending_inscriptions.len();
futures::future::join_all(pending_inscriptions.iter().map(
|inscription_and_signature| {
let client = client.clone();
// let inscription_and_signature = inscription_and_signature.clone();
async move {
let serialized = match bincode::serialize(inscription_and_signature) {
Ok(serialized) => serialized,
Err(err) => {
tracing::error!(
"error serializing inscription for the server: {}",
err
);
return;
}
};

let transaction = Transaction::new(inscription_namespace_id, serialized);

let send_result = client
.post::<()>("submit")
.body_binary(&transaction)
.unwrap()
.send()
.await;

if let Err(err) = send_result {
tracing::error!("error sending inscription to service: {}", err);
}
}
},
))
.await;

tracing::debug!(
"successfully submitted {} inscriptions to mempool",
pending_inscription_count
);
}
}
}
Expand Down

0 comments on commit 803fcb2

Please sign in to comment.