diff --git a/libs/utils/src/sync/heavier_once_cell.rs b/libs/utils/src/sync/heavier_once_cell.rs index 0ccaf4e716..f733d107f1 100644 --- a/libs/utils/src/sync/heavier_once_cell.rs +++ b/libs/utils/src/sync/heavier_once_cell.rs @@ -1,6 +1,6 @@ use std::sync::{ atomic::{AtomicUsize, Ordering}, - Arc, Mutex, MutexGuard, + Arc, }; use tokio::sync::Semaphore; @@ -12,7 +12,7 @@ use tokio::sync::Semaphore; /// /// [`OwnedSemaphorePermit`]: tokio::sync::OwnedSemaphorePermit pub struct OnceCell { - inner: Mutex>, + inner: tokio::sync::RwLock>, initializers: AtomicUsize, } @@ -50,7 +50,7 @@ impl OnceCell { let sem = Semaphore::new(1); sem.close(); Self { - inner: Mutex::new(Inner { + inner: tokio::sync::RwLock::new(Inner { init_semaphore: Arc::new(sem), value: Some(value), }), @@ -61,18 +61,18 @@ impl OnceCell { /// Returns a guard to an existing initialized value, or uniquely initializes the value before /// returning the guard. /// - /// Initializing might wait on any existing [`Guard::take_and_deinit`] deinitialization. + /// Initializing might wait on any existing [`GuardMut::take_and_deinit`] deinitialization. /// /// Initialization is panic-safe and cancellation-safe. - pub async fn get_or_init(&self, factory: F) -> Result, E> + pub async fn get_mut_or_init(&self, factory: F) -> Result, E> where F: FnOnce(InitPermit) -> Fut, Fut: std::future::Future>, { let sem = { - let guard = self.inner.lock().unwrap(); + let guard = self.inner.write().await; if guard.value.is_some() { - return Ok(Guard(guard)); + return Ok(GuardMut(guard)); } guard.init_semaphore.clone() }; @@ -88,29 +88,72 @@ impl OnceCell { let permit = InitPermit(permit); let (value, _permit) = factory(permit).await?; - let guard = self.inner.lock().unwrap(); + let guard = self.inner.write().await; Ok(Self::set0(value, guard)) } Err(_closed) => { - let guard = self.inner.lock().unwrap(); + let guard = self.inner.write().await; assert!( guard.value.is_some(), "semaphore got closed, must be initialized" ); - return Ok(Guard(guard)); + return Ok(GuardMut(guard)); } } } - /// Assuming a permit is held after previous call to [`Guard::take_and_deinit`], it can be used + /// Returns a guard to an existing initialized value, or uniquely initializes the value before + /// returning the guard. + /// + /// Initialization is panic-safe and cancellation-safe. + pub async fn get_or_init(&self, factory: F) -> Result, E> + where + F: FnOnce(InitPermit) -> Fut, + Fut: std::future::Future>, + { + let sem = { + let guard = self.inner.read().await; + if guard.value.is_some() { + return Ok(GuardRef(guard)); + } + guard.init_semaphore.clone() + }; + + let permit = { + // increment the count for the duration of queued + let _guard = CountWaitingInitializers::start(self); + sem.acquire_owned().await + }; + + match permit { + Ok(permit) => { + let permit = InitPermit(permit); + let (value, _permit) = factory(permit).await?; + + let guard = self.inner.write().await; + + Ok(Self::set0(value, guard).downgrade()) + } + Err(_closed) => { + let guard = self.inner.read().await; + assert!( + guard.value.is_some(), + "semaphore got closed, must be initialized" + ); + return Ok(GuardRef(guard)); + } + } + } + + /// Assuming a permit is held after previous call to [`GuardMut::take_and_deinit`], it can be used /// to complete initializing the inner value. /// /// # Panics /// /// If the inner has already been initialized. - pub fn set(&self, value: T, _permit: InitPermit) -> Guard<'_, T> { - let guard = self.inner.lock().unwrap(); + pub async fn set(&self, value: T, _permit: InitPermit) -> GuardMut<'_, T> { + let guard = self.inner.write().await; // cannot assert that this permit is for self.inner.semaphore, but we can assert it cannot // give more permits right now. @@ -122,21 +165,31 @@ impl OnceCell { Self::set0(value, guard) } - fn set0(value: T, mut guard: std::sync::MutexGuard<'_, Inner>) -> Guard<'_, T> { + fn set0(value: T, mut guard: tokio::sync::RwLockWriteGuard<'_, Inner>) -> GuardMut<'_, T> { if guard.value.is_some() { drop(guard); unreachable!("we won permit, must not be initialized"); } guard.value = Some(value); guard.init_semaphore.close(); - Guard(guard) + GuardMut(guard) } /// Returns a guard to an existing initialized value, if any. - pub fn get(&self) -> Option> { - let guard = self.inner.lock().unwrap(); + pub async fn get_mut(&self) -> Option> { + let guard = self.inner.write().await; if guard.value.is_some() { - Some(Guard(guard)) + Some(GuardMut(guard)) + } else { + None + } + } + + /// Returns a guard to an existing initialized value, if any. + pub async fn get(&self) -> Option> { + let guard = self.inner.read().await; + if guard.value.is_some() { + Some(GuardRef(guard)) } else { None } @@ -168,9 +221,9 @@ impl<'a, T> Drop for CountWaitingInitializers<'a, T> { /// Uninteresting guard object to allow short-lived access to inspect or clone the held, /// initialized value. #[derive(Debug)] -pub struct Guard<'a, T>(MutexGuard<'a, Inner>); +pub struct GuardMut<'a, T>(tokio::sync::RwLockWriteGuard<'a, Inner>); -impl std::ops::Deref for Guard<'_, T> { +impl std::ops::Deref for GuardMut<'_, T> { type Target = T; fn deref(&self) -> &Self::Target { @@ -181,7 +234,7 @@ impl std::ops::Deref for Guard<'_, T> { } } -impl std::ops::DerefMut for Guard<'_, T> { +impl std::ops::DerefMut for GuardMut<'_, T> { fn deref_mut(&mut self) -> &mut Self::Target { self.0 .value @@ -190,7 +243,7 @@ impl std::ops::DerefMut for Guard<'_, T> { } } -impl<'a, T> Guard<'a, T> { +impl<'a, T> GuardMut<'a, T> { /// Take the current value, and a new permit for it's deinitialization. /// /// The permit will be on a semaphore part of the new internal value, and any following @@ -208,6 +261,24 @@ impl<'a, T> Guard<'a, T> { .map(|v| (v, InitPermit(permit))) .expect("guard is not created unless value has been initialized") } + + pub fn downgrade(self) -> GuardRef<'a, T> { + GuardRef(self.0.downgrade()) + } +} + +#[derive(Debug)] +pub struct GuardRef<'a, T>(tokio::sync::RwLockReadGuard<'a, Inner>); + +impl std::ops::Deref for GuardRef<'_, T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + self.0 + .value + .as_ref() + .expect("guard is not created unless value has been initialized") + } } /// Type held by OnceCell (de)initializing task. @@ -248,7 +319,7 @@ mod tests { barrier.wait().await; let won = { let g = cell - .get_or_init(|permit| { + .get_mut_or_init(|permit| { counters.factory_got_to_run.fetch_add(1, Ordering::Relaxed); async { counters.future_polled.fetch_add(1, Ordering::Relaxed); @@ -295,7 +366,11 @@ mod tests { let cell = cell.clone(); let deinitialization_started = deinitialization_started.clone(); async move { - let (answer, _permit) = cell.get().expect("initialized to value").take_and_deinit(); + let (answer, _permit) = cell + .get_mut() + .await + .expect("initialized to value") + .take_and_deinit(); assert_eq!(answer, initial); deinitialization_started.wait().await; @@ -306,7 +381,7 @@ mod tests { deinitialization_started.wait().await; let started_at = tokio::time::Instant::now(); - cell.get_or_init(|permit| async { Ok::<_, Infallible>((reinit, permit)) }) + cell.get_mut_or_init(|permit| async { Ok::<_, Infallible>((reinit, permit)) }) .await .unwrap(); @@ -318,21 +393,21 @@ mod tests { jh.await.unwrap(); - assert_eq!(*cell.get().unwrap(), reinit); + assert_eq!(*cell.get_mut().await.unwrap(), reinit); } - #[test] - fn reinit_with_deinit_permit() { + #[tokio::test] + async fn reinit_with_deinit_permit() { let cell = Arc::new(OnceCell::new(42)); - let (mol, permit) = cell.get().unwrap().take_and_deinit(); - cell.set(5, permit); - assert_eq!(*cell.get().unwrap(), 5); + let (mol, permit) = cell.get_mut().await.unwrap().take_and_deinit(); + cell.set(5, permit).await; + assert_eq!(*cell.get_mut().await.unwrap(), 5); - let (five, permit) = cell.get().unwrap().take_and_deinit(); + let (five, permit) = cell.get_mut().await.unwrap().take_and_deinit(); assert_eq!(5, five); - cell.set(mol, permit); - assert_eq!(*cell.get().unwrap(), 42); + cell.set(mol, permit).await; + assert_eq!(*cell.get_mut().await.unwrap(), 42); } #[tokio::test] @@ -340,13 +415,13 @@ mod tests { let cell = OnceCell::default(); for _ in 0..10 { - cell.get_or_init(|_permit| async { Err("whatever error") }) + cell.get_mut_or_init(|_permit| async { Err("whatever error") }) .await .unwrap_err(); } let g = cell - .get_or_init(|permit| async { Ok::<_, Infallible>(("finally success", permit)) }) + .get_mut_or_init(|permit| async { Ok::<_, Infallible>(("finally success", permit)) }) .await .unwrap(); assert_eq!(*g, "finally success"); @@ -358,7 +433,7 @@ mod tests { let barrier = tokio::sync::Barrier::new(2); - let initializer = cell.get_or_init(|permit| async { + let initializer = cell.get_mut_or_init(|permit| async { barrier.wait().await; futures::future::pending::<()>().await; @@ -372,10 +447,10 @@ mod tests { // now initializer is dropped - assert!(cell.get().is_none()); + assert!(cell.get_mut().await.is_none()); let g = cell - .get_or_init(|permit| async { Ok::<_, Infallible>(("now initialized", permit)) }) + .get_mut_or_init(|permit| async { Ok::<_, Infallible>(("now initialized", permit)) }) .await .unwrap(); assert_eq!(*g, "now initialized"); diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index 12af866810..1f337adf53 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -299,8 +299,8 @@ impl Layer { }) } - pub(crate) fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo { - self.0.info(reset) + pub(crate) async fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo { + self.0.info(reset).await } pub(crate) fn access_stats(&self) -> &LayerAccessStats { @@ -611,10 +611,10 @@ impl LayerInner { let mut rx = self.status.subscribe(); let strong = { - match self.inner.get() { + match self.inner.get_mut().await { Some(mut either) => { self.wanted_evicted.store(true, Ordering::Relaxed); - either.downgrade() + ResidentOrWantedEvicted::downgrade(&mut either) } None => return Err(EvictionError::NotFound), } @@ -640,7 +640,7 @@ impl LayerInner { // use however late (compared to the initial expressing of wanted) as the // "outcome" now LAYER_IMPL_METRICS.inc_broadcast_lagged(); - match self.inner.get() { + match self.inner.get_mut().await { Some(_) => Err(EvictionError::Downloaded), None => Ok(()), } @@ -758,7 +758,7 @@ impl LayerInner { // use the already held initialization permit because it is impossible to hit the // below paths anymore essentially limiting the max loop iterations to 2. let (value, init_permit) = download(init_permit).await?; - let mut guard = self.inner.set(value, init_permit); + let mut guard = self.inner.set(value, init_permit).await; let (strong, _upgraded) = guard .get_and_upgrade() .expect("init creates strong reference, we held the init permit"); @@ -766,7 +766,7 @@ impl LayerInner { } let (weak, permit) = { - let mut locked = self.inner.get_or_init(download).await?; + let mut locked = self.inner.get_mut_or_init(download).await?; if let Some((strong, upgraded)) = locked.get_and_upgrade() { if upgraded { @@ -986,12 +986,12 @@ impl LayerInner { } } - fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo { + async fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo { let layer_file_name = self.desc.filename().file_name(); // this is not accurate: we could have the file locally but there was a cancellation // and now we are not in sync, or we are currently downloading it. - let remote = self.inner.get().is_none(); + let remote = self.inner.get_mut().await.is_none(); let access_stats = self.access_stats.as_api_model(reset); @@ -1050,7 +1050,7 @@ impl LayerInner { LAYER_IMPL_METRICS.inc_eviction_cancelled(EvictionCancelled::LayerGone); return; }; - match this.evict_blocking(version) { + match tokio::runtime::Handle::current().block_on(this.evict_blocking(version)) { Ok(()) => LAYER_IMPL_METRICS.inc_completed_evictions(), Err(reason) => LAYER_IMPL_METRICS.inc_eviction_cancelled(reason), } @@ -1058,7 +1058,7 @@ impl LayerInner { } } - fn evict_blocking(&self, only_version: usize) -> Result<(), EvictionCancelled> { + async fn evict_blocking(&self, only_version: usize) -> Result<(), EvictionCancelled> { // deleted or detached timeline, don't do anything. let Some(timeline) = self.timeline.upgrade() else { return Err(EvictionCancelled::TimelineGone); @@ -1067,7 +1067,7 @@ impl LayerInner { // to avoid starting a new download while we evict, keep holding on to the // permit. let _permit = { - let maybe_downloaded = self.inner.get(); + let maybe_downloaded = self.inner.get_mut().await; let (_weak, permit) = match maybe_downloaded { Some(mut guard) => { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 0ba3fe728a..50ffc4d265 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1268,7 +1268,7 @@ impl Timeline { let mut historic_layers = Vec::new(); for historic_layer in layer_map.iter_historic_layers() { let historic_layer = guard.get_from_desc(&historic_layer); - historic_layers.push(historic_layer.info(reset)); + historic_layers.push(historic_layer.info(reset).await); } LayerMapInfo {