diff --git a/Cargo.lock b/Cargo.lock index 02450709d1..f6f2b32ccc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -288,6 +288,7 @@ dependencies = [ "pageserver_api", "pageserver_client", "postgres_connection", + "r2d2", "reqwest", "serde", "serde_json", @@ -1650,6 +1651,7 @@ dependencies = [ "diesel_derives", "itoa", "pq-sys", + "r2d2", "serde_json", ] @@ -4153,6 +4155,17 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r2d2" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" +dependencies = [ + "log", + "parking_lot 0.12.1", + "scheduled-thread-pool", +] + [[package]] name = "rand" version = "0.7.3" @@ -4866,6 +4879,15 @@ dependencies = [ "windows-sys 0.42.0", ] +[[package]] +name = "scheduled-thread-pool" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" +dependencies = [ + "parking_lot 0.12.1", +] + [[package]] name = "scopeguard" version = "1.1.0" diff --git a/control_plane/attachment_service/Cargo.toml b/control_plane/attachment_service/Cargo.toml index d3c62d74d2..64cb67d386 100644 --- a/control_plane/attachment_service/Cargo.toml +++ b/control_plane/attachment_service/Cargo.toml @@ -24,7 +24,8 @@ tokio.workspace = true tokio-util.workspace = true tracing.workspace = true -diesel = { version = "2.1.4", features = ["serde_json", "postgres"] } +diesel = { version = "2.1.4", features = ["serde_json", "postgres", "r2d2"] } +r2d2 = { version = "0.8.10" } utils = { path = "../../libs/utils/" } metrics = { path = "../../libs/metrics/" } diff --git a/control_plane/attachment_service/src/persistence.rs b/control_plane/attachment_service/src/persistence.rs index 574441c409..924bc24d76 100644 --- a/control_plane/attachment_service/src/persistence.rs +++ b/control_plane/attachment_service/src/persistence.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; use std::str::FromStr; +use std::time::Duration; use camino::Utf8Path; use camino::Utf8PathBuf; @@ -44,7 +45,7 @@ use crate::PlacementPolicy; /// updated, and reads of nodes are always from memory, not the database. We only require that /// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline. pub struct Persistence { - database_url: String, + connection_pool: diesel::r2d2::Pool>, // In test environments, we support loading+saving a JSON file. This is temporary, for the benefit of // test_compatibility.py, so that we don't have to commit to making the database contents fully backward/forward @@ -64,6 +65,8 @@ pub(crate) enum DatabaseError { Query(#[from] diesel::result::Error), #[error(transparent)] Connection(#[from] diesel::result::ConnectionError), + #[error(transparent)] + ConnectionPool(#[from] r2d2::Error), #[error("Logical error: {0}")] Logical(String), } @@ -71,9 +74,31 @@ pub(crate) enum DatabaseError { pub(crate) type DatabaseResult = Result; impl Persistence { + // The default postgres connection limit is 100. We use up to 99, to leave one free for a human admin under + // normal circumstances. This assumes we have exclusive use of the database cluster to which we connect. + const MAX_CONNECTIONS: u32 = 99; + + // We don't want to keep a lot of connections alive: close them down promptly if they aren't being used. + const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10); + const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(60); + pub fn new(database_url: String, json_path: Option) -> Self { + let manager = diesel::r2d2::ConnectionManager::::new(database_url); + + // We will use a connection pool: this is primarily to _limit_ our connection count, rather than to optimize time + // to execute queries (database queries are not generally on latency-sensitive paths). + let connection_pool = diesel::r2d2::Pool::builder() + .max_size(Self::MAX_CONNECTIONS) + .max_lifetime(Some(Self::MAX_CONNECTION_LIFETIME)) + .idle_timeout(Some(Self::IDLE_CONNECTION_TIMEOUT)) + // Always keep at least one connection ready to go + .min_idle(Some(1)) + .test_on_check_out(true) + .build(manager) + .expect("Could not build connection pool"); + Self { - database_url, + connection_pool, json_path, } } @@ -84,14 +109,10 @@ impl Persistence { F: Fn(&mut PgConnection) -> DatabaseResult + Send + 'static, R: Send + 'static, { - let database_url = self.database_url.clone(); - tokio::task::spawn_blocking(move || -> DatabaseResult { - // TODO: connection pooling, such as via diesel::r2d2 - let mut conn = PgConnection::establish(&database_url)?; - func(&mut conn) - }) - .await - .expect("Task panic") + let mut conn = self.connection_pool.get()?; + tokio::task::spawn_blocking(move || -> DatabaseResult { func(&mut conn) }) + .await + .expect("Task panic") } /// When a node is first registered, persist it before using it for anything diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs index f3957191c9..6941c71c2d 100644 --- a/control_plane/attachment_service/src/service.rs +++ b/control_plane/attachment_service/src/service.rs @@ -117,7 +117,9 @@ impl From for ApiError { match err { DatabaseError::Query(e) => ApiError::InternalServerError(e.into()), // FIXME: ApiError doesn't have an Unavailable variant, but ShuttingDown maps to 503. - DatabaseError::Connection(_e) => ApiError::ShuttingDown, + DatabaseError::Connection(_) | DatabaseError::ConnectionPool(_) => { + ApiError::ShuttingDown + } DatabaseError::Logical(reason) => { ApiError::InternalServerError(anyhow::anyhow!(reason)) }