Skip to content

Commit

Permalink
Merge pull request #913 from tursodatabase/max-conccurent-dbs
Browse files Browse the repository at this point in the history
Make connection concurrency configurable
  • Loading branch information
MarinPostma authored Jan 18, 2024
2 parents 25359fb + 797be22 commit e886b8a
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 13 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 9 additions & 6 deletions libsql-server/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,14 @@ pub trait MakeConnection: Send + Sync + 'static {

fn throttled(
self,
conccurency: usize,
semaphore: Arc<Semaphore>,
timeout: Option<Duration>,
max_total_response_size: u64,
) -> MakeThrottledConnection<Self>
where
Self: Sized,
{
MakeThrottledConnection::new(conccurency, self, timeout, max_total_response_size)
MakeThrottledConnection::new(semaphore, self, timeout, max_total_response_size)
}
}

Expand Down Expand Up @@ -194,13 +194,13 @@ pub struct MakeThrottledConnection<F> {

impl<F> MakeThrottledConnection<F> {
fn new(
conccurency: usize,
semaphore: Arc<Semaphore>,
connection_maker: F,
timeout: Option<Duration>,
max_total_response_size: u64,
) -> Self {
Self {
semaphore: Arc::new(Semaphore::new(conccurency)),
semaphore,
connection_maker,
timeout,
max_total_response_size,
Expand Down Expand Up @@ -419,8 +419,11 @@ pub mod test {

#[tokio::test]
async fn throttle_db_creation() {
let factory =
(|| async { Ok(DummyDb) }).throttled(10, Some(Duration::from_millis(100)), u64::MAX);
let factory = (|| async { Ok(DummyDb) }).throttled(
Arc::new(Semaphore::new(10)),
Some(Duration::from_millis(100)),
u64::MAX,
);

let mut conns = Vec::with_capacity(10);
for _ in 0..10 {
Expand Down
11 changes: 9 additions & 2 deletions libsql-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use net::Connector;
use once_cell::sync::Lazy;
use replication::NamespacedSnapshotCallback;
use tokio::runtime::Runtime;
use tokio::sync::{mpsc, Notify};
use tokio::sync::{mpsc, Notify, Semaphore};
use tokio::task::JoinSet;
use tokio::time::Duration;
use url::Url;
Expand Down Expand Up @@ -72,7 +72,6 @@ mod stats;
mod test;
mod utils;

const MAX_CONCURRENT_DBS: usize = 128;
const DB_CREATE_TIMEOUT: Duration = Duration::from_secs(1);
const DEFAULT_AUTO_CHECKPOINT: u32 = 1000;
const LIBSQL_PAGE_SIZE: u64 = 4096;
Expand Down Expand Up @@ -103,6 +102,7 @@ pub struct Server<C = HttpConnector, A = AddrIncoming, D = HttpsConnector<HttpCo
pub shutdown: Arc<Notify>,
pub max_active_namespaces: usize,
pub meta_store_config: Option<MetaStoreConfig>,
pub max_concurrent_connections: usize,
}

impl<C, A, D> Default for Server<C, A, D> {
Expand All @@ -122,6 +122,7 @@ impl<C, A, D> Default for Server<C, A, D> {
shutdown: Default::default(),
max_active_namespaces: 100,
meta_store_config: None,
max_concurrent_connections: 128,
}
}
}
Expand Down Expand Up @@ -393,6 +394,7 @@ where
disable_namespaces: self.disable_namespaces,
max_active_namespaces: self.max_active_namespaces,
meta_store_config: self.meta_store_config.take(),
max_concurrent_connections: self.max_concurrent_connections,
};
let (namespaces, proxy_service, replication_service) = replica.configure().await?;
self.rpc_client_config = None;
Expand Down Expand Up @@ -435,6 +437,7 @@ where
join_set: &mut join_set,
auth: auth.clone(),
meta_store_config: self.meta_store_config.take(),
max_concurrent_connections: self.max_concurrent_connections,
};
let (namespaces, proxy_service, replication_service) = primary.configure().await?;
self.rpc_server_config = None;
Expand Down Expand Up @@ -503,6 +506,7 @@ struct Primary<'a, A> {
auth: Arc<Auth>,
join_set: &'a mut JoinSet<anyhow::Result<()>>,
meta_store_config: Option<MetaStoreConfig>,
max_concurrent_connections: usize,
}

impl<A> Primary<'_, A>
Expand Down Expand Up @@ -530,6 +534,7 @@ where
checkpoint_interval: self.db_config.checkpoint_interval,
disable_namespace: self.disable_namespaces,
encryption_key: self.db_config.encryption_key.clone(),
max_concurrent_connections: Arc::new(Semaphore::new(self.max_concurrent_connections)),
};

let factory = PrimaryNamespaceMaker::new(conf);
Expand Down Expand Up @@ -619,6 +624,7 @@ struct Replica<C> {
disable_namespaces: bool,
max_active_namespaces: usize,
meta_store_config: Option<MetaStoreConfig>,
max_concurrent_connections: usize,
}

impl<C: Connector> Replica<C> {
Expand All @@ -640,6 +646,7 @@ impl<C: Connector> Replica<C> {
max_response_size: self.db_config.max_response_size,
max_total_response_size: self.db_config.max_total_response_size,
encryption_key: self.db_config.encryption_key.clone(),
max_concurrent_connections: Arc::new(Semaphore::new(self.max_concurrent_connections)),
};

let factory = ReplicaNamespaceMaker::new(conf);
Expand Down
4 changes: 4 additions & 0 deletions libsql-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@ struct Cli {
/// encryption_key for encryption at rest
#[clap(long, env = "SQLD_ENCRYPTION_KEY")]
encryption_key: Option<bytes::Bytes>,

#[clap(long, default_value = "128")]
max_concurrent_connections: usize,
}

#[derive(clap::Subcommand, Debug)]
Expand Down Expand Up @@ -590,6 +593,7 @@ async fn build_server(config: &Cli) -> anyhow::Result<Server> {
shutdown,
max_active_namespaces: config.max_active_namespaces,
meta_store_config,
max_concurrent_connections: config.max_concurrent_connections,
})
}

