diff --git a/Cargo.lock b/Cargo.lock
index 86787b8f6a..4bdbd9108e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4564,6 +4564,7 @@ dependencies = [
  "byteorder",
  "bytes",
  "criterion",
+ "either",
  "futures",
  "heapless",
  "hex",
diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml
index 391bc52a80..549bf3262d 100644
--- a/libs/utils/Cargo.toml
+++ b/libs/utils/Cargo.toml
@@ -37,6 +37,7 @@ uuid = { version = "1.2", features = ["v4", "serde"] }
 
 metrics.workspace = true
 workspace_hack.workspace = true
+either.workspace = true
 
 [dev-dependencies]
 byteorder.workspace = true
diff --git a/libs/utils/src/seqwait.rs b/libs/utils/src/seqwait.rs
index e3f0b505da..fd45cf2dd8 100644
--- a/libs/utils/src/seqwait.rs
+++ b/libs/utils/src/seqwait.rs
@@ -1,12 +1,13 @@
 #![warn(missing_docs)]
 
+use either::Either;
 use std::cmp::{Eq, Ordering, PartialOrd};
 use std::collections::BinaryHeap;
 use std::fmt::Debug;
 use std::mem;
-use std::sync::Mutex;
+use std::sync::{Arc, Mutex};
 use std::time::Duration;
-use tokio::sync::watch::{channel, Receiver, Sender};
+use tokio::sync::oneshot::{channel, Receiver, Sender};
 use tokio::time::timeout;
 
 /// An error happened while waiting for a number
@@ -36,45 +37,48 @@ pub trait MonotonicCounter<V> {
 }
 
 /// Internal components of a `SeqWait`
-struct SeqWaitInt<S, V>
+struct SeqWaitInt<S, V, T>
 where
     S: MonotonicCounter<V>,
     V: Ord,
+    T: Clone,
 {
-    waiters: BinaryHeap<Waiter<V>>,
+    waiters: BinaryHeap<Waiter<V, T>>,
     current: S,
     shutdown: bool,
+    data: T,
 }
 
-struct Waiter<T>
+struct Waiter<V, T>
 where
-    T: Ord,
+    V: Ord,
+    T: Clone,
 {
-    wake_num: T,              // wake me when this number arrives ...
-    wake_channel: Sender<()>, // ... by sending a message to this channel
+    wake_num: V,             // wake me when this number arrives ...
+    wake_channel: Sender<T>, // ... by sending a message to this channel
 }
 
 // BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here
 // to get that.
-impl<T: Ord> PartialOrd for Waiter<T> {
+impl<V: Ord, T: Clone> PartialOrd for Waiter<V, T> {
     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
         other.wake_num.partial_cmp(&self.wake_num)
     }
 }
 
-impl<T: Ord> Ord for Waiter<T> {
+impl<V: Ord, T: Clone> Ord for Waiter<V, T> {
     fn cmp(&self, other: &Self) -> Ordering {
         other.wake_num.cmp(&self.wake_num)
     }
 }
 
-impl<T: Ord> PartialEq for Waiter<T> {
+impl<V: Ord, T: Clone> PartialEq for Waiter<V, T> {
     fn eq(&self, other: &Self) -> bool {
         other.wake_num == self.wake_num
     }
 }
 
-impl<T: Ord> Eq for Waiter<T> {}
+impl<V: Ord, T: Clone> Eq for Waiter<V, T> {}
 
 /// A tool for waiting on a sequence number
 ///
@@ -92,25 +96,28 @@ impl<T: Ord> Eq for Waiter<T> {}
 ///
 /// <S> means Storage, <V> is type of counter that this storage exposes.
 ///
-pub struct SeqWait<S, V>
+pub struct SeqWait<S, V, T>
 where
     S: MonotonicCounter<V>,
     V: Ord,
+    T: Clone,
 {
-    internal: Mutex<SeqWaitInt<S, V>>,
+    internal: Mutex<SeqWaitInt<S, V, T>>,
 }
 
-impl<S, V> SeqWait<S, V>
+impl<S, V, T> SeqWait<S, V, T>
 where
     S: MonotonicCounter<V> + Copy,
     V: Ord + Copy,
+    T: Clone,
 {
     /// Create a new `SeqWait`, initialized to a particular number
-    pub fn new(starting_num: S) -> Self {
+    pub fn new(starting_num: S, data: T) -> Self {
         let internal = SeqWaitInt {
             waiters: BinaryHeap::new(),
             current: starting_num,
             shutdown: false,
+            data,
         };
         SeqWait {
             internal: Mutex::new(internal),
@@ -144,10 +151,13 @@ where
     ///
     /// This call won't complete until someone has called `advance`
     /// with a number greater than or equal to the one we're waiting for.
-    pub async fn wait_for(&self, num: V) -> Result<(), SeqWaitError> {
+    pub async fn wait_for(&self, num: V) -> Result<T, SeqWaitError> {
         match self.queue_for_wait(num) {
-            Ok(None) => Ok(()),
-            Ok(Some(mut rx)) => rx.changed().await.map_err(|_| SeqWaitError::Shutdown),
+            Ok(Either::Left(data)) => Ok(data),
+            Ok(Either::Right(rx)) => match rx.await {
+                Err(_) => Err(SeqWaitError::Shutdown),
+                Ok(data) => Ok(data),
+            },
             Err(e) => Err(e),
         }
     }
@@ -163,11 +173,11 @@
         &self,
         num: V,
         timeout_duration: Duration,
-    ) -> Result<(), SeqWaitError> {
+    ) -> Result<T, SeqWaitError> {
         match self.queue_for_wait(num) {
-            Ok(None) => Ok(()),
-            Ok(Some(mut rx)) => match timeout(timeout_duration, rx.changed()).await {
-                Ok(Ok(())) => Ok(()),
+            Ok(Either::Left(data)) => Ok(data),
+            Ok(Either::Right(rx)) => match timeout(timeout_duration, rx).await {
+                Ok(Ok(data)) => Ok(data),
                 Ok(Err(_)) => Err(SeqWaitError::Shutdown),
                 Err(_) => Err(SeqWaitError::Timeout),
             },
@@ -177,41 +187,47 @@
 
     /// Register and return a channel that will be notified when a number arrives,
     /// or None, if it has already arrived.
-    fn queue_for_wait(&self, num: V) -> Result<Option<Receiver<()>>, SeqWaitError> {
+    fn queue_for_wait(&self, num: V) -> Result<Either<T, Receiver<T>>, SeqWaitError> {
         let mut internal = self.internal.lock().unwrap();
         if internal.current.cnt_value() >= num {
-            return Ok(None);
+            return Ok(Either::Left(internal.data.clone()));
        }
         if internal.shutdown {
             return Err(SeqWaitError::Shutdown);
         }
 
         // Create a new channel.
-        let (tx, rx) = channel(());
+        let (tx, rx) = channel();
         internal.waiters.push(Waiter {
             wake_num: num,
             wake_channel: tx,
         });
         // Drop the lock as we exit this scope.
-        Ok(Some(rx))
+        Ok(Either::Right(rx))
     }
 
     /// Announce a new number has arrived
     ///
     /// All waiters at this value or below will be woken.
     ///
+    /// If `new_data` is Some(), it will update the internal data,
+    /// even if `num` is smaller than the internal counter.
+    /// It will not cause a wake-up in that case, though.
+    ///
     /// Returns the old number.
-    pub fn advance(&self, num: V) -> V {
+    pub fn advance(&self, num: V, new_data: Option<T>) -> V {
         let old_value;
-        let wake_these = {
+        let (wake_these, with_data) = {
             let mut internal = self.internal.lock().unwrap();
+            if let Some(new_data) = new_data {
+                internal.data = new_data;
+            }
             old_value = internal.current.cnt_value();
             if old_value >= num {
                 return old_value;
             }
             internal.current.cnt_advance(num);
-
             // Pop all waiters <= num from the heap. Collect them in a vector, and
             // wake them up after releasing the lock.
             let mut wake_these = Vec::new();
@@ -221,13 +237,13 @@
                 }
                 wake_these.push(internal.waiters.pop().unwrap().wake_channel);
             }
-            wake_these
+            (wake_these, internal.data.clone())
         };
 
         for tx in wake_these {
             // This can fail if there are no receivers.
             // We don't care; discard the error.
-            let _ = tx.send(());
+            let _ = tx.send(with_data.clone());
         }
         old_value
     }
@@ -236,6 +252,106 @@
     pub fn load(&self) -> S {
         self.internal.lock().unwrap().current
     }
+
+    /// Split the seqwait into a part that can only do wait,
+    /// and another part that can do advance + wait.
+    ///
+    /// The wait-only part can be cloned, the advance part cannot be cloned.
+    /// This provides a single-producer multi-consumer scheme.
+    pub fn split_spmc(self) -> (Wait<S, V, T>, Advance<S, V, T>) {
+        let inner = Arc::new(self);
+        let w = Wait {
+            inner: inner.clone(),
+        };
+        let a = Advance { inner };
+        (w, a)
+    }
+}
+
+/// See [`SeqWait::split_spmc`].
+pub struct Wait<S, V, T>
+where
+    S: MonotonicCounter<V> + Copy,
+    V: Ord + Copy,
+    T: Clone,
+{
+    inner: Arc<SeqWait<S, V, T>>,
+}
+
+/// See [`SeqWait::split_spmc`].
+pub struct Advance<S, V, T>
+where
+    S: MonotonicCounter<V> + Copy,
+    V: Ord + Copy,
+    T: Clone,
+{
+    inner: Arc<SeqWait<S, V, T>>,
+}
+
+impl<S, V, T> Wait<S, V, T>
+where
+    S: MonotonicCounter<V> + Copy,
+    V: Ord + Copy,
+    T: Clone,
+{
+    /// See [`SeqWait::wait_for`].
+    pub async fn wait_for(&self, num: V) -> Result<T, SeqWaitError> {
+        self.inner.wait_for(num).await
+    }
+
+    /// See [`SeqWait::wait_for_timeout`].
+    pub async fn wait_for_timeout(
+        &self,
+        num: V,
+        timeout_duration: Duration,
+    ) -> Result<T, SeqWaitError> {
+        self.inner.wait_for_timeout(num, timeout_duration).await
+    }
+}
+
+impl<S, V, T> Advance<S, V, T>
+where
+    S: MonotonicCounter<V> + Copy,
+    V: Ord + Copy,
+    T: Clone,
+{
+    /// See [`SeqWait::advance`].
+    pub fn advance(&self, num: V, new_data: Option<T>) -> V {
+        self.inner.advance(num, new_data)
+    }
+
+    /// See [`SeqWait::wait_for`].
+    pub async fn wait_for(&self, num: V) -> Result<T, SeqWaitError> {
+        self.inner.wait_for(num).await
+    }
+
+    /// See [`SeqWait::wait_for_timeout`].
+    pub async fn wait_for_timeout(
+        &self,
+        num: V,
+        timeout_duration: Duration,
+    ) -> Result<T, SeqWaitError> {
+        self.inner.wait_for_timeout(num, timeout_duration).await
+    }
+
+    /// Get a `Clone::clone` of the current data inside the seqwait.
+    pub fn get_current_data(&self) -> (V, T) {
+        let inner = self.inner.internal.lock().unwrap();
+        (inner.current.cnt_value(), inner.data.clone())
+    }
+}
+
+impl<S, V, T> Clone for Wait<S, V, T>
+where
+    S: MonotonicCounter<V> + Copy,
+    V: Ord + Copy,
+    T: Clone,
+{
+    fn clone(&self) -> Self {
+        Self {
+            inner: self.inner.clone(),
+        }
+    }
 }
 
 #[cfg(test)]
@@ -256,12 +372,12 @@ mod tests {
 
     #[tokio::test]
     async fn seqwait() {
-        let seq = Arc::new(SeqWait::new(0));
+        let seq = Arc::new(SeqWait::new(0, ()));
         let seq2 = Arc::clone(&seq);
         let seq3 = Arc::clone(&seq);
         let jh1 = tokio::task::spawn(async move {
             seq2.wait_for(42).await.expect("wait_for 42");
-            let old = seq2.advance(100);
+            let old = seq2.advance(100, None);
             assert_eq!(old, 99);
             seq2.wait_for_timeout(999, Duration::from_millis(100))
                 .await
@@ -272,12 +388,12 @@
             seq3.wait_for(0).await.expect("wait_for 0");
         });
         tokio::time::sleep(Duration::from_millis(200)).await;
-        let old = seq.advance(99);
+        let old = seq.advance(99, None);
         assert_eq!(old, 0);
         seq.wait_for(100).await.expect("wait_for 100");
 
         // Calling advance with a smaller value is a no-op
-        assert_eq!(seq.advance(98), 100);
+        assert_eq!(seq.advance(98, None), 100);
         assert_eq!(seq.load(), 100);
 
         jh1.await.unwrap();
@@ -288,7 +404,7 @@
 
     #[tokio::test]
     async fn seqwait_timeout() {
-        let seq = Arc::new(SeqWait::new(0));
+        let seq = Arc::new(SeqWait::new(0, ()));
         let seq2 = Arc::clone(&seq);
         let jh = tokio::task::spawn(async move {
             let timeout = Duration::from_millis(1);
@@ -298,10 +414,104 @@
         tokio::time::sleep(Duration::from_millis(200)).await;
         // This will attempt to wake, but nothing will happen
         // because the waiter already dropped its Receiver.
-        let old = seq.advance(99);
+        let old = seq.advance(99, None);
         assert_eq!(old, 0);
         jh.await.unwrap();
 
         seq.shutdown();
     }
+
+    #[tokio::test]
+    async fn data_basic() {
+        let seq = Arc::new(SeqWait::new(0, "a"));
+        let seq2 = Arc::clone(&seq);
+        let jh = tokio::task::spawn(async move {
+            let data = seq.wait_for(2).await.unwrap();
+            assert_eq!(data, "b");
+        });
+        seq2.advance(1, Some("x"));
+        seq2.advance(2, Some("b"));
+        jh.await.unwrap();
+    }
+
+    #[test]
+    fn data_always_most_recent() {
+        let rt = tokio::runtime::Builder::new_current_thread()
+            .build()
+            .unwrap();
+
+        let seq = Arc::new(SeqWait::new(0, "a"));
+        let seq2 = Arc::clone(&seq);
+
+        let jh = rt.spawn(async move {
+            let data = seq.wait_for(2).await.unwrap();
+            assert_eq!(data, "d");
+        });
+
+        // jh is not running until we poll it, thanks to current thread runtime
+
+        rt.block_on(async move {
+            seq2.advance(2, Some("b"));
+            seq2.advance(3, Some("c"));
+            seq2.advance(4, Some("d"));
+        });
+
+        rt.block_on(jh).unwrap();
+    }
+
+    #[tokio::test]
+    async fn split_spmc_api_surface() {
+        let seq = SeqWait::new(0, 1);
+        let (w, a) = seq.split_spmc();
+
+        let _ = w.wait_for(1);
+        let _ = w.wait_for_timeout(0, Duration::from_secs(10));
+        let _ = w.clone();
+
+        let _ = a.advance(1, None);
+        let _ = a.wait_for(1);
+        let _ = a.wait_for_timeout(0, Duration::from_secs(10));
+
+        // TODO would be nice to have must-not-compile tests for Advance not being clonable.
+    }
+
+    #[tokio::test]
+    async fn new_data_same_lsn() {
+        let seq = Arc::new(SeqWait::new(0, "a"));
+
+        seq.advance(1, Some("b"));
+        let data = seq.wait_for(1).await.unwrap();
+        assert_eq!(data, "b", "the regular case where lsn and data advance");
+
+        seq.advance(1, Some("c"));
+        let data = seq.wait_for(1).await.unwrap();
+        assert_eq!(
+            data, "c",
+            "no lsn advance still gives new data for old lsn wait_for's"
+        );
+
+        let (start_wait_for_sender, start_wait_for_receiver) = tokio::sync::oneshot::channel();
+        // ensure we don't wake waiters for data-only change
+        let jh = tokio::spawn({
+            let seq = seq.clone();
+            async move {
+                start_wait_for_receiver.await.unwrap();
+                match tokio::time::timeout(Duration::from_secs(2), seq.wait_for(2)).await {
+                    Ok(_) => {
+                        assert!(
+                            false,
+                            "advance should not wake waiters if data changes but LSN doesn't"
+                        );
+                    }
+                    Err(_) => {
+                        // Good, we weren't woken up.
+                    }
+                }
+            }
+        });
+
+        seq.advance(1, Some("d"));
+        start_wait_for_sender.send(()).unwrap();
+        jh.await.unwrap();
+    }
 }
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 29d8b544cc..f1daa416d9 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -145,7 +145,7 @@ pub struct Timeline {
     // 'last_record_lsn.load().prev'. It's used to set the xl_prev pointer of the
     // first WAL record when the node is started up. But here, we just
     // keep track of it.
-    last_record_lsn: SeqWait<RecordLsn, Lsn>,
+    last_record_lsn: SeqWait<RecordLsn, Lsn, ()>,
 
     // All WAL records have been processed and stored durably on files on
     // local disk, up to this LSN. On crash and restart, we need to re-process
@@ -1270,10 +1270,13 @@ impl Timeline {
             remote_client: remote_client.map(Arc::new),
 
             // initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
-            last_record_lsn: SeqWait::new(RecordLsn {
-                last: disk_consistent_lsn,
-                prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)),
-            }),
+            last_record_lsn: SeqWait::new(
+                RecordLsn {
+                    last: disk_consistent_lsn,
+                    prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)),
+                },
+                (),
+            ),
             disk_consistent_lsn: AtomicLsn::new(disk_consistent_lsn.0),
 
             last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0),
@@ -2420,7 +2423,7 @@
         assert!(new_lsn.is_aligned());
 
         self.metrics.last_record_gauge.set(new_lsn.0 as i64);
-        self.last_record_lsn.advance(new_lsn);
+        self.last_record_lsn.advance(new_lsn, None);
     }
 
     fn freeze_inmem_layer(&self, write_lock_held: bool) {
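
For reviewers, a minimal usage sketch of the API this patch introduces (not part of the diff itself). The `utils::seqwait` import path, the `Counter` newtype, and the string/number payloads are assumptions made for illustration; the calls follow the signatures added above: `SeqWait::new(counter, data)`, `advance(num, Option<new_data>)`, `wait_for(num) -> Result<T, SeqWaitError>`, and `split_spmc()`.

// Illustrative sketch only -- assumes the `utils` crate is a dependency and
// that `Counter` is a locally defined monotonic counter as below.
use std::sync::Arc;

use utils::seqwait::{MonotonicCounter, SeqWait};

#[derive(Clone, Copy)]
struct Counter(u64);

impl MonotonicCounter<u64> for Counter {
    fn cnt_advance(&mut self, new_val: u64) {
        assert!(self.0 <= new_val);
        self.0 = new_val;
    }

    fn cnt_value(&self) -> u64 {
        self.0
    }
}

#[tokio::main]
async fn main() {
    // A SeqWait now carries a payload of type T alongside the counter;
    // waiters receive a clone of whatever payload is current when they wake up.
    let seq = Arc::new(SeqWait::new(Counter(0), "initial"));

    let waiter = {
        let seq = Arc::clone(&seq);
        tokio::spawn(async move {
            // Completes once advance() reaches 10, yielding the data stored at that point.
            let data = seq.wait_for(10).await.unwrap();
            println!("woke at >= 10 with data {data:?}");
        })
    };

    // Data-only update: the counter does not move, so no waiter is woken,
    // but the stored payload is replaced.
    seq.advance(0, Some("still waiting"));

    // The counter reaches 10: waiters at or below 10 are woken with "done".
    seq.advance(10, Some("done"));
    waiter.await.unwrap();

    // Single-producer / multi-consumer split: Wait handles can be cloned,
    // the Advance handle cannot.
    let (wait, advance) = SeqWait::new(Counter(0), 0u32).split_spmc();
    let wait2 = wait.clone();
    advance.advance(1, Some(42));
    assert_eq!(wait2.wait_for(1).await.unwrap(), 42);
}

Because every waiter holds its own oneshot channel, `advance` can hand each woken waiter a clone of the payload that was current at wake-up time, which is what the switch from `tokio::sync::watch` to `tokio::sync::oneshot` enables.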