mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 05:52:55 +00:00
Introduce RCU, and use it to protect latest_gc_cutoff_lsn.
`latest_gc_cutoff_lsn` tracks the cutoff point where GC has been performed. Anything older than the cutoff might already have been GC'd away, and cannot be queried by get_page_at_lsn requests. It's protected by an RWLock. Whenever a get_page_at_lsn requests comes in, it first grabs the lock and reads the current `latest_gc_cutoff`, and holds the lock it until the request has been served. The lock ensures that GC doesn't start concurrently and remove page versions that we still need to satisfy the request. With the lock, get_page_at_lsn request could potentially be blocked for a long time. GC only holds the lock in exclusive mode for a short duration, but depending on how whether the RWLock is "fair", a read request might be queued behind the GC's exclusive request, which in turn might be queued behind a long-running read operation, like a basebackup. If the lock implementation is not fair, i.e. if a reader can always jump the queue if the lock is already held in read mode, then another problem arises: GC might be starved if a constant stream of GetPage requests comes in. To avoid the long wait or starvation, introduce a Read-Copy-Update mechanism to replace the lock on `latest_gc_cutoff_lsn`. With the RCU, reader can always read the latest value without blocking (except for a very short duration if the lock protecting the RCU is contended; that's comparable to a spinlock). And a writer can always write a new value without waiting for readers to finish using the old value. The old readers will continue to see the old value through their guard object, while new readers will see the new value. This is purely theoretical ATM, we don't have any reports of either starvation or blocking behind GC happening in practice. But it's simple to fix, so let's nip that problem in the bud.
This commit is contained in:
@@ -8,6 +8,9 @@ pub mod lsn;
|
||||
/// SeqWait allows waiting for a future sequence number to arrive
|
||||
pub mod seqwait;
|
||||
|
||||
/// A simple Read-Copy-Update implementation.
|
||||
pub mod simple_rcu;
|
||||
|
||||
/// append only ordered map implemented with a Vec
|
||||
pub mod vec_map;
|
||||
|
||||
|
||||
217
libs/utils/src/simple_rcu.rs
Normal file
217
libs/utils/src/simple_rcu.rs
Normal file
@@ -0,0 +1,217 @@
|
||||
//!
|
||||
//! RCU stands for Read-Copy-Update. It's a synchronization mechanism somewhat
|
||||
//! similar to a lock, but it allows readers to "hold on" to an old value of RCU
|
||||
//! without blocking writers, and allows writing a new values without blocking
|
||||
//! readers. When you update the new value, the new value is immediately visible
|
||||
//! to new readers, but the update waits until all existing readers have
|
||||
//! finishe, so that no one sees the old value anymore.
|
||||
//!
|
||||
//! This implementation isn't wait-free; it uses an RwLock that is held for a
|
||||
//! short duration when the value is read or updated.
|
||||
//!
|
||||
#![warn(missing_docs)]
|
||||
|
||||
use std::ops::Deref;
|
||||
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::sync::{Mutex, RwLock, RwLockWriteGuard};
|
||||
|
||||
///
|
||||
/// Rcu allows multiple readers to read and hold onto a value without blocking
|
||||
/// (for very long). Storing to the Rcu updates the value, making new readers
|
||||
/// immediately see the new value, but it also waits for all current readers to
|
||||
/// finish.
|
||||
///
|
||||
pub struct Rcu<V> {
|
||||
inner: RwLock<RcuInner<V>>,
|
||||
}
|
||||
|
||||
struct RcuInner<V> {
|
||||
current_cell: Arc<RcuCell<V>>,
|
||||
old_cells: Vec<Weak<RcuCell<V>>>,
|
||||
}
|
||||
|
||||
///
|
||||
/// RcuCell holds one value. It can be the latest one, or an old one.
|
||||
///
|
||||
struct RcuCell<V> {
|
||||
value: V,
|
||||
|
||||
/// A dummy channel. We never send anything to this channel. The point is
|
||||
/// that when the RcuCell is dropped, any cloned Senders will be notified
|
||||
/// that the channel is closed. Updaters can use this to wait out until the
|
||||
/// RcuCell has been dropped, i.e. until the old value is no longer in use.
|
||||
///
|
||||
/// We never do anything with the receiver, we just need to hold onto it so
|
||||
/// that the Senders will be notified when it's dropped. But because it's
|
||||
/// not Sync, we need a Mutex on it.
|
||||
watch: (SyncSender<()>, Mutex<Receiver<()>>),
|
||||
}
|
||||
|
||||
impl<V> RcuCell<V> {
|
||||
fn new(value: V) -> Self {
|
||||
let (watch_sender, watch_receiver) = sync_channel(0);
|
||||
RcuCell {
|
||||
value,
|
||||
watch: (watch_sender, Mutex::new(watch_receiver)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<V> Rcu<V> {
|
||||
/// Create a new `Rcu`, initialized to `starting_val`
|
||||
pub fn new(starting_val: V) -> Self {
|
||||
let inner = RcuInner {
|
||||
current_cell: Arc::new(RcuCell::new(starting_val)),
|
||||
old_cells: Vec::new(),
|
||||
};
|
||||
Self {
|
||||
inner: RwLock::new(inner),
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Read current value. Any store() calls will block until the returned
|
||||
/// guard object is dropped.
|
||||
///
|
||||
pub fn read(&self) -> RcuReadGuard<V> {
|
||||
let current_cell = Arc::clone(&self.inner.read().unwrap().current_cell);
|
||||
RcuReadGuard { cell: current_cell }
|
||||
}
|
||||
|
||||
///
|
||||
/// Lock the current value for updating. Returns a guard object that can be
|
||||
/// used to read the current value, and to store a new value.
|
||||
///
|
||||
/// Note: holding the write-guard blocks concurrent readers, so you should
|
||||
/// finish the update and drop the guard quickly!
|
||||
///
|
||||
pub fn write(&self) -> RcuWriteGuard<'_, V> {
|
||||
let inner = self.inner.write().unwrap();
|
||||
RcuWriteGuard { inner }
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Read guard returned by `read`
|
||||
///
|
||||
pub struct RcuReadGuard<V> {
|
||||
cell: Arc<RcuCell<V>>,
|
||||
}
|
||||
|
||||
impl<V> Deref for RcuReadGuard<V> {
|
||||
type Target = V;
|
||||
|
||||
fn deref(&self) -> &V {
|
||||
&self.cell.value
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Read guard returned by `read`
|
||||
///
|
||||
pub struct RcuWriteGuard<'a, V> {
|
||||
inner: RwLockWriteGuard<'a, RcuInner<V>>,
|
||||
}
|
||||
|
||||
impl<'a, V> Deref for RcuWriteGuard<'a, V> {
|
||||
type Target = V;
|
||||
|
||||
fn deref(&self) -> &V {
|
||||
&self.inner.current_cell.value
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, V> RcuWriteGuard<'a, V> {
|
||||
///
|
||||
/// Store a new value. The new value will be written to the Rcu immediately,
|
||||
/// and will be immediately seen by any `read` calls that start afterwards.
|
||||
/// But if there are any readers still holding onto the old value, or any
|
||||
/// even older values, this will await until they have been released.
|
||||
///
|
||||
/// This will drop the write-guard before it starts waiting for the reads to
|
||||
/// finish, so a new write operation can begin before this functio returns.
|
||||
///
|
||||
pub fn store(mut self, new_val: V) {
|
||||
let new_cell = Arc::new(RcuCell::new(new_val));
|
||||
|
||||
let mut watches = Vec::new();
|
||||
{
|
||||
let old = std::mem::replace(&mut self.inner.current_cell, new_cell);
|
||||
self.inner.old_cells.push(Arc::downgrade(&old));
|
||||
|
||||
// cleanup old cells that no longer have any readers, and collect
|
||||
// the watches for any that do.
|
||||
self.inner.old_cells.retain(|weak| {
|
||||
if let Some(cell) = weak.upgrade() {
|
||||
watches.push(cell.watch.0.clone());
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
});
|
||||
}
|
||||
drop(self);
|
||||
|
||||
// after all the old_cells are no longer in use, we're done
|
||||
for w in watches.iter_mut() {
|
||||
// This will block until the Receiver is closed. That happens then
|
||||
// the RcuCell is dropped.
|
||||
#[allow(clippy::single_match)]
|
||||
match w.send(()) {
|
||||
Ok(_) => panic!("send() unexpectedly succeeded on dummy channel"),
|
||||
Err(_) => {
|
||||
// closed, which means that the cell has been dropped, and
|
||||
// its value is no longer in use
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread::{sleep, spawn};
|
||||
use std::time::Duration;
|
||||
|
||||
#[test]
|
||||
fn basic() {
|
||||
let rcu = Arc::new(Rcu::new(1));
|
||||
let log = Arc::new(Mutex::new(Vec::new()));
|
||||
|
||||
let a = rcu.read();
|
||||
assert_eq!(*a, 1);
|
||||
log.lock().unwrap().push("one");
|
||||
|
||||
let (rcu_clone, log_clone) = (Arc::clone(&rcu), Arc::clone(&log));
|
||||
let thread = spawn(move || {
|
||||
log_clone.lock().unwrap().push("store two start");
|
||||
let write_guard = rcu_clone.write();
|
||||
assert_eq!(*write_guard, 1);
|
||||
write_guard.store(2);
|
||||
log_clone.lock().unwrap().push("store two done");
|
||||
});
|
||||
// without this sleep the test can pass on accident if the writer is slow
|
||||
sleep(Duration::from_secs(1));
|
||||
|
||||
// new read should see the new value
|
||||
let b = rcu.read();
|
||||
assert_eq!(*b, 2);
|
||||
|
||||
// old guard still sees the old value
|
||||
assert_eq!(*a, 1);
|
||||
|
||||
// Release the old guard. This lets the store in the thread to finish.
|
||||
log.lock().unwrap().push("release a");
|
||||
drop(a);
|
||||
|
||||
thread.join().unwrap();
|
||||
|
||||
assert_eq!(
|
||||
log.lock().unwrap().as_slice(),
|
||||
&["one", "store two start", "release a", "store two done",]
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -254,7 +254,8 @@ impl Repository {
|
||||
src_timeline
|
||||
.check_lsn_is_in_scope(start_lsn, &latest_gc_cutoff_lsn)
|
||||
.context(format!(
|
||||
"invalid branch start lsn: less than latest GC cutoff {latest_gc_cutoff_lsn}"
|
||||
"invalid branch start lsn: less than latest GC cutoff {}",
|
||||
*latest_gc_cutoff_lsn
|
||||
))?;
|
||||
{
|
||||
let gc_info = src_timeline.gc_info.read().unwrap();
|
||||
@@ -290,7 +291,7 @@ impl Repository {
|
||||
dst_prev,
|
||||
Some(src),
|
||||
start_lsn,
|
||||
*src_timeline.latest_gc_cutoff_lsn.read().unwrap(),
|
||||
*src_timeline.latest_gc_cutoff_lsn.read(),
|
||||
src_timeline.initdb_lsn,
|
||||
);
|
||||
crashsafe_dir::create_dir_all(self.conf.timeline_path(&dst, &self.tenant_id))?;
|
||||
|
||||
@@ -14,7 +14,7 @@ use std::fs;
|
||||
use std::ops::{Deref, Range};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::{self, AtomicBool, AtomicI64, Ordering as AtomicOrdering};
|
||||
use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, TryLockError};
|
||||
use std::sync::{Arc, Mutex, MutexGuard, RwLock, TryLockError};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use metrics::{
|
||||
@@ -46,6 +46,7 @@ use postgres_ffi::v14::xlog_utils::to_pg_timestamp;
|
||||
use utils::{
|
||||
lsn::{AtomicLsn, Lsn, RecordLsn},
|
||||
seqwait::SeqWait,
|
||||
simple_rcu::{Rcu, RcuReadGuard},
|
||||
zid::{ZTenantId, ZTimelineId},
|
||||
};
|
||||
|
||||
@@ -367,7 +368,7 @@ pub struct Timeline {
|
||||
layer_removal_cs: Mutex<()>,
|
||||
|
||||
// Needed to ensure that we can't create a branch at a point that was already garbage collected
|
||||
pub latest_gc_cutoff_lsn: RwLock<Lsn>,
|
||||
pub latest_gc_cutoff_lsn: Rcu<Lsn>,
|
||||
|
||||
// List of child timelines and their branch points. This is needed to avoid
|
||||
// garbage collecting data that is still needed by the child timelines.
|
||||
@@ -478,8 +479,8 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Lock and get timeline's GC cuttof
|
||||
pub fn get_latest_gc_cutoff_lsn(&self) -> RwLockReadGuard<Lsn> {
|
||||
self.latest_gc_cutoff_lsn.read().unwrap()
|
||||
pub fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
|
||||
self.latest_gc_cutoff_lsn.read()
|
||||
}
|
||||
|
||||
/// Look up given page version.
|
||||
@@ -594,7 +595,7 @@ impl Timeline {
|
||||
pub fn check_lsn_is_in_scope(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
latest_gc_cutoff_lsn: &RwLockReadGuard<Lsn>,
|
||||
latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
|
||||
) -> Result<()> {
|
||||
ensure!(
|
||||
lsn >= **latest_gc_cutoff_lsn,
|
||||
@@ -729,7 +730,7 @@ impl Timeline {
|
||||
pitr_cutoff: Lsn(0),
|
||||
}),
|
||||
|
||||
latest_gc_cutoff_lsn: RwLock::new(metadata.latest_gc_cutoff_lsn()),
|
||||
latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
|
||||
initdb_lsn: metadata.initdb_lsn(),
|
||||
|
||||
current_logical_size: AtomicI64::new(0),
|
||||
@@ -1377,7 +1378,7 @@ impl Timeline {
|
||||
ondisk_prev_record_lsn,
|
||||
ancestor_timelineid,
|
||||
self.ancestor_lsn,
|
||||
*self.latest_gc_cutoff_lsn.read().unwrap(),
|
||||
*self.latest_gc_cutoff_lsn.read(),
|
||||
self.initdb_lsn,
|
||||
);
|
||||
|
||||
@@ -2032,9 +2033,21 @@ impl Timeline {
|
||||
|
||||
let _enter = info_span!("garbage collection", timeline = %self.timeline_id, tenant = %self.tenant_id, cutoff = %new_gc_cutoff).entered();
|
||||
|
||||
// We need to ensure that no one branches at a point before latest_gc_cutoff_lsn.
|
||||
// See branch_timeline() for details.
|
||||
*self.latest_gc_cutoff_lsn.write().unwrap() = new_gc_cutoff;
|
||||
// We need to ensure that no one tries to read page versions or create
|
||||
// branches at a point before latest_gc_cutoff_lsn. See branch_timeline()
|
||||
// for details. This will block until the old value is no longer in use.
|
||||
//
|
||||
// The GC cutoff should only ever move forwards.
|
||||
{
|
||||
let write_guard = self.latest_gc_cutoff_lsn.write();
|
||||
ensure!(
|
||||
*write_guard <= new_gc_cutoff,
|
||||
"Cannot move GC cutoff LSN backwards (was {}, new {})",
|
||||
*write_guard,
|
||||
new_gc_cutoff
|
||||
);
|
||||
write_guard.store(new_gc_cutoff);
|
||||
}
|
||||
|
||||
info!("GC starting");
|
||||
|
||||
|
||||
@@ -17,13 +17,14 @@ use std::io::{self, Read};
|
||||
use std::net::TcpListener;
|
||||
use std::str;
|
||||
use std::str::FromStr;
|
||||
use std::sync::{Arc, RwLockReadGuard};
|
||||
use std::sync::Arc;
|
||||
use tracing::*;
|
||||
use utils::{
|
||||
auth::{self, Claims, JwtAuth, Scope},
|
||||
lsn::Lsn,
|
||||
postgres_backend::{self, is_socket_read_timed_out, AuthType, PostgresBackend},
|
||||
pq_proto::{BeMessage, FeMessage, RowDescriptor, SINGLE_COL_ROWDESC},
|
||||
simple_rcu::RcuReadGuard,
|
||||
zid::{ZTenantId, ZTimelineId},
|
||||
};
|
||||
|
||||
@@ -639,7 +640,7 @@ impl PageServerHandler {
|
||||
timeline: &Timeline,
|
||||
mut lsn: Lsn,
|
||||
latest: bool,
|
||||
latest_gc_cutoff_lsn: &RwLockReadGuard<Lsn>,
|
||||
latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
|
||||
) -> Result<Lsn> {
|
||||
if latest {
|
||||
// Latest page version was requested. If LSN is given, it is a hint
|
||||
|
||||
Reference in New Issue
Block a user