From 1102d3f0bfba6916c52eaf82425d52428f75013a Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 2 Feb 2024 16:43:08 +0000 Subject: [PATCH] Revert "switch to tokio::RwLock" This reverts commit e8f1af552775e7c312c6dd242bbb342538f403f7. --- pageserver/src/http/routes.rs | 2 +- pageserver/src/tenant.rs | 15 ++++++--------- pageserver/src/tenant/tasks.rs | 2 +- pageserver/src/walredo.rs | 33 ++++++++++++--------------------- 4 files changed, 20 insertions(+), 32 deletions(-) diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 554da51b4b..57ee746726 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -967,7 +967,7 @@ async fn tenant_status( attachment_status: state.attachment_status(), generation: tenant.generation().into(), }, - walredo: tenant.wal_redo_manager_status().await, + walredo: tenant.wal_redo_manager_status(), timelines: tenant.list_timeline_ids(), }) } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index ef48ad78b1..10b0237faf 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -332,9 +332,9 @@ impl From for WalRedoManager { } impl WalRedoManager { - pub(crate) async fn maybe_quiesce(&self, idle_timeout: Duration) { + pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) { match self { - Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout).await, + Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout), #[cfg(test)] Self::Test(_) => { // Not applicable to test redo manager @@ -366,9 +366,9 @@ impl WalRedoManager { } } - pub(crate) async fn status(&self) -> Option { + pub(crate) fn status(&self) -> Option { match self { - WalRedoManager::Prod(m) => m.status().await, + WalRedoManager::Prod(m) => m.status(), #[cfg(test)] WalRedoManager::Test(_) => None, } @@ -1962,11 +1962,8 @@ impl Tenant { self.generation } - pub(crate) async fn wal_redo_manager_status(&self) -> Option { - let Some(mgr) = self.walredo_mgr.as_ref() else { - return None; - }; - mgr.status().await + pub(crate) fn wal_redo_manager_status(&self) -> Option { + self.walredo_mgr.as_ref().and_then(|mgr| mgr.status()) } /// Changes tenant status to active, unless shutdown was already requested. diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index e84a9ddbf7..950cc46e71 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -200,7 +200,7 @@ async fn compaction_loop(tenant: Arc, cancel: CancellationToken) { // Perhaps we did no work and the walredo process has been idle for some time: // give it a chance to shut down to avoid leaving walredo process running indefinitely. if let Some(walredo_mgr) = &tenant.walredo_mgr { - walredo_mgr.maybe_quiesce(period * 10).await; + walredo_mgr.maybe_quiesce(period * 10); } // Sleep diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 969e61b232..773e5fc051 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -36,7 +36,7 @@ use bytes::{Bytes, BytesMut}; use pageserver_api::key::key_to_rel_block; use pageserver_api::models::WalRedoManagerStatus; use pageserver_api::shard::TenantShardId; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::time::Duration; use std::time::Instant; use tracing::*; @@ -53,7 +53,7 @@ pub struct PostgresRedoManager { tenant_shard_id: TenantShardId, conf: &'static PageServerConf, last_redo_at: std::sync::Mutex>, - redo_process: tokio::sync::RwLock>>, + redo_process: RwLock>>, } /// @@ -101,7 +101,6 @@ impl PostgresRedoManager { self.conf.wal_redo_timeout, pg_version, ) - .await }; img = Some(result?); @@ -122,11 +121,10 @@ impl PostgresRedoManager { self.conf.wal_redo_timeout, pg_version, ) - .await } } - pub(crate) async fn status(&self) -> Option { + pub(crate) fn status(&self) -> Option { Some(WalRedoManagerStatus { last_redo_at: { let at = *self.last_redo_at.lock().unwrap(); @@ -136,7 +134,7 @@ impl PostgresRedoManager { chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?) }) }, - pid: self.redo_process.read().await.as_ref().map(|p| p.id()), + pid: self.redo_process.read().unwrap().as_ref().map(|p| p.id()), }) } } @@ -154,37 +152,30 @@ impl PostgresRedoManager { tenant_shard_id, conf, last_redo_at: std::sync::Mutex::default(), - redo_process: tokio::sync::RwLock::new(None), + redo_process: RwLock::new(None), } } /// This type doesn't have its own background task to check for idleness: we /// rely on our owner calling this function periodically in its own housekeeping /// loops. - pub(crate) async fn maybe_quiesce(&self, idle_timeout: Duration) { + pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) { if let Ok(g) = self.last_redo_at.try_lock() { if let Some(last_redo_at) = *g { if last_redo_at.elapsed() >= idle_timeout { drop(g); - // fallthrough - } else { - return; + let mut guard = self.redo_process.write().unwrap(); + *guard = None; } - } else { - return; } - } else { - return; } - let mut guard = self.redo_process.write().await; - *guard = None; } /// /// Process one request for WAL redo using wal-redo postgres /// #[allow(clippy::too_many_arguments)] - async fn apply_batch_postgres( + fn apply_batch_postgres( &self, key: Key, lsn: Lsn, @@ -202,12 +193,12 @@ impl PostgresRedoManager { loop { // launch the WAL redo process on first use let proc: Arc = { - let proc_guard = self.redo_process.read().await; + let proc_guard = self.redo_process.read().unwrap(); match &*proc_guard { None => { // "upgrade" to write lock to launch the process drop(proc_guard); - let mut proc_guard = self.redo_process.write().await; + let mut proc_guard = self.redo_process.write().unwrap(); match &*proc_guard { None => { let start = Instant::now(); @@ -284,7 +275,7 @@ impl PostgresRedoManager { // Avoid concurrent callers hitting the same issue. // We can't prevent it from happening because we want to enable parallelism. { - let mut guard = self.redo_process.write().await; + let mut guard = self.redo_process.write().unwrap(); match &*guard { Some(current_field_value) => { if Arc::ptr_eq(current_field_value, &proc) {