Expand Down
9 changes: 5 additions & 4 deletions libsql-server/src/namespace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use rusqlite::ErrorCode;
use serde::de::Visitor;
use serde::Deserialize;
use tokio::io::AsyncBufReadExt;
use tokio::sync::watch;
use tokio::sync::{watch, Semaphore};
use tokio::task::JoinSet;
use tokio::time::{Duration, Instant};
use tokio_util::io::StreamReader;
Expand All @@ -46,7 +46,6 @@ use crate::replication::{FrameNo, NamespacedSnapshotCallback, ReplicationLogger}
use crate::stats::Stats;
use crate::{
run_periodic_checkpoint, StatsSender, BLOCKING_RT, DB_CREATE_TIMEOUT, DEFAULT_AUTO_CHECKPOINT,
MAX_CONCURRENT_DBS,
};

use crate::namespace::fork::PointInTimeRestore;
Expand Down Expand Up @@ -839,6 +838,7 @@ pub struct ReplicaNamespaceConfig {
/// Stats monitor
pub stats_sender: StatsSender,
pub encryption_key: Option<bytes::Bytes>,
pub max_concurrent_connections: Arc<Semaphore>,
}

impl Namespace<ReplicaDatabase> {
Expand Down Expand Up @@ -958,7 +958,7 @@ impl Namespace<ReplicaDatabase> {
)
.await?
.throttled(
MAX_CONCURRENT_DBS,
config.max_concurrent_connections.clone(),
Some(DB_CREATE_TIMEOUT),
config.max_total_response_size,
);
Expand Down Expand Up @@ -989,6 +989,7 @@ pub struct PrimaryNamespaceConfig {
pub checkpoint_interval: Option<Duration>,
pub disable_namespace: bool,
pub encryption_key: Option<bytes::Bytes>,
pub max_concurrent_connections: Arc<Semaphore>,
}

pub type DumpStream =
Expand Down Expand Up @@ -1137,7 +1138,7 @@ impl Namespace<PrimaryDatabase> {
)
.await?
.throttled(
MAX_CONCURRENT_DBS,
config.max_concurrent_connections.clone(),
Some(DB_CREATE_TIMEOUT),
config.max_total_response_size,
)
Expand Down
1 change: 1 addition & 0 deletions libsql-server/src/test/bottomless.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ async fn configure_server(
rpc_client_config: None,
shutdown: Default::default(),
meta_store_config: None,
max_concurrent_connections: 128,
}
}

Expand Down

0 comments on commit e886b8a

Please sign in to comment.