feat(layer): cancellable get_or_maybe_download (#5744)

With the layer implementation as was done in #4938, it is possible via
cancellation to cause two concurrent downloads on the same path, due to
how `RemoteTimelineClient::download_remote_layer` does tempfiles. Thread
the init semaphore through the spawned task of downloading to make this
impossible to happen.
This commit is contained in:
Joonas Koivunen
2023-11-02 10:06:32 +02:00
committed by GitHub
parent 0b790b6d00
commit 2dca4c03fc
2 changed files with 105 additions and 47 deletions

View File

@@ -60,8 +60,8 @@ impl<T> OnceCell<T> {
/// Initialization is panic-safe and cancellation-safe.
pub async fn get_or_init<F, Fut, E>(&self, factory: F) -> Result<Guard<'_, T>, E>
where
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = Result<T, E>>,
F: FnOnce(InitPermit) -> Fut,
Fut: std::future::Future<Output = Result<(T, InitPermit), E>>,
{
let sem = {
let guard = self.inner.lock().unwrap();
@@ -72,28 +72,55 @@ impl<T> OnceCell<T> {
};
let permit = sem.acquire_owned().await;
if permit.is_err() {
let guard = self.inner.lock().unwrap();
assert!(
guard.value.is_some(),
"semaphore got closed, must be initialized"
);
return Ok(Guard(guard));
} else {
// now we try
let value = factory().await?;
let mut guard = self.inner.lock().unwrap();
assert!(
guard.value.is_none(),
"we won permit, must not be initialized"
);
guard.value = Some(value);
guard.init_semaphore.close();
Ok(Guard(guard))
match permit {
Ok(permit) => {
let permit = InitPermit(permit);
let (value, _permit) = factory(permit).await?;
let guard = self.inner.lock().unwrap();
Ok(Self::set0(value, guard))
}
Err(_closed) => {
let guard = self.inner.lock().unwrap();
assert!(
guard.value.is_some(),
"semaphore got closed, must be initialized"
);
return Ok(Guard(guard));
}
}
}
/// Assuming a permit is held after previous call to [`Guard::take_and_deinit`], it can be used
/// to complete initializing the inner value.
///
/// # Panics
///
/// If the inner has already been initialized.
pub fn set(&self, value: T, _permit: InitPermit) -> Guard<'_, T> {
// cannot assert that this permit is for self.inner.semaphore
let guard = self.inner.lock().unwrap();
if guard.init_semaphore.try_acquire().is_ok() {
drop(guard);
panic!("semaphore is of wrong origin");
}
Self::set0(value, guard)
}
fn set0(value: T, mut guard: std::sync::MutexGuard<'_, Inner<T>>) -> Guard<'_, T> {
if guard.value.is_some() {
drop(guard);
unreachable!("we won permit, must not be initialized");
}
guard.value = Some(value);
guard.init_semaphore.close();
Guard(guard)
}
/// Returns a guard to an existing initialized value, if any.
pub fn get(&self) -> Option<Guard<'_, T>> {
let guard = self.inner.lock().unwrap();
@@ -135,7 +162,7 @@ impl<'a, T> Guard<'a, T> {
///
/// The permit will be on a semaphore part of the new internal value, and any following
/// [`OnceCell::get_or_init`] will wait on it to complete.
pub fn take_and_deinit(&mut self) -> (T, tokio::sync::OwnedSemaphorePermit) {
pub fn take_and_deinit(&mut self) -> (T, InitPermit) {
let mut swapped = Inner::default();
let permit = swapped
.init_semaphore
@@ -145,11 +172,14 @@ impl<'a, T> Guard<'a, T> {
std::mem::swap(&mut *self.0, &mut swapped);
swapped
.value
.map(|v| (v, permit))
.map(|v| (v, InitPermit(permit)))
.expect("guard is not created unless value has been initialized")
}
}
/// Type held by OnceCell (de)initializing task.
pub struct InitPermit(tokio::sync::OwnedSemaphorePermit);
#[cfg(test)]
mod tests {
use super::*;
@@ -185,11 +215,11 @@ mod tests {
barrier.wait().await;
let won = {
let g = cell
.get_or_init(|| {
.get_or_init(|permit| {
counters.factory_got_to_run.fetch_add(1, Ordering::Relaxed);
async {
counters.future_polled.fetch_add(1, Ordering::Relaxed);
Ok::<_, Infallible>(i)
Ok::<_, Infallible>((i, permit))
}
})
.await
@@ -243,7 +273,7 @@ mod tests {
deinitialization_started.wait().await;
let started_at = tokio::time::Instant::now();
cell.get_or_init(|| async { Ok::<_, Infallible>(reinit) })
cell.get_or_init(|permit| async { Ok::<_, Infallible>((reinit, permit)) })
.await
.unwrap();
@@ -258,18 +288,32 @@ mod tests {
assert_eq!(*cell.get().unwrap(), reinit);
}
#[test]
fn reinit_with_deinit_permit() {
let cell = Arc::new(OnceCell::new(42));
let (mol, permit) = cell.get().unwrap().take_and_deinit();
cell.set(5, permit);
assert_eq!(*cell.get().unwrap(), 5);
let (five, permit) = cell.get().unwrap().take_and_deinit();
assert_eq!(5, five);
cell.set(mol, permit);
assert_eq!(*cell.get().unwrap(), 42);
}
#[tokio::test]
async fn initialization_attemptable_until_ok() {
let cell = OnceCell::default();
for _ in 0..10 {
cell.get_or_init(|| async { Err("whatever error") })
cell.get_or_init(|_permit| async { Err("whatever error") })
.await
.unwrap_err();
}
let g = cell
.get_or_init(|| async { Ok::<_, Infallible>("finally success") })
.get_or_init(|permit| async { Ok::<_, Infallible>(("finally success", permit)) })
.await
.unwrap();
assert_eq!(*g, "finally success");
@@ -281,11 +325,11 @@ mod tests {
let barrier = tokio::sync::Barrier::new(2);
let initializer = cell.get_or_init(|| async {
let initializer = cell.get_or_init(|permit| async {
barrier.wait().await;
futures::future::pending::<()>().await;
Ok::<_, Infallible>("never reached")
Ok::<_, Infallible>(("never reached", permit))
});
tokio::select! {
@@ -298,7 +342,7 @@ mod tests {
assert!(cell.get().is_none());
let g = cell
.get_or_init(|| async { Ok::<_, Infallible>("now initialized") })
.get_or_init(|permit| async { Ok::<_, Infallible>(("now initialized", permit)) })
.await
.unwrap();
assert_eq!(*g, "now initialized");