diff --git a/Cargo.lock b/Cargo.lock index a20f9e560f..b260d1b611 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2826,7 +2826,7 @@ dependencies = [ [[package]] name = "libsql-sqlite3-parser" -version = "0.11.0" +version = "0.11.1" dependencies = [ "bitflags 2.4.1", "cc", diff --git a/libsql-server/src/connection/mod.rs b/libsql-server/src/connection/mod.rs index 8dc677b4e2..5a2fe691fb 100644 --- a/libsql-server/src/connection/mod.rs +++ b/libsql-server/src/connection/mod.rs @@ -156,14 +156,14 @@ pub trait MakeConnection: Send + Sync + 'static { fn throttled( self, - conccurency: usize, + semaphore: Arc, timeout: Option, max_total_response_size: u64, ) -> MakeThrottledConnection where Self: Sized, { - MakeThrottledConnection::new(conccurency, self, timeout, max_total_response_size) + MakeThrottledConnection::new(semaphore, self, timeout, max_total_response_size) } } @@ -194,13 +194,13 @@ pub struct MakeThrottledConnection { impl MakeThrottledConnection { fn new( - conccurency: usize, + semaphore: Arc, connection_maker: F, timeout: Option, max_total_response_size: u64, ) -> Self { Self { - semaphore: Arc::new(Semaphore::new(conccurency)), + semaphore, connection_maker, timeout, max_total_response_size, @@ -419,8 +419,11 @@ pub mod test { #[tokio::test] async fn throttle_db_creation() { - let factory = - (|| async { Ok(DummyDb) }).throttled(10, Some(Duration::from_millis(100)), u64::MAX); + let factory = (|| async { Ok(DummyDb) }).throttled( + Arc::new(Semaphore::new(10)), + Some(Duration::from_millis(100)), + u64::MAX, + ); let mut conns = Vec::with_capacity(10); for _ in 0..10 { diff --git a/libsql-server/src/lib.rs b/libsql-server/src/lib.rs index a0f3a48596..92dfa801ba 100644 --- a/libsql-server/src/lib.rs +++ b/libsql-server/src/lib.rs @@ -36,7 +36,7 @@ use net::Connector; use once_cell::sync::Lazy; use replication::NamespacedSnapshotCallback; use tokio::runtime::Runtime; -use tokio::sync::{mpsc, Notify}; +use tokio::sync::{mpsc, Notify, Semaphore}; use tokio::task::JoinSet; use tokio::time::Duration; use url::Url; @@ -72,7 +72,6 @@ mod stats; mod test; mod utils; -const MAX_CONCURRENT_DBS: usize = 128; const DB_CREATE_TIMEOUT: Duration = Duration::from_secs(1); const DEFAULT_AUTO_CHECKPOINT: u32 = 1000; const LIBSQL_PAGE_SIZE: u64 = 4096; @@ -103,6 +102,7 @@ pub struct Server, pub max_active_namespaces: usize, pub meta_store_config: Option, + pub max_concurrent_connections: usize, } impl Default for Server { @@ -122,6 +122,7 @@ impl Default for Server { shutdown: Default::default(), max_active_namespaces: 100, meta_store_config: None, + max_concurrent_connections: 128, } } } @@ -393,6 +394,7 @@ where disable_namespaces: self.disable_namespaces, max_active_namespaces: self.max_active_namespaces, meta_store_config: self.meta_store_config.take(), + max_concurrent_connections: self.max_concurrent_connections, }; let (namespaces, proxy_service, replication_service) = replica.configure().await?; self.rpc_client_config = None; @@ -435,6 +437,7 @@ where join_set: &mut join_set, auth: auth.clone(), meta_store_config: self.meta_store_config.take(), + max_concurrent_connections: self.max_concurrent_connections, }; let (namespaces, proxy_service, replication_service) = primary.configure().await?; self.rpc_server_config = None; @@ -503,6 +506,7 @@ struct Primary<'a, A> { auth: Arc, join_set: &'a mut JoinSet>, meta_store_config: Option, + max_concurrent_connections: usize, } impl Primary<'_, A> @@ -530,6 +534,7 @@ where checkpoint_interval: self.db_config.checkpoint_interval, disable_namespace: self.disable_namespaces, encryption_key: self.db_config.encryption_key.clone(), + max_concurrent_connections: Arc::new(Semaphore::new(self.max_concurrent_connections)), }; let factory = PrimaryNamespaceMaker::new(conf); @@ -619,6 +624,7 @@ struct Replica { disable_namespaces: bool, max_active_namespaces: usize, meta_store_config: Option, + max_concurrent_connections: usize, } impl Replica { @@ -640,6 +646,7 @@ impl Replica { max_response_size: self.db_config.max_response_size, max_total_response_size: self.db_config.max_total_response_size, encryption_key: self.db_config.encryption_key.clone(), + max_concurrent_connections: Arc::new(Semaphore::new(self.max_concurrent_connections)), }; let factory = ReplicaNamespaceMaker::new(conf); diff --git a/libsql-server/src/main.rs b/libsql-server/src/main.rs index 7c8ce4b3e5..e746dce56b 100644 --- a/libsql-server/src/main.rs +++ b/libsql-server/src/main.rs @@ -227,6 +227,9 @@ struct Cli { /// encryption_key for encryption at rest #[clap(long, env = "SQLD_ENCRYPTION_KEY")] encryption_key: Option, + + #[clap(long, default_value = "128")] + max_concurrent_connections: usize, } #[derive(clap::Subcommand, Debug)] @@ -590,6 +593,7 @@ async fn build_server(config: &Cli) -> anyhow::Result { shutdown, max_active_namespaces: config.max_active_namespaces, meta_store_config, + max_concurrent_connections: config.max_concurrent_connections, }) } diff --git a/libsql-server/src/namespace/mod.rs b/libsql-server/src/namespace/mod.rs index 2d55359d7d..1a14fef1cf 100644 --- a/libsql-server/src/namespace/mod.rs +++ b/libsql-server/src/namespace/mod.rs @@ -24,7 +24,7 @@ use rusqlite::ErrorCode; use serde::de::Visitor; use serde::Deserialize; use tokio::io::AsyncBufReadExt; -use tokio::sync::watch; +use tokio::sync::{watch, Semaphore}; use tokio::task::JoinSet; use tokio::time::{Duration, Instant}; use tokio_util::io::StreamReader; @@ -46,7 +46,6 @@ use crate::replication::{FrameNo, NamespacedSnapshotCallback, ReplicationLogger} use crate::stats::Stats; use crate::{ run_periodic_checkpoint, StatsSender, BLOCKING_RT, DB_CREATE_TIMEOUT, DEFAULT_AUTO_CHECKPOINT, - MAX_CONCURRENT_DBS, }; use crate::namespace::fork::PointInTimeRestore; @@ -839,6 +838,7 @@ pub struct ReplicaNamespaceConfig { /// Stats monitor pub stats_sender: StatsSender, pub encryption_key: Option, + pub max_concurrent_connections: Arc, } impl Namespace { @@ -958,7 +958,7 @@ impl Namespace { ) .await? .throttled( - MAX_CONCURRENT_DBS, + config.max_concurrent_connections.clone(), Some(DB_CREATE_TIMEOUT), config.max_total_response_size, ); @@ -989,6 +989,7 @@ pub struct PrimaryNamespaceConfig { pub checkpoint_interval: Option, pub disable_namespace: bool, pub encryption_key: Option, + pub max_concurrent_connections: Arc, } pub type DumpStream = @@ -1137,7 +1138,7 @@ impl Namespace { ) .await? .throttled( - MAX_CONCURRENT_DBS, + config.max_concurrent_connections.clone(), Some(DB_CREATE_TIMEOUT), config.max_total_response_size, ) diff --git a/libsql-server/src/test/bottomless.rs b/libsql-server/src/test/bottomless.rs index d9f2d9573f..c09f93f937 100644 --- a/libsql-server/src/test/bottomless.rs +++ b/libsql-server/src/test/bottomless.rs @@ -114,6 +114,7 @@ async fn configure_server( rpc_client_config: None, shutdown: Default::default(), meta_store_config: None, + max_concurrent_connections: 128, } }