From 20e82629dfa6671859423bed2ef532e030881d48 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 2 Feb 2024 17:21:59 +0000 Subject: [PATCH] remove the walredo usage, that'll be in the next pr --- pageserver/src/http/routes.rs | 2 +- pageserver/src/tenant.rs | 12 ++--- pageserver/src/tenant/tasks.rs | 2 +- pageserver/src/walredo.rs | 98 +++++++++++++++++----------------- 4 files changed, 57 insertions(+), 57 deletions(-) diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 554da51b4b..57ee746726 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -967,7 +967,7 @@ async fn tenant_status( attachment_status: state.attachment_status(), generation: tenant.generation().into(), }, - walredo: tenant.wal_redo_manager_status().await, + walredo: tenant.wal_redo_manager_status(), timelines: tenant.list_timeline_ids(), }) } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index ef48ad78b1..42adb843d8 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -332,9 +332,9 @@ impl From for WalRedoManager { } impl WalRedoManager { - pub(crate) async fn maybe_quiesce(&self, idle_timeout: Duration) { + pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) { match self { - Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout).await, + Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout), #[cfg(test)] Self::Test(_) => { // Not applicable to test redo manager @@ -366,9 +366,9 @@ impl WalRedoManager { } } - pub(crate) async fn status(&self) -> Option { + pub(crate) fn status(&self) -> Option { match self { - WalRedoManager::Prod(m) => m.status().await, + WalRedoManager::Prod(m) => m.status(), #[cfg(test)] WalRedoManager::Test(_) => None, } @@ -1962,11 +1962,11 @@ impl Tenant { self.generation } - pub(crate) async fn wal_redo_manager_status(&self) -> Option { + pub(crate) fn wal_redo_manager_status(&self) -> Option { let Some(mgr) = self.walredo_mgr.as_ref() else { return None; }; - mgr.status().await + mgr.status() } /// Changes tenant status to active, unless shutdown was already requested. diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index e84a9ddbf7..950cc46e71 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -200,7 +200,7 @@ async fn compaction_loop(tenant: Arc, cancel: CancellationToken) { // Perhaps we did no work and the walredo process has been idle for some time: // give it a chance to shut down to avoid leaving walredo process running indefinitely. if let Some(walredo_mgr) = &tenant.walredo_mgr { - walredo_mgr.maybe_quiesce(period * 10).await; + walredo_mgr.maybe_quiesce(period * 10); } // Sleep diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 0584ada980..773e5fc051 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -36,12 +36,11 @@ use bytes::{Bytes, BytesMut}; use pageserver_api::key::key_to_rel_block; use pageserver_api::models::WalRedoManagerStatus; use pageserver_api::shard::TenantShardId; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::time::Duration; use std::time::Instant; use tracing::*; use utils::lsn::Lsn; -use utils::sync::heavier_once_cell; /// /// This is the real implementation that uses a Postgres process to @@ -54,7 +53,7 @@ pub struct PostgresRedoManager { tenant_shard_id: TenantShardId, conf: &'static PageServerConf, last_redo_at: std::sync::Mutex>, - redo_process: heavier_once_cell::OnceCell>, + redo_process: RwLock>>, } /// @@ -102,7 +101,6 @@ impl PostgresRedoManager { self.conf.wal_redo_timeout, pg_version, ) - .await }; img = Some(result?); @@ -123,11 +121,10 @@ impl PostgresRedoManager { self.conf.wal_redo_timeout, pg_version, ) - .await } } - pub(crate) async fn status(&self) -> Option { + pub(crate) fn status(&self) -> Option { Some(WalRedoManagerStatus { last_redo_at: { let at = *self.last_redo_at.lock().unwrap(); @@ -137,7 +134,7 @@ impl PostgresRedoManager { chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?) }) }, - pid: self.redo_process.get_mut().await.map(|p| p.id()), + pid: self.redo_process.read().unwrap().as_ref().map(|p| p.id()), }) } } @@ -155,41 +152,30 @@ impl PostgresRedoManager { tenant_shard_id, conf, last_redo_at: std::sync::Mutex::default(), - redo_process: heavier_once_cell::OnceCell::default(), + redo_process: RwLock::new(None), } } /// This type doesn't have its own background task to check for idleness: we /// rely on our owner calling this function periodically in its own housekeeping /// loops. - pub(crate) async fn maybe_quiesce(&self, idle_timeout: Duration) { + pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) { if let Ok(g) = self.last_redo_at.try_lock() { if let Some(last_redo_at) = *g { if last_redo_at.elapsed() >= idle_timeout { drop(g); - // fallthrough - } else { - return; + let mut guard = self.redo_process.write().unwrap(); + *guard = None; } - } else { - return; } - } else { - return; } - drop( - self.redo_process - .get_mut() - .await - .map(|mut guard| guard.take_and_deinit()), - ); } /// /// Process one request for WAL redo using wal-redo postgres /// #[allow(clippy::too_many_arguments)] - async fn apply_batch_postgres( + fn apply_batch_postgres( &self, key: Key, lsn: Lsn, @@ -206,28 +192,41 @@ impl PostgresRedoManager { let mut n_attempts = 0u32; loop { // launch the WAL redo process on first use - let proc = self - .redo_process - .get_or_init(|init_permit| async move { - let start = Instant::now(); - let proc = Arc::new( - process::WalRedoProcess::launch( - self.conf, - self.tenant_shard_id, - pg_version, - ) - .context("launch walredo process")?, - ); - let duration = start.elapsed(); - WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM.observe(duration.as_secs_f64()); - info!( - duration_ms = duration.as_millis(), - pid = proc.id(), - "launched walredo process" - ); - anyhow::Ok((proc, init_permit)) - }) - .await?; + let proc: Arc = { + let proc_guard = self.redo_process.read().unwrap(); + match &*proc_guard { + None => { + // "upgrade" to write lock to launch the process + drop(proc_guard); + let mut proc_guard = self.redo_process.write().unwrap(); + match &*proc_guard { + None => { + let start = Instant::now(); + let proc = Arc::new( + process::WalRedoProcess::launch( + self.conf, + self.tenant_shard_id, + pg_version, + ) + .context("launch walredo process")?, + ); + let duration = start.elapsed(); + WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM + .observe(duration.as_secs_f64()); + info!( + duration_ms = duration.as_millis(), + pid = proc.id(), + "launched walredo process" + ); + *proc_guard = Some(Arc::clone(&proc)); + proc + } + Some(proc) => Arc::clone(proc), + } + } + Some(proc) => Arc::clone(proc), + } + }; let started_at = std::time::Instant::now(); @@ -276,11 +275,12 @@ impl PostgresRedoManager { // Avoid concurrent callers hitting the same issue. // We can't prevent it from happening because we want to enable parallelism. { - match self.redo_process.get_mut().await { - Some(mut guard) => { - if Arc::ptr_eq(&*guard, &proc) { + let mut guard = self.redo_process.write().unwrap(); + match &*guard { + Some(current_field_value) => { + if Arc::ptr_eq(current_field_value, &proc) { // We're the first to observe an error from `proc`, it's our job to take it out of rotation. - drop(guard.take_and_deinit()); + *guard = None; } } None => {