From 31be301ef38b89da2f7d532ba3e869ea35b78514 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas
Date: Thu, 7 Dec 2023 10:20:40 +0200
Subject: [PATCH] Make simple_rcu::RcuWaitList::wait() async (#6046)

The gc_timeline() function is async, but it calls the synchronous wait()
function. In the worst case, that could lead to a deadlock by using up all
tokio executor threads.

In passing, fix a few typos in comments.

Fixes issue #6045.

---------

Co-authored-by: Joonas Koivunen
---
 libs/utils/src/simple_rcu.rs      | 71 ++++++++++++++++---------------
 pageserver/src/tenant/timeline.rs |  9 ++--
 2 files changed, 42 insertions(+), 38 deletions(-)

diff --git a/libs/utils/src/simple_rcu.rs b/libs/utils/src/simple_rcu.rs
index 177a839d75..dc4a599111 100644
--- a/libs/utils/src/simple_rcu.rs
+++ b/libs/utils/src/simple_rcu.rs
@@ -1,10 +1,10 @@
 //!
 //! RCU stands for Read-Copy-Update. It's a synchronization mechanism somewhat
 //! similar to a lock, but it allows readers to "hold on" to an old value of RCU
-//! without blocking writers, and allows writing a new values without blocking
-//! readers. When you update the new value, the new value is immediately visible
+//! without blocking writers, and allows writing a new value without blocking
+//! readers. When you update the value, the new value is immediately visible
 //! to new readers, but the update waits until all existing readers have
-//! finishe, so that no one sees the old value anymore.
+//! finished, so that on return, no one sees the old value anymore.
 //!
 //! This implementation isn't wait-free; it uses an RwLock that is held for a
 //! short duration when the value is read or updated.
@@ -26,6 +26,7 @@
 //! Increment the value by one, and wait for old readers to finish:
 //!
 //! ```
+//! # async fn dox() {
 //! # let rcu = utils::simple_rcu::Rcu::new(1);
 //! let write_guard = rcu.lock_for_write();
 //!
@@ -36,15 +37,17 @@
 //!
 //! // Concurrent reads and writes are now possible again. Wait for all the readers
 //! // that still observe the old value to finish.
-//! waitlist.wait();
+//! waitlist.wait().await;
+//! # }
 //! ```
 //!
 #![warn(missing_docs)]

 use std::ops::Deref;
-use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
 use std::sync::{Arc, Weak};
-use std::sync::{Mutex, RwLock, RwLockWriteGuard};
+use std::sync::{RwLock, RwLockWriteGuard};
+
+use tokio::sync::watch;

 ///
 /// Rcu allows multiple readers to read and hold onto a value without blocking
@@ -68,22 +71,21 @@ struct RcuCell<V> {
     value: V,

     /// A dummy channel. We never send anything to this channel. The point is
-    /// that when the RcuCell is dropped, any cloned Senders will be notified
+    /// that when the RcuCell is dropped, any subscribed Receivers will be notified
     /// that the channel is closed. Updaters can use this to wait out until the
     /// RcuCell has been dropped, i.e. until the old value is no longer in use.
     ///
-    /// We never do anything with the receiver, we just need to hold onto it so
-    /// that the Senders will be notified when it's dropped. But because it's
-    /// not Sync, we need a Mutex on it.
-    watch: (SyncSender<()>, Mutex<Receiver<()>>),
+    /// We never send anything to this, we just need to hold onto it so that the
+    /// Receivers will be notified when it's dropped.
+    watch: watch::Sender<()>,
 }

 impl<V> RcuCell<V> {
     fn new(value: V) -> Self {
-        let (watch_sender, watch_receiver) = sync_channel(0);
+        let (watch_sender, _) = watch::channel(());
         RcuCell {
             value,
-            watch: (watch_sender, Mutex::new(watch_receiver)),
+            watch: watch_sender,
         }
     }
 }
@@ -141,10 +143,10 @@ impl<V> Deref for RcuReadGuard<V> {
 ///
 /// Write guard returned by `write`
 ///
-/// NB: Holding this guard blocks all concurrent `read` and `write` calls, so
-/// it should only be held for a short duration!
+/// NB: Holding this guard blocks all concurrent `read` and `write` calls, so it should only be
+/// held for a short duration!
 ///
-/// Calling `store` consumes the guard, making new reads and new writes possible
+/// Calling [`Self::store_and_unlock`] consumes the guard, making new reads and new writes possible
 /// again.
 ///
 pub struct RcuWriteGuard<'a, V> {
@@ -179,7 +181,7 @@ impl<'a, V> RcuWriteGuard<'a, V> {
         // the watches for any that do.
         self.inner.old_cells.retain(|weak| {
             if let Some(cell) = weak.upgrade() {
-                watches.push(cell.watch.0.clone());
+                watches.push(cell.watch.subscribe());
                 true
             } else {
                 false
@@ -193,20 +195,20 @@ impl<'a, V> RcuWriteGuard<'a, V> {
 ///
 /// List of readers who can still see old values.
 ///
-pub struct RcuWaitList(Vec<SyncSender<()>>);
+pub struct RcuWaitList(Vec<watch::Receiver<()>>);

 impl RcuWaitList {
     ///
     /// Wait for old readers to finish.
     ///
-    pub fn wait(mut self) {
+    pub async fn wait(mut self) {
         // after all the old_cells are no longer in use, we're done
         for w in self.0.iter_mut() {
             // This will block until the Receiver is closed. That happens when
             // the RcuCell is dropped.
             #[allow(clippy::single_match)]
-            match w.send(()) {
-                Ok(_) => panic!("send() unexpectedly succeeded on dummy channel"),
+            match w.changed().await {
+                Ok(_) => panic!("changed() unexpectedly succeeded on dummy channel"),
                 Err(_) => {
                     // closed, which means that the cell has been dropped, and
                     // its value is no longer in use
@@ -220,11 +222,10 @@ impl RcuWaitList {
 mod tests {
     use super::*;
     use std::sync::{Arc, Mutex};
-    use std::thread::{sleep, spawn};
     use std::time::Duration;

-    #[test]
-    fn two_writers() {
+    #[tokio::test]
+    async fn two_writers() {
         let rcu = Rcu::new(1);

         let read1 = rcu.read();
@@ -248,33 +249,35 @@ mod tests {
         assert_eq!(*read1, 1);

         let log = Arc::new(Mutex::new(Vec::new()));

-        // Wait for the old readers to finish in separate threads.
+        // Wait for the old readers to finish in separate tasks.
         let log_clone = Arc::clone(&log);
-        let thread2 = spawn(move || {
-            wait2.wait();
+        let task2 = tokio::spawn(async move {
+            wait2.wait().await;
             log_clone.lock().unwrap().push("wait2 done");
         });
         let log_clone = Arc::clone(&log);
-        let thread3 = spawn(move || {
-            wait3.wait();
+        let task3 = tokio::spawn(async move {
+            wait3.wait().await;
             log_clone.lock().unwrap().push("wait3 done");
         });

         // without this sleep the test can pass on accident if the writer is slow
-        sleep(Duration::from_millis(500));
+        tokio::time::sleep(Duration::from_millis(100)).await;

         // Release first reader. This allows first write to finish, but calling
-        // wait() on the second one would still block.
+        // wait() on the 'task3' would still block.
         log.lock().unwrap().push("dropping read1");
         drop(read1);
-        thread2.join().unwrap();
+        task2.await.unwrap();

-        sleep(Duration::from_millis(500));
+        assert!(!task3.is_finished());
+
+        tokio::time::sleep(Duration::from_millis(100)).await;

         // Release second reader, and finish second writer.
         log.lock().unwrap().push("dropping read2");
         drop(read2);
-        thread3.join().unwrap();
+        task3.await.unwrap();

         assert_eq!(
             log.lock().unwrap().as_slice(),

diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index f666f1049f..882a5ef199 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -478,7 +478,7 @@ impl Timeline {
             .map(|ancestor| ancestor.timeline_id)
     }

-    /// Lock and get timeline's GC cuttof
+    /// Lock and get timeline's GC cutoff
     pub fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
         self.latest_gc_cutoff_lsn.read()
     }
@@ -3971,7 +3971,7 @@ impl Timeline {
         // for details. This will block until the old value is no longer in use.
         //
         // The GC cutoff should only ever move forwards.
-        {
+        let waitlist = {
             let write_guard = self.latest_gc_cutoff_lsn.lock_for_write();
             ensure!(
                 *write_guard <= new_gc_cutoff,
                 *write_guard,
                 new_gc_cutoff
             );
-            write_guard.store_and_unlock(new_gc_cutoff).wait();
-        }
+            write_guard.store_and_unlock(new_gc_cutoff)
+        };
+        waitlist.wait().await;

         info!("GC starting");
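
For reference, a minimal sketch of how the now-async wait() composes with the
Rcu API shown above. It assumes the `utils` crate from this repository and a
tokio runtime; the enclosing main() and the increment are illustrative, and the
Rcu calls follow the doc comment example in simple_rcu.rs:

    #[tokio::main]
    async fn main() {
        let rcu = utils::simple_rcu::Rcu::new(1);

        // A reader that holds on to the current value.
        let read_guard = rcu.read();

        // Publish a new value; readers created from here on see it immediately.
        let write_guard = rcu.lock_for_write();
        let new_value = *write_guard + 1;
        let waitlist = write_guard.store_and_unlock(new_value);

        // Release the old reader before awaiting, otherwise this single task
        // would wait on itself. Because wait() is now async, the wait no
        // longer ties up a tokio executor thread.
        drop(read_guard);
        waitlist.wait().await;
    }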