From 64b4b498a4e2863140ddb3977a83ae982136ca73 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 2 Feb 2024 17:25:25 +0000 Subject: [PATCH] Revert "remove the walredo usage, that'll be in the next pr" This reverts commit 20e82629dfa6671859423bed2ef532e030881d48. --- pageserver/src/http/routes.rs | 2 +- pageserver/src/tenant.rs | 12 ++--- pageserver/src/tenant/tasks.rs | 2 +- pageserver/src/walredo.rs | 98 +++++++++++++++++----------------- 4 files changed, 57 insertions(+), 57 deletions(-) diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 57ee746726..554da51b4b 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -967,7 +967,7 @@ async fn tenant_status( attachment_status: state.attachment_status(), generation: tenant.generation().into(), }, - walredo: tenant.wal_redo_manager_status(), + walredo: tenant.wal_redo_manager_status().await, timelines: tenant.list_timeline_ids(), }) } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 42adb843d8..ef48ad78b1 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -332,9 +332,9 @@ impl From for WalRedoManager { } impl WalRedoManager { - pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) { + pub(crate) async fn maybe_quiesce(&self, idle_timeout: Duration) { match self { - Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout), + Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout).await, #[cfg(test)] Self::Test(_) => { // Not applicable to test redo manager @@ -366,9 +366,9 @@ impl WalRedoManager { } } - pub(crate) fn status(&self) -> Option { + pub(crate) async fn status(&self) -> Option { match self { - WalRedoManager::Prod(m) => m.status(), + WalRedoManager::Prod(m) => m.status().await, #[cfg(test)] WalRedoManager::Test(_) => None, } @@ -1962,11 +1962,11 @@ impl Tenant { self.generation } - pub(crate) fn wal_redo_manager_status(&self) -> Option { + pub(crate) async fn wal_redo_manager_status(&self) -> Option { let Some(mgr) = self.walredo_mgr.as_ref() else { return None; }; - mgr.status() + mgr.status().await } /// Changes tenant status to active, unless shutdown was already requested. diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index 950cc46e71..e84a9ddbf7 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -200,7 +200,7 @@ async fn compaction_loop(tenant: Arc, cancel: CancellationToken) { // Perhaps we did no work and the walredo process has been idle for some time: // give it a chance to shut down to avoid leaving walredo process running indefinitely. if let Some(walredo_mgr) = &tenant.walredo_mgr { - walredo_mgr.maybe_quiesce(period * 10); + walredo_mgr.maybe_quiesce(period * 10).await; } // Sleep diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 773e5fc051..0584ada980 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -36,11 +36,12 @@ use bytes::{Bytes, BytesMut}; use pageserver_api::key::key_to_rel_block; use pageserver_api::models::WalRedoManagerStatus; use pageserver_api::shard::TenantShardId; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::time::Duration; use std::time::Instant; use tracing::*; use utils::lsn::Lsn; +use utils::sync::heavier_once_cell; /// /// This is the real implementation that uses a Postgres process to @@ -53,7 +54,7 @@ pub struct PostgresRedoManager { tenant_shard_id: TenantShardId, conf: &'static PageServerConf, last_redo_at: std::sync::Mutex>, - redo_process: RwLock>>, + redo_process: heavier_once_cell::OnceCell>, } /// @@ -101,6 +102,7 @@ impl PostgresRedoManager { self.conf.wal_redo_timeout, pg_version, ) + .await }; img = Some(result?); @@ -121,10 +123,11 @@ impl PostgresRedoManager { self.conf.wal_redo_timeout, pg_version, ) + .await } } - pub(crate) fn status(&self) -> Option { + pub(crate) async fn status(&self) -> Option { Some(WalRedoManagerStatus { last_redo_at: { let at = *self.last_redo_at.lock().unwrap(); @@ -134,7 +137,7 @@ impl PostgresRedoManager { chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?) }) }, - pid: self.redo_process.read().unwrap().as_ref().map(|p| p.id()), + pid: self.redo_process.get_mut().await.map(|p| p.id()), }) } } @@ -152,30 +155,41 @@ impl PostgresRedoManager { tenant_shard_id, conf, last_redo_at: std::sync::Mutex::default(), - redo_process: RwLock::new(None), + redo_process: heavier_once_cell::OnceCell::default(), } } /// This type doesn't have its own background task to check for idleness: we /// rely on our owner calling this function periodically in its own housekeeping /// loops. - pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) { + pub(crate) async fn maybe_quiesce(&self, idle_timeout: Duration) { if let Ok(g) = self.last_redo_at.try_lock() { if let Some(last_redo_at) = *g { if last_redo_at.elapsed() >= idle_timeout { drop(g); - let mut guard = self.redo_process.write().unwrap(); - *guard = None; + // fallthrough + } else { + return; } + } else { + return; } + } else { + return; } + drop( + self.redo_process + .get_mut() + .await + .map(|mut guard| guard.take_and_deinit()), + ); } /// /// Process one request for WAL redo using wal-redo postgres /// #[allow(clippy::too_many_arguments)] - fn apply_batch_postgres( + async fn apply_batch_postgres( &self, key: Key, lsn: Lsn, @@ -192,41 +206,28 @@ impl PostgresRedoManager { let mut n_attempts = 0u32; loop { // launch the WAL redo process on first use - let proc: Arc = { - let proc_guard = self.redo_process.read().unwrap(); - match &*proc_guard { - None => { - // "upgrade" to write lock to launch the process - drop(proc_guard); - let mut proc_guard = self.redo_process.write().unwrap(); - match &*proc_guard { - None => { - let start = Instant::now(); - let proc = Arc::new( - process::WalRedoProcess::launch( - self.conf, - self.tenant_shard_id, - pg_version, - ) - .context("launch walredo process")?, - ); - let duration = start.elapsed(); - WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM - .observe(duration.as_secs_f64()); - info!( - duration_ms = duration.as_millis(), - pid = proc.id(), - "launched walredo process" - ); - *proc_guard = Some(Arc::clone(&proc)); - proc - } - Some(proc) => Arc::clone(proc), - } - } - Some(proc) => Arc::clone(proc), - } - }; + let proc = self + .redo_process + .get_or_init(|init_permit| async move { + let start = Instant::now(); + let proc = Arc::new( + process::WalRedoProcess::launch( + self.conf, + self.tenant_shard_id, + pg_version, + ) + .context("launch walredo process")?, + ); + let duration = start.elapsed(); + WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM.observe(duration.as_secs_f64()); + info!( + duration_ms = duration.as_millis(), + pid = proc.id(), + "launched walredo process" + ); + anyhow::Ok((proc, init_permit)) + }) + .await?; let started_at = std::time::Instant::now(); @@ -275,12 +276,11 @@ impl PostgresRedoManager { // Avoid concurrent callers hitting the same issue. // We can't prevent it from happening because we want to enable parallelism. { - let mut guard = self.redo_process.write().unwrap(); - match &*guard { - Some(current_field_value) => { - if Arc::ptr_eq(current_field_value, &proc) { + match self.redo_process.get_mut().await { + Some(mut guard) => { + if Arc::ptr_eq(&*guard, &proc) { // We're the first to observe an error from `proc`, it's our job to take it out of rotation. - *guard = None; + drop(guard.take_and_deinit()); } } None => {