diff --git a/Cargo.toml b/Cargo.toml
index d3006985ab..57dc8e0ad5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -40,6 +40,7 @@ license = "Apache-2.0"
 [workspace.dependencies]
 anyhow = { version = "1.0", features = ["backtrace"] }
 arc-swap = "1.6"
+async-channel = "2.1.1"
 async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
 azure_core = "0.18"
 azure_identity = "0.18"
diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml
index 706b7a3187..4658e81399 100644
--- a/libs/utils/Cargo.toml
+++ b/libs/utils/Cargo.toml
@@ -12,6 +12,7 @@ testing = ["fail/failpoints"]
 
 [dependencies]
 arc-swap.workspace = true
+async-channel.workspace = true
 sentry.workspace = true
 async-trait.workspace = true
 anyhow.workspace = true
diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs
index 890061dc59..09d6cfcf03 100644
--- a/libs/utils/src/lib.rs
+++ b/libs/utils/src/lib.rs
@@ -87,6 +87,8 @@ pub mod failpoint_support;
 
 pub mod yielding_loop;
 
+pub mod pre_spawned_pool;
+
 /// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
 ///
 /// we have several cases:
diff --git a/libs/utils/src/pre_spawned_pool.rs b/libs/utils/src/pre_spawned_pool.rs
new file mode 100644
index 0000000000..47180e2c55
--- /dev/null
+++ b/libs/utils/src/pre_spawned_pool.rs
@@ -0,0 +1,215 @@
+//! Pool of items that are created ahead of time by a background task.
+//!
+//! [`Pool::launch`] spawns a task that keeps up to a configurable number of
+//! items ("slots") created and queued. [`Client::get`] takes a queued item and
+//! thereby frees its slot, which makes the task create a replacement. The slot
+//! count starts at zero; raise it with [`Client::set_slot_count_nowait`].
+
+// NOTE(review): nothing in this file uses `VecDeque`, `NonZeroUsize`, `RwLock`,
+// `JoinHandle` or `CancellationToken`; drop these imports unless follow-up work
+// needs them.
+use std::{
+    collections::VecDeque,
+    num::NonZeroUsize,
+    sync::{Arc, RwLock},
+    thread::JoinHandle,
+};
+
+use tokio::sync::{mpsc, OwnedSemaphorePermit};
+use tokio_util::sync::CancellationToken;
+use tracing::{debug, instrument};
+
+/// Pause after a failed [`Launcher::create`] before the pool task tries again,
+/// so that a persistently failing launcher does not make the task spin.
+const CREATE_RETRY_BACKOFF: std::time::Duration = std::time::Duration::from_secs(1);
+
+/// Handle to a running pool task, obtained from [`Pool::launch`].
+pub struct Client<T> {
+    cmds_tx: mpsc::UnboundedSender<Command>,
+    items_rx: async_channel::Receiver<CreatedItem<T>>,
+}
+
+/// Factory for the items that a pool pre-creates.
+pub trait Launcher<T> {
+    /// Creates one new item.
+    ///
+    /// NOTE(review): the pool task calls this inline from async code, so an
+    /// implementation that blocks for long stalls that executor thread.
+    fn create(&self) -> anyhow::Result<T>;
+}
+
+#[derive(Debug)]
+enum Command {
+    SetSlotCount(usize),
+}
+
+/// Error returned by [`Client::get`].
+#[derive(Debug)]
+pub enum GetError {
+    /// The pool task has stopped; no further items will be produced.
+    ShuttingDown,
+}
+
+impl std::fmt::Display for GetError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            GetError::ShuttingDown => f.write_str("pool is shutting down"),
+        }
+    }
+}
+
+impl std::error::Error for GetError {}
+
+impl<T> Client<T> {
+    /// Waits for a pre-created item and returns it.
+    ///
+    /// Taking the item releases its slot, which lets the pool task create a
+    /// replacement in the background.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`GetError::ShuttingDown`] if the item channel is closed and
+    /// empty, i.e. the pool task has exited.
+    pub async fn get(&self) -> Result<T, GetError> {
+        let CreatedItem { permit, item } = self
+            .items_rx
+            .recv()
+            .await
+            .map_err(|_| GetError::ShuttingDown)?;
+        drop(permit); // allow a new one to be pre-spawned
+        Ok(item)
+    }
+
+    /// Asks the pool task to keep `count` items pre-created.
+    ///
+    /// Returns without waiting for the task to apply the change. If the task
+    /// has already exited, the request is dropped.
+    pub fn set_slot_count_nowait(&self, count: usize) {
+        // `send` only fails once the task has dropped its receiver. There is
+        // nothing left to configure then, so log it rather than silently
+        // discarding the `Result`.
+        if self.cmds_tx.send(Command::SetSlotCount(count)).is_err() {
+            debug!(count, "pool task has exited, ignoring slot count change");
+        }
+    }
+}
+
+/// State owned by the background task that keeps items pre-created.
+pub struct Pool<T, L>
+where
+    T: Send + 'static,
+    L: Send + Launcher<T> + 'static,
+{
+    launcher: L,
+    cmds_rx: mpsc::UnboundedReceiver<Command>,
+    items_tx: async_channel::Sender<CreatedItem<T>>,
+}
+
+/// A created item together with the slot (semaphore permit) it occupies until
+/// a client takes it.
+struct CreatedItem<T> {
+    permit: OwnedSemaphorePermit,
+    item: T,
+}
+
+impl<T, L> Pool<T, L>
+where
+    T: Send + 'static,
+    L: Send + Launcher<T> + 'static,
+{
+    /// Spawns the pool task with `tokio::spawn` and returns the [`Client`] for it.
+    ///
+    /// The pool starts with zero slots, so nothing is created until
+    /// [`Client::set_slot_count_nowait`] is called.
+    pub async fn launch(launcher: L) -> Client<T> {
+        let (cmds_tx, cmds_rx) = mpsc::unbounded_channel(); // callers are limited to mgmt api
+        let (items_tx, items_rx) = async_channel::unbounded(); // task() limits pending items itself
+
+        // task gets cancelled by dropping the last Client
+        tokio::spawn(
+            Self {
+                launcher,
+                cmds_rx,
+                items_tx,
+            }
+            .task(),
+        );
+        Client { cmds_tx, items_rx }
+    }
+
+    /// Body of the background task.
+    ///
+    /// `pending_items` holds one permit per configured slot. A permit is
+    /// acquired before an item is created and travels with the item until
+    /// [`Client::get`] drops it, which bounds the number of created-but-untaken
+    /// items by the slot count. Shrinking the slot count cannot take back
+    /// permits that are attached to queued items, so the shortfall is recorded
+    /// in `need_forget` and worked off by forgetting permits as they are
+    /// acquired.
+    #[instrument(skip_all)]
+    async fn task(mut self) {
+        let initial = 0;
+        let mut configured = initial;
+        let pending_items = Arc::new(tokio::sync::Semaphore::new(initial));
+        let mut need_forget = 0;
+        loop {
+            debug!(
+                configured,
+                need_forget,
+                available = pending_items.available_permits(),
+                "iteration"
+            );
+            let cmd = tokio::select! {
+                res = self.cmds_rx.recv() => {
+                    match res {
+                        Some(cmd) => cmd,
+                        None => return, // dropping tx acts as cancellation
+                    }
+                }
+                permit = Arc::clone(&pending_items).acquire_owned() => {
+                    let permit = permit.expect("we never close this semaphore");
+                    if need_forget > 0 {
+                        debug!("fogetting permit to reduce semaphore count");
+                        need_forget -= 1;
+                        permit.forget();
+                        continue;
+                    }
+                    debug!("creating item");
+                    // Bind the result first so that no borrow of the launcher is
+                    // alive across the `.await` in the error arm below.
+                    let created = self.launcher.create();
+                    let item = match created {
+                        Ok(item) => item,
+                        Err(e) => {
+                            // A failed launch must not take the whole pool down.
+                            // Give the slot back and pause before retrying; commands
+                            // are not processed during the pause.
+                            tracing::warn!("failed to create item, will retry: {e:#}");
+                            drop(permit);
+                            tokio::time::sleep(CREATE_RETRY_BACKOFF).await;
+                            continue;
+                        }
+                    };
+                    match self.items_tx.send(CreatedItem { permit, item }).await {
+                        Ok(()) => continue,
+                        Err(_) => {
+                            debug!("stopping, client has gone away");
+                            return;
+                        }
+                    }
+                }
+            };
+            debug!(?cmd, "received command");
+            match cmd {
+                Command::SetSlotCount(desired) => {
+                    if desired > configured {
+                        pending_items.add_permits(desired - configured);
+                    } else if desired < configured {
+                        need_forget += configured - desired;
+                    }
+                    configured = desired;
+                }
+            }
+        }
+    }
+}
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 58af80238d..102db60483 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -614,10 +614,7 @@ impl Tenant {
         mode: SpawnMode,
         ctx: &RequestContext,
     ) -> anyhow::Result<Arc<Tenant>> {
-        let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
-            conf,
-            tenant_shard_id,
-        )));
+        let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(conf)));
 
         let TenantSharedResources {
             broker_client,
diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs
index 773e5fc051..0a076a8a41 100644
--- a/pageserver/src/walredo.rs
+++ b/pageserver/src/walredo.rs
@@ -21,6 +21,8 @@
 /// Process lifecycle and abstracction for the IPC protocol.
 mod process;
 
+mod process_pool;
+
 /// Code to apply [`NeonWalRecord`]s.
mod apply_neon; @@ -146,6 +148,7 @@ impl PostgresRedoManager { pub fn new( conf: &'static PageServerConf, tenant_shard_id: TenantShardId, + pool: process_pool::Pool, ) -> PostgresRedoManager { // The actual process is launched lazily, on first request. PostgresRedoManager { diff --git a/pageserver/src/walredo/process.rs b/pageserver/src/walredo/process.rs index 72f4151dca..f03ec7d661 100644 --- a/pageserver/src/walredo/process.rs +++ b/pageserver/src/walredo/process.rs @@ -52,12 +52,8 @@ impl WalRedoProcess { // // Start postgres binary in special WAL redo mode. // - #[instrument(skip_all,fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), pg_version=pg_version))] - pub(crate) fn launch( - conf: &'static PageServerConf, - tenant_shard_id: TenantShardId, - pg_version: u32, - ) -> anyhow::Result { + #[instrument(skip_all,fields(pg_version=pg_version))] + pub(crate) fn launch(conf: &'static PageServerConf, pg_version: u32) -> anyhow::Result { let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible. let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?; @@ -66,9 +62,6 @@ impl WalRedoProcess { let child = Command::new(pg_bin_dir_path.join("postgres")) // the first arg must be --wal-redo so the child process enters into walredo mode .arg("--wal-redo") - // the child doesn't process this arg, but, having it in the argv helps indentify the - // walredo process for a particular tenant when debugging a pagserver - .args(["--tenant-shard-id", &format!("{tenant_shard_id}")]) .stdin(Stdio::piped()) .stderr(Stdio::piped()) .stdout(Stdio::piped()) @@ -83,7 +76,7 @@ impl WalRedoProcess { // the files it opens, and // 2. to use seccomp to sandbox itself before processing the first // walredo request. 
- .spawn_no_leak_child(tenant_shard_id) + .spawn_no_leak_child() .context("spawn process")?; WAL_REDO_PROCESS_COUNTERS.started.inc(); let mut child = scopeguard::guard(child, |child| { @@ -144,12 +137,11 @@ impl WalRedoProcess { error!(error=?e, "failed to read from walredo stderr"); } } - }.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %pg_version)) + }.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), %pg_version)) ); Ok(Self { conf, - tenant_shard_id, child: Some(child), stdin: Mutex::new(ProcessInput { stdin, @@ -175,7 +167,7 @@ impl WalRedoProcess { // Apply given WAL records ('records') over an old page image. Returns // new page image. // - #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))] + #[instrument(skip_all, fields(pid=%self.id()))] pub(crate) fn apply_wal_records( &self, rel: RelTag, diff --git a/pageserver/src/walredo/process/no_leak_child.rs b/pageserver/src/walredo/process/no_leak_child.rs index ca016408e6..1d9b768c51 100644 --- a/pageserver/src/walredo/process/no_leak_child.rs +++ b/pageserver/src/walredo/process/no_leak_child.rs @@ -20,7 +20,6 @@ use pageserver_api::shard::TenantShardId; /// Wrapper type around `std::process::Child` which guarantees that the child /// will be killed and waited-for by this process before being dropped. 
pub(crate) struct NoLeakChild {
-    pub(crate) tenant_id: TenantShardId,
     pub(crate) child: Option<Child>,
 }
 
@@ -39,12 +38,9 @@ impl DerefMut for NoLeakChild {
 }
 
 impl NoLeakChild {
-    pub(crate) fn spawn(tenant_id: TenantShardId, command: &mut Command) -> io::Result<Self> {
+    pub(crate) fn spawn(command: &mut Command) -> io::Result<Self> {
         let child = command.spawn()?;
-        Ok(NoLeakChild {
-            tenant_id,
-            child: Some(child),
-        })
+        Ok(NoLeakChild { child: Some(child) })
     }
 
     pub(crate) fn kill_and_wait(mut self, cause: WalRedoKillCause) {
diff --git a/pageserver/src/walredo/process_pool.rs b/pageserver/src/walredo/process_pool.rs
new file mode 100644
index 0000000000..58ab81f689
--- /dev/null
+++ b/pageserver/src/walredo/process_pool.rs
@@ -0,0 +1,56 @@
+//! Pools of pre-spawned walredo processes, one per supported Postgres version.
+
+use std::sync::Arc;
+
+use utils::pre_spawned_pool;
+
+use crate::config::PageServerConf;
+
+use super::process::WalRedoProcess;
+
+/// Hands out pre-spawned [`WalRedoProcess`]es for Postgres 14, 15 and 16.
+///
+/// Each field is the client side of one `pre_spawned_pool` task.
+///
+/// NOTE(review): nothing constructs this type yet; a constructor has to call
+/// `pre_spawned_pool::Pool::launch` with a `Launcher` for every version.
+pub struct Pool {
+    v14: pre_spawned_pool::Client<Arc<WalRedoProcess>>,
+    v15: pre_spawned_pool::Client<Arc<WalRedoProcess>>,
+    v16: pre_spawned_pool::Client<Arc<WalRedoProcess>>,
+}
+
+/// Creates walredo processes of a single Postgres version for a pool task.
+struct Launcher {
+    pg_version: u32,
+    conf: &'static PageServerConf,
+}
+
+impl pre_spawned_pool::Launcher<Arc<WalRedoProcess>> for Launcher {
+    fn create(&self) -> anyhow::Result<Arc<WalRedoProcess>> {
+        // `launch` returns the process by value; the pool hands out `Arc`s.
+        WalRedoProcess::launch(self.conf, self.pg_version).map(Arc::new)
+    }
+}
+
+impl Pool {
+    /// Takes a pre-spawned walredo process for the given Postgres version.
+    ///
+    /// This is `async` because it waits on `pre_spawned_pool::Client::get`.
+    ///
+    /// # Errors
+    ///
+    /// Fails if `pg_version` is not 14, 15 or 16, or if the pool for that
+    /// version has shut down.
+    pub async fn get(&self, pg_version: usize) -> anyhow::Result<Arc<WalRedoProcess>> {
+        let client = match pg_version {
+            14 => &self.v14,
+            15 => &self.v15,
+            16 => &self.v16,
+            x => anyhow::bail!("unknown pg version: {x}"),
+        };
+        client.get().await.map_err(|_| {
+            anyhow::anyhow!("walredo process pool for pg version {pg_version} is shutting down")
+        })
+    }
+}