diff --git a/Cargo.lock b/Cargo.lock index ea5a29a142..4e734b3f8f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -196,6 +196,19 @@ dependencies = [ "futures-core", ] +[[package]] +name = "async-channel" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ca33f4bc4ed1babef42cad36cc1f51fa88be00420404e5b1e80ab1b18f7678c" +dependencies = [ + "concurrent-queue", + "event-listener 4.0.0", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-compression" version = "0.4.5" @@ -2355,7 +2368,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e9b187a72d63adbfba487f48095306ac823049cb504ee195541e91c7775f5ad" dependencies = [ "anyhow", - "async-channel", + "async-channel 1.9.0", "base64 0.13.1", "futures-lite", "infer", @@ -6241,6 +6254,7 @@ version = "0.1.0" dependencies = [ "anyhow", "arc-swap", + "async-channel 2.1.1", "async-trait", "bincode", "byteorder", diff --git a/libs/utils/src/pre_spawned_pool.rs b/libs/utils/src/pre_spawned_pool.rs index 47180e2c55..fda98c7f4e 100644 --- a/libs/utils/src/pre_spawned_pool.rs +++ b/libs/utils/src/pre_spawned_pool.rs @@ -23,7 +23,7 @@ enum Command { SetSlotCount(usize), } -enum GetError { +pub enum GetError { ShuttingDown, } diff --git a/pageserver/src/walredo/process.rs b/pageserver/src/walredo/process.rs index f03ec7d661..dec6f13e10 100644 --- a/pageserver/src/walredo/process.rs +++ b/pageserver/src/walredo/process.rs @@ -27,7 +27,6 @@ mod protocol; pub struct WalRedoProcess { #[allow(dead_code)] conf: &'static PageServerConf, - tenant_shard_id: TenantShardId, // Some() on construction, only becomes None on Drop. child: Option, stdout: Mutex, diff --git a/pageserver/src/walredo/process/no_leak_child.rs b/pageserver/src/walredo/process/no_leak_child.rs index 1d9b768c51..5973d98c84 100644 --- a/pageserver/src/walredo/process/no_leak_child.rs +++ b/pageserver/src/walredo/process/no_leak_child.rs @@ -90,7 +90,6 @@ impl Drop for NoLeakChild { Some(child) => child, None => return, }; - let tenant_shard_id = self.tenant_id; // Offload the kill+wait of the child process into the background. // If someone stops the runtime, we'll leak the child process. // We can ignore that case because we only stop the runtime on pageserver exit. @@ -98,11 +97,7 @@ impl Drop for NoLeakChild { tokio::task::spawn_blocking(move || { // Intentionally don't inherit the tracing context from whoever is dropping us. // This thread here is going to outlive of our dropper. - let span = tracing::info_span!( - "walredo", - tenant_id = %tenant_shard_id.tenant_id, - shard_id = %tenant_shard_id.shard_slug() - ); + let span = tracing::info_span!("walredo"); let _entered = span.enter(); Self::kill_and_wait_impl(child, WalRedoKillCause::NoLeakChildDrop); }) @@ -112,11 +107,11 @@ impl Drop for NoLeakChild { } pub(crate) trait NoLeakChildCommandExt { - fn spawn_no_leak_child(&mut self, tenant_id: TenantShardId) -> io::Result; + fn spawn_no_leak_child(&mut self) -> io::Result; } impl NoLeakChildCommandExt for Command { - fn spawn_no_leak_child(&mut self, tenant_id: TenantShardId) -> io::Result { - NoLeakChild::spawn(tenant_id, self) + fn spawn_no_leak_child(&mut self) -> io::Result { + NoLeakChild::spawn(self) } } diff --git a/pageserver/src/walredo/process_pool.rs b/pageserver/src/walredo/process_pool.rs index 58ab81f689..4712fe3657 100644 --- a/pageserver/src/walredo/process_pool.rs +++ b/pageserver/src/walredo/process_pool.rs @@ -7,9 +7,9 @@ use crate::config::PageServerConf; use super::process::WalRedoProcess; pub struct Pool { - v14: pre_spawned_pool::Pool, Launcher>, - v15: pre_spawned_pool::Pool, Launcher>, - v16: pre_spawned_pool::Pool, Launcher>, + v14: pre_spawned_pool::Client>, + v15: pre_spawned_pool::Client>, + v16: pre_spawned_pool::Client>, } struct Launcher { @@ -17,14 +17,39 @@ struct Launcher { conf: &'static PageServerConf, } -impl utils::pre_spawned_pool::Launcher> for Launcher{ +impl utils::pre_spawned_pool::Launcher> for Launcher { fn create(&self) -> anyhow::Result> { - WalRedoProcess::launch(self.conf, self.pg_version) + Ok(Arc::new(WalRedoProcess::launch( + self.conf, + self.pg_version, + )?)) } } impl Pool { - pub fn get(&self, pg_version: usize) -> anyhow::Result> { + pub async fn launch(conf: &'static PageServerConf) -> Self { + Self { + v14: pre_spawned_pool::Pool::launch(Launcher { + pg_version: 14, + conf, + }) + .await, + v15: pre_spawned_pool::Pool::launch(Launcher { + pg_version: 15, + conf, + }) + .await, + v16: pre_spawned_pool::Pool::launch(Launcher { + pg_version: 16, + conf, + }) + .await, + } + } + pub fn get( + &self, + pg_version: usize, + ) -> Result, pre_spawned_pool::GetError> { let pool = match pg_version { 14 => &self.v14, 15 => &self.v15,