diff --git a/libs/utils/src/sync/heavier_once_cell.rs b/libs/utils/src/sync/heavier_once_cell.rs index 703a6dfd52..a3aee45b58 100644 --- a/libs/utils/src/sync/heavier_once_cell.rs +++ b/libs/utils/src/sync/heavier_once_cell.rs @@ -110,6 +110,49 @@ impl OnceCell { } } + /// Returns a guard to an existing initialized value, or returns an unique initialization + /// permit which can be used to initialize this `OnceCell` using `OnceCell::set`. + pub async fn get_or_init_detached(&self) -> Result, InitPermit> { + // It looks like OnceCell::get_or_init could be implemented using this method instead of + // duplication. However, that makes the future be !Send due to possibly holding on to the + // MutexGuard over an await point. + loop { + let sem = { + let guard = self.inner.lock().unwrap(); + if guard.value.is_some() { + return Ok(Guard(guard)); + } + guard.init_semaphore.clone() + }; + + { + let permit = { + // increment the count for the duration of queued + let _guard = CountWaitingInitializers::start(self); + sem.acquire().await + }; + + let Ok(permit) = permit else { + let guard = self.inner.lock().unwrap(); + if !Arc::ptr_eq(&sem, &guard.init_semaphore) { + // there was a take_and_deinit in between + continue; + } + assert!( + guard.value.is_some(), + "semaphore got closed, must be initialized" + ); + return Ok(Guard(guard)); + }; + + permit.forget(); + } + + let permit = InitPermit(sem); + return Err(permit); + } + } + /// Assuming a permit is held after previous call to [`Guard::take_and_deinit`], it can be used /// to complete initializing the inner value. /// @@ -481,4 +524,39 @@ mod tests { assert_eq!("t1", *cell.get().unwrap()); } + + #[tokio::test(start_paused = true)] + async fn detached_init_smoke() { + let target = OnceCell::default(); + + let Err(permit) = target.get_or_init_detached().await else { + unreachable!("it is not initialized") + }; + + tokio::time::timeout( + std::time::Duration::from_secs(3600 * 24 * 7 * 365), + target.get_or_init(|permit2| async { Ok::<_, Infallible>((11, permit2)) }), + ) + .await + .expect_err("should timeout since we are already holding the permit"); + + target.set(42, permit); + + let (_answer, permit) = { + let mut guard = target + .get_or_init(|permit| async { Ok::<_, Infallible>((11, permit)) }) + .await + .unwrap(); + + assert_eq!(*guard, 42); + + guard.take_and_deinit() + }; + + assert!(target.get().is_none()); + + target.set(11, permit); + + assert_eq!(*target.get().unwrap(), 11); + } }