From 8fe3c9ff55eaa8765c531e9e4960cf5d4be16c75 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 2 Feb 2024 14:42:00 +0000 Subject: [PATCH] wip --- pageserver/src/tenant.rs | 10 ++++++-- pageserver/src/walredo.rs | 33 +++++++++++++++++++------- pageserver/src/walredo/process_pool.rs | 12 +++++----- 3 files changed, 38 insertions(+), 17 deletions(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 102db60483..35bfeb9b38 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -187,6 +187,7 @@ pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted"; /// as the shared remote storage client and process initialization state. #[derive(Clone)] pub struct TenantSharedResources { + pub walredo_process_pool: Arc, pub broker_client: storage_broker::BrokerClientChannel, pub remote_storage: Option, pub deletion_queue_client: DeletionQueueClient, @@ -614,14 +615,19 @@ impl Tenant { mode: SpawnMode, ctx: &RequestContext, ) -> anyhow::Result> { - let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(conf))); - let TenantSharedResources { + walredo_process_pool, broker_client, remote_storage, deletion_queue_client, } = resources; + let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new( + conf, + tenant_shard_id, + resources.walredo_process_pool, + ))); + let tenant = Arc::new(Tenant::new( TenantState::Attaching, conf, diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 0a076a8a41..4ce00f22e3 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -22,6 +22,7 @@ mod process; mod process_pool; +pub(crate) use process_pool::Pool as ProcessPool; /// Code to apply [`NeonWalRecord`]s. mod apply_neon; @@ -44,6 +45,8 @@ use std::time::Instant; use tracing::*; use utils::lsn::Lsn; +use self::process::WalRedoProcess; + /// /// This is the real implementation that uses a Postgres process to /// perform WAL replay. Only one thread can use the process at a time, @@ -55,7 +58,7 @@ pub struct PostgresRedoManager { tenant_shard_id: TenantShardId, conf: &'static PageServerConf, last_redo_at: std::sync::Mutex>, - redo_process: RwLock>>, + redo_process: RwLock>>, } /// @@ -141,6 +144,21 @@ impl PostgresRedoManager { } } +struct TaintedProcess { + tenant_shard_id: TenantShardId, + process: Option>, +} + +impl Drop for TaintedProcess { + fn drop(&mut self) { + // ensure tenant_id and span_id are in span + let span = info_span!("walredo", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()); + let _entered = span.enter(); + let process = self.process.take().expect("we are the only takers"); + drop(process); + } +} + impl PostgresRedoManager { /// /// Create a new PostgresRedoManager. @@ -205,14 +223,11 @@ impl PostgresRedoManager { match &*proc_guard { None => { let start = Instant::now(); - let proc = Arc::new( - process::WalRedoProcess::launch( - self.conf, - self.tenant_shard_id, - pg_version, - ) - .context("launch walredo process")?, - ); + let pool: Arc = todo!(); + let proc = Arc::new(TaintedProcess { + tenant_shard_id: self.tenant_shard_id, + process: Some(pool.get(pg_version))?, + }); let duration = start.elapsed(); WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM .observe(duration.as_secs_f64()); diff --git a/pageserver/src/walredo/process_pool.rs b/pageserver/src/walredo/process_pool.rs index 4712fe3657..3216f845de 100644 --- a/pageserver/src/walredo/process_pool.rs +++ b/pageserver/src/walredo/process_pool.rs @@ -7,9 +7,9 @@ use crate::config::PageServerConf; use super::process::WalRedoProcess; pub struct Pool { - v14: pre_spawned_pool::Client>, - v15: pre_spawned_pool::Client>, - v16: pre_spawned_pool::Client>, + v14: pre_spawned_pool::Client>, + v15: pre_spawned_pool::Client>, + v16: pre_spawned_pool::Client>, } struct Launcher { @@ -17,8 +17,8 @@ struct Launcher { conf: &'static PageServerConf, } -impl utils::pre_spawned_pool::Launcher> for Launcher { - fn create(&self) -> anyhow::Result> { +impl utils::pre_spawned_pool::Launcher> for Launcher { + fn create(&self) -> anyhow::Result> { Ok(Arc::new(WalRedoProcess::launch( self.conf, self.pg_version, @@ -49,7 +49,7 @@ impl Pool { pub fn get( &self, pg_version: usize, - ) -> Result, pre_spawned_pool::GetError> { + ) -> Result, pre_spawned_pool::GetError> { let pool = match pg_version { 14 => &self.v14, 15 => &self.v15,