diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs
index 1b011bb73a..fa7a37adf1 100644
--- a/libs/utils/src/lib.rs
+++ b/libs/utils/src/lib.rs
@@ -8,6 +8,9 @@ pub mod lsn;
 /// SeqWait allows waiting for a future sequence number to arrive
 pub mod seqwait;
 
+/// A simple Read-Copy-Update implementation.
+pub mod simple_rcu;
+
 /// append only ordered map implemented with a Vec
 pub mod vec_map;
 
diff --git a/libs/utils/src/simple_rcu.rs b/libs/utils/src/simple_rcu.rs
new file mode 100644
index 0000000000..24423815ab
--- /dev/null
+++ b/libs/utils/src/simple_rcu.rs
@@ -0,0 +1,217 @@
+//!
+//! RCU stands for Read-Copy-Update. It's a synchronization mechanism somewhat
+//! similar to a lock, but it allows readers to "hold on" to an old value of the
+//! Rcu without blocking writers, and allows writing a new value without blocking
+//! readers. When you update the value, the new value is immediately visible
+//! to new readers, but the update waits until all existing readers have
+//! finished, so that no one sees the old value anymore.
+//!
+//! This implementation isn't wait-free; it uses an RwLock that is held for a
+//! short duration when the value is read or updated.
+//!
+#![warn(missing_docs)]
+
+use std::ops::Deref;
+use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
+use std::sync::{Arc, Weak};
+use std::sync::{Mutex, RwLock, RwLockWriteGuard};
+
+///
+/// Rcu allows multiple readers to read and hold onto a value without blocking
+/// (for very long). Storing to the Rcu updates the value, making new readers
+/// immediately see the new value, but it also waits for all current readers to
+/// finish.
+///
+pub struct Rcu<V> {
+    inner: RwLock<RcuInner<V>>,
+}
+
+struct RcuInner<V> {
+    current_cell: Arc<RcuCell<V>>,
+    old_cells: Vec<Weak<RcuCell<V>>>,
+}
+
+///
+/// RcuCell holds one value. It can be the latest one, or an old one.
+///
+struct RcuCell<V> {
+    value: V,
+
+    /// A dummy channel. We never send anything to this channel. The point is
+    /// that when the RcuCell is dropped, any cloned Senders will be notified
+    /// that the channel is closed. Updaters can use this to wait until the
+    /// RcuCell has been dropped, i.e. until the old value is no longer in use.
+    ///
+    /// We never do anything with the receiver, we just need to hold onto it so
+    /// that the Senders will be notified when it's dropped. But because it's
+    /// not Sync, we need a Mutex on it.
+    watch: (SyncSender<()>, Mutex<Receiver<()>>),
+}
+
+impl<V> RcuCell<V> {
+    fn new(value: V) -> Self {
+        let (watch_sender, watch_receiver) = sync_channel(0);
+        RcuCell {
+            value,
+            watch: (watch_sender, Mutex::new(watch_receiver)),
+        }
+    }
+}
+
+impl<V> Rcu<V> {
+    /// Create a new `Rcu`, initialized to `starting_val`
+    pub fn new(starting_val: V) -> Self {
+        let inner = RcuInner {
+            current_cell: Arc::new(RcuCell::new(starting_val)),
+            old_cells: Vec::new(),
+        };
+        Self {
+            inner: RwLock::new(inner),
+        }
+    }
+
+    ///
+    /// Read current value. Any store() calls will block until the returned
+    /// guard object is dropped.
+    ///
+    pub fn read(&self) -> RcuReadGuard<V> {
+        let current_cell = Arc::clone(&self.inner.read().unwrap().current_cell);
+        RcuReadGuard { cell: current_cell }
+    }
+
+    ///
+    /// Lock the current value for updating. Returns a guard object that can be
+    /// used to read the current value, and to store a new value.
+    ///
+    /// Note: holding the write-guard blocks concurrent readers, so you should
+    /// finish the update and drop the guard quickly!
+    ///
+    pub fn write(&self) -> RcuWriteGuard<'_, V> {
+        let inner = self.inner.write().unwrap();
+        RcuWriteGuard { inner }
+    }
+}
+
+///
+/// Read guard returned by `read`
+///
+pub struct RcuReadGuard<V> {
+    cell: Arc<RcuCell<V>>,
+}
+
+impl<V> Deref for RcuReadGuard<V> {
+    type Target = V;
+
+    fn deref(&self) -> &V {
+        &self.cell.value
+    }
+}
+
+///
+/// Write guard returned by `write`
+///
+pub struct RcuWriteGuard<'a, V> {
+    inner: RwLockWriteGuard<'a, RcuInner<V>>,
+}
+
+impl<'a, V> Deref for RcuWriteGuard<'a, V> {
+    type Target = V;
+
+    fn deref(&self) -> &V {
+        &self.inner.current_cell.value
+    }
+}
+
+impl<'a, V> RcuWriteGuard<'a, V> {
+    ///
+    /// Store a new value. The new value will be written to the Rcu immediately,
+    /// and will be seen by any `read` calls that start afterwards. But if there
+    /// are any readers still holding onto the old value, or any even older
+    /// values, this will wait until they have been released.
+    ///
+    /// This will drop the write-guard before it starts waiting for the reads to
+    /// finish, so a new write operation can begin before this function returns.
+    ///
+    pub fn store(mut self, new_val: V) {
+        let new_cell = Arc::new(RcuCell::new(new_val));
+
+        let mut watches = Vec::new();
+        {
+            let old = std::mem::replace(&mut self.inner.current_cell, new_cell);
+            self.inner.old_cells.push(Arc::downgrade(&old));
+
+            // cleanup old cells that no longer have any readers, and collect
+            // the watches for any that do.
+            self.inner.old_cells.retain(|weak| {
+                if let Some(cell) = weak.upgrade() {
+                    watches.push(cell.watch.0.clone());
+                    true
+                } else {
+                    false
+                }
+            });
+        }
+        drop(self);
+
+        // Wait until all the old cells are no longer in use; then we're done.
+        for w in watches.iter_mut() {
+            // This will block until the Receiver is closed. That happens when
+            // the RcuCell is dropped.
+            #[allow(clippy::single_match)]
+            match w.send(()) {
+                Ok(_) => panic!("send() unexpectedly succeeded on dummy channel"),
+                Err(_) => {
+                    // closed, which means that the cell has been dropped, and
+                    // its value is no longer in use
+                }
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::sync::{Arc, Mutex};
+    use std::thread::{sleep, spawn};
+    use std::time::Duration;
+
+    #[test]
+    fn basic() {
+        let rcu = Arc::new(Rcu::new(1));
+        let log = Arc::new(Mutex::new(Vec::new()));
+
+        let a = rcu.read();
+        assert_eq!(*a, 1);
+        log.lock().unwrap().push("one");
+
+        let (rcu_clone, log_clone) = (Arc::clone(&rcu), Arc::clone(&log));
+        let thread = spawn(move || {
+            log_clone.lock().unwrap().push("store two start");
+            let write_guard = rcu_clone.write();
+            assert_eq!(*write_guard, 1);
+            write_guard.store(2);
+            log_clone.lock().unwrap().push("store two done");
+        });
+        // Without this sleep the test can pass by accident if the writer is slow.
+        sleep(Duration::from_secs(1));
+
+        // new read should see the new value
+        let b = rcu.read();
+        assert_eq!(*b, 2);
+
+        // old guard still sees the old value
+        assert_eq!(*a, 1);
+
+        // Release the old guard. This lets the store in the other thread finish.
+        log.lock().unwrap().push("release a");
+        drop(a);
+
+        thread.join().unwrap();
+
+        assert_eq!(
+            log.lock().unwrap().as_slice(),
+            &["one", "store two start", "release a", "store two done",]
+        );
+    }
+}
diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs
index 36b8e3eb9e..73c30b51b8 100644
--- a/pageserver/src/layered_repository.rs
+++ b/pageserver/src/layered_repository.rs
@@ -254,7 +254,8 @@ impl Repository {
         src_timeline
             .check_lsn_is_in_scope(start_lsn, &latest_gc_cutoff_lsn)
             .context(format!(
-                "invalid branch start lsn: less than latest GC cutoff {latest_gc_cutoff_lsn}"
+                "invalid branch start lsn: less than latest GC cutoff {}",
+                *latest_gc_cutoff_lsn
             ))?;
         {
             let gc_info = src_timeline.gc_info.read().unwrap();
@@ -290,7 +291,7 @@ impl Repository {
             dst_prev,
             Some(src),
             start_lsn,
-            *src_timeline.latest_gc_cutoff_lsn.read().unwrap(),
+            *src_timeline.latest_gc_cutoff_lsn.read(),
             src_timeline.initdb_lsn,
         );
         crashsafe_dir::create_dir_all(self.conf.timeline_path(&dst, &self.tenant_id))?;
diff --git a/pageserver/src/layered_repository/timeline.rs b/pageserver/src/layered_repository/timeline.rs
index 5f3d669dc1..1a941affe5 100644
--- a/pageserver/src/layered_repository/timeline.rs
+++ b/pageserver/src/layered_repository/timeline.rs
@@ -14,7 +14,7 @@ use std::fs;
 use std::ops::{Deref, Range};
 use std::path::PathBuf;
 use std::sync::atomic::{self, AtomicBool, AtomicI64, Ordering as AtomicOrdering};
-use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, TryLockError};
+use std::sync::{Arc, Mutex, MutexGuard, RwLock, TryLockError};
 use std::time::{Duration, Instant, SystemTime};
 
 use metrics::{
@@ -46,6 +46,7 @@ use postgres_ffi::v14::xlog_utils::to_pg_timestamp;
 use utils::{
     lsn::{AtomicLsn, Lsn, RecordLsn},
     seqwait::SeqWait,
+    simple_rcu::{Rcu, RcuReadGuard},
     zid::{ZTenantId, ZTimelineId},
 };
 
@@ -367,7 +368,7 @@ pub struct Timeline {
     layer_removal_cs: Mutex<()>,
 
     // Needed to ensure that we can't create a branch at a point that was already garbage collected
-    pub latest_gc_cutoff_lsn: RwLock<Lsn>,
+    pub latest_gc_cutoff_lsn: Rcu<Lsn>,
 
     // List of child timelines and their branch points. This is needed to avoid
     // garbage collecting data that is still needed by the child timelines.
@@ -478,8 +479,8 @@ impl Timeline {
     }
 
     /// Lock and get timeline's GC cutoff
-    pub fn get_latest_gc_cutoff_lsn(&self) -> RwLockReadGuard<Lsn> {
-        self.latest_gc_cutoff_lsn.read().unwrap()
+    pub fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
+        self.latest_gc_cutoff_lsn.read()
     }
 
     /// Look up given page version.
@@ -594,7 +595,7 @@ impl Timeline {
     pub fn check_lsn_is_in_scope(
         &self,
         lsn: Lsn,
-        latest_gc_cutoff_lsn: &RwLockReadGuard<Lsn>,
+        latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
     ) -> Result<()> {
         ensure!(
             lsn >= **latest_gc_cutoff_lsn,
@@ -729,7 +730,7 @@ impl Timeline {
                 pitr_cutoff: Lsn(0),
             }),
 
-            latest_gc_cutoff_lsn: RwLock::new(metadata.latest_gc_cutoff_lsn()),
+            latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
             initdb_lsn: metadata.initdb_lsn(),
 
             current_logical_size: AtomicI64::new(0),
@@ -1377,7 +1378,7 @@ impl Timeline {
             ondisk_prev_record_lsn,
             ancestor_timelineid,
             self.ancestor_lsn,
-            *self.latest_gc_cutoff_lsn.read().unwrap(),
+            *self.latest_gc_cutoff_lsn.read(),
             self.initdb_lsn,
         );
 
@@ -2032,9 +2033,21 @@ impl Timeline {
         let _enter = info_span!("garbage collection", timeline = %self.timeline_id, tenant = %self.tenant_id, cutoff = %new_gc_cutoff).entered();
 
-        // We need to ensure that no one branches at a point before latest_gc_cutoff_lsn.
-        // See branch_timeline() for details.
-        *self.latest_gc_cutoff_lsn.write().unwrap() = new_gc_cutoff;
+        // We need to ensure that no one tries to read page versions or create
+        // branches at a point before latest_gc_cutoff_lsn. See branch_timeline()
+        // for details. This will block until the old value is no longer in use.
+        //
+        // The GC cutoff should only ever move forwards.
+        {
+            let write_guard = self.latest_gc_cutoff_lsn.write();
+            ensure!(
+                *write_guard <= new_gc_cutoff,
+                "Cannot move GC cutoff LSN backwards (was {}, new {})",
+                *write_guard,
+                new_gc_cutoff
+            );
+            write_guard.store(new_gc_cutoff);
+        }
 
         info!("GC starting");
 
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index ebcff1f2ac..fbc70f7690 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -17,13 +17,14 @@ use std::io::{self, Read};
 use std::net::TcpListener;
 use std::str;
 use std::str::FromStr;
-use std::sync::{Arc, RwLockReadGuard};
+use std::sync::Arc;
 use tracing::*;
 use utils::{
     auth::{self, Claims, JwtAuth, Scope},
     lsn::Lsn,
     postgres_backend::{self, is_socket_read_timed_out, AuthType, PostgresBackend},
     pq_proto::{BeMessage, FeMessage, RowDescriptor, SINGLE_COL_ROWDESC},
+    simple_rcu::RcuReadGuard,
     zid::{ZTenantId, ZTimelineId},
 };
 
@@ -639,7 +640,7 @@ impl PageServerHandler {
         timeline: &Timeline,
         mut lsn: Lsn,
         latest: bool,
-        latest_gc_cutoff_lsn: &RwLockReadGuard<Lsn>,
+        latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
     ) -> Result<Lsn> {
         if latest {
             // Latest page version was requested. If LSN is given, it is a hint
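
For context, a minimal usage sketch of the new `Rcu` API from libs/utils/src/simple_rcu.rs above, showing the read/write/store lifecycle the pageserver changes rely on. This is illustrative only: the function and variable names are invented, and only `Rcu::new`, `read`, `write`, and `store` come from the patch.

// Illustrative sketch, not part of the patch.
use utils::simple_rcu::Rcu;

fn rcu_usage_sketch() {
    let rcu = Rcu::new(1u64);

    // Readers take a cheap guard that pins the value they observed.
    let guard = rcu.read();
    assert_eq!(*guard, 1);
    // In a single-threaded caller, the old guard must be dropped before
    // store(), or store() would block forever waiting for it.
    drop(guard);

    // write() briefly locks out readers and other writers; the guard derefs
    // to the current value, which is how gc_timeline() checks that the GC
    // cutoff only moves forward.
    let write_guard = rcu.write();
    assert_eq!(*write_guard, 1);

    // store() consumes the guard: it publishes the new value immediately,
    // releases the lock, and only then waits for guards still pinning older
    // values (none remain here, so it returns at once).
    write_guard.store(2);

    // Readers that start after the store see the new value right away.
    assert_eq!(*rcu.read(), 2);
}

This mirrors how `latest_gc_cutoff_lsn` is used above: branch creation and page requests pin the cutoff via `get_latest_gc_cutoff_lsn()`, while GC calls `write()` and `store()`, which cannot complete until those readers drop their guards.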