From 2ab2608d4cacec0dc430e4cf72a89745fbdf6442 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 2 Feb 2024 15:36:15 +0000 Subject: [PATCH] [DO NOT MERGE] refactor(walredo): use replace RwLock with heavier_once_cell The API is nice, exactly what we want, but we would want a more optimistic underlying sync primitive. --- pageserver/src/walredo.rs | 86 ++++++++++++++++++--------------------- 1 file changed, 39 insertions(+), 47 deletions(-) diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 773e5fc051..049a6d445d 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -36,11 +36,12 @@ use bytes::{Bytes, BytesMut}; use pageserver_api::key::key_to_rel_block; use pageserver_api::models::WalRedoManagerStatus; use pageserver_api::shard::TenantShardId; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::time::Duration; use std::time::Instant; use tracing::*; use utils::lsn::Lsn; +use utils::sync::heavier_once_cell; /// /// This is the real implementation that uses a Postgres process to @@ -53,7 +54,7 @@ pub struct PostgresRedoManager { tenant_shard_id: TenantShardId, conf: &'static PageServerConf, last_redo_at: std::sync::Mutex>, - redo_process: RwLock>>, + redo_process: heavier_once_cell::OnceCell>, } /// @@ -101,6 +102,7 @@ impl PostgresRedoManager { self.conf.wal_redo_timeout, pg_version, ) + .await }; img = Some(result?); @@ -121,6 +123,7 @@ impl PostgresRedoManager { self.conf.wal_redo_timeout, pg_version, ) + .await } } @@ -134,7 +137,7 @@ impl PostgresRedoManager { chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?) }) }, - pid: self.redo_process.read().unwrap().as_ref().map(|p| p.id()), + pid: self.redo_process.get().map(|p| p.id()), }) } } @@ -152,7 +155,7 @@ impl PostgresRedoManager { tenant_shard_id, conf, last_redo_at: std::sync::Mutex::default(), - redo_process: RwLock::new(None), + redo_process: heavier_once_cell::OnceCell::default(), } } @@ -164,8 +167,11 @@ impl PostgresRedoManager { if let Some(last_redo_at) = *g { if last_redo_at.elapsed() >= idle_timeout { drop(g); - let mut guard = self.redo_process.write().unwrap(); - *guard = None; + drop( + self.redo_process + .get() + .map(|mut guard| guard.take_and_deinit()), + ); } } } @@ -175,7 +181,7 @@ impl PostgresRedoManager { /// Process one request for WAL redo using wal-redo postgres /// #[allow(clippy::too_many_arguments)] - fn apply_batch_postgres( + async fn apply_batch_postgres( &self, key: Key, lsn: Lsn, @@ -192,41 +198,28 @@ impl PostgresRedoManager { let mut n_attempts = 0u32; loop { // launch the WAL redo process on first use - let proc: Arc = { - let proc_guard = self.redo_process.read().unwrap(); - match &*proc_guard { - None => { - // "upgrade" to write lock to launch the process - drop(proc_guard); - let mut proc_guard = self.redo_process.write().unwrap(); - match &*proc_guard { - None => { - let start = Instant::now(); - let proc = Arc::new( - process::WalRedoProcess::launch( - self.conf, - self.tenant_shard_id, - pg_version, - ) - .context("launch walredo process")?, - ); - let duration = start.elapsed(); - WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM - .observe(duration.as_secs_f64()); - info!( - duration_ms = duration.as_millis(), - pid = proc.id(), - "launched walredo process" - ); - *proc_guard = Some(Arc::clone(&proc)); - proc - } - Some(proc) => Arc::clone(proc), - } - } - Some(proc) => Arc::clone(proc), - } - }; + let proc = self + .redo_process + .get_or_init(|init_permit| async move { + let start = Instant::now(); + let proc = Arc::new( + process::WalRedoProcess::launch( + self.conf, + self.tenant_shard_id, + pg_version, + ) + .context("launch walredo process")?, + ); + let duration = start.elapsed(); + WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM.observe(duration.as_secs_f64()); + info!( + duration_ms = duration.as_millis(), + pid = proc.id(), + "launched walredo process" + ); + anyhow::Ok((proc, init_permit)) + }) + .await?; let started_at = std::time::Instant::now(); @@ -275,12 +268,11 @@ impl PostgresRedoManager { // Avoid concurrent callers hitting the same issue. // We can't prevent it from happening because we want to enable parallelism. { - let mut guard = self.redo_process.write().unwrap(); - match &*guard { - Some(current_field_value) => { - if Arc::ptr_eq(current_field_value, &proc) { + match self.redo_process.get() { + Some(mut guard) => { + if Arc::ptr_eq(&*guard, &proc) { // We're the first to observe an error from `proc`, it's our job to take it out of rotation. - *guard = None; + drop(guard.take_and_deinit()); } } None => {