diff --git a/libs/utils/src/pre_spawned_pool.rs b/libs/utils/src/pre_spawned_pool.rs index fda98c7f4e..2fbe4e1a0a 100644 --- a/libs/utils/src/pre_spawned_pool.rs +++ b/libs/utils/src/pre_spawned_pool.rs @@ -23,7 +23,9 @@ enum Command { SetSlotCount(usize), } +#[derive(thiserror::Error, Debug)] pub enum GetError { + #[error("shutting down")] ShuttingDown, } diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index eaddcb4607..127bf5366d 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -309,6 +309,9 @@ fn start_pageserver( info!("Starting pageserver pg protocol handler on {pg_addr}"); let pageserver_listener = tcp_listener::bind(pg_addr)?; + let walredo_process_pool = + Arc::new(COMPUTE_REQUEST_RUNTIME.block_on(pageserver::walredo::ProcessPool::launch(conf))); + // Launch broker client // The storage_broker::connect call needs to happen inside a tokio runtime thread. let broker_client = WALRECEIVER_RUNTIME @@ -414,6 +417,7 @@ fn start_pageserver( let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr( conf, TenantSharedResources { + walredo_process_pool: walredo_process_pool.clone(), broker_client: broker_client.clone(), remote_storage: remote_storage.clone(), deletion_queue_client, @@ -545,6 +549,7 @@ fn start_pageserver( disk_usage_eviction_state, deletion_queue.new_client(), secondary_controller, + walredo_process_pool, ) .context("Failed to initialize router state")?, ); diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 57ee746726..a85a3a200c 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -55,7 +55,9 @@ use crate::tenant::storage_layer::LayerAccessStatsReset; use crate::tenant::timeline::CompactFlags; use crate::tenant::timeline::Timeline; use crate::tenant::SpawnMode; +use crate::tenant::TenantSharedResources; use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError}; +use crate::walredo; use crate::{config::PageServerConf, tenant::mgr}; use crate::{disk_usage_eviction_task, tenant}; use pageserver_api::models::{ @@ -97,6 +99,7 @@ pub struct State { disk_usage_eviction_state: Arc, deletion_queue_client: DeletionQueueClient, secondary_controller: SecondaryController, + walredo_process_pool: Arc, } impl State { @@ -110,6 +113,7 @@ impl State { disk_usage_eviction_state: Arc, deletion_queue_client: DeletionQueueClient, secondary_controller: SecondaryController, + walredo_process_pool: Arc, ) -> anyhow::Result { let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml", "/metrics"] .iter() @@ -125,8 +129,18 @@ impl State { disk_usage_eviction_state, deletion_queue_client, secondary_controller, + walredo_process_pool, }) } + + pub(crate) fn tenant_shared_resources(&self) -> TenantSharedResources { + TenantSharedResources { + walredo_process_pool: Arc::clone(&self.walredo_process_pool), + broker_client: self.broker_client.clone(), + remote_storage: self.remote_storage.clone(), + deletion_queue_client: self.deletion_queue_client.clone(), + } + } } #[inline(always)] @@ -890,9 +904,7 @@ async fn tenant_load_handler( state.conf, tenant_id, generation, - state.broker_client.clone(), - state.remote_storage.clone(), - state.deletion_queue_client.clone(), + state.tenant_shared_resources(), &ctx, ) .instrument(info_span!("load", %tenant_id)) diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 489ec58e62..61c93fa078 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1652,7 +1652,7 @@ pub(crate) static WAL_REDO_RECORD_COUNTER: Lazy = Lazy::new(|| { }); #[rustfmt::skip] -pub(crate) static WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM: Lazy = Lazy::new(|| { +pub(crate) static WAL_REDO_PROCESS_ACQUISITION_LATENCY_HISTO: Lazy = Lazy::new(|| { register_histogram!( "pageserver_wal_redo_process_launch_duration", "Histogram of the duration of successful WalRedoProcess::launch calls", @@ -2452,7 +2452,7 @@ pub fn preinitialize_metrics() { &WAL_REDO_TIME, &WAL_REDO_RECORDS_HISTOGRAM, &WAL_REDO_BYTES_HISTOGRAM, - &WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM, + &WAL_REDO_PROCESS_ACQUISITION_LATENCY_HISTO, ] .into_iter() .for_each(|h| { diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index f357b4fe69..d2b9014389 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -187,7 +187,7 @@ pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted"; /// as the shared remote storage client and process initialization state. #[derive(Clone)] pub struct TenantSharedResources { - pub walredo_process_pool: Arc, + pub walredo_process_pool: Arc, pub broker_client: storage_broker::BrokerClientChannel, pub remote_storage: Option, pub deletion_queue_client: DeletionQueueClient, @@ -620,12 +620,12 @@ impl Tenant { broker_client, remote_storage, deletion_queue_client, - } = resources; + } = resources ; let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new( conf, tenant_shard_id, - resources.walredo_process_pool, + walredo_process_pool, ))); let tenant = Arc::new(Tenant::new( diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 64fd709386..1756cbb35d 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -1668,9 +1668,7 @@ pub(crate) async fn load_tenant( conf: &'static PageServerConf, tenant_id: TenantId, generation: Generation, - broker_client: storage_broker::BrokerClientChannel, - remote_storage: Option, - deletion_queue_client: DeletionQueueClient, + resources: TenantSharedResources, ctx: &RequestContext, ) -> Result<(), TenantMapInsertError> { // This is a legacy API (replaced by `/location_conf`). It does not support sharding @@ -1689,12 +1687,6 @@ pub(crate) async fn load_tenant( })?; } - let resources = TenantSharedResources { - broker_client, - remote_storage, - deletion_queue_client, - }; - let mut location_conf = Tenant::load_tenant_config(conf, &tenant_shard_id).map_err(TenantMapInsertError::Other)?; location_conf.attach_in_generation(generation); diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 4ce00f22e3..0e300e579f 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -22,14 +22,15 @@ mod process; mod process_pool; -pub(crate) use process_pool::Pool as ProcessPool; +pub use process_pool::Pool as ProcessPool; +use utils::sync::heavier_once_cell; /// Code to apply [`NeonWalRecord`]s. mod apply_neon; use crate::config::PageServerConf; use crate::metrics::{ - WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM, + WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_PROCESS_ACQUISITION_LATENCY_HISTO, WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_TIME, }; use crate::repository::Key; @@ -58,7 +59,8 @@ pub struct PostgresRedoManager { tenant_shard_id: TenantShardId, conf: &'static PageServerConf, last_redo_at: std::sync::Mutex>, - redo_process: RwLock>>, + redo_process: heavier_once_cell::OnceCell>, + pool: Arc, } /// @@ -106,6 +108,7 @@ impl PostgresRedoManager { self.conf.wal_redo_timeout, pg_version, ) + .await }; img = Some(result?); @@ -126,6 +129,7 @@ impl PostgresRedoManager { self.conf.wal_redo_timeout, pg_version, ) + .await } } @@ -149,6 +153,16 @@ struct TaintedProcess { process: Option>, } +impl std::ops::Deref for TaintedProcess { + type Target = WalRedoProcess; + + fn deref(&self) -> &Self::Target { + self.process + .as_ref() + .expect("only Self::drop sets it to None") + } +} + impl Drop for TaintedProcess { fn drop(&mut self) { // ensure tenant_id and span_id are in span @@ -166,7 +180,7 @@ impl PostgresRedoManager { pub fn new( conf: &'static PageServerConf, tenant_shard_id: TenantShardId, - pool: process_pool::Pool, + pool: Arc, ) -> PostgresRedoManager { // The actual process is launched lazily, on first request. PostgresRedoManager { @@ -196,7 +210,7 @@ impl PostgresRedoManager { /// Process one request for WAL redo using wal-redo postgres /// #[allow(clippy::too_many_arguments)] - fn apply_batch_postgres( + async fn apply_batch_postgres( &self, key: Key, lsn: Lsn, @@ -213,37 +227,24 @@ impl PostgresRedoManager { let mut n_attempts = 0u32; loop { // launch the WAL redo process on first use - let proc: Arc = { - let proc_guard = self.redo_process.read().unwrap(); - match &*proc_guard { - None => { - // "upgrade" to write lock to launch the process - drop(proc_guard); - let mut proc_guard = self.redo_process.write().unwrap(); - match &*proc_guard { - None => { - let start = Instant::now(); - let pool: Arc = todo!(); - let proc = Arc::new(TaintedProcess { - tenant_shard_id: self.tenant_shard_id, - process: Some(pool.get(pg_version))?, - }); - let duration = start.elapsed(); - WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM - .observe(duration.as_secs_f64()); - info!( - duration_ms = duration.as_millis(), - pid = proc.id(), - "launched walredo process" - ); - *proc_guard = Some(Arc::clone(&proc)); - proc - } - Some(proc) => Arc::clone(proc), - } - } - Some(proc) => Arc::clone(proc), - } + let proc = { + let proc = self + .redo_process + .get_or_init(async move { + let start = Instant::now(); + let proc = Arc::new(TaintedProcess { + tenant_shard_id: self.tenant_shard_id, + process: Some(self.pool.get(pg_version).await?), + }); + let duration = start.elapsed(); + WAL_REDO_PROCESS_ACQUISITION_LATENCY_HISTO.observe(duration.as_secs_f64()); + info!( + duration_ms = duration.as_millis(), + pid = proc.id(), + "acquired pre-spawned walredo process" + ); + }) + .await?; }; let started_at = std::time::Instant::now(); @@ -397,7 +398,7 @@ mod tests { async fn short_v14_redo() { let expected = std::fs::read("test_data/short_v14_redo.page").unwrap(); - let h = RedoHarness::new().unwrap(); + let h = RedoHarness::new().await.unwrap(); let page = h .manager @@ -423,7 +424,7 @@ mod tests { #[tokio::test] async fn short_v14_fails_for_wrong_key_but_returns_zero_page() { - let h = RedoHarness::new().unwrap(); + let h = RedoHarness::new().await.unwrap(); let page = h .manager @@ -452,7 +453,7 @@ mod tests { #[tokio::test] async fn test_stderr() { - let h = RedoHarness::new().unwrap(); + let h = RedoHarness::new().await.unwrap(); h .manager .request_redo( @@ -493,7 +494,7 @@ mod tests { } impl RedoHarness { - fn new() -> anyhow::Result { + async fn new() -> anyhow::Result { crate::tenant::harness::setup_logging(); let repo_dir = camino_tempfile::tempdir()?; @@ -501,7 +502,9 @@ mod tests { let conf = Box::leak(Box::new(conf)); let tenant_shard_id = TenantShardId::unsharded(TenantId::generate()); - let manager = PostgresRedoManager::new(conf, tenant_shard_id); + let pool = crate::walredo::process_pool::Pool::launch(conf).await; + + let manager = PostgresRedoManager::new(conf, tenant_shard_id, pool); Ok(RedoHarness { _repo_dir: repo_dir, diff --git a/pageserver/src/walredo/process_pool.rs b/pageserver/src/walredo/process_pool.rs index 3216f845de..2bb6232aa3 100644 --- a/pageserver/src/walredo/process_pool.rs +++ b/pageserver/src/walredo/process_pool.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use anyhow::Context; use utils::pre_spawned_pool; use crate::config::PageServerConf; @@ -19,7 +20,7 @@ struct Launcher { impl utils::pre_spawned_pool::Launcher> for Launcher { fn create(&self) -> anyhow::Result> { - Ok(Arc::new(WalRedoProcess::launch( + Ok(Box::new(WalRedoProcess::launch( self.conf, self.pg_version, )?)) @@ -46,16 +47,13 @@ impl Pool { .await, } } - pub fn get( - &self, - pg_version: usize, - ) -> Result, pre_spawned_pool::GetError> { + pub async fn get(&self, pg_version: u32) -> anyhow::Result> { let pool = match pg_version { 14 => &self.v14, 15 => &self.v15, 16 => &self.v16, x => anyhow::bail!("unknown pg version: {x}"), }; - pool.get() + pool.get().await.context("get pre-spawned walredo process") } }