diff --git a/libs/utils/src/sync/heavier_once_cell.rs b/libs/utils/src/sync/heavier_once_cell.rs index 0ccaf4e716..5df0b437ca 100644 --- a/libs/utils/src/sync/heavier_once_cell.rs +++ b/libs/utils/src/sync/heavier_once_cell.rs @@ -1,6 +1,6 @@ use std::sync::{ atomic::{AtomicUsize, Ordering}, - Arc, Mutex, MutexGuard, + Arc, }; use tokio::sync::Semaphore; @@ -12,7 +12,7 @@ use tokio::sync::Semaphore; /// /// [`OwnedSemaphorePermit`]: tokio::sync::OwnedSemaphorePermit pub struct OnceCell { - inner: Mutex>, + inner: tokio::sync::RwLock>, initializers: AtomicUsize, } @@ -50,7 +50,7 @@ impl OnceCell { let sem = Semaphore::new(1); sem.close(); Self { - inner: Mutex::new(Inner { + inner: tokio::sync::RwLock::new(Inner { init_semaphore: Arc::new(sem), value: Some(value), }), @@ -61,18 +61,18 @@ impl OnceCell { /// Returns a guard to an existing initialized value, or uniquely initializes the value before /// returning the guard. /// - /// Initializing might wait on any existing [`Guard::take_and_deinit`] deinitialization. + /// Initializing might wait on any existing [`GuardMut::take_and_deinit`] deinitialization. /// /// Initialization is panic-safe and cancellation-safe. - pub async fn get_or_init(&self, factory: F) -> Result, E> + pub async fn get_mut_or_init(&self, factory: F) -> Result, E> where F: FnOnce(InitPermit) -> Fut, Fut: std::future::Future>, { let sem = { - let guard = self.inner.lock().unwrap(); + let guard = self.inner.write().await; if guard.value.is_some() { - return Ok(Guard(guard)); + return Ok(GuardMut(guard)); } guard.init_semaphore.clone() }; @@ -88,29 +88,72 @@ impl OnceCell { let permit = InitPermit(permit); let (value, _permit) = factory(permit).await?; - let guard = self.inner.lock().unwrap(); + let guard = self.inner.write().await; Ok(Self::set0(value, guard)) } Err(_closed) => { - let guard = self.inner.lock().unwrap(); + let guard = self.inner.write().await; assert!( guard.value.is_some(), "semaphore got closed, must be initialized" ); - return Ok(Guard(guard)); + return Ok(GuardMut(guard)); } } } - /// Assuming a permit is held after previous call to [`Guard::take_and_deinit`], it can be used + /// Returns a guard to an existing initialized value, or uniquely initializes the value before + /// returning the guard. + /// + /// Initialization is panic-safe and cancellation-safe. + pub async fn get_or_init(&self, factory: F) -> Result, E> + where + F: FnOnce(InitPermit) -> Fut, + Fut: std::future::Future>, + { + let sem = { + let guard = self.inner.read().await; + if guard.value.is_some() { + return Ok(GuardRef(guard)); + } + guard.init_semaphore.clone() + }; + + let permit = { + // increment the count for the duration of queued + let _guard = CountWaitingInitializers::start(self); + sem.acquire_owned().await + }; + + match permit { + Ok(permit) => { + let permit = InitPermit(permit); + let (value, _permit) = factory(permit).await?; + + let guard = self.inner.write().await; + + Ok(Self::set0(value, guard).downgrade()) + } + Err(_closed) => { + let guard = self.inner.read().await; + assert!( + guard.value.is_some(), + "semaphore got closed, must be initialized" + ); + return Ok(GuardRef(guard)); + } + } + } + + /// Assuming a permit is held after previous call to [`GuardMut::take_and_deinit`], it can be used /// to complete initializing the inner value. /// /// # Panics /// /// If the inner has already been initialized. - pub fn set(&self, value: T, _permit: InitPermit) -> Guard<'_, T> { - let guard = self.inner.lock().unwrap(); + pub async fn set(&self, value: T, _permit: InitPermit) -> GuardMut<'_, T> { + let guard = self.inner.write().await; // cannot assert that this permit is for self.inner.semaphore, but we can assert it cannot // give more permits right now. @@ -122,21 +165,31 @@ impl OnceCell { Self::set0(value, guard) } - fn set0(value: T, mut guard: std::sync::MutexGuard<'_, Inner>) -> Guard<'_, T> { + fn set0(value: T, mut guard: tokio::sync::RwLockWriteGuard<'_, Inner>) -> GuardMut<'_, T> { if guard.value.is_some() { drop(guard); unreachable!("we won permit, must not be initialized"); } guard.value = Some(value); guard.init_semaphore.close(); - Guard(guard) + GuardMut(guard) } /// Returns a guard to an existing initialized value, if any. - pub fn get(&self) -> Option> { - let guard = self.inner.lock().unwrap(); + pub async fn get_mut(&self) -> Option> { + let guard = self.inner.write().await; if guard.value.is_some() { - Some(Guard(guard)) + Some(GuardMut(guard)) + } else { + None + } + } + + /// Returns a guard to an existing initialized value, if any. + pub async fn get(&self) -> Option> { + let guard = self.inner.read().await; + if guard.value.is_some() { + Some(GuardRef(guard)) } else { None } @@ -168,9 +221,9 @@ impl<'a, T> Drop for CountWaitingInitializers<'a, T> { /// Uninteresting guard object to allow short-lived access to inspect or clone the held, /// initialized value. #[derive(Debug)] -pub struct Guard<'a, T>(MutexGuard<'a, Inner>); +pub struct GuardMut<'a, T>(tokio::sync::RwLockWriteGuard<'a, Inner>); -impl std::ops::Deref for Guard<'_, T> { +impl std::ops::Deref for GuardMut<'_, T> { type Target = T; fn deref(&self) -> &Self::Target { @@ -181,7 +234,7 @@ impl std::ops::Deref for Guard<'_, T> { } } -impl std::ops::DerefMut for Guard<'_, T> { +impl std::ops::DerefMut for GuardMut<'_, T> { fn deref_mut(&mut self) -> &mut Self::Target { self.0 .value @@ -190,7 +243,7 @@ impl std::ops::DerefMut for Guard<'_, T> { } } -impl<'a, T> Guard<'a, T> { +impl<'a, T> GuardMut<'a, T> { /// Take the current value, and a new permit for it's deinitialization. /// /// The permit will be on a semaphore part of the new internal value, and any following @@ -208,6 +261,24 @@ impl<'a, T> Guard<'a, T> { .map(|v| (v, InitPermit(permit))) .expect("guard is not created unless value has been initialized") } + + pub fn downgrade(self) -> GuardRef<'a, T> { + GuardRef(self.0.downgrade()) + } +} + +#[derive(Debug)] +pub struct GuardRef<'a, T>(tokio::sync::RwLockReadGuard<'a, Inner>); + +impl std::ops::Deref for GuardRef<'_, T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + self.0 + .value + .as_ref() + .expect("guard is not created unless value has been initialized") + } } /// Type held by OnceCell (de)initializing task. @@ -248,7 +319,7 @@ mod tests { barrier.wait().await; let won = { let g = cell - .get_or_init(|permit| { + .get_mut_or_init(|permit| { counters.factory_got_to_run.fetch_add(1, Ordering::Relaxed); async { counters.future_polled.fetch_add(1, Ordering::Relaxed); @@ -295,7 +366,11 @@ mod tests { let cell = cell.clone(); let deinitialization_started = deinitialization_started.clone(); async move { - let (answer, _permit) = cell.get().expect("initialized to value").take_and_deinit(); + let (answer, _permit) = cell + .get_mut() + .await + .expect("initialized to value") + .take_and_deinit(); assert_eq!(answer, initial); deinitialization_started.wait().await; @@ -306,7 +381,7 @@ mod tests { deinitialization_started.wait().await; let started_at = tokio::time::Instant::now(); - cell.get_or_init(|permit| async { Ok::<_, Infallible>((reinit, permit)) }) + cell.get_mut_or_init(|permit| async { Ok::<_, Infallible>((reinit, permit)) }) .await .unwrap(); @@ -318,21 +393,21 @@ mod tests { jh.await.unwrap(); - assert_eq!(*cell.get().unwrap(), reinit); + assert_eq!(*cell.get_mut().unwrap(), reinit); } #[test] fn reinit_with_deinit_permit() { let cell = Arc::new(OnceCell::new(42)); - let (mol, permit) = cell.get().unwrap().take_and_deinit(); + let (mol, permit) = cell.get_mut().await.unwrap().take_and_deinit(); cell.set(5, permit); - assert_eq!(*cell.get().unwrap(), 5); + assert_eq!(*cell.get_mut().unwrap(), 5); - let (five, permit) = cell.get().unwrap().take_and_deinit(); + let (five, permit) = cell.get_mut().unwrap().take_and_deinit(); assert_eq!(5, five); cell.set(mol, permit); - assert_eq!(*cell.get().unwrap(), 42); + assert_eq!(*cell.get_mut().unwrap(), 42); } #[tokio::test] @@ -340,13 +415,13 @@ mod tests { let cell = OnceCell::default(); for _ in 0..10 { - cell.get_or_init(|_permit| async { Err("whatever error") }) + cell.get_mut_or_init(|_permit| async { Err("whatever error") }) .await .unwrap_err(); } let g = cell - .get_or_init(|permit| async { Ok::<_, Infallible>(("finally success", permit)) }) + .get_mut_or_init(|permit| async { Ok::<_, Infallible>(("finally success", permit)) }) .await .unwrap(); assert_eq!(*g, "finally success"); @@ -358,7 +433,7 @@ mod tests { let barrier = tokio::sync::Barrier::new(2); - let initializer = cell.get_or_init(|permit| async { + let initializer = cell.get_mut_or_init(|permit| async { barrier.wait().await; futures::future::pending::<()>().await; @@ -372,10 +447,10 @@ mod tests { // now initializer is dropped - assert!(cell.get().is_none()); + assert!(cell.get_mut().await.is_none()); let g = cell - .get_or_init(|permit| async { Ok::<_, Infallible>(("now initialized", permit)) }) + .get_mut_or_init(|permit| async { Ok::<_, Infallible>(("now initialized", permit)) }) .await .unwrap(); assert_eq!(*g, "now initialized"); diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 57ee746726..554da51b4b 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -967,7 +967,7 @@ async fn tenant_status( attachment_status: state.attachment_status(), generation: tenant.generation().into(), }, - walredo: tenant.wal_redo_manager_status(), + walredo: tenant.wal_redo_manager_status().await, timelines: tenant.list_timeline_ids(), }) } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 10b0237faf..ef48ad78b1 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -332,9 +332,9 @@ impl From for WalRedoManager { } impl WalRedoManager { - pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) { + pub(crate) async fn maybe_quiesce(&self, idle_timeout: Duration) { match self { - Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout), + Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout).await, #[cfg(test)] Self::Test(_) => { // Not applicable to test redo manager @@ -366,9 +366,9 @@ impl WalRedoManager { } } - pub(crate) fn status(&self) -> Option { + pub(crate) async fn status(&self) -> Option { match self { - WalRedoManager::Prod(m) => m.status(), + WalRedoManager::Prod(m) => m.status().await, #[cfg(test)] WalRedoManager::Test(_) => None, } @@ -1962,8 +1962,11 @@ impl Tenant { self.generation } - pub(crate) fn wal_redo_manager_status(&self) -> Option { - self.walredo_mgr.as_ref().and_then(|mgr| mgr.status()) + pub(crate) async fn wal_redo_manager_status(&self) -> Option { + let Some(mgr) = self.walredo_mgr.as_ref() else { + return None; + }; + mgr.status().await } /// Changes tenant status to active, unless shutdown was already requested. diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index 12af866810..1f337adf53 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -299,8 +299,8 @@ impl Layer { }) } - pub(crate) fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo { - self.0.info(reset) + pub(crate) async fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo { + self.0.info(reset).await } pub(crate) fn access_stats(&self) -> &LayerAccessStats { @@ -611,10 +611,10 @@ impl LayerInner { let mut rx = self.status.subscribe(); let strong = { - match self.inner.get() { + match self.inner.get_mut().await { Some(mut either) => { self.wanted_evicted.store(true, Ordering::Relaxed); - either.downgrade() + ResidentOrWantedEvicted::downgrade(&mut either) } None => return Err(EvictionError::NotFound), } @@ -640,7 +640,7 @@ impl LayerInner { // use however late (compared to the initial expressing of wanted) as the // "outcome" now LAYER_IMPL_METRICS.inc_broadcast_lagged(); - match self.inner.get() { + match self.inner.get_mut().await { Some(_) => Err(EvictionError::Downloaded), None => Ok(()), } @@ -758,7 +758,7 @@ impl LayerInner { // use the already held initialization permit because it is impossible to hit the // below paths anymore essentially limiting the max loop iterations to 2. let (value, init_permit) = download(init_permit).await?; - let mut guard = self.inner.set(value, init_permit); + let mut guard = self.inner.set(value, init_permit).await; let (strong, _upgraded) = guard .get_and_upgrade() .expect("init creates strong reference, we held the init permit"); @@ -766,7 +766,7 @@ impl LayerInner { } let (weak, permit) = { - let mut locked = self.inner.get_or_init(download).await?; + let mut locked = self.inner.get_mut_or_init(download).await?; if let Some((strong, upgraded)) = locked.get_and_upgrade() { if upgraded { @@ -986,12 +986,12 @@ impl LayerInner { } } - fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo { + async fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo { let layer_file_name = self.desc.filename().file_name(); // this is not accurate: we could have the file locally but there was a cancellation // and now we are not in sync, or we are currently downloading it. - let remote = self.inner.get().is_none(); + let remote = self.inner.get_mut().await.is_none(); let access_stats = self.access_stats.as_api_model(reset); @@ -1050,7 +1050,7 @@ impl LayerInner { LAYER_IMPL_METRICS.inc_eviction_cancelled(EvictionCancelled::LayerGone); return; }; - match this.evict_blocking(version) { + match tokio::runtime::Handle::current().block_on(this.evict_blocking(version)) { Ok(()) => LAYER_IMPL_METRICS.inc_completed_evictions(), Err(reason) => LAYER_IMPL_METRICS.inc_eviction_cancelled(reason), } @@ -1058,7 +1058,7 @@ impl LayerInner { } } - fn evict_blocking(&self, only_version: usize) -> Result<(), EvictionCancelled> { + async fn evict_blocking(&self, only_version: usize) -> Result<(), EvictionCancelled> { // deleted or detached timeline, don't do anything. let Some(timeline) = self.timeline.upgrade() else { return Err(EvictionCancelled::TimelineGone); @@ -1067,7 +1067,7 @@ impl LayerInner { // to avoid starting a new download while we evict, keep holding on to the // permit. let _permit = { - let maybe_downloaded = self.inner.get(); + let maybe_downloaded = self.inner.get_mut().await; let (_weak, permit) = match maybe_downloaded { Some(mut guard) => { diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index 950cc46e71..e84a9ddbf7 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -200,7 +200,7 @@ async fn compaction_loop(tenant: Arc, cancel: CancellationToken) { // Perhaps we did no work and the walredo process has been idle for some time: // give it a chance to shut down to avoid leaving walredo process running indefinitely. if let Some(walredo_mgr) = &tenant.walredo_mgr { - walredo_mgr.maybe_quiesce(period * 10); + walredo_mgr.maybe_quiesce(period * 10).await; } // Sleep diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 7edcb9c249..d6b0075306 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1268,7 +1268,7 @@ impl Timeline { let mut historic_layers = Vec::new(); for historic_layer in layer_map.iter_historic_layers() { let historic_layer = guard.get_from_desc(&historic_layer); - historic_layers.push(historic_layer.info(reset)); + historic_layers.push(historic_layer.info(reset).await); } LayerMapInfo { diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 049a6d445d..0584ada980 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -127,7 +127,7 @@ impl PostgresRedoManager { } } - pub(crate) fn status(&self) -> Option { + pub(crate) async fn status(&self) -> Option { Some(WalRedoManagerStatus { last_redo_at: { let at = *self.last_redo_at.lock().unwrap(); @@ -137,7 +137,7 @@ impl PostgresRedoManager { chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?) }) }, - pid: self.redo_process.get().map(|p| p.id()), + pid: self.redo_process.get_mut().await.map(|p| p.id()), }) } } @@ -162,19 +162,27 @@ impl PostgresRedoManager { /// This type doesn't have its own background task to check for idleness: we /// rely on our owner calling this function periodically in its own housekeeping /// loops. - pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) { + pub(crate) async fn maybe_quiesce(&self, idle_timeout: Duration) { if let Ok(g) = self.last_redo_at.try_lock() { if let Some(last_redo_at) = *g { if last_redo_at.elapsed() >= idle_timeout { drop(g); - drop( - self.redo_process - .get() - .map(|mut guard| guard.take_and_deinit()), - ); + // fallthrough + } else { + return; } + } else { + return; } + } else { + return; } + drop( + self.redo_process + .get_mut() + .await + .map(|mut guard| guard.take_and_deinit()), + ); } /// @@ -268,7 +276,7 @@ impl PostgresRedoManager { // Avoid concurrent callers hitting the same issue. // We can't prevent it from happening because we want to enable parallelism. { - match self.redo_process.get() { + match self.redo_process.get_mut().await { Some(mut guard) => { if Arc::ptr_eq(&*guard, &proc) { // We're the first to observe an error from `proc`, it's our job to take it out of rotation.