Split RcuWriteGuard::store() into two stages: store and wait.

This makes it easier to explain which stages allow concurrent readers and
writers. Expand the comments with examples, too.
This commit is contained in:
Heikki Linnakangas
2022-09-02 00:34:37 +03:00
parent 40813adba2
commit f0a0d7bb7a
2 changed files with 111 additions and 39 deletions

View File

@@ -9,6 +9,36 @@
//! This implementation isn't wait-free; it uses an RwLock that is held for a
//! short duration when the value is read or updated.
//!
//! # Examples
//!
//! Read a value and do things with it while holding the guard:
//!
//! ```
//! # let rcu = utils::simple_rcu::Rcu::new(1);
//! {
//! let read = rcu.read();
//! println!("the current value is {}", *read);
//! // exiting the scope drops the read-guard, and allows concurrent writers
//! // to finish.
//! }
//! ```
//!
//! Increment the value by one, and wait for old readers to finish:
//!
//! ```
//! # let rcu = utils::simple_rcu::Rcu::new(1);
//! let write_guard = rcu.lock_for_write();
//!
//! // NB: holding `write_guard` blocks new readers and writers. Keep this section short!
//! let new_value = *write_guard + 1;
//!
//! let waitlist = write_guard.store_and_unlock(new_value); // consumes `write_guard`
//!
//! // Concurrent reads and writes are now possible again. Wait for all the readers
//! // that still observe the old value to finish.
//! waitlist.wait();
//! ```
//!
#![warn(missing_docs)]
use std::ops::Deref;
@@ -84,9 +114,10 @@ impl<V> Rcu<V> {
/// used to read the current value, and to store a new value.
///
/// Note: holding the write-guard blocks concurrent readers, so you should
/// finish the update and drop the guard quickly!
/// finish the update and drop the guard quickly! Multiple writers can be
/// waiting on the RcuWriteGuard::store step at the same time, however.
///
pub fn write(&self) -> RcuWriteGuard<'_, V> {
pub fn lock_for_write(&self) -> RcuWriteGuard<'_, V> {
let inner = self.inner.write().unwrap();
RcuWriteGuard { inner }
}
@@ -108,7 +139,13 @@ impl<V> Deref for RcuReadGuard<V> {
}
///
/// Read guard returned by `read`
/// Write guard returned by `write`
///
/// NB: Holding this guard blocks all concurrent `read` and `write` calls, so
/// it should only be held for a short duration!
///
/// Calling `store` consumes the guard, making new reads and new writes possible
/// again.
///
pub struct RcuWriteGuard<'a, V> {
inner: RwLockWriteGuard<'a, RcuInner<V>>,
@@ -126,13 +163,11 @@ impl<'a, V> RcuWriteGuard<'a, V> {
///
/// Store a new value. The new value will be written to the Rcu immediately,
/// and will be immediately seen by any `read` calls that start afterwards.
/// But if there are any readers still holding onto the old value, or any
/// even older values, this will await until they have been released.
///
/// This will drop the write-guard before it starts waiting for the reads to
/// finish, so a new write operation can begin before this functio returns.
/// Returns a list of readers that can see old values. You can call `wait()`
/// on it to wait for them to finish.
///
pub fn store(mut self, new_val: V) {
pub fn store_and_unlock(mut self, new_val: V) -> RcuWaitList {
let new_cell = Arc::new(RcuCell::new(new_val));
let mut watches = Vec::new();
@@ -151,11 +186,23 @@ impl<'a, V> RcuWriteGuard<'a, V> {
}
});
}
drop(self);
RcuWaitList(watches)
}
}
///
/// List of readers who can still see old values.
///
pub struct RcuWaitList(Vec<SyncSender<()>>);
impl RcuWaitList {
///
/// Wait for old readers to finish.
///
pub fn wait(mut self) {
// after all the old_cells are no longer in use, we're done
for w in watches.iter_mut() {
// This will block until the Receiver is closed. That happens then
for w in self.0.iter_mut() {
// This will block until the Receiver is closed. That happens when
// the RcuCell is dropped.
#[allow(clippy::single_match)]
match w.send(()) {
@@ -177,41 +224,66 @@ mod tests {
use std::time::Duration;
#[test]
fn basic() {
let rcu = Arc::new(Rcu::new(1));
fn two_writers() {
let rcu = Rcu::new(1);
let read1 = rcu.read();
assert_eq!(*read1, 1);
let write2 = rcu.lock_for_write();
assert_eq!(*write2, 1);
let wait2 = write2.store_and_unlock(2);
let read2 = rcu.read();
assert_eq!(*read2, 2);
let write3 = rcu.lock_for_write();
assert_eq!(*write3, 2);
let wait3 = write3.store_and_unlock(3);
// new reader can see the new value, and old readers continue to see the old values.
let read3 = rcu.read();
assert_eq!(*read3, 3);
assert_eq!(*read2, 2);
assert_eq!(*read1, 1);
let log = Arc::new(Mutex::new(Vec::new()));
let a = rcu.read();
assert_eq!(*a, 1);
log.lock().unwrap().push("one");
let (rcu_clone, log_clone) = (Arc::clone(&rcu), Arc::clone(&log));
let thread = spawn(move || {
log_clone.lock().unwrap().push("store two start");
let write_guard = rcu_clone.write();
assert_eq!(*write_guard, 1);
write_guard.store(2);
log_clone.lock().unwrap().push("store two done");
// Wait for the old readers to finish in separate threads.
let log_clone = Arc::clone(&log);
let thread2 = spawn(move || {
wait2.wait();
log_clone.lock().unwrap().push("wait2 done");
});
let log_clone = Arc::clone(&log);
let thread3 = spawn(move || {
wait3.wait();
log_clone.lock().unwrap().push("wait3 done");
});
// without this sleep the test can pass on accident if the writer is slow
sleep(Duration::from_secs(1));
sleep(Duration::from_millis(500));
// new read should see the new value
let b = rcu.read();
assert_eq!(*b, 2);
// Release first reader. This allows first write to finish, but calling
// wait() on the second one would still block.
log.lock().unwrap().push("dropping read1");
drop(read1);
thread2.join().unwrap();
// old guard still sees the old value
assert_eq!(*a, 1);
sleep(Duration::from_millis(500));
// Release the old guard. This lets the store in the thread to finish.
log.lock().unwrap().push("release a");
drop(a);
thread.join().unwrap();
// Release second reader, and finish second writer.
log.lock().unwrap().push("dropping read2");
drop(read2);
thread3.join().unwrap();
assert_eq!(
log.lock().unwrap().as_slice(),
&["one", "store two start", "release a", "store two done",]
&[
"dropping read1",
"wait2 done",
"dropping read2",
"wait3 done"
]
);
}
}

View File

@@ -2046,14 +2046,14 @@ impl Timeline {
//
// The GC cutoff should only ever move forwards.
{
let write_guard = self.latest_gc_cutoff_lsn.write();
let write_guard = self.latest_gc_cutoff_lsn.lock_for_write();
ensure!(
*write_guard <= new_gc_cutoff,
"Cannot move GC cutoff LSN backwards (was {}, new {})",
*write_guard,
new_gc_cutoff
);
write_guard.store(new_gc_cutoff);
write_guard.store_and_unlock(new_gc_cutoff).wait();
}
info!("GC starting");