mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-05 20:42:54 +00:00
fix(heavier_once_cell): assertion failure can be hit (#6722)
@problame noticed that the `tokio::sync::AcquireError` branch assertion can be hit like in the added test. We haven't seen this yet in production, but I'd prefer not to see it there. There `take_and_deinit` is being used, but this race must be quite timing sensitive. Rework of earlier: #6652.
This commit is contained in:
@@ -69,37 +69,44 @@ impl<T> OnceCell<T> {
|
||||
F: FnOnce(InitPermit) -> Fut,
|
||||
Fut: std::future::Future<Output = Result<(T, InitPermit), E>>,
|
||||
{
|
||||
let sem = {
|
||||
loop {
|
||||
let sem = {
|
||||
let guard = self.inner.lock().unwrap();
|
||||
if guard.value.is_some() {
|
||||
return Ok(Guard(guard));
|
||||
}
|
||||
guard.init_semaphore.clone()
|
||||
};
|
||||
|
||||
{
|
||||
let permit = {
|
||||
// increment the count for the duration of queued
|
||||
let _guard = CountWaitingInitializers::start(self);
|
||||
sem.acquire().await
|
||||
};
|
||||
|
||||
let Ok(permit) = permit else {
|
||||
let guard = self.inner.lock().unwrap();
|
||||
if !Arc::ptr_eq(&sem, &guard.init_semaphore) {
|
||||
// there was a take_and_deinit in between
|
||||
continue;
|
||||
}
|
||||
assert!(
|
||||
guard.value.is_some(),
|
||||
"semaphore got closed, must be initialized"
|
||||
);
|
||||
return Ok(Guard(guard));
|
||||
};
|
||||
|
||||
permit.forget();
|
||||
}
|
||||
|
||||
let permit = InitPermit(sem);
|
||||
let (value, _permit) = factory(permit).await?;
|
||||
|
||||
let guard = self.inner.lock().unwrap();
|
||||
if guard.value.is_some() {
|
||||
return Ok(Guard(guard));
|
||||
}
|
||||
guard.init_semaphore.clone()
|
||||
};
|
||||
|
||||
let permit = {
|
||||
// increment the count for the duration of queued
|
||||
let _guard = CountWaitingInitializers::start(self);
|
||||
sem.acquire_owned().await
|
||||
};
|
||||
|
||||
match permit {
|
||||
Ok(permit) => {
|
||||
let permit = InitPermit(permit);
|
||||
let (value, _permit) = factory(permit).await?;
|
||||
|
||||
let guard = self.inner.lock().unwrap();
|
||||
|
||||
Ok(Self::set0(value, guard))
|
||||
}
|
||||
Err(_closed) => {
|
||||
let guard = self.inner.lock().unwrap();
|
||||
assert!(
|
||||
guard.value.is_some(),
|
||||
"semaphore got closed, must be initialized"
|
||||
);
|
||||
return Ok(Guard(guard));
|
||||
}
|
||||
return Ok(Self::set0(value, guard));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -197,27 +204,41 @@ impl<'a, T> Guard<'a, T> {
|
||||
/// [`OnceCell::get_or_init`] will wait on it to complete.
|
||||
pub fn take_and_deinit(&mut self) -> (T, InitPermit) {
|
||||
let mut swapped = Inner::default();
|
||||
let permit = swapped
|
||||
.init_semaphore
|
||||
.clone()
|
||||
.try_acquire_owned()
|
||||
.expect("we just created this");
|
||||
let sem = swapped.init_semaphore.clone();
|
||||
// acquire and forget right away, moving the control over to InitPermit
|
||||
sem.try_acquire().expect("we just created this").forget();
|
||||
std::mem::swap(&mut *self.0, &mut swapped);
|
||||
swapped
|
||||
.value
|
||||
.map(|v| (v, InitPermit(permit)))
|
||||
.map(|v| (v, InitPermit(sem)))
|
||||
.expect("guard is not created unless value has been initialized")
|
||||
}
|
||||
}
|
||||
|
||||
/// Type held by OnceCell (de)initializing task.
|
||||
pub struct InitPermit(tokio::sync::OwnedSemaphorePermit);
|
||||
///
|
||||
/// On drop, this type will return the permit.
|
||||
pub struct InitPermit(Arc<tokio::sync::Semaphore>);
|
||||
|
||||
impl Drop for InitPermit {
|
||||
fn drop(&mut self) {
|
||||
assert_eq!(
|
||||
self.0.available_permits(),
|
||||
0,
|
||||
"InitPermit should only exist as the unique permit"
|
||||
);
|
||||
self.0.add_permits(1);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use futures::Future;
|
||||
|
||||
use super::*;
|
||||
use std::{
|
||||
convert::Infallible,
|
||||
pin::{pin, Pin},
|
||||
sync::atomic::{AtomicUsize, Ordering},
|
||||
time::Duration,
|
||||
};
|
||||
@@ -380,4 +401,85 @@ mod tests {
|
||||
.unwrap();
|
||||
assert_eq!(*g, "now initialized");
|
||||
}
|
||||
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn reproduce_init_take_deinit_race() {
|
||||
init_take_deinit_scenario(|cell, factory| {
|
||||
Box::pin(async {
|
||||
cell.get_or_init(factory).await.unwrap();
|
||||
})
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
type BoxedInitFuture<T, E> = Pin<Box<dyn Future<Output = Result<(T, InitPermit), E>>>>;
|
||||
type BoxedInitFunction<T, E> = Box<dyn Fn(InitPermit) -> BoxedInitFuture<T, E>>;
|
||||
|
||||
/// Reproduce an assertion failure.
|
||||
///
|
||||
/// This has interesting generics to be generic between `get_or_init` and `get_mut_or_init`.
|
||||
/// We currently only have one, but the structure is kept.
|
||||
async fn init_take_deinit_scenario<F>(init_way: F)
|
||||
where
|
||||
F: for<'a> Fn(
|
||||
&'a OnceCell<&'static str>,
|
||||
BoxedInitFunction<&'static str, Infallible>,
|
||||
) -> Pin<Box<dyn Future<Output = ()> + 'a>>,
|
||||
{
|
||||
let cell = OnceCell::default();
|
||||
|
||||
// acquire the init_semaphore only permit to drive initializing tasks in order to waiting
|
||||
// on the same semaphore.
|
||||
let permit = cell
|
||||
.inner
|
||||
.lock()
|
||||
.unwrap()
|
||||
.init_semaphore
|
||||
.clone()
|
||||
.try_acquire_owned()
|
||||
.unwrap();
|
||||
|
||||
let mut t1 = pin!(init_way(
|
||||
&cell,
|
||||
Box::new(|permit| Box::pin(async move { Ok(("t1", permit)) })),
|
||||
));
|
||||
|
||||
let mut t2 = pin!(init_way(
|
||||
&cell,
|
||||
Box::new(|permit| Box::pin(async move { Ok(("t2", permit)) })),
|
||||
));
|
||||
|
||||
// drive t2 first to the init_semaphore -- the timeout will be hit once t2 future can
|
||||
// no longer make progress
|
||||
tokio::select! {
|
||||
_ = &mut t2 => unreachable!("it cannot get permit"),
|
||||
_ = tokio::time::sleep(Duration::from_secs(3600 * 24 * 7 * 365)) => {}
|
||||
}
|
||||
|
||||
// followed by t1 in the init_semaphore
|
||||
tokio::select! {
|
||||
_ = &mut t1 => unreachable!("it cannot get permit"),
|
||||
_ = tokio::time::sleep(Duration::from_secs(3600 * 24 * 7 * 365)) => {}
|
||||
}
|
||||
|
||||
// now let t2 proceed and initialize
|
||||
drop(permit);
|
||||
t2.await;
|
||||
|
||||
let (s, permit) = { cell.get().unwrap().take_and_deinit() };
|
||||
assert_eq!("t2", s);
|
||||
|
||||
// now originally t1 would see the semaphore it has as closed. it cannot yet get a permit from
|
||||
// the new one.
|
||||
tokio::select! {
|
||||
_ = &mut t1 => unreachable!("it cannot get permit"),
|
||||
_ = tokio::time::sleep(Duration::from_secs(3600 * 24 * 7 * 365)) => {}
|
||||
}
|
||||
|
||||
// only now we get to initialize it
|
||||
drop(permit);
|
||||
t1.await;
|
||||
|
||||
assert_eq!("t1", *cell.get().unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user