From f0a0d7bb7ad470e72cba404a7e857e20a8b6de55 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Fri, 2 Sep 2022 00:34:37 +0300 Subject: [PATCH] Split RcuWriteGuard::store() into two stages: store and wait. This makes it easier to explain which stages allow concurrent readers and writers. Expand the comments with examples, too. --- libs/utils/src/simple_rcu.rs | 146 +++++++++++++----- pageserver/src/layered_repository/timeline.rs | 4 +- 2 files changed, 111 insertions(+), 39 deletions(-) diff --git a/libs/utils/src/simple_rcu.rs b/libs/utils/src/simple_rcu.rs index 24423815ab..177a839d75 100644 --- a/libs/utils/src/simple_rcu.rs +++ b/libs/utils/src/simple_rcu.rs @@ -9,6 +9,36 @@ //! This implementation isn't wait-free; it uses an RwLock that is held for a //! short duration when the value is read or updated. //! +//! # Examples +//! +//! Read a value and do things with it while holding the guard: +//! +//! ``` +//! # let rcu = utils::simple_rcu::Rcu::new(1); +//! { +//! let read = rcu.read(); +//! println!("the current value is {}", *read); +//! // exiting the scope drops the read-guard, and allows concurrent writers +//! // to finish. +//! } +//! ``` +//! +//! Increment the value by one, and wait for old readers to finish: +//! +//! ``` +//! # let rcu = utils::simple_rcu::Rcu::new(1); +//! let write_guard = rcu.lock_for_write(); +//! +//! // NB: holding `write_guard` blocks new readers and writers. Keep this section short! +//! let new_value = *write_guard + 1; +//! +//! let waitlist = write_guard.store_and_unlock(new_value); // consumes `write_guard` +//! +//! // Concurrent reads and writes are now possible again. Wait for all the readers +//! // that still observe the old value to finish. +//! waitlist.wait(); +//! ``` +//! #![warn(missing_docs)] use std::ops::Deref; @@ -84,9 +114,10 @@ impl Rcu { /// used to read the current value, and to store a new value. /// /// Note: holding the write-guard blocks concurrent readers, so you should - /// finish the update and drop the guard quickly! + /// finish the update and drop the guard quickly! Multiple writers can be + /// waiting on the RcuWriteGuard::store step at the same time, however. /// - pub fn write(&self) -> RcuWriteGuard<'_, V> { + pub fn lock_for_write(&self) -> RcuWriteGuard<'_, V> { let inner = self.inner.write().unwrap(); RcuWriteGuard { inner } } @@ -108,7 +139,13 @@ impl Deref for RcuReadGuard { } /// -/// Read guard returned by `read` +/// Write guard returned by `write` +/// +/// NB: Holding this guard blocks all concurrent `read` and `write` calls, so +/// it should only be held for a short duration! +/// +/// Calling `store` consumes the guard, making new reads and new writes possible +/// again. /// pub struct RcuWriteGuard<'a, V> { inner: RwLockWriteGuard<'a, RcuInner>, @@ -126,13 +163,11 @@ impl<'a, V> RcuWriteGuard<'a, V> { /// /// Store a new value. The new value will be written to the Rcu immediately, /// and will be immediately seen by any `read` calls that start afterwards. - /// But if there are any readers still holding onto the old value, or any - /// even older values, this will await until they have been released. /// - /// This will drop the write-guard before it starts waiting for the reads to - /// finish, so a new write operation can begin before this functio returns. + /// Returns a list of readers that can see old values. You can call `wait()` + /// on it to wait for them to finish. /// - pub fn store(mut self, new_val: V) { + pub fn store_and_unlock(mut self, new_val: V) -> RcuWaitList { let new_cell = Arc::new(RcuCell::new(new_val)); let mut watches = Vec::new(); @@ -151,11 +186,23 @@ impl<'a, V> RcuWriteGuard<'a, V> { } }); } - drop(self); + RcuWaitList(watches) + } +} +/// +/// List of readers who can still see old values. +/// +pub struct RcuWaitList(Vec>); + +impl RcuWaitList { + /// + /// Wait for old readers to finish. + /// + pub fn wait(mut self) { // after all the old_cells are no longer in use, we're done - for w in watches.iter_mut() { - // This will block until the Receiver is closed. That happens then + for w in self.0.iter_mut() { + // This will block until the Receiver is closed. That happens when // the RcuCell is dropped. #[allow(clippy::single_match)] match w.send(()) { @@ -177,41 +224,66 @@ mod tests { use std::time::Duration; #[test] - fn basic() { - let rcu = Arc::new(Rcu::new(1)); + fn two_writers() { + let rcu = Rcu::new(1); + + let read1 = rcu.read(); + assert_eq!(*read1, 1); + + let write2 = rcu.lock_for_write(); + assert_eq!(*write2, 1); + let wait2 = write2.store_and_unlock(2); + + let read2 = rcu.read(); + assert_eq!(*read2, 2); + + let write3 = rcu.lock_for_write(); + assert_eq!(*write3, 2); + let wait3 = write3.store_and_unlock(3); + + // new reader can see the new value, and old readers continue to see the old values. + let read3 = rcu.read(); + assert_eq!(*read3, 3); + assert_eq!(*read2, 2); + assert_eq!(*read1, 1); + let log = Arc::new(Mutex::new(Vec::new())); - - let a = rcu.read(); - assert_eq!(*a, 1); - log.lock().unwrap().push("one"); - - let (rcu_clone, log_clone) = (Arc::clone(&rcu), Arc::clone(&log)); - let thread = spawn(move || { - log_clone.lock().unwrap().push("store two start"); - let write_guard = rcu_clone.write(); - assert_eq!(*write_guard, 1); - write_guard.store(2); - log_clone.lock().unwrap().push("store two done"); + // Wait for the old readers to finish in separate threads. + let log_clone = Arc::clone(&log); + let thread2 = spawn(move || { + wait2.wait(); + log_clone.lock().unwrap().push("wait2 done"); }); + let log_clone = Arc::clone(&log); + let thread3 = spawn(move || { + wait3.wait(); + log_clone.lock().unwrap().push("wait3 done"); + }); + // without this sleep the test can pass on accident if the writer is slow - sleep(Duration::from_secs(1)); + sleep(Duration::from_millis(500)); - // new read should see the new value - let b = rcu.read(); - assert_eq!(*b, 2); + // Release first reader. This allows first write to finish, but calling + // wait() on the second one would still block. + log.lock().unwrap().push("dropping read1"); + drop(read1); + thread2.join().unwrap(); - // old guard still sees the old value - assert_eq!(*a, 1); + sleep(Duration::from_millis(500)); - // Release the old guard. This lets the store in the thread to finish. - log.lock().unwrap().push("release a"); - drop(a); - - thread.join().unwrap(); + // Release second reader, and finish second writer. + log.lock().unwrap().push("dropping read2"); + drop(read2); + thread3.join().unwrap(); assert_eq!( log.lock().unwrap().as_slice(), - &["one", "store two start", "release a", "store two done",] + &[ + "dropping read1", + "wait2 done", + "dropping read2", + "wait3 done" + ] ); } } diff --git a/pageserver/src/layered_repository/timeline.rs b/pageserver/src/layered_repository/timeline.rs index a624a3ccf5..8b90cc4e6b 100644 --- a/pageserver/src/layered_repository/timeline.rs +++ b/pageserver/src/layered_repository/timeline.rs @@ -2046,14 +2046,14 @@ impl Timeline { // // The GC cutoff should only ever move forwards. { - let write_guard = self.latest_gc_cutoff_lsn.write(); + let write_guard = self.latest_gc_cutoff_lsn.lock_for_write(); ensure!( *write_guard <= new_gc_cutoff, "Cannot move GC cutoff LSN backwards (was {}, new {})", *write_guard, new_gc_cutoff ); - write_guard.store(new_gc_cutoff); + write_guard.store_and_unlock(new_gc_cutoff).wait(); } info!("GC starting");