diff --git a/libs/utils/src/sync/heavier_once_cell.rs b/libs/utils/src/sync/heavier_once_cell.rs index 4d66a54c98..8a5aaf2ceb 100644 --- a/libs/utils/src/sync/heavier_once_cell.rs +++ b/libs/utils/src/sync/heavier_once_cell.rs @@ -60,8 +60,8 @@ impl OnceCell { /// Initialization is panic-safe and cancellation-safe. pub async fn get_or_init(&self, factory: F) -> Result, E> where - F: FnOnce() -> Fut, - Fut: std::future::Future>, + F: FnOnce(InitPermit) -> Fut, + Fut: std::future::Future>, { let sem = { let guard = self.inner.lock().unwrap(); @@ -72,28 +72,55 @@ impl OnceCell { }; let permit = sem.acquire_owned().await; - if permit.is_err() { - let guard = self.inner.lock().unwrap(); - assert!( - guard.value.is_some(), - "semaphore got closed, must be initialized" - ); - return Ok(Guard(guard)); - } else { - // now we try - let value = factory().await?; - let mut guard = self.inner.lock().unwrap(); - assert!( - guard.value.is_none(), - "we won permit, must not be initialized" - ); - guard.value = Some(value); - guard.init_semaphore.close(); - Ok(Guard(guard)) + match permit { + Ok(permit) => { + let permit = InitPermit(permit); + let (value, _permit) = factory(permit).await?; + + let guard = self.inner.lock().unwrap(); + + Ok(Self::set0(value, guard)) + } + Err(_closed) => { + let guard = self.inner.lock().unwrap(); + assert!( + guard.value.is_some(), + "semaphore got closed, must be initialized" + ); + return Ok(Guard(guard)); + } } } + /// Assuming a permit is held after previous call to [`Guard::take_and_deinit`], it can be used + /// to complete initializing the inner value. + /// + /// # Panics + /// + /// If the inner has already been initialized. + pub fn set(&self, value: T, _permit: InitPermit) -> Guard<'_, T> { + // cannot assert that this permit is for self.inner.semaphore + let guard = self.inner.lock().unwrap(); + + if guard.init_semaphore.try_acquire().is_ok() { + drop(guard); + panic!("semaphore is of wrong origin"); + } + + Self::set0(value, guard) + } + + fn set0(value: T, mut guard: std::sync::MutexGuard<'_, Inner>) -> Guard<'_, T> { + if guard.value.is_some() { + drop(guard); + unreachable!("we won permit, must not be initialized"); + } + guard.value = Some(value); + guard.init_semaphore.close(); + Guard(guard) + } + /// Returns a guard to an existing initialized value, if any. pub fn get(&self) -> Option> { let guard = self.inner.lock().unwrap(); @@ -135,7 +162,7 @@ impl<'a, T> Guard<'a, T> { /// /// The permit will be on a semaphore part of the new internal value, and any following /// [`OnceCell::get_or_init`] will wait on it to complete. - pub fn take_and_deinit(&mut self) -> (T, tokio::sync::OwnedSemaphorePermit) { + pub fn take_and_deinit(&mut self) -> (T, InitPermit) { let mut swapped = Inner::default(); let permit = swapped .init_semaphore @@ -145,11 +172,14 @@ impl<'a, T> Guard<'a, T> { std::mem::swap(&mut *self.0, &mut swapped); swapped .value - .map(|v| (v, permit)) + .map(|v| (v, InitPermit(permit))) .expect("guard is not created unless value has been initialized") } } +/// Type held by OnceCell (de)initializing task. +pub struct InitPermit(tokio::sync::OwnedSemaphorePermit); + #[cfg(test)] mod tests { use super::*; @@ -185,11 +215,11 @@ mod tests { barrier.wait().await; let won = { let g = cell - .get_or_init(|| { + .get_or_init(|permit| { counters.factory_got_to_run.fetch_add(1, Ordering::Relaxed); async { counters.future_polled.fetch_add(1, Ordering::Relaxed); - Ok::<_, Infallible>(i) + Ok::<_, Infallible>((i, permit)) } }) .await @@ -243,7 +273,7 @@ mod tests { deinitialization_started.wait().await; let started_at = tokio::time::Instant::now(); - cell.get_or_init(|| async { Ok::<_, Infallible>(reinit) }) + cell.get_or_init(|permit| async { Ok::<_, Infallible>((reinit, permit)) }) .await .unwrap(); @@ -258,18 +288,32 @@ mod tests { assert_eq!(*cell.get().unwrap(), reinit); } + #[test] + fn reinit_with_deinit_permit() { + let cell = Arc::new(OnceCell::new(42)); + + let (mol, permit) = cell.get().unwrap().take_and_deinit(); + cell.set(5, permit); + assert_eq!(*cell.get().unwrap(), 5); + + let (five, permit) = cell.get().unwrap().take_and_deinit(); + assert_eq!(5, five); + cell.set(mol, permit); + assert_eq!(*cell.get().unwrap(), 42); + } + #[tokio::test] async fn initialization_attemptable_until_ok() { let cell = OnceCell::default(); for _ in 0..10 { - cell.get_or_init(|| async { Err("whatever error") }) + cell.get_or_init(|_permit| async { Err("whatever error") }) .await .unwrap_err(); } let g = cell - .get_or_init(|| async { Ok::<_, Infallible>("finally success") }) + .get_or_init(|permit| async { Ok::<_, Infallible>(("finally success", permit)) }) .await .unwrap(); assert_eq!(*g, "finally success"); @@ -281,11 +325,11 @@ mod tests { let barrier = tokio::sync::Barrier::new(2); - let initializer = cell.get_or_init(|| async { + let initializer = cell.get_or_init(|permit| async { barrier.wait().await; futures::future::pending::<()>().await; - Ok::<_, Infallible>("never reached") + Ok::<_, Infallible>(("never reached", permit)) }); tokio::select! { @@ -298,7 +342,7 @@ mod tests { assert!(cell.get().is_none()); let g = cell - .get_or_init(|| async { Ok::<_, Infallible>("now initialized") }) + .get_or_init(|permit| async { Ok::<_, Infallible>(("now initialized", permit)) }) .await .unwrap(); assert_eq!(*g, "now initialized"); diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index 761fe311c6..94edfa6fe0 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -618,10 +618,10 @@ impl LayerInner { allow_download: bool, ctx: Option<&RequestContext>, ) -> Result, DownloadError> { - let mut permit = None; + let mut init_permit = None; loop { - let download = move || async move { + let download = move |permit| async move { // disable any scheduled but not yet running eviction deletions for this let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed); @@ -644,7 +644,7 @@ impl LayerInner { .await .map_err(DownloadError::PreStatFailed)?; - if let Some(reason) = needs_download { + let permit = if let Some(reason) = needs_download { // only reset this after we've decided we really need to download. otherwise it'd // be impossible to mark cancelled downloads for eviction, like one could imagine // we would like to do for prefetching which was not needed. @@ -666,12 +666,14 @@ impl LayerInner { return Err(DownloadError::DownloadRequired); } - self.spawn_download_and_wait(timeline).await?; + self.spawn_download_and_wait(timeline, permit).await? } else { // the file is present locally, probably by a previous but cancelled call to // get_or_maybe_download. alternatively we might be running without remote storage. LAYER_IMPL_METRICS.inc_init_needed_no_download(); - } + + permit + }; let res = Arc::new(DownloadedLayer { owner: Arc::downgrade(self), @@ -684,12 +686,21 @@ impl LayerInner { LayerResidenceEventReason::ResidenceChange, ); - Ok(ResidentOrWantedEvicted::Resident(res)) + Ok((ResidentOrWantedEvicted::Resident(res), permit)) }; - let (weak, _permit) = { - // should we be able to give the permit to the `get_or_init`? would make sense. - drop(permit.take()); + if let Some(init_permit) = init_permit.take() { + // use the already held initialization permit because it is impossible to hit the + // below paths anymore essentially limiting the max loop iterations to 2. + let (value, init_permit) = download(init_permit).await?; + let mut guard = self.inner.set(value, init_permit); + let strong = guard + .get_and_upgrade() + .expect("init creates strong reference, we held the init permit"); + return Ok(strong); + } + + let (weak, permit) = { let mut locked = self.inner.get_or_init(download).await?; if let Some(strong) = locked.get_and_upgrade() { @@ -716,7 +727,7 @@ impl LayerInner { "unexpected {weak:?}, ResidentOrWantedEvicted::get_and_upgrade has a bug" ); - permit = Some(_permit); + init_permit = Some(permit); LAYER_IMPL_METRICS.inc_retried_get_or_maybe_download(); } @@ -752,10 +763,12 @@ impl LayerInner { async fn spawn_download_and_wait( self: &Arc, timeline: Arc, - ) -> Result<(), DownloadError> { + permit: heavier_once_cell::InitPermit, + ) -> Result { let task_name = format!("download layer {}", self); let (tx, rx) = tokio::sync::oneshot::channel(); + // this is sadly needed because of task_mgr::shutdown_tasks, otherwise we cannot // block tenant::mgr::remove_tenant_from_memory. @@ -789,9 +802,9 @@ impl LayerInner { } }; - if let Err(res) = tx.send(result) { + if let Err(res) = tx.send((result, permit)) { match res { - Ok(()) => { + (Ok(()), _) => { // our caller is cancellation safe so this is fine; if someone // else requests the layer, they'll find it already downloaded // or redownload. @@ -802,7 +815,7 @@ impl LayerInner { tracing::info!("layer file download completed after requester had cancelled"); LAYER_IMPL_METRICS.inc_download_completed_without_requester(); }, - Err(e) => { + (Err(e), _) => { // our caller is cancellation safe, but we might be racing with // another attempt to initialize. before we have cancellation // token support: these attempts should converge regardless of @@ -818,7 +831,7 @@ impl LayerInner { .in_current_span(), ); match rx.await { - Ok(Ok(())) => { + Ok((Ok(()), permit)) => { if let Some(reason) = self .needs_download() .await @@ -830,9 +843,10 @@ impl LayerInner { self.consecutive_failures.store(0, Ordering::Relaxed); - Ok(()) + Ok(permit) } - Ok(Err(e)) => { + Ok((Err(e), _permit)) => { + // FIXME: this should be with the spawned task and be cancellation sensitive let consecutive_failures = self.consecutive_failures.fetch_add(1, Ordering::Relaxed); tracing::error!(consecutive_failures, "layer file download failed: {e:#}");