From f5b0e723cba810a25110f31c6240bbbb6bccb17a Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Wed, 7 Feb 2024 06:50:43 +0000 Subject: [PATCH] test: rewrite back from macro_rules --- libs/utils/src/sync/heavier_once_cell.rs | 148 +++++++++++++---------- 1 file changed, 85 insertions(+), 63 deletions(-) diff --git a/libs/utils/src/sync/heavier_once_cell.rs b/libs/utils/src/sync/heavier_once_cell.rs index 19950dca93..81625b907e 100644 --- a/libs/utils/src/sync/heavier_once_cell.rs +++ b/libs/utils/src/sync/heavier_once_cell.rs @@ -304,9 +304,12 @@ impl Drop for InitPermit { #[cfg(test)] mod tests { + use futures::Future; + use super::*; use std::{ convert::Infallible, + pin::{pin, Pin}, sync::atomic::{AtomicUsize, Ordering}, time::Duration, }; @@ -474,74 +477,93 @@ mod tests { assert_eq!(*g, "now initialized"); } - macro_rules! assertion_failure_reproduction { - ($method:ident) => {{ - let cell = OnceCell::default(); - - // use this permit to force two instances to clone out the semaphore and move to force - // two tasks t1 and t2 to be awaiting for a permit. - let permit = cell - .inner - .read() - .await - .init_semaphore - .clone() - .try_acquire_owned() - .unwrap(); - - let t1 = async { - cell.$method(|init| async { Ok::<_, Infallible>(("t1", init)) }) - .await - }; - let mut t1 = std::pin::pin!(t1); - - let t2 = async { - cell.$method(|init| async { Ok::<_, Infallible>(("t2", init)) }) - .await - }; - let mut t2 = std::pin::pin!(t2); - - // drive t2 first to the queue - tokio::select! { - _ = &mut t2 => unreachable!("it cannot get permit"), - _ = tokio::time::sleep(Duration::from_secs(3600 * 24 * 7 * 365)) => {} - } - - // followed by t1 in the queue - tokio::select! { - _ = &mut t1 => unreachable!("it cannot get permit"), - _ = tokio::time::sleep(Duration::from_secs(3600 * 24 * 7 * 365)) => {} - } - - // now let "the other" proceed and initialize - drop(permit); - drop(t2.await); - - let (s, permit) = { cell.get_mut().await.unwrap().take_and_deinit() }; - assert_eq!("t2", s); - - // now t1 will see the original semaphore as closed. it cannot yet get a permit from - // the new one. - tokio::select! { - _ = &mut t1 => unreachable!("it cannot get permit"), - _ = tokio::time::sleep(Duration::from_secs(3600 * 24 * 7 * 365)) => {} - } - - // only now we get to initialize it - drop(permit); - drop(t1.await); - - assert_eq!("t1", *cell.get().await.unwrap()); - }}; - } - #[tokio::test(start_paused = true)] async fn reproduce_init_take_deinit_race() { - assertion_failure_reproduction!(get_or_init); + init_take_deinit_scenario(|cell, factory| { + Box::pin(async { + cell.get_or_init(factory).await.unwrap(); + }) + }) + .await; } #[tokio::test(start_paused = true)] async fn reproduce_init_take_deinit_race_mut() { - assertion_failure_reproduction!(get_mut_or_init); + init_take_deinit_scenario(|cell, factory| { + Box::pin(async { + cell.get_mut_or_init(factory).await.unwrap(); + }) + }) + .await; + } + + type BoxedInitFuture = Pin>>>; + type BoxedInitFunction = Box BoxedInitFuture>; + + /// Reproduce an assertion failure with both initialization methods. + /// + /// This has interesting generics to be generic between `get_or_init` and `get_mut_or_init`. + /// Alternative would be a macro_rules! but that is the last resort. + async fn init_take_deinit_scenario(init_way: F) + where + F: for<'a> Fn( + &'a OnceCell<&'static str>, + BoxedInitFunction<&'static str, Infallible>, + ) -> Pin + 'a>>, + { + let cell = OnceCell::default(); + + // acquire the init_semaphore only permit to drive initializing tasks in order to waiting + // on the same semaphore. + let permit = cell + .inner + .read() + .await + .init_semaphore + .clone() + .try_acquire_owned() + .unwrap(); + + let mut t1 = pin!(init_way( + &cell, + Box::new(|permit| Box::pin(async move { Ok(("t1", permit)) })), + )); + + let mut t2 = pin!(init_way( + &cell, + Box::new(|permit| Box::pin(async move { Ok(("t2", permit)) })), + )); + + // drive t2 first to the init_semaphore + tokio::select! { + _ = &mut t2 => unreachable!("it cannot get permit"), + _ = tokio::time::sleep(Duration::from_secs(3600 * 24 * 7 * 365)) => {} + } + + // followed by t1 in the init_semaphore + tokio::select! { + _ = &mut t1 => unreachable!("it cannot get permit"), + _ = tokio::time::sleep(Duration::from_secs(3600 * 24 * 7 * 365)) => {} + } + + // now let t2 proceed and initialize + drop(permit); + t2.await; + + let (s, permit) = { cell.get_mut().await.unwrap().take_and_deinit() }; + assert_eq!("t2", s); + + // now originally t1 would see the semaphore it has as closed. it cannot yet get a permit from + // the new one. + tokio::select! { + _ = &mut t1 => unreachable!("it cannot get permit"), + _ = tokio::time::sleep(Duration::from_secs(3600 * 24 * 7 * 365)) => {} + } + + // only now we get to initialize it + drop(permit); + t1.await; + + assert_eq!("t1", *cell.get().await.unwrap()); } }